深入理解 CompletableFuture —— Java 异步编程的工具

发布于:2024-12-19 ⋅ 阅读:(15) ⋅ 点赞:(0)

前言

在 Java 编程中,异步编程越来越成为一种常见的需求,特别是在高并发或IO密集型的场景下。CompletableFuture 是 Java 8 引入的一个强大工具,它极大地简化了异步编程的开发过程。本文将详细介绍 CompletableFuture 的作用、适用场景、常见方法及其使用案例。

一、什么是CompletableFuture?

CompletableFuture是Java8引入的一个类,位于 java.util.concurrent 包下。它是一个可以表示异步计算结果的对象。CompletableFuture 不仅可以用于异步编程,还可以进行组合、链式调用等复杂操作。相比传统的线程池或回调方式,它提供了一种更加灵活和可读性更强的方式来处理异步操作。

CompletableFuture 可以:

  1. 执行异步任务:可以通过它启动异步计算,并在计算完成后得到结果。
  2. 组合多个异步任务:通过链式调用,你可以将多个异步任务串联起来。
  3. 同步等待结果:在需要等待异步任务结果的地方,你可以通过 join 或 get 方法来同步等待计算结果。

二、CompletableFuture 的作用和适用场景

CompletableFuture 主要解决的问题是异步计算的结果。它可以帮助我们更轻松地处理以下场景:

  1. IO密集型任务:例如 Web 服务调用、数据库查询、文件操作等。
  2. 高并发处理:需要处理大量并发请求时,CompletableFuture 可以避免阻塞线程,提高系统的响应性和吞吐量。
  3. 异步计算的组合:当多个异步任务需要组合完成时,可以通过 CompletableFuture 轻松实现任务的串行、并行执行。

三、CompletableFuture 的常见方法及其用法

CompletableFuture 提供了丰富的方法来处理异步操作。以下是一些常见方法的介绍和示例:

1. supplyAsync

CompletableFuture.supplyAsync() 方法用于提交一个返回值的异步任务。它接收一个 Supplier 函数作为参数,并在后台线程中执行该任务。

用法

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    return 1 + 1;
});

在上述代码中,supplyAsync 提交了一个异步任务,它返回 1 + 1 的计算结果。

2. runAsync

CompletableFuture.runAsync()方法用于提交一个没有返回值的异步任务。它接收一个 Runnable函数作为参数,在后台线程中执行。

用法

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    System.out.println("Executing task in a separate thread.");
});


//使用默认内置线程池ForkJoinPool.commonPool(),根据runnable构建执行任务  
public static CompletableFuture<Void> runAsync(Runnable runnable)   
//自定义线程,根据runnable构建执行任务  
public static CompletableFuture<Void> runAsync(Runnable runnable,  Executor executor)  

该方法用于执行没有返回值的异步操作,适用于任务执行但无需计算结果的场景。

3. thenApply

表示第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,并且回调方法是有返回值的。

用法

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 2)
        .thenApply(result -> result * 3);

这里的 thenApply 方法会在第一个异步任务完成后执行,把结果乘以 3,并返回新的计算结果。可以链式调用多个 thenApply 方法。

4. thenAccept/thenAcceptAsync

第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参 ,传递到回调方法中,但是回调方法是没有返回值的。

CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> 2)
        .thenAccept(result -> System.out.println("Processed result: " + result));

5. thenCombine

会将两个任务的执行结果作为方法入参,传递到指定方法中,且有返回值。

CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 1);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 2);

CompletableFuture<Integer> combined = future1.thenCombine(future2, (result1, result2) -> result1 + result2);

6. allOf 和 anyOf

allOf 和 anyOf 用于处理多个异步任务的组合,分别表示所有任务完成或任意任务完成。

- allOf:等待所有任务都完成。

- anyOf:只要有一个任务完成。

public void testCompletableAallOf() throws ExecutionException, InterruptedException {  
    //创建线程池  
    ExecutorService executorService = Executors.newFixedThreadPool(10);  
    //开启异步任务1  
    CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {  
        System.out.println("异步任务1,当前线程是:" + Thread.currentThread().getId());  
        int result = 1 + 1;  
        System.out.println("异步任务1结束");  
        return result;  
    }, executorService);  

    //开启异步任务2  
    CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {  
        System.out.println("异步任务2,当前线程是:" + Thread.currentThread().getId());  
        int result = 1 + 2;  
        try {  
            Thread.sleep(3000);  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
        System.out.println("异步任务2结束");  
        return result;  
    }, executorService);  

    //开启异步任务3  
    CompletableFuture<Integer> task3 = CompletableFuture.supplyAsync(() -> {  
        System.out.println("异步任务3,当前线程是:" + Thread.currentThread().getId());  
        int result = 1 + 3;  
        try {  
            Thread.sleep(4000);  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
        System.out.println("异步任务3结束");  
        return result;  
    }, executorService);  

    //任务组合  
    CompletableFuture<Void> allOf = CompletableFuture.allOf(task, task2, task3);  

    //等待所有任务完成  
    allOf.get();  
    //获取任务的返回结果  
    System.out.println("task结果为:" + task.get());  
    System.out.println("task2结果为:" + task2.get());  
    System.out.println("task3结果为:" + task3.get());  
}

四、 CompletableFuture 使用案例

public class EcommerceApp {

    public static void main(String[] args) {
        // 异步任务:计算商品价格
        CompletableFuture<Double> priceFuture = CompletableFuture.supplyAsync(() -> {
            return 100.0;  // 商品基础价格
        });

        // 异步任务:计算运费
        CompletableFuture<Double> shippingFuture = CompletableFuture.supplyAsync(() -> {
            return 20.0;  // 运费
        });

        // 异步任务:应用折扣
        CompletableFuture<Double> finalPriceFuture = priceFuture.thenCombine(shippingFuture, (price, shipping) -> {
            return price + shipping - 10.0;  // 最终价格=基础价+运费-折扣
        });

        // 输出最终价格
        finalPriceFuture.thenAccept(finalPrice -> {
            System.out.println("Final Price: " + finalPrice);
        });

        // 等待所有任务完成
        finalPriceFuture.join();
    }
}

五、CompletableFuture使用有哪些注意点

1、Future需要获取返回值,才能获取异常信息

public void testWhenCompleteExceptionally() {  
    CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {  
        if (1 == 1) {  
            throw new RuntimeException("出错了");  
        }  
        return 0.11;  
    });  

    //如果不加 get()方法这一行,看不到异常信息  
    //future.get();  
}  

Future需要获取返回值,才能获取到异常信息。如果不加 get()/join()方法,看不到异常信息。可考虑添加try...catch...或者exceptionally方法

2、CompletableFuture的get()方法是阻塞的

CompletableFuture的get()方法是阻塞的,如果使用它来获取异步调用的返回值,需要添加超时时间。

//反例  
 CompletableFuture.get();  
//正例  
CompletableFuture.get(5, TimeUnit.SECONDS);  

3、不建议使用默认线程池

一般建议使用自定义线程池,优化线程池配置参数。