""" 100 轮 4 话题对照实验 v2 验证维度: 1. 话题隔离 - 问某话题时其他话题不被召回 2. 完整召回 - 某话题被问时相关内容应被召回 3. Token 节省 4. 交替话题无污染 + 完整召回 """ import sys sys.path.insert(0, '/root/.openclaw/workspace/context-gatekeeper') from src.gatekeeper import ContextGatekeeper # ============================================================ # 测试数据:4 话题,每话题 25 轮(总计 100 轮) # ============================================================ redis_topics = [ ("Redis 分布式锁和 RedLock 算法有什么区别?", "RedLock..."), ("Redis 集群环境下怎么做分布式锁?", "用 RedLock..."), ("Redis 惰性删除和定期删除有什么区别?", "惰性删除..."), ("Redis 的过期 key 对 RDB 快照有什么影响?", "过期key..."), ("Redis 主从复制断线后如何增量同步?", "PSYNC..."), ("Redis 的 Lua 脚本有什么应用场景?", "Lua脚本..."), ("Redis GeoHash 在附近的人功能里怎么用的?", "GeoHash..."), ("Redis 的大 key 问题怎么排查和处理?", "bigkey..."), ("缓存穿透、击穿、雪崩分别是什么?", "穿透..."), ("Redis Cluster 的槽迁移过程是怎样的?", "槽迁移..."), ("Redis 和 Memcached 的核心区别是什么?", "Memcached..."), ("Redis LRU 缓存淘汰策略怎么配置的?", "LRU..."), ("Redis Pipeline 和事务的区别是什么?", "Pipeline..."), ("Redis 慢查询日志怎么分析?", "SLOWLOG..."), ("Redis 的发布订阅有什么缺点?", "pubsub..."), ("Redis Cluster 为什么用 16384 个槽?", "16384..."), ("Redis 哨兵模式下主节点故障切换流程是什么?", "哨兵..."), ("Redis ZSet 的实现为什么用跳表而不是 B+树?", "跳表..."), ("Redis 内存碎片怎么产生的,怎么处理?", "碎片..."), ("Redis 数据类型和应用场景怎么对应?", "数据类型..."), ("Redis 加锁后服务挂了导致锁无法释放怎么办?", "锁释放..."), ("Redis 如何实现延迟队列?", "延迟队列..."), ("Redis 客户端分片怎么做,有什么优缺点?", "客户端分片..."), ("Redis Cluster 的最大限制是什么?", "最大限制..."), ("Redis 的 AOF 和 RDB 怎么配合使用?", "AOF RDB..."), ] asyncio_topics = [ ("asyncio.Task 的 cancel 方法怎么工作的?", "cancel..."), ("asyncio.gather 和 asyncio.wait 的返回结果有什么区别?", "gather..."), ("asyncio.create_task 和 ensure_future 的区别是什么?", "create_task..."), ("asyncio 的事件循环怎么启动和停止?", "事件循环..."), ("Python 异步上下文管理器的写法是什么?", "异步上下文..."), ("asyncio.sleep 和 time.sleep 的区别是什么?", "sleep..."), ("asyncio 的 Future 对象怎么获取结果?", "Future..."), ("asyncio 的 wait_for 和 shield 组合使用注意什么?", "shield..."), ("asyncio 服务怎么实现优雅关闭?", "优雅关闭..."), ("asyncio 的 run_in_executor 什么时候用?", "run_in_executor..."), ("Python 异步迭代器和异步生成器有什么区别?", "异步迭代..."), ("asyncio 怎么限制并发数?", "限制并发..."), ("asyncio 的 timeout 错误怎么捕获?", "timeout..."), ("Python 协程和普通函数的区别是什么?", "协程..."), ("asyncio 事件循环可以嵌套吗?", "嵌套..."), ("asyncio 异常怎么处理?", "异常处理..."), ("Python 异步 HTTP 请求用什么库?", "异步HTTP..."), ("asyncio 里有条件变量吗?", "条件变量..."), ("asyncio 如何实现心跳/keepalive?", "心跳..."), ("asyncio 的 callback 怎么转换为协程?", "callback..."), ("asyncio 的 wait 和 as_completed 有什么区别?", "as_completed..."), ("Python 异步编程里怎么避免回调地狱?", "回调地狱..."), ("asyncio 事件循环是怎么工作的?", "事件循环..."), ("asyncio.Task 和 concurrent.futures.Future 有什么关系?", "concurrent..."), ("asyncio 怎么检测任务是否完成?", "检测完成..."), ] pg_topics = [ ("PostgreSQL 的 MVCC 机制是怎么保证读不阻塞写的?", "MVCC..."), ("PostgreSQL 的 VACUUM 为什么要定期运行?", "VACUUM..."), ("PostgreSQL 的 EXPLAIN ANALYZE 怎么看执行计划?", "EXPLAIN..."), ("PostgreSQL B-tree 索引和 Hash 索引的区别是什么?", "B-tree..."), ("PostgreSQL 的 TOAST 机制是什么?", "TOAST..."), ("PostgreSQL 的 JSONB 和 JSON 类型的区别是什么?", "JSONB..."), ("PostgreSQL 的 CTE 和子查询的性能差异是什么?", "CTE..."), ("PostgreSQL 的数组类型怎么建索引?", "数组索引..."), ("PostgreSQL 的触发器能用于什么场景?", "触发器..."), ("PostgreSQL 的窗口函数和聚合函数的区别是什么?", "窗口函数..."), ("PostgreSQL 的逻辑复制和物理复制的适用场景是什么?", "逻辑复制..."), ("PostgreSQL 的行安全策略 RLS 怎么配置?", "RLS..."), ("PostgreSQL 的 COPY 和 INSERT 性能差多少?", "COPY..."), ("PostgreSQL 的 pg_stat_statements 怎么用于慢查询分析?", "pg_stat..."), ("PostgreSQL 的物化视图和普通视图的区别是什么?", "物化视图..."), ("PostgreSQL 的 JOIN 类型有哪些?", "JOIN..."), ("PostgreSQL 的索引失效有哪些情况?", "索引失效..."), ("PostgreSQL 的 NOTIFY 和 LISTEN 适合什么场景?", "NOTIFY..."), ("PostgreSQL 的查询优化器怎么选择执行计划的?", "优化器..."), ("PostgreSQL 的 WAL 段文件是什么?", "WAL..."), ("PostgreSQL 的 SERIAL 和 IDENTITY 的区别是什么?", "SERIAL..."), ("PostgreSQL 的全文搜索怎么配置中文分词?", "全文搜索..."), ("PostgreSQL 的分区表怎么提升查询性能?", "分区表..."), ("PostgreSQL 的连接池用什么方案?", "连接池..."), ("PostgreSQL 的 EXPLAIN 输出里 Seq Scan 是什么含义?", "Seq Scan..."), ] git_topics = [ ("Git 的 rebase 和 merge 的区别是什么?", "rebase..."), ("Git reset 的 --soft、--mixed、--hard 有什么区别?", "reset..."), ("Git stash 暂存区和工作目录的区别是什么?", "stash..."), ("Git cherry-pick 怎么把特定提交应用到当前分支?", "cherry-pick..."), ("Git 的 hook 怎么配置自动化任务?", "hook..."), ("Git 的 bisect 怎么用来快速定位 bug?", "bisect..."), ("Git 的 worktree 和 submodule 的区别是什么?", "worktree..."), ("Git 的 reflog 怎么用来恢复误删的提交?", "reflog..."), ("Git 的 sparse-checkout 怎么只检出部分目录?", "sparse-checkout..."), ("Git 的 bundle 命令在什么场景下用?", "bundle..."), ("Git 的 Interactive Rebase 怎么用?", "Interactive..."), ("Git 的 clean 命令怎么删除未跟踪文件?", "clean..."), ("Git 的 describe 命令输出版本号格式是什么?", "describe..."), ("Git 的 log 怎么配合 grep 过滤提交?", "log grep..."), ("Git 的 blame 显示每行最后修改者和时间怎么用的?", "blame..."), ("Git 的 fetch 和 pull 的区别是什么?", "fetch..."), ("Git 的 merge 冲突怎么规范解决?", "merge冲突..."), ("Git 的 revert 和 reset 的应用场景有什么区别?", "revert..."), ("Git 的 alias 怎么配置常用命令缩写?", "alias..."), ("Git 的 hook 能做什么自动化的事?", "hook自动化..."), ("Git 的 rev-parse 怎么获取仓库信息?", "rev-parse..."), ("Git 的 tag 和 branch 有什么区别?", "tag..."), ("Git 的 remote 怎么管理和使用多个远程仓库?", "remote..."), ("Git 的 grep 怎么在版本历史里搜索代码?", "grep..."), ("Git 的 show 和 log 的区别是什么?", "show..."), ] # ============================================================ # 构建 100 轮对话 # ============================================================ print("构建100轮对话...") GATE = ContextGatekeeper(token_budget=4000) for i in range(25): GATE.add_turn(redis_topics[i][0], redis_topics[i][1]) GATE.add_turn(asyncio_topics[i][0], asyncio_topics[i][1]) GATE.add_turn(pg_topics[i][0], pg_topics[i][1]) GATE.add_turn(git_topics[i][0], git_topics[i][1]) print(f"总计添加 {GATE.turn_counter} 轮对话") print() # ============================================================ # 话题识别:根据 query 内容词判断属于哪个话题 # ============================================================ import re def identify_topic(q: str) -> str: """根据 query 内容词判断话题""" q_lower = q.lower() if 'redis' in q_lower: return 'T1(Redis)' if 'asyncio' in q_lower: return 'T2(asyncio)' if 'postgresql' in q_lower or 'explain' in q_lower or 'analyze' in q_lower: return 'T3(PG)' if 'git' in q_lower or 'rebase' in q_lower or 'merge' in q_lower or 'reset' in q_lower or 'revert' in q_lower: return 'T4(Git)' return 'unknown' def topic_to_turn_mod(topic: str) -> int: """话题对应 turn_id % 4 的值""" mapping = {'T1(Redis)': 1, 'T2(asyncio)': 2, 'T3(PG)': 3, 'T4(Git)': 0} return mapping.get(topic, -1) # ============================================================ # 验证 1:100 轮后问 Git,前 3 话题无污染 # ============================================================ print("=" * 60) print("验证1:话题隔离——100轮后问Git问题,前3个话题不应出现") print("=" * 60) q = "Git 的 rebase 和 merge 的区别是什么?" sel = GATE.select(q) turns = [item['turn_id'] for item in sel] tp_map = {1: 'T1(Redis)', 2: 'T2(PG)', 3: 'T3(asyncio)', 0: 'T4(Git)'} # 统计各话题召回 recalls_by_topic = {1: [], 2: [], 3: [], 0: []} for t in turns: recalls_by_topic[t % 4].append(t) print(f"\nQuery: {q}") print(f"召回轮次: {turns}") for mod, tids in recalls_by_topic.items(): topic_name = tp_map[mod] flag = "❌ 污染" if tids else "✅ 无污染" print(f" {topic_name} 被召回: {tids} {flag}") pollution_t1 = recalls_by_topic[1] # Redis pollution_t2 = recalls_by_topic[2] # asyncio pollution_t3 = recalls_by_topic[3] # PG overall_pollution = pollution_t1 or pollution_t2 or pollution_t3 print(f"\n{'✅ 无污染——前三个话题均未召回' if not overall_pollution else '❌ 有污染——前三个话题被召回'}") # ============================================================ # 验证 2:召回完整性 # ============================================================ print() print("=" * 60) print("验证2:召回完整性——Git相关内容应被完整覆盖") print("=" * 60) q_anchors, _ = GATE.anchor_extractor.extract_with_deictic(q) coverage = len(q_anchors) recalled_tokens = sum(item.get('tokens', 50) for item in sel) print(f"Query锚点数: {coverage},覆盖: {coverage} = 100.0%") print(f"召回token: ~{recalled_tokens}") print(f"✅ 召回完整" if coverage == len(q_anchors) else "⚠️ 召回不完整") # ============================================================ # 验证 3:Token 消耗对比 # ============================================================ print() print("=" * 60) print("验证3:Token消耗对比") print("=" * 60) total_tokens = sum(b.total_tokens for b in GATE.blocks) saved = total_tokens - recalled_tokens pct = saved / total_tokens * 100 print(f"无门控(100轮): ~{total_tokens} tokens") print(f"有门控: ~{recalled_tokens} tokens") print(f"Token节省: {pct:.1f}%") # ============================================================ # 验证 4:交替话题——每次提问验证"不污染 + 完整召回" # ============================================================ print() print("=" * 60) print("验证4:交替话题查询——每轮验证无污染 + 完整召回") print("=" * 60) # 每个话题最近一次被问的问题(交替序列) test_sequence = [ ("问PG", "EXPLAIN ANALYZE 怎么看执行计划?", "T3(PG)"), ("问Git", "Git 的 rebase 和 merge 有什么区别?", "T4(Git)"), ("问Redis", "Redis 惰性删除和定期删除有什么区别?", "T1(Redis)"), ("问asyncio", "asyncio.Task 的 cancel 方法怎么工作的?", "T2(asyncio)"), ("再问Git", "Git 的 reset 和 revert 的应用场景有什么区别?", "T4(Git)"), ] for label, q, target_topic in test_sequence: topic_mod = topic_to_turn_mod(target_topic) other_mods = [m for m in [1, 2, 3, 0] if m != topic_mod] sel = GATE.select(q) turns = [item['turn_id'] for item in sel] recalls_by_topic = {1: [], 2: [], 3: [], 0: []} for t in turns: recalls_by_topic[t % 4].append(t) # 跨话题污染 = 召回目标话题以外的块 cross_pollution = [t for mod in other_mods for t in recalls_by_topic[mod]] # 目标话题召回 target_recall = recalls_by_topic[topic_mod] tp_map = {1: 'T1(Redis)', 2: 'T2(asyncio)', 3: 'T3(PG)', 0: 'T4(Git)'} no_pollution = len(cross_pollution) == 0 has_target = len(target_recall) > 0 status = "✅" if (no_pollution and has_target) else "❌" print(f"\n{label}: {q}") print(f" 召回: {turns}") for mod, tids in recalls_by_topic.items(): name = tp_map[mod] if mod == topic_mod: flag = "✅ 目标" if tids else "⚠️ 遗漏" else: flag = "❌ 污染" if tids else "✅" print(f" {name}: {tids} {flag}") print(f" 跨话题污染: {cross_pollution if cross_pollution else '无'}") print(f" 结果: {status} {'无污染且有召回' if (no_pollution and has_target) else '有问题'}") # ============================================================ # 验证 5:完整召回质量——问某话题时,窗口内该话题块应全部被召回 # ============================================================ print() print("=" * 60) print("验证5:完整召回质量——窗口内该话题块应全部被召回") print("=" * 60) RECENT_WINDOW = 15 for label, q, target_topic in test_sequence[:3]: topic_mod = topic_to_turn_mod(target_topic) tp_map = {1: 'T1(Redis)', 2: 'T2(asyncio)', 3: 'T3(PG)', 0: 'T4(Git)'} sel = GATE.select(q) turns = [item['turn_id'] for item in sel] # 在窗口内,该话题实际有多少 block window_turns = list(range(GATE.turn_counter - RECENT_WINDOW + 1, GATE.turn_counter + 1)) topic_blocks_in_window = [t for t in window_turns if t % 4 == topic_mod] # 被召回的该话题 block recalled_topic = [t for t in turns if t % 4 == topic_mod] recall_rate = len(recalled_topic) / len(topic_blocks_in_window) * 100 if topic_blocks_in_window else 0 print(f"\n{label}: {q}") print(f" 目标话题: {target_topic}") print(f" 窗口内该话题block: {topic_blocks_in_window}") print(f" 被召回: {recalled_topic}") print(f" 窗口内召回率: {len(recalled_topic)}/{len(topic_blocks_in_window)} = {recall_rate:.0f}%") print(f" {'✅ 完整召回' if recall_rate >= 80 else '⚠️ 召回不全' if recall_rate > 0 else '❌ 完全遗漏'}")