响应式编程框架Reactor【1】

发布于:2025-08-30 ⋅ 阅读:(13) ⋅ 点赞:(0)

一、Reactor 框架概述与理论基础

官方文档:

Project Reactor官网

Getting Started :: Reactor Core Reference Guide

https://www.reactive-streams.org/

https://www.reactive-streams.org/
https://projectreactor.io/
https://projectreactor.io/docs/core/release/reference/gettingStarted.html

1.1 响应式编程(Reactive Programming)是什么?

响应式编程是一种面向数据流和变化传播的编程范式。它允许你声明式地定义数据流的转换、组合和处理逻辑,系统自动处理异步、背压、错误传播等复杂问题。

[!tip]

✅ 核心思想:数据流是第一公民,一切皆流(Everything is a Stream)。

1.2 Reactive Streams 规范

Reactor 实现了 Reactive Streams 规范,该规范定义了四个核心接口:

  • Publisher<T>:发布者
  • Subscriber<T>:订阅者
  • Subscription:订阅关系(支持背压)
  • Processor<T,R>:处理器

有兴趣参照网址查看: reactive-streams.org

[!note]

🔗 Reactor 是 Project Reactor 的简称,由 Pivotal(现 VMware)开发,是 Spring WebFlux 的底层引擎。

1.3 响应式编程与 Reactor 的诞生

响应式编程(Reactive Programming) 是一种面向数据流和变化传播的编程范式,其核心思想是:将程序视为数据流的处理管道,通过异步非阻塞的方式传递和处理数据,并通过背压(Backpressure) 机制平衡生产者和消费者的速度差异。

在 Java 生态中,Reactor 框架是 Reactive Streams 规范的优秀实现,由 Pivotal 公司开发(与 Spring 同属一个团队),于 2013 年首次发布。它的诞生解决了以下核心问题:

  • 传统同步阻塞 IO 在高并发场景下的性能瓶颈
  • 异步编程中的 “回调地狱” 问题
  • 缺乏标准化的背压机制导致的资源失控
  • 与 Spring 生态(如 Spring WebFlux、Spring Cloud)的深度集成需求

Reactor 的核心理念是:“以声明式的方式处理异步数据流,同时保持代码的可读性和可维护性”

1.4 Reactor核心特性

特性 说明
异步非阻塞 基于事件驱动模型,避免线程阻塞,提高系统吞吐量
背压支持 消费者可主动告知生产者自己的处理能力,防止数据积压
声明式编程 通过操作符组合描述 “做什么”,而非 “怎么做”
数据流组合 支持复杂的流组合(合并、连接、嵌套等)
完善的错误处理 提供丰富的错误捕获、恢复和传递机制
与 Java 生态融合 兼容 Java 8 + 的 Stream API,支持 CompletableFuture 转换
轻量级 核心库体积小,无强依赖

1.5 Reactor与其它响应式框架比较

flowchart LR
    A[响应式框架] --> B[Reactor]
    A --> C[RxJava]
    A --> D[Akka Streams]
    
    B --> B1[与Spring生态深度集成]
    B --> B2[严格遵循Reactive Streams]
    B --> B3[专为Java 8+优化]
    B --> B4[更简洁的API设计]
    
    C --> C1[更早出现,生态成熟]
    C --> C2[支持多语言]
    C --> C3[操作符更丰富但复杂]
    
    D --> D1[基于Actor模型]
    D --> D2[分布式场景优势]
    D --> D3[学习曲线陡峭]

Reactor 的独特优势在于:

  • 与 Spring WebFlux、Spring Cloud Gateway 等现代 Spring 组件无缝集成
  • 对 Java 新特性(如虚拟线程、密封类)的原生支持
  • 更简洁的 API 设计,降低响应式编程的学习门槛

二、Reactor核心类型

2.1 Reactor 核心概念

Reactive Streams
Reactor
Publisher
Flux: 0..N elements
Mono: 0..1 elements
Operators
Transformation
Filtering
Combination
Error Handling

Reactor执行流程

Subscriber Publisher (Flux/Mono) Operators Scheduler subscribe() 创建操作链 安排执行(如果需要) 在指定线程执行 onSubscribe(Subscription) request(n) 请求数据 onNext(data) 应用转换/过滤 onNext(processedData) request(m) (更多数据) onComplete() (数据完成) onComplete() 错误处理路径 onError(throwable) onError(throwable) Subscriber Publisher (Flux/Mono) Operators Scheduler

2.2 核心类型

Reactor 提供了两个核心发布者类型:

类型 特点 适用场景
Mono<T> 0 或 1 个元素的异步序列 单个结果(如 HTTP 请求、数据库查询)
Flux<T> 0 到 N 个元素的异步序列 多个结果(如列表、事件流)

2.3 Mono【0个或者1个元素的流】

Mono用于表示包含 0 或 1 个元素的异步结果,适合处理单次操作(如数据库查询、HTTP 请求)的结果。

// 创建Mono【相当于事件的发布者】
Mono<String> mono = Mono.just("Hello Reactor"); // 直接值
Mono<String> emptyMono = Mono.empty(); // 空流
Mono<String> fromCallable = Mono.fromCallable(() -> "动态计算值"); // 延迟计算

// 订阅Mono(触发执行)
mono.subscribe(
    value -> System.out.println("接收值:" + value), // 成功回调
    error -> System.err.println("错误:" + error), // 错误回调
    () -> System.out.println("完成") // 完成回调
);

2.4 Flux【0到N个元素的流】

Flux用于表示包含 0 到多个元素的异步数据流,支持完整的生命周期(正常结束、错误终止)。常见场景:集合数据处理、事件流、批量操作等。

// 创建Flux
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5); // 固定元素
Flux<Integer> rangeFlux = Flux.range(1, 5); // 范围1-5
Flux<Long> intervalFlux = Flux.interval(Duration.ofSeconds(1)); // 每秒生成递增数(需手动取消订阅)

// 订阅Flux
flux.map(x -> x * 2) // 转换操作符
    .filter(x -> x % 3 != 0) // 过滤操作符
    .subscribe(
        System.out::println, // 简化写法:仅处理成功事件
        Throwable::printStackTrace,
        () -> System.out.println("Flux完成")
    );

2.5 数据流生命周期

无论是Flux还是Mono,都遵循相同的生命周期:

  • 正常事件:通过onNext()发送元素(Flux可多次调用,Mono最多调用一次)
  • 终止事件:
    • 成功终止:onComplete()(无元素发送)
    • 错误终止:onError(Throwable)(携带异常信息)
订阅(subscribe)
onNext(元素)
onComplete()
onError(异常)
初始化
运行中
完成
错误

2.6 Reactor数据流模型

subscribe
request(n)
onNext
onError
onComplete
Publisher
Subscriber

2.7 操作符链式调用

Flux.just(1,2,3)
.map(x*2)
.filter(>5)
.log()
.subscribe()

2.8 线程切换时序图

Main boundedElastic parallel subscribeOn() map() 执行 publishOn() subscribe() 回调 Main boundedElastic parallel

三、基础应用

引入Maven依赖:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-bom</artifactId>
            <version>2024.0.6</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>

    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

3.1 基础Mono使用

@Test
public void monoBasicTest() {
    // 1. 创建一个Mono对象(发射一个字符串)
    Mono<String> mono = Mono.just("Hello, Reactor!");

    // 2. 订阅并消费
    mono.subscribe(
        value -> System.out.println("✅ 接收到: " + value),
        error -> System.err.println("❌ 错误: " + error),
        () -> System.out.println("🎉 完成"),
        subscription -> {
            System.out.println("🔗 订阅建立");
            subscription.request(1); // 背压:请求 1 个
        });
}

在这里插入图片描述

3.2 基础Flux使用

@Test
public void fluxBasicTest() {
    // 创建一个Flux对象(发射多个字符串)
    Flux<String> flux = Flux.just("Hello", "Reactor", "Face", "Smail")
        .map(String::toUpperCase)
        .filter(s -> s.length() > 5)
        .log();

    flux.subscribe(
        System.out::println,
        System.err::println,
        () -> System.out.println("流结束")
    );
}

在这里插入图片描述

🔍 log() 是调试利器,可查看所有信号(onNext, onError, onComplete)。

3.3 异步与线程切换

@Test
public void asyncTest(){
    Flux.just("张小三", "A", "B", "C")
        .map(data -> {
            System.out.println("🔄 处理线程: " + Thread.currentThread().getName());
            return data + "-processed";
        })
        .subscribeOn(Schedulers.boundedElastic()) // 订阅在弹性线程池
        .publishOn(Schedulers.parallel()) // 发布在并行线程池
        .subscribe(result -> {
            System.out.println("📩 接收线程: " + Thread.currentThread().getName() + ", 数据: " + result);
        });

    System.out.println("MAIN THREAD: " + Thread.currentThread().getName());
    try {
        TimeUnit.MILLISECONDS.sleep(3000);
    }catch (InterruptedException e){
        e.printStackTrace();
    }
}

在这里插入图片描述

⚠️ subscribeOn() 影响上游执行线程,publishOn() 影响下游执行线程。

3.4 背压(Backpressure)演示

/**
     * 背压演示
     */
    @Test
    public void backPressureTest() {
        Flux.range(1, 1000)
                .onBackpressureDrop(item -> System.out.println("🗑️ 丢弃: " + item)) // 缓冲区满时丢弃
                .subscribe(new CoreSubscriber<Integer>() {
                    private Subscription subscription;

                    @Override
                    public void onSubscribe(Subscription s) {
                        this.subscription = s;
                        subscription.request(10);  // 初始请求 10 个
                    }

                    @Override
                    public void onNext(Integer item) {
                        System.out.println("✅ 接收: " + item);
                        try {
                            Thread.sleep(100);
                        } catch (InterruptedException e) {
                        }
                        subscription.request(1); // 每处理一个再要一个
                    }

                    @Override
                    public void onError(Throwable t) {
                        t.printStackTrace();
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("✅ 完成");
                    }
                });

        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

在这里插入图片描述

3.5 错误处理

/**
     * 错误处理
     */
@Test
public void errorHandlerTest() {
    Flux.range(1, 5)
        .map(i -> {
            if (i == 3) throw new RuntimeException("模拟错误");
            return "Item " + i;
        })
        .onErrorResume(e -> {
            System.err.println("⚠️ 捕获错误: " + e.getMessage());
            return Flux.just("Fallback 1", "Fallback 2"); // 错误后返回备用数据
        })
        .retryWhen(Retry.fixedDelay(2, Duration.ofSeconds(1))) // 重试 2 次
        .subscribe(System.out::println);
}

在这里插入图片描述