Spring Boot3 RabbitMq 项目地址
https://gitee.com/supervol/loong-springboot-study
(记得给个start,感谢)
RabbitMq 概述
RabbitMQ 是基于 AMQP(Advanced Message Queuing Protocol)的开源消息中间件,核心优势在于解耦、削峰、异步通信;而 Spring Boot 3 作为主流的 Java 开发框架,通过 spring-boot-starter-amqp
starter 简化了 RabbitMQ 的整合流程。本文将从基础概念、环境搭建、核心功能、高级特性到最佳实践,全面讲解 Spring Boot 3 与 RabbitMQ 的整合方案。
RabbitMq 核心
1. RabbitMQ 组件
组件 | 作用说明 |
---|---|
Broker | RabbitMQ 服务器实例,负责接收、存储、转发消息 |
Exchange | 交换机,接收生产者发送的消息,根据路由规则将消息路由到绑定的队列 |
Queue | 消息队列,存储待消费的消息,支持持久化、限流、死信等特性 |
Binding | 交换机与队列的绑定关系,包含 “路由键(Routing Key)” 用于匹配消息 |
Routing Key | 消息的 “地址标识”,交换机通过 Routing Key 决定消息路由到哪个队列 |
Virtual Host | 虚拟主机,实现多租户隔离(不同应用使用不同 Virtual Host,避免资源冲突) |
Connection | 客户端与 Broker 的 TCP 连接,重量级资源,一般复用 |
Channel | 基于 Connection 的轻量级通信通道, RabbitMQ 推荐通过 Channel 操作消息(减少 TCP 连接开销) |
交换机(Exchange)的 4 种核心类型,交换机是 RabbitMQ 消息路由的核心,不同类型对应不同的路由策略:
- Direct Exchange(直连交换机):精确匹配 Routing Key(消息的 Routing Key 与 Binding 的 Routing Key 完全一致才路由),适用于点对点通信(如订单支付通知)。
- Topic Exchange(主题交换机):模糊匹配 Routing Key(支持
*
匹配单个单词、#
匹配多个单词,单词间用.
分隔),适用于发布订阅 + 多条件过滤(如日志按 “服务名。级别” 路由)。 - Fanout Exchange(扇出交换机):忽略 Routing Key,将消息广播到所有绑定的队列,适用于广播通信(如系统通知、缓存清理)。
- Headers Exchange(头交换机):不依赖 Routing Key,通过匹配消息头(Headers)的键值对路由,适用于复杂属性匹配(较少用,灵活但性能略低)。
2. Spring AMQP 核心组件
Spring Boot 3 整合 RabbitMQ 依赖 Spring AMQP(版本与 Spring Boot 3 强绑定,如 Spring Boot 3.2 对应 Spring AMQP 3.2+),核心组件如下:
- RabbitTemplate:封装了 RabbitMQ 的消息发送逻辑,支持同步 / 异步发送、消息回调、消息转换器等。
- AmqpAdmin:用于声明交换机、队列、绑定关系(支持编程式声明,也可通过注解声明)。
- @RabbitListener:注解式消费者,标注在方法上即可监听指定队列,支持批量消费、手动确认等。
- MessageListenerContainer:消费者容器,负责管理消费者生命周期(如并发消费、消息重试、异常处理),Spring Boot 会自动配置默认容器。
RabbitMq 示例
1. 前提条件
Spring Boot 3 对依赖版本有严格要求,避免版本冲突:
组件 | 最低版本要求 | 推荐版本 |
---|---|---|
JDK | JDK 17+ | JDK 17/21 |
RabbitMQ | 3.9+ | 3.12+ |
Spring Boot | 3.0+ | 3.2.x(稳定版) |
Spring AMQP | 3.0+(随 Spring Boot 自动引入) | 3.2.x |
2. 代码位置
请参考项目地址中 springboot-mq/springboot-rabbitmq 模块代码。
RabbitMq 高级
基础整合仅满足简单通信,实际项目需解决消息丢失、重复消费、延迟消息等问题,本节讲解核心高级特性。
1. 消息可靠性保障
RabbitMQ 消息丢失可能发生在三个环节:生产者→Broker、Broker 存储、Broker→消费者,需针对性防护。
环节 | 防护措施 |
---|---|
生产者→Broker | 开启生产者确认(publisher-confirm-type: correlated )+ 回调重试 |
Broker 存储 | 交换机 / 队列持久化(durable=true )+ 消息持久化(deliveryMode=PERSISTENT ) |
Broker→消费者 | 手动确认(acknowledge-mode: manual )+ 消费失败转发死信队列 |
(1)消息持久化配置
在声明交换机和队列时,需设置 durable=true
;发送消息时,需设置 deliveryMode=PERSISTENT
:
// 1. 声明持久化交换机
DirectExchange durableExchange = new DirectExchange("durable-exchange", true, false);
// 2. 声明持久化队列
Queue durableQueue = new Queue("durable-queue", true, false, false);
// 3. 发送持久化消息(通过 RabbitTemplate 设置消息属性)
rabbitTemplate.convertAndSend(
"durable-exchange",
"durable-routing-key",
"持久化消息",
message -> {
// 设置消息持久化(DeliveryMode.PERSISTENT)
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
},
new CorrelationData(UUID.randomUUID().toString())
);
2. 死信队列
死信是指无法被正常消费的消息(如消费失败、消息过期、队列满),死信队列用于存储这些消息,避免丢失或阻塞正常队列。
(1)死信产生条件
- 消息被消费者拒绝(
basicReject
/basicNack
,且requeue=false
)。 - 消息过期(队列设置
x-message-ttl
或消息单独设置expiration
)。 - 队列达到最大长度(
x-max-length
),无法存储新消息。
(2)死信队列配置示例
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DeadLetterQueueConfig {
// 1. 死信交换机(普通 Direct 交换机)
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange("dlx-exchange", true, false);
}
// 2. 死信队列(存储死信消息)
@Bean
public Queue deadLetterQueue() {
return new Queue("dlx-queue", true, false, false);
}
// 3. 绑定死信交换机与死信队列
@Bean
public Binding deadLetterBinding() {
return BindingBuilder.bind(deadLetterQueue())
.to(deadLetterExchange())
.with("dlx-routing-key"); // 死信路由键
}
// 4. 普通队列(设置死信属性,将死信转发到死信交换机)
@Bean
public Queue normalQueue() {
return QueueBuilder.durable("normal-queue")
.withArgument("x-dead-letter-exchange", "dlx-exchange") // 死信交换机
.withArgument("x-dead-letter-routing-key", "dlx-routing-key") // 死信路由键
.withArgument("x-message-ttl", 10000) // 消息过期时间(10秒)
.build();
}
// 5. 绑定普通队列与普通交换机
@Bean
public Binding normalBinding(DirectExchange normalExchange, Queue normalQueue) {
return BindingBuilder.bind(normalQueue)
.to(normalExchange)
.with("normal-routing-key");
}
// 6. 普通交换机
@Bean
public DirectExchange normalExchange() {
return new DirectExchange("normal-exchange", true, false);
}
}
测试:发送消息到 normal-queue
,若 10 秒内未被消费,消息会自动转为死信,进入 dlx-queue
。
3. 延迟队列
延迟队列用于 “消息延迟指定时间后再消费”(如订单超时未支付自动取消、定时任务),RabbitMQ 无原生延迟队列,需通过以下两种方式实现:
(1)基于死信队列 + TTL
利用 “消息过期后转为死信” 的特性,设置队列的 x-message-ttl
,死信队列即为延迟队列。
缺陷:队列中所有消息的延迟时间固定,无法灵活设置不同延迟时间。
(2)基于 RabbitMQ 延迟插件
RabbitMQ 提供 rabbitmq_delayed_message_exchange
插件,支持自定义消息延迟时间,灵活性更高。
步骤 1:安装延迟插件
- 下载
rabbitmq_delayed_message_exchange插件,并放到指定位置
。 - 安装插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
。 - 验证:访问管理界面,在
Exchanges
的Type
下拉框中可看到x-delayed-message
。 - 注意,本文不讨论和涉及rabbitmq及其插件安装和配置,请自行搜索。
步骤 2:配置延迟交换机与队列
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DelayedQueueConfig {
// 1. 声明延迟交换机(类型为 x-delayed-message)
@Bean
public CustomExchange delayedExchange() {
// 参数:名称、类型、持久化、自动删除、附加参数(指定延迟交换机的路由类型)
return new CustomExchange(
"delayed-exchange",
"x-delayed-message",
true,
false,
Map.of("x-delayed-type", "direct") // 延迟交换机的底层路由类型(如 direct)
);
}
// 2. 声明延迟队列
@Bean
public Queue delayedQueue() {
return new Queue("delayed-queue", true, false, false);
}
// 3. 绑定延迟交换机与队列
@Bean
public Binding delayedBinding() {
return BindingBuilder.bind(delayedQueue())
.to(delayedExchange())
.with("delayed-routing-key")
.noargs();
}
}
步骤 3:发送延迟消息
// 发送延迟消息(设置延迟时间,单位:毫秒)
public void sendDelayedMessage(Object message, long delayMs) {
rabbitTemplate.convertAndSend(
"delayed-exchange",
"delayed-routing-key",
message,
msg -> {
// 设置延迟时间
msg.getMessageProperties().setDelay((int) delayMs);
// 消息持久化
msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return msg;
},
new CorrelationData(UUID.randomUUID().toString())
);
}
// 调用:延迟 5 秒后消费
sendDelayedMessage("延迟 5 秒的消息", 5000);
4. 消息幂等性
重复消费:同一消息被消费者多次处理(如消费者确认前宕机,Broker 重新投递)。需保证 “重复消费不影响业务正确性”(即幂等)。
(1) 解决方案:唯一 ID + 去重存储
- 生成唯一消息 ID:生产者发送消息时,设置
messageId
(如 UUID)。 - 消费前检查去重:消费者接收消息后,先查询存储(Redis / 数据库)中是否存在该
messageId
,若存在则跳过,若不存在则处理业务并记录messageId
。
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
@Service
public class IdempotentConsumerService {
@Resource
private RedisTemplate<String, String> redisTemplate;
// 幂等消费逻辑
public void consumeIdempotentMessage(Object message, Message amqpMessage, Channel channel) throws IOException {
String messageId = amqpMessage.getMessageProperties().getMessageId();
String redisKey = "rabbitmq:message:id:" + messageId;
try {
// 1. Redis 分布式锁:避免并发重复处理(setIfAbsent 原子操作)
Boolean isFirstConsume = redisTemplate.opsForValue().setIfAbsent(
redisKey,
"CONSUMED",
24, // 过期时间(根据业务调整,避免 Redis 堆积)
TimeUnit.HOURS
);
if (Boolean.FALSE.equals(isFirstConsume)) {
// 2. 非首次消费:直接确认消息
System.out.printf("消息已重复消费,ID=%s%n", messageId);
channel.basicAck(amqpMessage.getMessageProperties().getDeliveryTag(), false);
return;
}
// 3. 首次消费:处理业务逻辑
System.out.printf("幂等消费消息:ID=%s,内容=%s%n", messageId, message);
// 4. 处理完成:确认消息
channel.basicAck(amqpMessage.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 5. 消费失败:拒绝消息(不重回队列)
channel.basicNack(amqpMessage.getMessageProperties().getDeliveryTag(), false, false);
System.err.printf("幂等消费失败:ID=%s,原因=%s%n", messageId, e.getMessage());
}
}
}
5. 监控与运维
1. RabbitMQ Management UI
RabbitMQ 管理界面是最基础的监控工具,关键监控指标:
- Exchanges:交换机是否正常,绑定数、消息入站 / 出站速率。
- Queues:队列长度(
Ready
数,若持续增长需扩容消费者)、消息消费速率(Consumers
数、Acknowledged
数)。 - Connections/Channels:连接数、信道数是否超出阈值(避免资源耗尽)。
- Admin:用户权限、虚拟主机配置是否正确。
2. Spring Boot Actuator 监控
通过 Spring Boot Actuator 暴露 RabbitMQ metrics,结合 Prometheus + Grafana 可实现可视化监控。
<!-- Spring Boot Actuator -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- 可选:Prometheus 适配(用于对接 Grafana) -->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
配置暴露监控端点
management:
endpoints:
web:
exposure:
include: health,info,metrics,prometheus # 暴露的端点
metrics:
export:
prometheus:
enabled: true # 启用 Prometheus 导出
endpoint:
health:
show-details: always # 显示健康详情
查看监控数据
- 访问
http://localhost:8080/actuator/health
:查看 RabbitMQ 连接健康状态(rabbitmq
节点为UP
表示正常)。 - 访问
http://localhost:8080/actuator/metrics/rabbitmq.messages.sent
:查看消息发送总数。 - 访问
http://localhost:8080/actuator/prometheus
:获取 Prometheus 格式的 metrics,用于 Grafana 可视化。
RabbitMq 总结
组件设计规范:
- 交换机 / 队列命名:按 “业务模块 - 类型 - 用途” 命名(如
order-direct-exchange
、order-pay-queue
)。 - 虚拟主机隔离:不同环境(开发 / 测试 / 生产)或不同应用使用独立 Virtual Host。
- 交换机 / 队列命名:按 “业务模块 - 类型 - 用途” 命名(如
性能优化:
- 连接池配置:使用
CachingConnectionFactory
缓存信道(默认缓存 25 个),避免频繁创建信道。 - 消息体大小:单个消息不超过 1MB(大消息建议存储到 MinIO/OSS,消息中携带文件地址)。
- 并发控制:消费者并发数(
concurrency
)根据 CPU 核心数调整(如 2-4 倍核心数),避免过度并发导致资源竞争。
- 连接池配置:使用
可靠性优先:
- 必开特性:生产者确认、手动确认、消息持久化、死信队列。
- 避免滥用自动确认:仅在 “消费逻辑无副作用” 场景使用
acknowledge-mode: auto
。
问题排查:
- 日志配置:开启 RabbitMQ DEBUG 日志(
logging.level.org.springframework.amqp=DEBUG
),便于追踪消息流转。 - 死信监控:定期检查死信队列,分析死信原因(如消费异常、消息过期)。
- 日志配置:开启 RabbitMQ DEBUG 日志(