225 lines
8.2 KiB
Python
225 lines
8.2 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
from typing import Any, Callable
|
|
|
|
from .llm import parse_json_object
|
|
from .models import NewsItem
|
|
|
|
|
|
# Injected LLM transport: semantic_dedup_items calls it with the JSON prompt
# string produced by _build_prompt and feeds the returned text to
# parse_json_object.
SemanticLlmCall = Callable[[str], str]
|
|
|
|
|
|
def _build_prompt(items: list[NewsItem], candidates: list[dict[str, Any]]) -> str:
    """Serialise the semantic-dedup request for the LLM as one JSON string.

    The request carries, in order: the task statement, a compact view of every
    item (id, title, summary, source label, section hint), the caller-supplied
    candidate groups, the dedupe policy and the expected response schema.
    ``ensure_ascii=False`` keeps non-ASCII titles/summaries verbatim.
    """

    def describe(news: NewsItem) -> dict[str, Any]:
        # Cleaned title/summary are preferred; the raw fields are the fallback
        # when the cleaned ones are empty.
        return dict(
            id=news.id,
            title=news.title or news.title_raw,
            summary=news.summary or news.summary_raw,
            source=news.source_label,
            section_hint=news.section_hint,
        )

    policy = [
        "Use duplicate_groups only when items are substantially the same article/event and one can be removed.",
        "Use merge_groups when items cover the same concrete event from different angles; keep the best item and attach the others as supplementary sources instead of dropping the event context.",
        "Do not curate by importance. Do not merge unrelated follow-ups just because they mention the same company/model.",
    ]

    duplicate_example = {
        "keep_id": "item id",
        "remove_ids": ["item id"],
        "confidence": "high|medium|low",
        "reason": "same concrete event reason",
    }
    merge_example = {
        "keep_id": "item id",
        "merge_ids": ["item id"],
        "confidence": "high|medium|low",
        "reason": "same event, complementary angle/source",
    }
    schema = {
        "duplicate_groups": [duplicate_example],
        "merge_groups": [merge_example],
        "not_duplicates": [],
        "uncertain": [],
    }

    request = dict(
        task="Identify only high-confidence semantic duplicates. Do not curate or remove by importance.",
        items=[describe(news) for news in items],
        candidates=candidates,
        dedupe_policy=policy,
        output_schema=schema,
    )
    return json.dumps(request, ensure_ascii=False)
|
|
|
|
|
|
def _score(item: NewsItem) -> int:
    """Heuristic quality score used by ``_choose_keep`` to rank group members.

    Components:
      * ``200 - source_priority``, floored at 0 (lower priority number scores higher);
      * +10 for ``source_role == "primary"``;
      * +1 per character of ``summary_raw``, capped at 40;
      * +20 when a ``canonical_url`` is present;
      * -10 per entry in ``quality_flags``.
    """
    priority_points = max(0, 200 - item.source_priority)
    role_bonus = 10 if item.source_role == "primary" else 0
    summary_bonus = min(40, len(item.summary_raw)) if item.summary_raw else 0
    url_bonus = 20 if item.canonical_url else 0
    flag_penalty = 10 * len(item.quality_flags)
    return priority_points + role_bonus + summary_bonus + url_bonus - flag_penalty
|
|
|
|
|
|
def _choose_keep(group_items: list[NewsItem], suggested_keep_id: str) -> NewsItem:
    """Pick which member of a duplicate/merge group survives as the keeper.

    The first member whose id equals ``suggested_keep_id`` (the LLM's pick) is
    kept unless its ``_score`` is more than 10 below the best-scoring member.
    Otherwise, and also when the suggestion is absent, the highest-scoring
    member wins (the first one on ties, as ``max`` returns).
    Raises ``ValueError`` (from ``max``) when ``group_items`` is empty.
    """
    top = max(group_items, key=_score)
    for member in group_items:
        if member.id == suggested_keep_id:
            # Defer to the LLM unless its choice is clearly weaker than the best.
            return member if _score(member) >= _score(top) - 10 else top
    return top
|
|
|
|
|
|
def semantic_dedup_items(
    items: list[NewsItem],
    candidates: list[dict[str, Any]],
    *,
    llm_call: SemanticLlmCall,
    max_deletion_ratio: float = 0.5,
) -> tuple[list[NewsItem], dict[str, Any]]:
    """Ask an LLM for semantic duplicate/merge groups and apply the safe ones.

    The prompt from ``_build_prompt`` is sent through ``llm_call`` and the reply
    is parsed with ``parse_json_object``. Only groups marked ``"high"``
    confidence are used, and only if their ids all exist in ``items`` and fit
    inside a single candidate group (``candidate["item_ids"]``).

    * ``duplicate_groups``: every non-kept member is dropped from the result and
      recorded on the keeper's ``duplicate_sources`` (action ``dedupe_remove``).
    * ``merge_groups``: non-kept members stay in the result and are also
      recorded on the keeper's ``duplicate_sources`` (action
      ``merge_supplement``).

    Args:
        items: Items to deduplicate. Kept items are mutated in place
            (``duplicate_sources`` is appended to).
        candidates: Candidate groups; each may carry an ``item_ids`` list.
        llm_call: Callable taking the prompt string and returning the raw reply.
        max_deletion_ratio: If the proposed removals exceed this fraction of
            ``items``, nothing is changed and the report is flagged.

    Returns:
        ``(items_after_dedup, report)``. The input list is returned unchanged
        when there is nothing to do, when the LLM call/parse fails, or when the
        deletion ratio is exceeded.
    """

    def make_report(
        *,
        removed_count: int = 0,
        duplicate_groups: list[dict[str, Any]] | None = None,
        merge_groups: list[dict[str, Any]] | None = None,
        uncertain: Any = None,
        errors: list[str] | None = None,
        skipped_for_deletion_ratio: bool = False,
    ) -> dict[str, Any]:
        # Single source of truth for the report shape shared by every exit path.
        return {
            "input_count": len(items),
            "candidate_group_count": len(candidates),
            "removed_count": removed_count,
            "duplicate_groups": [] if duplicate_groups is None else duplicate_groups,
            "merge_groups": [] if merge_groups is None else merge_groups,
            "uncertain": [] if uncertain is None else uncertain,
            "errors": [] if errors is None else errors,
            "skipped_for_deletion_ratio": skipped_for_deletion_ratio,
        }

    if not items or not candidates:
        return items, make_report()

    try:
        obj = parse_json_object(llm_call(_build_prompt(items, candidates)))
    except Exception as exc:
        # Deliberate best effort: an LLM/parse failure leaves items untouched
        # and is surfaced in the report instead of aborting the pipeline.
        return items, make_report(errors=[f"{type(exc).__name__}: {exc}"])

    errors: list[str] = []
    by_id = {item.id: item for item in items}
    candidate_sets = {
        frozenset(item_id for item_id in candidate.get("item_ids", []) if isinstance(item_id, str))
        for candidate in candidates
    }

    def group_entries(key: str) -> list[Any]:
        """Return the LLM's list under ``key``; a non-list value is reported and ignored."""
        entries = obj.get(key) or []
        if isinstance(entries, list):
            return entries
        errors.append(f"malformed_{key}: {entries!r}")
        return []

    def validated_members(group: Any, member_key: str) -> list[NewsItem] | None:
        """Return the distinct items of a usable high-confidence group, else None."""
        if not isinstance(group, dict):
            # LLM output is untrusted; a non-object entry must not crash the run.
            errors.append(f"malformed_group: {group!r}")
            return None
        if group.get("confidence") != "high":
            return None
        members = group.get(member_key) or []
        if not isinstance(members, list):
            errors.append(f"invalid_ids_in_group: {group}")
            return None
        raw_ids = [group.get("keep_id"), *members]
        if any(not isinstance(item_id, str) or item_id not in by_id for item_id in raw_ids):
            errors.append(f"invalid_ids_in_group: {group}")
            return None
        # Collapse repeated ids (e.g. keep_id also listed as a member) so the
        # same item is never removed or attached twice; order is preserved.
        ids = list(dict.fromkeys(raw_ids))
        group_set = frozenset(ids)
        if not any(group_set.issubset(candidate_set) for candidate_set in candidate_sets):
            errors.append(f"group_outside_candidates: {group}")
            return None
        return [by_id[item_id] for item_id in ids]

    candidate_removals: set[str] = set()
    valid_groups: list[dict[str, Any]] = []
    for group in group_entries("duplicate_groups"):
        group_items = validated_members(group, "remove_ids")
        if group_items is None:
            continue
        # Items already removed by an earlier group are already attached to
        # their keeper. Letting them take part again could drop both sides of a
        # pair (e.g. "keep A / remove B" followed by "keep B / remove A").
        group_items = [item for item in group_items if item.id not in candidate_removals]
        if len(group_items) < 2:
            continue
        keep = _choose_keep(group_items, str(group.get("keep_id")))
        remove_ids = [item.id for item in group_items if item is not keep]
        candidate_removals.update(remove_ids)
        valid_groups.append(
            {
                "keep_id": keep.id,
                "remove_ids": remove_ids,
                "confidence": "high",
                "reason": str(group.get("reason") or "semantic_duplicate"),
            }
        )

    valid_merge_groups: list[dict[str, Any]] = []
    for group in group_entries("merge_groups"):
        group_items = validated_members(group, "merge_ids")
        if group_items is None:
            continue
        # A removed item must not act as merge keeper (its supplements would be
        # dropped with it) nor be attached again as a supplement.
        group_items = [item for item in group_items if item.id not in candidate_removals]
        if len(group_items) < 2:
            continue
        keep = _choose_keep(group_items, str(group.get("keep_id")))
        valid_merge_groups.append(
            {
                "keep_id": keep.id,
                "merge_ids": [item.id for item in group_items if item is not keep],
                "confidence": "high",
                "reason": str(group.get("reason") or "semantic_merge"),
            }
        )

    uncertain = obj.get("uncertain", []) or []

    # Safety valve: an LLM proposing to delete too much is not trusted at all.
    if len(candidate_removals) / len(items) > max_deletion_ratio:
        return items, make_report(
            duplicate_groups=valid_groups,
            merge_groups=valid_merge_groups,
            uncertain=uncertain,
            errors=errors,
            skipped_for_deletion_ratio=True,
        )

    def append_supplement(keep: NewsItem, source_item: NewsItem, reason: str, action: str) -> None:
        """Record ``source_item`` on the keeper so its source context is not lost."""
        keep.duplicate_sources.append(
            {
                "id": source_item.id,
                "source_group": source_item.source_group,
                "source_label": source_item.source_label,
                "url": source_item.url,
                "title": source_item.title or source_item.title_raw,
                "summary": source_item.summary or source_item.summary_raw,
                "reason": reason,
                "action": action,
            }
        )

    for group in valid_groups:
        keep = by_id[group["keep_id"]]
        for remove_id in group["remove_ids"]:
            append_supplement(keep, by_id[remove_id], group["reason"], "dedupe_remove")

    for group in valid_merge_groups:
        keep = by_id[group["keep_id"]]
        for merge_id in group["merge_ids"]:
            append_supplement(keep, by_id[merge_id], group["reason"], "merge_supplement")

    deduped = [item for item in items if item.id not in candidate_removals]
    return deduped, make_report(
        removed_count=len(candidate_removals),
        duplicate_groups=valid_groups,
        merge_groups=valid_merge_groups,
        uncertain=uncertain,
        errors=errors,
    )
|