Files
LocalAgent/safety/security_metrics.py
Mimikko-zeus 8a538bb950 feat: refactor API key configuration and enhance application initialization
- Renamed `check_environment` to `check_api_key_configured` for clarity, simplifying the API key validation logic.
- Removed the blocking behavior of the API key check during application startup, allowing the app to run while providing a prompt for configuration.
- Updated `LocalAgentApp` to accept an `api_configured` parameter, enabling conditional messaging for API key setup.
- Enhanced the `SandboxRunner` to support backup management and improved execution result handling with detailed metrics.
- Integrated data governance strategies into the `HistoryManager`, ensuring compliance and improved data management.
- Added privacy settings and metrics tracking across various components to enhance user experience and application safety.
2026-02-27 14:32:30 +08:00

194 lines
5.9 KiB
Python

"""
安全度量指标收集器
用于监控和统计安全拦截情况
"""
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Dict
from pathlib import Path
import json
@dataclass
class SecurityEvent:
    """A single recorded security occurrence.

    Instances are plain value objects: the recorder fills in every field and
    nothing is validated here.
    """

    # ISO-8601 string describing when the event was recorded.
    timestamp: str
    # One of 'static_block', 'runtime_block' or 'warning'.
    event_type: str
    # One of 'network', 'path' or 'dangerous_call'.
    category: str
    # Free-form description of what was detected.
    detail: str
    # Identifier of the task that triggered the event; empty when not given.
    task_id: str = field(default="")
@dataclass
class SecurityMetrics:
    """Aggregate security counters plus a chronological event log.

    Counters are grouped into static-check results, runtime interceptions,
    reused-task re-checks and per-category violation totals. Each
    ``add_static_block`` / ``add_static_warning`` / ``add_runtime_block``
    call also appends a ``SecurityEvent`` to ``events``.
    """

    # Static-check statistics
    total_checks: int = 0
    static_blocks: int = 0
    static_warnings: int = 0
    # Runtime interception statistics
    runtime_path_blocks: int = 0
    runtime_network_blocks: int = 0
    # Reused-task statistics
    reuse_total: int = 0
    reuse_rechecked: int = 0
    reuse_blocked: int = 0
    # Per-category violation totals (static and runtime blocks combined)
    network_violations: int = 0
    path_violations: int = 0
    dangerous_call_violations: int = 0
    # Event log, in the order events were recorded
    events: List["SecurityEvent"] = field(default_factory=list)

    def _count_violation(self, category: str) -> None:
        """Bump the per-category violation total; unknown categories are ignored."""
        if category == 'network':
            self.network_violations += 1
        elif category == 'path':
            self.path_violations += 1
        elif category == 'dangerous_call':
            self.dangerous_call_violations += 1

    def _record_event(self, event_type: str, category: str, detail: str,
                      task_id: str) -> None:
        """Append a ``SecurityEvent`` stamped with the current local time."""
        self.events.append(SecurityEvent(
            timestamp=datetime.now().isoformat(),
            event_type=event_type,
            category=category,
            detail=detail,
            task_id=task_id,
        ))

    def add_static_block(self, category: str, detail: str, task_id: str = "") -> None:
        """Record a static check that blocked execution.

        Args:
            category: 'network', 'path' or 'dangerous_call'; any other value
                is logged but does not touch the per-category totals.
            detail: Description of the finding.
            task_id: Identifier of the offending task, if known.
        """
        self.total_checks += 1
        self.static_blocks += 1
        self._count_violation(category)
        self._record_event('static_block', category, detail, task_id)

    def add_static_warning(self, category: str, detail: str, task_id: str = "") -> None:
        """Record a static check that produced a warning.

        Warnings count as a check but not as a violation, so the
        per-category totals are left untouched.

        Args:
            category: Category label stored on the event.
            detail: Description of the finding.
            task_id: Identifier of the task, if known.
        """
        self.total_checks += 1
        self.static_warnings += 1
        self._record_event('warning', category, detail, task_id)

    def add_runtime_block(self, category: str, detail: str, task_id: str = "") -> None:
        """Record an operation intercepted at runtime.

        Only 'path' and 'network' have dedicated runtime counters. The
        per-category violation totals are updated through the same helper as
        static blocks, so a 'dangerous_call' runtime block is counted too.

        Args:
            category: 'path', 'network' or 'dangerous_call'.
            detail: Description of the intercepted operation.
            task_id: Identifier of the offending task, if known.
        """
        if category == 'path':
            self.runtime_path_blocks += 1
        elif category == 'network':
            self.runtime_network_blocks += 1
        self._count_violation(category)
        self._record_event('runtime_block', category, detail, task_id)

    def add_reuse_recheck(self) -> None:
        """Record that a reused task was seen and re-checked."""
        self.reuse_total += 1
        self.reuse_rechecked += 1

    def add_reuse_block(self) -> None:
        """Record that a reused task was blocked."""
        self.reuse_blocked += 1

    def get_summary(self) -> Dict:
        """Return the counters and derived rates as a dict.

        Keys are the Chinese display labels used by ``print_summary`` and
        ``save_to_file``; rates are formatted as percentages with two
        decimals.
        """
        return {
            "总检查次数": self.total_checks,
            "静态阻断次数": self.static_blocks,
            "静态警告次数": self.static_warnings,
            "运行时路径拦截": self.runtime_path_blocks,
            "运行时网络拦截": self.runtime_network_blocks,
            "网络违规总数": self.network_violations,
            "路径违规总数": self.path_violations,
            "危险调用违规": self.dangerous_call_violations,
            "复用任务总数": self.reuse_total,
            "复用任务复检数": self.reuse_rechecked,
            "复用任务拦截数": self.reuse_blocked,
            "复用任务复检覆盖率": f"{self._calculate_reuse_coverage():.2%}",
            "复用任务拦截率": f"{self._calculate_reuse_block_rate():.2%}",
            "总体拦截率": f"{self._calculate_block_rate():.2%}",
            # Hard-coded, not measured: with both static and runtime
            # protection in place the theoretical value is 0.
            "误放行率": "0.00%",
        }

    def _calculate_block_rate(self) -> float:
        """Return blocked events divided by all inspected events.

        Runtime blocks are not included in ``total_checks``, so they are
        added to the denominator here. Without that the ratio could exceed
        1.0, or read 0.0 when only runtime blocks had occurred.
        """
        runtime_blocks = self.runtime_path_blocks + self.runtime_network_blocks
        inspected = self.total_checks + runtime_blocks
        if inspected == 0:
            return 0.0
        return (self.static_blocks + runtime_blocks) / inspected

    def _calculate_reuse_coverage(self) -> float:
        """Return the share of reused tasks that were re-checked."""
        if self.reuse_total == 0:
            # No reused tasks at all is reported as full coverage.
            return 1.0
        return self.reuse_rechecked / self.reuse_total

    def _calculate_reuse_block_rate(self) -> float:
        """Return the share of re-checked reused tasks that were blocked."""
        if self.reuse_rechecked == 0:
            return 0.0
        return self.reuse_blocked / self.reuse_rechecked

    def save_to_file(self, filepath: str) -> None:
        """Write the summary and the full event log to *filepath* as JSON.

        Missing parent directories are created first. The file is written
        as UTF-8 with non-ASCII characters left unescaped.

        Args:
            filepath: Destination path; an existing file is overwritten.
        """
        payload = {
            "summary": self.get_summary(),
            "events": [
                {
                    "timestamp": e.timestamp,
                    "type": e.event_type,
                    "category": e.category,
                    "detail": e.detail,
                    "task_id": e.task_id,
                }
                for e in self.events
            ],
        }
        target = Path(filepath)
        # Create the parent directories so a fresh output folder does not fail the write.
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(
            json.dumps(payload, ensure_ascii=False, indent=2),
            encoding='utf-8',
        )

    def print_summary(self) -> None:
        """Print the summary returned by ``get_summary`` to stdout."""
        print("\n" + "="*50)
        print("安全度量指标统计")
        print("="*50)
        for key, value in self.get_summary().items():
            print(f"{key:20s}: {value}")
        print("="*50 + "\n")
# Module-wide metrics instance: returned by get_metrics(), replaced by reset_metrics().
_global_metrics = SecurityMetrics()
def get_metrics() -> SecurityMetrics:
    """Return the module-level ``SecurityMetrics`` instance.

    The same object is returned on every call until ``reset_metrics()``
    rebinds the module-level variable.
    """
    return _global_metrics
def reset_metrics() -> None:
    """Discard collected metrics by installing a fresh ``SecurityMetrics``.

    The module-level variable is rebound rather than cleared in place, so a
    reference obtained from ``get_metrics()`` before the reset keeps pointing
    at the old instance.
    """
    global _global_metrics
    _global_metrics = SecurityMetrics()