文章目录
RabbitMQ的消息可靠传输
AMQP协议架构
RabbitMQ是基于AMQP协议实现的消息中间件,AMQP有一套自己的架构,RabbitMQ的架构也基于此。
如图所示就是AMQP协议的基础架构。其中主要角色有三个:生产者、Broker(中间人)、消费者。更加细分地话,就是四个:生产者、交换机、队列、消费者。
生产者
生产者就是我们发送消息的角色,是消息的发送方,负责把需要被处理的消息发送到下游——交换机。
交换机
交换机是Broker中的一个组件,是消息的分发中心,负责将接收到的消息按照设定的规则路由到对应的队列中去。
直连交换机: 将消息路由到与消息中的路由键完全匹配的队列中。
扇出交换机: 将消息广播到所有与该交换机绑定了的队列。
主题交换机: 根据通配符去匹配路由键,将消息路由到队列中去。
队列
队列是消息的存储区域,每个队列都可以绑定一个或者多个交换机。
消费者
消费者用于从队列中取出消息并进行消费,它是消息消费过程的最后一个环节,通过订阅队列来从需要的队列中得到消息。
消息可靠性保障
消息丢失的场景
1.生产者发送消息的过程中,Broker宕机、网络断开,或者是生产者发送的消息,MQ没有办法接收(交换机不匹配、无法路由到队列中)。
2.Broker接收到消息但还没有及时存储(持久化)就宕机了,消息也丢失了。
3.消费者在取消息的过程中,Broker宕机,或者网络断开。消费者拿到消息之后,处理出错了,或者是拿到消息之后还没来及的存储就宕机了。
总之,如果不做可靠性保障,消息丢失是不可避免地会发生的。
针对这三种场景的消息丢失,RabbitMQ提供了相应的解决方案:confirm消息确认机制(生产者)、消息持久化机制(RabbitMQ)、ACK事务机制(消费者)。
生产者(confirm消息确认机制)
生产者在发送消息前,会为消息指定要发送的交换机名称和路由键。
channel.basicPublish(
String exchange, // 指定交换机名称
String routingKey, // 指定路由键
AMQP.BasicProperties props,
byte[] body
);
这里就涉及到两方面的确认了,分别是交换机接收消息失败和队列接收消息失败。
**publisher-confirm:**MQ在每次接收到消息之后无论是否匹配到目标交换机上都会给生产者返回一个publisher-confirm确认消息。
消息成功匹配到交换机上的话,就会返回ack。
消息未成功匹配交换机的话就会返回nack。
publisher-return: 消息成功从交换机投递到队列中,那么就什么都不返回,如果生产者发送消息的时候队列绑定错了,那么就会返回ack给生产者,通知生产者。
消息持久化机制(RabbitMQ)
MQ收到消息后会将消息保存到磁盘中,保证RabbitMQ服务器在宕机或者重启的时候,消息不会丢失。
生成发送消息的时候可以把消息的delivery_mod属性设置为2,就可以将消息标记为持久化。
队列也可以通过durable属性设置为true,标记为持久化,这样MQ会将队列持久化到磁盘。
注意如果只将队列持久化但是消息没有持久化,那么消息是会丢失的;同时,只将消息持久化,队列没有持久化的话,消息依旧会丢失,因为消息是存储在对应队列中的。
ACK事务机制(消费者)
ACK事务机制确保消费者成功消费了消息,当消息被消费者成功消费后,消费者就会发送ACK确认消息给MQ,告知MQ我消费了消息,MQ在收到ACK通知之后就会将该条消息移除。这里的过程是自动处理的,同样也可以关闭这个自动ACK进行手动ACK处理。
注意点
上面的消息可靠性保障有几个注意点:
- 因为ack机制需要知道这个ack是确认了哪条消息的成功投递,所以每条消息都要有一个唯一id,可以使用默认的也可以通过UUID手动给定。
- 上面是RabbitMQ提供的确认机制,同时还是需要应用层Java层面编写代码进行配合的,就比如如果生成者发送的消息因为网络波动或者MQ宕机了,MQ根本没收到这条消息,也就不会做任何反应,生产者这边需要进行超时重传。消费者也是同理,如果MQ一直收不到消费者的确认ACK消息,也是需要超时重传的。
- 消息重传后就会遇见一个新的问题消息重复消费,这就需要我们保证消息消费的幂等性了,这点后续再讨论。