Enhance README and message handler to support GitHub repository URLs. Updated installation instructions for skills to include GitHub URLs and improved error handling for skill source resolution in the codebase.

This commit is contained in:
Mimikko-zeus
2026-03-03 12:45:19 +08:00
parent 0def2deb9b
commit e39e201455
4 changed files with 306 additions and 27 deletions

View File

@@ -61,6 +61,11 @@ python main.py
- 本地技能名(如 `weather` - 本地技能名(如 `weather`
- URLzip 包) - URLzip 包)
- GitHub 简写(`owner/repo``owner/repo#branch` - GitHub 简写(`owner/repo``owner/repo#branch`
- GitHub 仓库 URL`https://github.com/op7418/Humanizer-zh.git`
兼容说明:
- 若源中包含标准技能结构(`skill.json` + `main.py`),按原方式安装
- 若仅包含 `SKILL.md`,会自动生成适配技能并提供 `read_skill_doc` 工具读取文档内容
### 模型命令 ### 模型命令

View File

@@ -13,6 +13,7 @@ import sys
import tempfile import tempfile
import time import time
from typing import Any, Callable, Dict, List, Optional, Tuple from typing import Any, Callable, Dict, List, Optional, Tuple
import urllib.parse
import urllib.request import urllib.request
import zipfile import zipfile
import os import os
@@ -346,17 +347,73 @@ class {"".join(p.capitalize() for p in skill_key.split("_"))}Skill(Skill):
await self.unload_skill(skill_name) await self.unload_skill(skill_name)
return await self.load_skill(skill_name) return await self.load_skill(skill_name)
def _resolve_network_url(self, source: str) -> str: def _parse_github_repo_source(
"""支持 URL 与 GitHub 简写。""" self,
source: str,
) -> Optional[Tuple[str, str, str, Optional[str]]]:
"""解析 GitHub 仓库 URL返回 owner/repo/branch/subpath。"""
try:
parsed = urllib.parse.urlparse(source.strip())
except Exception:
return None
if parsed.scheme not in {"http", "https"}:
return None
if parsed.netloc.lower() != "github.com":
return None
parts = [part for part in parsed.path.split("/") if part]
if len(parts) < 2:
return None
owner = parts[0]
repo = parts[1]
if repo.endswith(".git"):
repo = repo[:-4]
if not owner or not repo:
return None
branch = "main"
subpath: Optional[str] = None
if len(parts) >= 4 and parts[2] == "tree":
branch = parts[3]
if len(parts) > 4:
subpath = "/".join(parts[4:])
elif len(parts) > 2:
# 不支持 /issues、/blob 等深链接
return None
return owner, repo, branch, subpath
def _resolve_network_source(
self,
source: str,
) -> Tuple[str, Optional[str], Optional[str]]:
"""将网络来源解析为下载 URL并返回安装提示信息。"""
source = source.strip() source = source.strip()
github_repo = self._parse_github_repo_source(source)
if github_repo:
owner, repo, branch, subpath = github_repo
codeload_url = f"https://codeload.github.com/{owner}/{repo}/zip/refs/heads/{branch}"
return codeload_url, self.normalize_skill_key(repo), subpath
if source.startswith(("http://", "https://")): if source.startswith(("http://", "https://")):
return source return source, None, None
if self._GITHUB_SHORTCUT_PATTERN.match(source): if self._GITHUB_SHORTCUT_PATTERN.match(source):
repo, _, branch = source.partition("#") repo_ref, _, branch = source.partition("#")
owner, repo = repo_ref.split("/", 1)
if repo.endswith(".git"):
repo = repo[:-4]
branch = branch or "main" branch = branch or "main"
return f"https://codeload.github.com/{repo}/zip/refs/heads/{branch}" codeload_url = f"https://codeload.github.com/{owner}/{repo}/zip/refs/heads/{branch}"
return codeload_url, self.normalize_skill_key(repo), None
raise ValueError("source 必须是 URL 或 owner/repo[#branch]") raise ValueError("source 必须是 URL 或 owner/repo[#branch]")
@@ -397,6 +454,106 @@ class {"".join(p.capitalize() for p in skill_key.split("_"))}Skill(Skill):
return sorted(uniq.items(), key=lambda x: x[0]) return sorted(uniq.items(), key=lambda x: x[0])
def _find_codex_skill_candidates(self, root_dir: Path) -> List[Tuple[str, Path]]:
"""在目录中扫描仅包含 SKILL.md 的候选项。"""
candidates: List[Tuple[str, Path]] = []
for skill_doc in root_dir.rglob("SKILL.md"):
candidate_dir = skill_doc.parent
if (candidate_dir / "skill.json").exists() and (candidate_dir / "main.py").exists():
continue
try:
skill_key = self.normalize_skill_key(candidate_dir.name)
except ValueError:
continue
candidates.append((skill_key, candidate_dir))
uniq: Dict[str, Path] = {}
for key, path in candidates:
uniq[key] = path
return sorted(uniq.items(), key=lambda x: x[0])
def _scope_extract_root(self, extract_root: Path, subpath: Optional[str]) -> Path:
"""将解压目录缩小到 repo 子路径(若指定)。"""
if not subpath:
return extract_root
clean_subpath = Path(subpath.strip("/\\"))
if str(clean_subpath) == ".":
return extract_root
direct = extract_root / clean_subpath
if direct.exists():
return direct
for child in extract_root.iterdir():
candidate = child / clean_subpath
if candidate.exists():
return candidate
return extract_root
@staticmethod
def _extract_markdown_title(markdown_content: str) -> str:
for line in markdown_content.splitlines():
stripped = line.strip()
if stripped.startswith("#"):
return stripped.lstrip("#").strip()
return ""
def _install_codex_skill_adapter(self, source_dir: Path, target_path: Path, skill_key: str):
"""将 Codex SKILL.md 转换为可加载的项目技能。"""
skill_doc = source_dir / "SKILL.md"
if not skill_doc.exists():
raise FileNotFoundError(f"SKILL.md not found in {source_dir}")
content = skill_doc.read_text(encoding="utf-8")
title = self._extract_markdown_title(content)
description = (
f"Imported from SKILL.md: {title}" if title else f"Imported from SKILL.md ({skill_key})"
)
target_path.mkdir(parents=True, exist_ok=True)
(target_path / "SKILL.md").write_text(content, encoding="utf-8")
(target_path / "__init__.py").write_text("", encoding="utf-8")
metadata = {
"name": skill_key,
"version": "1.0.0",
"description": description,
"author": "imported",
"dependencies": [],
"enabled": True,
}
with open(target_path / "skill.json", "w", encoding="utf-8") as f:
json.dump(metadata, f, ensure_ascii=False, indent=2)
class_name = "".join(word.capitalize() for word in skill_key.split("_")) + "Skill"
main_code = f'''"""{skill_key} adapter skill (generated from SKILL.md)"""
from pathlib import Path
from src.ai.skills.base import Skill
class {class_name}(Skill):
async def initialize(self):
self.register_tool("read_skill_doc", self.read_skill_doc)
async def read_skill_doc(self) -> str:
skill_doc = Path(__file__).with_name("SKILL.md")
if not skill_doc.exists():
return "SKILL.md not found."
return skill_doc.read_text(encoding="utf-8")
async def cleanup(self):
pass
'''
(target_path / "main.py").write_text(main_code, encoding="utf-8")
def install_skill_from_source( def install_skill_from_source(
self, self,
source: str, source: str,
@@ -410,6 +567,8 @@ class {"".join(p.capitalize() for p in skill_key.split("_"))}Skill(Skill):
with tempfile.TemporaryDirectory(prefix="qqbot_skill_") as tmp: with tempfile.TemporaryDirectory(prefix="qqbot_skill_") as tmp:
tmp_dir = Path(tmp) tmp_dir = Path(tmp)
extract_root: Optional[Path] = None extract_root: Optional[Path] = None
source_hint_key: Optional[str] = None
source_subpath: Optional[str] = None
source_path = Path(source) source_path = Path(source)
if source_path.exists(): if source_path.exists():
@@ -423,7 +582,7 @@ class {"".join(p.capitalize() for p in skill_key.split("_"))}Skill(Skill):
return False, "本地 source 仅支持目录或 zip 文件" return False, "本地 source 仅支持目录或 zip 文件"
else: else:
try: try:
url = self._resolve_network_url(source) url, source_hint_key, source_subpath = self._resolve_network_source(source)
except ValueError as exc: except ValueError as exc:
return False, str(exc) return False, str(exc)
@@ -449,13 +608,15 @@ class {"".join(p.capitalize() for p in skill_key.split("_"))}Skill(Skill):
extract_root = tmp_dir / "extract" extract_root = tmp_dir / "extract"
extract_root = self._scope_extract_root(extract_root, source_subpath)
candidates = self._find_skill_candidates(extract_root) candidates = self._find_skill_candidates(extract_root)
if not candidates: use_codex_adapter = False
return False, "未找到可安装技能(需包含 skill.json 与 main.py"
selected_key: Optional[str] = None selected_key: Optional[str] = None
selected_path: Optional[Path] = None selected_path: Optional[Path] = None
if candidates:
if desired_key: if desired_key:
for key, path in candidates: for key, path in candidates:
if key == desired_key: if key == desired_key:
@@ -470,6 +631,39 @@ class {"".join(p.capitalize() for p in skill_key.split("_"))}Skill(Skill):
names = ", ".join([k for k, _ in candidates]) names = ", ".join([k for k, _ in candidates])
return False, f"检测到多个技能,请指定 skill_name。可选: {names}" return False, f"检测到多个技能,请指定 skill_name。可选: {names}"
selected_key, selected_path = candidates[0] selected_key, selected_path = candidates[0]
else:
codex_candidates = self._find_codex_skill_candidates(extract_root)
if not codex_candidates:
return False, "未找到可安装技能(需包含 skill.json 与 main.py或 SKILL.md"
use_codex_adapter = True
if desired_key:
for key, path in codex_candidates:
if key == desired_key:
selected_key, selected_path = key, path
break
if not selected_path:
if len(codex_candidates) == 1:
selected_key = desired_key
selected_path = codex_candidates[0][1]
else:
names = ", ".join([k for k, _ in codex_candidates])
return False, f"源中未找到技能 {desired_key},可选: {names}"
else:
if source_hint_key:
for key, path in codex_candidates:
if key == source_hint_key:
selected_key, selected_path = key, path
break
if not selected_path:
if len(codex_candidates) > 1:
names = ", ".join([k for k, _ in codex_candidates])
return False, f"检测到多个 SKILL.md 技能,请指定 skill_name。可选: {names}"
selected_key, selected_path = codex_candidates[0]
if source_hint_key:
selected_key = source_hint_key
assert selected_key is not None and selected_path is not None assert selected_key is not None and selected_path is not None
target_path = self._get_skill_path(selected_key) target_path = self._get_skill_path(selected_key)
@@ -479,12 +673,16 @@ class {"".join(p.capitalize() for p in skill_key.split("_"))}Skill(Skill):
return False, f"技能已存在: {selected_key}" return False, f"技能已存在: {selected_key}"
shutil.rmtree(target_path) shutil.rmtree(target_path)
if use_codex_adapter:
self._install_codex_skill_adapter(selected_path, target_path, selected_key)
else:
shutil.copytree( shutil.copytree(
selected_path, selected_path,
target_path, target_path,
ignore=shutil.ignore_patterns("__pycache__", "*.pyc", ".git", ".github"), ignore=shutil.ignore_patterns("__pycache__", "*.pyc", ".git", ".github"),
) )
self._ensure_skill_package_layout(target_path, selected_key) self._ensure_skill_package_layout(target_path, selected_key)
importlib.invalidate_caches() importlib.invalidate_caches()
logger.info(f"✅ 安装技能成功: {selected_key} <- {source}") logger.info(f"✅ 安装技能成功: {selected_key} <- {source}")

View File

@@ -73,7 +73,7 @@ class MessageHandler:
"技能命令:\n" "技能命令:\n"
f"{command_name}{command_name} list\n" f"{command_name}{command_name} list\n"
f"{command_name} install <source> [skill_name]\n" f"{command_name} install <source> [skill_name]\n"
f" source 支持本地技能名、URL、owner/repo、owner/repo#branch\n" f" source 支持本地技能名、URL、owner/repo、owner/repo#branch、GitHub 仓库 URL(.git)\n"
f"{command_name} uninstall <skill_name>\n" f"{command_name} uninstall <skill_name>\n"
f"{command_name} reload <skill_name>" f"{command_name} reload <skill_name>"
) )

View File

@@ -0,0 +1,76 @@
import asyncio
import io
from pathlib import Path
import zipfile
from src.ai.skills.base import SkillsManager
def _build_codex_skill_zip_bytes(markdown_text: str, root_name: str = "Humanizer-zh-main") -> bytes:
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as zf:
zf.writestr(f"{root_name}/SKILL.md", markdown_text)
return buffer.getvalue()
def test_resolve_network_source_supports_github_git_url(tmp_path: Path):
manager = SkillsManager(tmp_path / "skills")
url, hint_key, subpath = manager._resolve_network_source(
"https://github.com/op7418/Humanizer-zh.git"
)
assert url == "https://codeload.github.com/op7418/Humanizer-zh/zip/refs/heads/main"
assert hint_key == "humanizer_zh"
assert subpath is None
def test_install_skill_from_local_skill_markdown_source(tmp_path: Path):
source_dir = tmp_path / "Humanizer-zh-main"
source_dir.mkdir(parents=True, exist_ok=True)
(source_dir / "SKILL.md").write_text(
"# Humanizer-zh\n\nUse natural and human-like Chinese tone.\n",
encoding="utf-8",
)
manager = SkillsManager(tmp_path / "skills")
ok, installed_key = manager.install_skill_from_source(str(source_dir), skill_name="humanizer_zh")
assert ok
assert installed_key == "humanizer_zh"
installed_dir = tmp_path / "skills" / "humanizer_zh"
assert (installed_dir / "skill.json").exists()
assert (installed_dir / "main.py").exists()
assert (installed_dir / "SKILL.md").exists()
assert asyncio.run(manager.load_skill("humanizer_zh"))
tools = manager.get_all_tools()
assert "humanizer_zh.read_skill_doc" in tools
text = asyncio.run(tools["humanizer_zh.read_skill_doc"]())
assert "Humanizer-zh" in text
def test_install_skill_from_github_git_url_uses_repo_zip_and_markdown_adapter(
tmp_path: Path, monkeypatch
):
manager = SkillsManager(tmp_path / "skills")
zip_bytes = _build_codex_skill_zip_bytes(
"# Humanizer-zh\n\nUse natural and human-like Chinese tone.\n"
)
captured_urls = []
def fake_download(url: str, output_zip: Path):
captured_urls.append(url)
output_zip.write_bytes(zip_bytes)
monkeypatch.setattr(manager, "_download_zip", fake_download)
ok, installed_key = manager.install_skill_from_source(
"https://github.com/op7418/Humanizer-zh.git"
)
assert ok
assert installed_key == "humanizer_zh"
assert captured_urls == [
"https://codeload.github.com/op7418/Humanizer-zh/zip/refs/heads/main"
]
assert (tmp_path / "skills" / "humanizer_zh" / "SKILL.md").exists()