RabbitMQ

发布于:2025-04-15 ⋅ 阅读:(32) ⋅ 点赞:(0)

1.谈谈你对RabbitMQ的理解?

RabbitMQ是一个生产者和消费者模型,主要负责接受、存储和转发消息。RabbitMQ主要分为生产者消费者BrokerExchangeQueue

  1. 生产者就是消息的发送方,生产者将消息发送给Exchange。
  2. Exchange会将消息路由到其绑定的队列Queue中,Exchange不能存储消息。
  3. Queue用于存储消息,多个消费者可以订阅同一个队列,将消息发送到消费者。
  4. 消费者从队列中取到消息,然后对消息进行处理。

2.交换机Exchange类型有哪些?

交换机类型主要有FanoutDirectTopic三种类型。

  1. Fanout交换机是采用广播的形式,将消息发送给每一个绑定的Queue中。比如支付案例中,用户支付成功之后,需要通知交易服务更新订单状态,通知短信服务给用户发短信,通知积分服务给用户加积分。只需要将这些微服务的队列绑定到Fanout交换机,就可以实现发送一条消息,每个微服务都能处理自己对应的业务。
  2. Direct会将接收到的消息根据路由规则路由到指定的Queue,因此称为定向路由。Fanout交换机将消息路由给每一个与之绑定的队列,Direct交换机根据消息的RoutingKey和队列的BindingKey判断路由给哪个队列。如果多个队列具有相同的BindingKey,则与Fanout功能类似。
  3. Topic也是基于RoutingKeyBindingKey做消息路由,但是RoutingKey通常是多个单词的组合,并且以.分割。Topic交换机与队列绑定时的bindingKey可以指定通配符,#代表0个或多个词,*代表1个词。

交换机的类型主要有三种,分别是Fanout、Direct和Topic。Fanout交换机采用广播机制,将消息发送给每个绑定的Queue。Direct是定向路由,将消息根据RoutingKey和BindingKey路由到指定的Queue中。Topic也是基于RoutingKey和BindingKey做消息队列,但是RoutingKey通常是多个单词的组合,并以’.'分割。

3.如何声明队列和交换机?

Spring提供了基于@RabbitListener注解方式来声明队列、交换机和BindingKey。@RabbitListrner注解的bindings属性,类型为@QueueBinding注解,在这个注解里面传入三个属性:value、exchange和key。

  1. value指定队列名
  2. exchange指定交换机名和类型
  3. key指定对应的BindingKey,可以使用数组指定多个。

RabbitMQ声明队列、交换机和BindingKey需要使用到@RabbitListener注解的binding属性,在这个属性中分别指定队列名、交换机名和BindingKey。

4.RabbitMQ如何实现消息的发送?

发消息用的是rabbitTemplate中断中的convertAndSend方法,方法中第一个参数的为交换机名称,第二个参数为routingKey,第三个是发送的消息。消息在网络中传输是基于JDK序列化的,JDK序列化存在以下问题:

  1. JDK序列化存在安全风险,在反序列化的时候容易被代码侵入。
  2. JDK序列化占用空间太多
  3. JDK序列化可读性很差

JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。

RabbitMQ通过convertAndSend方法来发送消息,第一个参数为交换机名,第二个参数为routingKey,第三个参数为发送的消息。

5.RabbitMQ如何保证消息的可靠性?

可能出现消息丢失的场景分别是生产者发送消息时消息丢失、MQ宕机导致消息丢失,消费者处理消息时抛异常导致消息丢失。

为了保证生产者的可靠性,可以采用发送者重试机制和发送者确认机制。

  1. 发送者重试机制,在配置文件中的spring rabbitmq-template-retry下开启重试机制,并设置失败后的等待时间以及最大重试次数。如果对业务性能有要求,建议禁用重试机制。
  2. 发送者确认机制,有两种机制包括confirm和return。
    • 当消息投递到MQ,但是路由失败时,通过Return返回异常信息,同时通过confirm返回ACK的确认信息,代表投递成功。
    • 消息投递成功,通过confirm返回ACK
    • 消息投递失败,返回NACK

为了保证MQ的可靠性,可以采用数据持久化。

  1. 交换机持久化,在控制台的Exchanges页面将Durability参数设置为Durable开启持久化。
  2. 队列持久化,在控制台的Queue页面将Durability参数设置为Durable开启持久化。
  3. 消息持久化,在发送消息的时候,配置持久化属性。

为了保证消费者的可靠性采用消费者确认机制、失败重试机制以及失败处理策略。

  1. 消费者确认机制,当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。ack:成功处理消息,RabbitMQ从队列中删除该消息。nack:消息处理失败,RabbitMQ需要再次投递消息给消费者。reject:消息处理失败并拒绝该消息,比如消息在转换过程中出现异常,RabbitMQ会从队列中删除该消息。
  2. 失败者重试机制,当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试。重试达到最大次数后,Spring会返回reject,消息会被丢弃。
  3. 失败处理策略,Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的。失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

RabbitMQ为了保证生产者的可靠性,通过重试机制和确认机制实现。重试机制需要设置发送失败后的等待时间以及最大重连次数。确认机制是消息投递成功返回ACK,消息投递失败返回NACK。
RabbitMQ为了保证MQ的可靠性,采用数据持久化,包括交换机持久化、队列持久化和消息持久化,都是需要配置Durability属性来开启持久化的。
RabbitMQ为了保证消费者的可靠性,采用消费者重试机制、确认机制以及失败处理策略。消费者重试机制是当消费者出现异常后,消息不断requeue到队列,再重发给消费者。确认机制是在处理消息后,向RabbtiMQ发送处理状态。ack表示消息处理成功,直接从队列中删除消息。nack表示消息处理失败,需要RabbitMQ再次投递给消费者。失败处理逻辑就是,消息处理失败后会投递到一个指定的队列中,等待后续人工处理。

6.业务幂等性的常见方案有哪些?

采用确认机制和重试机制中,很有可能出现同一个消息被重复消费的可能,可以采用唯一ID的方式来保证幂等性。

  1. 每一条消息都生成一个唯一ID,与消息一起投递给消费者。
  2. 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库。
  3. 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。

采用唯一ID的方式确保业务幂等性,每条消息都生成一个唯一ID,消费者处理完消息后将消息ID存放到数据库。下次收到相同的消息就去数据库中查询,如果存在就是重复消息。

7.如何解决消息积压?

消息堆积的原因就是生产者生成速度大于消费者消费速度。

解决消息堆积主要有三种方法:

  1. 单台实例启动多线程,加快消息消费
  2. 开启工作队列模型,多个实例订阅同一个queue,分摊任务。
  3. 使用惰性队列,队列接收到的消息直接存入磁盘,需要的时候再从磁盘中拿出,推送给消费者。

8.如何确保支付服务与交易服务之间的订单状态一致性?

  1. 首先,支付服务会在用户支付成功以后利用MQ消息通知交易服务,完成订单状态同步。
  2. 为了保证MQ消息的可靠性,我们采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递和处理的可靠性。同时也开启了MQ的持久化,避免因服务故障导致消息丢失。因此无论是发送方、MQ本身和消费者都能确保消息可靠。这就可以确保消息一定投递到消费者,至少让消费者处理一次。
  3. 由于可能会出现重复投递消息,最后,我们还在交易服务更新订单状态时做了业务幂等判断,避免因消息重复消费导致订单状态异常。

网站公告

今日签到

点亮在社区的每一天
去签到