Files
LocalAgent/executor/backup_manager.py
Mimikko-zeus 8a538bb950 feat: refactor API key configuration and enhance application initialization
- Renamed `check_environment` to `check_api_key_configured` for clarity, simplifying the API key validation logic.
- Removed the blocking behavior of the API key check during application startup, allowing the app to run while providing a prompt for configuration.
- Updated `LocalAgentApp` to accept an `api_configured` parameter, enabling conditional messaging for API key setup.
- Enhanced the `SandboxRunner` to support backup management and improved execution result handling with detailed metrics.
- Integrated data governance strategies into the `HistoryManager`, ensuring compliance and improved data management.
- Added privacy settings and metrics tracking across various components to enhance user experience and application safety.
2026-02-27 14:32:30 +08:00

269 lines
8.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
工作区备份管理器
提供自动备份、恢复和清理确认机制
"""
import shutil
from datetime import datetime
from pathlib import Path
from typing import Optional, List, Tuple
from dataclasses import dataclass
@dataclass
class BackupInfo:
    """Summary record describing a single workspace backup."""

    # Identifier of the backup.
    backup_id: str
    # Point in time associated with the backup.
    timestamp: datetime
    # Location of the backed-up input directory, or None when there is none.
    input_path: Optional[Path]
    # Location of the backed-up output directory, or None when there is none.
    output_path: Optional[Path]
    # Number of files attributed to the backup.
    file_count: int
    # Combined size of the backup, in bytes.
    total_size: int
class BackupManager:
    """Create, list, restore and prune backups of a workspace's input/output directories.

    Backups live under ``<workspace>/.backups/<backup_id>/`` where ``backup_id``
    is a ``%Y%m%d_%H%M%S_%f`` timestamp. A backup directory contains an
    ``input`` and/or ``output`` copy plus a human-readable ``info.txt``.
    After every successful backup only the newest ``max_backups`` backup
    directories are kept.
    """

    # Format of backup IDs, which are also the backup directory names.
    _ID_FORMAT = "%Y%m%d_%H%M%S_%f"
    # Seconds-resolution prefix of an ID; used as a parsing fallback.
    _ID_PREFIX_FORMAT = "%Y%m%d_%H%M%S"

    def __init__(self, workspace_path: Path, max_backups: int = 10):
        """Prepare the backup root inside the workspace.

        Args:
            workspace_path: Workspace directory; ``.backups`` is created in it.
            max_backups: How many of the newest backups to retain (default 10).

        Raises:
            ValueError: If ``max_backups`` is smaller than 1.
        """
        if max_backups < 1:
            raise ValueError("max_backups must be at least 1")
        self.workspace = Path(workspace_path)
        self.backup_root = self.workspace / ".backups"
        self.backup_root.mkdir(parents=True, exist_ok=True)
        # Retention policy: number of newest backups to keep.
        self.max_backups = max_backups

    def create_backup(self, input_dir: Path, output_dir: Path) -> Optional["BackupInfo"]:
        """Copy the input/output directories into a new timestamped backup.

        Args:
            input_dir: Input directory to back up.
            output_dir: Output directory to back up.

        Returns:
            A ``BackupInfo`` for the new backup, or ``None`` when both
            directories are missing or empty (nothing to back up).

        Raises:
            OSError: If copying fails. The partially written backup directory
                is removed before the error propagates.
        """
        input_entries = list(input_dir.iterdir()) if input_dir.is_dir() else []
        output_entries = list(output_dir.iterdir()) if output_dir.is_dir() else []
        if not input_entries and not output_entries:
            return None

        # One timestamp drives the ID, info.txt and the returned record so
        # they can never disagree with each other.
        created_at = datetime.now()
        backup_id = created_at.strftime(self._ID_FORMAT)
        backup_dir = self.backup_root / backup_id
        backup_dir.mkdir(parents=True, exist_ok=True)

        input_backup_path = None
        output_backup_path = None
        try:
            if input_entries:
                input_backup_path = backup_dir / "input"
                shutil.copytree(input_dir, input_backup_path)
            if output_entries:
                output_backup_path = backup_dir / "output"
                shutil.copytree(output_dir, output_backup_path)

            # Statistics describe what was actually stored in the backup.
            input_count, input_size = self._dir_stats(input_backup_path)
            output_count, output_size = self._dir_stats(output_backup_path)
            file_count = input_count + output_count
            total_size = input_size + output_size

            info_lines = [
                "备份信息",
                "========================================",
                f"备份 ID: {backup_id}",
                f"备份时间: {created_at.strftime('%Y-%m-%d %H:%M:%S')}",
                f"文件数量: {file_count}",
                f"总大小: {self._format_size(total_size)}",
                f"Input 文件: {input_count}",
                f"Output 文件: {output_count}",
            ]
            (backup_dir / "info.txt").write_text(
                "\n".join(info_lines) + "\n", encoding="utf-8"
            )
        except Exception:
            # Never leave a half-written backup that would later be listed
            # (and restored) as if it were complete.
            shutil.rmtree(backup_dir, ignore_errors=True)
            raise

        self._cleanup_old_backups()

        return BackupInfo(
            backup_id=backup_id,
            timestamp=created_at,
            input_path=input_backup_path,
            output_path=output_backup_path,
            file_count=file_count,
            total_size=total_size,
        )

    def restore_backup(self, backup_id: str, input_dir: Path, output_dir: Path) -> bool:
        """Replace the target directories with the contents of a backup.

        A target is only touched when the backup holds a copy for it; its
        current content is deleted and replaced by the backed-up copy.

        Args:
            backup_id: ID of the backup to restore.
            input_dir: Destination for the backed-up ``input`` directory.
            output_dir: Destination for the backed-up ``output`` directory.

        Returns:
            ``True`` on success; ``False`` if the ID is invalid or unknown, or
            if a filesystem error occurred (the error is printed).
        """
        backup_dir = self._resolve_backup_dir(backup_id)
        if backup_dir is None:
            return False

        try:
            for name, target in (("input", input_dir), ("output", output_dir)):
                source = backup_dir / name
                if not source.exists():
                    continue
                # Clear the target first: copytree requires a fresh destination.
                if target.exists():
                    shutil.rmtree(target)
                shutil.copytree(source, target)
            return True
        except OSError as e:
            print(f"恢复备份失败: {e}")
            return False

    def list_backups(self) -> List["BackupInfo"]:
        """Return all backups, newest first (ordered by directory name, descending).

        ``file_count`` counts regular files recursively and ``total_size`` is
        their combined size in bytes.
        """
        backups = []
        if not self.backup_root.exists():
            return backups

        for backup_dir in sorted(self.backup_root.iterdir(), reverse=True):
            if not backup_dir.is_dir():
                continue

            input_backup = backup_dir / "input"
            output_backup = backup_dir / "output"
            input_path = input_backup if input_backup.exists() else None
            output_path = output_backup if output_backup.exists() else None

            input_count, input_size = self._dir_stats(input_path)
            output_count, output_size = self._dir_stats(output_path)

            backups.append(BackupInfo(
                backup_id=backup_dir.name,
                timestamp=self._backup_timestamp(backup_dir),
                input_path=input_path,
                output_path=output_path,
                file_count=input_count + output_count,
                total_size=input_size + output_size,
            ))

        return backups

    def get_latest_backup(self) -> Optional["BackupInfo"]:
        """Return the newest backup, or ``None`` when there are no backups."""
        backups = self.list_backups()
        return backups[0] if backups else None

    def delete_backup(self, backup_id: str) -> bool:
        """Delete one backup directory.

        Returns:
            ``True`` if the backup was removed; ``False`` if the ID is invalid
            or unknown, or if removal failed (the error is printed).
        """
        backup_dir = self._resolve_backup_dir(backup_id)
        if backup_dir is None:
            return False

        try:
            shutil.rmtree(backup_dir)
            return True
        except OSError as e:
            print(f"删除备份失败: {e}")
            return False

    def _resolve_backup_dir(self, backup_id: str) -> Optional[Path]:
        """Map a backup ID to its directory, rejecting anything unsafe.

        The ID must be a single path component naming an existing directory
        directly under ``backup_root``. Empty strings, ``.``, ``..``, nested
        or absolute paths are refused so a bad ID can never make a caller
        delete or overwrite ``backup_root`` itself or anything outside it.
        """
        if not isinstance(backup_id, str) or backup_id in ("", ".", ".."):
            return None
        candidate = self.backup_root / backup_id
        if candidate.parent != self.backup_root or candidate.name != backup_id:
            return None
        if not candidate.is_dir():
            return None
        return candidate

    def _backup_timestamp(self, backup_dir: Path) -> datetime:
        """Derive a backup's creation time from its directory name.

        Tries the full microsecond ID first, then the seconds-only prefix, and
        finally falls back to the directory's modification time for names that
        are not timestamps at all.
        """
        name = backup_dir.name
        candidates = (
            (name, self._ID_FORMAT),
            (name.rsplit("_", 1)[0], self._ID_PREFIX_FORMAT),
        )
        for text, fmt in candidates:
            try:
                return datetime.strptime(text, fmt)
            except ValueError:
                continue
        try:
            return datetime.fromtimestamp(backup_dir.stat().st_mtime)
        except OSError:
            return datetime.now()

    def _cleanup_old_backups(self):
        """Delete the oldest backups so at most ``max_backups`` remain."""
        if not self.backup_root.exists():
            return
        # Directory names sort chronologically, so names alone are enough;
        # there is no need to walk every backup's contents here.
        newest_first = sorted(
            (entry for entry in self.backup_root.iterdir() if entry.is_dir()),
            reverse=True,
        )
        keep = max(self.max_backups, 0)
        for stale in newest_first[keep:]:
            self.delete_backup(stale.name)

    def _dir_stats(self, directory: Optional[Path]) -> Tuple[int, int]:
        """Return ``(regular file count, total bytes)`` for a directory tree.

        A missing directory (or ``None``) yields ``(0, 0)``. Files whose size
        cannot be read are still counted but contribute no bytes.
        """
        if directory is None or not directory.exists():
            return 0, 0

        file_count = 0
        total_size = 0
        for item in directory.rglob("*"):
            if not item.is_file():
                continue
            file_count += 1
            try:
                total_size += item.stat().st_size
            except OSError:
                # Best effort: the file vanished or is unreadable.
                pass
        return file_count, total_size

    def _calculate_dir_size(self, directory: Path) -> int:
        """Return the combined size in bytes of all files under ``directory``."""
        return self._dir_stats(directory)[1]

    def _format_size(self, size_bytes: int) -> str:
        """Format a byte count with two decimals and a B/KB/MB/GB/TB unit."""
        size = float(size_bytes)
        for unit in ("B", "KB", "MB", "GB"):
            if size < 1024.0:
                return f"{size:.2f} {unit}"
            size /= 1024.0
        return f"{size:.2f} TB"

    def check_workspace_content(self, input_dir: Path, output_dir: Path) -> Tuple[bool, int, str]:
        """Report whether the workspace directories currently hold anything.

        Returns:
            ``(has_content, file_count, size_str)``. ``has_content`` is true
            when either directory has at least one entry; ``file_count``
            counts regular files recursively; ``size_str`` is the formatted
            total size. An empty workspace yields ``(False, 0, "0 B")``.
        """
        input_entries = list(input_dir.iterdir()) if input_dir.is_dir() else []
        output_entries = list(output_dir.iterdir()) if output_dir.is_dir() else []
        if not input_entries and not output_entries:
            return False, 0, "0 B"

        input_count, input_size = self._dir_stats(input_dir)
        output_count, output_size = self._dir_stats(output_dir)
        size_str = self._format_size(input_size + output_size)
        return True, input_count + output_count, size_str