工程化的角度 -- Agent何时载入MCP Tool?
一、问题背景
在开发CatClaw基于LangChain的Agent系统时,我遇到了一个影响用户体验的问题:当用户发送聊天请求时,Agent需要加载MCP(Model Context Protocol)工具,而这一过程涉及到调用多家MCP服务器,导致初始响应时间过长,用户体验较差。
具体来说,在最初的实现中,我在get_agent函数中直接加载MCP工具,这意味着每次用户发起聊天请求时,都需要等待MCP工具的加载完成才能开始处理用户的请求。这种同步加载的方式在网络环境不佳或MCP服务器响应较慢时,会导致用户等待时间过长,严重影响用户体验。
二、重构方案
为了解决这个问题,我对代码进行了重构,将MCP工具的加载和管理与Agent的创建分离,实现了工具的预加载和异步管理。
1. 核心组件设计
MCPToolManager
MCPToolManager类负责管理单个用户的MCP工具,包括工具的加载、卸载和健康检查:
class MCPToolManager:
def __init__(self, user_id: str):
self.user_id = user_id
self.client: MultiServerMCPClient | None = None
self.tools: list[BaseTool] = []
self.is_loaded = False
self.health_check_task: asyncio.Task | None = None
self.health_check_interval = 300 # 5分钟健康检查一次
# 用户初始配置
self.config = {
"use_mcp_tools": False,
"mcp_config": {},
"mcp_health_interval": 300,
}
self._lock = asyncio.Lock()
MCPToolManagerFactory
MCPToolManagerFactory是一个单例类,负责管理所有用户的MCPToolManager实例:
class MCPToolManagerFactory:
"""
MCP工具管理器工厂类,用于管理所有用户的MCPToolManager实例
"""
_instance = None
_lock = asyncio.Lock()
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._user_managers: Dict[str, MCPToolManager] = {}
return cls._instance
async def get_manager(self, user_id: str) -> MCPToolManager:
"""
获取指定用户的MCPToolManager实例
Args:
user_id: 用户ID
Returns:
MCPToolManager: 用户对应的MCPToolManager实例
"""
async with self._lock:
if user_id not in self._user_managers:
self._user_managers[user_id] = MCPToolManager(user_id)
return self._user_managers[user_id]
2. 工具加载时机的调整
重构后,我将MCP工具的加载时机从get_agent函数中移出,改为在用户获取或更新设置时触发:
@router.get("/settings", response_model=BaseResponse[UserSettingsResponse])
async def get_user_settings(current_user: CurrentUser, db: DBSession):
# ... 省略获取设置的代码 ...
# 异步同步MCP设置
asyncio.create_task(
sync_user_mcp_settings(str(current_user.id), settings.settings or {})
)
# ... 省略返回代码 ...
@router.put("/settings", response_model=BaseResponse[UserSettingsResponse])
async def update_user_settings(
settings_data: UserSettingsUpdate, current_user: CurrentUser, db: DBSession
):
# ... 省略更新设置的代码 ...
# 同步MCP设置
await sync_user_mcp_settings(str(current_user.id), settings.settings or {})
# ... 省略返回代码 ...
3. Agent创建时的工具获取
在get_agent函数中,我不再直接加载MCP工具,而是通过MCPToolManagerFactory获取已加载的工具:
async def get_agent(
checkpointer: Any | None = None,
llm_model: str | None = None,
api_key: str | None = None,
base_url: str | None = None,
max_tokens: int = 4096,
user_id: Any | None = None,
) -> Runnable:
# ... 省略其他代码 ...
# 从MCP工具管理器获取预加载的工具
mcp_tools = []
# 读取用户记忆文件内容
soul_content = ""
important_content = ""
if user_id:
# 获取用户MCP工具
mcp_manager = await mcp_tool_manager_factory.get_manager(str(user_id))
mcp_tools = mcp_manager.get_tools()
# ... 省略其他代码 ...
# ... 省略其他代码 ...
# 使用 deepagents 默认 backend + 用户隔离工具
agent: Runnable = create_agent(
model,
tools=[*mcp_tools],
checkpointer=checkpointer,
system_prompt=system_prompt,
middleware=[
# ... 省略中间件代码 ...
],
).with_config({"recursion_limit": 1000})
return agent
三、实现细节
1. 工具加载与健康检查
MCPToolManager类实现了工具的异步加载和健康检查机制:
async def load_tools(self) -> bool:
"""
加载MCP工具到内存
Returns:
bool: 加载是否成功
"""
async with self._lock:
if self.is_loaded:
return True
try:
# 检查用户是否启用了MCP工具
if not self.config["use_mcp_tools"]:
logger.info(f"用户 {self.user_id} 未启用MCP工具")
return False
# 初始化MCP客户端
self.client = MultiServerMCPClient(self.config["mcp_config"])
# 获取工具
self.tools = await self.client.get_tools()
self.is_loaded = True
# 启动健康检查
await self.start_health_check()
logger.info(f"用户 {self.user_id} MCP工具加载成功")
logger.info(f"用户 {self.user_id} MCP工具: {[tool.name for tool in self.tools]}")
return True
except Exception as e:
logger.error(f"用户 {self.user_id} 加载MCP工具失败: {e}")
self.client = None
self.tools = []
self.is_loaded = False
return False
2. 健康检查机制
为了确保MCP工具的可用性,我实现了健康检查机制:
async def _health_check_loop(self) -> None:
"""
健康检查循环
"""
while self.is_loaded:
try:
await self._check_health()
except Exception as e:
logger.error(f"用户 {self.user_id} MCP工具健康检查失败: {e}")
# 如果mcp_health_interval不存在,默认使用health_check_interval
interval = self.config.get(
"mcp_health_interval", self.health_check_interval
)
await asyncio.sleep(interval)
async def _check_health(self) -> bool:
"""
执行健康检查
Returns:
bool: 健康检查是否通过
"""
if not self.client or not self.is_loaded:
return False
try:
# 简单的健康检查:尝试获取工具列表
# 注意:这里不更新self.tools,只是检查连接是否正常
await self.client.get_tools()
logger.info(f"用户 {self.user_id} MCP服务器健康检查通过")
return True
except Exception as e:
logger.error(f"用户 {self.user_id} MCP服务器健康检查失败: {e}")
# 健康检查失败时,可以选择自动重新加载工具
await self.unload_tools()
await self.load_tools()
return False
3. 异步同步MCP设置
在用户获取或更新设置时,我通过sync_user_mcp_settings函数异步同步MCP设置。这个函数的逻辑非常关键,它解释了何时去加载/卸载工具,而不是盲目地执行操作:
async def sync_user_mcp_settings(user_id: str, user_settings: dict) -> None:
"""
同步用户的MCP设置到MCPToolManager
Args:
user_id: 用户ID
user_settings: 用户设置字典
"""
try:
# 获取或创建用户的MCPToolManager实例
manager = await mcp_tool_manager_factory.get_manager(user_id)
# 获取当前MCP相关配置
current_use_mcp = manager.config.get("use_mcp_tools", False)
current_mcp_config = manager.config.get("mcp_config", {})
current_health_interval = manager.config.get("mcp_health_interval", 300)
# 获取新的MCP相关配置
new_use_mcp = user_settings.get("use_mcp_tools", False)
new_mcp_config = user_settings.get("mcp_config", {})
new_health_interval = user_settings.get("mcp_health_interval", 300)
# 检查是否有差异
config_changed = (
current_use_mcp != new_use_mcp
or current_mcp_config != new_mcp_config
or current_health_interval != new_health_interval
)
if not config_changed:
return # 没有差异,不需要更新
# 更新配置
manager.config["use_mcp_tools"] = new_use_mcp
manager.config["mcp_config"] = new_mcp_config
manager.config["mcp_health_interval"] = new_health_interval
# 处理MCP工具的加载/卸载
if new_use_mcp and not current_use_mcp:
# 启用MCP工具
await manager.load_tools()
logger.info(f"用户 {user_id} 启用MCP工具并加载成功")
elif not new_use_mcp and current_use_mcp:
# 禁用MCP工具
await manager.unload_tools()
logger.info(f"用户 {user_id} 禁用MCP工具并卸载成功")
elif new_use_mcp and config_changed:
# MCP工具已启用,但配置有变化,需要重新加载
await manager.unload_tools()
await manager.load_tools()
logger.info(f"用户 {user_id} MCP配置已更新并重新加载成功")
except Exception as e:
logger.error(f"同步用户 {user_id} MCP设置失败: {e}")
这个函数的核心逻辑是:
- 检查配置差异:只有当MCP相关配置发生变化时,才执行后续操作
- 根据配置变化类型执行不同操作:
- 从禁用到启用:加载MCP工具
- 从启用到禁用:卸载MCP工具
- 已启用但配置变化:重新加载MCP工具
- 异常处理:确保即使同步过程中出现错误,也不会影响用户设置的保存
四、总结分析
1. 工程实践与Demo的区别
| 方面 | Demo | 工程实践 |
|---|---|---|
| 工具加载时机 | 同步加载,在Agent创建时直接加载 | 异步预加载,在用户设置变更时触发 |
| 健康检查 | 通常不实现 | 实现定期健康检查,确保工具可用性 |
| 错误处理 | 简单处理或不处理 | 完善的错误处理和自动恢复机制 |
| 资源管理 | 不关注 | 实现工具的加载和卸载,合理管理资源 |
| 用户体验 | 不关注响应时间 | 优化初始响应时间,提升用户体验 |
2. 值得思考的点
2.1 工具加载策略
预加载 vs 按需加载:我选择了预加载策略,在用户设置变更时加载工具,这样可以确保用户在发起聊天请求时工具已经准备就绪。但对于不常用的工具,这种方式可能会占用不必要的资源。
懒加载:另一种策略是懒加载,即在用户首次需要使用工具时才加载。这种方式可以节省资源,但可能会导致首次使用工具时的响应延迟。
2.2 健康检查机制
检查频率:健康检查的频率需要根据实际情况调整,过于频繁的检查会增加网络负载,过于稀疏的检查则可能无法及时发现问题。
检查方式:我使用了简单的工具列表获取作为健康检查方式,这种方式简单有效,但可能无法发现所有类型的问题。
2.3 错误处理与重试机制
自动重试:在健康检查失败时,我实现了自动重新加载工具的机制,这可以提高系统的可靠性。
降级策略:当MCP工具加载失败时,系统应该有降级策略,确保Agent仍然可以正常工作,只是功能会有所限制。
2.4 扩展性考虑
多用户支持:通过
MCPToolManagerFactory,我实现了多用户的支持,每个用户都有自己的MCPToolManager实例。多MCP服务器支持:通过
MultiServerMCPClient,我实现了对多个MCP服务器的支持。
五、结论
通过将MCP工具的加载从get_agent函数中分离出来,改为在用户获取或更新设置时异步加载,我成功解决了用户发送聊天请求后初始响应过长的问题。这种工程化的处理方式不仅提升了用户体验,还提高了系统的可靠性和可维护性。
在实际工程实践中,我需要根据具体的业务场景和技术需求,选择合适的工具加载策略和健康检查机制,以达到最佳的用户体验和系统性能。