""" 运行时路径访问守卫 在代码执行前注入,拦截所有文件操作 """ import os import sys from pathlib import Path from typing import Callable, Any class PathGuard: """ 路径访问守卫 在执行用户代码前注入,拦截所有文件操作函数, 确保只能访问 workspace 目录 """ def __init__(self, allowed_root: str): """ Args: allowed_root: 允许访问的根目录(绝对路径) """ self.allowed_root = Path(allowed_root).resolve() # 保存原始函数 self._original_open = open self._original_path_init = Path.__init__ def is_path_allowed(self, path: str) -> bool: """ 检查路径是否在允许的范围内 Args: path: 要检查的路径 Returns: bool: 是否允许访问 """ try: # 解析为绝对路径 abs_path = Path(path).resolve() # 检查是否在允许的根目录下 try: abs_path.relative_to(self.allowed_root) return True except ValueError: return False except Exception: # 路径解析失败,拒绝访问 return False def guarded_open(self, file, mode='r', *args, **kwargs): """ 受保护的 open 函数 拦截所有 open() 调用,检查路径是否合法 """ # 获取文件路径 if isinstance(file, (str, bytes, os.PathLike)): file_path = str(file) # 检查路径 if not self.is_path_allowed(file_path): raise PermissionError( f"安全限制: 禁止访问 workspace 外的路径: {file_path}\n" f"只允许访问: {self.allowed_root}" ) # 调用原始 open return self._original_open(file, mode, *args, **kwargs) def install(self): """安装守卫,替换内置函数""" import builtins builtins.open = self.guarded_open def uninstall(self): """卸载守卫,恢复原始函数""" import builtins builtins.open = self._original_open def generate_guard_code(workspace_path: str) -> str: """ 生成守卫代码,注入到用户代码前执行 Args: workspace_path: workspace 绝对路径 Returns: str: 守卫代码 """ guard_code = f''' # ==================== 安全守卫(自动注入)==================== import os import sys from pathlib import Path _ALLOWED_ROOT = Path(r"{workspace_path}").resolve() def _is_path_allowed(path): """检查路径是否在允许范围内""" try: abs_path = Path(path).resolve() try: abs_path.relative_to(_ALLOWED_ROOT) return True except ValueError: return False except Exception: return False # 保存原始 open _original_open = open def _guarded_open(file, mode='r', *args, **kwargs): """受保护的 open 函数""" if isinstance(file, (str, bytes, os.PathLike)): file_path = str(file) if not _is_path_allowed(file_path): raise PermissionError( f"安全限制: 禁止访问 workspace 外的路径: {{file_path}}\\n" f"只允许访问: {{_ALLOWED_ROOT}}" ) return _original_open(file, mode, *args, **kwargs) # 替换内置 open import builtins builtins.open = _guarded_open # 禁用网络相关模块(运行时检查) _FORBIDDEN_MODULES = {{ 'socket', 'requests', 'urllib', 'urllib3', 'http', 'ftplib', 'smtplib', 'telnetlib', 'aiohttp', 'httplib' }} _original_import = __builtins__.__import__ def _guarded_import(name, *args, **kwargs): """受保护的 import""" module_base = name.split('.')[0] if module_base in _FORBIDDEN_MODULES: raise ImportError( f"安全限制: 禁止导入网络模块: {{name}}\\n" f"执行器不允许联网操作" ) return _original_import(name, *args, **kwargs) __builtins__.__import__ = _guarded_import # ==================== 用户代码开始 ==================== ''' return guard_code def wrap_user_code(user_code: str, workspace_path: str) -> str: """ 包装用户代码,注入守卫 Args: user_code: 用户代码 workspace_path: workspace 绝对路径 Returns: str: 包装后的代码 """ guard_code = generate_guard_code(workspace_path) return guard_code + "\n" + user_code