6.1 PekkoInvocationHandler 类
仅摘取了 Flink RPC 进行通信的时候一段代码,也是异步通信的典型代码。
// execute an asynchronous call
final CompletableFuture<?> resultFuture =
//1. ask 发起rpc调用的方法,它返回一个 CompletableFuture ,表示rpc调用的异步结果
ask(rpcInvocation, futureTimeout)
/**
* 2. thenApply:ask 任务执行完成后,执行thenApply回调方法任务,将ask执行结果(resultValue)作为入参,
* 传递到 thenApply 对应的任务中
* thenApply 是有返回值的
*/
.thenApply(
resultValue ->
// 3. deserializeValueIfNeeded 对返回的结果进行反序列化
deserializeValueIfNeeded(
resultValue, method, flinkClassLoader));
//4. 这里创建了一个新的 CompletableFuture 对象,用于封装 resultFuture 的完成结果或异常
final CompletableFuture<Object> completableFuture = new CompletableFuture<>();
/**
* 5. 任务执行完成后,无论任务是否抛出异常都会调用回调 whenComplete ,接收两个参数
* resultValue : thenApply序列化后的值
* failure : 执行过程中的异常信息,如果没有,则为null
*/
resultFuture.whenComplete(
(resultValue, failure) -> {
// 6. 如果异常信息不为 null
if (failure != null) {
// 7. 使用 completeExceptionally 来标记 Future 为完成状态 并且是异常状态。这样在调用get()方法时就会抛出异常
completableFuture.completeExceptionally(
resolveTimeoutException(
ExceptionUtils.stripCompletionException(failure),
callStackCapture,
address,
rpcInvocation));
} else {
// 8. complete方法是用来将CompletableFuture对象标记为已完成状态,并指定完成时的结果。
completableFuture.complete(resultValue);
}
});
if (Objects.equals(returnType, CompletableFuture.class)) {
result = completableFuture;
} else {
try {
/**
* 9. get(long timeout, TimeUnit unit) 设置一个超时时间
* 阻塞获取结果如果超过设置的时间则报错
*/
result = completableFuture.get(futureTimeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (ExecutionException ee) {
throw new RpcException(
"Failure while obtaining synchronous RPC result.",
ExceptionUtils.stripExecutionException(ee));
}
}
6.2 PekkoRpcService 类
TaskExecutor向ResourceManager注册时候的一段代码。
private <C extends RpcGateway> CompletableFuture<C> connectInternal(
final String address,
final Class<C> clazz,
Function<ActorRef, InvocationHandler> invocationHandlerFactory) {
checkState(!stopped, "RpcService is stopped");
LOG.debug(
"Try to connect to remote RPC endpoint with address {}. Returning a {} gateway.",
address,
clazz.getName());
/**
* 获取 ActorRef 地址
*/
final CompletableFuture<ActorRef> actorRefFuture = resolveActorAddress(address);
/**
* 1. actorRefFuture.thenCompose : 当 actorRefFuture 完成(即成功解析 Actor 地址)时,执行内部的 lambda 表达式
* 2. Patterns.ask : 向解析得到的ActorRef发送一个 RemoteHandshakeMessage ,该消息包含目标类(clazz)和版本号(getVersion())。
* 发送请求时还指定了一个超时时间。
* 3. ScalaFutureUtils.toJava : 将Scala Future 转换为Java CompletableFuture
* 4. <HandshakeSuccessMessage>mapTo : 将返回的响应映射为 HandshakeSuccessMessage 类型
* 直白一点说就是进行了一次握手
* 进行一次Handshake握手,需要保证RpcEndpoint节点正常
*
* 定义了一个CompletableFuture对象handshakeFuture,其泛型类型为HandshakeSuccessMessage
* 这意味着这个Future对象将在握手成功后返回一个HandshakeSuccessMessage对象
*/
final CompletableFuture<HandshakeSuccessMessage> handshakeFuture =
/**
* thenCompose方法用于组合两个异步操作:首先等待actorRefFuture完成,然后使用其结果(即ActorRef)来执行另一个异步操作。
* 返回CompletableFuture的函数,以构建更复杂的异步操作链。
*/
actorRefFuture.thenCompose(
(ActorRef actorRef) ->
// 将 Pekko 中 ask 返回值 future 转换为 Java 中的 CompletableFuture
ScalaFutureUtils.toJava(
// ask 发送异步消息 等待相应结果 相应结果为 future
Patterns.ask(
actorRef,
new RemoteHandshakeMessage(
clazz, getVersion()),
configuration.getTimeout().toMillis())
.<HandshakeSuccessMessage>mapTo(
ClassTag$.MODULE$
.<HandshakeSuccessMessage>apply(
HandshakeSuccessMessage
.class))));
/**
* actorRefFuture.thenCombineAsync(handshakeFuture,...,actorSystem.dispatcher())
* 当 actorRefFuture 和 handshakeFuture 都完成时,他们的结果(分别是 ActorRef 和 HandshakeSuccessMessage )
* 将被传递给提供的 lambda 表达式进行进一步处理。这个异步操作使用 actorSystem.dispatcher() 作为调度器,这意味着它将在一个专用的线程池中执行。
* 这通常是 Akka 的默认调度器,它具有适当的配置以实现高吞吐量。
* 1. 创建一个 InvocationHandler 。 这个处理器将用于处理代理对象的方法调用,井将这些调用转发到远程的 Actor.
* 2. 获取当前类的 ClassLoader ,这样做的原因是,在某些情况下(例如 Flink 嵌入在其他应用中时), Flink 需要使用其他 ClassLoader 来加载远程 Actor 的类。
* 使用当前类的 ClassLoader 可以确保代理对象也使用相同的 ClassLoader 加载,从而避免类加载器相关的问题。
* 3. 使用 Proxy.newProxyInstance 方法创建一个新的代理对象。这个代理对象实现了 clazz 指定的接口,并使用了 InvocationHandler 作为处理器。
* 这个代理对象会拦截接口方法的调用,并将他们委托给 InvocationHandler 处理。
*/
final CompletableFuture<C> gatewayFuture =
/**
* thenCombineAsync方法用于异步组合两个CompletableFuture对象:actorRefFuture和handshakeFuture。
* 这意味着只有当这两个CompletableFuture对象都成功完成时,才会执行提供的lambda表达式。
* 这里使用了actorSystem.dispatcher()作为执行lambda的调度器。
*/
actorRefFuture.thenCombineAsync(
handshakeFuture,
(ActorRef actorRef, HandshakeSuccessMessage ignored) -> {
InvocationHandler invocationHandler =
invocationHandlerFactory.apply(actorRef);
// Rather than using the System ClassLoader directly, we derive the
// ClassLoader from this class.
// That works better in cases where Flink runs embedded and
// all Flink code is loaded dynamically
// (for example from an OSGI bundle) through a custom ClassLoader
ClassLoader classLoader = getClass().getClassLoader();
// 调用 Proxy.newProxyInstance 静态方法创建动态代理类
@SuppressWarnings("unchecked")
C proxy =
(C)
Proxy.newProxyInstance(
classLoader,
new Class<?>[] {clazz},
invocationHandler);
return proxy;
},
actorSystem.dispatcher());
return guardCompletionWithContextClassLoader(gatewayFuture, flinkClassLoader);
}
/**
* 私有方法 resolveActorAddress,它接收一个字符串类型的地址作为参数,
* 并返回一个 CompletableFuture 对象,
* 该对象包含了 ActorRef 类型的结果。
* @param address
* @return
*/
private CompletableFuture<ActorRef> resolveActorAddress(String address) {
// 使用ActorSystem的actorSelection方法,根据给定的地址获取一个ActorSelection对象。
final ActorSelection actorSel = actorSystem.actorSelection(address);
// 使用ActorSelection的resolveOne方法,尝试解析并连接到单个Actor
return actorSel.resolveOne(configuration.getTimeout())
// 将 resolveOne 方法的返回结果(scala的future或其他异步结果)转换为java的CompletableFuture对象
.toCompletableFuture()
/**
* 使用 exceptionally 方法处理可能发生的异常。如果resolveOne方法或其转换过程中发生异常,
* 这里会捕获该异常,并抛出一个新的 CompletionException
*/
.exceptionally(
error -> {
throw new CompletionException(
new RpcConnectionException(
String.format(
"Could not connect to rpc endpoint under address %s.",
address),
error));
});
}