Spring Boot 3 集成 RabbitMQ 实践指南
1. RabbitMQ 核心原理
1.1 什么是RabbitMQ
RabbitMQ是一个开源的消息代理和队列服务器,使用Erlang语言开发,基于AMQP(Advanced Message Queuing Protocol)协议实现。它支持多种消息传递模式,具有高可用性、可扩展性和可靠性等特点。
1.2 核心概念
1.2.1 基础组件
Producer(生产者)
- 消息的发送者
- 负责创建消息并发布到RabbitMQ中
Consumer(消费者)
- 消息的接收者
- 连接到RabbitMQ服务器并订阅队列
Exchange(交换机)
- 接收生产者发送的消息并根据路由规则转发到队列
- 类型:
- Direct Exchange:根据routing key精确匹配
- Topic Exchange:根据routing key模式匹配
- Fanout Exchange:广播到所有绑定队列
- Headers Exchange:根据消息属性匹配
Queue(队列)
- 消息存储的地方
- 支持持久化、临时、自动删除等特性
Binding(绑定)
- 交换机和队列之间的虚拟连接
- 定义消息路由规则
1.2.2 高级特性
消息持久化
- 交换机持久化:创建时设置durable=true
- 队列持久化:创建时设置durable=true
- 消息持久化:设置delivery-mode=2
消息确认机制
- 生产者确认:Publisher Confirm和Return机制
- 消费者确认:自动确认、手动确认、批量确认
死信队列(DLX)
- 消息被拒绝且不重新入队
- 消息过期(TTL)
- 队列达到最大长度
1.3 应用场景
异步处理
- 发送邮件、短信通知
- 日志处理、报表生成
- 文件处理、图片处理
应用解耦
- 系统间通信
- 服务解耦
- 流程分离
流量控制
- 削峰填谷
- 请求缓冲
- 流量整形
定时任务
- 延迟队列
- 定时处理
- 任务调度
2. 环境搭建
2.1 基础环境
- Spring Boot: 3.x
- Java: 17+
- RabbitMQ: 3.12+
- Maven/Gradle
2.2 依赖配置
<dependencies>
<!-- Spring Boot Starter AMQP -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Spring Boot Starter Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Jackson -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
</dependencies>
2.3 基础配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
# 消息确认配置
publisher-confirm-type: correlated # 开启发布确认
publisher-returns: true # 开启发布返回
template:
mandatory: true # 消息路由失败返回
# 消费者配置
listener:
simple:
acknowledge-mode: manual # 手动确认
prefetch: 1 # 每次获取消息数量
retry:
enabled: true # 开启重试
initial-interval: 1000 # 重试间隔时间
max-attempts: 3 # 最大重试次数
multiplier: 1.0 # 重试时间乘数
# SSL配置(可选)
ssl:
enabled: false
key-store: classpath:keystore.p12
key-store-password: password
trust-store: classpath:truststore.p12
trust-store-password: password
3. 核心配置类
3.1 RabbitMQ配置类
@Configuration
@EnableRabbit
public class RabbitMQConfig {
// 交换机名称
public static final String BUSINESS_EXCHANGE = "business.exchange";
public static final String DEAD_LETTER_EXCHANGE = "dead.letter.exchange";
// 队列名称
public static final String BUSINESS_QUEUE = "business.queue";
public static final String DEAD_LETTER_QUEUE = "dead.letter.queue";
// 路由键
public static final String BUSINESS_KEY = "business.key";
public static final String DEAD_LETTER_KEY = "dead.letter.key";
// 业务交换机
@Bean
public DirectExchange businessExchange() {
return ExchangeBuilder.directExchange(BUSINESS_EXCHANGE)
.durable(true)
.build();
}
// 死信交换机
@Bean
public DirectExchange deadLetterExchange() {
return ExchangeBuilder.directExchange(DEAD_LETTER_EXCHANGE)
.durable(true)
.build();
}
// 业务队列
@Bean
public Queue businessQueue() {
Map<String, Object> args = new HashMap<>(3);
// 消息过期时间
args.put("x-message-ttl", 60000);
// 队列最大长度
args.put("x-max-length", 1000);
// 死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
args.put("x-dead-letter-routing-key", DEAD_LETTER_KEY);
return QueueBuilder.durable(BUSINESS_QUEUE)
.withArguments(args)
.build();
}
// 死信队列
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
}
// 业务绑定
@Bean
public Binding businessBinding() {
return BindingBuilder.bind(businessQueue())
.to(businessExchange())
.with(BUSINESS_KEY);
}
// 死信绑定
@Bean
public Binding deadLetterBinding() {
return BindingBuilder.bind(deadLetterQueue())
.to(deadLetterExchange())
.with(DEAD_LETTER_KEY);
}
// 消息转换器
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
// RabbitTemplate配置
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter());
return rabbitTemplate;
}
}
3.2 消息确认配置
@Configuration
@Slf4j
public class RabbitConfirmConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info("消息发送到交换机成功: correlationData={}", correlationData);
} else {
log.error("消息发送到交换机失败: correlationData={}, cause={}", correlationData, cause);
// 处理失败逻辑,如重试、告警等
}
}
@Override
public void returnedMessage(ReturnedMessage returned) {
log.error("消息路由到队列失败: exchange={}, routingKey={}, replyCode={}, replyText={}, message={}",
returned.getExchange(),
returned.getRoutingKey(),
returned.getReplyCode(),
returned.getReplyText(),
new String(returned.getMessage().getBody()));
// 处理失败逻辑,如重试、告警等
}
}
4. 消息生产者
4.1 消息发送服务
@Service
@Slf4j
public class MessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(Object message, String exchange, String routingKey) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
try {
rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
log.info("消息发送成功: message={}, exchange={}, routingKey={}, correlationData={}",
message, exchange, routingKey, correlationData);
} catch (Exception e) {
log.error("消息发送异常: message={}, exchange={}, routingKey={}, correlationData={}, error={}",
message, exchange, routingKey, correlationData, e.getMessage());
throw new RuntimeException("消息发送失败", e);
}
}
public void sendDelayMessage(Object message, String exchange, String routingKey, long delayMillis) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
MessagePostProcessor messagePostProcessor = msg -> {
msg.getMessageProperties().setDelay((int) delayMillis);
return msg;
};
try {
rabbitTemplate.convertAndSend(exchange, routingKey, message, messagePostProcessor, correlationData);
log.info("延迟消息发送成功: message={}, exchange={}, routingKey={}, delay={}, correlationData={}",
message, exchange, routingKey, delayMillis, correlationData);
} catch (Exception e) {
log.error("延迟消息发送异常: message={}, exchange={}, routingKey={}, delay={}, correlationData={}, error={}",
message, exchange, routingKey, delayMillis, correlationData, e.getMessage());
throw new RuntimeException("延迟消息发送失败", e);
}
}
}
5. 消息消费者
5.1 消息处理服务
@Service
@Slf4j
public class MessageConsumer {
@RabbitListener(queues = RabbitMQConfig.BUSINESS_QUEUE)
public void handleMessage(Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 获取消息内容
String messageBody = new String(message.getBody());
log.info("收到消息: message={}, deliveryTag={}", messageBody, deliveryTag);
// 业务处理
processMessage(messageBody);
// 手动确认消息
channel.basicAck(deliveryTag, false);
log.info("消息处理成功: deliveryTag={}", deliveryTag);
} catch (Exception e) {
log.error("消息处理异常: deliveryTag={}, error={}", deliveryTag, e.getMessage());
// 判断是否重新投递
if (message.getMessageProperties().getRedelivered()) {
log.error("消息已重试,拒绝消息: deliveryTag={}", deliveryTag);
channel.basicReject(deliveryTag, false);
} else {
log.info("消息首次处理失败,重新投递: deliveryTag={}", deliveryTag);
channel.basicNack(deliveryTag, false, true);
}
}
}
private void processMessage(String message) {
// 实现具体的业务逻辑
log.info("处理消息: {}", message);
}
}
5.2 死信消息处理
@Service
@Slf4j
public class DeadLetterConsumer {
@RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUE)
public void handleDeadLetter(Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
String messageBody = new String(message.getBody());
log.info("收到死信消息: message={}, deliveryTag={}", messageBody, deliveryTag);
// 死信消息处理逻辑
processDeadLetter(messageBody);
channel.basicAck(deliveryTag, false);
log.info("死信消息处理成功: deliveryTag={}", deliveryTag);
} catch (Exception e) {
log.error("死信消息处理异常: deliveryTag={}, error={}", deliveryTag, e.getMessage());
channel.basicReject(deliveryTag, false);
}
}
private void processDeadLetter(String message) {
// 实现死信消息处理逻辑
log.info("处理死信消息: {}", message);
}
}
6. 接口控制器
@RestController
@RequestMapping("/api/mq")
@Slf4j
public class MessageController {
@Autowired
private MessageProducer messageProducer;
@PostMapping("/send")
public ResponseEntity<String> sendMessage(@RequestBody MessageDTO message) {
try {
messageProducer.sendMessage(message.getContent(),
RabbitMQConfig.BUSINESS_EXCHANGE,
RabbitMQConfig.BUSINESS_KEY);
return ResponseEntity.ok("消息发送成功");
} catch (Exception e) {
log.error("消息发送失败: {}", e.getMessage());
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body("消息发送失败: " + e.getMessage());
}
}
@PostMapping("/send/delay")
public ResponseEntity<String> sendDelayMessage(
@RequestBody MessageDTO message,
@RequestParam long delayMillis) {
try {
messageProducer.sendDelayMessage(message.getContent(),
RabbitMQConfig.BUSINESS_EXCHANGE,
RabbitMQConfig.BUSINESS_KEY,
delayMillis);
return ResponseEntity.ok("延迟消息发送成功");
} catch (Exception e) {
log.error("延迟消息发送失败: {}", e.getMessage());
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body("延迟消息发送失败: " + e.getMessage());
}
}
}
7. 监控与运维
7.1 RabbitMQ管理界面
- 访问地址:http://localhost:15672
- 默认账号:guest/guest
- 主要功能:
- 队列监控
- 交换机管理
- 连接状态
- 消息追踪
7.2 Prometheus + Grafana监控
# prometheus.yml
scrape_configs:
- job_name: 'rabbitmq'
static_configs:
- targets: ['localhost:15692']
7.3 日志配置
logging:
level:
org.springframework.amqp: INFO
com.your.package: DEBUG
pattern:
console: "%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n"
7.4 告警配置
@Configuration
public class RabbitMQAlertConfig {
@Value("${alert.dingtalk.webhook}")
private String webhookUrl;
@Bean
public AlertService alertService() {
return new DingTalkAlertService(webhookUrl);
}
}
8. 最佳实践
8.1 消息幂等性处理
@Service
public class MessageIdempotentHandler {
@Autowired
private RedisTemplate<String, String> redisTemplate;
public boolean isProcessed(String messageId) {
String key = "mq:processed:" + messageId;
return Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(key, "1", 24, TimeUnit.HOURS));
}
}
8.2 消息重试策略
@Configuration
public class RetryConfig {
@Bean
public RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(1000);
retryTemplate.setBackOffPolicy(backOffPolicy);
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(3);
retryTemplate.setRetryPolicy(retryPolicy);
return retryTemplate;
}
}
8.3 消息序列化
@Configuration
public class MessageConverterConfig {
@Bean
public MessageConverter jsonMessageConverter() {
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
converter.setCreateMessageIds(true);
return converter;
}
}
8.4 消息追踪
@Aspect
@Component
@Slf4j
public class MessageTraceAspect {
@Around("@annotation(org.springframework.amqp.rabbit.annotation.RabbitListener)")
public Object traceMessage(ProceedingJoinPoint joinPoint) throws Throwable {
String messageId = MDC.get("messageId");
log.info("开始处理消息: messageId={}", messageId);
try {
Object result = joinPoint.proceed();
log.info("消息处理完成: messageId={}", messageId);
return result;
} catch (Exception e) {
log.error("消息处理异常: messageId={}, error={}", messageId, e.getMessage());
throw e;
}
}
}
9. 常见问题与解决方案
9.1 消息丢失问题
- 生产者确认机制
- 消息持久化
- 手动确认模式
- 集群高可用
9.2 消息重复消费
- 幂等性处理
- 消息去重
- 业务检查
9.3 消息堆积问题
- 增加消费者数量
- 提高处理效率
- 队列分片
- 死信队列处理
9.4 性能优化
- 合理设置预取数量
- 批量确认消息
- 消息压缩
- 连接池优化
10. 高可用部署
10.1 集群配置
spring:
rabbitmq:
addresses: rabbit1:5672,rabbit2:5672,rabbit3:5672
username: admin
password: password
virtual-host: /
10.2 镜像队列
# 设置镜像策略
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
10.3 负载均衡
# nginx.conf
upstream rabbitmq_cluster {
server rabbit1:15672;
server rabbit2:15672;
server rabbit3:15672;
}