SpringCloud -- MQ高级

发布于:2025-07-31 ⋅ 阅读:(12) ⋅ 点赞:(0)

目录

一、发送者的可靠性

1. 生产者重试机制

2. 生产者确认机制

二、MQ的可靠性

1.数据持久化

2. LazyQueue

3. 非持久化、持久化、LazyQueue的区别

三、消费者的可靠性

1. 消费者确认机制

2. 失败重试机制

3. 失败处理策略

4. 业务幂等性

4.1 唯一消息ID

4.2 业务判断

四、延迟消息

1. 死信交换机和延迟消息

1.1 死信交换机

1.2 延迟消息

2. 超时订单问题


一、发送者的可靠性

消息从发送者发送消息,到消费者处理消息,需要经过的流程如下:

会有很多原因导致发送出来的消息丢失:

  • 发送消息时丢失:

    • 生产者发送消息时连接MQ失败

    • 生产者发送消息到达MQ后未找到Exchange

    • 生产者发送消息到达MQ的Exchange后,未找到合适的Queue

    • 消息到达MQ后,处理消息的进程发生异常

  • MQ导致消息丢失:

    • 消息到达MQ,保存到队列后,尚未消费就突然宕机

  • 消费者处理消息时:

    • 消息接收后尚未处理突然宕机

    • 消息接收后处理过程中抛出异常

1. 生产者重试机制

首先第一种情况,就是生产者发送消息时,出现了网络故障,导致与MQ的连接中断。

为了解决这个问题,SpringAMQP提供的消息发送时的重试机制。即:当RabbitTemplate与MQ连接超时后,多次重试。

在生产者的yaml文件中添加如下的配置:

spring:
  rabbitmq:
    connection-timeout: 1s # 设置MQ的连接超时时间,即每一次连接失败都等到1s再去尝试连接
    template:
      retry:
        enabled: true # 开启超时重试机制
        initial-interval: 1000ms # 失败后的初始等待时间
        multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
        max-attempts: 3 # 最大重试次数

注意:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。

如果对于业务的性能有要求,建议禁用重试机制。

2. 生产者确认机制

一般情况下,只要生产者与MQ之间的网路连接顺畅,基本不会出现发送消息丢失的情况,因此大多数情况下无需考虑这种问题。

不过,在少数情况下,也会出现消息发送到MQ之后丢失的现象。

针对该情况,RabbitMQ提供了生产者消息确认机制,包括Publisher ConfirmPublisher Return两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的回执

总结如下:

  • 当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ack的确认信息,代表投递成功

  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功

  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功

  • 其它情况都会返回NACK,告知投递失败

只要消息可以成功投递到交换机,就会返回ack的确认消息,表示投递成功。

其中acknack属于Publisher Confirm机制,ack是投递成功;nack是投递失败。而return则属于Publisher Return机制。

默认两种机制都是关闭状态,需要通过配置文件来开启。

可以在生产者的ymal文件中添加如下配置,来开启两个机制:

spring:
  rabbitmq:
    publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
    publisher-returns: true # 开启publisher return机制

注意:

开启生产者确认比较消耗MQ性能,一般不建议开启。而且触发确认的几种情况:

  • 路由失败:一般是因为RoutingKey错误导致,往往是编程导致

  • 交换机名称错误:同样是编程错误导致

  • MQ内部故障:这种需要处理,但概率往往较低。因此只有对消息可靠性要求非常高的业务才需要开启,而且仅仅需要开启ConfirmCallback处理nack就可以了。

因此,一般情况下只要注意编码正确,就可以避免消息在发送者这里丢失。

二、MQ的可靠性

1.数据持久化

为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置数据持久化,包括:

交换机持久化、队列持久化、消息持久化。

上述的持久化都可以通过控制台进行设置。

交换机持久化

设置为Durable就是持久化模式,Transient就是临时模式。

如果不对交换机进行持久化的设置,那么在RabbitMQ重启之后,该交换机及其绑定的关系将丢失,所有依赖该交换机工作的队列和消息都无法正常工作。

队列持久化

消息持久化

当然,MQ发送消息是默认持久化的,如果想要发送非持久化的消息,需要自定义消息构建器。

2. LazyQueue

在默认情况下,RabbitMQ会将接收到的消息保存在内存中以降低消息收发的延迟。但在某些特殊情况下,会导致消息的积压,比如:

  • 消费者宕机或出现网络故障

  • 消息发送量激增,超过了消费者处理速度

  • 消费者处理业务发生阻塞

一旦出现消息堆积问题,RabbitMQ的内存占用就会越来越高,直到触发内存预警上限。此时RabbitMQ会将内存消息刷到磁盘上,这个行为成为PageOut. PageOut会耗费一段时间,并且会阻塞队列进程。因此在这个过程中RabbitMQ不会再处理新的消息,生产者的所有请求都会被阻塞。

为了解决上述问题,从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的模式,也就是惰性队列。惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存

  • 消费者如果想要消费消息就需要去磁盘中读取并加载到内存中

  • 支持百万条的消息存储

而在3.12版本之后,LazyQueue已经成为所有队列的默认格式。因此官方推荐升级MQ为3.12版本或者所有队列都设置为LazyQueue模式。

3. 非持久化、持久化、LazyQueue的区别

非持久化并不是说消息不会向磁盘中进行存储,而是消息到达的时候首先在内存中进行存储,当内存中放不下的时候,再进行PageOut操作,将消息写入磁盘。但是这样也会导致消息丢失,存在内存中没有写入磁盘的消息就会丢失。

持久化就是发送消息时,内存中保存了什么消息就直接写入磁盘,内存一份、磁盘一份。但是持久化机制是发一条消息就往磁盘中写一条,导致并发能力下降。

LazyQueue是将消息直接写入磁盘,根本就不在内存中进行保留。同时因为优化了IO操作,并发能力也大大增强。如果消费者的需求大时,会提前将磁盘中的消息存入缓存当中(最多2048条消息)。

三、消费者的可靠性

当RabbitMQ向消费者投递消息以后,需要知道消费者的处理状态如何。因为消息投递给消费者并不代表就一定被正确消费了,可能出现的故障有很多,也会导致消息的丢失。因此,RabbitMQ必须知道消费者的处理状态,一旦消息处理失败才能重新投递消息。

1. 消费者确认机制

为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:

  • ack:成功处理消息,RabbitMQ从队列中删除该消息

  • nack:消息处理失败,RabbitMQ需要再次投递消息

  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

一般reject方式使用的比较少,一般是消息格式有问题,投递多少次都没有用。

在消费者的yaml配置文件中添加如下配置就可以实现消费者的确认机制:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto # 自动ack
  • auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:

    • 如果是业务异常,会自动返回nack

    • 如果是消息处理或校验异常,自动返回reject;

2. 失败重试机制

失败重试机制就是消费者处理消息失败后,限制队列向消费者投递的次数。

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。

极端情况就是消费者一直无法执行成功,那么消息requeue就会无限循环,导致mq的消息处理飙升,带来不必要的压力。

修改消费者服务的application.yml文件,添加内容:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

结论:

  • 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试

  • 重试达到最大次数后,Spring会返回reject,消息会被丢弃

3. 失败处理策略

这在某些对于消息可靠性要求较高的业务场景下,如果消息投递重试次数到达最大值进行丢弃,显然不太合适了。

因此Spring允许我们自定义重试次数耗尽后的消息处理策略,我们使用RepublishMessageRecover来进行处理(重试耗尽后,将失败消息投递到指定的交换机)。

首先在消费者服务中创建一个配置文件,定义处理失败消息的交换机和队列:

@Bean
public DirectExchange errorMessageExchange(){
    return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
    return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
    return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}

之后在该文件中定义一个RepublishMessageRecoverer,关联队列和交换机:

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}

之后,处理失败的消息就会被投递到error.queue这个队列当中。

4. 业务幂等性

幂等是一个数学概念,用函数表达式来描述是这样的:f(x) = f(f(x)),例如求绝对值函数。在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。例如查询、删除等业务。

但数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。比如:

  • 取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增加的情况

  • 退款业务。重复退款对商家而言会有经济损失。

因此,必须想办法保证消息处理的幂等性。这里给出两种方案:

  • 唯一消息ID

  • 业务状态判断

4.1 唯一消息ID

这个思路非常简单:

  1. 每一条消息都生成一个唯一的id,与消息一起投递给消费者。

  2. 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库

  3. 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。

该如何给消息添加唯一ID呢?

其实很简单,SpringAMQP的MessageConverter自带了MessageID的功能,我们只要开启这个功能即可。

以Jackson的消息转换器为例:

@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jjmc.setCreateMessageIds(true);
    return jjmc;
}

该方法的缺点是有业务侵入、且有数据库的操作影响业务性能。

4.2 业务判断

业务判断就是基于业务本身的逻辑或状态来判断是否是重复的请求或消息,不同的业务场景判断的思路也不一样。

例如在支付案例中,处理消息的业务逻辑是把订单状态从未支付修改为已支付。因此我们就可以在执行业务时判断订单状态是否是未支付,如果不是则证明订单已经被处理过,无需重复处理。

相比较而言,消息ID的方案需要改造原有的数据库,所以更推荐使用业务判断的方案。

四、延迟消息

在电商的支付业务中,对于一些库存有限的商品,为了更好的用户体验,通常都会在用户下单时立刻扣减商品库存。例如电影院购票、高铁购票,下单后就会锁定座位资源,其他人无法重复购买。

但是这样就存在一个问题,假如用户下单后一直不付款,就会一直占有库存资源,导致其他客户无法正常交易,最终导致商户利益受损!

因此,电商中通常的做法就是:对于超过一定时间未支付的订单,应该立刻取消订单并释放占用的库存

例如,订单支付超时时间为30分钟,则我们应该在用户下单后的第30分钟检查订单支付状态,如果发现未支付,应该立刻取消订单,释放库存。

但问题来了:如何才能准确的实现在下单后第30分钟去检查支付状态呢?

像这种在一段时间以后才执行的任务,我们称之为延迟任务,而要实现延迟任务,最简单的方案就是利用MQ的延迟消息了。

1. 死信交换机和延迟消息

1.1 死信交换机

如果一个队列中的消息已经成为死信,并且这个队列通过dead-letter-exchange属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange)。而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。

上图中的dlx.direct和dlx.queue其实只是一个普通的交换机和队列,只不过normal.queue通过dead-letter-exchange属性指定了一个交换机(dlx.direct)。因此normal.queue中的死信就会通过该交换机被投递到dlx.queue队列当中。

1.2 延迟消息

如图,有一组绑定的交换机(ttl.fanout)和队列(ttl.queue)。但是ttl.queue没有消费者监听,而是设定了死信交换机hmall.direct,而队列direct.queue1则与死信交换机绑定,RoutingKey是blue:

假如我们现在发送一条消息到ttl.fanout,RoutingKey为blue,并设置消息的有效期为5000毫秒:

消息肯定会被投递到ttl.queue之后,由于没有消费者,因此消息无人消费。5秒之后,消息的有效期到期,成为死信:

死信被再次投递到死信交换机hmall.direct,并沿用之前的RoutingKey,也就是blue

由于direct.queue1hmall.direct绑定的key是blue,因此最终消息被成功路由到direct.queue1,如果此时有消费者与direct.queue1绑定, 也就能成功消费消息了。但此时已经是5秒钟以后了:

也就是说,publisher发送了一条消息,但最终consumer在5秒后才收到消息。我们成功实现了延迟消息

当然也可以通过安装DelayExchange插件来将一个普通的队列设置为一个延迟队列,这里就不多加赘述。

2. 超时订单问题

对于一个用户下单的业务,首先用户会去下单,在交易服务中回去创建一个新的订单,其订单状态是未付款。接着会进入商品服务,扣减对应的库存。之后用户就会向支付服务发起请求进行支付的行为,如果用户支付成功,理论上支付服务就会通过MQ向交易服务发送一条消息,即“用户已支付”。交易服务就会将订单状态改为已支付

但是如果支付服务因为某种原因无法发送“用户已支付”这个通知,那么支付服务中的交易流水会变为已支付的状态,但是订单状态却还是未付款,就出现了数据不一致的现象。同时,因为交易服务并不知道支付状态,就会一直等待,商品的库存也会处于扣减状态,此时就相当于一直在占用商品,会导致其他用户没办法进行下单。

为了解决上述问题,就需要用到延迟消息这一解决方法。定义延迟消息为15分钟,15分钟后交易服务发现支付服务还没有来通知我们,交易服务就会主动去向支付服务进行查询,查询到相关支付状态,然后对订单进行修改。

如果查询到的状态还是未付款,那么就将该订单取消,回复库存。

上述就是一个利用延迟消息解决业务的例子。


网站公告

今日签到

点亮在社区的每一天
去签到