OpenAI流式解析

发布于:2025-03-23 ⋅ 阅读:(26) ⋅ 点赞:(0)

OpenAI 流式的代码:

首选一般请使用os.getenv 去读环境变量的内容

注意使用pip install python-dotenv 的安装方法

load_dotenv 是这个库提供的一个函数,用于读取 .env 文件并将其中定义的键值对设置为系统的环境变量。

默认情况下,load_dotenv() 会自动查找当前目录下的 .env 文件。如果文件不在默认路径,可以通过参数指定,例如 load_dotenv(‘/path/to/.env’)。

为什么代码里用 dotenv 而不是 python_dotenv?

在 Python 中,导入模块时使用的是库的模块名,而不是 PyPI 上的包名。python-dotenv 这个包安装后,提供了一个名为 dotenv 的模块供导入。这是由库开发者决定的命名约定。例如:

  • PyPI 包名:python-dotenv
  • 导入时的模块名:dotenv

流式代码 类 (一键运行)

from openai import  AsyncOpenAI
from dotenv import load_dotenv
import os

load_dotenv()

class AsyncOpenAIOut:
    def __init__(self):
        self.api_key = os.getenv("OPENAI_API_KEY")
        self.base_url = os.getenv("OPENAI_BASE_URL")
        self.oai_client = AsyncOpenAI(api_key=self.api_key, base_url=self.base_url)
        self.model = os.getenv("OPENAI_MODEL")
    async def gpt_stream(self, user_message: str,model: str = os.getenv("OPENAI_MODEL"),history: list[dict] = [],system_prompt: str = "") :
        messages = []
        if history:
            messages.extend(history)
        
        if system_prompt:
            messages.extend([{"role": "system", "content": system_prompt}])
        
        messages.append({"role": "user", "content": user_message})
        response = await self.oai_client.chat.completions.create(
            model=model,
            messages=messages,
            stream=True
        )
        
        async for chunk in response:
            if chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content

async_openai_out = AsyncOpenAIOut()

if __name__ == "__main__":
    async def test_gpt_stream():
        async for chunk in async_openai_out.gpt_stream(user_message="写300字作文",system_prompt="You are a helpful assistant."):
            print(chunk)

    import asyncio
    
    asyncio.run(test_gpt_stream())

这里面有几点需要注意:

简短回答:print(chunk) 是 同步操作会在当前事件循环中执行完毕后才继续但它不是 I/O 密集型操作,所以不会造成实际的“阻塞”问题,特别是在异步函数中逐步输出内容的场景下,它是可接受的。

想确保异步非阻塞输出:

        async for chunk in async_openai_out.gpt_stream(user_message="写300字作文",system_prompt="You are a helpful assistant."):
            # print(chunk)
            await asyncio.to_thread(print, chunk)  # 在后台线程执行 print

异步生成器(Async Generator) 的用法,结合了 Python 的异步编程(async for)和生成器(yield)机制。

在异步迭代 response 中的每个 chunk,如果它有内容,就通过 yield 一块一块地“流式返回”。

async for chunk in response:
    if chunk.choices[0].delta.content:
        yield chunk.choices[0].delta.content

对比for : 普通for是同步迭代

async for是异步迭代,比如网络流、WebSocket、OpenAI 的 Stream 响应等。

yield 是生成器的关键,它不是“返回”值,而是“产出”值(可以被迭代一次)。