Flink is an open-source distributed framework under the Apache Software Foundation that unifies stream and batch processing, providing both real-time stream computation and high-throughput batch computation over big data. This column records and shares an analysis of the Flink source code.
The Flink source code version analyzed in this article is flink-1.19.0.
1. Overview of ExecutionGraph Generation
The previous posts 《Flink-1.19.0源码详解5-JobGraph生成-前篇》 and 《Flink-1.19.0源码详解6-JobGraph生成-后篇》 covered the source code of JobGraph generation: how Flink traverses every StreamNode of the StreamGraph and step by step creates the JobVertex nodes, JobEdge edges and IntermediateDataSet datasets that make up the JobGraph. Once the JobGraph has been generated, the Flink Client submits the scheduling request together with the JobGraph to the Flink cluster running on YARN, moving scheduling from the client side to the cluster side.
Starting from cluster-side scheduling, this post analyzes the source code of ExecutionGraph generation (the red part of the flow chart below): how the Flink JobMaster iterates over the JobVertex nodes and successively creates the ExecutionJobVertex nodes, ExecutionVertex nodes, IntermediateResult datasets, IntermediateResultPartition partitions and their connections, i.e. the complete source code that builds the ExecutionGraph.
In essence, ExecutionGraph generation takes the logical dataflow graph, the JobGraph, and unfolds it by parallelism into the distributed dataflow graph, the ExecutionGraph.
Flink generates the ExecutionGraph mainly by iterating over every JobVertex of the JobGraph, creating the corresponding ExecutionJobVertex, and creating an IntermediateResult for every IntermediateDataSet produced by that JobVertex. The ExecutionGraph then unfolds the JobGraph in a distributed, parallel fashion: each ExecutionJobVertex is expanded by its parallelism into ExecutionVertex nodes together with the Execution objects that encapsulate their execution information, each IntermediateResult is expanded by parallelism into IntermediateResultPartition partitions, and edges are created to connect the upstream IntermediateResultPartitions with the downstream ExecutionVertex nodes.
The concrete steps of ExecutionGraph generation are as follows:
1. Create ExecutionJobVertex: iterate over the JobVertices of the JobGraph and create a corresponding ExecutionJobVertex for each JobVertex.
2. Create IntermediateResult: take the IntermediateDataSets produced by the JobVertex corresponding to each ExecutionJobVertex and create a corresponding IntermediateResult for each IntermediateDataSet.
3. Create ExecutionVertex: for each ExecutionJobVertex, create one ExecutionVertex per parallel subtask.
4. Create Execution: for each ExecutionVertex, create the Execution that encapsulates its execution information.
5. Create IntermediateResultPartition: for every ExecutionVertex, create an IntermediateResultPartition for each of its IntermediateResults.
6. Create edges: since Flink 1.13 the ExecutionEdge class has been removed; the connections between IntermediateResultPartitions and ExecutionVertices are instead kept by the EdgeManager in a Map<ExecutionVertexID, List<ConsumedPartitionGroup>> and a Map<IntermediateResultPartitionID, List<ConsumerVertexGroup>>.
7. Build the complete ExecutionGraph: after all JobVertices have been traversed and the ExecutionJobVertices, ExecutionVertices, IntermediateResults, IntermediateResultPartitions and their connections have been created, the ExecutionGraph is complete.
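To make this expansion concrete, here is a minimal, purely illustrative sketch (plain Java, no Flink dependencies; the Source -> Map job and its parallelisms are invented for this example) that only mirrors the counts produced by steps 1-5 above:
//Conceptual model only -- not Flink code. A hypothetical Source(parallelism=2) -> Map(parallelism=3) job
//is unfolded to show how many ExecutionGraph elements each step of the expansion produces.
public class ExecutionGraphExpansionSketch {
    public static void main(String[] args) {
        String[] names        = {"Source", "Map"};
        int[]    parallelism  = {2, 3};
        int[]    producedSets = {1, 0}; //Source produces one IntermediateDataSet; Map is the sink here
        for (int v = 0; v < names.length; v++) {
            //step 1: one ExecutionJobVertex per JobVertex
            //step 2: one IntermediateResult per produced IntermediateDataSet
            int intermediateResults = producedSets[v];
            //steps 3-4: one ExecutionVertex (with its Execution) per parallel subtask
            int executionVertices = parallelism[v];
            //step 5: one IntermediateResultPartition per (IntermediateResult, subtask) pair
            int resultPartitions = intermediateResults * parallelism[v];
            System.out.printf(
                    "%s -> %d ExecutionVertex, %d IntermediateResult, %d IntermediateResultPartition%n",
                    names[v], executionVertices, intermediateResults, resultPartitions);
        }
    }
}
Running it prints "Source -> 2 ExecutionVertex, 1 IntermediateResult, 2 IntermediateResultPartition" and "Map -> 3 ExecutionVertex, 0 IntermediateResult, 0 IntermediateResultPartition", which is exactly the unfolding that the real source code below performs on JobGraph objects.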
Diagram of the ExecutionGraph generation source code:
Complete code analysis:
2. Entering ExecutionGraph Scheduling
When the JobMaster is created, it creates its SchedulerNG instance, and in the course of doing so calls DefaultExecutionGraphBuilder.buildGraph() to start generating the ExecutionGraph.
Source code diagram:
DefaultExecutionGraphBuilder.buildGraph() creates the ExecutionGraph instance, initializes and topologically sorts the JobVertex nodes, and configures the StateBackend and checkpointing. The key step is the call to ExecutionGraph.attachJobGraph(), which starts generating the nodes and edges of the ExecutionGraph.
DefaultExecutionGraphBuilder.buildGraph() source:
public static DefaultExecutionGraph buildGraph(
JobGraph jobGraph,
Configuration jobManagerConfig,
ScheduledExecutorService futureExecutor,
Executor ioExecutor,
ClassLoader classLoader,
CompletedCheckpointStore completedCheckpointStore,
CheckpointsCleaner checkpointsCleaner,
CheckpointIDCounter checkpointIdCounter,
Time rpcTimeout,
BlobWriter blobWriter,
Logger log,
ShuffleMaster<?> shuffleMaster,
JobMasterPartitionTracker partitionTracker,
TaskDeploymentDescriptorFactory.PartitionLocationConstraint partitionLocationConstraint,
ExecutionDeploymentListener executionDeploymentListener,
ExecutionStateUpdateListener executionStateUpdateListener,
long initializationTimestamp,
VertexAttemptNumberStore vertexAttemptNumberStore,
VertexParallelismStore vertexParallelismStore,
Supplier<CheckpointStatsTracker> checkpointStatsTrackerFactory,
boolean isDynamicGraph,
ExecutionJobVertex.Factory executionJobVertexFactory,
MarkPartitionFinishedStrategy markPartitionFinishedStrategy,
boolean nonFinishedHybridPartitionShouldBeUnknown,
JobManagerJobMetricGroup jobManagerJobMetricGroup)
throws JobExecutionException, JobException {
checkNotNull(jobGraph, "job graph cannot be null");
final String jobName = jobGraph.getName();
final JobID jobId = jobGraph.getJobID();
final JobInformation jobInformation =
new JobInformation(
jobId,
jobName,
jobGraph.getSerializedExecutionConfig(),
jobGraph.getJobConfiguration(),
jobGraph.getUserJarBlobKeys(),
jobGraph.getClasspaths());
final int executionHistorySizeLimit =
jobManagerConfig.get(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE);
final PartitionGroupReleaseStrategy.Factory partitionGroupReleaseStrategyFactory =
PartitionGroupReleaseStrategyFactoryLoader.loadPartitionGroupReleaseStrategyFactory(
jobManagerConfig);
final int offloadShuffleDescriptorsThreshold =
jobManagerConfig.get(
TaskDeploymentDescriptorFactory.OFFLOAD_SHUFFLE_DESCRIPTORS_THRESHOLD);
final TaskDeploymentDescriptorFactory taskDeploymentDescriptorFactory;
try {
taskDeploymentDescriptorFactory =
new TaskDeploymentDescriptorFactory(
BlobWriter.serializeAndTryOffload(jobInformation, jobId, blobWriter),
jobId,
partitionLocationConstraint,
blobWriter,
nonFinishedHybridPartitionShouldBeUnknown,
offloadShuffleDescriptorsThreshold);
} catch (IOException e) {
throw new JobException("Could not create the TaskDeploymentDescriptorFactory.", e);
}
//create the DefaultExecutionGraph instance
// create a new execution graph, if none exists so far
final DefaultExecutionGraph executionGraph =
new DefaultExecutionGraph(
jobInformation,
futureExecutor,
ioExecutor,
rpcTimeout,
executionHistorySizeLimit,
classLoader,
blobWriter,
partitionGroupReleaseStrategyFactory,
shuffleMaster,
partitionTracker,
executionDeploymentListener,
executionStateUpdateListener,
initializationTimestamp,
vertexAttemptNumberStore,
vertexParallelismStore,
isDynamicGraph,
executionJobVertexFactory,
jobGraph.getJobStatusHooks(),
markPartitionFinishedStrategy,
taskDeploymentDescriptorFactory);
// set the basic properties
try {
executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
} catch (Throwable t) {
log.warn("Cannot create JSON plan for job", t);
// give the graph an empty plan
executionGraph.setJsonPlan("{}");
}
// initialize the vertices that have a master initialization hook
// file output formats create directories here, input formats create splits
final long initMasterStart = System.nanoTime();
log.info("Running initialization on master for job {} ({}).", jobName, jobId);
//initialize the vertices: e.g. file output formats create their output directories, input formats create their input splits
for (JobVertex vertex : jobGraph.getVertices()) {
String executableClass = vertex.getInvokableClassName();
if (executableClass == null || executableClass.isEmpty()) {
throw new JobSubmissionException(
jobId,
"The vertex "
+ vertex.getID()
+ " ("
+ vertex.getName()
+ ") has no invokable class.");
}
try {
vertex.initializeOnMaster(
new SimpleInitializeOnMasterContext(
classLoader,
vertexParallelismStore
.getParallelismInfo(vertex.getID())
.getParallelism()));
} catch (Throwable t) {
throw new JobExecutionException(
jobId,
"Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(),
t);
}
}
log.info(
"Successfully ran initialization on master in {} ms.",
(System.nanoTime() - initMasterStart) / 1_000_000);
//sort the JobVertices topologically
// topologically sort the job vertices and attach the graph to the existing one
List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
if (log.isDebugEnabled()) {
log.debug(
"Adding {} vertices from job graph {} ({}).",
sortedTopology.size(),
jobName,
jobId);
}
//generate the ExecutionGraph
executionGraph.attachJobGraph(sortedTopology, jobManagerJobMetricGroup);
if (log.isDebugEnabled()) {
log.debug(
"Successfully created execution graph from job graph {} ({}).", jobName, jobId);
}
// configure the state checkpointing
if (isDynamicGraph) {
// dynamic graph does not support checkpointing so we skip it
log.warn("Skip setting up checkpointing for a job with dynamic graph.");
} else if (isCheckpointingEnabled(jobGraph)) {
JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();
// load the state backend from the application settings
final StateBackend applicationConfiguredBackend;
final SerializedValue<StateBackend> serializedAppConfigured =
snapshotSettings.getDefaultStateBackend();
if (serializedAppConfigured == null) {
applicationConfiguredBackend = null;
} else {
try {
applicationConfiguredBackend =
serializedAppConfigured.deserializeValue(classLoader);
} catch (IOException | ClassNotFoundException e) {
throw new JobExecutionException(
jobId, "Could not deserialize application-defined state backend.", e);
}
}
//create the StateBackend
final StateBackend rootBackend;
try {
rootBackend =
StateBackendLoader.fromApplicationOrConfigOrDefault(
applicationConfiguredBackend,
jobGraph.getJobConfiguration(),
jobManagerConfig,
classLoader,
log);
} catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) {
throw new JobExecutionException(
jobId, "Could not instantiate configured state backend", e);
}
// load the checkpoint storage from the application settings
final CheckpointStorage applicationConfiguredStorage;
final SerializedValue<CheckpointStorage> serializedAppConfiguredStorage =
snapshotSettings.getDefaultCheckpointStorage();
if (serializedAppConfiguredStorage == null) {
applicationConfiguredStorage = null;
} else {
try {
applicationConfiguredStorage =
serializedAppConfiguredStorage.deserializeValue(classLoader);
} catch (IOException | ClassNotFoundException e) {
throw new JobExecutionException(
jobId,
"Could not deserialize application-defined checkpoint storage.",
e);
}
}
//load the checkpoint storage configuration
final CheckpointStorage rootStorage;
try {
rootStorage =
CheckpointStorageLoader.load(
applicationConfiguredStorage,
rootBackend,
jobGraph.getJobConfiguration(),
jobManagerConfig,
classLoader,
log);
} catch (IllegalConfigurationException | DynamicCodeLoadingException e) {
throw new JobExecutionException(
jobId, "Could not instantiate configured checkpoint storage", e);
}
// instantiate the user-defined checkpoint hooks
final SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks =
snapshotSettings.getMasterHooks();
final List<MasterTriggerRestoreHook<?>> hooks;
//initialize the user-defined checkpoint hooks
if (serializedHooks == null) {
hooks = Collections.emptyList();
} else {
final MasterTriggerRestoreHook.Factory[] hookFactories;
try {
hookFactories = serializedHooks.deserializeValue(classLoader);
} catch (IOException | ClassNotFoundException e) {
throw new JobExecutionException(
jobId, "Could not instantiate user-defined checkpoint hooks", e);
}
final Thread thread = Thread.currentThread();
final ClassLoader originalClassLoader = thread.getContextClassLoader();
thread.setContextClassLoader(classLoader);
try {
hooks = new ArrayList<>(hookFactories.length);
for (MasterTriggerRestoreHook.Factory factory : hookFactories) {
hooks.add(MasterHooks.wrapHook(factory.create(), classLoader));
}
} finally {
thread.setContextClassLoader(originalClassLoader);
}
}
final CheckpointCoordinatorConfiguration chkConfig =
snapshotSettings.getCheckpointCoordinatorConfiguration();
//configure checkpointing
executionGraph.enableCheckpointing(
chkConfig,
hooks,
checkpointIdCounter,
completedCheckpointStore,
rootBackend,
rootStorage,
checkpointStatsTrackerFactory.get(),
checkpointsCleaner,
jobManagerConfig.get(STATE_CHANGE_LOG_STORAGE));
}
return executionGraph;
}
DefaultExecutionGraph.attachJobGraph() creates and initializes the ExecutionJobVertex nodes and partitions the ExecutionGraph into SchedulingPipelinedRegions.
DefaultExecutionGraph.attachJobGraph() source:
public void attachJobGraph(
List<JobVertex> verticesToAttach, JobManagerJobMetricGroup jobManagerJobMetricGroup)
throws JobException {
assertRunningInJobMasterMainThread();
LOG.debug(
"Attaching {} topologically sorted vertices to existing job graph with {} "
+ "vertices and {} intermediate results.",
verticesToAttach.size(),
tasks.size(),
intermediateResults.size());
//create the ExecutionJobVertex nodes
attachJobVertices(verticesToAttach, jobManagerJobMetricGroup);
if (!isDynamic) {
//initialize all ExecutionJobVertices
initializeJobVertices(verticesToAttach);
}
//partition the ExecutionGraph topology into pipelined regions
// the topology assigning should happen before notifying new vertices to failoverStrategy
executionTopology = DefaultExecutionTopology.fromExecutionGraph(this);
partitionGroupReleaseStrategy =
partitionGroupReleaseStrategyFactory.createInstance(getSchedulingTopology());
}
3. Creating the ExecutionJobVertex Nodes
In DefaultExecutionGraph.attachJobGraph(), DefaultExecutionGraph.attachJobVertices() is called to create the ExecutionJobVertex nodes.
Source code diagram:
DefaultExecutionGraph.attachJobVertices() iterates over all JobVertex nodes of the JobGraph and creates a corresponding ExecutionJobVertex node for each JobVertex.
DefaultExecutionGraph.attachJobVertices() source:
private void attachJobVertices(
List<JobVertex> topologicallySorted, JobManagerJobMetricGroup jobManagerJobMetricGroup)
throws JobException {
//iterate over all JobVertices
for (JobVertex jobVertex : topologicallySorted) {
//...
//for each JobVertex of the JobGraph, create an ExecutionJobVertex
// create the execution job vertex and attach it to the graph
ExecutionJobVertex ejv =
executionJobVertexFactory.createExecutionJobVertex(
this,
jobVertex,
parallelismInfo,
coordinatorStore,
jobManagerJobMetricGroup);
//...
}
}
The ExecutionJobVertex.Factory.createExecutionJobVertex() method called here actually instantiates the ExecutionJobVertex.
ExecutionJobVertex.Factory.createExecutionJobVertex() source:
public static class Factory {
ExecutionJobVertex createExecutionJobVertex(
InternalExecutionGraphAccessor graph,
JobVertex jobVertex,
VertexParallelismInformation parallelismInfo,
CoordinatorStore coordinatorStore,
JobManagerJobMetricGroup jobManagerJobMetricGroup)
throws JobException {
//instantiate the ExecutionJobVertex
return new ExecutionJobVertex(
graph, jobVertex, parallelismInfo, coordinatorStore, jobManagerJobMetricGroup);
    }
}
4. Entering ExecutionJobVertex Initialization
After the ExecutionJobVertex nodes have been created, the ExecutionVertex nodes and IntermediateResult datasets belonging to each ExecutionJobVertex still need to be created.
In DefaultExecutionGraph.attachJobGraph(), once attachJobVertices() has created the ExecutionJobVertex nodes, DefaultExecutionGraph.initializeJobVertices() is executed to start initializing them.
Source code diagram:
DefaultExecutionGraph.attachJobGraph() source:
public void attachJobGraph(
List<JobVertex> verticesToAttach, JobManagerJobMetricGroup jobManagerJobMetricGroup)
throws JobException {
assertRunningInJobMasterMainThread();
LOG.debug(
"Attaching {} topologically sorted vertices to existing job graph with {} "
+ "vertices and {} intermediate results.",
verticesToAttach.size(),
tasks.size(),
intermediateResults.size());
//create the ExecutionJobVertex nodes
attachJobVertices(verticesToAttach, jobManagerJobMetricGroup);
if (!isDynamic) {
//initialize all ExecutionJobVertices
initializeJobVertices(verticesToAttach);
}
//partition the ExecutionGraph topology into pipelined regions
// the topology assigning should happen before notifying new vertices to failoverStrategy
executionTopology = DefaultExecutionTopology.fromExecutionGraph(this);
partitionGroupReleaseStrategy =
partitionGroupReleaseStrategyFactory.createInstance(getSchedulingTopology());
}
DefaultExecutionGraph.initializeJobVertices() iterates over all JobVertex nodes of the JobGraph, looks up the ExecutionJobVertex corresponding to each JobVertex, and initializes it.
DefaultExecutionGraph.initializeJobVertices() source:
private void initializeJobVertices(List<JobVertex> topologicallySorted) throws JobException {
final long createTimestamp = System.currentTimeMillis();
//iterate over the JobVertices and initialize their corresponding ExecutionJobVertices
for (JobVertex jobVertex : topologicallySorted) {
//get the ExecutionJobVertex corresponding to each JobVertex
final ExecutionJobVertex ejv = tasks.get(jobVertex.getID());
//initialize each ExecutionJobVertex
initializeJobVertex(ejv, createTimestamp);
}
}
The two-argument initializeJobVertex() used here is a default method of the ExecutionGraph interface that DefaultExecutionGraph implements. It first calls VertexInputInfoComputationUtils.computeVertexInputInfos() to build the input description Map<IntermediateDataSetID, JobVertexInputInfo> of the current ExecutionJobVertex, and then initializes the ExecutionJobVertex.
ExecutionGraph.initializeJobVertex() source:
default void initializeJobVertex(ExecutionJobVertex ejv, long createTimestamp)
throws JobException {
//2. then initialize the ExecutionJobVertex
initializeJobVertex(
ejv,
createTimestamp,
//1. first call VertexInputInfoComputationUtils.computeVertexInputInfos() to build the input description Map<IntermediateDataSetID, JobVertexInputInfo> of this ExecutionJobVertex
VertexInputInfoComputationUtils.computeVertexInputInfos(
ejv, getAllIntermediateResults()::get));
}
VertexInputInfoComputationUtils.computeVertexInputInfos() retrieves, for every input JobEdge of the ExecutionJobVertex, the IntermediateResult corresponding to the edge's source IntermediateDataSet, and then delegates to the overload of computeVertexInputInfos().
VertexInputInfoComputationUtils.computeVertexInputInfos() source:
public static Map<IntermediateDataSetID, JobVertexInputInfo> computeVertexInputInfos(
ExecutionJobVertex ejv,
Function<IntermediateDataSetID, IntermediateResult> intermediateResultRetriever)
throws JobException {
checkState(ejv.isParallelismDecided());
final List<IntermediateResultInfo> intermediateResultInfos = new ArrayList<>();
//retrieve the IntermediateResult of each input JobEdge of the ExecutionJobVertex
for (JobEdge edge : ejv.getJobVertex().getInputs()) {
IntermediateResult ires = intermediateResultRetriever.apply(edge.getSourceId());
if (ires == null) {
throw new JobException(
"Cannot connect this job graph to the previous graph. No previous intermediate result found for ID "
+ edge.getSourceId());
}
intermediateResultInfos.add(new IntermediateResultWrapper(ires));
}
//delegate to the overload of computeVertexInputInfos()
return computeVertexInputInfos(
ejv.getParallelism(), intermediateResultInfos, ejv.getGraph().isDynamic());
}
The overloaded VertexInputInfoComputationUtils.computeVertexInputInfos() iterates over each upstream IntermediateResult of the ExecutionJobVertex and, depending on whether its DistributionPattern is POINTWISE or ALL_TO_ALL, generates a different JobVertexInputInfo input description for the vertex.
VertexInputInfoComputationUtils.computeVertexInputInfos() (overload) source:
public static Map<IntermediateDataSetID, JobVertexInputInfo> computeVertexInputInfos(
int parallelism,
List<? extends IntermediateResultInfo> inputs,
boolean isDynamicGraph) {
checkArgument(parallelism > 0);
final Map<IntermediateDataSetID, JobVertexInputInfo> jobVertexInputInfos =
new LinkedHashMap<>();
//iterate over all upstream IntermediateResults
for (IntermediateResultInfo input : inputs) {
//branch on the DistributionPattern of the IntermediateResult
int sourceParallelism = input.getNumPartitions();
//if the DistributionPattern is POINTWISE
if (input.isPointwise()) {
jobVertexInputInfos.putIfAbsent(
input.getResultId(),
computeVertexInputInfoForPointwise(
sourceParallelism,
parallelism,
input::getNumSubpartitions,
isDynamicGraph));
} else {
//if the DistributionPattern is ALL_TO_ALL
jobVertexInputInfos.putIfAbsent(
input.getResultId(),
computeVertexInputInfoForAllToAll(
sourceParallelism,
parallelism,
input::getNumSubpartitions,
isDynamicGraph,
input.isBroadcast()));
}
}
return jobVertexInputInfos;
}
The JobVertexInputInfo input description generated for each ExecutionVertex depends on the DistributionPattern of the connection: for POINTWISE, consumed partitions are assigned to each downstream subtask via sliding index ranges proportional to the ratio of source and target parallelism; for ALL_TO_ALL, every downstream subtask is assigned the full range of upstream partitions.
If the DistributionPattern is POINTWISE:
VertexInputInfoComputationUtils.computeVertexInputInfoForPointwise() source:
static JobVertexInputInfo computeVertexInputInfoForPointwise(
int sourceCount,
int targetCount,
Function<Integer, Integer> numOfSubpartitionsRetriever,
boolean isDynamicGraph) {
final List<ExecutionVertexInputInfo> executionVertexInputInfos = new ArrayList<>();
//if the upstream parallelism is greater than or equal to the downstream parallelism
if (sourceCount >= targetCount) {
for (int index = 0; index < targetCount; index++) {
//select the partition range by sliding indices proportional to the parallelism ratio
int start = index * sourceCount / targetCount;
int end = (index + 1) * sourceCount / targetCount;
IndexRange partitionRange = new IndexRange(start, end - 1);
IndexRange subpartitionRange =
computeConsumedSubpartitionRange(
index,
1,
() -> numOfSubpartitionsRetriever.apply(start),
isDynamicGraph,
false);
executionVertexInputInfos.add(
new ExecutionVertexInputInfo(index, partitionRange, subpartitionRange));
}
} else {
//if the upstream parallelism is less than the downstream parallelism
for (int partitionNum = 0; partitionNum < sourceCount; partitionNum++) {
//select the partition range by sliding indices proportional to the parallelism ratio
int start = (partitionNum * targetCount + sourceCount - 1) / sourceCount;
int end = ((partitionNum + 1) * targetCount + sourceCount - 1) / sourceCount;
int numConsumers = end - start;
IndexRange partitionRange = new IndexRange(partitionNum, partitionNum);
// Variable used in lambda expression should be final or effectively final
final int finalPartitionNum = partitionNum;
for (int i = start; i < end; i++) {
IndexRange subpartitionRange =
computeConsumedSubpartitionRange(
i,
numConsumers,
() -> numOfSubpartitionsRetriever.apply(finalPartitionNum),
isDynamicGraph,
false);
executionVertexInputInfos.add(
new ExecutionVertexInputInfo(i, partitionRange, subpartitionRange));
}
}
}
return new JobVertexInputInfo(executionVertexInputInfos);
}
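To make the sliding-range arithmetic concrete, the following standalone snippet (plain Java; the parallelisms of 4 upstream and 2 downstream subtasks are made up for illustration) reproduces only the start/end formulas from the sourceCount >= targetCount branch above:
//Standalone illustration of the partition-range arithmetic; not Flink code, parallelisms are invented.
public class PointwiseRangeExample {
    public static void main(String[] args) {
        int sourceCount = 4; //upstream parallelism
        int targetCount = 2; //downstream parallelism
        for (int index = 0; index < targetCount; index++) {
            int start = index * sourceCount / targetCount;     //inclusive
            int end = (index + 1) * sourceCount / targetCount; //exclusive
            System.out.printf("consumer %d reads partitions [%d, %d]%n", index, start, end - 1);
        }
        //Output:
        //consumer 0 reads partitions [0, 1]
        //consumer 1 reads partitions [2, 3]
    }
}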
If the DistributionPattern is ALL_TO_ALL:
VertexInputInfoComputationUtils.computeVertexInputInfoForAllToAll() source:
static JobVertexInputInfo computeVertexInputInfoForAllToAll(
int sourceCount,
int targetCount,
Function<Integer, Integer> numOfSubpartitionsRetriever,
boolean isDynamicGraph,
boolean isBroadcast) {
final List<ExecutionVertexInputInfo> executionVertexInputInfos = new ArrayList<>();
IndexRange partitionRange = new IndexRange(0, sourceCount - 1);
//for each downstream ExecutionVertex, generate its mapping to the upstream partitions
for (int i = 0; i < targetCount; ++i) {
IndexRange subpartitionRange =
computeConsumedSubpartitionRange(
i,
targetCount,
() -> numOfSubpartitionsRetriever.apply(0),
isDynamicGraph,
isBroadcast);
executionVertexInputInfos.add(
new ExecutionVertexInputInfo(i, partitionRange, subpartitionRange));
}
return new JobVertexInputInfo(executionVertexInputInfos);
}
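For comparison, in the ALL_TO_ALL case every downstream subtask is assigned the full upstream partition range [0, sourceCount - 1]; which subpartitions it actually reads is then decided by computeConsumedSubpartitionRange(). A standalone illustration of the partition range only, with made-up parallelisms (3 producers, 2 consumers):
//Standalone illustration of the ALL_TO_ALL partition range; not Flink code, parallelisms are invented.
public class AllToAllRangeExample {
    public static void main(String[] args) {
        int sourceCount = 3; //upstream parallelism
        int targetCount = 2; //downstream parallelism
        for (int i = 0; i < targetCount; i++) {
            //every consumer sees all upstream partitions; the subpartition range per consumer
            //is computed separately by computeConsumedSubpartitionRange()
            System.out.printf("consumer %d reads partitions [0, %d]%n", i, sourceCount - 1);
        }
        //Output:
        //consumer 0 reads partitions [0, 2]
        //consumer 1 reads partitions [0, 2]
    }
}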
In ExecutionGraph.initializeJobVertex(), the call to VertexInputInfoComputationUtils.computeVertexInputInfos() thus produces the input description Map<IntermediateDataSetID, JobVertexInputInfo> of the ExecutionJobVertex, preparing the assignment that later connects each ExecutionJobVertex to its IntermediateResult datasets.
After computeVertexInputInfos() has run, DefaultExecutionGraph.initializeJobVertex() is invoked to initialize each ExecutionJobVertex node.
ExecutionGraph.initializeJobVertex() source:
default void initializeJobVertex(ExecutionJobVertex ejv, long createTimestamp)
throws JobException {
//2. then initialize the ExecutionJobVertex
initializeJobVertex(
ejv,
createTimestamp,
//1. first call VertexInputInfoComputationUtils.computeVertexInputInfos() to build the input description Map<IntermediateDataSetID, JobVertexInputInfo> of this ExecutionJobVertex
VertexInputInfoComputationUtils.computeVertexInputInfos(
ejv, getAllIntermediateResults()::get));
}
DefaultExecutionGraph.initializeJobVertex() initializes each ExecutionJobVertex node and connects it to its predecessor intermediateResults datasets.
DefaultExecutionGraph.initializeJobVertex() source:
public void initializeJobVertex(
ExecutionJobVertex ejv,
long createTimestamp,
Map<IntermediateDataSetID, JobVertexInputInfo> jobVertexInputInfos)
throws JobException {
//...
//initialize each ExecutionJobVertex
ejv.initialize(
executionHistorySizeLimit,
rpcTimeout,
createTimestamp,
this.initialAttemptCounts.getAttemptCounts(ejv.getJobVertexId()));
//connect the ExecutionJobVertex to its predecessor intermediateResults
ejv.connectToPredecessors(this.intermediateResults);
//...
}
ExecutionJobVertex.initialize() creates the ExecutionVertex nodes and IntermediateResult datasets that belong to the ExecutionJobVertex.
5. Creating the ExecutionVertex Nodes and IntermediateResult Datasets
Entering ExecutionJobVertex.initialize(): the method creates an IntermediateResult for every IntermediateDataSet produced by the ExecutionJobVertex, and creates one ExecutionVertex per parallel subtask of the ExecutionJobVertex.
Source code diagram:
In ExecutionJobVertex.initialize(), an IntermediateResult is generated for each produced IntermediateDataSet of the ExecutionJobVertex and the ExecutionVertices are created according to the parallelism.
ExecutionJobVertex.initialize() source:
protected void initialize(
int executionHistorySizeLimit,
Time timeout,
long createTimestamp,
SubtaskAttemptNumberStore initialAttemptCounts)
throws JobException {
checkState(parallelismInfo.getParallelism() > 0);
checkState(!isInitialized());
this.taskVertices = new ExecutionVertex[parallelismInfo.getParallelism()];
this.inputs = new ArrayList<>(jobVertex.getInputs().size());
//declare the IntermediateResult array
// create the intermediate results
this.producedDataSets =
new IntermediateResult[jobVertex.getNumberOfProducedIntermediateDataSets()];
//create the IntermediateResults produced by this ExecutionJobVertex
for (int i = 0; i < jobVertex.getProducedDataSets().size(); i++) {
//get each IntermediateDataSet produced by the current JobVertex
final IntermediateDataSet result = jobVertex.getProducedDataSets().get(i);
//create an IntermediateResult for each IntermediateDataSet
this.producedDataSets[i] =
new IntermediateResult(
result,
this,
this.parallelismInfo.getParallelism(),
result.getResultType());
}
//expand the ExecutionJobVertex into multiple ExecutionVertices according to its parallelism
// create all task vertices
for (int i = 0; i < this.parallelismInfo.getParallelism(); i++) {
//create each ExecutionVertex
ExecutionVertex vertex =
createExecutionVertex(
this,
i,
producedDataSets,
timeout,
createTimestamp,
executionHistorySizeLimit,
initialAttemptCounts.getAttemptCount(i));
this.taskVertices[i] = vertex;
}
// sanity check for the double referencing between intermediate result partitions and
// execution vertices
for (IntermediateResult ir : this.producedDataSets) {
if (ir.getNumberOfAssignedPartitions() != this.parallelismInfo.getParallelism()) {
throw new RuntimeException(
"The intermediate result's partitions were not correctly assigned.");
}
}
// set up the input splits, if the vertex has any
try {
@SuppressWarnings("unchecked")
InputSplitSource<InputSplit> splitSource =
(InputSplitSource<InputSplit>) jobVertex.getInputSplitSource();
if (splitSource != null) {
Thread currentThread = Thread.currentThread();
ClassLoader oldContextClassLoader = currentThread.getContextClassLoader();
currentThread.setContextClassLoader(graph.getUserClassLoader());
try {
inputSplits =
splitSource.createInputSplits(this.parallelismInfo.getParallelism());
if (inputSplits != null) {
splitAssigner = splitSource.getInputSplitAssigner(inputSplits);
}
} finally {
currentThread.setContextClassLoader(oldContextClassLoader);
}
} else {
inputSplits = null;
}
} catch (Throwable t) {
throw new JobException(
"Creating the input splits caused an error: " + t.getMessage(), t);
}
}
The ExecutionVertex nodes themselves are created by calling ExecutionJobVertex.createExecutionVertex(), which instantiates each ExecutionVertex.
ExecutionJobVertex.createExecutionVertex() source:
protected ExecutionVertex createExecutionVertex(
ExecutionJobVertex jobVertex,
int subTaskIndex,
IntermediateResult[] producedDataSets,
Time timeout,
long createTimestamp,
int executionHistorySizeLimit,
int initialAttemptCount) {
//instantiate each ExecutionVertex
return new ExecutionVertex(
jobVertex,
subTaskIndex,
producedDataSets,
timeout,
createTimestamp,
executionHistorySizeLimit,
initialAttemptCount);
}
6. Creating Each ExecutionVertex's Execution and IntermediateResultPartition
In the ExecutionVertex constructor, the basic information of the ExecutionVertex is set up first, then an IntermediateResultPartition is created for each IntermediateResult produced by the vertex, and finally the Execution that encapsulates the task's execution information is created.
Source code diagram:
ExecutionVertex constructor source:
public ExecutionVertex(
ExecutionJobVertex jobVertex,
int subTaskIndex,
IntermediateResult[] producedDataSets,
Time timeout,
long createTimestamp,
int executionHistorySizeLimit,
int initialAttemptCount) {
//set up the basic information of the ExecutionVertex
this.jobVertex = jobVertex;
this.subTaskIndex = subTaskIndex;
this.executionVertexId = new ExecutionVertexID(jobVertex.getJobVertexId(), subTaskIndex);
this.taskNameWithSubtask =
String.format(
"%s (%d/%d)",
jobVertex.getJobVertex().getName(),
subTaskIndex + 1,
jobVertex.getParallelism());
//create this ExecutionVertex's IntermediateResultPartition for each produced IntermediateResult
this.resultPartitions = new LinkedHashMap<>(producedDataSets.length, 1);
for (IntermediateResult result : producedDataSets) {
IntermediateResultPartition irp =
new IntermediateResultPartition(
result,
this,
subTaskIndex,
getExecutionGraphAccessor().getEdgeManager());
result.setPartition(subTaskIndex, irp);
resultPartitions.put(irp.getPartitionId(), irp);
}
this.executionHistory = new ExecutionHistory(executionHistorySizeLimit);
this.nextAttemptNumber = initialAttemptCount;
this.inputBytes = NUM_BYTES_UNKNOWN;
this.timeout = timeout;
this.inputSplits = new ArrayList<>();
//create the Execution that encapsulates the task's execution information
this.currentExecution = createNewExecution(createTimestamp);
getExecutionGraphAccessor().registerExecution(currentExecution);
}
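Note how this constructor ties back to the sanity check in ExecutionJobVertex.initialize(): each of the parallelism ExecutionVertex instances registers exactly one IntermediateResultPartition per produced IntermediateResult via result.setPartition(subTaskIndex, irp), so every IntermediateResult ends up with getNumberOfAssignedPartitions() == parallelism. A plain-Java sketch of that bookkeeping (the counts are hypothetical and the arrays only stand in for the real registration):
//Plain-Java model of the partition bookkeeping -- not Flink code; parallelism and dataset count are made up.
public class PartitionAssignmentSketch {
    public static void main(String[] args) {
        int parallelism = 3;      //number of ExecutionVertices (subtasks)
        int producedDataSets = 1; //number of IntermediateResults of the ExecutionJobVertex
        //assigned[r][s] == true means subtask s has registered its partition of IntermediateResult r
        boolean[][] assigned = new boolean[producedDataSets][parallelism];
        for (int subTaskIndex = 0; subTaskIndex < parallelism; subTaskIndex++) {
            for (int r = 0; r < producedDataSets; r++) {
                //mirrors result.setPartition(subTaskIndex, irp) in the constructor above
                assigned[r][subTaskIndex] = true;
            }
        }
        //mirrors the sanity check in ExecutionJobVertex.initialize():
        //every IntermediateResult must have exactly `parallelism` assigned partitions
        for (int r = 0; r < producedDataSets; r++) {
            int count = 0;
            for (boolean b : assigned[r]) {
                if (b) {
                    count++;
                }
            }
            System.out.printf("IntermediateResult %d: %d of %d partitions assigned%n", r, count, parallelism);
        }
    }
}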
When creating the Execution that encapsulates the task's execution information, ExecutionVertex.createNewExecution() is called to instantiate the Execution.
ExecutionVertex.createNewExecution() source:
Execution createNewExecution(final long timestamp) {
//instantiate the Execution
return new Execution(
getExecutionGraphAccessor().getFutureExecutor(),
this,
nextAttemptNumber++,
timestamp,
timeout);
}
7. Conclusion
At this point the ExecutionJobVertex nodes, ExecutionVertex nodes, IntermediateResult datasets, IntermediateResultPartition partitions and the Executions that encapsulate the tasks' execution information have all been created. Due to length constraints, the remaining analysis of the ExecutionGraph generation source code will continue in the next post of this column.