from __future__ import annotations

import json
from typing import Any, Callable

from .llm import parse_json_object
from .models import NewsItem

SemanticLlmCall = Callable[[str], str]

# An LLM-suggested keeper is honoured as long as its score is within this many
# points of the best-scoring member of its group.
_KEEP_SCORE_TOLERANCE = 10


def _build_prompt(items: list[NewsItem], candidates: list[dict[str, Any]]) -> str:
    """Serialize items and candidate groups into the JSON prompt sent to the LLM.

    The prompt asks the model to classify candidate items into
    ``duplicate_groups`` (one item can be removed) and ``merge_groups`` (same
    event, complementary angle; extras become supplementary sources), and it
    embeds the expected output schema.

    Args:
        items: All news items under consideration.
        candidates: Pre-computed candidate groups, passed through verbatim.

    Returns:
        The prompt as a JSON string (non-ASCII characters kept as-is).
    """
    item_payload = [
        {
            "id": item.id,
            "title": item.title or item.title_raw,
            "summary": item.summary or item.summary_raw,
            "source": item.source_label,
            "section_hint": item.section_hint,
        }
        for item in items
    ]
    prompt = {
        "task": "Identify only high-confidence semantic duplicates. Do not curate or remove by importance.",
        "items": item_payload,
        "candidates": candidates,
        "dedupe_policy": [
            "Use duplicate_groups only when items are substantially the same article/event and one can be removed.",
            "Use merge_groups when items cover the same concrete event from different angles; keep the best item and attach the others as supplementary sources instead of dropping the event context.",
            "Do not curate by importance. Do not merge unrelated follow-ups just because they mention the same company/model.",
        ],
        "output_schema": {
            "duplicate_groups": [
                {
                    "keep_id": "item id",
                    "remove_ids": ["item id"],
                    "confidence": "high|medium|low",
                    "reason": "same concrete event reason",
                }
            ],
            "merge_groups": [
                {
                    "keep_id": "item id",
                    "merge_ids": ["item id"],
                    "confidence": "high|medium|low",
                    "reason": "same event, complementary angle/source",
                }
            ],
            "not_duplicates": [],
            "uncertain": [],
        },
    }
    return json.dumps(prompt, ensure_ascii=False)


def _score(item: NewsItem) -> int:
    """Heuristic quality score used to pick which item of a group survives.

    Lower ``source_priority`` values score higher. Primary sources, a raw
    summary (up to 40 points by length) and a canonical URL add points. Each
    quality flag subtracts 10.
    """
    score = max(0, 200 - item.source_priority)
    if item.source_role == "primary":
        score += 10
    if item.summary_raw:
        score += min(40, len(item.summary_raw))
    if item.canonical_url:
        score += 20
    score -= len(item.quality_flags) * 10
    return score


def _choose_keep(group_items: list[NewsItem], suggested_keep_id: str) -> NewsItem:
    """Pick the surviving item of a group.

    The LLM-suggested item is kept unless some other member outscores it by
    more than ``_KEEP_SCORE_TOLERANCE``. Otherwise, or when the suggestion is
    not in the group, the highest-scoring member (first one on ties) is kept.
    """
    best = max(group_items, key=_score)
    suggested = next((item for item in group_items if item.id == suggested_keep_id), None)
    if suggested is not None and _score(suggested) >= _score(best) - _KEEP_SCORE_TOLERANCE:
        return suggested
    return best


def _report(
    items: list[NewsItem],
    candidates: list[dict[str, Any]],
    *,
    removed_count: int = 0,
    duplicate_groups: list[dict[str, Any]] | None = None,
    merge_groups: list[dict[str, Any]] | None = None,
    uncertain: Any = None,
    errors: list[str] | None = None,
    skipped_for_deletion_ratio: bool = False,
) -> dict[str, Any]:
    """Build the diagnostics report returned alongside the deduplicated items."""
    return {
        "input_count": len(items),
        "candidate_group_count": len(candidates),
        "removed_count": removed_count,
        "duplicate_groups": [] if duplicate_groups is None else duplicate_groups,
        "merge_groups": [] if merge_groups is None else merge_groups,
        "uncertain": [] if uncertain is None else uncertain,
        "errors": [] if errors is None else errors,
        "skipped_for_deletion_ratio": skipped_for_deletion_ratio,
    }


def _high_confidence_groups(obj: Any, key: str, errors: list[str]) -> list[dict[str, Any]]:
    """Return the well-formed groups under ``obj[key]`` marked high confidence.

    The LLM response is untrusted. A non-list value or a non-dict group entry
    is recorded in ``errors`` and ignored instead of raising. Confidence is
    compared case-insensitively ("High" counts as "high").
    """
    raw_groups = obj.get(key, []) or []
    if not isinstance(raw_groups, list):
        errors.append(f"malformed_{key}: {raw_groups!r}")
        return []
    groups: list[dict[str, Any]] = []
    for group in raw_groups:
        if not isinstance(group, dict):
            errors.append(f"malformed_group: {group!r}")
            continue
        if str(group.get("confidence") or "").strip().lower() == "high":
            groups.append(group)
    return groups


def _validate_group(
    group: dict[str, Any],
    member_key: str,
    by_id: dict[str, NewsItem],
    candidate_sets: set[frozenset[str]],
    errors: list[str],
) -> list[NewsItem] | None:
    """Resolve a group's ``keep_id`` plus ``group[member_key]`` to items.

    Returns the group's distinct items, keep id first and in listed order.
    Returns ``None`` (after recording an error) when any id is unknown or
    not a string, or when the group is not contained in a single candidate
    set.
    """
    members = group.get(member_key) or []
    if isinstance(members, str):
        # Tolerate a bare id instead of splitting it into characters.
        members = [members]
    elif not isinstance(members, list):
        errors.append(f"invalid_ids_in_group: {group}")
        return None
    raw_ids = [group.get("keep_id"), *members]
    if any(not isinstance(item_id, str) or item_id not in by_id for item_id in raw_ids):
        errors.append(f"invalid_ids_in_group: {group}")
        return None
    # De-duplicate while keeping order, so an id listed twice (e.g. keep_id
    # repeated in remove_ids) cannot produce duplicate removals/supplements.
    ids = list(dict.fromkeys(raw_ids))
    group_set = frozenset(ids)
    if not any(group_set <= candidate_set for candidate_set in candidate_sets):
        errors.append(f"group_outside_candidates: {group}")
        return None
    return [by_id[item_id] for item_id in ids]


def _append_supplement(keep: NewsItem, source_item: NewsItem, reason: str, action: str) -> None:
    """Record ``source_item`` on ``keep.duplicate_sources`` (mutates ``keep``)."""
    keep.duplicate_sources.append(
        {
            "id": source_item.id,
            "source_group": source_item.source_group,
            "source_label": source_item.source_label,
            "url": source_item.url,
            "title": source_item.title or source_item.title_raw,
            "summary": source_item.summary or source_item.summary_raw,
            "reason": reason,
            "action": action,
        }
    )


def semantic_dedup_items(
    items: list[NewsItem],
    candidates: list[dict[str, Any]],
    *,
    llm_call: SemanticLlmCall,
    max_deletion_ratio: float = 0.5,
) -> tuple[list[NewsItem], dict[str, Any]]:
    """Remove or merge semantically duplicated items using an LLM judgement.

    Only high-confidence groups confined to one candidate set are accepted.
    Duplicate-group members are removed and recorded on the surviving item's
    ``duplicate_sources``. Merge-group members stay in the output and are
    recorded there as supplements.

    Groups that would conflict with an earlier accepted duplicate group are
    rejected. A conflicting group is one that keeps an item already removed,
    or removes an item already kept. Without this check, two groups could
    delete each other's survivors.

    Args:
        items: Items to deduplicate. Surviving items may be mutated (their
            ``duplicate_sources`` list is appended to).
        candidates: Candidate groups; each may carry an ``item_ids`` list.
        llm_call: Takes the prompt string and returns the raw LLM response.
        max_deletion_ratio: If the accepted removals exceed this fraction of
            ``items``, nothing is removed or merged.

    Returns:
        ``(deduplicated_items, report)``. On LLM or parse failure, or when the
        deletion ratio is exceeded, the original ``items`` list is returned
        unchanged and the report explains why.
    """
    if not items or not candidates:
        return items, _report(items, candidates)

    try:
        obj = parse_json_object(llm_call(_build_prompt(items, candidates)))
    except Exception as exc:  # deliberate: LLM/parse failures must not break the pipeline
        return items, _report(items, candidates, errors=[f"{type(exc).__name__}: {exc}"])

    errors: list[str] = []
    by_id = {item.id: item for item in items}
    candidate_sets = {
        frozenset(item_id for item_id in candidate.get("item_ids", []) if isinstance(item_id, str))
        for candidate in candidates
    }

    removals: set[str] = set()
    keepers: set[str] = set()
    valid_groups: list[dict[str, Any]] = []
    for group in _high_confidence_groups(obj, "duplicate_groups", errors):
        group_items = _validate_group(group, "remove_ids", by_id, candidate_sets, errors)
        if group_items is None:
            continue
        keep = _choose_keep(group_items, str(group.get("keep_id")))
        others = [item for item in group_items if item is not keep]
        # Reject groups that contradict an earlier accepted group.
        # Otherwise "a->b" plus "b->a" would remove both items, and a chain
        # "a->b, b->c" would hang c's supplement on the removed b.
        if keep.id in removals or any(item.id in keepers for item in others):
            errors.append(f"conflicting_duplicate_group: {group}")
            continue
        # Members already removed by an earlier group are attached there.
        remove_items = [item for item in others if item.id not in removals]
        if not remove_items:
            continue
        keepers.add(keep.id)
        removals.update(item.id for item in remove_items)
        valid_groups.append(
            {
                "keep_id": keep.id,
                "remove_ids": [item.id for item in remove_items],
                "confidence": "high",
                "reason": str(group.get("reason") or "semantic_duplicate"),
            }
        )

    valid_merge_groups: list[dict[str, Any]] = []
    for group in _high_confidence_groups(obj, "merge_groups", errors):
        group_items = _validate_group(group, "merge_ids", by_id, candidate_sets, errors)
        if group_items is None:
            continue
        # Only surviving items may take part in a merge. Otherwise supplements
        # could be attached to an item that is about to be dropped.
        survivors = [item for item in group_items if item.id not in removals]
        if len(survivors) < 2:
            continue
        keep = _choose_keep(survivors, str(group.get("keep_id")))
        valid_merge_groups.append(
            {
                "keep_id": keep.id,
                "merge_ids": [item.id for item in survivors if item is not keep],
                "confidence": "high",
                "reason": str(group.get("reason") or "semantic_merge"),
            }
        )

    uncertain = obj.get("uncertain", []) or []
    # items is non-empty here (early return above), so the division is safe.
    if len(removals) / len(items) > max_deletion_ratio:
        return items, _report(
            items,
            candidates,
            duplicate_groups=valid_groups,
            merge_groups=valid_merge_groups,
            uncertain=uncertain,
            errors=errors,
            skipped_for_deletion_ratio=True,
        )

    for group in valid_groups:
        keep = by_id[group["keep_id"]]
        for remove_id in group["remove_ids"]:
            _append_supplement(keep, by_id[remove_id], group["reason"], "dedupe_remove")
    for group in valid_merge_groups:
        keep = by_id[group["keep_id"]]
        for merge_id in group["merge_ids"]:
            _append_supplement(keep, by_id[merge_id], group["reason"], "merge_supplement")

    deduped = [item for item in items if item.id not in removals]
    return deduped, _report(
        items,
        candidates,
        removed_count=len(removals),
        duplicate_groups=valid_groups,
        merge_groups=valid_merge_groups,
        uncertain=uncertain,
        errors=errors,
    )