主题
客户端概念 
MCP 客户端是连接到 MCP 服务器并利用其功能的应用程序。本页面介绍了客户端的核心概念和实现方式。
客户端功能 
MCP 客户端具有以下核心功能:
连接管理 
- 建立与服务器的连接
- 维护会话状态
- 处理连接重连和故障恢复
协议处理 
- 发送和接收 MCP 消息
- 处理协议版本协商
- 管理能力交换
功能集成 
- 访问服务器提供的资源
- 调用服务器工具
- 使用服务器提示
核心客户端特性 
采样 (Sampling) 
采样是客户端与 LLM 交互的核心机制:
- 文本生成:请求 LLM 生成文本响应
- 结构化输出:获取格式化的数据结构
- 流式响应:支持实时文本生成
python
# 示例:使用采样功能
response = await client.sample(
    messages=[{"role": "user", "content": "解释量子计算"}],
    max_tokens=1000
)根 (Roots) 
根定义了客户端可以访问的文件系统路径:
- 安全边界:限制文件访问范围
- 权限控制:定义可读写的目录
- 路径解析:处理相对和绝对路径
python
# 配置根路径
client.set_roots([
    "/home/user/projects",
    "/tmp/workspace"
])启发 (Elicitation) 
启发是引导 LLM 生成特定类型响应的技术:
- 提示工程:设计有效的提示模板
- 上下文注入:添加相关背景信息
- 响应格式化:指定输出格式要求
客户端架构 
传输层 
负责底层通信:
- STDIO 传输
- HTTP 传输
- WebSocket 传输
协议层 
处理 MCP 协议:
- 消息序列化/反序列化
- 错误处理
- 能力协商
应用层 
实现业务逻辑:
- 功能调用
- 数据处理
- 用户界面
实现示例 
基础客户端 
python
from mcp.client import Client
from mcp.client.stdio import StdioServerParameters
async def main():
    # 创建客户端
    client = Client()
    
    # 连接到服务器
    server_params = StdioServerParameters(
        command="python",
        args=["server.py"]
    )
    
    async with client.connect(server_params) as session:
        # 列出可用资源
        resources = await session.list_resources()
        print(f"可用资源: {resources}")
        
        # 读取资源
        if resources:
            content = await session.read_resource(resources[0].uri)
            print(f"资源内容: {content}")
        
        # 列出可用工具
        tools = await session.list_tools()
        print(f"可用工具: {tools}")
        
        # 调用工具
        if tools:
            result = await session.call_tool(
                tools[0].name,
                arguments={"param": "value"}
            )
            print(f"工具结果: {result}")
if __name__ == "__main__":
    import asyncio
    asyncio.run(main())高级客户端功能 
python
class AdvancedClient:
    def __init__(self):
        self.client = Client()
        self.sessions = {}
    
    async def connect_multiple_servers(self, server_configs):
        """连接多个服务器"""
        for name, config in server_configs.items():
            session = await self.client.connect(config)
            self.sessions[name] = session
    
    async def aggregate_resources(self):
        """聚合所有服务器的资源"""
        all_resources = []
        for name, session in self.sessions.items():
            resources = await session.list_resources()
            for resource in resources:
                resource.server = name
                all_resources.append(resource)
        return all_resources
    
    async def smart_tool_selection(self, task_description):
        """智能选择合适的工具"""
        all_tools = []
        for session in self.sessions.values():
            tools = await session.list_tools()
            all_tools.extend(tools)
        
        # 基于任务描述选择最合适的工具
        # 这里可以使用 LLM 或其他 AI 技术
        selected_tool = self.select_best_tool(task_description, all_tools)
        return selected_tool错误处理 
连接错误 
python
try:
    async with client.connect(server_params) as session:
        # 执行操作
        pass
except ConnectionError as e:
    print(f"连接失败: {e}")
    # 实现重连逻辑
except TimeoutError as e:
    print(f"连接超时: {e}")
    # 处理超时情况协议错误 
python
try:
    result = await session.call_tool("tool_name", arguments={})
except McpError as e:
    if e.code == "METHOD_NOT_FOUND":
        print("工具不存在")
    elif e.code == "INVALID_PARAMS":
        print("参数无效")
    else:
        print(f"协议错误: {e.message}")安全考虑 
服务器验证 
- 验证服务器身份
- 检查证书有效性
- 实现信任机制
数据保护 
- 加密敏感数据传输
- 避免日志中记录敏感信息
- 实现数据脱敏
权限控制 
- 限制文件系统访问
- 验证工具调用权限
- 实现用户授权机制
性能优化 
连接池 
python
class ConnectionPool:
    def __init__(self, max_connections=10):
        self.max_connections = max_connections
        self.connections = []
        self.available = []
    
    async def get_connection(self, server_params):
        if self.available:
            return self.available.pop()
        
        if len(self.connections) < self.max_connections:
            conn = await self.create_connection(server_params)
            self.connections.append(conn)
            return conn
        
        # 等待可用连接
        return await self.wait_for_connection()缓存策略 
python
class CachedClient:
    def __init__(self):
        self.resource_cache = {}
        self.tool_cache = {}
    
    async def read_resource_cached(self, uri, ttl=300):
        if uri in self.resource_cache:
            cached_time, content = self.resource_cache[uri]
            if time.time() - cached_time < ttl:
                return content
        
        content = await self.session.read_resource(uri)
        self.resource_cache[uri] = (time.time(), content)
        return content监控和调试 
日志记录 
python
import logging
logger = logging.getLogger(__name__)
class LoggingClient:
    async def call_tool_with_logging(self, tool_name, arguments):
        logger.info(f"调用工具: {tool_name}, 参数: {arguments}")
        
        start_time = time.time()
        try:
            result = await self.session.call_tool(tool_name, arguments)
            duration = time.time() - start_time
            logger.info(f"工具调用成功,耗时: {duration:.2f}s")
            return result
        except Exception as e:
            duration = time.time() - start_time
            logger.error(f"工具调用失败,耗时: {duration:.2f}s, 错误: {e}")
            raise性能监控 
python
class PerformanceMonitor:
    def __init__(self):
        self.metrics = {
            'tool_calls': 0,
            'resource_reads': 0,
            'total_time': 0,
            'errors': 0
        }
    
    def record_tool_call(self, duration, success=True):
        self.metrics['tool_calls'] += 1
        self.metrics['total_time'] += duration
        if not success:
            self.metrics['errors'] += 1
    
    def get_stats(self):
        if self.metrics['tool_calls'] > 0:
            avg_time = self.metrics['total_time'] / self.metrics['tool_calls']
            error_rate = self.metrics['errors'] / self.metrics['tool_calls']
            return {
                'average_call_time': avg_time,
                'error_rate': error_rate,
                'total_calls': self.metrics['tool_calls']
            }
        return {}最佳实践 
设计原则 
- 异步优先:使用异步编程模式
- 错误恢复:实现健壮的错误处理
- 资源管理:及时释放连接和资源
- 用户体验:提供清晰的反馈和进度指示
实现建议 
- 使用官方 SDK 进行开发
- 实现适当的重试机制
- 添加详细的日志记录
- 进行充分的测试
下一步 
了解了客户端概念后,您可以: