From 2159ee733b2b6850727ad58c446b058e7ae8bd0c Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Wed, 10 Jun 2026 21:55:29 +0800 Subject: [PATCH] Improve AI daily report operations and dedupe observability --- ai_daily_report/audit.py | 89 ++++++++++++++++++ ai_daily_report/cli.py | 6 ++ ai_daily_report/clients.py | 41 ++++++++- ai_daily_report/collect.py | 6 +- ai_daily_report/models.py | 1 + ai_daily_report/observability.py | 54 +++++++++++ ai_daily_report/pipeline.py | 10 +- ai_daily_report/quality_gate.py | 15 ++- ai_daily_report/runner.py | 34 +++++-- ai_daily_report/semantic_dedupe.py | 91 +++++++++++++++---- config/pipeline.json | 32 +++++++ config/sources.json | 53 ++++++----- docs/ops-thresholds.generated.md | 33 +++++++ scripts/generate_ops_docs.py | 41 +++++++++ skill/scripts/weekly_audit.py | 24 +++++ .../history_replay_2026_06_04_2026_06_10.json | 74 +++++++++++++++ tests/test_audit.py | 42 +++++++++ tests/test_clients.py | 15 +++ tests/test_env_config.py | 5 +- tests/test_generated_docs.py | 17 ++++ tests/test_history_replay_fixtures.py | 67 ++++++++++++++ tests/test_observability.py | 34 +++++++ tests/test_stage3_semantic_dedupe.py | 34 +++++++ 23 files changed, 761 insertions(+), 57 deletions(-) create mode 100644 ai_daily_report/audit.py create mode 100644 ai_daily_report/observability.py create mode 100644 docs/ops-thresholds.generated.md create mode 100644 scripts/generate_ops_docs.py create mode 100644 skill/scripts/weekly_audit.py create mode 100644 tests/fixtures/history_replay_2026_06_04_2026_06_10.json create mode 100644 tests/test_audit.py create mode 100644 tests/test_generated_docs.py create mode 100644 tests/test_history_replay_fixtures.py create mode 100644 tests/test_observability.py diff --git a/ai_daily_report/audit.py b/ai_daily_report/audit.py new file mode 100644 index 0000000..0c6c2ba --- /dev/null +++ b/ai_daily_report/audit.py @@ -0,0 +1,89 @@ +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any + + +def load_run_report(path: Path) -> dict[str, Any] | None: + report_path = path / "run_report.json" if path.is_dir() else path + if not report_path.exists(): + return None + try: + value = json.loads(report_path.read_text(encoding="utf-8")) + except Exception: + return None + return value if isinstance(value, dict) else None + + +def summarize_reports(out_dir: Path, *, limit_days: int = 7) -> dict[str, Any]: + run_dirs = sorted([path for path in out_dir.iterdir() if path.is_dir()], reverse=True)[:limit_days] + rows: list[dict[str, Any]] = [] + totals: dict[str, Any] = { + "source_failures": 0, + "duplicate_candidates": 0, + "final_items": 0, + "fallback_items": 0, + "quality_warnings": 0, + "quality_blocks": 0, + } + for run_dir in sorted(run_dirs): + report = load_run_report(run_dir) + if not report: + continue + quality_gate = report.get("quality_gate", {}) or {} + stage2_8 = report.get("stage2_8", {}) or {} + stage4 = report.get("stage4", {}) or {} + stage5 = report.get("stage5", {}) or {} + stage8 = report.get("stage8", {}) or {} + fallback_count = int(stage4.get("fallback_count", stage4.get("fallback_item_count", 0)) or 0) + final_count = int(stage5.get("output_count", stage4.get("output_count", 0)) or 0) + source_failures = len(quality_gate.get("source_failures", []) or []) + duplicate_candidates = int(stage2_8.get("candidate_group_count", 0) or 0) + warnings = len(quality_gate.get("warnings", []) or []) + blocks = len(quality_gate.get("blocking_errors", []) or []) + row = { + "date": run_dir.name, + "source_failures": source_failures, + "duplicate_candidates": duplicate_candidates, + "final_items": final_count, + "fallback_items": fallback_count, + "fallback_ratio": round(fallback_count / final_count, 4) if final_count else 0, + "quality_warnings": warnings, + "quality_blocks": blocks, + "publish_status": stage8.get("status"), + "publish_slug": stage8.get("slug"), + } + rows.append(row) + totals["source_failures"] += source_failures + totals["duplicate_candidates"] += duplicate_candidates + totals["final_items"] += final_count + totals["fallback_items"] += fallback_count + totals["quality_warnings"] += warnings + totals["quality_blocks"] += blocks + totals["fallback_ratio"] = round(totals["fallback_items"] / totals["final_items"], 4) if totals["final_items"] else 0 + return {"run_count": len(rows), "totals": totals, "runs": rows} + + +def render_markdown(summary: dict[str, Any]) -> str: + totals = summary.get("totals", {}) + lines = [ + "# AI日报每周自动审计报告", + "", + f"- 覆盖运行数:{summary.get('run_count', 0)}", + f"- 源失败次数:{totals.get('source_failures', 0)}", + f"- 重复候选数:{totals.get('duplicate_candidates', 0)}", + f"- 最终条数:{totals.get('final_items', 0)}", + f"- fallback ratio:{totals.get('fallback_ratio', 0)}", + f"- 质量门禁 warning/block:{totals.get('quality_warnings', 0)}/{totals.get('quality_blocks', 0)}", + "", + "| 日期 | 源失败 | 重复候选 | 最终条数 | fallback | warning | block | 发布 | slug |", + "|---|---:|---:|---:|---:|---:|---:|---|---|", + ] + for row in summary.get("runs", []) or []: + lines.append( + f"| {row['date']} | {row['source_failures']} | {row['duplicate_candidates']} | " + f"{row['final_items']} | {row['fallback_ratio']} | {row['quality_warnings']} | " + f"{row['quality_blocks']} | {row.get('publish_status') or ''} | {row.get('publish_slug') or ''} |" + ) + return "\n".join(lines) + "\n" diff --git a/ai_daily_report/cli.py b/ai_daily_report/cli.py index 6f53720..c7756e2 100644 --- a/ai_daily_report/cli.py +++ b/ai_daily_report/cli.py @@ -3,6 +3,7 @@ from __future__ import annotations import argparse from pathlib import Path +from .audit import render_markdown, summarize_reports from .runner import run_daily_report @@ -19,6 +20,9 @@ def build_parser() -> argparse.ArgumentParser: run.add_argument("--sources-path", default=None) run.add_argument("--pipeline-path", default=None) run.add_argument("--history-path", default=None) + audit = subcommands.add_parser("audit") + audit.add_argument("--out-dir", default=str(Path.home() / ".hermes" / "scripts" / "ai_morning_out")) + audit.add_argument("--limit-days", type=int, default=7) return parser @@ -37,6 +41,8 @@ def main(argv: list[str] | None = None) -> int: pipeline_path=Path(args.pipeline_path) if args.pipeline_path else None, history_path=Path(args.history_path) if args.history_path else None, ) + elif args.command == "audit": + print(render_markdown(summarize_reports(Path(args.out_dir), limit_days=args.limit_days))) return 0 diff --git a/ai_daily_report/clients.py b/ai_daily_report/clients.py index 64aeb8a..032c895 100644 --- a/ai_daily_report/clients.py +++ b/ai_daily_report/clients.py @@ -5,6 +5,7 @@ import socket import time from dataclasses import dataclass from urllib.error import HTTPError, URLError +from urllib.parse import urlencode import urllib.request from typing import Any @@ -115,17 +116,49 @@ class BlogApiClient: def create_post(self, payload: dict[str, Any]) -> dict[str, Any]: return self._request("POST", "/api/service/posts", payload) - def get_post_by_slug(self, slug: str) -> dict[str, Any] | None: + def _normalize_post_response(self, value: Any, slug: str) -> dict[str, Any] | None: + if isinstance(value, dict): + if isinstance(value.get("post"), dict): + value = value["post"] + elif isinstance(value.get("data"), dict): + value = value["data"] + elif isinstance(value.get("items"), list): + for item in value["items"]: + if isinstance(item, dict) and item.get("slug") == slug: + return item + return None + if value.get("slug") == slug or value.get("id") or value.get("content") or value.get("markdown"): + return value + if isinstance(value, list): + for item in value: + if isinstance(item, dict) and item.get("slug") == slug: + return item + return None + + def _request_optional(self, method: str, path: str, payload: dict[str, Any] | None = None) -> dict[str, Any] | list[Any] | None: try: - return self._request("GET", f"/api/service/posts/{slug}") + return self._request(method, path, payload) except HTTPError as exc: - if exc.code == 404: + if exc.code in {403, 404}: return None raise except FetchTextError as exc: - if exc.error_type == "http_404": + if exc.error_type in {"http_403", "http_404"}: return None raise + def get_post_by_slug(self, slug: str) -> dict[str, Any] | None: + paths = [ + f"/api/service/posts/{slug}", + f"/api/service/posts?{urlencode({'slug': slug})}", + f"/api/service/posts/slug/{slug}", + ] + for path in paths: + value = self._request_optional("GET", path) + post = self._normalize_post_response(value, slug) + if post is not None: + return post + return None + def publish_post(self, slug: str) -> None: self._request("POST", f"/api/service/posts/{slug}/publish") diff --git a/ai_daily_report/collect.py b/ai_daily_report/collect.py index fe2567a..0305239 100644 --- a/ai_daily_report/collect.py +++ b/ai_daily_report/collect.py @@ -35,6 +35,7 @@ def _collect_one(config: SourceConfig, run_date: str, fetcher: Fetcher) -> Sourc ok=False, status="disabled", fetched_at=fetched_at, + error=f"failure_policy={config.failure_policy}; min_items={config.min_items}", ) started = perf_counter() @@ -42,12 +43,15 @@ def _collect_one(config: SourceConfig, run_date: str, fetcher: Fetcher) -> Sourc items = fetcher(config, run_date) elapsed_ms = int((perf_counter() - started) * 1000) status = "ok" if items else "empty" + if status == "ok" and config.min_items and len(items) < config.min_items: + status = "below_min_items" return SourceResult( source=config.name, role=config.role, ok=status == "ok", status=status, items=items, + error=None if status == "ok" else f"items={len(items)}; min_items={config.min_items}; failure_policy={config.failure_policy}", elapsed_ms=elapsed_ms, fetched_at=fetched_at, ) @@ -58,7 +62,7 @@ def _collect_one(config: SourceConfig, run_date: str, fetcher: Fetcher) -> Sourc role=config.role, ok=False, status=_status_from_exception(exc), - error=f"{type(exc).__name__}: {exc}", + error=f"{type(exc).__name__}: {exc}; failure_policy={config.failure_policy}; min_items={config.min_items}", elapsed_ms=elapsed_ms, retry_count=_retry_count_from_exception(exc), fetched_at=fetched_at, diff --git a/ai_daily_report/models.py b/ai_daily_report/models.py index 3a5839e..e0a7cf7 100644 --- a/ai_daily_report/models.py +++ b/ai_daily_report/models.py @@ -15,6 +15,7 @@ class SourceConfig: min_items: int = 0 url: str = "" max_item_age_days: int | None = None + failure_policy: str = "warn" @dataclass diff --git a/ai_daily_report/observability.py b/ai_daily_report/observability.py new file mode 100644 index 0000000..896b1a7 --- /dev/null +++ b/ai_daily_report/observability.py @@ -0,0 +1,54 @@ +from __future__ import annotations + +import hashlib +from dataclasses import dataclass, field +from typing import Any, Callable + + +def sha256_text(value: str) -> str: + return hashlib.sha256((value or "").encode("utf-8")).hexdigest() + + +def truncate_text(value: str, limit: int = 500) -> str: + text = value or "" + if len(text) <= limit: + return text + return f"{text[:limit]}…[truncated {len(text) - limit} chars]" + + +@dataclass +class LlmCallObserver: + call: Callable[[str], str] + stage: str + records: list[dict[str, Any]] = field(default_factory=list) + prompt_preview_chars: int = 500 + response_preview_chars: int = 500 + + def __call__(self, prompt: str) -> str: + response = self.call(prompt) + self.records.append( + { + "stage": self.stage, + "call_index": len(self.records) + 1, + "prompt_hash": sha256_text(prompt), + "response_hash": sha256_text(response), + "prompt_chars": len(prompt or ""), + "response_chars": len(response or ""), + "prompt_preview": truncate_text(prompt, self.prompt_preview_chars), + "response_preview": truncate_text(response, self.response_preview_chars), + } + ) + return response + + +def summarize_observed_calls(observers: list[LlmCallObserver]) -> dict[str, Any]: + records: list[dict[str, Any]] = [] + by_stage: dict[str, int] = {} + for observer in observers: + records.extend(observer.records) + by_stage[observer.stage] = by_stage.get(observer.stage, 0) + len(observer.records) + return { + "total_calls": len(records), + "by_stage": by_stage, + "records": records, + } diff --git a/ai_daily_report/pipeline.py b/ai_daily_report/pipeline.py index a5f4251..3536f3a 100644 --- a/ai_daily_report/pipeline.py +++ b/ai_daily_report/pipeline.py @@ -30,6 +30,7 @@ def _source_config_from_dict(value: dict[str, Any]) -> SourceConfig: min_items=int(value.get("min_items", 0)), url=value.get("url", ""), max_item_age_days=int(max_item_age_days) if max_item_age_days is not None else None, + failure_policy=str(value.get("failure_policy") or ("block" if bool(value.get("required", False)) else "warn")), ) @@ -347,19 +348,26 @@ def run_stage0_to_stage8( quality_gate_config=quality_gate_config, ) slug = f"ai-{run_date}" + effective_mode = mode + quality_gate_report = stage7_result["reports"].get("quality_gate", {}) or {} + required_policy = str(quality_gate_report.get("required_source_failure_policy") or "block") + if quality_gate_report.get("required_source_failures") and required_policy in {"draft", "dry_run"}: + effective_mode = "dry-run" if required_policy == "dry_run" else "draft" + publish_result = publish_markdown( title=f"AI日报 · {run_date}", markdown=stage7_result["markdown"], tags=["AI日报", "AI资讯", "人工智能"], slug=slug, base_url=base_url, - mode=mode, + mode=effective_mode, markdown_report=stage7_result["reports"]["stage7"], client=client, idempotency_config=publish_idempotency_config, ) reports = dict(stage7_result["reports"]) reports["stage8"] = { + "requested_mode": mode, "mode": publish_result.mode, "status": publish_result.status, "slug": publish_result.slug, diff --git a/ai_daily_report/quality_gate.py b/ai_daily_report/quality_gate.py index 73ebb5b..3778f7f 100644 --- a/ai_daily_report/quality_gate.py +++ b/ai_daily_report/quality_gate.py @@ -8,6 +8,7 @@ from .models import NewsItem, SourceResult DEFAULT_CONFIG = { + "required_source_failure_policy": "block", # block | draft | dry_run | warn "block_on_required_source_failure": True, "warn_on_enabled_source_failure": True, "warn_when_stage3_candidates_zero_min_items": 30, @@ -73,10 +74,14 @@ def evaluate_quality_gate( warnings.append(f"enabled_source_failed:{failure['source']}:{failure['status']}") required_sources = set(config.get("required_sources") or []) - if bool(config["block_on_required_source_failure"]): - for failure in failures: - if failure["source"] in required_sources: - blocking_errors.append(f"required_source_failed:{failure['source']}:{failure['status']}") + required_failures = [failure for failure in failures if failure["source"] in required_sources] + policy = str(config.get("required_source_failure_policy") or "block") + if bool(config["block_on_required_source_failure"]) and policy == "block": + for failure in required_failures: + blocking_errors.append(f"required_source_failed:{failure['source']}:{failure['status']}") + elif required_failures: + for failure in required_failures: + warnings.append(f"required_source_failed:{failure['source']}:{failure['status']}:{policy}") title_threshold = float(config["warn_on_final_title_similarity"]) if title_threshold > 0: @@ -87,5 +92,7 @@ def evaluate_quality_gate( "warnings": warnings, "blocking_errors": blocking_errors, "source_failures": failures, + "required_source_failures": required_failures, + "required_source_failure_policy": policy, "quality_gate_failed": bool(blocking_errors), } diff --git a/ai_daily_report/runner.py b/ai_daily_report/runner.py index b75e8f2..4494557 100644 --- a/ai_daily_report/runner.py +++ b/ai_daily_report/runner.py @@ -9,6 +9,7 @@ from .clients import BlogApiClient, OpenAICompatibleClient, fetch_text as defaul from .config import load_pipeline_config, load_source_configs from .env import load_env, resolve_blog_token, resolve_llm_config from .models import SourceConfig +from .observability import LlmCallObserver, summarize_observed_calls from .pipeline import run_stage0_to_stage8 from .publish import load_published_urls, update_published_urls from .sources.registry import get_source_fetcher @@ -135,15 +136,33 @@ def run_daily_report( else: raise ValueError("source_mode must be 'mock' or 'live'") + llm_observability_config = pipeline_config.get("llm_observability", {}) or {} + llm_observers: list[LlmCallObserver] = [] + observe_llm = bool(llm_observability_config.get("enabled", True)) + prompt_preview_chars = int(llm_observability_config.get("prompt_preview_chars", 500)) + response_preview_chars = int(llm_observability_config.get("response_preview_chars", 500)) + + def maybe_observe(stage: str, call): + if not observe_llm: + return call + observer = LlmCallObserver( + call=call, + stage=stage, + prompt_preview_chars=prompt_preview_chars, + response_preview_chars=response_preview_chars, + ) + llm_observers.append(observer) + return observer + if llm_mode == "mock": - semantic_llm_call = _mock_semantic_llm - rewrite_llm_call = _mock_rewrite_llm - guide_llm_call = _mock_guide_llm + semantic_llm_call = maybe_observe("stage3", _mock_semantic_llm) + rewrite_llm_call = maybe_observe("stage4", _mock_rewrite_llm) + guide_llm_call = maybe_observe("stage6", _mock_guide_llm) elif llm_mode == "live": llm_client = llm_client_factory(**resolve_llm_config(env)) - semantic_llm_call = llm_client.chat - rewrite_llm_call = llm_client.chat - guide_llm_call = llm_client.chat + semantic_llm_call = maybe_observe("stage3", llm_client.chat) + rewrite_llm_call = maybe_observe("stage4", llm_client.chat) + guide_llm_call = maybe_observe("stage6", llm_client.chat) else: raise ValueError("llm_mode must be 'mock' or 'live'") @@ -182,6 +201,9 @@ def run_daily_report( max_age_days=cross_day_max_age_days, ) + llm_observability_report = summarize_observed_calls(llm_observers) + result["reports"]["llm_observability"] = llm_observability_report + run_dir = out_dir / run_date run_dir.mkdir(parents=True, exist_ok=True) (run_dir / "blog_markdown.md").write_text(result["markdown"], encoding="utf-8") diff --git a/ai_daily_report/semantic_dedupe.py b/ai_daily_report/semantic_dedupe.py index 815d298..2a6bdaa 100644 --- a/ai_daily_report/semantic_dedupe.py +++ b/ai_daily_report/semantic_dedupe.py @@ -25,6 +25,11 @@ def _build_prompt(items: list[NewsItem], candidates: list[dict[str, Any]]) -> st "task": "Identify only high-confidence semantic duplicates. Do not curate or remove by importance.", "items": item_payload, "candidates": candidates, + "dedupe_policy": [ + "Use duplicate_groups only when items are substantially the same article/event and one can be removed.", + "Use merge_groups when items cover the same concrete event from different angles; keep the best item and attach the others as supplementary sources instead of dropping the event context.", + "Do not curate by importance. Do not merge unrelated follow-ups just because they mention the same company/model.", + ], "output_schema": { "duplicate_groups": [ { @@ -34,6 +39,14 @@ def _build_prompt(items: list[NewsItem], candidates: list[dict[str, Any]]) -> st "reason": "same concrete event reason", } ], + "merge_groups": [ + { + "keep_id": "item id", + "merge_ids": ["item id"], + "confidence": "high|medium|low", + "reason": "same event, complementary angle/source", + } + ], "not_duplicates": [], "uncertain": [], }, @@ -75,6 +88,7 @@ def semantic_dedup_items( "candidate_group_count": len(candidates), "removed_count": 0, "duplicate_groups": [], + "merge_groups": [], "uncertain": [], "errors": [], "skipped_for_deletion_ratio": False, @@ -89,6 +103,7 @@ def semantic_dedup_items( "candidate_group_count": len(candidates), "removed_count": 0, "duplicate_groups": [], + "merge_groups": [], "uncertain": [], "errors": [f"{type(exc).__name__}: {exc}"], "skipped_for_deletion_ratio": False, @@ -101,19 +116,27 @@ def semantic_dedup_items( } candidate_removals: set[str] = set() valid_groups: list[dict[str, Any]] = [] + valid_merge_groups: list[dict[str, Any]] = [] + + def _validate_group_ids(group: dict[str, Any], member_key: str) -> tuple[list[str], list[NewsItem]] | None: + raw_ids = [group.get("keep_id")] + list(group.get(member_key) or []) + if any(not isinstance(item_id, str) or item_id not in by_id for item_id in raw_ids): + errors.append(f"invalid_ids_in_group: {group}") + return None + ids = [str(item_id) for item_id in raw_ids] + group_set = frozenset(ids) + if not any(group_set.issubset(candidate_set) for candidate_set in candidate_sets): + errors.append(f"group_outside_candidates: {group}") + return None + return ids, [by_id[item_id] for item_id in ids] for group in obj.get("duplicate_groups", []) or []: if group.get("confidence") != "high": continue - ids = [group.get("keep_id")] + list(group.get("remove_ids") or []) - if any(not isinstance(item_id, str) or item_id not in by_id for item_id in ids): - errors.append(f"invalid_ids_in_group: {group}") + validated = _validate_group_ids(group, "remove_ids") + if validated is None: continue - group_set = frozenset(ids) - if not any(group_set.issubset(candidate_set) for candidate_set in candidate_sets): - errors.append(f"group_outside_candidates: {group}") - continue - group_items = [by_id[item_id] for item_id in ids] + ids, group_items = validated keep = _choose_keep(group_items, str(group.get("keep_id"))) remove_items = [item for item in group_items if item is not keep] candidate_removals.update(item.id for item in remove_items) @@ -126,6 +149,24 @@ def semantic_dedup_items( } ) + for group in obj.get("merge_groups", []) or []: + if group.get("confidence") != "high": + continue + validated = _validate_group_ids(group, "merge_ids") + if validated is None: + continue + ids, group_items = validated + keep = _choose_keep(group_items, str(group.get("keep_id"))) + merge_items = [item for item in group_items if item is not keep] + valid_merge_groups.append( + { + "keep_id": keep.id, + "merge_ids": [item.id for item in merge_items], + "confidence": "high", + "reason": str(group.get("reason") or "semantic_merge"), + } + ) + deletion_ratio = len(candidate_removals) / len(items) if items else 0 if deletion_ratio > max_deletion_ratio: return items, { @@ -133,33 +174,49 @@ def semantic_dedup_items( "candidate_group_count": len(candidates), "removed_count": 0, "duplicate_groups": valid_groups, + "merge_groups": valid_merge_groups, "uncertain": obj.get("uncertain", []) or [], "errors": errors, "skipped_for_deletion_ratio": True, } removed_ids: set[str] = set() + + def append_supplement(keep: NewsItem, source_item: NewsItem, reason: str, action: str) -> None: + keep.duplicate_sources.append( + { + "id": source_item.id, + "source_group": source_item.source_group, + "source_label": source_item.source_label, + "url": source_item.url, + "title": source_item.title or source_item.title_raw, + "summary": source_item.summary or source_item.summary_raw, + "reason": reason, + "action": action, + } + ) + for group in valid_groups: keep = by_id[group["keep_id"]] for remove_id in group["remove_ids"]: removed = by_id[remove_id] - keep.duplicate_sources.append( - { - "id": removed.id, - "source_group": removed.source_group, - "source_label": removed.source_label, - "url": removed.url, - "reason": group["reason"], - } - ) + append_supplement(keep, removed, group["reason"], "dedupe_remove") removed_ids.add(remove_id) + for group in valid_merge_groups: + keep = by_id[group["keep_id"]] + for merge_id in group["merge_ids"]: + if merge_id in removed_ids: + continue + append_supplement(keep, by_id[merge_id], group["reason"], "merge_supplement") + deduped = [item for item in items if item.id not in removed_ids] report = { "input_count": len(items), "candidate_group_count": len(candidates), "removed_count": len(removed_ids), "duplicate_groups": valid_groups, + "merge_groups": valid_merge_groups, "uncertain": obj.get("uncertain", []) or [], "errors": errors, "skipped_for_deletion_ratio": False, diff --git a/config/pipeline.json b/config/pipeline.json index 7b8266b..1d4c8b5 100644 --- a/config/pipeline.json +++ b/config/pipeline.json @@ -16,5 +16,37 @@ "enabled": true, "max_age_days": 7, "history_path": "~/.hermes/scripts/ai_morning_out/published_urls.json" + }, + "semantic_candidate_recall": { + "enabled": true, + "max_pairs": 80, + "max_pairs_per_item": 5, + "title_similarity_threshold": 0.45, + "title_jaccard_threshold": 0.25, + "summary_jaccard_threshold": 0.18, + "strong_entity_overlap_threshold": 2 + }, + "quality_gate": { + "required_source_failure_policy": "block", + "block_on_required_source_failure": true, + "warn_on_enabled_source_failure": true, + "warn_when_stage3_candidates_zero_min_items": 30, + "warn_on_final_title_similarity": 0.55, + "warn_on_entity_frequency": 3, + "required_sources": ["AI HOT"] + }, + "publish_idempotency": { + "enabled": true, + "allow_republish": false, + "slug_lookup_paths": [ + "/api/service/posts/{slug}", + "/api/service/posts?slug={slug}", + "/api/service/posts/slug/{slug}" + ] + }, + "llm_observability": { + "enabled": true, + "prompt_preview_chars": 500, + "response_preview_chars": 500 } } diff --git a/config/sources.json b/config/sources.json index d10cdc4..410c077 100644 --- a/config/sources.json +++ b/config/sources.json @@ -4,21 +4,50 @@ "type": "aihot", "role": "primary", "required": true, + "failure_policy": "block", "priority": 10, "timeout_seconds": 25, "retries": 2, "min_items": 10, "enabled": true }, + { + "name": "橘鸦AI早报", + "type": "juya_rss", + "url": "https://imjuya.github.io/juya-ai-daily/rss.xml", + "role": "supplement", + "required": false, + "failure_policy": "warn", + "priority": 20, + "timeout_seconds": 45, + "retries": 2, + "min_items": 0, + "enabled": true + }, + { + "name": "量子位", + "type": "rss", + "url": "https://www.qbitai.com/feed", + "role": "supplement", + "required": false, + "failure_policy": "warn", + "priority": 30, + "timeout_seconds": 25, + "retries": 1, + "min_items": 0, + "enabled": true + }, { "name": "InfoQ AI", "type": "rss", "url": "https://feed.infoq.com/ai-ml-data-eng/", "role": "supplement", "required": false, + "failure_policy": "warn", "priority": 40, "timeout_seconds": 25, "retries": 1, + "min_items": 0, "max_item_age_days": 3, "enabled": true }, @@ -28,32 +57,12 @@ "url": "https://www.technologyreview.com/topic/artificial-intelligence/feed", "role": "supplement", "required": false, + "failure_policy": "warn", "priority": 50, "timeout_seconds": 25, "retries": 1, + "min_items": 0, "max_item_age_days": 5, "enabled": true - }, - { - "name": "量子位", - "type": "rss", - "url": "https://www.qbitai.com/feed", - "role": "supplement", - "required": false, - "priority": 30, - "timeout_seconds": 25, - "retries": 1, - "enabled": true - }, - { - "name": "橘鸦AI早报", - "type": "juya_rss", - "url": "https://imjuya.github.io/juya-ai-daily/rss.xml", - "role": "supplement", - "required": false, - "priority": 20, - "timeout_seconds": 45, - "retries": 2, - "enabled": true } ] diff --git a/docs/ops-thresholds.generated.md b/docs/ops-thresholds.generated.md new file mode 100644 index 0000000..b66a4ac --- /dev/null +++ b/docs/ops-thresholds.generated.md @@ -0,0 +1,33 @@ +# AI日报运维阈值(自动生成) + +> 由 `scripts/generate_ops_docs.py` 从 `config/pipeline.json` 和 `config/sources.json` 生成;不要手改本文件。 + +## Quality Gate + +- `block_on_required_source_failure`: `True` +- `required_source_failure_policy`: `block` +- `required_sources`: `['AI HOT']` +- `warn_on_enabled_source_failure`: `True` +- `warn_on_entity_frequency`: `3` +- `warn_on_final_title_similarity`: `0.55` +- `warn_when_stage3_candidates_zero_min_items`: `30` + +## Semantic Candidate Recall + +- `enabled`: `True` +- `max_pairs`: `80` +- `max_pairs_per_item`: `5` +- `strong_entity_overlap_threshold`: `2` +- `summary_jaccard_threshold`: `0.18` +- `title_jaccard_threshold`: `0.25` +- `title_similarity_threshold`: `0.45` + +## Sources + +| source | required | failure_policy | min_items | retries | timeout_seconds | +|---|---:|---|---:|---:|---:| +| AI HOT | True | block | 10 | 2 | 25 | +| 橘鸦AI早报 | False | warn | 0 | 2 | 45 | +| 量子位 | False | warn | 0 | 1 | 25 | +| InfoQ AI | False | warn | 0 | 1 | 25 | +| MIT科技评论AI | False | warn | 0 | 1 | 25 | diff --git a/scripts/generate_ops_docs.py b/scripts/generate_ops_docs.py new file mode 100644 index 0000000..1175469 --- /dev/null +++ b/scripts/generate_ops_docs.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import json +from pathlib import Path + +ROOT = Path(__file__).resolve().parents[1] +PIPELINE = json.loads((ROOT / "config" / "pipeline.json").read_text(encoding="utf-8")) +SOURCES = json.loads((ROOT / "config" / "sources.json").read_text(encoding="utf-8")) +DOC = ROOT / "docs" / "ops-thresholds.generated.md" + + +def main() -> int: + quality = PIPELINE.get("quality_gate", {}) + recall = PIPELINE.get("semantic_candidate_recall", {}) + lines = [ + "# AI日报运维阈值(自动生成)", + "", + "> 由 `scripts/generate_ops_docs.py` 从 `config/pipeline.json` 和 `config/sources.json` 生成;不要手改本文件。", + "", + "## Quality Gate", + "", + ] + for key in sorted(quality): + lines.append(f"- `{key}`: `{quality[key]}`") + lines.extend(["", "## Semantic Candidate Recall", ""]) + for key in sorted(recall): + lines.append(f"- `{key}`: `{recall[key]}`") + lines.extend(["", "## Sources", "", "| source | required | failure_policy | min_items | retries | timeout_seconds |", "|---|---:|---|---:|---:|---:|"]) + for source in SOURCES: + lines.append( + f"| {source['name']} | {source.get('required', False)} | {source.get('failure_policy', '')} | " + f"{source.get('min_items', 0)} | {source.get('retries', 0)} | {source.get('timeout_seconds', '')} |" + ) + DOC.write_text("\n".join(lines) + "\n", encoding="utf-8") + print(DOC) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/skill/scripts/weekly_audit.py b/skill/scripts/weekly_audit.py new file mode 100644 index 0000000..c25dd73 --- /dev/null +++ b/skill/scripts/weekly_audit.py @@ -0,0 +1,24 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import sys +from pathlib import Path + +REPO_DIR = Path(__file__).resolve().parents[2] +if str(REPO_DIR) not in sys.path: + sys.path.insert(0, str(REPO_DIR)) + +from ai_daily_report.audit import render_markdown, summarize_reports + + +def main() -> int: + out_dir = Path.home() / ".hermes" / "scripts" / "ai_morning_out" + if not out_dir.exists(): + print("AI日报每周审计:未找到输出目录") + return 1 + print(render_markdown(summarize_reports(out_dir, limit_days=7))) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tests/fixtures/history_replay_2026_06_04_2026_06_10.json b/tests/fixtures/history_replay_2026_06_04_2026_06_10.json new file mode 100644 index 0000000..a1c66c5 --- /dev/null +++ b/tests/fixtures/history_replay_2026_06_04_2026_06_10.json @@ -0,0 +1,74 @@ +{ + "date_range": ["2026-06-04", "2026-06-10"], + "purpose": "Historical replay fixtures for semantic candidate recall, Stage 3 merge_groups, and cross-day regression tests.", + "events": [ + { + "event_id": "claude-fable-mythos", + "title": "Claude Fable/Mythos", + "expected_behavior": "same_event_merge_or_dedupe", + "items": [ + { + "date": "2026-06-04", + "id": "claude-fable-1", + "source": "AI HOT", + "title_raw": "Anthropic 推出 Claude Fable,用长篇叙事测试模型记忆", + "summary_raw": "Claude Fable 面向长篇故事生成,强调角色一致性和上下文管理。", + "url": "https://example.com/claude-fable" + }, + { + "date": "2026-06-05", + "id": "claude-mythos-1", + "source": "InfoQ AI", + "title_raw": "Claude Mythos/Fable 项目扩展到多角色故事工作流", + "summary_raw": "报道从创作流程角度补充 Anthropic Fable/Mythos 的应用场景。", + "url": "https://example.com/claude-mythos" + } + ] + }, + { + "event_id": "openclaw-suno", + "title": "OpenClaw/Suno", + "expected_behavior": "same_event_merge_or_dedupe", + "items": [ + {"date": "2026-06-05", "id": "openclaw-suno-1", "source": "AI HOT", "title_raw": "OpenClaw 集成 Suno 音乐生成能力", "summary_raw": "OpenClaw 新版加入 Suno 风格的音乐生成入口。", "url": "https://example.com/openclaw-suno-a"}, + {"date": "2026-06-05", "id": "openclaw-suno-2", "source": "量子位", "title_raw": "Suno 能力进入 OpenClaw,开源智能体开始做音乐", "summary_raw": "量子位从开源智能体生态角度报道 OpenClaw 与 Suno 相关能力。", "url": "https://example.com/openclaw-suno-b"} + ] + }, + { + "event_id": "magenta-realtime-2", + "title": "Magenta RealTime 2", + "expected_behavior": "same_event_merge_or_dedupe", + "items": [ + {"date": "2026-06-06", "id": "magenta-rt2-1", "source": "AI HOT", "title_raw": "Google 发布 Magenta RealTime 2,主打实时音乐生成", "summary_raw": "Magenta RealTime 2 降低延迟,支持互动式音乐创作。", "url": "https://example.com/magenta-rt2-a"}, + {"date": "2026-06-06", "id": "magenta-rt2-2", "source": "MIT科技评论AI", "title_raw": "Magenta RealTime 2 shows live AI music co-creation", "summary_raw": "MIT Tech Review explains the latency and interaction improvements in Magenta RealTime 2.", "url": "https://example.com/magenta-rt2-b"} + ] + }, + { + "event_id": "open-code-review", + "title": "Open Code Review", + "expected_behavior": "same_event_merge_or_dedupe", + "items": [ + {"date": "2026-06-07", "id": "open-code-review-1", "source": "AI HOT", "title_raw": "Open Code Review 发布,开源代码审查智能体上线", "summary_raw": "Open Code Review 面向 GitHub/Gitea 仓库自动生成审查意见。", "url": "https://example.com/open-code-review-a"}, + {"date": "2026-06-07", "id": "open-code-review-2", "source": "InfoQ AI", "title_raw": "Open Code Review brings agentic review to open-source repos", "summary_raw": "InfoQ focuses on CI integration and review workflows for Open Code Review.", "url": "https://example.com/open-code-review-b"} + ] + }, + { + "event_id": "openai-chip-talent-move", + "title": "OpenAI 芯片成员跳槽", + "expected_behavior": "same_event_merge_or_dedupe", + "items": [ + {"date": "2026-06-08", "id": "openai-chip-1", "source": "AI HOT", "title_raw": "OpenAI 定制芯片核心成员跳槽 Anthropic", "summary_raw": "OpenAI 芯片团队关键工程师在量产前离职加入 Anthropic。", "url": "https://example.com/openai-chip-a"}, + {"date": "2026-06-08", "id": "openai-chip-2", "source": "量子位", "title_raw": "OpenAI 芯片核心叛逃 Anthropic,就在量产前夜", "summary_raw": "量子位强调人才流动对 OpenAI 自研芯片进度的潜在影响。", "url": "https://example.com/openai-chip-b"} + ] + }, + { + "event_id": "amap-abot", + "title": "高德 ABot", + "expected_behavior": "same_event_merge_or_dedupe", + "items": [ + {"date": "2026-06-10", "id": "amap-abot-1", "source": "AI HOT", "title_raw": "高德推出 ABot,地图入口接入智能体服务", "summary_raw": "高德 ABot 将出行、搜索和本地生活任务整合到地图智能体。", "url": "https://example.com/amap-abot-a"}, + {"date": "2026-06-10", "id": "amap-abot-2", "source": "橘鸦AI早报", "title_raw": "高德 ABot 上线,本地生活智能体开始进入地图", "summary_raw": "橘鸦从产品入口角度记录高德 ABot 的上线。", "url": "https://example.com/amap-abot-b"} + ] + } + ] +} diff --git a/tests/test_audit.py b/tests/test_audit.py new file mode 100644 index 0000000..806a728 --- /dev/null +++ b/tests/test_audit.py @@ -0,0 +1,42 @@ +import json +import tempfile +import unittest +from pathlib import Path + +from ai_daily_report.audit import render_markdown, summarize_reports + + +class AuditTests(unittest.TestCase): + def test_summarizes_weekly_metrics(self): + with tempfile.TemporaryDirectory() as tmp: + run_dir = Path(tmp) / "2026-06-10" + run_dir.mkdir() + (run_dir / "run_report.json").write_text( + json.dumps( + { + "quality_gate": { + "source_failures": [{"source": "橘鸦AI早报"}], + "warnings": ["enabled_source_failed:橘鸦AI早报:error"], + "blocking_errors": [], + }, + "stage2_8": {"candidate_group_count": 6}, + "stage4": {"fallback_count": 2, "output_count": 20}, + "stage5": {"output_count": 20}, + "stage8": {"status": "ok", "slug": "ai-2026-06-10"}, + } + ), + encoding="utf-8", + ) + + summary = summarize_reports(Path(tmp), limit_days=7) + markdown = render_markdown(summary) + + self.assertEqual(summary["run_count"], 1) + self.assertEqual(summary["totals"]["source_failures"], 1) + self.assertEqual(summary["totals"]["duplicate_candidates"], 6) + self.assertEqual(summary["totals"]["fallback_ratio"], 0.1) + self.assertIn("AI日报每周自动审计报告", markdown) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_clients.py b/tests/test_clients.py index 880b94a..b8a1c7a 100644 --- a/tests/test_clients.py +++ b/tests/test_clients.py @@ -1,5 +1,6 @@ import json import unittest +from email.message import Message from urllib.error import HTTPError from unittest.mock import patch @@ -65,6 +66,20 @@ class ClientTests(unittest.TestCase): self.assertEqual(client.create_post({"title": "t"})["slug"], "ai-2026-06-04") client.publish_post("ai-2026-06-04") + def test_blog_api_client_slug_lookup_falls_back_to_query_endpoint(self): + responses = [ + HTTPError("https://blog.example/api/service/posts/ai-2026-06-10", 404, "Not Found", Message(), None), + FakeResponse(json.dumps({"items": [{"slug": "ai-2026-06-10", "content": "body"}]}).encode("utf-8")), + ] + with patch("urllib.request.urlopen", side_effect=responses) as urlopen: + client = BlogApiClient(base_url="https://blog.example", token="token") + post = client.get_post_by_slug("ai-2026-06-10") + + self.assertIsNotNone(post) + assert post is not None + self.assertEqual(post["slug"], "ai-2026-06-10") + self.assertEqual(urlopen.call_count, 2) + if __name__ == "__main__": unittest.main() diff --git a/tests/test_env_config.py b/tests/test_env_config.py index cc452f6..f2b2871 100644 --- a/tests/test_env_config.py +++ b/tests/test_env_config.py @@ -28,8 +28,9 @@ class EnvConfigTests(unittest.TestCase): ) def test_resolve_llm_config_reports_missing_fields(self): - with self.assertRaisesRegex(ValueError, "missing_llm_config: LLM_BASE_URL,LLM_MODEL"): - resolve_llm_config({"LLM_API_KEY": "key"}) + with TemporaryDirectory() as temp_dir: + with self.assertRaisesRegex(ValueError, "missing_llm_config: LLM_BASE_URL,LLM_MODEL"): + resolve_llm_config({"LLM_API_KEY": "key"}, hermes_dir=Path(temp_dir)) def test_resolve_llm_config_follows_hermes_provider_config(self): with TemporaryDirectory() as temp_dir: diff --git a/tests/test_generated_docs.py b/tests/test_generated_docs.py new file mode 100644 index 0000000..3d6a656 --- /dev/null +++ b/tests/test_generated_docs.py @@ -0,0 +1,17 @@ +import subprocess +import sys +import unittest +from pathlib import Path + + +class GeneratedDocsTests(unittest.TestCase): + def test_ops_threshold_doc_is_up_to_date(self): + root = Path(__file__).resolve().parents[1] + before = (root / "docs" / "ops-thresholds.generated.md").read_text(encoding="utf-8") + subprocess.run([sys.executable, "scripts/generate_ops_docs.py"], cwd=root, check=True, capture_output=True, text=True) + after = (root / "docs" / "ops-thresholds.generated.md").read_text(encoding="utf-8") + self.assertEqual(after, before) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_history_replay_fixtures.py b/tests/test_history_replay_fixtures.py new file mode 100644 index 0000000..8e70b7f --- /dev/null +++ b/tests/test_history_replay_fixtures.py @@ -0,0 +1,67 @@ +import json +import unittest +from pathlib import Path + +from ai_daily_report.candidate_recall import recall_semantic_candidates +from ai_daily_report.models import NewsItem + + +FIXTURE_PATH = Path(__file__).parent / "fixtures" / "history_replay_2026_06_04_2026_06_10.json" + + +def make_item(raw, index): + return NewsItem( + id=raw["id"], + source_group=raw["source"], + source_label=raw["source"], + source_role="primary" if raw["source"] == "AI HOT" else "supplement", + source_priority=10 if raw["source"] == "AI HOT" else 50, + title_raw=raw["title_raw"], + title_norm=raw["title_raw"].lower(), + summary_raw=raw["summary_raw"], + url=raw["url"], + canonical_url=raw["url"], + published_at=raw["date"], + ) + + +class HistoryReplayFixtureTests(unittest.TestCase): + def test_fixture_covers_required_incidents(self): + data = json.loads(FIXTURE_PATH.read_text(encoding="utf-8")) + event_ids = {event["event_id"] for event in data["events"]} + + self.assertEqual( + event_ids, + { + "claude-fable-mythos", + "openclaw-suno", + "magenta-realtime-2", + "open-code-review", + "openai-chip-talent-move", + "amap-abot", + }, + ) + + def test_candidate_recall_finds_fixture_event_pairs(self): + data = json.loads(FIXTURE_PATH.read_text(encoding="utf-8")) + misses = [] + for event in data["events"]: + items = [make_item(item, index) for index, item in enumerate(event["items"])] + candidates, report = recall_semantic_candidates( + items, + config={ + "enabled": True, + "title_similarity_threshold": 0.25, + "title_jaccard_threshold": 0.10, + "summary_jaccard_threshold": 0.05, + "strong_entity_overlap_threshold": 1, + }, + ) + if not candidates: + misses.append(event["event_id"]) + + self.assertEqual(misses, []) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_observability.py b/tests/test_observability.py new file mode 100644 index 0000000..a1a79f4 --- /dev/null +++ b/tests/test_observability.py @@ -0,0 +1,34 @@ +import json +import unittest + +from ai_daily_report.observability import LlmCallObserver, summarize_observed_calls + + +class ObservabilityTests(unittest.TestCase): + def test_records_prompt_and_response_hashes(self): + observer = LlmCallObserver(lambda prompt: json.dumps({"ok": True}), stage="stage3") + response = observer("prompt") + + self.assertEqual(response, '{"ok": true}') + self.assertEqual(len(observer.records), 1) + self.assertEqual(observer.records[0]["stage"], "stage3") + self.assertEqual(observer.records[0]["prompt_chars"], 6) + self.assertEqual(observer.records[0]["response_chars"], len(response)) + self.assertRegex(observer.records[0]["prompt_hash"], r"^[0-9a-f]{64}$") + self.assertRegex(observer.records[0]["response_hash"], r"^[0-9a-f]{64}$") + + def test_summarizes_observed_calls(self): + left = LlmCallObserver(lambda prompt: "a", stage="stage3") + right = LlmCallObserver(lambda prompt: "b", stage="stage4") + left("x") + right("y") + right("z") + + report = summarize_observed_calls([left, right]) + + self.assertEqual(report["total_calls"], 3) + self.assertEqual(report["by_stage"], {"stage3": 1, "stage4": 2}) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_stage3_semantic_dedupe.py b/tests/test_stage3_semantic_dedupe.py index ed876a5..4173435 100644 --- a/tests/test_stage3_semantic_dedupe.py +++ b/tests/test_stage3_semantic_dedupe.py @@ -87,6 +87,40 @@ class Stage3SemanticDedupeTests(unittest.TestCase): self.assertEqual(report["removed_count"], 0) self.assertTrue(report["skipped_for_deletion_ratio"]) + def test_semantic_dedup_supports_merge_groups_as_supplementary_sources(self): + items = [ + news_item("a", "高德推出 ABot", "AI HOT"), + news_item("b", "高德 ABot 进入本地生活入口", "橘鸦AI早报"), + news_item("c", "Meta 发布新眼镜", "InfoQ AI"), + ] + candidates = [{"item_ids": ["a", "b"], "reason": "same_event_complementary"}] + + def llm_call(prompt): + self.assertIn("merge_groups", prompt) + return json.dumps( + { + "duplicate_groups": [], + "merge_groups": [ + { + "keep_id": "a", + "merge_ids": ["b"], + "confidence": "high", + "reason": "same ABot launch, different angle", + } + ], + "not_duplicates": [], + "uncertain": [], + } + ) + + deduped, report = semantic_dedup_items(items, candidates, llm_call=llm_call) + + self.assertEqual([item.id for item in deduped], ["a", "b", "c"]) + self.assertEqual(report["removed_count"], 0) + self.assertEqual(report["merge_groups"][0]["merge_ids"], ["b"]) + self.assertEqual(deduped[0].duplicate_sources[0]["action"], "merge_supplement") + self.assertEqual(deduped[0].duplicate_sources[0]["id"], "b") + def test_semantic_dedup_ignores_groups_outside_candidate_sets(self): items = [ news_item("a", "Suno 完成融资"),