from __future__ import annotations import os import json from pathlib import Path PROJECT_ROOT = Path(__file__).resolve().parents[1] def read_env_file(env_path: Path) -> dict[str, str]: env: dict[str, str] = {} if not env_path.exists(): return env text = env_path.read_text(encoding="utf-8", errors="ignore") for line in text.splitlines(): line = line.strip() if not line or line.startswith("#") or "=" not in line: continue key, value = line.split("=", 1) env[key.strip()] = value.strip().strip('"').strip("'") return env def load_env() -> dict[str, str]: env: dict[str, str] = {} env.update(read_env_file(PROJECT_ROOT / ".env")) env.update(read_env_file(Path.home() / ".hermes" / ".env")) env.update({key: value for key, value in os.environ.items() if value}) return env def first_env(env: dict[str, str], *names: str) -> str: for name in names: value = (env.get(name) or "").strip() if value: return value return "" def _load_simple_yaml(path: Path) -> dict[str, object]: if not path.exists(): return {} root: dict[str, object] = {} stack: list[tuple[int, dict[str, object]]] = [(-1, root)] for raw_line in path.read_text(encoding="utf-8", errors="ignore").splitlines(): if not raw_line.strip() or raw_line.lstrip().startswith("#") or ":" not in raw_line: continue indent = len(raw_line) - len(raw_line.lstrip(" ")) key, value = raw_line.strip().split(":", 1) key = key.strip() value = value.strip().strip('"').strip("'") while stack and indent <= stack[-1][0]: stack.pop() current = stack[-1][1] if value: current[key] = value else: child: dict[str, object] = {} current[key] = child stack.append((indent, child)) return root def _env_with_hermes(env: dict[str, str], hermes_dir: Path) -> dict[str, str]: merged = dict(read_env_file(hermes_dir / ".env")) merged.update(env) return merged def _provider_env_names(provider: str) -> tuple[str, str, str]: prefix = provider.upper().replace("-", "_") return f"{prefix}_API_KEY", f"{prefix}_BASE_URL", f"{prefix}_MODEL" def _auth_json_key(env: dict[str, str], hermes_dir: Path, provider: str) -> str: auth_path = hermes_dir / "auth.json" if not auth_path.exists() or not provider: return "" try: auth = json.loads(auth_path.read_text(encoding="utf-8")) except Exception: return "" pool = auth.get("credential_pool", {}) or {} provider_keys = [provider, provider.replace("-", "_")] for key in provider_keys: creds = pool.get(key, []) or [] if not creds: continue cred = creds[0] source = str(cred.get("source") or "") if source.startswith("env:"): resolved = first_env(env, source[4:]) if resolved: return resolved token = str(cred.get("access_token") or "").strip() if token: return token return "" def resolve_llm_config(env: dict[str, str], *, hermes_dir: Path | None = None) -> dict[str, str]: hermes_dir = hermes_dir or Path.home() / ".hermes" env = _env_with_hermes(env, hermes_dir) hermes_config = _load_simple_yaml(hermes_dir / "config.yaml") model_config = hermes_config.get("model", {}) if isinstance(hermes_config.get("model"), dict) else {} provider = str(model_config.get("provider") or "").strip() provider_key, provider_base_url, provider_model = _provider_env_names(provider) if provider else ("", "", "") api_key = first_env(env, "LLM_API_KEY") base_url = first_env(env, "LLM_BASE_URL") model = first_env(env, "LLM_MODEL") if not api_key and provider: api_key = first_env(env, provider_key) or _auth_json_key(env, hermes_dir, provider) if not base_url and provider: base_url = first_env(env, provider_base_url) or str(model_config.get("base_url") or "").strip() if not model and provider: model = first_env(env, provider_model) or str(model_config.get("default") or "").strip() if not api_key: api_key = first_env(env, "SUB2API_API_KEY", "XIAOMI_API_KEY", "OPENROUTER_API_KEY") if not base_url: base_url = first_env(env, "SUB2API_BASE_URL", "XIAOMI_BASE_URL", "OPENROUTER_BASE_URL") if not model: model = first_env(env, "SUB2API_MODEL", "XIAOMI_MODEL") missing = [ name for name, value in ( ("LLM_API_KEY", api_key), ("LLM_BASE_URL", base_url), ("LLM_MODEL", model), ) if not value ] if missing: raise ValueError("missing_llm_config: " + ",".join(missing)) return {"api_key": api_key, "base_url": base_url, "model": model} def resolve_blog_token(env: dict[str, str]) -> str: return first_env(env, "BLOG_SERVICE_TOKEN", "EPHRON_SERVICE_TOKEN")