144 lines
4.9 KiB
Python
144 lines
4.9 KiB
Python
from __future__ import annotations
|
|
|
|
import os
|
|
import json
|
|
from pathlib import Path
|
|
|
|
|
|
PROJECT_ROOT = Path(__file__).resolve().parents[1]
|
|
|
|
|
|
def read_env_file(env_path: Path) -> dict[str, str]:
|
|
env: dict[str, str] = {}
|
|
if not env_path.exists():
|
|
return env
|
|
text = env_path.read_text(encoding="utf-8", errors="ignore")
|
|
for line in text.splitlines():
|
|
line = line.strip()
|
|
if not line or line.startswith("#") or "=" not in line:
|
|
continue
|
|
key, value = line.split("=", 1)
|
|
env[key.strip()] = value.strip().strip('"').strip("'")
|
|
return env
|
|
|
|
|
|
def load_env() -> dict[str, str]:
|
|
env: dict[str, str] = {}
|
|
env.update(read_env_file(PROJECT_ROOT / ".env"))
|
|
env.update(read_env_file(Path.home() / ".hermes" / ".env"))
|
|
env.update({key: value for key, value in os.environ.items() if value})
|
|
return env
|
|
|
|
|
|
def first_env(env: dict[str, str], *names: str) -> str:
|
|
for name in names:
|
|
value = (env.get(name) or "").strip()
|
|
if value:
|
|
return value
|
|
return ""
|
|
|
|
|
|
def _load_simple_yaml(path: Path) -> dict[str, object]:
|
|
if not path.exists():
|
|
return {}
|
|
root: dict[str, object] = {}
|
|
stack: list[tuple[int, dict[str, object]]] = [(-1, root)]
|
|
for raw_line in path.read_text(encoding="utf-8", errors="ignore").splitlines():
|
|
if not raw_line.strip() or raw_line.lstrip().startswith("#") or ":" not in raw_line:
|
|
continue
|
|
indent = len(raw_line) - len(raw_line.lstrip(" "))
|
|
key, value = raw_line.strip().split(":", 1)
|
|
key = key.strip()
|
|
value = value.strip().strip('"').strip("'")
|
|
while stack and indent <= stack[-1][0]:
|
|
stack.pop()
|
|
current = stack[-1][1]
|
|
if value:
|
|
current[key] = value
|
|
else:
|
|
child: dict[str, object] = {}
|
|
current[key] = child
|
|
stack.append((indent, child))
|
|
return root
|
|
|
|
|
|
def _env_with_hermes(env: dict[str, str], hermes_dir: Path) -> dict[str, str]:
|
|
merged = dict(read_env_file(hermes_dir / ".env"))
|
|
merged.update(env)
|
|
return merged
|
|
|
|
|
|
def _provider_env_names(provider: str) -> tuple[str, str, str]:
|
|
prefix = provider.upper().replace("-", "_")
|
|
return f"{prefix}_API_KEY", f"{prefix}_BASE_URL", f"{prefix}_MODEL"
|
|
|
|
|
|
def _auth_json_key(env: dict[str, str], hermes_dir: Path, provider: str) -> str:
|
|
auth_path = hermes_dir / "auth.json"
|
|
if not auth_path.exists() or not provider:
|
|
return ""
|
|
try:
|
|
auth = json.loads(auth_path.read_text(encoding="utf-8"))
|
|
except Exception:
|
|
return ""
|
|
pool = auth.get("credential_pool", {}) or {}
|
|
provider_keys = [provider, provider.replace("-", "_")]
|
|
for key in provider_keys:
|
|
creds = pool.get(key, []) or []
|
|
if not creds:
|
|
continue
|
|
cred = creds[0]
|
|
source = str(cred.get("source") or "")
|
|
if source.startswith("env:"):
|
|
resolved = first_env(env, source[4:])
|
|
if resolved:
|
|
return resolved
|
|
token = str(cred.get("access_token") or "").strip()
|
|
if token:
|
|
return token
|
|
return ""
|
|
|
|
|
|
def resolve_llm_config(env: dict[str, str], *, hermes_dir: Path | None = None) -> dict[str, str]:
|
|
hermes_dir = hermes_dir or Path.home() / ".hermes"
|
|
env = _env_with_hermes(env, hermes_dir)
|
|
hermes_config = _load_simple_yaml(hermes_dir / "config.yaml")
|
|
model_config = hermes_config.get("model", {}) if isinstance(hermes_config.get("model"), dict) else {}
|
|
provider = str(model_config.get("provider") or "").strip()
|
|
provider_key, provider_base_url, provider_model = _provider_env_names(provider) if provider else ("", "", "")
|
|
|
|
api_key = first_env(env, "LLM_API_KEY")
|
|
base_url = first_env(env, "LLM_BASE_URL")
|
|
model = first_env(env, "LLM_MODEL")
|
|
|
|
if not api_key and provider:
|
|
api_key = first_env(env, provider_key) or _auth_json_key(env, hermes_dir, provider)
|
|
if not base_url and provider:
|
|
base_url = first_env(env, provider_base_url) or str(model_config.get("base_url") or "").strip()
|
|
if not model and provider:
|
|
model = first_env(env, provider_model) or str(model_config.get("default") or "").strip()
|
|
|
|
if not api_key:
|
|
api_key = first_env(env, "SUB2API_API_KEY", "XIAOMI_API_KEY", "OPENROUTER_API_KEY")
|
|
if not base_url:
|
|
base_url = first_env(env, "SUB2API_BASE_URL", "XIAOMI_BASE_URL", "OPENROUTER_BASE_URL")
|
|
if not model:
|
|
model = first_env(env, "SUB2API_MODEL", "XIAOMI_MODEL")
|
|
|
|
missing = [
|
|
name
|
|
for name, value in (
|
|
("LLM_API_KEY", api_key),
|
|
("LLM_BASE_URL", base_url),
|
|
("LLM_MODEL", model),
|
|
)
|
|
if not value
|
|
]
|
|
if missing:
|
|
raise ValueError("missing_llm_config: " + ",".join(missing))
|
|
return {"api_key": api_key, "base_url": base_url, "model": model}
|
|
|
|
|
|
def resolve_blog_token(env: dict[str, str]) -> str:
|
|
return first_env(env, "BLOG_SERVICE_TOKEN", "EPHRON_SERVICE_TOKEN")
|