From b46cef2c7badff2a8d509cf9402675f3bf6e5f64 Mon Sep 17 00:00:00 2001 From: Mimikko-zeus Date: Wed, 10 Jun 2026 21:31:13 +0800 Subject: [PATCH] Add Stage 2.8 recall, quality gate, retries, and publish idempotency --- .learnings/ERRORS.md | 144 ++++++++++++++++ ai_daily_report/candidate_recall.py | 162 ++++++++++++++++++ ai_daily_report/clients.py | 73 +++++++- ai_daily_report/collect.py | 15 ++ ai_daily_report/pipeline.py | 74 ++++++++ ai_daily_report/publish.py | 50 ++++++ ai_daily_report/quality_gate.py | 91 ++++++++++ ai_daily_report/runner.py | 24 ++- ...-06-10-ai-daily-full-chain-optimization.md | 130 ++++++++++++++ tests/test_candidate_recall.py | 79 +++++++++ tests/test_clients.py | 25 ++- tests/test_quality_gate.py | 78 +++++++++ tests/test_runner.py | 120 +++++++++++++ tests/test_stage0_collect.py | 13 ++ tests/test_stage0_to_4_pipeline.py | 136 +++++++++++++++ tests/test_stage8_publish.py | 45 ++++- 16 files changed, 1253 insertions(+), 6 deletions(-) create mode 100644 .learnings/ERRORS.md create mode 100644 ai_daily_report/candidate_recall.py create mode 100644 ai_daily_report/quality_gate.py create mode 100644 docs/plans/2026-06-10-ai-daily-full-chain-optimization.md create mode 100644 tests/test_candidate_recall.py create mode 100644 tests/test_quality_gate.py diff --git a/.learnings/ERRORS.md b/.learnings/ERRORS.md new file mode 100644 index 0000000..984f706 --- /dev/null +++ b/.learnings/ERRORS.md @@ -0,0 +1,144 @@ +## [ERR-20260606-001] computer_use_helper_startup + +**Logged**: 2026-06-06T00:00:00+08:00 +**Priority**: medium +**Status**: pending +**Area**: infra + +### Summary +Computer Use helper failed during Windows automation startup. + +### Error +```text +node_repl kernel exited unexpectedly +windows sandbox failed: spawn setup refresh +``` + +### Context +- Operation attempted: initialize Computer Use and list Windows apps. +- Retried after resetting the JavaScript session. 
+- Both attempts failed before any app automation actions were taken. + +### Suggested Fix +Investigate the Computer Use Windows helper startup path and sandbox setup; retry after the helper/runtime is refreshed. + +### Metadata +- Reproducible: yes +- Related Files: C:/Users/12256/.codex/plugins/cache/openai-bundled/computer-use/26.602.40724/scripts/computer-use-client.mjs + +--- + +## [ERR-20260610-001] absolute_path_prefixed_with_workspace + +**Logged**: 2026-06-10T00:00:00+08:00 +**Priority**: low +**Status**: pending +**Area**: docs + +### Summary +An absolute skill file path was accidentally prefixed with the current workspace path when verifying completion. + +### Error +```text +Get-Content : Cannot find path 'E:\Codes\ai-daily-report\C:\Users\12256\.codex\superpowers\skills\verification-before-completion\SKILL.md' +``` + +### Context +- Operation attempted: read `C:\Users\12256\.codex\superpowers\skills\verification-before-completion\SKILL.md`. +- The command used a malformed literal path that concatenated the workspace root and the absolute path. +- Re-running with the actual absolute path succeeded. + +### Suggested Fix +When reading skill files or other absolute Windows paths, pass the `C:\...` path directly and do not combine it with the workspace path. + +### Metadata +- Reproducible: yes +- Related Files: C:\Users\12256\.codex\superpowers\skills\verification-before-completion\SKILL.md + +--- + +## [ERR-20260608-003] git_push_auth_failed + +**Logged**: 2026-06-08T00:00:00+08:00 +**Priority**: medium +**Status**: pending +**Area**: infra + +### Summary +`git push origin main` failed because the Gitea remote rejected authentication. + +### Error +```text +remote: Failed to authenticate user +fatal: Authentication failed for 'https://gitea.ephron.ren/Elaina/ai-daily-report.git/' +``` + +### Context +- Operation attempted: push committed cross-day dedupe fix to `origin/main`. +- Local commit exists: `07786e3 fix: add cross-day dedupe`. 
+- Test suite passed before commit: `79 passed`. + +### Suggested Fix +Refresh Git credentials for `https://gitea.ephron.ren` or switch the remote to an authenticated SSH/HTTPS URL, then rerun `git push origin main`. + +### Metadata +- Reproducible: yes +- Related Files: git remote origin + +--- + +## [ERR-20260608-002] powershell_convertfromjson_mojibake + +**Logged**: 2026-06-08T00:00:00+08:00 +**Priority**: low +**Status**: pending +**Area**: tests + +### Summary +PowerShell `ConvertFrom-Json` failed on a generated report containing existing mojibake section labels, while Python `json.loads` parsed the same report successfully. + +### Error +```text +ConvertFrom-Json : Invalid object passed in, ':' or '}' expected. +``` + +### Context +- Operation attempted: verify CLI dry-run output by piping `run_report.json` through `ConvertFrom-Json`. +- Follow-up verification with Python `json.loads` succeeded and confirmed `stage2_5` and `stage8` fields. + +### Suggested Fix +Use Python's JSON parser for verification in this repository when report content includes mojibake-rendered non-ASCII strings. + +### Metadata +- Reproducible: yes +- Related Files: run_report.json + +--- + +## [ERR-20260608-001] apply_patch_context_encoding + +**Logged**: 2026-06-08T00:00:00+08:00 +**Priority**: low +**Status**: pending +**Area**: tests + +### Summary +`apply_patch` failed when matching context lines that contained mojibake-rendered Chinese text. + +### Error +```text +apply_patch verification failed: Failed to find expected lines +``` + +### Context +- Operation attempted: update `tests/test_stage2_dedupe.py` with a patch anchored on displayed non-ASCII strings. +- The file content rendered differently enough that the expected context did not match. + +### Suggested Fix +Use ASCII-only anchors, line-number inspection, or smaller structural context when patching files that contain mojibake-rendered non-ASCII text. 
+ +### Metadata +- Reproducible: yes +- Related Files: tests/test_stage2_dedupe.py + +--- diff --git a/ai_daily_report/candidate_recall.py b/ai_daily_report/candidate_recall.py new file mode 100644 index 0000000..dd4bdb7 --- /dev/null +++ b/ai_daily_report/candidate_recall.py @@ -0,0 +1,162 @@ +from __future__ import annotations + +import difflib +import re +from collections import defaultdict +from typing import Any + +from .dedupe import _jaccard_similarity, _title_tokens +from .models import NewsItem + + +DEFAULT_CONFIG = { + "enabled": True, + "max_pairs": 80, + "max_pairs_per_item": 5, + "title_similarity_threshold": 0.45, + "title_jaccard_threshold": 0.25, + "summary_jaccard_threshold": 0.18, + "strong_entity_overlap_threshold": 2, +} + +STOP_ENTITIES = { + "AI", + "API", + "CLI", + "LLM", + "Open Source", + "GitHub", + "Google", + "OpenAI", + "Anthropic", + "Microsoft", + "Meta", + "Amazon", + "NVIDIA", +} + + +def _config_value(config: dict[str, Any], name: str): + return (config or {}).get(name, DEFAULT_CONFIG[name]) + + +def _text_tokens(value: str) -> set[str]: + return _title_tokens(value) + + +def _entity_tokens(value: str) -> set[str]: + text = value or "" + entities = set(re.findall(r"\b[A-Z][A-Za-z0-9]*(?:[- ][A-Z0-9][A-Za-z0-9]*)*\b", text)) + entities.update(re.findall(r"[\u4e00-\u9fffA-Za-z0-9]*[A-Za-z]+[0-9]+[A-Za-z0-9-]*", text)) + cleaned = {entity.strip() for entity in entities if len(entity.strip()) >= 3} + return {entity for entity in cleaned if entity not in STOP_ENTITIES} + + +def _pair_key(item_ids: list[str]) -> frozenset[str]: + return frozenset(item_ids) + + +def _candidate_score(left: NewsItem, right: NewsItem, config: dict[str, Any]) -> tuple[float, str, dict[str, Any]] | None: + title_ratio = difflib.SequenceMatcher(None, left.title_norm, right.title_norm).ratio() + title_jaccard = _jaccard_similarity(_text_tokens(left.title_norm), _text_tokens(right.title_norm)) + summary_jaccard = 
_jaccard_similarity(_text_tokens(left.summary_raw), _text_tokens(right.summary_raw)) + left_entities = _entity_tokens(f"{left.title_raw} {left.summary_raw}") + right_entities = _entity_tokens(f"{right.title_raw} {right.summary_raw}") + shared_entities = sorted(left_entities & right_entities) + strong_entity_threshold = int(_config_value(config, "strong_entity_overlap_threshold")) + + if len(shared_entities) >= strong_entity_threshold and summary_jaccard > 0: + score = min(1.0, 0.55 + len(shared_entities) * 0.1 + summary_jaccard * 0.35) + return score, "strong_entity_overlap", { + "shared_entities": shared_entities, + "title_similarity": round(title_ratio, 3), + "title_jaccard": round(title_jaccard, 3), + "summary_jaccard": round(summary_jaccard, 3), + } + + if title_ratio >= float(_config_value(config, "title_similarity_threshold")) and ( + title_jaccard >= float(_config_value(config, "title_jaccard_threshold")) + or summary_jaccard >= float(_config_value(config, "summary_jaccard_threshold")) * 2 + or shared_entities + ): + return title_ratio, "title_similarity", { + "title_similarity": round(title_ratio, 3), + "title_jaccard": round(title_jaccard, 3), + "summary_jaccard": round(summary_jaccard, 3), + } + + if ( + title_jaccard >= float(_config_value(config, "title_jaccard_threshold")) + and summary_jaccard >= float(_config_value(config, "summary_jaccard_threshold")) + ): + score = (title_jaccard + summary_jaccard) / 2 + return score, "title_summary_jaccard", { + "title_similarity": round(title_ratio, 3), + "title_jaccard": round(title_jaccard, 3), + "summary_jaccard": round(summary_jaccard, 3), + } + + return None + + +def recall_semantic_candidates( + items: list[NewsItem], + *, + existing_candidates: list[dict[str, Any]] | None = None, + config: dict[str, Any] | None = None, +) -> tuple[list[dict[str, Any]], dict[str, Any]]: + config = {**DEFAULT_CONFIG, **(config or {})} + existing_candidates = list(existing_candidates or []) + if not bool(config.get("enabled", 
True)): + return existing_candidates, { + "enabled": False, + "input_count": len(items), + "existing_candidate_group_count": len(existing_candidates), + "added_candidate_group_count": 0, + "candidate_group_count": len(existing_candidates), + "candidates": existing_candidates, + } + + existing_keys = {_pair_key(list(candidate.get("item_ids", []) or [])) for candidate in existing_candidates} + pair_counts: defaultdict[str, int] = defaultdict(int) + recalled: list[dict[str, Any]] = [] + + for index, left in enumerate(items): + for right in items[index + 1 :]: + if pair_counts[left.id] >= int(config["max_pairs_per_item"]): + continue + if pair_counts[right.id] >= int(config["max_pairs_per_item"]): + continue + key = frozenset({left.id, right.id}) + if key in existing_keys: + continue + scored = _candidate_score(left, right, config) + if scored is None: + continue + score, reason, evidence = scored + recalled.append( + { + "item_ids": [left.id, right.id], + "reason": reason, + "score": round(score, 3), + "confidence": "medium", + **evidence, + } + ) + pair_counts[left.id] += 1 + pair_counts[right.id] += 1 + if len(recalled) >= int(config["max_pairs"]): + break + if len(recalled) >= int(config["max_pairs"]): + break + + candidates = existing_candidates + recalled + report = { + "enabled": True, + "input_count": len(items), + "existing_candidate_group_count": len(existing_candidates), + "added_candidate_group_count": len(recalled), + "candidate_group_count": len(candidates), + "candidates": candidates, + } + return candidates, report diff --git a/ai_daily_report/clients.py b/ai_daily_report/clients.py index 2fd3359..64aeb8a 100644 --- a/ai_daily_report/clients.py +++ b/ai_daily_report/clients.py @@ -1,6 +1,10 @@ from __future__ import annotations import json +import socket +import time +from dataclasses import dataclass +from urllib.error import HTTPError, URLError import urllib.request from typing import Any @@ -8,10 +12,61 @@ from typing import Any UA = "Mozilla/5.0 
(compatible; ai-daily-report/1.0)" -def fetch_text(url: str, timeout_seconds: int) -> str: +@dataclass +class FetchTextError(Exception): + error_type: str + message: str + http_status: int | None = None + attempts: int = 1 + + def __str__(self) -> str: + return self.message + + +def _classify_fetch_exception(exc: Exception) -> tuple[str, int | None, bool]: + if isinstance(exc, HTTPError): + if exc.code == 404: + return "http_404", exc.code, False + if exc.code in {429, 500, 502, 503, 504}: + return f"http_{exc.code}", exc.code, True + return f"http_{exc.code}", exc.code, False + if isinstance(exc, TimeoutError | socket.timeout): + return "timeout", None, True + if isinstance(exc, URLError): + reason = exc.reason + if isinstance(reason, TimeoutError | socket.timeout): + return "timeout", None, True + return "network_error", None, True + return "fetch_error", None, False + + +def fetch_text( + url: str, + timeout_seconds: int, + *, + retries: int = 0, + backoff_seconds: float = 0.5, +) -> str: req = urllib.request.Request(url, headers={"User-Agent": UA}) - with urllib.request.urlopen(req, timeout=timeout_seconds) as response: - return response.read().decode("utf-8", "ignore") + attempts = max(1, retries + 1) + last_error: FetchTextError | None = None + for attempt in range(1, attempts + 1): + try: + with urllib.request.urlopen(req, timeout=timeout_seconds) as response: + return response.read().decode("utf-8", "ignore") + except Exception as exc: + error_type, http_status, retryable = _classify_fetch_exception(exc) + last_error = FetchTextError( + error_type=error_type, + message=f"{type(exc).__name__}: {exc}", + http_status=http_status, + attempts=attempt, + ) + if not retryable or attempt >= attempts: + raise last_error from exc + if backoff_seconds > 0: + time.sleep(backoff_seconds * (2 ** (attempt - 1))) + raise last_error or FetchTextError("fetch_error", "unknown fetch error", attempts=attempts) class OpenAICompatibleClient: @@ -60,5 +115,17 @@ class 
BlogApiClient: def create_post(self, payload: dict[str, Any]) -> dict[str, Any]: return self._request("POST", "/api/service/posts", payload) + def get_post_by_slug(self, slug: str) -> dict[str, Any] | None: + try: + return self._request("GET", f"/api/service/posts/{slug}") + except HTTPError as exc: + if exc.code == 404: + return None + raise + except FetchTextError as exc: + if exc.error_type == "http_404": + return None + raise + def publish_post(self, slug: str) -> None: self._request("POST", f"/api/service/posts/{slug}/publish") diff --git a/ai_daily_report/collect.py b/ai_daily_report/collect.py index b1c947e..fe2567a 100644 --- a/ai_daily_report/collect.py +++ b/ai_daily_report/collect.py @@ -5,6 +5,7 @@ from datetime import datetime, timezone from time import perf_counter from typing import Callable, Iterable, Any +from .clients import FetchTextError from .models import SourceConfig, SourceResult @@ -12,11 +13,19 @@ Fetcher = Callable[[SourceConfig, str], list[dict[str, Any]]] def _status_from_exception(exc: Exception) -> str: + if isinstance(exc, FetchTextError): + return exc.error_type if isinstance(exc, TimeoutError): return "timeout" return "error" +def _retry_count_from_exception(exc: Exception) -> int: + if isinstance(exc, FetchTextError): + return max(0, exc.attempts - 1) + return 0 + + def _collect_one(config: SourceConfig, run_date: str, fetcher: Fetcher) -> SourceResult: fetched_at = datetime.now(timezone.utc).isoformat() if not config.enabled: @@ -51,6 +60,7 @@ def _collect_one(config: SourceConfig, run_date: str, fetcher: Fetcher) -> Sourc status=_status_from_exception(exc), error=f"{type(exc).__name__}: {exc}", elapsed_ms=elapsed_ms, + retry_count=_retry_count_from_exception(exc), fetched_at=fetched_at, ) @@ -91,5 +101,10 @@ def collect_sources( "raw_item_count": sum(len(result.items) for result in results), "source_counts": {result.source: len(result.items) for result in results}, "statuses": {result.source: result.status for result in 
results}, + "error_types": { + result.source: result.status + for result in results + if not result.ok and result.status != "disabled" + }, } return results, report diff --git a/ai_daily_report/pipeline.py b/ai_daily_report/pipeline.py index 0abbeaa..a5f4251 100644 --- a/ai_daily_report/pipeline.py +++ b/ai_daily_report/pipeline.py @@ -3,6 +3,7 @@ from __future__ import annotations from typing import Any from .assemble import assemble_markdown +from .candidate_recall import recall_semantic_candidates from .classify import classify_and_order_items from .collect import Fetcher, collect_sources from .dedupe import cross_day_dedup_items, hard_dedup_items @@ -10,6 +11,7 @@ from .guide import GuideLlmCall, generate_guide from .models import PublishedUrls, SourceConfig from .normalize import normalize_items from .publish import BlogClient, publish_markdown +from .quality_gate import evaluate_quality_gate from .rewrite import RewriteLlmCall, rewrite_items from .semantic_dedupe import SemanticLlmCall, semantic_dedup_items @@ -49,6 +51,11 @@ def run_stage0_to_stage2( source_priorities=source_priorities, ) deduped_items, stage2_report = hard_dedup_items(normalized_items) + artifacts = { + "stage0_sources": source_results, + "stage1_items": normalized_items, + "stage2_items": deduped_items, + } return { "source_results": source_results, "items": deduped_items, @@ -57,6 +64,7 @@ def run_stage0_to_stage2( "stage1": stage1_report, "stage2": stage2_report, }, + "artifacts": artifacts, } @@ -90,10 +98,13 @@ def run_stage0_to_stage2_5( reports = dict(stage2_result["reports"]) stage2_5_report.setdefault("enabled", cross_day_dedup_enabled) reports["stage2_5"] = stage2_5_report + artifacts = dict(stage2_result.get("artifacts", {})) + artifacts["stage2_5_items"] = items return { "source_results": stage2_result["source_results"], "items": items, "reports": reports, + "artifacts": artifacts, } @@ -107,6 +118,10 @@ def run_stage0_to_stage4( published_urls: PublishedUrls | None = None, 
cross_day_dedup_enabled: bool = True, cross_day_dedup_max_age_days: int = 7, + semantic_dedup_max_deletion_ratio: float = 0.5, + rewrite_batch_size: int = 30, + semantic_candidate_recall_config: dict[str, Any] | None = None, + quality_gate_config: dict[str, Any] | None = None, ) -> dict[str, Any]: stage2_5_result = run_stage0_to_stage2_5( source_configs, @@ -123,22 +138,35 @@ def run_stage0_to_stage4( for candidate in stage2_5_result["reports"]["stage2"].get("possible_duplicates", []) if set(candidate.get("item_ids", [])).issubset(remaining_ids) ] + candidates, stage2_8_report = recall_semantic_candidates( + items, + existing_candidates=candidates, + config=semantic_candidate_recall_config, + ) semantic_items, stage3_report = semantic_dedup_items( items, candidates, llm_call=semantic_llm_call, + max_deletion_ratio=semantic_dedup_max_deletion_ratio, ) rewritten_items, stage4_report = rewrite_items( semantic_items, llm_call=rewrite_llm_call, + batch_size=rewrite_batch_size, ) reports = dict(stage2_5_result["reports"]) + reports["stage2_8"] = stage2_8_report reports["stage3"] = stage3_report reports["stage4"] = stage4_report + artifacts = dict(stage2_5_result.get("artifacts", {})) + artifacts["stage2_8_candidates"] = candidates + artifacts["stage3_items"] = semantic_items + artifacts["stage4_items"] = rewritten_items return { "source_results": stage2_5_result["source_results"], "items": rewritten_items, "reports": reports, + "artifacts": artifacts, } @@ -152,6 +180,10 @@ def run_stage0_to_stage5( published_urls: PublishedUrls | None = None, cross_day_dedup_enabled: bool = True, cross_day_dedup_max_age_days: int = 7, + semantic_dedup_max_deletion_ratio: float = 0.5, + rewrite_batch_size: int = 30, + semantic_candidate_recall_config: dict[str, Any] | None = None, + quality_gate_config: dict[str, Any] | None = None, ) -> dict[str, Any]: stage4_result = run_stage0_to_stage4( source_configs, @@ -162,6 +194,9 @@ def run_stage0_to_stage5( published_urls=published_urls, 
cross_day_dedup_enabled=cross_day_dedup_enabled, cross_day_dedup_max_age_days=cross_day_dedup_max_age_days, + semantic_dedup_max_deletion_ratio=semantic_dedup_max_deletion_ratio, + rewrite_batch_size=rewrite_batch_size, + semantic_candidate_recall_config=semantic_candidate_recall_config, ) classified_items, stage5_report = classify_and_order_items(stage4_result["items"]) reports = dict(stage4_result["reports"]) @@ -170,6 +205,7 @@ def run_stage0_to_stage5( "source_results": stage4_result["source_results"], "items": classified_items, "reports": reports, + "artifacts": stage4_result.get("artifacts", {}), } @@ -184,6 +220,9 @@ def run_stage0_to_stage6( published_urls: PublishedUrls | None = None, cross_day_dedup_enabled: bool = True, cross_day_dedup_max_age_days: int = 7, + semantic_dedup_max_deletion_ratio: float = 0.5, + rewrite_batch_size: int = 30, + semantic_candidate_recall_config: dict[str, Any] | None = None, ) -> dict[str, Any]: stage5_result = run_stage0_to_stage5( source_configs, @@ -194,6 +233,9 @@ def run_stage0_to_stage6( published_urls=published_urls, cross_day_dedup_enabled=cross_day_dedup_enabled, cross_day_dedup_max_age_days=cross_day_dedup_max_age_days, + semantic_dedup_max_deletion_ratio=semantic_dedup_max_deletion_ratio, + rewrite_batch_size=rewrite_batch_size, + semantic_candidate_recall_config=semantic_candidate_recall_config, ) guide, stage6_report = generate_guide(stage5_result["items"], llm_call=guide_llm_call) reports = dict(stage5_result["reports"]) @@ -203,6 +245,7 @@ def run_stage0_to_stage6( "items": stage5_result["items"], "guide": guide, "reports": reports, + "artifacts": stage5_result.get("artifacts", {}), } @@ -217,6 +260,10 @@ def run_stage0_to_stage7( published_urls: PublishedUrls | None = None, cross_day_dedup_enabled: bool = True, cross_day_dedup_max_age_days: int = 7, + semantic_dedup_max_deletion_ratio: float = 0.5, + rewrite_batch_size: int = 30, + semantic_candidate_recall_config: dict[str, Any] | None = None, + 
quality_gate_config: dict[str, Any] | None = None, ) -> dict[str, Any]: stage6_result = run_stage0_to_stage6( source_configs, @@ -228,6 +275,9 @@ def run_stage0_to_stage7( published_urls=published_urls, cross_day_dedup_enabled=cross_day_dedup_enabled, cross_day_dedup_max_age_days=cross_day_dedup_max_age_days, + semantic_dedup_max_deletion_ratio=semantic_dedup_max_deletion_ratio, + rewrite_batch_size=rewrite_batch_size, + semantic_candidate_recall_config=semantic_candidate_recall_config, ) markdown, stage7_report = assemble_markdown(stage6_result["items"], stage6_result["guide"]) upstream_blocking_errors: list[str] = [] @@ -238,13 +288,26 @@ def run_stage0_to_stage7( existing_errors = list(stage7_report.get("blocking_errors", []) or []) stage7_report["blocking_errors"] = existing_errors + upstream_blocking_errors reports = dict(stage6_result["reports"]) + quality_gate_report = evaluate_quality_gate( + stage6_result["items"], + source_results=stage6_result["source_results"], + reports=reports, + config=quality_gate_config, + ) + if quality_gate_report.get("blocking_errors"): + existing_errors = list(stage7_report.get("blocking_errors", []) or []) + stage7_report["blocking_errors"] = existing_errors + list(quality_gate_report["blocking_errors"]) + reports["quality_gate"] = quality_gate_report reports["stage7"] = stage7_report + artifacts = dict(stage6_result.get("artifacts", {})) + artifacts["quality_gate"] = quality_gate_report return { "source_results": stage6_result["source_results"], "items": stage6_result["items"], "guide": stage6_result["guide"], "markdown": markdown, "reports": reports, + "artifacts": artifacts, } @@ -262,6 +325,11 @@ def run_stage0_to_stage8( published_urls: PublishedUrls | None = None, cross_day_dedup_enabled: bool = True, cross_day_dedup_max_age_days: int = 7, + semantic_dedup_max_deletion_ratio: float = 0.5, + rewrite_batch_size: int = 30, + semantic_candidate_recall_config: dict[str, Any] | None = None, + quality_gate_config: dict[str, 
Any] | None = None, + publish_idempotency_config: dict[str, Any] | None = None, ) -> dict[str, Any]: stage7_result = run_stage0_to_stage7( source_configs, @@ -273,6 +341,10 @@ def run_stage0_to_stage8( published_urls=published_urls, cross_day_dedup_enabled=cross_day_dedup_enabled, cross_day_dedup_max_age_days=cross_day_dedup_max_age_days, + semantic_dedup_max_deletion_ratio=semantic_dedup_max_deletion_ratio, + rewrite_batch_size=rewrite_batch_size, + semantic_candidate_recall_config=semantic_candidate_recall_config, + quality_gate_config=quality_gate_config, ) slug = f"ai-{run_date}" publish_result = publish_markdown( @@ -284,6 +356,7 @@ def run_stage0_to_stage8( mode=mode, markdown_report=stage7_result["reports"]["stage7"], client=client, + idempotency_config=publish_idempotency_config, ) reports = dict(stage7_result["reports"]) reports["stage8"] = { @@ -301,4 +374,5 @@ def run_stage0_to_stage8( "markdown": stage7_result["markdown"], "publish": publish_result, "reports": reports, + "artifacts": stage7_result.get("artifacts", {}), } diff --git a/ai_daily_report/publish.py b/ai_daily_report/publish.py index 39b84e6..af9cdad 100644 --- a/ai_daily_report/publish.py +++ b/ai_daily_report/publish.py @@ -1,6 +1,7 @@ from __future__ import annotations import json +import hashlib from dataclasses import dataclass from datetime import date, datetime, timezone from pathlib import Path @@ -20,6 +21,9 @@ class PublishResult: class BlogClient(Protocol): + def get_post_by_slug(self, slug: str) -> dict[str, Any] | None: + ... + def create_post(self, payload: dict[str, Any]) -> dict[str, Any]: ... 
@@ -153,6 +157,18 @@ def dry_run_publish(slug: str, base_url: str) -> PublishResult: ) +def _content_hash(value: str) -> str: + return hashlib.sha256((value or "").encode("utf-8")).hexdigest() + + +def _get_existing_post(client: BlogClient, slug: str) -> dict[str, Any] | None: + getter = getattr(client, "get_post_by_slug", None) + if getter is None: + return None + existing = getter(slug) + return existing if isinstance(existing, dict) else None + + def publish_markdown( *, title: str, @@ -163,6 +179,7 @@ def publish_markdown( mode: str, markdown_report: dict[str, Any], client: BlogClient | None, + idempotency_config: dict[str, Any] | None = None, ) -> PublishResult: blocking_errors = markdown_report.get("blocking_errors", []) or [] blog_url = f"{base_url.rstrip('/')}/posts/{slug}" @@ -187,6 +204,39 @@ def publish_markdown( error="missing_blog_client", ) + idempotency_config = idempotency_config or {} + if bool(idempotency_config.get("enabled", False)): + try: + existing_post = _get_existing_post(client, slug) + except Exception as exc: + return PublishResult( + mode=mode, + status="failed", + slug=slug, + blog_url=blog_url, + public_ok=False, + error=f"idempotency_check_failed:{type(exc).__name__}: {exc}", + ) + if existing_post is not None: + existing_content = str(existing_post.get("content") or existing_post.get("markdown") or "") + if _content_hash(existing_content) == _content_hash(markdown): + return PublishResult( + mode=mode, + status="already_published", + slug=slug, + blog_url=blog_url, + public_ok=True, + ) + if not bool(idempotency_config.get("allow_republish", False)): + return PublishResult( + mode=mode, + status="blocked", + slug=slug, + blog_url=blog_url, + public_ok=False, + error="slug_already_exists", + ) + payload = {"title": title, "content": markdown, "tags": tags, "slug": slug} try: create_resp = client.create_post(payload) diff --git a/ai_daily_report/quality_gate.py b/ai_daily_report/quality_gate.py new file mode 100644 index 
0000000..73ebb5b --- /dev/null +++ b/ai_daily_report/quality_gate.py @@ -0,0 +1,91 @@ +from __future__ import annotations + +import difflib +from typing import Any + +from .dedupe import _title_tokens +from .models import NewsItem, SourceResult + + +DEFAULT_CONFIG = { + "block_on_required_source_failure": True, + "warn_on_enabled_source_failure": True, + "warn_when_stage3_candidates_zero_min_items": 30, + "warn_on_final_title_similarity": 0.55, + "warn_on_entity_frequency": 3, + "required_sources": [], +} + + +def _config(config: dict[str, Any] | None) -> dict[str, Any]: + return {**DEFAULT_CONFIG, **(config or {})} + + +def _source_failures(source_results: list[SourceResult]) -> list[dict[str, Any]]: + failures: list[dict[str, Any]] = [] + for result in source_results: + if result.ok or result.status == "disabled": + continue + failures.append( + { + "source": result.source, + "role": result.role, + "status": result.status, + "error": result.error, + } + ) + return failures + + +def _similar_title_warnings(items: list[NewsItem], threshold: float) -> list[str]: + warnings: list[str] = [] + for index, left in enumerate(items): + left_title = left.title or left.title_raw + for right in items[index + 1 :]: + right_title = right.title or right.title_raw + if len(_title_tokens(left_title)) < 2 or len(_title_tokens(right_title)) < 2: + continue + ratio = difflib.SequenceMatcher(None, left_title.lower(), right_title.lower()).ratio() + if ratio >= threshold: + warnings.append(f"final_title_similarity:{left.id}:{right.id}:{ratio:.3f}") + return warnings + + +def evaluate_quality_gate( + items: list[NewsItem], + *, + source_results: list[SourceResult], + reports: dict[str, Any], + config: dict[str, Any] | None = None, +) -> dict[str, Any]: + config = _config(config) + warnings: list[str] = [] + blocking_errors: list[str] = [] + + stage3_report = reports.get("stage3", {}) or {} + min_items = int(config["warn_when_stage3_candidates_zero_min_items"]) + if len(items) > min_items 
and int(stage3_report.get("candidate_group_count", 0)) == 0: + warnings.append("stage3_candidates_zero") + + failures = _source_failures(source_results) + if bool(config["warn_on_enabled_source_failure"]): + for failure in failures: + warnings.append(f"enabled_source_failed:{failure['source']}:{failure['status']}") + + required_sources = set(config.get("required_sources") or []) + if bool(config["block_on_required_source_failure"]): + for failure in failures: + if failure["source"] in required_sources: + blocking_errors.append(f"required_source_failed:{failure['source']}:{failure['status']}") + + title_threshold = float(config["warn_on_final_title_similarity"]) + if title_threshold > 0: + warnings.extend(_similar_title_warnings(items, title_threshold)) + + return { + "input_count": len(items), + "warnings": warnings, + "blocking_errors": blocking_errors, + "source_failures": failures, + "quality_gate_failed": bool(blocking_errors), + } diff --git a/ai_daily_report/runner.py b/ai_daily_report/runner.py index 293b102..b75e8f2 100644 --- a/ai_daily_report/runner.py +++ b/ai_daily_report/runner.py @@ -104,6 +104,11 @@ def run_daily_report( cross_day_config = pipeline_config.get("cross_day_dedup", {}) or {} cross_day_enabled = bool(cross_day_config.get("enabled", True)) cross_day_max_age_days = int(cross_day_config.get("max_age_days", 7)) + semantic_dedup_max_deletion_ratio = float(pipeline_config.get("semantic_dedup_max_deletion_ratio", 0.5)) + rewrite_batch_size = int(pipeline_config.get("rewrite_batch_size", 30)) + semantic_candidate_recall_config = pipeline_config.get("semantic_candidate_recall", {}) or {} + quality_gate_config = pipeline_config.get("quality_gate", {}) or {} + publish_idempotency_config = pipeline_config.get("publish_idempotency", {}) or {} configured_history_path = history_path or Path( str(cross_day_config.get("history_path") or "~/.hermes/scripts/ai_morning_out/published_urls.json") ).expanduser() @@ -119,7 +124,13 @@ def run_daily_report( def 
fetcher(config: SourceConfig, current_date: str) -> list[dict[str, Any]]: source_fetcher = get_source_fetcher(config.type) - return source_fetcher(config, current_date, fetch_text) + def configured_fetch_text(url: str, timeout_seconds: int) -> str: + try: + return fetch_text(url, timeout_seconds, retries=config.retries) + except TypeError: + return fetch_text(url, timeout_seconds) + + return source_fetcher(config, current_date, configured_fetch_text) else: raise ValueError("source_mode must be 'mock' or 'live'") @@ -156,6 +167,11 @@ def run_daily_report( published_urls=published_urls, cross_day_dedup_enabled=cross_day_enabled, cross_day_dedup_max_age_days=cross_day_max_age_days, + semantic_dedup_max_deletion_ratio=semantic_dedup_max_deletion_ratio, + rewrite_batch_size=rewrite_batch_size, + semantic_candidate_recall_config=semantic_candidate_recall_config, + quality_gate_config=quality_gate_config, + publish_idempotency_config=publish_idempotency_config, ) if cross_day_enabled and result["publish"].mode == "publish" and result["publish"].status == "ok": @@ -173,9 +189,15 @@ def run_daily_report( json.dumps(result["reports"], ensure_ascii=False, indent=2, default=_json_default), encoding="utf-8", ) + for artifact_name, artifact_value in result.get("artifacts", {}).items(): + (run_dir / f"{artifact_name}.json").write_text( + json.dumps(artifact_value, ensure_ascii=False, indent=2, default=_json_default), + encoding="utf-8", + ) return { "run_dir": str(run_dir), "markdown": result["markdown"], "reports": result["reports"], "publish": result["publish"], + "artifacts": result.get("artifacts", {}), } diff --git a/docs/plans/2026-06-10-ai-daily-full-chain-optimization.md b/docs/plans/2026-06-10-ai-daily-full-chain-optimization.md new file mode 100644 index 0000000..8aed782 --- /dev/null +++ b/docs/plans/2026-06-10-ai-daily-full-chain-optimization.md @@ -0,0 +1,130 @@ +# AI Daily Full Chain Optimization Implementation Plan + +> **For Claude:** REQUIRED SUB-SKILL: Use 
superpowers:executing-plans to implement this plan task-by-task. + +**Goal:** Add the first quality safety layer for the AI daily report pipeline: semantic candidate recall, quality gate reporting, stage snapshots, and effective pipeline configuration. + +**Architecture:** Keep the existing stage functions and add a rule-based Stage 2.8 between cross-day URL dedupe and LLM semantic dedupe. Quality gate stays deterministic and report-only for dry-run visibility, while publish blocking can consume its `blocking_errors` through the existing Stage 7/8 guard path. Runner persists stage artifacts from the pipeline result without changing generated content. + +**Tech Stack:** Python standard library, `unittest`, existing dataclass models and pipeline modules. + +--- + +### Task 1: Make Pipeline Config Effective + +**Files:** +- Modify: `ai_daily_report/pipeline.py` +- Modify: `ai_daily_report/runner.py` +- Test: `tests/test_stage0_to_4_pipeline.py` +- Test: `tests/test_runner.py` + +**Step 1: Write failing tests** + +Use existing tests that call `run_stage0_to_stage4(..., semantic_dedup_max_deletion_ratio=0.1, rewrite_batch_size=1)` and expect Stage 4 `batch_count == 3`. + +**Step 2: Run tests to verify failure** + +Run: `python -m pytest tests/test_stage0_to_4_pipeline.py tests/test_runner.py -q` + +Expected: failure from unexpected keyword arguments or ignored config. + +**Step 3: Implement minimal code** + +Thread `semantic_dedup_max_deletion_ratio` into `semantic_dedup_items()` and `rewrite_batch_size` into `rewrite_items()`. Read both from `pipeline.json` in `runner.py`. + +**Step 4: Verify** + +Run the same tests and expect pass. 
+ +### Task 2: Add Stage 2.8 Candidate Recall + +**Files:** +- Create: `ai_daily_report/candidate_recall.py` +- Modify: `ai_daily_report/pipeline.py` +- Test: `tests/test_candidate_recall.py` +- Test: `tests/test_stage0_to_4_pipeline.py` + +**Step 1: Write failing tests** + +Add tests proving related Claude Fable/Mythos items are recalled even when Stage 2 title candidates are empty, while unrelated Gemini/Gemma items are not grouped by company name alone. + +**Step 2: Run tests to verify failure** + +Run: `python -m pytest tests/test_candidate_recall.py tests/test_stage0_to_4_pipeline.py -q` + +Expected: import failure for the new module or zero recalled candidates. + +**Step 3: Implement minimal code** + +Use deterministic title similarity, token Jaccard, summary Jaccard, and strong entity overlap to produce candidate groups with `item_ids`, `reason`, `score`, and evidence fields. + +**Step 4: Verify** + +Run targeted tests and expect pass. + +### Task 3: Add Quality Gate Reporting + +**Files:** +- Create: `ai_daily_report/quality_gate.py` +- Modify: `ai_daily_report/pipeline.py` +- Test: `tests/test_quality_gate.py` + +**Step 1: Write failing tests** + +Add tests for warnings when Stage 3 candidates are zero for large item sets or when enabled sources fail, and for blocking errors when required sources fail. + +**Step 2: Run tests to verify failure** + +Run: `python -m pytest tests/test_quality_gate.py -q` + +Expected: import failure for the new module. + +**Step 3: Implement minimal code** + +Return a report with `input_count`, `warnings`, `blocking_errors`, `source_failures`, and `quality_gate_failed`. Add it after Stage 7 and propagate blocking errors into Stage 7 before publish. + +**Step 4: Verify** + +Run quality gate and publish-path tests.
+ +### Task 4: Persist Stage Snapshots + +**Files:** +- Modify: `ai_daily_report/pipeline.py` +- Modify: `ai_daily_report/runner.py` +- Test: `tests/test_runner.py` + +**Step 1: Write failing tests** + +Assert that a mock run writes `stage0_sources.json`, `stage1_items.json`, `stage2_items.json`, `stage2_5_items.json`, `stage2_8_candidates.json`, `stage3_items.json`, `stage4_items.json`, and `quality_gate.json`. + +**Step 2: Run tests to verify failure** + +Run: `python -m pytest tests/test_runner.py -q` + +Expected: snapshot files are missing. + +**Step 3: Implement minimal code** + +Have pipeline results carry an `artifacts` dict and have the runner write each entry to `<artifact_name>.json` in the run directory using the existing JSON serializer (`_json_default`). + +**Step 4: Verify** + +Run runner tests and inspect generated files through assertions. + +### Task 5: Full Regression + +**Files:** +- All touched files + +**Step 1: Run targeted tests** + +Run: `python -m pytest tests/test_candidate_recall.py tests/test_quality_gate.py tests/test_stage0_to_4_pipeline.py tests/test_runner.py -q` + +**Step 2: Run full test suite** + +Run: `python -m pytest -q` + +**Step 3: Fix regressions** + +Fix only issues caused by this change set.
diff --git a/tests/test_candidate_recall.py b/tests/test_candidate_recall.py new file mode 100644 index 0000000..9284e75 --- /dev/null +++ b/tests/test_candidate_recall.py @@ -0,0 +1,79 @@ +import unittest + +from ai_daily_report.candidate_recall import recall_semantic_candidates +from ai_daily_report.models import NewsItem +from ai_daily_report.normalize import normalize_title + + +def item(item_id, title, summary): + return NewsItem( + id=item_id, + source_group="AI HOT", + source_label="AI HOT", + source_role="primary", + source_priority=10, + title_raw=title, + title_norm=normalize_title(title), + summary_raw=summary, + url=f"https://example.com/{item_id}", + canonical_url=f"https://example.com/{item_id}", + ) + + +class CandidateRecallTests(unittest.TestCase): + def test_recalls_shared_event_entities_when_titles_are_not_stage2_similar(self): + items = [ + item( + "a", + "Anthropic 被曝开发 Claude Fable", + "Anthropic 正在开发名为 Claude Fable 和 Claude Mythos 的新产品。", + ), + item( + "b", + "Claude Mythos 进入内部测试", + "Anthropic 的 Claude Mythos 与 Claude Fable 面向内容生成场景。", + ), + item( + "c", + "Gemini CLI 发布更新", + "Google 为 Gemini CLI 增加新的开发者命令。", + ), + ] + + candidates, report = recall_semantic_candidates(items, existing_candidates=[]) + + candidate_sets = [set(candidate["item_ids"]) for candidate in candidates] + self.assertIn({"a", "b"}, candidate_sets) + self.assertNotIn({"a", "c"}, candidate_sets) + self.assertEqual(report["candidate_group_count"], 1) + self.assertEqual(candidates[0]["reason"], "strong_entity_overlap") + + def test_does_not_group_same_company_different_products_without_event_overlap(self): + items = [ + item("gemini", "Google 发布 Gemini CLI", "Google 发布面向开发者的 Gemini CLI 工具。"), + item("gemma", "Google 开源 Gemma 3n", "Google 开源 Gemma 3n 模型,面向端侧部署。"), + ] + + candidates, report = recall_semantic_candidates(items, existing_candidates=[]) + + self.assertEqual(candidates, []) + self.assertEqual(report["candidate_group_count"], 0) + + def 
test_preserves_existing_candidates_and_adds_new_ones_without_duplicates(self): + items = [ + item("a", "Anthropic 发布 Claude Fable", "Claude Fable 与 Claude Mythos 同时曝光。"), + item("b", "Claude Mythos 新功能曝光", "Claude Mythos 和 Claude Fable 是 Anthropic 新项目。"), + ] + + candidates, report = recall_semantic_candidates( + items, + existing_candidates=[{"item_ids": ["a", "b"], "reason": "title_similarity"}], + ) + + self.assertEqual(len(candidates), 1) + self.assertEqual(candidates[0]["reason"], "title_similarity") + self.assertEqual(report["existing_candidate_group_count"], 1) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_clients.py b/tests/test_clients.py index ccf9e9d..880b94a 100644 --- a/tests/test_clients.py +++ b/tests/test_clients.py @@ -1,8 +1,9 @@ import json import unittest +from urllib.error import HTTPError from unittest.mock import patch -from ai_daily_report.clients import BlogApiClient, OpenAICompatibleClient, fetch_text +from ai_daily_report.clients import FetchTextError, BlogApiClient, OpenAICompatibleClient, fetch_text class FakeResponse: @@ -26,6 +27,28 @@ class ClientTests(unittest.TestCase): with patch("urllib.request.urlopen", return_value=FakeResponse("ok".encode("utf-8"))): self.assertEqual(fetch_text("https://example.com", 1), "ok") + def test_fetch_text_retries_transient_http_errors(self): + responses = [ + HTTPError("https://example.com", 503, "Service Unavailable", {}, None), + FakeResponse("ok".encode("utf-8")), + ] + with patch("urllib.request.urlopen", side_effect=responses) as urlopen: + self.assertEqual(fetch_text("https://example.com", 1, retries=1, backoff_seconds=0), "ok") + + self.assertEqual(urlopen.call_count, 2) + + def test_fetch_text_does_not_retry_404_and_classifies_error(self): + with patch( + "urllib.request.urlopen", + side_effect=HTTPError("https://example.com", 404, "Not Found", {}, None), + ) as urlopen: + with self.assertRaises(FetchTextError) as context: + fetch_text("https://example.com", 1, 
retries=2, backoff_seconds=0) + + self.assertEqual(urlopen.call_count, 1) + self.assertEqual(context.exception.error_type, "http_404") + self.assertEqual(context.exception.http_status, 404) + def test_openai_compatible_client_returns_message_content(self): body = json.dumps({"choices": [{"message": {"content": "hello"}}]}).encode("utf-8") with patch("urllib.request.urlopen", return_value=FakeResponse(body)): diff --git a/tests/test_quality_gate.py b/tests/test_quality_gate.py new file mode 100644 index 0000000..ef0384f --- /dev/null +++ b/tests/test_quality_gate.py @@ -0,0 +1,78 @@ +import unittest + +from ai_daily_report.models import NewsItem, SourceResult +from ai_daily_report.quality_gate import evaluate_quality_gate + + +def news_item(item_id, title="Story"): + return NewsItem( + id=item_id, + source_group="AI HOT", + source_label="AI HOT", + source_role="primary", + source_priority=10, + title_raw=f"{title} {item_id}", + title_norm=f"{title} {item_id}".lower(), + summary_raw="summary", + url=f"https://example.com/{item_id}", + canonical_url=f"https://example.com/{item_id}", + ) + + +class QualityGateTests(unittest.TestCase): + def test_warns_when_stage3_candidates_zero_for_large_item_set(self): + items = [news_item(str(index)) for index in range(31)] + report = evaluate_quality_gate( + items, + source_results=[], + reports={"stage3": {"candidate_group_count": 0}}, + config={"warn_when_stage3_candidates_zero_min_items": 30}, + ) + + self.assertIn("stage3_candidates_zero", report["warnings"]) + self.assertEqual(report["blocking_errors"], []) + + def test_warns_on_enabled_source_failure(self): + report = evaluate_quality_gate( + [news_item("a")], + source_results=[ + SourceResult( + source="橘鸦AI早报", + role="supplement", + ok=False, + status="error", + error="HTTPError: 404", + ) + ], + reports={"stage3": {"candidate_group_count": 1}}, + config={"warn_on_enabled_source_failure": True}, + ) + + self.assertIn("enabled_source_failed:橘鸦AI早报:error", 
report["warnings"]) + self.assertEqual(report["source_failures"][0]["source"], "橘鸦AI早报") + + def test_blocks_required_source_failure_when_configured(self): + report = evaluate_quality_gate( + [news_item("a")], + source_results=[ + SourceResult( + source="AI HOT", + role="primary", + ok=False, + status="timeout", + error="TimeoutError", + ) + ], + reports={"stage3": {"candidate_group_count": 1}}, + config={ + "block_on_required_source_failure": True, + "required_sources": ["AI HOT"], + }, + ) + + self.assertIn("required_source_failed:AI HOT:timeout", report["blocking_errors"]) + self.assertTrue(report["quality_gate_failed"]) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_runner.py b/tests/test_runner.py index 9f7249a..978213a 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -22,8 +22,128 @@ class RunnerTests(unittest.TestCase): run_dir = Path(result["run_dir"]) self.assertTrue((run_dir / "blog_markdown.md").exists()) self.assertTrue((run_dir / "run_report.json").exists()) + for filename in [ + "stage0_sources.json", + "stage1_items.json", + "stage2_items.json", + "stage2_5_items.json", + "stage2_8_candidates.json", + "stage3_items.json", + "stage4_items.json", + "quality_gate.json", + ]: + self.assertTrue((run_dir / filename).exists(), filename) self.assertEqual(result["reports"]["stage8"]["status"], "ok") + def test_run_daily_report_passes_pipeline_config_to_stage_functions(self): + class FakeLlmClient: + def chat(self, prompt): + payload = json.loads(prompt) + if "candidates" in payload: + first_candidate = payload["candidates"][0]["item_ids"] + return json.dumps( + { + "duplicate_groups": [ + { + "keep_id": first_candidate[0], + "remove_ids": [first_candidate[1]], + "confidence": "high", + "reason": "same event", + } + ], + "not_duplicates": [], + "uncertain": [], + } + ) + if "allowed_sections" in payload: + return json.dumps( + { + "rewrites": [ + { + "id": item["id"], + "title": item["title_raw"], + "summary": 
item["summary_raw"], + "flags": [], + } + for item in payload["items"] + ] + } + ) + return json.dumps( + { + "intro": "Daily intro.", + "theme": "Pipeline config.", + "threads": [ + { + "title": "Config thread", + "text": "Config values reached the pipeline.", + "item_ids": [payload["items"][0]["id"]], + "kind": "thread", + } + ], + "conclusion": "Done.", + } + ) + + with TemporaryDirectory() as temp_dir: + temp_path = Path(temp_dir) + pipeline_config = temp_path / "pipeline.json" + pipeline_config.write_text( + json.dumps( + { + "semantic_dedup_max_deletion_ratio": 0.1, + "rewrite_batch_size": 1, + "cross_day_dedup": {"enabled": False}, + } + ), + encoding="utf-8", + ) + source_config = temp_path / "sources.json" + source_config.write_text( + json.dumps( + [ + { + "name": "AI HOT", + "type": "rss", + "url": "https://feed.example/rss", + "role": "primary", + "priority": 10, + "enabled": True, + } + ] + ), + encoding="utf-8", + ) + + def fetch_text(url, timeout): + return """ +Anthropic launches Claude Codehttps://example.com/aAnthropic launches Claude Code for developers. +Anthropic launch Claude Codehttps://example.com/bAnthropic launch Claude Code for coding. +Gemini CLI updatehttps://example.com/cGoogle updates Gemini CLI. 
+""" + + result = run_daily_report( + run_date="2026-06-10", + mode="dry-run", + source_mode="live", + llm_mode="live", + out_dir=temp_path / "out", + base_url="https://blog.example", + sources_path=source_config, + pipeline_path=pipeline_config, + fetch_text=fetch_text, + env={ + "LLM_API_KEY": "test-key", + "LLM_BASE_URL": "https://llm.example/v1", + "LLM_MODEL": "test-model", + }, + llm_client_factory=lambda **config: FakeLlmClient(), + ) + + self.assertTrue(result["reports"]["stage3"]["skipped_for_deletion_ratio"]) + self.assertEqual(result["reports"]["stage4"]["batch_count"], 3) + self.assertIn("quality_gate", result["reports"]) + def test_run_daily_report_live_sources_can_use_config_and_fetch_text(self): with TemporaryDirectory() as temp_dir: out_dir = Path(temp_dir) / "out" diff --git a/tests/test_stage0_collect.py b/tests/test_stage0_collect.py index 7d31c20..90b431c 100644 --- a/tests/test_stage0_collect.py +++ b/tests/test_stage0_collect.py @@ -1,5 +1,6 @@ import unittest +from ai_daily_report.clients import FetchTextError from ai_daily_report.collect import collect_sources from ai_daily_report.models import SourceConfig @@ -44,6 +45,18 @@ class Stage0CollectTests(unittest.TestCase): self.assertEqual(report["failed_source_count"], 1) self.assertEqual(report["raw_item_count"], 1) + def test_collect_sources_records_fetch_text_error_metadata(self): + configs = [SourceConfig(name="RSS", type="rss", retries=2)] + + def fetcher(config, run_date): + raise FetchTextError("http_404", "HTTPError: 404", http_status=404, attempts=1) + + results, report = collect_sources(configs, "2026-06-10", fetcher=fetcher) + + self.assertEqual(results[0].status, "http_404") + self.assertEqual(results[0].retry_count, 0) + self.assertIn("http_404", report["error_types"]["RSS"]) + if __name__ == "__main__": unittest.main() diff --git a/tests/test_stage0_to_4_pipeline.py b/tests/test_stage0_to_4_pipeline.py index 5295922..0768835 100644 --- a/tests/test_stage0_to_4_pipeline.py +++ 
b/tests/test_stage0_to_4_pipeline.py @@ -6,6 +6,81 @@ from ai_daily_report.models import PublishedUrlEntry, PublishedUrls class Stage0To4PipelineTests(unittest.TestCase): + def test_run_stage0_to_stage4_passes_semantic_and_rewrite_config(self): + configs = [{"name": "AI HOT", "type": "fake", "role": "primary", "priority": 10}] + seen = {} + + def fetcher(config, run_date): + return [ + { + "title_raw": "Anthropic launches Claude Code", + "summary_raw": "Anthropic launches Claude Code for developers.", + "url": "https://example.com/a", + "source_label": config.name, + }, + { + "title_raw": "Anthropic launch Claude Code", + "summary_raw": "Anthropic launch Claude Code for coding.", + "url": "https://example.com/b", + "source_label": config.name, + }, + { + "title_raw": "Gemini CLI update", + "summary_raw": "Google updates Gemini CLI.", + "url": "https://example.com/c", + "source_label": config.name, + }, + ] + + def semantic_llm_call(prompt): + payload = json.loads(prompt) + seen["semantic_prompt"] = payload + first_candidate = payload["candidates"][0]["item_ids"] + return json.dumps( + { + "duplicate_groups": [ + { + "keep_id": first_candidate[0], + "remove_ids": [first_candidate[1]], + "confidence": "high", + "reason": "same event", + } + ], + "not_duplicates": [], + "uncertain": [], + } + ) + + def rewrite_llm_call(prompt): + payload = json.loads(prompt) + seen.setdefault("rewrite_batches", []).append(len(payload["items"])) + return json.dumps( + { + "rewrites": [ + { + "id": item["id"], + "title": item["title_raw"], + "summary": item["summary_raw"], + "flags": [], + } + for item in payload["items"] + ] + } + ) + + result = run_stage0_to_stage4( + configs, + "2026-06-10", + fetcher=fetcher, + semantic_llm_call=semantic_llm_call, + rewrite_llm_call=rewrite_llm_call, + semantic_dedup_max_deletion_ratio=0.1, + rewrite_batch_size=1, + ) + + self.assertTrue(result["reports"]["stage3"]["skipped_for_deletion_ratio"]) + self.assertEqual(seen["rewrite_batches"], [1, 1, 1]) 
+ def test_run_stage0_to_stage4_semantic_dedupes_and_rewrites(self): configs = [ {"name": "AI HOT", "type": "fake", "role": "primary", "priority": 10}, @@ -127,6 +202,67 @@ class Stage0To4PipelineTests(unittest.TestCase): self.assertEqual(result["reports"]["stage2_5"]["removed_count"], 1) self.assertEqual([entry["title_raw"] for entry in seen_rewrite_payloads[0]["items"]], ["Fresh story"]) + def test_run_stage0_to_stage4_uses_stage2_8_recalled_candidates(self): + configs = [{"name": "AI HOT", "type": "fake", "role": "primary", "priority": 10}] + seen = {} + + def fetcher(config, run_date): + return [ + { + "title_raw": "Anthropic 被曝开发 Claude Fable", + "summary_raw": "Anthropic 正在开发名为 Claude Fable 和 Claude Mythos 的新产品。", + "url": "https://example.com/fable", + "source_label": config.name, + }, + { + "title_raw": "Claude Mythos 进入内部测试", + "summary_raw": "Anthropic 的 Claude Mythos 与 Claude Fable 面向内容生成场景。", + "url": "https://example.com/mythos", + "source_label": config.name, + }, + { + "title_raw": "Google 开源 Gemma 3n", + "summary_raw": "Google 开源 Gemma 3n 模型,面向端侧部署。", + "url": "https://example.com/gemma", + "source_label": config.name, + }, + ] + + def semantic_llm_call(prompt): + payload = json.loads(prompt) + seen["candidate_count"] = len(payload["candidates"]) + seen["candidate_reasons"] = [candidate["reason"] for candidate in payload["candidates"]] + return json.dumps({"duplicate_groups": [], "not_duplicates": [], "uncertain": []}) + + def rewrite_llm_call(prompt): + payload = json.loads(prompt) + return json.dumps( + { + "rewrites": [ + { + "id": entry["id"], + "title": entry["title_raw"], + "summary": entry["summary_raw"], + "flags": [], + } + for entry in payload["items"] + ] + }, + ensure_ascii=False, + ) + + result = run_stage0_to_stage4( + configs, + "2026-06-10", + fetcher=fetcher, + semantic_llm_call=semantic_llm_call, + rewrite_llm_call=rewrite_llm_call, + ) + + self.assertEqual(seen["candidate_count"], 1) + self.assertIn("strong_entity_overlap", 
seen["candidate_reasons"]) + self.assertEqual(result["reports"]["stage2_8"]["added_candidate_group_count"], 1) + if __name__ == "__main__": unittest.main() diff --git a/tests/test_stage8_publish.py b/tests/test_stage8_publish.py index 169c6ae..630f20b 100644 --- a/tests/test_stage8_publish.py +++ b/tests/test_stage8_publish.py @@ -7,9 +7,10 @@ from ai_daily_report.publish import load_published_urls, publish_markdown, updat class FakeBlogClient: - def __init__(self): + def __init__(self, existing_post=None): self.created_payload = None self.published_slug = None + self.existing_post = existing_post def create_post(self, payload): self.created_payload = payload @@ -18,6 +19,9 @@ class FakeBlogClient: def publish_post(self, slug): self.published_slug = slug + def get_post_by_slug(self, slug): + return self.existing_post + class Stage8PublishTests(unittest.TestCase): def test_publish_markdown_dry_run_does_not_call_client(self): @@ -74,6 +78,45 @@ class Stage8PublishTests(unittest.TestCase): self.assertEqual(client.published_slug, "ai-2026-06-04") self.assertEqual(result.blog_url, "https://blog.example/posts/ai-2026-06-04") + def test_publish_markdown_returns_already_published_for_same_slug_and_content(self): + markdown = "## 导览\n\n> ok" + client = FakeBlogClient(existing_post={"slug": "ai-2026-06-04", "content": markdown}) + + result = publish_markdown( + title="AI日报 · 2026-06-04", + markdown=markdown, + tags=["AI日报"], + slug="ai-2026-06-04", + base_url="https://blog.example", + mode="publish", + markdown_report={"blocking_errors": []}, + client=client, + idempotency_config={"enabled": True}, + ) + + self.assertEqual(result.status, "already_published") + self.assertIsNone(client.created_payload) + self.assertIsNone(client.published_slug) + + def test_publish_markdown_blocks_existing_slug_with_different_content(self): + client = FakeBlogClient(existing_post={"slug": "ai-2026-06-04", "content": "old"}) + + result = publish_markdown( + title="AI日报 · 2026-06-04", + 
markdown="new", + tags=["AI日报"], + slug="ai-2026-06-04", + base_url="https://blog.example", + mode="publish", + markdown_report={"blocking_errors": []}, + client=client, + idempotency_config={"enabled": True}, + ) + + self.assertEqual(result.status, "blocked") + self.assertIn("slug_already_exists", result.error) + self.assertIsNone(client.created_payload) + def test_update_published_urls_writes_canonical_urls_for_final_items(self): with TemporaryDirectory() as temp_dir: history_path = Path(temp_dir) / "published_urls.json"