langchain从入门到精通(六)——LCEL 表达式与 Runnable 可运行协议

发布于:2025-06-15 ⋅ 阅读:(13) ⋅ 点赞:(0)

1. 多组件 invoke 嵌套的缺点

prompt = ChatPromptTemplate.from_template("{query}")
llm = ChatOpenAI(model="gpt-3.5-turbo-16k")
parser = StrOutputParser()
# 获取输出内容
content = parser.invoke(
llm.invoke(
prompt.invoke(
{"query": req.query.data}
)
)
)

这种写法虽然能实现对应的功能,但是存在很多缺陷:

  1. 嵌套式写法让程序的维护性与可阅读性大大降低,当需要修改某个组件时,变得异常困难。
  2. 没法得知每一步的具体结果与执行进度,出错时难以排查。
  3. 嵌套式写法没法集成大量的组件,组件越来越多时,代码会变成“一次性”代码

2. 手写一个"Chain"优化代码

观察发现,虽然 Prompt、Model、OutputParser 分别有自己独立的调用方式,例如:
Prompt 组件 :format、invoke、to_string、to_messages。
Model 组件 :generate、invoke、batch。
OutputParser 组件 :parse、invoke

但是有一个共同的调用方法:invoke,并且每一个组件的输出都是下一个组件的输入,是否可以将所有组件组装得到一个列表,然后循环依次调用 invoke 执行每一个组件,然后将当前组件的输出作为下一个组件的输入。

from typing import Any
import dotenv
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
dotenv.load_dotenv()
prompt = ChatPromptTemplate.from_template("{query}")
llm = ChatOpenAI(model="gpt-3.5-turbo-16k")
parser = StrOutputParser()
class Chain: steps: list = []
def __init__(self, steps: list): self.steps = steps
def invoke(self, input: Any) -> Any: output: Any = input
for step in self.steps: output = step.invoke(output)
print(step)
print("执行结果:", output)
print("===============")
return output
chain = Chain([prompt, llm, parser])
print(chain.invoke({"query": "你好,你是?"}))

输出结果为:

input_variables=['query'] messages=
[HumanMessagePromptTemplate(prompt=PromptTemplate(input_variables=['query'],
template='{query}'))]
执行结果: messages=[HumanMessage(content='你好,你是?')] ===============
client=<openai.resources.chat.completions.Completions object at
0x000001C6BF694310> async_client=
<openai.resources.chat.completions.AsyncCompletions object at 0x000001C6BF695BD0> model_name='gpt-3.5-turbo-16k' openai_api_key=SecretStr('**********') openai_api_base='https://api.xty.app/v1' openai_proxy='' 执行结果: content='你好!我是 ChatGPT,一个由OpenAI开发的人工智能语言模型。我可以回答各种各样
的问题,帮助解决问题,提供信息和创意。有什么我可以帮助你的吗?' response_metadata=
{'token_usage': {'completion_tokens': 72, 'prompt_tokens': 13, 'total_tokens': 85}, 'model_name': 'gpt-3.5-turbo-16k', 'system_fingerprint': 'fp_b28b39ffa8',
'finish_reason': 'stop', 'logprobs': None} id='run-5bf9e183-4b28-4be9-bf65- ce0ad9590785-0' =============== 执行结果: 你好!我是 ChatGPT,一个由OpenAI开发的人工智能语言模型。我可以回答各种各样的问题,帮
助解决问题,提供信息和创意。有什么我可以帮助你的吗?
=============== 你好!我是 ChatGPT,一个由OpenAI开发的人工智

3. Runnable 简介与 LCEL 表达式

为了尽可能简化创建自定义链,LangChain 官方实现了一个 Runnable 协议,这个协议适用于
LangChain 中的绝大部分组件,并实现了大量的标准接口,涵盖:

  1. stream :将组件的响应块流式返回,如果组件不支持流式则会直接输出。
  2. invoke :调用组件并得到对应的结果。
  3. batch :批量调用组件并得到对应的结果。
  4. astream :stream 的异步版本。
  5. ainvoke :invoke 的异步版本。
  6. abatch :batch 的异步版本。
  7. astream_log :除了流式返回最终响应块之外,还会流式返回中间步骤。
    除此之外,在 Runnable 中还重写了 orror 方法,这是 Python 中 | 运算符的计算逻辑,所有的 Runnable 组件,均可以通过 | 或者 pipe() 的方式将多个组件拼接起来形成一条链。

在这里插入图片描述

组件 输入类型 输出类型
Prompt 提示 Dict 字典 PromptValue 提示值
ChatModel 聊天模型 字符串、聊天消息列表、PromptValue提示值 ChatMessage 聊天消息
LLM 大语言模型 字符串、聊天消息列表、PromptValue提示值 String 字符串
OutputParser 输出解析器 LLM 或聊天模型的输出 取决于解析器
Retriever 检索器 单个字符串 List of Document 文档列表
Tool 工具 字符串、字典或取决于工具 取决于工具

例如上面自定义"Chain"的写法等同于如下的代码:

from typing import Any
import dotenv
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from langchain_openai import ChatOpenAI
dotenv.load_dotenv()
prompt = ChatPromptTemplate.from_template("{query}")
llm = ChatOpenAI(model="gpt-3.5-turbo-16k")
parser = StrOutputParser()
chain = prompt | llm | parser # 等价于以下写法
composed_chain_with_pipe = (
RunnableParallel({"query": RunnablePassthrough()})
.pipe(prompt)
.pipe(llm)
.pipe(parser)
)
print(chain.invoke({"query": "你好,你是?"}))

Runnable 底层的运行逻辑本质上也是将每一个组件添加到列表中,然后按照顺序执行并返回最终结果,这里参考核心源码:

def invoke(self, input: Input, config: Optional[RunnableConfig] = None) -> Output:
from langchain_core.beta.runnables.context import config_with_context
# setup callbacks and context config = config_with_context(ensure_config(config), self.steps) callback_manager = get_callback_manager_for_config(config)
# start the root run
run_manager = callback_manager.on_chain_start(
dumpd(self),
input, name=config.get("run_name") or self.get_name(), run_id=config.pop("run_id", None),
)
# 调用所有步骤并逐个执行得到对应的输出,然后作为下一个的输入
try:
for i, step in enumerate(self.steps):
input = step.invoke(
input, # mark each step as a child run patch_config( config, callbacks=run_manager.get_child(f"seq:step:{i+1}")
),
)
# finish the root run
except BaseException as e:
run_manager.on_chain_error(e)
raise
else:
run_manager.on_chain_end(input)
return cast(Output, input)

4. 两个Runnable核心类的讲解与使用

1. RunnableParallel 并行运行

RunnableParallel 是 LangChain 中封装的支持运行多个 Runnable 的类,一般用于操作 Runnable 的输出,以匹配序列中下一个 Runnable 的输入,起到并行运行 Runnable 并格式化输出结构的作用。例如 RunnableParallel 可以让我们同时执行多条 Chain,然后以字典的形式返回各个 Chain 的结果,对比每一条链单独执行,效率会高很多。

import dotenv
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel
from langchain_openai import ChatOpenAI
dotenv.load_dotenv()
# 1.编排2个提示模板
joke_prompt = ChatPromptTemplate.from_template("请讲一个关于{subject}的冷笑话,尽可能
短")
poem_prompt = ChatPromptTemplate.from_template("请写一篇关于{subject}的诗,尽可能短")
# 2.创建大语言模型
llm = ChatOpenAI(model="gpt-3.5-turbo-16k")
# 3.创建输出解析器
parser = StrOutputParser()
# 4.构建两条链
joke_chain = joke_prompt | llm | parser poem_chain = poem_prompt | llm | parser
# 5.使用RunnableParallel创建并行可运行
map_chain = RunnableParallel(joke=joke_chain, poem=poem_chain)
# 6.运行并行可运行组件得到响应结果
resp = map_chain.invoke({"subject": "程序员"})
print(resp)

输出内容:

{'joke': '为什么程序员总是用尺子测电脑屏幕?因为他们听说了“像素”是屏幕上的一种“尺寸”。',
'poem': '在代码的海洋里徜徉,\n程序员心怀梦想与创意。\n键盘敲击是旋律,\nbug 是诗歌的瑕疵。
\n\n算法如诗的韵律,\n逻辑是句子的构思。\n编程者如诗人般,\n创造出数字的奇迹。'}

除了并行执行,RunnableParallel 还可以用于操作 Runnable 的输出,用于产生符合下一个 Runnable 组件的数据。
例如:用户传递数据,并行执行检索策略得到上下文随后传递给 Prompt 组件,如下。

import dotenv
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from langchain_openai import ChatOpenAI
dotenv.load_dotenv()
def retrieval(query: str) -> str: """模拟一个检索器,传入查询query,输出文本""" print("执行检索:", query)
return "我叫老铁,是一名AI应用开发工程师。"
# 1.编排Prompt
prompt = ChatPromptTemplate.from_template("""请根据用户的提问回答问题,可以参考对应的上下
文进行回复。
<context>
{context} <context> 用户的问题是: {query}""")
# 2.构建大语言模型
llm = ChatOpenAI(model="gpt-3.5-turbo")
# 3.创建输出解析器
parser = StrOutputParser()
# 4.编排链
chain = RunnableParallel( context=retrieval, query=RunnablePassthrough(),
) | prompt | llm | parser
# 5.调用链生成结果
content = chain.invoke("你好,我叫什么?")
print(content)

输出内容:

执行检索: 你好,我叫什么?
你好,你叫老铁。

在创建 RunnableParallel 的时候,支持传递字典、函数、映射、键值对数据等多种方式,
RunnableParallel 底层会执行检测并将数据统一转换为 Runnable,核心源码如下:

# langchain-core/runnables/base.py
def __init__( self, steps__: Optional[
Mapping[
str, Union[
Runnable[Input, Any], Callable[[Input], Any], Mapping[str, Union[Runnable[Input, Any], Callable[[Input], Any]]],
],
]
] = None, **kwargs: Union[
Runnable[Input, Any], Callable[[Input], Any], Mapping[str, Union[Runnable[Input, Any], Callable[[Input], Any]]],
],
) -> None: # 1.检测是否传递字典,如果传递,则提取字段内的所有键值对
merged = {**steps__} if steps__ is not None else {}
# 2.传递了键值对,则将键值对更新到merged进行合并
merged.update(kwargs) super().__init__( # type: ignore[call-arg]
# 3.循环遍历merged的所有键值对,并将每一个元素转换成Runnable
steps__={key: coerce_to_runnable(r) for key, r in merged.items()}
)

除此之外,在 Chains 中使用时,可以简写成字典的方式, orror 会自动将字典转换成
RunnableParallel,核心源码:

# langchain_core/runnables/base.py
def coerce_to_runnable(thing: RunnableLike) -> Runnable[Input, Output]: """Coerce a runnable-like object into a Runnable.
Args:
thing: A runnable-like object.
Returns: A Runnable. """
if isinstance(thing, Runnable): return thing
elif is_async_generator(thing) or inspect.isgeneratorfunction(thing):
return RunnableGenerator(thing) elif callable(thing): return RunnableLambda(cast(Callable[[Input], Output], thing)) elif isinstance(thing, dict): # 如果类型为字典,使用字典创建RunnableParallel并转换成Runnable格式
return cast(Runnable[Input, Output], RunnableParallel(thing)) else:
raise TypeError(
f"Expected a Runnable, callable or dict."
f"Instead got an unsupported type: {type(thing)}"
)

2. RunnablePassthrough 传递数据

除了 RunnablePassthrough,在 LangChain 中,另外一个高频使用的 Runnable 类是RunnablePassthrough,这个类透传上游参数输入,简单来说,就是可以获取上游的数据,并保持不变或者新增额外的键。通常与 RunnableParallel 一起使用,将数据分配给映射中的新键。
例如:使用 RunnablePassthrough 来简化 invoke 的调用流程。

import dotenv
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI
dotenv.load_dotenv()
# 1.编排prompt
prompt = ChatPromptTemplate.from_template("{query}")
# 2.构建大语言模型
llm = ChatOpenAI(model="gpt-3.5-turbo-16k")
# 3.创建链
chain = {"query": RunnablePassthrough()} | prompt | llm | StrOutputParser()
# 4.调用链并获取结果
content = chain.invoke("你好,你是")
print(content)

输出内容:

你好!是的,我是ChatGPT,你需要什么帮助吗?

RunnablePassthrough() 获取的是整个输入的内容(字符串或者字典),如果想获取字典内的某个部
分,可以使用 itemgetter 函数,并传入对应的字段名即可,如下:

from operator import itemgetter
chain = {"query": itemgetter("query")} | prompt | llm | StrOutputParser()
content = chain.invoke({"query": "你好,你是?"})

除此之外,如果想在传递的数据中添加数据,还可以使用 RunnablePassthrough.assign() 方法来实现快速添加。例如:为

import dotenv
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI
dotenv.load_dotenv()
def retrieval(query: str) -> str: """模拟一个检索器,传入查询query,输出文本""" print("执行检索:", query)
return "我叫老铁,是一名AI应用开发工程师。"
# 1.编排Prompt
prompt = ChatPromptTemplate.from_template("""请根据用户的提问回答问题,可以参考对应的上下
文进行回复。
<context>
{context} <context> 用户的问题是: {query}""")
# 2.构建大语言模型
llm = ChatOpenAI(model="gpt-3.5-turbo")
# 3.创建输出解析器
parser = StrOutputParser()
# 4.编排链,RunnablePassthrough.assign写法
chain = (
RunnablePassthrough.assign(context=lambda query: retrieval(query)) |
prompt |
llm |
parser
)
# 5.调用链生成结果
content = chain.invoke({"query": "你好,我叫什么?"})
print(content)

5. LangChain 中 Runnable 的进阶组合用法


1. 整体流程说明

一个典型的 AI 应用链可以分为以下几个模块,每个模块都是一个 Runnable

用户输入 → Prompt 模板 → LLM 模型 → 输出解析器 → 最终输出

2. 所需组件导入

from langchain_core.runnables import RunnableLambda, RunnableMap
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI

3. 定义 Prompt 模板(PromptTemplate)

prompt = PromptTemplate.from_template("请用中文写一首关于{topic}的诗。")

这是一个可填充的提示模板,接收参数 topic


4. 初始化 LLM 模型(ChatOpenAI)

llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.7)

你也可以替换为 Claude、Gemini、Qwen 等其他模型。


5. 定义输出解析器(StrOutputParser)

parser = StrOutputParser()

用于将 LLM 的结构化输出(如 JSON、Message)转换为纯字符串。


6. 构建完整的链(组合多个 Runnable)

# 将输入 topic 映射为 prompt 所需格式
input_mapper = RunnableLambda(lambda x: {"topic": x})

# 构建完整链条:输入 → Prompt → LLM → 解析器
chain = input_mapper | prompt | llm | parser

7. 调用链条(invoke、batch、stream)

7.1 单次调用
result = chain.invoke("长城")
print(result)
7.2 批量调用
topics = ["长江", "黄山", "故宫"]
results = chain.batch(topics)
print(results)
7.3 流式输出(逐 token 打印)
for chunk in chain.stream("泰山"):
    print(chunk, end="")

8. 使用 RunnableMap 处理多输入字段

如果你的提示模板需要多个输入字段(如 productaudience),可以使用 RunnableMap

prompt = PromptTemplate.from_template("请写一个关于{product},面向{audience}的广告文案。")

# 输入映射器:将用户输入拆分为多个字段
input_map = RunnableMap({
    "product": lambda x: x["product"],
    "audience": lambda x: x["audience"]
})

# 构建链
chain = input_map | prompt | llm | parser

# 调用
result = chain.invoke({
    "product": "智能手表",
    "audience": "年轻上班族"
})
print(result)

9. 使用 OutputParser 解析 JSON 输出(可选)

如果你的 LLM 输出是结构化 JSON,可以使用 JsonOutputParser

from langchain.output_parsers import JsonOutputParser

parser = JsonOutputParser()

然后在提示中引导模型输出 JSON 格式:

prompt = PromptTemplate.from_template("""
请以 JSON 格式回答以下问题:
问题:{question}
输出格式:
{{
  "answer": "...",
  "confidence": "高/中/低"
}}
""")

10. 总结:链式组合的优势

特性 描述
可组合 各模块可自由组合、替换
可调试 每个模块可单独测试
可扩展 支持加入工具、检索器、函数调用等
可追踪 可与 LangSmith 集成,进行链路追踪

完整组合链一览

from langchain_core.runnables import RunnableLambda
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI

# 模块定义
prompt = PromptTemplate.from_template("请写一首关于{topic}的七言绝句。")
llm = ChatOpenAI(model="gpt-3.5-turbo")
parser = StrOutputParser()

# 输入映射
input_mapper = RunnableLambda(lambda x: {"topic": x})

# 链式组合
chain = input_mapper | prompt | llm | parser

# 调用链
print(chain.invoke("西湖"))


网站公告

今日签到

点亮在社区的每一天
去签到