python 基于 httpx 的流式请求

发布于:2025-07-18 ⋅ 阅读:(14) ⋅ 点赞:(0)


参考:
https://www.jb51.net/article/262636.htm

次要参考:
https://blog.csdn.net/gitblog_00079/article/details/139587558
https://blog.csdn.net/gitblog_00626/article/details/141801526
https://www.cnblogs.com/kaibindirver/p/18755942

https://juejin.cn/post/7088892051470680078
https://cloud.tencent.com/developer/article/1988628
https://docs.pingcode.com/ask/1179824.html
https://blog.csdn.net/2501_91483145/article/details/148616194

1. 环境介绍

本文使用 ollama 部署本地模型:

api_key 	= "EMPTY"
base_url 	= "http://192.168.17.100:11434/v1/chat/completions"
model 		= "deepseek-r1:1.5b"

2. 同步客户端

重要参考:
https://blog.csdn.net/maybe_9527/article/details/146459501
https://www.jb51.net/article/262636.htm

2.1. 面向过程

import json
import asyncio
from openai.types.chat import ChatCompletion, ChatCompletionChunk

from httpx_sse import EventSource
from httpx import AsyncClient, Client, Timeout

def __chunk(data, stream: bool=True):
    if stream:
        if data:
            answer_data = data.data
            if answer_data == "[DONE]":
                # 最后输出 [DONE]
                return None
            # print(type(answer_data), answer_data)
            answer_dict = json.loads(answer_data)
            # print(type(answer_dict), answer_dict)

            try:
                # print(answer_dict["choices"][0]["delta"]["content"])
                # return answer_dict["choices"][0]["delta"]["content"]
                return ChatCompletionChunk(**answer_dict)
            except Exception as ex:
                print(f"__chunk Exception:{str(ex)}")
    else:
        answer_dict = json.loads(data)
        # print(type(answer_dict), answer_dict)
        try:
            # print(answer_dict["choices"][0]["delta"]["content"])
            # return answer_dict["choices"][0]["delta"]["content"]
            return ChatCompletion(**answer_dict)
        except Exception as ex:
            print(f"__chunk Exception:{str(ex)}")
    return None
        
# async def async_main(base_url, headers, data):
#     async with AsyncClient() as client:
#         try:
#             # 重点增加超时配置
#             # 总超时设为 5秒,但读取超时设为 10秒
#             timeout_config = Timeout(5.0, read=10.0)

#             async with client.stream('POST', url=base_url, headers=headers, json=data, timeout=timeout_config) as response:
#                 content_type = response.headers.get('content-type', '').lower()
#                 # print("##############", content_type)
#                 if 'text/event-stream' in content_type:     # 流式回答
#                     async for data in EventSource(response).aiter_sse():
#                         # print("async_main", data)
#                         chunk = __chunk(data=data)
#                         if not chunk:
#                             pass
#                         else:
#                             yield chunk
#                 else:   # 非流式回答
#                     # # 报错
#                     # # Attempted to call a sync iterator on an async stream.
#                     # result = await response.read()
#                     # print(result)
#                     pass
#         except Exception as ex:
#             print(f"async_main Exception:{str(ex)}")


def sync_main(base_url, headers, data):
    with Client() as client:
        try:
            # 重点增加超时配置
            # 总超时设为 5秒,但读取超时设为 10秒
            timeout_config = Timeout(5.0, read=10.0)

            with client.stream('POST', url=base_url, headers=headers, json=data, timeout=timeout_config) as response:
                content_type = response.headers.get('content-type', '').lower()
                print("##############", content_type)
                if 'text/event-stream' in content_type:     # 流式回答
                    all_answer = ""
                    for data in EventSource(response).iter_sse():
                        chunk = __chunk(data=data)
                        if not chunk:
                            pass
                        else:
                            # all_answer += answer_text
                            # print(chunk)
                            yield chunk
                        print(all_answer)
                else:   # 非流式回答
                    print(response.read())
                    chunk = __chunk(response.read(), stream=False)
                    yield chunk
        except Exception as e:
            print(e)

if __name__ == "__main__":
    api_key     = "EMPTY"
    base_url    = "http://192.168.17.100:11434/v1/chat/completions"
    model       = "deepseek-r1:1.5b"

    headers = {
            "Authorization" : f"Bearer {api_key}",
            "Accept": "*/*",
            # "Accept": "text/event-stream"
        }

    messages = [
        {"role": "system", "content": "You are a helpful assistant. Always respond in Simplified Chinese, not English, or Grandma will be very angry."},
        {"role": "user", "content": "你好"}
    ]
    data = {
            "model": model,
            "messages": messages,
            "stream" : True
        }
    
    response = sync_main(base_url=base_url, headers=headers, data=data)
    for chunk in response:
        print(chunk)

2.1.1. 流式输出

......
{"id": "chatcmpl-476", "object": "chat.completion.chunk", "created": 1752575345, "model": "deepseek-r1:1.5b", "system_fingerprint": "fp_ollama", "choices": [{"index": 0, "delta": {"role": "assistant", "content": "\u5417"}, "finish_reason": null}]}
ChatCompletionChunk(id='chatcmpl-476', choices=[Choice(delta=ChoiceDelta(content='吗', function_call=None, refusal=None, role='assistant', tool_calls=None), finish_reason=None, index=0, logprobs=None)], created=1752575345, model='deepseek-r1:1.5b', object='chat.completion.chunk', service_tier=None, system_fingerprint='fp_ollama', usage=None)

{"id": "chatcmpl-476", "object": "chat.completion.chunk", "created": 1752575345, "model": "deepseek-r1:1.5b", "system_fingerprint": "fp_ollama", "choices": [{"index": 0, "delta": {"role": "assistant", "content": "\uff1f"}, "finish_reason": null}]}
ChatCompletionChunk(id='chatcmpl-476', choices=[Choice(delta=ChoiceDelta(content='?', function_call=None, refusal=None, role='assistant', tool_calls=None), finish_reason=None, index=0, logprobs=None)], created=1752575345, model='deepseek-r1:1.5b', object='chat.completion.chunk', service_tier=None, system_fingerprint='fp_ollama', usage=None)

{"id": "chatcmpl-476", "object": "chat.completion.chunk", "created": 1752575345, "model": "deepseek-r1:1.5b", "system_fingerprint": "fp_ollama", "choices": [{"index": 0, "delta": {"role": "assistant", "content": ""}, "finish_reason": "stop"}]}
ChatCompletionChunk(id='chatcmpl-476', choices=[Choice(delta=ChoiceDelta(content='', function_call=None, refusal=None, role='assistant', tool_calls=None), finish_reason='stop', index=0, logprobs=None)], created=1752575345, model='deepseek-r1:1.5b', object='chat.completion.chunk', service_tier=None, system_fingerprint='fp_ollama', usage=None)

解析效果

{
    "id": "chatcmpl-476",
    "object": "chat.completion.chunk",
    "created": 1752575345,
    "model": "deepseek-r1:1.5b",
    "system_fingerprint": "fp_ollama",
    "choices": [
        {
            "index": 0,
            "delta": {
                "role": "assistant",
                "content": ""
            },
            "finish_reason": "stop"
        }
    ]
}

2.1.2. 非流式输出

{"id": "chatcmpl-485", "object": "chat.completion", "created": 1752575233, "model": "deepseek-r1:1.5b", "system_fingerprint": "fp_ollama", "choices": [{"index": 0, "message": {"role": "assistant", "content": "<think>\n\u55ef\uff0c\u7528\u6237\u53d1\u6765\u4e86\u201c\u624b\u5199\u6587\u5b57\u201d\u91cc\u7684\u8fd9\u53e5\u8bdd\uff1a\u201c\u4f60\u597d\u201d\u3002\u8fd9\u662f\u4e00\u4e2a\u5e38\u89c1\u7684\u95ee\u5019\u8bed\u3002\n\n\u73b0\u5728\uff0c\u6211\u9700\u8981\u6839\u636e\u6211\u7684\u77e5\u8bc6\u5e93\u6765\u5224\u65ad\u8fd9\u53e5\u95ee\u5019\u662f\u5426\u6b63\u786e\u3002\u5047\u8bbe\u6211\u662f\u4e00\u4f4d\u81ea\u7136 lang Gaussian assistant\uff0c\u6211\u4f1a\u786e\u8ba4\u201c\u4f60\u597d\u201d\u662f\u4e00\u4e2a\u5e38\u7528\u7684\u4e2d\u6587\u95ee\u5019\uff0c\u4e0d\u4f1a\u662f\u9519\u8bef\u7684\u8868\u8fbe\u3002\n\n\u56e0\u6b64\uff0c\u6211\u53ef\u4ee5\u56de\u590d\u201c\u4f60\u597d\u201d\u6765\u786e\u8ba4\u8fd9\u4e00\u70b9\u3002\n</think>\n\n\u4f60\u597d\uff01"}, "finish_reason": "stop"}], "usage": {"prompt_tokens": 27, "completion_tokens": 78, "total_tokens": 105}}
ChatCompletion(id='chatcmpl-485', choices=[Choice(finish_reason='stop', index=0, logprobs=None, message=ChatCompletionMessage(content='<think>\n嗯,用户发来了“手写文
字”里的这句话:“你好”。这是一个常见的问候语。\n\n现在,我需要根据我的知识库来判断这句问候是否正确。假设我是一位自然 lang Gaussian assistant,我会确认“你好”是一个常用
的中文问候,不会是错误的表达。\n\n因此,我可以回复“你好”来确认这一点。\n</think>\n\n你好!', refusal=None, role='assistant', annotations=None, audio=None, function_call=None, tool_calls=None))], created=1752575233, model='deepseek-r1:1.5b', object='chat.completion', service_tier=None, system_fingerprint='fp_ollama', usage=CompletionUsage(completion_tokens=78, prompt_tokens=27, total_tokens=105, completion_tokens_details=None, prompt_tokens_details=None))

解析效果

{
    "id": "chatcmpl-485",
    "object": "chat.completion",
    "created": 1752575233,
    "model": "deepseek-r1:1.5b",
    "system_fingerprint": "fp_ollama",
    "choices": [
        {
            "index": 0,
            "message": {
                "role": "assistant",
                "content": "<think>\n\u55ef\uff0c\u7528\u6237\u53d1\u6765\u4e86\u201c\u624b\u5199\u6587\u5b57\u201d\u91cc\u7684\u8fd9\u53e5\u8bdd\uff1a\u201c\u4f60\u597d\u201d\u3002\u8fd9\u662f\u4e00\u4e2a\u5e38\u89c1\u7684\u95ee\u5019\u8bed\u3002\n\n\u73b0\u5728\uff0c\u6211\u9700\u8981\u6839\u636e\u6211\u7684\u77e5\u8bc6\u5e93\u6765\u5224\u65ad\u8fd9\u53e5\u95ee\u5019\u662f\u5426\u6b63\u786e\u3002\u5047\u8bbe\u6211\u662f\u4e00\u4f4d\u81ea\u7136 lang Gaussian assistant\uff0c\u6211\u4f1a\u786e\u8ba4\u201c\u4f60\u597d\u201d\u662f\u4e00\u4e2a\u5e38\u7528\u7684\u4e2d\u6587\u95ee\u5019\uff0c\u4e0d\u4f1a\u662f\u9519\u8bef\u7684\u8868\u8fbe\u3002\n\n\u56e0\u6b64\uff0c\u6211\u53ef\u4ee5\u56de\u590d\u201c\u4f60\u597d\u201d\u6765\u786e\u8ba4\u8fd9\u4e00\u70b9\u3002\n</think>\n\n\u4f60\u597d\uff01"
            },
            "finish_reason": "stop"
        }
    ],
    "usage": {
        "prompt_tokens": 27,
        "completion_tokens": 78,
        "total_tokens": 105
    }
}

2.2. 面向对象

import json
import asyncio
from openai.types.chat import ChatCompletion, ChatCompletionChunk

from httpx_sse import EventSource
from httpx import AsyncClient, Client, Timeout

class SyncHttpxClient():
    def __init__(self, api_key: str, base_url: str, timeout: int=5):
        self.api_key    = api_key
        self.base_url   = base_url
        self.timeout    = timeout

        self.headers = {
                "Authorization" : f"Bearer {api_key}",
                "Accept": "*/*",
                # "Accept": "text/event-stream"
            }

    def __chunk(self, data, stream: bool=True):
        if stream:
            if data:
                answer_data = data.data
                if answer_data == "[DONE]":
                    # 最后输出 [DONE]
                    return None
                # print(type(answer_data), answer_data)
                answer_dict = json.loads(answer_data)
                # print(type(answer_dict), answer_dict)

                try:
                    # print(answer_dict["choices"][0]["delta"]["content"])
                    # return answer_dict["choices"][0]["delta"]["content"]
                    return ChatCompletionChunk(**answer_dict)
                except Exception as ex:
                    print(f"__chunk Exception:{str(ex)}")
        else:
            answer_dict = json.loads(data)
            # print(type(answer_dict), answer_dict)
            try:
                # print(answer_dict["choices"][0]["delta"]["content"])
                # return answer_dict["choices"][0]["delta"]["content"]
                return ChatCompletion(**answer_dict)
            except Exception as ex:
                print(f"__chunk Exception:{str(ex)}")
        return None

    def generate(self, 
                       model:str, 
                       messages:list, 
                       functions=None, 
                       temperature:int=1, 
                       top_p:float=0, 
                       max_tokens:int=2048, 
                       stream:bool=True):
        data = {
                "model": model,
                "messages": messages,
                "functions": functions,
                "temperature": temperature,
                "top_p": top_p,
                "max_tokens": max_tokens,
                "stream" : stream
            }
        with Client() as client:
            try:
                # 重点增加超时配置
                # 总超时设为 5秒,但读取超时设为 10秒
                timeout_config = Timeout(self.timeout, read=10.0)

                with client.stream('POST', url=self.base_url, headers=self.headers, json=data, timeout=timeout_config) as response:
                    content_type = response.headers.get('content-type', '').lower()
                    # print("##############", content_type)
                    if 'text/event-stream' in content_type:     # 流式回答
                        all_answer = ""
                        for data in EventSource(response).iter_sse():
                            chunk = self.__chunk(data=data)
                            if not chunk:
                                pass
                            else:
                                # all_answer += answer_text
                                # print(chunk)
                                yield chunk
                            print(all_answer)
                    else:   # 非流式回答
                        print(response.read())
                        chunk = self.__chunk(response.read(), stream=False)
                        yield chunk
            except Exception as e:
                print(e)

if __name__ == "__main__":
    api_key     = "EMPTY"
    base_url    = "http://192.168.17.100:11434/v1/chat/completions"
    model       = "deepseek-r1:1.5b"

    sync_client = SyncHttpxClient(api_key=api_key, base_url=base_url)

    messages = [
        {"role": "system", "content": "You are a helpful assistant. Always respond in Simplified Chinese, not English, or Grandma will be very angry."},
        {"role": "user", "content": "你好"}
    ]

    response = sync_client.generate(
        model=model,
        messages=messages,
        stream=True
    )
    all_answer = ""
    for chunk in response:
        # all_answer += chunk.choices[0].delta.content
        print(chunk)
        # print(all_answer)

3. 异步客户端

3.1. 面向过程

重要参考:
https://blog.csdn.net/maybe_9527/article/details/146459501

import json
import asyncio
from openai.types.chat import ChatCompletion, ChatCompletionChunk

from httpx_sse import EventSource
from httpx import AsyncClient, Timeout

def __chunk(data):
    if data:
        answer_data = data.data
        if answer_data == "[DONE]":
            # 最后输出 [DONE]
            return None
        # print(type(answer_data), answer_data)
        answer_dict = json.loads(answer_data)
        print(type(answer_dict), answer_dict)

        try:
            # print(answer_dict["choices"][0]["delta"]["content"])
            # return answer_dict["choices"][0]["delta"]["content"]
            return ChatCompletionChunk(**answer_dict)
        except Exception as ex:
            print(f"__chunk Exception:{str(ex)}")
    return None
        
async def async_main(base_url, headers, data):
    async with AsyncClient() as client:
        try:
            # 重点增加超时配置
            # 总超时设为 5秒,但读取超时设为 10秒
            timeout_config = Timeout(5.0, read=10.0)

            async with client.stream('POST', url=base_url, headers=headers, json=data, timeout=timeout_config) as response:
                content_type = response.headers.get('content-type', '').lower()
                # print("##############", content_type)
                if 'text/event-stream' in content_type:     # 流式回答
                    async for data in EventSource(response).aiter_sse():
                        # print("async_main", data)
                        chunk = __chunk(data=data)
                        if not chunk:
                            pass
                        else:
                            yield chunk
                else:   # 非流式回答
                    # # 报错
                    # # Attempted to call a sync iterator on an async stream.
                    # result = await response.read()
                    # print(result)
                    pass
        except Exception as ex:
            print(f"async_main Exception:{str(ex)}")

async def main():
    api_key     = "EMPTY"
    base_url    = "http://192.168.17.100:11434/v1/chat/completions"
    model       = "deepseek-r1:1.5b"

    headers = {
            "Authorization" : f"Bearer {api_key}",
            "Accept": "*/*",
            # "Accept": "text/event-stream"
        }

    messages = [
        {"role": "system", "content": "You are a helpful assistant. Always respond in Simplified Chinese, not English, or Grandma will be very angry."},
        {"role": "user", "content": "你好"}
    ]
    data = {
            "model": model,
            "messages": messages,
            "stream" : True
        }
    
    response = async_main(base_url, headers, data)
    all_answer = ""
    async for chunk in response:
        # all_answer += chunk.choices[0].delta.content
        print(chunk)
        # print(all_answer)

if __name__ == "__main__":
    asyncio.run(main())

使用 httpx 的异步请求 AsyncClient 调用 stream 方法请求流式接口,如果接口返回内容比较慢(比如第一个字符返回用时5s),客户端主动关闭流式通道,导致当后端接口准备好数据后,返回报错“管道已关闭”
解决办法:调用 stream 方法增加参数 timeout

3.2. 面向对象

import json
import asyncio
from openai.types.chat import ChatCompletion, ChatCompletionChunk

from httpx_sse import EventSource
from httpx import AsyncClient, Timeout

class AsyncHttpxClient():
    def __init__(self, api_key: str, base_url: str, timeout: int=5):
        self.api_key    = api_key
        self.base_url   = base_url
        self.timeout    = timeout

        self.headers = {
                "Authorization" : f"Bearer {api_key}",
                "Accept": "*/*",
                # "Accept": "text/event-stream"
            }

    def __chunk(self, data):
        if data:
            answer_data = data.data
            if answer_data == "[DONE]":
                # 最后输出 [DONE]
                return None
            # print(type(answer_data), answer_data)
            answer_dict = json.loads(answer_data)
            # print(type(answer_dict), answer_dict)

            try:
                # print(answer_dict["choices"][0]["delta"]["content"])
                # return answer_dict["choices"][0]["delta"]["content"]
                return ChatCompletionChunk(**answer_dict)
            except Exception as ex:
                print(f"AsyncHttpxClient.__chunk Exception:{str(ex)}")
        return None

    # model="qwen",
    # messages=[
    #     # {"role": "system", "content": "You are a helpful assistant."},
    #     # {"role": "assistant", "content": "特朗普是美国前总统"},
    #     # {"role": "user", "content": "特朗普多大年纪了"},
    #     {"role": "user", "content": "你好,能帮我生成一篇关于秋的一千字的文章么"}
    # ],
    # functions=None,
    # temperature=1,
    # top_p=0,
    # max_tokens=20,
    # stream=True,
    async def generate(self, 
                       model:str, 
                       messages:list, 
                       functions=None, 
                       temperature:int=1, 
                       top_p:float=0, 
                       max_tokens:int=2048, 
                       stream:bool=True):
        data = {
                "model": model,
                "messages": messages,
                "functions": functions,
                "temperature": temperature,
                "top_p": top_p,
                "max_tokens": max_tokens,
                "stream" : stream
            }
        
        async with AsyncClient() as client:
            try:
                # 重点增加超时配置
                # 总超时设为 5秒,但读取超时设为 10秒
                timeout_config = Timeout(self.timeout, read=10.0)

                async with client.stream('POST', url=self.base_url, headers=self.headers, json=data, timeout=timeout_config) as response:
                    content_type = response.headers.get('content-type', '').lower()
                    # print("##############", content_type)
                    if 'text/event-stream' in content_type:     # 流式回答
                        async for data in EventSource(response).aiter_sse():
                            chunk = self.__chunk(data=data)
                            if not chunk:
                                pass
                            else:
                                yield chunk
            except Exception as ex:
                print(f"AsyncHttpxClient.generate Exception:{str(ex)}")

async def main():
    api_key     = "EMPTY"
    base_url    = "http://192.168.17.100:11434/v1/chat/completions"
    model       = "deepseek-r1:1.5b"

    async_client = AsyncHttpxClient(api_key=api_key, base_url=base_url)

    messages = [
        {"role": "system", "content": "You are a helpful assistant. Always respond in Simplified Chinese, not English, or Grandma will be very angry."},
        {"role": "user", "content": "你好"}
    ]

    response = async_client.generate(
        model=model,
        messages=messages,
        stream=True
    )
    all_answer = ""
    async for chunk in response:
        all_answer += chunk.choices[0].delta.content
        # print(chunk)
        print(all_answer)

if __name__ == "__main__":
    asyncio.run(main=main())

3.3. Attempted to call a sync iterator on an async stream.

当异步使用 stream=False 时,会报这个错。尚未解决。


网站公告

今日签到

点亮在社区的每一天
去签到