Flink 任务调度机制

发布于:2025-04-12 ⋅ 阅读:(28) ⋅ 点赞:(0)

一、Task 任务调度执行流程

一、Graph 的概念

        Flink 中的执行图可以分为四层:StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行图。

  • StreamGraph:执行用户代码中的 env.execute() 方法后,根据用户通过 Stream API 编写的代码生成的最初的图。用来表示程序的拓扑结构。
  • JobGraph:StreamGraph 经过优化(合并算子链、设置并行度、优化分区策略等)后生成 JobGraph, 是客户端提交给 JobManager 的数据结构。
  • ExecutionGraph:JobManager 根据 JobGraph 生成 ExecutionGraph,ExecutionGraph 是 JobGraph 的并行化版本,是调度层最核心的数据结构。
  • 物理执行图:JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个 TaskManager 上部署 Task 后形成的“图”,并不是一个具体的数据结构。

        例如 example 里的 WordCount 并行度为 2 (Source 为 1 个并行度)的四层执行图的演变过程如下所示:

public static void main(String[] args) throws Exception {

		// Checking input parameters
		final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);

		// set up the execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// make parameters available in the web interface
		env.getConfig().setGlobalJobParameters(params);

		// get input data
		DataStream<String> text = null;
		if (params.has("input")) {
			// union all the inputs from text files
			for (String input : params.getMultiParameterRequired("input")) {
				if (text == null) {
					text = env.readTextFile(input);
				} else {
					text = text.union(env.readTextFile(input));
				}
			}
			Preconditions.checkNotNull(text, "Input DataStream should not be null.");
		} else {
			System.out.println("Executing WordCount example with default input data set.");
			System.out.println("Use --input to specify file input.");
			// get default test text data
			text = env.fromElements(WordCountData.WORDS);
		}

		DataStream<Tuple2<String, Integer>> counts =
			// split up the lines in pairs (2-tuples) containing: (word,1)
			text.flatMap(new Tokenizer())
			// group by the tuple field "0" and sum up tuple field "1"
			.keyBy(value -> value.f0).sum(1);

		// emit result
		if (params.has("output")) {
			counts.writeAsText(params.get("output"));
		} else {
			System.out.println("Printing result to stdout. Use --output to specify output path.");
			counts.print();
		}
		// execute program
		env.execute("Streaming WordCount");
	}

相关术语解释:

StreamGraph:

        StreamNode:用来代表 operator 的类,并具有所有的相关属性,如并行度、入边和出边等。

        StreamEdge:表示连接两个 StreamEdge 的边。

JobGraph:

        JobVertex:经过优化后符合条件的多个 StreamNode 可能会 chain 在一起生成一个 JobVertex,即一个 JobVertex 包含一个或多个 operator,JobVertex 的输入是 JobEdge,输出是 IntermediateDataSet。

        IntermediateDataSet:表示 JobVertex 的输出,经过 operator 处理产生的数据集。producer 是 JobVertex,consumer 是 JobEdge。

        JobEdge:代表了 JobGraph 中的一条数据传输通道。source 是 IntermediateDataSet,target 是 JobVertex。即数据通过 JobEdge 由 IntermediateDataSet 传递给目标 JobVertex。

 ExecutionGraph:

        ExecutionJobVertex:和 JobGraph 中的 JobVertex 一一对应。每个 ExecutionJobVertex 都有和并行度一样多的 ExecutionVertex。

        ExecutionVertex:表示 ExecutionJobVertex 的其中一个并行子任务,输入是 ExecutionEdge,输出是 IntermediateResultPartition。

        IntermediateResult:和 JobGraph 中的 IntermediateDataSet 一一对应。一个 IntermediateResult 包含多个 IntermediateResultPartition,其个数等于该 operator 的并行度。

        IntermediateResultPartition:表示 ExecutionVertex 的一个输出分区,producer 是 ExecutionVertex,consumer 是若干个 ExecutionEdge。       

        ExecutionEdge:表示 ExecutionVertex 的输入,source 是 IntermediateResultPartition,target 是 ExecutionVertex。source 和 target 都只能是一个。

        Execution:是执行一个 ExecutionVertex 的一次尝试。当发生故障或者数据需要重算的情况下 ExecutionVertex 可能会有多个 ExecutionAttemptID。一个 Execution 通过 ExecutionAttemptID 来唯一标识。JM 和 TM 之间关于 task 的部署和 task status 的更新都是通过 ExecutionAttemptID 来确定消息接收者。 

        归纳:       

  • 由于每个 JobVertex 可以有多个 IntermediateDataSet,所以每个 ExecutionJobVertex 可以有多个 IntermediaResult,因此每个 ExecutionVertex 也可以包含多个 IntermediateResultPartition;
  • ExecutionEdge 主要的作用是把 ExecutionVertex 和 IntermediateResultPartition 连接起来,表示它们之间的连接关系。

物理执行图:

        ResultPartition:代表由一个 Task 生成的数据,和 ExecutionGraph 中的 IntermediateResultPartition 一一对应。

        ResultSubPartition:是 ResultPartition 的一个子分区。每个 ResultPartition 包含多个 ResultSubPartition,其数目要由下游消费 Task 数和 DistributionPartition 来决定。

        InputGate:代表 Task 的输入封装,和 JobGraph 中 JobEdge 一一对应。每个 InputGate 消费了一个或多个的 ResultPartition。

        InputChannel:每个 InputGate 会包含一个以上的 InputChannel,和 ExecutionGraph 中的 ExecutionEdge 一一对应,也和 ResultSubpartition 一对一的相连,即一个 InputChannel 接收一个 ResultSubPartition 的输出。

二、Graph 生成的底层代码

2.1 StreamGraph 在 Client 生成

执行用户代码中的 StreamExecutionEnvironment.execute()

        -> execute(getJobName())

                -> execute(getSStreamGraph())

                        -> getStreamGraph(jobName,true)

StreamExecutionEnvironment.java

public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) {
		StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
		if (clearTransformations) {
			this.transformations.clear();
		}
		return streamGraph;
}

调用了 StreamGraphGenerator 的 generate() 方法

public StreamGraph generate() {
		streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
		shouldExecuteInBatchMode = shouldExecuteInBatchMode(runtimeExecutionMode);
		configureStreamGraph(streamGraph);

		alreadyTransformed = new HashMap<>();

		/*TODO transformations是一个list,依次存放了用户代码里的算子*/
		for (Transformation<?> transformation: transformations) {
			transform(transformation);
	}
    final StreamGraph builtStreamGraph = streamGraph;

		alreadyTransformed.clear();
		alreadyTransformed = null;
		streamGraph = null;

		return builtStreamGraph;
}

        一个关键的参数是 List<Transformation<?>> transformations。Transformation 代表了从一个或多个 DataStream 生成新的 DataStream 的操作。DataStream 的底层就是一个 Transformation,描述了这个 DataStream 是怎么来的。

        DataSteam 上常见的 transformation 有 map、flatmap、filter 等。这些 transformation 会构造出一个 StreamTransformation 树,通过这棵树转换成 StreamGraph。

        以 map 为例,分析 List<Transformation<?>> transformations 的数据:

DataStream.java

public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
        // 通过 java reflection 抽出的 mapper 的返回值类型
		TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
				Utils.getCallLocationName(), true);

		return map(mapper, outType);
}


public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper, TypeInformation<R> outputType) {
		// 返回一个新的DataStream,StreamMap 为 StreamOperator 的实现类
		return transform("Map", outputType, new StreamMap<>(clean(mapper)));
}


public <R> SingleOutputStreamOperator<R> transform(
			String operatorName,
			TypeInformation<R> outTypeInfo,
			OneInputStreamOperator<T, R> operator) {

		return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}


protected <R> SingleOutputStreamOperator<R> doTransform(
			String operatorName,
			TypeInformation<R> outTypeInfo,
			StreamOperatorFactory<R> operatorFactory) {

		// read the output type of the input Transform to coax out errors about MissingTypeInfo
		transformation.getOutputType();

        // 新的 transformation 会连接上当前 DataStream 中的 transformation,从而构建成一棵树
		OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
				this.transformation,
				operatorName,
				operatorFactory,
				outTypeInfo,
				environment.getParallelism());

		@SuppressWarnings({"unchecked", "rawtypes"})
		SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);

        // 所有的 transformation 都会存到 env 中,调用 execute() 时遍历该 list 生成 StreamGraph
		getExecutionEnvironment().addOperator(resultTransform);

		return returnStream;
	}

        从上方代码可以了解到,map 转换用户自定义的函数 MapFunction 包装到 StreamMap 这个 Operator 中,再将 StreamMap 包装到 OneInputTransformation,最后该 transformation 存到 env 中,当调用 env.execute() 时,遍历其中的 transformation 集合构造出 StreamGraph。分层实现如下图所示:

        另外,并不是每一个 StreamTransformation 都会转换成 runtime 层中的物理操作。有一些只是逻辑概念,比如 union、split / select、partition 等。如下图所示的转换树,在运行时会优化成下方的操作图。
        union、split / select、partition 中的信息会被写入到 Source -> Map 的边中。通过源码也可以发现 UnionTransformation、SplitTransformation、SelectTransformation、PartitionTransformation 由于不包含具体的操作,所以都没有 StreamOperator 成员变量,而其他的 StreamTransformation 的子类基本上都有。

        继续分析 StreamGraph 生成的源码:

        StreamExecutionEnvironment.getStreamGraph() -> StreamGraphGenerator.generator() -> StreamGraphGenerator.transform()

        StreamGraphGenerator.java

// 对每个 transformation 进行转换,转换成 StreamGraph 中的 StreamNode 和 StreamEdge
// 返回值为该 transform 的 id 集合,通常大小为 1 个(除 FeedbackTransformation)
private Collection<Integer> transform(Transformation<?> transform) {
		if (alreadyTransformed.containsKey(transform)) {
			return alreadyTransformed.get(transform);
		}

		LOG.debug("Transforming " + transform);

		if (transform.getMaxParallelism() <= 0) {

			// if the max parallelism hasn't been set, then first use the job wide max parallelism
			// from the ExecutionConfig.
			int globalMaxParallelismFromConfig = executionConfig.getMaxParallelism();
			if (globalMaxParallelismFromConfig > 0) {
				transform.setMaxParallelism(globalMaxParallelismFromConfig);
			}
		}

		// call at least once to trigger exceptions about MissingTypeInfo
        // 为了触发 MissingTypeInfo 的异常
		transform.getOutputType();

		@SuppressWarnings("unchecked")
		final TransformationTranslator<?, Transformation<?>> translator =
				(TransformationTranslator<?, Transformation<?>>) translatorMap.get(transform.getClass());

		Collection<Integer> transformedIds;
		if (translator != null) {
			transformedIds = translate(translator, transform);
		} else {
			transformedIds = legacyTransform(transform);
		}

		// need this check because the iterate transformation adds itself before
		// transforming the feedback edges
		if (!alreadyTransformed.containsKey(transform)) {
			alreadyTransformed.put(transform, transformedIds);
		}

		return transformedIds;
	}
private Collection<Integer> translate(
			final TransformationTranslator<?, Transformation<?>> translator,
			final Transformation<?> transform) {
		checkNotNull(translator);
		checkNotNull(transform);

		final List<Collection<Integer>> allInputIds = getParentInputIds(transform.getInputs());

		// the recursive call might have already transformed this
		if (alreadyTransformed.containsKey(transform)) {
			return alreadyTransformed.get(transform);
		}

		final String slotSharingGroup = determineSlotSharingGroup(
				transform.getSlotSharingGroup(),
				allInputIds.stream()
						.flatMap(Collection::stream)
						.collect(Collectors.toList()));

		final TransformationTranslator.Context context = new ContextImpl(
				this, streamGraph, slotSharingGroup, configuration);

		return shouldExecuteInBatchMode
				? translator.translateForBatch(transform, context)
				: translator.translateForStreaming(transform, context);
}

        SimpleTransformationTranslator.java

public Collection<Integer> translateForStreaming(final T transformation, final Context context) {
		checkNotNull(transformation);
		checkNotNull(context);

		// 区分 map之类的转换算子(OneInput) 和 keyby值类的分区算子(partition)
		final Collection<

网站公告

今日签到

点亮在社区的每一天
去签到