使用python接入腾讯云DeepSeek

发布于:2025-02-25 ⋅ 阅读:(26) ⋅ 点赞:(0)

本文主要从提供SSE方式接入DeepSeek,并通过fastapi websocket对外提供接入方法。
参考文档:
腾讯云大模型:https://cloud.tencent.com/document/product/1759/109380
fastAPI官网:https://fastapi.tiangolo.com/

WebSocketManager

提供WebsocketManager对websocket连接实例进行管理,代码片段如下:

from fastapi import WebSocket


class WsManager:
    """
    websocket manager
    """
    connectors: dict[str, WebSocket] = {}

    @staticmethod
    def get_connector(connector_id: str) -> WebSocket | None:
        """
        获取连接实例
        :param connector_id:
        :return:
        """
        connector = WsManager.connectors.get(connector_id)
        return connector

    @staticmethod
    def add_connector(connector_id: str, connector: WebSocket):
        """
        添加websocket客户端
        :param connector_id: 客户端ID
        :param connector: 客户端连接实例
        :return:
        """
        WsManager.connectors[connector_id] = connector

    @staticmethod
    def remove_connector(connector_id: str):
        """
        移除websocket连接
        :param connector_id: 连接ID
        :return:
        """
        try:
            del WsManager.connectors[connector_id]
        except Exception:
            pass

    @staticmethod
    async def send_message(connector_id: str, message: str):
        connector = WsManager.connectors.get(connector_id)
        if connector is None:
            return
        try:
            await connector.send_text(message)
        except Exception as e:
            print('消息发送失败')

    @staticmethod
    async def send_message_json(connector_id: str, message: dict):
        connector = WsManager.connectors.get(connector_id)
        if connector is None:
            return
        try:
            await connector.send_json(message)
        except Exception as e:
            print('消息发送失败')

接入DeepSeek

import uuid
import os
import requests
import json

from src.core.WsManager import WsManager
from .session import get_session

TCLOUD_URL = 'lke.tencentcloudapi.com'
TCLOUD_DeepSeek_SSE_URL = "https://wss.lke.cloud.tencent.com/v1/qbot/chat/sse"

SECRET_ID = "XXXXXXXXXXXXXXXXXXXX"
SECRET_KEY = "XXXXXXXXXXXXXXXXXXXXX"
REGION = "ap-guangzhou"
TYPE = 5


def get_session():
    """
    生成一个 UUID
    :return session id 字符串
    """
    new_uuid = uuid.uuid4()
    # 将 UUID 转换为字符串
    session_id = str(new_uuid)
    return session_id


def _resolve_message(message: str, prev_message: str):
    """
    处理消息
    :param message:
    :param prev_message:
    :return:
    """
    if prev_message == '':
        return None
    if prev_message.startswith("event:"):
        # 生成消息
        message = message.replace("data:", '').strip()
        try:
            message_json = json.loads(message)
            message_type = message_json.get('type')
            payload = message_json.get('payload')
            if message_type == 'token_stat':
                # token统计事件
                return None
            elif message_type == 'thought':
                # 思考事件
                return None
            elif message_type == 'error':
                # 错误事件
                return None
            elif message_type == 'reference':
                # 参考来源事件
                return None
            content = payload.get('content')
            record_id = payload.get('record_id')
            request_id = payload.get('request_id')
            session_id = payload.get('session_id')
            trace_id = payload.get('trace_id')
            message_id = payload.get('message_id')
            return {
                'type': message_type,
                'data': {
                    'content': content,
                    'record_id': record_id,
                    'request_id': request_id,
                    'session_id': session_id,
                    'trace_id': trace_id,
                    'message_id': message_id,
                }
            }
        except Exception as e:
            print(e)
    else:
        return None


async def get_q_a_result(visitor_id: str, question: str):
    """
    获取问答结果
    :param visitor_id: 访客ID
    :param question: 问题内容
    :return: 回答内容
    """
    session_id = get_session()
    req_data = {
        "content": f"{question}",
        "bot_app_key": "XXXXXXXX",  # 可以获取方式见下图
        "visitor_biz_id": visitor_id,
        "session_id": session_id,
        "streaming_throttle": 100
    }
    res = requests.post(TCLOUD_DeepSeek_SSE_URL, data=json.dumps(req_data),
                        stream=True, headers={"Accept": "text/event-stream"})
    prev_message: str = ''  # 上一条消息
    if res.status_code == 200:
        for line in res.iter_lines():
            if line:
                data = line.decode('utf-8')
                message = _resolve_message(data, prev_message)
                if message:
                    await WsManager.send_message_json(visitor_id, message)
                prev_message = data
    else:
        print('Failed to get data. Status code: {response.status_code}')
    # 关闭websocket连接
    connector = WsManager.get_connector(visitor_id)
    if connector:
        await connector.close()
        WsManager.remove_connector(visitor_id)

  1. 进入控制台
    在这里插入图片描述2. 创建应用,并点击调用
    在这里插入图片描述

通过fastapi 以websocket方式对外提供接口

这个接口是,前台连接websocket后,后台直接根据连接参数,开始调用问答,问答结束后自动关闭websocket连接

import asyncio

from fastapi import APIRouter, WebSocket, WebSocketDisconnect, BackgroundTasks
from fastapi.responses import JSONResponse

from src.core.WsManager import WsManager
from src.utils.tcloudLLM import get_example_recommend
from src.utils.session import get_visitor_id

router = APIRouter(prefix="/api/tcloud", tags=['腾信云大模型接口'])


@router.websocket("/ws/{compound_name}")
async def get_exercises_ws(compound_name: str, websocket: WebSocket, background_tasks: BackgroundTasks):
    await websocket.accept()
    visitor_id = get_visitor_id()
    WsManager.add_connector(visitor_id, websocket)
    # 本来使用BackgroundTask去做后台任务,结果不知道什么原因,调用不起来,然后换成asyncio处理
    asyncio.create_task(get_example_recommend(visitor_id, compound_name))  # 创建对应问答任务
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Message text was: {data}")
    except WebSocketDisconnect:
        WsManager.remove_connector(visitor_id)
    except Exception:
        WsManager.remove_connector(visitor_id)

如果需要进行多轮问答,提供为多次问答设置相关的 request_id 参数进行消息串联。


网站公告

今日签到

点亮在社区的每一天
去签到