借助 KubeMQ 简化多 LLM 集成

发布于:2025-07-02 ⋅ 阅读:(21) ⋅ 点赞:(0)

    将多个大语言模型(LLM),如 OpenAI 和 Anthropic 的 Claude 集成到应用程序中是一项具有挑战性的任务。处理不同 API 和通信协议的复杂性,以及确保请求高效路由,都会带来诸多难题。

    然而,使用消息代理和路由器可以成为解决这些问题的优雅方案,能处理这些痛点并提供多项关键优势。在本文中,我们将探讨如何实现这一点,并提供代码示例,指导大家如何使用 KubeMQ 构建一个与 OpenAI 和 Anthropic Claude 交互的路由器,当然要集成 DeepSeek 也类似。

使用消息代理作为 LLM 路由器的关键优势

1. 简化集成

使用消息代理作为路由器可抽象掉直接与不同 LLM API 交互的复杂性,简化客户端代码并降低错误发生概率。

2. 多模型应用场景

消息代理可促进多个 LLM 或专用于不同任务(如一个模型用于摘要,另一个用于情感分析)的模型间通信,能高效地将请求路由到相应模型,使应用程序可在不增加额外开销的情况下充分发挥各模型的优势。

3. 批量处理与大规模推理

对于需要批量处理或大规模推理任务的应用,消息代理可通过在 LLM 忙碌或不可用时排队请求来实现异步处理。这确保了即使在高负载下也不会丢失数据或请求,提供可靠的处理能力。

4. 冗余与故障转移保障

在关键业务场景中,消息代理可确保无缝故障转移至其他环境。例如,如果连接提供 OpenAI 模型的云提供商失败,KubeMQ 可自动切换至其他提供商。这种冗余性可确保 AI 操作不间断,维护服务可靠性和客户满意度。

5. 高流量应用处理

消息代理可将传入请求分发至多个 LLM 实例或副本,防止过载并确保顺畅运行。这种负载均衡对于高流量应用至关重要,使其能够有效扩展而不影响性能。

使用 KubeMQ 构建 LLM 路由器:集成 OpenAI 和 Claude

现在,我将指导大家如何使用 KubeMQ 构建一个与 OpenAI 和 Anthropic Claude 交互的路由器。KubeMQ 是一款领先、开源的消息代理和消息队列平台,我们将利用其优势并提供代码示例,介绍如何设置消息代理、构建服务器端路由器以及创建用于发送查询的客户端。所有代码示例均可在 KubeMQ 的 GitHub(https://github.com/kubemq-io/kubemq-llm-router) 仓库中找到。

先决条件

在开始之前,请确保满足以下条件:

  • • 已安装 Python 3.7 或更高版本。

  • • 机器上已安装 Docker。

  • • 拥有 OpenAI 和 Anthropic 的有效 API 密钥。

  • • 拥有 KubeMQ 密钥(可从 KubeMQ 网站获取)。

  • • 已安装 kubemq-cq Python 包。

  • • 创建包含 API 密钥的 .env 文件:

OPENAI_API_KEY=your_openai_api_key
ANTHROPIC_API_KEY=your_anthropic_api_key

设置 KubeMQ

首先,我们需要确保 KubeMQ 正常运行。我们将使用 Docker 进行部署:

docker run -d --rm \
  -p 8080:8080 \
  -p 50000:50000 \
  -p 9090:9090 \
  -e KUBEMQ_TOKEN="your_token" \
  kubemq/kubemq-community:latest

端口说明:

  • • 8080:暴露 KubeMQ REST API

  • • 50000:打开用于客户端 - 服务器通信的 gRPC 端口

  • • 9090:暴露 KubeMQ REST 网关
    注意:将 your_token 替换为实际的 KubeMQ 密钥。

创建 LLM 路由器服务器

LLM 路由器作为客户端和 LLM 之间的中间件。它监听特定通道上的查询并将它们路由至相应的 LLM。

server.py
import time
from kubemq.cq import Client, QueryMessageReceived, QueryResponseMessage, QueriesSubscription, CancellationToken
from langchain.chat_models import ChatOpenAI
from langchain.llms import Anthropic
import os
from dotenv import load_dotenv
import threading

load_dotenv()

classLLMRouter:
    def__init__(self):
        self.openai_llm = ChatOpenAI(
            api_key=os.getenv("OPENAI_API_KEY"),
            model_name="gpt-3.5-turbo"
        )
        self.claude_llm = Anthropic(
            api_key=os.getenv("ANTHROPIC_API_KEY"),
            model="claude-3"
        )
        self.client = Client(address="localhost:50000")

    defhandle_openai_query(self, request: QueryMessageReceived):
        try:
            message = request.body.decode('utf-8')
            result = self.openai_llm(message)
            response = QueryResponseMessage(
                query_received=request,
                is_executed=True,
                body=result.encode('utf-8')
            )
            self.client.send_response_message(response)
        except Exception as e:
            self.client.send_response_message(QueryResponseMessage(
                query_received=request,
                is_executed=False,
                error=str(e)
            ))

    defhandle_claude_query(self, request: QueryMessageReceived):
        try:
            message = request.body.decode('utf-8')
            result = self.claude_llm(message)
            response = QueryResponseMessage(
                query_received=request,
                is_executed=True,
                body=result.encode('utf-8')
            )
            self.client.send_response_message(response)
        except Exception as e:
            self.client.send_response_message(QueryResponseMessage(
                query_received=request,
                is_executed=False,
                error=str(e)
            ))

    defrun(self):
        defon_error(err: str):
            print(f"Error: {err}")

        defsubscribe_openai():
            self.client.subscribe_to_queries(
                subscription=QueriesSubscription(
                    channel="openai_requests",
                    on_receive_query_callback=self.handle_openai_query,
                    on_error_callback=on_error,
                ),
                cancel=CancellationToken()
            )

        defsubscribe_claude():
            self.client.subscribe_to_queries(
                subscription=QueriesSubscription(
                    channel="claude_requests",
                    on_receive_query_callback=self.handle_claude_query,
                    on_error_callback=on_error,
                ),
                cancel=CancellationToken()
            )

        threading.Thread(target=subscribe_openai).start()
        threading.Thread(target=subscribe_claude).start()

        print("LLM Router running on channels: openai_requests, claude_requests")
        try:
            whileTrue:
                time.sleep(1)
        except KeyboardInterrupt:
            print("Shutting down...")

if __name__ == "__main__":
    router = LLMRouter()
    router.run()

解释:

  • • 初始化

    • • 加载 API 密钥的环境变量。

    • • 初始化 OpenAI 和 Anthropic LLM 的客户端。

    • • 设置 KubeMQ 客户端。

  • • 处理查询

    • • handle_openai_query 和 handle_claude_query 解码传入消息,将其传递给相应的 LLM,并发送回响应。

    • • 捕获错误并发送回 is_executed 标志设置为 False 的响应。

  • • 订阅

    • • 路由器订阅两个通道:openai_requests 和 claude_requests

    • • 使用线程并行处理订阅。

  • • 运行服务器

    • • run 方法启动订阅并保持服务器运行,直到收到中断信号。

开发 LLM 客户端

客户端向 LLM 路由器发送查询,指定要使用的模型。

client.py
from kubemq.cq import Client, QueryMessage
import json

classLLMClient:
    def__init__(self, address="localhost:50000"):
        self.client = Client(address=address)

    defsend_message(self, message: str, model: str) -> dict:
        channel = f"{model}_requests"
        response = self.client.send_query_request(QueryMessage(
            channel=channel,
            body=message.encode('utf-8'),
            timeout_in_seconds=30
        ))
        if response.is_error:
            return {"error": response.error}
        else:
            return {"response": response.body.decode('utf-8')}

if __name__ == "__main__":
    client = LLMClient()
    models = ["openai", "claude"]
    message = input("Enter your message: ")
    model = input(f"Choose model ({'/'.join(models)}): ")
    if model in models:
        response = client.send_message(message, model)
        if"error"in response:
            print(f"Error: {response['error']}")
        else:
            print(f"Response: {response['response']}")
    else:
        print("Invalid model selected")

解释:

  • • 初始化

    • • 设置 KubeMQ 客户端。

  • • 发送消息

    • • send_message 方法根据选定的模型构建相应的通道。

    • • 向路由器发送查询消息并等待响应。

    • • 处理错误并解码响应体。

  • • 用户交互

    • • 提示用户输入消息并选择模型。

    • • 打印来自 LLM 的响应。

通过 REST 进行发送和接收

对于偏好或需要 RESTful 通信的服务或客户端,KubeMQ 提供了 REST 端点。

通过 REST 发送请求

端点:

POST http://localhost:9090/send/request

头部:

Content-Type: application/json

主体:

{
   "RequestTypeData": 2,
   "ClientID": "LLMRouter-sender",
   "Channel": "openai_requests",
   "BodyString": "What is the capital of France?",
   "Timeout": 30000
}

载荷说明:

  • • RequestTypeData:指定请求类型(2 表示查询)。

  • • ClientID:发送请求的客户端的标识符。

  • • Channel:对应 LLM 模型的通道(openai_requests 或 claude_requests)。

  • • BodyString:要发送给 LLM 的消息。

  • • Timeout:等待响应的时间(以毫秒为单位)。

接收响应

响应将是一个包含 LLM 输出或错误消息的 JSON 对象。


综上,通过利用消息代理(KubeMQ),我们构建了一个可扩展且高效的路由器,能够与多个 LLM 交互。这种设置使客户端能够无缝地向不同模型发送查询,并且可以扩展以包含更多模型或功能。这种方法的优势包括:

  1. 1. 简化集成:抽象掉直接与不同 LLM API 交互的复杂性,简化客户端代码并降低错误发生概率。

  2. 2. 多模型支持:高效地将请求路由至专用于不同任务的相应模型。

  3. 3. 可靠性:确保即使在 LLM 忙碌或不可用时也不会丢失数据。

  4. 4. 冗余性:提供故障转移机制以维持不间断的运营。

  5. 5. 可扩展性:通过在多个 LLM 实例上分发请求来处理高流量。