记第一次跟踪seatunnel的任务运行过程三——解析配置的具体方法getLogicalDag

发布于:2025-03-14 ⋅ 阅读:(9) ⋅ 点赞:(0)

前绪

记第一次跟踪seatunnel的任务运行过程二——ClientJobExecutionEnvironment的execture方法

从这里开始,就是使用seatunnel-2.3.9的源码了。前面部分没有变化,2.3.X版本都是通用的。
建议打开源码,边读文章,边阅读源码

正文

getLogicalDag()方法还是在ClientJobExecutionEnvironment这个类中。

关键词DAG

DAG:有向无环图。
LogicalDag:在此可以理解为一个seatunnel job的运行结构图。管理的是从source到transform到sink的过程。

解析配置文件,生成资源对

ImmutablePair<List<Action>, Set<URL>> immutablePair = getJobConfigParser().parse(null);

getJobConfigParser().parse()方法中解析在seatunnel执行名中使用‘–config’指定的配置文件,将其中的source、transformer、sink解析成一个个的anction,并且将每个action(即source、transform、sink)所需要用到的jar包地址提取出来。

收集全部的action,以备后用(后面还收集了全部的jar包资源)

actions.addAll(immutablePair.getLeft());

使用actions这个对象,直接引用所有的action,方便后续的使用。例如:遍历所有的action进行某个动作处理。

读去配置,确定是否自动上传jar包

boolean enableUploadConnectorJarPackage = seaTunnelConfig.getEngineConfig().getConnectorJarStorageConfig().getEnable();

前面已经解析出来的所有的action和对应用到的jar包,这里就是根据配置是否将jar自动上传到服务器。
默认值是:false,即不自动上传。代表着需要提前将需要用到的jar包上传到seatunnel的lib文件夹下。
这里的配置就是从${SEATUNNEL_HOME}/config/seatunnel.yaml这个配置文件中解析出来了的。但是2.3.9版本的seatunnel.yaml中默认是没有seatunnel.engine.jar-storage.enable这一项的,所以使用的基本都是默认值,即:false。

seatunnnl.yaml配置文件完整版及解析

seatunnel.yaml文件的解析对象对应的是org.apache.seatunnel.engine.common.config.server.ServerConfigOptions这个类。
seatunnel.yaml中配置不全且没有明确的说明,可以到这个文件中查找。

处理jar包

配置seatunnel.engine.jar-storage.enable=true,上传jar包

 if (enableUploadConnectorJarPackage) {
            Set<ConnectorJarIdentifier> commonJarIdentifiers = connectorPackageClient.uploadCommonPluginJars(Long.parseLong(jobConfig.getJobContext().getJobId()), commonPluginJars);
            Set<URL> commonPluginJarUrls = getJarUrlsFromIdentifiers(commonJarIdentifiers);
            Set<ConnectorJarIdentifier> pluginJarIdentifiers = new HashSet<>();
            uploadActionPluginJar(actions, pluginJarIdentifiers);
            Set<URL> connectorPluginJarUrls = getJarUrlsFromIdentifiers(pluginJarIdentifiers);
            connectorJarIdentifiers.addAll(commonJarIdentifiers);
            connectorJarIdentifiers.addAll(pluginJarIdentifiers);
            jarUrls.addAll(commonPluginJarUrls);
            jarUrls.addAll(connectorPluginJarUrls);
            actions.forEach(
                    action -> {
                        addCommonPluginJarsToAction(
                                action, commonPluginJarUrls, commonJarIdentifiers);
                    });
        }

如果要上传jar包,则将公共插件的jar包、前面解析出来的action使用到的jar包上传上去。
收集所有的jar包,并且给每个action添加公共插件jar包。

配置seatunnel.engine.jar-storage.enable=false(默认),不上传jar包

jarUrls.addAll(commonPluginJars);
            jarUrls.addAll(immutablePair.getRight());
            actions.forEach(
                    action -> {
                        addCommonPluginJarsToAction(
                                action, new HashSet<>(commonPluginJars), Collections.emptySet());
                    });

收集所有的jar包,并且给每个action添加公共插件jar包。

结束:生成logicDag

getLogicalDagGenerator().generate()

生成一个logicDag,并返回。

后续

记第一次跟踪seatunnel的任务运行过程四——getJobConfigParser().parse()的动作