目录
方法一: 发送数据之前开启RabbitMQ 事务channel.txSelect(不推荐
在MQ当中消息丢失有几种不同的场景,可能出现在生产者、MQ、消费者中。
生产者丢失:
方法一: 发送数据之前开启RabbitMQ 事务channel.txSelect(不推荐
但是这种方法是不推荐的,因为事务机制是同步的,提交一个事务后会阻塞,大大降低了系统的吞吐量,损耗性能。
方法二:开启confirm机制 (推荐,异步
- 在生产者那里设置开启confirm模式之后,你每次写的消息都会分配一个唯一的 id,
- 然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个ack消息,告诉你说这个消息 ok 了。
- 如果 RabbitMQ 没能处理这个消息,会回调你一个nack接口,告诉你这个消息接收失败,你可以重试。
- 可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。
代码演示:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
public class ConfirmProducer {
private static final String QUEUE_NAME = "confirm_queue";
// 维护消息 ID 和消息内容的映射
private static final Map<Long, String> messageMap = new ConcurrentHashMap<>();
// 存储未确认的消息 ID
private static final Set<Long> unconfirmedSet = Collections.newSetFromMap(new ConcurrentHashMap<>());
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost"); // 连接 RabbitMQ 服务器
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 开启 confirm 模式
channel.confirmSelect();
// 处理确认消息的回调
ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
if (multiple) {
unconfirmedSet.removeIf(tag -> tag <= deliveryTag);
} else {
unconfirmedSet.remove(deliveryTag);
}
messageMap.remove(deliveryTag);
System.out.println("[x] Message " + deliveryTag + " acknowledged");
};
// 处理未确认消息的回调
ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
System.out.println("[!] Message " + deliveryTag + " NOT acknowledged, retrying...");
String message = messageMap.get(deliveryTag);
if (message != null) {
// 重新发送消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
};
// 添加确认和未确认回调
channel.addConfirmListener(ackCallback, nackCallback);
// 发送消息
for (int i = 0; i < 10; i++) {
String message = "Message " + i;
long nextSeqNo = channel.getNextPublishSeqNo(); // 获取消息 ID
messageMap.put(nextSeqNo, message); // 存储消息
unconfirmedSet.add(nextSeqNo); // 记录未确认消息 ID
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("[x] Sent " + message);
}
// 监控未确认消息超时重发
new Thread(() -> {
while (true) {
try {
Thread.sleep(5000); // 每 5 秒检查一次未确认消息
for (Long msgId : unconfirmedSet) {
System.out.println("[!] Retrying message " + msgId);
String message = messageMap.get(msgId);
if (message != null) {
// 重新发送未确认的消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
}
}
MQ中丢失:
方法一:开启 RabbitMQ 的持久化
开启 RabbitMQ 的持久化,就是消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。
设置持久化有两个步骤:
- 创建 queue 的时候将其设置为持久化,这样就可以保证RabbitMQ 持久化 queue 的元数据,但是不会持久化 queue 里的数据。
- 第二个是发送消息的时候将消息的 deliveryMode 设置为 2,就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。
- 持久化可以跟生产者那边的confirm机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者ack了,所以哪怕是在持久化到磁盘之前,RabbitMQ 挂了,数据丢了,生产者收不到ack,你也是可以自己重发的。
- 注意,有一种情况,就是这个消息写到了 RabbitMQ 中,但是还没来得及持久化到磁盘上,结果不巧,此时 RabbitMQ 挂了,就会导致内存里的一点点数据丢失。
。。。。。。
// 声明队列,设置持久化(durable = true)
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
。。。。。。
// 发送持久化消息(deliveryMode = 2)
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
消费端丢失:
方法一: 关闭自动ack,开启手动ack
自动ack和手动ack区别:
自动ack:
当消费者接收到消息时,RabbitMQ 立即认为消息已被成功处理,并自动从队列中删除,不管消费者是否真正处理完。
//true代表自动ack
channel.basicConsume(QUEUE_NAME, true, consumer);
手动ack:
消费者在确认消息处理完毕后,手动发送 ACK
确认,RabbitMQ 才会将消息从队列中移除。
//false代表取消自动ack
channel.basicConsume(QUEUE_NAME, false, consumer);
//手动返回ack
channel.basicAck(envelope.getDeliveryTag(), false);
自动 ACK
适用于不重要的日志、监控等,无需保证消息可靠性
手动 ACK
适用于订单、支付等关键业务,需要确保消息处理成功
完整代码演示:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
public class ConfirmProducer {
private static final String QUEUE_NAME = "confirm_queue";
// 维护消息 ID 和消息内容的映射
private static final Map<Long, String> messageMap = new ConcurrentHashMap<>();
// 存储未确认的消息 ID
private static final Set<Long> unconfirmedSet = Collections.newSetFromMap(new ConcurrentHashMap<>());
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost"); // 连接 RabbitMQ 服务器
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明队列,设置持久化(durable = true)
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 关闭自动 ACK,改为手动确认
boolean autoAck = false;
// 开启 confirm 模式
channel.confirmSelect();
// 处理确认消息的回调
ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
if (multiple) {
unconfirmedSet.removeIf(tag -> tag <= deliveryTag);
} else {
unconfirmedSet.remove(deliveryTag);
}
messageMap.remove(deliveryTag);
System.out.println("[x] Message " + deliveryTag + " acknowledged");
};
// 处理未确认消息的回调
ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
System.out.println("[!] Message " + deliveryTag + " NOT acknowledged, retrying...");
String message = messageMap.get(deliveryTag);
if (message != null) {
// 重新发送消息,确保持久化
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
}
};
// 添加确认和未确认回调
channel.addConfirmListener(ackCallback, nackCallback);
// 发送消息
for (int i = 0; i < 10; i++) {
String message = "Message " + i;
long nextSeqNo = channel.getNextPublishSeqNo(); // 获取消息 ID
messageMap.put(nextSeqNo, message); // 存储消息
unconfirmedSet.add(nextSeqNo); // 记录未确认消息 ID
// 发送持久化消息(deliveryMode = 2)
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.out.println("[x] Sent " + message);
}
// 消费者处理消息,并手动 ack
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("[x] Received and processing: " + message);
try {
// 模拟消息处理
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 处理完成后手动 ACK
channel.basicAck(envelope.getDeliveryTag(), false);
System.out.println("[x] Message processed and acknowledged: " + message);
}
};
// 监听队列并消费消息
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
}