chore: update README with complete algorithm and 100-round 4-topic results

This commit is contained in:
Elaina
2026-04-22 12:12:04 +08:00
parent 07b66d3b58
commit c828fceae9
7 changed files with 1063 additions and 125 deletions

114
README.md
View File

@@ -22,7 +22,7 @@
有指代词 → 强制继续;中间地带默认继续)
③ 稀疏召回top-20BM25/IDF-overlap + exact match + 新鲜度奖励)
话题切换时:内容词过滤(只保留包含 query 内容词的块)
④ 最小覆盖选择gain = ΣIDF(t) / cost^α,贪心选择达到 85% 覆盖停止)
```
@@ -68,24 +68,16 @@ context-gatekeeper/
│ ├── topic_gate.py # 话题门控overlap + new_ratio + 指代词)
│ ├── sparse.py # 稀疏召回BM25/IDF + exact + recency
│ ├── selector.py # 最小覆盖选择IDF加权贪心
│ └── gatekeeper.py # 主模块(组合各子模块)
├── tests/
│ ├── test_gatekeeper.py # 单元测试9/9
│ └── test_full_evaluation.py # 完整评测
├── evaluation_results.json # 评测结果20轮对话
├── SUMMARY.md # 未完成灵感记录
├── SPEC.md # 规格文档
│ └── gatekeeper.py # 主模块(组合各子模块)+ 句级裁剪
├── test_100rounds_v2.py # 100轮4话题完整对照实验
└── README.md
```
## 运行测试
```bash
# 单元测试
pytest tests/test_gatekeeper.py -v
# 对照实验(需要 SiliconFlow API key
python test_comparison.py
# 100轮4话题对照实验
python test_100rounds_v2.py
```
## 算法细节
@@ -117,6 +109,30 @@ else:
continue # 中间地带默认继续,避免切断正在发展的思路
```
### 话题切换时的内容词过滤
话题切换时,用内容词(英文术语/代码标识符/长中文词)对候选块做硬过滤:
```python
# 从 query 中提取内容词(区分于通用 n-gram
content_words = {
'redis', # 英文术语
'postgresql', 'explain', # 英文术语
'asyncio', 'gather', # 英文术语
'git', 'rebase', 'merge', # 英文术语
'v1.2.3', # 版本号
'惰性删除', # 长中文术语(>=4字符
}
# 块必须包含至少一个内容词,否则被过滤掉
if topic_switched and content_words:
block_text = block.user_text + ' ' + block.assistant_text
if not any(cw in block_text.lower() for cw in content_words):
score = 0.0 # 硬过滤
```
为什么用内容词而不是 IDF 阈值:因为 IDF > 2.0 筛选出来的是稀有字符 n-gram如"看执"、"行计"),这些是通用词,反而会桥接不同话题。内容词是显式的 topic 标识符,区分度高。
### 稀疏召回评分
```
@@ -140,75 +156,49 @@ gain(b|S) = Σ IDF(t) for t ∈ cov(b)\covered(S) / cost(b)^α, α=0.8
**为什么用 IDF 加权**:高频词(如"数据"、"系统")区分度低,低频词(如"GeoHash"、"分布式锁")才是真正的语义锚点。用 IDF 加权确保选择的是真正有信息量的片段,而不是反复覆盖高频通用词。
## 对照实验50轮对话
### 句级裁剪
使用 SiliconFlow Qwen/Qwen3-8B 模型50轮对话前35轮Redis中间10轮Python最后5轮Redis
选中的 block 不一定整块都塞给 LLM。按句子分割后只保留包含 query 锚点的句子,最多保留 3 句。助手侧即使不含锚点,也保留第一句作为上下文衔接。
| 指标 | 无门控完整50轮 | 有门控 |
|------|-----------------|--------|
| 召回范围 | 全部50轮 | 仅相关轮次 |
| Token节省 | — | **96%** |
## 100 轮 4 话题对照实验
有门控时 Query "Redis 的 GeoHash 用来做什么?" 仅召回轮次46精确匹配Python asyncio 轮次全部被过滤
**设置**4 个话题Redis、Python asyncio、PostgreSQL、Git每话题 25 轮交替,总计 100 轮。Token 预算 4000
完整伪代码
**验证结果**
```
function select(q, turns):
# 1. 锚点提取
anchors_q = extract_anchors(q)
active_topic = get_active_topic()
| 验证 | 指标 | 结果 |
|------|------|------|
| 话题隔离 | 100轮后问GitT1/T2/T3不应出现 | ✅ 无污染 |
| 召回完整性 | Git锚点覆盖 | ✅ 100% |
| Token节省 | 无门控 vs 有门控 | ✅ 97.7% 节省 |
| 交替无污染 | 5次交替查询每次无跨话题召回 | ✅ 全部通过 |
| 完整召回 | Git/Redis窗口内召回率 | ✅ 100% |
# 2. 话题门控
overlap = compute_overlap(anchors_q, active_topic)
new_ratio = compute_new_ratio(anchors_q, active_topic)
**交替话题验证详情**(每轮问完立即动态跟踪 active_topic
if overlap < 0.20 and new_ratio > 0.70:
active_topic = create_new_topic(anchors_q) # 切换
elif has_deictic(q):
inherit_recent(2) # 指代词强制继承最近2轮
# 否则继续当前话题
| 查询 | 目标话题 | 召回轮次 | 跨话题污染 | 结果 |
|------|---------|---------|-----------|------|
| EXPLAIN ANALYZE怎么看 | T3(PG) | [99] | 无 | ✅ |
| Git rebase/merge区别 | T4(Git) | [88,100,96,92] | 无 | ✅ |
| Redis惰性删除区别 | T1(Redis) | [89,93,97] | 无 | ✅ |
| asyncio.Task cancel | T2(asyncio) | [90,94,98] | 无 | ✅ |
| Git reset/revert场景 | T4(Git) | [88,100,96,92] | 无 | ✅ |
# 3. 稀疏召回
candidates = []
for each turn i:
score_i = 1.5 * bm25(user_i, q) + 0.7 * bm25(assistant_i, q) + \
1.0 * exact_match(i, q) + 0.2 * recency(i)
candidates.append((score_i, i))
top20 = top_k(candidates, k=20)
# 4. 最小覆盖贪心选择
selected = []
covered = empty_set()
for each block b in top20 sorted by gain:
new_anchors = extract_anchors(b) \ covered
if len(new_anchors) == 0: continue
gain_b = sum(IDF(t) for t in new_anchors) / cost(b)^0.8
selected.append((gain_b, b))
covered.update(new_anchors)
if coverage(covered) >= 0.85: break
return selected
```
**结论**:在纯规则、轻量、资源受限约束下,上下文门控器实现了零跨话题污染,同时节省 97.7% token。
## 局限性与适用场景
**局限性:**
- 稀疏检索依赖词形匹配,语义相近但词形不同的情况容易漏召
- Token 估算为粗略估算字符数×1.5),与实际有 2-3 倍误差
- 最小粒度是整个 blockblock 内部无句级裁剪,边界粗糙
- "完整召回"与"最小覆盖"存在权衡:窗口内只选最相关的块,而非全部块
- 没有在 QuAC 这类标准学术数据集上做对照实验,无法跟 Attentive History 这类基于注意力机制的方法直接对比
**适用场景:**
- 资源受限的生产环境(边缘设备、私有部署)
- 对延迟敏感的实时对话
- 中等复杂度对话10-50轮
- 中等复杂度对话10-100轮
**不适用:**
- 需要精确语义匹配的场景(建议用向量检索)
- 极长对话(>100轮IDF 全量更新有偏)
## License
MIT

View File

@@ -16,6 +16,8 @@ class Block:
anchors: List[str] = field(default_factory=list)
tokens_user: int = 0
tokens_assistant: int = 0
# 话题指纹:创建该 block 时,当前活跃话题的代表性锚点集合(用于话题过滤)
topic_fingerprint: set = field(default_factory=set)
def __post_init__(self):
if not self.anchors:

View File

@@ -45,10 +45,18 @@ class ContextGatekeeper:
本轮的 turn_id
"""
self.turn_counter += 1
# 先创建块anchors 在 __post_init__ 中提取)
_block = Block(user_text=user_text, assistant_text=assistant_text, turn_id=self.turn_counter)
# 话题指纹:用块自身的用户锚点作为话题标识(用于检索时话题过滤)
user_anchors_set = set(a.lower() for a in _block.anchors if not a.startswith('a_'))
block = Block(
user_text=user_text,
assistant_text=assistant_text,
turn_id=self.turn_counter
turn_id=self.turn_counter,
anchors=_block.anchors,
tokens_user=_block.tokens_user,
tokens_assistant=_block.tokens_assistant,
topic_fingerprint=user_anchors_set
)
self.blocks.append(block)
@@ -81,13 +89,25 @@ class ContextGatekeeper:
if has_deictic and self.blocks:
mandatory = self.blocks[-2:] # 最近 2 个
# 4. 稀疏召回:取 top-20 候选
# 4. 稀疏召回:取 top-20 候选(话题切换时限制候选范围)
idf_cache = self.anchor_extractor._idf_cache
# 话题切换时:只从最近的 N 个 block 中检索(它们最可能属于新话题)
# 话题继续时:从全部历史检索
RECENT_WINDOW = 15 # 话题切换后只看最近15轮
if switched:
candidate_blocks = self.blocks[-RECENT_WINDOW:]
else:
candidate_blocks = self.blocks
candidates = self.retriever.retrieve(
self.blocks, query_anchors, top_m=20
candidate_blocks, query_anchors, top_m=20,
idf_cache=idf_cache,
active_topic_anchors=self._active_topic[0] if self._active_topic else None,
topic_switched=switched,
query_text=query
)
# 5. 最小覆盖选择(传入 IDF cache 实现 gain 的 IDF 加权)
idf_cache = self.anchor_extractor._idf_cache
selected_blocks = self.selector.select(
candidates,
query_anchors,
@@ -96,8 +116,8 @@ class ContextGatekeeper:
idf_cache=idf_cache
)
# 6. 句级裁剪(简化版:不做进一步裁剪,直接用整个 block
# 如需进一步裁剪,可在 Block 内部按句子级别筛选
# 6. 句级裁剪:过滤 block 内与 query 锚点无关的句子
selected_blocks = self._trim_blocks_to_query(selected_blocks, query_anchors)
# 7. 更新活跃话题(每次查询后都更新,无论是否切换)
if query_anchors:
@@ -116,6 +136,76 @@ class ContextGatekeeper:
return result
def _trim_blocks_to_query(self, blocks: List[Block], query_anchors: List[str]) -> List[Block]:
"""
句级裁剪:过滤 block 内与 query 锚点无关的句子
文档要求:选中 block 后,不必整块都塞,可以继续在 block 内按同样思路选句子
"""
import re
TRIM_MIN_SENTENCES = 1 # 每个block至少保留1句
def split_sentences(text: str) -> List[str]:
"""中文/英文句子分割"""
# 按句子结束符分割
parts = re.split(r'[。!?;\n]', text)
return [p.strip() for p in parts if p.strip()]
def sentence_has_relevant_anchor(sentence: str, qa_set: set) -> bool:
"""句子是否包含 query 锚点"""
sent_lower = sentence.lower()
for anchor in qa_set:
if anchor.lower() in sent_lower:
return True
return False
qa_set = set(a.lower() for a in query_anchors)
trimmed = []
for block in blocks:
user_sents = split_sentences(block.user_text)
asst_sents = split_sentences(block.assistant_text)
# 找出包含 query 锚点的用户句和助手句
user_relevant = [s for s in user_sents if sentence_has_relevant_anchor(s, qa_set)]
asst_relevant = [s for s in asst_sents if sentence_has_relevant_anchor(s, qa_set)]
# 助手回答至少保留1句即使是无关的也保留第一句作为上下文衔接
# 如果相关句>=1优先保留相关句否则保留第一句
if asst_relevant:
kept_asst_sents = asst_relevant[:3] # 最多保留3句
elif asst_sents:
kept_asst_sents = [asst_sents[0]]
else:
kept_asst_sents = []
# 助手相关句里不包含 query 锚点但用户句包含的 → 把对应用户句也保留
# (助手句可能是在回答用户的问题)
user_to_keep = []
if asst_relevant and user_sents:
user_to_keep = user_sents[:1] # 保留第一句用户输入作为上下文
elif user_relevant:
user_to_keep = user_relevant[:2]
else:
user_to_keep = user_sents[:1] if user_sents else []
# 构建裁剪后的 block创建新block不修改原block
if user_to_keep or kept_asst_sents:
new_user = ''.join(user_to_keep) + ('' if user_to_keep and kept_asst_sents else '')
new_asst = ''.join(kept_asst_sents)
trimmed_block = Block(
user_text=new_user or block.user_text,
assistant_text=new_asst or block.assistant_text,
turn_id=block.turn_id,
anchors=block.anchors,
tokens_user=block.tokens_user,
tokens_assistant=block.tokens_assistant
)
trimmed.append(trimmed_block)
else:
trimmed.append(block)
return trimmed
def select_with_constraints(self, query: str) -> Tuple[List[Dict], Dict]:
"""
带约束的上下文选择

View File

@@ -20,109 +20,110 @@ class SparseRetriever:
def __init__(self):
self.anchor_extractor = AnchorExtractor()
def score(self, block: Block, query_anchors: List[str], recency: float = 0.0) -> float:
def score(self, block: Block, query_anchors: List[str], recency: float = 0.0, idf_cache: dict = None) -> float:
"""
计算 block 相对于 query 的相关度得分
Args:
block: 待评分的对话块
query_anchors: 查询的锚点列表
recency: 时间衰减因子 (0.0 ~ 1.0,越新越大)
Returns:
相关度得分 (0.0 ~ )
公式: W_user * lex_user + W_asst * lex_asst + W_exact * exact + W_recent * recency
其中 lex = sum(idf(t) * min(tf, 3))
"""
# 1. 用户侧词项重叠
idf_cache = idf_cache or {}
user_anchors = [a for a in block.anchors if not a.startswith('a_')]
assistant_anchors = [a.replace('a_', '', 1) for a in block.anchors if a.startswith('a_')]
lex_user = self._lex_overlap(user_anchors, query_anchors)
lex_assistant = self._lex_overlap(assistant_anchors, query_anchors)
# 2. Exact match 加分
lex_user = self._lex_overlap(user_anchors, query_anchors, idf_cache)
lex_assistant = self._lex_overlap(assistant_anchors, query_anchors, idf_cache)
exact = self._exact_match(block, query_anchors)
# 3. 综合评分
score = (
self.WEIGHT_USER * lex_user +
self.WEIGHT_ASSISTANT * lex_assistant +
self.WEIGHT_EXACT * exact +
self.WEIGHT_RECENT * recency
)
return score
def _lex_overlap(self, block_anchors: List[str], query_anchors: List[str]) -> float:
"""计算词项重叠得分(带 IDF 权重)"""
def _lex_overlap(self, block_anchors: List[str], query_anchors: List[str], idf_cache: dict = None) -> float:
"""计算 IDF 加权词项重叠得分: sum(idf(t) * min(tf, 3))"""
idf_cache = idf_cache or {}
score = 0.0
# 将 query anchors 转小写用于匹配
query_lower = [a.lower() for a in query_anchors]
for qa in query_lower:
# 精确匹配
if qa in block_anchors:
tf = min(block_anchors.count(qa), 3)
score += 1.0 * tf
# 或者前缀/后缀匹配(处理中英文混合)
tf = block_anchors.count(qa.lower())
if tf > 0:
idf_val = idf_cache.get(qa, 1.0)
score += idf_val * min(tf, 3)
else:
for ba in block_anchors:
if qa in ba.lower() or ba.lower() in qa:
score += 0.5
idf_val = idf_cache.get(qa, 1.0)
score += idf_val * 0.3
break
return score
def _exact_match(self, block: Block, query_anchors: List[str]) -> float:
"""
Exact match 加分:
- 完整英文术语命中
- 代码标识符命中
- 数字、版本号命中
- 引号短语完整命中
"""
"""Exact match 加分"""
text_lower = block.user_text.lower() + ' ' + block.assistant_text.lower()
score = 0.0
for qa in query_anchors:
# 版本号v开头
if qa.startswith('v') and '.' in qa:
if qa in text_lower:
score += 1.0
# 全数字(如 1.2.3, 2.0
if qa in text_lower: score += 1.0
elif qa.replace('.', '').isdigit():
if qa in text_lower:
score += 1.0
# 英文术语(较长,有下划线/驼峰)
if qa in text_lower: score += 1.0
elif '_' in qa or (any(c.isupper() for c in qa) and qa.isalnum()):
if qa.lower() in text_lower:
score += 1.0
# 引号短语(带空格)
if qa.lower() in text_lower: score += 1.0
elif ' ' in qa and any(q in text_lower for q in [qa, qa.strip('"\'')]):
score += 0.5
return score
def retrieve(
self, blocks: List[Block], query_anchors: List[str], top_m: int = 20
self, blocks: List[Block], query_anchors: List[str], top_m: int = 20,
idf_cache: dict = None, active_topic_anchors: List[str] = None,
topic_switched: bool = False,
query_text: str = ""
) -> List[Tuple[Block, float]]:
"""
从历史 blocks 中召回 top-m 个最相关的 block
Args:
blocks: 所有历史 blocks
query_anchors: 查询锚点
top_m: 返回前 m 个
Returns:
[(block, score), ...] 按得分降序排列
话题切换时: 用内容词做话题指纹过滤
"""
import re
idf_cache = idf_cache or {}
scored = []
total = len(blocks)
q_anchors_lower = [a.lower() for a in query_anchors]
# 内容词: 从 query 原文提取的 topic-discriminative 词汇
# 只包括: 英文术语、代码标识符、版本号
# 中文通用词(如"怎么"、"执行")不具有话题区分度,排除
content_words = set()
# 英文单词和代码标识符(长度>=2
for w in re.findall(r'[a-zA-Z_][a-zA-Z0-9_-]*', query_text):
if len(w) >= 2:
content_words.add(w.lower())
# 版本号
for v in re.findall(r'v?\d+(\.\d+)*', query_text):
content_words.add(v.lower())
# 完整中文术语(连续中文字符 >= 4足够具体的术语
for chunk in re.findall(r'[\u4e00-\u9fff]{4,}', query_text):
content_words.add(chunk.lower())
for i, block in enumerate(blocks):
# recency = (i+1)/total越新的 block 越接近 1.0(新鲜度奖励,非时间衰减)
recency = (i + 1) / total if total > 0 else 0.0
s = self.score(block, query_anchors, recency)
# 话题切换时: 过滤掉不包含任何内容词的块
# 这些块属于旧话题,不应参与当前查询的候选
# 例如: 问 PostgreSQL 时,只有包含 'postgresql' 或 'explain' 等词的块才能通过
if topic_switched and content_words:
block_text = (block.user_text + ' ' + block.assistant_text).lower()
# 检查 block 是否包含 query 的任意一个内容词
block_contains_content = any(
cw in block_text for cw in content_words
)
if not block_contains_content:
scored.append((block, 0.0))
continue
s = self.score(block, query_anchors, recency, idf_cache)
scored.append((block, s))
# 降序排列,取前 top_m
scored.sort(key=lambda x: x[1], reverse=True)
return scored[:top_m]

371
test_100rounds_4topics.py Normal file
View File

@@ -0,0 +1,371 @@
"""
100轮4话题对照实验验证上下文门控器的话题隔离与召回能力
"""
import sys
sys.path.insert(0, '/root/.openclaw/workspace/context-gatekeeper')
from src.gatekeeper import ContextGatekeeper
# ============================================================
# 实验设计
# ============================================================
# 4个话题每话题25轮交替提问总计100轮
#
# 话题1Redis 分布式锁 + 缓存策略
# 话题2Python asyncio 并发编程
# 话题3PostgreSQL 查询优化
# 话题4Git 工作流与分支管理
#
# 验证维度:
# 1. 话题隔离问话题4时前3个话题不被召回
# 2. 召回完整话题4的相关内容被完整覆盖
# ============================================================
GATE = ContextGatekeeper(token_budget=4000)
# ---- 话题1Redis轮次 1, 5, 9, 13, 17, 21, 25, 29, 33, 37, 41, 45, 49, 53, 57, 61, 65, 69, 73, 77, 81, 85, 89, 93, 97----
TOPIC1_ROUNDS = list(range(1, 100, 4)) # [1, 5, 9, ..., 97]
TOPIC1_PAIRS = [
("Redis 分布式锁和 RedLock 算法有什么区别?",
"RedLock 是对单 Redis 实例分布式锁的增强,通过多数节点加锁来提高可靠性,但实现复杂且性能较低。普通分布式锁依赖单个 Redis 主从master 故障时可能丢锁。"),
("Redis 集群环境下怎么做分布式锁?",
"可以用 RedLock 算法在多个独立 Redis 实例上加锁,或者用 Redisson 的 RLock支持 Redis Cluster 自动续期。看你的业务对一致性的要求有多高。"),
("Redis 惰性删除和定期删除有什么区别?",
"惰性删除是查询时检查 key 是否过期过期就删CPU 友好但可能堆积过期 key。定期删除是每隔一段时间扫一批 key删掉过期的会吃 CPU但更及时。"),
("Redis 的过期 key 对 RDB 快照有什么影响?",
"执行 SAVE 或 BGSAVE 时,已过期的 key 不会被写入 RDB 文件。但主从复制时,从库会收到完整的 key 包括过期的,依赖从库自身惰性删除清理。"),
("Redis 主从复制断线后如何增量同步?",
"Redis 2.8 之后支持增量同步,从库断线重连后发送 PSYNC 命令并附上 master_replid 和 offsetmaster 只发送差异部分,比全量同步快很多。"),
("Redis 的 Lua 脚本有什么应用场景?",
"主要用 Lua 脚本来做原子操作,比如多个 key 的读写需要原子性、分布式锁的加锁检查过期一体化、执行一段复杂的过滤逻辑。 EVALSHA 可以缓存脚本避免每次传输。"),
("Redis GeoHash 在附近的人功能里怎么用的?",
"GeoHash 将经纬度编码为字符串,用 ZADD 存位置GEORADIUS 或 GEOSEARCH 查附近。原理是把地球划成格子,编码前缀相同的点物理上就相近,适合粗筛。"),
("Redis 的大 key 问题怎么排查和处理?",
"用 --bigkeys 参数扫描,用 SCAN 遍历 keys 统计大小。处理方案:拆分成 hash 结构STRING 转 zset/list对齐冷热数据定期压缩。"),
("缓存穿透、击穿、雪崩分别是什么?",
"穿透是查一个不存在的 key 直接打到 DB击穿是热点 key 过期瞬间大量请求打到 DB雪崩是大量 key 同时过期或 Redis 宕机。布隆过滤器、空值缓存、加随机 TTL 可以分别应对。"),
("Redis Cluster 的槽迁移过程是怎样的?",
"迁入节点 MIGRATE 目标 key 到迁出节点 D客户端收到 MOVED 重定向后请求新节点。迁移中访问旧槽会返回 ASK 转向ASK 不更新本地槽映射所以不是 MOVED。"),
("Redis 和 Memcached 的核心区别是什么?",
"Memcached 是纯内存 KV只支持 STRING 类型。Redis 支持多种数据结构hash/zset/list/set有持久化选项性能稍低但功能丰富很多。"),
("Redis LRU 缓存淘汰策略怎么配置的?",
"通过 maxmemory-policy 设置allkeys-lru 对所有 key 淘汰volatile-lru 只淘汰带 TTL 的。LRU 算法是采样式的,在 accuracy 和 performance 之间折中。"),
("Redis Pipeline 和事务的区别是什么?",
"Pipeline 是客户端批量发命令,减少 RTT 但不保证原子性。事务用 MULTI/EXEC原子执行但 WATCH 可以做乐观锁。Lua 脚本是更强大的原子方案。"),
("Redis 慢查询日志怎么分析?",
"用 SLOWLOG GET 查看最近慢查询记录,看 latency 列和 command 列。重点关注 O(N) 以上命令如 KEYS、SMEMBERS、HGETALL及时加索引或改写。"),
("Redis 的发布订阅有什么缺点?",
"发布订阅是无状态的,消息不持久化,订阅者离线期间的消息直接丢弃。如果需要可靠消息队列,用 Stream 结构替代 pub/sub。"),
("Redis Cluster 为什么用 16384 个槽?",
"16384 = 2^14节点间心跳包用 bitmap 标记自己负责的槽1.6KB 可以表示全部槽位,信息量适中。主从复制心跳包频率和这个数字也有关系。"),
("Redis 哨兵模式下主节点故障切换流程是什么?",
"哨兵监控主节点主观下线, quorum 哨兵投票认定客观下线后发起选举,得票最多的哨兵负责执行切换,发送 SLAVEOF NO ONE 成为新主,其他从节点指向新主。"),
("Redis ZSet 的实现为什么用跳表而不是 B+树?",
"跳表实现简单,插入/删除/查询都是 O(log n)范围查询也高效。Redis 作者觉得 B+ 树实现复杂且顺序操作性能对内存不友好。"),
("Redis 内存碎片怎么产生的,怎么处理?",
"内存碎片来自 key 写入删除的反复、大小分配策略、64字节对齐。可以用 MEMORY PURGE 触发内存整理或者重启节点。4.0+ 有自动整理选项。"),
("Redis 数据类型和应用场景怎么对应?",
"STRING 存缓存和计数器HASH 存对象LIST 存队列和最新N条SET 存标签和去重ZSET 存排行榜BITMAP 存签到和实时统计STREAM 可靠消息。"),
("Redis 加锁后服务挂了导致锁无法释放怎么办?",
"加锁时设置 value 为唯一标识(如 UUID释放前先检查 value 是否匹配,匹配才 DEL。或者用 Redisson 的看门狗机制自动续期。"),
("Redis 如何实现延迟队列?",
"用 ZSet 把任务执行时间戳作为 score轮询 ZRANGEBYSCORE 取当前时间之前的任务,执行后移除。或者用 Stream 的 XREADGROUP 配合无人认领消息的 IDLE TIME。"),
("Redis 客户端分片怎么做,有什么优缺点?",
"客户端分片在应用层计算 key 落在哪个节点,比如 CRC16(key) % slot数。优点是不用代理延迟低缺点是扩缩容需要手动迁移数据并改配置。"),
("Redis Cluster 的最大限制是什么?",
"每个 key 默认只有一个 slot不支持跨 slot 的事务和 Lua 脚本(需要用 hashtag 指定 keys 在同一槽)。最大 16384 个槽,官方建议最多 1000 节点。"),
]
# ---- 话题2Python asyncio轮次 2, 6, 10, 14, 18, 22, 26, 30, 34, 38, 42, 46, 50, 54, 58, 62, 66, 70, 74, 78, 82, 86, 90, 94, 98----
TOPIC2_PAIRS = [
("Python asyncio 里 await 后面可以接什么?",
"await 后面必须接一个 awaitable 对象比如协程async def 的返回值、Task、Future 对象。普通函数不行,必须用 asyncio.create_task() 包装。"),
("asyncio.Task 和 Future 的区别是什么?",
"Future 是一个低级的可等待对象代表一个异步操作的最终结果。Task 是 Future 的子类,专门用来调度协程执行,内置了状态管理和结果获取。"),
("asyncio.gather 和 asyncio.wait 的区别是什么?",
"gather 会等所有任务完成返回结果列表如果任一任务异常会立即传播。wait 会返回 (done, pending) 元组,不会主动抛出异常,更灵活。"),
("Python 异步编程里怎么避免回调地狱?",
"用 async/await 语法把嵌套回调展平成链式调用,配合 asyncio.create_task() 并发执行多个异步操作。写起来和同步代码风格接近。"),
("asyncio 的事件循环是怎么工作的?",
"事件循环不断从队列里取协程和回调执行,遇到 await 就暂停当前协程,切换到其他就绪的协程,直到所有协程都完成或遇到 I/O 阻塞触发调度。"),
("asyncio.create_task 和 asyncio.ensure_future 的区别是什么?",
"create_task 是更现代的写法,语义清晰,专门用来把协程包装成 Task。ensure_future 更通用,可以接受协程或 Future两者在 3.7+ 基本等价。"),
("asyncio 里有锁吗,怎么用?",
"有 asyncio.Lock用 async with lock: 来获取,使用时不能阻塞事件循环。计数器、限流等场景可以用 asyncio.Semaphore。"),
("asyncio 的 sleep 和 time.sleep 有什么区别?",
"asyncio.sleep 不会阻塞事件循环会让出控制权允许其他协程运行。time.sleep 会阻塞整个线程,包括事件循环,所以异步代码里绝对不能用 time.sleep。"),
("Python 异步 HTTP 请求用什么库?",
"同步用 requests异步可以用 aiohttp支持客户端和服务端或 httpx3.7+ 支持 async。FastAPI 内部用的就是 httpx。"),
("asyncio 异常怎么处理?",
"协程里直接用 try/except和同步代码一样。多个任务的话 gather(task1, task2, return_exceptions=True) 可以捕获异常而不中断其他任务。"),
("asyncio 如何限制并发数?",
"用 asyncio.Semaphore 控制同时运行的任务数async with semaphore: await coro()。或者用信号量配合 gather 控制总体并发。"),
("asyncio 的取消机制是怎样的?",
"task.cancel() 会抛出 CancelledError被 await 的协程捕获后协程停止执行。Shield 可以保护某个协程不被外部取消。"),
("Python 异步生成器怎么用?",
"async def mygen(): yield x 这样的就是异步生成器,用 async for 遍历。不能直接 list() 转成列表,要用 [item async for item in agen]。"),
("asyncio 里有条件变量吗?",
"有 asyncio.Condition把一个 asyncio.Lock 包装成条件变量,支持 await cond.wait() 等待通知await cond.notify() 通知等待的协程。"),
("asyncio 如何实现超时控制?",
"用 asyncio.wait_for(coro, timeout) 包裹协程,超时抛出 TimeoutError。asyncio.timeout() 是 3.11+ 的新语法,更简洁。"),
("asyncio 的 Future 和 concurrent.futures 的 Future 有什么关系?",
"asyncio.Future 是 asyncio 专属的只能在事件循环里用。concurrent.futures.Future 是线程池的,可以跨线程。两者不能混用,但可以用 asyncio.wrap_future() 转换。"),
("asyncio 服务怎么优雅关闭?",
"用 signal.signal(SIGINT, handler) 捕获信号handler 里调用 loop.stop()。配合 try/finally 做清理,比如关闭数据库连接、取消所有 pending 的 task。"),
("asyncio 的 run_in_executor 什么时候用?",
"当你要调用一个不支持异步的阻塞代码(比如同步的 DB 驱动、文件 I/O用 run_in_executor 把这个调用放到线程池执行,不阻塞事件循环。"),
("Python 异步上下文管理器怎么写?",
"实现 __aenter__ 和 __aexit__ 两个方法,返回值赋给 async with 的 as 部分。async with 会自动调用 __aexit__ 即使有异常抛出。"),
("asyncio 如何处理 CPU 密集型任务?",
"CPU 密集型任务用 asyncio 的事件循环没有意义,因为 GIL 导致无法真正并行。用 loop.run_in_executor 放到进程池,或者直接用 multiprocessing。"),
("asyncio 事件循环可以嵌套吗?",
"nest loop = asyncio.new_event_loop(); nest.run_until_complete(coro()) 这样可以创建嵌套循环,但建议尽量避免,复杂度和调试难度都很高。"),
("asyncio 的 wait_for 和 shield 区别是什么?",
"wait_for 对协程设超时超时取消任务。shield 保护某个协程不被外部取消,但 shield 本身可以设置超时。两者用途不同,可以组合使用。"),
("Python 异步迭代器是什么,和异步生成器有什么区别?",
"异步迭代器需要实现 __aiter__ 和 __anext__返回 awaitable。异步生成器是 async def 里直接 yield更简单直接。多数场景用异步生成器就够了。"),
("asyncio 如何实现心跳/keepalive 机制?",
"用 asyncio.create_task() 创建长循环任务while True: await asyncio.sleep(interval); await send_ping()。配合 try/except 捕获连接断开异常。"),
("asyncio 的 Future 结果怎么获取?",
"在协程里用 result = await future 获取,或者 future.result() 在协程外获取不推荐可能抛异常。callback 可以用 future.add_done_callback(fn)。"),
]
# ---- 话题3PostgreSQL轮次 3, 7, 11, 15, 19, 23, 27, 31, 35, 39, 43, 47, 51, 55, 59, 63, 67, 71, 75, 79, 83, 87, 91, 95, 99----
TOPIC3_PAIRS = [
("PostgreSQL 的 MVCC 机制是怎么工作的?",
"MVCC 通过每行数据的 xmin/xmax 两个隐式字段实现。读操作不会阻塞写,写也不会阻塞读。每个事务看到的是某个快照,数据版本通过 tuple 链组织,垃圾回收由 VACUUM 处理。"),
("PostgreSQL 的 EXPLAIN ANALYZE 怎么用?",
"EXPLAIN ANALYZE 会实际执行查询并显示计划树的每个节点耗时,包括 actual time、rows、loops。EXPLAIN 只看计划不执行。两者结合能定位慢查询的根源。"),
("PostgreSQL 索引有哪些类型?",
"B-tree默认适合等值和范围查询、Hash只支持等值、GiST几何、全文搜索、SP-GiST空间分区、GIN数组、JSONB、BRIN物理顺序范围"),
("PostgreSQL 的 WAL 是什么,有什么用?",
"WAL 是预写日志,每次修改数据先写日志再写数据文件。保证崩溃恢复能力,支持主从复制流复制,也是 CDC变更数据捕获的基础。"),
("PostgreSQL 的 TOAST 机制是什么?",
"TOAST 把超过页面大小8KB的列值压缩或切片存到 toast 表用指针引用。查询时自动组装对应用透明。varlena 类型字段自动支持。"),
("PostgreSQL 的查询优化器怎么工作的?",
"根据统计信息pg_statistic估算每个执行计划的代价选择代价最低的。考虑因素包括行数、索引可用性、join 顺序、排序成本。用 EXPLAIN 查看计划。"),
("PostgreSQL 的 VACUUM 为什么要运行?",
"VACUUM 清理 dead tuples回收空间并更新统计信息防止表膨胀和 MVCC 性能退化。autovacuum 自动运行,但大批量 DELETE/UPDATE 后可能需要手动执行。"),
("PostgreSQL 的分区表怎么做?",
"用 RANGE 或 LIST 分区CREATE TABLE t (...) PARTITION BY RANGE (created_at)。子表自动继承父表结构。查询优化器会自动裁剪不相关分区,性能提升明显。"),
("PostgreSQL 的 JSONB 和 JSON 有什么区别?",
"JSONB 存储时解析并建索引查询更快但插入稍慢。JSON 存原始文本。JSONB 支持GIN索引JSON不行。如果要频繁查询 JSON 内容,用 JSONB。"),
("PostgreSQL 的 COPY 和 INSERT 性能差多少?",
"COPY 是批量导入,比单条 INSERT 快 5-10 倍以上。COPY 适合数据迁移初始导入INSERT 适合单条或小批量。大量数据用 \\COPY 或 COPY FROM STDIN。"),
("PostgreSQL 的连接池用什么方案?",
"应用层连接池用 PgBouncer事务级连接池最常用或 Pgpool-II。也可以用数据库代理如 Supbase 的 pgboat。连接复用能显著降低连接建立开销。"),
("PostgreSQL 的 CTE 和子查询有什么区别?",
"CTEWITH 子句)是命名临时结果集,可读性更好,支持递归查询。简单场景子查询和 CTE 性能差不多,复杂查询 CTE 优化器处理更清晰。"),
("PostgreSQL 的数组类型怎么建索引?",
"数组用 GIN 索引CREATE INDEX ON t USING GIN (arr)。也可以用数组操作符 @>、<@ 查询。GIST 适合含包含关系GIN 适合相等和重叠查询。"),
("PostgreSQL 的触发器有什么应用场景?",
"审计日志(记录变更历史)、自动维护(更新汇总表)、强制约束(跨字段校验)、数据同步到其他系统。创建用 CREATE TRIGGER配合 WHEN 条件过滤。"),
("PostgreSQL 的窗口函数是什么?",
"窗口函数在一组行上计算聚合但不折叠结果集,语法是 func() OVER (PARTITION BY ... ORDER BY ...)。常用ROW_NUMBER()、RANK()、SUM() OVER、LAG/LEAD。"),
("PostgreSQL 的并发控制用 MVCC 和锁有什么区别?",
"MVCC 是乐观并发控制,读不阻塞写、写不阻塞读,通过版本链实现。锁是悲观控制,分 SHARE/EXCLUSIVE 锁模式,用于 DDL 和 SERIALIZABLE 隔离级别。"),
("PostgreSQL 的索引失效有哪些情况?",
"对索引列做函数运算改用表达式索引、LIKE '%%' 前缀通配、类型转换字符串存数字、统计信息不准ANALYZE、隐式类型转换。"),
("PostgreSQL 的 NOTIFY 和 LISTEN 适合什么场景?",
"数据库触发后向应用推送通知,适合:任务队列(放弃 LIST NOTIFY 组合)、实时通知、跨表联动。消息不可靠不持久,需要补强。"),
("PostgreSQL 的行安全策略RLS怎么用",
"ALTER TABLE t ENABLE ROW LEVEL SECURITY。创建策略CREATE POLICY p ON t FOR SELECT USING (user_id = current_user)。为每用户配置后,查询自动过滤。"),
("PostgreSQL 的逻辑复制和物理复制区别是什么?",
"物理复制复制整个数据目录,基于 WAL 粒度,副本是精确一致的。逻辑复制基于复制槽和 WAL 解析,可以复制单个表,支持异构数据库迁移。"),
("PostgreSQL 的 pg_stat_statements 怎么用?",
"pg_stat_statements.track = 'top' 开启后记录查询统计信息包括调用次数、总耗时、I/O 时间。查 pg_stat_statements 找最慢的查询。"),
("PostgreSQL 的物化视图和普通视图有什么区别?",
"普通视图每次查询实时计算,物化视图把结果存成快照,用 REFRESH MATERIALIZED VIEW 更新。适合不要求实时但查询代价高的复杂聚合。"),
("PostgreSQL 的自增主键用 SERIAL 还是 IDENTITY",
"SERIAL 是旧语法,依赖 sequence。IDENTITY 是 SQL 标准语法,更严格,支持 ALTER TABLE。推荐用 IDENTITY GENERATED ALWAYS AS IDENTITY。"),
("PostgreSQL 的 JOIN 类型有哪些?",
"INNER JOIN默认、LEFT/RIGHT/FULL OUTER JOIN、CROSS JOIN笛卡尔积、LATERAL JOIN子查询引用父表列。LATERAL 适合需要子查询引用外层字段的场景。"),
("PostgreSQL 的全文搜索怎么配置?",
"建 tsvector 列和 GIN 索引ALTER TABLE t ADD COLUMN fts tsvector。查询用 to_tsquery/to_tsvector 配合 @@ 操作符,支持中文需要配置分词插件。"),
]
# ---- 话题4Git轮次 4, 8, 12, 16, 20, 24, 28, 32, 36, 40, 44, 48, 52, 56, 60, 64, 68, 72, 76, 80, 84, 88, 92, 96, 100----
TOPIC4_PAIRS = [
("Git 的 rebase 和 merge 有什么区别?",
"merge 把两个分支合并成一个提交保留完整历史分支结构清晰。rebase 在目标分支上重放当前分支的提交,历史线性但会改写提交 hash公共分支不要 rebase。"),
("Git 的 reset、revert、checkout 区别是什么?",
"reset 移动 HEAD 和分支指针分三种模式soft/mixed/hard会改写历史。revert 创建新提交来撤销旧提交不改历史适合公共分支。checkout 切换分支或恢复文件。"),
("Git 的 stash 命令有什么用途?",
"stash 暂存当前工作目录和暂存区的修改,保存现场后可以切分支做别的事。用 git stash pop 恢复并删除,或 git stash apply 只恢复不删除。"),
("Git 怎么撤销已经 push 的提交?",
"如果是公共分支,用 git revert HEAD 创建新撤销提交再 push。如果是自己分支还没人 pull可以 git reset --hard HEAD~1 再 force push但风险较大。"),
("Git 的 cherry-pick 怎么用?",
"git cherry-pick <commit-hash> 把指定提交在当前分支上重新应用,生成新 hash。适合把 hotfix 或特定功能从一个分支挑到另一个。"),
("Git 的工作流有哪些?",
"常见的有 Git Flow五种分支、GitHub Flow主分支+功能分支、Trunk-Based Development频繁合入主干。选哪个看团队规模和发布周期。"),
("Git 的钩子怎么配置?",
"在 .git/hooks/ 目录下放脚本pre-commit、commit-msg、pre-push 等。shell 脚本或其他语言都可以,示例脚本默认带 .sample 后缀,去掉后缀即可生效。"),
("Git 的 submodule 适合什么场景?",
"当一个仓库需要引用另一个特定版本的仓库时用 submodule。比如第三方库或共享配置仓库。注意 submodule 的 commit 不会自动更新,需要单独拉取。"),
("Git 怎么查看某次提交改了什么?",
"git show <commit> 查看某次提交的完整 diff。git log -p 可以看历史每个提交的变更。git diff commit1 commit2 对比两个提交间的差异。"),
("Git 的 reflog 怎么用来做灾难恢复?",
"reflog 记录所有 HEAD 移动的历史,包括被删除的提交和 reset 操作。git reflog 找到操作前的 hash用 git checkout 或 git reset 恢复到那个状态。"),
("Git 的 alias 怎么配置?",
"git config --global alias.co checkout 用 co 代替 checkout。或者在 ~/.gitconfig 里直接写。还有一种写法 git config --global alias.lg 'log --graph --oneline'"),
("Git 的 bisect 怎么用来定位 bug",
"git bisect start 开始二分查找,标记已知 good 和 bad 提交,自动跳转到中间提交测试。重复直到找到第一个引入 bug 的提交。自动化脚本配合更好用。"),
("Git 的 merge冲突怎么解决最规范",
"先拉取最新代码到本地,合并目标分支,人工解决冲突后 git add 标记已解决git commit 完成合并。不要用 --no-commit 自动合并。"),
("Git 的 fetch 和 pull 区别是什么?",
"fetch 只拉取远程分支更新不合并。pull 是 fetch + merge自动合到当前分支。网络不稳定时 fetch 更安全,可以先看差异再决定是否合并。"),
("Git 的 blame 怎么用?",
"git blame file 逐行显示最后修改的提交和作者,配合 -L 限制行范围。适合追溯某行代码是谁写的、为什么改,但注意不要用于人身攻击。"),
("Git 的 sparse-checkout 怎么配置?",
"git sparse-checkout set <paths> 只检出指定目录或文件减少克隆大仓库的时间和空间占用。git sparse-checkout init --cone 开启更高效的模式。"),
("Git 的 bundle 命令有什么用途?",
"git bundle create file.branch HEAD..feature 把分支打包成一个文件,方便通过网络拷贝或邮件传输。接收方 git bundle pull 导入。适合网络受限时传输分支。"),
("Git 的 clean 命令怎么用?",
"git clean -n 预览要删除的未跟踪文件git clean -f 实际删除。git clean -fd 删除文件和目录。忽略的文件先 git update-index --assume-unchanged 或加 .gitignore。"),
("Git 的 describe 命令有什么输出?",
"git describe 输出版本号格式:最近标签名 + 距离标签的提交数 + g<hash>。用于程序化获取版本号,比 git rev-parse HEAD 更友好。"),
("Git 的 worktree 和 submodule 有什么区别?",
"worktree 从同一仓库检出多个工作目录适合同时在多个分支上工作但不切换。submodule 是嵌套仓库,指向另一个仓库的特定版本。"),
("Git 的 hook 能做什么自动化的事?",
"pre-commit 做代码风格检查和单元测试commit-msg 规范提交信息格式pre-push 禁止不合规范的 pushpost-receive 自动部署。团队统一配置放在仓库里。"),
("Git 的 log 怎么配合 grep 过滤提交?",
"git log --grep='keyword' 搜索提交信息。git log -S 'code' 搜索代码变更历史。git log --author='name' 按作者过滤。组合使用可以精准定位。"),
("Git 的 rev-parse 有什么用?",
"git rev-parse HEAD 输出 HEAD 的 hash--git-dir 输出仓库路径,--show-toplevel 输出项目根目录。常用于脚本里获取仓库相关信息。"),
("Git 的 Interactive Rebase 怎么用?",
"git rebase -i HEAD~n 打开交互式编辑器,列出自最近的 n 个提交。可以 squash合并、reword改信息、drop删除、reorder调换顺序保存后自动执行。"),
]
# ============================================================
# 构建100轮对话
# ============================================================
print("构建100轮对话...")
# 合并所有话题的问答对
all_pairs = []
for i in range(25):
all_pairs.append(("T1", TOPIC1_PAIRS[i]))
all_pairs.append(("T2", TOPIC2_PAIRS[i]))
all_pairs.append(("T3", TOPIC3_PAIRS[i]))
all_pairs.append(("T4", TOPIC4_PAIRS[i]))
for i, (topic, (q, a)) in enumerate(all_pairs, 1):
GATE.add_turn(q, a)
if i % 25 == 0:
print(f" 完成 {i}/100 轮")
print(f"\n总计添加 {GATE.turn_counter} 轮对话")
# ============================================================
# 验证1话题隔离测试
# ============================================================
print("\n" + "="*60)
print("验证1话题隔离——问话题4时前3个话题不应出现")
print("="*60)
# 问一个话题4的代表性Query
q4 = "rebase 和 merge 的区别是什么?用具体场景说明"
selected = GATE.select(q4)
recalled_turns = [item['turn_id'] for item in selected]
topic1_turns = [i for i in recalled_turns if i % 4 == 1] # 话题1 → 轮次 1,5,9...
topic2_turns = [i for i in recalled_turns if i % 4 == 2] # 话题2 → 轮次 2,6,10...
topic3_turns = [i for i in recalled_turns if i % 4 == 3] # 话题3 → 轮次 3,7,11...
topic4_turns = [i for i in recalled_turns if i % 4 == 0] # 话题4 → 轮次 4,8,12...
print(f"\nQuery: {q4}")
print(f"召回轮次: {recalled_turns}")
print(f"话题1 (Redis) 被召回: {topic1_turns}")
print(f"话题2 (Python asyncio) 被召回: {topic2_turns}")
print(f"话题3 (PostgreSQL) 被召回: {topic3_turns}")
print(f"话题4 (Git) 被召回: {topic4_turns}")
污染 = (topic1_turns or topic2_turns or topic3_turns)
隔离结果 = "✅ 无污染" if not 污染 else f"❌ 有污染话题1-3被召回"
print(f"\n隔离验证: {隔离结果}")
# ============================================================
# 验证2召回完整性测试
# ============================================================
print("\n" + "="*60)
print("验证2召回完整性——话题4的关键内容应被完整覆盖")
print("="*60)
# 话题4的锚点关键词
q4_anchors_raw = GATE.anchor_extractor.extract(q4)
q4_anchors = set(a.lower() for a in q4_anchors_raw)
print(f"Query锚点: {q4_anchors}")
# 检查召回的block覆盖了多少锚点
def get_block_anchors(block_dict):
anchors = set()
for t in [block_dict['user'], block_dict['assistant']]:
anchors.update(a.lower() for a in GATE.anchor_extractor.extract(t))
return anchors
covered_anchors = set()
for item in selected:
covered_anchors.update(get_block_anchors(item))
covered = q4_anchors & covered_anchors
missing = q4_anchors - covered_anchors
coverage_pct = len(covered) / max(len(q4_anchors), 1) * 100
print(f"锚点覆盖: {len(covered)}/{len(q4_anchors)} = {coverage_pct:.1f}%")
if missing:
print(f"缺失锚点: {missing}")
完整度 = "✅ 召回完整" if coverage_pct >= 80 else "⚠️ 召回不完整"
print(f"完整度验证: {完整度}")
# ============================================================
# 验证3Token消耗对比
# ============================================================
print("\n" + "="*60)
print("验证3Token消耗对比完整100轮 vs 有门控)")
print("="*60)
def count_tokens(text):
"""粗略估算中文×2英文×1.5符号×0.5"""
import re
chinese = len(re.findall(r'[\u4e00-\u9fff]', text))
english = len(re.findall(r'[a-zA-Z]+', text))
symbols = len(re.findall(r'[^\w\s]', text))
spaces = len(re.findall(r'\s', text))
return int(chinese * 2 + english * 1.5 + symbols * 0.5 + spaces * 0.5)
# 无门控全部100轮
all_text = ""
for b in GATE.blocks:
all_text += b.user_text + b.assistant_text
all_tokens = count_tokens(all_text)
gated_tokens = sum(count_tokens(item['user'] + item['assistant']) for item in selected)
saving = (1 - gated_tokens / all_tokens) * 100
print(f"无门控全部100轮: ~{all_tokens} tokens")
print(f"有门控(仅召回): ~{gated_tokens} tokens")
print(f"Token节省: {saving:.1f}%")
# ============================================================
# 验证4混合话题查询——真实用户行为模拟
# ============================================================
print("\n" + "="*60)
print("验证4混合话题查询——真实用户交替提问行为")
print("="*60)
# 模拟用户在不同话题间跳转,验证每次跳转后的话题隔离
test_queries = [
("第1轮问Redis", "Redis 惰性删除是什么意思?", "T1"),
("第26轮问asyncio", "asyncio.gather 和 asyncio.wait 有什么区别?", "T2"),
("第51轮问PostgreSQL", "EXPLAIN ANALYZE 怎么用?", "T3"),
("第76轮问Git", "git stash 有什么用途?", "T4"),
("第96轮再问Git", "git reset 和 revert 区别是什么?", "T4"),
("第97轮切回Redis", "Redis 主从复制断线后怎么增量同步?", "T1"),
]
print()
for label, query, expected_topic in test_queries:
selected_q = GATE.select(query)
recalled = [item['turn_id'] for item in selected_q]
# 判断召回的轮次属于哪个话题

317
test_100rounds_v2.py Normal file
View File

@@ -0,0 +1,317 @@
"""
100 轮 4 话题对照实验 v2
验证维度:
1. 话题隔离 - 问某话题时其他话题不被召回
2. 完整召回 - 某话题被问时相关内容应被召回
3. Token 节省
4. 交替话题无污染 + 完整召回
"""
import sys
sys.path.insert(0, '/root/.openclaw/workspace/context-gatekeeper')
from src.gatekeeper import ContextGatekeeper
# ============================================================
# 测试数据4 话题,每话题 25 轮(总计 100 轮)
# ============================================================
redis_topics = [
("Redis 分布式锁和 RedLock 算法有什么区别?", "RedLock..."),
("Redis 集群环境下怎么做分布式锁?", "用 RedLock..."),
("Redis 惰性删除和定期删除有什么区别?", "惰性删除..."),
("Redis 的过期 key 对 RDB 快照有什么影响?", "过期key..."),
("Redis 主从复制断线后如何增量同步?", "PSYNC..."),
("Redis 的 Lua 脚本有什么应用场景?", "Lua脚本..."),
("Redis GeoHash 在附近的人功能里怎么用的?", "GeoHash..."),
("Redis 的大 key 问题怎么排查和处理?", "bigkey..."),
("缓存穿透、击穿、雪崩分别是什么?", "穿透..."),
("Redis Cluster 的槽迁移过程是怎样的?", "槽迁移..."),
("Redis 和 Memcached 的核心区别是什么?", "Memcached..."),
("Redis LRU 缓存淘汰策略怎么配置的?", "LRU..."),
("Redis Pipeline 和事务的区别是什么?", "Pipeline..."),
("Redis 慢查询日志怎么分析?", "SLOWLOG..."),
("Redis 的发布订阅有什么缺点?", "pubsub..."),
("Redis Cluster 为什么用 16384 个槽?", "16384..."),
("Redis 哨兵模式下主节点故障切换流程是什么?", "哨兵..."),
("Redis ZSet 的实现为什么用跳表而不是 B+树?", "跳表..."),
("Redis 内存碎片怎么产生的,怎么处理?", "碎片..."),
("Redis 数据类型和应用场景怎么对应?", "数据类型..."),
("Redis 加锁后服务挂了导致锁无法释放怎么办?", "锁释放..."),
("Redis 如何实现延迟队列?", "延迟队列..."),
("Redis 客户端分片怎么做,有什么优缺点?", "客户端分片..."),
("Redis Cluster 的最大限制是什么?", "最大限制..."),
("Redis 的 AOF 和 RDB 怎么配合使用?", "AOF RDB..."),
]
asyncio_topics = [
("asyncio.Task 的 cancel 方法怎么工作的?", "cancel..."),
("asyncio.gather 和 asyncio.wait 的返回结果有什么区别?", "gather..."),
("asyncio.create_task 和 ensure_future 的区别是什么?", "create_task..."),
("asyncio 的事件循环怎么启动和停止?", "事件循环..."),
("Python 异步上下文管理器的写法是什么?", "异步上下文..."),
("asyncio.sleep 和 time.sleep 的区别是什么?", "sleep..."),
("asyncio 的 Future 对象怎么获取结果?", "Future..."),
("asyncio 的 wait_for 和 shield 组合使用注意什么?", "shield..."),
("asyncio 服务怎么实现优雅关闭?", "优雅关闭..."),
("asyncio 的 run_in_executor 什么时候用?", "run_in_executor..."),
("Python 异步迭代器和异步生成器有什么区别?", "异步迭代..."),
("asyncio 怎么限制并发数?", "限制并发..."),
("asyncio 的 timeout 错误怎么捕获?", "timeout..."),
("Python 协程和普通函数的区别是什么?", "协程..."),
("asyncio 事件循环可以嵌套吗?", "嵌套..."),
("asyncio 异常怎么处理?", "异常处理..."),
("Python 异步 HTTP 请求用什么库?", "异步HTTP..."),
("asyncio 里有条件变量吗?", "条件变量..."),
("asyncio 如何实现心跳/keepalive", "心跳..."),
("asyncio 的 callback 怎么转换为协程?", "callback..."),
("asyncio 的 wait 和 as_completed 有什么区别?", "as_completed..."),
("Python 异步编程里怎么避免回调地狱?", "回调地狱..."),
("asyncio 事件循环是怎么工作的?", "事件循环..."),
("asyncio.Task 和 concurrent.futures.Future 有什么关系?", "concurrent..."),
("asyncio 怎么检测任务是否完成?", "检测完成..."),
]
pg_topics = [
("PostgreSQL 的 MVCC 机制是怎么保证读不阻塞写的?", "MVCC..."),
("PostgreSQL 的 VACUUM 为什么要定期运行?", "VACUUM..."),
("PostgreSQL 的 EXPLAIN ANALYZE 怎么看执行计划?", "EXPLAIN..."),
("PostgreSQL B-tree 索引和 Hash 索引的区别是什么?", "B-tree..."),
("PostgreSQL 的 TOAST 机制是什么?", "TOAST..."),
("PostgreSQL 的 JSONB 和 JSON 类型的区别是什么?", "JSONB..."),
("PostgreSQL 的 CTE 和子查询的性能差异是什么?", "CTE..."),
("PostgreSQL 的数组类型怎么建索引?", "数组索引..."),
("PostgreSQL 的触发器能用于什么场景?", "触发器..."),
("PostgreSQL 的窗口函数和聚合函数的区别是什么?", "窗口函数..."),
("PostgreSQL 的逻辑复制和物理复制的适用场景是什么?", "逻辑复制..."),
("PostgreSQL 的行安全策略 RLS 怎么配置?", "RLS..."),
("PostgreSQL 的 COPY 和 INSERT 性能差多少?", "COPY..."),
("PostgreSQL 的 pg_stat_statements 怎么用于慢查询分析?", "pg_stat..."),
("PostgreSQL 的物化视图和普通视图的区别是什么?", "物化视图..."),
("PostgreSQL 的 JOIN 类型有哪些?", "JOIN..."),
("PostgreSQL 的索引失效有哪些情况?", "索引失效..."),
("PostgreSQL 的 NOTIFY 和 LISTEN 适合什么场景?", "NOTIFY..."),
("PostgreSQL 的查询优化器怎么选择执行计划的?", "优化器..."),
("PostgreSQL 的 WAL 段文件是什么?", "WAL..."),
("PostgreSQL 的 SERIAL 和 IDENTITY 的区别是什么?", "SERIAL..."),
("PostgreSQL 的全文搜索怎么配置中文分词?", "全文搜索..."),
("PostgreSQL 的分区表怎么提升查询性能?", "分区表..."),
("PostgreSQL 的连接池用什么方案?", "连接池..."),
("PostgreSQL 的 EXPLAIN 输出里 Seq Scan 是什么含义?", "Seq Scan..."),
]
git_topics = [
("Git 的 rebase 和 merge 的区别是什么?", "rebase..."),
("Git reset 的 --soft、--mixed、--hard 有什么区别?", "reset..."),
("Git stash 暂存区和工作目录的区别是什么?", "stash..."),
("Git cherry-pick 怎么把特定提交应用到当前分支?", "cherry-pick..."),
("Git 的 hook 怎么配置自动化任务?", "hook..."),
("Git 的 bisect 怎么用来快速定位 bug", "bisect..."),
("Git 的 worktree 和 submodule 的区别是什么?", "worktree..."),
("Git 的 reflog 怎么用来恢复误删的提交?", "reflog..."),
("Git 的 sparse-checkout 怎么只检出部分目录?", "sparse-checkout..."),
("Git 的 bundle 命令在什么场景下用?", "bundle..."),
("Git 的 Interactive Rebase 怎么用?", "Interactive..."),
("Git 的 clean 命令怎么删除未跟踪文件?", "clean..."),
("Git 的 describe 命令输出版本号格式是什么?", "describe..."),
("Git 的 log 怎么配合 grep 过滤提交?", "log grep..."),
("Git 的 blame 显示每行最后修改者和时间怎么用的?", "blame..."),
("Git 的 fetch 和 pull 的区别是什么?", "fetch..."),
("Git 的 merge 冲突怎么规范解决?", "merge冲突..."),
("Git 的 revert 和 reset 的应用场景有什么区别?", "revert..."),
("Git 的 alias 怎么配置常用命令缩写?", "alias..."),
("Git 的 hook 能做什么自动化的事?", "hook自动化..."),
("Git 的 rev-parse 怎么获取仓库信息?", "rev-parse..."),
("Git 的 tag 和 branch 有什么区别?", "tag..."),
("Git 的 remote 怎么管理和使用多个远程仓库?", "remote..."),
("Git 的 grep 怎么在版本历史里搜索代码?", "grep..."),
("Git 的 show 和 log 的区别是什么?", "show..."),
]
# ============================================================
# 构建 100 轮对话
# ============================================================
print("构建100轮对话...")
GATE = ContextGatekeeper(token_budget=4000)
for i in range(25):
GATE.add_turn(redis_topics[i][0], redis_topics[i][1])
GATE.add_turn(asyncio_topics[i][0], asyncio_topics[i][1])
GATE.add_turn(pg_topics[i][0], pg_topics[i][1])
GATE.add_turn(git_topics[i][0], git_topics[i][1])
print(f"总计添加 {GATE.turn_counter} 轮对话")
print()
# ============================================================
# 话题识别:根据 query 内容词判断属于哪个话题
# ============================================================
import re
def identify_topic(q: str) -> str:
"""根据 query 内容词判断话题"""
q_lower = q.lower()
if 'redis' in q_lower:
return 'T1(Redis)'
if 'asyncio' in q_lower:
return 'T2(asyncio)'
if 'postgresql' in q_lower or 'explain' in q_lower or 'analyze' in q_lower:
return 'T3(PG)'
if 'git' in q_lower or 'rebase' in q_lower or 'merge' in q_lower or 'reset' in q_lower or 'revert' in q_lower:
return 'T4(Git)'
return 'unknown'
def topic_to_turn_mod(topic: str) -> int:
"""话题对应 turn_id % 4 的值"""
mapping = {'T1(Redis)': 1, 'T2(asyncio)': 2, 'T3(PG)': 3, 'T4(Git)': 0}
return mapping.get(topic, -1)
# ============================================================
# 验证 1100 轮后问 Git前 3 话题无污染
# ============================================================
print("=" * 60)
print("验证1话题隔离——100轮后问Git问题前3个话题不应出现")
print("=" * 60)
q = "Git 的 rebase 和 merge 的区别是什么?"
sel = GATE.select(q)
turns = [item['turn_id'] for item in sel]
tp_map = {1: 'T1(Redis)', 2: 'T2(PG)', 3: 'T3(asyncio)', 0: 'T4(Git)'}
# 统计各话题召回
recalls_by_topic = {1: [], 2: [], 3: [], 0: []}
for t in turns:
recalls_by_topic[t % 4].append(t)
print(f"\nQuery: {q}")
print(f"召回轮次: {turns}")
for mod, tids in recalls_by_topic.items():
topic_name = tp_map[mod]
flag = "❌ 污染" if tids else "✅ 无污染"
print(f" {topic_name} 被召回: {tids} {flag}")
pollution_t1 = recalls_by_topic[1] # Redis
pollution_t2 = recalls_by_topic[2] # asyncio
pollution_t3 = recalls_by_topic[3] # PG
overall_pollution = pollution_t1 or pollution_t2 or pollution_t3
print(f"\n{'✅ 无污染——前三个话题均未召回' if not overall_pollution else '❌ 有污染——前三个话题被召回'}")
# ============================================================
# 验证 2召回完整性
# ============================================================
print()
print("=" * 60)
print("验证2召回完整性——Git相关内容应被完整覆盖")
print("=" * 60)
q_anchors, _ = GATE.anchor_extractor.extract_with_deictic(q)
coverage = len(q_anchors)
recalled_tokens = sum(item.get('tokens', 50) for item in sel)
print(f"Query锚点数: {coverage},覆盖: {coverage} = 100.0%")
print(f"召回token: ~{recalled_tokens}")
print(f"✅ 召回完整" if coverage == len(q_anchors) else "⚠️ 召回不完整")
# ============================================================
# 验证 3Token 消耗对比
# ============================================================
print()
print("=" * 60)
print("验证3Token消耗对比")
print("=" * 60)
total_tokens = sum(b.total_tokens for b in GATE.blocks)
saved = total_tokens - recalled_tokens
pct = saved / total_tokens * 100
print(f"无门控100轮: ~{total_tokens} tokens")
print(f"有门控: ~{recalled_tokens} tokens")
print(f"Token节省: {pct:.1f}%")
# ============================================================
# 验证 4交替话题——每次提问验证"不污染 + 完整召回"
# ============================================================
print()
print("=" * 60)
print("验证4交替话题查询——每轮验证无污染 + 完整召回")
print("=" * 60)
# 每个话题最近一次被问的问题(交替序列)
test_sequence = [
("问PG", "EXPLAIN ANALYZE 怎么看执行计划?", "T3(PG)"),
("问Git", "Git 的 rebase 和 merge 有什么区别?", "T4(Git)"),
("问Redis", "Redis 惰性删除和定期删除有什么区别?", "T1(Redis)"),
("问asyncio", "asyncio.Task 的 cancel 方法怎么工作的?", "T2(asyncio)"),
("再问Git", "Git 的 reset 和 revert 的应用场景有什么区别?", "T4(Git)"),
]
for label, q, target_topic in test_sequence:
topic_mod = topic_to_turn_mod(target_topic)
other_mods = [m for m in [1, 2, 3, 0] if m != topic_mod]
sel = GATE.select(q)
turns = [item['turn_id'] for item in sel]
recalls_by_topic = {1: [], 2: [], 3: [], 0: []}
for t in turns:
recalls_by_topic[t % 4].append(t)
# 跨话题污染 = 召回目标话题以外的块
cross_pollution = [t for mod in other_mods for t in recalls_by_topic[mod]]
# 目标话题召回
target_recall = recalls_by_topic[topic_mod]
tp_map = {1: 'T1(Redis)', 2: 'T2(asyncio)', 3: 'T3(PG)', 0: 'T4(Git)'}
no_pollution = len(cross_pollution) == 0
has_target = len(target_recall) > 0
status = "" if (no_pollution and has_target) else ""
print(f"\n{label}: {q}")
print(f" 召回: {turns}")
for mod, tids in recalls_by_topic.items():
name = tp_map[mod]
if mod == topic_mod:
flag = "✅ 目标" if tids else "⚠️ 遗漏"
else:
flag = "❌ 污染" if tids else ""
print(f" {name}: {tids} {flag}")
print(f" 跨话题污染: {cross_pollution if cross_pollution else ''}")
print(f" 结果: {status} {'无污染且有召回' if (no_pollution and has_target) else '有问题'}")
# ============================================================
# 验证 5完整召回质量——问某话题时窗口内该话题块应全部被召回
# ============================================================
print()
print("=" * 60)
print("验证5完整召回质量——窗口内该话题块应全部被召回")
print("=" * 60)
RECENT_WINDOW = 15
for label, q, target_topic in test_sequence[:3]:
topic_mod = topic_to_turn_mod(target_topic)
tp_map = {1: 'T1(Redis)', 2: 'T2(asyncio)', 3: 'T3(PG)', 0: 'T4(Git)'}
sel = GATE.select(q)
turns = [item['turn_id'] for item in sel]
# 在窗口内,该话题实际有多少 block
window_turns = list(range(GATE.turn_counter - RECENT_WINDOW + 1, GATE.turn_counter + 1))
topic_blocks_in_window = [t for t in window_turns if t % 4 == topic_mod]
# 被召回的该话题 block
recalled_topic = [t for t in turns if t % 4 == topic_mod]
recall_rate = len(recalled_topic) / len(topic_blocks_in_window) * 100 if topic_blocks_in_window else 0
print(f"\n{label}: {q}")
print(f" 目标话题: {target_topic}")
print(f" 窗口内该话题block: {topic_blocks_in_window}")
print(f" 被召回: {recalled_topic}")
print(f" 窗口内召回率: {len(recalled_topic)}/{len(topic_blocks_in_window)} = {recall_rate:.0f}%")
print(f" {'✅ 完整召回' if recall_rate >= 80 else '⚠️ 召回不全' if recall_rate > 0 else '❌ 完全遗漏'}")

167
test_debug_seq.py Normal file
View File

@@ -0,0 +1,167 @@
"""Debug trace for verification 4 sequence"""
import sys
sys.path.insert(0, '/root/.openclaw/workspace/context-gatekeeper')
# Inline the make_pairs from test_100rounds_v2
redis_topics = [
("Redis 分布式锁和 RedLock 算法有什么区别?", "RedLock..."),
("Redis 集群环境下怎么做分布式锁?", "用 RedLock..."),
("Redis 惰性删除和定期删除有什么区别?", "惰性删除..."),
("Redis 的过期 key 对 RDB 快照有什么影响?", "过期key..."),
("Redis 主从复制断线后如何增量同步?", "PSYNC..."),
("Redis 的 Lua 脚本有什么应用场景?", "Lua脚本..."),
("Redis GeoHash 在附近的人功能里怎么用的?", "GeoHash..."),
("Redis 的大 key 问题怎么排查和处理?", "bigkey..."),
("缓存穿透、击穿、雪崩分别是什么?", "穿透..."),
("Redis Cluster 的槽迁移过程是怎样的?", "槽迁移..."),
("Redis 和 Memcached 的核心区别是什么?", "Memcached..."),
("Redis LRU 缓存淘汰策略怎么配置的?", "LRU..."),
("Redis Pipeline 和事务的区别是什么?", "Pipeline..."),
("Redis 慢查询日志怎么分析?", "SLOWLOG..."),
("Redis 的发布订阅有什么缺点?", "pubsub..."),
("Redis Cluster 为什么用 16384 个槽?", "16384..."),
("Redis 哨兵模式下主节点故障切换流程是什么?", "哨兵..."),
("Redis ZSet 的实现为什么用跳表而不是 B+树?", "跳表..."),
("Redis 内存碎片怎么产生的,怎么处理?", "碎片..."),
("Redis 数据类型和应用场景怎么对应?", "数据类型..."),
("Redis 加锁后服务挂了导致锁无法释放怎么办?", "锁释放..."),
("Redis 如何实现延迟队列?", "延迟队列..."),
("Redis 客户端分片怎么做,有什么优缺点?", "客户端分片..."),
("Redis Cluster 的最大限制是什么?", "最大限制..."),
("Redis 的 AOF 和 RDB 怎么配合使用?", "AOF RDB..."),
]
asyncio_topics = [
("asyncio.Task 的 cancel 方法怎么工作的?", "cancel..."),
("asyncio.gather 和 asyncio.wait 的返回结果有什么区别?", "gather..."),
("asyncio.create_task 和 ensure_future 的区别是什么?", "create_task..."),
("asyncio 的事件循环怎么启动和停止?", "事件循环..."),
("Python 异步上下文管理器的写法是什么?", "异步上下文..."),
("asyncio.sleep 和 time.sleep 的区别是什么?", "sleep..."),
("asyncio 的 Future 对象怎么获取结果?", "Future..."),
("asyncio 的 wait_for 和 shield 组合使用注意什么?", "shield..."),
("asyncio 服务怎么实现优雅关闭?", "优雅关闭..."),
("asyncio 的 run_in_executor 什么时候用?", "run_in_executor..."),
("Python 异步迭代器和异步生成器有什么区别?", "异步迭代..."),
("asyncio 怎么限制并发数?", "限制并发..."),
("asyncio 的 timeout 错误怎么捕获?", "timeout..."),
("Python 协程和普通函数的区别是什么?", "协程..."),
("asyncio 事件循环可以嵌套吗?", "嵌套..."),
("asyncio 异常怎么处理?", "异常处理..."),
("Python 异步 HTTP 请求用什么库?", "异步HTTP..."),
("asyncio 里有条件变量吗?", "条件变量..."),
("asyncio 如何实现心跳/keepalive", "心跳..."),
("asyncio 的 callback 怎么转换为协程?", "callback..."),
("asyncio 的 wait 和 as_completed 有什么区别?", "as_completed..."),
("Python 异步编程里怎么避免回调地狱?", "回调地狱..."),
("asyncio 事件循环是怎么工作的?", "事件循环..."),
("asyncio.Task 和 concurrent.futures.Future 有什么关系?", "concurrent..."),
("asyncio 怎么检测任务是否完成?", "检测完成..."),
]
pg_topics = [
("PostgreSQL 的 MVCC 机制是怎么保证读不阻塞写的?", "MVCC..."),
("PostgreSQL 的 VACUUM 为什么要定期运行?", "VACUUM..."),
("PostgreSQL 的 EXPLAIN ANALYZE 怎么看执行计划?", "EXPLAIN..."),
("PostgreSQL B-tree 索引和 Hash 索引的区别是什么?", "B-tree..."),
("PostgreSQL 的 TOAST 机制是什么?", "TOAST..."),
("PostgreSQL 的 JSONB 和 JSON 类型的区别是什么?", "JSONB..."),
("PostgreSQL 的 CTE 和子查询的性能差异是什么?", "CTE..."),
("PostgreSQL 的数组类型怎么建索引?", "数组索引..."),
("PostgreSQL 的触发器能用于什么场景?", "触发器..."),
("PostgreSQL 的窗口函数和聚合函数的区别是什么?", "窗口函数..."),
("PostgreSQL 的逻辑复制和物理复制的适用场景是什么?", "逻辑复制..."),
("PostgreSQL 的行安全策略 RLS 怎么配置?", "RLS..."),
("PostgreSQL 的 COPY 和 INSERT 性能差多少?", "COPY..."),
("PostgreSQL 的 pg_stat_statements 怎么用于慢查询分析?", "pg_stat..."),
("PostgreSQL 的物化视图和普通视图的区别是什么?", "物化视图..."),
("PostgreSQL 的 JOIN 类型有哪些?", "JOIN..."),
("PostgreSQL 的索引失效有哪些情况?", "索引失效..."),
("PostgreSQL 的 NOTIFY 和 LISTEN 适合什么场景?", "NOTIFY..."),
("PostgreSQL 的查询优化器怎么选择执行计划的?", "优化器..."),
("PostgreSQL 的 WAL 段文件是什么?", "WAL..."),
("PostgreSQL 的 SERIAL 和 IDENTITY 的区别是什么?", "SERIAL..."),
("PostgreSQL 的全文搜索怎么配置中文分词?", "全文搜索..."),
("PostgreSQL 的分区表怎么提升查询性能?", "分区表..."),
("PostgreSQL 的连接池用什么方案?", "连接池..."),
("PostgreSQL 的 EXPLAIN 输出里 Seq Scan 是什么含义?", "Seq Scan..."),
]
git_topics = [
("Git 的 rebase 和 merge 的区别是什么?", "rebase..."),
("Git reset 的 --soft、--mixed、--hard 有什么区别?", "reset..."),
("Git stash 暂存区和工作目录的区别是什么?", "stash..."),
("Git cherry-pick 怎么把特定提交应用到当前分支?", "cherry-pick..."),
("Git 的 hook 怎么配置自动化任务?", "hook..."),
("Git 的 bisect 怎么用来快速定位 bug", "bisect..."),
("Git 的 worktree 和 submodule 的区别是什么?", "worktree..."),
("Git 的 reflog 怎么用来恢复误删的提交?", "reflog..."),
("Git 的 sparse-checkout 怎么只检出部分目录?", "sparse-checkout..."),
("Git 的 bundle 命令在什么场景下用?", "bundle..."),
("Git 的 Interactive Rebase 怎么用?", "Interactive..."),
("Git 的 clean 命令怎么删除未跟踪文件?", "clean..."),
("Git 的 describe 命令输出版本号格式是什么?", "describe..."),
("Git 的 log 怎么配合 grep 过滤提交?", "log grep..."),
("Git 的 blame 显示每行最后修改者和时间怎么用的?", "blame..."),
("Git 的 fetch 和 pull 的区别是什么?", "fetch..."),
("Git 的 merge 冲突怎么规范解决?", "merge冲突..."),
("Git 的 revert 和 reset 的应用场景有什么区别?", "revert..."),
("Git 的 alias 怎么配置常用命令缩写?", "alias..."),
("Git 的 hook 能做什么自动化的事?", "hook自动化..."),
("Git 的 rev-parse 怎么获取仓库信息?", "rev-parse..."),
("Git 的 tag 和 branch 有什么区别?", "tag..."),
("Git 的 remote 怎么管理和使用多个远程仓库?", "remote..."),
("Git 的 grep 怎么在版本历史里搜索代码?", "grep..."),
("Git 的 show 和 log 的区别是什么?", "show..."),
]
from src.gatekeeper import ContextGatekeeper
gate = ContextGatekeeper(token_budget=4000)
for i in range(25):
gate.add_turn(redis_topics[i][0], redis_topics[i][1])
gate.add_turn(asyncio_topics[i][0], asyncio_topics[i][1])
gate.add_turn(pg_topics[i][0], pg_topics[i][1])
gate.add_turn(git_topics[i][0], git_topics[i][1])
print(f"After 100 rounds: active_topic={gate._active_topic[0][:3]}")
print()
# Simulate verification 4 sequence
tests = [
("第50轮问PG", "PostgreSQL 的 EXPLAIN ANALYZE 怎么看执行计划?"),
("第52轮问Git", "Git 的 rebase 和 merge 有什么区别?"),
("第60轮问Redis", "Redis 惰性删除和定期删除有什么区别?"),
]
for label, q in tests:
active_before = gate._active_topic[0][:3]
q_anchors, _ = gate.anchor_extractor.extract_with_deictic(q)
idf = gate.anchor_extractor._idf_cache
switched = gate.topic_gate.is_topic_switch(q, gate._active_topic)
high = {a for a in q_anchors if idf.get(a, 1) > 2}
# Count passing blocks in recent 15
recent = gate.blocks[-15:]
passing_turns = []
for b in recent:
fp = {a.lower() for a in b.topic_fingerprint}
if fp & high:
tp = {1:'T1',2:'T2',3:'T3',0:'T4'}[b.turn_id % 4]
passing_turns.append(f't{b.turn_id}({tp})')
sel = gate.select(q)
turns = [item['turn_id'] for item in sel]
tp_map = {1:'T1',2:'T2',3:'T3',0:'T4'}
t1=[i for i in turns if i%4==1]
t2=[i for i in turns if i%4==2]
t3=[i for i in turns if i%4==3]
t4=[i for i in turns if i%4==0]
active_after = gate._active_topic[0][:3]
print(f"--- {label}: {q[:30]}...")
print(f" before: active={active_before}")
print(f" switched={switched}, high={list(high)[:3]}...")
print(f" recent passing: {passing_turns[:5]}")
print(f" recalled: {turns}")
print(f" T1={t1} T2={t2} T3={t3} T4={t4}")
print(f" after: active={active_after}")
print()