目录
2. 任务执行命令类:FlinkTaskExecuteCommand
4. 任务执行:FlinkExecution.execute()
本文基于SeaTunnel 2.3.x源码分析Flink引擎执行流程,以seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/SeaTunnelApiExample.java
为入口,完整解析Flink引擎的执行流程。
1. 任务启动入口
启动类核心代码:
// 1. 初始化Flink启动命令参数 FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs(); // 2. 执行SeaTunnel.run()回调Flink执行命令 SeaTunnel.run(flinkCommandArgs.buildCommand());
buildCommand()
返回FlinkTaskExecuteCommand
实例SeaTunnel.run()
最终调用FlinkTaskExecuteCommand.execute()
2. 任务执行命令类:FlinkTaskExecuteCommand
核心执行流程:
public void execute() { // 1. 解析配置文件生成Config对象 Config config = ConfigBuilder.of(configFile); // 2. 创建FlinkExecution实例 FlinkExecution seaTunnelTaskExecution = new FlinkExecution(config); // 3. 执行任务 seaTunnelTaskExecution.execute(); }
3. FlinkExecution的创建与初始化
3.1 核心组件初始化
public FlinkExecution(Config config) { // 创建三大处理器 this.sourcePluginExecuteProcessor = new SourceExecuteProcessor( jarPaths, config.getConfigList(Constants.SOURCE), jobContext); this.transformPluginExecuteProcessor = new TransformExecuteProcessor( jarPaths, TypesafeConfigUtils.getConfigList(config, Constants.TRANSFORM, Collections.emptyList()), jobContext); this.sinkPluginExecuteProcessor = new SinkExecuteProcessor( jarPaths, config.getConfigList(Constants.SINK), jobContext); // 初始化Flink执行环境 this.flinkRuntimeEnvironment = FlinkRuntimeEnvironment.getInstance( this.registerPlugin(config, jarPaths)); // 为处理器注入运行时环境 this.sourcePluginExecuteProcessor.setRuntimeEnvironment(flinkRuntimeEnvironment); this.transformPluginExecuteProcessor.setRuntimeEnvironment(flinkRuntimeEnvironment); this.sinkPluginExecuteProcessor.setRuntimeEnvironment(flinkRuntimeEnvironment); }
3.2 关键对象说明
组件 | 类型 | 功能 |
---|---|---|
sourcePluginExecuteProcessor |
SourceExecuteProcessor |
处理数据源接入 |
transformPluginExecuteProcessor |
TransformExecuteProcessor |
处理数据转换逻辑 |
sinkPluginExecuteProcessor |
SinkExecuteProcessor |
处理数据输出 |
flinkRuntimeEnvironment |
FlinkRuntimeEnvironment |
封装Flink StreamExecutionEnvironment |
4. 任务执行:FlinkExecution.execute()
DAG构建流程:
public void execute() { // 初始