Flink-Source算子状态恢复分析

发布于:2025-07-06 ⋅ 阅读:(22) ⋅ 点赞:(0)
背景
修改 source 算子

kafka_old_topic 消费任务运行一段时间后,暂停状态并保留。然后将 uid 和 topic 都改了,消费者 offset 会从 earliest 开始。

// before
FlinkKafkaConsumer consumer = KafkaConfig.getConsumer("kafka_old_topic");
consumer.setStartFromGroupOffsets();
DataStream<String> stream = env.addSource(consumer)
    .uid("kafka-source-old")
    .name("kafka-source-old");

// after
FlinkKafkaConsumer consumer = KafkaConfig.getConsumer("kafka_new_topic");
consumer.setStartFromGroupOffsets();
DataStream<String> stream = env.addSource(consumer)
    .uid("kafka-source-new")
    .name("kafka-source-new");
新增 source 算子

但是只新增一个同样的 kafka-source-new 算子(old 保留,消费者 offset 却会从最近开始。

FlinkKafkaConsumer consumer = KafkaConfig.getConsumer("kafka_old_topic");
consumer.setStartFromGroupOffsets();
DataStream<String> stream = env.addSource(consumer)
    .uid("kafka-source-old")
    .name("kafka-source-old");

// 新增
FlinkKafkaConsumer consumer = KafkaConfig.getConsumer("kafka_new_topic");
consumer.setStartFromGroupOffsets();
DataStream<String> stream = env.addSource(consumer)
    .uid("kafka-source-new")
    .name("kafka-source-new");
算子(链)子任务状态列表(operatorSubtaskStates)

针对第一种情况,job 的算子状态(localStates)有三个,分别对应xxx,

当给 Task【Source: kafka-source-new -> map-heart (org.apache.flink.streaming.runtime.tasks.SourceStreamTask) 】(被修改的 source)分配状态时,该 Task 的每个算子都会绑定一个状态(OperatorState):“kafka-source-new”、“map-heart”,只不过这两个 OperatorState 有点差异:

这两个算子状态的 operatorSubtaskStates (存储算子子任务的状态信息)集合一个为空,一个不为空。原因就是在分配 “kafka-source-new” 算子状态时,由于其不在 localState,于是走了默认的构造函数创建 OperatorState 对象:

其实关键点就在 operatorSubtaskStates 的封装。

TaskStateAssignment 任务状态分配

TaskStateAssignment 的构造方法有个核心参数 hasNonFinishedState。

如果当前 Task 的子任务状态列表(operatorSubtaskStates全集)不为空,该值就为 true。

一旦该值为 true,就会执行 assignTaskStateToExecutionJobVertices:

给当前 Task 的每个 subTask 赋值状态:

那么每个 subTask 都会有一份状态(JobManagerTaskRestore,绑定 checkpointId):

JobManagerTaskRestore(JM与TM状态交互中间站)

一个 Execution 就是一个 subTask:

Task 部署阶段(JM 向 TM 提交 Task 任务),TM 会根据 TaskDeploymentDescriptor 来恢复状态和创建算子(其中 taskRestore 就是 JobManagerTaskRestore,在 setInitialState 中赋值)。

TM 接收到提交任务请求时,解析出 taskRestore 创建任务状态管理器(TaskStateManager)

TaskStateManager(TM 的任务管理器)
算子子任务状态获取

prioritizedOperatorState:传入算子 ID,即可从 JobMangerTaskRestore 获取子任务状态。

  1. 如果 JobMangerTaskRestore 为 null,那么返回一个空的 PrioritizedOperatorSubtaskState(checkpoint设置为null

  1. 如果不为 null,则会从 JobManagerTaskRestore 中根据算子ID封装 PrioritizedOperatorSubtaskState。

StateInitializationContext(UDF-算子状态初始化上下文)

KafkaConsumerBase 在初始化状态阶段,会调用context.isRestored()判断是否从状态恢复:

算子状态句柄(StreamOperatorStateHandler)处理算子状态的初始化,该阶段会调用 UDFKafkaConsumerBase.initializeState初始化算子的本地状态并且 checkpointId 就是在这里被写入状态上下文 StateInitializationContext(该上下文是可以被用户访问的)。

StreamOperatorStateContext(initializeState全局上下文)

以上得知 checkpointId 来自context.getRestoredCheckpointId,那么 context (该上下文是不可以被用户访问的)从何而来?

算子状态初始化AbstractStreamOperator.initialState,利用 StreamTaskStateInitializer

封装 StreamOperatorStateContext:

那么 checkpointId 的封装肯定在streamTaskStateManger.streamOperatorStateContext中:

方法中通过 taskStateManger 封装算子状态,如果 prioritizedOperatorSubtaskState 为空对象,那么这里的 checkpointId 就为 null


针对第二种情况,task 的算子状态(有多个算子,算子链)不存在 localOperators 中,则默认使用构造方法封装 OperatorState,每个OperatorState 的 operatorSubtaskStates 集合都为空。