"""Hard de-duplication of news items plus a report of softer, possible duplicates."""

from __future__ import annotations

import difflib
from typing import Any

from .models import NewsItem

# Minimum ``difflib`` similarity ratio between two normalised titles for the
# pair to be reported as a possible (not merged) duplicate.
_TITLE_SIMILARITY_THRESHOLD = 0.65


def _item_score(item: NewsItem) -> int:
    """Return a heuristic score used to pick which duplicate to keep.

    Higher is better. The score is built from:

    * ``max(0, 200 - source_priority)`` -- a smaller ``source_priority``
      value earns more points;
    * +20 when a canonical URL is present;
    * +1 per character of ``summary_raw``, capped at 40;
    * +10 when ``section_hint`` is set;
    * +10 when ``source_role`` equals ``"primary"``;
    * -10 for every entry in ``quality_flags``.
    """
    score = max(0, 200 - item.source_priority)
    if item.canonical_url:
        score += 20
    if item.summary_raw:
        score += min(40, len(item.summary_raw))
    if item.section_hint:
        score += 10
    if item.source_role == "primary":
        score += 10
    score -= len(item.quality_flags) * 10
    return score


def _merge_group(
    group: list[NewsItem], reason: str
) -> tuple[NewsItem, list[NewsItem], dict[str, Any]]:
    """Collapse a group of duplicates into its best-scoring member.

    The item with the highest ``_item_score`` is kept (the first one wins on
    ties, as with ``max``). The kept item is mutated in place: one entry per
    removed item is appended to its ``duplicate_sources``.

    Args:
        group: Items considered duplicates of each other.
        reason: Label recorded on every provenance entry and in the report.

    Returns:
        ``(keep, removed, report_group)`` where ``removed`` holds every group
        member that is not the kept object and ``report_group`` is the
        report entry describing the merge.
    """
    keep = max(group, key=_item_score)
    removed = [item for item in group if item is not keep]
    for removed_item in removed:
        keep.duplicate_sources.append(
            {
                "id": removed_item.id,
                "source_group": removed_item.source_group,
                "source_label": removed_item.source_label,
                "url": removed_item.url,
                "reason": reason,
            }
        )
        # The removed item may itself have absorbed duplicates in an earlier
        # pass; carry them over so that provenance is not lost with it.
        keep.duplicate_sources.extend(removed_item.duplicate_sources)
    report_group = {
        "reason": reason,
        "keep_id": keep.id,
        "removed_ids": [item.id for item in removed],
        "confidence": "high",
    }
    return keep, removed, report_group


def _group_by_key(items: list[NewsItem], key_name: str) -> dict[str, list[NewsItem]]:
    """Group items by the attribute ``key_name``, keeping only real groups.

    Items whose attribute value is falsy (e.g. ``None`` or ``""``) are
    skipped, and only groups containing at least two items are returned.
    """
    groups: dict[str, list[NewsItem]] = {}
    for item in items:
        key = getattr(item, key_name)
        if not key:
            continue
        groups.setdefault(key, []).append(item)
    return {key: group for key, group in groups.items() if len(group) > 1}


def _possible_duplicates(items: list[NewsItem]) -> list[dict[str, Any]]:
    """Report pairs of items whose normalised titles look alike.

    Every unordered pair is compared once with ``difflib.SequenceMatcher``;
    pairs whose ratio reaches ``_TITLE_SIMILARITY_THRESHOLD`` are reported
    with ``"medium"`` confidence. Items without a ``title_norm`` are never
    paired. Nothing is merged or mutated here.
    """
    possible: list[dict[str, Any]] = []
    for index, left in enumerate(items):
        if not left.title_norm:
            # No pair involving this item can qualify; skip the inner loop.
            continue
        for right in items[index + 1 :]:
            if not right.title_norm:
                continue
            ratio = difflib.SequenceMatcher(
                None, left.title_norm, right.title_norm
            ).ratio()
            if ratio >= _TITLE_SIMILARITY_THRESHOLD:
                possible.append(
                    {
                        "item_ids": [left.id, right.id],
                        "reason": "title_similarity",
                        "similarity": round(ratio, 3),
                        "confidence": "medium",
                    }
                )
    return possible


def hard_dedup_items(items: list[NewsItem]) -> tuple[list[NewsItem], dict[str, Any]]:
    """Remove exact duplicates and describe what was done.

    Two passes are run in order: items sharing a ``canonical_url`` are merged
    first, then the survivors sharing a ``title_norm``. Kept items are mutated
    in place (their ``duplicate_sources`` grow); the input list itself is not
    modified.

    Args:
        items: Items to de-duplicate.

    Returns:
        ``(deduped, report)``. ``deduped`` preserves the input order of the
        surviving items. ``report`` contains ``input_count``,
        ``output_count``, ``removed_count``, the per-merge ``groups`` and the
        ``possible_duplicates`` found among the survivors.
    """
    # Removal is tracked by object identity, so hashability or equality of
    # NewsItem is never relied upon.
    removed_object_ids: set[int] = set()
    groups_report: list[dict[str, Any]] = []

    for key_name, reason in (
        ("canonical_url", "same_canonical_url"),
        ("title_norm", "same_title_norm"),
    ):
        survivors = [item for item in items if id(item) not in removed_object_ids]
        # Groups produced by one pass are disjoint, so removals made while
        # processing one group cannot affect another group of the same pass.
        for group in _group_by_key(survivors, key_name).values():
            _, removed, report_group = _merge_group(group, reason)
            removed_object_ids.update(id(item) for item in removed)
            groups_report.append(report_group)

    deduped = [item for item in items if id(item) not in removed_object_ids]
    report = {
        "input_count": len(items),
        "output_count": len(deduped),
        "removed_count": len(removed_object_ids),
        "groups": groups_report,
        "possible_duplicates": _possible_duplicates(deduped),
    }
    return deduped, report