"""Duplicate detection for news items.

Provides same-run ("hard") deduplication by canonical URL and normalised
title, a report of merely *possible* duplicates based on title similarity,
and cross-day deduplication against previously published URLs.
"""

from __future__ import annotations

import difflib
import itertools
import re
from datetime import date, datetime
from typing import Any

from .models import NewsItem, PublishedUrlEntry, PublishedUrls

# A pair of titles whose SequenceMatcher ratio reaches this value is reported
# as a possible duplicate on the ratio alone.
TITLE_SIMILARITY_THRESHOLD = 0.50
# Below that, a pair is still reported when the SequenceMatcher ratio reaches
# TOKEN_EDIT_DISTANCE_THRESHOLD *and* the token Jaccard similarity reaches
# TOKEN_JACCARD_THRESHOLD.
TOKEN_JACCARD_THRESHOLD = 0.40
TOKEN_EDIT_DISTANCE_THRESHOLD = 0.40

# A token is a run of ASCII lowercase letters/digits, or one single character
# in the U+4E00..U+9FFF range (each such character is its own token).
_TITLE_TOKEN_RE = re.compile(r"[a-z0-9]+|[\u4e00-\u9fff]")


def _item_score(item: NewsItem) -> int:
    """Return a heuristic quality score used to pick which duplicate to keep.

    Higher is better. A lower ``source_priority`` number scores higher, and
    each quality flag costs 10 points.
    """
    score = max(0, 200 - item.source_priority)
    if item.canonical_url:
        score += 20
    if item.summary_raw:
        # Longer summaries score higher, capped at 40 points.
        score += min(40, len(item.summary_raw))
    if item.section_hint:
        score += 10
    if item.source_role == "primary":
        score += 10
    score -= len(item.quality_flags) * 10
    return score


def _merge_group(
    group: list[NewsItem], reason: str
) -> tuple[NewsItem, list[NewsItem], dict[str, Any]]:
    """Collapse a group of duplicates into its best-scoring member.

    The kept item's ``duplicate_sources`` list is mutated in place: one record
    is appended per removed item, followed by any records that removed item
    had itself accumulated from an earlier merge.

    Returns:
        ``(keep, removed, report_group)`` where ``report_group`` describes the
        merge for the dedup report.
    """
    # max() returns the first maximal element, so ties go to the earliest item.
    keep = max(group, key=_item_score)
    removed = [item for item in group if item is not keep]
    for removed_item in removed:
        keep.duplicate_sources.append(
            {
                "id": removed_item.id,
                "source_group": removed_item.source_group,
                "source_label": removed_item.source_label,
                "url": removed_item.url,
                "reason": reason,
            }
        )
        # The removed item may have been the keeper of an earlier pass. Carry
        # its absorbed records forward so they are not lost along with it.
        keep.duplicate_sources.extend(removed_item.duplicate_sources or [])
    report_group = {
        "reason": reason,
        "keep_id": keep.id,
        "removed_ids": [item.id for item in removed],
        "confidence": "high",
    }
    return keep, removed, report_group


def _group_by_key(items: list[NewsItem], key_name: str) -> dict[str, list[NewsItem]]:
    """Group items by attribute ``key_name``, keeping only groups of 2 or more.

    Items whose attribute value is falsy (``None``, empty string) are ignored.
    """
    groups: dict[str, list[NewsItem]] = {}
    for item in items:
        key = getattr(item, key_name)
        if key:
            groups.setdefault(key, []).append(item)
    return {key: members for key, members in groups.items() if len(members) > 1}


def _title_tokens(value: str) -> set[str]:
    """Return the set of tokens found in the lower-cased ``value``."""
    if not value:
        return set()
    return set(_TITLE_TOKEN_RE.findall(value.lower()))


def _jaccard_similarity(left: set[str], right: set[str]) -> float:
    """Return ``|left & right| / |left | right|``, or 0.0 if either set is empty."""
    if not left or not right:
        return 0.0
    return len(left & right) / len(left | right)


def _possible_duplicates(items: list[NewsItem]) -> list[dict[str, Any]]:
    """Report pairs of items whose normalised titles look alike.

    Every unordered pair of items that both have a ``title_norm`` is compared.
    A pair is reported when its SequenceMatcher ratio reaches
    ``TITLE_SIMILARITY_THRESHOLD``, or when the ratio reaches
    ``TOKEN_EDIT_DISTANCE_THRESHOLD`` and the token Jaccard similarity reaches
    ``TOKEN_JACCARD_THRESHOLD``. Nothing is removed; this only feeds the report.
    """
    # Tokenise each title once up front instead of once per pair.
    titled = [(item, _title_tokens(item.title_norm)) for item in items if item.title_norm]

    possible: list[dict[str, Any]] = []
    # combinations() yields pairs in the same order as a nested index loop.
    for (left, left_tokens), (right, right_tokens) in itertools.combinations(titled, 2):
        ratio = difflib.SequenceMatcher(None, left.title_norm, right.title_norm).ratio()
        jaccard = _jaccard_similarity(left_tokens, right_tokens)
        if ratio >= TITLE_SIMILARITY_THRESHOLD or (
            ratio >= TOKEN_EDIT_DISTANCE_THRESHOLD and jaccard >= TOKEN_JACCARD_THRESHOLD
        ):
            possible.append(
                {
                    "item_ids": [left.id, right.id],
                    "reason": "title_similarity",
                    "similarity": round(ratio, 3),
                    "token_jaccard": round(jaccard, 3),
                    "confidence": "medium",
                }
            )
    return possible


def hard_dedup_items(items: list[NewsItem]) -> tuple[list[NewsItem], dict[str, Any]]:
    """Remove exact duplicates, first by canonical URL and then by title.

    Two passes run in order: items sharing a ``canonical_url`` are merged,
    then the survivors sharing a ``title_norm`` are merged. In each group the
    best-scoring item is kept and records the others in its
    ``duplicate_sources`` (the input items are mutated).

    Returns:
        ``(deduped, report)``. ``deduped`` preserves input order. ``report``
        holds the counts, one entry per merged group, and the
        possible-duplicate pairs among the survivors.
    """
    # Track removals by object identity so items need not be hashable or
    # have unique ``id`` attributes.
    removed_object_ids: set[int] = set()
    groups_report: list[dict[str, Any]] = []

    passes = (
        ("canonical_url", "same_canonical_url"),
        ("title_norm", "same_title_norm"),
    )
    for key_name, reason in passes:
        survivors = [item for item in items if id(item) not in removed_object_ids]
        # Groups within one pass are disjoint (one key value per item), so a
        # merge in one group cannot invalidate another group of the same pass.
        for group in _group_by_key(survivors, key_name).values():
            _keep, removed, report_group = _merge_group(group, reason)
            removed_object_ids.update(id(item) for item in removed)
            groups_report.append(report_group)

    deduped = [item for item in items if id(item) not in removed_object_ids]
    report = {
        "input_count": len(items),
        "output_count": len(deduped),
        "removed_count": len(removed_object_ids),
        "groups": groups_report,
        "possible_duplicates": _possible_duplicates(deduped),
    }
    return deduped, report


def _parse_date(value: str | None) -> date | None:
    """Parse an ISO date or datetime string into a ``date``.

    The first 10 characters are tried as an ISO date; if that fails, the whole
    stripped string is tried as an ISO datetime. Returns ``None`` for empty
    input or when neither form parses.
    """
    if not value:
        return None
    text = value.strip()
    try:
        return date.fromisoformat(text[:10])
    except ValueError:
        pass
    try:
        return datetime.fromisoformat(text).date()
    except ValueError:
        return None


def _entry_within_window(entry: PublishedUrlEntry, *, run_date: str, max_age_days: int) -> bool:
    """Return whether a history entry is recent enough to suppress an item.

    The entry's ``last_published`` date is used, falling back to
    ``first_seen``. A negative ``max_age_days`` disables the window, so every
    entry counts. If either date cannot be parsed, the entry is treated as
    within the window.
    """
    if max_age_days < 0:
        return True
    current = _parse_date(run_date)
    previous = _parse_date(entry.last_published) or _parse_date(entry.first_seen)
    if current is None or previous is None:
        return True
    return (current - previous).days <= max_age_days


def cross_day_dedup_items(
    items: list[NewsItem],
    published_urls: PublishedUrls | None,
    *,
    run_date: str,
    max_age_days: int = 7,
) -> tuple[list[NewsItem], dict[str, Any]]:
    """Drop items whose canonical URL was already published recently.

    An item is removed when its ``canonical_url`` has an entry in
    ``published_urls.urls`` and that entry falls within ``max_age_days`` of
    ``run_date`` (see ``_entry_within_window``). Items without a canonical URL
    are always kept.

    Returns:
        ``(deduped, report)``. ``deduped`` preserves input order. ``report``
        holds the counts, ``max_age_days``, and one record per removed item.
    """
    history = published_urls or PublishedUrls()
    deduped: list[NewsItem] = []
    removed: list[dict[str, Any]] = []
    for item in items:
        entry = history.urls.get(item.canonical_url) if item.canonical_url else None
        if entry and _entry_within_window(entry, run_date=run_date, max_age_days=max_age_days):
            removed.append(
                {
                    "item_id": item.id,
                    "canonical_url": item.canonical_url,
                    "title": item.title or item.title_raw,
                    "first_seen": entry.first_seen,
                    "last_published": entry.last_published,
                }
            )
            continue
        deduped.append(item)
    report = {
        "input_count": len(items),
        "output_count": len(deduped),
        "removed_count": len(removed),
        "removed": removed,
        "max_age_days": max_age_days,
    }
    return deduped, report