""" 沙箱执行器 在受限环境中执行生成的 Python 代码 """ import os import sys import subprocess import uuid from datetime import datetime from pathlib import Path from typing import Optional from dataclasses import dataclass from .path_guard import wrap_user_code from .backup_manager import BackupManager @dataclass class ExecutionResult: """ 执行结果(三态模型) 状态定义: - success: 全部成功 - partial: 部分成功(有成功也有失败) - failed: 全部失败或执行异常 """ status: str # 'success' | 'partial' | 'failed' task_id: str stdout: str stderr: str return_code: int log_path: str duration_ms: int # 统计字段 success_count: int = 0 failed_count: int = 0 total_count: int = 0 @property def success(self) -> bool: """向后兼容的 success 属性""" return self.status == 'success' @property def success_rate(self) -> float: """成功率""" if self.total_count == 0: return 0.0 return self.success_count / self.total_count def get_status_display(self) -> str: """获取状态的中文显示""" status_map = { 'success': '✅ 全部成功', 'partial': '⚠️ 部分成功', 'failed': '❌ 执行失败' } return status_map.get(self.status, '未知状态') class SandboxRunner: """ 沙箱执行器 特性: 1. 使用 subprocess 启动独立 Python 进程 2. 工作目录限定为 workspace 3. 捕获所有输出 4. 写入日志文件 """ def __init__(self, workspace_path: Optional[str] = None): if workspace_path: self.workspace = Path(workspace_path) else: # 默认使用项目根目录下的 workspace self.workspace = Path(__file__).parent.parent / "workspace" self.input_dir = self.workspace / "input" self.output_dir = self.workspace / "output" self.logs_dir = self.workspace / "logs" self.codes_dir = self.workspace / "codes" # 确保目录存在 self.input_dir.mkdir(parents=True, exist_ok=True) self.output_dir.mkdir(parents=True, exist_ok=True) self.logs_dir.mkdir(parents=True, exist_ok=True) self.codes_dir.mkdir(parents=True, exist_ok=True) # 初始化备份管理器 self.backup_manager = BackupManager(self.workspace) def save_task_code(self, code: str, task_id: Optional[str] = None, inject_guard: bool = True) -> tuple[str, Path]: """ 保存任务代码到文件 Args: code: Python 代码 task_id: 任务 ID(可选,自动生成) inject_guard: 是否注入路径守卫(默认 True) Returns: (task_id, code_path) """ if not task_id: task_id = self._generate_task_id() # 注入运行时守卫 if inject_guard: code = wrap_user_code(code, str(self.workspace.resolve())) code_path = self.codes_dir / f"task_{task_id}.py" code_path.write_text(code, encoding='utf-8') return task_id, code_path def execute(self, code: str, task_id: Optional[str] = None, timeout: int = 60, inject_guard: bool = True, user_input: str = "", is_retry: bool = False) -> ExecutionResult: """ 执行代码 Args: code: Python 代码 task_id: 任务 ID timeout: 超时时间(秒) inject_guard: 是否注入运行时守卫(默认 True) user_input: 用户输入(用于度量记录) is_retry: 是否是重试(用于度量记录) Returns: ExecutionResult: 执行结果 """ # 保存代码(注入守卫) task_id, code_path = self.save_task_code(code, task_id, inject_guard=inject_guard) # 准备日志 log_path = self.logs_dir / f"task_{task_id}.log" start_time = datetime.now() try: # 使用 subprocess 执行 result = subprocess.run( [sys.executable, str(code_path)], cwd=str(self.workspace), capture_output=True, text=True, timeout=timeout, # 不继承父进程的环境变量中的网络代理等 env=self._get_safe_env() ) end_time = datetime.now() duration_ms = int((end_time - start_time).total_seconds() * 1000) # 写入日志 self._write_log( log_path=log_path, task_id=task_id, code_path=code_path, stdout=result.stdout, stderr=result.stderr, return_code=result.returncode, duration_ms=duration_ms ) # 分析执行结果(三态判断) status, success_count, failed_count, total_count = self._analyze_execution_result( result.returncode, result.stdout, result.stderr ) # 记录执行度量指标 from executor.execution_metrics import get_execution_metrics metrics = get_execution_metrics(self.workspace) metrics.record_execution( task_id=task_id, status=status, success_count=success_count, failed_count=failed_count, total_count=total_count, duration_ms=duration_ms, user_input=user_input, is_retry=is_retry ) return ExecutionResult( status=status, task_id=task_id, stdout=result.stdout, stderr=result.stderr, return_code=result.returncode, log_path=str(log_path), duration_ms=duration_ms, success_count=success_count, failed_count=failed_count, total_count=total_count ) except subprocess.TimeoutExpired: end_time = datetime.now() duration_ms = int((end_time - start_time).total_seconds() * 1000) error_msg = f"执行超时(超过 {timeout} 秒)" self._write_log( log_path=log_path, task_id=task_id, code_path=code_path, stdout="", stderr=error_msg, return_code=-1, duration_ms=duration_ms ) return ExecutionResult( status='failed', task_id=task_id, stdout="", stderr=error_msg, return_code=-1, log_path=str(log_path), duration_ms=duration_ms, success_count=0, failed_count=0, total_count=0 ) except Exception as e: end_time = datetime.now() duration_ms = int((end_time - start_time).total_seconds() * 1000) error_msg = f"执行异常: {str(e)}" self._write_log( log_path=log_path, task_id=task_id, code_path=code_path, stdout="", stderr=error_msg, return_code=-1, duration_ms=duration_ms ) return ExecutionResult( status='failed', task_id=task_id, stdout="", stderr=error_msg, return_code=-1, log_path=str(log_path), duration_ms=duration_ms, success_count=0, failed_count=0, total_count=0 ) def _generate_task_id(self) -> str: """生成任务 ID""" timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") short_uuid = uuid.uuid4().hex[:6] return f"{timestamp}_{short_uuid}" def clear_workspace(self, clear_input: bool = True, clear_output: bool = True, create_backup: bool = True) -> Optional[str]: """ 清空工作目录(支持自动备份) Args: clear_input: 是否清空 input 目录 clear_output: 是否清空 output 目录 create_backup: 是否创建备份(默认 True) Returns: 备份 ID(如果创建了备份) """ backup_id = None # 创建备份 if create_backup: backup_info = self.backup_manager.create_backup(self.input_dir, self.output_dir) if backup_info: backup_id = backup_info.backup_id # 清空目录 if clear_input: self._clear_directory(self.input_dir) if clear_output: self._clear_directory(self.output_dir) return backup_id def restore_from_backup(self, backup_id: str) -> bool: """ 从备份恢复工作区 Args: backup_id: 备份 ID Returns: 是否成功 """ return self.backup_manager.restore_backup(backup_id, self.input_dir, self.output_dir) def check_workspace_content(self) -> tuple[bool, int, str]: """ 检查工作区是否有内容 Returns: (has_content, file_count, size_str) """ return self.backup_manager.check_workspace_content(self.input_dir, self.output_dir) def _clear_directory(self, directory: Path) -> None: """ 清空目录中的所有文件和子目录 Args: directory: 要清空的目录路径 """ if not directory.exists(): return import shutil for item in directory.iterdir(): try: if item.is_file(): item.unlink() elif item.is_dir(): shutil.rmtree(item) except Exception as e: # 忽略删除失败的文件(可能被占用) print(f"Warning: Failed to delete {item}: {e}") def _analyze_execution_result( self, return_code: int, stdout: str, stderr: str ) -> tuple[str, int, int, int]: """ 分析执行结果(三态模型) 返回: (status, success_count, failed_count, total_count) - status: 'success' | 'partial' | 'failed' - success_count: 成功数量 - failed_count: 失败数量 - total_count: 总数量 """ import re # return code 不为 0 直接判定为 failed if return_code != 0: return ('failed', 0, 0, 0) # 尝试从输出中提取统计信息 success_count = 0 failed_count = 0 total_count = 0 output = stdout if stdout else "" # 模式 1: "成功 X 个, 失败 Y 个" pattern_cn = r'成功\s*[::]\s*(\d+)\s*个.*?失败\s*[::]\s*(\d+)\s*个' match = re.search(pattern_cn, output) if match: success_count = int(match.group(1)) failed_count = int(match.group(2)) total_count = success_count + failed_count # 模式 2: "成功 X 个" 和 "失败 Y 个" 分开 if total_count == 0: success_match = re.search(r'成功\s*[::]\s*(\d+)\s*个', output) failed_match = re.search(r'失败\s*[::]\s*(\d+)\s*个', output) if success_match: success_count = int(success_match.group(1)) if failed_match: failed_count = int(failed_match.group(1)) if success_count > 0 or failed_count > 0: total_count = success_count + failed_count # 模式 3: 英文 "success: X, failed: Y" if total_count == 0: pattern_en = r'success[:\s]+(\d+).*?fail(?:ed)?[:\s]+(\d+)' match = re.search(pattern_en, output.lower()) if match: success_count = int(match.group(1)) failed_count = int(match.group(2)) total_count = success_count + failed_count # 模式 4: "处理了 X 个文件" 或 "total: X" if total_count == 0: total_match = re.search(r'(?:处理|total)[:\s]+(\d+)', output.lower()) if total_match: total_count = int(total_match.group(1)) # 如果没有明确的失败信息,假设全部成功 if not re.search(r'失败|error|exception|failed', output.lower()): success_count = total_count failed_count = 0 # 如果提取到了统计信息,根据数量判断状态 if total_count > 0: if failed_count == 0: return ('success', success_count, failed_count, total_count) elif success_count == 0: return ('failed', success_count, failed_count, total_count) else: return ('partial', success_count, failed_count, total_count) # 没有统计信息,使用关键词判断 output_lower = output.lower() has_error = any(keyword in output_lower for keyword in [ '失败', 'error', 'exception', 'traceback', 'failed' ]) # 检查是否是 "失败 0 个" 这种情况 if has_error: if re.search(r'失败\s*[::]\s*0\s*个', output) or \ re.search(r'failed[:\s]+0', output_lower): has_error = False if has_error: return ('failed', 0, 0, 0) # 默认认为成功 return ('success', 0, 0, 0) def _check_execution_success(self, return_code: int, stdout: str, stderr: str) -> bool: """ 检查执行是否成功(向后兼容方法,已废弃) 建议使用 _analyze_execution_result 获取三态结果 """ status, _, _, _ = self._analyze_execution_result(return_code, stdout, stderr) return status == 'success' def _get_safe_env(self) -> dict: """获取安全的环境变量(移除网络代理等)""" safe_env = os.environ.copy() # 移除可能的网络代理设置 proxy_vars = [ 'HTTP_PROXY', 'HTTPS_PROXY', 'http_proxy', 'https_proxy', 'ALL_PROXY', 'all_proxy', 'NO_PROXY', 'no_proxy' ] for var in proxy_vars: safe_env.pop(var, None) return safe_env def _write_log( self, log_path: Path, task_id: str, code_path: Path, stdout: str, stderr: str, return_code: int, duration_ms: int ): """写入执行日志""" log_content = f"""======================================== 任务执行日志 ======================================== 任务 ID: {task_id} 代码文件: {code_path} 执行时间: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} 耗时: {duration_ms} ms 返回码: {return_code} 状态: {"成功" if return_code == 0 else "失败"} ======================================== 标准输出 (stdout) ======================================== {stdout if stdout else "(无输出)"} ======================================== 标准错误 (stderr) ======================================== {stderr if stderr else "(无错误)"} """ log_path.write_text(log_content, encoding='utf-8') def run_task(code: str, task_id: Optional[str] = None) -> ExecutionResult: """便捷函数:执行任务""" runner = SandboxRunner() return runner.execute(code, task_id)