主题
Python SDK 
MCP Python SDK 提供了构建 MCP 服务器和客户端的完整工具集,支持现代 Python 异步编程模式。
安装 
使用 pip 
bash
pip install mcp
使用 uv(推荐)
bash
uv add mcp
开发版本
bash
pip install git+https://github.com/modelcontextprotocol/python-sdk.git
核心组件
1. 服务器 SDK 
基础服务器 
python
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Resource, Tool, Prompt
import asyncio
# Create the server instance
server = Server("my-server")
@server.list_resources()
async def list_resources() -> list[Resource]:
    """Advertise the resources this server exposes: one JSON config file."""
    config_resource = Resource(
        uri="file://config.json",
        name="配置文件",
        description="应用程序配置",
        mimeType="application/json",
    )
    return [config_resource]
@server.read_resource()
async def read_resource(uri: str) -> str:
    """Return the content of the resource identified by ``uri``.

    Raises:
        ValueError: If ``uri`` is not the one resource this server knows.
    """
    # Guard clause first: anything but the config file is unknown.
    if uri != "file://config.json":
        raise ValueError(f"未知资源: {uri}")
    return '{"app": "my-app", "version": "1.0.0"}'
@server.list_tools()
async def list_tools() -> list[Tool]:
    """Advertise the tools this server exposes: a single ``calculate`` tool."""
    # JSON Schema for the tool input: one required string field.
    expression_schema = {
        "type": "object",
        "properties": {
            "expression": {
                "type": "string",
                "description": "要计算的数学表达式",
            },
        },
        "required": ["expression"],
    }
    calculate_tool = Tool(
        name="calculate",
        description="执行数学计算",
        inputSchema=expression_schema,
    )
    return [calculate_tool]
def _evaluate_arithmetic(expression: str):
    """Evaluate a plain arithmetic expression without executing arbitrary code.

    This replaces the previous ``eval()`` call: tool arguments come from the
    client, so they must never be run as Python. Only numeric literals, the
    binary operators ``+ - * / // % **`` and unary ``+``/``-`` are accepted.

    Args:
        expression: The expression text, e.g. ``"2 + 3 * 4"``.

    Returns:
        The numeric result.

    Raises:
        SyntaxError: If ``expression`` is not a valid Python expression.
        ValueError: If it uses anything other than the constructs above, or an
            exponent large enough to be a denial-of-service risk.
        ZeroDivisionError: On division or modulo by zero.
    """
    import ast
    import operator

    binary_ops = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.FloorDiv: operator.floordiv,
        ast.Mod: operator.mod,
        ast.Pow: operator.pow,
    }
    unary_ops = {ast.UAdd: operator.pos, ast.USub: operator.neg}

    def visit(node):
        if isinstance(node, ast.Constant):
            value = node.value
            # bool is an int subclass; only real numbers are accepted.
            if isinstance(value, (int, float)) and not isinstance(value, bool):
                return value
        elif isinstance(node, ast.BinOp) and type(node.op) in binary_ops:
            left = visit(node.left)
            right = visit(node.right)
            # Cap exponents so inputs like 9**9**9 cannot stall the server.
            if isinstance(node.op, ast.Pow) and abs(right) > 1000:
                raise ValueError(f"指数过大: {right}")
            return binary_ops[type(node.op)](left, right)
        elif isinstance(node, ast.UnaryOp) and type(node.op) in unary_ops:
            return unary_ops[type(node.op)](visit(node.operand))
        raise ValueError(f"不支持的表达式: {expression}")

    return visit(ast.parse(expression, mode="eval").body)


@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[dict]:
    """Run the named tool and return its output as text content.

    Args:
        name: Tool name; only ``"calculate"`` is supported.
        arguments: Tool arguments; ``"expression"`` holds the text to evaluate.

    Returns:
        A one-element list with a text item carrying either the result or the
        evaluation error message.

    Raises:
        ValueError: If ``name`` is not a known tool.
    """
    if name != "calculate":
        raise ValueError(f"未知工具: {name}")

    expression = arguments.get("expression", "")
    try:
        result = _evaluate_arithmetic(expression)
    except Exception as e:
        # Evaluation problems are reported to the caller as tool output.
        return [{"type": "text", "text": f"计算错误: {e}"}]
    return [{"type": "text", "text": f"结果: {result}"}]
async def main():
    """Serve requests over stdin/stdout until the streams are closed."""
    async with stdio_server() as streams:
        read_stream, write_stream = streams
        init_options = server.create_initialization_options()
        await server.run(read_stream, write_stream, init_options)
if __name__ == "__main__":
    asyncio.run(main())
高级服务器功能
python
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import (
    Resource, Tool, Prompt, TextContent, ImageContent,
    EmbeddedResource, LoggingLevel
)
import asyncio
import json
import logging
from pathlib import Path
class AdvancedServer:
    """Example MCP server with file/API resources, three tools and two prompts.

    File resources are served from ``data_dir`` only; API resources are canned
    JSON payloads. Every resource read is memoised in ``resources_cache`` for
    the lifetime of the instance.
    """

    def __init__(self, name: str):
        """Create the underlying ``Server`` and register every handler.

        Args:
            name: Server name; also used as the logger name.
        """
        self.server = Server(name)
        # State the handlers rely on exists before they are registered.
        self.resources_cache = {}
        # Root of the file:// resources; reads outside it are refused.
        self.data_dir = Path("./data")
        self.setup_logging()
        self.setup_handlers()

    def setup_logging(self):
        """Configure root logging at INFO and create this server's logger."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(self.server.name)

    def setup_handlers(self):
        """Register the resource, tool and prompt handlers on ``self.server``."""

        @self.server.list_resources()
        async def list_resources() -> list[Resource]:
            """List files under ``data_dir`` followed by the two API resources."""
            resources = []

            # File-system resources
            if self.data_dir.exists():
                resources.extend(
                    Resource(
                        uri=f"file://{file_path}",
                        name=file_path.name,
                        description=f"文件: {file_path}",
                        mimeType=self._get_mime_type(file_path),
                    )
                    for file_path in self.data_dir.glob("**/*")
                    if file_path.is_file()
                )

            # API resources
            resources.extend([
                Resource(
                    uri="api://weather/current",
                    name="当前天气",
                    description="获取当前天气信息"
                ),
                Resource(
                    uri="api://news/latest",
                    name="最新新闻",
                    description="获取最新新闻"
                )
            ])

            self.logger.info(f"列出 {len(resources)} 个资源")
            return resources

        @self.server.read_resource()
        async def read_resource(uri: str) -> str | bytes:
            """Return the content of ``uri``, serving from the cache when possible.

            Raises:
                PermissionError: For a ``file://`` path outside ``data_dir``.
                FileNotFoundError: For a ``file://`` path that is not a file.
                ValueError: For an unknown API resource or URI scheme.
            """
            # NOTE(review): the SDK may hand over a URL object rather than a
            # plain str; normalising makes the prefix checks work either way.
            uri = str(uri)
            self.logger.info(f"读取资源: {uri}")

            # Cache check
            if uri in self.resources_cache:
                self.logger.info(f"使用缓存: {uri}")
                return self.resources_cache[uri]

            try:
                if uri.startswith("file://"):
                    content = self._read_file_resource(uri)
                elif uri.startswith("api://"):
                    content = self._read_api_resource(uri)
                else:
                    raise ValueError(f"不支持的 URI 方案: {uri}")
            except Exception as e:
                self.logger.error(f"读取资源失败: {e}")
                raise

            self.resources_cache[uri] = content
            return content

        @self.server.list_tools()
        async def list_tools() -> list[Tool]:
            """Describe the three tools and their JSON-Schema inputs."""
            return [
                Tool(
                    name="file_search",
                    description="在文件中搜索文本",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "pattern": {
                                "type": "string",
                                "description": "搜索模式"
                            },
                            "directory": {
                                "type": "string",
                                "description": "搜索目录",
                                "default": "."
                            }
                        },
                        "required": ["pattern"]
                    }
                ),
                Tool(
                    name="web_request",
                    description="发送 HTTP 请求",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "url": {
                                "type": "string",
                                "description": "请求 URL"
                            },
                            "method": {
                                "type": "string",
                                "enum": ["GET", "POST", "PUT", "DELETE"],
                                "default": "GET"
                            },
                            "headers": {
                                "type": "object",
                                "description": "请求头"
                            },
                            "data": {
                                "type": "object",
                                "description": "请求数据"
                            }
                        },
                        "required": ["url"]
                    }
                ),
                Tool(
                    name="data_analysis",
                    description="分析数据并生成报告",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "data": {
                                "type": "array",
                                "description": "要分析的数据"
                            },
                            "analysis_type": {
                                "type": "string",
                                "enum": ["summary", "trend", "correlation"],
                                "description": "分析类型"
                            }
                        },
                        "required": ["data", "analysis_type"]
                    }
                )
            ]

        @self.server.call_tool()
        async def call_tool(name: str, arguments: dict) -> list[dict]:
            """Dispatch a tool call; failures are returned as text, not raised."""
            self.logger.info(f"调用工具: {name} with {arguments}")

            handlers = {
                "file_search": self._file_search,
                "web_request": self._web_request,
                "data_analysis": self._data_analysis,
            }

            try:
                handler = handlers.get(name)
                if handler is None:
                    raise ValueError(f"未知工具: {name}")
                return await handler(arguments)
            except Exception as e:
                self.logger.error(f"工具调用失败: {e}")
                return [{"type": "text", "text": f"错误: {e}"}]

        @self.server.list_prompts()
        async def list_prompts() -> list[Prompt]:
            """Describe the two prompt templates and their arguments."""
            return [
                Prompt(
                    name="code_review",
                    description="代码审查提示",
                    arguments=[
                        {
                            "name": "code",
                            "description": "要审查的代码",
                            "required": True
                        },
                        {
                            "name": "language",
                            "description": "编程语言",
                            "required": False
                        }
                    ]
                ),
                Prompt(
                    name="data_summary",
                    description="数据摘要提示",
                    arguments=[
                        {
                            "name": "data_source",
                            "description": "数据源",
                            "required": True
                        }
                    ]
                )
            ]

        @self.server.get_prompt()
        async def get_prompt(name: str, arguments: dict) -> dict:
            """Render the named prompt into a single user message.

            Raises:
                ValueError: If ``name`` is not a known prompt.
            """
            self.logger.info(f"获取提示: {name} with {arguments}")

            if name == "code_review":
                code = arguments.get("code", "")
                language = arguments.get("language", "python")
                text = f"请审查以下 {language} 代码,提供改进建议:\n\n```{language}\n{code}\n```"
            elif name == "data_summary":
                data_source = arguments.get("data_source", "")
                text = f"请分析并总结来自 {data_source} 的数据,包括关键指标、趋势和洞察。"
            else:
                raise ValueError(f"未知提示: {name}")

            return {
                "messages": [
                    {
                        "role": "user",
                        "content": {
                            "type": "text",
                            "text": text
                        }
                    }
                ]
            }

    def _read_file_resource(self, uri: str) -> str:
        """Read a ``file://`` resource, refusing anything outside ``data_dir``.

        Args:
            uri: A URI starting with ``file://``.

        Raises:
            PermissionError: If the resolved path is not inside ``data_dir``.
            FileNotFoundError: If the path does not name an existing file.
        """
        file_path = Path(uri.removeprefix("file://"))
        resolved = file_path.resolve()
        # Resolve before comparing so "../" segments and symlinks cannot
        # smuggle the path out of the data directory.
        if not resolved.is_relative_to(self.data_dir.resolve()):
            raise PermissionError(f"拒绝访问数据目录之外的路径: {file_path}")
        if not resolved.is_file():
            raise FileNotFoundError(f"文件不存在: {file_path}")
        return resolved.read_text(encoding='utf-8')

    def _read_api_resource(self, uri: str) -> str:
        """Return the canned JSON payload for an ``api://`` resource.

        Raises:
            ValueError: If the URI names neither the weather nor the news API.
        """
        # Simulated API calls
        if "weather" in uri:
            payload = {
                "temperature": 22,
                "humidity": 65,
                "condition": "晴朗"
            }
        elif "news" in uri:
            payload = {
                "headlines": [
                    "科技新闻 1",
                    "科技新闻 2",
                    "科技新闻 3"
                ]
            }
        else:
            raise ValueError(f"未知 API: {uri}")
        return json.dumps(payload, ensure_ascii=False)

    async def _file_search(self, arguments: dict) -> list[dict]:
        """Case-insensitively search ``*.py`` files under a directory.

        Args:
            arguments: ``pattern`` (text to find) and optional ``directory``
                (defaults to the current directory).

        Returns:
            One text item listing every file whose content contains the pattern.
        """
        pattern = arguments.get("pattern", "")
        directory = arguments.get("directory", ".")

        results = []
        search_dir = Path(directory)
        needle = pattern.lower()  # loop-invariant

        if search_dir.exists():
            for file_path in search_dir.rglob("*.py"):  # Python files only
                try:
                    content = file_path.read_text(encoding='utf-8')
                except Exception as e:
                    # Unreadable files are skipped, not fatal.
                    self.logger.warning(f"读取文件失败 {file_path}: {e}")
                    continue
                if needle in content.lower():
                    results.append(f"找到匹配: {file_path}")

        return [{"type": "text", "text": "搜索结果:\n" + "\n".join(results)}]

    async def _web_request(self, arguments: dict) -> list[dict]:
        """Return a simulated HTTP response echoing the URL and method.

        No network request is made; a real implementation would use an HTTP
        client such as aiohttp.
        """
        url = arguments.get("url", "")
        method = arguments.get("method", "GET")

        response = {
            "status": 200,
            "url": url,
            "method": method,
            "data": "模拟响应数据"
        }

        return [{"type": "text", "text": json.dumps(response, ensure_ascii=False, indent=2)}]

    async def _data_analysis(self, arguments: dict) -> list[dict]:
        """Produce a summary or a first-vs-last trend for a list of values.

        Args:
            arguments: ``data`` (list) and ``analysis_type`` (``"summary"`` or
                ``"trend"``; any other value is reported as not implemented).

        Returns:
            One text item with the analysis result.
        """
        data = arguments.get("data", [])
        analysis_type = arguments.get("analysis_type", "summary")

        if analysis_type == "summary":
            summary = {
                "count": len(data),
                "type": type(data[0]).__name__ if data else "unknown",
                "sample": data[:3] if data else []
            }
            result = f"数据摘要:\n{json.dumps(summary, ensure_ascii=False, indent=2)}"

        elif analysis_type == "trend":
            # Simple trend: compare the last value with the first.
            if not all(isinstance(x, (int, float)) for x in data):
                result = "趋势分析: 数据类型不支持"
            else:
                if len(data) < 2:
                    trend = "无法确定"
                elif data[-1] > data[0]:
                    trend = "上升"
                elif data[-1] < data[0]:
                    trend = "下降"
                else:
                    # Equal endpoints are flat, not falling.
                    trend = "持平"
                result = f"趋势分析: {trend}"

        else:
            result = f"分析类型 '{analysis_type}' 暂未实现"

        return [{"type": "text", "text": result}]

    def _get_mime_type(self, file_path: Path) -> str:
        """Map a file suffix (case-insensitive) to a MIME type.

        Unknown suffixes map to ``application/octet-stream``.
        """
        suffix = file_path.suffix.lower()
        mime_types = {
            '.json': 'application/json',
            '.txt': 'text/plain',
            '.py': 'text/x-python',
            '.js': 'text/javascript',
            '.html': 'text/html',
            '.css': 'text/css',
            '.md': 'text/markdown'
        }
        return mime_types.get(suffix, 'application/octet-stream')

    async def run(self):
        """Serve requests over stdin/stdout until the streams are closed."""
        self.logger.info(f"启动服务器: {self.server.name}")

        async with stdio_server() as (read_stream, write_stream):
            await self.server.run(
                read_stream,
                write_stream,
                self.server.create_initialization_options()
            )
# Usage example
async def main():
    """Build the advanced example server and serve until it stops."""
    await AdvancedServer("advanced-mcp-server").run()
if __name__ == "__main__":
    asyncio.run(main())
2. 客户端 SDK
基础客户端 
python
from mcp.client import Client
from mcp.client.stdio import StdioServerParameters
import asyncio
async def basic_client():
    """Connect to ``server.py`` over stdio and exercise resources and tools.

    Lists the resources and reads the first one, then lists the tools and
    calls ``calculate`` if the server offers it. Everything is printed.

    NOTE(review): ``Client`` / ``client.connect`` are used as imported by this
    example and could not be checked from this file alone; the official SDK
    README documents ``ClientSession`` with ``stdio_client`` for this purpose,
    so confirm against the installed ``mcp`` package.
    """
    # Server launch parameters
    server_params = StdioServerParameters(
        command="python",
        args=["server.py"]
    )

    # Create the client
    client = Client()

    async with client.connect(server_params) as session:
        # List resources
        resources = await session.list_resources()
        print(f"可用资源: {len(resources)}")

        # Read the first resource, if any
        if resources:
            content = await session.read_resource(resources[0].uri)
            print(f"资源内容: {content}")

        # List tools
        tools = await session.list_tools()
        print(f"可用工具: {len(tools)}")

        # The arguments below only make sense for the calculator, so pick that
        # tool by name instead of calling whichever tool happens to be first.
        calculator = next((t for t in tools if t.name == "calculate"), None)
        if calculator is not None:
            result = await session.call_tool(
                calculator.name,
                {"expression": "2 + 3"}
            )
            print(f"工具结果: {result}")
if __name__ == "__main__":
    asyncio.run(basic_client())
高级客户端
python
from mcp.client import Client
from mcp.client.stdio import StdioServerParameters
from mcp.client.sse import SseServerParameters
import asyncio
import json
from typing import Dict, List, Any
class AdvancedClient:
    """Client wrapper that keeps several named sessions and runs work on them.

    ``sessions`` maps a caller-chosen server name to its session and
    ``capabilities_cache`` maps the same name to the resources, tools and
    prompts listed right after connecting.

    NOTE(review): ``Client`` and ``client.connect`` are used as imported by
    this example and could not be checked from this file alone — confirm
    against the installed ``mcp`` package.
    """

    def __init__(self):
        self.client = Client()
        self.sessions = {}
        self.capabilities_cache = {}

    async def connect_stdio_server(self, name: str, command: str, args: List[str]):
        """Connect to a server launched as a subprocess speaking over stdio.

        Args:
            name: Key under which the session is stored.
            command: Executable to launch.
            args: Command-line arguments for the executable.

        Returns:
            The new session.
        """
        server_params = StdioServerParameters(command=command, args=args)
        return await self._open_session(name, server_params)

    async def connect_sse_server(self, name: str, url: str):
        """Connect to a server reachable over SSE at ``url``.

        Returns:
            The new session, stored under ``name``.
        """
        server_params = SseServerParameters(url=url)
        return await self._open_session(name, server_params)

    async def _open_session(self, name: str, server_params):
        """Connect, remember the session under ``name`` and cache its capabilities."""
        session = await self.client.connect(server_params)
        self.sessions[name] = session
        await self._cache_capabilities(name, session)
        return session

    async def _cache_capabilities(self, name: str, session):
        """List resources, tools and prompts once and cache them under ``name``.

        Best effort: a failure is printed and leaves the cache entry unset.
        """
        try:
            capabilities = {
                'resources': await session.list_resources(),
                'tools': await session.list_tools(),
                'prompts': await session.list_prompts()
            }
            self.capabilities_cache[name] = capabilities
        except Exception as e:
            print(f"缓存能力失败 {name}: {e}")

    async def get_server_info(self, server_name: str) -> Dict[str, Any]:
        """Summarise the cached capabilities of one server.

        Returns:
            Counts and names of resources, tools and prompts, or ``{}`` when
            nothing is cached for ``server_name``.
        """
        if server_name not in self.capabilities_cache:
            return {}

        caps = self.capabilities_cache[server_name]
        resources = caps.get('resources', [])
        tools = caps.get('tools', [])
        prompts = caps.get('prompts', [])
        return {
            'resources_count': len(resources),
            'tools_count': len(tools),
            'prompts_count': len(prompts),
            'resources': [r.name for r in resources],
            'tools': [t.name for t in tools],
            'prompts': [p.name for p in prompts]
        }

    async def execute_workflow(self, workflow: List[Dict[str, Any]]):
        """Run workflow steps one after another, collecting a record per step.

        Each step is a dict with ``type`` (``read_resource``, ``call_tool`` or
        ``get_prompt``), ``server`` and the type-specific keys.

        Returns:
            One dict per step, in order: ``{'step', 'result'}`` on success or
            ``{'step', 'error'}`` on failure. A failing step does not stop the
            workflow.
        """
        results = []

        for step in workflow:
            step_type = step.get('type')
            server_name = step.get('server')

            if server_name not in self.sessions:
                results.append({
                    'step': step,
                    'error': f'服务器 {server_name} 未连接'
                })
                continue

            session = self.sessions[server_name]

            try:
                if step_type == 'read_resource':
                    outcome = await session.read_resource(step.get('uri'))
                elif step_type == 'call_tool':
                    outcome = await session.call_tool(
                        step.get('tool'), step.get('arguments', {})
                    )
                elif step_type == 'get_prompt':
                    outcome = await session.get_prompt(
                        step.get('prompt'), step.get('arguments', {})
                    )
                else:
                    results.append({
                        'step': step,
                        'error': f'未知步骤类型: {step_type}'
                    })
                    continue
            except Exception as e:
                results.append({'step': step, 'error': str(e)})
            else:
                results.append({'step': step, 'result': outcome})

        return results

    async def batch_operations(self, operations: List[Dict[str, Any]]):
        """Run operations concurrently and return one outcome per operation.

        Returns:
            A list the same length and order as ``operations``. Each entry is
            the operation's result, or the exception it raised. An unsupported
            or missing ``type`` yields a ``ValueError`` entry, so positions
            always line up with the input.
        """
        runners = {
            'read_resource': self._read_resource_task,
            'call_tool': self._call_tool_task,
        }
        tasks = [
            runners.get(op.get('type'), self._unsupported_operation_task)(op)
            for op in operations
        ]
        return await asyncio.gather(*tasks, return_exceptions=True)

    async def _read_resource_task(self, op: Dict[str, Any]):
        """Read ``op['uri']`` from the session named ``op['server']``."""
        server_name = op['server']
        uri = op['uri']
        session = self.sessions[server_name]
        return await session.read_resource(uri)

    async def _call_tool_task(self, op: Dict[str, Any]):
        """Call ``op['tool']`` on the session named ``op['server']``."""
        server_name = op['server']
        tool_name = op['tool']
        arguments = op.get('arguments', {})
        session = self.sessions[server_name]
        return await session.call_tool(tool_name, arguments)

    async def _unsupported_operation_task(self, op: Dict[str, Any]):
        """Fail with ``ValueError`` so the batch keeps a slot for this operation."""
        raise ValueError(f"未知操作类型: {op.get('type')}")

    async def monitor_resources(self, server_name: str, interval: float = 5.0):
        """Poll a server's resource list forever, printing additions and removals.

        Runs until cancelled. The first poll reports every resource as added
        because the comparison starts from an empty set.

        Args:
            server_name: Name of a connected server.
            interval: Seconds to wait between polls.
        """
        if server_name not in self.sessions:
            print(f"服务器 {server_name} 未连接")
            return

        session = self.sessions[server_name]
        last_resources = set()

        while True:
            try:
                resources = await session.list_resources()
                current_resources = {r.uri for r in resources}

                # Detect changes since the previous poll
                added = current_resources - last_resources
                removed = last_resources - current_resources

                if added:
                    print(f"新增资源: {added}")
                if removed:
                    print(f"移除资源: {removed}")

                last_resources = current_resources
            except Exception as e:
                print(f"监控资源失败: {e}")

            await asyncio.sleep(interval)

    async def close_all(self):
        """Close every session (best effort) and clear both registries."""
        for name, session in self.sessions.items():
            try:
                await session.close()
                print(f"已关闭连接: {name}")
            except Exception as e:
                print(f"关闭连接失败 {name}: {e}")

        self.sessions.clear()
        self.capabilities_cache.clear()
# Usage example
async def advanced_client_example():
    """Connect to two stdio servers, then run a workflow and a batch on them.

    All sessions are closed on the way out, whether or not a step failed.
    """
    # Sequential workflow: read a config resource, then run one calculation
    workflow = [
        {
            'type': 'read_resource',
            'server': 'file-server',
            'uri': 'file://config.json'
        },
        {
            'type': 'call_tool',
            'server': 'calc-server',
            'tool': 'calculate',
            'arguments': {'expression': '10 * 5'}
        }
    ]

    # Batch: two calculations sent to the same server
    operations = [
        {
            'type': 'call_tool',
            'server': 'calc-server',
            'tool': 'calculate',
            'arguments': {'expression': expression}
        }
        for expression in ('1 + 1', '2 * 3')
    ]

    client = AdvancedClient()

    try:
        # Connect to several servers
        for label, script in (
            ("file-server", "file_server.py"),
            ("calc-server", "calc_server.py"),
        ):
            await client.connect_stdio_server(label, "python", [script])

        # Report what each server offers
        for server_name in client.sessions:
            info = await client.get_server_info(server_name)
            print(f"服务器 {server_name}: {info}")

        # Run the workflow
        results = await client.execute_workflow(workflow)
        print(f"工作流结果: {results}")

        # Run the batch
        batch_results = await client.batch_operations(operations)
        print(f"批量操作结果: {batch_results}")

    finally:
        await client.close_all()
if __name__ == "__main__":
    asyncio.run(advanced_client_example())
类型系统
核心类型 
python
from mcp.types import (
    # 基础类型
    Resource, Tool, Prompt,
    TextContent, ImageContent, EmbeddedResource,
    
    # 消息类型
    InitializeRequest, InitializeResponse,
    ListResourcesRequest, ListResourcesResponse,
    ReadResourceRequest, ReadResourceResponse,
    ListToolsRequest, ListToolsResponse,
    CallToolRequest, CallToolResponse,
    ListPromptsRequest, ListPromptsResponse,
    GetPromptRequest, GetPromptResponse,
    
    # 错误类型
    McpError, InvalidRequestError, MethodNotFoundError,
    InvalidParamsError, InternalError,
    
    # 传输类型
    StdioServerParameters, SseServerParameters,
    
    # 日志类型
    LoggingLevel, LoggingMessage
)
# Resource definition: a plain-text file identified by its URI
resource = Resource(
    uri="file://example.txt",
    name="示例文件",
    description="这是一个示例文件",
    mimeType="text/plain"
)
# Tool definition: inputSchema is a JSON Schema object; only param1 is required
tool = Tool(
    name="my_tool",
    description="我的工具",
    inputSchema={
        "type": "object",
        "properties": {
            "param1": {"type": "string"},
            "param2": {"type": "number"}
        },
        "required": ["param1"]
    }
)
# 提示定义
prompt = Prompt(
    name="my_prompt",
    description="我的提示",
    arguments=[
        {
            "name": "context",
            "description": "上下文信息",
            "required": True
        }
    ]
)
错误处理
自定义错误处理 
python
from mcp.types import McpError, InvalidParamsError
from mcp.server import Server
import logging
# Server instance and module-level logger used by the tool handler in this example
server = Server("error-handling-server")
logger = logging.getLogger(__name__)
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[dict]:
    """Run the ``divide`` tool with explicit validation and error mapping.

    Args:
        name: Tool name; only ``"divide"`` is supported.
        arguments: Must contain numeric ``a`` and ``b``.

    Returns:
        A one-element list with a text item carrying ``a / b``.

    Raises:
        InvalidParamsError: If an argument is missing, not a number, or the
            divisor is zero.
        McpError: With code -32601 for an unknown tool; any unexpected
            failure is logged and converted to an internal error.

    NOTE(review): the ``McpError(message, code=...)`` and
    ``InvalidParamsError(message)`` signatures are used as this example
    imports them and could not be checked from this file alone — confirm
    against the installed ``mcp`` package.
    """
    try:
        if name != "divide":
            raise McpError(f"未知工具: {name}", code=-32601)

        a = arguments.get("a")
        b = arguments.get("b")

        # Argument validation
        if a is None or b is None:
            raise InvalidParamsError("缺少必需参数 'a' 或 'b'")

        # bool is an int subclass, so exclude it explicitly.
        if any(
            isinstance(v, bool) or not isinstance(v, (int, float))
            for v in (a, b)
        ):
            raise InvalidParamsError("参数必须是数字")

        if b == 0:
            raise InvalidParamsError("除数不能为零")

        result = a / b
        return [{"type": "text", "text": f"结果: {result}"}]

    except (InvalidParamsError, McpError):
        # Protocol errors already carry the right code (including the
        # unknown-tool -32601); let them through unchanged instead of
        # turning them into an internal error.
        raise
    except Exception as e:
        # Anything else is unexpected: log it with its traceback before it is
        # converted to an internal error.
        logger.exception(f"工具调用内部错误: {e}")
        raise McpError("内部服务器错误", code=-32603)
测试
单元测试 
python
import pytest
import asyncio
from mcp.server import Server
from mcp.client import Client
from mcp.client.stdio import StdioServerParameters
@pytest.fixture
async def test_server():
    """Fixture building a ``Server`` that exposes a single ``add`` tool."""
    server = Server("test-server")

    # Descriptor for the only tool: two required numeric operands.
    add_tool = {
        "name": "add",
        "description": "加法运算",
        "inputSchema": {
            "type": "object",
            "properties": {
                "a": {"type": "number"},
                "b": {"type": "number"},
            },
            "required": ["a", "b"],
        },
    }

    @server.list_tools()
    async def list_tools():
        return [add_tool]

    @server.call_tool()
    async def call_tool(name: str, arguments: dict):
        if name != "add":
            raise ValueError(f"未知工具: {name}")
        total = arguments["a"] + arguments["b"]
        return [{"type": "text", "text": str(total)}]

    return server
@pytest.mark.asyncio
async def test_tool_call(test_server):
    """Placeholder for a tool-call test using the ``test_server`` fixture."""
    # Exercising this end to end needs a real server process
    # In a real test suite, start the server process here
    pass
class MockSession:
    """In-memory stand-in for a client session, for tests that need no server."""

    async def list_tools(self):
        """Return the single fake tool descriptor."""
        descriptor = {
            "name": "test_tool",
            "description": "测试工具",
        }
        return [descriptor]

    async def call_tool(self, name: str, arguments: dict):
        """Return a fixed text result for ``test_tool``.

        Raises:
            ValueError: For any other tool name.
        """
        if name != "test_tool":
            raise ValueError(f"未知工具: {name}")
        return [{"type": "text", "text": "测试结果"}]
@pytest.mark.asyncio
async def test_mock_session():
    """测试模拟会话"""
    session = MockSession()
    
    # 测试列出工具
    tools = await session.list_tools()
    assert len(tools) == 1
    assert tools[0]["name"] == "test_tool"
    
    # 测试调用工具
    result = await session.call_tool("test_tool", {})
    assert result[0]["text"] == "测试结果"
    
    # 测试错误情况
    with pytest.raises(ValueError):
        await session.call_tool("unknown_tool", {})
部署
Docker 部署 
dockerfile
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
# 安装依赖
COPY requirements.txt .
RUN pip install -r requirements.txt
# 复制代码
COPY . .
# 暴露端口(如果使用 HTTP 传输)
EXPOSE 8000
# 启动命令
CMD ["python", "server.py"]
yaml
# docker-compose.yml
version: '3.8'
services:
  mcp-server:
    build: .
    ports:
      - "8000:8000"
    environment:
      - LOG_LEVEL=INFO
    volumes:
      - ./data:/app/data
    restart: unless-stopped
生产环境配置
python
import asyncio
import logging
import os

from mcp.server import Server
from mcp.server.stdio import stdio_server
# Environment-driven configuration
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
DATA_DIR = os.getenv("DATA_DIR", "./data")
MAX_CONNECTIONS = int(os.getenv("MAX_CONNECTIONS", "100"))
# Logging configuration: accept the level name in any case and fall back to
# INFO for an unknown name instead of crashing at import time.
_log_level = getattr(logging, LOG_LEVEL.upper(), None)
if not isinstance(_log_level, int):
    _log_level = logging.INFO
logging.basicConfig(
    level=_log_level,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('mcp-server.log'),
        logging.StreamHandler()
    ]
)
class ProductionServer:
    """Skeleton of a production server: logging, handler hook and run loop."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.server = Server("production-mcp-server")
        self.setup_handlers()

    def setup_handlers(self):
        """Register production handlers (placeholder: nothing is registered)."""
        # Production-grade handlers go here
        return None

    async def run(self):
        """Serve over stdin/stdout; any failure is logged and re-raised."""
        self.logger.info("启动生产服务器")

        try:
            async with stdio_server() as streams:
                read_stream, write_stream = streams
                init_options = self.server.create_initialization_options()
                await self.server.run(read_stream, write_stream, init_options)
        except Exception as e:
            self.logger.error(f"服务器错误: {e}")
            raise
if __name__ == "__main__":
    server = ProductionServer()
    asyncio.run(server.run())
最佳实践
1. 性能优化 
python
import asyncio
from functools import lru_cache
import time
class OptimizedServer:
    """Caching examples: an LRU-cached computation and a TTL resource cache.

    ``cache`` maps a URI to its content and ``cache_ttl`` maps the same URI to
    the monotonic-clock time at which it was stored.
    """

    def __init__(self):
        self.server = Server("optimized-server")
        self.cache = {}
        self.cache_ttl = {}

    @staticmethod
    @lru_cache(maxsize=128)
    def _compute(input_data: str) -> str:
        """Do the expensive work; cached on ``input_data`` alone.

        The cache lives on a static function rather than on the bound method,
        so it never holds a reference to (and never keeps alive) an instance.
        """
        # Simulate an expensive computation
        time.sleep(0.1)
        return f"处理结果: {input_data}"

    def expensive_computation(self, input_data: str) -> str:
        """Return the processed form of ``input_data``, using an LRU cache.

        Repeated calls with the same input return the cached result.
        """
        return self._compute(input_data)

    async def cached_resource_read(self, uri: str, ttl: int = 300):
        """Read a resource, reusing a cached copy younger than ``ttl`` seconds.

        Args:
            uri: Resource identifier, also the cache key.
            ttl: Maximum age in seconds of a cached copy that may be reused.

        Returns:
            The resource content, from the cache or freshly read.
        """
        # A monotonic clock keeps ages correct when the wall clock is adjusted.
        current_time = time.monotonic()

        # Cache check: both the content and its timestamp must be present.
        cached_time = self.cache_ttl.get(uri)
        if uri in self.cache and cached_time is not None:
            if current_time - cached_time < ttl:
                return self.cache[uri]

        # Read and cache
        content = await self._read_resource_from_source(uri)
        self.cache[uri] = content
        self.cache_ttl[uri] = current_time

        return content
    
    async def _read_resource_from_source(self, uri: str) -> str:
        """从源读取资源"""
        # 实际的资源读取逻辑
        await asyncio.sleep(0.01)  # 模拟 I/O
        return f"资源内容: {uri}"
2. 安全性
python
import hashlib
import hmac
import os
from typing import Optional
class SecureServer:
    """Security examples: HMAC request signatures and an allow-list for paths."""

    def __init__(self, api_key: Optional[str] = None):
        """Store the signing key and the directories that may be read.

        Args:
            api_key: HMAC key; falls back to the ``MCP_API_KEY`` environment
                variable when omitted.
        """
        self.server = Server("secure-server")
        self.api_key = api_key or os.getenv("MCP_API_KEY")
        self.allowed_paths = {"/safe/path1", "/safe/path2"}

    def verify_signature(self, data: str, signature: str) -> bool:
        """Check ``signature`` against the HMAC-SHA256 hex digest of ``data``.

        Returns:
            ``True`` when the signature matches, or when no key is configured
            (verification is then skipped); ``False`` otherwise.
        """
        if not self.api_key:
            # NOTE(review): fail-open by design here; a production deployment
            # should probably refuse to start without a key.
            return True

        expected = hmac.new(
            self.api_key.encode(),
            data.encode(),
            hashlib.sha256
        ).hexdigest()

        # Compare as bytes: compare_digest rejects non-ASCII str operands, so
        # a garbage signature is answered with False instead of a TypeError.
        return hmac.compare_digest(expected.encode(), signature.encode())

    def sanitize_path(self, path: str) -> str:
        """Normalise ``path`` and require it to lie inside an allowed directory.

        ``..`` segments are resolved (not stripped) before the check, and the
        check is done on whole path components, so a sibling such as
        ``/safe/path10`` does not pass as being under ``/safe/path1``.

        Args:
            path: Path requested by the caller.

        Returns:
            The normalised path.

        Raises:
            ValueError: If the normalised path is outside every allowed
                directory.
        """
        normalized = os.path.normpath(path)
        # normpath keeps exactly two leading separators; fold them into one.
        if normalized.startswith(os.sep * 2):
            normalized = os.sep + normalized.lstrip(os.sep)

        for allowed in self.allowed_paths:
            base = os.path.normpath(allowed)
            if normalized == base or normalized.startswith(base.rstrip(os.sep) + os.sep):
                return normalized

        raise ValueError(f"路径不被允许: {path}")
    
    async def secure_file_read(self, path: str) -> str:
        """安全的文件读取"""
        safe_path = self.sanitize_path(path)
        
        try:
            with open(safe_path, 'r', encoding='utf-8') as f:
                return f.read()
        except FileNotFoundError:
            raise ValueError(f"文件不存在: {safe_path}")
        except PermissionError:
            raise ValueError(f"没有权限访问: {safe_path}")
3. 监控和日志
python
import time
import json
from functools import wraps
class MonitoredServer:
    """Monitoring example: counts requests and errors and records durations.

    ``metrics`` holds the request count, the error count and the durations (in
    seconds) of the most recent calls made through ``monitor_performance``.
    """

    def __init__(self):
        self.server = Server("monitored-server")
        self.metrics = {
            'requests': 0,
            'errors': 0,
            'response_times': []
        }

    def monitor_performance(self, func):
        """Wrap an async callable so each call updates ``self.metrics``.

        Args:
            func: Coroutine function to instrument.

        Returns:
            A coroutine function with the same signature. It counts the call,
            counts and logs any exception before re-raising it, and records
            the call duration whether or not the call succeeded.
        """
        import logging

        @wraps(func)
        async def wrapper(*args, **kwargs):
            started = time.perf_counter()
            self.metrics['requests'] += 1

            try:
                return await func(*args, **kwargs)
            except Exception as e:
                self.metrics['errors'] += 1
                # The class never defines ``self.logger``; fall back to a
                # module-named logger so logging cannot mask the real error.
                log = getattr(self, 'logger', None) or logging.getLogger(__name__)
                log.error(f"函数 {func.__name__} 出错: {e}")
                raise
            finally:
                timings = self.metrics['response_times']
                timings.append(time.perf_counter() - started)

                # Keep only the most recent 1000 durations
                if len(timings) > 1000:
                    self.metrics['response_times'] = timings[-1000:]

        return wrapper
    
    def get_metrics(self) -> dict:
        """获取性能指标"""
        response_times = self.metrics['response_times']
        
        if response_times:
            avg_response_time = sum(response_times) / len(response_times)
            max_response_time = max(response_times)
            min_response_time = min(response_times)
        else:
            avg_response_time = max_response_time = min_response_time = 0
        
        return {
            'total_requests': self.metrics['requests'],
            'total_errors': self.metrics['errors'],
            'error_rate': self.metrics['errors'] / max(self.metrics['requests'], 1),
            'avg_response_time': avg_response_time,
            'max_response_time': max_response_time,
            'min_response_time': min_response_time
        }
下一步
- 查看 TypeScript SDK - 了解 TypeScript 实现
- 构建服务器 - 深入服务器开发
- 构建客户端 - 深入客户端开发
- 查看示例 - 学习实际应用