响应式编程框架Reactor【3】

发布于:2025-08-31 ⋅ 阅读:(26) ⋅ 点赞:(0)

六、错误处理与背压

6.1 错误处理操作符

package cn.tcmeta;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

import java.time.Duration;

public class ErrorHandlingExamples {

    public void errorHandlingOperators() {
        Flux<Integer> numbers = Flux.range(1, 5)
                .map(n -> {
                    if (n == 3) throw new RuntimeException("Boom on 3!");
                    return n;
                });

        // onErrorReturn - 发生错误时返回默认值
        Flux<Integer> withDefault = numbers.onErrorReturn(0);

        // onErrorResume - 发生错误时切换到备用流
        Flux<Integer> withFallback = numbers.onErrorResume(e ->
                Flux.just(9, 8, 7)
        );

        // onErrorContinue - 发生错误时继续处理(跳过错误元素)
        Flux<Integer> withContinue = numbers.onErrorContinue((e, value) ->
                System.out.println("Error with value " + value + ": " + e.getMessage())
        );

        // retry - 重试操作
        Flux<Integer> withRetry = numbers.retry(2); // 最多重试2次

        // retryWhen - 基于条件的重试
        Flux<Integer> withConditionalRetry = numbers.retryWhen(
                Retry.withThrowable(
                        retries -> retries.zipWith(Flux.range(1, 3),
                                (error, index) -> {
                                    if (index < 3) {
                                        return Duration.ofSeconds(index); // 指数退避
                                    } else {
                                        throw new RuntimeException("Retries exhausted", error);
                                    }
                                })));

        // timeout - 操作超时处理
        Flux<Integer> withTimeout = numbers.delayElements(Duration.ofSeconds(1))
                .timeout(Duration.ofMillis(500))
                .onErrorResume(e -> Flux.just(-1));
    }

    public void errorHandlingInPractice() {
        // 模拟外部服务调用
        Mono<String> externalServiceCall = Mono.fromCallable(() -> {
            if (Math.random() > 0.7) {
                throw new RuntimeException("Service unavailable");
            }
            return "Service response";
        });

        // 添加弹性模式
        Mono<String> resilientCall = externalServiceCall
                .timeout(Duration.ofSeconds(2))
                .retryWhen(Retry.backoff(3, Duration.ofMillis(100))
                        .doBeforeRetry(e -> Mono.just("Fallback response")));

        resilientCall.subscribe(
                System.out::println,
                error -> System.err.println("Unexpected error: " + error),
                () -> System.out.println("Completed")
        );
    }
}

在 Reactor 响应式编程中,错误处理是确保流稳定性的核心环节。与传统编程的异常处理不同,Reactor 的错误具有终止性:一旦流中发生错误(通过onError信号),当前流会立即终止,后续元素不再发射。

错误处理操作符的作用是捕获错误、恢复流、转换错误或重试,避免错误直接传递到订阅者导致整个流程中断。

Reactor 错误处理操作符主要解决四类问题:

  1. 错误恢复:发生错误时,用默认值或备用流继续处理
  2. 错误转换:将原始错误转换为更有意义的业务错误
  3. 错误重试:对临时错误(如网络波动)进行重试
  4. 错误通知:仅记录错误不中断流(需配合恢复机制)

6.1.1 onErrorReturn - 发生错误时返回默认值

public final Flux<T> onErrorReturn(T fallbackValue) {
    Objects.requireNonNull(fallbackValue, "fallbackValue must not be null");
    return onAssembly(new FluxOnErrorReturn<>(this, null, fallbackValue));
}

这类操作符在捕获错误后,会生成新的元素或流,使整个流能够正常完成(而非错误终止)。

  • 作用:发生错误时,返回一个预设的默认值,流以该值结束并正常完成。
  • 特点:简单直接,适合已知错误且有明确默认值的场景。
public class OnErrorReturnExample {
    public static void main(String[] args) {
        // 模拟一个可能出错的流(第3个元素出错)
        Flux<Integer> errorProneFlux = Flux.range(1, 5)
            .map(num -> {
                if (num == 3) {
                    throw new RuntimeException("处理数字3时出错");
                }
                return num;
            });
        
        // 使用onErrorReturn:出错时返回默认值0
        errorProneFlux.onErrorReturn(0)
            .subscribe(
                num -> System.out.println("接收元素: " + num),
                error -> System.err.println("未被触发的错误处理"), // 不会执行
                () -> System.out.println("流正常完成") // 会执行
            );
    }
}
接收元素: 1
接收元素: 2
接收元素: 0  // 错误发生时返回的默认值
流正常完成   // 流正常结束,而非错误终止

6.1.2 onErrorResume - 发生错误时切换到备用流

public final Flux<T> onErrorResume(Function<? super Throwable, ? extends Publisher<? extends T>> fallback) {
    return onAssembly(new FluxOnErrorResume<>(this, fallback));
}
  • 作用:发生错误时,切换到一个备用流(Publisher),用备用流的元素继续处理。
  • 特点:比onErrorReturn更灵活,可根据错误类型返回不同的备用流。
public class OnErrorResumeExample {
    public static void main(String[] args) {
        // 模拟可能发生两种错误的流
        Flux<String> dataStream = Flux.just("a", "b", "c")
            .map(data -> {
                if (data.equals("b")) {
                    throw new IllegalArgumentException("无效数据: b"); // 业务错误
                }
                if (data.equals("c")) {
                    throw new RuntimeException("系统错误"); // 系统错误
                }
                return data.toUpperCase();
            });

        // 使用onErrorResume:根据错误类型返回不同备用流
        dataStream.onErrorResume(error -> {
            // 对业务错误返回特定备用流
            if (error instanceof IllegalArgumentException) {
                return Flux.just("B(备用)", "B1(备用)");
            }
            // 对系统错误返回默认备用流
            else {
                return Mono.just("系统错误备用值");
            }
        })
            .subscribe(
            result -> System.out.println("接收结果: " + result),
            error -> System.err.println("未被触发的错误处理"),
            () -> System.out.println("流正常完成")
        );
    }
}
接收结果: A
接收结果: B(备用)
接收结果: B1(备用)
流正常完成

6.1.3 onErrorContinue

  • 发生错误时继续处理(跳过错误元素)
public final Flux<T> onErrorContinue(BiConsumer<Throwable, Object> errorConsumer) {
    BiConsumer<Throwable, Object> genericConsumer = errorConsumer;
    return contextWriteSkippingContextPropagation(Context.of(
        OnNextFailureStrategy.KEY_ON_NEXT_ERROR_STRATEGY,
        OnNextFailureStrategy.resume(genericConsumer)
    ));
}
// onErrorContinue - 发生错误时继续处理(跳过错误元素)
Flux<Integer> withContinue = numbers.onErrorContinue((e, value) ->
   System.out.println("Error with value " + value + ": " + e.getMessage())
);

6.1.4 retry - 重试操作

public final Flux<T> retry(long numRetries) {
    return onAssembly(new FluxRetry<>(this, numRetries));
}
  • 作用:发生错误时,重新订阅原始流,最多重试numRetries次。
  • 特点:简单重试,不区分错误类型,适合已知偶发错误的场景。
public class RetryExample {
    public static void main(String[] args) {
        AtomicInteger attemptCount = new AtomicInteger(0); // 记录尝试次数
        
        // 模拟可能临时失败的操作(前2次失败,第3次成功)
        Flux<String> operation = Flux.defer(() -> {
            int attempt = attemptCount.incrementAndGet();
            System.out.println("执行第" + attempt + "次尝试");
            if (attempt < 3) {
                throw new RuntimeException("第" + attempt + "次尝试失败(临时错误)");
            }
            return Flux.just("第" + attempt + "次尝试成功");
        });
        
        // 使用retry(2):最多重试2次(总尝试次数=1+2=3)
        operation.retry(2) // 允许重试2次
            .subscribe(
                result -> System.out.println("成功: " + result),
                error -> System.err.println("最终失败: " + error.getMessage())
            );
    }
}
执行第1次尝试
执行第2次尝试(重试1)
执行第3次尝试(重试2)
成功:3次尝试成功

6.1.5 timeout - 操作超时处理

public final Flux<T> timeout(Duration timeout) {
    return timeout(timeout, null, Schedulers.parallel());
}
Flux<Integer> withTimeout = numbers.delayElements(Duration.ofSeconds(1))
    .timeout(Duration.ofMillis(500)) // timeout - 操作超时处理
    .onErrorResume(e -> Flux.just(-1));

6.1.6 retryWhen

public final Flux<T> retryWhen(Retry retrySpec) {
    return onAssembly(new FluxRetryWhen<>(this, retrySpec));
}
  • 作用:基于错误信号流(Flux<Throwable>)动态控制重试策略(如指数退避、条件重试)。
  • 特点:高度灵活,支持复杂重试逻辑(如根据错误类型决定是否重试、设置重试间隔)。
public class RetryWhenExample {
    public static void main(String[] args) throws InterruptedException {
        AtomicInteger attemptCount = new AtomicInteger(0);
        
        // 模拟网络请求(前3次失败,第4次成功)
        Flux<String> networkCall = Flux.defer(() -> {
            int attempt = attemptCount.incrementAndGet();
            System.out.println("执行第" + attempt + "次网络请求");
            if (attempt < 4) {
                throw new RuntimeException("网络超时");
            }
            return Flux.just("请求成功,数据: {id:1}");
        });
        
        // 定义指数退避重试策略:最多重试3次,间隔依次为1s、2s、4s
        Retry retryStrategy = Retry.backoff(3, Duration.ofSeconds(1))
            .jitter(0.1) // 增加10%随机抖动,避免重试风暴
            .filter(error -> error instanceof RuntimeException) // 只对特定错误重试
            .onRetryExhaustedThrow((retrySpec, signal) -> 
                new RuntimeException("重试耗尽,最终失败", signal.failure()));
        
        // 使用retryWhen应用策略
        networkCall.retryWhen(retryStrategy)
            .subscribe(
                result -> System.out.println("接收结果: " + result),
                error -> System.err.println("最终错误: " + error.getMessage())
            );
        
        // 等待所有重试完成(总耗时≈1+2+4=7秒)
        Thread.sleep(8000);
    }
}

输出结果(间隔随重试次数指数增长):

执行第1次网络请求
执行第2次网络请求(1秒后重试)
执行第3次网络请求(2秒后重试)
执行第4次网络请求(4秒后重试)
接收结果: 请求成功,数据: {id:1}

6.1.7 doOnError

  • 作用:错误发生时执行副作用(如日志记录、报警),但不处理错误本身。
  • 特点:流仍会以错误终止,需与onErrorResume等配合使用。
public class DoOnErrorExample {
    private static final Logger log = LoggerFactory.getLogger(DoOnErrorExample.class);
    
    public static void main(String[] args) {
        Flux<String> dataStream = Flux.just("x", "y")
            .map(data -> {
                if (data.equals("y")) {
                    throw new RuntimeException("处理y时出错");
                }
                return data;
            });
        
        // doOnError仅记录日志,不处理错误;需配合onErrorReturn恢复流
        dataStream.doOnError(error -> {
                    // 记录错误日志(副作用)
                    log.error("捕获错误: {}", error.getMessage(), error);
                })
                .onErrorReturn("y的默认值") // 恢复流
                .subscribe(
                    result -> System.out.println("接收: " + result),
                    error -> System.err.println("不会执行,因为已恢复")
                );
    }
}
接收: x
[ERROR] 捕获错误: 处理y时出错
java.lang.RuntimeException: 处理y时出错
    ...
接收: y的默认值

6.1.8 错误处理流程与策略

错误传播与处理流程

flowchart LR
    A[正常流] -->|发生错误| B[触发onError信号]
    B --> C{是否有错误处理操作符?}
    C -->|否| D[错误传递到订阅者,流终止]
    C -->|是| E[执行错误处理逻辑]
    E -->|恢复流(如onErrorReturn)| F[流继续并正常完成]
    E -->|未恢复(如onErrorMap)| G[错误转换后传递,流终止]
    E -->|重试(如retry)| H[重新订阅原始流]

错误处理策略选择

场景 推荐操作符 原因
已知错误,需返回默认值 onErrorReturn 简单直接,适合静态默认值
错误类型多样,需动态备用流 onErrorResume 可根据错误类型返回不同备用流
需统一错误类型(如业务异常) onErrorMap 转换错误便于下游统一处理
临时错误(如网络波动) retry/retryWhen 重试可避免偶发错误导致失败
需记录错误但不中断流 doOnError + 恢复操作符 doOnError仅记录,恢复操作符保证流继续

6.1.9 最佳实践

  1. 错误处理尽早原则
    在流的上游处理错误,避免错误传递到下游多个订阅者重复处理。
  2. 区分可重试与不可重试错误
    • 可重试:网络超时、服务暂时不可用(用retryWhen+ 过滤)
    • 不可重试:业务错误(如参数无效)、权限不足(直接处理不重试)
  3. 清理资源
    错误发生时需释放资源(如数据库连接),可配合doFinally
Flux.using(
    () -> openConnection(), // 资源创建
    conn -> processData(conn), // 资源使用
    conn -> closeConnection(conn) // 资源释放(无论成功/失败)
).onErrorResume(...)
  1. 避免静默失败
    即使使用onErrorReturn,也建议用doOnError记录错误,便于排查问题。

6.1.10 总结

Reactor 的错误处理操作符提供了从简单到复杂的完整解决方案:

  • 简单恢复用onErrorReturn,动态恢复用onErrorResume
  • 错误转换用onErrorMap,便于业务统一处理;
  • 临时错误重试用retry(简单)或retryWhen(复杂策略);
  • 错误通知用doOnError,需配合恢复操作符使用。

选择合适的操作符需结合业务场景(错误类型、是否可恢复、是否需重试),核心目标是确保流的稳定性,同时保留错误上下文便于排查

6.2 背压处理

示例代码

import reactor.core.publisher.Flux;
import reactor.core.publisher.BaseSubscriber;
import org.reactivestreams.Subscription;
import java.time.Duration;

public class BackpressureExamples {
    
    public void backpressureStrategies() {
        // 创建一个快速生产的Flux
        Flux<Integer> fastProducer = Flux.range(1, 1000)
            .delayElements(Duration.ofMillis(10));
        
        // 策略1: 缓冲 (BUFFER) - 默认策略
        fastProducer.onBackpressureBuffer(50); // 指定缓冲区大小
        
        // 策略2: 丢弃最新值 (DROP)
        fastProducer.onBackpressureDrop(dropped -> 
            System.out.println("Dropped: " + dropped)
        );
        
        // 策略3: 丢弃最旧值 (LATEST)
        fastProducer.onBackpressureLatest();
        
        // 策略4: 错误 (ERROR)
        fastProducer.onBackpressureError();
    }
    
    // 自定义处理背压
    public void customBackpressure() {
        Flux.range(1, 1000)
            .subscribe(new BaseSubscriber<Integer>() {
                private int count = 0;
                private final int BATCH_SIZE = 10;
                
                @Override
                protected void hookOnSubscribe(Subscription subscription) {
                    // 初始请求
                    request(BATCH_SIZE);
                }
                
                @Override
                protected void hookOnNext(Integer value) {
                    // 处理元素
                    process(value);
                    count++;
                    
                    // 每处理BATCH_SIZE个元素后请求下一批
                    if (count % BATCH_SIZE == 0) {
                        request(BATCH_SIZE);
                    }
                }
                
                @Override
                protected void hookOnComplete() {
                    System.out.println("Completed!");
                }
                
                @Override
                protected void hookOnError(Throwable throwable) {
                    System.err.println("Error: " + throwable.getMessage());
                }
                
                private void process(Integer value) {
                    // 模拟处理
                    try {
                        Thread.sleep(20);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    System.out.println("Processed: " + value);
                }
            });
    }
}

在 Reactor 响应式编程中,背压(Backpressure) 是解决 “生产者速度远快于消费者” 问题的核心机制。它允许消费者主动告知生产者自己的处理能力,通过动态调节数据请求量,避免数据积压导致的内存溢出或系统崩溃。Reactor 作为 Reactive Streams 规范的优秀实现,提供了完善的背压处理方案,涵盖从基础机制到高级策略的全场景支持。

6.2.1 背压的核心原理

1. 背压的本质

背压是一种流量控制机制:消费者通过request(n)方法告知生产者 " 我现在能处理n个元素 “,生产者根据该请求量调整数据发送速度。这种” 消费者主导 “的模式,与传统” 生产者推送 " 模式形成鲜明对比。

2. Reactor 中的背压交互模型

Reactor 通过Subscription接口实现背压交互,核心流程如下:

  • 消费者订阅生产者后,生产者创建Subscription对象并通过onSubscribe传递给消费者;
  • 消费者调用subscription.request(n),表示 " 请求n个元素 ";
  • 生产者收到请求后,发送不超过n个元素(通过onNext);
  • 消费者处理完部分元素后,再次调用request(m)请求更多元素,形成闭环。
生产者 Subscription 消费者 订阅(subscribe) 调用onSubscribe(Subscription) request(2) // 初始请求2个元素 转发请求 onNext(元素1) onNext(元素2) 处理元素1 request(1) // 再请求1个 onNext(元素3) 处理元素2 request(1) // 再请求1个 onNext(元素4) ...持续到流结束... 生产者 Subscription 消费者

6.2.2 Reactor 中的背压策略

当生产者速度超过消费者处理能力(即未处理元素数量超过消费者请求量)时,Reactor 提供了多种策略来处理超额数据,核心通过onBackpressureXXX系列操作符实现。

1. 缓冲策略:onBackpressureBuffer()

  • 原理:将超额数据存入缓冲区暂存,直到消费者请求新数据时再发送。
  • 默认行为:Reactor 中大多数操作符默认使用缓冲区(如Flux.range),默认缓冲区大小为Queues.SMALL_BUFFER_SIZE(256 个元素)。
  • 适用场景:生产者速度偶尔超过消费者,且内存资源充足(可容忍短期缓冲)。
public class BackpressureBufferExample {
    public static void main(String[] args) throws InterruptedException {
        // 生产者:快速生成100个元素(每10ms一个)
        Flux<Integer> fastProducer = Flux.range(1, 100)
            .doOnNext(num -> System.out.println("生产: " + num))
            .delayElements(Duration.ofMillis(10)); // 快速生产
        
        // 消费者:处理速度慢(每100ms处理一个)
        fastProducer
            // 缓冲超额数据(默认大小256,这里显式指定以强调策略)
            .onBackpressureBuffer(
                100, // 缓冲区大小
                dropped -> System.out.println("缓冲区满,丢弃: " + dropped), // 缓冲区满时的回调
                false // 满时是否抛出异常(false=丢弃)
            )
            .publishOn(Schedulers.boundedElastic()) // 切换到消费者线程
            .doOnNext(num -> {
                try {
                    // 模拟慢速处理
                    Thread.sleep(100);
                    System.out.println("处理: " + num);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            })
            .subscribe();
        
        // 等待所有处理完成
        Thread.sleep(15000);
    }
}

关键输出(缓冲区暂存超额数据):

生产: 1 → 生产: 2... → 生产: 10(快速连续生产)
处理: 1100ms后)
处理: 2(再100ms后)
...(生产者持续生产,缓冲区暂存,消费者按自己速度处理)
  1. 丢弃策略:onBackpressureDrop()
  • 原理:当缓冲区满时,直接丢弃新产生的元素(不存入缓冲区)。
  • 扩展能力:可通过回调记录丢弃的元素,便于监控数据丢失情况。
  • 适用场景:数据实时性要求高,允许丢失旧数据(如实时监控指标,可丢弃过期数据)。
public class BackpressureDropExample {
    public static void main(String[] args) throws InterruptedException {
        // 生产者:快速生成20个元素
        Flux<Integer> fastProducer = Flux.range(1, 20)
            .doOnNext(num -> System.out.println("生产: " + num))
            .delayElements(Duration.ofMillis(50)); // 50ms/个
        
        // 消费者:每200ms处理1个(速度仅为生产者的1/4)
        fastProducer
            // 缓冲区满时丢弃新元素,并记录丢弃的元素
            .onBackpressureDrop(dropped -> System.out.println("丢弃元素: " + dropped))
            .publishOn(Schedulers.boundedElastic())
            .doOnNext(num -> {
                try {
                    Thread.sleep(200);
                    System.out.println("处理: " + num);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            })
            .subscribe();
        
        Thread.sleep(5000);
    }
}

关键输出(超过处理能力的元素被丢弃):

生产: 1 → 生产: 2 → 生产: 3 → 生产: 4 → 生产: 5(快速生产)
处理: 1200ms后)
生产: 6
丢弃元素: 6(缓冲区满,新元素被丢弃)
生产: 7
丢弃元素: 7
处理: 2(再200ms后)
...

3. 错误策略:onBackpressureError()

  • 原理:当缓冲区满时,立即抛出BufferOverflowException,终止流的处理。
  • 适用场景:数据不允许丢失,且希望快速发现流量不匹配问题(如关键业务数据处理)。
public class BackpressureErrorExample {
    public static void main(String[] args) throws InterruptedException {
        // 生产者:快速生成10个元素
        Flux<Integer> fastProducer = Flux.range(1, 10)
            .doOnNext(num -> System.out.println("生产: " + num))
            .delayElements(Duration.ofMillis(50));
        
        // 消费者:处理速度慢,且不允许数据丢失
        fastProducer
            // 缓冲区满时抛出错误
            .onBackpressureError()
            .publishOn(Schedulers.boundedElastic())
            .doOnNext(num -> {
                try {
                    Thread.sleep(200);
                    System.out.println("处理: " + num);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            })
            .subscribe(
                null,
                error -> System.err.println("背压错误: " + error.getMessage()) // 捕获缓冲区溢出错误
            );
        
        Thread.sleep(3000);
    }
}

关键输出(缓冲区满时立即报错):

生产: 1 → 生产: 2... → 生产: 6(默认缓冲区满)
处理: 1
背压错误: Buffer overflow

4. 保留最新策略:onBackpressureLatest()

  • 原理:只保留最新产生的元素,当消费者请求时,直接发送最新元素(覆盖旧元素)。
  • 适用场景:只需要最新数据,旧数据无意义(如实时股价、用户当前位置)。
public class BackpressureLatestExample {
    public static void main(String[] args) throws InterruptedException {
        // 生产者:快速生成1-10的数字(模拟实时数据更新)
        Flux<Integer> fastProducer = Flux.range(1, 10)
            .doOnNext(num -> System.out.println("生产: " + num))
            .delayElements(Duration.ofMillis(100));
        
        // 消费者:每500ms处理一次(只关心最新数据)
        fastProducer
            // 只保留最新元素
            .onBackpressureLatest()
            .publishOn(Schedulers.boundedElastic())
            .doOnNext(num -> {
                try {
                    Thread.sleep(500);
                    System.out.println("处理(最新): " + num);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            })
            .subscribe();
        
        Thread.sleep(6000);
    }
}

关键输出(只处理最新元素,跳过中间值):

生产: 12345500ms内产生5个元素)
处理(最新): 5(只保留最新的5)
生产: 678910(下一个500ms)
处理(最新): 10

6.2.3 热流与冷流的背压差异

Reactor 中的流分为冷流(Cold Stream)热流(Hot Stream),它们的背压处理方式存在本质区别:

  1. 冷流(如Flux.range、数据库查询)
  • 特点:为每个订阅者单独生成数据(“一对一”),数据生成与订阅强关联。
  • 背压支持:天然支持背压,生产者可根据消费者的request(n)动态调节数据生成速度(如按需从数据库拉取数据)。
  1. 热流(如Flux.interval、WebSocket 消息)
  • 特点:数据生成独立于订阅(“一对多”),无论是否有订阅者都持续产生数据。
  • 背压挑战:热流生产者无法感知单个消费者的处理能力,超额数据必须通过缓冲、丢弃等策略处理(如onBackpressureDrop)。

示例:热流的背压处理

// 热流:每100ms生成一个元素(独立于订阅)
Flux<Long> hotStream = Flux.interval(Duration.ofMillis(100))
    .share(); // 转为热流(多订阅者共享)

// 消费者1:处理快(100ms/个)
hotStream.subscribe(num -> System.out.println("消费者1处理: " + num));

// 消费者2:处理慢(500ms/个),需用背压策略
hotStream.onBackpressureDrop(dropped -> System.out.println("消费者2丢弃: " + dropped))
    .subscribe(num -> {
        try {
            Thread.sleep(500);
            System.out.println("消费者2处理: " + num);
        } catch (InterruptedException e) { /* 忽略 */ }
    });

6.2.4 背压监控与调度

Reactor 提供了工具监控背压状态,帮助排查流量不匹配问题:

1. metrics()操作符:暴露流的关键指标(如元素数量、背压请求量)。

Flux.range(1, 100)
    .metrics() // 启用指标收集
    .onBackpressureBuffer()
    .subscribe();

2. doOnRequest监控请求量:跟踪消费者的request(n)调用。

Flux.range(1, 10)
    .doOnRequest(n -> System.out.println("收到请求: " + n))
    .subscribe(num -> System.out.println("处理: " + num));
  1. SubmissionPublisherestimatePending():估算未处理元素数量。
SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
publisher.subscribe(new MySubscriber());
publisher.submit(1);
System.out.println("未处理元素: " + publisher.estimatePending()); // 输出未处理数量

6.2.5 背压最佳实践

  1. 根据业务场景选择策略

    • 不允许丢数据 → onBackpressureBuffer(需评估内存);
    • 实时性优先 → onBackpressureLatestonBackpressureDrop
    • 严格流量控制 → onBackpressureError(快速失败)。
  2. 控制缓冲区大小:默认缓冲区(256)可能不适合高并发场景,需根据内存和吞吐量调整

.onBackpressureBuffer(1024) // 调整缓冲区为1024个元素
  1. 避免在消费者中执行阻塞操作:阻塞会导致背压请求延迟,间接引发数据积压(应使用publishOn切换到专用线程池)。

  2. 热流必须显式处理背压:热流无法天然响应背压,需通过onBackpressureXXX明确策略,否则可能丢失数据而不报错。

  3. 结合限流操作符:对于突发流量,可先用limitRate(n)限制生产者速度,减轻背压压力

6.2.6 limitRate

在 Reactor 中,limitRate(n)是一个用于控制生产者发送速率的操作符,它通过限制每次请求的元素数量,间接平衡生产者和消费者的速度,是【背压机制的重要补充】。

onBackpressureXXX系列操作符(消费者侧处理)不同,limitRate生产者侧主动限制流量,避免一次性发送过多元素导致消费者处理压力过大。

核心作用

limitRate的本质是拆分请求:当消费者调用request(m)请求m个元素时,limitRate会将这个大请求拆分为多个小请求,每次向上游生产者请求n个元素(nlimitRate的参数),从而控制元素流入下游的速度。

  • 默认行为:如果不使用limitRate,消费者的一次request(1000)会直接传递给上游,可能导致上游一次性发送 1000 个元素,引发下游处理压力。
  • limitRate效果:若设置limitRate(100),则request(1000)会被拆分为 10 次request(100),上游每次最多发送 100 个元素,下游分批处理。

两个重载方法

  • limitRate(int prefetch):单参数版本,prefetch为每次向上游请求的元素数量(默认预取阈值为prefetch/2)。
  • limitRate(int prefetch, int lowTide):双参数版本,prefetch为每次请求量,lowTide为触发新请求的阈值(当剩余元素少于lowTide时,自动请求下一批)。
下游消费者 limitRate操作符 上游生产者 request(500) // 消费者请求500个元素 limitRate(100):每次向上游请求100个 request(100) // 第一次请求100个 发送100个元素 转发100个元素 处理元素,剩余元素逐渐减少 剩余元素<50(lowTide=100/2) request(100) // 自动请求下一批 发送100个元素 转发100个元素 重复直到满足500个元素请求 下游消费者 limitRate操作符 上游生产者

示例说明

  1. limitRate的情况(可能会导致元素突发)
// 上游:快速生成1000个元素
Flux.range(1, 1000)
    .doOnRequest(n -> System.out.println("上游收到请求: " + n))
    .subscribe(
        num -> { /* 处理元素 */ },
        error -> {},
        () -> System.out.println("处理完成")
    );

输出(一次性请求所有元素):

上游收到请求: 9223372036854775807  // 默认请求Long.MAX_VALUE
处理完成
  1. 使用limitRate的情况(分批请求)
public class LimitRateExample {
    public static void main(String[] args) {
        // 上游:生成1000个元素
        Flux<Integer> upstream = Flux.range(1, 1000)
            .doOnRequest(n -> System.out.println("上游收到请求: " + n)); // 监控上游请求
        
        // 使用limitRate(100):每次向上游请求100个元素
        upstream.limitRate(100)
            .subscribe(
                num -> {
                    // 模拟消费者处理(不打印所有元素,只关注请求逻辑)
                },
                error -> {},
                () -> System.out.println("处理完成")
            );
    }
}

输出(分批请求,每次 100 个):

上游收到请求: 100  // 第一次请求
上游收到请求: 100  // 当剩余元素少于50(100/2)时,自动请求下一批
上游收到请求: 100
...(重复直到1000个元素)
处理完成
  1. limitRate适用场景

  2. 上游生产者无法感知下游背压:如某些第三方库或 legacy 系统,需通过limitRate主动限制发送速度。

  3. 避免大批次元素处理导致的 GC 压力:分批处理可减少内存占用,降低垃圾回收频率。

  4. 平滑流量波动:在突发流量场景下,limitRate可将集中请求拆分为平稳的小批次请求。

  5. 与热流配合:热流(如Flux.interval)无法响应背压,limitRate可控制其流入下游的速度。

6.2.7 limitRate + 背压策略

limitRate通常与背压策略配合使用,形成 “上游限流 + 下游缓冲” 的双层保护:

  • limitRate控制上游发送速度,避免一次性涌入过多元素;
  • onBackpressureBuffer在下游缓冲少量超额元素,应对短期速度波动。
public class LimitRateWithBackpressure {
    public static void main(String[] args) throws InterruptedException {
        // 上游:快速生成元素(每10ms一个)
        Flux<Integer> fastUpstream = Flux.range(1, 200)
            .doOnRequest(n -> System.out.println("上游请求: " + n))
            .delayElements(Duration.ofMillis(10));
        
        // 下游:处理较慢(每100ms一个)
        fastUpstream
            .limitRate(20) // 每次向上游请求20个,控制流入速度
            .onBackpressureBuffer(30) // 下游缓冲30个,应对短期波动
            .publishOn(Schedulers.boundedElastic())
            .doOnNext(num -> {
                try {
                    Thread.sleep(100); // 模拟慢速处理
                    System.out.println("处理元素: " + num);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            })
            .subscribe();
        
        Thread.sleep(20000);
    }
}

6.2.8 与类似操作符的区别

操作符 核心作用 适用场景
limitRate(n) 拆分请求,控制上游发送速率(每次请求n个) 平滑流量,避免大批次元素
limitRequest(n) 限制下游总请求量(最多请求n个元素) 仅需前n个元素的场景
onBackpressureBuffer(n) 下游缓冲超额元素(最多缓冲n个) 处理短期速度不匹配
throttleFirst(Duration) 单位时间内只保留第一个元素 限流高频事件(如点击)

6.2.9 最佳实践

  1. 合理设置prefetch
    • 过小(如 10):请求次数过多,增加开销;
    • 过大(如 1000):可能失去限流效果;
    • 建议:根据下游处理能力设置(如下游每秒处理 100 个,则prefetch设为 100-200)。
  2. 双参数版本更灵活:当元素处理时间不稳定时,用limitRate(prefetch, lowTide)调整触发新请求的阈值:
.limitRate(200, 50) // 每次请求200个,剩余50个时触发下一次请求
  1. publishOn配合使用publishOn会请求一批元素到自己的缓冲区,limitRate应放在publishOn上游,避免缓冲区积压:
// 正确:limitRate在上游,控制流入publishOn缓冲区的速度
upstream.limitRate(100).publishOn(...)

// 错误:limitRate在下游,无法控制publishOn的缓冲区
upstream.publishOn(...).limitRate(100)

6.2.10 总结

Reactor 的背压机制通过 “消费者主动请求” 模式,解决了生产者与消费者的速度不匹配问题,其核心价值在于:

  • 流量可控:避免消费者被压垮,保障系统稳定性;
  • 策略灵活:提供缓冲、丢弃、错误、保留最新等策略,适配不同业务场景;
  • 兼容性:严格遵循 Reactive Streams 规范,可与其他响应式库(如 RxJava)协同工作。

在实践中,需根据流的类型(冷 / 热)和业务需求(数据重要性、实时性)选择合适的背压策略,同时通过监控工具持续优化流量控制,确保系统在高并发场景下的可靠性。


网站公告

今日签到

点亮在社区的每一天
去签到