rabbitMQ延时队列实现,怎么保证消息的幂等

发布于:2025-08-31 ⋅ 阅读:(23) ⋅ 点赞:(0)

一、RabbitMQ 延时队列实现方式

  1. 基于 TTL(Time-To-Live)+ 死信队列(Dead Letter Queue)
    这是最常用的实现方式,核心思路是:
    (1)消息设置过期时间(TTL)
    (2)消息过期后进入绑定的死信队列
    (3)消费者监听死信队列,实现延时消费
// 1. 配置交换机和队列
@Configuration
public class DelayQueueConfig {
    // 普通交换机(用于接收原始消息)
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    // 死信交换机
    public static final String DEAD_EXCHANGE = "dead_exchange";
    // 普通队列(消息过期后会进入死信队列)
    public static final String NORMAL_QUEUE = "normal_queue";
    // 死信队列(实际消费的队列)
    public static final String DEAD_QUEUE = "dead_queue";

    // 声明普通交换机
    @Bean
    public DirectExchange normalExchange() {
        return new DirectExchange(NORMAL_EXCHANGE);
    }

    // 声明死信交换机
    @Bean
    public DirectExchange deadExchange() {
        return new DirectExchange(DEAD_EXCHANGE);
    }

    // 声明普通队列(设置死信相关参数)
    @Bean
    public Queue normalQueue() {
        Map<String, Object> args = new HashMap<>();
        // 设置死信交换机
        args.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        // 设置死信路由键
        args.put("x-dead-letter-routing-key", "dead_routing_key");
        // 队列消息统一过期时间(可选,也可在发送消息时单独设置)
        // args.put("x-message-ttl", 10000); 
        return QueueBuilder.durable(NORMAL_QUEUE).withArguments(args).build();
    }

    // 声明死信队列
    @Bean
    public Queue deadQueue() {
        return QueueBuilder.durable(DEAD_QUEUE).build();
    }

    // 绑定普通队列和普通交换机
    @Bean
    public Binding normalBinding() {
        return BindingBuilder.bind(normalQueue()).to(normalExchange()).with("normal_routing_key");
    }

    // 绑定死信队列和死信交换机
    @Bean
    public Binding deadBinding() {
        return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead_routing_key");
    }
}

// 2. 发送延时消息(设置消息级别的TTL)
@Service
public class DelayMessageSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendDelayMessage(String message, long delayMillis) {
        // 设置消息过期时间
        rabbitTemplate.convertAndSend(
            DelayQueueConfig.NORMAL_EXCHANGE,
            "normal_routing_key",
            message,
            correlationData -> {
                correlationData.getMessageProperties().setExpiration(String.valueOf(delayMillis));
                return correlationData;
            }
        );
    }
}

// 3. 消费死信队列消息(延时后的消息)
@Service
public class DelayMessageReceiver {
    @RabbitListener(queues = DelayQueueConfig.DEAD_QUEUE)
    public void receiveDelayMessage(String message) {
        System.out.println("收到延时消息:" + message + ",时间:" + new Date());
    }
}
  1. 基于 RabbitMQ 插件(rabbitmq_delayed_message_exchange)
    更推荐的方式,需先安装插件:
    (1)下载对应版本的 rabbitmq_delayed_message_exchange 插件
    (2)启用插件:rabbitmq-plugins enable rabbitmq_delayed_message_exchange
@Configuration
public class DelayedExchangeConfig {
    // 延时交换机(类型必须是 x-delayed-message)
    @Bean
    public CustomExchange delayedExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct"); // 指定转发类型
        return new CustomExchange("delayed_exchange", "x-delayed-message", true, false, args);
    }

    @Bean
    public Queue delayedQueue() {
        return QueueBuilder.durable("delayed_queue").build();
    }

    @Bean
    public Binding delayedBinding() {
        return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with("delayed_routing_key").noargs();
    }
}

// 发送延时消息
@Service
public class DelayedMessageSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendDelayedMessage(String message, long delayMillis) {
        rabbitTemplate.convertAndSend(
            "delayed_exchange",
            "delayed_routing_key",
            message,
            correlationData -> {
                // 设置延时时间(毫秒)
                correlationData.getMessageProperties().setHeader("x-delay", delayMillis);
                return correlationData;
            }
        );
    }
}

二、保证消息幂等性的方案
消息幂等性指:同一条消息被多次消费时,结果是一致的,不会重复处理。常见实现方式:

  1. 基于唯一 ID + Redis / 数据库去重
    (1)发送消息时生成唯一 ID(如 UUID)
    (2)消费前检查该 ID 是否已处理
    (3)处理完成后标记该 ID 为已处理
@Service
public class IdempotentMessageReceiver {
    @Autowired
    private StringRedisTemplate redisTemplate;

    @RabbitListener(queues = "delayed_queue")
    public void receiveMessage(Message message) {
        // 1. 获取消息唯一ID(假设放在消息头)
        String messageId = message.getMessageProperties().getMessageId();
        if (StringUtils.isEmpty(messageId)) {
            // 非法消息,直接拒绝
            throw new AmqpRejectAndDontRequeueException("消息ID为空");
        }

        // 2. 检查是否已处理(Redis分布式锁保证原子性)
        String key = "message:processed:" + messageId;
        Boolean isFirst = redisTemplate.opsForValue().setIfAbsent(key, "1", 24, TimeUnit.HOURS);
        if (Boolean.FALSE.equals(isFirst)) {
            // 已处理过,直接返回
            System.out.println("消息已处理,ID:" + messageId);
            return;
        }

        // 3. 处理消息业务逻辑
        String content = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println("处理消息:" + content);
    }
}
  1. 基于业务唯一标识去重
    如果消息没有全局 ID,可使用业务字段组合作为唯一标识(如订单号):
// 例如处理订单支付消息,用订单号作为唯一标识
String orderNo = extractOrderNo(content); // 从消息中提取订单号
String key = "order:processed:" + orderNo;
// 后续逻辑同上(检查Redis -> 处理业务)
  1. 数据库唯一约束
    通过数据库唯一索引实现幂等:
@Transactional
public void processOrder(String orderNo) {
    // 插入记录前检查,或直接插入(利用唯一索引报错)
    try {
        orderMapper.insert(new Order(orderNo)); // 假设orderNo有唯一索引
        // 处理订单逻辑
    } catch (DuplicateKeyException e) {
        // 已处理过,忽略
        log.info("订单已处理:{}", orderNo);
    }
}

总结
1.延时队列实现:
(1)简单场景用 TTL + 死信队列
(2)生产环境推荐用 rabbitmq_delayed_message_exchange 插件(更可靠)
2.幂等性保证核心:
(1)为消息生成唯一标识(全局 ID 或业务唯一键)
(2)消费前检查标识是否已处理(Redis / 数据库)
(3)确保检查和标记操作的原子性(分布式锁 / 事务)
这两种机制结合,可实现可靠的延时任务处理。


网站公告

今日签到

点亮在社区的每一天
去签到