异步回调
import asyncio
from typing import Any, Dict, List
from langchain.chat_models import ChatOpenAI
from langchain.schema import LLMResult, HumanMessage
from langchain.callbacks.base import AsyncCallbackHandler, BaseCallbackHandler
class MyCustomSyncHandler(BaseCallbackHandler):
def on_llm_new_token(self, token: str, **kwargs) -> None:
print(f"Sync handler being called in a `thread_pool_executor`: token: {token}")
class MyCustomAsyncHandler(AsyncCallbackHandler):
"""用于处理来自langchain的回调的异步回调处理程序。"""
async def on_llm_start(
self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
) -> None:
"""当链条开始运行时运行。"""
print("zzzz....")
await asyncio.sleep(0.3)
class_name = serialized["name"]
print("嗨!我刚醒来。您的llm正在启动")
async def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
"""当链条结束运行时运行。"""
print("zzzz....")
await asyncio.sleep(0.3)
print("嗨!我刚醒来。您的llm正在结束")
# 为了启用流式传输,我们在ChatModel构造函数中传入`streaming=True`$# 此外,我们还传入一个包含自定义处理程序的列表
chat = ChatOpenAI(
max_tokens=25,
streaming=True,
callbacks=[MyCustomSyncHandler(), MyCustomAsyncHandler()],
)
await chat.agenerate([[HumanMessage(content="给我讲个笑话")]])