Flink核心功能与运行流程详解

发布于:2025-07-01 ⋅ 阅读:(16) ⋅ 点赞:(0)

目录

一、背景 

二、图构建

三、任务执行流程(yarn per-job模式)

3.1 Flink组件

3.2 执行流程

四、分布式调度

4.1 TM的slot

4.2 TM的slot的CPU与内存

4.3 节点的部署

4.4 节点的状态

4.5 节点部署流程

五、数据传输

5.1 内存分配

5.2 传输流程

六、高可用

6.1 节点高可用

6.2 作业高可用

6.2.1 作业异常

6.2.1 状态后端

6.2.2 状态存储位置

6.2.3 分布式快照算法

6.2.4 exactly once

七、新特性

7.1 窗口

7.1.1 窗口存储的位置

7.1.2 分配器

7.13 触发器

7.14 整体流程

八、批流一体

1. 统一的编程模型:写一次,跑在批或流上

2. 统一的执行引擎核心 (Runtime)

3. 统一的核心抽象:数据即流 (Data as Stream)

4. 执行策略的“自适应”而非“分裂”

一、背景 

很火的分布式计算框架,解析一下其核心功能与运行流程,只分析在流式计算下的情况,不分析批的情况,内容大量基于DeepSeek回复(已根据本人理解校验过一遍正确性)

二、图构建

我们的写的计算逻辑会封装在Operator相关类中

大概类似下图所示:

Operator和Operator之间的关系即路由关系会封装在TransForm中,如下图所示,input是上一个转换,OperatorFactory是当前计算逻辑

最后所有的路由关系会在Enviroment中transformation中保存

详情可以看我的:短分享-Flink图构建_路线图的构建使用flink-CSDN博客

后面会根据路由构建StreamGraph->JobGraph->ExecutionGraph->物理执行图

其中Operator的合并发生在StreamGraph->JobGraph中

ExecutionGraph算是拓扑图

物理执行图的对象中就包含具体的数据了

具体对象关系可以见下图所示

三、任务执行流程(yarn per-job模式

下面是复制 DeepSeek的回答(个人感觉它的回答已经很全面了)

3.1 Flink组件

  1. Dispatcher (调度器)

    • 职责:Flink 集群的“前台”和“作业入口”。

      • 接收客户端提交的 JobGraph(作业图)。

      • 为每个提交的作业启动一个新的 JobMaster

      • 提供 Web UI 和 REST API 的入口。

    • 高可用需求:如果 Dispatcher 挂了,新的作业提交、Web UI/REST 访问都会中断。需要选举新的 Leader Dispatcher 来接管这些职责。

  2. ResourceManager (资源管理器 - RM)

    • 职责:Flink 集群的“资源总监”。

      • 管理 TaskManager (TM) 资源(Slot)。

      • 与底层资源框架(如 YARN ResourceManager, Kubernetes API Server, Mesos Master)通信,申请/释放资源。

      • 处理 TM 的注册、心跳、Slot 报告。

      • 响应 JobMaster 的资源请求,为其分配 Slot。

    • 高可用需求:如果 RM 挂了,资源管理(申请、释放、分配 Slot)瘫痪,作业无法启动新任务或替换失败任务。需要选举新的 Leader RM 来恢复资源管理。

  3. JobMaster (作业主管)

    • 职责每个 Flink 作业的“专属管家”和“执行指挥官”。

      • 一个运行的作业对应一个 JobMaster 实例(由 Dispatcher 创建)。

      • 管理单个作业的执行生命周期(调度、部署、检查点协调、故障恢复、保存点、作业取消/完成)。

      • 持有该作业的 ExecutionGraph(执行图)。

      • 向 ResourceManager 申请 Slot 资源。

      • 管理分配给该作业的 TaskManager Task Slot。

      • 协调 Source、Sink 和算子的 Task。

    • 高可用需求:如果某个作业的 JobMaster 挂了,该作业的管理和执行就瘫痪了。需要为该作业选举新的 Leader JobMaster 来接管其恢复和执行(从 Checkpoint 恢复状态,重新调度任务等)。

  4. BlobServer / BlobCache (二进制大对象服务/缓存):

  • 职责:分布式文件缓存,用于存储和分发用户 JAR 包、配置文件等。

  • 高可用需求:通常 HA 对其影响较小,因为文件本身存储在 HA 存储(如 HDFS)中。

以上的几个组件组成了JobManager

3.2 执行流程

  1. 作业提交 (flink run -m yarn-per-job ...):

    • 用户在命令行执行 flink run -t yarn-per-job ... 提交 Flink 作业。

    • Flink Client (flink run) 开始工作:

      • 解析命令行参数和配置。

      • 生成作业的 JobGraph(优化后的执行计划)。

      • 将作业的 JAR 文件、库依赖、配置文件等上传到分布式存储(通常是 HDFS)的一个临时位置。

      • 创建一个 YARN Client

      • YARN Client 向 YARN ResourceManager (RM) 提交一个新的 YARN Application。提交的信息包括:

        • 启动 AM 所需的命令:org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint

        • AM 所需的资源(如 1 vcore, 1024 MB 内存)。

        • 作业的配置 (flink-conf.yaml)。

        • 包含作业 JAR、依赖和配置的分布式存储路径

        • YARN_APPLICATION_NAME (通常基于作业名)。

      • 提交后,Client 可以退出或保持连接等待作业结果(-d 参数决定)。

  2. YARN 资源分配与 AM/JM 启动:

    • YARN ResourceManager (RM) 收到新的 Application 提交请求。

    • YARN RM 找到一个合适的 YARN NodeManager (NM) 节点(基于调度策略和资源可用性)。

    • YARN RM 向选定的 NM 发送指令,要求它启动一个 Container 来运行 ApplicationMaster (AM)

    • YARN NodeManager (NM) 收到指令:

      • 准备 Container 的运行环境(本地目录、资源限制)。

      • 执行启动 AM 的命令:YarnJobClusterEntrypoint。这标志着 Flink JobManager (JM) 进程的启动

      • NM 监控 Container 的生命周期并向 RM 报告状态。

  3. Flink JobManager (JM) / AM 初始化:

    • YarnJobClusterEntrypoint 进程启动。

    • 初始化 Flink 运行时环境:

      • 加载配置 (flink-conf.yaml 和从 Client 上传的配置)。

      • 启动 RPC 服务端点 (Akka / Netty)。

      • 启动 Blob Server (Artifact Server),使其准备好提供文件下载服务。

      • 启动 Dispatcher 组件。

      • 启动 Flink ResourceManager (RM),其具体实现是 YarnResourceManager

      • 启动 Metrics ReporterWeb UI 等组件。

    • YarnResourceManager 向 YARN ResourceManager (RM) 注册自己作为该 YARN Application 的 AM。至此,Flink JM 进程正式承担起 AM 的角色。

    • Dispatcher 从分布式存储加载 Client 上传的 Job JAR、依赖和配置。

    • Dispatcher 根据加载的 Job JAR 和配置,创建 JobGraph (如果 Client 没生成,这里会生成)。

    • Dispatcher 启动一个 JobMaster 实例来管理这个即将运行的作业。它将 JobGraph 传递给 JobMaster。

  4. TaskManager (TM) 资源申请与启动:

    • JobMaster 接收到 JobGraph 后,开始进行调度准备工作:

      • 解析 JobGraph 成 ExecutionGraph (包含所有并行任务实例)。

      • 计算作业所需的 Slot 总数 (基于算子的并行度)。

    • JobMaster 向 Flink ResourceManager (RM) (YarnResourceManager) 发出 Slot 资源请求

    • YarnResourceManager 收到 Slot 请求:

      • 根据配置 (taskmanager.numberOfTaskSlots) 和所需 Slot 总数,计算出需要启动的 TM 实例数量

      • 考虑资源规格 (taskmanager.memory.process.sizetaskmanager.cpu.cores)。

      • 将这些需求转化为向 YARN ResourceManager (RM) 申请特定数量、特定资源规格的 Container 的请求。

      • YarnResourceManager 通过 YARN AM-RM 协议发送 Container 申请给 YARN RM。申请中指定了启动 TM 所需的资源(CPU, 内存)和启动命令 (org.apache.flink.yarn.entrypoint.YarnTaskExecutorRunner)。

    • YARN ResourceManager (RM) 收到 Container 申请:

      • 根据集群资源和调度策略(Capacity/Fair Scheduler),在满足条件的 YARN NodeManager (NM) 节点上进行资源分配。

      • 将分配好的 Container 信息(在哪个 NM 上、资源详情)通过 Container Launch Context 的形式“发放”给 AM (YarnResourceManager)。

    • YarnResourceManager 收到 YARN RM 分配的 Container 信息:

      • 它通知对应的 YARN NodeManager (NM) 启动 Container。发送的信息包含启动 TM 的确切命令 (YarnTaskExecutorRunner) 和必要的环境变量(特别是 JM 的 RPC 地址Blob Server 地址)。

    • YARN NodeManager (NM) 收到启动 Container 的指令:

      • 准备 Container 的运行环境。

      • 执行启动命令:YarnTaskExecutorRunner。这标志着 Flink TaskManager (TM) 进程的启动

      • NM 监控该 Container 的生命周期并向 YARN RM 报告状态。

  5. TaskManager 注册与 Slot 提供:

    • Flink TaskManager (TM) 进程 (YarnTaskExecutorRunner) 启动后:

      • 加载配置。

      • 初始化 RPC 服务、内存管理器、网络栈、I/O 管理器等。

      • 根据配置创建指定数量的 Task Slot

      • 获取到启动时传入的 JM 的 RPC 地址

      • 主动向 JobManager 的 RPC 端点发起注册请求。注册信息包含自己的资源 ID、主机名、可用 Slot 信息、TM 的 RPC 地址等。

    • Flink ResourceManager (RM) (YarnResourceManager) 收到 TM 的注册请求:

      • 记录这个 TM 实例及其 Slot 资源。

      • 将 TM 注册成功的信息通知给 SlotManager (Flink RM 内部组件)。

    • SlotManager 知道有新的 Slot 可用后,通知 JobMaster:之前申请的 Slot 资源现在可用了。

    • JobMaster 收到 Slot 可用通知:

      • 开始实际的任务调度

      • 将具体的 Task(序列化的执行代码和数据)部署到注册上来的 TM 的 Slot 中。

      • 部署是通过 RPC 调用将 TaskDeploymentDescriptor (TDD) 发送给 TM 来完成的。TDD 包含了执行任务所需的所有信息,包括从 JM 的 Blob Server 下载用户代码 JAR 文件的地址。

  6. 作业执行:

    • TaskManager (TM) 收到 JobMaster 部署 Task 的请求:

      • 如果需要,从 JM 的 Blob Server 下载用户代码 JAR 和依赖。

      • 在指定的 Slot 中反序列化并启动 Task 线程执行用户代码。

      • 任务开始执行计算逻辑,处理数据流。

      • TM 定期向 JM 发送心跳和状态更新(如指标信息)。

    • JobMaster 持续监控所有 Task 的执行状态:

      • 协调检查点 (Checkpointing) 和保存点 (Savepointing)。

      • 处理 Task 失败(重试、重启策略)。

      • 管理作业的生命周期状态(RUNNING, FINISHED, FAILED, CANCELED)。

  7. 作业完成与集群清理:

    • 当作业最终状态变为 FINISHEDFAILED 或 CANCELED

      • JobMaster 通知 Flink ResourceManager (RM) (YarnResourceManager) 作业结束。

      • YarnResourceManager 向 YARN ResourceManager (RM) 发送 Application 完成 的最终状态报告。

      • YarnResourceManager 主动请求 YARN RM 释放所有为该 Application 分配的 Container (即所有的 TM Container)。

      • YARN RM 通知对应的 NM 停止 TM Container。

      • Flink JobManager (JM) / AM 进程 (YarnJobClusterEntrypoint) 正常退出。

      • YARN NM 监控到 AM Container 退出,向 YARN RM 报告 Application 完成。

      • YARN RM 清理 Application 相关的记录。

      • Flink Client 如果还在等待,会收到作业最终状态的报告。

      • 临时上传到分布式存储(HDFS)的作业 JAR、依赖和配置文件通常会被自动清理

四、分布式调度

有dag图之后,要把节点deploy到机器上

4.1 TM的slot

首先要了解下机器能承受多少个节点并发执行

TM的slot个数是TM能承受的并发Task个数,由用户配置

  • 关键配置参数是 taskmanager.numberOfTaskSlots

  • 你可以在 flink-conf.yaml 文件中设置这个值,或者在启动 TaskManager 时通过命令行参数 (-Dtaskmanager.numberOfTaskSlots=<N>) 指定。

  • 理论上,你可以设置成任意正整数。

  • 实践中,这个值通常设置为 TaskManager 所在机器/容器可用的 CPU 核心数(或其整数倍)

4.2 TM的slot的CPU与内存

因为TM其实是一个JVM,所以一个TM上的所有slot共享所在机器上的所有CPU资源

内存的话:

JVM堆内存:

  • 框架堆内存 (Framework Heap): Flink 框架本身运行所需的内存(如 RPC、协调等)。

  • 任务堆内存 (Task Heap): 分配给用户代码(如 UDF、窗口状态中的对象)使用的堆内存。

  • 元空间

堆外内存:

  • 托管内存 (Managed Memory): 这是最关键的部分。Flink 自己管理的一块内存区域(可以是堆内或堆外,默认是堆外内存),主要用于:

    • 排序 (Sorting)

    • 哈希表 (Hash Tables - Joins/Aggregations)

    • RocksDB 状态后端时的缓存 (Caching) (当使用 RocksDBStateBackend 时,RocksDB 本身使用 JNI 管理自己的堆外内存,但 Flink 会预留 Managed Memory 给它做缓存)

    • 批处理作业的中间结果 (Batch Intermediate Results)

  • 网络内存 (Network Memory): 用于任务之间数据传输(Shuffle)的缓冲(Buffers)。这对流处理和高吞吐批处理至关重要。

  •  它们在 flink-conf.yaml 中通过不同的参数配置大小:

  • taskmanager.memory.managed.size/fraction 控制 Managed Memory 总量。

  • taskmanager.memory.network.min/max/fraction 控制 Network Memory 总量。

每个slot平分的是堆外内存,而不是JVM内存

4.3 节点的部署

上面说到了Flink会把算子构造ExecutionGraph,要去做节点部署时,会根据ExecutionGraph构造一个拓扑图DefaultExecutionTopology,调度的顺序依赖拓扑图决定,这个我们后面再看,先看当决定要部署一个节点时,做了哪些工作。

ExecutionVertex相关任务的内容会被打包成TDD,发送给TaskExecutor

TaskExecutor和TaskManager的区别:

TaskManager 是 JVM 进程,TaskExecutor 是运行在这个 JVM 进程内的核心服务组件。TaskManager 进程启动时会启动 TaskExecutor 服务。我们通常说“任务部署到 TaskManager 上”,更精确地说是部署到 TaskManager 内部的 TaskExecutor 服务上

然后:

  • 创建:当 TaskExecutor 接收到 JobManager 发来的 TaskDeploymentDescriptor (TDD) 并决定部署任务时,它会:

    1. 实例化 Task 对象(解析 TDD,加载类,初始化内部组件如状态后端、输入/输出网关)。

    2. 将实例化好的 Task 对象提交给 TaskExecutor 的线程池 (ExecutorService)。

  • 启动:线程池中的一个空闲线程被分配来执行这个 Task 对象的 run() 方法。这时,Task 线程开始运行,用户代码(算子逻辑)开始执行。

  • 执行

    • 线程执行 Task.run() 方法。

    • 这通常涉及:

      • 初始化算子状态(从状态后端恢复或初始化空状态)。

      • 打开算子(调用算子的 open() 方法)。

      • 进入主循环:从输入通道(InputGate)读取数据 -> 调用算子链中的 processElement/processWatermark/processLatencyMarker 等方法处理数据 -> 将结果写入输出通道(ResultPartitionWriter)。

      • 处理检查点(Checkpoint)触发、任务取消(Cancel)等事件。

  • 结束

    • 正常结束:当所有输入数据处理完毕(达到数据结束标记)或收到 JobManager 的正常停止指令时,任务会优雅结束:调用算子的 close() 方法,清理资源,报告完成状态。

    • 异常结束:如果任务执行中抛出未捕获的异常、被外部取消(如 Failover)、或超时等,线程会异常终止。TaskExecutor 会捕获异常,清理资源,并向 JobManager 报告任务失败。JobManager 会触发重试或整个作业的 Failover。

4.4 节点的状态

节点状态:

节点部署后,在JM中用Execution表示,在TM中用Task表示。

它们有共同的状态,且Task 的状态变化会通过 RPC 主动上报给 JobManager,JobManager 更新 ExecutionGraph 中对应 Execution 的状态。状态共有如下几种:

  1. CREATED (已创建):

    • 顶点刚被创建时的初始状态。

    • 尚未请求任何资源,也未被调度。

  2. SCHEDULED (已调度):

    • 核心状态! 表示该顶点已被调度

    • 调度器已成功为其请求并分配到了所需的 Slot

    • 该顶点的 TaskDeploymentDescriptor (TDD) 已生成已发送给目标 TaskExecutor (通过 RPC)。

    • 此时,该顶点的输出 IntermediateResultPartition 状态会被设置为 SCHEDULED 这满足了依赖它的下游顶点的调度条件(输入分区可消费)。

    • 注意: 任务尚未在 TaskExecutor 上启动。它可能在 TaskExecutor 的接收队列中,或者 TaskExecutor 正在创建 Task 对象。

  3. DEPLOYING (部署中):

    • 可选的中间状态 (并非所有实现都显式使用)。

    • 表示目标 TaskExecutor 已接收到 JobMaster 发来的 TDD,并开始创建对应的 Task 对象和配置执行环境。

    • 任务尚未开始执行用户代码 (SourceFunction.run() 或 processElement())。

  4. RUNNING (运行中):

    • 核心状态! 表示任务已在 TaskExecutor 上成功启动正在执行

    • TaskExecutor 已将 Task 对象提交给线程池,Task.run() 方法正在执行。

    • 任务已完成初始化 (调用了所有算子的 open() 方法),并已通知下游其输出分区可用。

    • 任务可能正在:

      • (Source) 调用 SourceFunction.run() 产生数据。

      • (非 Source) 从 InputGate 读取数据并进行处理。

      • 处理检查点屏障 (Checkpoint Barrier)。

      • 将结果写入 ResultPartitionWriter

    • 任务会定期向 JobMaster 发送心跳,表明其存活。

  5. FINISHED (已完成):

    • 表示任务已成功完成其所有工作

    • 对于流任务:通常意味着收到了来自上游的终止信号 (如 EndOfPartitionEvent) 并处理完了所有输入数据。

    • 对于批任务:意味着处理完了分配给它处理的所有数据分片。

    • 任务已调用所有算子的 close() 方法进行清理。

    • 任务释放了占用的 Slot 资源 (通知 JobMaster)。

    • 任务的输出 IntermediateResultPartition 状态变为 PRODUCED (如果所有并行任务都完成) 或 FINISHED

  6. CANCELING (取消中):

    • 表示任务收到了取消请求 (例如用户手动取消作业、发生全局故障触发取消),正在执行取消操作。

    • 任务会尝试中断执行线程 (如果可能且安全),并尽快调用算子的 close() 方法。

    • 这是一个短暂的中间状态。

  7. CANCELED (已取消):

    • 表示任务已被成功取消

    • 任务已停止执行,并完成了清理工作。

    • 任务释放了占用的 Slot 资源。

    • 任务的输出 IntermediateResultPartition 状态变为 CANCELED

  8. FAILED (已失败):

    • 核心状态! 表示任务在执行过程中遇到了未处理的异常而失败。

    • 任务执行线程因异常退出。

    • 任务会向 JobMaster 报告失败原因 (FailureEnricher 可能添加额外信息)。

    • JobMaster 根据配置的重启策略 (Restart Strategy) 决定是否以及如何重新调度该任务 (可能导致整个 ExecutionGraph 重启或仅重启失败任务链)。

    • 任务占用的 Slot 会被释放。

    • 任务的输出 IntermediateResultPartition 状态变为 FAILED

4.5 节点部署流程

前面介绍了一些基本的状态、TaskExecutor等等,下面基于前面的基础知识,详细说明节点部署的全流程:

 从 ExecutionGraph 到物理部署 (JobMaster 侧):

  1. ExecutionGraph 就绪:

    • 客户端提交 JobGraph。

    • JobMaster 将 JobGraph 转换成并行化的、包含所有并行子任务(ExecutionVertex)和中间结果分区(IntermediateResultPartition)的 ExecutionGraph

    • ExecutionGraph 代表了逻辑执行计划在物理层面的并行化视图,包含任务、数据交换方式(点对点、广播等)、容错信息(检查点/保存点)等。

  2. 资源请求 (Slot Allocation):

    • JobMaster 的 Scheduler(通常是 DefaultScheduler)遍历 ExecutionGraph

    • 对于每个 ExecutionVertex,调度器计算其所需的资源规格(ResourceProfile,主要是托管内存大小)。

    • 调度器通过 SlotPoolService 向 ResourceManager 请求相应数量和规格的 Slot。Slot 是 TaskManager 上资源的抽象单位(CPU、内存)。

  3. Slot 分配:

    • ResourceManager 从注册的 TaskManager 资源池中选择满足要求的 Slot(可能从现有 TaskManager 分配,也可能触发启动新的 TaskManager)。

    • 选中的 Slot 被分配给 JobMaster 的 SlotPoolService

  4. 部署描述符生成 (TaskDeploymentDescriptor - TDD):

    • 一旦一个 ExecutionVertex 所需的所有输入数据分区(上游任务产生的)都处于可消费状态( SCHEDULED 以上都可以),并且其所需的 Slot 已分配,调度器就会触发该顶点的部署。

    • JobMaster 为该 ExecutionVertex 创建一个 TaskDeploymentDescriptor (TDD)。TDD 包含了该特定子任务执行所需的所有信息:

      • Job ID / Job Master Actor Path: 用于任务与 JobMaster 通信(心跳、状态更新)。

      • Execution Attempt ID: 该任务执行尝试的唯一标识(用于容错)。

      • 顶点配置: 序列化的 JobInformation(作业主类、配置等)和 TaskInformation(该子任务的算子链、输入/输出格式、检查点配置、用户代码类加载器等)。

      • 输入信息:

        • 输入通道数量。

        • 每个输入通道对应的 ResultPartitionID(上游任务产生的特定数据分区)。

        • 数据消费方式(点对点、广播等)。

        • 对应上游 TaskManager 的位置信息(用于建立网络连接)。

      • 输出信息:

        • 输出 ResultPartitionDescriptor(描述该任务将产生的数据分区的类型、位置等)。

      • Slot 信息: 分配到的 Slot 的 ID 和位置(哪个 TaskManager)。

      • 检查点配置: 序列化的检查点配置对象。

      • TaskManager 上的文件缓存: 需要预先传输到 TaskManager 的 JAR 包和其他文件列表。

  5. 任务分发:

    • JobMaster 通过 RPC(通常是 Akka 或 Netty)将 TDD 发送到持有分配给该任务 Slot 的 TaskExecutor(TaskManager 的核心组件)。

    • 分发通常是异步批量进行的。调度器会考虑拓扑顺序(例如优先调度 Source 任务)和资源可用性,但分发本身是多线程并行执行的。

注意,在以上过程中:

  • 遍历顶点是顺序的: 调度器需要按逻辑顺序(如拓扑顺序、调度策略决定的顺序)逐个处理 ExecutionVertex 来决定是否需要为其请求 Slot。

  • Slot 请求的发出是并发/异步的: 调度器向 SlotPoolService 提交请求,以及 SlotPoolService 向 ResourceManager 发送请求,都是非阻塞和并发的。一个顶点的请求发出后,下一个顶点的请求可以立即发出,无需等待前一个请求的响应。

  • ResourceManager 处理是并发的: RM 并行处理接收到的多个 Slot 请求,并行查找资源池,并行分配 Slot 并返回响应。

  • 分配结果的接收是并发的: SlotPoolService 和 Scheduler 并发地接收和处理来自 RM 的 Slot 分配结果通知。

TaskManager 侧的任务执行 (TaskExecutor 侧):

  1. 接收 TDD:

    • TaskExecutor 的 RPC 端点接收到 JobMaster 发来的 submitTask(TaskDeploymentDescriptor tdd) 调用。

  2. 创建 Task 对象:

    • TaskExecutor 反序列化 TDD 中的 TaskInformation 和其他必要信息。

    • 使用 TaskInformation 和 TDD 中的其他元数据(输入/输出描述符、JobID、ExecutionID 等)实例化一个 Task 对象。Task 是 Flink 运行时执行的基本单位,它封装了一个算子链(可能包含多个算子,如 Source -> Map -> Sink)。

  3. 配置执行环境:

    • 根据 TDD 配置 Task 的 Environment 对象。这包括:

      • 设置输入 InputGate:连接到上游任务产生的 ResultPartitionInputGate 负责从网络(或本地)读取数据。

      • 设置输出 ResultPartitionWriter:用于将任务产生的数据写入 ResultPartition,供下游任务消费。

      • 设置状态后端(如果启用检查点)。

      • 配置用户代码类加载器(加载用户定义的算子类)。

      • 初始化算子链中的每个算子(调用 Operator#setup 方法)。

  4. 线程分配与执行:

    • 关键点: 每个 Task 由一个独立的线程执行。

    • TaskExecutor 内部维护一个线程池(通常是 ForkJoinPool 或固定大小的线程池,可通过 taskmanager.numberOfTaskSlots 配置间接影响)。

    • TaskExecutor 将实例化好的 Task 对象 提交给这个线程池执行。具体执行调用的是 Task.run() 方法。

    • 线程模型:

      • Slot 不是线程! Slot 是资源的容器(内存)。一个 TaskManager 有 N 个 Slot(taskmanager.numberOfTaskSlots 配置项)。

      • 每个 Slot 可以运行一个或多个 Task。 在 Flink 中,一个 Slot 内可以运行由算子链接形成的 Task(一个线程)。默认情况下,一个 Slot 运行一个 Task(一个线程)。开启 Slot Sharing 后,同一个 Slot 内可以运行来自同一个 Job 的多个不同算子链(不同 Task),这些 Task 共享 Slot 的内存资源,但每个 Task 仍然由独立的线程执行。例如,一个 Slot 里可能同时运行一个 Source Task 线程和一个 Map Task 线程。

      • 总结: TaskManager 的总并行线程数上限 ≈ taskmanager.numberOfTaskSlots * max(每个Slot内共享的Task数) 通常配置为 Slot 数等于 CPU 核心数,且每个 Slot 运行一个 Task(即一个线程),这样线程数大致等于核心数。

  5. Task.run() 执行流程:

    • 初始化阶段:

      • 调用算子链中所有算子的 Operator#open() 方法(用户初始化代码在此执行)。

      • 向 JobMaster 注册(确认任务开始执行)。

      • 建立与上游 ResultPartition 的网络连接(初始化 InputGate)。

      • 通知下游消费者(InputGate)本任务的 ResultPartition 已就绪。

    • 主循环阶段 (处理数据):

      • 对于 Source Task: 主动调用 SourceFunction.run() 产生数据。

      • 对于 非 Source Task: 循环从 InputGate 请求数据(BufferOrEvent)。

      • 数据到达后,InputGate 将数据传递给算子链的头部算子。

      • 数据在算子链内流动(OneInputStreamOperator#processElement / TwoInputStreamOperator#processElement1/2),可能经过转换、计算、聚合等操作。

      • 处理后的数据被算子链尾部的算子通过 ResultPartitionWriter 写入到对应的 ResultPartition 中。

      • 网络交互: InputGate 从网络缓冲区读取数据(来自上游),ResultPartitionWriter 将数据写入网络缓冲区(发送给下游)。这部分由 Netty 线程高效处理,Task 线程主要处理业务逻辑。

      • 检查点屏障处理: 当接收到检查点屏障(Checkpoint Barrier)时,任务会触发异步快照操作(对齐阶段、异步快照状态),完成后将屏障继续传递给下游。

    • 结束阶段:

      • 正常结束: 所有输入数据处理完毕(对于流任务可能是收到终止信号),调用算子链中所有算子的 Operator#close() 方法(用户清理代码在此执行)。

      • 异常结束: 发生未捕获异常,任务标记为失败,向 JobMaster 报告失败原因。JobMaster 根据策略(重启策略)决定是否重新调度该任务。

      • 无论正常或异常结束,最终都会释放占用的 Slot 资源(通知 JobMaster)并清理 ResultPartition 等资源。

五、数据传输

5.1 内存分配

LocalBufferPool是Task级别的

NetwordBufferPool是TM级别的

5.2 传输流程

我们这里只探讨远程TM之间的数据交互,本地交互没有背压,逻辑简单,就不讨论了

1. 每个产出数据会被序列化成字节数组存储到PipelinedSubpartition中的一个内存块(BufferConsumer)中

ArrayDeque<BufferConsumer> buffers

当内存块满了之后会去localbufferpool中再去申请内存块

2. 第一个数据到达时,会申请内存块的之后会通知下游,以后的数据不会通知

3. 通知到下游后下游会发送初始的credit

4. 上游将接收到的 Credit 值累加到该下游对应的 creditsAvailable 计数器上

5.  上游检查 creditsAvailable > 0 并且 本地有数据 Buffer(之前申请的内存块) 可发送。

6. 上游发送数据: 调用 writeAndFlushNextMessageIfPossible()(或其内部调用的方法)真正将 Buffer 数据封装成 Netty 消息发送出去。

7. 每发送一个 Buffer,上游的 creditsAvailable 计数器减 1。

8. 5-7过程会不断的循环

8. 下游每次收到数据会根据backlog(上游积压数)选择是否更新Credit,下游处理完Buffer也会更新Credit,如果更新的话上游就会重复第4步骤

9. 下游RemoteInputChannel收到buffer后会从内存池localbufferpool中申请内存块存储相应数据,申请到的内存块放到

ArrayDeque<Buffer> receivedBuffers

中,并把自己放到等待处理的List中

ArrayDeque<InputChannel> inputChannelsWithData

10 下游有线程在不断的循环从inputChannelsWithData中取出对应的channel,然后获取channel中的receivedBuffers的buffer进行处理,默认采取阻塞模式读,即如果inputChannelsWithData没数据就会调用inputChannelsWithData.wait()

六、高可用

6.1 节点高可用

首先要看Flink的运行模式,

1. Local Mode (单机模式)

2. Standalone Cluster Mode (独立集群模式)

3. Apache Hadoop YARN Mode

  • YARN Session Mode:

    • 先向 YARN 申请一个长期运行的 Flink 集群(包含 JobManager 和 TaskManager 容器)。这个集群就像一个资源池。

    • 用户随后可以向这个 Session 集群提交多个作业。这些作业共享集群资源(JobManager, TaskManagers)。

    • 类似 Standalone 集群,但资源由 YARN 管理。

  • YARN Per-Job Mode:

    • 每个提交的作业单独向 YARN 申请资源。YARN 会为每个作业启动一个专属的 JobManager 和一组 TaskManager。

    • 作业之间资源隔离性最好。

    • 作业完成后,其占用的所有资源(JobManager + TaskManagers)会被释放回 YARN。

  • YARN Application Mode:

    • 类似于 Per-Job Mode,也是为每个应用(Application)启动独立的集群。

    • 关键区别: main() 方法在 JobManager 上执行(在 YARN 的 Application Master 容器内),而不是在提交作业的客户端上执行。这解决了客户端依赖和负载问题,特别适合需要复杂依赖或大量参数传递的应用。

    • 是目前在 YARN 上运行 Flink 生产作业的推荐模式(优于 Per-Job),因为它更好地解耦了客户端和集群。

4. Kubernetes Native Mode (云原生模式的主要代表)

总结:因为我的工作环境用的是Per-Job Mode,所以我这里只讨论Per-Job Mode

当TM节点挂了,Flink的恢复流程(来自DeepSeek):

  1. 停止所有数据处理: 当 JM 检测到故障(TM 挂掉或自身挂掉被新 JM 接管)并决定启动恢复流程时,它做的第一件事通常是停止整个作业当前的数据处理流(或至少停止受影响子图的数据流)。这是为了确保一个干净的恢复起点。

  2. 重启所有任务:

    • TM 故障: JM 会重启所有受故障 TM 影响的任务(即那些在故障 TM 上运行的任务)。其他健康的 TM 上运行的任务不会被 JM 主动停止或重启。

    • JM 故障: 在新的 Leader JM 接管后,它会重新调度并启动整个作业的所有任务(相当于整个作业重启)。

  3. 状态恢复指令: JM 会向所有需要启动的任务(无论是新启动替换故障任务的副本,还是健康 TM 上被要求重启的任务,或者在 JM 故障恢复时所有任务)发送指令,明确要求它们从指定的最新成功检查点恢复状态

  4. 健康任务的状态重置: 这是最关键的一点:

    • 那些在健康 TM 上持续运行的任务,虽然在物理进程上没有失败,但它们的内存状态已经“超前”于我们选定的恢复点(最新的完整检查点)。它们可能已经处理了检查点之后的一些数据,更新了本地状态。

    • 为了确保整个作业状态的一致性(即所有任务都从完全相同的逻辑点开始),JM 必须命令这些健康的任务也丢弃掉它们当前内存中持有的、属于检查点之后的状态更新,并从持久化存储中重新加载该任务在选定检查点时刻的状态快照

    • 简单说:所有任务(新启动的副本、健康的任务)在恢复开始时,都需要将自己的状态回滚到那个全局一致的检查点状态。健康任务“知道”自己该回滚到哪里,因为它们持续参与了检查点机制,保存了状态句柄。

  5. 数据重放与状态对齐: 在所有任务(包括健康的和新的)都成功加载了检查点状态后:

    • JM 命令 Source 任务从检查点对应的位置开始重新读取数据(重放)。

    • 数据流重新建立。

    • 为了实现 Exactly-Once 语义,Flink 会进行状态对齐:任务在开始处理重放数据中检查点之后的部分之前,会等待接收齐所有上游在检查点时刻“飞行中”的数据(这些数据在故障发生时可能未被下游处理完)。这确保了即使健康任务重置了状态,它们也不会丢失或重复处理任何数据,状态更新会基于重放的数据流精确地重新计算出来。

当JM挂了,Flink的恢复流程(来自DeepSeek):

基础知识:

  1. ❌ 无预先部署的备 JM
    在 YARN per-job 模式下,每个作业独立启动一个 Application Master(AM),该 AM 即 Flink JobManager(JM)。不存在预先启动的备用 JM 进程。作业运行期间只有 一个 JM 实例(即 AM)。

  2. ✅ ZK 的选举对象是 JM 内部的领导者组件
    当配置 high-availability: zookeeper 时,ZK 的选举作用发生在 单个 JM 内部,而非多个 JM 之间:

    • Flink JM 由多个子服务构成(如 DispatcherResourceManagerJobMaster)。

    • ZK 用于选举这些子服务的 Leader(例如选举 Dispatcher 的 Leader),确保 JM 内部服务的高可用

    • 不是选举多个 JM 实例中的主备!

  3. 🚀 JM 整体故障恢复由 YARN 接管
    当整个 JM(AM)进程崩溃时:

    • YARN ResourceManager(RM) 检测到 AM 失败。

    • YARN RM 根据配置的 yarn.application-attempts 重启一个新的 AM 容器(即新 JM)。

    • 新 JM 启动后,通过 ZK 获取原作业的元数据指针(如 JobGraph 路径),完成恢复。

流程:

  1. TM 会停止执行任务:

    • 当原 JM 崩溃时,它与其管理的所有 TM 之间的心跳和 RPC 连接会立即中断。

    • TM 配置了 heartbeat.timeout。当 TM 在这个超时时间内无法收到来自 JM 的心跳响应或任何通信时,它会判定当前的 JM 已经失效。

    • 判定 JM 失效后,TM 会主动关闭自己正在执行的所有任务 (Tasks)。 这是标准行为,因为任务的协调者(JM)已经不存在了,继续执行可能造成状态不一致或数据丢失。

    • 关闭任务后,TM 本身不会立即退出。它会尝试向 YARN ResourceManager 重新注册(taskmanager.registration.initial-backoff 和 max-backoff 参数控制重试间隔和次数),等待新的 JM 来认领它。然而,在 JM 崩溃恢复期间,这个重注册过程通常不会成功,因为新 JM 尚未完全启动或尚未通知 YARN RM 它已准备好接收 TM。

  2. YARN 会清理原 AM 的容器:

    • YARN RM 检测到 AM 失联(通常是 AM-RM 心跳超时)后,会将该 AM 标记为失败。

    • YARN RM 的一个核心职责是管理集群资源。当它判定一个 AM 失败后,它会负责清理该 AM 申请的所有容器,包括该 AM 自己的容器(原 JM)和它申请的所有 TM 容器

    • 这个清理过程是通过 yarn.resourcemanager.amliveliness-monitor.expiry-interval-ms (默认 600000ms) 和 yarn.am.liveness-monitor.expiry-interval-ms (默认 600000ms) 等参数控制的。一旦超时,YARN RM 会主动发送 SIGKILL 信号给这些容器。

    • 因此,即使 TM 自己没有主动退出(在等待重注册),YARN RM 最终也会强制杀死这些 TM 容器,回收它们占用的资源(CPU、内存)。

  3. 新 JM 重新申请 TM 资源是正确的:

    • 由于步骤 1 和 2,原有的 TM 容器最终都会被清理掉(任务已停止,容器将被回收)。

    • 新 JM 启动后,它面对的是一个“空”的 TaskManager 集群(旧的 TM 要么自己关闭了,要么被 YARN 杀死了)。

    • 为了恢复作业执行,新 JM 必须向 YARN RM 重新申请所需数量的 TaskManager 资源。这是唯一能获得计算资源来运行任务的方式。

    • 新申请的 TM 是全新的容器实例,与之前的 TM 无关。

  4. 恢复过程:

    • 新 JM 从 ZK 获取轻量级元数据(leader 信息、JobGraph HDFS 路径、最新 Checkpoint HDFS 路径)。

    • 新 JM 从 HDFS (high-availability.storageDir) 下载完整的恢复数据(JobGraph、Checkpoint 元数据、用户 Jar 包)。

    • 新 JM 向 YARN RM 申请新的 TM 容器资源。

    • 新的 TM 容器启动,向新 JM 注册。

    • 新 JM 将 JobGraph 中的任务 (Tasks) 调度到新注册的 TM 上运行。

    • 关键点:任务恢复:新 JM 指示所有任务(尤其是那些有状态的任务)从 HDFS 上持久化的最新 Checkpoint 或 Savepoint 恢复它们的状态。对于 Source 任务,它们会根据 Checkpoint/Savepoint 中记录的读取位置(例如 Kafka offset)进行“重放”,确保数据一致性(精确一次或至少一次语义,取决于配置)。

6.2 作业高可用

6.2.1 作业异常

平时我们经常会说,作业异常自动拉起,这个自动拉起指的是 Task (任务) 的重启次数,而不是整个 TaskManager (TM) 进程的重启次数。 这里的“拉起”更准确地说是任务的重新调度和执行

  • 当 Flink 作业中的一个 Task (例如一个 MapFunction、一个 FlatMapFunction 等算子实例) 由于各种原因失败时(包括抛出未捕获的异常),Flink 的容错机制会根据配置的 重启策略 来决定如何处理。

  • 重启策略会尝试在可用的资源(Slot)上重新启动这个失败的 Task。这个重启过程可能发生在:

    • 同一个 TM 的另一个 Slot 上(如果该 TM 还有资源且健康)。

    • 另一个健康的 TM 上(集群模式)。

    • 甚至是同一个 Slot 上(如果 TM 进程没有崩溃)。

  • 关键点: 重启策略配置的“重启次数”(例如 restart-strategy.fixed-delay.attempts: 3)限制的是同一个 Task 实例(或者更精确地说,是同一个 ExecutionVertex)被重新调度执行的次数上限

  • 如果重启次数已达到上限:JobManager 会认为该 Task 无法成功恢复,进而导致整个 Job 失败

6.2.1 状态后端

有些作业需要记录消费过程中的一些信息,Flink提供了状态供我们存储信息,并在作业崩溃后恢复

Flink提供了两种状态,一种是算子状态,一种是键控状态

  1. 算子状态 (Operator State / Non-Keyed State)

    • 作用域: 绑定到一个并行算子实例(一个任务 Task)。同一个算子并行度内的不同实例,其 Operator State 是彼此独立的。

    • 访问方式: 不需要数据流按 Key 分区。算子内的所有数据都可以访问这个状态。

    • 典型应用场景:

      • Source 连接器: 存储 Kafka 分区的偏移量 (offset)。每个 Source 实例负责读取特定分区,存储对应分区的 offset。

      • 维护全局计数器或聚合 (谨慎使用): 例如,统计整个作业处理的总记录数(如果并行度改变,需要小心处理状态重分配)。

      • 广播状态 (Broadcast State): 一种特殊的 Operator State。它把一个低吞吐流(广播流)的状态广播给所有下游算子的并行实例,使得所有实例都持有完全相同的状态副本。常用于动态配置规则、模式匹配等。

    • 状态重新分配 (Rescaling): 当作业并行度改变时(如增加或减少 TaskManager 或 Slot),Flink 需要将 Operator State 重新分配到新的(可能更多或更少的)算子实例上。用户通常需要实现 CheckpointedFunction 或 ListCheckpointed 接口来定义状态如何拆分 (snapshotState) 和合并 (restoreState / initializeState)。

  2. 键控状态 (Keyed State)

    • 作用域: 绑定到数据流中每个 Key。更准确地说,是绑定到键控流 (KeyedStream) 中每个 Key 的每个算子实例

    • 访问方式: 必须在 KeyedStream 上使用。算子只能访问当前处理数据所属 Key 对应的状态。不同 Key 的状态是隔离的。

    • 数据结构: Flink 提供了几种原生的、类型化的数据结构来存储 Keyed State:

      • ValueState 存储单个值 (e.g., T)。每个 Key 对应一个值。

      • ListState 存储一个值的列表 (e.g., List)。每个 Key 对应一个列表。

      • MapState 存储一个键值映射 (e.g., Map)。每个 Key 对应一个 Map。

      • ReducingState / AggregatingState 存储一个值,该值是应用了 ReduceFunction 或 AggregateFunction 后聚合的结果。新数据到来时会自动聚合更新状态值。

    • 典型应用场景: 这是 Flink 中最常用、最强大的状态类型。

      • 窗口聚合 (Windowed Aggregations): 计算每个 Key 在窗口内的计数、求和、最大值等。

      • 状态机 (State Machines): 实现复杂事件处理 (CEP) 或模式检测,状态转换依赖于当前 Key 的历史事件序列。

      • 去重 (Deduplication): 记录每个 Key 最近看到的事件 ID 或时间戳,用于检测或过滤重复数据。

      • 实时特征计算: 如计算每个用户的会话时长、点击率、最近 N 次行为等。

6.2.2 状态存储位置

上面提到了状态,那么状态存储在哪里呢?Flink提供了几种不同的存储方法

1. MemoryStateBackend (内存状态后端)

  • 状态存储位置: 堆内内存 (Heap Memory)

  • 机制:

    • 工作状态 (Working State): 直接存储在 TaskManager JVM 的堆内存中。例如,ValueStateListState 等对象直接存在于 Java 堆上。

    • 检查点 (Checkpoint): 当触发 Checkpoint 时,状态会被序列化,然后发送给 JobManager,并存储在 JobManager 的堆内存中。

  • 特点:

    • 速度最快: 内存访问,无序列化/反序列化开销(工作状态)。

    • 容量最小: 受限于 TaskManager 堆大小(工作状态)和 JobManager 堆大小(检查点)。仅适用于小状态(例如测试、实验、极小状态流)

    • GC 压力大: 大量状态对象直接存在于堆上,会显著增加垃圾回收负担和停顿时间。

  • 总结: 纯堆内存储。

2. FsStateBackend (文件系统状态后端)

  • 状态存储位置: 堆内内存 (Heap Memory) + 外部文件系统 (External File System)

  • 机制:

    • 工作状态 (Working State): 存储在 TaskManager JVM 的堆内存中。与 MemoryStateBackend 相同。

    • 检查点 (Checkpoint): 当触发 Checkpoint 时,状态会被序列化,然后异步写入配置的分布式文件系统或对象存储(如 HDFS, S3, GCS, OSS, NFS 等)。

  • 特点:

    • 工作状态在堆内: 访问快,但受 TaskManager 堆大小限制。

    • 检查点在外部: 状态大小不再受 JobManager 内存限制,理论上只受文件系统容量限制,支持大状态

    • 恢复较慢: 恢复时需要从外部存储读取并反序列化状态。

    • 仍有 GC 压力: 工作状态在堆内,大状态应用 GC 压力依然显著。

  • 总结: 工作状态堆内存储,检查点外部存储。

3. RocksDBStateBackend (RocksDB 状态后端)

  • 状态存储位置: 堆外内存 (Off-Heap Memory) + 本地磁盘 (Local Disk) + 外部文件系统 (External File System)

  • 机制:

    • 工作状态 (Working State):

      • 主要存储: 存储在 TaskManager 节点的本地磁盘文件中(由 RocksDB 管理)。

      • 内存缓存:

        • Write Buffer (MemTable): 新写入/更新的数据首先进入位于 JVM 堆外内存的 MemTable(可配置大小)。

        • Block Cache: 用于缓存频繁读取的 SST 文件块,位于 JVM 堆外内存(使用 Flink 的 Managed Memory 配额)。这是提升读取性能的关键

    • 检查点 (Checkpoint):

      • 增量 Checkpoint (默认推荐): 只将 RocksDB 中自上次 Checkpoint 以来变化的数据文件复制到配置的外部文件系统(如 HDFS, S3)。效率高。

      • 全量 Checkpoint: 将整个 RocksDB 数据库目录复制到外部文件系统。

6.2.3 分布式快照算法

上面提到了状态的存储,那对于一个流式系统来说,什么时候触发状态的存储呢?

Flink一般管这种触发点叫checkpoint,存储的内容叫做checkpoint文件,它的生成基于 Chandy-Lamport算法

Chandy-Lamport算法用于有向无环图的checkpoint生成,用于有环图的算法也有,但我没看懂

如果我们的job作业中没有使用状态,就是普通的消费数据,那其实是用不到checkpoint(不用的话任务重启的时候可能有丢很少数据的风险,还是看业务了)

具体触发checkpoint的流程为(依旧来自于deepseek)

  1. Checkpoint Coordinator 发起 Checkpoint:

    • JobManager 中的 CheckpointCoordinator 组件负责整个作业的 Checkpoint。

    • 它会周期性地(基于配置的 checkpointInterval)向 JobManager 中管理所有 TaskManager 的组件发送触发消息。

    • 关键点: 这个触发是 全局统一 的。CheckpointCoordinator 会为这次 Checkpoint 生成一个唯一的、单调递增的 ID(称为 checkpointId)。

  2. Barrier 注入:

    • JobManager 通过 TaskManager 向该作业的 所有 Source Task 发送触发消息,消息中包含当前的 checkpointId

    • 多 Source 处理的核心: 每个 Source Task 在接收到触发消息后:

      • 在自己的数据流中插入一个特殊的 Barrier (n),其中 n 就是当前的 checkpointId

      • 这个 Barrier 会被插入到 Source 当前正在处理或即将发出的 所有数据记录之后

      • Source Task 开始执行自己状态的快照(例如 Kafka Source 会记录当前消费的 offset 集合)。

      • 重要: 每个 Source Task 是 独立地、几乎同时地 注入属于同一个 checkpointId 的 Barrier。它们不需要彼此等待或同步注入时刻。Barrier 会跟随正常的数据流向下游流动。

  3. Barrier 传播与对齐:

    • Barrier 随着数据记录一起流向下游算子。

    • 当算子(Operator)有多个输入流时(这正是多 Source 或 keyBy/rebalance 等操作导致的结果),Checkpoint 的关键机制 Barrier 对齐 开始发挥作用:

      • 对齐阶段: 算子会为每个输入通道维护一个小的缓冲区。当它从某个输入通道接收到 Barrier n 时:

        • 它会将该通道 阻塞,暂停从该通道消费数据(但该通道接收到的 Barrier n 之后 的数据依然会被放入缓冲区暂存,不会被处理也不会被丢弃)。

        • 它继续处理其他 尚未接收到 Barrier n 的输入通道的数据。

      • 快照触发: 只有当算子从 所有 输入通道都收到了 Barrier n 时:

        • 算子知道,在它所有输入流中,属于 Checkpoint n 之前的所有数据都已到达并被处理(或已缓存在 Barrier 前的缓冲区)。

        • 此时,算子:

          • 执行其自身状态的快照(例如 WindowOperator 的快照包含已触发但未清除的窗口状态、定时器等)。

          • 将 Barrier n 广播 到它所有的输出通道(给下游算子)。

        • 在广播完 Barrier n 之后,算子解除对所有输入通道的阻塞,开始处理之前被阻塞通道缓冲区中的数据。

    • 单输入算子: 如果算子只有一个输入通道,它在接收到 Barrier n 时,会立即执行状态快照,然后将 Barrier n 发送到其所有输出通道。无需对齐。

  4. 状态快照与异步写入:

    • 当算子(无论是 Source、Transformation 还是 Sink)执行自身状态快照时(步骤 2 和 3 中提到的):

      • 这个快照操作通常是 异步 执行的,以避免阻塞主数据处理线程。

      • 算子将状态拷贝到一个临时位置(内存或本地磁盘),然后由后台线程异步地将这份拷贝写入配置的 持久化存储后端(如 HDFS, S3, NFS, RocksDB 等)。

      • Exactly-Once 保证的核心: 在 Barrier 之后到达的数据(属于 Checkpoint n+1)不会影响 Checkpoint n 的状态快照内容。Barrier 对齐确保了快照点的一致性。

  5. 确认 (Acknowledge) 与完成:

    • 当 TaskManager 上的一个 Task(算子的一个并行实例)成功完成 以下两件事:

      1. 将自己负责的所有状态(算子状态和/或 Keyed State)异步写入持久化存储。

      2. 将 Barrier n 发送给了它的所有下游任务(对于 Sink 任务,没有下游,发送 Barrier 这一步自然完成)。

    • 它会向 JobManager 的 CheckpointCoordinator 发送一个 Ack 消息。这个消息包含:

      • checkpointId (n)

      • 该 Task 的状态句柄(State Handle) - 指向持久化存储中该 Task 状态文件位置的指针。

      • 该 Task 的算子 ID 和子任务索引(Subtask Index)。

    • 多 Source/多 Task 处理的核心: CheckpointCoordinator 会 收集所有 Task 发来的针对 checkpointId = n 的 Ack 消息。每个 Task 都是独立报告完成的。

  6. Checkpoint 完成:

    • 当 CheckpointCoordinator 收到了所有 Task 对同一个 checkpointId = n 的 Ack 消息后:

      • 它知道这次 Checkpoint n 在 整个作业的所有 Task 上都已成功完成。

      • 它将这次 Checkpoint n 的元数据(包括所有 State Handle 的集合、checkpointId、时间戳等)持久化 到高可用的存储(如 ZooKeeper 或配置的 StateBackend 支持的元数据存储)。

      • (可选)它可能会触发一些完成后的操作,如删除更早的、不再需要的 Checkpoint。

    • 此时,Checkpoint n 正式完成,可用于故障恢复。

6.2.4 exactly once

经常在网上能看到,Flink支持excatly once,这个once到底是什么once呢?是Flink消费source消费一次吗?还是只会在中间算子中只会计算一次?还是输出只有一次?

这里的excatly once指的是对于Flink中状态数据的计算只有一次,因为Flink的重启,异常恢复等等,source中的数据会被多次消费,同一个数据在中间算子中也会被也有可能多次被计算,也可能多次输出,但这些数据在整个流程中,只被统计了一次

七、新特性

7.1 窗口

首先Flink为什么要有窗口?

流数据本质上是无界的,但很多数据比如排序、TOP N等,都需要一个有界的数据来计算,所以设计窗口解决这部分排序

7.1.1 窗口存储的位置

如果key stream的window,存储在key state,如果是data stream的window,存储在Operator state

7.1.2 分配器

一个窗口算子内部通常会管理着多个(通常是大量的、动态变化的)窗口实例,如对于每分钟的滚动窗口 (TumblingEventTimeWindows.of(Time.minutes(1))), 分配器会为每个 2023-10-27 10:00:00 到 2023-10-27 10:01:00、 2023-10-27 10:01:00 到 2023-10-27 10:02:00 等这样的时间区间创建一个窗口实例

 WindowAssigner(分配器) 是 WindowOperator 内部使用的核心组件。它的职责非常明确且关键:对于到来的每一个数据元素,根据该元素携带的信息(主要是时间戳,有时也包括元素本身),计算出这个元素应该被分配到哪些窗口实例中

7.13 触发器

决定何时对窗口的内容调用窗口函数进行计算(并可能输出结果)。

我们拿滚动窗口来举例子

如果滚动窗口基于处理时间,那系统上会有个定时器来定时触发窗口的计算

如果滚动窗口基于事件时间,那触发窗口的计算就依赖于水位线

水位线代表 位于水位线表示时间之前的元素都已到达,如果水位线时间大于窗口时间,就会触发窗口的计算

水位线一般是在source节点产出的,它夹杂在正常数据中发出

有的节点可能上游有四五个节点,那么它的水位线是其接收的最小的水位线,看起来有点像barrier对齐。

7.14 整体流程

给个示例代码:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // 设置事件时间

// 1. 定义 WatermarkStrategy
WatermarkStrategy<Order> strategy = WatermarkStrategy
        .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10)) // 允许10秒乱序
        .withTimestampAssigner((order, timestamp) -> order.getOrderTimestamp()); // 提取事件时间戳

// 2. 应用 WatermarkStrategy 到数据源流
DataStream<Order> orderStream = env.addSource(new OrderSource())
        .assignTimestampsAndWatermarks(strategy);

// 3. 定义基于事件时间的窗口处理
orderStream
        .keyBy(Order::getProductId)
        .window(TumblingEventTimeWindows.of(Time.minutes(5))) // 5分钟滚动窗口
        .reduce((o1, o2) -> ...) // 或 aggregate/process
        .print();
  1. OrderSource 产生订单事件流。

  2. assignTimestampsAndWatermarks(strategy) 算子:

    • 使用 TimestampAssigner 从每个 Order 对象中提取 orderTimestamp 作为该事件的事件时间戳。

    • 内部维护一个状态:currentMaxTimestamp(当前观察到的最大事件时间戳)。

    • 每当一个新事件到达,更新 currentMaxTimestamp = max(currentMaxTimestamp, orderTimestamp)

    • 周期性生成水位线(默认200ms): 每隔 200ms,计算新的水位线 watermark = currentMaxTimestamp - 10000(10秒),并将这个水位线记录插入到输出流中向下游传播。

  3. 下游的 keyBy 和 window 算子:

    • 接收带有事件时间戳的事件和水位线。

    • 对于每个 Key(ProductId),窗口算子维护着 5 分钟长的窗口(如 [10:00, 10:05)[10:05, 10:10))。

    • 当水位线 W(T) 到达窗口算子时,算子检查所有 Key 的所有窗口:对于结束时间 <= T 的窗口(例如,当 T >= 10:05 时,[10:00, 10:05) 窗口结束),触发该窗口的计算(执行 reduce 函数)。

    • 事件时间戳为 10:03 的订单,即使它在系统处理时间 10:08才到达(假设之前 currentMaxTimestamp 已经是 10:07),只要它在 10:07 - 10s = 9:57 之后到达(允许10s乱序),它仍然会被正确地放入 [10:00, 10:05) 窗口并被计算。如果它在水位线 W(10:05)(对应 currentMaxTimestamp=10:15)之后才到达,它就会被视为迟到数据(默认丢弃)。

八、批流一体

前面说了很多在流式计算下的情况,我们都知道Flink也支持批处理,它是如何支持批处理的呢?

比如对于Join,可以来一条数据做一次点查,批处理一般是先把全量数据load到机器,然后发往下游算子,Flink如何平衡这种情况?

首先Flink会去判断作业是批的还是流的,如果作业都是批处理则是批作业,反之有一个是流处理则是流作业。

对于批作业,Flink会使用一些批处理优化

在Flink批处理汇总,Flinkgroup by 、reduce等操作是上游算子全部计算完成才会去算下游算子,会发EndOfPartitionEvent表示上游数据已计算完成,并且区别于spark,Flink采用推送的方式写数据,而不是Spark的拉取数据。

有个常见问题就是,既然flink对于批处理也采取了另外一种方式,为什么说flink是批流一体的呢?

以下是来自deepseekd的回答:

1. 统一的编程模型:写一次,跑在批或流上

  • DataStream API: 开发者使用同一套 DataStream API 编写代码。无论是处理 Kafka 实时流 (env.addSource(kafkaSource)) 还是处理 HDFS 历史文件 (env.readTextFile(path)),代码逻辑(算子链:mapfilterkeyBywindowjoin完全一致。引擎根据 Source 的 Boundedness 自动适配底层执行策略。

  • Table API / SQL (更高层抽象):

    • 用户写一条标准 SQL SELECT ... FROM A JOIN B ON ...

    • Flink 优化器根据 A 和 B 是流表还是批表

      • 流表 -> 生成基于状态管理 + 时间属性的执行计划。

      • 批表 -> 生成基于 Sort-Merge-Join / Hash-Join 的执行计划。

    • 用户无需修改查询! 同一份 SQL 可同时用于实时报表(流)和历史数据分析(批)。

对比“调度层分流”方案: 需要维护两套代码(Flink 流作业 + Spark 批作业),即使逻辑相同。


2. 统一的执行引擎核心 (Runtime)

  • 相同的任务调度与资源管理: 无论是流作业还是批作业,都提交给同一个 JobManager,使用相同的 TaskManager 资源池,基于相同的分布式快照 (Checkpoint/Savepoint) 机制做容错/恢复。

  • 相同的状态后端 (State Backend): HashMapStateBackend (内存), EmbeddedRocksDBStateBackend (磁盘) 同时服务于:

    • 流处理: 存储窗口状态、双流 JOIN 状态。

    • 批处理: 在某些优化(如部分聚合)或复杂批处理算法中临时存储中间状态。

  • 相同的网络协议与序列化: 基于 Netty 的网络栈、高效的二进制序列化(如 Flink 自带的 TypeSerializer)同时用于流式的 Pipelined Shuffle 和批式的 Blocking Shuffle。

对比“调度层分流”方案: Flink 和 Spark 是两个独立引擎,调度器、网络协议、内存管理、状态存储完全不同。


3. 统一的核心抽象:数据即流 (Data as Stream)

  • 核心哲学: Flink 将批处理视为流处理的一个特例

    • 无界流 (Unbounded Stream): 永不结束的实时数据流(标准流处理)。

    • 有界流 (Bounded Stream): 有限、已知终点的历史数据集(即批处理)。

  • 带来的革命性优势:

    • 一致的语义: 时间语义(Event Time/Processing Time)、窗口计算、状态处理在批和流上具有完全一致的定义。例如,一个 Event Time 的滚动窗口在流上持续输出,在批上一次性输出最终结果,但逻辑相同。

    • 渐进式算法: 批处理算法可视为流处理算法在“输入结束”时的最终快照。例如,批处理的 Sort-Merge-Join 是流式 JOIN 状态管理在“数据有界”条件下的终极优化形态。

    • 真正的混合计算: 可轻松实现 “增量批处理” 或 “重放历史流”

      • Lambda 架构的终结: 无需维护实时流链路(Flink)和离线批链路(Spark/Hive),一套 Flink 作业即可处理实时数据并定期重跑修正历史结果(利用有界流重放)。

      • 流批同源: 同一份代码处理实时事件流和历史回填数据,保证结果一致性。

对比“调度层分流”方案: 流和批是割裂的概念,两套系统计算结果可能因实现差异而不一致(如窗口触发机制不同)。


4. 执行策略的“自适应”而非“分裂”

  • 关键认知: 您观察到的差异(Push vs Event 触发下游、Pipelined vs Blocking Shuffle)不是“两套引擎”,而是同一引擎在统一模型下对不同数据特性(有界/无界)的智能适配

    • 流处理模式 (无界流): 优先保证 低延迟、持续处理 -> 采用 Pipelined 数据传输,所有算子常驻内存,背压流量控制。

    • 批处理模式 (有界流): 优先保证 高吞吐、资源效率、利用有界性优化 -> 采用 Blocking 交换 + EndOfPartitionEvent 触发下游,允许中间结果落盘,使用 Sort-Merge 等算法。

  • 引擎的“一体性”体现在:

    • 自动切换: 引擎根据 Source 的 Boundedness 自动选择执行策略,无需用户干预。

    • 共享基础设施: 两种模式复用相同的网络栈、序列化器、内存管理器、故障恢复机制Blocking Shuffle 在 Flink 中也是通过流式引擎的扩展实现的(只是加了同步点)。

    • 统一优化器: Table API/SQL 的优化器是统一的,根据输入性质生成最优物理计划。


网站公告

今日签到

点亮在社区的每一天
去签到