Java 响应式编程是一种基于异步数据流处理的编程范式,它强调数据流的声明式构建和传播变化的自动响应。Java 9 引入的Flow API
为响应式编程提供了标准接口,而 Reactor 和 RxJava 等第三方库则提供了更丰富的操作符和工具。
核心概念
- Publisher(发布者):产生数据流的源头。
- Subscriber(订阅者):消费数据流的接收者。
- Subscription(订阅):连接发布者和订阅者的桥梁,管理背压(Backpressure)。
- Processor(处理者):兼具发布者和订阅者的功能,用于转换数据流。
简单示例:使用 Java Flow API
下面是一个使用 Java 标准库Flow API
的简单响应式编程示例:
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
public class ReactiveExample {
public static void main(String[] args) throws InterruptedException {
// 创建发布者
try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
// 创建订阅者
Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println("订阅成功");
this.subscription = subscription;
subscription.request(1); // 请求1个数据
}
@Override
public void onNext(String item) {
System.out.println("接收到数据: " + item);
subscription.request(1); // 处理完后再请求1个
}
@Override
public void onError(Throwable throwable) {
System.out.println("发生错误: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("数据流处理完成");
}
};
// 订阅
publisher.subscribe(subscriber);
// 发布数据
publisher.submit("Hello");
publisher.submit("Reactive");
publisher.submit("World");
// 等待所有数据处理完成
Thread.sleep(1000);
}
}
}
常用操作符(以 Reactor 库为例)
Reactor 是 Spring 生态中推荐的响应式编程库,提供了Mono
(0-1 个元素)和Flux
(0-N 个元素)两种核心类型:
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class ReactorExample {
public static void main(String[] args) {
// 创建Flux
Flux<String> flux = Flux.just("A", "B", "C")
.map(String::toLowerCase) // 转换操作
.filter(s -> s.startsWith("a")); // 过滤操作
// 创建Mono
Mono<String> mono = Mono.just("Hello")
.flatMap(s -> Mono.just(s + " World")); // 异步转换
// 订阅并消费
flux.subscribe(
System.out::println, // 正常数据处理
Throwable::printStackTrace, // 错误处理
() -> System.out.println("Flux完成") // 完成回调
);
mono.subscribe(System.out::println);
}
}
背压(Backpressure)处理
响应式编程的重要特性是支持背压,即消费者可以控制生产者发送数据的速率:
Flux.range(1, 1000) // 生成1到1000的整数
.onBackpressureBuffer(100) // 缓冲100个元素
.subscribe(
num -> {
// 模拟慢速处理
try { Thread.sleep(100); } catch (InterruptedException e) {}
System.out.println(num);
},
Throwable::printStackTrace,
() -> System.out.println("处理完成")
);
响应式 Web 示例(Spring WebFlux)
Spring WebFlux 是基于 Reactor 的响应式 Web 框架:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
@SpringBootApplication
public class WebFluxExample {
public static void main(String[] args) {
SpringApplication.run(WebFluxExample.class, args);
}
}
@RestController
class HelloController {
@GetMapping("/hello")
public Mono<String> hello() {
return Mono.just("Hello, Reactive Web!");
}
}
总结
Java 响应式编程通过异步数据流提供了高效处理大量并发请求的能力,适合构建非阻塞、低延迟的应用程序。主要应用场景包括微服务、实时数据处理和高并发系统。