在 RabbitMQ 中,消息的全局唯一 ID(Message ID)并不是由系统自动分配的,而是需要开发者手动设置。通过为每条消息指定唯一的 message_id
,可以实现幂等性处理、去重消费等功能。
1. 手动设置消息的全局唯一 ID
在发送消息时,可以通过 MessageProperties
设置 messageId
属性。该属性属于 AMQP 协议中的标准字段之一,允许用户自定义值。以下是一个示例代码:
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessageWithId() {
String msgBody = "This is a test message";
String customMessageId = java.util.UUID.randomUUID().toString();
MessageProperties props = new MessageProperties();
props.setMessageId(customMessageId); // 设置全局唯一ID
Message message = new Message(msgBody.getBytes(), props);
rabbitTemplate.send("exchange.name", "routing.key", message);
}
}
这样,每条消息在发送前都会被赋予一个唯一的 messageId
。
2. 在消费者端获取并使用消息 ID
消费者可以通过 Message
对象获取到该字段,并结合 Redis 或数据库进行去重校验:
@Component
@RabbitListener(queues = "your.queue")
public class Consumer {
@RabbitHandler
public void process(Message message) {
String receivedMessageId = message.getMessageProperties().getMessageId();
String body = new String(message.getBody());
// 使用 Redis 或其他方式判断是否已消费过此消息
if (isMessageProcessed(receivedMessageId)) {
System.out.println("重复消费,忽略该消息:" + receivedMessageId);
return;
}
System.out.println("消费消息内容:" + body);
markMessageAsProcessed(receivedMessageId);
}
private boolean isMessageProcessed(String messageId) {
// 实现查询 Redis 或数据库逻辑
return false;
}
private void markMessageAsProcessed(String messageId) {
// 实现将 messageId 存入 Redis 或数据库逻辑
}
}
这种方式可以有效防止消息重复消费的问题 。
3. 消息 ID 的生成策略
UUID:适用于大多数场景,生成简单且唯一性强。
Snowflake:适合分布式系统中对性能要求较高的场景。
时间戳+序列号:如
System.currentTimeMillis() + "-" + counter
,也可作为轻量级方案。
4. 注意事项
如果未显式设置
messageId
,RabbitMQ 不会自动为每条消息分配唯一标识。若需保证全局唯一性,请确保不同生产者之间不会产生冲突。
建议结合持久化机制(如 Redis)来记录已处理的消息 ID,以支持重启后状态恢复 2。