MQ 消息幂等性保证

发布于:2025-03-25 ⋅ 阅读:(270) ⋅ 点赞:(0)

MQ 消息幂等性保证

1. 什么是幂等性

在程序开发中,是指同一个业务,执行一次或多次对业务状态的影响是一致的。例如:

  • 根据 id 删除数据
  • 查询数据

在实际业务中,避免不了出现用户连续点击退款、重复点击删除等情况,这种情况下,就需要对多个消息进行处理,避免短时间内多次执行相同业务(类似于防抖)。

如:

  1. 幂等性操作

    • 查询
    • 根据条件删除
    • 更新
  2. 非幂等性操作

    1. 增加:多次执行,会插入多条数据

      即使使用唯一约束,也只能解决一部分情况

    2. 更新

解决方案:

  • 唯一消息 ID
  • 业务状态判断

2. 唯一消息 ID

这个思路非常简单:

  1. 每一条消息都生成一个唯一的 id,与消息一起投递给消费者。
  2. 消费者接收到消息后处理自己的业务,业务处理成功后将消息 ID 保存到数据库或 Redis
  3. 如果下次又收到相同消息,去数据库或 Redis 查询判断是否存在,存在则为重复消息放弃处理。

我们该如何给消息添加唯一 ID 呢?

其实很简单,SpringAMQP 的 MessageConverter 自带了 MessageID 的功能,我们只要开启这个功能即可。

以 Jackson 的消息转换器为例:

@Bean
public MessageConverter messageConverter() {
  // 1. 定义消息转换器
  Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
  // 2. 配置自动创建消息 id,用于识别不同消息,也可以在业务中基于 ID 判断是否是重复消息
  jjmc.setCreateMessageIds(true);
  return jjmc;
}

然后再使用自动装配,将其注入 Spring 中即可

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
 com....messageConverter

3. 业务判断

业务判断就是基于业务本身的逻辑或状态来判断是否是重复的请求,不同的业务场景判断的思路也不一样。

相比较而言,使用唯一消息 ID 的方案需要操作数据库或 Redis 保存消息 ID,所以更推荐使用业务判断的方案。

以支付修改订单的业务为例:

@Override
public void markOrderPaySuccess(Long orderId) {
  // UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1
  lambdaUpdate()
    .set(Order::getStatus, 2)
    .set(Order::getPayTime, LocalDateTime.now())
    .eq(Order::getId, orderId)
    .eq(Order::getStatus, 1) // 添加对业务状态的判断,只有为未支付状态的订单才能支付
    .update();
}

4. 业务补偿

​ 虽然利用各种机制尽可能增加了消息的可靠性,但也不好说能保证消息 100% 的可靠。万一真的 MQ 通知失败该怎么办呢?有没有其它补偿方案,能够确保订单的支付状态一致呢?

其实思想很简单:既然 MQ 通知不一定发送到交易服务,那么交易服务就必须自己主动去查询支付状态。这样即便支付服务的 MQ 通知失败,我们依然能通过主动查询来保证订单状态的一致。

图中黄色线圈起来的部分就是 MQ 通知失败后的补偿处理方案,由交易服务自己主动去查询支付状态。