""" 任务历史记录管理器 保存和加载任务执行历史 """ import json from datetime import datetime from pathlib import Path from typing import Optional, List from dataclasses import dataclass, asdict @dataclass class TaskRecord: """任务记录""" task_id: str timestamp: str user_input: str intent_label: str intent_confidence: float execution_plan: str code: str success: bool duration_ms: int stdout: str stderr: str log_path: str task_summary: str = "" # 任务摘要(由小模型生成) class HistoryManager: """ 历史记录管理器 将任务历史保存为 JSON 文件 """ MAX_HISTORY_SIZE = 100 # 最多保存 100 条记录 def __init__(self, workspace_path: Optional[Path] = None): if workspace_path: self.workspace = workspace_path else: self.workspace = Path(__file__).parent.parent / "workspace" self.history_file = self.workspace / "history.json" self._history: List[TaskRecord] = [] self._load() def _load(self): """从文件加载历史记录""" if self.history_file.exists(): try: with open(self.history_file, 'r', encoding='utf-8') as f: data = json.load(f) self._history = [TaskRecord(**record) for record in data] except (json.JSONDecodeError, TypeError, KeyError) as e: print(f"[警告] 加载历史记录失败: {e}") self._history = [] else: self._history = [] def _save(self): """保存历史记录到文件""" try: # 确保目录存在 self.history_file.parent.mkdir(parents=True, exist_ok=True) with open(self.history_file, 'w', encoding='utf-8') as f: data = [asdict(record) for record in self._history] json.dump(data, f, ensure_ascii=False, indent=2) except Exception as e: print(f"[警告] 保存历史记录失败: {e}") def add_record( self, task_id: str, user_input: str, intent_label: str, intent_confidence: float, execution_plan: str, code: str, success: bool, duration_ms: int, stdout: str = "", stderr: str = "", log_path: str = "", task_summary: str = "" ) -> TaskRecord: """ 添加一条任务记录 Args: task_id: 任务 ID user_input: 用户输入 intent_label: 意图标签 intent_confidence: 意图置信度 execution_plan: 执行计划 code: 生成的代码 success: 是否执行成功 duration_ms: 执行耗时(毫秒) stdout: 标准输出 stderr: 标准错误 log_path: 日志文件路径 task_summary: 任务摘要 Returns: TaskRecord: 创建的记录 """ record = TaskRecord( task_id=task_id, timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"), user_input=user_input, intent_label=intent_label, intent_confidence=intent_confidence, execution_plan=execution_plan, code=code, success=success, duration_ms=duration_ms, stdout=stdout, stderr=stderr, log_path=log_path, task_summary=task_summary ) # 添加到列表开头(最新的在前) self._history.insert(0, record) # 限制历史记录数量 if len(self._history) > self.MAX_HISTORY_SIZE: self._history = self._history[:self.MAX_HISTORY_SIZE] # 保存 self._save() return record def get_all(self) -> List[TaskRecord]: """获取所有历史记录""" return self._history.copy() def get_recent(self, count: int = 10) -> List[TaskRecord]: """获取最近的 N 条记录""" return self._history[:count] def get_by_id(self, task_id: str) -> Optional[TaskRecord]: """根据任务 ID 获取记录""" for record in self._history: if record.task_id == task_id: return record return None def delete_by_id(self, task_id: str) -> bool: """ 根据任务 ID 删除记录 Args: task_id: 任务 ID Returns: 是否删除成功 """ for i, record in enumerate(self._history): if record.task_id == task_id: self._history.pop(i) self._save() return True return False def delete_multiple(self, task_ids: List[str]) -> int: """ 批量删除记录 Args: task_ids: 任务 ID 列表 Returns: 删除的记录数量 """ task_id_set = set(task_ids) original_count = len(self._history) self._history = [r for r in self._history if r.task_id not in task_id_set] deleted_count = original_count - len(self._history) if deleted_count > 0: self._save() return deleted_count def clear(self): """清空历史记录""" self._history = [] self._save() def get_stats(self) -> dict: """获取统计信息""" if not self._history: return { 'total': 0, 'success': 0, 'failed': 0, 'success_rate': 0.0, 'avg_duration_ms': 0 } total = len(self._history) success = sum(1 for r in self._history if r.success) failed = total - success avg_duration = sum(r.duration_ms for r in self._history) / total return { 'total': total, 'success': success, 'failed': failed, 'success_rate': success / total if total > 0 else 0.0, 'avg_duration_ms': int(avg_duration) } def find_similar_success(self, user_input: str, threshold: float = 0.6) -> Optional[TaskRecord]: """ 查找相似的成功任务 使用简单的关键词匹配来判断相似度 Args: user_input: 用户输入 threshold: 相似度阈值 Returns: 最相似的成功任务记录,如果没有则返回 None """ # 提取关键词 def extract_keywords(text: str) -> set: # 简单分词:按空格和标点分割 import re words = re.findall(r'[\u4e00-\u9fa5]+|[a-zA-Z]+', text.lower()) # 过滤掉太短的词 return set(w for w in words if len(w) >= 2) input_keywords = extract_keywords(user_input) if not input_keywords: return None best_match = None best_score = 0.0 for record in self._history: if not record.success: continue record_keywords = extract_keywords(record.user_input) if not record_keywords: continue # 计算 Jaccard 相似度 intersection = len(input_keywords & record_keywords) union = len(input_keywords | record_keywords) score = intersection / union if union > 0 else 0 if score > best_score and score >= threshold: best_score = score best_match = record return best_match def get_successful_records(self) -> List[TaskRecord]: """获取所有成功的任务记录""" return [r for r in self._history if r.success] # 全局单例 _manager: Optional[HistoryManager] = None def get_history_manager(workspace_path: Optional[Path] = None) -> HistoryManager: """获取历史记录管理器单例""" global _manager if _manager is None: _manager = HistoryManager(workspace_path) return _manager