Files
LocalAgent/history/manager.py
Mimikko-zeus 8a538bb950 feat: refactor API key configuration and enhance application initialization
- Renamed `check_environment` to `check_api_key_configured` for clarity, simplifying the API key validation logic.
- Removed the blocking behavior of the API key check during application startup, allowing the app to run while providing a prompt for configuration.
- Updated `LocalAgentApp` to accept an `api_configured` parameter, enabling conditional messaging for API key setup.
- Enhanced the `SandboxRunner` to support backup management and improved execution result handling with detailed metrics.
- Integrated data governance strategies into the `HistoryManager`, ensuring compliance and improved data management.
- Added privacy settings and metrics tracking across various components to enhance user experience and application safety.
2026-02-27 14:32:30 +08:00

397 lines
12 KiB
Python

"""
任务历史记录管理器
保存和加载任务执行历史,集成数据治理策略
"""
import json
from datetime import datetime
from pathlib import Path
from typing import Optional, List
from dataclasses import dataclass, asdict
from history.data_governance import get_governance_policy, GovernanceMetrics
@dataclass
class TaskRecord:
    """One executed task as stored in the history file.

    Instances are converted to plain dicts with ``dataclasses.asdict`` for
    JSON persistence and rebuilt with ``TaskRecord(**record_dict)``, so the
    field names below are also the keys used on disk.
    """
    task_id: str
    timestamp: str
    user_input: str
    intent_label: str
    intent_confidence: float
    execution_plan: str
    code: str
    success: bool
    duration_ms: int
    stdout: str
    stderr: str
    log_path: str
    task_summary: str = ""  # task summary (generated by a small model)
    # Both fields default to None, so they are Optional rather than plain dict.
    _governance: Optional[dict] = None  # governance metadata
    _sanitization: Optional[dict] = None  # sanitization info
class HistoryManager:
    """
    Task history manager.

    Keeps task records in memory (newest first) and persists them as JSON
    in ``<workspace>/history.json``. Every write goes through the
    data-governance policy object returned by ``get_governance_policy``.
    """

    MAX_HISTORY_SIZE = 100  # keep at most 100 records
    AUTO_CLEANUP_ENABLED = True  # clean up expired data on start-up

    def __init__(self, workspace_path: Optional[Path] = None):
        """
        Load existing history and optionally clean up expired records.

        Args:
            workspace_path: Directory holding ``history.json``. When falsy,
                ``<package parent>/workspace`` is used.
        """
        if workspace_path:
            self.workspace = workspace_path
        else:
            self.workspace = Path(__file__).parent.parent / "workspace"

        self.history_file = self.workspace / "history.json"
        self._history: List[TaskRecord] = []

        # Initialise the data-governance policy for this workspace.
        self.governance = get_governance_policy(self.workspace)

        self._load()

        # Clean up expired data automatically on start-up.
        if self.AUTO_CLEANUP_ENABLED:
            self._auto_cleanup()

    @staticmethod
    def _record_from_dict(record_dict: dict) -> TaskRecord:
        """
        Build a ``TaskRecord`` from a plain dict.

        Missing governance fields are filled with ``None`` for compatibility
        with older data. Keys that are not ``TaskRecord`` fields are ignored
        so one unexpected key does not make the record unloadable.

        Raises:
            TypeError: If ``record_dict`` is not a dict or lacks a required
                field.
        """
        # Local import: the file-level import only brings in dataclass/asdict.
        from dataclasses import fields

        if not isinstance(record_dict, dict):
            raise TypeError(
                f"expected dict, got {type(record_dict).__name__}"
            )

        known_names = {f.name for f in fields(TaskRecord)}
        kwargs = {k: v for k, v in record_dict.items() if k in known_names}
        kwargs.setdefault('_governance', None)
        kwargs.setdefault('_sanitization', None)
        return TaskRecord(**kwargs)

    def _governed_dicts(self) -> List[dict]:
        """
        Return every record as a dict with the governance policy applied.

        Records whose ``_governance`` value is falsy are passed through
        ``self.governance.apply_policy``; the others are used as they are.
        """
        governed = []
        for record in self._history:
            record_dict = asdict(record)
            if not record_dict.get('_governance'):
                record_dict = self.governance.apply_policy(record_dict)
            governed.append(record_dict)
        return governed

    def _load(self):
        """
        Load history from ``self.history_file``.

        An unreadable or non-list file yields an empty history. Individual
        malformed records are skipped rather than discarding the whole file,
        because the next save would otherwise make that loss permanent.
        """
        self._history = []
        if not self.history_file.exists():
            return

        try:
            with open(self.history_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (OSError, json.JSONDecodeError, UnicodeDecodeError) as e:
            print(f"[警告] 加载历史记录失败: {e}")
            return

        if not isinstance(data, list):
            print("[警告] 加载历史记录失败: 文件内容不是列表")
            return

        skipped = 0
        for raw in data:
            try:
                self._history.append(self._record_from_dict(raw))
            except (TypeError, KeyError):
                skipped += 1
        if skipped:
            print(f"[警告] 加载历史记录时跳过 {skipped} 条无效记录")

    def _save(self):
        """
        Write the history to disk with the governance policy applied.

        Failures are reported with a warning and never raised, so callers
        can treat saving as best-effort.
        """
        try:
            # Make sure the target directory exists.
            self.history_file.parent.mkdir(parents=True, exist_ok=True)

            governed_data = self._governed_dicts()

            # Serialise before opening the file: opening with 'w' truncates
            # it, so a serialisation error afterwards would destroy the
            # previously saved history.
            payload = json.dumps(governed_data, ensure_ascii=False, indent=2)
            with open(self.history_file, 'w', encoding='utf-8') as f:
                f.write(payload)

            # Collect and store governance metrics for what was written.
            metrics = self.governance.collect_metrics(governed_data)
            self.governance.save_metrics(metrics)
        except Exception as e:
            print(f"[警告] 保存历史记录失败: {e}")

    def add_record(
        self,
        task_id: str,
        user_input: str,
        intent_label: str,
        intent_confidence: float,
        execution_plan: str,
        code: str,
        success: bool,
        duration_ms: int,
        stdout: str = "",
        stderr: str = "",
        log_path: str = "",
        task_summary: str = ""
    ) -> TaskRecord:
        """
        Add one task record, stamped with the current local time, and save.

        Args:
            task_id: Task ID.
            user_input: User input.
            intent_label: Intent label.
            intent_confidence: Intent confidence.
            execution_plan: Execution plan.
            code: Generated code.
            success: Whether execution succeeded.
            duration_ms: Execution time in milliseconds.
            stdout: Standard output.
            stderr: Standard error.
            log_path: Log file path.
            task_summary: Task summary.

        Returns:
            TaskRecord: The record that was created.
        """
        record = TaskRecord(
            task_id=task_id,
            timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            user_input=user_input,
            intent_label=intent_label,
            intent_confidence=intent_confidence,
            execution_plan=execution_plan,
            code=code,
            success=success,
            duration_ms=duration_ms,
            stdout=stdout,
            stderr=stderr,
            log_path=log_path,
            task_summary=task_summary
        )

        # Newest record goes first.
        self._history.insert(0, record)

        # Drop the oldest records beyond the size limit.
        if len(self._history) > self.MAX_HISTORY_SIZE:
            self._history = self._history[:self.MAX_HISTORY_SIZE]

        self._save()
        return record

    def get_all(self) -> List[TaskRecord]:
        """Return a shallow copy of all records."""
        return self._history.copy()

    def get_recent(self, count: int = 10) -> List[TaskRecord]:
        """
        Return the first ``count`` records (newest first).

        A non-positive ``count`` returns an empty list; a plain slice with a
        negative value would instead return all but the last records.
        """
        if count <= 0:
            return []
        return self._history[:count]

    def get_by_id(self, task_id: str) -> Optional[TaskRecord]:
        """Return the first record with the given task ID, or ``None``."""
        for record in self._history:
            if record.task_id == task_id:
                return record
        return None

    def delete_by_id(self, task_id: str) -> bool:
        """
        Delete the first record with the given task ID and save.

        Args:
            task_id: Task ID.

        Returns:
            True if a record was deleted, False if none matched.
        """
        for i, record in enumerate(self._history):
            if record.task_id == task_id:
                self._history.pop(i)
                self._save()
                return True
        return False

    def delete_multiple(self, task_ids: List[str]) -> int:
        """
        Delete every record whose task ID is in ``task_ids``.

        The history is saved only when at least one record was removed.

        Args:
            task_ids: Task IDs to delete.

        Returns:
            Number of records deleted.
        """
        task_id_set = set(task_ids)
        original_count = len(self._history)
        self._history = [
            r for r in self._history if r.task_id not in task_id_set
        ]
        deleted_count = original_count - len(self._history)

        if deleted_count > 0:
            self._save()
        return deleted_count

    def clear(self):
        """Remove all records and save the empty history."""
        self._history = []
        self._save()

    def get_stats(self) -> dict:
        """
        Return aggregate statistics over the in-memory history.

        Returns:
            Dict with keys ``total``, ``success``, ``failed``,
            ``success_rate`` (0.0-1.0) and ``avg_duration_ms`` (truncated to
            int). All values are zero when there is no history.
        """
        if not self._history:
            return {
                'total': 0,
                'success': 0,
                'failed': 0,
                'success_rate': 0.0,
                'avg_duration_ms': 0
            }

        total = len(self._history)
        success = sum(1 for r in self._history if r.success)
        avg_duration = sum(r.duration_ms for r in self._history) / total

        return {
            'total': total,
            'success': success,
            'failed': total - success,
            'success_rate': success / total,
            'avg_duration_ms': int(avg_duration)
        }

    def find_similar_success(
        self,
        user_input: str,
        threshold: float = 0.6,
        return_details: bool = False
    ) -> Optional[TaskRecord] | tuple:
        """
        Find the successful record whose input is most similar to
        ``user_input``.

        Similarity comes from the matcher returned by
        ``history.task_features.get_task_matcher``. Only scores that reach
        ``threshold`` qualify, and the comparison is strict, so on a tie the
        record seen first is kept.

        Args:
            user_input: User input to compare against.
            threshold: Minimum similarity score.
            return_details: Whether to also return score and differences.

        Returns:
            ``None`` when nothing qualifies. Otherwise the best record, or
            ``(record, score, differences)`` when ``return_details`` is True.
        """
        from history.task_features import get_task_matcher

        matcher = get_task_matcher()
        best_match = None
        best_score = 0.0
        best_differences = []

        for record in self._history:
            if not record.success:
                continue

            score, differences = matcher.calculate_similarity(
                user_input,
                record.user_input
            )

            if score > best_score and score >= threshold:
                best_score = score
                best_match = record
                best_differences = differences

        if best_match is None:
            return None
        if return_details:
            return (best_match, best_score, best_differences)
        return best_match

    def get_successful_records(self) -> List[TaskRecord]:
        """Return all records whose ``success`` flag is truthy."""
        return [r for r in self._history if r.success]

    def _auto_cleanup(self):
        """
        Clean up expired data without ever raising.

        The history is replaced and saved only when
        ``self.governance.cleanup_expired`` reports archived or deleted
        records. Any failure is reported with a warning.
        """
        try:
            records_data = [asdict(r) for r in self._history]
            kept_records, archived, deleted = self.governance.cleanup_expired(
                records_data
            )

            if archived > 0 or deleted > 0:
                # Build the whole list before assigning so a bad record
                # cannot leave a half-rebuilt history behind.
                self._history = [
                    self._record_from_dict(d) for d in kept_records
                ]
                self._save()
                print(f"[数据治理] 自动清理完成: 归档 {archived} 条, 删除 {deleted}")
        except Exception as e:
            print(f"[警告] 自动清理失败: {e}")

    def manual_cleanup(self) -> dict:
        """
        Trigger data cleanup manually.

        Unlike the automatic cleanup, errors propagate to the caller and
        the history is always rebuilt and saved.

        Returns:
            Dict with keys ``archived``, ``deleted`` and ``remaining``.
        """
        records_data = [asdict(r) for r in self._history]
        kept_records, archived, deleted = self.governance.cleanup_expired(
            records_data
        )

        # Build the whole list before assigning so a failure leaves the
        # existing history untouched.
        self._history = [self._record_from_dict(d) for d in kept_records]
        self._save()

        return {
            'archived': archived,
            'deleted': deleted,
            'remaining': len(self._history)
        }

    def get_governance_metrics(self) -> Optional[GovernanceMetrics]:
        """Return the metrics loaded by ``self.governance.load_metrics()``."""
        return self.governance.load_metrics()

    def export_sanitized(self, output_path: Path) -> int:
        """
        Export the history, with the governance policy applied, as JSON.

        Args:
            output_path: Destination file path.

        Returns:
            Number of records exported.
        """
        sanitized_data = self._governed_dicts()

        # Serialise first so a serialisation error cannot leave a truncated
        # export file behind.
        payload = json.dumps(sanitized_data, ensure_ascii=False, indent=2)
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(payload)

        return len(sanitized_data)
# Module-level singleton, created on first request.
_manager: Optional[HistoryManager] = None


def get_history_manager(workspace_path: Optional[Path] = None) -> HistoryManager:
    """
    Return the shared HistoryManager, creating it on the first call.

    ``workspace_path`` is only used when the instance is first created;
    later calls return the existing instance and ignore the argument.
    """
    global _manager
    if _manager is not None:
        return _manager
    _manager = HistoryManager(workspace_path)
    return _manager