Java响应式编程

发布于:2025-07-29 ⋅ 阅读:(15) ⋅ 点赞:(0)

Java 响应式编程是一种基于异步数据流处理的编程范式,它强调数据流的声明式构建和传播变化的自动响应。Java 9 引入的Flow API为响应式编程提供了标准接口,而 Reactor 和 RxJava 等第三方库则提供了更丰富的操作符和工具。

核心概念

  1. Publisher(发布者):产生数据流的源头。
  2. Subscriber(订阅者):消费数据流的接收者。
  3. Subscription(订阅):连接发布者和订阅者的桥梁,管理背压(Backpressure)。
  4. Processor(处理者):兼具发布者和订阅者的功能,用于转换数据流。

简单示例:使用 Java Flow API

下面是一个使用 Java 标准库Flow API的简单响应式编程示例:

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class ReactiveExample {
    public static void main(String[] args) throws InterruptedException {
        // 创建发布者
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            // 创建订阅者
            Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() {
                private Flow.Subscription subscription;

                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    System.out.println("订阅成功");
                    this.subscription = subscription;
                    subscription.request(1); // 请求1个数据
                }

                @Override
                public void onNext(String item) {
                    System.out.println("接收到数据: " + item);
                    subscription.request(1); // 处理完后再请求1个
                }

                @Override
                public void onError(Throwable throwable) {
                    System.out.println("发生错误: " + throwable.getMessage());
                }

                @Override
                public void onComplete() {
                    System.out.println("数据流处理完成");
                }
            };

            // 订阅
            publisher.subscribe(subscriber);

            // 发布数据
            publisher.submit("Hello");
            publisher.submit("Reactive");
            publisher.submit("World");

            // 等待所有数据处理完成
            Thread.sleep(1000);
        }
    }
}

常用操作符(以 Reactor 库为例)

Reactor 是 Spring 生态中推荐的响应式编程库,提供了Mono(0-1 个元素)和Flux(0-N 个元素)两种核心类型:

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ReactorExample {
    public static void main(String[] args) {
        // 创建Flux
        Flux<String> flux = Flux.just("A", "B", "C")
                .map(String::toLowerCase)  // 转换操作
                .filter(s -> s.startsWith("a"));  // 过滤操作

        // 创建Mono
        Mono<String> mono = Mono.just("Hello")
                .flatMap(s -> Mono.just(s + " World"));  // 异步转换

        // 订阅并消费
        flux.subscribe(
                System.out::println,  // 正常数据处理
                Throwable::printStackTrace,  // 错误处理
                () -> System.out.println("Flux完成")  // 完成回调
        );

        mono.subscribe(System.out::println);
    }
}

背压(Backpressure)处理

响应式编程的重要特性是支持背压,即消费者可以控制生产者发送数据的速率:

Flux.range(1, 1000)  // 生成1到1000的整数
    .onBackpressureBuffer(100)  // 缓冲100个元素
    .subscribe(
        num -> {
            // 模拟慢速处理
            try { Thread.sleep(100); } catch (InterruptedException e) {}
            System.out.println(num);
        },
        Throwable::printStackTrace,
        () -> System.out.println("处理完成")
    );

响应式 Web 示例(Spring WebFlux)

Spring WebFlux 是基于 Reactor 的响应式 Web 框架:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

@SpringBootApplication
public class WebFluxExample {
    public static void main(String[] args) {
        SpringApplication.run(WebFluxExample.class, args);
    }
}

@RestController
class HelloController {
    @GetMapping("/hello")
    public Mono<String> hello() {
        return Mono.just("Hello, Reactive Web!");
    }
}

总结

Java 响应式编程通过异步数据流提供了高效处理大量并发请求的能力,适合构建非阻塞、低延迟的应用程序。主要应用场景包括微服务、实时数据处理和高并发系统。


网站公告

今日签到

点亮在社区的每一天
去签到