RabbitMQ 消息不重复消费和顺序性

发布于:2025-05-10 ⋅ 阅读:(16) ⋅ 点赞:(0)

一、引言

在使用 RabbitMQ 进行消息传递时,确保消息不重复消费和保证消息的顺序性是两个重要的问题。不重复消费可以避免业务逻辑的重复执行,保证数据的一致性;而消息的顺序性则在某些业务场景下(如订单处理、状态更新等)至关重要。本技术文档将详细介绍在 Java 中如何解决这两个问题。

二、保证消息不重复消费

2.1 问题分析

消息重复消费通常是由于网络波动、消费者故障等原因导致消息确认机制出现问题,RabbitMQ 会重新投递消息,从而造成重复消费。

2.2 解决方案

2.2.1 唯一消息 ID

为每条消息生成一个唯一的 ID,在消费者端对消息 ID 进行记录和判断。如果该消息 ID 已经被处理过,则直接忽略该消息。

2.2.2 代码示例

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

public class Consumer {
    private static final String QUEUE_NAME = "test_queue";
    private static final Set<String> processedMessageIds = new HashSet<>();

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String messageId = delivery.getProperties().getMessageId();
                if (processedMessageIds.contains(messageId)) {
                    System.out.println(" [x] Message already processed: " + messageId);
                    return;
                }
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
                try {
                    // 处理消息的业务逻辑
                    doWork(message);
                } finally {
                    processedMessageIds.add(messageId);
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }
            };
            channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
        }
    }

    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            if (ch == '.') {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

public class Producer {
    private static final String QUEUE_NAME = "test_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello, RabbitMQ!";
            String messageId = UUID.randomUUID().toString();
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                   .messageId(messageId)
                   .build();
            channel.basicPublish("", QUEUE_NAME, properties, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "' with ID: " + messageId);
        }
    }
}

2.2.3 代码解释

  • 生产者:为每条消息生成一个唯一的 UUID 作为消息 ID,并将其添加到消息的属性中。
  • 消费者:在消费消息时,首先检查消息 ID 是否已经在 processedMessageIds 集合中。如果已经存在,则忽略该消息;否则,处理消息并将消息 ID 添加到集合中。

三、保证消息的顺序性

3.1 问题分析

在 RabbitMQ 中,默认情况下消息是异步处理的,多个消费者可以同时从队列中获取消息,这可能导致消息的顺序被打乱。

3.2 解决方案

3.2.1 单消费者模式

为每个需要保证顺序的队列只配置一个消费者,这样可以确保消息按照发送的顺序依次被处理。

3.2.2 代码示例

import com.rabbitmq.client.*;

import java.io.IOException;

public class OrderedConsumer {
    private static final String QUEUE_NAME = "ordered_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for ordered messages. To exit press CTRL+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
                try {
                    // 处理消息的业务逻辑
                    doWork(message);
                } finally {
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }
            };
            channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
        }
    }

    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            if (ch == '.') {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

public class OrderedProducer {
    private static final String QUEUE_NAME = "ordered_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            for (int i = 0; i < 10; i++) {
                String message = "Message " + i;
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + message + "'");
            }
        }
    }
}

3.2.3 代码解释

  • 生产者:按顺序发送一系列消息到队列中。
  • 消费者:单个消费者从队列中依次获取消息并处理,确保消息的顺序性。

四、总结

通过为消息生成唯一 ID 并在消费者端进行记录,可以有效避免消息的重复消费;而采用单消费者模式可以保证消息的顺序性。在实际应用中,需要根据具体的业务场景和需求选择合适的解决方案。

以上代码示例基于 RabbitMQ Java 客户端,确保你已经添加了相应的依赖。在 Maven 项目中,可以在 pom.xml 中添加以下依赖:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.12.0</version>
</dependency>

这样,你就可以在 Java 中保证 RabbitMQ 消息的不重复消费和顺序性。


网站公告

今日签到

点亮在社区的每一天
去签到