from __future__ import annotations

import hashlib
import html
import re
import unicodedata
from collections import Counter
from datetime import datetime, timezone
from typing import Any
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse

from .models import NewsItem, SourceResult

# Query parameters that only carry click/campaign tracking and are dropped
# when a URL is canonicalized. Keys are compared case-insensitively.
TRACKING_QUERY_PREFIXES = ("utm_",)
TRACKING_QUERY_KEYS = {"fbclid", "gclid", "spm", "from", "ref"}

# Compiled once at import time; the patterns are used for every item.
_TAG_RE = re.compile(r"<[^>]+>")
_WHITESPACE_RE = re.compile(r"\s+")
_NON_TITLE_CHARS_RE = re.compile(r"[^\w\u4e00-\u9fff]+")


def clean_text(value: str) -> str:
    """Return *value* as plain single-line text.

    HTML entities are unescaped first, then anything that looks like a tag
    (``<...>``) is replaced by a space, and finally every whitespace run is
    collapsed to one space and the ends are trimmed. Because unescaping
    happens before tag removal, entity-escaped markup such as
    ``&lt;b&gt;`` is stripped as well. A falsy *value* yields ``""``.
    """
    text = html.unescape(value or "")
    text = _TAG_RE.sub(" ", text)
    return _WHITESPACE_RE.sub(" ", text).strip()


def _is_tracking_key(key: str) -> bool:
    """Return True when query parameter *key* is a known tracking parameter."""
    lowered = key.lower()
    return lowered in TRACKING_QUERY_KEYS or lowered.startswith(TRACKING_QUERY_PREFIXES)


def canonicalize_url(url: str) -> str:
    """Return a normalized form of *url* for comparing and identifying items.

    Normalization steps:

    * surrounding whitespace is removed; an empty or whitespace-only input
      yields ``""``;
    * the scheme is lower-cased and defaults to ``https`` when absent;
    * the network location is lower-cased, a leading ``www.`` is removed and
      ``twitter.com`` is rewritten to ``x.com``;
    * tracking query parameters (``TRACKING_QUERY_KEYS`` and anything starting
      with a ``TRACKING_QUERY_PREFIXES`` entry) are dropped, the remaining
      parameters keep their original order;
    * trailing slashes are removed from any path longer than one character;
    * path parameters and the fragment are discarded.

    Example::

        >>> canonicalize_url("https://www.Twitter.com/a/?utm_source=x&id=1#top")
        'https://x.com/a?id=1'
    """
    cleaned = (url or "").strip()
    # Check emptiness *after* stripping: a whitespace-only URL would otherwise
    # be turned into the meaningless but truthy canonical form "https://".
    if not cleaned:
        return ""

    parsed = urlparse(cleaned)
    if (
        not parsed.scheme
        and not parsed.netloc
        and parsed.path
        and not parsed.path.startswith("/")
    ):
        # urlparse() only recognizes a host when it is introduced by "//", so
        # a scheme-less "example.com/story" is parsed as a bare path and would
        # be emitted without a host. Re-parse it as a network location so the
        # default scheme and the host rules below actually apply.
        parsed = urlparse("//" + cleaned)

    scheme = (parsed.scheme or "https").lower()

    host = (parsed.netloc or "").lower()
    if host.startswith("www."):
        host = host[4:]
    if host == "twitter.com":
        host = "x.com"

    query = [
        (key, value)
        for key, value in parse_qsl(parsed.query, keep_blank_values=True)
        if not _is_tracking_key(key)
    ]

    path = parsed.path or ""
    if len(path) > 1:
        # Keep a lone "/" intact; only longer paths lose trailing slashes.
        path = path.rstrip("/")

    return urlunparse((scheme, host, path, "", urlencode(query), ""))


def normalize_title(title: str) -> str:
    """Return a comparison key for *title*.

    The title is NFKC-normalized and lower-cased, then every run of
    characters that is neither a regex word character (``\\w``, which
    includes ``_``) nor in the range U+4E00-U+9FFF is removed. A falsy
    *title* yields ``""``.
    """
    text = unicodedata.normalize("NFKC", title or "").lower()
    return _NON_TITLE_CHARS_RE.sub("", text)


def _item_id(
    canonical_url: str,
    source_group: str,
    title_norm: str,
    published_at: str | None,
) -> str:
    """Return a deterministic ``item_<16 hex chars>`` identifier.

    The identifier is derived from *canonical_url* when it is non-empty;
    otherwise from ``source_group|title_norm|published_at``. SHA-1 is used
    purely as a stable fingerprint here.
    """
    # published_at comes straight from raw source data, so coerce it instead
    # of letting a non-string value raise TypeError in str.join(); string and
    # falsy values produce exactly the same seed as before.
    published = str(published_at) if published_at else ""
    seed = canonical_url or "|".join((source_group, title_norm, published))
    digest = hashlib.sha1(seed.encode("utf-8")).hexdigest()[:16]
    return f"item_{digest}"


def _quality_flags(title: str, summary: str, url: str) -> list[str]:
    """Return the quality flags that apply to an item.

    Possible flags, in this order: ``missing_url`` (empty *url*),
    ``missing_summary`` (empty *summary*) and ``short_title`` (normalized
    *title* shorter than three characters).
    """
    flags: list[str] = []
    if not url:
        flags.append("missing_url")
    if not summary:
        flags.append("missing_summary")
    if len(normalize_title(title)) < 3:
        flags.append("short_title")
    return flags


def normalize_items(
    source_results: list[SourceResult],
    *,
    run_date: str,
    source_priorities: dict[str, int] | None = None,
) -> tuple[list[NewsItem], dict[str, Any]]:
    """Convert raw source results into ``NewsItem`` objects plus a report.

    Args:
        source_results: Results to normalize. Each one is read for its
            ``source``, ``role`` and ``items`` attributes; every entry of
            ``items`` is accessed through ``.get()``.
        run_date: Copied verbatim into the report.
        source_priorities: Optional mapping from source name to priority.
            Sources missing from the mapping get priority ``100``.

    Returns:
        ``(items, report)``. ``items`` holds one ``NewsItem`` per raw entry,
        in input order. ``report`` contains ``run_date``, ``input_count``,
        ``output_count`` and ``quality_flag_counts``.

    When several entries produce the same base identifier, the first keeps
    it and later ones get a ``_2``, ``_3``, ... suffix so identifiers stay
    unique within one call.
    """
    priorities = source_priorities or {}
    # One timestamp for the whole batch so every item shares collected_at.
    collected_at = datetime.now(timezone.utc).isoformat()

    items: list[NewsItem] = []
    flag_counts: Counter[str] = Counter()
    id_counts: Counter[str] = Counter()
    input_count = 0

    for source_result in source_results:
        for raw in source_result.items:
            input_count += 1
            source = source_result.source

            # The "*_raw" keys take precedence over their plain counterparts.
            title = clean_text(str(raw.get("title_raw") or raw.get("title") or ""))
            summary = clean_text(str(raw.get("summary_raw") or raw.get("summary") or ""))
            url = str(raw.get("url") or "").strip()
            canonical_url = canonicalize_url(url)
            title_norm = normalize_title(title)

            flags = _quality_flags(title, summary, canonical_url)
            flag_counts.update(flags)

            source_label = clean_text(str(raw.get("source_label") or source))
            published_at = raw.get("published_at")

            base_id = _item_id(canonical_url, source, title_norm, published_at)
            id_counts[base_id] += 1
            occurrence = id_counts[base_id]
            item_id = base_id if occurrence == 1 else f"{base_id}_{occurrence}"

            items.append(
                NewsItem(
                    id=item_id,
                    source_group=source,
                    source_label=source_label,
                    source_role=source_result.role,
                    source_priority=priorities.get(source, 100),
                    title_raw=title,
                    title_norm=title_norm,
                    summary_raw=summary,
                    url=url,
                    canonical_url=canonical_url,
                    published_at=published_at,
                    collected_at=collected_at,
                    origin_type=str(raw.get("origin_type") or ""),
                    section_hint=str(raw.get("section_hint") or ""),
                    language_hint=str(raw.get("language_hint") or ""),
                    quality_flags=flags,
                )
            )

    report = {
        "run_date": run_date,
        "input_count": input_count,
        "output_count": len(items),
        "quality_flag_counts": dict(flag_counts),
    }
    return items, report