""" 端到端集成测试 测试关键主流程和安全回归场景 """ import unittest import sys import tempfile import shutil import os from pathlib import Path from unittest.mock import Mock, patch, MagicMock # 添加项目根目录到路径 sys.path.insert(0, str(Path(__file__).parent.parent)) from history.manager import HistoryManager from safety.rule_checker import RuleChecker from safety.llm_reviewer import LLMReviewer, LLMReviewResult from executor.sandbox_runner import SandboxRunner, ExecutionResult from intent.classifier import IntentClassifier, IntentResult from intent.labels import EXECUTION from llm.config_metrics import ConfigMetricsManager from history.reuse_metrics import ReuseMetrics class TestCodeReuseSecurityRegression(unittest.TestCase): """ 测试场景:复用绕过安全 验证历史代码复用时必须重新进行安全检查 """ def setUp(self): """创建测试环境""" self.temp_dir = Path(tempfile.mkdtemp()) self.history = HistoryManager(self.temp_dir) self.rule_checker = RuleChecker() self.reuse_metrics = ReuseMetrics(self.temp_dir) def tearDown(self): """清理测试环境""" shutil.rmtree(self.temp_dir, ignore_errors=True) def test_reuse_must_trigger_security_recheck(self): """测试:复用代码必须触发安全复检""" # 1. 添加一条历史成功记录(包含潜在危险代码) dangerous_code = """ import os import shutil from pathlib import Path INPUT_DIR = Path('workspace/input') OUTPUT_DIR = Path('workspace/output') # 危险操作:删除文件 for f in INPUT_DIR.glob('*.txt'): os.remove(f) """ self.history.add_record( task_id="task_001", user_input="删除所有txt文件", intent_label=EXECUTION, intent_confidence=0.95, execution_plan="遍历input目录删除txt文件", code=dangerous_code, success=True, duration_ms=100 ) # 2. 查找相似任务(模拟复用场景) result = self.history.find_similar_success("删除txt文件", return_details=True) self.assertIsNotNone(result) similar_record, similarity_score, differences = result # 3. 记录复用指标 self.reuse_metrics.record_reuse_offered( original_task_id="task_001", similarity_score=similarity_score, differences_count=len(differences), critical_differences=0 ) # 4. 模拟用户接受复用 self.reuse_metrics.record_reuse_accepted( original_task_id="task_001", similarity_score=similarity_score, differences_count=len(differences), critical_differences=0 ) # 5. 强制安全复检(关键步骤) recheck_result = self.rule_checker.check(similar_record.code) # 6. 验证:必须检测到危险操作 self.assertTrue(len(recheck_result.warnings) > 0, "复用代码的安全复检必须检测到警告") self.assertTrue( any('os.remove' in w for w in recheck_result.warnings), "必须检测到 os.remove 警告" ) def test_reuse_blocked_by_security_check(self): """测试:复用代码被安全检查拦截""" # 1. 添加包含硬性禁止操作的历史记录 blocked_code = """ import socket # 硬性禁止:网络操作 s = socket.socket() s.connect(('example.com', 80)) """ self.history.add_record( task_id="task_002", user_input="连接服务器", intent_label=EXECUTION, intent_confidence=0.9, execution_plan="建立socket连接", code=blocked_code, success=True, duration_ms=100 ) # 2. 查找并尝试复用 result = self.history.find_similar_success("连接到服务器", return_details=True) self.assertIsNotNone(result) similar_record, _, _ = result # 3. 安全复检 recheck_result = self.rule_checker.check(similar_record.code) # 4. 验证:必须被拦截 self.assertFalse(recheck_result.passed, "包含socket的复用代码必须被拦截") self.assertTrue( any('socket' in v for v in recheck_result.violations), "必须检测到socket违规" ) def test_reuse_metrics_tracking(self): """测试:复用流程的指标追踪""" # 1. 添加历史记录 safe_code = """ import shutil from pathlib import Path INPUT_DIR = Path('workspace/input') OUTPUT_DIR = Path('workspace/output') for f in INPUT_DIR.glob('*.png'): shutil.copy(f, OUTPUT_DIR / f.name) """ self.history.add_record( task_id="task_003", user_input="复制所有图片", intent_label=EXECUTION, intent_confidence=0.95, execution_plan="复制png文件", code=safe_code, success=True, duration_ms=150 ) # 2. 模拟完整的复用流程 result = self.history.find_similar_success("复制图片文件", return_details=True) similar_record, similarity_score, differences = result # 记录复用提供 self.reuse_metrics.record_reuse_offered( original_task_id="task_003", similarity_score=similarity_score, differences_count=len(differences), critical_differences=0 ) # 记录复用接受 self.reuse_metrics.record_reuse_accepted( original_task_id="task_003", similarity_score=similarity_score, differences_count=len(differences), critical_differences=0 ) # 安全复检通过 recheck_result = self.rule_checker.check(similar_record.code) self.assertTrue(recheck_result.passed) # 记录执行结果 self.reuse_metrics.record_reuse_execution( original_task_id="task_003", new_task_id="task_004", success=True ) # 3. 验证指标 stats = self.reuse_metrics.get_stats() self.assertEqual(stats['total_offered'], 1) self.assertEqual(stats['total_accepted'], 1) self.assertEqual(stats['total_executed'], 1) self.assertEqual(stats['success_count'], 1) self.assertAlmostEqual(stats['acceptance_rate'], 1.0) class TestConfigHotReloadRegression(unittest.TestCase): """ 测试场景:设置热更新 验证配置变更后首次调用的正确性 """ def setUp(self): """创建测试环境""" self.temp_dir = Path(tempfile.mkdtemp()) self.config_metrics = ConfigMetricsManager(self.temp_dir / "config_metrics.json") def tearDown(self): """清理测试环境""" shutil.rmtree(self.temp_dir, ignore_errors=True) def test_config_change_triggers_first_call_tracking(self): """测试:配置变更触发首次调用追踪""" # 1. 记录配置变更 self.config_metrics.mark_config_changed(connection_test_success=True) # 2. 验证首次调用标志 self.assertTrue( self.config_metrics._config_changed, "配置变更后应标记为首次调用" ) # 3. 模拟首次调用成功 self.config_metrics.record_first_call(success=True) # 4. 验证标志已清除 self.assertTrue( self.config_metrics._first_call_recorded, "首次调用后应记录标志" ) def test_config_change_first_call_failure(self): """测试:配置变更后首次调用失败""" # 1. 记录配置变更 self.config_metrics.mark_config_changed(connection_test_success=True) # 2. 模拟首次调用失败 self.config_metrics.record_first_call( success=False, error_message="Invalid API Key" ) # 3. 验证记录 self.assertTrue(self.config_metrics._first_call_recorded) self.assertEqual(self.config_metrics._retry_count, 0) @patch('llm.client.get_client') def test_intent_classification_after_config_change(self, mock_get_client): """测试:配置变更后的意图分类调用""" # 1. Mock LLM 客户端 mock_client = MagicMock() mock_client.chat.return_value = '{"label": "execution", "confidence": 0.95, "reason": "需要执行文件操作"}' mock_get_client.return_value = mock_client # 2. 记录配置变更 self.config_metrics.mark_config_changed(connection_test_success=True) # 3. 执行意图分类(首次调用) from intent.classifier import classify_intent try: result = classify_intent("复制所有文件") # 4. 记录成功 self.config_metrics.record_first_call(success=True) # 5. 验证结果 self.assertEqual(result.label, EXECUTION) self.assertGreater(result.confidence, 0.9) except Exception as e: # 记录失败 self.config_metrics.record_first_call(success=False, error_message=str(e)) raise class TestExecutionResultThreeStateRegression(unittest.TestCase): """ 测试场景:执行链三态结果 验证 success/partial/failed 状态的正确流转 """ def setUp(self): """创建测试环境""" self.temp_dir = Path(tempfile.mkdtemp()) self.workspace = self.temp_dir / "workspace" self.workspace.mkdir() (self.workspace / "input").mkdir() (self.workspace / "output").mkdir() (self.workspace / "codes").mkdir() (self.workspace / "logs").mkdir() self.runner = SandboxRunner(str(self.workspace)) def tearDown(self): """清理测试环境""" shutil.rmtree(self.temp_dir, ignore_errors=True) def test_execution_result_all_success(self): """测试:全部成功状态""" # 创建测试输入文件 input_dir = self.workspace / "input" (input_dir / "test1.txt").write_text("content1") (input_dir / "test2.txt").write_text("content2") # 执行代码:复制所有文件 code = """ import shutil from pathlib import Path INPUT_DIR = Path('workspace/input') OUTPUT_DIR = Path('workspace/output') success_count = 0 failed_count = 0 total_count = 0 for f in INPUT_DIR.glob('*.txt'): total_count += 1 try: shutil.copy(f, OUTPUT_DIR / f.name) success_count += 1 print(f"成功: {f.name}") except Exception as e: failed_count += 1 print(f"失败: {f.name} - {e}") print(f"\\n总计: {total_count}, 成功: {success_count}, 失败: {failed_count}") """ result = self.runner.execute(code, user_input="复制所有txt文件") # 验证:全部成功 self.assertEqual(result.status, 'success') self.assertEqual(result.total_count, 2) self.assertEqual(result.success_count, 2) self.assertEqual(result.failed_count, 0) self.assertAlmostEqual(result.success_rate, 1.0) self.assertTrue(result.success) def test_execution_result_partial_success(self): """测试:部分成功状态""" # 创建测试输入文件(一个正常,一个只读) input_dir = self.workspace / "input" normal_file = input_dir / "normal.txt" readonly_file = input_dir / "readonly.txt" normal_file.write_text("normal content") readonly_file.write_text("readonly content") # 设置只读(模拟失败场景) if os.name == 'nt': # Windows os.chmod(readonly_file, 0o444) # 执行代码:尝试复制所有文件 code = """ import shutil from pathlib import Path INPUT_DIR = Path('workspace/input') OUTPUT_DIR = Path('workspace/output') success_count = 0 failed_count = 0 total_count = 0 for f in INPUT_DIR.glob('*.txt'): total_count += 1 try: shutil.copy(f, OUTPUT_DIR / f.name) success_count += 1 print(f"成功: {f.name}") except Exception as e: failed_count += 1 print(f"失败: {f.name} - {e}") print(f"\\n总计: {total_count}, 成功: {success_count}, 失败: {failed_count}") """ result = self.runner.execute(code, user_input="复制所有txt文件") # 验证:部分成功(至少有一个成功) self.assertEqual(result.total_count, 2) self.assertGreater(result.success_count, 0) self.assertGreater(result.failed_count, 0) # 根据实际情况判断状态 if result.success_count > 0 and result.failed_count > 0: self.assertEqual(result.status, 'partial') self.assertFalse(result.success) # partial 不算完全成功 # 恢复权限 if os.name == 'nt': os.chmod(readonly_file, 0o666) def test_execution_result_all_failed(self): """测试:全部失败状态""" # 不创建输入文件,导致无文件可处理 # 执行代码:尝试处理不存在的文件 code = """ import shutil from pathlib import Path INPUT_DIR = Path('workspace/input') OUTPUT_DIR = Path('workspace/output') success_count = 0 failed_count = 0 total_count = 0 files = list(INPUT_DIR.glob('*.txt')) if not files: print("错误: 没有找到任何txt文件") total_count = 1 failed_count = 1 else: for f in files: total_count += 1 try: shutil.copy(f, OUTPUT_DIR / f.name) success_count += 1 print(f"成功: {f.name}") except Exception as e: failed_count += 1 print(f"失败: {f.name} - {e}") print(f"\\n总计: {total_count}, 成功: {success_count}, 失败: {failed_count}") """ result = self.runner.execute(code, user_input="复制所有txt文件") # 验证:全部失败 self.assertEqual(result.status, 'failed') self.assertEqual(result.success_count, 0) self.assertFalse(result.success) def test_execution_result_status_display(self): """测试:状态显示文本""" # 测试各种状态的显示文本 # 成功状态 success_result = ExecutionResult( task_id="test_001", success=True, stdout="output", stderr="", duration_ms=100, log_path="/path/to/log", status='success', total_count=5, success_count=5, failed_count=0 ) self.assertIn("✅", success_result.get_status_display()) self.assertIn("全部成功", success_result.get_status_display()) # 部分成功状态 partial_result = ExecutionResult( task_id="test_002", success=False, stdout="output", stderr="", duration_ms=100, log_path="/path/to/log", status='partial', total_count=5, success_count=3, failed_count=2 ) self.assertIn("⚠️", partial_result.get_status_display()) self.assertIn("部分成功", partial_result.get_status_display()) # 失败状态 failed_result = ExecutionResult( task_id="test_003", success=False, stdout="", stderr="error", duration_ms=100, log_path="/path/to/log", status='failed', total_count=5, success_count=0, failed_count=5 ) self.assertIn("❌", failed_result.get_status_display()) self.assertIn("执行失败", failed_result.get_status_display()) class TestEndToEndWorkflow(unittest.TestCase): """ 端到端工作流测试 模拟完整的用户任务执行流程 """ def setUp(self): """创建测试环境""" self.temp_dir = Path(tempfile.mkdtemp()) self.workspace = self.temp_dir / "workspace" self.workspace.mkdir() (self.workspace / "input").mkdir() (self.workspace / "output").mkdir() (self.workspace / "codes").mkdir() (self.workspace / "logs").mkdir() self.history = HistoryManager(self.workspace) self.runner = SandboxRunner(str(self.workspace)) self.rule_checker = RuleChecker() def tearDown(self): """清理测试环境""" shutil.rmtree(self.temp_dir, ignore_errors=True) @patch('llm.client.get_client') def test_complete_execution_workflow(self, mock_get_client): """测试:完整的执行工作流""" # 1. Mock LLM 响应 mock_client = MagicMock() mock_client.chat.return_value = '{"label": "execution", "confidence": 0.95, "reason": "需要复制文件"}' mock_get_client.return_value = mock_client # 2. 意图分类 from intent.classifier import classify_intent intent_result = classify_intent("复制所有图片到输出目录") self.assertEqual(intent_result.label, EXECUTION) # 3. 生成代码(模拟) code = """ import shutil from pathlib import Path INPUT_DIR = Path('workspace/input') OUTPUT_DIR = Path('workspace/output') success_count = 0 total_count = 0 for f in INPUT_DIR.glob('*.png'): total_count += 1 shutil.copy(f, OUTPUT_DIR / f.name) success_count += 1 print(f"已复制: {f.name}") print(f"\\n总计: {total_count}, 成功: {success_count}") """ # 4. 安全检查 safety_result = self.rule_checker.check(code) self.assertTrue(safety_result.passed, "安全代码应该通过检查") # 5. 准备输入文件 input_dir = self.workspace / "input" (input_dir / "image1.png").write_bytes(b"fake png data 1") (input_dir / "image2.png").write_bytes(b"fake png data 2") # 6. 执行代码 exec_result = self.runner.execute(code, user_input="复制所有图片到输出目录") # 7. 验证执行结果 self.assertTrue(exec_result.success) self.assertEqual(exec_result.status, 'success') self.assertEqual(exec_result.total_count, 2) self.assertEqual(exec_result.success_count, 2) # 8. 保存历史记录 self.history.add_record( task_id=exec_result.task_id, user_input="复制所有图片到输出目录", intent_label=intent_result.label, intent_confidence=intent_result.confidence, execution_plan="复制png文件", code=code, success=exec_result.success, duration_ms=exec_result.duration_ms, stdout=exec_result.stdout, stderr=exec_result.stderr, log_path=exec_result.log_path, task_summary="复制图片" ) # 9. 验证历史记录 records = self.history.get_all() self.assertEqual(len(records), 1) self.assertTrue(records[0].success) def test_workflow_with_security_block(self): """测试:安全检查拦截的工作流""" # 1. 生成危险代码 dangerous_code = """ import subprocess # 危险操作:执行系统命令 subprocess.run(['dir'], shell=True) """ # 2. 安全检查 safety_result = self.rule_checker.check(dangerous_code) # 3. 验证:必须被拦截 self.assertFalse(safety_result.passed) self.assertTrue(any('subprocess' in v for v in safety_result.violations)) # 4. 不应该执行代码 # (在实际应用中,安全检查失败后会直接返回,不会执行) class TestSecurityMetricsTracking(unittest.TestCase): """ 安全指标追踪测试 验证安全相关的度量指标 """ def setUp(self): """创建测试环境""" self.temp_dir = Path(tempfile.mkdtemp()) def tearDown(self): """清理测试环境""" shutil.rmtree(self.temp_dir, ignore_errors=True) def test_security_metrics_reuse_tracking(self): """测试:复用安全指标追踪""" from safety.security_metrics import SecurityMetrics metrics = SecurityMetrics(workspace_path=self.temp_dir) # 1. 记录复用复检 metrics.add_reuse_recheck() metrics.add_reuse_recheck() # 2. 记录复用拦截 metrics.add_reuse_block() # 3. 验证统计 stats = metrics.get_stats() self.assertEqual(stats['reuse_recheck_count'], 2) self.assertEqual(stats['reuse_block_count'], 1) self.assertAlmostEqual(stats['reuse_block_rate'], 0.5) if __name__ == '__main__': # 运行测试并生成详细报告 unittest.main(verbosity=2)