import asyncio import time import hashlib from typing import Dict, Optional from core.agent import create_langchain_agent from utils.logger import chat_logger class AgentManager: def __init__(self): self._local_agent_cache = {} # 仅缓存agent配置,不缓存实例 self._is_shutdown = False async def initialize(self): """异步初始化管理器""" self._is_shutdown = False chat_logger.info("🔧🔧🔧🔧 Agent管理器初始化完成") async def shutdown(self): """异步关闭管理器""" self._is_shutdown = True self._local_agent_cache.clear() chat_logger.info("🧹🧹🧹🧹 Agent管理器已关闭") def _get_agent_config_key( self, thread_id: str, username: str, backend_url: str, token: str ) -> str: """生成agent配置的缓存key""" key_data = f"{thread_id}:{username}:{backend_url}:{token}" return hashlib.md5(key_data.encode()).hexdigest() def _get_user_identifier(self, username: str, token: str) -> str: """生成用户标识符""" if not username or username == "default": username_part = "anonymous" else: username_part = username if token and len(token) >= 8: token_part = token[:8] else: token_part = "notoken" return f"{username_part}_{token_part}" async def get_agent_instance( self, thread_id: str, username: str, backend_url: str, token: str ): if self._is_shutdown: raise RuntimeError("Agent管理器已关闭") clean_username = username or "anonymous" clean_backend = backend_url or "" clean_token = token or "" user_id = self._get_user_identifier(clean_username, clean_token) config_key = self._get_agent_config_key( thread_id, clean_username, clean_backend, clean_token ) # 检查本地配置缓存 current_time = time.time() if config_key in self._local_agent_cache: agent_instance, timestamp = self._local_agent_cache[config_key] if current_time - timestamp <= 300: # 5分钟本地缓存 chat_logger.info(f"使用本地缓存的agent配置: 用户={user_id}") return agent_instance chat_logger.info(f"创建新的agent实例: 用户={user_id}") agent_instance = await self._create_agent_async( backend_url=clean_backend, token=clean_token, username=clean_username, thread_id=thread_id, ) # 缓存agent配置到本地 self._local_agent_cache[config_key] = (agent_instance, current_time) chat_logger.info(f"Agent配置已缓存: 用户={user_id}") return agent_instance async def _create_agent_async( self, backend_url: str, token: str, username: str, thread_id: str ): """创建agent实例""" def sync_create_agent(): return create_langchain_agent( backend_url=backend_url, token=token, username=username, thread_id=thread_id, ) loop = asyncio.get_event_loop() return await loop.run_in_executor(None, sync_create_agent) async def clear_user_agent( self, thread_id: str, username: str, backend_url: str, token: str ): """清除特定用户的agent配置缓存""" config_key = self._get_agent_config_key(thread_id, username, backend_url, token) # 清除本地缓存 if config_key in self._local_agent_cache: del self._local_agent_cache[config_key] user_id = self._get_user_identifier(username, token) chat_logger.info(f"已清除用户Agent配置缓存: {user_id}") async def get_cache_status(self): """获取缓存状态""" if self._is_shutdown: return {"status": "shutdown", "cache_size": 0} return { "local_config_cache_size": len(self._local_agent_cache), "status": "active", "message": "", } async def clear_cache(self): """清空本地配置缓存""" local_count = len(self._local_agent_cache) self._local_agent_cache.clear() chat_logger.info(f"清空本地配置缓存: {local_count}个配置") return local_count # 全局实例 agent_manager = AgentManager()