"""
Data sanitization module.

Detects sensitive information in history records and masks it.
"""

import re
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Optional, Set, Tuple


class SensitiveType(Enum):
    """Kinds of sensitive information the sanitizer can recognise."""

    FILE_PATH = "file_path"        # file system path
    IP_ADDRESS = "ip_address"      # IP address
    EMAIL = "email"                # e-mail address
    PHONE = "phone"                # phone number
    API_KEY = "api_key"            # API key
    PASSWORD = "password"          # password
    TOKEN = "token"                # token
    DATABASE_URI = "database_uri"  # database connection string
    CREDIT_CARD = "credit_card"    # credit card number
    ID_CARD = "id_card"            # national ID card number


@dataclass
class SensitiveMatch:
    """One detected piece of sensitive data.

    ``value`` is the matched text, ``start``/``end`` are its offsets in the
    scanned text (end exclusive) and ``masked_value`` is the replacement.
    """

    type: SensitiveType
    value: str
    start: int
    end: int
    masked_value: str


class DataSanitizer:
    """
    Data sanitizer.

    Finds sensitive information in text using regular expressions and
    replaces it with masked values. Several data types are supported.
    """

    # Regular expressions per sensitive type.
    PATTERNS = {
        SensitiveType.FILE_PATH: [
            # Windows path
            r'[A-Za-z]:\\(?:[^\\/:*?"<>|\r\n]+\\)*[^\\/:*?"<>|\r\n]*',
            # Unix path (additionally validated). Whitespace is excluded so
            # that one match cannot run to the end of the text and swallow
            # other sensitive data that follows the path.
            r'/(?:[^/\0\s]+/)*[^/\0\s]*',
        ],
        SensitiveType.IP_ADDRESS: [
            r'\b(?:\d{1,3}\.){3}\d{1,3}\b',  # IPv4
            r'\b(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\b',  # IPv6
        ],
        SensitiveType.EMAIL: [
            # The TLD class is [A-Za-z]; a "|" inside a character class is a
            # literal pipe, not an alternation.
            r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
        ],
        SensitiveType.PHONE: [
            r'\b1[3-9]\d{9}\b',          # Chinese mobile number
            r'\b\d{3}-\d{4}-\d{4}\b',    # hyphenated 3-4-4 phone number
        ],
        SensitiveType.API_KEY: [
            r'\b[A-Za-z0-9_-]{32,}\b',   # generic API key
            r'sk-[A-Za-z0-9]{48}',       # OpenAI style
            r'AIza[0-9A-Za-z_-]{35}',    # Google API
        ],
        SensitiveType.PASSWORD: [
            r'(?i)password\s*[:=]\s*["\']?([^"\'\s]+)["\']?',
            r'(?i)pwd\s*[:=]\s*["\']?([^"\'\s]+)["\']?',
        ],
        SensitiveType.TOKEN: [
            r'(?i)token\s*[:=]\s*["\']?([A-Za-z0-9_.-]+)["\']?',
            r'(?i)bearer\s+([A-Za-z0-9_.-]+)',
        ],
        SensitiveType.DATABASE_URI: [
            r'(?i)(mysql|postgresql|mongodb|redis)://[^\s]+',
        ],
        SensitiveType.CREDIT_CARD: [
            r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b',
        ],
        SensitiveType.ID_CARD: [
            r'\b\d{17}[\dXx]\b',  # Chinese ID card
        ],
    }

    # Types that need an extra validation step (to avoid false positives).
    # Maps the type to the name of the validator method.
    SPECIAL_VALIDATION = {
        SensitiveType.FILE_PATH: '_validate_file_path',
        SensitiveType.API_KEY: '_validate_api_key',
    }

    # Overlap resolution priority (smaller value wins).
    _PRIORITY = {
        SensitiveType.PASSWORD: 1,
        SensitiveType.API_KEY: 2,
        SensitiveType.TOKEN: 3,
        SensitiveType.DATABASE_URI: 4,
        SensitiveType.CREDIT_CARD: 5,
        SensitiveType.ID_CARD: 6,
        SensitiveType.EMAIL: 7,
        SensitiveType.PHONE: 8,
        SensitiveType.IP_ADDRESS: 9,
        SensitiveType.FILE_PATH: 10,
    }
    _DEFAULT_PRIORITY = 99

    # Per-type weights used by get_sensitivity_score().
    _SCORE_WEIGHTS = {
        SensitiveType.PASSWORD: 1.0,
        SensitiveType.API_KEY: 1.0,
        SensitiveType.TOKEN: 0.9,
        SensitiveType.DATABASE_URI: 0.9,
        SensitiveType.CREDIT_CARD: 1.0,
        SensitiveType.ID_CARD: 1.0,
        SensitiveType.EMAIL: 0.6,
        SensitiveType.PHONE: 0.6,
        SensitiveType.IP_ADDRESS: 0.5,
        SensitiveType.FILE_PATH: 0.3,
    }
    _DEFAULT_WEIGHT = 0.5

    def __init__(self, enabled_types: Optional[Set[SensitiveType]] = None):
        """
        Initialise the sanitizer.

        Args:
            enabled_types: Sensitive types to enable. ``None`` (or any empty
                collection, since the value is tested for truthiness)
                enables every type.
        """
        self.enabled_types = enabled_types or set(SensitiveType)
        self._compile_patterns()

    def _compile_patterns(self):
        """Compile the regular expressions of every enabled type."""
        self.compiled_patterns: Dict[SensitiveType, List[re.Pattern]] = {}
        # Walk the enum (definition order) rather than the set so the scan
        # order does not depend on hash ordering.
        for sens_type in SensitiveType:
            if sens_type in self.enabled_types and sens_type in self.PATTERNS:
                self.compiled_patterns[sens_type] = [
                    re.compile(pattern) for pattern in self.PATTERNS[sens_type]
                ]

    def _validate_file_path(self, text: str) -> bool:
        """Return True if *text* looks like a real file path."""
        # Reject very short candidates, a common source of false positives.
        if len(text) < 5:
            return False

        # Must contain at least one typical path feature.
        path_indicators = ['\\', '/', '.py', '.txt', '.json', '.log',
                           'Users', 'Program']
        return any(indicator in text for indicator in path_indicators)

    def _validate_api_key(self, text: str) -> bool:
        """Return True if *text* looks like a real API key."""
        # Reject candidates that are purely digits or purely letters.
        has_digit = any(c.isdigit() for c in text)
        has_alpha = any(c.isalpha() for c in text)

        # Minimum length requirement.
        return has_digit and has_alpha and len(text) >= 20

    def find_sensitive_data(self, text: str) -> List[SensitiveMatch]:
        """
        Find sensitive information in *text*.

        Args:
            text: Text to scan. Empty text yields no matches.

        Returns:
            Non-overlapping matches ordered by start offset.
        """
        if not text:
            return []

        matches: List[SensitiveMatch] = []

        for sens_type, patterns in self.compiled_patterns.items():
            # Resolve the optional validator once per type.
            validator_name = self.SPECIAL_VALIDATION.get(sens_type)
            validator = getattr(self, validator_name) if validator_name else None

            for pattern in patterns:
                for match in pattern.finditer(text):
                    value = match.group(0)

                    if validator is not None and not validator(value):
                        continue

                    matches.append(SensitiveMatch(
                        type=sens_type,
                        value=value,
                        start=match.start(),
                        end=match.end(),
                        masked_value=self._mask_value(value, sens_type),
                    ))

        # _remove_overlaps sorts by position itself.
        return self._remove_overlaps(matches)

    def _remove_overlaps(self, matches: List[SensitiveMatch]) -> List[SensitiveMatch]:
        """
        Resolve overlapping matches, keeping the higher-priority type.

        Matches are processed in start order. A match that lies entirely
        inside an already kept match of equal or higher priority is dropped.
        Any other overlap is merged into one fully masked span, so that no
        part of any detected value is left unmasked.

        Args:
            matches: Matches in any order.

        Returns:
            Non-overlapping matches ordered by start offset.
        """
        if not matches:
            return []

        def rank(match: SensitiveMatch) -> int:
            return self._PRIORITY.get(match.type, self._DEFAULT_PRIORITY)

        result: List[SensitiveMatch] = []
        for match in sorted(matches, key=lambda m: (m.start, rank(m))):
            if not result or match.start >= result[-1].end:
                result.append(match)
                continue

            kept = result[-1]
            outranks = rank(match) < rank(kept)
            extends = match.end > kept.end
            if not outranks and not extends:
                # Already fully covered by an equal/higher-priority match.
                continue
            result[-1] = self._merge_overlap(kept, match)

        return result

    def _merge_overlap(self, first: SensitiveMatch,
                       second: SensitiveMatch) -> SensitiveMatch:
        """Merge two overlapping matches into one fully masked match.

        Requires ``first.start <= second.start < first.end``. The merged
        match spans both inputs and takes the higher-priority type.
        """
        first_rank = self._PRIORITY.get(first.type, self._DEFAULT_PRIORITY)
        second_rank = self._PRIORITY.get(second.type, self._DEFAULT_PRIORITY)
        merged_type = second.type if second_rank < first_rank else first.type

        # Append only the part of `second` that sticks out past `first`
        # (empty when `second` is contained in `first`).
        tail = second.value[first.end - second.start:]

        return SensitiveMatch(
            type=merged_type,
            value=first.value + tail,
            start=first.start,
            end=max(first.end, second.end),
            # Partial masks (file name, e-mail domain, ...) could expose the
            # other match, so a merged span is always hidden completely.
            masked_value=f"[{merged_type.value.upper()}_MASKED]",
        )

    def _mask_value(self, value: str, sens_type: SensitiveType) -> str:
        """
        Build the masked replacement for *value*.

        Args:
            value: Original matched text.
            sens_type: Sensitive type of the match.

        Returns:
            The masked text. Types without a dedicated rule, and values that
            do not have the expected shape, are hidden completely.
        """
        if sens_type == SensitiveType.FILE_PATH:
            # Keep the file name, hide the directory part.
            parts = value.replace('\\', '/').split('/')
            if len(parts) > 1:
                return f"***/{parts[-1]}"
            return "***"

        if sens_type == SensitiveType.EMAIL:
            # Keep first and last character of the local part.
            parts = value.split('@')
            if len(parts) == 2:
                name, domain = parts
                masked_name = name[0] + '***' + name[-1] if len(name) > 2 else '***'
                return f"{masked_name}@{domain}"

        elif sens_type == SensitiveType.PHONE:
            # Keep the first 3 and the last 4 characters.
            if len(value) >= 11:
                return value[:3] + '****' + value[-4:]

        elif sens_type == SensitiveType.IP_ADDRESS:
            # Keep the first two octets (IPv4 only).
            parts = value.split('.')
            if len(parts) == 4:
                return f"{parts[0]}.{parts[1]}.*.*"

        elif sens_type == SensitiveType.CREDIT_CARD:
            # Keep only the last 4 digits.
            digits = re.sub(r'[\s-]', '', value)
            return '**** **** **** ' + digits[-4:]

        elif sens_type == SensitiveType.ID_CARD:
            # Keep the first 6 and the last 4 characters.
            return value[:6] + '********' + value[-4:]

        # Default: hide completely.
        return f"[{sens_type.value.upper()}_MASKED]"

    def sanitize(self, text: str) -> Tuple[str, List[SensitiveMatch]]:
        """
        Mask all sensitive information in *text*.

        Args:
            text: Original text.

        Returns:
            ``(sanitized_text, matches)``. The text is returned unchanged
            when nothing is found.
        """
        matches = self.find_sensitive_data(text)

        if not matches:
            return text, []

        # Matches are sorted and non-overlapping, so the result can be
        # assembled in a single forward pass.
        pieces: List[str] = []
        cursor = 0
        for match in matches:
            pieces.append(text[cursor:match.start])
            pieces.append(match.masked_value)
            cursor = match.end
        pieces.append(text[cursor:])

        return "".join(pieces), matches

    def get_sensitivity_score(self, text: str) -> float:
        """
        Compute a sensitivity score in the range 0-1 for *text*.

        Args:
            text: Text to evaluate.

        Returns:
            Sum of the per-type weights of all matches divided by 3.0,
            capped at 1.0; 0.0 when nothing is found.
        """
        matches = self.find_sensitive_data(text)

        if not matches:
            return 0.0

        total_weight = sum(
            self._SCORE_WEIGHTS.get(m.type, self._DEFAULT_WEIGHT) for m in matches
        )

        # Normalise to 0-1.
        return min(1.0, total_weight / 3.0)


# Module-level singleton, created on first use.
_sanitizer: Optional[DataSanitizer] = None


def get_sanitizer() -> DataSanitizer:
    """Return the shared DataSanitizer instance, creating it on first call."""
    global _sanitizer
    if _sanitizer is None:
        _sanitizer = DataSanitizer()
    return _sanitizer