1.谈谈你对RabbitMQ的理解?
RabbitMQ
是一个生产者和消费者模型,主要负责接受、存储和转发消息。RabbitMQ主要分为生产者
、消费者
、Broker
、Exchange
和Queue
。
- 生产者就是消息的发送方,生产者将消息发送给Exchange。
- Exchange会将消息路由到其绑定的队列Queue中,Exchange不能存储消息。
- Queue用于存储消息,多个消费者可以订阅同一个队列,将消息发送到消费者。
- 消费者从队列中取到消息,然后对消息进行处理。
2.交换机Exchange类型有哪些?
交换机类型主要有Fanout
、Direct
和Topic
三种类型。
- Fanout交换机是采用广播的形式,将消息发送给每一个绑定的Queue中。比如支付案例中,用户支付成功之后,需要通知交易服务更新订单状态,通知短信服务给用户发短信,通知积分服务给用户加积分。只需要将这些微服务的队列绑定到Fanout交换机,就可以实现发送一条消息,每个微服务都能处理自己对应的业务。
- Direct会将接收到的消息根据路由规则路由到指定的Queue,因此称为定向路由。Fanout交换机将消息路由给每一个与之绑定的队列,Direct交换机根据消息的
RoutingKey
和队列的BindingKey
判断路由给哪个队列。如果多个队列具有相同的BindingKey
,则与Fanout功能类似。 - Topic也是基于
RoutingKey
和BindingKey
做消息路由,但是RoutingKey
通常是多个单词的组合,并且以.
分割。Topic交换机与队列绑定时的bindingKey
可以指定通配符,#
代表0个或多个词,*
代表1个词。
交换机的类型主要有三种,分别是Fanout、Direct和Topic。Fanout交换机采用广播机制,将消息发送给每个绑定的Queue。Direct是定向路由,将消息根据RoutingKey和BindingKey路由到指定的Queue中。Topic也是基于RoutingKey和BindingKey做消息队列,但是RoutingKey通常是多个单词的组合,并以’.'分割。
3.如何声明队列和交换机?
Spring提供了基于@RabbitListener
注解方式来声明队列、交换机和BindingKey。@RabbitListrner注解的bindings
属性,类型为@QueueBinding
注解,在这个注解里面传入三个属性:value、exchange和key。
value
指定队列名exchange
指定交换机名和类型key
指定对应的BindingKey
,可以使用数组指定多个。
RabbitMQ声明队列、交换机和BindingKey需要使用到@RabbitListener注解的binding属性,在这个属性中分别指定队列名、交换机名和BindingKey。
4.RabbitMQ如何实现消息的发送?
发消息用的是rabbitTemplate
中断中的convertAndSend
方法,方法中第一个参数的为交换机名称,第二个参数为routingKey,第三个是发送的消息。消息在网络中传输是基于JDK序列化的,JDK序列化存在以下问题:
- JDK序列化存在安全风险,在反序列化的时候容易被代码侵入。
- JDK序列化占用空间太多
- JDK序列化可读性很差
JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。
RabbitMQ通过convertAndSend方法来发送消息,第一个参数为交换机名,第二个参数为routingKey,第三个参数为发送的消息。
5.RabbitMQ如何保证消息的可靠性?
可能出现消息丢失的场景分别是生产者发送消息时消息丢失、MQ宕机导致消息丢失,消费者处理消息时抛异常导致消息丢失。
为了保证生产者的可靠性,可以采用发送者重试机制和发送者确认机制。
- 发送者重试机制,在配置文件中的spring rabbitmq-template-retry下开启重试机制,并设置失败后的等待时间以及最大重试次数。如果对业务性能有要求,建议禁用重试机制。
- 发送者确认机制,有两种机制包括confirm和return。
- 当消息投递到MQ,但是路由失败时,通过Return返回异常信息,同时通过confirm返回
ACK
的确认信息,代表投递成功。 - 消息投递成功,通过confirm返回
ACK
。 - 消息投递失败,返回
NACK
。
- 当消息投递到MQ,但是路由失败时,通过Return返回异常信息,同时通过confirm返回
为了保证MQ的可靠性,可以采用数据持久化。
- 交换机持久化,在控制台的Exchanges页面将Durability参数设置为Durable开启持久化。
- 队列持久化,在控制台的Queue页面将Durability参数设置为Durable开启持久化。
- 消息持久化,在发送消息的时候,配置持久化属性。
为了保证消费者的可靠性采用消费者确认机制、失败重试机制以及失败处理策略。
- 消费者确认机制,当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。
ack
:成功处理消息,RabbitMQ从队列中删除该消息。nack
:消息处理失败,RabbitMQ需要再次投递消息给消费者。reject
:消息处理失败并拒绝该消息,比如消息在转换过程中出现异常,RabbitMQ会从队列中删除该消息。 - 失败者重试机制,当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试。重试达到最大次数后,Spring会返回reject,消息会被丢弃。
- 失败处理策略,Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由
MessageRecovery
接口来定义的。失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。
RabbitMQ为了保证生产者的可靠性,通过重试机制和确认机制实现。重试机制需要设置发送失败后的等待时间以及最大重连次数。确认机制是消息投递成功返回ACK,消息投递失败返回NACK。
RabbitMQ为了保证MQ的可靠性,采用数据持久化,包括交换机持久化、队列持久化和消息持久化,都是需要配置Durability属性来开启持久化的。
RabbitMQ为了保证消费者的可靠性,采用消费者重试机制、确认机制以及失败处理策略。消费者重试机制是当消费者出现异常后,消息不断requeue到队列,再重发给消费者。确认机制是在处理消息后,向RabbtiMQ发送处理状态。ack表示消息处理成功,直接从队列中删除消息。nack表示消息处理失败,需要RabbitMQ再次投递给消费者。失败处理逻辑就是,消息处理失败后会投递到一个指定的队列中,等待后续人工处理。
6.业务幂等性的常见方案有哪些?
采用确认机制和重试机制中,很有可能出现同一个消息被重复消费的可能,可以采用唯一ID的方式来保证幂等性。
- 每一条消息都生成一个唯一ID,与消息一起投递给消费者。
- 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库。
- 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。
采用唯一ID的方式确保业务幂等性,每条消息都生成一个唯一ID,消费者处理完消息后将消息ID存放到数据库。下次收到相同的消息就去数据库中查询,如果存在就是重复消息。
7.如何解决消息积压?
消息堆积的原因就是生产者生成速度大于消费者消费速度。
解决消息堆积主要有三种方法:
- 单台实例启动多线程,加快消息消费
- 开启工作队列模型,多个实例订阅同一个queue,分摊任务。
- 使用惰性队列,队列接收到的消息直接存入磁盘,需要的时候再从磁盘中拿出,推送给消费者。
8.如何确保支付服务与交易服务之间的订单状态一致性?
- 首先,支付服务会在用户支付成功以后利用MQ消息通知交易服务,完成订单状态同步。
- 为了保证MQ消息的可靠性,我们采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递和处理的可靠性。同时也开启了MQ的持久化,避免因服务故障导致消息丢失。因此无论是发送方、MQ本身和消费者都能确保消息可靠。这就可以确保消息一定投递到消费者,至少让消费者处理一次。
- 由于可能会出现重复投递消息,最后,我们还在交易服务更新订单状态时做了业务幂等判断,避免因消息重复消费导致订单状态异常。