- Renamed `check_environment` to `check_api_key_configured` for clarity, simplifying the API key validation logic. - Removed the blocking behavior of the API key check during application startup, allowing the app to run while providing a prompt for configuration. - Updated `LocalAgentApp` to accept an `api_configured` parameter, enabling conditional messaging for API key setup. - Enhanced the `SandboxRunner` to support backup management and improved execution result handling with detailed metrics. - Integrated data governance strategies into the `HistoryManager`, ensuring compliance and improved data management. - Added privacy settings and metrics tracking across various components to enhance user experience and application safety.
312 lines
9.9 KiB
Python
312 lines
9.9 KiB
Python
"""
|
||
数据脱敏模块
|
||
对历史记录中的敏感信息进行识别和脱敏处理
|
||
"""
|
||
|
||
import re
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Optional, Set, Tuple
|
||
|
||
|
||
class SensitiveType(Enum):
    """Categories of sensitive information the sanitizer can recognize."""

    FILE_PATH = "file_path"  # file system path
    IP_ADDRESS = "ip_address"  # IP address
    EMAIL = "email"  # e-mail address
    PHONE = "phone"  # phone number
    API_KEY = "api_key"  # API key
    PASSWORD = "password"  # password
    TOKEN = "token"  # token
    DATABASE_URI = "database_uri"  # database connection string
    CREDIT_CARD = "credit_card"  # credit card number
    ID_CARD = "id_card"  # national ID card number
|
||
|
||
|
||
@dataclass
class SensitiveMatch:
    """One occurrence of sensitive data found in a piece of text."""

    # Category of the detected data.
    type: SensitiveType
    # The exact substring that matched.
    value: str
    # Offset of the first matched character in the scanned text.
    start: int
    # Offset one past the last matched character.
    end: int
    # Replacement text to substitute for ``value``.
    masked_value: str
|
||
|
||
|
||
class DataSanitizer:
    """
    Detect and mask sensitive information in free-form text.

    Every ``SensitiveType`` maps to one or more regular expressions in
    ``PATTERNS``. Types listed in ``SPECIAL_VALIDATION`` are additionally
    passed through a validator method to reduce false positives.
    """

    # Regex sources per sensitive type; compiled per instance by
    # _compile_patterns().
    PATTERNS = {
        SensitiveType.FILE_PATH: [
            # Windows path. Directory segments may contain spaces
            # ("Program Files"), but the final segment stops at whitespace so
            # the match cannot run to the end of the line and shadow a
            # secret that follows the path.
            r'[A-Za-z]:\\(?:[^\\/:*?"<>|\r\n]+\\)*[^\\/:*?"<>|\s]*',
            # Unix path (further checked by _validate_file_path). Segments
            # stop at whitespace and quotes for the same reason.
            r'/(?:[^/\0\s"\']+/)*[^/\0\s"\']*',
        ],
        SensitiveType.IP_ADDRESS: [
            r'\b(?:\d{1,3}\.){3}\d{1,3}\b',  # IPv4
            r'\b(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\b',  # IPv6
        ],
        SensitiveType.EMAIL: [
            # The TLD class is letters only; a "|" inside [] would be matched
            # literally, not treated as alternation.
            r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
        ],
        SensitiveType.PHONE: [
            r'\b1[3-9]\d{9}\b',  # Chinese mobile number
            r'\b\d{3}-\d{4}-\d{4}\b',  # dash-separated 3-4-4 digit number
        ],
        SensitiveType.API_KEY: [
            r'\b[A-Za-z0-9_-]{32,}\b',  # generic API key
            r'sk-[A-Za-z0-9]{48}',  # OpenAI style
            r'AIza[0-9A-Za-z_-]{35}',  # Google API
        ],
        SensitiveType.PASSWORD: [
            r'(?i)password\s*[:=]\s*["\']?([^"\'\s]+)["\']?',
            r'(?i)pwd\s*[:=]\s*["\']?([^"\'\s]+)["\']?',
        ],
        SensitiveType.TOKEN: [
            r'(?i)token\s*[:=]\s*["\']?([A-Za-z0-9_.-]+)["\']?',
            r'(?i)bearer\s+([A-Za-z0-9_.-]+)',
        ],
        SensitiveType.DATABASE_URI: [
            r'(?i)(mysql|postgresql|mongodb|redis)://[^\s]+',
        ],
        SensitiveType.CREDIT_CARD: [
            r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b',
        ],
        SensitiveType.ID_CARD: [
            r'\b\d{17}[\dXx]\b',  # Chinese ID card number
        ],
    }

    # Types whose regex hits need an extra check; the value is the name of
    # the validator method on this class.
    SPECIAL_VALIDATION = {
        SensitiveType.FILE_PATH: '_validate_file_path',
        SensitiveType.API_KEY: '_validate_api_key',
    }

    # Tie-break order used by _remove_overlaps (smaller = preferred).
    _OVERLAP_PRIORITY = {
        SensitiveType.PASSWORD: 1,
        SensitiveType.API_KEY: 2,
        SensitiveType.TOKEN: 3,
        SensitiveType.DATABASE_URI: 4,
        SensitiveType.CREDIT_CARD: 5,
        SensitiveType.ID_CARD: 6,
        SensitiveType.EMAIL: 7,
        SensitiveType.PHONE: 8,
        SensitiveType.IP_ADDRESS: 9,
        SensitiveType.FILE_PATH: 10,
    }

    # Per-match weights used by get_sensitivity_score.
    _SCORE_WEIGHTS = {
        SensitiveType.PASSWORD: 1.0,
        SensitiveType.API_KEY: 1.0,
        SensitiveType.TOKEN: 0.9,
        SensitiveType.DATABASE_URI: 0.9,
        SensitiveType.CREDIT_CARD: 1.0,
        SensitiveType.ID_CARD: 1.0,
        SensitiveType.EMAIL: 0.6,
        SensitiveType.PHONE: 0.6,
        SensitiveType.IP_ADDRESS: 0.5,
        SensitiveType.FILE_PATH: 0.3,
    }

    def __init__(self, enabled_types: Optional[Set[SensitiveType]] = None):
        """
        Create a sanitizer.

        Args:
            enabled_types: Sensitive types to detect. ``None`` enables every
                type; an explicit empty set disables detection entirely.
        """
        # Test for None explicitly: ``enabled_types or ...`` would silently
        # turn an intentionally empty set into "everything enabled".
        if enabled_types is None:
            self.enabled_types = set(SensitiveType)
        else:
            self.enabled_types = set(enabled_types)
        self._compile_patterns()

    def _compile_patterns(self):
        """Compile the regexes of every enabled type into ``compiled_patterns``."""
        # Walk the enum (definition order) rather than the set so the scan
        # order does not depend on hash randomization.
        self.compiled_patterns: Dict[SensitiveType, List[re.Pattern]] = {
            sens_type: [re.compile(source) for source in self.PATTERNS[sens_type]]
            for sens_type in SensitiveType
            if sens_type in self.enabled_types and sens_type in self.PATTERNS
        }

    def _validate_file_path(self, text: str) -> bool:
        """Return True if a FILE_PATH regex hit looks like a real path."""
        # Very short hits are rejected outright.
        if len(text) < 5:
            return False

        # The hit must contain at least one of these path markers.
        path_indicators = ('\\', '/', '.py', '.txt', '.json', '.log', 'Users', 'Program')
        return any(indicator in text for indicator in path_indicators)

    def _validate_api_key(self, text: str) -> bool:
        """Return True if an API_KEY regex hit looks like a real key."""
        # Reject all-digit and all-letter runs: a key must mix both and be
        # at least 20 characters long.
        has_digit = any(c.isdigit() for c in text)
        has_alpha = any(c.isalpha() for c in text)
        return has_digit and has_alpha and len(text) >= 20

    def find_sensitive_data(self, text: str) -> List[SensitiveMatch]:
        """
        Find sensitive data in ``text``.

        Args:
            text: Text to scan.

        Returns:
            Non-overlapping matches ordered by start offset. Empty when
            ``text`` is empty or ``None``.
        """
        if not text:
            return []

        matches: List[SensitiveMatch] = []
        for sens_type, patterns in self.compiled_patterns.items():
            # Resolve the optional validator once per type, not per hit.
            validator_name = self.SPECIAL_VALIDATION.get(sens_type)
            validator = getattr(self, validator_name) if validator_name else None

            for pattern in patterns:
                for hit in pattern.finditer(text):
                    value = hit.group(0)
                    if validator is not None and not validator(value):
                        continue
                    matches.append(SensitiveMatch(
                        type=sens_type,
                        value=value,
                        start=hit.start(),
                        end=hit.end(),
                        masked_value=self._mask_value(value, sens_type),
                    ))

        # _remove_overlaps sorts the matches itself.
        return self._remove_overlaps(matches)

    def _remove_overlaps(self, matches: List[SensitiveMatch]) -> List[SensitiveMatch]:
        """
        Drop matches that overlap an earlier kept match.

        Matches are swept from left to right; a match is kept only if it
        starts at or after the end of the previously kept one. Among matches
        starting at the same offset, the type with the better (smaller)
        priority is preferred, then the longer match.

        Args:
            matches: Candidate matches in any order.

        Returns:
            Non-overlapping matches ordered by start offset.
        """
        if not matches:
            return []

        priority = self._OVERLAP_PRIORITY
        ordered = sorted(
            matches,
            key=lambda m: (m.start, priority.get(m.type, 99), -m.end),
        )

        result = []
        last_end = -1
        for match in ordered:
            if match.start >= last_end:
                result.append(match)
                last_end = match.end
        return result

    def _mask_value(self, value: str, sens_type: SensitiveType) -> str:
        """
        Build the masked replacement for a matched value.

        Args:
            value: The original matched text.
            sens_type: The category the value was matched as.

        Returns:
            A partially masked form for paths, e-mails, phone numbers, IPv4
            addresses, credit cards and ID cards; otherwise a
            ``[<TYPE>_MASKED]`` placeholder.
        """
        if sens_type == SensitiveType.FILE_PATH:
            # Keep the file name, hide the directories.
            parts = value.replace('\\', '/').split('/')
            if len(parts) > 1:
                return f"***/{parts[-1]}"
            return "***"

        elif sens_type == SensitiveType.EMAIL:
            # Keep first and last character of the local part and the domain.
            parts = value.split('@')
            if len(parts) == 2:
                name, domain = parts
                masked_name = name[0] + '***' + name[-1] if len(name) > 2 else '***'
                return f"{masked_name}@{domain}"

        elif sens_type == SensitiveType.PHONE:
            # Keep the first 3 and last 4 characters.
            if len(value) >= 11:
                return value[:3] + '****' + value[-4:]

        elif sens_type == SensitiveType.IP_ADDRESS:
            # Keep the first two octets; anything that is not dotted-quad
            # (e.g. IPv6) falls through to the placeholder below.
            parts = value.split('.')
            if len(parts) == 4:
                return f"{parts[0]}.{parts[1]}.*.*"

        elif sens_type == SensitiveType.CREDIT_CARD:
            # Keep only the last 4 digits.
            digits = re.sub(r'[\s-]', '', value)
            return '**** **** **** ' + digits[-4:]

        elif sens_type == SensitiveType.ID_CARD:
            # Keep the first 6 and last 4 characters.
            return value[:6] + '********' + value[-4:]

        # Default: hide the value completely.
        return f"[{sens_type.value.upper()}_MASKED]"

    def sanitize(self, text: str) -> Tuple[str, List[SensitiveMatch]]:
        """
        Mask all sensitive data in ``text``.

        Args:
            text: Original text.

        Returns:
            ``(sanitized_text, matches)``. When nothing is found the text is
            returned unchanged together with an empty list.
        """
        matches = self.find_sensitive_data(text)
        if not matches:
            return text, []

        # Matches are sorted and non-overlapping, so the output can be
        # stitched together in one forward pass.
        pieces = []
        cursor = 0
        for match in matches:
            pieces.append(text[cursor:match.start])
            pieces.append(match.masked_value)
            cursor = match.end
        pieces.append(text[cursor:])

        return "".join(pieces), matches

    def get_sensitivity_score(self, text: str) -> float:
        """
        Compute a sensitivity score for ``text`` in the range 0-1.

        Args:
            text: Text to evaluate.

        Returns:
            0.0 when nothing sensitive is found; otherwise the sum of the
            per-type weights of all matches divided by 3.0, capped at 1.0.
        """
        matches = self.find_sensitive_data(text)
        if not matches:
            return 0.0

        weights = self._SCORE_WEIGHTS
        total_weight = sum(weights.get(m.type, 0.5) for m in matches)
        # Normalize into 0-1.
        return min(1.0, total_weight / 3.0)
|
||
|
||
|
||
# Module-level singleton, created on first use by get_sanitizer().
_sanitizer: Optional[DataSanitizer] = None


def get_sanitizer() -> DataSanitizer:
    """Return the shared DataSanitizer, creating it with default settings on first call."""
    global _sanitizer
    if _sanitizer is None:
        _sanitizer = DataSanitizer()
    return _sanitizer
|
||
|