""" Skills 系统 - 可扩展技能插件框架。 """ from dataclasses import dataclass import importlib import inspect import json from pathlib import Path import re import shutil import sys import tempfile import time from typing import Any, Callable, Dict, List, Optional, Tuple import urllib.request import zipfile import os import stat from src.utils.logger import setup_logger logger = setup_logger("SkillsSystem") @dataclass class SkillMetadata: """技能元数据。""" name: str version: str description: str author: str dependencies: List[str] enabled: bool = True class Skill: """技能基类。""" def __init__(self): self.metadata: Optional[SkillMetadata] = None self.tools: Dict[str, Callable] = {} self.manager = None async def initialize(self): """初始化技能。""" async def cleanup(self): """清理技能。""" def get_tools(self) -> Dict[str, Callable]: """获取技能提供的工具。""" return self.tools def register_tool(self, name: str, func: Callable): """注册工具。""" self.tools[name] = func class SkillsManager: """技能管理器。""" _SKILL_KEY_PATTERN = re.compile(r"[^a-zA-Z0-9_]") _GITHUB_SHORTCUT_PATTERN = re.compile( r"^[A-Za-z0-9_.-]+/[A-Za-z0-9_.-]+(?:#[A-Za-z0-9_.-]+)?$" ) def __init__(self, skills_dir: Path): self.skills_dir = skills_dir self.skills: Dict[str, Skill] = {} self.skills_dir.mkdir(parents=True, exist_ok=True) logger.info(f"✅ Skills 目录: {skills_dir}") @classmethod def normalize_skill_key(cls, raw_name: str) -> str: """将任意输入规范化为可导入的 Python 包名。""" key = raw_name.strip().lower().replace("-", "_").replace(" ", "_") key = cls._SKILL_KEY_PATTERN.sub("_", key) key = re.sub(r"_+", "_", key).strip("_") if not key: raise ValueError("技能名不能为空") if key[0].isdigit(): key = f"skill_{key}" return key def _get_skill_path(self, skill_name: str) -> Path: return self.skills_dir / self.normalize_skill_key(skill_name) @staticmethod def _on_rmtree_error(func, path, exc_info): """Handle Windows readonly/locked file deletion errors.""" try: os.chmod(path, stat.S_IWRITE) func(path) except Exception: # Keep original failure path for upper retry logic. pass def _read_metadata(self, skill_path: Path, fallback_name: str) -> Dict[str, Any]: metadata_file = skill_path / "skill.json" if metadata_file.exists(): with open(metadata_file, "r", encoding="utf-8") as f: metadata = json.load(f) else: metadata = {} metadata.setdefault("name", fallback_name) metadata.setdefault("version", "1.0.0") metadata.setdefault("description", f"{fallback_name} skill") metadata.setdefault("author", "unknown") metadata.setdefault("dependencies", []) metadata.setdefault("enabled", True) with open(metadata_file, "w", encoding="utf-8") as f: json.dump(metadata, f, ensure_ascii=False, indent=2) return metadata def _ensure_skill_package_layout(self, skill_path: Path, skill_key: str): """确保技能目录满足运行最小结构。""" skill_path.mkdir(parents=True, exist_ok=True) init_file = skill_path / "__init__.py" if not init_file.exists(): init_file.write_text("", encoding="utf-8") main_file = skill_path / "main.py" if not main_file.exists(): template = f'''"""{skill_key} skill""" from src.ai.skills.base import Skill class {"".join(p.capitalize() for p in skill_key.split("_"))}Skill(Skill): async def initialize(self): self.register_tool("ping", self.ping) async def ping(self, text: str = "ok") -> str: return text async def cleanup(self): pass ''' main_file.write_text(template, encoding="utf-8") self._read_metadata(skill_path, skill_key) async def load_skill(self, skill_name: str) -> bool: """加载技能。""" try: skill_name = self.normalize_skill_key(skill_name) if skill_name in self.skills: logger.info(f"✅ 技能已加载: {skill_name}") return True skill_path = self._get_skill_path(skill_name) if not skill_path.exists(): logger.error(f"❌ 技能不存在: {skill_name}") return False metadata_file = skill_path / "skill.json" if not metadata_file.exists(): logger.error(f"❌ 技能元数据不存在: {skill_name}") return False with open(metadata_file, "r", encoding="utf-8") as f: metadata_dict = json.load(f) metadata = SkillMetadata(**metadata_dict) if not metadata.enabled: logger.info(f"⏸️ 技能已禁用: {skill_name}") return False module_path = f"skills.{skill_name}.main" importlib.invalidate_caches() try: old_dont_write = sys.dont_write_bytecode sys.dont_write_bytecode = True try: if module_path in sys.modules: module = importlib.reload(sys.modules[module_path]) else: module = importlib.import_module(module_path) finally: sys.dont_write_bytecode = old_dont_write except Exception as exc: logger.error(f"❌ 无法导入技能模块 {module_path}: {exc}") return False skill_class = None for _, obj in inspect.getmembers(module): if inspect.isclass(obj) and issubclass(obj, Skill) and obj != Skill: skill_class = obj break if not skill_class: logger.error(f"❌ 技能中未找到 Skill 子类: {skill_name}") return False skill = skill_class() skill.metadata = metadata skill.manager = self await skill.initialize() self.skills[skill_name] = skill logger.info(f"✅ 加载技能: {skill_name} v{metadata.version}") return True except Exception as exc: logger.error(f"❌ 加载技能失败 {skill_name}: {exc}") return False async def load_all_skills(self): """加载所有可用技能。""" for skill_name in self.list_available_skills(): await self.load_skill(skill_name) async def unload_skill(self, skill_name: str) -> bool: """仅卸载内存中的技能。""" skill_name = self.normalize_skill_key(skill_name) if skill_name not in self.skills: return False skill = self.skills[skill_name] await skill.cleanup() del self.skills[skill_name] sys.modules.pop(f"skills.{skill_name}.main", None) sys.modules.pop(f"skills.{skill_name}", None) importlib.invalidate_caches() logger.info(f"✅ 卸载技能: {skill_name}") return True async def uninstall_skill(self, skill_name: str, delete_files: bool = True) -> bool: """卸载技能并可选删除文件。""" skill_name = self.normalize_skill_key(skill_name) if skill_name in self.skills: await self.unload_skill(skill_name) if not delete_files: return True skill_path = self._get_skill_path(skill_name) if not skill_path.exists(): return False removed = False for _ in range(3): try: shutil.rmtree(skill_path, ignore_errors=False, onerror=self._on_rmtree_error) except PermissionError: pass if not skill_path.exists(): removed = True break time.sleep(0.2) if not removed: try: metadata_file = skill_path / "skill.json" metadata = {} if metadata_file.exists(): with open(metadata_file, "r", encoding="utf-8") as f: metadata = json.load(f) metadata["enabled"] = False with open(metadata_file, "w", encoding="utf-8") as f: json.dump(metadata, f, ensure_ascii=False, indent=2) logger.warning(f"⚠️ 删除目录失败,已软卸载技能: {skill_name}") return True except Exception: return False importlib.invalidate_caches() logger.info(f"✅ 删除技能目录: {skill_name}") return True def get_skill(self, skill_name: str) -> Optional[Skill]: """获取已加载技能实例。""" skill_name = self.normalize_skill_key(skill_name) return self.skills.get(skill_name) def list_skills(self) -> List[str]: """列出已加载技能。""" return sorted(self.skills.keys()) def list_available_skills(self) -> List[str]: """列出可加载技能目录。""" if not self.skills_dir.exists(): return [] available: List[str] = [] for skill_dir in self.skills_dir.iterdir(): if not skill_dir.is_dir() or skill_dir.name.startswith("_"): continue if (skill_dir / "skill.json").exists() and (skill_dir / "main.py").exists(): try: with open(skill_dir / "skill.json", "r", encoding="utf-8") as f: metadata = json.load(f) if not metadata.get("enabled", True): continue available.append(self.normalize_skill_key(skill_dir.name)) except ValueError: continue except Exception: continue return sorted(set(available)) def get_all_tools(self) -> Dict[str, Callable]: """获取全部技能工具。""" all_tools: Dict[str, Callable] = {} for skill_name, skill in self.skills.items(): for tool_name, tool_func in skill.get_tools().items(): all_tools[f"{skill_name}.{tool_name}"] = tool_func return all_tools async def reload_skill(self, skill_name: str) -> bool: """重载技能。""" skill_name = self.normalize_skill_key(skill_name) if skill_name in self.skills: await self.unload_skill(skill_name) return await self.load_skill(skill_name) def _resolve_network_url(self, source: str) -> str: """支持 URL 与 GitHub 简写。""" source = source.strip() if source.startswith(("http://", "https://")): return source if self._GITHUB_SHORTCUT_PATTERN.match(source): repo, _, branch = source.partition("#") branch = branch or "main" return f"https://codeload.github.com/{repo}/zip/refs/heads/{branch}" raise ValueError("source 必须是 URL 或 owner/repo[#branch]") def _download_zip(self, url: str, output_zip: Path): """下载 zip 包到本地。""" req = urllib.request.Request(url, headers={"User-Agent": "QQBot-Skills/1.0"}) with urllib.request.urlopen(req, timeout=30) as resp: data = resp.read() output_zip.write_bytes(data) def _find_skill_candidates(self, root_dir: Path) -> List[Tuple[str, Path]]: """在目录中扫描技能候选项。""" candidates: List[Tuple[str, Path]] = [] for metadata_file in root_dir.rglob("skill.json"): candidate_dir = metadata_file.parent if not (candidate_dir / "main.py").exists(): continue try: with open(metadata_file, "r", encoding="utf-8") as f: metadata = json.load(f) raw_name = str(metadata.get("name") or candidate_dir.name) except Exception: raw_name = candidate_dir.name try: skill_key = self.normalize_skill_key(raw_name) except ValueError: continue candidates.append((skill_key, candidate_dir)) uniq: Dict[str, Path] = {} for key, path in candidates: uniq[key] = path return sorted(uniq.items(), key=lambda x: x[0]) def install_skill_from_source( self, source: str, skill_name: Optional[str] = None, overwrite: bool = False, ) -> Tuple[bool, str]: """从网络或本地源安装技能目录(仅落盘,不自动加载)。""" desired_key = self.normalize_skill_key(skill_name) if skill_name else None with tempfile.TemporaryDirectory(prefix="qqbot_skill_") as tmp: tmp_dir = Path(tmp) extract_root: Optional[Path] = None source_path = Path(source) if source_path.exists(): if source_path.is_dir(): extract_root = source_path elif source_path.is_file() and source_path.suffix.lower() == ".zip": with zipfile.ZipFile(source_path, "r") as zf: zf.extractall(tmp_dir / "extract") extract_root = tmp_dir / "extract" else: return False, "本地 source 仅支持目录或 zip 文件" else: try: url = self._resolve_network_url(source) except ValueError as exc: return False, str(exc) download_zip = tmp_dir / "download.zip" try: self._download_zip(url, download_zip) except Exception as exc: # GitHub 简写默认 main 失败时尝试 master if "codeload.github.com" in url and url.endswith("/main"): fallback = url[:-4] + "master" try: self._download_zip(fallback, download_zip) except Exception: return False, f"下载技能失败: {exc}" else: return False, f"下载技能失败: {exc}" try: with zipfile.ZipFile(download_zip, "r") as zf: zf.extractall(tmp_dir / "extract") except Exception as exc: return False, f"解压技能失败: {exc}" extract_root = tmp_dir / "extract" candidates = self._find_skill_candidates(extract_root) if not candidates: return False, "未找到可安装技能(需包含 skill.json 与 main.py)" selected_key: Optional[str] = None selected_path: Optional[Path] = None if desired_key: for key, path in candidates: if key == desired_key: selected_key, selected_path = key, path break if not selected_path: names = ", ".join([k for k, _ in candidates]) return False, f"源中未找到技能 {desired_key},可选: {names}" else: if len(candidates) > 1: names = ", ".join([k for k, _ in candidates]) return False, f"检测到多个技能,请指定 skill_name。可选: {names}" selected_key, selected_path = candidates[0] assert selected_key is not None and selected_path is not None target_path = self._get_skill_path(selected_key) if target_path.exists(): if not overwrite: return False, f"技能已存在: {selected_key}" shutil.rmtree(target_path) shutil.copytree( selected_path, target_path, ignore=shutil.ignore_patterns("__pycache__", "*.pyc", ".git", ".github"), ) self._ensure_skill_package_layout(target_path, selected_key) importlib.invalidate_caches() logger.info(f"✅ 安装技能成功: {selected_key} <- {source}") return True, selected_key def create_skill_template( skill_name: str, output_dir: Path, description: str = "技能描述", author: str = "QQBot", ): """创建技能模板。""" skill_key = SkillsManager.normalize_skill_key(skill_name) skill_dir = output_dir / skill_key skill_dir.mkdir(parents=True, exist_ok=True) metadata = { "name": skill_key, "version": "1.0.0", "description": description, "author": author, "dependencies": [], "enabled": True, } with open(skill_dir / "skill.json", "w", encoding="utf-8") as f: json.dump(metadata, f, ensure_ascii=False, indent=2) class_name = "".join(word.capitalize() for word in skill_key.split("_")) + "Skill" main_code = f'''"""{skill_key} skill""" from src.ai.skills.base import Skill class {class_name}(Skill): async def initialize(self): self.register_tool("example_tool", self.example_tool) async def example_tool(self, text: str) -> str: return f"{skill_key} 收到: {{text}}" async def cleanup(self): pass ''' with open(skill_dir / "main.py", "w", encoding="utf-8") as f: f.write(main_code) with open(skill_dir / "__init__.py", "w", encoding="utf-8") as f: f.write("") readme = f"""# {skill_key} ## 描述 {description} ## 工具 - example_tool(text) """ with open(skill_dir / "README.md", "w", encoding="utf-8") as f: f.write(readme) logger.info(f"✅ 创建技能模板: {skill_dir}")