大纲
1.FutureTask(Future/Callable)的使用例子
2.FutureTask(Future/Callable)的实现原理
3.FutureTask(Future/Callable)的源码分析
4.CompletableFuture的基本介绍
5.CompletionStage方法及作用说明
6.CompletableFuture的实现原理分析
7.CompletableFuture的核心源码分析
5.CompletionStage方法及作用说明
(1)CompletionStage示例
(2)CompletionStage的方法概述
(3)有传参但没返回值的方法
(4)有传参且有返回值的方法
(5)没传参也没返回值的方法
(6)组合起来串行执行的方法
(7)异常处理方法
(1)CompletionStage示例
CompletionStage表示任务执行的一个阶段,每个异步任务都会返回一个新的CompletionStage对象,可针对多个CompletionStage对象进行串行、并行、聚合等操作。简单来说,CompletionStage就是实现异步任务执行后的自动回调功能。
下面的CompletionStage例子:首先需要调用一个远程方法获得结果,然后把返回结果保存到数据库。所以代码中先定义一个异步任务处理远程调用,并返回CompletionStage,接着调用thenAccept()方法把第一步的执行结果保存到数据库中。
public class CompletionStageExample {
public static void main(String[] args) {
CompletionStage<String> cf = CompletableFuture.supplyAsync(() -> "远程调用的返回结果");
cf.thenAccept(result -> {
System.out.println("第一个异步任务的返回值是:" + result);
System.out.println("把result保存到数据库");
});
}
}
可以看见和Future明显不一样的地方就是:thenAccept()方法中传入的回调对象是第一个异步任务执行完后自动触发的,不需要像Future那样去阻塞当前线程等待返回结果,还可以使用thenAcceptAsync()方法让保存到数据库的任务使用独立线程池。
(2)CompletionStage的方法概述
CompletionStage总共提供了38个方法来实现多个任务的串行、并行、聚合等功能,这些方法可以按功能进行如下的分类:
一.有传参但没返回值的方法
二.有传参且有返回值的方法
三.没传参也没返回值的方法
四.组合起来串行执行的方法
五.异常处理的方法
注意:Accept关键字有传参没有返回值,Run关键字没传参没返回值。
(3)有传参但没返回值的方法
有传参但没返回值的方法就是:用上一个异步任务的结果作为当前方法的参数进行下一步运算,并且当前方法会产生一个新的没有返回值的CompletionStage对象。有传参但没返回值的方法都包含Accept关键字。
一.依赖单个CompletionStage任务完成
thenAccept()相关方法用上一个任务的执行结果作为参数执行当前的action,这些方法接收的参数是一个函数式接口Consumer,表示一个待执行的任务。这些方法的返回值是CompletionStage,表示没有返回值。
注意:方法以Async结尾,表示使用单独的线程池来执行action,否则使用执行当前任务的线程来执行action。
public interface CompletionStage<T> {
...
//Returns a new CompletionStage that, when this stage completes normally,
//is executed with this stage's result as the argument to the supplied action.
//@param action the action to perform before completing the returned CompletionStage
//@return the new CompletionStage
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
//Returns a new CompletionStage that, when this stage completes normally,
//is executed using this stage's default asynchronous execution facility,
//with this stage's result as the argument to the supplied action.
//@param action the action to perform before completing the returned CompletionStage
//@return the new CompletionStage
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
//Returns a new CompletionStage that, when this stage completes normally,
//is executed using the supplied Executor,
//with this stage's result as the argument to the supplied action.
//@param action the action to perform before completing the returned CompletionStage
//@param executor the executor to use for asynchronous execution
//@return the new CompletionStage
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor);
...
}
public class CompletionStageExample {
//当cf实例的任务执行完成后,会回调传入thenAcceptAsync()方法中的回调函数
//其中回调函数的result表示cf异步任务的返回结果
public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "thenAccept message");
cf.thenAcceptAsync((result) -> {
System.out.println(Thread.currentThread().getName() + "第一个异步任务的返回值:" + result);
});
}
}
二.依赖两个CompletionStage任务都完成
thenAcceptBoth()相关方法提供了与thenAccept()相关方法类似的功能。不同点在于thenAcceptBoth()相关方法多了一个CompletionStage参数,表示当两个CompletionStage任务都完成后,才执行后面的action。而且这个action可以接收两个参数,这两个参数分别表示两个任务的返回值。thenAcceptBoth()相关方法相当于实现了两个异步任务的组合。
public interface CompletionStage<T> {
...
//Returns a new CompletionStage that,
//when this and the other given stage both complete normally,
//is executed with the two results as arguments to the supplied action.
//@param other the other CompletionStage
//@param action the action to perform before completing the returned CompletionStage
//@param <U> the type of the other CompletionStage's result
//@return the new CompletionStage
public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action);
//Returns a new CompletionStage that,
//when this and the other given stage complete normally,
//is executed using this stage's default asynchronous execution facility,
//with the two results as arguments to the supplied action.
//@param other the other CompletionStage
//@param action the action to perform before completing the returned CompletionStage
//@param <U> the type of the other CompletionStage's result
//@return the new CompletionStage
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action);
//Returns a new CompletionStage that,
//when this and the other given stage complete normally,
//is executed using the supplied executor,
//with the two results as arguments to the supplied function.
//@param other the other CompletionStage
//@param action the action to perform before completing the returned CompletionStage
//@param executor the executor to use for asynchronous execution
//@param <U> the type of the other CompletionStage's result
//@return the new CompletionStage
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor);
...
}
public class ThenAcceptBothExample {
//task1和task2都执行完成后,会得到两个任务的返回值AcceptBoth和message,
//接着开始执行thenAcceptBoth()中的action,
//这个action会接收前面两个任务的执行结果r1和r2,并最终打印出:执行结果为"AcceptBoth+message"
public static void main(String[] args) {
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> "AcceptBoth");
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> "message");
task1.thenAcceptBoth(task2, (r1, r2) -> {
System.out.println("执行结果" + r1 + "+" + r2);
});
//或者采用Fluent风格来写
//CompletableFuture.supplyAsync(() -> "AcceptBoth").thenAcceptBoth(
// CompletableFuture.supplyAsync(() -> "message"), (r1, r2) -> {
// System.out.println("执行结果:" + r1 + ", " + r2);
// }
//);
}
}
三.依赖两个CompletionStage任务中的任何一个完成
acceptEither()相关方法和thenAcceptBoth()相关方法几乎一样。它同样接收两个CompletionStage任务,但是只需要保证其中一个任务完成,就会回调acceptEither()方法中传入的action任务。这两个CompletionStage任务谁先完成就会获得谁的返回值,作为参数传给后续的action任务。
public interface CompletionStage<T> {
...
//Returns a new CompletionStage that,
//when either this or the other given stage complete normally,
//is executed with the corresponding result as argument to the supplied action.
//@param other the other CompletionStage
//@param action the action to perform before completing the returned CompletionStage
//@return the new CompletionStage
public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action);
//Returns a new CompletionStage that,
//when either this or the other given stage complete normally,
//is executed using this stage's default asynchronous execution facility,
//with the corresponding result as argument to the supplied action.
//@param other the other CompletionStage
//@param action the action to perform before completing the returned CompletionStage
//@return the new CompletionStage
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action);
//Returns a new CompletionStage that,
//when either this or the other given stage complete normally,
//is executed using the supplied executor,
//with the corresponding result as argument to the supplied function.
//@param other the other CompletionStage
//@param action the action to perform before completing the returned CompletionStage
//@param executor the executor to use for asynchronous execution
//@return the new CompletionStage
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor);
...
}
(4)有传参且有返回值的方法
有传参且有返回值的方法就是:用上一个异步任务的执行结果作为当前方法的参数进行下一步计算,并且当前方法会产生一个新的有返回值的CompletionStage对象。
一.依赖单个CompletionStage任务完成
thenApply()这一组方法的功能是等上一个CompletionStage任务执行完后,就会把执行结果传递给函数fn,将函数fn作为一个新的执行任务去执行,最后返回一个新的有返回值的CompletionStage对象。
其中以Async结尾的方法表示函数fn这个任务将采用单独的线程池来执行。
public interface CompletionStage<T> {
...
//Returns a new CompletionStage that, when this stage completes normally,
//is executed with this stage's result as the argument to the supplied function.
//@param fn the function to use to compute the value of the returned CompletionStage
//@param <U> the function's return type
//@return the new CompletionStage
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
//Returns a new CompletionStage that, when this stage completes normally,
//is executed using this stage's default asynchronous execution facility,
//with this stage's result as the argument to the supplied function.
//@param fn the function to use to compute the value of the returned CompletionStage
//@param <U> the function's return type
//@return the new CompletionStage
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
//Returns a new CompletionStage that, when this stage completes normally,
//is executed using the supplied Executor,
//with this stage's result as the argument to the supplied function.
//@param fn the function to use to compute the value of the returned CompletionStage
//@param executor the executor to use for asynchronous execution
//@param <U> the function's return type
//@return the new CompletionStage
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);
...
}
二.依赖两个CompletionStage任务都完成
thenCombine()这一组方法的功能类似于thenAcceptBoth()方法。它表示两个CompletionStage任务并行执行结束后,把这两个CompletionStage任务的执行结果传递给函数fn,函数fn执行后返回一个新的有返回值的CompletionStage对象。
public interface CompletionStage<T> {
...
//Returns a new CompletionStage that,
//when this and the other given stage both complete normally,
//is executed with the two results as arguments to the supplied function.
//@param other the other CompletionStage
//@param fn the function to use to compute the value of the returned CompletionStage
//@param <U> the type of the other CompletionStage's result
//@param <V> the function's return type
//@return the new CompletionStage
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
//Returns a new CompletionStage that,
//when this and the other given stage complete normally,
//is executed using this stage's default asynchronous execution facility,
//with the two results as arguments to the supplied function.
//@param other the other CompletionStage
//@param fn the function to use to compute the value of the returned CompletionStage
//@param <U> the type of the other CompletionStage's result
//@param <V> the function's return type
//@return the new CompletionStage
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
//Returns a new CompletionStage that,
//when this and the other given stage complete normally,
//is executed using the supplied executor,
//with the two results as arguments to the supplied function.
//@param other the other CompletionStage
//@param fn the function to use to compute the value of the returned CompletionStage
//@param executor the executor to use for asynchronous execution
//@param <U> the type of the other CompletionStage's result
//@param <V> the function's return type
//@return the new CompletionStage
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor);
...
}
public class ThenCombineExample {
public static void main(String[] args) {
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> "Combine");
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> "message");
CompletableFuture<String> cf = task1.thenCombineAsync(task2, (r1, r2) -> {
System.out.println("执行结果:" + r1 + ", " + r2);
return r1 + r2;
});
System.out.println(cf.get());
//或者采用Fluent风格来写
//CompletableFuture cf = CompletableFuture.supplyAsync(() -> "Combine").thenCombineAsync(
// CompletableFuture.supplyAsync(() -> "message"), (r1, r2) -> {
// System.out.println("执行结果:" + r1 + ", " + r2);
// return r1 + r2;
// }
//);
//System.out.println(cf.get());
}
}
三.依赖两个CompletionStage任务中的任何一个完成
applyToEither()方法表示两个CompletionStage任务中任意一个任务完成后,都执行传入applyToEither()方法中的函数fn,函数fn执行后返回一个新的有返回值的CompletionStage对象。
public interface CompletionStage<T> {
...
//Returns a new CompletionStage that,
//when either this or the other given stage complete normally,
//is executed with the corresponding result as argument to the supplied function.
//@param other the other CompletionStage
//@param fn the function to use to compute the value of the returned CompletionStage
//@param <U> the function's return type
//@return the new CompletionStage
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn);
//Returns a new CompletionStage that,
//when either this or the other given stage complete normally,
//is executed using this stage's default asynchronous execution facility,
//with the corresponding result as argument to the supplied function.
//@param other the other CompletionStage
//@param fn the function to use to compute the value of the returned CompletionStage
//@param <U> the function's return type
//@return the new CompletionStage
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn);
//Returns a new CompletionStage that,
//when either this or the other given stage complete normally,
//is executed using the supplied executor,
//with the corresponding result as argument to the supplied function.
//@param other the other CompletionStage
//@param fn the function to use to compute the value of the returned CompletionStage
//@param executor the executor to use for asynchronous execution
//@param <U> the function's return type
//@return the new CompletionStage
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor);
...
}
(5)没传参也没返回值的方法
没传参也没返回值的方法就是:当前方法不依赖上一个异步任务的执行结果,只要上一个异步任务执行完成就执行当前方法,并且当前方法会产生一个新的没有返回值的CompletionStage对象,没传参也没返回值的方法都包含Run关键字。
一.依赖单个CompletionStage任务完成
thenRun()方法只要上一个阶段的任务执行完成后,便立即执行指定action。thenRunAsync()表示采用ForkjoinPool.commonPool()线程池来执行action,action执行完成后会返回一个新的没有返回值的CompletionStage对象。
public interface CompletionStage<T> {
...
//Returns a new CompletionStage that,
//when this stage completes normally, executes the given action.
//@param action the action to perform before completing the returned CompletionStage
//@return the new CompletionStage
public CompletionStage<Void> thenRun(Runnable action);
//Returns a new CompletionStage that, when this stage completes normally,
//executes the given action using this stage's default asynchronous execution facility.
//@param action the action to perform before completing the returned CompletionStage
//@return the new CompletionStage
public CompletionStage<Void> thenRunAsync(Runnable action);
//Returns a new CompletionStage that, when this stage completes normally,
//executes the given action using the supplied Executor.
//@param action the action to perform before completing the returned CompletionStage
//@param executor the executor to use for asynchronous execution
//@return the new CompletionStage
public CompletionStage<Void> thenRunAsync(Runnable action, Executor executor);
...
}
二.依赖两个CompletionStage任务都完成
runAfterBoth()方法接收一个CompletionStage任务。该方法要保证两个CompletionStage任务都完成,再执行指定的action。action执行完成后会返回一个新的没有返回值的CompletionStage对象。
public interface CompletionStage<T> {
...
//Returns a new CompletionStage that,
//when this and the other given stage both complete normally, executes the given action.
//@param other the other CompletionStage
//@param action the action to perform before completing the returned CompletionStage
//@return the new CompletionStage
public CompletionStage<Void> runAfterBoth(CompletionStage<?> other, Runnable action);
//Returns a new CompletionStage that,
//when this and the other given stage complete normally,
//executes the given action using this stage's default asynchronous execution facility.
//@param other the other CompletionStage
//@param action the action to perform before completing the returned CompletionStage
//@return the new CompletionStage
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action);
//Returns a new CompletionStage that,
//when this and the other given stage complete normally,
//executes the given action using the supplied executor.
//@param other the other CompletionStage
//@param action the action to perform before completing the returned CompletionStage
//@param executor the executor to use for asynchronous execution
//@return the new CompletionStage
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor);
...
}
三.依赖两个CompletionStage任务中的任何一个完成
runAfterEither()方法接收一个CompletionStage任务。它只需要保证两个任务中任意一个任务执行完成,即可执行指定的action,action执行完成后会返回一个新的没有返回值的CompletionStage对象。
public interface CompletionStage<T> {
...
//Returns a new CompletionStage that,
//when either this or the other given stage complete normally, executes the given action.
//@param other the other CompletionStage
//@param action the action to perform before completing the returned CompletionStage
//@return the new CompletionStage
public CompletionStage<Void> runAfterEither(CompletionStage<?> other, Runnable action);
//Returns a new CompletionStage that,
//when either this or the other given stage complete normally,
//executes the given action using this stage's default asynchronous execution facility.
//@param other the other CompletionStage
//@param action the action to perform before completing the returned CompletionStage
//@return the new CompletionStage
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action);
//Returns a new CompletionStage that,
//when either this or the other given stage complete normally,
//executes the given action using the supplied executor.
//@param other the other CompletionStage
//@param action the action to perform before completing the returned CompletionStage
//@param executor the executor to use for asynchronous execution
//@return the new CompletionStage
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor);
...
}
(6)组合起来串行执行的方法
thenCompose()是多任务组合方法,它的作用是把两个CompletionStage任务进行组合达到串行执行的目的,也就是把第一个任务的执行结果作为参数传递给第二个任务执行。
thenCompose()方法有点类似于thenCombine()方法,但thenCompose()方法中的两个任务存在先后关系,而thenCombine()方法中的两个任务是并行执行的。
public interface CompletionStage<T> {
...
//Returns a new CompletionStage that, when this stage completes normally,
//is executed with this stage as the argument to the supplied function.
//@param fn the function returning a new CompletionStage
//@param <U> the type of the returned CompletionStage's result
//@return the CompletionStage
public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
//Returns a new CompletionStage that, when this stage completes normally,
//is executed using this stage's default asynchronous execution facility,
//with this stage as the argument to the supplied function.
//@param fn the function returning a new CompletionStage
//@param <U> the type of the returned CompletionStage's result
//@return the CompletionStage
public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn);
//Returns a new CompletionStage that, when this stage completes normally,
//is executed using the supplied Executor,
//with this stage's result as the argument to the supplied function.
//@param fn the function returning a new CompletionStage
//@param executor the executor to use for asynchronous execution
//@param <U> the type of the returned CompletionStage's result
//@return the CompletionStage
public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor);
...
}
public class ThenComposeExample {
//下面使用supplyAsync()方法构建了一个异步带返回值的任务,返回值为"Compose Message";
//接着使用thenCompose()方法组合另外一个任务,并把前面任务的返回值r作为参数传递给第二个任务
//在第二个任务中同样使用supplyAsync()方法构建了一个新的任务将参数r转为大写
//最后thenCompose()方法返回一个新的没有返回值的CompletionStage对象
public static void main(String[] args) {
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> "Compose Message");
CompletableFuture<String> cf = task1.thenCompose(r -> CompletableFuture.supplyAsync(() -> r.toUpperCase()));
System.out.println(cf.get());
//或者采用Fluent风格来写
//CompletableFuture cf = CompletableFuture.supplyAsync(() -> "Compose Message")
// .thenCompose(r -> CompletableFuture.supplyAsync(() -> r.toUpperCase())
//);
//System.out.println(cf.get());
}
}
(7)异常处理方法
上述介绍的方法都是CompletionStage任务正常执行时的处理方法。如果依赖的前一个任务出现异常,那么会导致后续的任务无法正常执行。比如下述代码,如果前置任务cf出现异常,那么会影响后置任务的执行。
public class RunAfterBothExample {
public static void main(String[] args) {
CompletableFuture cf = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("Exception");
}).runAfterBoth(CompletableFuture.supplyAsync(() -> "Message"), () -> {
System.out.println("Done");
});
System.out.println(cf.get());
}
}
CompletionStage提供了3类异常处理的方法。
一.whenComplete()方法
whenComplete()这一组方法表示的是:不论前置的CompletionStage任务是正常执行结束还是出现异常,都能触发执行指定action,最后返回一个没返回值的CompletionStage对象。
public interface CompletionStage<T> {
...
//Returns a new CompletionStage with the same result or exception as this stage,
//that executes the given action when this stage completes.
//@param action the action to perform
//@return the new CompletionStage
public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
//Returns a new CompletionStage with the same result or exception as this stage,
//that executes the given action using this stage's default asynchronous execution facility when this stage completes.
//@param action the action to perform
//@return the new CompletionStage
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);
//Returns a new CompletionStage with the same result or exception as this stage,
//that executes the given action using the supplied Executor when this stage completes.
//@param action the action to perform
//@param executor the executor to use for asynchronous execution
//@return the new CompletionStage
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor);
...
}
二.handle()方法
handle()这一组方法表示的是:不论前置的CompletionStage任务是正常执行结束还是出现异常,都会执行其中的函数fn,最后返回一个有返回值的CompletionStage对象。
public interface CompletionStage<T> {
...
//Returns a new CompletionStage that,
//when this stage completes either normally or exceptionally,
//is executed with this stage's result and exception as arguments to the supplied function.
//@param fn the function to use to compute the value of the returned CompletionStage
//@param <U> the function's return type
//@return the new CompletionStage
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
//Returns a new CompletionStage that,
//when this stage completes either normally or exceptionally,
//is executed using this stage's default asynchronous execution facility,
//with this stage's result and exception as arguments to the supplied function.
//@param fn the function to use to compute the value of the returned CompletionStage
//@param <U> the function's return type
//@return the new CompletionStage
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
//Returns a new CompletionStage that,
//when this stage completes either normally or exceptionally,
//is executed using the supplied executor,
//with this stage's result and exception as arguments to the supplied function.
//@param fn the function to use to compute the value of the returned CompletionStage
//@param executor the executor to use for asynchronous execution
//@param <U> the function's return type
//@return the new CompletionStage
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor);
...
}
public class HandleExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture cf = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("Exception");
}).handleAsync((r, th) -> {
return th != null ? "出现异常" : "正常执行";
});
System.out.println(cf.get());
}
}
三.exceptionally()方法
exceptionally()方法接收一个函数fn,当上一个CompletionStage任务出现异常时,会把该异常作为参数传递给fn,最后返回一个有返回值的CompletionStage对象。
public interface CompletionStage<T> {
...
//Returns a new CompletionStage that,
//when this stage completes exceptionally,
//is executed with this stage's exception as the argument to the supplied function.
//Otherwise, if this stage completes normally,
//then the returned stage also completes normally with the same value.
//@param fn the function to use to compute the value of the returned CompletionStage if this CompletionStage completed exceptionally
//@return the new CompletionStage
public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);
...
}
public class ExceptionallyExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture cf = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("Exception");
}).exceptionally(e -> {
log.error(e);
return "ExceptionallyExample";
});
System.out.println(cf.get());
}
}
6.CompletableFuture的实现原理分析
(1)CompletableFuture实现回调的例子
(2)CompletableFuture如何存储任务
(3)Completion的几个实现类
(4)Completion的栈结构存储回调任务
(5)Completion中的回调任务的执行和总结
(1)CompletableFuture实现回调的例子
CompletableFuture实现了Future接口和CompletionStage接口,CompletionStage接口为CompletableFuture提供了丰富的异步回调接口,CompletableFuture可以使用这些接口来实现复杂的异步计算工作。
下面是一个使用CompletableFuture回调的例子。其中构建了两个CompletionStage任务,第一个任务是返回"thenAccept message"字符串,第二个任务是打印第一个任务的返回值。注意:Accept关键字有传参没有返回值,Run关键字没传参没返回值。
这两个任务建立了串行执行的关系,第二个任务相当于第一个任务执行结束后的异步回调,并且多个CompletionStage任务可以使用链式风格串联。
public class CompletionStageExample {
public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() -> "thenAccept message")
.thenAcceptAsync((result) -> {
System.out.println("第一个异步任务的返回值:" + result);
});
cf.get();
}
}
(2)CompletableFuture如何存储任务
一.CompletableFuture的成员变量
CompletableFuture的成员变量只有两个:result和stack。
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
...
//表示CompletionStage任务的返回结果或者一个异常的封装对象AltResult
volatile Object result;//Either the result or boxed AltResult
//表示依赖操作栈的栈顶,链式调用中传递的任务都会被压入这个stack中
volatile Completion stack;//Top of Treiber stack of dependent actions
...
}
二.表示具体执行任务的Completion
成员变量stack是一个存储Completion对象的Treiber Stack结构,Treiber Stack是一种基于CAS机制实现的无锁并发栈。
Completion表示一个具体的执行任务。每个回调任务都会封装成Completion对象,然后放入Treiber Stack中。Completion中的成员变量next保存了栈中的下一个回调任务。
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
...
abstract static class Completion extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask {
volatile Completion next;//Treiber stack link
//Performs completion action if triggered, returning a dependent that may need propagation, if one exists.
//@param mode SYNC, ASYNC, or NESTED
abstract CompletableFuture<?> tryFire(int mode);
//Returns true if possibly still triggerable. Used by cleanStack.
abstract boolean isLive();
public final void run() {
tryFire(ASYNC);
}
public final boolean exec() {
tryFire(ASYNC);
return true;
}
public final Void getRawResult() {
return null;
}
public final void setRawResult(Void v) {
}
}
...
}
(3)Completion的几个实现类
一.UniCompletion
当使用如thenRun()、thenApply()等方法处理单个任务的,那么这些任务就会封装成UniCompletion对象。
二.CoCompletion
当使用如thenCombine()、applyToEither()等方法处理两个任务的,那么这些任务会封装成CoCompletion对象。
三.Signaller
当使用如get()、join()等方法处理任务时,那么调用方也会作为任务被封装成Signaller对象。
(4)Completion的栈结构存储回调任务
以如下创建一个CompletableFuture任务为例,说明在CompletableFuture中是如何存储这些回调任务的。注意:该例子在Debug中没有发现baseFuture的成员变量stack的变化。
public class CompletionStackExample {
public static void main(String[] args) throws InterruptedException, ExecutionException {
//创建一个CompletableFuture任务对象
CompletableFuture<String> baseFuture = CompletableFuture.supplyAsync(() -> {
try {
System.out.println("开始执行入栈baseFuture的第一个异步任务");
Thread.sleep(5000);
System.out.println("第一个异步任务执行完毕");
} catch (Exception e) {
}
return "BaseFuture";
});
System.out.println("主线程第一次打印");
baseFuture.thenApply(r -> {
System.out.println("开始执行入栈baseFuture的第二个异步任务");
try {
Thread.sleep(5000);
System.out.println("第二个异步任务执行完毕");
} catch (Exception e) {
}
return "Then Apply";
});//输出结果中没有"Then Apply",因为没有任务使用"Then Apply"这个返回值
System.out.println("主线程第二次打印");
baseFuture.thenAccept(r -> {
System.out.println("开始执行入栈baseFuture的第三个异步任务");
try {
Thread.sleep(5000);
System.out.println("第三个异步任务执行完毕: " + r);
} catch (Exception e) {
}
}).thenAccept(Void -> {
System.out.println("baseFuture的第三个异步任务返回的新CompletableFuture,入栈第一个异步任务");
try {
Thread.sleep(5000);
System.out.println("第三个异步任务的子任务执行完毕");
} catch (Exception e) {
}
});
System.out.println("主线程第三次打印");
baseFuture.thenApply(r -> {
System.out.println("开始执行入栈baseFuture的第四个异步任务");
try {
Thread.sleep(5000);
System.out.println("第四个异步任务执行完毕");
} catch (Exception e) {
}
return "Apply Message";
}).thenAccept(r -> {
System.out.println("baseFuture的第四个异步任务返回的新CompletableFuture,入栈第一个异步任务");
try {
Thread.sleep(5000);
System.out.println("第四个异步任务的子任务执行完毕: " + r);
} catch (Exception e) {
}
});
System.out.println("主线程第四次打印");
System.out.println("finish: " + baseFuture.get());
}
//输出的结果如下:
//主线程第一次打印
//开始执行入栈baseFuture的第一个异步任务
//主线程第二次打印
//主线程第三次打印
//主线程第四次打印
//第一个异步任务执行完毕
//开始执行入栈baseFuture的第四个异步任务
//开始执行入栈baseFuture的第三个异步任务
//第四个异步任务执行完毕
//第三个异步任务执行完毕: BaseFuture
//baseFuture的第三个异步任务返回的新CompletableFuture,入栈第一个异步任务
//baseFuture的第四个异步任务返回的新CompletableFuture,入栈第一个异步任务
//第三个异步任务的子任务执行完毕
//第四个异步任务的子任务执行完毕: Apply Message
//开始执行入栈baseFuture的第二个异步任务
//第二个异步任务执行完毕
//finish: BaseFuture
}
一.第一阶段的Completion Stack结构
主线程第一次打印和第二次打印执行完成后,会创建如下图所示的结构。此时Completion类型是UniCompletion,因为thenApply()方法只接收一个任务。
二.第二阶段的Completion Stack结构
主线程第三次打印执行完成后,就会创建如下图所示的结构。
首先使用baseFuture.thenAccept()方法在baseFuture上增加一个回调,此时会把这个回调对应的Completion压入baseFuture的stack的栈顶。
然后会产生一个新的CompletableFuture对象实例继续执行thenAccept(),由于这个新的CompletableFuture对象实例是在栈顶的Completion中产生的,因此在栈顶的Completion中会有一个dep属性指向这个新的对象实例。
在新的CompletableFuture对象中又调用thenAccept()来构建一个回调任务,所以又会有一个新的Completion Stack结构。
三.第三阶段的Completion Stack结构
主线程第四次打印执行完成后,就会创建如下图所示的结构。
首先是在baseFuture上使用thenApply()方法创建一个带有返回值的回调,这个回调对应的Completion同样会压入baseFuture的stack的栈顶。然后同样会创建一个新的CompletableFuture对象实例。接着在这个新的对象实例中继续使用thenAccept()方法添加另外一个回调,这个回调对应的Completion会压入新的CompletableFuture的stack的栈顶。
(5)Completion中的回调任务的执行和总结
从Completion Stack的栈顶中逐个出栈来执行。
如果当前出栈的Completion存在一个子Completion Stack,那么就优先执行这一条链路的Completion任务。
CompletableFuture中的回调任务,是基于Completion来实现的。针对CompletionStage中不同类型的方法,Completion有不同的子类处理。
Completion表示一个具体的回调任务,这些Completion采用了一种Treiber Stack结构来存储。由于每个Completion都可能会产生新的CompletableFuture,所以整个结构看起来像一棵很深的树。
7.CompletableFuture的核心源码分析
(1)CompletableFuture的核心源码
(2)CompletableFuture对象的创建
(3)Completion Stack的构建
(4)Completion任务的执行流程
(5)Completion任务的执行结果获取
(6)总结
(1)CompletableFuture的核心源码
CompletableFuture的源码主要分四部分:
一.CompletableFuture对象的创建
二.Completion Stack的构建
三.get()方法获取任务处理结果时阻塞和唤醒线程
四.当前置任务执行完成后,Completion Stack的执行流程
(2)CompletableFuture对象的创建
假设使用supplyAsync()方法来创建一个CompletableFuture对象。那么在执行supplyAsync()方法时触发调用的asyncSupplyStage()方法中,便会使用线程池来执行一个由AsyncSupply()构造方法构建的任务,这个线程池默认情况下是由ForkJoinPool的commonPool()方法返回的。
当线程池执行由AsyncSupply()构造方法构建的任务时,会调用AsyncSupply的run()方法来执行具体的任务。
在AsyncSupply的run()方法中:首先会使用f.get()来获得Supplier这个函数式接口的执行结果,然后通过执行CompletableFuture的completeValue()方法,把执行结果通过CAS设置到CompletableFuture的成员变量result中。最后调用CompletableFuture的postComplete()方法表示执行完成,该postComplete()方法会执行Completion Stack中的所有回调任务。
//Represents a supplier of results.
//There is no requirement that a new or distinct result be returned each time the supplier is invoked.
//This is a functional interface whose functional method is get().
//@param <T> the type of results supplied by this supplier
@FunctionalInterface
public interface Supplier<T> {
//Gets a result.
//@return a result
T get();
}
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
volatile Object result; // Either the result or boxed AltResult
volatile Completion stack; // Top of Treiber stack of dependent actions
//Returns a new CompletableFuture that is asynchronously completed by a task
//running in the ForkJoinPool#commonPool() with the value obtained by calling the given Supplier.
//@param supplier a function returning the value to be used to complete the returned CompletableFuture
//@param <U> the function's return type
//@return the new CompletableFuture
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
static <U> CompletableFuture<U> asyncSupplyStage(Executor e, Supplier<U> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<U> d = new CompletableFuture<U>();
//使用线程池来执行一个由AsyncSupply()方法构建的任务
e.execute(new AsyncSupply<U>(d, f));
//返回一个新的CompletableFuture对象
return d;
}
static final class AsyncSupply<T> extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask {
CompletableFuture<T> dep;
Supplier<T> fn;
AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
this.dep = dep;
this.fn = fn;
}
public final Void getRawResult() {
return null;
}
public final void setRawResult(Void v) {
}
public final boolean exec() {
run();
return true;
}
public void run() {
CompletableFuture<T> d;
Supplier<T> f;
if ((d = dep) != null && (f = fn) != null) {
dep = null; fn = null;
if (d.result == null) {
try {
//首先使用f.get()来获得Supplier这个函数式接口中的执行结果
//然后通过执行CompletableFuture的completeValue()方法,
//把执行结果设置到CompletableFuture的成员变量result中;
d.completeValue(f.get());
} catch (Throwable ex) {
d.completeThrowable(ex);
}
}
//最后调用CompletableFuture的postComplete()方法执行Completion Stack中的所有回调任务
d.postComplete();
}
}
}
//Completes with a non-exceptional result, unless already completed.
final boolean completeValue(T t) {
return UNSAFE.compareAndSwapObject(this, RESULT, null, (t == null) ? NIL : t);
}
//Pops and tries to trigger all reachable dependents. Call only when known to be done.
final void postComplete() {
//On each step, variable f holds current dependents to pop and run.
//It is extended along only one path at a time, pushing others to avoid unbounded recursion.
CompletableFuture<?> f = this;
Completion h;
while ((h = f.stack) != null || (f != this && (h = (f = this).stack) != null)) {
CompletableFuture<?> d;
Completion t;
if (f.casStack(h, t = h.next)) {
if (t != null) {
if (f != this) {
pushStack(h);
continue;
}
h.next = null; // detach
}
f = (d = h.tryFire(NESTED)) == null ? this : d;
}
}
}
private static final sun.misc.Unsafe UNSAFE;
private static final long RESULT;
private static final long STACK;
private static final long NEXT;
static {
try {
final sun.misc.Unsafe u;
UNSAFE = u = sun.misc.Unsafe.getUnsafe();
Class<?> k = CompletableFuture.class;
RESULT = u.objectFieldOffset(k.getDeclaredField("result"));
STACK = u.objectFieldOffset(k.getDeclaredField("stack"));
NEXT = u.objectFieldOffset(Completion.class.getDeclaredField("next"));
} catch (Exception x) {
throw new Error(x);
}
}
...
}
(3)Completion Stack的构建
假设已经使用了CompletableFuture的supplyAsync()方法创建了源任务,接着需要使用CompletionStage的thenApply()等方法来构建回调任务。
源任务 -> Supplier接口的实现类对象(get()方法),回调任务 -> Function接口的实现类对象(apply()方法)。
CompletableFuture的thenApply()方法会触发执行uniApplyStage()方法。在uniApplyStage()方法中,首先会创建一个新的CompletableFuture对象,然后根据CompletableFuture的uniApply()方法判断源任务是否已经完成。如果源任务已经完成,则不需要入栈,直接执行回调任务的apply()方法。如果源任务还没执行完成,才将回调任务封装为UniApply对象并入栈。
源任务还没执行完成的处理过程具体如下:首先把回调任务封装成一个UniApply对象,然后调用CompletableFuture的push()方法,把UniApply对象压入源任务所在CompletableFuture对象中的stack的栈顶,最后调用UniApply的tryFire()方法来尝试执行该回调任务。
注意:UniApply对象其实是一个Completion对象,因为UniApply类继承自UniCompletion类,而UniCompletion类又继承自Completion类。
//Represents a function that accepts one argument and produces a result.
//This is a functional interface whose functional method is apply(Object).
//@param <T> the type of the input to the function
//@param <R> the type of the result of the function
@FunctionalInterface
public interface Function<T, R> {
//Applies this function to the given argument.
//@param t the function argument
//@return the function result
R apply(T t);
...
}
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
volatile Object result; // Either the result or boxed AltResult
volatile Completion stack; // Top of Treiber stack of dependent actions
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
private <V> CompletableFuture<V> uniApplyStage(Executor e, Function<? super T,? extends V> f) {
if (f == null) throw new NullPointerException();
//创建一个新的CompletableFuture对象
CompletableFuture<V> d = new CompletableFuture<V>();
//根据CompletableFuture的uniApply()方法判断源任务是否已经完成
//如果源任务已经完成,则不需要入栈
if (e != null || !d.uniApply(this, f, null)) {
//首先把回调任务f封装成一个UniApply对象
UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
//然后调用CompletableFuture的push()方法
//把UniApply对象压入源任务所在的CompletableFuture对象中的stack的栈顶
push(c);
//最后调用UniApply的tryFire()方法来尝试执行该回调任务
c.tryFire(SYNC);
}
return d;
}
final <S> boolean uniApply(CompletableFuture<S> a, Function<? super S,? extends T> f, UniApply<S,T> c) {
Object r;
Throwable x;
//如果任务还没完成(result == null),直接返回false
if (a == null || (r = a.result) == null || f == null) {
return false;
}
tryComplete: if (result == null) {
//判断result是否为异常类型
if (r instanceof AltResult) {
if ((x = ((AltResult)r).ex) != null) {
//如果result是异常类型,则使用completeThrowable()方法处理,并返回true
completeThrowable(x, r);
break tryComplete;
}
r = null;
}
//如果result不为空,任务已经执行完成,并且没有出现异常
try {
if (c != null && !c.claim()) {
return false;
}
//把源任务的执行结果s作为参数传给回调任务f
//直接执行回调任务的apply()方法,并将结果设置到CompletableFuture对象的成员变量result中
@SuppressWarnings("unchecked") S s = (S) r;
completeValue(f.apply(s));
} catch (Throwable ex) {
completeThrowable(ex);
}
}
return true;
}
//Pushes the given completion (if it exists) unless done.
final void push(UniCompletion<?,?> c) {
if (c != null) {
while (result == null && !tryPushStack(c)) {
lazySetNext(c, null); // clear on failure
}
}
}
final boolean tryPushStack(Completion c) {
Completion h = stack;
lazySetNext(c, h);
return UNSAFE.compareAndSwapObject(this, STACK, h, c);
}
static void lazySetNext(Completion c, Completion next) {
UNSAFE.putOrderedObject(c, NEXT, next);
}
...
static final class UniApply<T,V> extends UniCompletion<T,V> {
Function<? super T,? extends V> fn;
UniApply(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, Function<? super T,? extends V> fn) {
super(executor, dep, src);
this.fn = fn;
}
//尝试执行当前CompletableFuture中的Completion
final CompletableFuture<V> tryFire(int mode) {
CompletableFuture<V> d;
CompletableFuture<T> a;
//执行CompletableFuture的uniApply()方法尝试执行回调任务
if ((d = dep) == null || !d.uniApply(a = src, fn, mode > 0 ? null : this)) {
return null;
}
dep = null;
src = null;
fn = null;
//执行CompletableFuture的postFire()方法
return d.postFire(a, mode);
}
}
final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {
if (a != null && a.stack != null) {
if (mode < 0 || a.result == null) {
a.cleanStack();
} else {
a.postComplete();
}
}
if (result != null && stack != null) {
if (mode < 0) {
return this;
} else {
postComplete();
}
}
return null;
}
abstract static class UniCompletion<T,V> extends Completion {
Executor executor;//执行当前任务的线程池
CompletableFuture<V> dep;//构建当前任务的CompletableFuture对象
CompletableFuture<T> src;//指向源任务
UniCompletion(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src) {
this.executor = executor;
this.dep = dep;
this.src = src;
}
//判断是否使用单独的线程池来执行任务
final boolean claim() {
Executor e = executor;
if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
if (e == null) {
return true;
}
executor = null; // disable
e.execute(this);
}
return false;
}
//判断任务是否存活
final boolean isLive() {
return dep != null;
}
}
}
(4)Completion任务的执行流程
一.CompletableFuture的postComplete()方法
CompletableFuture中的任务完成后即源任务完成后,会通过CompletableFuture.postComplete()方法来完成后置逻辑,也就是把当前CompletableFuture.stack中存储的Completion逐项出栈执行。
postComplete()方法会触发stack中所有可执行的回调任务Completion,该方法会遍历整个stack,并通过Completion任务的tryFire()方法来尝试执行。
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
volatile Object result;
volatile Completion stack;
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
static <U> CompletableFuture<U> asyncSupplyStage(Executor e, Supplier<U> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<U> d = new CompletableFuture<U>();
//使用线程池来执行一个由AsyncSupply()方法构建的任务
e.execute(new AsyncSupply<U>(d, f));
//返回一个新的CompletableFuture对象
return d;
}
static final class AsyncSupply<T> extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask {
CompletableFuture<T> dep;
Supplier<T> fn;
AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
this.dep = dep;
this.fn = fn;
}
...
public void run() {
CompletableFuture<T> d;
Supplier<T> f;
if ((d = dep) != null && (f = fn) != null) {
dep = null; fn = null;
if (d.result == null) {
try {
//首先使用f.get()来获得Supplier这个函数式接口中的执行结果
//然后通过执行CompletableFuture的completeValue()方法,
//把执行结果设置到CompletableFuture的成员变量result中;
d.completeValue(f.get());
} catch (Throwable ex) {
d.completeThrowable(ex);
}
}
//最后调用CompletableFuture的postComplete()方法执行Completion Stack中的所有回调任务
d.postComplete();
}
}
}
final boolean completeValue(T t) {
return UNSAFE.compareAndSwapObject(this, RESULT, null, (t == null) ? NIL : t);
}
final void postComplete() {
CompletableFuture<?> f = this;
Completion h;
//如果stack不为空,则不断循环从stack中出栈Completion任务
while ((h = f.stack) != null || (f != this && (h = (f = this).stack) != null)) {
CompletableFuture<?> d;
Completion t;
//通过CAS逐个取出stack中的Completion任务并重置stack
if (f.casStack(h, t = h.next)) {
if (t != null) {
//表示h.tryFire()返回了另外一个CompleableFuture对象
if (f != this) {
pushStack(h);
continue;
}
h.next = null; // detach
}
//执行指定Completion的tryFire()方法,比如UniApply.tryFire()方法
f = (d = h.tryFire(NESTED)) == null ? this : d;
}
}
}
...
}
二.Completion任务的执行流程图
(5)Completion任务的执行结果获取
可以通过get()或join()方法获取CompletableFuture的执行结果。当任务还没执行结束时(r == null),则调用waitingGet()方法进行阻塞等待。主要会先自旋256次判断执行是否结束,如果不是才挂起线程进行阻塞,从而避免直接挂起线程带来的性能开销。
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
...
//Waits if necessary for this future to complete, and then returns its result.
public T get() throws InterruptedException, ExecutionException {
Object r;
return reportGet((r = result) == null ? waitingGet(true) : r);
}
//Returns raw result after waiting, or null if interruptible and interrupted.
private Object waitingGet(boolean interruptible) {
Signaller q = null;
boolean queued = false;
int spins = -1;
Object r;
while ((r = result) == null) {
if (spins < 0) {
spins = (Runtime.getRuntime().availableProcessors() > 1) ? 1 << 8 : 0;
} else if (spins > 0) {
if (ThreadLocalRandom.nextSecondarySeed() >= 0) {
--spins;
}
} else if (q == null) {
q = new Signaller(interruptible, 0L, 0L);
} else if (!queued) {
queued = tryPushStack(q);
} else if (interruptible && q.interruptControl < 0) {
q.thread = null;
cleanStack();
return null;
} else if (q.thread != null && result == null) {
try {
ForkJoinPool.managedBlock(q);
} catch (InterruptedException ie) {
q.interruptControl = -1;
}
}
}
if (q != null) {
q.thread = null;
if (q.interruptControl < 0) {
if (interruptible) {
r = null; // report interruption
} else {
Thread.currentThread().interrupt();
}
}
}
postComplete();
return r;
}
...
}
(6)总结
CompletableFuture的核心在于CompletionStage,CompletionStage提供了最基础的异步回调机制。也就是主线程不需要通过阻塞方式来等待异步任务的执行结果,而是当异步任务执行完成后主动通知来触发执行下一个任务。此外,CompletionStage全部采用了函数式接口的方式来实现,可以通过链式的方式来对多个CompletionStage进行组合。