本文主要介绍如何通过SSE方式接入DeepSeek,并通过FastAPI WebSocket对外提供接入方法。
参考文档:
腾讯云大模型:https://cloud.tencent.com/document/product/1759/109380
fastAPI官网:https://fastapi.tiangolo.com/
WebSocketManager
提供WsManager(WebSocket管理器)对WebSocket连接实例进行管理,代码片段如下:
from fastapi import WebSocket
class WsManager:
    """
    Registry of active WebSocket connections, keyed by connector id.

    All state lives in the class-level ``connectors`` dict and every method is
    static, so the class is used directly and is never instantiated.
    """

    # connector id -> WebSocket; shared by every caller of this class.
    connectors: dict[str, WebSocket] = {}

    @staticmethod
    def get_connector(connector_id: str) -> WebSocket | None:
        """
        Look up a registered connection.

        :param connector_id: id the connection was registered under
        :return: the WebSocket, or None when the id is not registered
        """
        return WsManager.connectors.get(connector_id)

    @staticmethod
    def add_connector(connector_id: str, connector: WebSocket):
        """
        Register a websocket client, replacing any entry with the same id.

        :param connector_id: client id
        :param connector: client connection instance
        :return: None
        """
        WsManager.connectors[connector_id] = connector

    @staticmethod
    def remove_connector(connector_id: str):
        """
        Forget a websocket connection; unknown ids are ignored.

        :param connector_id: connection id
        :return: None
        """
        # pop() with a default is a no-op for missing ids, so no blanket
        # try/except is needed to make removal idempotent.
        WsManager.connectors.pop(connector_id, None)

    @staticmethod
    async def send_message(connector_id: str, message: str):
        """
        Send a text frame to a registered connection (best effort).

        Does nothing when the id is not registered. A failed send is reported
        on stdout and not re-raised.

        :param connector_id: connection id
        :param message: text to send
        :return: None
        """
        connector = WsManager.get_connector(connector_id)
        if connector is None:
            return
        try:
            await connector.send_text(message)
        except Exception as e:
            # Best-effort delivery: report which connection failed and why.
            print(f'消息发送失败: connector_id={connector_id}, error={e!r}')

    @staticmethod
    async def send_message_json(connector_id: str, message: dict):
        """
        Send a JSON frame to a registered connection (best effort).

        Does nothing when the id is not registered. A failed send is reported
        on stdout and not re-raised.

        :param connector_id: connection id
        :param message: JSON-serializable payload
        :return: None
        """
        connector = WsManager.get_connector(connector_id)
        if connector is None:
            return
        try:
            await connector.send_json(message)
        except Exception as e:
            # Best-effort delivery: report which connection failed and why.
            print(f'消息发送失败: connector_id={connector_id}, error={e!r}')
接入DeepSeek
import asyncio
import json
import os
import uuid

import requests

from src.core.WsManager import WsManager
from .session import get_session
# Tencent Cloud LKE API host (not referenced by the code shown in this file).
TCLOUD_URL = 'lke.tencentcloudapi.com'
# SSE chat endpoint that get_q_a_result POSTs the question to.
TCLOUD_DeepSeek_SSE_URL = "https://wss.lke.cloud.tencent.com/v1/qbot/chat/sse"
# Credentials come from the environment so real keys never have to be
# committed; the original placeholders remain as fallbacks.
SECRET_ID = os.environ.get("TENCENTCLOUD_SECRET_ID", "XXXXXXXXXXXXXXXXXXXX")
SECRET_KEY = os.environ.get("TENCENTCLOUD_SECRET_KEY", "XXXXXXXXXXXXXXXXXXXXX")
REGION = "ap-guangzhou"
# NOTE(review): the meaning of 5 is not visible in this file - confirm
# against the Tencent Cloud API documentation before relying on it.
TYPE = 5
def get_session():
    """
    Create a fresh session id.

    :return: a random (version 4) UUID rendered as a string
    """
    return str(uuid.uuid4())
def _resolve_message(message: str, prev_message: str):
    """
    Convert one SSE ``data:`` line into the dict forwarded to the client.

    A line is only processed when the line before it was an ``event:`` line.

    :param message: current decoded SSE line, expected to be ``data:{json}``
    :param prev_message: previous line of the stream ('' before the first one)
    :return: ``{'type': ..., 'data': {...}}`` for events that are forwarded;
        None for skipped event types (token_stat, thought, error, reference),
        for lines that do not follow an ``event:`` line, and for lines whose
        JSON is malformed or has no object payload
    """
    # '' fails this check too, so the first line is never treated as data.
    if not prev_message.startswith("event:"):
        return None

    # Strip only the leading field name: the JSON body may legitimately
    # contain the text "data:" (for example inside the answer content).
    raw = message.strip().removeprefix("data:").strip()
    try:
        message_json = json.loads(raw)
    except json.JSONDecodeError as e:
        print(e)
        return None
    if not isinstance(message_json, dict):
        return None

    message_type = message_json.get('type')
    # Token statistics, thinking, error and reference events are not forwarded.
    if message_type in ('token_stat', 'thought', 'error', 'reference'):
        return None

    payload = message_json.get('payload')
    if not isinstance(payload, dict):
        return None

    forwarded_keys = (
        'content',
        'record_id',
        'request_id',
        'session_id',
        'trace_id',
        'message_id',
    )
    return {
        'type': message_type,
        'data': {key: payload.get(key) for key in forwarded_keys},
    }
async def get_q_a_result(visitor_id: str, question: str):
    """
    Ask the question over the SSE endpoint and relay the answer stream.

    Each forwardable event is pushed to the websocket registered under
    ``visitor_id``. When the stream ends, or the request fails, that websocket
    is closed and removed from WsManager.

    :param visitor_id: visitor id; also the WsManager connector id
    :param question: question text
    :return: None - the answer is delivered through the websocket
    """
    req_data = {
        "content": f"{question}",
        "bot_app_key": "XXXXXXXX",  # see the console steps in the article for how to obtain it
        "visitor_biz_id": visitor_id,
        "session_id": get_session(),
        "streaming_throttle": 100,
    }
    try:
        # requests is blocking; run it in a worker thread so the event loop
        # (and every other websocket it serves) keeps running.
        res = await asyncio.to_thread(
            requests.post,
            TCLOUD_DeepSeek_SSE_URL,
            data=json.dumps(req_data),
            stream=True,
            headers={"Accept": "text/event-stream"},
        )
        # The context manager releases the streamed connection when done.
        with res:
            if res.status_code == 200:
                lines = res.iter_lines()
                prev_message: str = ''  # previous line of the stream
                # next(lines, None) returns None at end of stream, so no
                # StopIteration has to cross the thread boundary.
                while (line := await asyncio.to_thread(next, lines, None)) is not None:
                    if not line:
                        continue
                    data = line.decode('utf-8')
                    message = _resolve_message(data, prev_message)
                    if message:
                        await WsManager.send_message_json(visitor_id, message)
                    prev_message = data
            else:
                print(f'Failed to get data. Status code: {res.status_code}')
    finally:
        # Always release the websocket, even when the request itself failed.
        connector = WsManager.get_connector(visitor_id)
        # Unregister first so the registry is cleaned up even if close() raises.
        WsManager.remove_connector(visitor_id)
        if connector:
            await connector.close()
bot_app_key 的获取方式:
1. 进入控制台
2. 创建应用,并点击调用
通过fastapi 以websocket方式对外提供接口
该接口的流程是:前端连接WebSocket后,后端直接根据连接参数开始调用问答,问答结束后自动关闭WebSocket连接。
import asyncio
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, BackgroundTasks
from fastapi.responses import JSONResponse
from src.core.WsManager import WsManager
from src.utils.tcloudLLM import get_example_recommend
from src.utils.session import get_visitor_id
# Router for the Tencent Cloud LLM endpoints, mounted under /api/tcloud.
router = APIRouter(prefix="/api/tcloud", tags=['腾讯云大模型接口'])
@router.websocket("/ws/{compound_name}")
async def get_exercises_ws(compound_name: str, websocket: WebSocket, background_tasks: BackgroundTasks):
    """
    Accept a websocket, start the Q&A task for it, and echo incoming text.

    The connection is registered in WsManager under a fresh visitor id and
    unregistered when the receive loop ends for any reason.

    :param compound_name: path parameter passed on to the Q&A task
    :param websocket: the incoming websocket connection
    :param background_tasks: unused; kept so the signature stays unchanged
    """
    await websocket.accept()
    visitor_id = get_visitor_id()
    WsManager.add_connector(visitor_id, websocket)
    # The author found BackgroundTasks did not run here, so asyncio is used.
    # Keep a reference to the task: the event loop holds tasks only weakly,
    # so an unreferenced task can be garbage-collected before it finishes.
    qa_task = asyncio.create_task(get_example_recommend(visitor_id, compound_name))
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Message text was: {data}")
    except WebSocketDisconnect:
        # Normal end of session: the peer went away.
        pass
    except Exception as exc:
        # Still best effort, but no longer silent.
        print(f'WebSocket session {visitor_id} ended with error: {exc!r}')
    finally:
        # Nobody is listening any more, so stop the Q&A task if it is running.
        if not qa_task.done():
            qa_task.cancel()
        WsManager.remove_connector(visitor_id)
如果需要进行多轮问答,可以为多次问答设置相关的 `request_id` 参数进行消息串联。