Files
2026-06-04 15:21:56 +08:00

144 lines
4.9 KiB
Python

from __future__ import annotations
import os
import json
from pathlib import Path
PROJECT_ROOT = Path(__file__).resolve().parents[1]
def read_env_file(env_path: Path) -> dict[str, str]:
env: dict[str, str] = {}
if not env_path.exists():
return env
text = env_path.read_text(encoding="utf-8", errors="ignore")
for line in text.splitlines():
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, value = line.split("=", 1)
env[key.strip()] = value.strip().strip('"').strip("'")
return env
def load_env() -> dict[str, str]:
env: dict[str, str] = {}
env.update(read_env_file(PROJECT_ROOT / ".env"))
env.update(read_env_file(Path.home() / ".hermes" / ".env"))
env.update({key: value for key, value in os.environ.items() if value})
return env
def first_env(env: dict[str, str], *names: str) -> str:
for name in names:
value = (env.get(name) or "").strip()
if value:
return value
return ""
def _load_simple_yaml(path: Path) -> dict[str, object]:
if not path.exists():
return {}
root: dict[str, object] = {}
stack: list[tuple[int, dict[str, object]]] = [(-1, root)]
for raw_line in path.read_text(encoding="utf-8", errors="ignore").splitlines():
if not raw_line.strip() or raw_line.lstrip().startswith("#") or ":" not in raw_line:
continue
indent = len(raw_line) - len(raw_line.lstrip(" "))
key, value = raw_line.strip().split(":", 1)
key = key.strip()
value = value.strip().strip('"').strip("'")
while stack and indent <= stack[-1][0]:
stack.pop()
current = stack[-1][1]
if value:
current[key] = value
else:
child: dict[str, object] = {}
current[key] = child
stack.append((indent, child))
return root
def _env_with_hermes(env: dict[str, str], hermes_dir: Path) -> dict[str, str]:
merged = dict(read_env_file(hermes_dir / ".env"))
merged.update(env)
return merged
def _provider_env_names(provider: str) -> tuple[str, str, str]:
prefix = provider.upper().replace("-", "_")
return f"{prefix}_API_KEY", f"{prefix}_BASE_URL", f"{prefix}_MODEL"
def _auth_json_key(env: dict[str, str], hermes_dir: Path, provider: str) -> str:
auth_path = hermes_dir / "auth.json"
if not auth_path.exists() or not provider:
return ""
try:
auth = json.loads(auth_path.read_text(encoding="utf-8"))
except Exception:
return ""
pool = auth.get("credential_pool", {}) or {}
provider_keys = [provider, provider.replace("-", "_")]
for key in provider_keys:
creds = pool.get(key, []) or []
if not creds:
continue
cred = creds[0]
source = str(cred.get("source") or "")
if source.startswith("env:"):
resolved = first_env(env, source[4:])
if resolved:
return resolved
token = str(cred.get("access_token") or "").strip()
if token:
return token
return ""
def resolve_llm_config(env: dict[str, str], *, hermes_dir: Path | None = None) -> dict[str, str]:
hermes_dir = hermes_dir or Path.home() / ".hermes"
env = _env_with_hermes(env, hermes_dir)
hermes_config = _load_simple_yaml(hermes_dir / "config.yaml")
model_config = hermes_config.get("model", {}) if isinstance(hermes_config.get("model"), dict) else {}
provider = str(model_config.get("provider") or "").strip()
provider_key, provider_base_url, provider_model = _provider_env_names(provider) if provider else ("", "", "")
api_key = first_env(env, "LLM_API_KEY")
base_url = first_env(env, "LLM_BASE_URL")
model = first_env(env, "LLM_MODEL")
if not api_key and provider:
api_key = first_env(env, provider_key) or _auth_json_key(env, hermes_dir, provider)
if not base_url and provider:
base_url = first_env(env, provider_base_url) or str(model_config.get("base_url") or "").strip()
if not model and provider:
model = first_env(env, provider_model) or str(model_config.get("default") or "").strip()
if not api_key:
api_key = first_env(env, "SUB2API_API_KEY", "XIAOMI_API_KEY", "OPENROUTER_API_KEY")
if not base_url:
base_url = first_env(env, "SUB2API_BASE_URL", "XIAOMI_BASE_URL", "OPENROUTER_BASE_URL")
if not model:
model = first_env(env, "SUB2API_MODEL", "XIAOMI_MODEL")
missing = [
name
for name, value in (
("LLM_API_KEY", api_key),
("LLM_BASE_URL", base_url),
("LLM_MODEL", model),
)
if not value
]
if missing:
raise ValueError("missing_llm_config: " + ",".join(missing))
return {"api_key": api_key, "base_url": base_url, "model": model}
def resolve_blog_token(env: dict[str, str]) -> str:
return first_env(env, "BLOG_SERVICE_TOKEN", "EPHRON_SERVICE_TOKEN")