问题描述
调用sse流式接口使用httpx_sse的方式
import httpx
from httpx_sse import connect_sse
# 省略无关代码
try:
with httpx.Client() as client:
with connect_sse(client, "GET", url, params=param) as event_source:
clear_textbox(response_textbox)
# 把 iter_sse() 迭代完, 就相当于处理完了一次流式调用
for sse in event_source.iter_sse():
# 流式响应中,每次响应体的处理逻辑
print(f"generated_answer的值是: '{sse.data}'")
response = sse.data
if response != '':
# self.response = response
append_text(response_textbox, response)
except httpx.RequestError as e:
print(f"请求错误:{e}")
except Exception as e:
print(f"发生了一个错误:{e}")
httpx_sse的connet_sse源码:
@contextmanager
def connect_sse(
client: httpx.Client, method: str, url: str, **kwargs: Any
) -> Iterator[EventSource]:
headers = kwargs.pop("headers", {})
headers["Accept"] = "text/event-stream"
headers["Cache-Control"] = "no-store"
with client.stream(method, url, headers=headers, **kwargs) as response:
yield EventSource(response)
可以看到connect_sse源码中的headers的"Accept"设置了只接受"text/event-stream"流式结果,正常这么调用是没错的。但是当后端的流式接口因为401权限问题等报错返回了"application/json"格式,如
{ “code”:401, “msg”:“登录过期,请重新登录”, “data”:null} 这样的json格式结果时,以上代码就会报错,因为他不是"text/event-stream"流式响应结果头。那么该怎么办呢?
方案
重新写一个自定义的connect_sse。
import httpx
from httpx_sse import EventSource
from typing import Any, Iterator
from contextlib import contextmanager
import json
# 自定义调用sse接口
@contextmanager
def custom_connect_sse(
self, client: httpx.Client, method: str, url: str, **kwargs: Any
) -> Iterator[EventSource]:
headers = kwargs.pop("headers", {})
# 只有当没有指定Accept时才添加默认值
headers["Accept"] = "*/*"
headers["Cache-Control"] = "no-store"
with client.stream(method, url, headers=headers, **kwargs) as response:
content_type = response.headers.get('content-type', '').lower()
json_flag = False
if 'text/event-stream' in content_type:
# 处理SSE流
yield json_flag, EventSource(response)
elif 'application/json' in content_type:
# yield response # 在这里你可以决定如何进一步处理这个JSON响应
# 读取并合并所有文本块
text_data = ''.join([chunk for chunk in response.iter_text()])
# 解析整个响应体为JSON
json_data = json.loads(text_data)
json_flag = True
yield json_flag, json_data
调用代码
# 使用自定义的connect_sse函数
try:
with httpx.Client() as client:
with self.custom_connect_sse(client, "GET", url, params=param, headers=headers) as (json_flag, event_source):
if json_flag:
code = event_source.get("code")
msg = event_source.get("msg")
print(f"Code: {code}, Message: {msg}")
else:
full_answer = ""
clear_textbox(response_textbox)
for sse in event_source.iter_sse():
print(f"generated_answer的值是: '{sse.data}'")
response = sse.data
if response:
append_text(response_textbox, response)
full_answer += response
user_record += reply + full_answer + "\n"
print(f"user_record:{user_record}")
except httpx.RequestError as e:
print(f"请求错误:{e}")
except Exception as e:
print(f"发生了一个错误:{e}")
关键步骤:
1.设置headers[“Accept”] = “/”,所有响应头都可以接收
2.content_type = response.headers.get(‘content-type’, ‘’).lower() 判断响应头是流式还是json,并用json_flag记录是否json标识,返回不同的结果。如果是json,则循环合并处理chunk块,拼装完整json返回结果(实测第一次就返回完整json结构了,但是代码得这么写)。
3.使用自定义connect_sse方法时,根据json_flag来分别处理成功调用流式结果还是异常的json结果。