本文是"Dagster Pipes教程"的第一部分,介绍如何通过Dagster资产调用外部Python脚本并集成到数据管道中。首先,创建Dagster资产subprocess_asset
,利用PipesSubprocessClient
资源执行外部脚本external_code.py
,实现跨进程的数据处理。通过dagster dev
启动UI,可在Dagster界面中监控子进程的执行状态和日志输出,包括标准输出(stdout)内容。本文详细讲解了资产定义、资源注入及命令执行的完整流程,为后续修改外部代码以支持Dagster Pipes通信奠定基础。此方法适用于需要将现有脚本集成到Dagster数据管道的场景,提升自动化与可观测性。完成本部分后,读者可继续学习第二部分,掌握如何增强外部脚本与Dagster的交互能力。
教程概述
本教程将指导你完成以下步骤:
- 创建一个调用外部Python脚本的Dagster资产
- 定义必要的Dagster资源(resources)
- 在Dagster UI中运行并查看结果
前提条件
在开始之前,请确保你已经:
- 安装了Dagster
- 创建了一个名为
external_code.py
的独立Python脚本,内容如下:
import pandas as pd
def main():
orders_df = pd.DataFrame({
"order_id": [1, 2],
"item_id": [432, 878]
})
total_orders = len(orders_df)
print(f"processing total {total_orders} orders")
第一步:定义Dagster资产
首先,在与external_code.py
相同的目录下创建一个名为dagster_code.py
的新文件。
1.1 创建资产定义
将以下代码复制到dagster_code.py
中:
import shutil
import dagster as dg
@dg.asset
def subprocess_asset(
context: dg.AssetExecutionContext,
pipes_subprocess_client: dg.PipesSubprocessClient
) -> dg.MaterializeResult:
cmd = [
shutil.which("python"),
dg.file_relative_path(__file__, "external_code.py")
]
return pipes_subprocess_client.run(
command=cmd,
context=context
).get_materialize_result()
代码解析:
- 我们创建了一个名为
subprocess_asset
的资产 - 使用
AssetExecutionContext
作为上下文参数,它提供了系统信息如资源、配置和日志记录 - 指定了
PipesSubprocessClient
资源 - 构建了一个命令列表来执行外部脚本
- 使用
pipes_subprocess_client.run()
方法在管道会话中同步执行子进程
1.2 从资产调用外部代码
上述代码中的关键部分是:
pipes_subprocess_client.run(
command=cmd,
context=context
).get_materialize_result()
这段代码做了什么:
PipesSubprocessClient
资源暴露了一个run
方法- 当资产执行时,这个方法会在管道会话中同步执行子进程
- 返回一个
PipesClientCompletedInvocation
对象 - 可以使用
get_materialize_result()
方法访问子进程报告的MaterializeResult
事件
第二步:定义Definitions对象
为了让Dagster工具(如CLI、UI和Dagster+)能够加载和访问资产及子进程资源,我们需要创建一个Definitions
对象。
在dagster_code.py
文件末尾添加以下代码:
from dagster import Definitions
defs = Definitions(
assets=[subprocess_asset],
resources={
"pipes_subprocess_client": dg.PipesSubprocessClient()
}
)
此时,dagster_code.py
文件应该如下所示:
import shutil
import dagster as dg
@dg.asset
def subprocess_asset(
context: dg.AssetExecutionContext,
pipes_subprocess_client: dg.PipesSubprocessClient
) -> dg.MaterializeResult:
cmd = [
shutil.which("python"),
dg.file_relative_path(__file__, "external_code.py")
]
return pipes_subprocess_client.run(
command=cmd,
context=context
).get_materialize_result()
from dagster import Definitions
defs = Definitions(
assets=[subprocess_asset],
resources={
"pipes_subprocess_client": dg.PipesSubprocessClient()
}
)
第三步:从Dagster UI运行子进程
现在,让我们在Dagster UI中执行我们创建的子进程资产。
在新的命令行会话中运行以下命令启动UI:
dagster dev -f dagster_code.py
点击右上角的"Materialize"按钮来运行你的代码
导航到"Run details"页面,在这里你可以看到运行的日志
在
external_code.py
中,我们有一个打印语句将输出到stdout。Dagster会在UI的原始计算日志视图中显示这些内容。要查看stdout日志,切换日志部分到stdout:
下一步
到目前为止,你已经创建了一个调用外部Python脚本的Dagster资产,在子进程中执行了代码,并在Dagster UI中查看了结果。接下来,你将学习如何修改外部代码以与Dagster Pipes配合工作,将信息发送回Dagster。
总结
通过本教程的第一部分,我们实现了:
- 创建了一个Dagster资产来调用外部Python脚本
- 配置了必要的资源来支持子进程执行
- 在Dagster UI中成功运行并查看了结果
这个基础设置为你在后续步骤中实现更复杂的管道通信打下了良好的基础。