1. 多组件 invoke 嵌套的缺点
prompt = ChatPromptTemplate.from_template("{query}")
llm = ChatOpenAI(model="gpt-3.5-turbo-16k")
parser = StrOutputParser()
# 获取输出内容
content = parser.invoke(
llm.invoke(
prompt.invoke(
{"query": req.query.data}
)
)
)
这种写法虽然能实现对应的功能,但是存在很多缺陷:
- 嵌套式写法让程序的维护性与可阅读性大大降低,当需要修改某个组件时,变得异常困难。
- 没法得知每一步的具体结果与执行进度,出错时难以排查。
- 嵌套式写法没法集成大量的组件,组件越来越多时,代码会变成“一次性”代码
2. 手写一个"Chain"优化代码
观察发现,虽然 Prompt、Model、OutputParser 分别有自己独立的调用方式,例如:
Prompt 组件 :format、invoke、to_string、to_messages。
Model 组件 :generate、invoke、batch。
OutputParser 组件 :parse、invoke
但是有一个共同的调用方法:invoke,并且每一个组件的输出都是下一个组件的输入,是否可以将所有组件组装得到一个列表,然后循环依次调用 invoke 执行每一个组件,然后将当前组件的输出作为下一个组件的输入。
from typing import Any
import dotenv
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
dotenv.load_dotenv()
prompt = ChatPromptTemplate.from_template("{query}")
llm = ChatOpenAI(model="gpt-3.5-turbo-16k")
parser = StrOutputParser()
class Chain: steps: list = []
def __init__(self, steps: list): self.steps = steps
def invoke(self, input: Any) -> Any: output: Any = input
for step in self.steps: output = step.invoke(output)
print(step)
print("执行结果:", output)
print("===============")
return output
chain = Chain([prompt, llm, parser])
print(chain.invoke({"query": "你好,你是?"}))
输出结果为:
input_variables=['query'] messages=
[HumanMessagePromptTemplate(prompt=PromptTemplate(input_variables=['query'],
template='{query}'))]
执行结果: messages=[HumanMessage(content='你好,你是?')] ===============
client=<openai.resources.chat.completions.Completions object at
0x000001C6BF694310> async_client=
<openai.resources.chat.completions.AsyncCompletions object at 0x000001C6BF695BD0> model_name='gpt-3.5-turbo-16k' openai_api_key=SecretStr('**********') openai_api_base='https://api.xty.app/v1' openai_proxy='' 执行结果: content='你好!我是 ChatGPT,一个由OpenAI开发的人工智能语言模型。我可以回答各种各样
的问题,帮助解决问题,提供信息和创意。有什么我可以帮助你的吗?' response_metadata=
{'token_usage': {'completion_tokens': 72, 'prompt_tokens': 13, 'total_tokens': 85}, 'model_name': 'gpt-3.5-turbo-16k', 'system_fingerprint': 'fp_b28b39ffa8',
'finish_reason': 'stop', 'logprobs': None} id='run-5bf9e183-4b28-4be9-bf65- ce0ad9590785-0' =============== 执行结果: 你好!我是 ChatGPT,一个由OpenAI开发的人工智能语言模型。我可以回答各种各样的问题,帮
助解决问题,提供信息和创意。有什么我可以帮助你的吗?
=============== 你好!我是 ChatGPT,一个由OpenAI开发的人工智
3. Runnable 简介与 LCEL 表达式
为了尽可能简化创建自定义链,LangChain 官方实现了一个 Runnable 协议,这个协议适用于
LangChain 中的绝大部分组件,并实现了大量的标准接口,涵盖:
- stream :将组件的响应块流式返回,如果组件不支持流式则会直接输出。
- invoke :调用组件并得到对应的结果。
- batch :批量调用组件并得到对应的结果。
- astream :stream 的异步版本。
- ainvoke :invoke 的异步版本。
- abatch :batch 的异步版本。
- astream_log :除了流式返回最终响应块之外,还会流式返回中间步骤。
除此之外,在 Runnable 中还重写了 or 和 ror 方法,这是 Python 中 | 运算符的计算逻辑,所有的 Runnable 组件,均可以通过 | 或者 pipe() 的方式将多个组件拼接起来形成一条链。
组件 | 输入类型 | 输出类型 |
---|---|---|
Prompt 提示 | Dict 字典 | PromptValue 提示值 |
ChatModel 聊天模型 | 字符串、聊天消息列表、PromptValue提示值 | ChatMessage 聊天消息 |
LLM 大语言模型 | 字符串、聊天消息列表、PromptValue提示值 | String 字符串 |
OutputParser 输出解析器 | LLM 或聊天模型的输出 | 取决于解析器 |
Retriever 检索器 | 单个字符串 | List of Document 文档列表 |
Tool 工具 | 字符串、字典或取决于工具 | 取决于工具 |
例如上面自定义"Chain"的写法等同于如下的代码:
from typing import Any
import dotenv
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from langchain_openai import ChatOpenAI
dotenv.load_dotenv()
prompt = ChatPromptTemplate.from_template("{query}")
llm = ChatOpenAI(model="gpt-3.5-turbo-16k")
parser = StrOutputParser()
chain = prompt | llm | parser # 等价于以下写法
composed_chain_with_pipe = (
RunnableParallel({"query": RunnablePassthrough()})
.pipe(prompt)
.pipe(llm)
.pipe(parser)
)
print(chain.invoke({"query": "你好,你是?"}))
Runnable 底层的运行逻辑本质上也是将每一个组件添加到列表中,然后按照顺序执行并返回最终结果,这里参考核心源码:
def invoke(self, input: Input, config: Optional[RunnableConfig] = None) -> Output:
from langchain_core.beta.runnables.context import config_with_context
# setup callbacks and context config = config_with_context(ensure_config(config), self.steps) callback_manager = get_callback_manager_for_config(config)
# start the root run
run_manager = callback_manager.on_chain_start(
dumpd(self),
input, name=config.get("run_name") or self.get_name(), run_id=config.pop("run_id", None),
)
# 调用所有步骤并逐个执行得到对应的输出,然后作为下一个的输入
try:
for i, step in enumerate(self.steps):
input = step.invoke(
input, # mark each step as a child run patch_config( config, callbacks=run_manager.get_child(f"seq:step:{i+1}")
),
)
# finish the root run
except BaseException as e:
run_manager.on_chain_error(e)
raise
else:
run_manager.on_chain_end(input)
return cast(Output, input)
4. 两个Runnable核心类的讲解与使用
1. RunnableParallel 并行运行
RunnableParallel 是 LangChain 中封装的支持运行多个 Runnable 的类,一般用于操作 Runnable 的输出,以匹配序列中下一个 Runnable 的输入,起到并行运行 Runnable 并格式化输出结构的作用。例如 RunnableParallel 可以让我们同时执行多条 Chain,然后以字典的形式返回各个 Chain 的结果,对比每一条链单独执行,效率会高很多。
import dotenv
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel
from langchain_openai import ChatOpenAI
dotenv.load_dotenv()
# 1.编排2个提示模板
joke_prompt = ChatPromptTemplate.from_template("请讲一个关于{subject}的冷笑话,尽可能
短")
poem_prompt = ChatPromptTemplate.from_template("请写一篇关于{subject}的诗,尽可能短")
# 2.创建大语言模型
llm = ChatOpenAI(model="gpt-3.5-turbo-16k")
# 3.创建输出解析器
parser = StrOutputParser()
# 4.构建两条链
joke_chain = joke_prompt | llm | parser poem_chain = poem_prompt | llm | parser
# 5.使用RunnableParallel创建并行可运行
map_chain = RunnableParallel(joke=joke_chain, poem=poem_chain)
# 6.运行并行可运行组件得到响应结果
resp = map_chain.invoke({"subject": "程序员"})
print(resp)
输出内容:
{'joke': '为什么程序员总是用尺子测电脑屏幕?因为他们听说了“像素”是屏幕上的一种“尺寸”。',
'poem': '在代码的海洋里徜徉,\n程序员心怀梦想与创意。\n键盘敲击是旋律,\nbug 是诗歌的瑕疵。
\n\n算法如诗的韵律,\n逻辑是句子的构思。\n编程者如诗人般,\n创造出数字的奇迹。'}
除了并行执行,RunnableParallel 还可以用于操作 Runnable 的输出,用于产生符合下一个 Runnable 组件的数据。
例如:用户传递数据,并行执行检索策略得到上下文随后传递给 Prompt 组件,如下。
import dotenv
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from langchain_openai import ChatOpenAI
dotenv.load_dotenv()
def retrieval(query: str) -> str: """模拟一个检索器,传入查询query,输出文本""" print("执行检索:", query)
return "我叫老铁,是一名AI应用开发工程师。"
# 1.编排Prompt
prompt = ChatPromptTemplate.from_template("""请根据用户的提问回答问题,可以参考对应的上下
文进行回复。
<context>
{context} <context> 用户的问题是: {query}""")
# 2.构建大语言模型
llm = ChatOpenAI(model="gpt-3.5-turbo")
# 3.创建输出解析器
parser = StrOutputParser()
# 4.编排链
chain = RunnableParallel( context=retrieval, query=RunnablePassthrough(),
) | prompt | llm | parser
# 5.调用链生成结果
content = chain.invoke("你好,我叫什么?")
print(content)
输出内容:
执行检索: 你好,我叫什么?
你好,你叫老铁。
在创建 RunnableParallel 的时候,支持传递字典、函数、映射、键值对数据等多种方式,
RunnableParallel 底层会执行检测并将数据统一转换为 Runnable,核心源码如下:
# langchain-core/runnables/base.py
def __init__( self, steps__: Optional[
Mapping[
str, Union[
Runnable[Input, Any], Callable[[Input], Any], Mapping[str, Union[Runnable[Input, Any], Callable[[Input], Any]]],
],
]
] = None, **kwargs: Union[
Runnable[Input, Any], Callable[[Input], Any], Mapping[str, Union[Runnable[Input, Any], Callable[[Input], Any]]],
],
) -> None: # 1.检测是否传递字典,如果传递,则提取字段内的所有键值对
merged = {**steps__} if steps__ is not None else {}
# 2.传递了键值对,则将键值对更新到merged进行合并
merged.update(kwargs) super().__init__( # type: ignore[call-arg]
# 3.循环遍历merged的所有键值对,并将每一个元素转换成Runnable
steps__={key: coerce_to_runnable(r) for key, r in merged.items()}
)
除此之外,在 Chains 中使用时,可以简写成字典的方式, or 和 ror 会自动将字典转换成
RunnableParallel,核心源码:
# langchain_core/runnables/base.py
def coerce_to_runnable(thing: RunnableLike) -> Runnable[Input, Output]: """Coerce a runnable-like object into a Runnable.
Args:
thing: A runnable-like object.
Returns: A Runnable. """
if isinstance(thing, Runnable): return thing
elif is_async_generator(thing) or inspect.isgeneratorfunction(thing):
return RunnableGenerator(thing) elif callable(thing): return RunnableLambda(cast(Callable[[Input], Output], thing)) elif isinstance(thing, dict): # 如果类型为字典,使用字典创建RunnableParallel并转换成Runnable格式
return cast(Runnable[Input, Output], RunnableParallel(thing)) else:
raise TypeError(
f"Expected a Runnable, callable or dict."
f"Instead got an unsupported type: {type(thing)}"
)
2. RunnablePassthrough 传递数据
除了 RunnablePassthrough,在 LangChain 中,另外一个高频使用的 Runnable 类是RunnablePassthrough,这个类透传上游参数输入,简单来说,就是可以获取上游的数据,并保持不变或者新增额外的键。通常与 RunnableParallel 一起使用,将数据分配给映射中的新键。
例如:使用 RunnablePassthrough 来简化 invoke 的调用流程。
import dotenv
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI
dotenv.load_dotenv()
# 1.编排prompt
prompt = ChatPromptTemplate.from_template("{query}")
# 2.构建大语言模型
llm = ChatOpenAI(model="gpt-3.5-turbo-16k")
# 3.创建链
chain = {"query": RunnablePassthrough()} | prompt | llm | StrOutputParser()
# 4.调用链并获取结果
content = chain.invoke("你好,你是")
print(content)
输出内容:
你好!是的,我是ChatGPT,你需要什么帮助吗?
RunnablePassthrough() 获取的是整个输入的内容(字符串或者字典),如果想获取字典内的某个部
分,可以使用 itemgetter 函数,并传入对应的字段名即可,如下:
from operator import itemgetter
chain = {"query": itemgetter("query")} | prompt | llm | StrOutputParser()
content = chain.invoke({"query": "你好,你是?"})
除此之外,如果想在传递的数据中添加数据,还可以使用 RunnablePassthrough.assign() 方法来实现快速添加。例如:为
import dotenv
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI
dotenv.load_dotenv()
def retrieval(query: str) -> str: """模拟一个检索器,传入查询query,输出文本""" print("执行检索:", query)
return "我叫老铁,是一名AI应用开发工程师。"
# 1.编排Prompt
prompt = ChatPromptTemplate.from_template("""请根据用户的提问回答问题,可以参考对应的上下
文进行回复。
<context>
{context} <context> 用户的问题是: {query}""")
# 2.构建大语言模型
llm = ChatOpenAI(model="gpt-3.5-turbo")
# 3.创建输出解析器
parser = StrOutputParser()
# 4.编排链,RunnablePassthrough.assign写法
chain = (
RunnablePassthrough.assign(context=lambda query: retrieval(query)) |
prompt |
llm |
parser
)
# 5.调用链生成结果
content = chain.invoke({"query": "你好,我叫什么?"})
print(content)
5. LangChain 中 Runnable 的进阶组合用法
1. 整体流程说明
一个典型的 AI 应用链可以分为以下几个模块,每个模块都是一个 Runnable
:
用户输入 → Prompt 模板 → LLM 模型 → 输出解析器 → 最终输出
2. 所需组件导入
from langchain_core.runnables import RunnableLambda, RunnableMap
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
3. 定义 Prompt 模板(PromptTemplate)
prompt = PromptTemplate.from_template("请用中文写一首关于{topic}的诗。")
这是一个可填充的提示模板,接收参数 topic
。
4. 初始化 LLM 模型(ChatOpenAI)
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.7)
你也可以替换为 Claude、Gemini、Qwen 等其他模型。
5. 定义输出解析器(StrOutputParser)
parser = StrOutputParser()
用于将 LLM 的结构化输出(如 JSON、Message)转换为纯字符串。
6. 构建完整的链(组合多个 Runnable)
# 将输入 topic 映射为 prompt 所需格式
input_mapper = RunnableLambda(lambda x: {"topic": x})
# 构建完整链条:输入 → Prompt → LLM → 解析器
chain = input_mapper | prompt | llm | parser
7. 调用链条(invoke、batch、stream)
7.1 单次调用
result = chain.invoke("长城")
print(result)
7.2 批量调用
topics = ["长江", "黄山", "故宫"]
results = chain.batch(topics)
print(results)
7.3 流式输出(逐 token 打印)
for chunk in chain.stream("泰山"):
print(chunk, end="")
8. 使用 RunnableMap 处理多输入字段
如果你的提示模板需要多个输入字段(如 product
和 audience
),可以使用 RunnableMap
:
prompt = PromptTemplate.from_template("请写一个关于{product},面向{audience}的广告文案。")
# 输入映射器:将用户输入拆分为多个字段
input_map = RunnableMap({
"product": lambda x: x["product"],
"audience": lambda x: x["audience"]
})
# 构建链
chain = input_map | prompt | llm | parser
# 调用
result = chain.invoke({
"product": "智能手表",
"audience": "年轻上班族"
})
print(result)
9. 使用 OutputParser 解析 JSON 输出(可选)
如果你的 LLM 输出是结构化 JSON,可以使用 JsonOutputParser
:
from langchain.output_parsers import JsonOutputParser
parser = JsonOutputParser()
然后在提示中引导模型输出 JSON 格式:
prompt = PromptTemplate.from_template("""
请以 JSON 格式回答以下问题:
问题:{question}
输出格式:
{{
"answer": "...",
"confidence": "高/中/低"
}}
""")
10. 总结:链式组合的优势
特性 | 描述 |
---|---|
可组合 | 各模块可自由组合、替换 |
可调试 | 每个模块可单独测试 |
可扩展 | 支持加入工具、检索器、函数调用等 |
可追踪 | 可与 LangSmith 集成,进行链路追踪 |
完整组合链一览
from langchain_core.runnables import RunnableLambda
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
# 模块定义
prompt = PromptTemplate.from_template("请写一首关于{topic}的七言绝句。")
llm = ChatOpenAI(model="gpt-3.5-turbo")
parser = StrOutputParser()
# 输入映射
input_mapper = RunnableLambda(lambda x: {"topic": x})
# 链式组合
chain = input_mapper | prompt | llm | parser
# 调用链
print(chain.invoke("西湖"))