MQ消息丢失解决方案

发布于:2025-03-05 ⋅ 阅读:(11) ⋅ 点赞:(0)

目录

生产者丢失:

方法一: 发送数据之前开启RabbitMQ 事务channel.txSelect(不推荐

方法二:开启confirm机制 (推荐,异步

代码演示:

MQ中丢失:

 方法一:开启 RabbitMQ 的持久化

设置持久化有两个步骤:

消费端丢失:

方法一: 关闭自动ack,开启手动ack

自动ack和手动ack区别:

自动ack:

 手动ack:

完整代码演示:


在MQ当中消息丢失有几种不同的场景,可能出现在生产者、MQ、消费者中。

生产者丢失:

方法一: 发送数据之前开启RabbitMQ 事务channel.txSelect(不推荐

但是这种方法是不推荐的,因为事务机制是同步的,提交一个事务后会阻塞,大大降低了系统的吞吐量,损耗性能。

方法二:开启confirm机制 (推荐,异步

  1. 在生产者那里设置开启confirm模式之后,你每次写的消息都会分配一个唯一的 id,
  2. 然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个ack消息,告诉你说这个消息 ok 了。
  3. 如果 RabbitMQ 没能处理这个消息,会回调你一个nack接口,告诉你这个消息接收失败,你可以重试。 
  4. 可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。

代码演示:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;

public class ConfirmProducer {
    private static final String QUEUE_NAME = "confirm_queue";
    
    // 维护消息 ID 和消息内容的映射
    private static final Map<Long, String> messageMap = new ConcurrentHashMap<>();
    // 存储未确认的消息 ID
    private static final Set<Long> unconfirmedSet = Collections.newSetFromMap(new ConcurrentHashMap<>());

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // 连接 RabbitMQ 服务器
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            // 开启 confirm 模式
            channel.confirmSelect();

            // 处理确认消息的回调
            ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
                if (multiple) {
                    unconfirmedSet.removeIf(tag -> tag <= deliveryTag);
                } else {
                    unconfirmedSet.remove(deliveryTag);
                }
                messageMap.remove(deliveryTag);
                System.out.println("[x] Message " + deliveryTag + " acknowledged");
            };

            // 处理未确认消息的回调
            ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
                System.out.println("[!] Message " + deliveryTag + " NOT acknowledged, retrying...");
                String message = messageMap.get(deliveryTag);
                if (message != null) {
                    // 重新发送消息
                    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                }
            };

            // 添加确认和未确认回调
            channel.addConfirmListener(ackCallback, nackCallback);

            // 发送消息
            for (int i = 0; i < 10; i++) {
                String message = "Message " + i;
                long nextSeqNo = channel.getNextPublishSeqNo(); // 获取消息 ID
                messageMap.put(nextSeqNo, message); // 存储消息
                unconfirmedSet.add(nextSeqNo); // 记录未确认消息 ID
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                System.out.println("[x] Sent " + message);
            }

            // 监控未确认消息超时重发
            new Thread(() -> {
                while (true) {
                    try {
                        Thread.sleep(5000); // 每 5 秒检查一次未确认消息
                        for (Long msgId : unconfirmedSet) {
                            System.out.println("[!] Retrying message " + msgId);
                            String message = messageMap.get(msgId);
                            if (message != null) {
                                // 重新发送未确认的消息
                                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                            }
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }
}

 

MQ中丢失:

 方法一:开启 RabbitMQ 的持久化

开启 RabbitMQ 的持久化,就是消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。

设置持久化有两个步骤:

  1. 创建 queue 的时候将其设置为持久化,这样就可以保证RabbitMQ 持久化 queue 的元数据,但是不会持久化 queue 里的数据。
  2. 第二个是发送消息的时候将消息的 deliveryMode 设置为 2,就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。 
  3. 持久化可以跟生产者那边的confirm机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者ack了,所以哪怕是在持久化到磁盘之前,RabbitMQ 挂了,数据丢了,生产者收不到ack,你也是可以自己重发的。
  4. 注意,有一种情况,就是这个消息写到了 RabbitMQ 中,但是还没来得及持久化到磁盘上,结果不巧,此时 RabbitMQ 挂了,就会导致内存里的一点点数据丢失。
。。。。。。
// 声明队列,设置持久化(durable = true)
   channel.queueDeclare(QUEUE_NAME, true, false, false, null);

。。。。。。
// 发送持久化消息(deliveryMode = 2)
   channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

消费端丢失:

方法一: 关闭自动ack,开启手动ack

自动ack和手动ack区别:

自动ack:

消费者接收到消息时,RabbitMQ 立即认为消息已被成功处理,并自动从队列中删除,不管消费者是否真正处理完。

//true代表自动ack
channel.basicConsume(QUEUE_NAME, true, consumer);
 手动ack:

消费者在确认消息处理完毕后,手动发送 ACK 确认,RabbitMQ 才会将消息从队列中移除。  

//false代表取消自动ack
channel.basicConsume(QUEUE_NAME, false, consumer);
//手动返回ack
channel.basicAck(envelope.getDeliveryTag(), false);
自动 ACK 适用于不重要的日志、监控等,无需保证消息可靠性
手动 ACK 适用于订单、支付等关键业务,需要确保消息处理成功

完整代码演示:

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;

public class ConfirmProducer {
    private static final String QUEUE_NAME = "confirm_queue";
    
    // 维护消息 ID 和消息内容的映射
    private static final Map<Long, String> messageMap = new ConcurrentHashMap<>();
    // 存储未确认的消息 ID
    private static final Set<Long> unconfirmedSet = Collections.newSetFromMap(new ConcurrentHashMap<>());

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // 连接 RabbitMQ 服务器
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            
            // 声明队列,设置持久化(durable = true)
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            // 关闭自动 ACK,改为手动确认
            boolean autoAck = false;

            // 开启 confirm 模式
            channel.confirmSelect();

            // 处理确认消息的回调
            ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
                if (multiple) {
                    unconfirmedSet.removeIf(tag -> tag <= deliveryTag);
                } else {
                    unconfirmedSet.remove(deliveryTag);
                }
                messageMap.remove(deliveryTag);
                System.out.println("[x] Message " + deliveryTag + " acknowledged");
            };

            // 处理未确认消息的回调
            ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
                System.out.println("[!] Message " + deliveryTag + " NOT acknowledged, retrying...");
                String message = messageMap.get(deliveryTag);
                if (message != null) {
                    // 重新发送消息,确保持久化
                    channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
                }
            };

            // 添加确认和未确认回调
            channel.addConfirmListener(ackCallback, nackCallback);

            // 发送消息
            for (int i = 0; i < 10; i++) {
                String message = "Message " + i;
                long nextSeqNo = channel.getNextPublishSeqNo(); // 获取消息 ID
                messageMap.put(nextSeqNo, message); // 存储消息
                unconfirmedSet.add(nextSeqNo); // 记录未确认消息 ID
                // 发送持久化消息(deliveryMode = 2)
                channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
                System.out.println("[x] Sent " + message);
            }

            // 消费者处理消息,并手动 ack
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("[x] Received and processing: " + message);
                    try {
                        // 模拟消息处理
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    // 处理完成后手动 ACK
                    channel.basicAck(envelope.getDeliveryTag(), false);
                    System.out.println("[x] Message processed and acknowledged: " + message);
                }
            };
            
            // 监听队列并消费消息
            channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        }
    }
}


网站公告

今日签到

点亮在社区的每一天
去签到