前绪
记第一次跟踪seatunnel的任务运行过程二——ClientJobExecutionEnvironment的execture方法
从这里开始,就是使用seatunnel-2.3.9的源码了。前面部分没有变化,2.3.X版本都是通用的。
建议打开源码,边读文章,边阅读源码
正文
getLogicalDag()方法还是在ClientJobExecutionEnvironment这个类中。
关键词DAG
DAG:有向无环图。
LogicalDag:在此可以理解为一个seatunnel job的运行结构图。管理的是从source到transform到sink的过程。
解析配置文件,生成资源对
ImmutablePair<List<Action>, Set<URL>> immutablePair = getJobConfigParser().parse(null);
getJobConfigParser().parse()方法中解析在seatunnel执行名中使用‘–config’指定的配置文件,将其中的source、transformer、sink解析成一个个的anction,并且将每个action(即source、transform、sink)所需要用到的jar包地址提取出来。
收集全部的action,以备后用(后面还收集了全部的jar包资源)
actions.addAll(immutablePair.getLeft());
使用actions这个对象,直接引用所有的action,方便后续的使用。例如:遍历所有的action进行某个动作处理。
读去配置,确定是否自动上传jar包
boolean enableUploadConnectorJarPackage = seaTunnelConfig.getEngineConfig().getConnectorJarStorageConfig().getEnable();
前面已经解析出来的所有的action和对应用到的jar包,这里就是根据配置是否将jar自动上传到服务器。
默认值是:false,即不自动上传。代表着需要提前将需要用到的jar包上传到seatunnel的lib文件夹下。
这里的配置就是从${SEATUNNEL_HOME}/config/seatunnel.yaml这个配置文件中解析出来了的。但是2.3.9版本的seatunnel.yaml中默认是没有seatunnel.engine.jar-storage.enable这一项的,所以使用的基本都是默认值,即:false。
seatunnnl.yaml配置文件完整版及解析
seatunnel.yaml文件的解析对象对应的是org.apache.seatunnel.engine.common.config.server.ServerConfigOptions这个类。
seatunnel.yaml中配置不全且没有明确的说明,可以到这个文件中查找。
处理jar包
配置seatunnel.engine.jar-storage.enable=true,上传jar包
if (enableUploadConnectorJarPackage) {
Set<ConnectorJarIdentifier> commonJarIdentifiers = connectorPackageClient.uploadCommonPluginJars(Long.parseLong(jobConfig.getJobContext().getJobId()), commonPluginJars);
Set<URL> commonPluginJarUrls = getJarUrlsFromIdentifiers(commonJarIdentifiers);
Set<ConnectorJarIdentifier> pluginJarIdentifiers = new HashSet<>();
uploadActionPluginJar(actions, pluginJarIdentifiers);
Set<URL> connectorPluginJarUrls = getJarUrlsFromIdentifiers(pluginJarIdentifiers);
connectorJarIdentifiers.addAll(commonJarIdentifiers);
connectorJarIdentifiers.addAll(pluginJarIdentifiers);
jarUrls.addAll(commonPluginJarUrls);
jarUrls.addAll(connectorPluginJarUrls);
actions.forEach(
action -> {
addCommonPluginJarsToAction(
action, commonPluginJarUrls, commonJarIdentifiers);
});
}
如果要上传jar包,则将公共插件的jar包、前面解析出来的action使用到的jar包上传上去。
收集所有的jar包,并且给每个action添加公共插件jar包。
配置seatunnel.engine.jar-storage.enable=false(默认),不上传jar包
jarUrls.addAll(commonPluginJars);
jarUrls.addAll(immutablePair.getRight());
actions.forEach(
action -> {
addCommonPluginJarsToAction(
action, new HashSet<>(commonPluginJars), Collections.emptySet());
});
收集所有的jar包,并且给每个action添加公共插件jar包。
结束:生成logicDag
getLogicalDagGenerator().generate()
生成一个logicDag,并返回。