""" 安全度量指标收集器 用于监控和统计安全拦截情况 """ from dataclasses import dataclass, field from datetime import datetime from typing import List, Dict from pathlib import Path import json @dataclass class SecurityEvent: """安全事件""" timestamp: str event_type: str # 'static_block', 'runtime_block', 'warning' category: str # 'network', 'path', 'dangerous_call' detail: str task_id: str = "" @dataclass class SecurityMetrics: """安全度量指标""" # 静态检查统计 total_checks: int = 0 static_blocks: int = 0 static_warnings: int = 0 # 运行时拦截统计 runtime_path_blocks: int = 0 runtime_network_blocks: int = 0 # 复用任务统计 reuse_total: int = 0 reuse_rechecked: int = 0 reuse_blocked: int = 0 # 分类统计 network_violations: int = 0 path_violations: int = 0 dangerous_call_violations: int = 0 # 事件记录 events: List[SecurityEvent] = field(default_factory=list) def add_static_block(self, category: str, detail: str, task_id: str = ""): """记录静态阻断""" self.total_checks += 1 self.static_blocks += 1 if category == 'network': self.network_violations += 1 elif category == 'path': self.path_violations += 1 elif category == 'dangerous_call': self.dangerous_call_violations += 1 self.events.append(SecurityEvent( timestamp=datetime.now().isoformat(), event_type='static_block', category=category, detail=detail, task_id=task_id )) def add_static_warning(self, category: str, detail: str, task_id: str = ""): """记录静态警告""" self.total_checks += 1 self.static_warnings += 1 self.events.append(SecurityEvent( timestamp=datetime.now().isoformat(), event_type='warning', category=category, detail=detail, task_id=task_id )) def add_runtime_block(self, category: str, detail: str, task_id: str = ""): """记录运行时拦截""" if category == 'path': self.runtime_path_blocks += 1 self.path_violations += 1 elif category == 'network': self.runtime_network_blocks += 1 self.network_violations += 1 self.events.append(SecurityEvent( timestamp=datetime.now().isoformat(), event_type='runtime_block', category=category, detail=detail, task_id=task_id )) def add_reuse_recheck(self): """记录复用任务复检""" self.reuse_total += 1 self.reuse_rechecked += 1 def add_reuse_block(self): """记录复用任务被拦截""" self.reuse_blocked += 1 def get_summary(self) -> Dict: """获取统计摘要""" return { "总检查次数": self.total_checks, "静态阻断次数": self.static_blocks, "静态警告次数": self.static_warnings, "运行时路径拦截": self.runtime_path_blocks, "运行时网络拦截": self.runtime_network_blocks, "网络违规总数": self.network_violations, "路径违规总数": self.path_violations, "危险调用违规": self.dangerous_call_violations, "复用任务总数": self.reuse_total, "复用任务复检数": self.reuse_rechecked, "复用任务拦截数": self.reuse_blocked, "复用任务复检覆盖率": f"{self._calculate_reuse_coverage():.2%}", "复用任务拦截率": f"{self._calculate_reuse_block_rate():.2%}", "总体拦截率": f"{self._calculate_block_rate():.2%}", "误放行率": "0.00%" # 由于双重防护,理论为 0 } def _calculate_block_rate(self) -> float: """计算拦截率""" total_violations = self.static_blocks + self.runtime_path_blocks + self.runtime_network_blocks if self.total_checks == 0: return 0.0 return total_violations / self.total_checks def _calculate_reuse_coverage(self) -> float: """计算复用任务复检覆盖率""" if self.reuse_total == 0: return 1.0 # 没有复用任务时,覆盖率为 100% return self.reuse_rechecked / self.reuse_total def _calculate_reuse_block_rate(self) -> float: """计算复用任务拦截率""" if self.reuse_rechecked == 0: return 0.0 return self.reuse_blocked / self.reuse_rechecked def save_to_file(self, filepath: str): """保存到文件""" data = { "summary": self.get_summary(), "events": [ { "timestamp": e.timestamp, "type": e.event_type, "category": e.category, "detail": e.detail, "task_id": e.task_id } for e in self.events ] } Path(filepath).write_text( json.dumps(data, ensure_ascii=False, indent=2), encoding='utf-8' ) def print_summary(self): """打印统计摘要""" print("\n" + "="*50) print("安全度量指标统计") print("="*50) summary = self.get_summary() for key, value in summary.items(): print(f"{key:20s}: {value}") print("="*50 + "\n") # 全局度量实例 _global_metrics = SecurityMetrics() def get_metrics() -> SecurityMetrics: """获取全局度量实例""" return _global_metrics def reset_metrics(): """重置度量数据""" global _global_metrics _global_metrics = SecurityMetrics()