import asyncio import time from typing import Dict, Any from langchain_core.messages import HumanMessage from utils.logger import chat_logger, log_chat_entry from core.agent_manager import agent_manager from core.chat_result_manager import chat_result_manager class AsyncChatService: """异步聊天服务 - 支持轮询方式的版本""" def __init__(self): self.agent_manager = agent_manager self._thread_pool = None self._processing_tasks = {} # 正在处理的任务缓存 def _get_thread_pool(self): """获取或创建线程池""" if self._thread_pool is None: import concurrent.futures self._thread_pool = concurrent.futures.ThreadPoolExecutor( max_workers=20, thread_name_prefix="async_chat_worker", ) return self._thread_pool async def submit_chat_task(self, request_data: Dict[str, Any]) -> str: """提交聊天任务(立即返回任务ID)""" username = request_data["username"] # 创建任务记录 task_id = chat_result_manager.create_task(request_data) chat_logger.info(f"用户{username},已提交聊天任务: {task_id}") # 异步执行任务 asyncio.create_task(self._process_chat_task(task_id, request_data)) return task_id async def _process_chat_task(self, task_id: str, request_data: Dict[str, Any]): """异步处理聊天任务""" try: # 更新状态为处理中 chat_result_manager.update_task_status(task_id, "processing") # 提取请求数据 message = request_data["message"] thread_id = request_data["thread_id"] username = request_data["username"] backend_url = request_data["backend_url"] token = request_data["token"] user_id = username chat_logger.info(f"开始处理任务 - 任务ID={task_id}, 用户={user_id}") # 异步获取agent实例 agent = await self.agent_manager.get_agent_instance( thread_id=thread_id, username=username, backend_url=backend_url, token=token, ) # 在线程池中执行同步的Langchain操作 result = await self._run_agent_in_threadpool( agent, message, thread_id, user_id ) if not isinstance(result, dict) or "messages" not in result: raise ValueError(f"Agent返回格式异常: {type(result)}") # 处理结果 response_data = self._process_agent_result(result, user_id, request_data) # 更新任务状态为完成 chat_result_manager.update_task_status(task_id, "completed", response_data) chat_logger.info(f"任务处理完成 - 任务ID={task_id}") except Exception as e: error_msg = f"聊天处理失败: {str(e)}" chat_logger.error(f"{error_msg} - 任务ID={task_id}") # 更新任务状态为失败 chat_result_manager.update_task_status( task_id, "failed", error_message=error_msg ) async def _run_agent_in_threadpool( self, agent, message: str, thread_id: str, user_id: str ): """在线程池中执行Langchain Agent""" loop = asyncio.get_event_loop() thread_pool = self._get_thread_pool() # 准备输入 inputs = {"messages": [HumanMessage(content=message)]} config = {"configurable": {"thread_id": thread_id}} chat_logger.info(f"在线程池中执行Agent - 用户={user_id}") try: # 在线程池中执行同步操作 result = await loop.run_in_executor( thread_pool, lambda: agent.invoke(inputs, config) ) return result except Exception as e: chat_logger.error(f"Agent执行失败 - 用户={user_id}: {str(e)}") raise def _process_agent_result( self, result: Dict[str, Any], user_id: str, request_data: Dict ) -> Dict[str, Any]: """处理Agent返回结果""" all_messages = result["messages"] processed_messages = [] all_ai_messages = [] all_tool_calls = [] final_answer = "" for i, msg in enumerate(all_messages): msg_data = { "index": i, "type": getattr(msg, "type", "unknown"), "content": "", } # 获取内容 if hasattr(msg, "content"): content = msg.content if isinstance(content, str): msg_data["content"] = content else: msg_data["content"] = str(content) # 获取工具调用 if hasattr(msg, "tool_calls") and msg.tool_calls: msg_data["tool_calls"] = msg.tool_calls all_tool_calls.extend(msg.tool_calls) for tool_call in msg.tool_calls: tool_name = tool_call.get("name", "unknown") tool_args = tool_call.get("args", {}) chat_logger.info(f"工具调用 - 用户={user_id}, 工具={tool_name}") if hasattr(msg, "tool_call_id"): msg_data["tool_call_id"] = msg.tool_call_id if hasattr(msg, "name"): msg_data["name"] = msg.name processed_messages.append(msg_data) # 收集AI消息 if msg_data["type"] == "ai": all_ai_messages.append(msg_data) final_answer = msg_data["content"] # 构建响应 response = { "final_answer": final_answer, "all_ai_messages": all_ai_messages, "all_messages": processed_messages, "tool_calls": all_tool_calls, "thread_id": request_data["thread_id"], "user_identifier": user_id, "backend_config": { "backend_url": request_data["backend_url"] or "未配置", "username": request_data["username"], "has_token": bool(request_data["token"]), }, "success": True, } # 记录日志 log_chat_entry(user_id, request_data["message"], response) return response async def get_task_result(self, task_id: str) -> Dict[str, Any]: """获取任务结果""" task_info = chat_result_manager.get_task(task_id) chat_logger.info(f"获取任务结果 - 任务ID={task_id}, 状态={task_info['status']}") if not task_info: return { "success": False, "error": f"任务不存在: {task_id}", "task_id": task_id, } return { "task_id": task_id, "status": task_info["status"], "response": task_info["response_data"], "error": task_info["error_message"], "created_at": task_info["created_at"], "updated_at": task_info["updated_at"], "success": task_info["status"] == "completed", } async def shutdown(self): """关闭服务""" if self._thread_pool: self._thread_pool.shutdown(wait=False) self._thread_pool = None chat_result_manager.close() chat_logger.info("异步聊天服务已关闭") # 全局实例 async_chat_service = AsyncChatService()