"""AI integration tests.

Exercises the AI client, memory system, skills manager, MCP manager and the
chat-command handler. Each scenario is an ``async`` coroutine named
``_test_*``; thin synchronous ``test_*`` wrappers run them via
``asyncio.run``, and ``main`` runs them in sequence when this file is
executed as a script.

Scenarios that need a real model (basic chat, conversational memory) are
skipped when ``AI_API_KEY`` is not configured.
"""

import asyncio
import json
import os
from pathlib import Path
import shutil
import stat
import sys
import tempfile
import time
from types import SimpleNamespace
import zipfile

from dotenv import load_dotenv

# The project root must be on sys.path before the ``src`` imports below.
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))

from src.ai import AIClient  # noqa: E402
from src.ai.base import ModelConfig, ModelProvider  # noqa: E402
from src.ai.memory import MemorySystem  # noqa: E402
from src.ai.skills import SkillsManager, create_skill_template  # noqa: E402
from src.handlers.message_handler_ai import MessageHandler  # noqa: E402

load_dotenv(project_root / ".env")

TEST_DATA_DIR = Path("data/ai_test")

# Provider names accepted in the environment, mapped to ModelProvider members.
# "siliconflow" is routed through the OPENAI provider.
_PROVIDER_MAP = {
    "openai": ModelProvider.OPENAI,
    "anthropic": ModelProvider.ANTHROPIC,
    "deepseek": ModelProvider.DEEPSEEK,
    "qwen": ModelProvider.QWEN,
    "siliconflow": ModelProvider.OPENAI,
}


def _safe_rmtree(path: Path):
    """Remove a directory tree, best-effort.

    Entries that fail to delete are made writable and retried once by the
    error handler. The whole removal is attempted up to three times when a
    ``PermissionError`` escapes, sleeping 0.2s between attempts. If every
    attempt fails the function returns silently.
    """
    if not path.exists():
        return

    def _make_writable_and_retry(func, target, _exc):
        # Best-effort: clear the read-only bit and repeat the failed call.
        try:
            os.chmod(target, stat.S_IWRITE)
            func(target)
        except OSError:
            pass

    # ``onerror`` is deprecated since Python 3.12 in favour of ``onexc``;
    # both call the handler with three positional arguments.
    handler_kwarg = "onexc" if sys.version_info >= (3, 12) else "onerror"

    for _ in range(3):
        try:
            shutil.rmtree(path, **{handler_kwarg: _make_writable_and_retry})
            return
        except PermissionError:
            time.sleep(0.2)


def _safe_unlink(path: Path):
    """Delete a file if it exists, retrying up to three times on PermissionError."""
    if not path.exists():
        return

    for _ in range(3):
        try:
            path.unlink()
            return
        except PermissionError:
            time.sleep(0.2)


def _read_env(name: str, default=None):
    """Return the stripped value of environment variable ``name``.

    ``default`` is returned when the variable is unset, blank after
    stripping, or starts with ``#`` (a commented-out value).
    """
    value = os.getenv(name)
    if value is None:
        return default

    value = value.strip()
    if not value or value.startswith("#"):
        return default
    return value


def _resolve_provider(env_name: str) -> ModelProvider:
    """Map the provider named by ``env_name`` to a ModelProvider.

    Unset or unrecognised values fall back to ``ModelProvider.OPENAI``.
    """
    provider_str = (_read_env(env_name, "openai") or "openai").lower()
    return _PROVIDER_MAP.get(provider_str, ModelProvider.OPENAI)


def get_ai_config() -> ModelConfig:
    """Build the chat model config from AI_PROVIDER / AI_MODEL / AI_API_KEY / AI_API_BASE."""
    return ModelConfig(
        provider=_resolve_provider("AI_PROVIDER"),
        model_name=_read_env("AI_MODEL", "gpt-3.5-turbo") or "gpt-3.5-turbo",
        api_key=_read_env("AI_API_KEY", "") or "",
        api_base=_read_env("AI_API_BASE"),
        temperature=0.7,
    )


def get_embed_config() -> ModelConfig:
    """Build the embedding model config from the AI_EMBED_* variables.

    The API key and base URL fall back to AI_API_KEY / AI_API_BASE when the
    embedding-specific variables are not set.
    """
    api_key = _read_env("AI_EMBED_API_KEY") or _read_env("AI_API_KEY", "") or ""
    api_base = _read_env("AI_EMBED_API_BASE") or _read_env("AI_API_BASE")
    return ModelConfig(
        provider=_resolve_provider("AI_EMBED_PROVIDER"),
        model_name=(
            _read_env("AI_EMBED_MODEL", "text-embedding-3-small")
            or "text-embedding-3-small"
        ),
        api_key=api_key,
        api_base=api_base,
        temperature=0.0,
    )


class FakeMessage:
    """Minimal stand-in for an incoming chat message that records replies."""

    def __init__(self, content: str):
        self.content = content
        self.author = SimpleNamespace(id="test_user")
        self.replies = []

    async def reply(self, content: str):
        """Record ``content`` instead of sending it anywhere."""
        self.replies.append(content)


def make_handler() -> MessageHandler:
    """Create a MessageHandler wired to test data, bypassing its own AI init."""
    fake_bot = SimpleNamespace(robot=SimpleNamespace(id="test_bot"))
    handler = MessageHandler(fake_bot)
    handler.ai_client = AIClient(get_ai_config(), data_dir=TEST_DATA_DIR)
    handler.skills_manager = SkillsManager(Path("skills"))
    handler.model_profiles_path = TEST_DATA_DIR / "models_test.json"
    TEST_DATA_DIR.mkdir(parents=True, exist_ok=True)
    # Start from a clean profile file so earlier runs cannot leak state.
    _safe_unlink(handler.model_profiles_path)
    handler._ai_initialized = True
    return handler


async def _test_basic_chat():
    """Send one chat message to the configured model and expect a non-empty reply."""
    print("=== test_basic_chat ===")
    config = get_ai_config()
    if not config.api_key:
        print("skip: AI_API_KEY not configured")
        return

    embed_config = get_embed_config()
    client = AIClient(config, embed_config=embed_config, data_dir=TEST_DATA_DIR)
    response = await client.chat(
        user_id="test_user",
        user_message="你好,请介绍一下你自己",
        use_memory=False,
        use_tools=False,
    )
    assert response
    print("ok: chat response length", len(response))


async def _test_memory():
    """Hold a two-turn conversation with memory on and inspect the stored context."""
    print("=== test_memory ===")
    config = get_ai_config()
    if not config.api_key:
        print("skip: AI_API_KEY not configured")
        return

    client = AIClient(config, embed_config=get_embed_config(), data_dir=TEST_DATA_DIR)
    await client.chat(user_id="test_user", user_message="我叫张三", use_memory=True)
    await client.chat(
        user_id="test_user",
        user_message="what is my name",
        use_memory=True,
    )

    short_term, long_term = await client.memory.get_context("test_user")
    assert len(short_term) >= 2
    # Importance is now scored by the model, so whether anything reaches
    # long-term memory depends on that score; no fixed count is asserted.
    assert isinstance(long_term, list)
    print("ok: memory short/long", len(short_term), len(long_term))


async def _test_personality():
    """List, select, add and remove personalities directly on the client."""
    print("=== test_personality ===")
    client = AIClient(get_ai_config(), data_dir=TEST_DATA_DIR)
    names = client.list_personalities()
    assert names
    assert client.set_personality(names[0])

    key = "roleplay_test"
    added = client.personality.add_personality(
        key,
        client.personality.get_personality("default"),
    )
    assert added
    assert key in client.list_personalities()
    assert client.personality.remove_personality(key)
    assert key not in client.list_personalities()
    print("ok: personality add/remove")


async def _test_skills():
    """Load the weather and skills_creator skills, call a tool, then unload both."""
    print("=== test_skills ===")
    manager = SkillsManager(Path("skills"))

    assert await manager.load_skill("weather")
    tools = manager.get_all_tools()
    assert "weather.get_weather" in tools
    weather = await tools["weather.get_weather"](city="北京")
    assert weather

    assert await manager.load_skill("skills_creator")
    tools = manager.get_all_tools()
    assert "skills_creator.create_skill" in tools

    await manager.unload_skill("weather")
    await manager.unload_skill("skills_creator")
    print("ok: skills load/unload")


async def _test_skill_commands():
    """Drive /skills install, list, reload and uninstall through the handler."""
    print("=== test_skill_commands ===")
    handler = make_handler()
    skill_key = f"cmd_zip_skill_{int(time.time() * 1000)}"

    tmp_root = TEST_DATA_DIR / "tmp_skill_pkg"
    zip_path = TEST_DATA_DIR / f"{skill_key}.zip"
    _safe_rmtree(tmp_root)

    try:
        # Prepare a zip package source for install testing.
        tmp_root.mkdir(parents=True, exist_ok=True)
        create_skill_template(
            skill_key,
            tmp_root,
            description="zip skill",
            author="test",
        )
        with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
            for file in (tmp_root / skill_key).rglob("*"):
                if file.is_file():
                    zf.write(file, file.relative_to(tmp_root))

        install_msg = FakeMessage(f"/skills install {zip_path}")
        await handler._handle_command(install_msg, install_msg.content)
        assert install_msg.replies, "install command no reply"

        list_msg = FakeMessage("/skills")
        await handler._handle_command(list_msg, list_msg.content)
        assert list_msg.replies, "list command no reply"

        reload_msg = FakeMessage(f"/skills reload {skill_key}")
        await handler._handle_command(reload_msg, reload_msg.content)
        assert reload_msg.replies, "reload command no reply"

        uninstall_msg = FakeMessage(f"/skills uninstall {skill_key}")
        await handler._handle_command(uninstall_msg, uninstall_msg.content)
        assert uninstall_msg.replies, "uninstall command no reply"
    finally:
        # Remove the temporary package even when an assertion above fails.
        _safe_rmtree(tmp_root)
        _safe_unlink(zip_path)

    print("ok: skills install/reload/uninstall command")


async def _test_personality_commands():
    """Drive /personality add, set and remove through the handler."""
    print("=== test_personality_commands ===")
    handler = make_handler()

    intro = "You are a hot-blooded anime hero. Speak directly and stay in-character."
    add_cmd = (
        "/personality add roleplay_hero "
        f"{intro}"
    )
    add_msg = FakeMessage(add_cmd)
    await handler._handle_command(add_msg, add_msg.content)
    assert add_msg.replies

    set_msg = FakeMessage("/personality set roleplay_hero")
    await handler._handle_command(set_msg, set_msg.content)
    assert set_msg.replies
    assert intro in handler.ai_client.personality.get_system_prompt()

    remove_msg = FakeMessage("/personality remove roleplay_hero")
    await handler._handle_command(remove_msg, remove_msg.content)
    assert remove_msg.replies
    assert "roleplay_hero" not in handler.ai_client.list_personalities()
    print("ok: personality add/set/remove command")


async def _test_model_commands():
    """Drive /models list, add, switch, current and remove through the handler."""
    print("=== test_model_commands ===")
    handler = make_handler()

    list_msg = FakeMessage("/models")
    await handler._handle_command(list_msg, list_msg.content)
    assert list_msg.replies
    assert "default" in list_msg.replies[-1].lower()

    add_msg = FakeMessage("/models add roleplay_llm openai gpt-4o-mini")
    await handler._handle_command(add_msg, add_msg.content)
    assert add_msg.replies
    assert handler.active_model_key == "roleplay_llm"
    assert "roleplay_llm" in handler.model_profiles

    switch_msg = FakeMessage("/models switch default")
    await handler._handle_command(switch_msg, switch_msg.content)
    assert switch_msg.replies
    assert handler.active_model_key == "default"

    current_msg = FakeMessage("/models current")
    await handler._handle_command(current_msg, current_msg.content)
    assert current_msg.replies

    # Shortcut form: only a model name is given, so provider, API base and
    # API key are expected to carry over from the currently active config.
    old_config = handler.ai_client.config
    shortcut_model = "Qwen/Qwen2.5-7B-Instruct"
    shortcut_key = handler._normalize_model_key(shortcut_model)
    shortcut_add_msg = FakeMessage(f"/models add {shortcut_model}")
    await handler._handle_command(shortcut_add_msg, shortcut_add_msg.content)
    assert shortcut_add_msg.replies
    assert handler.active_model_key == shortcut_key
    assert handler.ai_client.config.model_name == shortcut_model
    assert handler.ai_client.config.provider == old_config.provider
    assert handler.ai_client.config.api_base == old_config.api_base
    assert handler.ai_client.config.api_key == old_config.api_key

    shortcut_remove_msg = FakeMessage(f"/models remove {shortcut_key}")
    await handler._handle_command(shortcut_remove_msg, shortcut_remove_msg.content)
    assert shortcut_remove_msg.replies
    assert shortcut_key not in handler.model_profiles

    remove_msg = FakeMessage("/models remove roleplay_llm")
    await handler._handle_command(remove_msg, remove_msg.content)
    assert remove_msg.replies
    assert "roleplay_llm" not in handler.model_profiles

    _safe_unlink(handler.model_profiles_path)
    print("ok: model add/switch/remove command")


async def _test_memory_commands():
    """Drive the /memory CRUD commands and /clear short through the handler."""
    print("=== test_memory_commands ===")
    handler = make_handler()
    user_id = "test_user"

    await handler.ai_client.clear_all_memory(user_id)

    add_msg = FakeMessage("/memory add this is a long-term memory test")
    await handler._handle_command(add_msg, add_msg.content)
    assert add_msg.replies
    assert "已新增长期记忆" in add_msg.replies[-1]
    # The id is taken as the first token after ": " in the reply text.
    memory_id = add_msg.replies[-1].split(": ", 1)[1].split(" ", 1)[0]
    assert memory_id

    list_msg = FakeMessage("/memory list 5")
    await handler._handle_command(list_msg, list_msg.content)
    assert list_msg.replies
    assert memory_id in list_msg.replies[-1]

    get_msg = FakeMessage(f"/memory get {memory_id}")
    await handler._handle_command(get_msg, get_msg.content)
    assert get_msg.replies
    assert memory_id in get_msg.replies[-1]

    search_msg = FakeMessage("/memory search 长期记忆")
    await handler._handle_command(search_msg, search_msg.content)
    assert search_msg.replies
    assert memory_id in search_msg.replies[-1]

    update_msg = FakeMessage(f"/memory update {memory_id} 这是更新后的长期记忆")
    await handler._handle_command(update_msg, update_msg.content)
    assert update_msg.replies
    assert "已更新长期记忆" in update_msg.replies[-1]

    # Build short-term memory then clear only short-term.
    await handler.ai_client.memory.add_message(
        user_id=user_id,
        role="user",
        content="short memory for clear short test",
    )
    assert handler.ai_client.memory.short_term.get(user_id)

    clear_short_msg = FakeMessage("/clear short")
    await handler._handle_command(clear_short_msg, clear_short_msg.content)
    assert clear_short_msg.replies
    assert not handler.ai_client.memory.short_term.get(user_id)

    # Long-term memory should still exist after clearing short-term only.
    still_exists = await handler.ai_client.get_long_term_memory(user_id, memory_id)
    assert still_exists is not None

    delete_msg = FakeMessage(f"/memory delete {memory_id}")
    await handler._handle_command(delete_msg, delete_msg.content)
    assert delete_msg.replies
    assert "已删除长期记忆" in delete_msg.replies[-1]

    removed = await handler.ai_client.get_long_term_memory(user_id, memory_id)
    assert removed is None
    print("ok: memory command CRUD + clear short")


async def _test_plain_text_output():
    """Check that the handler's plain-text conversion strips Markdown markers."""
    print("=== test_plain_text_output ===")
    handler = make_handler()
    md_text = "# 标题\n**加粗** 和 `代码`\n- 列表\n[链接](https://example.com)"
    plain = handler._plain_text(md_text)
    assert "#" not in plain
    assert "**" not in plain
    assert "`" not in plain
    assert "[" not in plain
    assert "](" not in plain
    print("ok: markdown stripped")


async def _test_skills_creator_autoload():
    """Run the handler's real AI init and expect skills_creator to be loaded."""
    print("=== test_skills_creator_autoload ===")
    fake_bot = SimpleNamespace(robot=SimpleNamespace(id="test_bot"))
    handler = MessageHandler(fake_bot)
    handler.model_profiles_path = TEST_DATA_DIR / "models_autoload_test.json"
    _safe_unlink(handler.model_profiles_path)

    await handler._init_ai()
    assert handler.skills_manager is not None
    assert "skills_creator" in handler.skills_manager.list_skills()

    tool_names = [tool.name for tool in handler.ai_client.tools.list()]
    assert "skills_creator.create_skill" in tool_names
    print("ok: skills_creator autoloaded")


async def _test_mcp():
    """Register a filesystem MCP server and expect at least one exposed tool."""
    print("=== test_mcp ===")
    from src.ai.mcp import MCPManager
    from src.ai.mcp.servers import FileSystemMCPServer

    manager = MCPManager(Path("config/mcp.json"))
    fs_server = FileSystemMCPServer(root_path=Path("data"))
    await manager.register_server(fs_server)

    tools = await manager.get_all_tools_for_ai()
    assert len(tools) >= 1
    print("ok: mcp tools", len(tools))


async def _test_long_task():
    """Create and start a two-step long task, then check its reported status."""
    print("=== test_long_task ===")
    client = AIClient(get_ai_config(), data_dir=TEST_DATA_DIR)

    async def step1():
        await asyncio.sleep(0.1)
        return "step1"

    async def step2():
        await asyncio.sleep(0.1)
        return "step2"

    client.task_manager.register_action("step1", step1)
    client.task_manager.register_action("step2", step2)

    task_id = await client.create_long_task(
        user_id="test_user",
        title="test",
        description="test task",
        steps=[
            {"description": "s1", "action": "step1", "params": {}},
            {"description": "s2", "action": "step2", "params": {}},
        ],
    )

    await client.start_task(task_id)
    # Give the task time to progress; it may still be running afterwards.
    await asyncio.sleep(0.5)

    status = client.get_task_status(task_id)
    assert status is not None
    assert status["status"] in {"completed", "running"}
    print("ok: long task", status["status"])


async def _test_memory_importance_evaluator():
    """Verify the custom importance evaluator is used and the fallback score is 0.5."""
    print("=== test_memory_importance_evaluator ===")
    called = {"value": False}

    async def fake_importance_eval(content, metadata):
        called["value"] = True
        assert "用户:" in content
        assert "助手:" in content
        return 0.91

    store_path = TEST_DATA_DIR / "importance_test.json"
    _safe_unlink(store_path)

    memory = MemorySystem(
        storage_path=store_path,
        importance_evaluator=fake_importance_eval,
        use_vector_db=False,
    )

    stored = await memory.add_qa_pair(
        user_id="u1",
        question="请记住我的昵称是小明",
        answer="好的,我记住了你的昵称是小明",
        metadata={"source": "test"},
    )

    assert called["value"]
    assert stored is not None
    assert "用户:" in stored.content
    assert "助手:" in stored.content
    assert "小明" in stored.content

    long_term = await memory.list_long_term("u1")
    assert len(long_term) == 1

    # add_message only writes to short-term memory; it must not trigger
    # importance scoring or a long-term write.
    await memory.add_message(user_id="u1", role="user", content="单条短期消息")
    long_term_after_single = await memory.list_long_term("u1")
    assert len(long_term_after_single) == 1

    memory_without_eval = MemorySystem(
        storage_path=TEST_DATA_DIR / "importance_fallback_test.json",
        use_vector_db=False,
    )
    fallback_score = await memory_without_eval._evaluate_importance("任意内容", None)
    assert fallback_score == 0.5

    await memory.close()
    await memory_without_eval.close()
    _safe_unlink(store_path)
    _safe_unlink(TEST_DATA_DIR / "importance_fallback_test.json")
    print("ok: memory importance evaluator")


# Synchronous entry points, one per async scenario above.
def test_basic_chat():
    asyncio.run(_test_basic_chat())


def test_memory():
    asyncio.run(_test_memory())


def test_personality():
    asyncio.run(_test_personality())


def test_skills():
    asyncio.run(_test_skills())


def test_skill_commands():
    asyncio.run(_test_skill_commands())


def test_personality_commands():
    asyncio.run(_test_personality_commands())


def test_model_commands():
    asyncio.run(_test_model_commands())


def test_memory_commands():
    asyncio.run(_test_memory_commands())


def test_plain_text_output():
    asyncio.run(_test_plain_text_output())


def test_skills_creator_autoload():
    asyncio.run(_test_skills_creator_autoload())


def test_mcp():
    asyncio.run(_test_mcp())


def test_long_task():
    asyncio.run(_test_long_task())


def test_memory_importance_evaluator():
    asyncio.run(_test_memory_importance_evaluator())


async def main():
    """Run every scenario in sequence; API-dependent ones only with an API key."""
    print("开始 AI 功能测试")
    await _test_personality()
    await _test_skills()
    await _test_skill_commands()
    await _test_personality_commands()
    await _test_model_commands()
    await _test_memory_commands()
    await _test_plain_text_output()
    await _test_skills_creator_autoload()
    await _test_mcp()
    await _test_long_task()
    await _test_memory_importance_evaluator()

    config = get_ai_config()
    if config.api_key:
        await _test_basic_chat()
        await _test_memory()
    else:
        print("跳过需要 API Key 的对话/记忆测试")

    print("测试完成")


if __name__ == "__main__":
    asyncio.run(main())