一、Reactor 框架概述与理论基础
官方文档
:
Getting Started :: Reactor Core Reference Guide
https://www.reactive-streams.org/
https://www.reactive-streams.org/
https://projectreactor.io/
https://projectreactor.io/docs/core/release/reference/gettingStarted.html
1.1 响应式编程(Reactive Programming)是什么?
响应式编程
是一种面向数据流和变化传播的编程范式。它允许你声明式地定义数据流的转换、组合和处理逻辑,系统自动处理异步、背压、错误传播等复杂问题。
[!tip]
✅ 核心思想:数据流是第一公民,一切皆流(Everything is a Stream)。
1.2 Reactive Streams 规范
Reactor 实现了 Reactive Streams 规范,该规范定义了四个核心接口:
Publisher<T>
:发布者Subscriber<T>
:订阅者Subscription
:订阅关系(支持背压)Processor<T,R>
:处理器
有兴趣参照网址查看: reactive-streams.org
[!note]
🔗 Reactor 是 Project Reactor 的简称,由 Pivotal(现 VMware)开发,是 Spring WebFlux 的底层引擎。
1.3 响应式编程与 Reactor 的诞生
响应式编程(Reactive Programming) 是一种面向数据流和变化传播的编程范式,其核心思想是:将程序视为数据流的处理管道,通过异步非阻塞的方式传递和处理数据,并通过背压(Backpressure) 机制平衡生产者和消费者的速度差异。
在 Java 生态中,Reactor 框架是 Reactive Streams 规范的优秀实现,由 Pivotal 公司开发(与 Spring 同属一个团队),于 2013 年首次发布。它的诞生解决了以下核心问题:
- 传统同步阻塞 IO 在高并发场景下的性能瓶颈
- 异步编程中的 “回调地狱” 问题
- 缺乏标准化的背压机制导致的资源失控
- 与 Spring 生态(如 Spring WebFlux、Spring Cloud)的深度集成需求
Reactor 的核心理念是:“以声明式的方式处理异步数据流,同时保持代码的可读性和可维护性”。
1.4 Reactor核心特性
特性 | 说明 |
---|---|
异步非阻塞 | 基于事件驱动模型,避免线程阻塞,提高系统吞吐量 |
背压支持 | 消费者可主动告知生产者自己的处理能力,防止数据积压 |
声明式编程 | 通过操作符组合描述 “做什么”,而非 “怎么做” |
数据流组合 | 支持复杂的流组合(合并、连接、嵌套等) |
完善的错误处理 | 提供丰富的错误捕获、恢复和传递机制 |
与 Java 生态融合 | 兼容 Java 8 + 的 Stream API,支持 CompletableFuture 转换 |
轻量级 | 核心库体积小,无强依赖 |
1.5 Reactor与其它响应式框架比较
flowchart LR
A[响应式框架] --> B[Reactor]
A --> C[RxJava]
A --> D[Akka Streams]
B --> B1[与Spring生态深度集成]
B --> B2[严格遵循Reactive Streams]
B --> B3[专为Java 8+优化]
B --> B4[更简洁的API设计]
C --> C1[更早出现,生态成熟]
C --> C2[支持多语言]
C --> C3[操作符更丰富但复杂]
D --> D1[基于Actor模型]
D --> D2[分布式场景优势]
D --> D3[学习曲线陡峭]
Reactor 的独特优势在于:
- 与 Spring WebFlux、Spring Cloud Gateway 等现代 Spring 组件无缝集成
- 对 Java 新特性(如虚拟线程、密封类)的原生支持
- 更简洁的 API 设计,降低响应式编程的学习门槛
二、Reactor核心类型
2.1 Reactor 核心概念
Reactor执行流程
2.2 核心类型
Reactor 提供了两个核心发布者类型:
类型 | 特点 | 适用场景 |
---|---|---|
Mono<T> |
0 或 1 个元素的异步序列 | 单个结果(如 HTTP 请求、数据库查询) |
Flux<T> |
0 到 N 个元素的异步序列 | 多个结果(如列表、事件流) |
2.3 Mono【0个或者1个元素的流】
Mono
用于表示包含 0 或 1 个元素的异步结果,适合处理单次操作(如数据库查询、HTTP 请求)的结果。
// 创建Mono【相当于事件的发布者】
Mono<String> mono = Mono.just("Hello Reactor"); // 直接值
Mono<String> emptyMono = Mono.empty(); // 空流
Mono<String> fromCallable = Mono.fromCallable(() -> "动态计算值"); // 延迟计算
// 订阅Mono(触发执行)
mono.subscribe(
value -> System.out.println("接收值:" + value), // 成功回调
error -> System.err.println("错误:" + error), // 错误回调
() -> System.out.println("完成") // 完成回调
);
2.4 Flux【0到N个元素的流】
Flux
用于表示包含 0 到多个元素的异步数据流,支持完整的生命周期(正常结束、错误终止)。常见场景:集合数据处理、事件流、批量操作等。
// 创建Flux
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5); // 固定元素
Flux<Integer> rangeFlux = Flux.range(1, 5); // 范围1-5
Flux<Long> intervalFlux = Flux.interval(Duration.ofSeconds(1)); // 每秒生成递增数(需手动取消订阅)
// 订阅Flux
flux.map(x -> x * 2) // 转换操作符
.filter(x -> x % 3 != 0) // 过滤操作符
.subscribe(
System.out::println, // 简化写法:仅处理成功事件
Throwable::printStackTrace,
() -> System.out.println("Flux完成")
);
2.5 数据流生命周期
无论是Flux
还是Mono
,都遵循相同的生命周期:
- 正常事件:通过
onNext()
发送元素(Flux
可多次调用,Mono
最多调用一次) - 终止事件:
- 成功终止:
onComplete()
(无元素发送) - 错误终止:
onError(Throwable)
(携带异常信息)
- 成功终止:
2.6 Reactor数据流模型
2.7 操作符链式调用
2.8 线程切换时序图
三、基础应用
引入Maven依赖:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
<version>2024.0.6</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
3.1 基础Mono使用
@Test
public void monoBasicTest() {
// 1. 创建一个Mono对象(发射一个字符串)
Mono<String> mono = Mono.just("Hello, Reactor!");
// 2. 订阅并消费
mono.subscribe(
value -> System.out.println("✅ 接收到: " + value),
error -> System.err.println("❌ 错误: " + error),
() -> System.out.println("🎉 完成"),
subscription -> {
System.out.println("🔗 订阅建立");
subscription.request(1); // 背压:请求 1 个
});
}
3.2 基础Flux使用
@Test
public void fluxBasicTest() {
// 创建一个Flux对象(发射多个字符串)
Flux<String> flux = Flux.just("Hello", "Reactor", "Face", "Smail")
.map(String::toUpperCase)
.filter(s -> s.length() > 5)
.log();
flux.subscribe(
System.out::println,
System.err::println,
() -> System.out.println("流结束")
);
}
🔍 log()
是调试利器,可查看所有信号(onNext, onError, onComplete)。
3.3 异步与线程切换
@Test
public void asyncTest(){
Flux.just("张小三", "A", "B", "C")
.map(data -> {
System.out.println("🔄 处理线程: " + Thread.currentThread().getName());
return data + "-processed";
})
.subscribeOn(Schedulers.boundedElastic()) // 订阅在弹性线程池
.publishOn(Schedulers.parallel()) // 发布在并行线程池
.subscribe(result -> {
System.out.println("📩 接收线程: " + Thread.currentThread().getName() + ", 数据: " + result);
});
System.out.println("MAIN THREAD: " + Thread.currentThread().getName());
try {
TimeUnit.MILLISECONDS.sleep(3000);
}catch (InterruptedException e){
e.printStackTrace();
}
}
⚠️ subscribeOn()
影响上游执行线程,publishOn()
影响下游执行线程。
3.4 背压(Backpressure)演示
/**
* 背压演示
*/
@Test
public void backPressureTest() {
Flux.range(1, 1000)
.onBackpressureDrop(item -> System.out.println("🗑️ 丢弃: " + item)) // 缓冲区满时丢弃
.subscribe(new CoreSubscriber<Integer>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
subscription.request(10); // 初始请求 10 个
}
@Override
public void onNext(Integer item) {
System.out.println("✅ 接收: " + item);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
subscription.request(1); // 每处理一个再要一个
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("✅ 完成");
}
});
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
3.5 错误处理
/**
* 错误处理
*/
@Test
public void errorHandlerTest() {
Flux.range(1, 5)
.map(i -> {
if (i == 3) throw new RuntimeException("模拟错误");
return "Item " + i;
})
.onErrorResume(e -> {
System.err.println("⚠️ 捕获错误: " + e.getMessage());
return Flux.just("Fallback 1", "Fallback 2"); // 错误后返回备用数据
})
.retryWhen(Retry.fixedDelay(2, Duration.ofSeconds(1))) // 重试 2 次
.subscribe(System.out::println);
}