【Flink源码分析】6. Flink1.19源码分析-Flink底层的异步通信

发布于:2025-02-11 ⋅ 阅读:(9) ⋅ 点赞:(0)

6.1 PekkoInvocationHandler 类

仅摘取了 Flink RPC 进行通信的时候一段代码,也是异步通信的典型代码。


// execute an asynchronous call
final CompletableFuture<?> resultFuture =
        //1. ask 发起rpc调用的方法,它返回一个 CompletableFuture ,表示rpc调用的异步结果
        ask(rpcInvocation, futureTimeout)
                /**
                 * 2. thenApply:ask 任务执行完成后,执行thenApply回调方法任务,将ask执行结果(resultValue)作为入参,
                 * 传递到 thenApply 对应的任务中
                 * thenApply 是有返回值的
                 */
                .thenApply(
                        resultValue ->
                                // 3. deserializeValueIfNeeded 对返回的结果进行反序列化
                                deserializeValueIfNeeded(
                                        resultValue, method, flinkClassLoader));
//4. 这里创建了一个新的 CompletableFuture    对象,用于封装 resultFuture 的完成结果或异常
final CompletableFuture<Object> completableFuture = new CompletableFuture<>();
/**
 * 5. 任务执行完成后,无论任务是否抛出异常都会调用回调 whenComplete ,接收两个参数
 *  resultValue : thenApply序列化后的值
 *  failure : 执行过程中的异常信息,如果没有,则为null
 */
resultFuture.whenComplete(
        (resultValue, failure) -> {
            // 6. 如果异常信息不为 null
            if (failure != null) {
                // 7. 使用 completeExceptionally 来标记 Future 为完成状态 并且是异常状态。这样在调用get()方法时就会抛出异常
                completableFuture.completeExceptionally(
                        resolveTimeoutException(
                                ExceptionUtils.stripCompletionException(failure),
                                callStackCapture,
                                address,
                                rpcInvocation));
            } else {
                // 8. complete方法是用来将CompletableFuture对象标记为已完成状态,并指定完成时的结果。
                completableFuture.complete(resultValue);
            }
        });

if (Objects.equals(returnType, CompletableFuture.class)) {
    result = completableFuture;
} else {
    try {
        /**
         * 9. get(long timeout, TimeUnit unit) 设置一个超时时间
         * 阻塞获取结果如果超过设置的时间则报错
         */
        result = completableFuture.get(futureTimeout.toMillis(), TimeUnit.MILLISECONDS);
    } catch (ExecutionException ee) {
        throw new RpcException(
                "Failure while obtaining synchronous RPC result.",
                ExceptionUtils.stripExecutionException(ee));
    }
}

6.2 PekkoRpcService 类

TaskExecutor向ResourceManager注册时候的一段代码。


private <C extends RpcGateway> CompletableFuture<C> connectInternal(
        final String address,
        final Class<C> clazz,
        Function<ActorRef, InvocationHandler> invocationHandlerFactory) {
    checkState(!stopped, "RpcService is stopped");

    LOG.debug(
            "Try to connect to remote RPC endpoint with address {}. Returning a {} gateway.",
            address,
            clazz.getName());
    /**
     * 获取 ActorRef 地址
     */
    final CompletableFuture<ActorRef> actorRefFuture = resolveActorAddress(address);
    /**
     *  1. actorRefFuture.thenCompose : 当 actorRefFuture 完成(即成功解析 Actor 地址)时,执行内部的 lambda 表达式
     *  2. Patterns.ask : 向解析得到的ActorRef发送一个 RemoteHandshakeMessage ,该消息包含目标类(clazz)和版本号(getVersion())。
     *  发送请求时还指定了一个超时时间。
     *  3. ScalaFutureUtils.toJava : 将Scala Future 转换为Java CompletableFuture
     *  4. <HandshakeSuccessMessage>mapTo : 将返回的响应映射为 HandshakeSuccessMessage 类型
     *  直白一点说就是进行了一次握手
     *  进行一次Handshake握手,需要保证RpcEndpoint节点正常
     *
     *  定义了一个CompletableFuture对象handshakeFuture,其泛型类型为HandshakeSuccessMessage
     *  这意味着这个Future对象将在握手成功后返回一个HandshakeSuccessMessage对象
     */
    final CompletableFuture<HandshakeSuccessMessage> handshakeFuture =
            /**
             * thenCompose方法用于组合两个异步操作:首先等待actorRefFuture完成,然后使用其结果(即ActorRef)来执行另一个异步操作。
             * 返回CompletableFuture的函数,以构建更复杂的异步操作链。
             */
            actorRefFuture.thenCompose(
                    (ActorRef actorRef) ->
                            // 将 Pekko 中 ask 返回值 future 转换为 Java 中的 CompletableFuture
                            ScalaFutureUtils.toJava(
                                    // ask 发送异步消息 等待相应结果  相应结果为 future
                                    Patterns.ask(
                                                    actorRef,
                                                    new RemoteHandshakeMessage(
                                                            clazz, getVersion()),
                                                    configuration.getTimeout().toMillis())
                                            .<HandshakeSuccessMessage>mapTo(
                                                    ClassTag$.MODULE$
                                                            .<HandshakeSuccessMessage>apply(
                                                                    HandshakeSuccessMessage
                                                                            .class))));
    /**
     *  actorRefFuture.thenCombineAsync(handshakeFuture,...,actorSystem.dispatcher())
     *  当 actorRefFuture 和 handshakeFuture 都完成时,他们的结果(分别是 ActorRef 和 HandshakeSuccessMessage )
     *  将被传递给提供的 lambda 表达式进行进一步处理。这个异步操作使用 actorSystem.dispatcher() 作为调度器,这意味着它将在一个专用的线程池中执行。
     *  这通常是 Akka 的默认调度器,它具有适当的配置以实现高吞吐量。
     *  1. 创建一个 InvocationHandler 。 这个处理器将用于处理代理对象的方法调用,井将这些调用转发到远程的 Actor.
     *  2. 获取当前类的 ClassLoader ,这样做的原因是,在某些情况下(例如 Flink 嵌入在其他应用中时), Flink 需要使用其他 ClassLoader 来加载远程 Actor 的类。
     *  使用当前类的 ClassLoader 可以确保代理对象也使用相同的 ClassLoader 加载,从而避免类加载器相关的问题。
     *  3. 使用 Proxy.newProxyInstance 方法创建一个新的代理对象。这个代理对象实现了 clazz 指定的接口,并使用了 InvocationHandler 作为处理器。
     *  这个代理对象会拦截接口方法的调用,并将他们委托给 InvocationHandler 处理。
     */
    final CompletableFuture<C> gatewayFuture =
            /**
             * thenCombineAsync方法用于异步组合两个CompletableFuture对象:actorRefFuture和handshakeFuture。
             * 这意味着只有当这两个CompletableFuture对象都成功完成时,才会执行提供的lambda表达式。
             * 这里使用了actorSystem.dispatcher()作为执行lambda的调度器。
             */
            actorRefFuture.thenCombineAsync(
                    handshakeFuture,
                    (ActorRef actorRef, HandshakeSuccessMessage ignored) -> {
                        InvocationHandler invocationHandler =
                                invocationHandlerFactory.apply(actorRef);

                        // Rather than using the System ClassLoader directly, we derive the
                        // ClassLoader from this class.
                        // That works better in cases where Flink runs embedded and
                        // all Flink code is loaded dynamically
                        // (for example from an OSGI bundle) through a custom ClassLoader
                        ClassLoader classLoader = getClass().getClassLoader();

                        // 调用 Proxy.newProxyInstance 静态方法创建动态代理类
                        @SuppressWarnings("unchecked")
                        C proxy =
                                (C)
                                        Proxy.newProxyInstance(
                                                classLoader,
                                                new Class<?>[] {clazz},
                                                invocationHandler);

                        return proxy;
                    },
                    actorSystem.dispatcher());

    return guardCompletionWithContextClassLoader(gatewayFuture, flinkClassLoader);
}

/**
 * 私有方法 resolveActorAddress,它接收一个字符串类型的地址作为参数,
 * 并返回一个 CompletableFuture 对象,
 * 该对象包含了 ActorRef 类型的结果。
 * @param address
 * @return
 */
private CompletableFuture<ActorRef> resolveActorAddress(String address) {
    // 使用ActorSystem的actorSelection方法,根据给定的地址获取一个ActorSelection对象。
    final ActorSelection actorSel = actorSystem.actorSelection(address);

    // 使用ActorSelection的resolveOne方法,尝试解析并连接到单个Actor
    return actorSel.resolveOne(configuration.getTimeout())
            // 将 resolveOne 方法的返回结果(scala的future或其他异步结果)转换为java的CompletableFuture对象
            .toCompletableFuture()
            /**
             * 使用 exceptionally 方法处理可能发生的异常。如果resolveOne方法或其转换过程中发生异常,
             * 这里会捕获该异常,并抛出一个新的 CompletionException
             */
            .exceptionally(
                    error -> {
                        throw new CompletionException(
                                new RpcConnectionException(
                                        String.format(
                                                "Could not connect to rpc endpoint under address %s.",
                                                address),
                                        error));
                    });
}