文章目录
引言
RabbitMQ 是一个开源的 消息代理(Message Broker) 软件,实现了 高级消息队列协议(AMQP),用于在分布式系统中存储、转发消息,支持多种消息传递模式。它通过解耦生产者和消费者、异步处理、流量削峰等机制,提升系统的可扩展性、可靠性和灵活性。
本文主要介绍在Linux/MacOS/Windows下的Rabbit的安装,以及以python为示例的具体代码编写。
核心概念
- Producer(生产者):发送消息的程序。
- Consumer(消费者):接收消息的程序。
- Queue(队列):存储消息的缓冲区,消息会一直存在队列中直到被消费。
- Exchange(交换机):接收生产者发送的消息,并根据规则(路由键、绑定等)将消息路由到队列。
- Binding(绑定):定义 Exchange 和 Queue 之间的关系。
- Message(消息):传递的数据,包含有效负载(payload)和元数据(如路由键)。
RabbitMQ 的常见应用场景
异步处理
- 示例:用户注册后发送邮件/短信通知。
- 优势:主流程快速响应,耗时操作异步交给消费者处理。
应用解耦
- 示例:订单系统和库存系统通过消息队列通信,避免直接接口调用。
- 优势:某一系统宕机不影响其他系统,消息可暂存后处理。
流量削峰
- 示例:秒杀活动的高并发请求先写入队列,系统按处理能力逐步消费。
- 优势:避免服务器瞬时过载。
日志收集
- 示例:多台服务器将日志发送到队列,由统一服务消费存储。
- 优势:集中处理,避免日志丢失。
1. 安装 RabbitMQ
在本地安装 RabbitMQ
Linux (Ubuntu)
# 安装 Erlang(RabbitMQ 依赖)
sudo apt-get install -y erlang
# 安装 RabbitMQ
sudo apt-get install -y rabbitmq-server
# 启动服务
sudo systemctl start rabbitmq-server
# 启用管理插件(可选)
sudo rabbitmq-plugins enable rabbitmq_management
MacOS
# 通过 Homebrew 安装
brew install rabbitmq
# 启动服务
brew services start rabbitmq
# 启用管理插件
rabbitmq-plugins enable rabbitmq_management
Windows
下载并安装 Erlang。
下载 RabbitMQ 的 Windows 安装包。
- 安装完成后,勾选“Start RabbitMQ Service”,或之后手动启动rabbtmq服务
- 安装完成后,勾选“Start RabbitMQ Service”,或之后手动启动rabbtmq服务
通过命令行启用管理插件:
rabbitmq-plugins enable rabbitmq_management
配置环境变量
安装RabbitMQ完成后需要配置到系统环境变量,否则直接执行 rabbitmq-plugins enable rabbitmq_management
命令会找不到。
- 右键点击“此电脑” → “属性” → “高级系统设置” → “环境变量”。
- 在“系统变量”中找到
Path
,点击“编辑”。 - 添加 RabbitMQ 的
sbin
目录路径(例如):C:\Program Files\RabbitMQ Server\rabbitmq_server-3.12.0\sbin
- 保存后重新打开命令行,即可直接运行
rabbitmq-plugins
。
如果不配置系统环境变量,也可直接在 rabbitmq-plugins.bat
所在的路径(即sbin)下执行命令。
2. Python 代码示例
安装 Python 客户端库
pip install pika
pika 是 用于与
RabbitMQ
进行通信的Python 客户端库,构建高效、可靠的消息队列系统。
示例 1:基本队列通信
生产者(producer.py)
import pika
# 连接到 RabbitMQ 服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明一个队列(如果不存在则创建)
channel.queue_declare(queue='hello')
# 发送消息
channel.basic_publish(
exchange='', # 使用默认交换机
routing_key='hello', # 指定队列名称
body='Hello World!' # 消息内容
)
print(" [x] Sent 'Hello World!'")
# 关闭连接
connection.close()
消费者(consumer.py)
import pika
def callback(ch, method, properties, body):
print(f" [x] Received {body}")
# 连接到 RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明队列(确保存在)
channel.queue_declare(queue='hello')
# 消费消息
channel.basic_consume(
queue='hello', # 监听的队列
on_message_callback=callback, # 消息处理函数
auto_ack=True # 自动确认消息
)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() # 开始阻塞监听
运行测试
- 启动消费者:
python consumer.py
- 启动生产者:
python producer.py
- 消费者会输出:
[x] Received b'Hello World!'
示例 2:使用交换机(Topic 模式)
生产者(topic_producer.py)
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明一个 Topic 类型的交换机
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
# 发送消息到特定路由键
routing_key = 'user.notification.email'
message = 'Email sent to user!'
channel.basic_publish(
exchange='topic_logs',
routing_key=routing_key,
body=message
)
print(f" [x] Sent {routing_key}:{message}")
connection.close()
消费者(topic_consumer.py)
import pika
import sys
def callback(ch, method, properties, body):
print(f" [x] {method.routing_key}:{body}")
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明交换机
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
# 创建临时队列(关闭连接后自动删除)
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# 绑定队列到交换机,监听特定路由键
routing_key = 'user.notification.*' # 通配符匹配
channel.queue_bind(
exchange='topic_logs',
queue=queue_name,
routing_key=routing_key
)
channel.basic_consume(
queue=queue_name,
on_message_callback=callback,
auto_ack=True
)
print(' [*] Waiting for logs. To exit press CTRL+C')
channel.start_consuming()
运行测试
- 启动消费者(监听
user.notification.*
):python topic_consumer.py
- 启动生产者(发送
user.notification.email
):python topic_producer.py
- 消费者会收到匹配路由键的消息。
示例 3:消息持久化与手动确认
生产者(持久化消息)
channel.queue_declare(queue='task_queue', durable=True) # 队列持久化
channel.basic_publish(
exchange='',
routing_key='task_queue',
body='This is a persistent task',
properties=pika.BasicProperties(
delivery_mode=2, # 消息持久化
)
)
消费者(手动 ACK)
def callback(ch, method, properties, body):
print(f" [x] Processing {body}")
# 模拟耗时任务
import time
time.sleep(5)
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag) # 手动确认
channel.basic_consume(
queue='task_queue',
on_message_callback=callback,
auto_ack=False # 关闭自动确认
)
代码结合:
可用把上面的通信代码写成一个项目,把RabbitMQ的部分封装成一个连接类:
https://gitee.com/aiyimu/python/tree/master/RabbitMQ_Practice/RabbitMQ_Practice
3. 关键注意事项
- 连接管理:
- 生产环境中使用连接池(如
pika.BlockingConnection
的复用)。
- 生产环境中使用连接池(如
- 错误处理:
- 捕获
pika.exceptions.AMQPConnectionError
并实现重连逻辑。
- 捕获
- 性能优化:
- 通过
channel.basic_qos(prefetch_count=1)
限制消费者每次只处理一条消息。
- 通过
- 监控:
- 访问
http://localhost:15672
(默认账号guest/guest
)查看队列状态。
- 访问