Files
ai-daily-report/ai_daily_report/dedupe.py
2026-06-08 12:05:45 +08:00

183 lines
6.0 KiB
Python

from __future__ import annotations
import difflib
import re
from datetime import date, datetime
from typing import Any
from .models import NewsItem, PublishedUrlEntry, PublishedUrls
# Thresholds used by _possible_duplicates when flagging pairs of items whose
# normalized titles look alike.
#
# A pair is flagged outright when the difflib.SequenceMatcher ratio of the two
# normalized titles reaches this value.
TITLE_SIMILARITY_THRESHOLD = 0.50
# Otherwise a pair is still flagged when the SequenceMatcher ratio reaches
# TOKEN_EDIT_DISTANCE_THRESHOLD *and* the Jaccard overlap of the two title
# token sets reaches this value.
TOKEN_JACCARD_THRESHOLD = 0.40
# NOTE: despite the name, this is compared against the SequenceMatcher ratio
# (a similarity, higher means more alike), not against an edit distance.
TOKEN_EDIT_DISTANCE_THRESHOLD = 0.40
def _item_score(item: NewsItem) -> int:
score = 0
score += max(0, 200 - item.source_priority)
if item.canonical_url:
score += 20
if item.summary_raw:
score += min(40, len(item.summary_raw))
if item.section_hint:
score += 10
if item.source_role == "primary":
score += 10
score -= len(item.quality_flags) * 10
return score
def _merge_group(group: list[NewsItem], reason: str) -> tuple[NewsItem, list[NewsItem], dict[str, Any]]:
    """Collapse a group of duplicate items into its best-scoring member.

    Args:
        group: Items considered duplicates of one another. Must be non-empty.
        reason: Label stored on every recorded duplicate source and in the
            returned report entry.

    Returns:
        A ``(keep, removed, report_group)`` tuple. ``keep`` is the member
        with the highest ``_item_score`` (the first one wins ties),
        ``removed`` holds every other member in input order, and
        ``report_group`` summarizes the merge with the keys ``reason``,
        ``keep_id``, ``removed_ids`` and ``confidence``.

    Raises:
        ValueError: If ``group`` is empty (raised by ``max``).

    Note:
        ``keep.duplicate_sources`` is extended in place.
    """
    keep = max(group, key=_item_score)
    removed = [item for item in group if item is not keep]
    for removed_item in removed:
        keep.duplicate_sources.append(
            {
                "id": removed_item.id,
                "source_group": removed_item.source_group,
                "source_label": removed_item.source_label,
                "url": removed_item.url,
                "reason": reason,
            }
        )
        # A removed item may itself be the survivor of an earlier merge pass.
        # Carry over the sources it had already absorbed so their provenance
        # is not dropped together with the item.
        keep.duplicate_sources.extend(removed_item.duplicate_sources)
    report_group = {
        "reason": reason,
        "keep_id": keep.id,
        "removed_ids": [item.id for item in removed],
        "confidence": "high",
    }
    return keep, removed, report_group
def _group_by_key(items: list[NewsItem], key_name: str) -> dict[str, list[NewsItem]]:
groups: dict[str, list[NewsItem]] = {}
for item in items:
key = getattr(item, key_name)
if key:
groups.setdefault(key, []).append(item)
return {key: group for key, group in groups.items() if len(group) > 1}
def _title_tokens(value: str) -> set[str]:
if not value:
return set()
return set(re.findall(r"[a-z0-9]+|[\u4e00-\u9fff]", value.lower()))
def _jaccard_similarity(left: set[str], right: set[str]) -> float:
if not left or not right:
return 0.0
return len(left & right) / len(left | right)
def _possible_duplicates(items: list[NewsItem]) -> list[dict[str, Any]]:
    """Report pairs of items whose normalized titles look alike.

    Every unordered pair of items that both have a non-empty ``title_norm``
    is compared. A pair is reported when the ``difflib.SequenceMatcher``
    ratio of the two titles reaches ``TITLE_SIMILARITY_THRESHOLD``, or when
    the ratio reaches ``TOKEN_EDIT_DISTANCE_THRESHOLD`` and the Jaccard
    similarity of the title token sets reaches ``TOKEN_JACCARD_THRESHOLD``.

    Args:
        items: Items to compare; nothing is modified.

    Returns:
        One dict per reported pair, in pair-enumeration order, with the keys
        ``item_ids``, ``reason``, ``similarity`` (ratio rounded to three
        decimals), ``token_jaccard`` (rounded to three decimals) and
        ``confidence``.
    """
    # Tokenize each title once instead of once per pair. Items without a
    # normalized title can never be reported, so drop them up front.
    candidates = [(item, _title_tokens(item.title_norm)) for item in items if item.title_norm]
    # Whichever branch of the rule matches, the ratio must reach at least
    # the smaller of the two ratio thresholds.
    ratio_floor = min(TITLE_SIMILARITY_THRESHOLD, TOKEN_EDIT_DISTANCE_THRESHOLD)

    possible: list[dict[str, Any]] = []
    for index, (left, left_tokens) in enumerate(candidates):
        for right, right_tokens in candidates[index + 1 :]:
            matcher = difflib.SequenceMatcher(None, left.title_norm, right.title_norm)
            # real_quick_ratio() and quick_ratio() are cheap upper bounds on
            # ratio(); a pair that fails them cannot reach the floor, so the
            # expensive ratio() call is skipped without changing the result.
            if matcher.real_quick_ratio() < ratio_floor or matcher.quick_ratio() < ratio_floor:
                continue
            ratio = matcher.ratio()
            if ratio < ratio_floor:
                continue
            jaccard = _jaccard_similarity(left_tokens, right_tokens)
            if ratio >= TITLE_SIMILARITY_THRESHOLD or (
                ratio >= TOKEN_EDIT_DISTANCE_THRESHOLD and jaccard >= TOKEN_JACCARD_THRESHOLD
            ):
                possible.append(
                    {
                        "item_ids": [left.id, right.id],
                        "reason": "title_similarity",
                        "similarity": round(ratio, 3),
                        "token_jaccard": round(jaccard, 3),
                        "confidence": "medium",
                    }
                )
    return possible
def hard_dedup_items(items: list[NewsItem]) -> tuple[list[NewsItem], dict[str, Any]]:
    """Remove exact duplicates, first by canonical URL and then by title.

    Two passes run in order: items sharing a ``canonical_url`` are merged,
    then the survivors sharing a ``title_norm`` are merged. Each merge keeps
    one item (chosen by ``_merge_group``) and drops the rest.

    Returns:
        ``(deduped, report)``. ``deduped`` holds the surviving items in
        input order. ``report`` contains ``input_count``, ``output_count``,
        ``removed_count``, the per-merge ``groups`` entries and the
        ``possible_duplicates`` found among the survivors.
    """
    snapshot = list(items)
    # Removed items are tracked by object identity, not by value.
    dropped: set[int] = set()
    merge_reports: list[dict[str, Any]] = []

    passes = (
        ("canonical_url", "same_canonical_url"),
        ("title_norm", "same_title_norm"),
    )
    for attribute, reason in passes:
        survivors = [entry for entry in snapshot if id(entry) not in dropped]
        for members in _group_by_key(survivors, attribute).values():
            live = [member for member in members if id(member) not in dropped]
            if len(live) >= 2:
                _, losers, summary = _merge_group(live, reason)
                for loser in losers:
                    dropped.add(id(loser))
                merge_reports.append(summary)

    deduped = [entry for entry in snapshot if id(entry) not in dropped]
    return deduped, {
        "input_count": len(items),
        "output_count": len(deduped),
        "removed_count": len(dropped),
        "groups": merge_reports,
        "possible_duplicates": _possible_duplicates(deduped),
    }
def _parse_date(value: str | None) -> date | None:
if not value:
return None
text = value.strip()
try:
return date.fromisoformat(text[:10])
except ValueError:
try:
return datetime.fromisoformat(text).date()
except ValueError:
return None
def _entry_within_window(entry: PublishedUrlEntry, *, run_date: str, max_age_days: int) -> bool:
    """Tell whether a history entry is recent enough to count as published.

    The entry's age is the number of days from its ``last_published`` date
    (or ``first_seen`` when that cannot be parsed) to ``run_date``.

    Returns ``True`` when the age is at most ``max_age_days``. Also returns
    ``True`` when ``max_age_days`` is negative, or when either date cannot
    be parsed.
    """
    if max_age_days < 0:
        return True
    today = _parse_date(run_date)
    reference = _parse_date(entry.last_published)
    if not reference:
        reference = _parse_date(entry.first_seen)
    if today is not None and reference is not None:
        age = today - reference
        return age.days <= max_age_days
    return True
def cross_day_dedup_items(
    items: list[NewsItem],
    published_urls: PublishedUrls | None,
    *,
    run_date: str,
    max_age_days: int = 7,
) -> tuple[list[NewsItem], dict[str, Any]]:
    """Drop items whose canonical URL was already published recently.

    Args:
        items: Candidate items for this run.
        published_urls: Publication history; a falsy value is replaced by a
            fresh ``PublishedUrls()``.
        run_date: Date of the current run, passed to ``_entry_within_window``.
        max_age_days: Window size in days, passed to
            ``_entry_within_window``.

    Returns:
        ``(deduped, report)``. ``deduped`` holds the items that were kept, in
        input order. ``report`` contains ``input_count``, ``output_count``,
        ``removed_count``, the ``removed`` detail records and the
        ``max_age_days`` that was applied.
    """
    history = published_urls if published_urls else PublishedUrls()
    kept: list[NewsItem] = []
    dropped: list[dict[str, Any]] = []

    for item in items:
        url = item.canonical_url
        # Items without a canonical URL cannot be matched against history.
        seen = history.urls.get(url) if url else None
        if not seen or not _entry_within_window(seen, run_date=run_date, max_age_days=max_age_days):
            kept.append(item)
            continue
        dropped.append(
            {
                "item_id": item.id,
                "canonical_url": url,
                "title": item.title or item.title_raw,
                "first_seen": seen.first_seen,
                "last_published": seen.last_published,
            }
        )

    return kept, {
        "input_count": len(items),
        "output_count": len(kept),
        "removed_count": len(dropped),
        "removed": dropped,
        "max_age_days": max_age_days,
    }