Files
LocalAgent/safety/llm_reviewer.py
Mimikko-zeus 1ba5f0f7d6 feat: implement streaming support for chat and enhance safety review process
- Updated .env.example to include API key placeholder and configuration instructions.
- Refactored main.py to support streaming responses from the LLM, improving user experience during chat interactions.
- Enhanced LLMClient to include methods for streaming chat and collecting responses.
- Modified safety review process to pass static analysis warnings to the LLM for better code safety evaluation.
- Improved UI components in chat_view.py to handle streaming messages effectively.
2026-01-07 09:43:40 +08:00

141 lines
4.1 KiB
Python

"""
LLM 软规则审查器
使用 LLM 进行代码安全审查
"""
import os
import json
from typing import Optional, List
from dataclasses import dataclass
from dotenv import load_dotenv
from llm.client import get_client, LLMClientError, ENV_PATH
from llm.prompts import SAFETY_REVIEW_SYSTEM, SAFETY_REVIEW_USER
@dataclass
class LLMReviewResult:
    """Result of an LLM-based code safety review."""
    # Whether the code passed the safety review.
    passed: bool
    # Human-readable explanation of the verdict.
    reason: str
    # Raw LLM response text, kept for debugging; stays None when the
    # LLM call failed before producing any response.
    raw_response: Optional[str] = None
class LLMReviewer:
    """
    LLM safety reviewer.

    Uses a large language model to perform a semantic-level safety review
    of generated code before execution. Every failure mode (LLM error,
    unexpected exception, unparseable verdict) fails closed: the review
    is reported as not passed.
    """

    def __init__(self):
        # Load .env so GENERATION_MODEL_NAME is available via os.getenv.
        load_dotenv(ENV_PATH)
        # May be None when unset; presumably the client then falls back to
        # its own default model — TODO confirm against llm.client.
        self.model_name = os.getenv("GENERATION_MODEL_NAME")

    def review(
        self,
        user_input: str,
        execution_plan: str,
        code: str,
        warnings: Optional[List[str]] = None
    ) -> LLMReviewResult:
        """
        Review the safety of generated code.

        Args:
            user_input: The user's original request.
            execution_plan: The execution plan derived from the request.
            code: The code to be reviewed.
            warnings: Static-analysis warnings to highlight for the reviewer.

        Returns:
            LLMReviewResult: The verdict. Any LLM or parsing failure yields
            ``passed=False`` (fail-closed).
        """
        try:
            client = get_client()
            # Surface static-analysis findings so the model focuses on them.
            warning_text = ""
            if warnings:  # truthiness covers both None and empty list
                warning_text = (
                    "\n\n【静态检查警告】请重点审查以下内容:\n"
                    + "\n".join(f"- {w}" for w in warnings)
                )
            messages = [
                {"role": "system", "content": SAFETY_REVIEW_SYSTEM},
                {"role": "user", "content": SAFETY_REVIEW_USER.format(
                    user_input=user_input,
                    execution_plan=execution_plan,
                    code=code
                ) + warning_text}
            ]
            # Low temperature: we want a deterministic, conservative verdict.
            response = client.chat(
                messages=messages,
                model=self.model_name,
                temperature=0.1,
                max_tokens=512,
                timeout=120
            )
            return self._parse_response(response)
        except LLMClientError as e:
            # LLM call failed — reject execution rather than guess.
            return LLMReviewResult(
                passed=False,
                reason=f"安全审查失败({str(e)}),出于安全考虑拒绝执行"
            )
        except Exception as e:
            # Deliberate broad catch: this is the safety gate's fail-closed
            # boundary, so any unexpected error also means rejection.
            return LLMReviewResult(
                passed=False,
                reason=f"安全审查异常({str(e)}),出于安全考虑拒绝执行"
            )

    def _parse_response(self, response: str) -> LLMReviewResult:
        """Parse the LLM's JSON verdict into an LLMReviewResult."""
        try:
            # The model may wrap the JSON in prose; extract the outer object.
            json_str = self._extract_json(response)
            data = json.loads(json_str)
            passed = data.get("pass", False)
            reason = data.get("reason", "未提供原因")
            # Tolerate models answering with a string instead of a boolean.
            if isinstance(passed, str):
                passed = passed.lower() in ('true', 'yes', '1', 'pass')
            return LLMReviewResult(
                passed=bool(passed),
                reason=reason,
                raw_response=response
            )
        except (json.JSONDecodeError, ValueError, TypeError):
            # Unparseable verdict — fail closed.
            return LLMReviewResult(
                passed=False,
                reason="审查结果解析失败,出于安全考虑拒绝执行",
                raw_response=response
            )

    def _extract_json(self, text: str) -> str:
        """Return the span from the first '{' to the last '}' in *text*.

        Falls back to returning *text* unchanged when no such span exists;
        json.loads will then raise, which callers treat as a parse failure.
        """
        start = text.find('{')
        end = text.rfind('}')
        # end > start already implies end != -1 once start != -1.
        if start != -1 and end > start:
            return text[start:end + 1]
        return text
def review_code_safety(
    user_input: str,
    execution_plan: str,
    code: str,
    warnings: Optional[List[str]] = None
) -> LLMReviewResult:
    """Convenience wrapper: run a one-off LLM safety review of *code*.

    Instantiates a fresh LLMReviewer and delegates to its ``review`` method.
    """
    return LLMReviewer().review(user_input, execution_plan, code, warnings)