diff --git a/src/ai/memory.py b/src/ai/memory.py index 8ea4751..76151ae 100644 --- a/src/ai/memory.py +++ b/src/ai/memory.py @@ -3,6 +3,8 @@ """ import asyncio import hashlib +import shutil +import time import uuid from typing import List, Dict, Optional, Tuple, Callable, Awaitable from dataclasses import dataclass, field @@ -98,18 +100,60 @@ class MemorySystem: # 初始化向量存储 if use_vector_db: - try: - # 使用 Chroma 向量数据库 - chroma_path = storage_path.parent / "chroma_db" - self.vector_store: VectorStore = ChromaVectorStore(chroma_path) + chroma_path = storage_path.parent / "chroma_db" + chroma_store = self._init_chroma_store(chroma_path) + if chroma_store is not None: + self.vector_store = chroma_store logger.info("Using Chroma vector store") - except Exception as e: - logger.warning(f"Chroma 初始化失败,降级为 JSON 存储: {e}") + else: self.vector_store = JSONVectorStore(storage_path) else: # 使用 JSON 存储(向后兼容) self.vector_store = JSONVectorStore(storage_path) logger.info("使用 JSON 存储") + + @staticmethod + def _is_chroma_table_conflict(error: Exception) -> bool: + msg = str(error).lower() + return "table embeddings already exists" in msg + + def _init_chroma_store(self, chroma_path: Path) -> Optional[VectorStore]: + """初始化 Chroma,遇到已知 sqlite schema 冲突时尝试修复。""" + try: + return ChromaVectorStore(chroma_path) + except Exception as error: + if not self._is_chroma_table_conflict(error): + logger.warning(f"Chroma 初始化失败,降级为 JSON 存储: {error}") + return None + + # 先做一次短暂重试,处理并发启动时的瞬时冲突。 + logger.warning(f"Chroma 初始化出现 schema 冲突,正在重试: {error}") + time.sleep(0.2) + try: + return ChromaVectorStore(chroma_path) + except Exception as retry_error: + if not self._is_chroma_table_conflict(retry_error): + logger.warning(f"Chroma 重试失败,降级为 JSON 存储: {retry_error}") + return None + + backup_name = ( + f"{chroma_path.name}_backup_conflict_" + f"{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:8]}" + ) + backup_path = chroma_path.parent / backup_name + + try: + if chroma_path.exists(): + shutil.move(str(chroma_path), str(backup_path)) + chroma_path.mkdir(parents=True, exist_ok=True) + repaired = ChromaVectorStore(chroma_path) + logger.warning( + f"检测到 Chroma 元数据库冲突,已重建目录并保留备份: {backup_path}" + ) + return repaired + except Exception as repair_error: + logger.warning(f"Chroma 修复失败,降级为 JSON 存储: {repair_error}") + return None @staticmethod def _normalize_embedding(values: List[float], dim: int = 1024) -> List[float]: @@ -463,4 +507,3 @@ class MemorySystem: async def close(self): """关闭记忆系统。""" await self.vector_store.close() - diff --git a/tests/test_memory_chroma_recovery.py b/tests/test_memory_chroma_recovery.py new file mode 100644 index 0000000..09a5777 --- /dev/null +++ b/tests/test_memory_chroma_recovery.py @@ -0,0 +1,78 @@ +from pathlib import Path + +import src.ai.memory as memory_module + + +class _FakeJSONStore: + def __init__(self, storage_path: Path): + self.storage_path = storage_path + + +def test_memory_retries_once_on_chroma_conflict(monkeypatch, tmp_path: Path): + class _FlakyChroma: + calls = 0 + + def __init__(self, persist_directory: Path): + type(self).calls += 1 + self.persist_directory = persist_directory + if type(self).calls == 1: + raise RuntimeError("table embeddings already exists") + + monkeypatch.setattr(memory_module, "ChromaVectorStore", _FlakyChroma) + monkeypatch.setattr(memory_module, "JSONVectorStore", _FakeJSONStore) + + memory = memory_module.MemorySystem( + storage_path=tmp_path / "long_term_memory.json", + use_vector_db=True, + ) + + assert isinstance(memory.vector_store, _FlakyChroma) + assert _FlakyChroma.calls == 2 + + +def test_memory_rebuilds_chroma_directory_when_conflict_persists(monkeypatch, tmp_path: Path): + class _RecoverOnThirdTryChroma: + calls = 0 + + def __init__(self, persist_directory: Path): + type(self).calls += 1 + self.persist_directory = persist_directory + if type(self).calls <= 2: + raise RuntimeError("table embeddings already exists") + + monkeypatch.setattr(memory_module, "ChromaVectorStore", _RecoverOnThirdTryChroma) + monkeypatch.setattr(memory_module, "JSONVectorStore", _FakeJSONStore) + + storage_root = tmp_path / "ai" + chroma_dir = storage_root / "chroma_db" + chroma_dir.mkdir(parents=True, exist_ok=True) + (chroma_dir / "marker.txt").write_text("legacy-data", encoding="utf-8") + + memory = memory_module.MemorySystem( + storage_path=storage_root / "long_term_memory.json", + use_vector_db=True, + ) + + assert isinstance(memory.vector_store, _RecoverOnThirdTryChroma) + assert _RecoverOnThirdTryChroma.calls == 3 + + backups = sorted(storage_root.glob("chroma_db_backup_conflict_*")) + assert backups, "expected backup directory after repair" + assert (backups[-1] / "marker.txt").exists() + assert chroma_dir.exists() + + +def test_memory_falls_back_to_json_if_chroma_repair_fails(monkeypatch, tmp_path: Path): + class _AlwaysFailChroma: + def __init__(self, persist_directory: Path): + raise RuntimeError("table embeddings already exists") + + monkeypatch.setattr(memory_module, "ChromaVectorStore", _AlwaysFailChroma) + monkeypatch.setattr(memory_module, "JSONVectorStore", _FakeJSONStore) + + memory = memory_module.MemorySystem( + storage_path=tmp_path / "long_term_memory.json", + use_vector_db=True, + ) + + assert isinstance(memory.vector_store, _FakeJSONStore)