20. TaskExecutor与ResourceManager心跳
- 现在,需要回过头看
ResourceManager
是如何产生心跳管理服务的。 - cluster.initializeServices 方法的 heartbeatServices = createHeartbeatServices(configuration);产生一个 HeartbeatServicesImpl
- jobmanager的 resourceManager启动的时候,会启动 startHeartbeatServices();
startHeartbeatServices
方法
- 该方法产生2个对象。1个taskManagerHeartbeatManager和1个jobManagerHeartbeatManager。这次主要看前面1个。
private void startHeartbeatServices() {
taskManagerHeartbeatManager =
heartbeatServices.createHeartbeatManagerSender(
resourceId,
new TaskManagerHeartbeatListener(),
getMainThreadExecutor(),
log);
jobManagerHeartbeatManager =
heartbeatServices.createHeartbeatManagerSender(
resourceId,
new JobManagerHeartbeatListener(),
getMainThreadExecutor(),
log);
}
- taskManagerHeartbeatManager:负责监控所有已注册的 TaskExecutor,是 ResourceManager 与 TaskExecutor 间心跳通信的核心组件。
- jobManagerHeartbeatManager:与 JobManager 保持心跳(此处暂不关注)。
HeartbeatManagerSenderImpl
HeartbeatManagerSenderImpl
是 ResourceManager 端的心跳发送管理器,具备以下特点:- 继承 HeartbeatManagerImpl,是具体的心跳机制实现;
- 实现 Runnable 接口,自身是一个周期性执行的任务。
- 核心点:
- 周期性遍历
getHeartbeatTargets()
(即所有注册的 TaskExecutor); - 逐个向它们发送心跳请求(
requestHeartbeat
方法)。
- 周期性遍历
HeartbeatManagerSenderImpl(
long heartbeatPeriod,
long heartbeatTimeout,
int failedRpcRequestsUntilUnreachable,
ResourceID ownResourceID,
HeartbeatListener<I, O> heartbeatListener,
ScheduledExecutor mainThreadExecutor,
Logger log,
HeartbeatMonitor.Factory<O> heartbeatMonitorFactory) {
super(
heartbeatTimeout,
failedRpcRequestsUntilUnreachable,
ownResourceID,
heartbeatListener,
mainThreadExecutor,
log,
heartbeatMonitorFactory);
this.heartbeatPeriod = heartbeatPeriod;
//这里表明周期调度
mainThreadExecutor.schedule(this, 0L, TimeUnit.MILLISECONDS);
}
@Override
public void run() {
if (!stopped) {
log.debug("Trigger heartbeat request.");
for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets().values()) {
//这里就是循环将前面taskExecutor注册心跳取出来,进行心跳
requestHeartbeat(heartbeatMonitor);
}
//这里单线程周期调度
getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);
}
}
requestHeartbeat 方法
//heartbeatMonitor封装了一个taskexecutor网关。说白了就是heartbeatTarget就是调用 taskexecutor方法进行交互。
private void requestHeartbeat(HeartbeatMonitor<O> heartbeatMonitor) {
O payload = getHeartbeatListener().retrievePayload(heartbeatMonitor.getHeartbeatTargetId());
final HeartbeatTarget<O> heartbeatTarget = heartbeatMonitor.getHeartbeatTarget();
heartbeatTarget
//TaskExecutorHeartbeatSender 就是前面讲的rpc调用
.requestHeartbeat(getOwnResourceID(), payload)
.whenCompleteAsync(
handleHeartbeatRpc(heartbeatMonitor.getHeartbeatTargetId()),
getMainThreadExecutor());
}
heartbeatTarget:
实际上是封装的
TaskExecutorHeartbeatSender
;通过 RPC 接口向对应 TaskExecutor 发出心跳请求;
本质上是对 TaskExecutor 方法的远程调用。
TaskExecutorHeartbeatSender 实质上是一个封装了 TaskExecutor RPC 网关的对象,负责通过 RPC 调用向 TaskExecutor 发送心跳请求。推荐配合 Debug 调试理解整个心跳交互过程,有助于深入掌握 ResourceManager 与 TaskExecutor 之间的通信机制。
小结
ResourceManager 在启动阶段就为所有 TaskExecutor 准备好了心跳监控;
依靠单线程周期调度器,实现对所有 TaskExecutor 的心跳请求发送;
心跳本质是对 TaskExecutor RPC 方法的远程调用;
如果某个 TaskExecutor 心跳超时或失败,会触发资源回收与故障恢复机制。