Files
LocalAgent/executor/path_guard.py
Mimikko-zeus 8a538bb950 feat: refactor API key configuration and enhance application initialization
- Renamed `check_environment` to `check_api_key_configured` for clarity, simplifying the API key validation logic.
- Removed the blocking behavior of the API key check during application startup, allowing the app to run while providing a prompt for configuration.
- Updated `LocalAgentApp` to accept an `api_configured` parameter, enabling conditional messaging for API key setup.
- Enhanced the `SandboxRunner` to support backup management and improved execution result handling with detailed metrics.
- Integrated data governance strategies into the `HistoryManager`, ensuring compliance and improved data management.
- Added privacy settings and metrics tracking across various components to enhance user experience and application safety.
2026-02-27 14:32:30 +08:00

174 lines
4.6 KiB
Python

"""
运行时路径访问守卫
在代码执行前注入,拦截所有文件操作
"""
import os
import sys
from pathlib import Path
from typing import Callable, Any
class PathGuard:
"""
路径访问守卫
在执行用户代码前注入,拦截所有文件操作函数,
确保只能访问 workspace 目录
"""
def __init__(self, allowed_root: str):
"""
Args:
allowed_root: 允许访问的根目录(绝对路径)
"""
self.allowed_root = Path(allowed_root).resolve()
# 保存原始函数
self._original_open = open
self._original_path_init = Path.__init__
def is_path_allowed(self, path: str) -> bool:
"""
检查路径是否在允许的范围内
Args:
path: 要检查的路径
Returns:
bool: 是否允许访问
"""
try:
# 解析为绝对路径
abs_path = Path(path).resolve()
# 检查是否在允许的根目录下
try:
abs_path.relative_to(self.allowed_root)
return True
except ValueError:
return False
except Exception:
# 路径解析失败,拒绝访问
return False
def guarded_open(self, file, mode='r', *args, **kwargs):
"""
受保护的 open 函数
拦截所有 open() 调用,检查路径是否合法
"""
# 获取文件路径
if isinstance(file, (str, bytes, os.PathLike)):
file_path = str(file)
# 检查路径
if not self.is_path_allowed(file_path):
raise PermissionError(
f"安全限制: 禁止访问 workspace 外的路径: {file_path}\n"
f"只允许访问: {self.allowed_root}"
)
# 调用原始 open
return self._original_open(file, mode, *args, **kwargs)
def install(self):
"""安装守卫,替换内置函数"""
import builtins
builtins.open = self.guarded_open
def uninstall(self):
"""卸载守卫,恢复原始函数"""
import builtins
builtins.open = self._original_open
def generate_guard_code(workspace_path: str) -> str:
"""
生成守卫代码,注入到用户代码前执行
Args:
workspace_path: workspace 绝对路径
Returns:
str: 守卫代码
"""
guard_code = f'''
# ==================== 安全守卫(自动注入)====================
import os
import sys
from pathlib import Path
_ALLOWED_ROOT = Path(r"{workspace_path}").resolve()
def _is_path_allowed(path):
"""检查路径是否在允许范围内"""
try:
abs_path = Path(path).resolve()
try:
abs_path.relative_to(_ALLOWED_ROOT)
return True
except ValueError:
return False
except Exception:
return False
# 保存原始 open
_original_open = open
def _guarded_open(file, mode='r', *args, **kwargs):
"""受保护的 open 函数"""
if isinstance(file, (str, bytes, os.PathLike)):
file_path = str(file)
if not _is_path_allowed(file_path):
raise PermissionError(
f"安全限制: 禁止访问 workspace 外的路径: {{file_path}}\\n"
f"只允许访问: {{_ALLOWED_ROOT}}"
)
return _original_open(file, mode, *args, **kwargs)
# 替换内置 open
import builtins
builtins.open = _guarded_open
# 禁用网络相关模块(运行时检查)
_FORBIDDEN_MODULES = {{
'socket', 'requests', 'urllib', 'urllib3', 'http',
'ftplib', 'smtplib', 'telnetlib', 'aiohttp', 'httplib'
}}
_original_import = __builtins__.__import__
def _guarded_import(name, *args, **kwargs):
"""受保护的 import"""
module_base = name.split('.')[0]
if module_base in _FORBIDDEN_MODULES:
raise ImportError(
f"安全限制: 禁止导入网络模块: {{name}}\\n"
f"执行器不允许联网操作"
)
return _original_import(name, *args, **kwargs)
__builtins__.__import__ = _guarded_import
# ==================== 用户代码开始 ====================
'''
return guard_code
def wrap_user_code(user_code: str, workspace_path: str) -> str:
"""
包装用户代码,注入守卫
Args:
user_code: 用户代码
workspace_path: workspace 绝对路径
Returns:
str: 包装后的代码
"""
guard_code = generate_guard_code(workspace_path)
return guard_code + "\n" + user_code