【响应式编程】Reactor 常用操作符与使用指南

发布于:2025-04-14 ⋅ 阅读:(24) ⋅ 点赞:(0)

Reactor 是一个用于构建反应式应用程序的 Java 库,提供了丰富的操作符(算子)来处理反应式流(FluxMono)。本文详细介绍了 Reactor 中常用的创建、转换、过滤、组合和错误处理操作符,以及一些高级用法示例。


一、创建操作符

1. just —— 创建包含指定元素的流

Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5);
Mono<String> mono = Mono.just("Hello");

2. fromIterable —— 从集合创建 Flux

List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
Flux<Integer> flux = Flux.fromIterable(list);

3. empty —— 创建空的 Flux 或 Mono

Flux<Integer> emptyFlux = Flux.empty();
Mono<String> emptyMono = Mono.empty();

4. fromArray —— 从数组创建 Flux

Integer[] numbers = {1, 2, 3, 4, 5};
Flux<Integer> flux = Flux.fromArray(numbers);

5. fromStream —— 从 Java 8 Stream 创建 Flux

Stream<Integer> stream = Arrays.asList(1, 2, 3, 4, 5).stream();
Flux<Integer> flux = Flux.fromStream(stream);

6. create —— 使用 FluxSink 手动发射元素

Flux<Integer> flux = Flux.create(sink -> {
    for (int i = 0; i < 5; i++) {
        sink.next(i);
    }
    sink.complete();
});

7. generate —— 使用状态生成元素,适用于同步场景

Flux<Integer> flux = Flux.generate(() -> 0, (state, sink) -> {
    sink.next(state);
    if (state == 4) sink.complete();
    return state + 1;
});

8. fromFuture —— 从 CompletableFuture 创建 Mono

CompletableFuture<String> future = CompletableFuture.completedFuture("Hello");
Mono<String> mono = Mono.fromFuture(future);

9. interval —— 创建周期性发射元素的 Flux

Flux<Long> intervalFlux = Flux.interval(Duration.ofSeconds(1));

10. timer —— 创建延迟发射的 Mono

Mono<Long> timerMono = Mono.timer(Duration.ofSeconds(2));

 

二、转换操作符

1. map —— 映射每个元素为新值

Flux<Integer> squared = Flux.just(1, 2, 3).map(n -> n * n);

2. flatMap —— 扁平化异步流,将每个元素映射为异步 Publisher

Flux<Integer> result = Flux.just(1, 2, 3).flatMap(n -> Mono.just(n * 2));

3. concatMap —— 顺序执行映射为 Publisher 的异步流

Flux<Integer> result = Flux.just(1, 2, 3).concatMap(n -> Mono.just(n * 2));

 

三、过滤操作符

1. filter —— 按条件过滤元素

Flux<Integer> evens = Flux.just(1, 2, 3, 4).filter(n -> n % 2 == 0);

2. take —— 获取前 N 个元素

Flux<Integer> firstThree = Flux.just(1, 2, 3, 4, 5).take(3);

3. skip —— 跳过前 N 个元素

Flux<Integer> skipped = Flux.just(1, 2, 3, 4, 5).skip(2);

 

四、组合操作符

1. concat —— 按顺序合并多个 Flux

Flux<Integer> combined = Flux.concat(Flux.just(1, 2), Flux.just(3, 4));

2. merge —— 并发合并多个 Flux(无序)

Flux<Integer> merged = Flux.merge(Flux.just(1, 2), Flux.just(3, 4));

3. zip —— 按索引组合多个 Flux 的元素

Flux<String> zipped = Flux.zip(Flux.just(1, 2), Flux.just(3, 4), (a, b) -> a + ":" + b);

 

五、错误处理操作符

1. onErrorReturn —— 出错时返回默认值

Flux<Integer> result = Flux.just(1, 2, 3)
    .map(n -> {
        if (n == 2) throw new RuntimeException("error");
        return n;
    })
    .onErrorReturn(-1);

2. onErrorResume —— 出错时切换备用流

Flux<Integer> result = Flux.just(1, 2, 3)
    .map(n -> {
        if (n == 2) throw new RuntimeException("error");
        return n;
    })
    .onErrorResume(e -> Mono.just(-1));

3. retry —— 出错时重试指定次数

Flux<Integer> result = Flux.just(1, 2, 3)
    .map(n -> {
        if (n == 2) throw new RuntimeException("error");
        return n;
    })
    .retry(2);

 

六、延迟执行与懒加载:Mono.deferFlux.defer:被订阅时才执行

Mono.defer —— 懒加载 Mono,直到subscribe时才创建执行

Mono<String> deferredMono = Mono.defer(() -> {
    System.out.println("Generating value...");
    return Mono.just("Deferred Result");
});

只有当 subscribe() 被调用时,Mono.defer 中的逻辑才会真正执行。这对于需要确保执行时机晚于前一步完成场景特别重要,比如:

Mono.defer(() -> readQaResultType())
    .subscribe(result -> System.out.println("QA Result: " + result));

在这段代码中,读取 qaResultType 的操作只会在前面的步骤(例如数据预处理)完全完成后才被触发

Flux.defer —— 懒加载 Flux,每次订阅时重新执行逻辑

Flux<Integer> deferredFlux = Flux.defer(() -> {
    System.out.println("Evaluating source...");
    return Flux.just(1, 2, 3);
});

每次订阅都会重新生成数据,适用于带有状态的源或依赖最新上下文的处理逻辑。



网站公告

今日签到

点亮在社区的每一天
去签到