文章目录
Reactor RabbitMQ 简介
Reactor RabbitMQ 是 Reactor 项目的一部分,旨在提供基于 Project Reactor 的 RabbitMQ 响应式编程支持。它将 RabbitMQ 的消息队列功能与 Reactor 的非阻塞、背压友好特性结合,适用于高吞吐量的异步消息处理场景。
注意: Reactor RabbitMQ 是对原生amqp-client 的封装,同样性能强大简单易用。reactor-rabbitmq是spring-boot-starter-amqp 之外的另外一种选择。
维度 | AMQP-Client | Reactor RabbitMQ | Spring Boot Starter AMQP |
---|---|---|---|
编程模型 | 命令式、手动管理 | 响应式、非阻塞 | 声明式、自动配置 |
框架依赖 | 无 | Reactor | Spring Boot |
适用场景 | 轻量级/非 Spring 项目 | 响应式微服务 | Spring Boot 企业应用 |
资源管理 | 手动 | 自动 | 自动 |
功能丰富度 | 基础协议操作 | 背压、高并发优化 | 事务、确认、死信队列等 |
学习曲线 | 中等(需理解 AMQP) | 高(需掌握 Reactor) | 低(Spring 生态友好) |
Reactor RabbitMQ核心特性
- 响应式流支持:基于 Reactor 的
Flux
和Mono
实现消息的发布与订阅。 - 背压管理:自动处理消费者与生产者之间的速率匹配。
- 非阻塞 API:避免传统 RabbitMQ 客户端的线程阻塞问题。
- 声明式配置:支持通过代码或配置文件定义队列、交换机和绑定。
使用方法
添加依赖
在 Maven 项目中添加以下依赖:
<dependency>
<groupId>io.projectreactor.rabbitmq</groupId>
<artifactId>reactor-rabbitmq</artifactId>
<version>1.5.6</version>
</dependency>
创建连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
Sender sender = RabbitFlux.createSender(
Mono.fromCallable(() -> connectionFactory.newConnection())
);
Receiver receiver = RabbitFlux.createReceiver(
Mono.fromCallable(() -> connectionFactory.newConnection())
);
发送消息
sender.send(
Flux.just(new OutboundMessage(
"exchange-name",
"routing-key",
"Hello RabbitMQ".getBytes()
))
).subscribe();
接收消息
receiver.consumeAutoAck("queue-name")
.map(delivery -> new String(delivery.getBody()))
.subscribe(System.out::println);
高级配置
消息确认模式
支持自动确认(autoAck
)和手动确认(manualAck
):
receiver.consumeManualAck("queue-name")
.delayUntil(delivery ->
delivery.ack()
.thenReturn(delivery.getBody())
)
.subscribe();
错误处理
通过 Reactor 的 onError
机制处理异常:
sender.send(messages)
.doOnError(e -> System.err.println("Send failed: " + e))
.retry(3)
.subscribe();
集群监听(自动ACK)
// 1. 配置集群连接
ReceiverOptions receiverOptions = new ReceiverOptions()
.connectionFactory(new ConnectionFactory() {{
setUsername("guest");
setPassword("guest");
}})
.connectionSupplier(cf -> cf.newConnection(
new Address[]{
new Address("localhost", 5672),
new Address("localhost", 5673),
new Address("localhost", 5674)
},
"reactive-cluster"
));
// 2. 创建 Receiver
Receiver receiver = RabbitFlux.createReceiver(receiverOptions);
// 监听队列(自动负载均衡)
receiver.consumeAutoAck("queue1") // 队列名(需在集群中预先创建)
.subscribe(
delivery -> {
String message = new String(delivery.getBody());
System.out.println("收到消息: " + message);
},
error -> System.err.println("监听错误: " + error)
);
// 保持程序运行
Mono.never().block();
集群监听手动ACK
// 1. 配置集群连接
ReceiverOptions receiverOptions = new ReceiverOptions()
.connectionFactory(new ConnectionFactory() {{
setUsername("guest");
setPassword("guest");
}})
.connectionSupplier(cf -> cf.newConnection(
new Address[]{
new Address("localhost", 5672),
new Address("localhost", 5673),
new Address("localhost", 5674)
},
"reactive-cluster"
));
// 2. 创建 Receiver
Receiver receiver = RabbitFlux.createReceiver(receiverOptions);
// 消费消息并手动ACK
receiver.consumeManualAck("queue1")
.flatMap(delivery -> {
try {
String message = new String(delivery.getBody());
log.info("received message:" + message);
// 业务逻辑处理...
boolean success = false;
int i = RandomUtil.randomInt();
if (i % 2 == 0) {
success = true;
}
if (success) {
log.info("ack success");
// 处理成功,手动ACK
return Mono.fromRunnable(() -> delivery.ack())
.thenReturn("ACK");
} else {
log.info("ack fail");
// 处理失败,手动NACK(可选择重试或丢弃)
return Mono.fromRunnable(() -> delivery.nack(true)) // false表示不重新入队
.thenReturn("NACK");
}
} catch (Exception e) {
// 异常情况,NACK并可选择重试
delivery.nack(true); // true表示重新入队
return Mono.error(e);
}
})
.subscribe(
result -> log.info("Message processed:" + result),
error -> log.info("Error:" + error)
);
// 保持程序运行
Mono.never().block();
性能优化建议
- 连接复用:避免频繁创建/关闭连接,使用
Mono
缓存连接。 - 批量发送:通过
Flux.buffer()
合并多条消息后一次性发送。 - 线程池调优:自定义
Scheduler
以匹配业务场景的并发需求。
适用场景
- 微服务间的异步通信。
- 事件驱动的数据处理流水线。
- 需要高吞吐量和低延迟的消息系统。
如需进一步功能(如事务、RPC 模式),可参考官方文档或源码示例。