""" 数据治理策略模块 实现数据分级保存、保留期管理、归档和清理策略 """ import json from datetime import datetime, timedelta from pathlib import Path from typing import Dict, List, Optional, Set from dataclasses import dataclass, asdict from enum import Enum from history.data_sanitizer import get_sanitizer, SensitiveType class DataLevel(Enum): """数据保存级别""" FULL = "full" # 完整保存(无脱敏) SANITIZED = "sanitized" # 脱敏保存 MINIMAL = "minimal" # 最小化保存(仅元数据) ARCHIVED = "archived" # 已归档 class RetentionPolicy(Enum): """数据保留策略""" SHORT = 7 # 7天 MEDIUM = 30 # 30天 LONG = 90 # 90天 PERMANENT = -1 # 永久保留 @dataclass class DataClassification: """数据分类结果""" level: DataLevel retention_days: int sensitivity_score: float sensitive_fields: Set[str] reason: str @dataclass class GovernanceMetrics: """治理度量指标""" total_records: int full_records: int sanitized_records: int minimal_records: int archived_records: int total_size_bytes: int sensitive_field_hits: Dict[str, int] expired_records: int last_cleanup_time: str class DataGovernancePolicy: """ 数据治理策略 根据敏感度自动分级保存,管理数据生命周期 """ # 字段敏感度配置 FIELD_SENSITIVITY = { 'user_input': 0.5, # 用户输入可能含敏感信息 'code': 0.7, # 代码可能含路径、密钥 'stdout': 0.6, # 输出可能含敏感数据 'stderr': 0.6, # 错误信息可能含路径 'execution_plan': 0.3, # 执行计划相对安全 'log_path': 0.4, # 日志路径 } # 分级阈值 LEVEL_THRESHOLDS = { DataLevel.FULL: 0.0, # 敏感度 < 0.3 完整保存 DataLevel.SANITIZED: 0.3, # 0.3 <= 敏感度 < 0.7 脱敏保存 DataLevel.MINIMAL: 0.7, # 敏感度 >= 0.7 最小化保存 } # 保留期配置(根据数据级别) RETENTION_CONFIG = { DataLevel.FULL: RetentionPolicy.LONG.value, # 完整数据保留90天 DataLevel.SANITIZED: RetentionPolicy.MEDIUM.value, # 脱敏数据保留30天 DataLevel.MINIMAL: RetentionPolicy.SHORT.value, # 最小化数据保留7天 } def __init__(self, workspace_path: Path): self.workspace = workspace_path self.sanitizer = get_sanitizer() self.metrics_file = workspace_path / "governance_metrics.json" self.archive_dir = workspace_path / "archive" self.archive_dir.mkdir(exist_ok=True) def classify_record(self, record_data: Dict) -> DataClassification: """ 对记录进行分类 Args: record_data: 记录数据字典 Returns: 数据分类结果 """ sensitive_fields = set() total_sensitivity = 0.0 field_count = 0 # 分析各字段敏感度 for field, weight in self.FIELD_SENSITIVITY.items(): if field in record_data and record_data[field]: content = str(record_data[field]) field_score = self.sanitizer.get_sensitivity_score(content) if field_score > 0.3: # 发现敏感信息 sensitive_fields.add(field) total_sensitivity += field_score * weight field_count += 1 # 计算综合敏感度 avg_sensitivity = total_sensitivity / field_count if field_count > 0 else 0.0 # 确定数据级别 if avg_sensitivity >= self.LEVEL_THRESHOLDS[DataLevel.MINIMAL]: level = DataLevel.MINIMAL reason = f"高敏感度({avg_sensitivity:.2f}),仅保留元数据" elif avg_sensitivity >= self.LEVEL_THRESHOLDS[DataLevel.SANITIZED]: level = DataLevel.SANITIZED reason = f"中等敏感度({avg_sensitivity:.2f}),脱敏保存" else: level = DataLevel.FULL reason = f"低敏感度({avg_sensitivity:.2f}),完整保存" # 确定保留期 retention_days = self.RETENTION_CONFIG[level] return DataClassification( level=level, retention_days=retention_days, sensitivity_score=avg_sensitivity, sensitive_fields=sensitive_fields, reason=reason ) def apply_policy(self, record_data: Dict) -> Dict: """ 应用治理策略,返回处理后的数据 Args: record_data: 原始记录数据 Returns: 处理后的记录数据 """ classification = self.classify_record(record_data) # 添加治理元数据 result = record_data.copy() result['_governance'] = { 'level': classification.level.value, 'retention_days': classification.retention_days, 'sensitivity_score': classification.sensitivity_score, 'sensitive_fields': list(classification.sensitive_fields), 'classified_at': datetime.now().isoformat(), 'expires_at': (datetime.now() + timedelta(days=classification.retention_days)).isoformat() } # 根据级别处理数据 if classification.level == DataLevel.MINIMAL: # 最小化:只保留元数据 result = self._minimize_record(result) elif classification.level == DataLevel.SANITIZED: # 脱敏:对敏感字段脱敏 result = self._sanitize_record(result, classification.sensitive_fields) # FULL 级别不做处理 return result def _minimize_record(self, record: Dict) -> Dict: """ 最小化记录(仅保留元数据) Args: record: 原始记录 Returns: 最小化后的记录 """ # 保留的字段 keep_fields = { 'task_id', 'timestamp', 'intent_label', 'intent_confidence', 'success', 'duration_ms', 'task_summary', '_governance' } minimal = {k: v for k, v in record.items() if k in keep_fields} # 添加摘要信息 minimal['user_input'] = '[已删除-高敏感]' minimal['code'] = '[已删除-高敏感]' minimal['stdout'] = '[已删除-高敏感]' minimal['stderr'] = '[已删除-高敏感]' minimal['execution_plan'] = record.get('execution_plan', '')[:100] + '...' return minimal def _sanitize_record(self, record: Dict, sensitive_fields: Set[str]) -> Dict: """ 脱敏记录 Args: record: 原始记录 sensitive_fields: 需要脱敏的字段 Returns: 脱敏后的记录 """ result = record.copy() for field in sensitive_fields: if field in result and result[field]: content = str(result[field]) sanitized, matches = self.sanitizer.sanitize(content) result[field] = sanitized # 记录脱敏信息 if '_sanitization' not in result: result['_sanitization'] = {} result['_sanitization'][field] = { 'masked_count': len(matches), 'types': list(set(m.type.value for m in matches)) } return result def check_expiration(self, record: Dict) -> bool: """ 检查记录是否过期 Args: record: 记录数据 Returns: 是否过期 """ if '_governance' not in record or record['_governance'] is None: return False expires_at = record['_governance'].get('expires_at') if not expires_at: return False try: expire_time = datetime.fromisoformat(expires_at) return datetime.now() > expire_time except (ValueError, TypeError): return False def archive_record(self, record: Dict) -> Path: """ 归档记录 Args: record: 记录数据 Returns: 归档文件路径 """ task_id = record.get('task_id', 'unknown') timestamp = record.get('timestamp', datetime.now().strftime('%Y%m%d_%H%M%S')) # 生成归档文件名 archive_file = self.archive_dir / f"{task_id}_{timestamp}.json" # 标记为已归档 record['_governance']['level'] = DataLevel.ARCHIVED.value record['_governance']['archived_at'] = datetime.now().isoformat() # 保存到归档目录 with open(archive_file, 'w', encoding='utf-8') as f: json.dump(record, f, ensure_ascii=False, indent=2) return archive_file def cleanup_expired(self, records: List[Dict]) -> tuple[List[Dict], int, int]: """ 清理过期记录 Args: records: 记录列表 Returns: (保留的记录列表, 归档数量, 删除数量) """ kept_records = [] archived_count = 0 deleted_count = 0 for record in records: if not self.check_expiration(record): kept_records.append(record) continue # 过期处理 level = record.get('_governance', {}).get('level') if level == DataLevel.FULL.value: # 完整数据:降级为脱敏 record['_governance']['level'] = DataLevel.SANITIZED.value record['_governance']['retention_days'] = RetentionPolicy.MEDIUM.value record['_governance']['expires_at'] = ( datetime.now() + timedelta(days=RetentionPolicy.MEDIUM.value) ).isoformat() # 执行脱敏 sensitive_fields = set(record['_governance'].get('sensitive_fields', [])) record = self._sanitize_record(record, sensitive_fields) kept_records.append(record) elif level == DataLevel.SANITIZED.value: # 脱敏数据:归档 self.archive_record(record) archived_count += 1 else: # 最小化数据:直接删除 deleted_count += 1 return kept_records, archived_count, deleted_count def collect_metrics(self, records: List[Dict]) -> GovernanceMetrics: """ 收集治理度量指标 Args: records: 记录列表 Returns: 度量指标 """ metrics = GovernanceMetrics( total_records=len(records), full_records=0, sanitized_records=0, minimal_records=0, archived_records=0, total_size_bytes=0, sensitive_field_hits={}, expired_records=0, last_cleanup_time=datetime.now().isoformat() ) for record in records: # 统计数据级别 level = record.get('_governance', {}).get('level') if level == DataLevel.FULL.value: metrics.full_records += 1 elif level == DataLevel.SANITIZED.value: metrics.sanitized_records += 1 elif level == DataLevel.MINIMAL.value: metrics.minimal_records += 1 elif level == DataLevel.ARCHIVED.value: metrics.archived_records += 1 # 统计敏感字段命中 sensitive_fields = record.get('_governance', {}).get('sensitive_fields', []) for field in sensitive_fields: metrics.sensitive_field_hits[field] = metrics.sensitive_field_hits.get(field, 0) + 1 # 统计过期记录 if self.check_expiration(record): metrics.expired_records += 1 # 估算大小 metrics.total_size_bytes += len(json.dumps(record, ensure_ascii=False)) return metrics def save_metrics(self, metrics: GovernanceMetrics): """保存度量指标""" with open(self.metrics_file, 'w', encoding='utf-8') as f: data = asdict(metrics) json.dump(data, f, ensure_ascii=False, indent=2) def load_metrics(self) -> Optional[GovernanceMetrics]: """加载度量指标""" if not self.metrics_file.exists(): return None try: with open(self.metrics_file, 'r', encoding='utf-8') as f: data = json.load(f) return GovernanceMetrics(**data) except Exception as e: print(f"[警告] 加载度量指标失败: {e}") return None # 全局单例 _policy: Optional[DataGovernancePolicy] = None def get_governance_policy(workspace_path: Path) -> DataGovernancePolicy: """获取数据治理策略单例""" global _policy if _policy is None: _policy = DataGovernancePolicy(workspace_path) return _policy