RabbitMQ在消息传递过程中为了保证消息可靠性,主要分为三个阶段来进行保障:
- 生产者到Broker(消息投递确认)
- 消息交换机队列持久化(持久化和绑定)
- 队列到消费者(消费确认和手动应答)
一、生产者到Broker(消息投递确认)
生产者在将消息发送到交换机时,可能会因为网络中断、交换机不存在等原因导致消息丢失。为此,RabbitMQ提供了**发布者确认机制(Publisher Confirms)**来确保消息成功到达Broker。
实现方法
- 开启发布确认模式(
publisher-confirm-type: correlated
)。 - 设置回调函数,监听确认或失败通知。
Spring Boot 配置
application.yml
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
publisher-confirm-type: correlated
publisher-returns: true
配置类
@Configuration
public class RabbitProducerConfig {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 设置确认回调方法
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("执行confirm逻辑");
if (b) {
System.out.println("接收到了消息");
} else {
System.out.println("没有接收到消息");
}
}
});
// 创建一个带有returns逻辑的RabbitTemplate
// 消息被退回时执行的逻辑
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("消息已被退回 :"+returnedMessage);
}
});
return rabbitTemplate;
}
}
生产者
@RestController
@RequestMapping("/producer")
public class ReliableProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/send/{message}")
public String sendMessage(@PathVariable String message) {
rabbitTemplate.convertAndSend("direct_exchange", "routing_key", message);
return "Message sent: " + message;
}
}
二、消息交换机队列持久化(持久化和绑定)
即使消息成功到达Broker,如果交换机或队列未正确配置或崩溃,仍有可能导致丢失。为了保证可靠性,交换机、队列和消息本身都需要进行持久化。
实现方法
- 设置交换机持久化:
new DirectExchange(name, true, false)
。 - 设置队列持久化:
new Queue(name, true)
。 - 设置消息持久化:
MessageProperties.PERSISTENT_TEXT_PLAIN
。
配置类
@Configuration
public class ReliableExchangeQueueConfig {
public static final String DIRECT_EXCHANGE = "direct_exchange";
public static final String DURABLE_QUEUE = "durable_queue";
@Bean
public DirectExchange directExchange() {
return new DirectExchange(DIRECT_EXCHANGE, true, false);
}
@Bean
public Queue durableQueue() {
return new Queue(DURABLE_QUEUE, true);
}
@Bean
public Binding binding(Queue durableQueue, DirectExchange directExchange) {
return BindingBuilder.bind(durableQueue).to(directExchange).with("routing_key");
}
}
生产者发送持久化消息
@GetMapping("/send/durable/{message}")
public String sendDurableMessage(@PathVariable String message) {
MessageProperties props = new MessageProperties();
props.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
Message msg = new Message(message.getBytes(), props);
rabbitTemplate.convertAndSend(DIRECT_EXCHANGE, "routing_key", msg);
return "Durable message sent: " + message;
}
三、队列到消费者(消费确认和手动应答)
消息到达队列后并不意味着可靠消费,还需要保证消费者成功处理后才能确认接收。如果消费失败或处理异常,消息会被重新投递。
实现方法
- 设置手动应答模式:
acknowledge="manual"
。 - 正确处理消息确认:
channel.basicAck
、channel.basicNack
或channel.basicReject
。
配置类
application.yml
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
消费者
@Component
public class ReliableConsumer {
@RabbitListener(queues = ReliableExchangeQueueConfig.DURABLE_QUEUE)
public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
try {
System.out.println("Received: " + message);
// 处理消息成功,手动确认
channel.basicAck(tag, false);
} catch (Exception e) {
try {
// 处理失败,拒绝消息并重新入队
channel.basicNack(tag, false, true);
System.err.println("Message processing failed: " + e.getMessage());
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}
}
总结
RabbitMQ消息可靠性从生产者到消费者整个链路分为三个部分:
- 生产者到Broker:通过发布者确认机制确保消息成功到达Broker。
- 消息交换机队列持久化:通过持久化交换机、队列和消息来防止丢失。
- 队列到消费者:通过手动确认机制确保消息被成功消费和处理。
RabbitMQ Retry机制
是什么?
RabbitMQ Retry机制是指在消费者处理消息失败时,自动对消息进行重新消费的一种机制。其目的是确保消息可靠传输和处理,避免因为临时性故障或异常而导致消息丢失。
什么时候使用?
当消费者在处理消息时,可能会因为以下原因导致消费失败:
- 网络异常:暂时无法连接到外部服务或数据库。
- 业务异常:如处理逻辑中出现意外错误或数据不符合预期。
- 资源不足:如系统负载过高导致处理失败。
- 依赖服务不可用:如调用第三方API时发生错误。
在这些场景下,通过Retry机制可以自动重新尝试消费消息,避免消息丢失或堆积。
适合在消息处理可靠性要求高且消费失败具有暂时性的场景中使用,比如:
- 订单处理:第三方支付系统短暂不可用时自动重试。
- 库存更新:因数据库短暂异常导致更新失败时重试。
- 异步通知:调用外部API失败时进行重试,保证通知可靠性。
如果消息经过多次重试仍然失败,通常会配合死信队列(DLQ)进行最终处理,如记录日志或人工干预。
通过合理配置重试机制,可以提升系统的可靠性和稳定性,防止因短暂性故障导致的数据丢失或业务中断。
Spring Retry配置
application.yml
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true
initial-interval: 1000 # 初始间隔1秒
max-attempts: 5 # 最大重试5次
multiplier: 2.0 # 每次重试间隔乘以2
RabbitMQ中的死信队列(DLQ)
是什么?
死信队列(Dead Letter Queue,简称DLQ)是指在消息被消费或投递过程中出现异常时,被转发到特殊队列中进行保存的机制。
这些特殊队列称为死信队列,其作用是保存无法被正常消费的消息,以便后续人工干预或进一步处理。
消息什么时候会变成死信?
消息在以下三种情况下会变成死信:
消息被拒绝(Rejected)且不重新入队:
- 消费者通过
basicReject
或basicNack
拒绝消息,并且requeue
参数设置为false
。
- 消费者通过
消息在队列中超时(TTL过期):
- 队列或消息设置了TTL(Time To Live),超时时未被消费就会成为死信。
队列达到最大长度(队列满):
- 当队列达到了最大长度限制时,新的消息将变为死信。
死信队列在Spring Boot中的使用
1、配置交换机和队列
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static final String DEAD_LETTER_EXCHANGE = "dlx_exchange";
public static final String NORMAL_QUEUE = "normal_queue";
public static final String DEAD_LETTER_QUEUE = "dead_letter_queue";
public static final String ROUTING_KEY = "normal_key";
public static final String DLX_ROUTING_KEY = "dlx_key";
// 正常交换机
@Bean
public DirectExchange normalExchange() {
return new DirectExchange(NORMAL_EXCHANGE);
}
// 死信交换机
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}
// 正常队列
@Bean
public Queue normalQueue() {
return QueueBuilder.durable(NORMAL_QUEUE)
.withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE) // 绑定死信交换机
.withArgument("x-dead-letter-routing-key", DLX_ROUTING_KEY) // 死信路由键
.withArgument("x-message-ttl", 10000) // 消息过期时间(10秒)
.build();
}
// 死信队列
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
}
// 绑定正常队列到正常交换机
@Bean
public Binding normalBinding() {
return BindingBuilder.bind(normalQueue()).to(normalExchange()).with(ROUTING_KEY);
}
// 绑定死信队列到死信交换机
@Bean
public Binding deadLetterBinding() {
return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(DLX_ROUTING_KEY);
}
}
2、生产者发送消息
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/message")
public class MessageProducer {
private final RabbitTemplate rabbitTemplate;
public MessageProducer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@GetMapping("/send/{msg}")
public String sendMessage(@PathVariable String msg) {
rabbitTemplate.convertAndSend(RabbitMQConfig.NORMAL_EXCHANGE, RabbitMQConfig.ROUTING_KEY, msg);
return "Message sent: " + msg;
}
}
3、消费者监听正常队列和死信队列
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MessageConsumer {
@RabbitListener(queues = RabbitMQConfig.NORMAL_QUEUE)
public void receiveMessage(String message) {
System.out.println("Received normal message: " + message);
// 模拟消费失败,抛出异常导致消息进入死信队列
if (message.contains("error")) {
throw new RuntimeException("Message processing failed");
}
}
@RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUE)
public void receiveDeadLetterMessage(String message) {
System.out.println("Received dead letter message: " + message);
}
}
死信队列的应用场景
消息可靠性保障:
- 当消息无法消费时,放入死信队列,防止消息丢失。
- 适合订单消息、支付消息等关键场景。
异常消息监控:
- 通过死信队列监控异常消息进行告警或分析,便于定位问题。
人工干预和补偿机制:
- 可以通过死信队列定期检查并人工处理异常消息,进行人工补偿或修正。
限流保护:
- 当消息堆积过多时,将其转入死信队列,防止队列阻塞。
延时队列实现:
- 通过TTL结合死信队列来实现延时消息投递。
RabbitMQ中的延时队列
延时队列(Delayed Queue)
是什么?
延时队列是指消息在指定的延迟时间后才被投递到消费者的一种队列机制。
它在订单超时取消、定时任务调度、延迟通知等场景中非常常见,能够确保消息在指定时间后进行处理。
插件安装
1. 下载插件
插件下载地址:
GitHub插件页面
下载命令:
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.9.0/rabbitmq_delayed_message_exchange-3.9.0.ez
移动插件到插件目录
假设RabbitMQ安装在/usr/lib/rabbitmq
:
mv rabbitmq_delayed_message_exchange-3.9.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-*/plugins/
启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
重启RabbitMQ
sudo service rabbitmq-server restart
配置类
package com.example.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMQConfig {
public static final String DELAYED_EXCHANGE = "delayed_exchange";
public static final String DELAYED_QUEUE = "delayed_queue";
public static final String ROUTING_KEY = "delayed_key";
// 声明延时交换机
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, args);
}
// 声明延时队列
@Bean
public Queue delayedQueue() {
return new Queue(DELAYED_QUEUE, true);
}
// 绑定延时队列到延时交换机
@Bean
public Binding delayedBinding(Queue delayedQueue, CustomExchange delayedExchange) {
return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(ROUTING_KEY).noargs();
}
}
生产者发送延时消息
package com.example.producer;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/send")
public class DelayedMessageProducer {
private final RabbitTemplate rabbitTemplate;
public DelayedMessageProducer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@GetMapping("/{message}/{delay}")
public String sendMessage(@PathVariable String message, @PathVariable int delay) {
rabbitTemplate.convertAndSend(
"delayed_exchange",
"delayed_key",
message,
msg -> {
msg.getMessageProperties().setHeader("x-delay", delay);
return msg;
}
);
return "Sent delayed message: " + message + " with delay: " + delay + " ms";
}
}
消费者监听延时队列
package com.example.consumer;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class DelayedMessageConsumer {
@RabbitListener(queues = RabbitMQConfig.DELAYED_QUEUE)
public void receiveMessage(String message) {
System.out.println("Received delayed message: " + message);
}
}