18.TaskExecutor获取ResourceManagerGateway

发布于:2025-07-21 ⋅ 阅读:(15) ⋅ 点赞:(0)

TaskExecutor获取ResourceManagerGateway

  • TaskExecutorResourceManager 之间的交互机制较为复杂,核心可以拆分为三个阶段:

    • 首次发现与注册
    • 连接建立
    • 心跳维持

    本文聚焦连接建立阶段,详细分析底层 RPC 连接的实现原理。

回顾:startRegistration方法

在注册过程中,TaskExecutor 会调用如下逻辑,通过 rpcService.connectResourceManager 建立远程通信连接:

//其中,targetType 是 ResourceManagerGateway.class。重点关注 rpcService.connect 方法。
if (FencedRpcGateway.class.isAssignableFrom(targetType)) {
    rpcGatewayFuture = (CompletableFuture<G>)
        rpcService.connect(
            targetAddress,
            fencingToken,
            targetType.asSubclass(FencedRpcGateway.class)
        );
}

PekkoRpcService

  • Flink 内部的 RPC 框架由 Pekko(Akka) 支撑,PekkoRpcService 就是基于 Pekko 实现的通信组件,负责不同组件之间的远程通信。
@Override
public <F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture<C> connect(
        String address, F fencingToken, Class<C> clazz) {
    return connectInternal(
        address,
        clazz,
        (ActorRef actorRef) -> {
            Tuple2<String, String> addressHostname = extractAddressHostname(actorRef);

            return new FencedPekkoInvocationHandler<>(   // 关键:创建 InvocationHandler
                addressHostname.f0,
                addressHostname.f1,
                actorRef,
                configuration.getTimeout(),
                configuration.getMaximumFramesize(),
                configuration.isForceRpcInvocationSerialization(),
                null,
                () -> fencingToken,  // 支持 fencingToken 防止旧节点通信
                captureAskCallstacks,
                flinkClassLoader
            );
        }
    );
}

核心方法:connectInternal

connectInternal 方法的任务是:

  • 通过目标组件的 RPC 地址,获取 ActorRef(类似 NIO 中的 selector-channel);
  • 与目标 Actor(如 ResourceManager)完成一次握手校验;
  • 基于 InvocationHandler 生成远程组件的代理对象(Gateway),用于后续透明 RPC 调用。
private <C extends RpcGateway> CompletableFuture<C> connectInternal(
        String address,
        Class<C> clazz,
        Function<ActorRef, InvocationHandler> invocationHandlerFactory) {

    final CompletableFuture<ActorRef> actorRefFuture = resolveActorAddress(address);

    final CompletableFuture<HandshakeSuccessMessage> handshakeFuture =
        actorRefFuture.thenCompose(
            (ActorRef actorRef) -> Patterns.ask(
                actorRef,
                new RemoteHandshakeMessage(clazz, getVersion()),   // 发送握手请求
                configuration.getTimeout().toMillis()
            ).mapTo(ClassTag$.MODULE$.apply(HandshakeSuccessMessage.class))
        );

    final CompletableFuture<C> gatewayFuture =
        actorRefFuture.thenCombineAsync(
            handshakeFuture,
            (ActorRef actorRef, HandshakeSuccessMessage ignored) -> {
                InvocationHandler invocationHandler = invocationHandlerFactory.apply(actorRef);

                @SuppressWarnings("unchecked")
                C proxy = (C) Proxy.newProxyInstance(
                    getClass().getClassLoader(),
                    new Class<?>[] {clazz},
                    invocationHandler
                );

                return proxy;   // 返回 ResourceManagerGateway 动态代理
            },
            actorSystem.dispatcher()
        );

    return guardCompletionWithContextClassLoader(gatewayFuture, flinkClassLoader);
}

获取 ActorRef:resolveActorAddress

private CompletableFuture<ActorRef> resolveActorAddress(String address) {
    final ActorSelection actorSel = actorSystem.actorSelection(address);

    return actorSel.resolveOne(configuration.getTimeout())
        .toCompletableFuture()
        .exceptionally(error -> {
            throw new CompletionException(
                new RpcConnectionException(
                    String.format("Could not connect to rpc endpoint under address %s.", address),
                    error
                )
            );
        });
}

  • 根据 address 定位到目标组件的 Actor(类似根据地址寻找远程服务端点)。
  • 异步获取目标组件的 ActorRef,这是后续所有远程消息传递的核心通信对象。
  • 如果解析失败,立即包装为 RpcConnectionException 抛出,阻断注册链路。
  • 特别注意
    该方法返回的 CompletableFuture<ActorRef> 是由 Akka 内部线程异步完成的(即依赖 ActorSystem 自身的调度机制)。
    因此,无需显式调用 executor 来管理异步逻辑,整个链式流程天然是异步的,并由 Akka 自身的事件机制驱动完成。
    这也是 AkkaPekko)模型的设计优势:
    组件间通信与任务调度完全解耦,基于 ActorRef 的消息传递自动异步非阻塞。

RemoteHandshakeMessage:初次握手阶段

  • 作用:
    在建立正式通信前,TaskExecutor 必须先与 ResourceManager 进行协议层握手,确保:

    • 版本一致;
    • 所请求的网关类型(如 ResourceManagerGateway)是对方支持的。
  • 源码

 private void handleHandshakeMessage(RemoteHandshakeMessage handshakeMessage) {
        if (!isCompatibleVersion(handshakeMessage.getVersion())) {
            sendErrorIfSender(
                    new HandshakeException(
                            String.format(
                                    "Version mismatch between source (%s) and target (%s) rpc component. Please verify that all components have the same version.",
                                    handshakeMessage.getVersion(), getVersion())));
        } else if (!isGatewaySupported(handshakeMessage.getRpcGateway())) {
            sendErrorIfSender(
                    new HandshakeException(
                            String.format(
                                    "The rpc endpoint does not support the gateway %s.",
                                    handshakeMessage.getRpcGateway().getSimpleName())));
        } else {
            //告诉taskExecutor,可以连接
            getSender().tell(new Status.Success(HandshakeSuccessMessage.INSTANCE), getSelf());
        }
    }
握手处理逻辑:PekkoRpcActor.handleHandshakeMessage()
  1. 版本校验

    • 如果通信双方的 Flink 版本不一致(可能是跨版本集群或配置错误),直接拒绝握手并返回错误信息。
  2. 接口类型校验

    • 检查请求方希望通信的 Gateway 接口(即 ResourceManagerGateway)是否被当前端点支持。
    • 不支持的网关说明连接请求本质错误,拒绝握手。
  3. 握手成功

    • 前两步校验都通过,表明可以安全建立通信。

    • 此时向对方返回:

      java
      
      
      复制编辑
      getSender().tell(new Status.Success(HandshakeSuccessMessage.INSTANCE), getSelf());
      

      即告诉发起方(如 TaskExecutor):可以正式建立连接。

actorRefFuture.thenCombineAsync:构建 ResourceManagerGateWay代理

核心目的:
  • 根据 ResourceManager 的 ActorRef 构建其 RPC 代理(即 ResourceManagerGateway 的动态代理对象)
源码解析
final CompletableFuture<C> gatewayFuture =
    actorRefFuture.thenCombineAsync(
        handshakeFuture,
        (ActorRef actorRef, HandshakeSuccessMessage ignored) -> {
            InvocationHandler invocationHandler =
                invocationHandlerFactory.apply(actorRef);

            ClassLoader classLoader = getClass().getClassLoader();

            @SuppressWarnings("unchecked")
            C proxy = (C)
                Proxy.newProxyInstance(
                    classLoader,
                    new Class<?>[] {clazz},
                    invocationHandler);

            return proxy;
        },
        actorSystem.dispatcher());

整体过程
  1. 等待 actorRefFuture 和 handshakeFuture 都完成:
    • actorRefFuture:已经获取了 ResourceManager 的 ActorRef。
    • handshakeFuture:握手成功,确认可以通信。
  2. 生成 InvocationHandler:
    • 实际是封装了 actorRef 和通信参数的一个代理调用处理器。
    • 后续所有发往 ResourceManager 的方法调用,都会被转化成消息,通过这个 handler 发送到 actorRef 对应的远程组件。
  3. 构建代理对象:
    • 使用 JDK 动态代理(Proxy.newProxyInstance)创建了一个ResourceManagerGateway 的动态代理
    • 对用户代码来说,这个代理就是一个普通的 ResourceManagerGateway,只是内部通过 actorRef 做远程消息发送。
  4. 返回代理对象(proxy):
    • proxy 就是一个“可直接远程调用 ResourceManager”的接口对象。

总结

  • TaskExecutor 已获取 ResourceManager 的代理网关(即 ResourceManagerGateway 代理对象);
  • 该代理对象封装了与 ResourceManager 通信所需的 actorRef 和 RPC 协议细节;
  • TaskExecutor 接下来只需要通过该网关对象,正式发起注册请求

这一阶段的核心工作是:

  • 建立连接(即通过 rpcService.connect 拿到 ResourceManager 的 actorRef 并创建代理网关);
  • 完成握手(确保版本兼容和接口匹配);
  • 生成代理(通过动态代理对外提供 ResourceManagerGateway 接口)。

下一步就是:

  • TaskExecutor 通过该网关对象向 ResourceManager 发起注册;
  • ResourceManager 受理注册请求;
  • 建立心跳与 slot 汇报等稳定的会话机制。

网站公告

今日签到

点亮在社区的每一天
去签到