#!/usr/bin/env python3
import difflib
import json
import os
import re
import sys
import time
import urllib.request
import urllib.error
import xml.etree.ElementTree as ET
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta, timezone
from email.utils import parsedate_to_datetime
from pathlib import Path
from urllib.parse import urlparse
# Browser-like User-Agent header value sent with the script's HTTP requests.
UA = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36'
# Fixed UTC+8 offset; parse_pubdate converts publication dates into this zone.
CST = timezone(timedelta(hours=8))
# Timestamp captured once at import time; TODAY and SINCE derive from it.
NOW = datetime.now(CST)
# Today's date in CST as an ISO string (YYYY-MM-DD).
TODAY = NOW.date().isoformat()
# Cut-off 30 hours before NOW; parse_rss skips items published earlier.
SINCE = NOW - timedelta(hours=30)
# Script and output directories under the user's home directory.
SCRIPT_DIR = Path.home() / '.hermes' / 'scripts'
OUT_DIR = SCRIPT_DIR / 'ai_morning_out'
# Import-time side effect: the output directory is created if it is missing.
OUT_DIR.mkdir(parents=True, exist_ok=True)
# Display name -> RSS feed URL.
# NOTE(review): presumably each pair is passed to parse_rss(name, url); the
# caller is not visible in this part of the file.
RSS_FEEDS = {
    'InfoQ AI': 'https://feed.infoq.com/ai-ml-data-eng/',
    'MIT科技评论AI': 'https://www.technologyreview.com/topic/artificial-intelligence/feed',
    '量子位': 'https://www.qbitai.com/feed',
}
# Feed URL read by fetch_juya_rss.
JUYA_RSS = 'https://imjuya.github.io/juya-ai-daily/rss.xml'
# Section names as an ordered list.
# NOTE(review): the consumer is not visible here; presumably the order in
# which sections appear in the generated report.
SECTION_ORDER = ['模型发布/更新', '产品与工具', '开发与工程', '行业与公司', '论文与研究', '人物与花絮', '观点与教程']
# ─── Data collection (unchanged) ────────────────────────────────────────────
def fetch_text(url: str) -> str:
    """Download *url* and return the response body as text.

    The request carries the module's browser-like ``User-Agent`` and uses a
    25 second timeout. The body is decoded as UTF-8; bytes that cannot be
    decoded are dropped instead of raising.

    Errors raised by ``urllib`` while opening or reading the URL propagate
    to the caller.
    """
    request = urllib.request.Request(url, headers={'User-Agent': UA})
    with urllib.request.urlopen(request, timeout=25) as response:
        payload = response.read()
    return payload.decode('utf-8', 'ignore')
def parse_pubdate(text: str):
    """Parse an email/RSS style date string into a datetime in the CST zone.

    Returns ``None`` when *text* is empty or cannot be parsed. A parsed value
    that carries no timezone information is treated as UTC before it is
    converted to the module's ``CST`` timezone.
    """
    if text:
        try:
            parsed = parsedate_to_datetime(text)
            aware = parsed.replace(tzinfo=timezone.utc) if parsed.tzinfo is None else parsed
            return aware.astimezone(CST)
        except Exception:
            # Best effort: any failure while parsing or converting means "no date".
            return None
    return None
def clean_text(s: str) -> str:
    """Strip HTML markup from *s* and return single-spaced plain text.

    Tags are replaced by a space, HTML character references (``&amp;``,
    ``&nbsp;``, ``&#8217;`` and so on) are decoded, and every run of
    whitespace is collapsed to a single space. ``None`` or an empty string
    yields ``''``.
    """
    import html  # function-scope import keeps this block self-contained

    text = re.sub(r'<[^>]+>', ' ', s or '')
    # Decode entities only after the tags are gone, so escaped markup such
    # as ``&lt;b&gt;`` survives as literal text instead of being stripped.
    text = html.unescape(text)
    # ``\s`` also matches U+00A0, so decoded non-breaking spaces collapse too.
    return re.sub(r'\s+', ' ', text).strip()
# Known hosts mapped to display names. Insertion order is the lookup order.
_SOURCE_NAMES = {
    'x.com': 'X', 'twitter.com': 'X', 'github.com': 'GitHub', 'github.blog': 'GitHub Blog',
    'openrouter.ai': 'OpenRouter', 'anthropic.com': 'Anthropic', 'cursor.com': 'Cursor',
    'technologyreview.com': 'MIT科技评论AI', 'the-decoder.com': 'The Decoder', 'xiaohongshu.com': '小红书',
    'mp.weixin.qq.com': '微信文章', 'qbitai.com': '量子位', 'ithome.com': 'IT之家', 'browse.sh': 'Browse.sh',
    'huggingface.co': 'Hugging Face', 'openai.com': 'OpenAI', 'claude.com': 'Claude',
    'theverge.com': 'The Verge', 'infoq.com': 'InfoQ', 'research.google': 'Google Research',
    'simonwillison.net': 'Simon Willison', 'runwayml.com': 'Runway', 'perplexity.ai': 'Perplexity',
    'venturebeat.com': 'VentureBeat', 'arxiv.org': 'arXiv', 'reuters.com': '路透社',
    'bloomberg.com': 'Bloomberg', 'techcrunch.com': 'TechCrunch', 'wired.com': 'Wired',
    'deepseek.com': 'DeepSeek', 'baidu.com': '百度', 'alibaba.com': '阿里',
}


def source_name_from_url(url: str, fallback: str = '来源') -> str:
    """Return a display name for the site that *url* points to.

    A known domain, or any subdomain of one, maps to its display name.
    Unknown hosts are returned as the bare host name with a leading
    ``www.`` removed. *fallback* is returned when *url* is empty or has
    no host.
    """
    if not url:
        return fallback
    # ``hostname`` (unlike ``netloc``) excludes credentials and the port and
    # is already lower-cased, so ``https://github.com:443/...`` still
    # resolves to ``GitHub``.
    host = urlparse(url).hostname or ''
    if host.startswith('www.'):
        host = host[4:]
    for domain, name in _SOURCE_NAMES.items():
        if host == domain or host.endswith('.' + domain):
            return name
    return host or fallback
def x_username_from_url(url: str) -> str:
    """Return the account name from an X/Twitter URL, or ``''`` if none.

    Only ``x.com`` and ``twitter.com`` (optionally prefixed with ``www.``)
    are recognised. The account name is the first non-empty path segment,
    e.g. ``OpenAIDevs`` for ``https://x.com/OpenAIDevs/status/...``, unless
    that segment is one of the site's own routes listed below.
    """
    if not url:
        return ''
    parsed = urlparse(url)
    domain = (parsed.netloc or '').lower()
    domain = domain[4:] if domain.startswith('www.') else domain
    if domain != 'x.com' and domain != 'twitter.com':
        return ''
    # First path segments that are site routes rather than account names.
    reserved = ('i', 'search', 'explore', 'settings', 'notifications', 'home', 'compose')
    first_segment = next((seg for seg in parsed.path.split('/') if seg), '')
    if first_segment and first_segment not in reserved:
        return first_segment
    return ''
def smart_source_label(url: str, api_source_name: str = '') -> str:
    """Build a descriptive source label for an item.

    Order of preference:
    1. ``X:@<user>`` when *url* is an X/Twitter account URL.
    2. The site name derived from *url*, suffixed with ``:Blog`` for
       blog/research URLs or ``:官网动态`` for index/about/products pages.
    3. *api_source_name*, or ``'AI HOT'`` when that is empty.
    """
    handle = x_username_from_url(url)
    if handle:
        return f'X:@{handle}'

    site = source_name_from_url(url, '')
    if site and site not in ('来源', ''):
        parsed = urlparse(url)
        host = (parsed.netloc or '').lower()
        path = (parsed.path or '').lower()
        looks_like_blog = 'blog' in host or '/blog' in path or '/research' in path
        if looks_like_blog:
            return f'{site}:Blog'
        is_landing_page = '/index' in path or path.rstrip('/') in ('', '/about', '/products')
        if is_landing_page:
            return f'{site}:官网动态'
        return site

    generic_labels = ('AI HOT', '社交媒体/博客', '科技媒体', '公司官网', '公司博客', '社区/博客', '个人博客', '技术媒体')
    if api_source_name and api_source_name not in generic_labels:
        return api_source_name
    # NOTE(review): a generic label still falls through to the return below
    # and is returned unchanged, so the check above does not alter the
    # result. Confirm whether generic labels were meant to become 'AI HOT'.
    return api_source_name or 'AI HOT'
def _aihot_entry(raw: dict, generated, origin_type: str, section_hint: str) -> dict:
    """Normalize one AI HOT section item or flash into the common item dict."""
    link = (raw.get('sourceUrl') or '').strip()
    api_source = clean_text(raw.get('sourceName', '')) or ''
    return {
        'source_group': 'AI HOT',
        'source_label': smart_source_label(link, api_source),
        'title_raw': clean_text(raw.get('title', '')),
        'summary_raw': clean_text(raw.get('summary', '')),
        'url': link,
        'published_at': generated,
        'origin_type': origin_type,
        'section_hint': section_hint,
        'language_hint': 'zh',
    }


def parse_aihot(today: str):
    """Fetch the AI HOT daily digest for *today* and flatten it into items.

    *today* is inserted verbatim into the API path.
    NOTE(review): presumably an ISO date such as ``TODAY``; confirm with
    the caller.

    Returns ``(items, data)``: ``items`` holds one dict per section item
    (``origin_type='aihot_json'``) followed by one per flash
    (``origin_type='aihot_flash'``), and ``data`` is the decoded JSON
    payload. Every item carries the payload's ``generatedAt`` value as
    ``published_at``.

    Network and JSON decoding errors propagate to the caller.
    """
    url = f'https://aihot.virxact.com/api/public/daily/{today}'
    data = json.loads(fetch_text(url))
    generated = data.get('generatedAt')
    items = []
    # ``or []`` treats an explicit JSON null like a missing key. The flashes
    # list was already guarded this way; sections and items now match it.
    for sec in data.get('sections') or []:
        label = sec.get('label') or ''
        items.extend(
            _aihot_entry(it, generated, 'aihot_json', label)
            for it in sec.get('items') or []
        )
    items.extend(
        _aihot_entry(flash, generated, 'aihot_flash', '快讯')
        for flash in data.get('flashes') or []
    )
    return items, data
def parse_rss(name: str, url: str):
    """Fetch an RSS feed and return its recent entries as item dicts.

    Only the first 20 ``<item>`` elements of the feed's ``<channel>`` are
    considered. Entries published before ``SINCE`` are skipped; entries
    with no parseable date are kept. Entries without a title are dropped.
    *name* is used as both ``source_group`` and ``source_label``.
    """
    root = ET.fromstring(fetch_text(url))
    channel = root.find('channel')
    if channel is None:
        return []

    entries = []
    for node in channel.findall('item')[:20]:
        published = parse_pubdate(node.findtext('pubDate') or '')
        if published is not None and published < SINCE:
            continue
        title = clean_text(node.findtext('title') or '')
        if not title:
            continue
        link = (node.findtext('link') or '').strip()
        summary = clean_text(node.findtext('description') or '')
        # Language guess: more Latin letters than CJK ideographs means 'en'.
        sample = title + ' ' + summary
        latin_count = len(re.findall(r'[A-Za-z]', sample))
        cjk_count = len(re.findall(r'[\u4e00-\u9fff]', sample))
        entries.append({
            'source_group': name,
            'source_label': name,
            'title_raw': title,
            'summary_raw': summary,
            'url': link,
            'published_at': published.isoformat() if published else None,
            'origin_type': 'rss',
            'section_hint': '',
            'language_hint': 'en' if latin_count > cjk_count else 'zh',
        })
    return entries
def fetch_juya_rss(today: str):
    """Fetch the Juya RSS feed and locate the entry whose title equals *today*.

    Returns ``(target_url, pub_date, html_content)``. ``html_content`` is the
    entry's ``content:encoded`` text when present and non-empty, else
    ``None``. All three values are ``None`` when no entry matches.

    The request uses a 45 second timeout (the original author's note says
    GitHub Pages can be slow).
    """
    request = urllib.request.Request(JUYA_RSS, headers={'User-Agent': UA})
    with urllib.request.urlopen(request, timeout=45) as response:
        feed_xml = response.read().decode('utf-8', 'ignore')

    channel = ET.fromstring(feed_xml).find('channel')
    entries = channel.findall('item') if channel is not None else []
    # Only the first entry whose stripped title matches is used.
    match = next(
        (entry for entry in entries if (entry.findtext('title') or '').strip() == today),
        None,
    )
    if match is None:
        return None, None, None

    link = (match.findtext('link') or '').strip()
    published = parse_pubdate(match.findtext('pubDate') or '')
    # Reading content:encoded from the feed avoids a second HTTP request.
    namespaces = {'content': 'http://purl.org/rss/1.0/modules/content/'}
    encoded = match.find('content:encoded', namespaces)
    body = encoded.text if encoded is not None and encoded.text else None
    return link, published, body
def parse_juya(today: str):
target, pub, html_content = fetch_juya_rss(today)
if not target:
return []
# Try RSS content:encoded first; fall back to fetching the article page
if html_content is None:
try:
req = urllib.request.Request(target, headers={'User-Agent': UA})
with urllib.request.urlopen(req, timeout=45) as r:
html = r.read().decode('utf-8', 'ignore')
except Exception:
return []
m = re.search(r'
#(?P\d+) \s*]*>|', '\n', body_text, flags=re.I) body_text = re.sub(r'|||