RabbitMQ是一个消息中间件,进行消息的传输和交互,主要用于解决分布式系统中消息传递、应用解耦、流量削峰等问题。
一、典型使用场景
1.流量削峰
主要应用在秒杀活动的场景中,一方面可以控制活动的人数,服务器接收到用户的请求后,首先写入消息队列,如果超过了消息队列的最大值,则直接抛弃用户请求或跳转到错误页面;另一方面可以避免短时间的高流量压垮应用,对于某些服务可以先加入到消息队列,再做后续处理。
2.应用解耦
比如像双十一的一些场景中,用户下单后,订单系统需要去调用库存系统的接口,但是如果此时订单系统出现故障,就会导致下单失败;引入消息队列,用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户下单成功,库存系统从消息队列中订阅下单信息,进行相关库存操作,此时两个系统是解耦的,互不干扰。
3.异步处理
比如用户注册的一些场景,需要发送注册短信和注册邮件,但是这些业务不是必须的。传统的串行方式,将注册信息写入数据库,发送注册邮件,再发送注册短信,全部完成后再返回给客户端;并行方式将注册信息写入数据库,发出注册邮件的同时发送注册短信,全部完成后返回给客户端;引入消息队列后,就可以把发送注册邮件和注册短信这些不必要的业务异步处理,相比较于其他两种方式更能提高响应时间。
二、核心组件
组件 | 作用 |
生产者(Producer) | 发送消息的应用 / 服务 |
消费者(Consumer) | 接收并处理消息的应用 / 服务 |
交换机(Exchange) | 接收生产者发送的消息,根据 “路由规则” 将消息转发到对应的队列 |
队列(Queue) | 存储消息的缓冲区,消费者从队列中获取消息 |
绑定(Binding) | 建立 “交换机” 与 “队列” 之间的关联,并指定 “路由键(Routing Key)” |
路由键 (Routing Key) |
消息的 “地址标识”,交换机通过它匹配绑定规则,决定消息流向 |
虚拟主机(Virtual Host) | 实现 RabbitMQ 的 “多租户隔离”,每个虚拟主机有独立的交换机、队列、权限 |
连接(Connection) | 生产者 / 消费者与 RabbitMQ 服务器之间的 TCP 连接 |
信道(Channel) | 基于 TCP 连接的轻量级通信通道,避免频繁创建 TCP 连接的开销 |
三、交换机
交换机是 RabbitMQ 的核心路由组件,必须通过交换机才能将消息发送到队列(生产者不能直接向队列发消息)。根据路由规则的不同,交换机分为 4 种类型:
1. Direct Exchange(直连交换机)
- 路由规则:消息的
Routing Key
必须与绑定的Routing Key
完全匹配,才会转发到对应队列。 - 适用场景:一对一通信(如任务分配,一个消息只给一个消费者处理)。
- 示例:生产者发送
Routing Key = "order.pay"
的消息,只有绑定了Routing Key = "order.pay"
的队列会收到消息。
2. Topic Exchange(主题交换机)
- 路由规则:支持
Routing Key
的模糊匹配,允许使用通配符:*
:匹配一个单词(如order.*
可匹配order.pay
、order.cancel
,但不匹配order.pay.success
);#
:匹配零个或多个单词(如order.#
可匹配order.pay
、order.pay.success
)。
- 适用场景:一对多通信(如按 “主题” 订阅消息,如 “所有与订单相关的消息”)。
3. Fanout Exchange(扇出交换机)
- 路由规则:忽略 Routing Key,将消息广播到所有与该交换机绑定的队列。
- 适用场景:广播通信(如系统通知、日志收集,所有消费者都需要接收同一份消息)。
4. Headers Exchange(头交换机)
- 路由规则:不依赖
Routing Key
,而是通过消息的头部属性(Headers) 匹配绑定规则(如匹配key=value
对)。 - 适用场景:复杂的多条件路由(较少使用,大多数场景可通过 Topic 交换机替代)。
四、RabbitMQ工作原理
一条消息从生产者发送到消费者,完整流程如下:
- 建立连接:生产者通过 TCP 连接到 RabbitMQ 服务器,创建一个信道(Channel)(因 TCP 连接开销大,信道可复用连接)。
- 声明资源:生产者声明一个交换机(Exchange)和队列(Queue),并通过绑定(Binding)将两者关联,指定路由键(Routing Key)。
- 发送消息:生产者通过信道向交换机发送消息,消息中包含
Routing Key
和实际内容。 - 路由消息:交换机根据自身类型和
Routing Key
,匹配绑定规则,将消息转发到对应的队列。 - 存储消息:队列将消息暂存(若配置持久化,则写入磁盘,避免宕机丢失)。
- 消费消息:消费者通过信道监听队列,当队列中有消息时,获取消息并处理。
- 确认消息:消费者处理完消息后,向 RabbitMQ 发送 “确认信号(Ack)”,RabbitMQ 收到后删除队列中的该消息。
五、RabbitMQ的五种消息模式
1. 简单模式(Simple Mode)
(1)模式说明
- 最简单的一对一通信模式:一个生产者直接向一个队列发送消息,一个消费者从该队列中获取消息。
- 核心特点:不显式声明交换机(默认使用 RabbitMQ 内置的
amq.direct
交换机),消息通过默认路由规则直接投递到指定队列。
(2)工作流程
- 生产者创建连接和信道,指定队列名称并发送消息。
- 消息通过默认交换机(direct 类型)路由到指定队列(队列名称即路由键)。
- 消费者监听该队列,获取并处理消息。
(3)适用场景
简单的点对点通信,如单一任务的触发(例如:发送一条通知消息)。
2.工作队列模式(Work Queue Mode)
(1)模式说明
- 一个生产者向一个队列发送消息,多个消费者共同监听该队列,消息被轮询分配给不同消费者(默认策略),实现任务负载均衡。
- 核心特点:队列中的消息只会被一个消费者处理(避免重复消费),适合耗时任务的并行处理。
(2)工作流程
- 生产者向队列发送多条消息(如多个任务)。
- 多个消费者同时监听该队列,RabbitMQ 按顺序将消息分配给不同消费者(轮询)。
- 消费者处理完消息后发送确认(Ack),队列删除该消息。
(3)适用场景
需要并行处理大量任务的场景,如:发送批量邮件、图片处理、日志解析等。
3. 发布 / 订阅模式(Publish/Subscribe Mode)
(1)模式说明
- 生产者发送消息到 Fanout 交换机,交换机将消息广播到所有绑定的队列,每个队列对应一个消费者,实现 “一条消息被多个消费者接收”。
- 核心特点:忽略路由键(Routing Key),消息无差别分发到所有绑定队列,类似 “广播”。
(2)工作流程
- 生产者声明 Fanout 交换机,发送消息到该交换机(路由键可省略)。
- 多个队列绑定到该交换机(无需指定路由键)。
- 每个队列对应一个消费者,所有消费者都会收到相同的消息。
(3)适用场景
需要 “一对多” 广播的场景,如:系统通知、日志收集(所有日志消费者都需获取完整日志)。
4. 路由模式(Routing Mode)
(1)模式说明
- 生产者发送消息到 Direct 交换机,消息携带特定路由键(Routing Key),交换机仅将消息转发到路由键完全匹配的绑定队列。
- 核心特点:基于精确匹配的路由规则,实现 “消息按关键词定向分发”。
(2)工作流程
- 生产者声明 Direct 交换机,发送消息时指定路由键(如
error
、info
)。 - 多个队列通过不同的路由键绑定到该交换机(如队列 A 绑定
error
,队列 B 绑定info
)。 - 交换机根据消息的路由键,将消息转发到匹配的队列(如路由键为
error
的消息仅发送到队列 A)。
(3)适用场景
需要按 “类型” 筛选消息的场景,如:日志系统中,error
级别的日志需实时告警(发送到告警队列),info
级别的日志仅存储(发送到存储队列)。
5. 主题模式(Topics Mode)
(1)模式说明
- 生产者发送消息到 Topic 交换机,消息携带包含通配符的路由键(如
order.pay
),交换机通过通配符匹配将消息转发到对应的队列。 - 核心特点:支持模糊匹配,比路由模式更灵活,可按 “主题” 分类消息。
(2)通配符规则
*
:匹配一个单词(如order.*
匹配order.pay
、order.cancel
,但不匹配order.pay.success
)。#
:匹配零个或多个单词(如order.#
匹配order.pay
、order.pay.success
)。
(3)工作流程
- 生产者声明 Topic 交换机,发送消息时指定路由键(如
order.pay.success
)。 - 队列通过带通配符的路由键绑定到交换机(如队列 A 绑定
order.pay.*
,队列 B 绑定order.#
)。 - 交换机根据通配符匹配规则,将消息转发到匹配的队列(如
order.pay.success
匹配队列 A 和 B)。
(4)适用场景
需要按 “层级关系” 或 “主题分类” 分发消息的场景,如:电商系统中,订单相关的支付、取消、退款等消息按主题路由到不同处理队列。
六、如何实现消息的可靠传递
RabbitMQ 通过三层保障实现消息可靠传递:
- 交换机持久化:声明交换机时指定
durable=true
,确保 RabbitMQ 重启后交换机不丢失。 - 队列持久化:声明队列时指定
durable=true
,确保队列重启后不丢失。 - 消息持久化:发送消息时指定
delivery_mode=2
,确保消息写入磁盘,避免队列宕机丢失。 - 确认机制:
- 生产者确认(Publisher Confirm):RabbitMQ 收到消息后向生产者返回确认,确保消息已到达交换机。
- 消费者确认(Consumer Ack):消费者处理完消息后发送 Ack,确保消息已被处理。
七、进阶知识
1. 死信队列(Dead-Letter Queue,DLX)
- 定义:当消息满足以下条件时,会被转发到 “死信交换机”,最终存入 “死信队列”:
- 消息被消费者拒绝(
basic_reject
或basic_nack
)且requeue=False
; - 消息过期(设置了 TTL,Time-To-Live);
- 队列达到最大长度,无法接收新消息。
- 消息被消费者拒绝(
- 作用:存放 “处理失败” 或 “过期” 的消息,便于后续排查问题(如重试、分析失败原因)。
2. 延迟队列
- 定义:消息发送后,不会立即被消费者接收,而是等待指定时间后才会被处理。
- 实现方式:利用 “死信队列 + TTL” 实现:
- 声明一个 “延迟队列 A”,设置 TTL(如 5 分钟),并绑定到 “死信交换机 DLX”;
- 生产者将消息发送到 “延迟队列 A”,消息在队列中等待 5 分钟后过期;
- 过期的消息被转发到 “死信交换机 DLX”,再由 DLX 路由到 “实际消费队列 B”;
- 消费者监听 “队列 B”,接收并处理延迟后的消息。
- 适用场景:订单超时未支付自动取消、定时任务(如 30 分钟后发送提醒)。
3. 消息堆积问题
- 原因:消费者处理速度远低于生产者发送速度,导致队列中消息堆积。
- 解决方案:
- 增加消费者实例(水平扩展),提高消费速度;
- 优化消费者处理逻辑(如减少 IO 操作、异步处理);
- 设置队列最大长度,避免消息无限堆积(超出长度的消息可转发到死信队列);
- 采用 “批量消费” 模式(消费者一次获取多条消息处理)。