""" 工作区备份管理器 提供自动备份、恢复和清理确认机制 """ import shutil from datetime import datetime from pathlib import Path from typing import Optional, List, Tuple from dataclasses import dataclass @dataclass class BackupInfo: """备份信息""" backup_id: str timestamp: datetime input_path: Optional[Path] output_path: Optional[Path] file_count: int total_size: int # 字节 class BackupManager: """ 备份管理器 功能: 1. 执行前自动备份 input/output 目录 2. 提供恢复机制 3. 自动清理过期备份 """ def __init__(self, workspace_path: Path): self.workspace = workspace_path self.backup_root = self.workspace / ".backups" self.backup_root.mkdir(parents=True, exist_ok=True) # 备份保留策略:最多保留 10 个备份 self.max_backups = 10 def create_backup(self, input_dir: Path, output_dir: Path) -> Optional[BackupInfo]: """ 创建备份 Args: input_dir: input 目录 output_dir: output 目录 Returns: BackupInfo 或 None(如果目录为空则不备份) """ # 检查是否有内容需要备份 input_files = list(input_dir.iterdir()) if input_dir.exists() else [] output_files = list(output_dir.iterdir()) if output_dir.exists() else [] if not input_files and not output_files: return None # 无需备份 # 生成备份 ID backup_id = datetime.now().strftime("%Y%m%d_%H%M%S_%f") backup_dir = self.backup_root / backup_id backup_dir.mkdir(parents=True, exist_ok=True) # 备份 input input_backup_path = None if input_files: input_backup_path = backup_dir / "input" shutil.copytree(input_dir, input_backup_path) # 备份 output output_backup_path = None if output_files: output_backup_path = backup_dir / "output" shutil.copytree(output_dir, output_backup_path) # 计算统计信息 file_count = len(input_files) + len(output_files) total_size = self._calculate_dir_size(input_dir) + self._calculate_dir_size(output_dir) # 创建备份信息文件 info_file = backup_dir / "info.txt" info_content = f"""备份信息 ======================================== 备份 ID: {backup_id} 备份时间: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} 文件数量: {file_count} 总大小: {self._format_size(total_size)} Input 文件: {len(input_files)} Output 文件: {len(output_files)} """ info_file.write_text(info_content, encoding='utf-8') # 清理旧备份 self._cleanup_old_backups() return BackupInfo( backup_id=backup_id, timestamp=datetime.now(), input_path=input_backup_path, output_path=output_backup_path, file_count=file_count, total_size=total_size ) def restore_backup(self, backup_id: str, input_dir: Path, output_dir: Path) -> bool: """ 恢复备份 Args: backup_id: 备份 ID input_dir: 目标 input 目录 output_dir: 目标 output 目录 Returns: 是否成功 """ backup_dir = self.backup_root / backup_id if not backup_dir.exists(): return False try: # 恢复 input input_backup = backup_dir / "input" if input_backup.exists(): # 清空目标目录 if input_dir.exists(): shutil.rmtree(input_dir) # 恢复 shutil.copytree(input_backup, input_dir) # 恢复 output output_backup = backup_dir / "output" if output_backup.exists(): # 清空目标目录 if output_dir.exists(): shutil.rmtree(output_dir) # 恢复 shutil.copytree(output_backup, output_dir) return True except Exception as e: print(f"恢复备份失败: {e}") return False def list_backups(self) -> List[BackupInfo]: """列出所有备份""" backups = [] if not self.backup_root.exists(): return backups for backup_dir in sorted(self.backup_root.iterdir(), reverse=True): if not backup_dir.is_dir(): continue backup_id = backup_dir.name # 读取备份信息 input_backup = backup_dir / "input" output_backup = backup_dir / "output" input_path = input_backup if input_backup.exists() else None output_path = output_backup if output_backup.exists() else None # 计算统计信息 file_count = 0 total_size = 0 if input_path: file_count += len(list(input_path.rglob("*"))) total_size += self._calculate_dir_size(input_path) if output_path: file_count += len(list(output_path.rglob("*"))) total_size += self._calculate_dir_size(output_path) # 解析时间戳 try: timestamp_str = backup_id.rsplit('_', 1)[0] timestamp = datetime.strptime(timestamp_str, "%Y%m%d_%H%M%S") except: timestamp = datetime.now() backups.append(BackupInfo( backup_id=backup_id, timestamp=timestamp, input_path=input_path, output_path=output_path, file_count=file_count, total_size=total_size )) return backups def get_latest_backup(self) -> Optional[BackupInfo]: """获取最新的备份""" backups = self.list_backups() return backups[0] if backups else None def delete_backup(self, backup_id: str) -> bool: """删除指定备份""" backup_dir = self.backup_root / backup_id if not backup_dir.exists(): return False try: shutil.rmtree(backup_dir) return True except Exception as e: print(f"删除备份失败: {e}") return False def _cleanup_old_backups(self): """清理过期备份(保留最新的 N 个)""" backups = self.list_backups() if len(backups) <= self.max_backups: return # 删除多余的旧备份 for backup in backups[self.max_backups:]: self.delete_backup(backup.backup_id) def _calculate_dir_size(self, directory: Path) -> int: """计算目录大小(字节)""" if not directory.exists(): return 0 total_size = 0 for item in directory.rglob("*"): if item.is_file(): try: total_size += item.stat().st_size except: pass return total_size def _format_size(self, size_bytes: int) -> str: """格式化文件大小""" for unit in ['B', 'KB', 'MB', 'GB']: if size_bytes < 1024.0: return f"{size_bytes:.2f} {unit}" size_bytes /= 1024.0 return f"{size_bytes:.2f} TB" def check_workspace_content(self, input_dir: Path, output_dir: Path) -> Tuple[bool, int, str]: """ 检查工作区是否有内容 Returns: (has_content, file_count, size_str) """ input_files = list(input_dir.iterdir()) if input_dir.exists() else [] output_files = list(output_dir.iterdir()) if output_dir.exists() else [] file_count = len(input_files) + len(output_files) if file_count == 0: return False, 0, "0 B" total_size = self._calculate_dir_size(input_dir) + self._calculate_dir_size(output_dir) size_str = self._format_size(total_size) return True, file_count, size_str