RabbitMQ专项

发布于:2025-03-09 ⋅ 阅读:(25) ⋅ 点赞:(0)

1. 什么是消息中间件

  • 消息(Message):在应用层传送的数据。比如文本字符串、JSON 等。
  • 消息队列中间件(Message Queue Middleware,MQ):利用可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过消息排队模型,它可以在分布式环境下扩展进程间的通信。

消息队列中间件一般有两种传递模式:

  • 点对点(P2P,Point-to-Point)模式基于队列,消息生产者发送消息到队列,消息消费者从队列中接收消息。队列的存在使得消息的异步传输成 为可能。

  • 发布 / 订阅(Pub/Sub)模式定义了如何向一个内容节点发布和订阅消息,内容节点称为主题(topic),消息发布者将消息发布到某个主题,而消息订阅者则从主题中订阅消息。该模式在消息的一对多广播时采用

RabbitMQ 是一种典型的点对点模式,而 Kafka 是一种典型的发布订阅模式。但是 RabbitMQ 中可以通过设置交换器类型来实现发布订阅模式而达到广播消费的效果。

消息中间件适用于需要可靠的数据传送的分布式环境。发送者将消息发送给消息服务器,消息服务器将消息存放在若干队列中,在合适的时候再将消息转发给接收者。实现应用程序之间的协同,优点在于能够在客户和服务器之间提供同步和异步的链接,并且在任何时刻都可以将消息进行传送或存储转发。

2. 消息中间件的作用

2.1 作用

在不同的应用场景下有不同的作用,总的来说,可以概括如下:

2.1.1 解耦

对于项目的变化,很难预测到未来的变动。消息中间件在处理过程中间插入了一个隐含、基于数据的接口层,两边的处理过程都要实现这一接口,但是两边都可以独立的扩展或则修改自己的处理过程,只要确保他们遵守同样的接口约束即可。

解耦前:

解耦后:

2.1.2 异步

传统模式下使用串行接口调用的方式,一些非必要的业务逻辑以同步的方式运行,阻塞后续接口的调用,耗费时间。如果使用消息中间件方式,将消息写入消息队列,非必要的业务逻辑以异步的方式运行,可以加快响应速度。

同步调用:

异步:

2.1.3 削峰

一个系统访问流量有高峰时期,也有低峰时期,如果高峰期流量太大,我们的系统、数据库可能就会崩溃。这时如果使用 MQ 进行流量削峰,将用户的大量消息直接放到 MQ 里面,然后我们的系统去按自己的最大消费能力去消费这些消息,就可以保证系统的稳定,随后跟进业务逻辑,给用户返回特定页面或者稍后通过其他方式通知其结果。

以胜天半子回调为例:接口只在一天内的某一段时间高频访问。所以可以将集中式的高压平摊到全天的其他时间段,使用消息队列达到削峰填谷的效果。

2.2 缺点

2.2.1 系统可用性降低

在系统中引入 MQ,需要考虑MQ服务挂掉的风险与后续的补偿。一般而言,引入的外部依赖越多,系统越脆弱,每一个依赖出问题都会导致整个系统的崩溃。

2.2.2 系统复杂度提高

需要考虑 MQ 的各种情况,比如:消息的重复消费、消息丢失、保证消息传递的顺序性等等。

2.2.3 数据一致性问题

比如 A 系统已经给客户返回操作成功,这时候操作 BC 都成功了,操作 D 却失败了,导致数据不一致。

3. 消息中间件的架构模型与横向对比

3.1 消息模型

所有 MQ 产品从模型抽象上来说都是一样的过程: 消费者(consumer)订阅某个队列。生产者(producer)创建消息,然后发布到队列(queue)中,最后将消息发送到监听的消费者。

3.2 JMS Queue模型

下图显示的是JMS Queue模型,可以看到,应用1和应用2发送消息到JMS服务器,这些消息根据到达的顺序形成一个队列,应用3和应用4进行消息的消费。这里需要注意的是,应用3和应用4收到的消息是不同的,也就是说在JMS Queue的方式下,如果Queue里面的消息被一个应用处理了,那么连接到JMS Queue上的另一个应用是收不到这个消息的,也就是说所有连接到这个JMS Queue上的应用共同消费了所有的消息。消息从发送端发送出来时不能确定最终会被哪个应用消费,但是可以明确的是只有一个应用会去消费这条消息,所以JMS Queue模型也被称为Peer To Peer(PTP)方式

3.3 JMS Topic模型

下图显示的是JMS Topic模型。从发送消息的部分和JMS Topic内部的逻辑来看,JMS Topic和JMS Queue是一样的,二者最大的差别在于消息接收的部分,在Topic模型中,接收消息的应用3和应用4是可以独立收到所有到达Topic的消息的。JMS Topic模型也被称为Pub/Sub方式。

3.4 RabbitMQ 架构

RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。

AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。

RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:

  1. 可靠性: RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。
  2. 消息集群: 多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。
  3. 高可用: 队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
  4. 多种协议: RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。
  5. 多语言客户端: RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。
  6. 管理界面: RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。
  7. 插件机制: RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。

RabbitMQ 内部结构如下:

  1. Publisher 消息的生产者,也是一个向交换器发布消息的客户端应用程序。
  2. Consumer 消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
  3. Broker 表示消息队列服务器实体。
  4. Message 消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括 routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
  5. Virtual Host 虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。
  6. Exchange 交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
  7. Binding 绑定,用于消息队列和交换器或者交换器与交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
  8. Queue 消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里,等待消费者连接到这个队列将其取走。多个消费者可以订阅同一个队列,队列中的消息会被平均分摊给多个消费者处理,而不是每个消费者都会收到所有消息并处理。
  9. Connection 网络连接,比如一个 TCP 连接。
  10. Channel 信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的 TCP 连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

3.5 RocketMQ架构

RocketMQ 的架构整体有四部分组成:NameServer、Broker、Producer、Consumer,其中每一个模块都可以进行水平扩展。

NameServer

NameServer 提供轻量级的服务发现和路由。每个 NameServer 记录完整的路由信息,提供相应的读写服务,并支持快速的存储扩展。

Broker

Broker 通过提供轻量级的 TOPIC 和 QUEUE 机制来存储消息。包含容错机制(2 个或 3 个副本),并提供强大的峰值填充功能和按原始时间顺序累积数千亿条消息的能力。此外,Broker 还提供灾难恢复,丰富的指标统计信息和警报机制,而这是传统消息中间件所没有的。

Producer

Producer 支持分布式部署。分布式生产者通过多种负载平衡模式将消息发送到 Broker 集群。发送过程支持快速失败并且延迟低。

Consumer

消费者支持分布式部署。还支持消息广播、实时消息订阅机制,可以满足大多数消费者的需求。

3.6 Kafka架构

Apache Kafka 采取了分区日志模型,是一种分布式数据存储。经过优化以实时提取和处理流数据。流数据是指由数千个数据源持续生成的数据,通常可同时发送数据记录。流平台需要处理这些持续流入的数据,按照顺序逐步处理。

Kafka 为其用户提供三项主要功能

  • 发布和订阅记录流
  • 按照记录的生成顺序高效地存储记录流
  • 实时处理记录流。

kafka的关键概念如下:

Broker

Broker 是 kafka 实例,每个服务器上有一个或多个 kafka 的实例。每个 kafka 集群内的 broker 都有一个不重复的编号,如图中的 broker-0、broker-1等。

Topic

消息的主题,可以理解为消息的分类,kafka 的数据就保存在 topic。在每个 broker 上都可以创建多个 topic。

Partition

一个 Topic 可以认为是一类信息,逻辑上的队列,每条消息都要指定 Topic。为了使得 Kafka 的吞吐量可以线性提高,物理上将 Topic 分成一个或多个 Partition。每个 Partition 在存储层面时 append log 文件,消息 push 进来后,会被追加到 log 文件的尾部,每条消息在文件中的位置成为 offset (偏移量),offset 是一个 long 型数字,唯一的标识一条信息。因为每条消息都追加到 Partition 的尾部,所以属于磁盘的顺序写,效率很高。如下图:

Replication

每一个分区都有多个副本,副本的作用是做备份。当主分区(Leader)故障的时候会选择一个备份(Follower)上位,成为 Leader。在 kafka 中默认副本的最大数量是 10 个,且副本的数量不能大于 Broker 的数量,follower 和 leader 绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自己)。

Consumer Group

我们可以将多个消费组组成一个消费者组,在 kafka 的设计中同一个分区的数据只能被消费者组中的某一个消费者消费。同一个消费者组的消费者可以消费同一个 topic 的不同分区的数据,这也是为了提高 kafka 的吞吐量。

3.7 消息中间件的横向对比

特性

RabbitMQ

RocketMQ

Kafka

单机吞吐量

万级

十万级

百万级

时效性

μs级

ms级

ms级

可用性

主从,采用镜像模式实现

非常高

分布式,主从

非常高

分布式,主从

消费模式

推 + 拉

死信队列

支持

支持

不支持

延迟队列

支持

支持

不支持

支持协议

AMQP、MQTT、STOMP

自定义

自定义

消息顺序

条件苛刻,需要单线程发送,单线程消费并不采用延迟队列、优先级队列

支持

支持单分区顺序

消息存储

内存、磁盘

磁盘

内存、磁盘、数据库

消息回溯

不支持

支持指定时间点的回溯

支持指定分区offset位置的回溯

优点

erlang 语言开发,性能极好、延时很低、MQ 功能完备,管理界面非常好,社区活跃

接口简单易用,吞吐量大,分布式扩展方便、社区比较活跃,支持大规模的 Topic、支持复杂的业务场景,可以基于源码进行定制开发

超高吞吐量,ms 级的时延,极高的可用性和可靠性,分布式扩展方便

缺点

吞吐量较低,erlang 语言开发不容易进行定制开发,集群动态扩展麻烦

没在MQ核心实现JMS等接口,broker主从切换不能自动实现

消息队列功能较为简单,MQ高级功能支持不足

应用

都有应用

用于大规模吞吐、复杂业务中

在大数据的实时计算和日志采集中被大规模使用

4. AMQP 中的消息路由

AMQP 中消息的路由过程和 Java 开发者熟悉的 JMS 存在一些差别,AMQP 中增加了 Exchange 和 Binding 的角色。生产者把消息发布到 Exchange 上,消息最终到达队列并被消费者接收,而 Binding 决定交换器的消息应该发送到那个队列。

4.1 Exchange 类型

Exchange 分发消息时根据类型的不同分发策略有区别,目前主要共四种类型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类型:

4.1.1 direct

消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为 “info”,则只转发 routing key 标记为 “info” 的消息,不会转发 “debug”,也不会转发 “info.temp” 等等。它是完全匹配、单播的模式。

4.1.2 fanout

每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。

4.1.3 topic

topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符用于模糊匹配:符号 “#” 和符号 “*”。# 匹配 0 个或多个单词,* 匹配一个单词。

5. RabbitMQ 常见的五种模式

在 RabbitMQ 中,常见的五种模式如下

  1. Direct 模式
  2. Worker 模式
  3. Publish/Subscribe 模式
  4. Routing 模式
  5. Topics 模式

5.1 Direct 模式

最简单的模式,只会有一个 Producer 负责发送 message 到 Queue 里、而也只有一个 Consumer 去 Queue 里消费 message。

5.2 Worker 模式

跟 Direct 模式很像,但是差别是 Worker 模式中会同时有多个 Consumer 会去消费 Queue 里的 message,增加 message 消费的速度。

5.3 Publish/Subscribe 模式

从这个模式之后,在 Producer、Queue、Consumer 之间开始使用交换器Exchange,Producer 不再是直接把 message 投递到 Queue 里,而是让 Producer 把 message 投递 Exchange,再交由 Exchange 去决定要把这个 message 投递给哪个 Queue

在 Publish/Subscribe 模式中,使用的是 Exchange 的 fanout type,当 Producer 把 message 投递给 Exchange 时,Exchange 会把这个 message 投递到它绑定的所有 Queue 上。

Publish/Subscribe 模式也是一个常用的模式,通常是用在需要订阅的情况下。

比如当订单服务有了一笔新订单之后就要去通知短信服务、商品服务… 等,那如果是用前面提到RabbitMQ 中的 Direct 模式,在订单服务就要自己指定要把message 投递到哪个 Queue 上,如果又有新的服务要来订阅这个 message 的话,需要更新代码。

但是使用 Publish/Subscribe 模式的话,订单服务就可以直接将message 投递 Exchange,而我们只要把新的服务使用的 Queue 和 Exchange 绑定,Exchange 就可以自动把 message 也投递到新的 Queue 里了。

5.4 Routing 模式

Routing 模式也是一个用到了 Exchange 的模式,这个模式使用的是 Exchange 的 direct type

当 Producer 把 message 投递 Exchange 时,同时要在这个message 上面带上一个routing key,而 Exchange 就会根据这个 routing key,将 message 投递到指定的 Queue 上

Routing 模式不同于发布/订阅模式的地方在于它会把消息根据routing key进行分发,到匹配需要的队列中去,而不是分发到所有的队列。

5.5 Topics 模式

用到了 Exchange 的 topic type,用法基本上跟 Routing 模式一样,只是 routing key 提供了模糊匹配的规则

6. RabbitMQ 消费模式

RabbitMQ的消费模式分为两种:推(Push)模式和拉(Pull)模式。

6.1 推模式

推模式使用持续持续订阅的方式来消费消息。

6.2 拉模式

拉模式请求channel获得单条消息。

6.3 两种模式各自的问题

6.3.1 慢消费

慢消费是push模型最大的致命伤,串成流水线来看,如果消费者的速度比发送者的速度慢很多,势必造成消息在broker的堆积。假设这些消息都是有用的无法丢弃的,消息就要一直在broker端保存。最致命的是broker给consumer推送一堆consumer无法处理的消息,consumer不是reject就是error,然后来回踢皮球。反观pull模式,consumer可以按需消费,不用担心自己处理不了的消息来骚扰自己,而broker堆积消息也会相对简单,无需记录每一个要发送消息的状态,只需要维护所有消息的队列和偏移量就可以了。

6.3.2 消息延迟与忙等

这是pull模式最大的短板。由于主动权在消费方,消费方无法准确地决定何时去拉取最新的消息。如果一次pull取到消息了还可以继续去pull,如果没有pull取到则需要等待一段时间重新pull。 但等待多久就很难判定了。有没有消息到来这件事情决定权不在消费方。也许1分钟内连续来了1000条消息,然后半个小时没有新消息产生。在RocketMQ里,有一种优化的做法-长轮询,来平衡推拉模型各自的缺点。基本思路是:消费者如果尝试拉取失败,不是直接return,而是把连接挂在那里wait,服务端如果有新的消息到来,把连接notify起来。

7. RabbitMQ 死信队列

“死信” 是 RabbitMQ 中的一种消息机制,当消费消息时,如果队列里的消息出现以下情况:

  • 消息被否定确认,使用 channel.basicNack或 channel.basicReject ,并且此时 requeue 属性被设置为 false。
  • 消息在队列的存活时间超过设置的 TTL 时间。
  • 消息队列的消息数量已经超过最大队列长度。

那么该消息将成为 “死信”。

“死信” 消息会被 RabbitMQ 进行特殊处理,如果配置了死信交换器信息,那么该消息将会被投递到死信交换器中,如果没有配置,则该消息将会被丢弃,死信常用于延迟队列的实现。

Message的Header 中会包含一些死信的属性:

字段名

含义

x-first-death-exchange

第一次被抛入的死信交换机的名称

x-first-death-reason

第一次成为死信的原因,

rejected:消息在重新进入队列时被队列拒绝,由于

default-requeue-rejected参数被设置为false。

expired:消息过期。

maxlen: 队列内消息数量超过队列最大容量

x-first-death-queue

第一次成为死信前所在队列名称

x-death

历次被投入死信交换机的信息列表,同一个消息每次进入一个死信交换机,这个数组的信息就会被更

8. RabbitMQ 延迟队列

8.1 设置TTL

TTL,Time to Live的缩写,即过期时间,RabbitMQ可以为队列和消息设置单独的过期时间。如果为队列设置过期时间,队列中的所有消息都会有相同的过期时间。也可以为消息设置过期时间,每个消息的过期时间可以不同。如果同时进行了设置,会以两者间小的那个为准。一旦超过设置的TTL值时,消息就会变成死信(Dead Message)

8.2 设置DLX

即Dead-letter-exchange,死信交换器,当一个队列中的消息过期后,它可以被投递到死信交换器中去,然后通过死信交换器投递到队列中。

通过设置队列TTL和DLX的方式,可以实现RabbitMQ的延迟队列功能

如图为队列设置TTL,队列中的消息具有过期时间,消息过期后投递到队列的DLX中,DLX根据routing key投递到正常队列中去,消费者从正常队列中消费从而达到延迟队列的功能。

延迟队列没有消费者,消息在过期后会投递到设置的x-dead-letter-exchange交换器中去。

9. RabbitMQ 安装使用

安装erlang和RabbitMQ。使用命令启动RabbitMQ

启用web插件,通过网页可以查看管理RabbitMQ

启动之后,RabbitMQ 使用的 port 默认是 5672,而 RabbitMQ 的 Web 管理介面使用的端口默认是 15672,因此只要访问http://localhost:15672,就会出现 RabbitMQ 的 Web 管理介面,初始用户密码 guest guest,可以添加别的用户访问。

10. RabbitMQ集群

10.1 普通集群

集成方式: 多台服务器单独部署节点,每个节点保存着 Queue 的元数据,同一条消息只存在与其中一个节点上,当访问一个仅含有Queue 元数据的节点时,则会从具有实例的节点拉取数据。

优点:可以访问多台MQ实例去消费,提高吞吐量。

缺点:可能在RabbitMQ集群内产生大量的消息传输;无高可用保障,某节点宕机,会导致节点上的Queue数据丢失,不是高可用的。

10.2 镜像集群

集成方式: 多台服务器单独部署节点,一个节点会向其他节点同步数据,每个节点都持有所有的消息数据。

优点:高可用,每个节点都含有Queue的全部数据,元数据与消息数据都会包含,任一节点宕机,其余节点都有全量数据,不影响消费者消息。

缺点:同步消息数据,性能开销大。

10.3 镜像集群负载均衡

镜像集群本身没有提供负载均衡功能,使用HAproxy来做 RabbitMQ 集群负载均衡,用 Keepalived 来保证 HAproxy 的高可用,整体架构图如下:

如果只采用一台HAProxy,那么它就存在明显的单点故障的问题,所以至少需要两台HAProxy,同时这两台HAProxy 之间需要能够自动进行故障转移,通常的解决方案就是 KeepAlived 。KeepAlived 采用VRRP来解决单点失效的问题,它通常由一组一备两个节点组成,同一时间内只有主节点会提供对外服务,并同时提供一个虚拟的 IP 地址。如果主节点故障,那么备份节点会自动接管 VIP 并成为新的主节点,直到原有的主节点恢复。

11. Web 管理页面

登录之后会在 Overview 页面,在 Overview 页面可以查看 Queue 的整体情况以及 cluster node ,以及 cpu/memory 使用状况等

在 RabbitMQ 管理页面右侧是virtual host 的列表,可以当作分组,一个 virtual host 就是一组,当 RabbitMQ 运行时,默认会产生一个 virtual host 叫做/,如果不做特别调整,所有的 Queue 都会创建在/的 virtual host 里。

所以如果想要在 RabbitMQ 里面做权限控制的话,只要多创建几个 virtual host,user 就可以在不同的 virtual host 下创建 Queue 和 Exchange,不同 virtual host 里的 Queue 和 Exchange 无法互通,然后也可以去限制某些 user的权限,只能访问某些 virtual host。

之后是连接和信道的信息,因为RabbitMQ采用了连接的多路复用,一个连接会有多个信道。但一个信道一般只会对应一个消费者

之后是Exchanges 标签,可以查看目前已存在的 Exchange,以及新增一个新的 Exchange

点击其中一个已存在的 Exchange,可以查看该 Exchange 的详细信息,可以查看其与上游交换器与下游队列的绑定关系,以及绑定的规则。

点击上方的 Queue 标签页进入到 Queue 页面,可以查看目前已存在的 Queue,也可以新增一个新的 Queue

同理,点击其中一个已存在的 Queue,可以查看该 Queue 的详细信息,也可以对该Queue 进行一系列的操作,可以看到最近的消费情况、交换器绑定关系以及消费者信息。

最后的admin标签页会提供RabbitMQ的用户和vhost管理功能

12. 消息队列常见问题

12.1 多消费者订阅同一队列

使用两个消费者订阅同一队列

管理后台可以看到队列有两个消费者

发送消息到队列中,只会有一个消费者收到消息,在RabbitMQ中多消费者订阅同一队列不会分发到所有消费者中,会采取轮询的方式只发送到一个消费者。

12.2 消息可靠传输

12.2.1 生产者保证
- 开启事务

此时可以选择用 RabbitMQ 提供的事务功能,就是生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit。

// 开启事务
channel.txSelect
try {
// 发送消息
} catch (Exception e) {
// 事务回滚
channel.txRollback

// 重发这条消息
}

// 提交事务
channel.txCommit

事务可以解决发送方与RabbitMQ之间消息确认的问题,只有消息被RabbitMQ接收,事务才能提交成功,否则可在捕获异常后进行回滚,同时重发消息,但是使用事务会同步阻塞队列,降低性能。

- 生产者确认

生产者将信道设为confirm模式,每次写的消息都会分配一个唯一的 id,然后如果消息写入了 RabbitMQ 中,RabbitMQ 会回传一个 ack 消息,告诉你说这个消息 ok 了。如果 RabbitMQ 没能处理这个消息,会回调一个 nack 接口,告诉发送方消息接收失败,可以重试。可以结合这个机制在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,可以重发。

@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
    
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {

        if (!ack) {
            log.error("消息发送异常!");
        } else {
            log.info("发送者已经收到确认,correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause);
        }
    }
}
@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey);
    }
}

事务机制和 confirm 机制是互斥的,最大的不同在于,事务机制是同步的,提交一个事务之后会阻塞,但是 confirm 机制是异步的,发送个消息之后就可以发送下一个消息,然后消息 RabbitMQ 接收了之后会异步回调接口通知这个消息接收到了。

12.2.2 队列保证

开启 RabbitMQ 的持久化,就是消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,

设置持久化有两个步骤:

  • 创建 queue 的时候将其设置为持久化 这样就可以保证 RabbitMQ 持久化 queue 的元数据,但是它不会持久化 queue 里的数据。

  • 第二个是发送消息的时候将消息的 deliveryMode 设置为 2 就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。

必须要同时设置这两个持久化才行,RabbitMQ 哪怕是挂了,再次重启,也会从磁盘上重启恢复 queue,恢复这个 queue 里的数据。

12.2.3 消费端的确认与拒绝
确认

为了保证消息队列可靠的达到消费者,RabbitMQ 提供了消息确认机制(message acknowledgement)。消费者在订阅队列时,可以指定 autoAck 参数,

  • 当为 false 时:RabbitMQ 会等待消费者显式回复确认信号后才从内存(或磁盘)中移除消息(先打上删除标记,之后再删除)
  • 当为 true 时:RabbitMQ 会自动把发送出去的消息设置为确认,然后从删除,而不管消费者是否真正消费到了这些消息

当 autoAck 参数设置为 false 时,对于 RabbitMQ 服务器而言,队列中的消息分成了两个部分:

  • 等待投递给消费者的消息
  • 已经投递给消费者,但还没有收到消费者确认信号的消息

如果一直没有收到消费者的确认信号,并且消费者此消息的消费者已经断开连接,则会安排该消息重新进入队列。RabbitMQ 不会为未确认的消息设置过期时间,是否要重新投递给消费者的唯一依据是:消费者未确认并且已经断开链接,这允许消费者消费一条消息的时间可以很久很久。

Spring中提供的确认模式

Spring对RabbitMQ提供了三种确认模式,对这三种模式的测试用例如下,消费者在消费消息时会抛出异常:

None

队列投递给消息者即视为成功,会发送ack给队列。

1. 设置确认模式为none

2. 消息消费异常

3. 但队列认为消费已成功

Auto

Spring 根据执行情况自动判断,无异常执行结束后发送ack,出现amqp重复消费异常拒绝,出现其他异常会重新投递入队,而且会在队列头

1. 设置模式

2. 消息被重复投递给消费者,称为死循环,不停刷异常日志

3. 队列中消息为未确认状态,且会重新入队,不断地被投递给消费者。

4. 添加spring重试配置,设置每条消息重试最多三次,每次间隔3秒。

5. 消息消费三次后提示重试次数已用尽,消息被拒绝

6. 队列中消息已不存在

7. 重新设置重试策略,消费失败后投递到死信交换器中

8. 达到重试次数后投递到了死信队列

Manual

根据执行情况手动确认,执行结束后需要手动ack或者nack。

1. 设置确认状态为手动确认

2. 无论是抛出异常还是正常消费,都需要做手动确认;。

3. 未进行手动确认,RabbitMQ认为消息仍在消费中,如下图有unack状态的一条消息。

4. 未设置消息TTL的情况下,等消费者断开连接后,消息重新入队变成ready状态,有新的消费者连接后投递给它,断开消费者连接后情况入下图:

12.3 消息积压

出现这种情况,先紧急处理现场的情况, 将RabbitMQ服务中堆积的阻塞的消息消费掉,目前能尽快处理的方案可以如下操作:

  • 一般只有比较重要的业务消息才会存储在磁盘中进行持久化(这样做主要是为了提高rabbitMQ的性能,消息全部持久化的话,对RabbitMQ的性能损耗是很高的)
  • 可以通过新增多个消费者来监听业务队列来提高消息的消费速度
  • 或者通过设置并发提高消息的消费速度

消息阻塞常见的有两种情况:

  • 消息生产的速度大于消息消费的速度
  • 架构设计有问题,对于消费失败的消息没有采取措施,导致消费失败的消息不断被RabbitMQ重新发送给消费者,导致消息阻塞

针对上面两种情况,我们分别进行处理

  • 消息生产的速度大于消息消费的速度,我们可以设置channel.basicQos(n)来限制RabbitMQ队列中最大的未确认的消息数量,超过这个数量,RabbitMQ就不在接收生产者生产的消息,这个也是RabbitMQ的优点:削峰填谷
  • 针对第二个,需要修改架构设计,我们一方面可以通过给队列设置死信队列,同时消费端进行业务控制,如果同一个消息被消费端同时消费几次数量以上将该消息手动拒绝,这样消息就会进入死信队列,然后由死信队列进行消息的持久化,或者其他的处理。

12.4 消息幂等

RabbitMQ保证消息的"At Least Once" 或 “At Most Once” , 实际使用中多采用至少一次的模式,可能导致重复消费的问题,这时候就需要保证消息处理的幂等。

一般来说会采取两种方式,涉及到金融对账等场景的使用数据库做强校验,其他场景可以使用Redis做弱检验。

12.4.1 数据库强校验

首先我们需要为消息生成一个全局唯一ID,将ID作为数据库主键,就可以进行去重。即在消费消息前,执行insert操作,如果抛出主键冲突异常就代表已经被消费了,消息可以丢弃。

     EventLog eventLog = new EventLog();
        if(StringUtils.isNotEmpty(message.getMessageProperties().getMessageId())){
            eventLog.setEventId(message.getMessageProperties().getMessageId());
        }
        eventLog.setEventCode(message.getMessageProperties().getReceivedRoutingKey());
        eventLog.setCreateTime(new Date());
        eventLog.setSend(message.getMessageProperties().getAppId());
        eventLog.setCustomNum(1);
        eventLog.setPublictTime(message.getMessageProperties().getTimestamp());
        try {
            eventLog.setMessage(new String(message.getBody(), Charset.defaultCharset().name()));
        } catch (UnsupportedEncodingException ignore) {
        }
        try{
            if(StringUtils.isEmpty(eventLog.getEventId())){
                eventLog.setEventId(SequenceUtil.getId(TableIdCode.ID_EVENT_LOG));
            }
            eventLogMapper.insert(eventLog);
        }
        catch (DuplicateKeyException e){
            eventLog = eventLogInnerService.queryByKey(eventLog.getEventId());
            eventLog.setCustomNum(eventLog.getCustomNum() + 1);
            eventLogMapper.updateByKey(eventLog);
            logger.warn("消息重复消费……………………,{}",e.getMessage());
        }
12.4.2 使用 Redis做弱校验

使用Redis 的原子性的命令,比如 set 命令,在接收到消息后将消息 ID 作为 key 去执行 set 命令,如果执行成功则表示没有执行过这条消息,可以进行消费。

12.5 消息顺序

消息队列可以保证消息将按顺序使用,但是它们不能提供处理顺序上的任何保证。这里的主要区别是消息处理的某些方面在消耗时间无法确定,例如:处理一条消息要比另一条消息花费更长的时间。因此,如果消息 1 的处理时间比另一条消息花费的时间更长,那么消息2和3可能会先处理完成。

如果要确保处理顺序,需要确保队列只有 1 个消费者实例,前一条消息确认后再消费后续的消息。

12.6 提高吞吐量

主要方法有增加消费者、提高 Prefetch count、多线程处理、批量 Ack 等。

12.6.1 增加消费者

多个消费者来消费同一队列,提高消费速度,不过有以下问题:

  • 后端处理能力:比如多个消费者都要操作数据库,那么数据库连接的并发数和读写吞吐量就是后端处理能力,如果达到了数据库的最大处理能力,增加再多的消费者也没有用,甚至会因为数据库拥塞导致整体消费速度的下降。
  • 并发冲突:可以由数据库写入事务来处理并发冲突,或者使用分布式锁,对于具体的某个业务同时只能有一个消费者来处理。
  • 处理顺序:如果消息需要被顺序处理,那么各个消费者之间还需要增加一个同步机制。

可以使用消费者多节点或者单节点中增加消费数。

单节点增加消费数的配置如下:

concurrentConsumers会创建固定的消费者数量 MaxConcurrentConsumers类似线程池,会根据使用情况动态创建消费者。

12.6.2 多线程处理

多线程处理不需要建立多个到 RabbitMQ 的连接,它在收到队列消息后将其放入不同的线程中进行处理,这样进程中就会有多个消息同时处理,增加了消费吞吐量,从而提升了消费速度。

12.6.3 提高 Prefetch count

消息消费速度主要受到发送消息时间、消费者处理时间、消息 Ack 时间这几个时间的影响,如果一个消息走完这个流程再发送另一个的话,效率将会非常低。可以让消息在这几个时间内恰当的分配,让消息总是连续不断的被消费者接收处理,就可以提升消费者的消费速度。

根据如上描述,有些消息可能正在被消费者处理,有些可能在等待消费者处理,有的消息可能还在网络传输中,而如果不限制传输的数量,消费者端可能因处理能力补足会堆积大量的消息,首先内存使用将不可控制,其次此时也无法将这些消息再分配给别的消费者。因此才有了 Prefetch count,用于控制消息发送给消费者的速度;这个方案需要配合 Ack 使用,消费者回复消息 Ack 后,RabbitMQ 才会继续发送同等数量的消息到消费者。

12.6.4 批量 Ack

这种方式有效的原理是:每条消息分别 Ack 的情况下,RabbitMQ 收到一个 Ack 才发送一条消息,这中间就会有很多的时间在等待 Ack 回来,通过批量 Ack 的方式,减少了很多 Ack 传输的时间。注意这里隐含的方式是 RabbitMQ 通过设置的 Prefetch count 连续向消费者发送多条消息,否则这个批量就没意义了。

channel.BasicAck(t.DeliveryTag, true);// true表示批量ack, 凡是tag小于第一个参数的都会ack

12.7 削峰

如果完全不配置QoS,这样Rabbit会尽可能快速地发送队列中的所有消息到客户端。consumer在本地缓存所有的message,从而极有可能导致OOM或者导致服务器内存不足影响其它进程的正常运行。我们需要通过设置Qos的prefetch count来控制consumer的流量。

通过Qos设置消费者每次获取的消息数,spring-amqp 1.0 下默认值是1,spring-amqp 2.0下默认值是250。

  @RabbitHandler
    public void receive2(Message message,  Channel channel)  {
        channel.basicQos(10);
    }

例子如下:

在生产者产生大量消息的情况下,配置消费者Qos限制每次获取到的消息数,平稳消费消息,达到削峰的效果。

13. RabbitMQ 与Spring的集成

13.1 与Springboot的集成

添加amqp的starter

将交换器,队列,绑定关系注册为bean进行初始化

使用rabbitTemplate发送消息

消费者指定队列与消费消息体进行消息,RabbitMQ会将消息投递给消费者