Flink 高可用机制简述(Standalone 模式)
在 Flink 的高可用(HA)架构中,核心是对 JobManager 的主节点(Leader) 的管理与选举。
Flink
通过一套可插拔的服务接口,来实现高可用功能,主要涉及以下几个关键类:1.
HighAvailabilityServices
- Flink 高可用功能的统一入口。
- 提供用于 Leader 选举 与 Leader 监听 的服务实例。
- 不同部署模式(如 ZooKeeper、Kubernetes、Standalone)通过不同实现类来生成选举服务。
2.
LeaderRetrievalService
- 用于“监听”当前的 Leader。
- 当检测到 Leader 发生变化时,会调用
LeaderRetrievalListener.notifyLeaderAddress()
方法,通知对应组件更新地址。 - 常用于
ResourceManager
、TaskManager
等组件,获取最新的Dispatcher
/JobManager
地址。
3.
LeaderElection
- 是对
LeaderElectionService
的封装,代表一次具体的“参选过程”。 - 被动接收
LeaderElectionService
的选举结果,并代理调用LeaderContender
的方法,完成注册、注销、确认等操作。
4.
LeaderElectionService
- 提供具体的选举服务(例如基于
ZooKeeper
或Kubernetes
实现)。 - 负责整个选举生命周期:启动、竞选、发现已有 Leader 等。
- 是 HA 的核心抽象组件。
- 注意:
- 在 Standalone 模式 下,Flink 实际并不会使用
LeaderElectionService
接口的实现类。也就是说,在Standalone
模式下的源码路径中,LeaderElectionService
是被绕过的,这也是为什么你在调试启动流程时没有看到它被真正调用的原因。
- 在 Standalone 模式 下,Flink 实际并不会使用
5.
LeaderContender
- 这是一个接口,用于表示“竞选 Leader 的参与者”。
- 想要支持高可用的组件(例如:
JobManager
、Dispatcher
)都需要实现这个接口。 - 当被选为
Leader
时,会被回调grantLeadership()
方法。
说明
Flink
默认提供了两种 HA 实现:
- 基于
ZooKeeper
的实现:最常见、最成熟,适用于生产环境。 - 基于
Kubernetes API
的实现:用于在原生K8s
环境下管理 HA。
本笔记聚焦于 Standalone 模式
下的源码解析,因此不会深入 ZooKeeper
/ K8s
的实现。未来可能会单独撰写关于基于 ZooKeeper
的 Flink HA
实现细节。
StandaloneHaServices
Standalone 模式下的 HighAvailabilityServices
实现分析
在 Flink 中,HighAvailabilityServices
是高可用服务的顶层接口,提供如下核心能力:
- 返回各组件的
LeaderRetrievalService
:用于监听 Leader 地址变化; - 返回各组件的
LeaderElectionService
:用于注册参与 Leader 选举; - 提供任务状态(如 checkpoint/savepoint)存储相关服务;
- 提供作业图(JobGraph)持久化服务;
- 提供运行中 Job 的注册与发现服务。
StandaloneHaServices
的继承结构
HighAvailabilityServices (接口)
↑
AbstractNonHaServices (抽象类)
↑
StandaloneHaServices (实现类)
AbstractNonHaServices
表示“非高可用”的通用实现抽象;
StandaloneHaServices
是 Flink 在 standalone 模式 下提供的具体实现,本质上是一个“伪高可用”实现,用于本地或测试环境。
StandaloneHaServices#getResourceManagerLeaderRetriever
@Override
public LeaderRetrievalService getResourceManagerLeaderRetriever() {
return new StandaloneLeaderRetrievalService(resourceManagerRpcAddress, resourceManagerHostname);
}
直接返回了一个 StandaloneLeaderRetrievalService
实例;
该服务不会监听 ZooKeeper 或 Kubernetes 等外部系统的变化,而是返回 预设的地址;
一旦配置写死,整个系统中所有组件都会认为这个地址是 ResourceManager 的 Leader,不存在自动 failover 的机制。
StandaloneHaServices
的Leader初始化流程
在 Flink 的 Standalone 模式 中,不依赖 ZooKeeper 或 Kubernetes 来做 Leader 选举。取而代之的是一套轻量级的本地模拟机制,核心体现在以下几个方面:
1. Leader 地址字段初始化
StandaloneHaServices
中维护了如下成员变量:
private final String resourceManagerAddress;
private final String dispatcherAddress;
private final String clusterRestEndpointAddress;
这三个地址是从 Flink 配置中读取的(即当前启动节点的 RPC 地址);
因为没有外部协调组件,这些地址就是系统默认认可的 Leader 地址。
2. Leader Election 的伪实现
调用 getResourceManagerLeaderElection()
时,会返回:
@Override
public LeaderElection getResourceManagerLeaderElection(LeaderContender contender) {
return new StandaloneLeaderElection(contender);
}
- 使用的是
StandaloneLeaderElection
,是一个非真正选举的实现; - 内部自动分配一个
UUID
作为 leader session ID; - 然后立即调用
contender.grantLeadership(UUID)
,将该 contender 设置为 Leader。
也就是说:谁启动,谁就是 Leader,没有真正的竞选过程。
3.StandaloneHaServices
源码
public class StandaloneHaServices extends AbstractNonHaServices {
/** The fix address of the ResourceManager. */
private final String resourceManagerAddress;
/** The fix address of the Dispatcher. */
private final String dispatcherAddress;
private final String clusterRestEndpointAddress;
/**
* Creates a new services class for the fix pre-defined leaders.
*
* @param resourceManagerAddress The fix address of the ResourceManager
* @param clusterRestEndpointAddress
*/
public StandaloneHaServices(
String resourceManagerAddress,
String dispatcherAddress,
String clusterRestEndpointAddress) {
this.resourceManagerAddress =
checkNotNull(resourceManagerAddress, "resourceManagerAddress");
this.dispatcherAddress = checkNotNull(dispatcherAddress, "dispatcherAddress");
this.clusterRestEndpointAddress =
checkNotNull(clusterRestEndpointAddress, clusterRestEndpointAddress);
}
// ------------------------------------------------------------------------
// Services
// ------------------------------------------------------------------------
@Override
public LeaderRetrievalService getResourceManagerLeaderRetriever() {
synchronized (lock) {
checkNotShutdown();
return new StandaloneLeaderRetrievalService(resourceManagerAddress, DEFAULT_LEADER_ID);
}
}
@Override
public LeaderRetrievalService getDispatcherLeaderRetriever() {
synchronized (lock) {
checkNotShutdown();
return new StandaloneLeaderRetrievalService(dispatcherAddress, DEFAULT_LEADER_ID);
}
}
@Override
public LeaderElection getResourceManagerLeaderElection() {
synchronized (lock) {
checkNotShutdown();
return new StandaloneLeaderElection(DEFAULT_LEADER_ID);
}
}
@Override
public LeaderElection getDispatcherLeaderElection() {
synchronized (lock) {
checkNotShutdown();
return new StandaloneLeaderElection(DEFAULT_LEADER_ID);
}
}
@Override
public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
synchronized (lock) {
checkNotShutdown();
return new StandaloneLeaderRetrievalService("UNKNOWN", DEFAULT_LEADER_ID);
}
}
@Override
public LeaderRetrievalService getJobManagerLeaderRetriever(
JobID jobID, String defaultJobManagerAddress) {
synchronized (lock) {
checkNotShutdown();
return new StandaloneLeaderRetrievalService(
defaultJobManagerAddress, DEFAULT_LEADER_ID);
}
}
@Override
public LeaderElection getJobManagerLeaderElection(JobID jobID) {
synchronized (lock) {
checkNotShutdown();
return new StandaloneLeaderElection(DEFAULT_LEADER_ID);
}
}
@Override
public LeaderRetrievalService getClusterRestEndpointLeaderRetriever() {
synchronized (lock) {
checkNotShutdown();
return new StandaloneLeaderRetrievalService(
clusterRestEndpointAddress, DEFAULT_LEADER_ID);
}
}
@Override
public LeaderElection getClusterRestEndpointLeaderElection() {
synchronized (lock) {
checkNotShutdown();
return new StandaloneLeaderElection(DEFAULT_LEADER_ID);
}
}
}
StandaloneLeaderElection
在 Flink
的高可用架构中,LeaderElection
扮演了连接 LeaderContender
与 LeaderElectionService
的桥梁角色,而在 Standalone
模式下,StandaloneLeaderElection
则是该机制的本地实现版本。
StandaloneLeaderElection
简要结构
该类继承自 LeaderElection
,其内部包含两个核心成员变量:
UUID leaderSessionID
:模拟的 Leader 标识;LeaderContender contender
:高可用组件(如 ResourceManager、Dispatcher 等),实现了LeaderContender
接口。
启动逻辑:
- 通过调用
startLeaderElection()
方法:- 直接触发
contender.grantLeadership(leaderSessionID)
; - 模拟该组件成为 Leader;
- 直接触发
- 无需任何外部系统协作。
这也是为什么在 Standalone 模式中,启动即是 Leader,完全不依赖真正的选举过程。
为什么说 LeaderElection
是连接的“代理层”
在真正的高可用场景(如 ZooKeeper 模式)下:
LeaderElectionService
:- 负责与外部系统打交道(例如创建 ZNode、监听节点变化);
- 判断当前节点是否有资格成为 Leader;
- 是具体的选举机制实现者。
LeaderElection
:- 是一个薄封装;
- 屏蔽了选举服务的复杂性,只关注:是否被选中,以及如何通知
LeaderContender
; - 在内部会调用
startLeaderElection
的方法,同时向外部组件暴露一个统一接口。
因此,LeaderElection
才是实现“选举逻辑”与“业务组件”解耦的关键抽象。
StandaloneLeaderElection
源码
public class StandaloneLeaderElection implements LeaderElection {
private final Object lock = new Object();
private final UUID sessionID;
@Nullable private LeaderContender leaderContender;
public StandaloneLeaderElection(UUID sessionID) {
//获取生成的uuid
this.sessionID = sessionID;
}
@Override
public void startLeaderElection(LeaderContender contender) throws Exception {
synchronized (lock) {
Preconditions.checkState(
leaderContender == null,
"No LeaderContender should have been registered with this LeaderElection, yet.");
this.leaderContender = contender;
//这里调用 具体的逻辑 启动 contender
this.leaderContender.grantLeadership(sessionID);
}
}
@Override
public CompletableFuture<Void> confirmLeadershipAsync(
UUID leaderSessionID, String leaderAddress) {
return FutureUtils.completedVoidFuture();
}
@Override
public CompletableFuture<Boolean> hasLeadershipAsync(UUID leaderSessionId) {
synchronized (lock) {
return CompletableFuture.completedFuture(
this.leaderContender != null && this.sessionID.equals(leaderSessionId));
}
}
@Override
public void close() throws Exception {
synchronized (lock) {
if (this.leaderContender != null) {
this.leaderContender.revokeLeadership();
this.leaderContender = null;
}
}
}
}