from __future__ import annotations

import json
from typing import Any, Callable

from .llm import parse_json_object
from .models import NewsItem

# Callable that receives the prompt text and returns the raw LLM response text.
SemanticLlmCall = Callable[[str], str]


def _build_prompt(items: list[NewsItem], candidates: list[dict[str, Any]]) -> str:
    """Serialize the items, candidate groups and expected output schema as JSON.

    For every item the cleaned ``title``/``summary`` is used when truthy,
    otherwise the ``*_raw`` counterpart. Non-ASCII text is emitted verbatim
    (``ensure_ascii=False``).
    """
    item_payload = [
        {
            "id": item.id,
            "title": item.title or item.title_raw,
            "summary": item.summary or item.summary_raw,
            "source": item.source_label,
            "section_hint": item.section_hint,
        }
        for item in items
    ]
    prompt = {
        "task": "Identify only high-confidence semantic duplicates. Do not curate or remove by importance.",
        "items": item_payload,
        "candidates": candidates,
        "output_schema": {
            "duplicate_groups": [
                {
                    "keep_id": "item id",
                    "remove_ids": ["item id"],
                    "confidence": "high|medium|low",
                    "reason": "same concrete event reason",
                }
            ],
            "not_duplicates": [],
            "uncertain": [],
        },
    }
    return json.dumps(prompt, ensure_ascii=False)


def _score(item: NewsItem) -> int:
    """Return a heuristic score used to pick which duplicate to keep.

    Higher is better. The score combines ``200 - source_priority`` (floored
    at 0), +10 for a ``"primary"`` source role, up to +40 for the length of
    ``summary_raw``, +20 when a canonical URL is present, and -10 for every
    quality flag.
    """
    score = max(0, 200 - item.source_priority)
    if item.source_role == "primary":
        score += 10
    if item.summary_raw:
        score += min(40, len(item.summary_raw))
    if item.canonical_url:
        score += 20
    score -= len(item.quality_flags) * 10
    return score


def _choose_keep(group_items: list[NewsItem], suggested_keep_id: str) -> NewsItem:
    """Pick the item to keep from a duplicate group.

    The suggested item is honoured when its score is within 10 points of the
    best-scoring item of the group; otherwise the best-scoring item wins.
    """
    best = max(group_items, key=_score)
    for item in group_items:
        if item.id == suggested_keep_id:
            # Only the first item matching the suggestion is considered.
            if _score(item) >= _score(best) - 10:
                return item
            break
    return best


def _build_report(
    items: list[NewsItem],
    candidates: list[dict[str, Any]],
    *,
    removed_count: int = 0,
    duplicate_groups: list[dict[str, Any]] | None = None,
    uncertain: Any = None,
    errors: list[str] | None = None,
    skipped_for_deletion_ratio: bool = False,
) -> dict[str, Any]:
    """Build the report dict returned next to the (possibly) deduplicated items."""
    return {
        "input_count": len(items),
        "candidate_group_count": len(candidates),
        "removed_count": removed_count,
        "duplicate_groups": duplicate_groups if duplicate_groups is not None else [],
        "uncertain": uncertain or [],
        "errors": errors if errors is not None else [],
        "skipped_for_deletion_ratio": skipped_for_deletion_ratio,
    }


def _valid_group_ids(group: dict[str, Any], by_id: dict[str, NewsItem]) -> list[str] | None:
    """Return the group's unique item ids (keep id first), or ``None`` if malformed.

    A group is malformed when ``remove_ids`` is not a list/tuple (a bare
    string is accepted as a single id) or when any id is not a string key of
    ``by_id``.
    """
    remove_ids = group.get("remove_ids") or []
    if isinstance(remove_ids, str):
        remove_ids = [remove_ids]
    if not isinstance(remove_ids, (list, tuple)):
        return None
    ids = [group.get("keep_id"), *remove_ids]
    if any(not isinstance(item_id, str) or item_id not in by_id for item_id in ids):
        return None
    # Drop repeated ids while preserving order so that no item is processed
    # (and later recorded as a duplicate source) more than once.
    return list(dict.fromkeys(ids))


def semantic_dedup_items(
    items: list[NewsItem],
    candidates: list[dict[str, Any]],
    *,
    llm_call: SemanticLlmCall,
    max_deletion_ratio: float = 0.5,
) -> tuple[list[NewsItem], dict[str, Any]]:
    """Remove high-confidence semantic duplicates as judged by an LLM.

    The LLM is asked to group duplicates among ``candidates``. A returned
    group is applied only when its confidence is ``"high"``, all of its ids
    refer to known items, it is a subset of at least one candidate's
    ``item_ids``, and it does not contradict a previously accepted group.
    For each accepted group one item is kept (see ``_choose_keep``) and a
    record of every removed item is appended to the kept item's
    ``duplicate_sources`` (the kept item is mutated in place).

    Args:
        items: Items to deduplicate.
        candidates: Candidate groups; each is read through its ``"item_ids"``
            entry.
        llm_call: Callable taking the prompt text and returning the raw
            response text.
        max_deletion_ratio: If the fraction of items that would be removed
            exceeds this value, nothing is removed and the report sets
            ``skipped_for_deletion_ratio``.

    Returns:
        A ``(items, report)`` tuple. On the no-op paths (empty input, LLM or
        parsing failure, deletion ratio exceeded) the original ``items`` list
        object is returned; otherwise a new list without the removed items.
        The report always carries the keys ``input_count``,
        ``candidate_group_count``, ``removed_count``, ``duplicate_groups``,
        ``uncertain``, ``errors`` and ``skipped_for_deletion_ratio``.
    """
    if not items or not candidates:
        return items, _build_report(items, candidates)

    try:
        obj = parse_json_object(llm_call(_build_prompt(items, candidates)))
    except Exception as exc:
        # Best effort: a failing LLM call or unparsable response must not
        # break the caller; the failure is surfaced through the report.
        return items, _build_report(
            items, candidates, errors=[f"{type(exc).__name__}: {exc}"]
        )
    if not isinstance(obj, dict):
        # NOTE(review): parse_json_object presumably always returns a dict;
        # this guard only prevents an AttributeError if it does not.
        return items, _build_report(
            items,
            candidates,
            errors=[f"invalid_llm_response: expected object, got {type(obj).__name__}"],
        )

    errors: list[str] = []
    by_id = {item.id: item for item in items}
    candidate_sets = {
        frozenset(
            item_id
            for item_id in (candidate.get("item_ids") or [])
            if isinstance(item_id, str)
        )
        for candidate in candidates
    }

    raw_groups = obj.get("duplicate_groups") or []
    if not isinstance(raw_groups, list):
        errors.append(f"invalid_duplicate_groups: {raw_groups}")
        raw_groups = []

    candidate_removals: set[str] = set()
    kept_ids: set[str] = set()
    valid_groups: list[dict[str, Any]] = []
    for group in raw_groups:
        if not isinstance(group, dict):
            errors.append(f"invalid_group: {group}")
            continue
        if group.get("confidence") != "high":
            continue
        ids = _valid_group_ids(group, by_id)
        if ids is None:
            errors.append(f"invalid_ids_in_group: {group}")
            continue
        group_set = frozenset(ids)
        if not any(group_set <= candidate_set for candidate_set in candidate_sets):
            errors.append(f"group_outside_candidates: {group}")
            continue

        group_items = [by_id[item_id] for item_id in ids]
        # ids[0] is the validated keep_id suggested by the LLM.
        keep = _choose_keep(group_items, ids[0])
        remove_ids = [item.id for item in group_items if item is not keep]

        # Reject groups that contradict an earlier accepted group: keeping an
        # item that was already removed, or removing an item that was already
        # chosen as a keeper. Applying both would drop every copy of an event
        # or attach provenance to an item that itself disappears.
        if keep.id in candidate_removals or kept_ids.intersection(remove_ids):
            errors.append(f"conflicting_group: {group}")
            continue

        if remove_ids:
            kept_ids.add(keep.id)
        candidate_removals.update(remove_ids)
        valid_groups.append(
            {
                "keep_id": keep.id,
                "remove_ids": remove_ids,
                "confidence": "high",
                "reason": str(group.get("reason") or "semantic_duplicate"),
            }
        )

    uncertain = obj.get("uncertain", []) or []

    # `items` is non-empty here (see the early return above).
    deletion_ratio = len(candidate_removals) / len(items)
    if deletion_ratio > max_deletion_ratio:
        # Safety valve: refuse to apply a suspiciously large deletion.
        return items, _build_report(
            items,
            candidates,
            duplicate_groups=valid_groups,
            uncertain=uncertain,
            errors=errors,
            skipped_for_deletion_ratio=True,
        )

    removed_ids: set[str] = set()
    for group in valid_groups:
        keep = by_id[group["keep_id"]]
        for remove_id in group["remove_ids"]:
            removed = by_id[remove_id]
            # Preserve provenance of the removed item on the kept item.
            keep.duplicate_sources.append(
                {
                    "id": removed.id,
                    "source_group": removed.source_group,
                    "source_label": removed.source_label,
                    "url": removed.url,
                    "reason": group["reason"],
                }
            )
            removed_ids.add(remove_id)

    deduped = [item for item in items if item.id not in removed_ids]
    report = _build_report(
        items,
        candidates,
        removed_count=len(removed_ids),
        duplicate_groups=valid_groups,
        uncertain=uncertain,
        errors=errors,
    )
    return deduped, report