- Renamed `check_environment` to `check_api_key_configured` for clarity, simplifying the API key validation logic. - Removed the blocking behavior of the API key check during application startup, allowing the app to run while providing a prompt for configuration. - Updated `LocalAgentApp` to accept an `api_configured` parameter, enabling conditional messaging for API key setup. - Enhanced the `SandboxRunner` to support backup management and improved execution result handling with detailed metrics. - Integrated data governance strategies into the `HistoryManager`, ensuring compliance and improved data management. - Added privacy settings and metrics tracking across various components to enhance user experience and application safety.
327 lines
11 KiB
Python
327 lines
11 KiB
Python
"""
|
|
Unit tests for data governance.
|
|
"""
|
|
|
|
import unittest
|
|
import tempfile
|
|
import json
|
|
from pathlib import Path
|
|
from datetime import datetime, timedelta
|
|
|
|
from history.data_sanitizer import DataSanitizer, SensitiveType
|
|
from history.data_governance import DataGovernancePolicy, DataLevel
|
|
from history.manager import HistoryManager
|
|
|
|
|
|
class TestDataSanitizer(unittest.TestCase):
    """Tests for DataSanitizer: detection, masking and sensitivity scoring."""

    def setUp(self):
        """Create a fresh sanitizer for every test."""
        self.sanitizer = DataSanitizer()

    def _has_match(self, text, kind):
        """Return True if scanning *text* yields a match whose type equals *kind*."""
        found = self.sanitizer.find_sensitive_data(text)
        return any(hit.type == kind for hit in found)

    def test_file_path_detection(self):
        """A Windows-style file path is reported as FILE_PATH."""
        self.assertTrue(
            self._has_match(
                "文件保存在 C:\\Users\\test\\document.txt 中",
                SensitiveType.FILE_PATH,
            )
        )

    def test_email_detection(self):
        """An e-mail address is reported as EMAIL."""
        self.assertTrue(
            self._has_match("联系邮箱: test@example.com", SensitiveType.EMAIL)
        )

    def test_phone_detection(self):
        """An 11-digit mobile number is reported as PHONE."""
        self.assertTrue(
            self._has_match("手机号: 13812345678", SensitiveType.PHONE)
        )

    def test_ip_detection(self):
        """An IPv4 address is reported as IP_ADDRESS."""
        self.assertTrue(
            self._has_match("服务器地址: 192.168.1.100", SensitiveType.IP_ADDRESS)
        )

    def test_sanitize_text(self):
        """sanitize() removes both secrets from the text and reports two matches."""
        masked, hits = self.sanitizer.sanitize(
            "邮箱 test@example.com 手机 13812345678"
        )

        # Neither raw value may survive in the sanitized output.
        for secret in ("test@example.com", "13812345678"):
            self.assertNotIn(secret, masked)
        self.assertEqual(len(hits), 2)

    def test_sensitivity_score(self):
        """Plain text scores below 0.3; text packed with secrets scores above 0.5."""
        # Low sensitivity: ordinary text with nothing to detect.
        plain_score = self.sanitizer.get_sensitivity_score("这是一段普通文本")
        self.assertLess(plain_score, 0.3)

        # High sensitivity: password, API key, e-mail and phone number together.
        loaded_score = self.sanitizer.get_sensitivity_score(
            "密码: password123, API密钥: sk-1234567890abcdefghijklmnopqrstuvwxyz123456789012, 邮箱: admin@company.com, 手机: 13812345678"
        )
        self.assertGreater(loaded_score, 0.5)
|
|
|
|
|
|
class TestDataGovernance(unittest.TestCase):
    """Tests for DataGovernancePolicy: classification, policy application, expiry."""

    def setUp(self):
        """Create a scratch directory and a policy constructed with it."""
        import shutil

        self.temp_dir = Path(tempfile.mkdtemp())
        # Register cleanup right away: unittest does not call tearDown() when
        # setUp() raises, so a failing policy constructor would otherwise leave
        # the temporary directory behind.
        self.addCleanup(shutil.rmtree, self.temp_dir, ignore_errors=True)
        self.policy = DataGovernancePolicy(self.temp_dir)

    def test_classify_low_sensitivity(self):
        """A harmless record is classified FULL with a score below 0.3."""
        record = {
            'user_input': '计算1+1',
            'code': 'print(1+1)',
            'stdout': '2',
            'stderr': '',
            'execution_plan': '执行简单计算'
        }

        classification = self.policy.classify_record(record)
        self.assertEqual(classification.level, DataLevel.FULL)
        self.assertLess(classification.sensitivity_score, 0.3)

    def test_classify_high_sensitivity(self):
        """A record full of secrets gets a sensitivity score above 0.2."""
        record = {
            'user_input': '读取配置文件 /etc/config.json',
            'code': 'password = "secret123"\napi_key = "sk-1234567890abcdefghijklmnopqrstuvwxyz123456789012"',
            'stdout': 'API_KEY=sk-1234567890abcdefghijklmnopqrstuvwxyz123456789012\nemail=admin@company.com\nphone=13812345678',
            'stderr': 'Error at /home/user/secret/config.json',
            'execution_plan': '读取敏感配置'
        }

        classification = self.policy.classify_record(record)
        # With this much sensitive content the record should end up at least
        # at the sanitized level; only the score is asserted here.
        self.assertGreater(classification.sensitivity_score, 0.2)

    def test_apply_policy_minimal(self):
        """apply_policy() attaches a '_governance' entry that carries a 'level'."""
        record = {
            'task_id': 'test-001',
            'timestamp': datetime.now().isoformat(),
            'user_input': 'password=secret123',
            'code': 'API_KEY="sk-test"',
            'stdout': 'token: abc123',
            'stderr': '',
            'execution_plan': '测试',
            'intent_label': 'test',
            'intent_confidence': 0.9,
            'success': True,
            'duration_ms': 100,
            'log_path': '',
            'task_summary': '测试任务'
        }

        result = self.policy.apply_policy(record)

        # Governance metadata must be present on the result.
        self.assertIn('_governance', result)
        self.assertIn('level', result['_governance'])

    def test_expiration_check(self):
        """check_expiration() is False for a future expiry and True for a past one."""
        # Record that expires tomorrow: not expired.
        record_valid = {
            '_governance': {
                'expires_at': (datetime.now() + timedelta(days=1)).isoformat()
            }
        }
        self.assertFalse(self.policy.check_expiration(record_valid))

        # Record that expired yesterday.
        record_expired = {
            '_governance': {
                'expires_at': (datetime.now() - timedelta(days=1)).isoformat()
            }
        }
        self.assertTrue(self.policy.check_expiration(record_expired))

    def test_cleanup_expired(self):
        """cleanup_expired() keeps some expired records and archives or deletes others."""
        records = [
            {
                'task_id': '1',
                '_governance': {
                    'level': DataLevel.FULL.value,
                    'expires_at': (datetime.now() - timedelta(days=1)).isoformat(),
                    'sensitive_fields': []
                }
            },
            {
                'task_id': '2',
                '_governance': {
                    'level': DataLevel.SANITIZED.value,
                    'expires_at': (datetime.now() - timedelta(days=1)).isoformat()
                }
            },
            {
                'task_id': '3',
                '_governance': {
                    'level': DataLevel.MINIMAL.value,
                    'expires_at': (datetime.now() - timedelta(days=1)).isoformat()
                }
            }
        ]

        kept, archived, deleted = self.policy.cleanup_expired(records)

        # Intended policy: full records are downgraded, sanitized records are
        # archived, minimal records are deleted. The assertions only require
        # that something was kept and something was archived or deleted.
        self.assertGreater(len(kept), 0)
        self.assertGreater(archived + deleted, 0)
|
|
|
|
|
|
class TestHistoryManager(unittest.TestCase):
    """Tests for HistoryManager: adding, persisting, cleaning up and exporting."""

    def setUp(self):
        """Create a scratch directory and a manager constructed with it."""
        import shutil

        self.temp_dir = Path(tempfile.mkdtemp())
        # Register cleanup right away: unittest does not call tearDown() when
        # setUp() raises, so a failing manager constructor would otherwise
        # leave the temporary directory behind.
        self.addCleanup(shutil.rmtree, self.temp_dir, ignore_errors=True)
        self.manager = HistoryManager(self.temp_dir)

    def test_add_record_with_governance(self):
        """add_record() returns a record carrying the task_id it was given."""
        record = self.manager.add_record(
            task_id='test-001',
            user_input='测试输入',
            intent_label='test',
            intent_confidence=0.9,
            execution_plan='测试计划',
            code='print("test")',
            success=True,
            duration_ms=100,
            stdout='test',
            stderr='',
            log_path='',
            task_summary='测试'
        )

        self.assertIsNotNone(record)
        self.assertEqual(record.task_id, 'test-001')

    def test_save_and_load_with_governance(self):
        """A record added by one manager is returned by a new manager on the same directory."""
        self.manager.add_record(
            task_id='test-002',
            user_input='测试',
            intent_label='test',
            intent_confidence=0.9,
            execution_plan='测试',
            code='test',
            success=True,
            duration_ms=100
        )

        # Reload through a second manager built over the same directory.
        new_manager = HistoryManager(self.temp_dir)
        records = new_manager.get_all()

        self.assertEqual(len(records), 1)
        self.assertEqual(records[0].task_id, 'test-002')

    def test_manual_cleanup(self):
        """manual_cleanup() returns stats with 'archived', 'deleted' and 'remaining'."""
        # Add a regular record first.
        self.manager.add_record(
            task_id='test-003',
            user_input='测试',
            intent_label='test',
            intent_confidence=0.9,
            execution_plan='测试',
            code='test',
            success=True,
            duration_ms=100
        )

        # Inject a second record whose governance metadata expired yesterday.
        # NOTE(review): if add_record() left _history empty, this guard skips
        # the injection silently and the test only checks the stats keys.
        if self.manager._history:
            record_dict = {
                'task_id': 'test-004',
                'timestamp': datetime.now().isoformat(),
                'user_input': 'test',
                'intent_label': 'test',
                'intent_confidence': 0.9,
                'execution_plan': 'test',
                'code': 'test',
                'success': True,
                'duration_ms': 100,
                'stdout': '',
                'stderr': '',
                'log_path': '',
                'task_summary': '',
                '_governance': {
                    'level': DataLevel.MINIMAL.value,
                    'expires_at': (datetime.now() - timedelta(days=1)).isoformat()
                },
                '_sanitization': None
            }

            from history.manager import TaskRecord
            self.manager._history.append(TaskRecord(**record_dict))
            self.manager._save()

        stats = self.manager.manual_cleanup()

        self.assertIn('archived', stats)
        self.assertIn('deleted', stats)
        self.assertIn('remaining', stats)

    def test_export_sanitized(self):
        """export_sanitized() writes a JSON file whose length equals the returned count."""
        self.manager.add_record(
            task_id='test-005',
            user_input='测试邮箱 test@example.com',
            intent_label='test',
            intent_confidence=0.9,
            execution_plan='测试',
            code='test',
            success=True,
            duration_ms=100
        )

        export_path = self.temp_dir / "export.json"
        count = self.manager.export_sanitized(export_path)

        self.assertGreater(count, 0)
        self.assertTrue(export_path.exists())

        # Verify the exported content.
        with open(export_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        self.assertEqual(len(data), count)
|
|
|
|
|
|
def run_tests():
    """Run all three test cases of this module with a verbose text runner.

    Returns:
        The runner result's ``wasSuccessful()`` value.
    """
    case_loader = unittest.TestLoader()
    full_suite = unittest.TestSuite()

    # Same order as before: sanitizer, governance, history manager.
    for case in (TestDataSanitizer, TestDataGovernance, TestHistoryManager):
        full_suite.addTests(case_loader.loadTestsFromTestCase(case))

    outcome = unittest.TextTestRunner(verbosity=2).run(full_suite)
    return outcome.wasSuccessful()
|
|
|
|
|
|
if __name__ == '__main__':
    # Exit with status 0 on success and 1 on failure so callers can detect a
    # failed run. SystemExit is raised directly because the exit() helper is
    # installed by the `site` module for interactive use and is not available
    # when Python runs with -S.
    success = run_tests()
    raise SystemExit(0 if success else 1)
|
|
|