Files
LocalAgent/history/data_sanitizer.py
Mimikko-zeus 8a538bb950 feat: refactor API key configuration and enhance application initialization
- Renamed `check_environment` to `check_api_key_configured` for clarity, simplifying the API key validation logic.
- Removed the blocking behavior of the API key check during application startup, allowing the app to run while providing a prompt for configuration.
- Updated `LocalAgentApp` to accept an `api_configured` parameter, enabling conditional messaging for API key setup.
- Enhanced the `SandboxRunner` to support backup management and improved execution result handling with detailed metrics.
- Integrated data governance strategies into the `HistoryManager`, ensuring compliance and improved data management.
- Added privacy settings and metrics tracking across various components to enhance user experience and application safety.
2026-02-27 14:32:30 +08:00

312 lines
9.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
数据脱敏模块
对历史记录中的敏感信息进行识别和脱敏处理
"""
import re
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Optional, Set, Tuple
class SensitiveType(Enum):
    """Categories of sensitive information the sanitizer can detect."""

    FILE_PATH = "file_path"        # filesystem path
    IP_ADDRESS = "ip_address"      # IPv4 / IPv6 address
    EMAIL = "email"                # e-mail address
    PHONE = "phone"                # phone number
    API_KEY = "api_key"            # API key
    PASSWORD = "password"          # password
    TOKEN = "token"                # access token
    DATABASE_URI = "database_uri"  # database connection string
    CREDIT_CARD = "credit_card"    # credit-card number
    ID_CARD = "id_card"            # national ID number
@dataclass
class SensitiveMatch:
    """One occurrence of sensitive data located in a scanned string."""
    type: SensitiveType  # category of the detected data
    value: str  # the raw matched text
    start: int  # start offset within the scanned string
    end: int  # end offset (exclusive) within the scanned string
    masked_value: str  # replacement text to substitute for `value`
class DataSanitizer:
    """Detect and mask sensitive information in free-form text.

    Detection is regex-based, with optional per-type validators to reduce
    false positives, and type-specific masking strategies that keep just
    enough context (e.g. the last four card digits) to stay readable.
    """

    # Regex patterns keyed by sensitive-data type.
    PATTERNS = {
        SensitiveType.FILE_PATH: [
            r'[A-Za-z]:\\(?:[^\\/:*?"<>|\r\n]+\\)*[^\\/:*?"<>|\r\n]*',  # Windows path
            r'/(?:[^/\0]+/)*[^/\0]*',  # Unix path; over-matches, extra validation required
        ],
        SensitiveType.IP_ADDRESS: [
            r'\b(?:\d{1,3}\.){3}\d{1,3}\b',  # IPv4 (loose: accepts octets > 255)
            r'\b(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\b',  # IPv6, full form only
        ],
        SensitiveType.EMAIL: [
            r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
        ],
        SensitiveType.PHONE: [
            r'\b1[3-9]\d{9}\b',  # Chinese mobile number
            r'\b\d{3}-\d{4}-\d{4}\b',  # US-style phone number
        ],
        SensitiveType.API_KEY: [
            r'\b[A-Za-z0-9_-]{32,}\b',  # generic long key; validated further below
            r'sk-[A-Za-z0-9]{48}',  # OpenAI-style key
            r'AIza[0-9A-Za-z_-]{35}',  # Google API key
        ],
        SensitiveType.PASSWORD: [
            r'(?i)password\s*[:=]\s*["\']?([^"\'\s]+)["\']?',
            r'(?i)pwd\s*[:=]\s*["\']?([^"\'\s]+)["\']?',
        ],
        SensitiveType.TOKEN: [
            r'(?i)token\s*[:=]\s*["\']?([A-Za-z0-9_.-]+)["\']?',
            r'(?i)bearer\s+([A-Za-z0-9_.-]+)',
        ],
        SensitiveType.DATABASE_URI: [
            r'(?i)(mysql|postgresql|mongodb|redis)://[^\s]+',
        ],
        SensitiveType.CREDIT_CARD: [
            r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b',
        ],
        SensitiveType.ID_CARD: [
            r'\b\d{17}[\dXx]\b',  # Chinese national ID (18 chars)
        ],
    }

    # Types whose raw pattern over-matches: maps type -> name of the method
    # called to reject false positives before a match is accepted.
    SPECIAL_VALIDATION = {
        SensitiveType.FILE_PATH: '_validate_file_path',
        SensitiveType.API_KEY: '_validate_api_key',
    }

    def __init__(self, enabled_types: Optional[Set[SensitiveType]] = None):
        """Initialize the sanitizer.

        Args:
            enabled_types: Sensitive types to detect; ``None`` (the default)
                enables every type. Bug fix: the previous
                ``enabled_types or set(SensitiveType)`` check treated an
                explicitly passed empty set the same as ``None``, silently
                enabling everything; an empty set now disables all detection.
        """
        self.enabled_types = set(SensitiveType) if enabled_types is None else enabled_types
        self._compile_patterns()

    def _compile_patterns(self) -> None:
        """Compile the regex patterns for every enabled type, once, up front."""
        self.compiled_patterns: Dict[SensitiveType, List[re.Pattern]] = {}
        for sens_type in self.enabled_types:
            if sens_type in self.PATTERNS:
                self.compiled_patterns[sens_type] = [
                    re.compile(pattern) for pattern in self.PATTERNS[sens_type]
                ]

    def _validate_file_path(self, text: str) -> bool:
        """Return True if *text* plausibly is a real file path (false-positive filter)."""
        # Reject very short candidates outright.
        if len(text) < 5:
            return False
        # Require at least one typical path marker.
        path_indicators = ['\\', '/', '.py', '.txt', '.json', '.log', 'Users', 'Program']
        return any(indicator in text for indicator in path_indicators)

    def _validate_api_key(self, text: str) -> bool:
        """Return True if *text* plausibly is an API key (false-positive filter)."""
        # A real key mixes digits and letters; reject all-digit / all-alpha runs.
        # (Removed an unused `has_special` local that was computed but never read.)
        has_digit = any(c.isdigit() for c in text)
        has_alpha = any(c.isalpha() for c in text)
        # Minimum length requirement.
        return has_digit and has_alpha and len(text) >= 20

    def find_sensitive_data(self, text: str) -> List[SensitiveMatch]:
        """Find all sensitive data occurrences in *text*.

        Args:
            text: Text to scan.

        Returns:
            Non-overlapping matches sorted by start offset.
        """
        matches = []
        for sens_type, patterns in self.compiled_patterns.items():
            for pattern in patterns:
                for match in pattern.finditer(text):
                    value = match.group(0)
                    # Extra validation for over-matching pattern types.
                    if sens_type in self.SPECIAL_VALIDATION:
                        validator = getattr(self, self.SPECIAL_VALIDATION[sens_type])
                        if not validator(value):
                            continue
                    # Build the masked replacement up front so callers can use it directly.
                    masked = self._mask_value(value, sens_type)
                    matches.append(SensitiveMatch(
                        type=sens_type,
                        value=value,
                        start=match.start(),
                        end=match.end(),
                        masked_value=masked
                    ))
        # Sort by position, then drop overlapping matches.
        matches.sort(key=lambda m: m.start)
        return self._remove_overlaps(matches)

    def _remove_overlaps(self, matches: List[SensitiveMatch]) -> List[SensitiveMatch]:
        """Drop overlapping matches, preferring higher-priority types on ties.

        NOTE(review): the greedy left-to-right scan always keeps whichever
        match starts first; the priority table only breaks ties between
        matches sharing the same start offset. A higher-priority match that
        begins inside an earlier, lower-priority one is discarded.
        """
        if not matches:
            return []
        # Per-type priority (smaller number wins a same-start tie).
        priority = {
            SensitiveType.PASSWORD: 1,
            SensitiveType.API_KEY: 2,
            SensitiveType.TOKEN: 3,
            SensitiveType.DATABASE_URI: 4,
            SensitiveType.CREDIT_CARD: 5,
            SensitiveType.ID_CARD: 6,
            SensitiveType.EMAIL: 7,
            SensitiveType.PHONE: 8,
            SensitiveType.IP_ADDRESS: 9,
            SensitiveType.FILE_PATH: 10,
        }
        result = []
        last_end = -1
        for match in sorted(matches, key=lambda m: (m.start, priority.get(m.type, 99))):
            if match.start >= last_end:
                result.append(match)
                last_end = match.end
        return result

    def _mask_value(self, value: str, sens_type: SensitiveType) -> str:
        """Produce the masked replacement for a matched value.

        Args:
            value: The raw matched text.
            sens_type: Its sensitive-data category.

        Returns:
            The masked string; falls back to a generic ``[TYPE_MASKED]``
            placeholder when no type-specific strategy applies.
        """
        if sens_type == SensitiveType.FILE_PATH:
            # Keep the file name, hide the directory portion.
            parts = value.replace('\\', '/').split('/')
            if len(parts) > 1:
                return f"***/{parts[-1]}"
            return "***"
        elif sens_type == SensitiveType.EMAIL:
            # Keep the first and last character of the local part.
            parts = value.split('@')
            if len(parts) == 2:
                name = parts[0]
                domain = parts[1]
                masked_name = (name[0] + '***' + name[-1]) if len(name) > 2 else '***'
                return f"{masked_name}@{domain}"
        elif sens_type == SensitiveType.PHONE:
            # Keep the first 3 and last 4 digits.
            if len(value) >= 11:
                return value[:3] + '****' + value[-4:]
        elif sens_type == SensitiveType.IP_ADDRESS:
            # Keep the first two octets.
            parts = value.split('.')
            if len(parts) == 4:
                return f"{parts[0]}.{parts[1]}.*.*"
        elif sens_type == SensitiveType.CREDIT_CARD:
            # Keep only the last 4 digits.
            digits = re.sub(r'[\s-]', '', value)
            return '**** **** **** ' + digits[-4:]
        elif sens_type == SensitiveType.ID_CARD:
            # Keep the first 6 and last 4 characters.
            return value[:6] + '********' + value[-4:]
        # Default: hide the value entirely.
        return f"[{sens_type.value.upper()}_MASKED]"

    def sanitize(self, text: str) -> Tuple[str, List[SensitiveMatch]]:
        """Mask all sensitive data found in *text*.

        Args:
            text: Original text.

        Returns:
            ``(sanitized_text, matches)`` — the text with every match
            replaced by its masked value, plus the match list.
        """
        matches = self.find_sensitive_data(text)
        if not matches:
            return text, []
        # Replace from the end backwards so earlier offsets stay valid.
        result = text
        for match in reversed(matches):
            result = result[:match.start] + match.masked_value + result[match.end:]
        return result, matches

    def get_sensitivity_score(self, text: str) -> float:
        """Score how sensitive *text* is on a 0-1 scale.

        Args:
            text: Text to evaluate.

        Returns:
            Weighted, normalized sensitivity score in ``[0.0, 1.0]``.
        """
        matches = self.find_sensitive_data(text)
        if not matches:
            return 0.0
        # Weight each match by how critical its type is.
        weights = {
            SensitiveType.PASSWORD: 1.0,
            SensitiveType.API_KEY: 1.0,
            SensitiveType.TOKEN: 0.9,
            SensitiveType.DATABASE_URI: 0.9,
            SensitiveType.CREDIT_CARD: 1.0,
            SensitiveType.ID_CARD: 1.0,
            SensitiveType.EMAIL: 0.6,
            SensitiveType.PHONE: 0.6,
            SensitiveType.IP_ADDRESS: 0.5,
            SensitiveType.FILE_PATH: 0.3,
        }
        total_weight = sum(weights.get(m.type, 0.5) for m in matches)
        # Normalize into 0-1 (saturates at three full-weight matches).
        return min(1.0, total_weight / 3.0)
# Module-level singleton, created lazily on first use.
# Fixed annotation: the attribute legitimately holds None before first use,
# so it must be Optional[DataSanitizer], not DataSanitizer.
_sanitizer: Optional[DataSanitizer] = None


def get_sanitizer() -> DataSanitizer:
    """Return the process-wide DataSanitizer singleton, creating it lazily."""
    global _sanitizer
    if _sanitizer is None:
        _sanitizer = DataSanitizer()
    return _sanitizer