Java CompletableFuture 实战应用指南

发布于:2024-06-05 ⋅ 阅读:(75) ⋅ 点赞:(0)

在现代应用开发中,异步编程和并发任务处理变得越来越重要。Java 8 引入的 CompletableFuture 为我们提供了一个强大且灵活的工具来简化这一过程。在本文中,我们将结合各种实战场景,详细介绍 CompletableFuture 的核心API及其应用。

CompletableFuture 核心API及实战应用

1. 异步计算

场景:从远程服务器获取数据

在某个应用中,我们需要从远程服务器异步获取用户数据并处理。

  • runAsync(Runnable runnable): 在一个独立的线程中执行一个没有返回值的任务。

    CompletableFuture.runAsync(() -> {
        System.out.println("开始从服务器获取数据...");
        // 模拟网络延迟
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("数据获取完成");
    });
    
  • supplyAsync(Supplier supplier): 在一个独立的线程中执行一个有返回值的任务。

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        System.out.println("开始从服务器获取用户数据...");
        // 模拟网络延迟和数据获取
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "用户数据";
    });
    
    future.thenAccept(data -> {
        System.out.println("处理数据:" + data);
    });
    

2. 任务组合

场景:获取用户数据并发送欢迎邮件

在获取用户数据后,我们需要处理数据并发送一封欢迎邮件。

  • thenApply(Function<T, U> fn): 执行一个函数,并返回一个新的 CompletableFuture。

    CompletableFuture.supplyAsync(() -> {
        System.out.println("获取用户数据...");
        return "用户数据";
    }).thenApply(data -> {
        System.out.println("处理用户数据...");
        return "处理后的" + data;
    }).thenAccept(result -> {
        System.out.println("发送欢迎邮件:" + result);
    });
    

3. 任务组合(进阶)

场景:从多个数据源并行获取数据并合并

我们需要从两个不同的数据源获取数据,然后合并处理这些数据。

  • allOf(CompletableFuture<?>… cfs): 当所有任务完成时,返回一个新的 CompletableFuture。

    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
        // 模拟数据源1获取数据
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "数据源1数据";
    });
    
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        // 模拟数据源2获取数据
        try {
            Thread.sleep(1500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "数据源2数据";
    });
    
    CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2);
    
    combinedFuture.thenRun(() -> {
        try {
            String result1 = future1.get();
            String result2 = future2.get();
            System.out.println("合并数据:" + result1 + " & " + result2);
        } catch (Exception e) {
            e.printStackTrace();
        }
    });
    

4. 异常处理

场景:处理远程数据获取中的异常

在从远程服务器获取数据时,可能会出现网络异常,我们需要进行处理。

  • exceptionally(Function<Throwable, ? extends T> fn): 当计算结果发生异常时,执行一个函数来恢复计算。

    CompletableFuture.supplyAsync(() -> {
        if (true) { // 模拟异常情况
            throw new RuntimeException("网络异常");
        }
        return "用户数据";
    }).exceptionally(ex -> {
        System.out.println("处理异常:" + ex.getMessage());
        return "默认用户数据";
    }).thenAccept(data -> {
        System.out.println("结果数据:" + data);
    });
    

5. 手动完成

场景:在条件满足时手动完成任务

某些情况下,我们需要在特定条件满足时手动完成任务。

  • complete(T value): 手动设置结果并完成任务。

    CompletableFuture<String> future = new CompletableFuture<>();
    
    // 在其他线程或条件下手动完成
    new Thread(() -> {
        try {
            Thread.sleep(2000); // 模拟延迟
            future.complete("手动完成数据");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
    
    future.thenAccept(data -> {
        System.out.println("结果数据:" + data);
    });
    

6. 延时任务

场景:设置任务的超时时间

在某些情况下,我们需要设置任务的超时时间,如果任务超时则返回默认值。

  • orTimeout(long timeout, TimeUnit unit): 设置任务的超时时间。
// 创建一个在3秒后执行的任务
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
CompletableFuture<String> delayedFuture = CompletableFuture.supplyAsync(() -> {
    return "延时任务";
}, executor.schedule(() -> {}, 3, TimeUnit.SECONDS));

delayedFuture.thenAccept(data -> {
    System.out.println("结果数据:" + data);
});

// 设置任务超时时间
CompletableFuture<String> futureWithTimeout = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(5000); // 模拟长时间任务
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "长时间任务";
}).orTimeout(3, TimeUnit.SECONDS).exceptionally(ex -> {
    System.out.println("任务超时:" + ex.getMessage());
    return "默认数据";
});

futureWithTimeout.thenAccept(data -> {
    System.out.println("结果数据:" + data);
});

7. 常见的静态方法

场景:创建已经完成或失败的任务

有时候我们需要创建一个已经完成或失败的任务,用于测试或特殊处理。

  • completedFuture(T value): 返回一个已经完成的 CompletableFuture。

    CompletableFuture<String> future = CompletableFuture.completedFuture("已完成数据");
    future.thenAccept(System.out::println);
    
  • failedFuture(Throwable ex): 返回一个已经失败的 CompletableFuture。

    CompletableFuture<String> future = CompletableFuture.failedFuture(new RuntimeException("模拟异常"));
    future.exceptionally(ex -> {
        System.out.println("处理失败:" + ex.getMessage());
        return "默认数据";
    }).thenAccept(System.out::println);
    

重点案例:大任务中执行小任务,避免假死问题

在实际开发中,我们经常需要将一个大任务拆分为多个小任务并行执行。但是,如果处理不当,可能会导致线程饥饿或假死现象,尤其是在大任务等待小任务完成的场景中。

假设我们有一个大任务,它需要拆分为多个小任务并行执行,然后等待所有小任务完成后再进行汇总处理。如果这些小任务都在同一个线程池中执行,而线程池大小不够,则可能导致死锁或假死问题。

问题描述:

  1. 大任务启动,并提交多个小任务到线程池执行。
  2. 线程池中所有线程都被占用,小任务等待其他小任务的结果。
  3. 由于线程池已满,小任务无法调度,导致整个系统陷入死锁。

解决方案:

  • 使用足够大的线程池。
  • 采用分层线程池或直接创建新的线程来避免竞争。
  • 使用 CompletableFuture 提供的灵活组合方式,避免嵌套等待。

以下是一个示例,展示如何使用 CompletableFuture 来避免这种问题:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class CompletableFutureDeadlockExample {

    public static void main(String[] args) {
        // 创建一个固定大小的线程池
        ExecutorService executorService = Executors.newFixedThreadPool(4);

        // 模拟大任务
        CompletableFuture<Void> bigTask = CompletableFuture.supplyAsync(() -> {
            System.out.println("大任务开始...");
            // 提交小任务
            CompletableFuture<Void> allSmallTasks = CompletableFuture.allOf(
                IntStream.range(0, 4)
                    .mapToObj(i -> CompletableFuture.runAsync(() -> {
                        System.out.println("小任务 " + i + " 开始...");
                        // 模拟小任务工作
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println("小任务 " + i + " 完成...");
                    }, executorService))
                    .toArray(CompletableFuture[]::new)
            );
            allSmallTasks.join();  // 等待所有小任务完成
            return null;
        }, executorService);

        // 大任务完成后的处理
        bigTask.thenRun(() -> {
            System.out.println("大任务完成...");
        });

        // 关闭线程池
        executorService.shutdown();
        try {
            executorService.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

解决方案详细解析:

  1. 使用足够大的线程池:

    • 确保线程池的大小能够容纳所有的小任务和大任务。
    • 在示例中,我们使用了固定大小为 4 的线程池,可以根据实际情况调整大小。
  2. 分层线程池或新的线程:

    • 可以使用不同的线程池来处理不同的任务层级,避免互相竞争。
    • 也可以直接创建新的线程来执行小任务,避免占用现有线程池的资源。
  3. 灵活的组合方式:

    • 使用 CompletableFuture.allOf 方法来组合所有的小任务,确保大任务等待所有小任务完成。
    • 避免在同一个线程池中嵌套等待。

通过以上API和实战示例,可以清晰地看到 CompletableFuture 的主要用途和核心API的应用。在实际开发中,灵活运用这些API,可以编写出高效、清晰和易维护的异步代码。希望本文的实战场景能帮助你更好地理解和应用 CompletableFuture


网站公告

今日签到

点亮在社区的每一天
去签到