文章目录
-
- 一、创建操作符
-
- 1. `just` —— 创建包含指定元素的流
- 2. `fromIterable` —— 从集合创建 Flux
- 3. `empty` —— 创建空的 Flux 或 Mono
- 4. `fromArray` —— 从数组创建 Flux
- 5. `fromStream` —— 从 Java 8 Stream 创建 Flux
- 6. `create` —— 使用 FluxSink 手动发射元素
- 7. `generate` —— 使用状态生成元素,适用于同步场景
- 8. `fromFuture` —— 从 CompletableFuture 创建 Mono
- 9. `interval` —— 创建周期性发射元素的 Flux
- 10. `timer` —— 创建延迟发射的 Mono
- 二、转换操作符
- 三、过滤操作符
- 四、组合操作符
- 五、错误处理操作符
- 六、延迟执行与懒加载:`Mono.defer` 和 `Flux.defer`:被订阅时才执行
Reactor 是一个用于构建反应式应用程序的 Java 库,提供了丰富的操作符(算子)来处理反应式流(Flux
和 Mono
)。本文详细介绍了 Reactor 中常用的创建、转换、过滤、组合和错误处理操作符,以及一些高级用法示例。
一、创建操作符
1. just
—— 创建包含指定元素的流
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5);
Mono<String> mono = Mono.just("Hello");
2. fromIterable
—— 从集合创建 Flux
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
Flux<Integer> flux = Flux.fromIterable(list);
3. empty
—— 创建空的 Flux 或 Mono
Flux<Integer> emptyFlux = Flux.empty();
Mono<String> emptyMono = Mono.empty();
4. fromArray
—— 从数组创建 Flux
Integer[] numbers = {1, 2, 3, 4, 5};
Flux<Integer> flux = Flux.fromArray(numbers);
5. fromStream
—— 从 Java 8 Stream 创建 Flux
Stream<Integer> stream = Arrays.asList(1, 2, 3, 4, 5).stream();
Flux<Integer> flux = Flux.fromStream(stream);
6. create
—— 使用 FluxSink 手动发射元素
Flux<Integer> flux = Flux.create(sink -> {
for (int i = 0; i < 5; i++) {
sink.next(i);
}
sink.complete();
});
7. generate
—— 使用状态生成元素,适用于同步场景
Flux<Integer> flux = Flux.generate(() -> 0, (state, sink) -> {
sink.next(state);
if (state == 4) sink.complete();
return state + 1;
});
8. fromFuture
—— 从 CompletableFuture 创建 Mono
CompletableFuture<String> future = CompletableFuture.completedFuture("Hello");
Mono<String> mono = Mono.fromFuture(future);
9. interval
—— 创建周期性发射元素的 Flux
Flux<Long> intervalFlux = Flux.interval(Duration.ofSeconds(1));
10. timer
—— 创建延迟发射的 Mono
Mono<Long> timerMono = Mono.timer(Duration.ofSeconds(2));
二、转换操作符
1. map
—— 映射每个元素为新值
Flux<Integer> squared = Flux.just(1, 2, 3).map(n -> n * n);
2. flatMap
—— 扁平化异步流,将每个元素映射为异步 Publisher
Flux<Integer> result = Flux.just(1, 2, 3).flatMap(n -> Mono.just(n * 2));
3. concatMap
—— 顺序执行映射为 Publisher 的异步流
Flux<Integer> result = Flux.just(1, 2, 3).concatMap(n -> Mono.just(n * 2));
三、过滤操作符
1. filter
—— 按条件过滤元素
Flux<Integer> evens = Flux.just(1, 2, 3, 4).filter(n -> n % 2 == 0);
2. take
—— 获取前 N 个元素
Flux<Integer> firstThree = Flux.just(1, 2, 3, 4, 5).take(3);
3. skip
—— 跳过前 N 个元素
Flux<Integer> skipped = Flux.just(1, 2, 3, 4, 5).skip(2);
四、组合操作符
1. concat
—— 按顺序合并多个 Flux
Flux<Integer> combined = Flux.concat(Flux.just(1, 2), Flux.just(3, 4));
2. merge
—— 并发合并多个 Flux(无序)
Flux<Integer> merged = Flux.merge(Flux.just(1, 2), Flux.just(3, 4));
3. zip
—— 按索引组合多个 Flux 的元素
Flux<String> zipped = Flux.zip(Flux.just(1, 2), Flux.just(3, 4), (a, b) -> a + ":" + b);
五、错误处理操作符
1. onErrorReturn
—— 出错时返回默认值
Flux<Integer> result = Flux.just(1, 2, 3)
.map(n -> {
if (n == 2) throw new RuntimeException("error");
return n;
})
.onErrorReturn(-1);
2. onErrorResume
—— 出错时切换备用流
Flux<Integer> result = Flux.just(1, 2, 3)
.map(n -> {
if (n == 2) throw new RuntimeException("error");
return n;
})
.onErrorResume(e -> Mono.just(-1));
3. retry
—— 出错时重试指定次数
Flux<Integer> result = Flux.just(1, 2, 3)
.map(n -> {
if (n == 2) throw new RuntimeException("error");
return n;
})
.retry(2);
六、延迟执行与懒加载:Mono.defer
和 Flux.defer
:被订阅时才执行
Mono.defer
—— 懒加载 Mono,直到subscribe时才创建执行
Mono<String> deferredMono = Mono.defer(() -> {
System.out.println("Generating value...");
return Mono.just("Deferred Result");
});
只有当 subscribe()
被调用时,Mono.defer
中的逻辑才会真正执行。这对于需要确保执行时机晚于前一步完成场景特别重要,比如:
Mono.defer(() -> readQaResultType())
.subscribe(result -> System.out.println("QA Result: " + result));
在这段代码中,读取 qaResultType
的操作只会在前面的步骤(例如数据预处理)完全完成后才被触发。
Flux.defer
—— 懒加载 Flux,每次订阅时重新执行逻辑
Flux<Integer> deferredFlux = Flux.defer(() -> {
System.out.println("Evaluating source...");
return Flux.just(1, 2, 3);
});
每次订阅都会重新生成数据,适用于带有状态的源或依赖最新上下文的处理逻辑。