| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133 |
- import asyncio
- import time
- import hashlib
- from typing import Dict, Optional
- from core.agent import create_langchain_agent
- from utils.logger import chat_logger
class AgentManager:
    """Create LangChain agents on demand and cache them in process memory.

    Each agent instance is cached under a key derived from
    ``(thread_id, username, backend_url, token)`` and is reused for
    ``CACHE_TTL_SECONDS`` seconds. Expired entries are discarded whenever a
    new agent has to be created, so stale entries do not accumulate.
    """

    # How long a cached agent instance stays valid, in seconds (5 minutes).
    CACHE_TTL_SECONDS = 300

    def __init__(self):
        # config key -> (agent instance, time.monotonic() stamp when cached)
        self._local_agent_cache = {}
        self._is_shutdown = False

    async def initialize(self) -> None:
        """Mark the manager as usable (again) and log that it is ready."""
        self._is_shutdown = False
        chat_logger.info("Agent管理器初始化完成")

    async def shutdown(self) -> None:
        """Reject further agent requests and drop every cached agent."""
        self._is_shutdown = True
        self._local_agent_cache.clear()
        chat_logger.info("Agent管理器已关闭")

    @staticmethod
    def _normalize_credentials(username, backend_url, token):
        """Return ``(username, backend_url, token)`` with falsy values defaulted.

        Every code path that builds a cache key must go through this helper;
        otherwise an entry stored for ``username=None`` could not be found
        again by a caller that also passes ``None``.
        """
        return username or "anonymous", backend_url or "", token or ""

    def _get_agent_config_key(
        self, thread_id: str, username: str, backend_url: str, token: str
    ) -> str:
        """Return the cache key (hex MD5 digest) for one agent configuration."""
        key_data = f"{thread_id}:{username}:{backend_url}:{token}"
        # MD5 only serves as a compact dictionary key here, not as a
        # security measure.
        return hashlib.md5(key_data.encode()).hexdigest()

    def _get_user_identifier(self, username: str, token: str) -> str:
        """Return a short ``<username>_<token prefix>`` label used in logs.

        A missing username or the literal ``"default"`` becomes
        ``"anonymous"``; a token shorter than 8 characters becomes
        ``"notoken"``.
        """
        if not username or username == "default":
            username_part = "anonymous"
        else:
            username_part = username
        # NOTE(review): the first 8 characters of the token end up in log
        # messages -- confirm this is acceptable for the token format in use.
        if token and len(token) >= 8:
            token_part = token[:8]
        else:
            token_part = "notoken"
        return f"{username_part}_{token_part}"

    def _purge_expired(self, now: float) -> int:
        """Delete every cache entry older than the TTL; return how many."""
        expired_keys = [
            key
            for key, (_, cached_at) in self._local_agent_cache.items()
            if now - cached_at > self.CACHE_TTL_SECONDS
        ]
        for key in expired_keys:
            del self._local_agent_cache[key]
        return len(expired_keys)

    async def get_agent_instance(
        self, thread_id: str, username: str, backend_url: str, token: str
    ):
        """Return the agent for this configuration, creating it if necessary.

        A cached instance is reused while it is at most
        ``CACHE_TTL_SECONDS`` old; otherwise a new one is created and cached.

        Raises:
            RuntimeError: if the manager has been shut down.
        """
        if self._is_shutdown:
            raise RuntimeError("Agent管理器已关闭")

        clean_username, clean_backend, clean_token = self._normalize_credentials(
            username, backend_url, token
        )
        user_id = self._get_user_identifier(clean_username, clean_token)
        config_key = self._get_agent_config_key(
            thread_id, clean_username, clean_backend, clean_token
        )

        # A monotonic clock keeps the TTL immune to wall-clock adjustments.
        now = time.monotonic()
        cached = self._local_agent_cache.get(config_key)
        if cached is not None:
            agent_instance, cached_at = cached
            if now - cached_at <= self.CACHE_TTL_SECONDS:
                chat_logger.info(f"使用本地缓存的agent配置: 用户={user_id}")
                return agent_instance

        # Cache miss: take the opportunity to evict everything that has
        # expired, so entries for abandoned threads do not live forever.
        self._purge_expired(now)

        chat_logger.info(f"创建新的agent实例: 用户={user_id}")
        agent_instance = await self._create_agent_async(
            backend_url=clean_backend,
            token=clean_token,
            username=clean_username,
            thread_id=thread_id,
        )

        # Stamp after creation so a slow build does not shorten the TTL.
        self._local_agent_cache[config_key] = (agent_instance, time.monotonic())
        chat_logger.info(f"Agent配置已缓存: 用户={user_id}")
        return agent_instance

    async def _create_agent_async(
        self, backend_url: str, token: str, username: str, thread_id: str
    ):
        """Build an agent via ``create_langchain_agent`` in the default executor.

        The factory is run off the event loop thread, presumably because it
        blocks -- its implementation is not visible from this module.
        """

        def sync_create_agent():
            return create_langchain_agent(
                backend_url=backend_url,
                token=token,
                username=username,
                thread_id=thread_id,
            )

        # get_running_loop() is the supported way to obtain the loop from
        # inside a coroutine.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, sync_create_agent)

    async def clear_user_agent(
        self, thread_id: str, username: str, backend_url: str, token: str
    ) -> None:
        """Remove the cached agent for one configuration, if there is one."""
        # Apply the same defaults as get_agent_instance so both methods
        # compute the same key for the same arguments.
        clean_username, clean_backend, clean_token = self._normalize_credentials(
            username, backend_url, token
        )
        config_key = self._get_agent_config_key(
            thread_id, clean_username, clean_backend, clean_token
        )
        if config_key in self._local_agent_cache:
            del self._local_agent_cache[config_key]
            user_id = self._get_user_identifier(clean_username, clean_token)
            chat_logger.info(f"已清除用户Agent配置缓存: {user_id}")

    async def get_cache_status(self):
        """Return a dict describing the cache.

        After shutdown the dict is ``{"status": "shutdown", "cache_size": 0}``;
        otherwise it reports the entry count under
        ``"local_config_cache_size"`` with ``"status": "active"``.
        """
        if self._is_shutdown:
            return {"status": "shutdown", "cache_size": 0}
        return {
            "local_config_cache_size": len(self._local_agent_cache),
            "status": "active",
            "message": "",
        }

    async def clear_cache(self) -> int:
        """Drop every cached agent and return how many were removed."""
        local_count = len(self._local_agent_cache)
        self._local_agent_cache.clear()
        chat_logger.info(f"清空本地配置缓存: {local_count}个配置")
        return local_count
- # Global instance: created once when this module is imported.
- agent_manager = AgentManager()
|