- Renamed `check_environment` to `check_api_key_configured` for clarity, simplifying the API key validation logic. - Removed the blocking behavior of the API key check during application startup, allowing the app to run while providing a prompt for configuration. - Updated `LocalAgentApp` to accept an `api_configured` parameter, enabling conditional messaging for API key setup. - Enhanced the `SandboxRunner` to support backup management and improved execution result handling with detailed metrics. - Integrated data governance strategies into the `HistoryManager`, ensuring compliance and improved data management. - Added privacy settings and metrics tracking across various components to enhance user experience and application safety.
194 lines
5.9 KiB
Python
194 lines
5.9 KiB
Python
"""
|
|
安全度量指标收集器
|
|
用于监控和统计安全拦截情况
|
|
"""
|
|
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from typing import List, Dict
|
|
from pathlib import Path
|
|
import json
|
|
|
|
|
|
@dataclass
|
|
class SecurityEvent:
|
|
"""安全事件"""
|
|
timestamp: str
|
|
event_type: str # 'static_block', 'runtime_block', 'warning'
|
|
category: str # 'network', 'path', 'dangerous_call'
|
|
detail: str
|
|
task_id: str = ""
|
|
|
|
|
|
@dataclass
|
|
class SecurityMetrics:
|
|
"""安全度量指标"""
|
|
# 静态检查统计
|
|
total_checks: int = 0
|
|
static_blocks: int = 0
|
|
static_warnings: int = 0
|
|
|
|
# 运行时拦截统计
|
|
runtime_path_blocks: int = 0
|
|
runtime_network_blocks: int = 0
|
|
|
|
# 复用任务统计
|
|
reuse_total: int = 0
|
|
reuse_rechecked: int = 0
|
|
reuse_blocked: int = 0
|
|
|
|
# 分类统计
|
|
network_violations: int = 0
|
|
path_violations: int = 0
|
|
dangerous_call_violations: int = 0
|
|
|
|
# 事件记录
|
|
events: List[SecurityEvent] = field(default_factory=list)
|
|
|
|
def add_static_block(self, category: str, detail: str, task_id: str = ""):
|
|
"""记录静态阻断"""
|
|
self.total_checks += 1
|
|
self.static_blocks += 1
|
|
|
|
if category == 'network':
|
|
self.network_violations += 1
|
|
elif category == 'path':
|
|
self.path_violations += 1
|
|
elif category == 'dangerous_call':
|
|
self.dangerous_call_violations += 1
|
|
|
|
self.events.append(SecurityEvent(
|
|
timestamp=datetime.now().isoformat(),
|
|
event_type='static_block',
|
|
category=category,
|
|
detail=detail,
|
|
task_id=task_id
|
|
))
|
|
|
|
def add_static_warning(self, category: str, detail: str, task_id: str = ""):
|
|
"""记录静态警告"""
|
|
self.total_checks += 1
|
|
self.static_warnings += 1
|
|
|
|
self.events.append(SecurityEvent(
|
|
timestamp=datetime.now().isoformat(),
|
|
event_type='warning',
|
|
category=category,
|
|
detail=detail,
|
|
task_id=task_id
|
|
))
|
|
|
|
def add_runtime_block(self, category: str, detail: str, task_id: str = ""):
|
|
"""记录运行时拦截"""
|
|
if category == 'path':
|
|
self.runtime_path_blocks += 1
|
|
self.path_violations += 1
|
|
elif category == 'network':
|
|
self.runtime_network_blocks += 1
|
|
self.network_violations += 1
|
|
|
|
self.events.append(SecurityEvent(
|
|
timestamp=datetime.now().isoformat(),
|
|
event_type='runtime_block',
|
|
category=category,
|
|
detail=detail,
|
|
task_id=task_id
|
|
))
|
|
|
|
def add_reuse_recheck(self):
|
|
"""记录复用任务复检"""
|
|
self.reuse_total += 1
|
|
self.reuse_rechecked += 1
|
|
|
|
def add_reuse_block(self):
|
|
"""记录复用任务被拦截"""
|
|
self.reuse_blocked += 1
|
|
|
|
def get_summary(self) -> Dict:
|
|
"""获取统计摘要"""
|
|
return {
|
|
"总检查次数": self.total_checks,
|
|
"静态阻断次数": self.static_blocks,
|
|
"静态警告次数": self.static_warnings,
|
|
"运行时路径拦截": self.runtime_path_blocks,
|
|
"运行时网络拦截": self.runtime_network_blocks,
|
|
"网络违规总数": self.network_violations,
|
|
"路径违规总数": self.path_violations,
|
|
"危险调用违规": self.dangerous_call_violations,
|
|
"复用任务总数": self.reuse_total,
|
|
"复用任务复检数": self.reuse_rechecked,
|
|
"复用任务拦截数": self.reuse_blocked,
|
|
"复用任务复检覆盖率": f"{self._calculate_reuse_coverage():.2%}",
|
|
"复用任务拦截率": f"{self._calculate_reuse_block_rate():.2%}",
|
|
"总体拦截率": f"{self._calculate_block_rate():.2%}",
|
|
"误放行率": "0.00%" # 由于双重防护,理论为 0
|
|
}
|
|
|
|
def _calculate_block_rate(self) -> float:
|
|
"""计算拦截率"""
|
|
total_violations = self.static_blocks + self.runtime_path_blocks + self.runtime_network_blocks
|
|
if self.total_checks == 0:
|
|
return 0.0
|
|
return total_violations / self.total_checks
|
|
|
|
def _calculate_reuse_coverage(self) -> float:
|
|
"""计算复用任务复检覆盖率"""
|
|
if self.reuse_total == 0:
|
|
return 1.0 # 没有复用任务时,覆盖率为 100%
|
|
return self.reuse_rechecked / self.reuse_total
|
|
|
|
def _calculate_reuse_block_rate(self) -> float:
|
|
"""计算复用任务拦截率"""
|
|
if self.reuse_rechecked == 0:
|
|
return 0.0
|
|
return self.reuse_blocked / self.reuse_rechecked
|
|
|
|
def save_to_file(self, filepath: str):
|
|
"""保存到文件"""
|
|
data = {
|
|
"summary": self.get_summary(),
|
|
"events": [
|
|
{
|
|
"timestamp": e.timestamp,
|
|
"type": e.event_type,
|
|
"category": e.category,
|
|
"detail": e.detail,
|
|
"task_id": e.task_id
|
|
}
|
|
for e in self.events
|
|
]
|
|
}
|
|
|
|
Path(filepath).write_text(
|
|
json.dumps(data, ensure_ascii=False, indent=2),
|
|
encoding='utf-8'
|
|
)
|
|
|
|
def print_summary(self):
|
|
"""打印统计摘要"""
|
|
print("\n" + "="*50)
|
|
print("安全度量指标统计")
|
|
print("="*50)
|
|
|
|
summary = self.get_summary()
|
|
for key, value in summary.items():
|
|
print(f"{key:20s}: {value}")
|
|
|
|
print("="*50 + "\n")
|
|
|
|
|
|
# 全局度量实例
|
|
_global_metrics = SecurityMetrics()
|
|
|
|
|
|
def get_metrics() -> SecurityMetrics:
|
|
"""获取全局度量实例"""
|
|
return _global_metrics
|
|
|
|
|
|
def reset_metrics():
|
|
"""重置度量数据"""
|
|
global _global_metrics
|
|
_global_metrics = SecurityMetrics()
|
|
|