一、问题背景

在开发CatClaw基于LangChain的Agent系统时,我遇到了一个影响用户体验的问题:当用户发送聊天请求时,Agent需要加载MCP(Model Context Protocol)工具,而这一过程涉及到调用多家MCP服务器,导致初始响应时间过长,用户体验较差。

具体来说,在最初的实现中,我在get_agent函数中直接加载MCP工具,这意味着每次用户发起聊天请求时,都需要等待MCP工具的加载完成才能开始处理用户的请求。这种同步加载的方式在网络环境不佳或MCP服务器响应较慢时,会导致用户等待时间过长,严重影响用户体验。

二、重构方案

为了解决这个问题,我对代码进行了重构,将MCP工具的加载和管理与Agent的创建分离,实现了工具的预加载和异步管理。

1. 核心组件设计

MCPToolManager

MCPToolManager类负责管理单个用户的MCP工具,包括工具的加载、卸载和健康检查:

class MCPToolManager:
    def __init__(self, user_id: str):
        self.user_id = user_id
        self.client: MultiServerMCPClient | None = None
        self.tools: list[BaseTool] = []
        self.is_loaded = False
        self.health_check_task: asyncio.Task | None = None
        self.health_check_interval = 300  # 5分钟健康检查一次
        # 用户初始配置
        self.config = {
            "use_mcp_tools": False,
            "mcp_config": {},
            "mcp_health_interval": 300,
        }
        self._lock = asyncio.Lock()

MCPToolManagerFactory

MCPToolManagerFactory是一个单例类,负责管理所有用户的MCPToolManager实例:

class MCPToolManagerFactory:
    """
    MCP工具管理器工厂类,用于管理所有用户的MCPToolManager实例
    """

    _instance = None
    _lock = asyncio.Lock()

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._user_managers: Dict[str, MCPToolManager] = {}
        return cls._instance

    async def get_manager(self, user_id: str) -> MCPToolManager:
        """
        获取指定用户的MCPToolManager实例

        Args:
            user_id: 用户ID

        Returns:
            MCPToolManager: 用户对应的MCPToolManager实例
        """
        async with self._lock:
            if user_id not in self._user_managers:
                self._user_managers[user_id] = MCPToolManager(user_id)
            return self._user_managers[user_id]

2. 工具加载时机的调整

重构后,我将MCP工具的加载时机从get_agent函数中移出,改为在用户获取或更新设置时触发:

@router.get("/settings", response_model=BaseResponse[UserSettingsResponse])
async def get_user_settings(current_user: CurrentUser, db: DBSession):
    # ... 省略获取设置的代码 ...

    # 异步同步MCP设置
    asyncio.create_task(
        sync_user_mcp_settings(str(current_user.id), settings.settings or {})
    )

    # ... 省略返回代码 ...

@router.put("/settings", response_model=BaseResponse[UserSettingsResponse])
async def update_user_settings(
    settings_data: UserSettingsUpdate, current_user: CurrentUser, db: DBSession
):
    # ... 省略更新设置的代码 ...

    # 同步MCP设置
    await sync_user_mcp_settings(str(current_user.id), settings.settings or {})

    # ... 省略返回代码 ...

3. Agent创建时的工具获取

get_agent函数中,我不再直接加载MCP工具,而是通过MCPToolManagerFactory获取已加载的工具:

async def get_agent(
    checkpointer: Any | None = None,
    llm_model: str | None = None,
    api_key: str | None = None,
    base_url: str | None = None,
    max_tokens: int = 4096,
    user_id: Any | None = None,
) -> Runnable:
    # ... 省略其他代码 ...

    # 从MCP工具管理器获取预加载的工具
    mcp_tools = []
    # 读取用户记忆文件内容
    soul_content = ""
    important_content = ""
    if user_id:
        # 获取用户MCP工具
        mcp_manager = await mcp_tool_manager_factory.get_manager(str(user_id))
        mcp_tools = mcp_manager.get_tools()

        # ... 省略其他代码 ...

    # ... 省略其他代码 ...

    # 使用 deepagents 默认 backend + 用户隔离工具
    agent: Runnable = create_agent(
        model,
        tools=[*mcp_tools],
        checkpointer=checkpointer,
        system_prompt=system_prompt,
        middleware=[
            # ... 省略中间件代码 ...
        ],
    ).with_config({"recursion_limit": 1000})
    return agent

三、实现细节

1. 工具加载与健康检查

MCPToolManager类实现了工具的异步加载和健康检查机制:

async def load_tools(self) -> bool:
    """
    加载MCP工具到内存

    Returns:
        bool: 加载是否成功
    """
    async with self._lock:
        if self.is_loaded:
            return True

        try:
            # 检查用户是否启用了MCP工具
            if not self.config["use_mcp_tools"]:
                logger.info(f"用户 {self.user_id} 未启用MCP工具")
                return False

            # 初始化MCP客户端
            self.client = MultiServerMCPClient(self.config["mcp_config"])
            # 获取工具
            self.tools = await self.client.get_tools()
            self.is_loaded = True

            # 启动健康检查
            await self.start_health_check()
            logger.info(f"用户 {self.user_id} MCP工具加载成功")
            logger.info(f"用户 {self.user_id} MCP工具: {[tool.name for tool in self.tools]}")
            return True
        except Exception as e:
            logger.error(f"用户 {self.user_id} 加载MCP工具失败: {e}")
            self.client = None
            self.tools = []
            self.is_loaded = False
            return False

2. 健康检查机制

为了确保MCP工具的可用性,我实现了健康检查机制:

async def _health_check_loop(self) -> None:
    """
    健康检查循环
    """
    while self.is_loaded:
        try:
            await self._check_health()
        except Exception as e:
            logger.error(f"用户 {self.user_id} MCP工具健康检查失败: {e}")
        # 如果mcp_health_interval不存在,默认使用health_check_interval
        interval = self.config.get(
            "mcp_health_interval", self.health_check_interval
        )
        await asyncio.sleep(interval)

async def _check_health(self) -> bool:
    """
    执行健康检查

    Returns:
        bool: 健康检查是否通过
    """
    if not self.client or not self.is_loaded:
        return False

    try:
        # 简单的健康检查:尝试获取工具列表
        # 注意:这里不更新self.tools,只是检查连接是否正常
        await self.client.get_tools()
        logger.info(f"用户 {self.user_id} MCP服务器健康检查通过")
        return True
    except Exception as e:
        logger.error(f"用户 {self.user_id} MCP服务器健康检查失败: {e}")
        # 健康检查失败时,可以选择自动重新加载工具
        await self.unload_tools()
        await self.load_tools()
        return False

3. 异步同步MCP设置

在用户获取或更新设置时,我通过sync_user_mcp_settings函数异步同步MCP设置。这个函数的逻辑非常关键,它解释了何时去加载/卸载工具,而不是盲目地执行操作:

async def sync_user_mcp_settings(user_id: str, user_settings: dict) -> None:
    """
    同步用户的MCP设置到MCPToolManager

    Args:
        user_id: 用户ID
        user_settings: 用户设置字典
    """
    try:
        # 获取或创建用户的MCPToolManager实例
        manager = await mcp_tool_manager_factory.get_manager(user_id)

        # 获取当前MCP相关配置
        current_use_mcp = manager.config.get("use_mcp_tools", False)
        current_mcp_config = manager.config.get("mcp_config", {})
        current_health_interval = manager.config.get("mcp_health_interval", 300)

        # 获取新的MCP相关配置
        new_use_mcp = user_settings.get("use_mcp_tools", False)
        new_mcp_config = user_settings.get("mcp_config", {})
        new_health_interval = user_settings.get("mcp_health_interval", 300)

        # 检查是否有差异
        config_changed = (
            current_use_mcp != new_use_mcp
            or current_mcp_config != new_mcp_config
            or current_health_interval != new_health_interval
        )

        if not config_changed:
            return  # 没有差异,不需要更新

        # 更新配置
        manager.config["use_mcp_tools"] = new_use_mcp
        manager.config["mcp_config"] = new_mcp_config
        manager.config["mcp_health_interval"] = new_health_interval

        # 处理MCP工具的加载/卸载
        if new_use_mcp and not current_use_mcp:
            # 启用MCP工具
            await manager.load_tools()
            logger.info(f"用户 {user_id} 启用MCP工具并加载成功")
        elif not new_use_mcp and current_use_mcp:
            # 禁用MCP工具
            await manager.unload_tools()
            logger.info(f"用户 {user_id} 禁用MCP工具并卸载成功")
        elif new_use_mcp and config_changed:
            # MCP工具已启用,但配置有变化,需要重新加载
            await manager.unload_tools()
            await manager.load_tools()
            logger.info(f"用户 {user_id} MCP配置已更新并重新加载成功")

    except Exception as e:
        logger.error(f"同步用户 {user_id} MCP设置失败: {e}")

这个函数的核心逻辑是:

  1. 检查配置差异:只有当MCP相关配置发生变化时,才执行后续操作
  2. 根据配置变化类型执行不同操作
    • 从禁用到启用:加载MCP工具
    • 从启用到禁用:卸载MCP工具
    • 已启用但配置变化:重新加载MCP工具
  3. 异常处理:确保即使同步过程中出现错误,也不会影响用户设置的保存

四、总结分析

1. 工程实践与Demo的区别

方面 Demo 工程实践
工具加载时机 同步加载,在Agent创建时直接加载 异步预加载,在用户设置变更时触发
健康检查 通常不实现 实现定期健康检查,确保工具可用性
错误处理 简单处理或不处理 完善的错误处理和自动恢复机制
资源管理 不关注 实现工具的加载和卸载,合理管理资源
用户体验 不关注响应时间 优化初始响应时间,提升用户体验

2. 值得思考的点

2.1 工具加载策略

  • 预加载 vs 按需加载:我选择了预加载策略,在用户设置变更时加载工具,这样可以确保用户在发起聊天请求时工具已经准备就绪。但对于不常用的工具,这种方式可能会占用不必要的资源。

  • 懒加载:另一种策略是懒加载,即在用户首次需要使用工具时才加载。这种方式可以节省资源,但可能会导致首次使用工具时的响应延迟。

2.2 健康检查机制

  • 检查频率:健康检查的频率需要根据实际情况调整,过于频繁的检查会增加网络负载,过于稀疏的检查则可能无法及时发现问题。

  • 检查方式:我使用了简单的工具列表获取作为健康检查方式,这种方式简单有效,但可能无法发现所有类型的问题。

2.3 错误处理与重试机制

  • 自动重试:在健康检查失败时,我实现了自动重新加载工具的机制,这可以提高系统的可靠性。

  • 降级策略:当MCP工具加载失败时,系统应该有降级策略,确保Agent仍然可以正常工作,只是功能会有所限制。

2.4 扩展性考虑

  • 多用户支持:通过MCPToolManagerFactory,我实现了多用户的支持,每个用户都有自己的MCPToolManager实例。

  • 多MCP服务器支持:通过MultiServerMCPClient,我实现了对多个MCP服务器的支持。

五、结论

通过将MCP工具的加载从get_agent函数中分离出来,改为在用户获取或更新设置时异步加载,我成功解决了用户发送聊天请求后初始响应过长的问题。这种工程化的处理方式不仅提升了用户体验,还提高了系统的可靠性和可维护性。

在实际工程实践中,我需要根据具体的业务场景和技术需求,选择合适的工具加载策略和健康检查机制,以达到最佳的用户体验和系统性能。

MCP Tool 加载流程图

flowchart TD subgraph MCP工具加载流程 A[用户获取/更新设置] --> B[调用sync_user_mcp_settings] B --> C[获取MCPToolManager实例] C --> D[检查配置差异] D -->|无差异| E[返回] D -->|有差异| F[更新配置] F --> G{配置变化类型} G -->|从禁用到启用| H[加载MCP工具] G -->|从启用到禁用| I[卸载MCP工具] G -->|已启用但配置变化| J[重新加载MCP工具] H --> K[启动健康检查] J --> K K --> L[工具就绪] I --> M[停止健康检查] L --> N[Agent创建时获取工具] M --> O[工具卸载完成] subgraph 健康检查 P[定期执行健康检查] --> Q{检查是否通过} Q -->|通过| R[记录健康状态] Q -->|失败| S[卸载工具] S --> T[重新加载工具] T --> R R --> P end end style MCP工具加载流程 fill:#f0f4f8,stroke:#2c3e50,stroke-width:2px style A fill:#e3f2fd,stroke:#1976d2,stroke-width:2px style B fill:#e1bee7,stroke:#7b1fa2,stroke-width:2px style F fill:#e0f7fa,stroke:#0097a7,stroke-width:2px style H fill:#e0f7fa,stroke:#0097a7,stroke-width:2px style I fill:#ffebee,stroke:#c62828,stroke-width:2px style J fill:#fff3e0,stroke:#ef6c00,stroke-width:2px style N fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px