- Renamed `check_environment` to `check_api_key_configured` for clarity, simplifying the API key validation logic. - Removed the blocking behavior of the API key check during application startup, allowing the app to run while providing a prompt for configuration. - Updated `LocalAgentApp` to accept an `api_configured` parameter, enabling conditional messaging for API key setup. - Enhanced the `SandboxRunner` to support backup management and improved execution result handling with detailed metrics. - Integrated data governance strategies into the `HistoryManager`, ensuring compliance and improved data management. - Added privacy settings and metrics tracking across various components to enhance user experience and application safety.
1509 lines
56 KiB
Python
1509 lines
56 KiB
Python
"""
|
||
LocalAgent 主应用类
|
||
管理 UI 状态切换和协调各模块工作流程
|
||
"""
|
||
|
||
import os
|
||
import sys
|
||
import platform
|
||
import tkinter as tk
|
||
from tkinter import messagebox
|
||
from pathlib import Path
|
||
from typing import Optional, Dict, Any, Tuple, List
|
||
import threading
|
||
import queue
|
||
|
||
from llm.client import get_client, LLMClientError
|
||
from llm.prompts import (
|
||
EXECUTION_PLAN_SYSTEM, EXECUTION_PLAN_USER,
|
||
CODE_GENERATION_SYSTEM, CODE_GENERATION_USER,
|
||
TASK_SUMMARY_SYSTEM, TASK_SUMMARY_USER,
|
||
CODE_FIX_SYSTEM, CODE_FIX_USER,
|
||
REQUIREMENT_CHECK_SYSTEM, REQUIREMENT_CHECK_USER,
|
||
REQUIREMENT_CLARIFY_SYSTEM, REQUIREMENT_CLARIFY_USER,
|
||
REQUIREMENT_STRUCTURE_SYSTEM, REQUIREMENT_STRUCTURE_USER
|
||
)
|
||
from intent.classifier import classify_intent, IntentResult
|
||
from intent.labels import CHAT, EXECUTION, GUIDANCE
|
||
from safety.rule_checker import check_code_safety
|
||
from safety.llm_reviewer import review_code_safety, LLMReviewResult
|
||
from executor.sandbox_runner import SandboxRunner, ExecutionResult
|
||
from app.exceptions import (
|
||
RequirementAnalysisException,
|
||
CriticalInfoMissingException,
|
||
AmbiguousRequirementException,
|
||
LowConfidenceException,
|
||
CheckerFailureException,
|
||
classify_requirement_error
|
||
)
|
||
from ui.chat_view import ChatView
|
||
from ui.task_guide_view import TaskGuideView
|
||
from ui.history_view import HistoryView
|
||
from ui.settings_view import SettingsView
|
||
from ui.clarify_view import ClarifyView
|
||
from ui.clear_confirm_dialog import show_clear_confirm_dialog
|
||
from history.manager import get_history_manager, HistoryManager
|
||
from app.privacy_config import get_privacy_manager, PrivacyManager
|
||
|
||
|
||
class LocalAgentApp:
|
||
"""
|
||
LocalAgent 主应用
|
||
|
||
职责:
|
||
1. 管理 UI 状态切换
|
||
2. 协调各模块工作流程
|
||
3. 处理用户交互
|
||
"""
|
||
|
||
def __init__(self, project_root: Path, api_configured: bool = True):
|
||
self.project_root: Path = project_root
|
||
self.workspace: Path = project_root / "workspace"
|
||
self.runner: SandboxRunner = SandboxRunner(str(self.workspace))
|
||
self.history: HistoryManager = get_history_manager(self.workspace)
|
||
self.privacy: PrivacyManager = get_privacy_manager(self.workspace)
|
||
|
||
# API 配置状态
|
||
self._api_configured = api_configured
|
||
|
||
# 当前任务状态
|
||
self.current_task: Optional[Dict[str, Any]] = None
|
||
|
||
# 线程通信队列
|
||
self.result_queue: queue.Queue = queue.Queue()
|
||
|
||
# UI 组件
|
||
self.root: Optional[tk.Tk] = None
|
||
self.main_container: Optional[tk.Frame] = None
|
||
self.chat_view: Optional[ChatView] = None
|
||
self.task_view: Optional[TaskGuideView] = None
|
||
self.history_view: Optional[HistoryView] = None
|
||
self.settings_view: Optional[SettingsView] = None
|
||
self.clarify_view: Optional[ClarifyView] = None
|
||
self.privacy_view = None # 隐私设置视图
|
||
|
||
# 需求澄清状态
|
||
self._clarify_state: Optional[Dict[str, Any]] = None
|
||
|
||
# 对话上下文(用于多轮对话)
|
||
self._chat_context: List[Dict[str, str]] = []
|
||
self._max_context_length: int = 10 # 最多保留的对话轮数
|
||
|
||
# 初始化 UI
|
||
self._init_ui()
|
||
|
||
def _init_ui(self) -> None:
|
||
"""初始化 UI"""
|
||
self.root = tk.Tk()
|
||
self.root.title("LocalAgent - 本地 AI 助手")
|
||
self.root.geometry("1100x750")
|
||
self.root.minsize(900, 600)
|
||
self.root.configure(bg='#1e1e1e')
|
||
|
||
# 设置窗口图标(如果有的话)
|
||
try:
|
||
self.root.iconbitmap(self.project_root / "icon.ico")
|
||
except:
|
||
pass
|
||
|
||
# 主容器
|
||
self.main_container = tk.Frame(self.root, bg='#1e1e1e')
|
||
self.main_container.pack(fill=tk.BOTH, expand=True)
|
||
|
||
# 聊天视图
|
||
self.chat_view = ChatView(
|
||
self.main_container,
|
||
self._on_user_input,
|
||
on_show_history=self._show_history,
|
||
on_show_settings=self._show_settings
|
||
)
|
||
|
||
# 设置隐私设置回调
|
||
self.chat_view.on_show_privacy = self._show_privacy
|
||
|
||
# 设置清空上下文的回调
|
||
self.chat_view.set_clear_context_callback(self._clear_chat_context)
|
||
|
||
# 如果未配置 API Key,显示提示
|
||
if not self._api_configured:
|
||
self.chat_view.add_message(
|
||
"⚠️ 尚未配置 API Key,请点击右上角「设置」按钮进行配置。\n"
|
||
"获取 API Key: https://siliconflow.cn",
|
||
'error'
|
||
)
|
||
|
||
# 度量指标记录
|
||
self._metrics = {
|
||
'clarification_triggered': 0, # 澄清触发次数
|
||
'direct_execution': 0, # 直接执行次数
|
||
'user_modifications': 0, # 用户二次修改次数
|
||
'ambiguity_failures': 0, # 需求歧义导致失败次数
|
||
'total_tasks': 0 # 总任务数
|
||
}
|
||
|
||
# 定期检查后台任务结果
|
||
self._check_queue()
|
||
|
||
def _check_queue(self) -> None:
|
||
"""检查后台任务队列"""
|
||
try:
|
||
while True:
|
||
callback, args = self.result_queue.get_nowait()
|
||
callback(*args)
|
||
except queue.Empty:
|
||
pass
|
||
|
||
# 每 100ms 检查一次
|
||
self.root.after(100, self._check_queue)
|
||
|
||
def _run_in_thread(self, func: callable, callback: callable, *args) -> None:
|
||
"""在后台线程运行函数,完成后回调"""
|
||
def wrapper():
|
||
try:
|
||
result = func(*args)
|
||
self.result_queue.put((callback, (result, None)))
|
||
except Exception as e:
|
||
self.result_queue.put((callback, (None, e)))
|
||
|
||
thread = threading.Thread(target=wrapper, daemon=True)
|
||
thread.start()
|
||
|
||
def _on_user_input(self, user_input: str) -> None:
|
||
"""处理用户输入"""
|
||
# 显示用户消息
|
||
self.chat_view.add_message(user_input, 'user')
|
||
self.chat_view.set_input_enabled(False)
|
||
self.chat_view.show_loading("正在分析您的需求")
|
||
|
||
# 在后台线程进行意图识别
|
||
self._run_in_thread(
|
||
classify_intent,
|
||
lambda result, error: self._on_intent_result(user_input, result, error),
|
||
user_input
|
||
)
|
||
|
||
def _on_intent_result(self, user_input: str, intent_result: Optional[IntentResult], error: Optional[Exception]) -> None:
|
||
"""意图识别完成回调"""
|
||
self.chat_view.hide_loading()
|
||
|
||
if error:
|
||
# 记录配置变更后的首次调用失败
|
||
from llm.config_metrics import get_config_metrics
|
||
metrics = get_config_metrics(self.workspace)
|
||
metrics.record_first_call(success=False, error_message=str(error))
|
||
|
||
self.chat_view.add_message(f"意图识别失败: {str(error)}", 'error')
|
||
self.chat_view.set_input_enabled(True)
|
||
return
|
||
|
||
# 记录配置变更后的首次调用成功
|
||
from llm.config_metrics import get_config_metrics
|
||
metrics = get_config_metrics(self.workspace)
|
||
metrics.record_first_call(success=True)
|
||
|
||
if intent_result.label == CHAT:
|
||
# 对话模式
|
||
self._handle_chat(user_input, intent_result)
|
||
elif intent_result.label == GUIDANCE:
|
||
# 操作指导模式
|
||
self._handle_guidance(user_input, intent_result)
|
||
else:
|
||
# 执行模式
|
||
self._handle_execution(user_input, intent_result)
|
||
|
||
def _handle_chat(self, user_input: str, intent_result: IntentResult) -> None:
|
||
"""处理对话任务"""
|
||
self.chat_view.add_message(
|
||
f"识别为对话模式 (原因: {intent_result.reason})",
|
||
'system'
|
||
)
|
||
|
||
# 添加用户消息到上下文
|
||
self._chat_context.append({"role": "user", "content": user_input})
|
||
|
||
# 开始流式消息
|
||
self.chat_view.start_stream_message('assistant')
|
||
|
||
# 在后台线程调用 LLM(流式)
|
||
def do_chat_stream():
|
||
client = get_client()
|
||
# 使用专门的对话模型,如果未配置则使用代码生成模型
|
||
model = os.getenv("CHAT_MODEL_NAME") or os.getenv("GENERATION_MODEL_NAME")
|
||
|
||
# 构建带上下文的消息列表
|
||
messages = self._build_chat_messages()
|
||
|
||
full_response = []
|
||
for chunk in client.chat_stream(
|
||
messages=messages,
|
||
model=model,
|
||
temperature=0.7,
|
||
max_tokens=2048,
|
||
timeout=300
|
||
):
|
||
full_response.append(chunk)
|
||
# 通过队列发送 chunk 到主线程更新 UI
|
||
self.result_queue.put((self._on_chat_chunk, (chunk,)))
|
||
|
||
return ''.join(full_response)
|
||
|
||
self._run_in_thread(
|
||
do_chat_stream,
|
||
self._on_chat_complete
|
||
)
|
||
|
||
def _on_chat_chunk(self, chunk: str):
|
||
"""收到对话片段回调(主线程)"""
|
||
self.chat_view.append_stream_chunk(chunk)
|
||
|
||
def _on_chat_complete(self, response: Optional[str], error: Optional[Exception]):
|
||
"""对话完成回调"""
|
||
self.chat_view.end_stream_message()
|
||
|
||
if error:
|
||
self.chat_view.add_message(f"对话失败: {str(error)}", 'error')
|
||
elif response:
|
||
# 保存助手回复到上下文
|
||
self._chat_context.append({"role": "assistant", "content": response})
|
||
# 限制上下文长度
|
||
self._trim_chat_context()
|
||
|
||
self.chat_view.set_input_enabled(True)
|
||
|
||
def _get_system_environment_info(self, scenario: str = 'chat') -> str:
|
||
"""
|
||
获取当前系统运行环境信息(隐私保护版本)
|
||
|
||
Args:
|
||
scenario: 场景类型 ('chat', 'guidance', 'execution')
|
||
"""
|
||
return self.privacy.get_environment_info(scenario)
|
||
|
||
def _build_chat_messages(self) -> List[Dict[str, str]]:
|
||
"""构建带上下文的消息列表"""
|
||
env_info = self._get_system_environment_info(scenario='chat')
|
||
|
||
system_prompt = f"""你是 LocalAgent,一个本地运行的 AI 助手。你可以帮助用户回答问题、处理文件等任务。请用中文回答。
|
||
|
||
## 你的身份
|
||
- 名称:LocalAgent
|
||
- 定位:本地 AI 助手
|
||
- 能力:回答问题、协助文件处理、提供操作指导
|
||
|
||
## 用户运行环境
|
||
{env_info}
|
||
|
||
## 注意事项
|
||
- 如果用户的问题涉及之前的对话内容,请结合上下文进行回答
|
||
- 根据用户的操作系统和环境,给出适合其系统的建议和解答
|
||
- 如果涉及文件路径,请使用适合用户操作系统的路径格式
|
||
- 当被问到"你是谁"时,请介绍自己是 LocalAgent,而不是其他 AI 助手"""
|
||
|
||
messages = [{"role": "system", "content": system_prompt}]
|
||
messages.extend(self._chat_context)
|
||
return messages
|
||
|
||
def _trim_chat_context(self) -> None:
|
||
"""限制对话上下文长度"""
|
||
# 每轮对话包含 user 和 assistant 两条消息
|
||
max_messages = self._max_context_length * 2
|
||
if len(self._chat_context) > max_messages:
|
||
# 保留最近的消息
|
||
self._chat_context = self._chat_context[-max_messages:]
|
||
|
||
def _clear_chat_context(self) -> None:
|
||
"""清空对话上下文"""
|
||
self._chat_context = []
|
||
|
||
def _handle_guidance(self, user_input: str, intent_result: IntentResult) -> None:
|
||
"""处理操作指导任务(无法通过本地代码完成的任务)"""
|
||
self.chat_view.add_message(
|
||
f"识别为操作指导 (原因: {intent_result.reason})\n该任务无法通过本地代码完成,将为您提供操作指导。",
|
||
'system'
|
||
)
|
||
|
||
# 添加用户消息到上下文
|
||
self._chat_context.append({"role": "user", "content": user_input})
|
||
|
||
# 开始流式消息
|
||
self.chat_view.start_stream_message('assistant')
|
||
|
||
# 在后台线程调用 LLM(流式)
|
||
def do_guidance_stream():
|
||
client = get_client()
|
||
model = os.getenv("CHAT_MODEL_NAME") or os.getenv("GENERATION_MODEL_NAME")
|
||
|
||
# 获取环境信息(指导场景)
|
||
env_info = self._get_system_environment_info(scenario='guidance')
|
||
|
||
# 构建专门的操作指导 Prompt
|
||
system_prompt = f"""你是一个操作指导助手。用户询问的是一个无法通过本地Python代码完成的任务(如软件设置、系统配置、GUI操作等)。
|
||
|
||
## 用户运行环境
|
||
{env_info}
|
||
|
||
## 回答要求
|
||
请提供清晰、详细的操作步骤指导:
|
||
1. 使用编号列表,步骤清晰
|
||
2. 如果有多种方法,列出最常用的1-2种
|
||
3. **重要**: 根据用户的操作系统({platform.system()})给出针对性的操作指导
|
||
4. 可以适当配合说明截图位置或界面元素名称
|
||
5. 如果操作有风险,给出提醒
|
||
6. 如果涉及文件路径,使用适合用户系统的路径格式
|
||
|
||
用中文回答。"""
|
||
|
||
# 构建带上下文的消息列表
|
||
messages = [{"role": "system", "content": system_prompt}]
|
||
messages.extend(self._chat_context)
|
||
|
||
full_response = []
|
||
for chunk in client.chat_stream(
|
||
messages=messages,
|
||
model=model,
|
||
temperature=0.7,
|
||
max_tokens=2048,
|
||
timeout=300
|
||
):
|
||
full_response.append(chunk)
|
||
self.result_queue.put((self._on_chat_chunk, (chunk,)))
|
||
|
||
return ''.join(full_response)
|
||
|
||
self._run_in_thread(
|
||
do_guidance_stream,
|
||
self._on_chat_complete
|
||
)
|
||
|
||
def _handle_execution(self, user_input: str, intent_result: IntentResult):
|
||
"""处理执行任务"""
|
||
# 记录总任务数
|
||
self._metrics['total_tasks'] += 1
|
||
|
||
self.chat_view.add_message(
|
||
f"识别为执行任务 (置信度: {intent_result.confidence:.0%})\n原因: {intent_result.reason}",
|
||
'system'
|
||
)
|
||
|
||
# 保存用户输入和意图结果
|
||
self.current_task = {
|
||
'user_input': user_input,
|
||
'intent_result': intent_result
|
||
}
|
||
|
||
# 先查找是否有相似的成功任务(使用增强匹配)
|
||
result = self.history.find_similar_success(user_input, return_details=True)
|
||
if result:
|
||
similar_record, similarity_score, differences = result
|
||
|
||
# 统计关键差异
|
||
critical_diffs = [d for d in differences if d.importance == 'critical']
|
||
|
||
# 记录复用建议被提供
|
||
from history.reuse_metrics import get_reuse_metrics
|
||
metrics = get_reuse_metrics(self.workspace)
|
||
metrics.record_reuse_offered(
|
||
original_task_id=similar_record.task_id,
|
||
similarity_score=similarity_score,
|
||
differences_count=len(differences),
|
||
critical_differences=len(critical_diffs)
|
||
)
|
||
|
||
# 显示增强的复用确认对话框
|
||
from ui.reuse_confirm_dialog import show_reuse_confirm_dialog
|
||
|
||
def on_reuse_confirm():
|
||
# 用户接受复用
|
||
metrics.record_reuse_accepted(
|
||
original_task_id=similar_record.task_id,
|
||
similarity_score=similarity_score,
|
||
differences_count=len(differences),
|
||
critical_differences=len(critical_diffs)
|
||
)
|
||
|
||
# 复用代码 - 需要重新进行安全检查
|
||
self.current_task['execution_plan'] = similar_record.execution_plan
|
||
self.current_task['code'] = similar_record.code
|
||
self.current_task['task_summary'] = similar_record.task_summary
|
||
self.current_task['is_reuse'] = True
|
||
self.current_task['reuse_original_task_id'] = similar_record.task_id
|
||
|
||
self.chat_view.add_message(
|
||
f"复用历史成功代码 (相似度: {similarity_score:.0%}),正在进行安全复检...",
|
||
'system'
|
||
)
|
||
|
||
# 强制进行安全检查
|
||
self._perform_safety_check(self.current_task['code'])
|
||
|
||
def on_reuse_reject():
|
||
# 用户拒绝复用
|
||
metrics.record_reuse_rejected(
|
||
original_task_id=similar_record.task_id,
|
||
similarity_score=similarity_score,
|
||
differences_count=len(differences),
|
||
critical_differences=len(critical_diffs)
|
||
)
|
||
|
||
self.chat_view.add_message("将生成新代码", 'system')
|
||
self.chat_view.show_loading("正在分析需求完整性")
|
||
|
||
# 继续正常流程
|
||
self._run_in_thread(
|
||
self._check_requirement_completeness,
|
||
self._on_requirement_checked,
|
||
user_input
|
||
)
|
||
|
||
# 显示对话框
|
||
show_reuse_confirm_dialog(
|
||
parent=self.root,
|
||
task_summary=similar_record.task_summary or similar_record.user_input[:50],
|
||
timestamp=similar_record.timestamp,
|
||
similarity_score=similarity_score,
|
||
differences=differences,
|
||
on_confirm=on_reuse_confirm,
|
||
on_reject=on_reuse_reject
|
||
)
|
||
return
|
||
|
||
self.chat_view.show_loading("正在分析需求完整性")
|
||
|
||
# 检查需求是否完整
|
||
self._run_in_thread(
|
||
self._check_requirement_completeness,
|
||
self._on_requirement_checked,
|
||
user_input
|
||
)
|
||
|
||
def _on_plan_generated(self, plan: Optional[str], error: Optional[Exception]):
|
||
"""执行计划生成完成回调"""
|
||
if error:
|
||
self.chat_view.hide_loading()
|
||
self.chat_view.add_message(f"生成执行计划失败: {str(error)}", 'error')
|
||
self.chat_view.set_input_enabled(True)
|
||
self.current_task = None
|
||
return
|
||
|
||
self.current_task['execution_plan'] = plan
|
||
self.chat_view.update_loading_text("正在生成执行代码")
|
||
|
||
# 在后台线程生成代码
|
||
self._run_in_thread(
|
||
self._generate_code,
|
||
self._on_code_generated,
|
||
self.current_task.get('structured_requirement') or self.current_task['user_input'],
|
||
plan
|
||
)
|
||
|
||
def _check_requirement_completeness(self, user_input: str) -> Dict[str, Any]:
|
||
"""检查需求是否完整"""
|
||
import json
|
||
|
||
client = get_client()
|
||
model = os.getenv("CHAT_MODEL_NAME") or os.getenv("GENERATION_MODEL_NAME")
|
||
|
||
response = client.chat(
|
||
messages=[
|
||
{"role": "system", "content": REQUIREMENT_CHECK_SYSTEM},
|
||
{"role": "user", "content": REQUIREMENT_CHECK_USER.format(user_input=user_input)}
|
||
],
|
||
model=model,
|
||
temperature=0.3,
|
||
max_tokens=500,
|
||
timeout=60
|
||
)
|
||
|
||
# 解析 JSON 响应
|
||
try:
|
||
# 尝试提取 JSON
|
||
json_match = response
|
||
if '```' in response:
|
||
import re
|
||
match = re.search(r'```(?:json)?\s*(.*?)\s*```', response, re.DOTALL)
|
||
if match:
|
||
json_match = match.group(1)
|
||
|
||
result = json.loads(json_match)
|
||
return result
|
||
except json.JSONDecodeError:
|
||
# 解析失败,默认认为需求完整
|
||
return {
|
||
"is_complete": True,
|
||
"confidence": 0.5,
|
||
"reason": "无法解析完整性检查结果",
|
||
"suggested_defaults": {}
|
||
}
|
||
|
||
def _on_requirement_checked(self, result: Optional[Dict], error: Optional[Exception]):
|
||
"""需求完整性检查完成回调"""
|
||
# 分类异常
|
||
exception = classify_requirement_error(result, error)
|
||
|
||
self.chat_view.hide_loading()
|
||
|
||
# 根据异常严重程度决定处理策略
|
||
if isinstance(exception, CriticalInfoMissingException):
|
||
# 关键信息缺失 - 强制澄清
|
||
self._metrics['clarification_triggered'] += 1
|
||
self.chat_view.add_message(
|
||
f"❌ 关键信息缺失,无法继续执行\n"
|
||
f"原因: {str(exception)}\n"
|
||
f"缺失字段: {', '.join(exception.missing_fields)}\n"
|
||
f"正在启动需求澄清流程...",
|
||
'error'
|
||
)
|
||
self._start_clarification()
|
||
|
||
elif isinstance(exception, AmbiguousRequirementException):
|
||
# 需求歧义 - 强制澄清
|
||
self._metrics['clarification_triggered'] += 1
|
||
self.chat_view.add_message(
|
||
f"⚠️ 需求存在歧义\n"
|
||
f"原因: {str(exception)}\n"
|
||
f"模糊部分: {', '.join(exception.ambiguous_parts)}\n"
|
||
f"正在启动需求澄清流程...",
|
||
'warning'
|
||
)
|
||
self._start_clarification()
|
||
|
||
elif isinstance(exception, LowConfidenceException):
|
||
# 低置信度 - 建议澄清但允许用户选择
|
||
self._metrics['clarification_triggered'] += 1
|
||
self.chat_view.add_message(
|
||
f"⚠️ 需求置信度较低 ({exception.confidence:.0%})\n"
|
||
f"原因: {str(exception)}\n"
|
||
f"建议进行需求澄清以提高准确性",
|
||
'warning'
|
||
)
|
||
# 提供选择:澄清或继续
|
||
self._show_low_confidence_options(result)
|
||
|
||
elif isinstance(exception, CheckerFailureException):
|
||
# 检查器失败 - 降级处理,但记录警告
|
||
self._metrics['direct_execution'] += 1
|
||
self.chat_view.add_message(
|
||
f"⚠️ 需求完整性检查器异常: {str(exception)}\n"
|
||
f"将尝试直接生成代码,但可能存在理解偏差",
|
||
'warning'
|
||
)
|
||
# 保存建议的默认值(如果有)
|
||
if result:
|
||
self.current_task['suggested_defaults'] = result.get('suggested_defaults', {})
|
||
self._continue_to_code_generation()
|
||
|
||
else:
|
||
# 需求完整 - 直接继续
|
||
self._metrics['direct_execution'] += 1
|
||
# 保存建议的默认值
|
||
if result:
|
||
self.current_task['suggested_defaults'] = result.get('suggested_defaults', {})
|
||
self._continue_to_code_generation()
|
||
|
||
def _show_low_confidence_options(self, result: Optional[Dict]):
|
||
"""显示低置信度时的选项"""
|
||
from tkinter import messagebox
|
||
|
||
choice = messagebox.askyesno(
|
||
"需求澄清",
|
||
"检测到需求描述可能不够清晰,建议进行澄清以提高准确性。\n\n"
|
||
"是否启动需求澄清流程?\n\n"
|
||
"选择「是」:通过问答澄清需求细节\n"
|
||
"选择「否」:使用默认参数直接生成代码",
|
||
icon='warning'
|
||
)
|
||
|
||
if choice:
|
||
# 用户选择澄清
|
||
self._start_clarification()
|
||
else:
|
||
# 用户选择继续
|
||
self._metrics['direct_execution'] += 1
|
||
if result:
|
||
self.current_task['suggested_defaults'] = result.get('suggested_defaults', {})
|
||
self._continue_to_code_generation()
|
||
|
||
def _continue_to_code_generation(self):
|
||
"""继续代码生成流程"""
|
||
self.chat_view.show_loading("正在生成任务摘要")
|
||
|
||
# 在后台线程生成任务摘要
|
||
self._run_in_thread(
|
||
self._generate_task_summary,
|
||
self._on_summary_generated,
|
||
self.current_task.get('structured_requirement') or self.current_task['user_input']
|
||
)
|
||
|
||
def _start_clarification(self):
|
||
"""启动需求澄清流程"""
|
||
# 初始化澄清状态
|
||
self._clarify_state = {
|
||
'original_input': self.current_task['user_input'],
|
||
'collected_info': {},
|
||
'history': [],
|
||
'current_question': None
|
||
}
|
||
|
||
# 重置已显示的历史计数
|
||
self._displayed_history_count = 0
|
||
|
||
self.chat_view.show_loading("正在生成澄清问题")
|
||
|
||
# 获取第一个澄清问题
|
||
self._run_in_thread(
|
||
self._get_clarify_question,
|
||
self._on_clarify_question_received,
|
||
self.current_task['user_input'],
|
||
{},
|
||
""
|
||
)
|
||
|
||
def _get_clarify_question(self, user_input: str, collected_info: Dict, user_answer: str) -> Dict[str, Any]:
|
||
"""获取澄清问题"""
|
||
import json
|
||
|
||
client = get_client()
|
||
model = os.getenv("CHAT_MODEL_NAME") or os.getenv("GENERATION_MODEL_NAME")
|
||
|
||
# 格式化已收集的信息
|
||
collected_str = json.dumps(collected_info, ensure_ascii=False, indent=2) if collected_info else "{}"
|
||
|
||
response = client.chat(
|
||
messages=[
|
||
{"role": "system", "content": REQUIREMENT_CLARIFY_SYSTEM},
|
||
{"role": "user", "content": REQUIREMENT_CLARIFY_USER.format(
|
||
user_input=user_input,
|
||
collected_info=collected_str,
|
||
user_answer=user_answer or "(首次询问)"
|
||
)}
|
||
],
|
||
model=model,
|
||
temperature=0.3,
|
||
max_tokens=1000,
|
||
timeout=60
|
||
)
|
||
|
||
# 解析 JSON 响应
|
||
try:
|
||
json_match = response
|
||
if '```' in response:
|
||
import re
|
||
match = re.search(r'```(?:json)?\s*(.*?)\s*```', response, re.DOTALL)
|
||
if match:
|
||
json_match = match.group(1)
|
||
|
||
result = json.loads(json_match)
|
||
return result
|
||
except json.JSONDecodeError:
|
||
# 解析失败,认为不需要继续澄清
|
||
return {
|
||
"need_clarify": False,
|
||
"question": "",
|
||
"options": [],
|
||
"collected_info": collected_info,
|
||
"missing_info": []
|
||
}
|
||
|
||
def _on_clarify_question_received(self, result: Optional[Dict], error: Optional[Exception]):
|
||
"""收到澄清问题回调"""
|
||
# 隐藏澄清视图的加载状态(如果有)
|
||
if self.clarify_view:
|
||
self.clarify_view.hide_loading()
|
||
|
||
if error:
|
||
# 出错时切换回聊天界面
|
||
if self.clarify_view:
|
||
self.clarify_view.hide()
|
||
self.clarify_view = None
|
||
self.chat_view.get_frame().pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
|
||
self.chat_view.add_message(f"获取澄清问题失败: {str(error)}", 'error')
|
||
self._continue_to_code_generation()
|
||
return
|
||
|
||
need_clarify = result.get('need_clarify', False)
|
||
|
||
if not need_clarify:
|
||
# 不需要继续澄清,隐藏澄清视图,进行需求结构化
|
||
if self.clarify_view:
|
||
self.clarify_view.hide()
|
||
self.clarify_view = None
|
||
self._clarify_state['collected_info'].update(result.get('collected_info', {}))
|
||
self._structure_requirement()
|
||
else:
|
||
# 继续显示/更新澄清视图
|
||
self._show_clarify_view(result)
|
||
|
||
def _show_clarify_view(self, clarify_data: Dict):
|
||
"""显示或更新需求澄清视图"""
|
||
# 如果澄清视图不存在,创建新的
|
||
if not self.clarify_view:
|
||
# 隐藏聊天视图
|
||
self.chat_view.get_frame().pack_forget()
|
||
|
||
# 创建澄清视图
|
||
self.clarify_view = ClarifyView(
|
||
self.main_container,
|
||
on_submit=self._on_clarify_submit,
|
||
on_cancel=self._on_clarify_cancel
|
||
)
|
||
self.clarify_view.show()
|
||
|
||
# 添加上一轮的历史记录(如果有新的)
|
||
history = self._clarify_state.get('history', [])
|
||
displayed_count = getattr(self, '_displayed_history_count', 0)
|
||
|
||
for item in history[displayed_count:]:
|
||
self.clarify_view.add_history_item(item['question'], item['answer'])
|
||
|
||
self._displayed_history_count = len(history)
|
||
|
||
# 设置新问题和选项
|
||
question = clarify_data.get('question', '请提供更多信息')
|
||
options = clarify_data.get('options', [])
|
||
|
||
self.clarify_view.set_question(question, options)
|
||
|
||
# 更新已收集信息提示
|
||
collected = self._clarify_state.get('collected_info', {})
|
||
missing = clarify_data.get('missing_info', [])
|
||
self.clarify_view.update_info_label(len(collected), len(collected) + len(missing))
|
||
|
||
# 保存当前问题
|
||
self._clarify_state['current_question'] = question
|
||
self._clarify_state['current_options'] = options
|
||
|
||
def _on_clarify_submit(self, answers: Dict[str, Any]):
|
||
"""澄清问题提交回调"""
|
||
# 格式化答案为字符串
|
||
answer_parts = []
|
||
for key, value in answers.items():
|
||
if isinstance(value, list):
|
||
answer_parts.append(f"{key}: {', '.join(value)}")
|
||
else:
|
||
answer_parts.append(f"{key}: {value}")
|
||
answer_str = "; ".join(answer_parts)
|
||
|
||
# 保存到历史
|
||
self._clarify_state['history'].append({
|
||
'question': self._clarify_state.get('current_question', ''),
|
||
'answer': answer_str
|
||
})
|
||
|
||
# 更新已收集的信息
|
||
self._clarify_state['collected_info'].update(answers)
|
||
|
||
# 在澄清视图中显示加载状态(不切换回聊天界面)
|
||
if self.clarify_view:
|
||
self.clarify_view.show_loading("正在分析您的回答...")
|
||
|
||
# 继续获取下一个问题
|
||
self._run_in_thread(
|
||
self._get_clarify_question,
|
||
self._on_clarify_question_received,
|
||
self._clarify_state['original_input'],
|
||
self._clarify_state['collected_info'],
|
||
answer_str
|
||
)
|
||
|
||
def _on_clarify_cancel(self):
|
||
"""取消澄清"""
|
||
if self.clarify_view:
|
||
self.clarify_view.hide()
|
||
self.clarify_view = None
|
||
|
||
self._clarify_state = None
|
||
self._displayed_history_count = 0
|
||
self.current_task = None
|
||
|
||
self.chat_view.get_frame().pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
|
||
self.chat_view.set_input_enabled(True)
|
||
self.chat_view.add_message("已取消需求澄清", 'system')
|
||
|
||
def _structure_requirement(self):
|
||
"""将澄清后的需求结构化"""
|
||
self.chat_view.get_frame().pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
|
||
self.chat_view.show_loading("正在整理需求")
|
||
|
||
self._run_in_thread(
|
||
self._do_structure_requirement,
|
||
self._on_requirement_structured
|
||
)
|
||
|
||
def _do_structure_requirement(self) -> str:
|
||
"""执行需求结构化"""
|
||
import json
|
||
|
||
client = get_client()
|
||
model = os.getenv("CHAT_MODEL_NAME") or os.getenv("GENERATION_MODEL_NAME")
|
||
|
||
collected_str = json.dumps(
|
||
self._clarify_state['collected_info'],
|
||
ensure_ascii=False,
|
||
indent=2
|
||
)
|
||
|
||
response = client.chat_stream_collect(
|
||
messages=[
|
||
{"role": "system", "content": REQUIREMENT_STRUCTURE_SYSTEM},
|
||
{"role": "user", "content": REQUIREMENT_STRUCTURE_USER.format(
|
||
user_input=self._clarify_state['original_input'],
|
||
collected_info=collected_str
|
||
)}
|
||
],
|
||
model=model,
|
||
temperature=0.3,
|
||
max_tokens=1500,
|
||
timeout=120
|
||
)
|
||
|
||
return response
|
||
|
||
def _on_requirement_structured(self, result: Optional[str], error: Optional[Exception]):
|
||
"""需求结构化完成回调"""
|
||
if error:
|
||
self.chat_view.hide_loading()
|
||
self.chat_view.add_message(f"需求整理失败: {str(error)}", 'error')
|
||
# 使用原始输入继续
|
||
self._continue_to_code_generation()
|
||
return
|
||
|
||
# 保存结构化的需求
|
||
self.current_task['structured_requirement'] = result
|
||
self.current_task['collected_info'] = self._clarify_state['collected_info']
|
||
|
||
# 清理澄清状态
|
||
self._clarify_state = None
|
||
|
||
self.chat_view.hide_loading()
|
||
self.chat_view.add_message("需求已明确,开始生成代码", 'system')
|
||
|
||
# 继续代码生成流程
|
||
self._continue_to_code_generation()
|
||
|
||
def _generate_task_summary(self, user_input: str) -> str:
|
||
"""生成任务摘要(使用小模型)"""
|
||
client = get_client()
|
||
# 使用意图识别模型(小模型)生成摘要
|
||
model = os.getenv("INTENT_MODEL_NAME") or os.getenv("GENERATION_MODEL_NAME")
|
||
|
||
response = client.chat(
|
||
messages=[
|
||
{"role": "system", "content": TASK_SUMMARY_SYSTEM},
|
||
{"role": "user", "content": TASK_SUMMARY_USER.format(user_input=user_input)}
|
||
],
|
||
model=model,
|
||
temperature=0.3,
|
||
max_tokens=50,
|
||
timeout=30
|
||
)
|
||
|
||
# 清理响应(去除引号、换行等)
|
||
summary = response.strip().strip('"\'').strip()
|
||
# 限制长度
|
||
if len(summary) > 20:
|
||
summary = summary[:20]
|
||
|
||
return summary
|
||
|
||
def _on_summary_generated(self, summary: Optional[str], error: Optional[Exception]):
|
||
"""任务摘要生成完成回调"""
|
||
if error:
|
||
# 摘要生成失败不影响主流程,使用默认值
|
||
summary = self.current_task['user_input'][:15] + "..."
|
||
|
||
self.current_task['task_summary'] = summary
|
||
self.chat_view.update_loading_text("正在生成执行计划")
|
||
|
||
# 继续生成执行计划
|
||
self._run_in_thread(
|
||
self._generate_execution_plan,
|
||
self._on_plan_generated,
|
||
self.current_task['user_input']
|
||
)
|
||
|
||
def _on_code_generated(self, result: tuple, error: Optional[Exception]):
|
||
"""代码生成完成回调"""
|
||
if error:
|
||
self.chat_view.hide_loading()
|
||
self.chat_view.add_message(f"生成代码失败: {str(error)}", 'error')
|
||
self.chat_view.set_input_enabled(True)
|
||
self.current_task = None
|
||
return
|
||
|
||
# result 可能是 (code, extract_error) 元组
|
||
if isinstance(result, tuple):
|
||
code, extract_error = result
|
||
if extract_error:
|
||
self.chat_view.hide_loading()
|
||
self.chat_view.add_message(f"代码提取失败: {str(extract_error)}", 'error')
|
||
self.chat_view.set_input_enabled(True)
|
||
self.current_task = None
|
||
return
|
||
else:
|
||
code = result
|
||
|
||
self.current_task['code'] = code
|
||
self.chat_view.update_loading_text("正在进行安全检查")
|
||
|
||
# 统一调用安全检查流程
|
||
self._perform_safety_check(code)
|
||
|
||
def _on_safety_reviewed(self, review_result, error: Optional[Exception]):
|
||
"""安全审查完成回调"""
|
||
self.chat_view.hide_loading()
|
||
|
||
if error:
|
||
self.chat_view.add_message(f"安全审查失败: {str(error)}", 'error')
|
||
self.chat_view.set_input_enabled(True)
|
||
self.current_task = None
|
||
return
|
||
|
||
if not review_result.passed:
|
||
self.chat_view.add_message(
|
||
f"安全审查未通过: {review_result.reason}",
|
||
'error'
|
||
)
|
||
self.chat_view.set_input_enabled(True)
|
||
self.current_task = None
|
||
return
|
||
|
||
# 安全检查通过,检查工作区是否有内容
|
||
has_content, file_count, size_str = self.runner.check_workspace_content()
|
||
|
||
if has_content:
|
||
# 有内容,显示确认对话框
|
||
self._show_clear_confirm_dialog(file_count, size_str)
|
||
else:
|
||
# 无内容,直接进入任务引导
|
||
self.chat_view.add_message("安全检查通过,请确认执行", 'system')
|
||
self._show_task_guide()
|
||
|
||
def _show_clear_confirm_dialog(self, file_count: int, size_str: str):
|
||
"""显示清理确认对话框"""
|
||
# 检查是否有最近的备份
|
||
latest_backup = self.runner.backup_manager.get_latest_backup()
|
||
has_recent_backup = latest_backup is not None
|
||
|
||
def on_confirm(create_backup: bool):
|
||
"""用户确认清空"""
|
||
# 清空工作区(根据用户选择决定是否备份)
|
||
backup_id = self.runner.clear_workspace(
|
||
clear_input=True,
|
||
clear_output=True,
|
||
create_backup=create_backup
|
||
)
|
||
|
||
if backup_id:
|
||
self.chat_view.add_message(
|
||
f"已备份工作区内容(备份 ID: {backup_id}),安全检查通过,请确认执行",
|
||
'system'
|
||
)
|
||
else:
|
||
self.chat_view.add_message("安全检查通过,请确认执行", 'system')
|
||
|
||
# 显示任务引导视图
|
||
self._show_task_guide()
|
||
|
||
def on_cancel():
|
||
"""用户取消"""
|
||
self.chat_view.add_message("已取消执行", 'system')
|
||
self.chat_view.set_input_enabled(True)
|
||
self.current_task = None
|
||
|
||
# 显示对话框
|
||
show_clear_confirm_dialog(
|
||
parent=self.root,
|
||
file_count=file_count,
|
||
total_size=size_str,
|
||
has_recent_backup=has_recent_backup,
|
||
on_confirm=on_confirm,
|
||
on_cancel=on_cancel
|
||
)
|
||
|
||
def _generate_execution_plan(self, user_input: str) -> str:
|
||
"""生成执行计划(使用流式传输)"""
|
||
client = get_client()
|
||
model = os.getenv("GENERATION_MODEL_NAME")
|
||
|
||
# 使用流式传输,避免超时
|
||
response = client.chat_stream_collect(
|
||
messages=[
|
||
{"role": "system", "content": EXECUTION_PLAN_SYSTEM},
|
||
{"role": "user", "content": EXECUTION_PLAN_USER.format(user_input=user_input)}
|
||
],
|
||
model=model,
|
||
temperature=0.3,
|
||
max_tokens=1024,
|
||
timeout=300 # 5分钟超时
|
||
)
|
||
|
||
return response
|
||
|
||
def _generate_code(self, user_input: str, execution_plan: str) -> tuple:
|
||
"""生成执行代码(使用流式传输)"""
|
||
client = get_client()
|
||
model = os.getenv("GENERATION_MODEL_NAME")
|
||
|
||
# 使用流式传输,避免超时
|
||
response = client.chat_stream_collect(
|
||
messages=[
|
||
{"role": "system", "content": CODE_GENERATION_SYSTEM},
|
||
{"role": "user", "content": CODE_GENERATION_USER.format(
|
||
user_input=user_input,
|
||
execution_plan=execution_plan
|
||
)}
|
||
],
|
||
model=model,
|
||
temperature=0.2,
|
||
max_tokens=4096, # 代码可能较长
|
||
timeout=300 # 5分钟超时
|
||
)
|
||
|
||
# 提取代码块,捕获可能的异常
|
||
try:
|
||
code = self._extract_code(response)
|
||
return (code, None)
|
||
except ValueError as e:
|
||
return (None, e)
|
||
|
||
def _extract_code(self, response: str) -> str:
|
||
"""从 LLM 响应中提取代码"""
|
||
import re
|
||
|
||
# 尝试提取 ```python ... ``` 代码块
|
||
pattern = r'```python\s*(.*?)\s*```'
|
||
matches = re.findall(pattern, response, re.DOTALL)
|
||
|
||
if matches:
|
||
return matches[0]
|
||
|
||
# 尝试提取 ``` ... ``` 代码块
|
||
pattern = r'```\s*(.*?)\s*```'
|
||
matches = re.findall(pattern, response, re.DOTALL)
|
||
|
||
if matches:
|
||
return matches[0]
|
||
|
||
# 如果没有代码块,检查是否看起来像 Python 代码
|
||
# 简单检查:是否包含 def 或 import 语句
|
||
if 'import ' in response or 'def ' in response:
|
||
return response
|
||
|
||
# 无法提取代码,抛出异常
|
||
raise ValueError(
|
||
"无法从 LLM 响应中提取代码块。\n"
|
||
f"响应内容预览: {response[:200]}..."
|
||
)
|
||
|
||
def _show_task_guide(self):
|
||
"""显示任务引导视图"""
|
||
if not self.current_task:
|
||
return
|
||
|
||
# 隐藏聊天视图
|
||
self.chat_view.get_frame().pack_forget()
|
||
|
||
# 创建任务引导视图
|
||
self.task_view = TaskGuideView(
|
||
self.main_container,
|
||
on_execute=self._on_execute_task,
|
||
on_cancel=self._on_cancel_task,
|
||
workspace_path=self.workspace
|
||
)
|
||
|
||
# 设置内容
|
||
self.task_view.set_intent_result(
|
||
self.current_task['intent_result'].reason,
|
||
self.current_task['intent_result'].confidence
|
||
)
|
||
self.task_view.set_execution_plan(self.current_task['execution_plan'])
|
||
self.task_view.set_code(self.current_task['code'])
|
||
|
||
# 显示
|
||
self.task_view.show()
|
||
|
||
def _on_execute_task(self):
|
||
"""执行任务"""
|
||
if not self.current_task:
|
||
return
|
||
|
||
self.task_view.set_buttons_enabled(False)
|
||
|
||
# 在后台线程执行
|
||
def do_execute():
|
||
return self.runner.execute(
|
||
self.current_task['code'],
|
||
user_input=self.current_task.get('user_input', ''),
|
||
is_retry=self.current_task.get('is_retry', False)
|
||
)
|
||
|
||
self._run_in_thread(
|
||
do_execute,
|
||
self._on_execution_complete
|
||
)
|
||
|
||
def _on_execution_complete(self, result: Optional[ExecutionResult], error: Optional[Exception]):
|
||
"""执行完成回调"""
|
||
if error:
|
||
messagebox.showerror("执行错误", f"执行失败: {str(error)}")
|
||
# 记录失败指标
|
||
if not result or not result.success:
|
||
self._metrics['ambiguity_failures'] += 1
|
||
else:
|
||
# 保存历史记录
|
||
if self.current_task:
|
||
self.history.add_record(
|
||
task_id=result.task_id,
|
||
user_input=self.current_task['user_input'],
|
||
intent_label=self.current_task['intent_result'].label,
|
||
intent_confidence=self.current_task['intent_result'].confidence,
|
||
execution_plan=self.current_task['execution_plan'],
|
||
code=self.current_task['code'],
|
||
success=result.success,
|
||
duration_ms=result.duration_ms,
|
||
stdout=result.stdout,
|
||
stderr=result.stderr,
|
||
log_path=result.log_path,
|
||
task_summary=self.current_task.get('task_summary', '')
|
||
)
|
||
|
||
# 记录失败指标
|
||
if not result.success:
|
||
self._metrics['ambiguity_failures'] += 1
|
||
|
||
# 如果是复用任务,记录执行结果
|
||
if self.current_task.get('is_reuse') and self.current_task.get('reuse_original_task_id'):
|
||
from history.reuse_metrics import get_reuse_metrics
|
||
metrics = get_reuse_metrics(self.workspace)
|
||
metrics.record_reuse_execution(
|
||
original_task_id=self.current_task['reuse_original_task_id'],
|
||
new_task_id=result.task_id,
|
||
success=result.success
|
||
)
|
||
|
||
self._show_execution_result(result)
|
||
# 刷新输出文件列表
|
||
if self.task_view:
|
||
self.task_view.refresh_output()
|
||
|
||
self._back_to_chat()
|
||
|
||
def _show_execution_result(self, result: ExecutionResult):
|
||
"""显示执行结果(支持三态)"""
|
||
status_display = result.get_status_display()
|
||
|
||
# 构建统计信息
|
||
stats_info = ""
|
||
if result.total_count > 0:
|
||
stats_info = f"""
|
||
统计信息:
|
||
总数: {result.total_count} 个
|
||
成功: {result.success_count} 个
|
||
失败: {result.failed_count} 个
|
||
成功率: {result.success_rate:.1%}
|
||
"""
|
||
|
||
message = f"""{status_display}
|
||
|
||
任务 ID: {result.task_id}
|
||
耗时: {result.duration_ms} ms
|
||
{stats_info}
|
||
输出:
|
||
{result.stdout if result.stdout else '(无输出)'}
|
||
|
||
{f'错误信息: {result.stderr}' if result.stderr else ''}
|
||
"""
|
||
|
||
if result.status == 'success':
|
||
# 全部成功:询问是否打开输出目录
|
||
open_output = messagebox.askyesno(
|
||
"执行结果",
|
||
message + "\n\n是否打开输出文件夹?"
|
||
)
|
||
if open_output:
|
||
os.startfile(str(self.workspace / "output"))
|
||
|
||
elif result.status == 'partial':
|
||
# 部分成功:提供三个选项
|
||
from tkinter import messagebox as mb
|
||
choice = mb.askquestion(
|
||
"执行结果",
|
||
message + f"\n\n部分文件处理失败({result.failed_count}/{result.total_count})\n\n" +
|
||
"是否查看输出文件夹?\n" +
|
||
"(选择「否」可查看日志了解失败原因)",
|
||
icon='warning'
|
||
)
|
||
if choice == 'yes':
|
||
os.startfile(str(self.workspace / "output"))
|
||
else:
|
||
# 询问是否查看日志
|
||
open_log = mb.askyesno(
|
||
"查看日志",
|
||
"是否打开日志文件查看失败详情?"
|
||
)
|
||
if open_log and result.log_path:
|
||
os.startfile(result.log_path)
|
||
else:
|
||
# 全部失败:询问是否打开日志
|
||
open_log = messagebox.askyesno(
|
||
"执行结果",
|
||
message + "\n\n是否打开日志文件查看详情?"
|
||
)
|
||
if open_log and result.log_path:
|
||
os.startfile(result.log_path)
|
||
|
||
def _on_cancel_task(self):
|
||
"""取消任务"""
|
||
self.current_task = None
|
||
self._back_to_chat()
|
||
|
||
def _back_to_chat(self):
|
||
"""返回聊天视图"""
|
||
if self.task_view:
|
||
self.task_view.hide()
|
||
self.task_view = None
|
||
|
||
self.chat_view.get_frame().pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
|
||
self.chat_view.set_input_enabled(True)
|
||
self.current_task = None
|
||
|
||
def _show_history(self):
|
||
"""显示历史记录视图"""
|
||
# 隐藏聊天视图
|
||
self.chat_view.get_frame().pack_forget()
|
||
|
||
# 创建历史记录视图
|
||
self.history_view = HistoryView(
|
||
self.main_container,
|
||
self.history,
|
||
on_back=self._hide_history,
|
||
on_reuse_code=self._on_reuse_code,
|
||
on_retry_task=self._on_retry_task
|
||
)
|
||
self.history_view.show()
|
||
|
||
def _hide_history(self):
|
||
"""隐藏历史记录视图,返回聊天"""
|
||
if self.history_view:
|
||
self.history_view.hide()
|
||
self.history_view = None
|
||
|
||
self.chat_view.get_frame().pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
|
||
|
||
def _on_reuse_code(self, record):
|
||
"""复用历史记录中的代码"""
|
||
from history.manager import TaskRecord
|
||
|
||
# 隐藏历史视图
|
||
self._hide_history()
|
||
|
||
# 设置当前任务
|
||
self.current_task = {
|
||
'user_input': record.user_input,
|
||
'intent_result': IntentResult(
|
||
label=record.intent_label,
|
||
confidence=record.intent_confidence,
|
||
reason="复用历史任务"
|
||
),
|
||
'execution_plan': record.execution_plan,
|
||
'code': record.code,
|
||
'task_summary': record.task_summary,
|
||
'is_reuse': True
|
||
}
|
||
|
||
self.chat_view.add_message(f"复用历史任务: {record.task_summary or record.user_input[:30]}", 'system')
|
||
self.chat_view.add_message("已加载历史代码,正在进行安全复检...", 'system')
|
||
self.chat_view.show_loading("正在进行安全检查")
|
||
|
||
# 强制进行安全检查(不跳过)
|
||
self._perform_safety_check(self.current_task['code'])
|
||
|
||
def _on_retry_task(self, record):
|
||
"""重试失败的任务(AI 修复)"""
|
||
from history.manager import TaskRecord
|
||
|
||
# 隐藏历史视图
|
||
self._hide_history()
|
||
|
||
self.chat_view.add_message(f"重试任务: {record.task_summary or record.user_input[:30]}", 'system')
|
||
self.chat_view.show_loading("正在分析错误并修复代码")
|
||
self.chat_view.set_input_enabled(False)
|
||
|
||
# 保存任务信息
|
||
self.current_task = {
|
||
'user_input': record.user_input,
|
||
'intent_result': IntentResult(
|
||
label=record.intent_label,
|
||
confidence=record.intent_confidence,
|
||
reason="重试失败任务"
|
||
),
|
||
'execution_plan': record.execution_plan,
|
||
'original_code': record.code,
|
||
'original_stdout': record.stdout,
|
||
'original_stderr': record.stderr,
|
||
'task_summary': record.task_summary,
|
||
'is_retry': True
|
||
}
|
||
|
||
# 在后台线程修复代码
|
||
self._run_in_thread(
|
||
self._fix_code,
|
||
self._on_code_fixed,
|
||
record
|
||
)
|
||
|
||
def _fix_code(self, record) -> tuple:
|
||
"""修复失败的代码"""
|
||
client = get_client()
|
||
model = os.getenv("GENERATION_MODEL_NAME")
|
||
|
||
response = client.chat_stream_collect(
|
||
messages=[
|
||
{"role": "system", "content": CODE_FIX_SYSTEM},
|
||
{"role": "user", "content": CODE_FIX_USER.format(
|
||
user_input=record.user_input,
|
||
execution_plan=record.execution_plan,
|
||
code=record.code,
|
||
stdout=record.stdout or "(无输出)",
|
||
stderr=record.stderr or "(无错误信息)"
|
||
)}
|
||
],
|
||
model=model,
|
||
temperature=0.2,
|
||
max_tokens=4096,
|
||
timeout=300
|
||
)
|
||
|
||
try:
|
||
code = self._extract_code(response)
|
||
return (code, None)
|
||
except ValueError as e:
|
||
return (None, e)
|
||
|
||
def _on_code_fixed(self, result: tuple, error: Optional[Exception]):
|
||
"""代码修复完成回调"""
|
||
if error:
|
||
self.chat_view.hide_loading()
|
||
self.chat_view.add_message(f"代码修复失败: {str(error)}", 'error')
|
||
self.chat_view.set_input_enabled(True)
|
||
self.current_task = None
|
||
return
|
||
|
||
code, extract_error = result
|
||
if extract_error:
|
||
self.chat_view.hide_loading()
|
||
self.chat_view.add_message(f"代码提取失败: {str(extract_error)}", 'error')
|
||
self.chat_view.set_input_enabled(True)
|
||
self.current_task = None
|
||
return
|
||
|
||
self.current_task['code'] = code
|
||
self.chat_view.update_loading_text("正在进行安全检查")
|
||
|
||
# 统一调用安全检查流程
|
||
self._perform_safety_check(code)
|
||
|
||
def _show_settings(self):
|
||
"""显示设置视图"""
|
||
# 隐藏聊天视图
|
||
self.chat_view.get_frame().pack_forget()
|
||
|
||
# 创建设置视图
|
||
self.settings_view = SettingsView(
|
||
self.main_container,
|
||
env_path=self.project_root / ".env",
|
||
on_save=self._on_settings_saved,
|
||
on_back=self._hide_settings
|
||
)
|
||
self.settings_view.show()
|
||
|
||
def _hide_settings(self):
|
||
"""隐藏设置视图,返回聊天"""
|
||
if self.settings_view:
|
||
self.settings_view.hide()
|
||
self.settings_view = None
|
||
|
||
self.chat_view.get_frame().pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
|
||
|
||
def _show_privacy(self):
|
||
"""显示隐私设置视图"""
|
||
# 隐藏聊天视图
|
||
self.chat_view.get_frame().pack_forget()
|
||
|
||
# 创建隐私设置视图
|
||
from ui.privacy_settings_view import PrivacySettingsView
|
||
self.privacy_view = PrivacySettingsView(
|
||
self.main_container,
|
||
workspace=self.workspace,
|
||
on_back=self._hide_privacy
|
||
)
|
||
self.privacy_view.show()
|
||
|
||
def _hide_privacy(self):
|
||
"""隐藏隐私设置视图,返回聊天"""
|
||
if self.privacy_view:
|
||
self.privacy_view.hide()
|
||
self.privacy_view = None
|
||
|
||
self.chat_view.get_frame().pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
|
||
|
||
def _on_settings_saved(self, connection_test_success: bool):
|
||
"""设置保存后的回调"""
|
||
# 配置已通过 set_key 保存并更新了环境变量
|
||
# 客户端已在 settings_view 中重置并测试连接
|
||
|
||
# 更新 API 配置状态
|
||
self._api_configured = connection_test_success
|
||
|
||
# 如果连接测试成功,在聊天视图中显示提示
|
||
if connection_test_success:
|
||
self.chat_view.add_message("✅ 配置已更新并生效,可以开始使用了", 'system')
|
||
|
||
def _perform_safety_check(self, code: str):
|
||
"""
|
||
统一的安全检查流程(硬规则 + LLM 审查)
|
||
所有代码(新生成/复用/修复)都必须经过此流程
|
||
"""
|
||
# 记录复用任务复检
|
||
from safety.security_metrics import get_metrics
|
||
metrics = get_metrics()
|
||
if self.current_task.get('is_reuse'):
|
||
metrics.add_reuse_recheck()
|
||
|
||
# 硬规则检查(同步,很快)
|
||
rule_result = check_code_safety(code)
|
||
if not rule_result.passed:
|
||
self.chat_view.hide_loading()
|
||
violations = "\n".join(f" • {v}" for v in rule_result.violations)
|
||
self.chat_view.add_message(
|
||
f"安全检查未通过,任务已取消:\n{violations}",
|
||
'error'
|
||
)
|
||
self.chat_view.set_input_enabled(True)
|
||
|
||
# 记录安全拦截指标
|
||
if self.current_task.get('is_reuse'):
|
||
metrics.add_reuse_block()
|
||
|
||
self.current_task = None
|
||
return
|
||
|
||
# 保存警告信息,传递给 LLM 审查
|
||
self.current_task['warnings'] = rule_result.warnings
|
||
|
||
# 在后台线程进行 LLM 安全审查
|
||
self._run_in_thread(
|
||
lambda: review_code_safety(
|
||
self.current_task['user_input'],
|
||
self.current_task['execution_plan'],
|
||
code,
|
||
rule_result.warnings # 传递警告给 LLM
|
||
),
|
||
self._on_safety_reviewed
|
||
)
|
||
|
||
def run(self):
|
||
"""运行应用"""
|
||
self.root.mainloop()
|
||
|