Reference:
https://www.jb51.net/article/262636.htm
Secondary references:
https://blog.csdn.net/gitblog_00079/article/details/139587558
https://blog.csdn.net/gitblog_00626/article/details/141801526
https://www.cnblogs.com/kaibindirver/p/18755942
https://juejin.cn/post/7088892051470680078
https://cloud.tencent.com/developer/article/1988628
https://docs.pingcode.com/ask/1179824.html
https://blog.csdn.net/2501_91483145/article/details/148616194
1. Environment
This article uses Ollama to serve a local model:
api_key = "EMPTY"
base_url = "http://192.168.17.100:11434/v1/chat/completions"
model = "deepseek-r1:1.5b"
2. Synchronous client
Key references:
https://blog.csdn.net/maybe_9527/article/details/146459501
https://www.jb51.net/article/262636.htm
2.1. Procedural style
import json

from openai.types.chat import ChatCompletion, ChatCompletionChunk
from httpx_sse import EventSource
from httpx import Client, Timeout

def __chunk(data, stream: bool=True):
    """Parse one SSE event (stream=True) or a whole response body (stream=False)."""
    if stream:
        if data:
            answer_data = data.data
            if answer_data == "[DONE]":
                # The server sends [DONE] as the final event
                return None
            answer_dict = json.loads(answer_data)
            try:
                return ChatCompletionChunk(**answer_dict)
            except Exception as ex:
                print(f"__chunk Exception:{str(ex)}")
    else:
        answer_dict = json.loads(data)
        try:
            return ChatCompletion(**answer_dict)
        except Exception as ex:
            print(f"__chunk Exception:{str(ex)}")
    return None
def sync_main(base_url, headers, data):
    with Client() as client:
        try:
            # Key point: configure timeouts explicitly.
            # Default timeout 5 s, but allow up to 10 s between reads.
            timeout_config = Timeout(5.0, read=10.0)
            with client.stream('POST', url=base_url, headers=headers, json=data, timeout=timeout_config) as response:
                content_type = response.headers.get('content-type', '').lower()
                print("##############", content_type)
                if 'text/event-stream' in content_type:  # streaming answer
                    all_answer = ""
                    for event in EventSource(response).iter_sse():
                        chunk = __chunk(data=event)
                        if chunk:
                            all_answer += chunk.choices[0].delta.content or ""
                            yield chunk
                    print(all_answer)
                else:  # non-streaming answer
                    body = response.read()
                    print(body)
                    yield __chunk(body, stream=False)
        except Exception as e:
            print(e)
if __name__ == "__main__":
    api_key = "EMPTY"
    base_url = "http://192.168.17.100:11434/v1/chat/completions"
    model = "deepseek-r1:1.5b"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Accept": "*/*",
        # "Accept": "text/event-stream"
    }
    messages = [
        {"role": "system", "content": "You are a helpful assistant. Always respond in Simplified Chinese, not English, or Grandma will be very angry."},
        {"role": "user", "content": "你好"}
    ]
    data = {
        "model": model,
        "messages": messages,
        "stream": True
    }
    response = sync_main(base_url=base_url, headers=headers, data=data)
    for chunk in response:
        print(chunk)
2.1.1. Streaming output
......
{"id": "chatcmpl-476", "object": "chat.completion.chunk", "created": 1752575345, "model": "deepseek-r1:1.5b", "system_fingerprint": "fp_ollama", "choices": [{"index": 0, "delta": {"role": "assistant", "content": "\u5417"}, "finish_reason": null}]}
ChatCompletionChunk(id='chatcmpl-476', choices=[Choice(delta=ChoiceDelta(content='吗', function_call=None, refusal=None, role='assistant', tool_calls=None), finish_reason=None, index=0, logprobs=None)], created=1752575345, model='deepseek-r1:1.5b', object='chat.completion.chunk', service_tier=None, system_fingerprint='fp_ollama', usage=None)
{"id": "chatcmpl-476", "object": "chat.completion.chunk", "created": 1752575345, "model": "deepseek-r1:1.5b", "system_fingerprint": "fp_ollama", "choices": [{"index": 0, "delta": {"role": "assistant", "content": "\uff1f"}, "finish_reason": null}]}
ChatCompletionChunk(id='chatcmpl-476', choices=[Choice(delta=ChoiceDelta(content='?', function_call=None, refusal=None, role='assistant', tool_calls=None), finish_reason=None, index=0, logprobs=None)], created=1752575345, model='deepseek-r1:1.5b', object='chat.completion.chunk', service_tier=None, system_fingerprint='fp_ollama', usage=None)
{"id": "chatcmpl-476", "object": "chat.completion.chunk", "created": 1752575345, "model": "deepseek-r1:1.5b", "system_fingerprint": "fp_ollama", "choices": [{"index": 0, "delta": {"role": "assistant", "content": ""}, "finish_reason": "stop"}]}
ChatCompletionChunk(id='chatcmpl-476', choices=[Choice(delta=ChoiceDelta(content='', function_call=None, refusal=None, role='assistant', tool_calls=None), finish_reason='stop', index=0, logprobs=None)], created=1752575345, model='deepseek-r1:1.5b', object='chat.completion.chunk', service_tier=None, system_fingerprint='fp_ollama', usage=None)
Parsed result
{
    "id": "chatcmpl-476",
    "object": "chat.completion.chunk",
    "created": 1752575345,
    "model": "deepseek-r1:1.5b",
    "system_fingerprint": "fp_ollama",
    "choices": [
        {
            "index": 0,
            "delta": {
                "role": "assistant",
                "content": ""
            },
            "finish_reason": "stop"
        }
    ]
}
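Since every event parses into a typed ChatCompletionChunk, reassembling the plain-text answer is just a matter of concatenating the delta contents. A minimal sketch (it assumes sync_main and the request variables from the listing above are in scope):

answer = ""
for chunk in sync_main(base_url=base_url, headers=headers, data=data):
    delta = chunk.choices[0].delta
    if delta.content:  # the final "stop" chunk carries empty content
        answer += delta.content
print(answer)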
2.1.2. Non-streaming output
{"id": "chatcmpl-485", "object": "chat.completion", "created": 1752575233, "model": "deepseek-r1:1.5b", "system_fingerprint": "fp_ollama", "choices": [{"index": 0, "message": {"role": "assistant", "content": "<think>\n\u55ef\uff0c\u7528\u6237\u53d1\u6765\u4e86\u201c\u624b\u5199\u6587\u5b57\u201d\u91cc\u7684\u8fd9\u53e5\u8bdd\uff1a\u201c\u4f60\u597d\u201d\u3002\u8fd9\u662f\u4e00\u4e2a\u5e38\u89c1\u7684\u95ee\u5019\u8bed\u3002\n\n\u73b0\u5728\uff0c\u6211\u9700\u8981\u6839\u636e\u6211\u7684\u77e5\u8bc6\u5e93\u6765\u5224\u65ad\u8fd9\u53e5\u95ee\u5019\u662f\u5426\u6b63\u786e\u3002\u5047\u8bbe\u6211\u662f\u4e00\u4f4d\u81ea\u7136 lang Gaussian assistant\uff0c\u6211\u4f1a\u786e\u8ba4\u201c\u4f60\u597d\u201d\u662f\u4e00\u4e2a\u5e38\u7528\u7684\u4e2d\u6587\u95ee\u5019\uff0c\u4e0d\u4f1a\u662f\u9519\u8bef\u7684\u8868\u8fbe\u3002\n\n\u56e0\u6b64\uff0c\u6211\u53ef\u4ee5\u56de\u590d\u201c\u4f60\u597d\u201d\u6765\u786e\u8ba4\u8fd9\u4e00\u70b9\u3002\n</think>\n\n\u4f60\u597d\uff01"}, "finish_reason": "stop"}], "usage": {"prompt_tokens": 27, "completion_tokens": 78, "total_tokens": 105}}
ChatCompletion(id='chatcmpl-485', choices=[Choice(finish_reason='stop', index=0, logprobs=None, message=ChatCompletionMessage(content='<think>\n嗯,用户发来了“手写文字”里的这句话:“你好”。这是一个常见的问候语。\n\n现在,我需要根据我的知识库来判断这句问候是否正确。假设我是一位自然 lang Gaussian assistant,我会确认“你好”是一个常用的中文问候,不会是错误的表达。\n\n因此,我可以回复“你好”来确认这一点。\n</think>\n\n你好!', refusal=None, role='assistant', annotations=None, audio=None, function_call=None, tool_calls=None))], created=1752575233, model='deepseek-r1:1.5b', object='chat.completion', service_tier=None, system_fingerprint='fp_ollama', usage=CompletionUsage(completion_tokens=78, prompt_tokens=27, total_tokens=105, completion_tokens_details=None, prompt_tokens_details=None))
Parsed result
{
    "id": "chatcmpl-485",
    "object": "chat.completion",
    "created": 1752575233,
    "model": "deepseek-r1:1.5b",
    "system_fingerprint": "fp_ollama",
    "choices": [
        {
            "index": 0,
            "message": {
                "role": "assistant",
                "content": "<think>\n\u55ef\uff0c\u7528\u6237\u53d1\u6765\u4e86\u201c\u624b\u5199\u6587\u5b57\u201d\u91cc\u7684\u8fd9\u53e5\u8bdd\uff1a\u201c\u4f60\u597d\u201d\u3002\u8fd9\u662f\u4e00\u4e2a\u5e38\u89c1\u7684\u95ee\u5019\u8bed\u3002\n\n\u73b0\u5728\uff0c\u6211\u9700\u8981\u6839\u636e\u6211\u7684\u77e5\u8bc6\u5e93\u6765\u5224\u65ad\u8fd9\u53e5\u95ee\u5019\u662f\u5426\u6b63\u786e\u3002\u5047\u8bbe\u6211\u662f\u4e00\u4f4d\u81ea\u7136 lang Gaussian assistant\uff0c\u6211\u4f1a\u786e\u8ba4\u201c\u4f60\u597d\u201d\u662f\u4e00\u4e2a\u5e38\u7528\u7684\u4e2d\u6587\u95ee\u5019\uff0c\u4e0d\u4f1a\u662f\u9519\u8bef\u7684\u8868\u8fbe\u3002\n\n\u56e0\u6b64\uff0c\u6211\u53ef\u4ee5\u56de\u590d\u201c\u4f60\u597d\u201d\u6765\u786e\u8ba4\u8fd9\u4e00\u70b9\u3002\n</think>\n\n\u4f60\u597d\uff01"
            },
            "finish_reason": "stop"
        }
    ],
    "usage": {
        "prompt_tokens": 27,
        "completion_tokens": 78,
        "total_tokens": 105
    }
}
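As the output above shows, deepseek-r1 wraps its chain of thought in <think>...</think> tags before the actual answer. If only the final answer is wanted, the reasoning block can be stripped; a minimal sketch (strip_think is a hypothetical helper, not part of the original code):

import re

def strip_think(text: str) -> str:
    # Drop the <think>...</think> reasoning block emitted by deepseek-r1.
    return re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL).strip()

print(strip_think("<think>\n嗯,用户发来了问候。\n</think>\n\n你好!"))  # -> 你好!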
2.2. Object-oriented style
import json

from openai.types.chat import ChatCompletion, ChatCompletionChunk
from httpx_sse import EventSource
from httpx import Client, Timeout

class SyncHttpxClient():
    def __init__(self, api_key: str, base_url: str, timeout: int=5):
        self.api_key = api_key
        self.base_url = base_url
        self.timeout = timeout
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Accept": "*/*",
            # "Accept": "text/event-stream"
        }

    def __chunk(self, data, stream: bool=True):
        """Parse one SSE event (stream=True) or a whole response body (stream=False)."""
        if stream:
            if data:
                answer_data = data.data
                if answer_data == "[DONE]":
                    # The server sends [DONE] as the final event
                    return None
                answer_dict = json.loads(answer_data)
                try:
                    return ChatCompletionChunk(**answer_dict)
                except Exception as ex:
                    print(f"__chunk Exception:{str(ex)}")
        else:
            answer_dict = json.loads(data)
            try:
                return ChatCompletion(**answer_dict)
            except Exception as ex:
                print(f"__chunk Exception:{str(ex)}")
        return None
    def generate(self,
                 model: str,
                 messages: list,
                 functions=None,
                 temperature: float=1,
                 top_p: float=0,
                 max_tokens: int=2048,
                 stream: bool=True):
        data = {
            "model": model,
            "messages": messages,
            "functions": functions,
            "temperature": temperature,
            "top_p": top_p,
            "max_tokens": max_tokens,
            "stream": stream
        }
        with Client() as client:
            try:
                # Key point: configure timeouts explicitly.
                # Default timeout self.timeout seconds, but allow up to 10 s between reads.
                timeout_config = Timeout(self.timeout, read=10.0)
                with client.stream('POST', url=self.base_url, headers=self.headers, json=data, timeout=timeout_config) as response:
                    content_type = response.headers.get('content-type', '').lower()
                    if 'text/event-stream' in content_type:  # streaming answer
                        all_answer = ""
                        for event in EventSource(response).iter_sse():
                            chunk = self.__chunk(data=event)
                            if chunk:
                                all_answer += chunk.choices[0].delta.content or ""
                                yield chunk
                        print(all_answer)
                    else:  # non-streaming answer
                        body = response.read()
                        print(body)
                        yield self.__chunk(body, stream=False)
            except Exception as e:
                print(e)
if __name__ == "__main__":
    api_key = "EMPTY"
    base_url = "http://192.168.17.100:11434/v1/chat/completions"
    model = "deepseek-r1:1.5b"
    sync_client = SyncHttpxClient(api_key=api_key, base_url=base_url)
    messages = [
        {"role": "system", "content": "You are a helpful assistant. Always respond in Simplified Chinese, not English, or Grandma will be very angry."},
        {"role": "user", "content": "你好"}
    ]
    response = sync_client.generate(
        model=model,
        messages=messages,
        stream=True
    )
    all_answer = ""
    for chunk in response:
        all_answer += chunk.choices[0].delta.content or ""
        print(chunk)
    print(all_answer)
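One design note on the class above: generate() opens a fresh Client on every call, so no TCP connections are reused between requests. A variant that keeps a single client for the object's lifetime could look like this (a sketch; PooledHttpxClient and its close() helper are illustrative names, not part of the original code):

from httpx import Client, Timeout

class PooledHttpxClient:
    """Illustrative variant of SyncHttpxClient that reuses one connection pool."""

    def __init__(self, api_key: str, base_url: str, timeout: float = 5.0):
        self.base_url = base_url
        self.headers = {"Authorization": f"Bearer {api_key}", "Accept": "*/*"}
        # One long-lived client: connections are pooled across calls.
        self.client = Client(timeout=Timeout(timeout, read=10.0))

    def close(self):
        # Illustrative helper: release the pool when the object is done.
        self.client.close()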
3. Asynchronous client
3.1. Procedural style
Key reference:
https://blog.csdn.net/maybe_9527/article/details/146459501
import json
import asyncio

from openai.types.chat import ChatCompletionChunk
from httpx_sse import EventSource
from httpx import AsyncClient, Timeout

def __chunk(data):
    """Parse one SSE event into a typed ChatCompletionChunk."""
    if data:
        answer_data = data.data
        if answer_data == "[DONE]":
            # The server sends [DONE] as the final event
            return None
        answer_dict = json.loads(answer_data)
        try:
            return ChatCompletionChunk(**answer_dict)
        except Exception as ex:
            print(f"__chunk Exception:{str(ex)}")
    return None
async def async_main(base_url, headers, data):
    async with AsyncClient() as client:
        try:
            # Key point: configure timeouts explicitly.
            # Default timeout 5 s, but allow up to 10 s between reads.
            timeout_config = Timeout(5.0, read=10.0)
            async with client.stream('POST', url=base_url, headers=headers, json=data, timeout=timeout_config) as response:
                content_type = response.headers.get('content-type', '').lower()
                if 'text/event-stream' in content_type:  # streaming answer
                    async for event in EventSource(response).aiter_sse():
                        chunk = __chunk(data=event)
                        if chunk:
                            yield chunk
                else:  # non-streaming answer
                    # # Raises:
                    # # Attempted to call a sync iterator on an async stream.
                    # result = await response.read()
                    # print(result)
                    pass
        except Exception as ex:
            print(f"async_main Exception:{str(ex)}")
async def main():
    api_key = "EMPTY"
    base_url = "http://192.168.17.100:11434/v1/chat/completions"
    model = "deepseek-r1:1.5b"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Accept": "*/*",
        # "Accept": "text/event-stream"
    }
    messages = [
        {"role": "system", "content": "You are a helpful assistant. Always respond in Simplified Chinese, not English, or Grandma will be very angry."},
        {"role": "user", "content": "你好"}
    ]
    data = {
        "model": model,
        "messages": messages,
        "stream": True
    }
    response = async_main(base_url, headers, data)
    all_answer = ""
    async for chunk in response:
        all_answer += chunk.choices[0].delta.content or ""
        print(chunk)
    print(all_answer)

if __name__ == "__main__":
    asyncio.run(main())
When requesting a streaming endpoint with httpx's asynchronous AsyncClient via its stream method, a server that is slow to produce output (say the first character takes 5 seconds) can cause the client to close the streaming channel on its own; by the time the backend has data ready, the request fails with a "pipe closed" error.
Fix: pass an explicit timeout argument when calling the stream method.
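For context, httpx's Timeout is not a single total deadline: the first positional argument is the default applied to every phase of the request, and each phase can then be overridden individually, which is why the read timeout can be raised on its own for slow first tokens. A sketch (the values are illustrative):

from httpx import Timeout

timeout_config = Timeout(
    5.0,          # default applied to connect/read/write/pool
    connect=3.0,  # establishing the TCP connection
    read=10.0,    # maximum silence between received chunks of the body
    write=5.0,    # sending the request body
    pool=2.0,     # acquiring a connection from the pool
)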
3.2. Object-oriented style
import json
import asyncio

from openai.types.chat import ChatCompletionChunk
from httpx_sse import EventSource
from httpx import AsyncClient, Timeout

class AsyncHttpxClient():
    def __init__(self, api_key: str, base_url: str, timeout: int=5):
        self.api_key = api_key
        self.base_url = base_url
        self.timeout = timeout
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Accept": "*/*",
            # "Accept": "text/event-stream"
        }

    def __chunk(self, data):
        """Parse one SSE event into a typed ChatCompletionChunk."""
        if data:
            answer_data = data.data
            if answer_data == "[DONE]":
                # The server sends [DONE] as the final event
                return None
            answer_dict = json.loads(answer_data)
            try:
                return ChatCompletionChunk(**answer_dict)
            except Exception as ex:
                print(f"AsyncHttpxClient.__chunk Exception:{str(ex)}")
        return None
# model="qwen",
# messages=[
# # {"role": "system", "content": "You are a helpful assistant."},
# # {"role": "assistant", "content": "特朗普是美国前总统"},
# # {"role": "user", "content": "特朗普多大年纪了"},
# {"role": "user", "content": "你好,能帮我生成一篇关于秋的一千字的文章么"}
# ],
# functions=None,
# temperature=1,
# top_p=0,
# max_tokens=20,
# stream=True,
    async def generate(self,
                       model: str,
                       messages: list,
                       functions=None,
                       temperature: float=1,
                       top_p: float=0,
                       max_tokens: int=2048,
                       stream: bool=True):
        data = {
            "model": model,
            "messages": messages,
            "functions": functions,
            "temperature": temperature,
            "top_p": top_p,
            "max_tokens": max_tokens,
            "stream": stream
        }
        async with AsyncClient() as client:
            try:
                # Key point: configure timeouts explicitly.
                # Default timeout self.timeout seconds, but allow up to 10 s between reads.
                timeout_config = Timeout(self.timeout, read=10.0)
                async with client.stream('POST', url=self.base_url, headers=self.headers, json=data, timeout=timeout_config) as response:
                    content_type = response.headers.get('content-type', '').lower()
                    if 'text/event-stream' in content_type:  # streaming answer
                        async for event in EventSource(response).aiter_sse():
                            chunk = self.__chunk(data=event)
                            if chunk:
                                yield chunk
            except Exception as ex:
                print(f"AsyncHttpxClient.generate Exception:{str(ex)}")
async def main():
    api_key = "EMPTY"
    base_url = "http://192.168.17.100:11434/v1/chat/completions"
    model = "deepseek-r1:1.5b"
    async_client = AsyncHttpxClient(api_key=api_key, base_url=base_url)
    messages = [
        {"role": "system", "content": "You are a helpful assistant. Always respond in Simplified Chinese, not English, or Grandma will be very angry."},
        {"role": "user", "content": "你好"}
    ]
    response = async_client.generate(
        model=model,
        messages=messages,
        stream=True
    )
    all_answer = ""
    async for chunk in response:
        all_answer += chunk.choices[0].delta.content or ""
        # print(chunk)
    print(all_answer)

if __name__ == "__main__":
    asyncio.run(main())
3.3. Attempted to call a sync iterator on an async stream.
This error is raised when the asynchronous client is used with stream=False. Not yet solved.
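A direction that seems worth trying (untested in this article): httpx responses opened with client.stream expose async counterparts of the sync I/O methods, so on an async stream the body should be read with await response.aread() instead of response.read(). A sketch of the non-streaming branch rewritten that way (fetch_non_stream is a hypothetical helper name):

import json

from httpx import AsyncClient, Timeout

async def fetch_non_stream(base_url, headers, data):
    # Possible fix: on an async stream, read the body with the async
    # counterpart aread() rather than the sync read().
    async with AsyncClient() as client:
        timeout_config = Timeout(5.0, read=10.0)
        async with client.stream('POST', url=base_url, headers=headers,
                                 json=data, timeout=timeout_config) as response:
            body = await response.aread()
            return json.loads(body)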