在现代应用开发中,异步编程和并发任务处理变得越来越重要。Java 8 引入的 CompletableFuture
为我们提供了一个强大且灵活的工具来简化这一过程。在本文中,我们将结合各种实战场景,详细介绍 CompletableFuture
的核心API及其应用。
CompletableFuture 核心API及实战应用
1. 异步计算
场景:从远程服务器获取数据
在某个应用中,我们需要从远程服务器异步获取用户数据并处理。
runAsync(Runnable runnable): 在一个独立的线程中执行一个没有返回值的任务。
CompletableFuture.runAsync(() -> { System.out.println("开始从服务器获取数据..."); // 模拟网络延迟 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("数据获取完成"); });
supplyAsync(Supplier supplier): 在一个独立的线程中执行一个有返回值的任务。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println("开始从服务器获取用户数据..."); // 模拟网络延迟和数据获取 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "用户数据"; }); future.thenAccept(data -> { System.out.println("处理数据:" + data); });
2. 任务组合
场景:获取用户数据并发送欢迎邮件
在获取用户数据后,我们需要处理数据并发送一封欢迎邮件。
thenApply(Function<T, U> fn): 执行一个函数,并返回一个新的 CompletableFuture。
CompletableFuture.supplyAsync(() -> { System.out.println("获取用户数据..."); return "用户数据"; }).thenApply(data -> { System.out.println("处理用户数据..."); return "处理后的" + data; }).thenAccept(result -> { System.out.println("发送欢迎邮件:" + result); });
3. 任务组合(进阶)
场景:从多个数据源并行获取数据并合并
我们需要从两个不同的数据源获取数据,然后合并处理这些数据。
allOf(CompletableFuture<?>… cfs): 当所有任务完成时,返回一个新的 CompletableFuture。
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { // 模拟数据源1获取数据 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "数据源1数据"; }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { // 模拟数据源2获取数据 try { Thread.sleep(1500); } catch (InterruptedException e) { e.printStackTrace(); } return "数据源2数据"; }); CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2); combinedFuture.thenRun(() -> { try { String result1 = future1.get(); String result2 = future2.get(); System.out.println("合并数据:" + result1 + " & " + result2); } catch (Exception e) { e.printStackTrace(); } });
4. 异常处理
场景:处理远程数据获取中的异常
在从远程服务器获取数据时,可能会出现网络异常,我们需要进行处理。
exceptionally(Function<Throwable, ? extends T> fn): 当计算结果发生异常时,执行一个函数来恢复计算。
CompletableFuture.supplyAsync(() -> { if (true) { // 模拟异常情况 throw new RuntimeException("网络异常"); } return "用户数据"; }).exceptionally(ex -> { System.out.println("处理异常:" + ex.getMessage()); return "默认用户数据"; }).thenAccept(data -> { System.out.println("结果数据:" + data); });
5. 手动完成
场景:在条件满足时手动完成任务
某些情况下,我们需要在特定条件满足时手动完成任务。
complete(T value): 手动设置结果并完成任务。
CompletableFuture<String> future = new CompletableFuture<>(); // 在其他线程或条件下手动完成 new Thread(() -> { try { Thread.sleep(2000); // 模拟延迟 future.complete("手动完成数据"); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); future.thenAccept(data -> { System.out.println("结果数据:" + data); });
6. 延时任务
场景:设置任务的超时时间
在某些情况下,我们需要设置任务的超时时间,如果任务超时则返回默认值。
- orTimeout(long timeout, TimeUnit unit): 设置任务的超时时间。
// 创建一个在3秒后执行的任务
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
CompletableFuture<String> delayedFuture = CompletableFuture.supplyAsync(() -> {
return "延时任务";
}, executor.schedule(() -> {}, 3, TimeUnit.SECONDS));
delayedFuture.thenAccept(data -> {
System.out.println("结果数据:" + data);
});
// 设置任务超时时间
CompletableFuture<String> futureWithTimeout = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(5000); // 模拟长时间任务
} catch (InterruptedException e) {
e.printStackTrace();
}
return "长时间任务";
}).orTimeout(3, TimeUnit.SECONDS).exceptionally(ex -> {
System.out.println("任务超时:" + ex.getMessage());
return "默认数据";
});
futureWithTimeout.thenAccept(data -> {
System.out.println("结果数据:" + data);
});
7. 常见的静态方法
场景:创建已经完成或失败的任务
有时候我们需要创建一个已经完成或失败的任务,用于测试或特殊处理。
completedFuture(T value): 返回一个已经完成的 CompletableFuture。
CompletableFuture<String> future = CompletableFuture.completedFuture("已完成数据"); future.thenAccept(System.out::println);
failedFuture(Throwable ex): 返回一个已经失败的 CompletableFuture。
CompletableFuture<String> future = CompletableFuture.failedFuture(new RuntimeException("模拟异常")); future.exceptionally(ex -> { System.out.println("处理失败:" + ex.getMessage()); return "默认数据"; }).thenAccept(System.out::println);
重点案例:大任务中执行小任务,避免假死问题
在实际开发中,我们经常需要将一个大任务拆分为多个小任务并行执行。但是,如果处理不当,可能会导致线程饥饿或假死现象,尤其是在大任务等待小任务完成的场景中。
假设我们有一个大任务,它需要拆分为多个小任务并行执行,然后等待所有小任务完成后再进行汇总处理。如果这些小任务都在同一个线程池中执行,而线程池大小不够,则可能导致死锁或假死问题。
问题描述:
- 大任务启动,并提交多个小任务到线程池执行。
- 线程池中所有线程都被占用,小任务等待其他小任务的结果。
- 由于线程池已满,小任务无法调度,导致整个系统陷入死锁。
解决方案:
- 使用足够大的线程池。
- 采用分层线程池或直接创建新的线程来避免竞争。
- 使用
CompletableFuture
提供的灵活组合方式,避免嵌套等待。
以下是一个示例,展示如何使用 CompletableFuture
来避免这种问题:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
public class CompletableFutureDeadlockExample {
public static void main(String[] args) {
// 创建一个固定大小的线程池
ExecutorService executorService = Executors.newFixedThreadPool(4);
// 模拟大任务
CompletableFuture<Void> bigTask = CompletableFuture.supplyAsync(() -> {
System.out.println("大任务开始...");
// 提交小任务
CompletableFuture<Void> allSmallTasks = CompletableFuture.allOf(
IntStream.range(0, 4)
.mapToObj(i -> CompletableFuture.runAsync(() -> {
System.out.println("小任务 " + i + " 开始...");
// 模拟小任务工作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("小任务 " + i + " 完成...");
}, executorService))
.toArray(CompletableFuture[]::new)
);
allSmallTasks.join(); // 等待所有小任务完成
return null;
}, executorService);
// 大任务完成后的处理
bigTask.thenRun(() -> {
System.out.println("大任务完成...");
});
// 关闭线程池
executorService.shutdown();
try {
executorService.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
解决方案详细解析:
使用足够大的线程池:
- 确保线程池的大小能够容纳所有的小任务和大任务。
- 在示例中,我们使用了固定大小为 4 的线程池,可以根据实际情况调整大小。
分层线程池或新的线程:
- 可以使用不同的线程池来处理不同的任务层级,避免互相竞争。
- 也可以直接创建新的线程来执行小任务,避免占用现有线程池的资源。
灵活的组合方式:
- 使用
CompletableFuture.allOf
方法来组合所有的小任务,确保大任务等待所有小任务完成。 - 避免在同一个线程池中嵌套等待。
- 使用
通过以上API和实战示例,可以清晰地看到 CompletableFuture
的主要用途和核心API的应用。在实际开发中,灵活运用这些API,可以编写出高效、清晰和易维护的异步代码。希望本文的实战场景能帮助你更好地理解和应用 CompletableFuture
。