RabbitMQ 是一个 消息队列中间件(Message Broker),实现了 AMQP 协议,常用于服务之间解耦、异步处理、流量削峰等场景。
我帮你分成两个部分来讲:核心原理 + 常见用法。
🧩 一、核心原理
RabbitMQ 的核心是 生产者(Producer) → 交换机(Exchange) → 队列(Queue) → 消费者(Consumer) 这一条链路。
1. 基础概念
- Producer:消息的发送方。
- Consumer:消息的接收方。
- Queue:存储消息的队列。
- Exchange(交换机):接收生产者的消息,按照规则路由到队列。
- Binding:交换机和队列之间的绑定规则。
- Routing Key:消息的“路由标识”,交换机根据它决定发给哪个队列。
- Broker:RabbitMQ 服务本身。
2. 交换机类型(Exchange Types)
RabbitMQ 支持几种路由模式:
- direct:按 routing key 精确匹配,把消息路由到对应队列。
- fanout:广播模式,消息发送到所有绑定的队列。
- topic:模糊匹配路由(如
order.*
),支持通配符。 - headers:根据消息头部的键值对匹配(较少用)。
3. ACK & 消息可靠性
- ACK:消费者处理完消息后确认(acknowledge)。
- 未 ACK:RabbitMQ 会重新投递消息(保证至少一次投递)。
- 持久化:队列持久化 + 消息持久化,确保 RabbitMQ 重启后消息不丢失。
- 死信队列(DLX):处理无法投递或过期的消息。
4. 工作模式
- 简单队列(Simple Queue):一个生产者 → 一个消费者。
- 工作队列(Work Queue):一个生产者 → 多个消费者(任务分摊)。
- 发布订阅(Pub/Sub):广播消息(fanout exchange)。
- 路由模式(Routing):按 key 精确路由(direct exchange)。
- 主题模式(Topic):模糊匹配路由(topic exchange)。
🛠️ 二、常见用法(Python 示例)
安装依赖:
pip install pika
1. 简单队列
生产者:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
消费者:
import pika
def callback(ch, method, properties, body):
print(f" [x] Received {body}")
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
2. 工作队列(任务分摊)
- 一个生产者发送很多任务。
- 多个消费者并发消费,每个消息只会被一个消费者处理。
- 常用在 后台任务处理。
设置 prefetch_count=1
可以让 RabbitMQ 公平分发任务:
channel.basic_qos(prefetch_count=1)
3. 发布/订阅(fanout)
生产者:
channel.exchange_declare(exchange='logs', exchange_type='fanout')
channel.basic_publish(exchange='logs', routing_key='', body='log message')
消费者:
channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
这样消费者会自动生成临时队列,接收广播。
4. 路由模式(direct)
生产者:
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
channel.basic_publish(exchange='direct_logs', routing_key='error', body='Error log')
消费者:
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
queue_name = channel.queue_declare(queue='', exclusive=True).method.queue
channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key='error')
🎯 总结
- RabbitMQ = 生产者 → 交换机 → 队列 → 消费者。
- 交换机类型决定了 消息的路由方式(direct、fanout、topic、headers)。
- 可靠性依赖 ACK + 持久化 + 死信队列。
- 常见场景:异步任务、削峰填谷、解耦微服务、日志广播。
好问题 👍,这几个是 RabbitMQ 保证消息可靠性 的关键机制。我帮你逐个拆开讲:
🟢 1. ACK(消息确认机制)
- 默认行为:消费者从队列里拿到消息后,RabbitMQ 就认为它“已消费”,会立即从队列里删除。
- 风险:如果消费者拿到消息后宕机/异常,消息就丢了。
👉 ACK 就是解决这个问题的机制:
自动 ACK (
auto_ack=True
)- 一旦消费者收到消息,就立刻确认,哪怕还没处理完。
- 风险:消费者挂了,消息丢失。
手动 ACK (
auto_ack=False
)(推荐)- 消费者处理完任务后,再调用
channel.basic_ack()
确认。 - 如果消费者挂了,RabbitMQ 会把消息重新投递给别的消费者。
- 消费者处理完任务后,再调用
例子:
def callback(ch, method, properties, body):
print("处理消息:", body)
# 处理完成后手动确认
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue='task_queue', on_message_callback=callback, auto_ack=False)
🔑 作用:确保消息至少被处理一次,不会因为消费者挂掉而丢失。
🟢 2. 持久化(Persistence)
RabbitMQ 的数据默认存在内存里,服务一旦重启,消息就没了。
👉 持久化保证 RabbitMQ 重启后消息不丢。
持久化分三层:
队列持久化(声明时加
durable=True
):channel.queue_declare(queue='task_queue', durable=True)
→ RabbitMQ 重启后,这个队列还在。
消息持久化(生产者发送时设置
delivery_mode=2
):channel.basic_publish( exchange='', routing_key='task_queue', body='Hello', properties=pika.BasicProperties( delivery_mode=2, # 2 表示持久化消息 ))
→ RabbitMQ 重启后,消息仍然在队列里。
交换机持久化(声明时加
durable=True
)。
🔑 作用:保证即使 RabbitMQ 崩溃或重启,消息不会丢失。
🟢 3. 死信队列(Dead Letter Queue, DLQ)
当某些消息 无法被正常消费 时,RabbitMQ 可以把它们转移到另一个队列里(死信队列),避免消息丢失。
死信队列触发的几种情况:
- 消费者 拒绝消息(nack/reject) 且
requeue=False
。 - 消息在队列里 过期(TTL 超时)。
- 队列满了,无法再接收新消息。
👉 配置死信队列的方法:
args = {
'x-dead-letter-exchange': 'dlx_exchange', # 指定死信交换机
'x-dead-letter-routing-key': 'dlx_key' # 指定路由 key
}
channel.queue_declare(queue='task_queue', durable=True, arguments=args)
然后消息会被转发到 死信队列,便于后续人工排查或重试。
🔑 作用:防止消息丢失 & 提供兜底处理机制。
🎯 总结
- ACK:保证消费者挂掉时消息不会丢(至少投递一次)。
- 持久化:保证 RabbitMQ 崩溃/重启时消息不会丢。
- 死信队列:保证异常消息有去处(过期/拒绝/无法投递)。
这三个机制配合起来,RabbitMQ 就能实现 高可靠消息传递。
好问题 👍!RabbitMQ 里的 队列满了(或者说消息堆积过多)是一个常见的情况,处理思路分两类:
🟢 1. 队列为什么会满?
队列本质上是内存+磁盘结构,如果消费者消费不过来,就会导致消息积压。几种常见原因:
- 消费者处理能力不足(速度比不上生产者)。
- 没有限制队列长度,消息无限堆积。
- 消费者挂掉了,没人消费。
- 某些消息过大,占满内存/磁盘。
🟢 2. RabbitMQ 的应对机制
(1) 设置队列最大长度/容量(防止无限堆积)
channel.queue_declare(
queue='task_queue',
durable=True,
arguments={
'x-max-length': 1000, # 最大消息数
'x-max-length-bytes': 10485760 # 最大字节数 (10MB)
}
)
超过限制后,旧消息会被丢弃(FIFO),或者转发到死信队列(推荐)。
(2) 配置死信队列(DLQ)
当队列满了时,新来的消息可以自动进入死信队列:
channel.queue_declare(
queue='task_queue',
durable=True,
arguments={
'x-max-length': 1000,
'x-dead-letter-exchange': 'dlx_exchange',
'x-dead-letter-routing-key': 'dlx_key'
}
)
👉 新消息进不来时,直接进入 DLQ,避免消息丢失。
(3) 限流(QoS)
消费者可以设置一次最多处理多少条消息,避免被“压垮”:
channel.basic_qos(prefetch_count=1) # 一次只取 1 条,处理完再取
这样 RabbitMQ 会 公平调度,不会把大量消息推给一个消费者。
(4) 水平扩展消费者
如果是消费能力不足,最直接的办法就是:多开几个消费者。
RabbitMQ 会按照 Round Robin(轮询) 或 公平分发 把消息分配下去。
(5) 生产端限流 / 拒绝
RabbitMQ 本身不对生产者限流,但你可以在应用层做:
- 使用 发布确认(Publisher Confirms),如果消息积压,可以选择暂停生产。
- 用 消息速率控制(Rate Limit),比如令牌桶算法,减缓生产速度。
🟢 3. 总结
当队列满了,可以这样处理:
- 预防堆积 → 设置
x-max-length
/x-max-length-bytes
。 - 兜底方案 → 配置死信队列,把溢出的消息转移出来。
- 消费优化 →
basic_qos
+ 增加消费者实例。 - 生产端调节 → 启用发布确认,动态调整生产速度。
👉 最佳实践:
- 设置合理的队列长度 + 消息 TTL。
- 配死信队列,确保不会无声丢失。
- 消费端横向扩展,必要时加缓存层(Kafka 更适合高吞吐)