""" 数据治理单元测试 """ import unittest import tempfile import json from pathlib import Path from datetime import datetime, timedelta from history.data_sanitizer import DataSanitizer, SensitiveType from history.data_governance import DataGovernancePolicy, DataLevel from history.manager import HistoryManager class TestDataSanitizer(unittest.TestCase): """测试数据脱敏器""" def setUp(self): self.sanitizer = DataSanitizer() def test_file_path_detection(self): """测试文件路径检测""" text = "文件保存在 C:\\Users\\test\\document.txt 中" matches = self.sanitizer.find_sensitive_data(text) self.assertTrue(any(m.type == SensitiveType.FILE_PATH for m in matches)) def test_email_detection(self): """测试邮箱检测""" text = "联系邮箱: test@example.com" matches = self.sanitizer.find_sensitive_data(text) self.assertTrue(any(m.type == SensitiveType.EMAIL for m in matches)) def test_phone_detection(self): """测试电话号码检测""" text = "手机号: 13812345678" matches = self.sanitizer.find_sensitive_data(text) self.assertTrue(any(m.type == SensitiveType.PHONE for m in matches)) def test_ip_detection(self): """测试IP地址检测""" text = "服务器地址: 192.168.1.100" matches = self.sanitizer.find_sensitive_data(text) self.assertTrue(any(m.type == SensitiveType.IP_ADDRESS for m in matches)) def test_sanitize_text(self): """测试文本脱敏""" text = "邮箱 test@example.com 手机 13812345678" sanitized, matches = self.sanitizer.sanitize(text) self.assertNotIn("test@example.com", sanitized) self.assertNotIn("13812345678", sanitized) self.assertEqual(len(matches), 2) def test_sensitivity_score(self): """测试敏感度评分""" # 低敏感度 low_text = "这是一段普通文本" self.assertLess(self.sanitizer.get_sensitivity_score(low_text), 0.3) # 高敏感度(使用更明显的敏感信息) high_text = "密码: password123, API密钥: sk-1234567890abcdefghijklmnopqrstuvwxyz123456789012, 邮箱: admin@company.com, 手机: 13812345678" self.assertGreater(self.sanitizer.get_sensitivity_score(high_text), 0.5) class TestDataGovernance(unittest.TestCase): """测试数据治理策略""" def setUp(self): self.temp_dir = Path(tempfile.mkdtemp()) self.policy = DataGovernancePolicy(self.temp_dir) def tearDown(self): import shutil shutil.rmtree(self.temp_dir, ignore_errors=True) def test_classify_low_sensitivity(self): """测试低敏感度分类""" record = { 'user_input': '计算1+1', 'code': 'print(1+1)', 'stdout': '2', 'stderr': '', 'execution_plan': '执行简单计算' } classification = self.policy.classify_record(record) self.assertEqual(classification.level, DataLevel.FULL) self.assertLess(classification.sensitivity_score, 0.3) def test_classify_high_sensitivity(self): """测试高敏感度分类""" record = { 'user_input': '读取配置文件 /etc/config.json', 'code': 'password = "secret123"\napi_key = "sk-1234567890abcdefghijklmnopqrstuvwxyz123456789012"', 'stdout': 'API_KEY=sk-1234567890abcdefghijklmnopqrstuvwxyz123456789012\nemail=admin@company.com\nphone=13812345678', 'stderr': 'Error at /home/user/secret/config.json', 'execution_plan': '读取敏感配置' } classification = self.policy.classify_record(record) # 由于敏感信息较多,应该至少是脱敏级别 self.assertGreater(classification.sensitivity_score, 0.2) def test_apply_policy_minimal(self): """测试最小化策略应用""" record = { 'task_id': 'test-001', 'timestamp': datetime.now().isoformat(), 'user_input': 'password=secret123', 'code': 'API_KEY="sk-test"', 'stdout': 'token: abc123', 'stderr': '', 'execution_plan': '测试', 'intent_label': 'test', 'intent_confidence': 0.9, 'success': True, 'duration_ms': 100, 'log_path': '', 'task_summary': '测试任务' } result = self.policy.apply_policy(record) # 应该有治理元数据 self.assertIn('_governance', result) self.assertIn('level', result['_governance']) def test_expiration_check(self): """测试过期检查""" # 未过期记录 record_valid = { '_governance': { 'expires_at': (datetime.now() + timedelta(days=1)).isoformat() } } self.assertFalse(self.policy.check_expiration(record_valid)) # 已过期记录 record_expired = { '_governance': { 'expires_at': (datetime.now() - timedelta(days=1)).isoformat() } } self.assertTrue(self.policy.check_expiration(record_expired)) def test_cleanup_expired(self): """测试过期清理""" records = [ { 'task_id': '1', '_governance': { 'level': DataLevel.FULL.value, 'expires_at': (datetime.now() - timedelta(days=1)).isoformat(), 'sensitive_fields': [] } }, { 'task_id': '2', '_governance': { 'level': DataLevel.SANITIZED.value, 'expires_at': (datetime.now() - timedelta(days=1)).isoformat() } }, { 'task_id': '3', '_governance': { 'level': DataLevel.MINIMAL.value, 'expires_at': (datetime.now() - timedelta(days=1)).isoformat() } } ] kept, archived, deleted = self.policy.cleanup_expired(records) # 完整数据应降级,脱敏数据应归档,最小化数据应删除 self.assertGreater(len(kept), 0) self.assertGreater(archived + deleted, 0) class TestHistoryManager(unittest.TestCase): """测试历史记录管理器""" def setUp(self): self.temp_dir = Path(tempfile.mkdtemp()) self.manager = HistoryManager(self.temp_dir) def tearDown(self): import shutil shutil.rmtree(self.temp_dir, ignore_errors=True) def test_add_record_with_governance(self): """测试添加记录时应用治理策略""" record = self.manager.add_record( task_id='test-001', user_input='测试输入', intent_label='test', intent_confidence=0.9, execution_plan='测试计划', code='print("test")', success=True, duration_ms=100, stdout='test', stderr='', log_path='', task_summary='测试' ) self.assertIsNotNone(record) self.assertEqual(record.task_id, 'test-001') def test_save_and_load_with_governance(self): """测试保存和加载带治理元数据的记录""" self.manager.add_record( task_id='test-002', user_input='测试', intent_label='test', intent_confidence=0.9, execution_plan='测试', code='test', success=True, duration_ms=100 ) # 重新加载 new_manager = HistoryManager(self.temp_dir) records = new_manager.get_all() self.assertEqual(len(records), 1) self.assertEqual(records[0].task_id, 'test-002') def test_manual_cleanup(self): """测试手动清理""" # 添加一条过期记录 self.manager.add_record( task_id='test-003', user_input='测试', intent_label='test', intent_confidence=0.9, execution_plan='测试', code='test', success=True, duration_ms=100 ) # 手动修改过期时间 if self.manager._history: record_dict = { 'task_id': 'test-004', 'timestamp': datetime.now().isoformat(), 'user_input': 'test', 'intent_label': 'test', 'intent_confidence': 0.9, 'execution_plan': 'test', 'code': 'test', 'success': True, 'duration_ms': 100, 'stdout': '', 'stderr': '', 'log_path': '', 'task_summary': '', '_governance': { 'level': DataLevel.MINIMAL.value, 'expires_at': (datetime.now() - timedelta(days=1)).isoformat() }, '_sanitization': None } from history.manager import TaskRecord self.manager._history.append(TaskRecord(**record_dict)) self.manager._save() stats = self.manager.manual_cleanup() self.assertIn('archived', stats) self.assertIn('deleted', stats) self.assertIn('remaining', stats) def test_export_sanitized(self): """测试导出脱敏数据""" self.manager.add_record( task_id='test-005', user_input='测试邮箱 test@example.com', intent_label='test', intent_confidence=0.9, execution_plan='测试', code='test', success=True, duration_ms=100 ) export_path = self.temp_dir / "export.json" count = self.manager.export_sanitized(export_path) self.assertGreater(count, 0) self.assertTrue(export_path.exists()) # 验证导出内容 with open(export_path, 'r', encoding='utf-8') as f: data = json.load(f) self.assertEqual(len(data), count) def run_tests(): """运行所有测试""" loader = unittest.TestLoader() suite = unittest.TestSuite() suite.addTests(loader.loadTestsFromTestCase(TestDataSanitizer)) suite.addTests(loader.loadTestsFromTestCase(TestDataGovernance)) suite.addTests(loader.loadTestsFromTestCase(TestHistoryManager)) runner = unittest.TextTestRunner(verbosity=2) result = runner.run(suite) return result.wasSuccessful() if __name__ == '__main__': success = run_tests() exit(0 if success else 1)