环境准备
在开始之前,请确保您已安装必要的 Python 库:
pip install openai mcp
此外,创建一个 .env
文件来存储您的配置:
MODEL_NAME=deepseek-chat
BASE_URL=https://api.deepseek.com/v1
API_KEY=your_api_key_here
将 your_api_key_here
替换为您的实际 API 密钥。
Server 实现
Server 的作用是提供工具(tools),这些工具可以被 Client 调用。以下是通用的 Server 结构:
Server 结构
import argparse
from mcp.server.fastmcp import FastMCP
from starlette.applications import Starlette
from mcp.server.sse import SseServerTransport
from starlette.requests import Request
from starlette.routing import Mount, Route
from mcp.server import Server
import logging
import uvicorn
# 定义服务器名称
MCP_SERVER_NAME = "your-server-name"
# 配置日志
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(MCP_SERVER_NAME)
# 初始化 FastMCP 实例
mcp = FastMCP(MCP_SERVER_NAME)
# 定义工具
@mcp.tool()
def your_tool_name(param1: type, param2: type) -> return_type:
"""
工具描述。
参数:
- param1 (type): 描述
- param2 (type): 描述
返回:
- return_type: 描述
"""
# 工具实现
pass
# 创建 Starlette 应用
def create_starlette_app(mcp_server: Server, *, debug: bool = False) -> Starlette:
sse = SseServerTransport("/messages/")
async def handle_sse(request: Request) -> None:
async with sse.connect_sse(
request.scope,
request.receive,
request._send,
) as (read_stream, write_stream):
await mcp_server.run(
read_stream,
write_stream,
mcp_server.create_initialization_options(),
)
return Starlette(
debug=debug,
routes=[
Route("/sse", endpoint=handle_sse),
Mount("/messages/", app=sse.handle_post_message),
],
)
# 主程序入口
if __name__ == "__main__":
mcp_server = mcp._mcp_server
# 解析命令行参数
parser = argparse.ArgumentParser(description='Run MCP SSE-based server')
parser.add_argument('--host', default='0.0.0.0', help='Host to bind to')
parser.add_argument('--port', type=int, default=18080, help='Port to listen on')
args = parser.parse_args()
# 创建并运行 Starlette 应用
starlette_app = create_starlette_app(mcp_server, debug=True)
uvicorn.run(starlette_app, host=args.host, port=args.port)
实现工具
- 选择工具:根据您的需求定义工具,例如数学运算、数据查询等。
- 实现工具函数:使用
@mcp.tool()
装饰器,并确保有详细的文档字符串。 - 配置服务器:设置服务器名称、日志、主机和端口。
Client 实现
Client 的作用是连接到 Server,获取可用的工具,并使用这些工具来处理用户的查询。以下是通用的 Client 结构:
Client 结构
import asyncio
import os
import sys
from typing import List
from mcp import ClientSession
from mcp.client.sse import sse_client
from openai import AsyncOpenAI
class MCPClient:
def __init__(self, model_name: str, base_url: str, api_key: str, server_urls: List[str]):
# 初始化代码
pass
async def initialize_sessions(self):
# 初始化会话代码
pass
async def cleanup(self):
# 清理资源代码
pass
async def process_query(self, query: str) -> str:
# 处理查询代码
pass
async def chat_loop(self):
# 交互循环代码
pass
async def main():
# 主函数代码
pass
if __name__ == "__main__":
asyncio.run(main())
实现 Client
- 配置环境:从
.env
文件或环境变量中获取模型名称、基础 URL 和 API 密钥。 - 定义服务器 URL 列表:指定要连接的 Server 地址。
- 实现 MCPClient 类:
- 初始化:设置模型和服务器信息。
- 会话管理:连接到每个 Server 并获取工具列表。
- 查询处理:收集工具,发送请求到 OpenAI API,处理工具调用。
- 资源清理:确保所有连接正确关闭。
- 交互循环:接受用户输入,显示回复,并处理退出命令。
逻辑流程
以下 Mermaid 图展示了 Client 和 Server 之间的交互逻辑:
- Client 连接到多个 Server,获取工具列表。
- Client 收集所有可用工具,并将用户查询发送到 OpenAI API。
- OpenAI API 返回响应,Client 处理工具调用并获取结果。
- 最终,Client 将回复显示给用户。
源码运行测试
本节将指导您如何运行和测试提供的三个源码文件:数学运算服务器(math-mcp-sse.py
)、取模服务器(modulo-mcp-sse.py
)和客户端(client.py
)。通过以下步骤,您可以验证它们的功能并观察其交互效果。
运行 Server
我们将分别启动两个服务器,它们分别提供不同的工具并监听不同的端口。
数学运算服务器:
保存代码为:
import argparse from mcp.server.fastmcp import FastMCP from starlette.applications import Starlette from mcp.server.sse import SseServerTransport from starlette.requests import Request from starlette.routing import Mount, Route from mcp.server import Server import logging import uvicorn # 定义服务器名称 MCP_SERVER_NAME = "math-mcp-sse" # 配置日志 logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(MCP_SERVER_NAME) # 初始化 FastMCP 实例 mcp = FastMCP(MCP_SERVER_NAME) @mcp.tool() def add(a: float, b: float) -> float: """ Add two numbers. Parameters: - a (float): First number (required) - b (float): Second number (required) Returns: - float: The result of a + b """ return a + b @mcp.tool() def subtract(a: float, b: float) -> float: """ Subtract two numbers. Parameters: - a (float): The number to subtract from (required) - b (float): The number to subtract (required) Returns: - float: The result of a - b """ return a - b @mcp.tool() def multiply(a: float, b: float) -> float: """ Multiply two numbers. Parameters: - a (float): First number (required) - b (float): Second number (required) Returns: - float: The result of a * b """ return a * b @mcp.tool() def divide(a: float, b: float) -> float: """ Divide two numbers. Parameters: - a (float): Numerator (required) - b (float): Denominator (required, must not be zero) Returns: - float: The result of a / b """ if b == 0: raise ValueError("Division by zero is not allowed") return a / b # 创建 Starlette 应用 def create_starlette_app(mcp_server: Server, *, debug: bool = False) -> Starlette: """Create a Starlette application that can serve the provided mcp server with SSE.""" sse = SseServerTransport("/messages/") async def handle_sse(request: Request) -> None: async with sse.connect_sse( request.scope, request.receive, request._send, ) as (read_stream, write_stream): await mcp_server.run( read_stream, write_stream, mcp_server.create_initialization_options(), ) return Starlette( debug=debug, routes=[ Route("/sse", endpoint=handle_sse), Mount("/messages/", app=sse.handle_post_message), ], ) # 主程序入口 if __name__ == "__main__": mcp_server = mcp._mcp_server # 解析命令行参数 parser = argparse.ArgumentParser(description='Run MCP SSE-based server') parser.add_argument('--host', default='0.0.0.0', help='Host to bind to') parser.add_argument('--port', type=int, default=18080, help='Port to listen on') args = parser.parse_args() # 创建并运行 Starlette 应用 starlette_app = create_starlette_app(mcp_server, debug=True) uvicorn.run(starlette_app, host=args.host, port=args.port)
运行命令:
python math-mcp-sse.py --port 18080
该服务器提供
add
、subtract
、multiply
和divide
工具,监听在0.0.0.0:18080
。
取模服务器:
保存代码为:
import argparse from mcp.server.fastmcp import FastMCP from starlette.applications import Starlette from mcp.server.sse import SseServerTransport from starlette.requests import Request from starlette.routing import Mount, Route from mcp.server import Server import logging import uvicorn # 定义服务器名称 MCP_SERVER_NAME = "modulo-mcp-sse" # 配置日志 logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(MCP_SERVER_NAME) # 初始化 FastMCP 实例 mcp = FastMCP(MCP_SERVER_NAME) # 定义取模工具 @mcp.tool() def modulo(a: float, b: float) -> float: """ 计算两个数的取模结果。 参数: - a (float):被除数(必填) - b (float):除数(必填,不能为零) 返回: - float:a % b 的结果 """ if b == 0: raise ValueError("除数不能为零") return a % b # 创建 Starlette 应用 def create_starlette_app(mcp_server: Server, *, debug: bool = False) -> Starlette: """创建一个支持 SSE 的 Starlette 应用,用于运行 MCP 服务器。""" sse = SseServerTransport("/messages/") async def handle_sse(request: Request) -> None: async with sse.connect_sse( request.scope, request.receive, request._send, ) as (read_stream, write_stream): await mcp_server.run( read_stream, write_stream, mcp_server.create_initialization_options(), ) return Starlette( debug=debug, routes=[ Route("/sse", endpoint=handle_sse), Mount("/messages/", app=sse.handle_post_message), ], ) # 主程序入口 if __name__ == "__main__": mcp_server = mcp._mcp_server # 解析命令行参数 parser = argparse.ArgumentParser(description='运行基于 SSE 的 MCP 取模服务器') parser.add_argument('--host', default='0.0.0.0', help='绑定的主机地址') parser.add_argument('--port', type=int, default=18081, help='监听端口') args = parser.parse_args() # 创建并运行 Starlette 应用 starlette_app = create_starlette_app(mcp_server, debug=True) uvicorn.run(starlette_app, host=args.host, port=args.port)
运行命令:
python modulo-mcp-sse.py --port 18081
该服务器提供
modulo
工具,监听在0.0.0.0:18081
。
运行 Client
客户端将连接到上述两个服务器,获取工具并处理用户查询。
保存代码为:
import asyncio import json import os import sys from typing import List from mcp import ClientSession from mcp.client.sse import sse_client from openai import AsyncOpenAI class MCPClient: def __init__(self, model_name: str, base_url: str, api_key: str, server_urls: List[str]): """ 初始化 MCP 客户端,连接 OpenAI 接口。 :param model_name: 使用的模型名称,例如 "deepseek-chat"。 :param base_url: OpenAI 接口的基础地址,例如 "https://api.deepseek.com/v1"。 :param api_key: OpenAI API 密钥,用于身份验证。 :param server_urls: SSE 服务地址列表,用于连接多个服务器。 """ self.model_name = model_name self.server_urls = server_urls self.sessions = {} # 存储每个服务器的会话及其上下文:server_id -> (session, session_context, streams_context) self.tool_mapping = {} # 工具映射:prefixed_name -> (session, original_tool_name) # 初始化 OpenAI 异步客户端 self.client = AsyncOpenAI(base_url=base_url, api_key=api_key) async def initialize_sessions(self): """ 初始化与所有 SSE 服务器的连接,并获取可用工具列表。 """ for i, server_url in enumerate(self.server_urls): server_id = f"server{i}" # 为每个服务器生成唯一标识符 # 创建 SSE 客户端并进入上下文 streams_context = sse_client(url=server_url) streams = await streams_context.__aenter__() session_context = ClientSession(*streams) session = await session_context.__aenter__() await session.initialize() # 存储会话及其上下文 self.sessions[server_id] = (session, session_context, streams_context) # 获取工具列表并建立映射 response = await session.list_tools() for tool in response.tools: prefixed_name = f"{server_id}_{tool.name}" # 为工具名添加服务器前缀 self.tool_mapping[prefixed_name] = (session, tool.name) print(f"已连接到 {server_url},工具列表:{[tool.name for tool in response.tools]}") async def cleanup(self): """ 清理所有会话和连接资源,确保无资源泄漏。 """ for server_id, (session, session_context, streams_context) in self.sessions.items(): await session_context.__aexit__(None, None, None) # 退出会话上下文 await streams_context.__aexit__(None, None, None) # 退出 SSE 流上下文 print("所有会话已清理。") async def process_query(self, query: str) -> str: """ 处理用户的自然语言查询,通过工具调用完成任务并返回结果。 :param query: 用户输入的查询字符串。 :return: 处理后的回复文本。 """ messages = [{"role": "user", "content": query}] # 初始化消息列表 # 收集所有可用工具 available_tools = [] for server_id, (session, _, _) in self.sessions.items(): response = await session.list_tools() for tool in response.tools: prefixed_name = f"{server_id}_{tool.name}" available_tools.append({ "type": "function", "function": { "name": prefixed_name, "description": tool.description, "parameters": tool.inputSchema, }, }) # 向模型发送初始请求 response = await self.client.chat.completions.create( model=self.model_name, messages=messages, tools=available_tools, ) final_text = [] # 存储最终回复内容 message = response.choices[0].message final_text.append(message.content or "") # 添加模型的初始回复 # 处理工具调用 while message.tool_calls: for tool_call in message.tool_calls: prefixed_name = tool_call.function.name if prefixed_name in self.tool_mapping: session, original_tool_name = self.tool_mapping[prefixed_name] tool_args = json.loads(tool_call.function.arguments) try: result = await session.call_tool(original_tool_name, tool_args) except Exception as e: result = {"content": f"调用工具 {original_tool_name} 出错:{str(e)}"} print(result["content"]) final_text.append(f"[调用工具 {prefixed_name} 参数: {tool_args}]") final_text.append(f"工具结果: {result.content}") messages.extend([ { "role": "assistant", "tool_calls": [{ "id": tool_call.id, "type": "function", "function": {"name": prefixed_name, "arguments": json.dumps(tool_args)}, }], }, {"role": "tool", "tool_call_id": tool_call.id, "content": str(result.content)}, ]) else: print(f"工具 {prefixed_name} 未找到") final_text.append(f"工具 {prefixed_name} 未找到") # 获取工具调用后的后续回复 response = await self.client.chat.completions.create( model=self.model_name, messages=messages, tools=available_tools, ) message = response.choices[0].message if message.content: final_text.append(message.content) return "\n".join(final_text) async def chat_loop(self): """ 启动命令行交互式对话循环,接受用户输入并显示回复。 """ print("\nMCP 客户端已启动,输入你的问题,输入 'quit' 退出。") while True: try: query = input("\n问题: ").strip() if query.lower() == "quit": break response = await self.process_query(query) print("\n" + response) except Exception as e: print(f"\n发生错误: {str(e)}") async def main(): """ 程序入口,设置配置并启动 MCP 客户端。 """ # 从环境变量获取配置 model_name = os.getenv("MODEL_NAME", "deepseek-chat") base_url = os.getenv("BASE_URL", "https://api.deepseek.com/v1") api_key = os.getenv("API_KEY") if not api_key: print("未设置 API_KEY 环境变量。") sys.exit(1) # 定义 SSE 服务器地址列表 server_urls = ["http://localhost:18080/sse", "http://localhost:18081/sse"] # 创建并运行客户端 client = MCPClient(model_name=model_name, base_url=base_url, api_key=api_key, server_urls=server_urls) try: await client.initialize_sessions() await client.chat_loop() finally: await client.cleanup() if __name__ == "__main__": asyncio.run(main())
确保
.env
文件已配置正确,包含以下内容:MODEL_NAME=deepseek-chat BASE_URL=https://api.deepseek.com/v1 API_KEY=your_api_key_here
将
your_api_key_here
替换为您的实际 API 密钥。运行命令:
python client.py
客户端启动后,将连接到
http://localhost:18080/sse
和http://localhost:18081/sse
,并进入交互模式。
测试示例
以下是一些测试用例,用于验证服务器和客户端的协同工作:
测试加法运算:
在客户端命令行输入:
计算 5 加 3
。预期输出类似:
[调用工具 server0_add 参数: {"a": 5, "b": 3}] 工具结果: 8
说明:客户端调用
math-mcp-sse.py
提供的add
工具。
测试取模运算:
在客户端命令行输入:
计算 10 除以 3 的余数
。预期输出类似:
[调用工具 server1_modulo 参数: {"a": 10, "b": 3}] 工具结果: 1
说明:客户端调用
modulo-mcp-sse.py
提供的modulo
工具。
测试错误情况:
在客户端命令行输入:
计算 7 除以 0
。预期输出类似:
调用工具 divide 出错:Division by zero is not allowed [调用工具 server0_divide 参数: {"a": 7, "b": 0}] 工具结果: Division by zero is not allowed
说明:客户端尝试调用
divide
工具,但因除数为零而报错。
通过以上步骤,您可以成功运行两个服务器和一个客户端,并通过交互测试验证它们的工具调用功能。确保在运行前安装必要的依赖(pip install openai mcp
),并正确配置环境变量。
总结
通过本教程,您学习了如何使用 MCP 实现通用的 Server 和 Client。MCP 协议的灵活性使其适用于各种 AI 应用场景,您可以根据需求扩展工具和功能。希望本教程能帮助您快速上手 MCP,并构建出强大的 AI 驱动应用!