"""RSS source support: parse an RSS XML document into report item dicts."""

from __future__ import annotations

import xml.etree.ElementTree as ET
from datetime import date, datetime
from email.utils import parsedate_to_datetime
from typing import Any, Callable

from ai_daily_report.models import SourceConfig
from ai_daily_report.normalize import clean_text

# Fetcher callback. ``fetch_rss`` calls it as
# ``fetch_text(config.url, config.timeout_seconds)`` and passes the returned
# string on as the XML document.
FetchText = Callable[[str, int], str]


def _parse_pubdate(value: str) -> str | None:
    """Convert an RSS ``pubDate`` string to an ISO 8601 string.

    Returns ``None`` when *value* is empty or cannot be parsed by
    ``email.utils.parsedate_to_datetime``.
    """
    if not value:
        return None
    try:
        return parsedate_to_datetime(value).isoformat()
    except Exception:
        # Deliberately broad: a malformed date in one feed item must never
        # abort parsing of the whole feed, whatever the parser raises.
        return None


def _parse_run_date(value: str | None) -> date | None:
    """Parse the leading ``YYYY-MM-DD`` part of *value* into a ``date``.

    Only the first ten characters are considered, so a full ISO timestamp is
    accepted as well. Returns ``None`` for empty or unparseable input.
    """
    if not value:
        return None
    try:
        return date.fromisoformat(value[:10])
    except ValueError:
        return None


def _parse_iso_date(value: str | None) -> date | None:
    """Parse an ISO 8601 datetime string and return its calendar date.

    Returns ``None`` for empty or unparseable input.
    """
    if not value:
        return None
    try:
        return datetime.fromisoformat(value).date()
    except ValueError:
        return None


def _within_max_item_age(
    published_at: str | None,
    *,
    run_date: str | None,
    max_item_age_days: int | None,
) -> bool:
    """Tell whether an item is recent enough to keep.

    The check is permissive: it returns ``True`` when no age limit is
    configured, and also when either the publication date or the run date is
    missing or unparseable. Otherwise the item is kept when the number of
    days from its publication date to the run date is at most
    *max_item_age_days*.
    """
    if max_item_age_days is None:
        return True
    published_date = _parse_iso_date(published_at)
    current_date = _parse_run_date(run_date)
    if published_date is None or current_date is None:
        # Unknown age: keep the item rather than dropping it silently.
        return True
    return (current_date - published_date).days <= max_item_age_days


def parse_rss_items(
    config: SourceConfig,
    xml_text: str,
    *,
    limit: int = 20,
    run_date: str | None = None,
) -> list[dict[str, Any]]:
    """Extract item dicts from the ``channel/item`` elements of an RSS document.

    Items without a title (after ``clean_text``) are skipped, as are items
    rejected by the ``config.max_item_age_days`` filter relative to
    *run_date*.

    Args:
        config: Source configuration; ``name`` and ``max_item_age_days`` are
            read here.
        xml_text: The RSS document as text.
        limit: Maximum number of items to return. A value of zero or less
            yields an empty list.
        run_date: ISO date (or timestamp) of the current run, used for the
            age filter. ``None`` disables age filtering.

    Returns:
        A list of at most *limit* item dicts, in document order.

    Raises:
        xml.etree.ElementTree.ParseError: If *xml_text* is not well-formed XML.
    """
    # NOTE(review): xml.etree.ElementTree is documented as not secure against
    # maliciously constructed data; consider a hardened parser if feeds are
    # untrusted.
    root = ET.fromstring(xml_text)
    channel = root.find("channel")
    raw_items = channel.findall("item") if channel is not None else []

    items: list[dict[str, Any]] = []
    for raw in raw_items:
        # Checked before doing any work so that limit <= 0 returns no items.
        if len(items) >= limit:
            break

        title = clean_text(raw.findtext("title") or "")
        if not title:
            continue

        summary = clean_text(raw.findtext("description") or "")
        published_at = _parse_pubdate(raw.findtext("pubDate") or "")
        if not _within_max_item_age(
            published_at,
            run_date=run_date,
            max_item_age_days=config.max_item_age_days,
        ):
            continue

        items.append(
            {
                "source_group": config.name,
                "source_label": config.name,
                "title_raw": title,
                "summary_raw": summary,
                "url": (raw.findtext("link") or "").strip(),
                "published_at": published_at,
                "origin_type": "rss",
                "section_hint": "",
                # Pure-ASCII titles are tagged "en"; anything else "zh".
                "language_hint": "en" if title.isascii() else "zh",
            }
        )
    return items


def fetch_rss(
    config: SourceConfig,
    run_date: str,
    fetch_text: FetchText,
) -> list[dict[str, Any]]:
    """Fetch the feed at ``config.url`` and parse it with ``parse_rss_items``.

    The document is obtained via ``fetch_text(config.url,
    config.timeout_seconds)``; the default item limit of ``parse_rss_items``
    applies.
    """
    xml_text = fetch_text(config.url, config.timeout_seconds)
    return parse_rss_items(config, xml_text, run_date=run_date)