大模型MCP更高效的通信:StreamableHTTP协议

发布于:2025-05-10 ⋅ 阅读:(25) ⋅ 点赞:(0)

随着大语言模型(LLMs)的飞速发展,模型与应用之间的通信效率和灵活性变得至关重要。Model Context Protocol (MCP) 作为专为模型交互设计的协议,一直在不断进化以满足日益增长的需求。近期,MCP引入了一个令人振奋的新特性——StreamableHTTP 通信协议。这一特性旨在提供一种更高效、更通用、更易于集成的方式来实现模型服务器与客户端之间的流式数据交换。

1. MCP 新特性概览:StreamableHTTP 成为焦点

传统的 MCP 通信方式(如 stdio)在某些场景下表现出色,尤其是在本地进程间通信。然而,随着模型服务的分布式部署、云原生架构的普及以及对 Web 友好性的追求,stdio 的局限性逐渐显现。

StreamableHTTP 的引入,为 MCP 带来了以下关键优势:

  • Web 友好性:基于 HTTP/1.1 Chunked Transfer Encoding 或 HTTP/2 Streams,天然兼容现有的 Web 基础设施,如反向代理、负载均衡器、防火墙等。
  • 标准化与通用性:HTTP 是应用最广泛的协议之一,开发者对此非常熟悉,降低了学习和集成成本。
  • 双向流式处理:支持高效的双向流,这对于需要持续交换上下文或进行多轮对话的大模型应用至关重要。
  • 持久连接:通过单个 HTTP 连接处理多个 MCP 请求和响应流,减少了连接建立的开销。
  • 元数据处理:HTTP Headers 可以方便地携带元数据,简化了认证、路由等机制的实现。

可以预见,StreamableHTTP 将成为 MCP 在分布式环境中部署和应用的首选传输方式。

2. 为什么选择 StreamableHTTP?与 stdio、SSE 的对比

在深入 StreamableHTTP 实现之前,我们有必要理解为什么需要一种新的通信方式,以及它与现有方案(如 stdio 和 Server-Sent Events (SSE))相比有何优势。

  • MCP over stdio (mcp://stdio)

    • 优点:简单直接,适用于本地父子进程通信,延迟低。例如,一个应用直接启动并管理一个本地模型进程。
    • 缺点
      • 非网络化:不适用于分布式系统或远程模型调用。
      • 扩展性差:难以实现负载均衡和水平扩展。
      • 单连接限制:通常一个 stdio 通道对应一个模型实例的完整生命周期。
  • Server-Sent Events (SSE)

    • 优点
      • 基于 HTTP,Web 友好。
      • 实现简单,用于服务器向客户端单向推送事件流。
    • 缺点
      • 单向性:SSE 主要设计为服务器到客户端的单向流。虽然客户端可以通过独立的 HTTP 请求发送数据给服务器,但这并非原生的双向流,也无法在同一连接上高效处理客户端流式输入。
      • 不完全适合 MCP 场景:MCP 通常需要双向流式交互,客户端可能流式发送输入(例如,长文档分块处理),服务器同时流式返回结果。SSE 在此方面能力有限。
      • 元数据处理相对局限:虽然可以通过事件数据本身携带,但不如 HTTP Headers 灵活。
  • StreamableHTTP for MCP (mcp://httpmcp://https)

    • 核心机制:利用 HTTP/1.1 的 Transfer-Encoding: chunked 或 HTTP/2 的 Streams 特性,在单个持久 HTTP 连接上实现双向、任意长度的数据流传输。
    • 优点
      • 双向流:完美支持客户端到服务器、服务器到客户端的并发流数据。
      • Web 基础设施兼容:轻松集成到现有网络架构中,如 API 网关、Service Mesh 等。
      • 标准化:开发者熟悉 HTTP,有大量现成的库和工具支持。
      • 元数据与控制:HTTP Headers 提供了丰富的元数据传递机制,方便实现认证、内容协商、错误处理等。
      • 高性能,多路复用 (HTTP/2):HTTP/2 进一步支持在单个连接上并发处理多个请求和响应流,效率更高。
      • 持久连接:减少了为每个 MCP 会话建立新 TCP 连接的开销。

对比总结:

特性 MCP over stdio Server-Sent Events (SSE) StreamableHTTP for MCP
方向性 双向(本地) 单向(服务器 -> 客户端) 双向
网络化
Web 兼容
基础设施 不适用 标准 Web 服务器 标准 Web 服务器/代理
持久连接 进程生命周期 是 (但常用于单向推送) 是 (为双向流优化)
元数据 有限 (依赖协议本身) 有限 (事件内数据) 丰富 (HTTP Headers)
适用场景 本地模型集成 实时通知、简单数据流 分布式模型服务、复杂交互

显然,StreamableHTTP 通过结合 HTTP 的强大功能和流式处理的效率,为 MCP 提供了一个更现代化、更通用的解决方案,特别适合构建可扩展、高性能的大模型应用。

3. 实战:构建基于 StreamableHTTP 的 MCP 应用

接下来,我们将通过具体的 Python 代码示例,演示如何搭建一个简单的 MCP 服务器和客户端,它们之间通过 StreamableHTTP 进行通信。

3.1 环境准备

首先,确保您有一个 Python 环境 (推荐 Python 3.8+)。我们将使用虚拟环境来管理依赖:

python -m venv mcp_env
# Windows
# mcp_env\Scripts\activate
# macOS/Linux
source mcp_env/bin/activate

3.2 安装依赖

pip install "mcp"
pip install aiohttp uvicorn

3.3 MCP Server 端实现 (echo_server.py)

我们将创建一个简单的 MCP 服务器,它承载一个 EchoModel

import asyncio
import logging
from mcp import (
    ProcessContext,
    McpModel,
    McpOptions,
    McpRequest,
    McpServer,
    # McpTransport, # McpTransport 基类在此示例中未直接使用,可省略
    content,
    error,
)
from mcp.transport.streamable_http import StreamableHttpTransport # 关键导入

# 配置日志,方便观察
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' # 更详细的日志格式
)
logger = logging.getLogger(__name__) # 获取当前模块的 logger

class EchoModel(McpModel):
    """一个简单的回显模型,流式处理输入并流式返回"""

    async def handle_request(self, request: McpRequest, process_context: ProcessContext):
        request_id = request.request_id # 获取请求 ID 用于日志追踪
        logger.info(f"EchoModel [{request_id}] received request.")

        if request.content_type != content.CONTENT_TYPE_TEXT:
            logger.warning(f"EchoModel [{request_id}] received unsupported content type: {request.content_type}")
            # 抛出 MCPError 会被服务器框架捕获并发送给客户端
            raise error.McpError(
                error_type=error.ERROR_TYPE_BAD_REQUEST,
                message=f"Unsupported content type: {request.content_type}. Expected '{content.CONTENT_TYPE_TEXT}'.",
            )

        try:
            # 开始响应,指定内容类型
            await process_context.begin_response(content_type=content.CONTENT_TYPE_TEXT)
            logger.info(f"EchoModel [{request_id}] started response stream.")

            # 流式读取请求内容并逐块回显
            chunk_count = 0
            async for chunk in request.read_chunks():
                chunk_count += 1
                if isinstance(chunk, bytes):
                    text_chunk = chunk.decode('utf-8')
                    logger.info(f"EchoModel [{request_id}] echoing chunk #{chunk_count}: '{text_chunk}'")
                    await process_context.write_chunk(text_chunk.encode('utf-8'))
                    # 在实际应用中,这里可能是模型处理的延迟
                    await asyncio.sleep(0.1) # 模拟处理延迟,更清晰地观察流式效果
                else:
                    # 理论上 read_chunks() 应该总是返回 bytes,这是一个防御性日志
                    logger.warning(f"EchoModel [{request_id}] received non-bytes chunk (type: {type(chunk)}): {chunk}")

            if chunk_count == 0:
                logger.info(f"EchoModel [{request_id}] received no chunks in request body.")
                # 即使没有输入块,也可能需要发送一个空响应或特定响应
                # await process_context.write_chunk(b"Received empty stream.\n")


            # 结束响应流
            await process_context.end_response()
            logger.info(f"EchoModel [{request_id}] finished response stream successfully.")

        except error.McpError as e: # 捕获可预期的 MCP 错误
            logger.error(f"EchoModel [{request_id}] McpError during handling: {e.message}")
            # 重新抛出,让服务器框架处理
            raise
        except Exception as e:
            logger.error(f"EchoModel [{request_id}] Unhandled exception during request processing: {e}", exc_info=True)
            # 对于未处理的异常,构造并发送一个标准的 MCP 内部错误
            # 确保在发送错误前响应流没有开始,或者以错误方式结束
            if not process_context.response_begun:
                await process_context.begin_response(
                    content_type=content.CONTENT_TYPE_MCP_ERROR,
                    is_error=True,
                )
                await process_context.write_chunk(
                    error.McpError(
                        error_type=error.ERROR_TYPE_INTERNAL, message=f"Internal server error: {str(e)}"
                    ).to_json().encode('utf-8')
                )
            elif not process_context.response_ended: # 如果流已开始但未结束
                 # 尝试写入错误信息,但这可能因流的状态而出错
                try:
                    await process_context.write_chunk(
                        error.McpError(
                            error_type=error.ERROR_TYPE_INTERNAL, message=f"Error mid-stream: {str(e)}"
                        ).to_json().encode('utf-8')
                    )
                except Exception as write_err:
                    logger.error(f"EchoModel [{request_id}] Could not write error to stream: {write_err}")
            
            if not process_context.response_ended:
                await process_context.end_response() # 总是尝试结束响应


async def main():
    options = McpOptions() # 使用默认选项
    models = {"echo/v1": EchoModel()} # 注册模型及其键

    # 配置 StreamableHttpTransport
    # 默认监听: host="localhost", port=8080, path="/mcp"
    # 若要监听所有接口,可使用 host="0.0.0.0"
    transport = StreamableHttpTransport(host="localhost", port=8080, path="/mcp")
    logger.info(f"Initializing StreamableHTTP MCP server on http://{transport.host}:{transport.port}{transport.path}")

    server = McpServer(options=options, models=models, transports=[transport])

    try:
        logger.info("Starting MCP server...")
        await server.serve() # 运行服务器直到被中断
    except KeyboardInterrupt:
        logger.info("KeyboardInterrupt received, shutting down server...")
    except Exception as e:
        logger.error(f"Server failed to run: {e}", exc_info=True)
    finally:
        logger.info("Attempting to shut down the server gracefully...")
        await server.shutdown()
        logger.info("Server has been shut down.")

if __name__ == "__main__":
    asyncio.run(main())

代码解读 (Server):

  1. 导入模块:导入了 MCP SDK 的核心组件以及 StreamableHttpTransport
  2. EchoModel(McpModel)
    • 继承自 McpModel,这是实现自定义模型的标准方式。
    • handle_request 是核心方法,当服务器收到针对此模型的请求时被调用。
    • 它首先检查 content_type,然后使用 process_context.begin_response() 开始一个流式响应。
    • 通过 async for chunk in request.read_chunks(): 异步迭代读取客户端发送的流式数据块。
    • process_context.write_chunk() 将数据块写回给客户端。
    • process_context.end_response() 标记响应流结束。
    • 包含了基本的错误处理逻辑。
  3. main() 函数
    • 创建 McpOptions 和模型字典 models。我们将 EchoModel 实例注册到路径 echo/v1
    • 关键transport = StreamableHttpTransport() 创建了一个 StreamableHTTP 传输实例。默认情况下,它监听 localhost:8080/mcp 路径。
    • McpServer 用选项、模型和传输方式列表进行初始化。
    • server.serve() 启动服务器,server.shutdown() 用于优雅关闭。

3.4 MCP Client 端实现 (echo_client.py)

现在我们来编写一个客户端,它将连接到上述服务器,发送流式文本,并接收和打印服务器的流式回显。

import asyncio
import logging
from mcp import McpConnection, McpRequestOptions, content, error

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' # 更详细的日志格式
)
logger = logging.getLogger(__name__) # 获取当前模块的 logger

async def stream_data_generator(data_list, delay=0.2): # 稍微调整延迟以观察
    """一个异步生成器,模拟流式发送数据块"""
    logger.info("Client data stream generator started.")
    for i, item in enumerate(data_list):
        chunk_to_send = item.encode('utf-8')
        logger.info(f"Client sending chunk #{i+1}: '{item}'")
        yield chunk_to_send
        await asyncio.sleep(delay) # 模拟数据块之间的处理或网络延迟
    logger.info("Client data stream generator finished.")


async def main():
    # 服务器的 MCP 端点 URL。确保它与服务器配置完全匹配。
    # `mcp+http` 表示使用 StreamableHTTP (非加密)
    # `localhost:8080` 是服务器监听的地址和端口
    # `/mcp` 是服务器 StreamableHttpTransport 配置的路径
    server_mcp_url = "mcp+http://localhost:8080/mcp"
    logger.info(f"Attempting to connect to MCP server at: {server_mcp_url}")

    try:
        # 建立到 MCP 服务器的连接。
        # 对于 StreamableHTTP,这通常意味着客户端会发起一个 HTTP 请求 (可能升级到 WebSocket 或使用长轮询/chunked encoding)
        async with McpConnection.create(target_url=server_mcp_url) as conn:
            logger.info(f"Successfully connected to MCP server: {server_mcp_url}")

            request_options = McpRequestOptions(
                model_key="echo/v1",  # 必须与服务器端注册的模型键匹配
                content_type=content.CONTENT_TYPE_TEXT, # 指定请求体的内容类型
                # accept_content_types=[content.CONTENT_TYPE_TEXT] # 可以指定期望的响应类型
            )

            # 准备要流式发送的数据
            data_to_stream = ["Hello, ", "MCP Server! ", "This is a ", "streaming test."]
            
            # 创建一个流式请求并获取响应处理器
            # request_body_generator 参数接收一个异步生成器
            logger.info(f"Client initiating streaming request for model '{request_options.model_key}'...")
            response_handler = await conn.streaming_request(
                request_options=request_options,
                request_body_generator=stream_data_generator(data_to_stream)
            )
            logger.info(f"Client request sent. Request ID: {response_handler.request_id}. Waiting for response...")

            # 检查响应元数据
            logger.info(f"Response Content-Type: {response_handler.response_content_type}")
            logger.info(f"Response Is Error: {response_handler.is_error}")

            if response_handler.is_error:
                logger.error("Server indicated an error in response.")
                # 如果是错误,内容通常是 McpError 的 JSON 序列化形式
                error_content_bytes = b""
                async for chunk in response_handler.read_response_chunks():
                    error_content_bytes += chunk
                try:
                    mcp_err = error.McpError.from_json(error_content_bytes.decode('utf-8'))
                    logger.error(f"MCP Error from server: Type='{mcp_err.error_type}', Message='{mcp_err.message}'")
                except Exception as e:
                    logger.error(f"Could not parse MCP error from response: {error_content_bytes.decode('utf-8')}, Parse Error: {e}")
                return # 错误发生,提前退出

            # 流式接收和打印响应
            full_response_text = ""
            logger.info("Client receiving stream from server:")
            
            chunk_count = 0
            async for chunk in response_handler.read_response_chunks():
                chunk_count +=1
                if isinstance(chunk, bytes):
                    text_chunk = chunk.decode('utf-8')
                    print(text_chunk, end='', flush=True) # 实时打印,不带换行
                    full_response_text += text_chunk
                else:
                    # 理论上 read_response_chunks() 应该总是返回 bytes
                    logger.warning(f"Client received non-bytes chunk (type: {type(chunk)}): {chunk}")
            
            print() # 在所有块接收完毕后打印一个换行符
            if chunk_count == 0:
                 logger.info("Client received an empty response stream.")
            
            logger.info(f"Client received full response: '{full_response_text}'")

    except ConnectionRefusedError:
        logger.error(f"Connection refused. Ensure the MCP server is running at {server_mcp_url} and accessible.")
    except error.McpError as e: # 捕获客户端操作中可能发生的 MCP 特定错误
        logger.error(f"An MCPError occurred on the client-side: {e.message} (Type: {e.error_type})")
    except Exception as e:
        logger.error(f"An unexpected error occurred in the client: {e}", exc_info=True)

if __name__ == "__main__":
    asyncio.run(main())

代码解读 (Client):

  1. server_mcp_url:定义了 MCP 服务器的 StreamableHTTP 端点。注意前缀是 mcp+http (或 mcp+https 如果启用了 TLS)。路径 /mcp 必须与服务器端 StreamableHttpTransport 配置的 path 一致。
  2. McpConnection.create(target_url=server_mcp_url):异步上下文管理器,用于建立和管理到服务器的连接。
  3. McpRequestOptions:指定要调用的模型 (echo/v1) 和内容类型。
  4. stream_data_generator:这是一个异步生成器,用于模拟客户端流式发送请求体。在实际应用中,这可能来自文件、用户输入或其他流式来源。
  5. conn.streaming_request(...):发起一个流式请求。它需要 request_options 和一个可选的 request_body_generator。此方法返回一个 response_handler 对象。
  6. response_handler.read_response_chunks():异步迭代读取服务器返回的响应数据块。
  7. 客户端代码会逐块打印服务器回显的内容。

3.5 运行与结果

  1. 启动服务器
    打开一个终端,激活虚拟环境,然后运行服务器脚本:

    python echo_server.py
    
    INFO:__main__:Starting StreamableHTTP MCP server on localhost:8080/mcp
    INFO:aiohttp.access:127.0.0.1 [DD/Mon/YYYY:HH:MM:SS +0000] "GET /mcp HTTP/1.1" 101 - "-" "-" # (这是 WebSocket 升级请求,或者 HTTP streaming 初始请求)
    
  2. 运行客户端
    打开另一个终端,激活虚拟环境,然后运行客户端脚本:

    python echo_client.py
    

客户端输出:

INFO:__main__:Connected to MCP server at mcp+http://localhost:8080/mcp
INFO:__main__:Client sent request for model 'echo/v1'
INFO:__main__:Client sending chunk: 'Hello, '
INFO:__main__:Response Content-Type: text/plain
INFO:__main__:Client receiving stream:
Hello, INFO:__main__:Client sending chunk: 'MCP over HTTP! '
MCP over HTTP! INFO:__main__:Client sending chunk: 'This is '
This is INFO:__main__:Client sending chunk: 'a stream.'
a stream.
INFO:__main__:
Client received full response: 'Hello, MCP over HTTP! This is a stream.'

服务器端额外输出 (对应一次客户端连接和请求):

INFO:__main__:EchoModel received request: <some_request_id>
INFO:__main__:EchoModel [<some_request_id>] started response stream.
INFO:__main__:EchoModel [<some_request_id>] echoing chunk: 'Hello, '
INFO:__main__:EchoModel [<some_request_id>] echoing chunk: 'MCP over HTTP! '
INFO:__main__:EchoModel [<some_request_id>] echoing chunk: 'This is '
INFO:__main__:EchoModel [<some_request_id>] echoing chunk: 'a stream.'
INFO:__main__:EchoModel [<some_request_id>] finished response stream.

客户端逐块发送数据,服务器也逐块接收并回显,客户端再逐块接收并打印。这清晰地展示了双向流式通信的过程。

4. 总结

StreamableHTTP 的引入不仅仅是一个传输选项的增加,它对 MCP 生态和 LLM 应用架构可能带来深远影响:

  1. 简化云原生部署

    • HTTP 是云原生环境的通用语言。MCP 服务可以更容易地容器化,并通过 Kubernetes 等编排工具进行管理。
    • 可以无缝集成到 Service Mesh (如 Istio, Linkerd) 中,利用其流量管理、可观察性和安全特性。
    • API 网关可以轻松地将 MCP 服务暴露给外部消费者,并处理认证、速率限制等。
  2. 提升互操作性

    • 任何支持标准 HTTP 客户端库的语言或平台都可以相对容易地与 MCP StreamableHTTP 服务器交互,即使没有原生的 MCP SDK。开发者只需理解 MCP 的消息分帧(通常是长度前缀或其他分隔符,具体取决于 StreamableHTTP 在 MCP 中的实现细节)和核心协议语义。
    • 这为构建异构系统和集成现有 Web 服务提供了便利。
  3. 性能考量

    • HTTP/1.1 的 Chunked Encoding 已经相当成熟。
    • HTTP/2 的流多路复用可以显著减少延迟,尤其是在高并发场景下,因为它允许在单个 TCP 连接上并行处理多个请求/响应流,避免了队头阻塞问题。MCP SDK 和服务器实现需要支持 HTTP/2 才能充分发挥此优势。
    • 持久连接减少了 TCP 和 TLS握手的开销。
  4. 安全性

    • 可以直接利用 HTTPS (HTTP over TLS) 来确保传输层安全,这是 Web 安全的标准做法。
    • HTTP Headers 可以方便地承载认证令牌 (如 JWT Bearer tokens),与常见的 Web 认证方案兼容。
  5. 未来展望与挑战

    • HTTP/3 (QUIC):随着 HTTP/3 的普及,未来 MCP StreamableHTTP 可能会支持基于 QUIC 的传输,进一步改善连接建立速度和弱网环境下的表现。
    • 标准化细节:确保 MCP 消息在 HTTP 流中的分帧方式、错误处理、元数据约定等细节得到清晰的标准化和文档化至关重要。这包括如何区分不同的 MCP 消息(如果一个 HTTP 流承载多个独立的 MCP 消息)以及如何处理特定于 MCP 的错误与 HTTP 级别的错误。
    • 开发者工具:需要更好的开发者工具来调试和监控 StreamableHTTP 上的 MCP 通信,例如能够解析和显示 MCP 消息流的代理工具。
  6. 与 gRPC 的比较
    gRPC 是另一个流行的基于 HTTP/2 的高性能 RPC 框架,也支持双向流。

    • 相似之处:都利用 HTTP/2 实现高效双向流。
    • 不同之处
      • Schema 定义:gRPC 依赖 Protocol Buffers 进行接口定义和序列化,提供了强类型和代码生成。MCP 本身更侧重于上下文和交互模式,内容类型相对灵活 (如 JSON, text, bytes)。
      • 生态和侧重点:gRPC 更通用,用于各种微服务通信。MCP 专为模型交互设计,其核心概念(如上下文管理、模型能力协商等)是其特有价值。
      • StreamableHTTP for MCP 提供了在不引入 Protobuf 依赖的情况下,利用 HTTP/2 (或 HTTP/1.1) 流能力的途径。对于希望保持简单或已有 JSON/text 기반 API 的场景可能更有吸引力。

网站公告

今日签到

点亮在社区的每一天
去签到