| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213 |
import asyncio
import concurrent.futures
import time
from typing import Any, Dict, Optional

from langchain_core.messages import HumanMessage

from core.agent_manager import agent_manager
from core.chat_result_manager import chat_result_manager
from utils.logger import chat_logger, log_chat_entry
class AsyncChatService:
    """Asynchronous chat service for polling clients.

    ``submit_chat_task`` registers a request with ``chat_result_manager``,
    starts processing it in the background and returns a task id right away.
    Clients then call ``get_task_result`` with that id to read the outcome.
    """

    def __init__(self):
        self.agent_manager = agent_manager
        # Created lazily by _get_thread_pool().
        self._thread_pool: Optional[concurrent.futures.ThreadPoolExecutor] = None
        # task_id -> background asyncio.Task that is still running.
        self._processing_tasks: Dict[str, asyncio.Task] = {}

    def _get_thread_pool(self):
        """Return the worker thread pool, creating it on first use."""
        if self._thread_pool is None:
            self._thread_pool = concurrent.futures.ThreadPoolExecutor(
                max_workers=20,
                thread_name_prefix="async_chat_worker",
            )
        return self._thread_pool

    async def submit_chat_task(self, request_data: Dict[str, Any]) -> str:
        """Register a chat request and start processing it in the background.

        Args:
            request_data: Request payload. ``username`` is read here; the
                remaining keys are read by ``_process_chat_task``.

        Returns:
            The task id produced by ``chat_result_manager.create_task``.

        Raises:
            KeyError: If ``request_data`` has no ``username`` key.
        """
        username = request_data["username"]
        task_id = chat_result_manager.create_task(request_data)
        chat_logger.info(f"用户{username},已提交聊天任务: {task_id}")

        # The event loop only keeps weak references to tasks, so a task whose
        # handle is dropped may be garbage-collected before it finishes.
        # Hold a strong reference until the task is done.
        task = asyncio.create_task(self._process_chat_task(task_id, request_data))
        self._processing_tasks[task_id] = task
        task.add_done_callback(
            lambda _finished, tid=task_id: self._processing_tasks.pop(tid, None)
        )
        return task_id

    async def _process_chat_task(self, task_id: str, request_data: Dict[str, Any]):
        """Run one chat task and record its outcome in ``chat_result_manager``.

        The task is marked ``processing``, then ``completed`` with the
        response payload on success. Any ``Exception`` raised along the way
        is logged and the task is marked ``failed`` with an error message.
        """
        try:
            chat_result_manager.update_task_status(task_id, "processing")

            message = request_data["message"]
            thread_id = request_data["thread_id"]
            username = request_data["username"]
            backend_url = request_data["backend_url"]
            token = request_data["token"]
            # The username doubles as the user identifier in logs and output.
            user_id = username
            chat_logger.info(f"开始处理任务 - 任务ID={task_id}, 用户={user_id}")

            agent = await self.agent_manager.get_agent_instance(
                thread_id=thread_id,
                username=username,
                backend_url=backend_url,
                token=token,
            )

            # agent.invoke is synchronous; run it off the event loop.
            result = await self._run_agent_in_threadpool(
                agent, message, thread_id, user_id
            )
            if not isinstance(result, dict) or "messages" not in result:
                raise ValueError(f"Agent返回格式异常: {type(result)}")

            response_data = self._process_agent_result(result, user_id, request_data)
            chat_result_manager.update_task_status(task_id, "completed", response_data)
            chat_logger.info(f"任务处理完成 - 任务ID={task_id}")
        except Exception as e:
            error_msg = f"聊天处理失败: {str(e)}"
            chat_logger.error(f"{error_msg} - 任务ID={task_id}")
            chat_result_manager.update_task_status(
                task_id, "failed", error_message=error_msg
            )

    async def _run_agent_in_threadpool(
        self, agent, message: str, thread_id: str, user_id: str
    ):
        """Invoke the agent in the worker thread pool and return its result.

        Args:
            agent: Object exposing a synchronous ``invoke(inputs, config)``.
            message: User text, wrapped in a ``HumanMessage``.
            thread_id: Passed to the agent as ``configurable.thread_id``.
            user_id: Used for log messages only.

        Raises:
            Exception: Whatever ``agent.invoke`` raises, after logging it.
        """
        # get_running_loop() is the supported call inside a coroutine;
        # get_event_loop() is deprecated in this context.
        loop = asyncio.get_running_loop()
        thread_pool = self._get_thread_pool()

        inputs = {"messages": [HumanMessage(content=message)]}
        config = {"configurable": {"thread_id": thread_id}}
        chat_logger.info(f"在线程池中执行Agent - 用户={user_id}")
        try:
            return await loop.run_in_executor(
                thread_pool, lambda: agent.invoke(inputs, config)
            )
        except Exception as e:
            chat_logger.error(f"Agent执行失败 - 用户={user_id}: {str(e)}")
            raise

    @staticmethod
    def _serialize_message(index: int, msg: Any) -> Dict[str, Any]:
        """Convert one agent message object into a plain dict.

        The result always has ``index``, ``type`` and ``content``. It also
        has ``tool_calls`` when the message carries a non-empty list, and
        ``tool_call_id`` / ``name`` when the message has those attributes.
        Non-string content is converted with ``str()``.
        """
        entry: Dict[str, Any] = {
            "index": index,
            "type": getattr(msg, "type", "unknown"),
            "content": "",
        }
        if hasattr(msg, "content"):
            content = msg.content
            entry["content"] = content if isinstance(content, str) else str(content)

        tool_calls = getattr(msg, "tool_calls", None)
        if tool_calls:
            entry["tool_calls"] = tool_calls
        if hasattr(msg, "tool_call_id"):
            entry["tool_call_id"] = msg.tool_call_id
        if hasattr(msg, "name"):
            entry["name"] = msg.name
        return entry

    def _process_agent_result(
        self, result: Dict[str, Any], user_id: str, request_data: Dict
    ) -> Dict[str, Any]:
        """Build the response payload from the agent's result.

        Args:
            result: Agent output; ``result["messages"]`` is iterated.
            user_id: Identifier used in logs and echoed in the payload.
            request_data: Original request; ``thread_id``, ``backend_url``,
                ``username``, ``token`` and ``message`` are read.

        Returns:
            A dict with the final answer (content of the last message whose
            type is ``"ai"``, or ``""`` if there is none), all serialized
            messages, the AI messages, the collected tool calls and a
            summary of the backend configuration.
        """
        processed_messages = []
        all_ai_messages = []
        all_tool_calls = []
        final_answer = ""

        for index, msg in enumerate(result["messages"]):
            entry = self._serialize_message(index, msg)

            tool_calls = entry.get("tool_calls")
            if tool_calls:
                all_tool_calls.extend(tool_calls)
                for tool_call in tool_calls:
                    tool_name = tool_call.get("name", "unknown")
                    chat_logger.info(f"工具调用 - 用户={user_id}, 工具={tool_name}")

            processed_messages.append(entry)
            if entry["type"] == "ai":
                all_ai_messages.append(entry)
                # Later AI messages overwrite earlier ones: the last one wins.
                final_answer = entry["content"]

        response = {
            "final_answer": final_answer,
            "all_ai_messages": all_ai_messages,
            "all_messages": processed_messages,
            "tool_calls": all_tool_calls,
            "thread_id": request_data["thread_id"],
            "user_identifier": user_id,
            "backend_config": {
                "backend_url": request_data["backend_url"] or "未配置",
                "username": request_data["username"],
                # Report only whether a token was supplied, never its value.
                "has_token": bool(request_data["token"]),
            },
            "success": True,
        }
        log_chat_entry(user_id, request_data["message"], response)
        return response

    @staticmethod
    def _build_task_result(task_id: str, task_info: Any) -> Dict[str, Any]:
        """Shape a stored task record into the polling response.

        Returns a "task not found" payload when ``task_info`` is falsy;
        otherwise copies the status, response, error and timestamps, with
        ``success`` true only when the status is ``"completed"``.
        """
        if not task_info:
            return {
                "success": False,
                "error": f"任务不存在: {task_id}",
                "task_id": task_id,
            }
        return {
            "task_id": task_id,
            "status": task_info["status"],
            "response": task_info["response_data"],
            "error": task_info["error_message"],
            "created_at": task_info["created_at"],
            "updated_at": task_info["updated_at"],
            "success": task_info["status"] == "completed",
        }

    async def get_task_result(self, task_id: str) -> Dict[str, Any]:
        """Return the current state of a task for a polling client.

        Args:
            task_id: Id previously returned by ``submit_chat_task``.

        Returns:
            The dict produced by ``_build_task_result``.
        """
        task_info = chat_result_manager.get_task(task_id)
        # Check for a missing task before reading any of its fields.
        if not task_info:
            chat_logger.info(f"获取任务结果 - 任务ID={task_id}, 任务不存在")
        else:
            chat_logger.info(
                f"获取任务结果 - 任务ID={task_id}, 状态={task_info['status']}"
            )
        return self._build_task_result(task_id, task_info)

    async def shutdown(self):
        """Shut down the thread pool without waiting and close the result manager."""
        if self._thread_pool:
            self._thread_pool.shutdown(wait=False)
            self._thread_pool = None
        chat_result_manager.close()
        chat_logger.info("异步聊天服务已关闭")
# Module-level instance of the service.
async_chat_service: AsyncChatService = AsyncChatService()
|