【RabbitMQ】多系统下的安装配置与编码使用(python)

发布于:2025-06-27 ⋅ 阅读:(11) ⋅ 点赞:(0)

引言

RabbitMQ 是一个开源的 消息代理(Message Broker) 软件,实现了 高级消息队列协议(AMQP),用于在分布式系统中存储、转发消息,支持多种消息传递模式。它通过解耦生产者和消费者、异步处理、流量削峰等机制,提升系统的可扩展性、可靠性和灵活性。

本文主要介绍在Linux/MacOS/Windows下的Rabbit的安装,以及以python为示例的具体代码编写。

核心概念

  1. Producer(生产者):发送消息的程序。
  2. Consumer(消费者):接收消息的程序。
  3. Queue(队列):存储消息的缓冲区,消息会一直存在队列中直到被消费。
  4. Exchange(交换机):接收生产者发送的消息,并根据规则(路由键、绑定等)将消息路由到队列。
  5. Binding(绑定):定义 Exchange 和 Queue 之间的关系。
  6. Message(消息):传递的数据,包含有效负载(payload)和元数据(如路由键)。

RabbitMQ 的常见应用场景

  1. 异步处理

    • 示例:用户注册后发送邮件/短信通知。
    • 优势:主流程快速响应,耗时操作异步交给消费者处理。
  2. 应用解耦

    • 示例:订单系统和库存系统通过消息队列通信,避免直接接口调用。
    • 优势:某一系统宕机不影响其他系统,消息可暂存后处理。
  3. 流量削峰

    • 示例:秒杀活动的高并发请求先写入队列,系统按处理能力逐步消费。
    • 优势:避免服务器瞬时过载。
  4. 日志收集

    • 示例:多台服务器将日志发送到队列,由统一服务消费存储。
    • 优势:集中处理,避免日志丢失。

1. 安装 RabbitMQ

在本地安装 RabbitMQ

Linux (Ubuntu)

# 安装 Erlang(RabbitMQ 依赖)
sudo apt-get install -y erlang

# 安装 RabbitMQ
sudo apt-get install -y rabbitmq-server

# 启动服务
sudo systemctl start rabbitmq-server

# 启用管理插件(可选)
sudo rabbitmq-plugins enable rabbitmq_management

MacOS

# 通过 Homebrew 安装
brew install rabbitmq

# 启动服务
brew services start rabbitmq

# 启用管理插件
rabbitmq-plugins enable rabbitmq_management

Windows

  1. 下载并安装 Erlang
    在这里插入图片描述

  2. 下载 RabbitMQ 的 Windows 安装包。

    • 安装完成后,勾选“Start RabbitMQ Service”,或之后手动启动rabbtmq服务
      在这里插入图片描述
      在这里插入图片描述
  3. 通过命令行启用管理插件:

    rabbitmq-plugins enable rabbitmq_management
    
配置环境变量

安装RabbitMQ完成后需要配置到系统环境变量,否则直接执行 rabbitmq-plugins enable rabbitmq_management 命令会找不到。

  1. 右键点击“此电脑” → “属性” → “高级系统设置” → “环境变量”。
  2. 在“系统变量”中找到 Path,点击“编辑”。
  3. 添加 RabbitMQ 的 sbin 目录路径(例如):
    C:\Program Files\RabbitMQ Server\rabbitmq_server-3.12.0\sbin
    
  4. 保存后重新打开命令行,即可直接运行 rabbitmq-plugins

在这里插入图片描述

在这里插入图片描述

如果不配置系统环境变量,也可直接在 rabbitmq-plugins.bat 所在的路径(即sbin)下执行命令。

在这里插入图片描述


2. Python 代码示例

安装 Python 客户端库

pip install pika

pika 是 用于与 RabbitMQ 进行通信的Python 客户端库,构建高效、可靠的消息队列系统。


示例 1:基本队列通信

生产者(producer.py)

import pika

# 连接到 RabbitMQ 服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明一个队列(如果不存在则创建)
channel.queue_declare(queue='hello')

# 发送消息
channel.basic_publish(
    exchange='',          # 使用默认交换机
    routing_key='hello',  # 指定队列名称
    body='Hello World!'   # 消息内容
)
print(" [x] Sent 'Hello World!'")

# 关闭连接
connection.close()

消费者(consumer.py)

import pika

def callback(ch, method, properties, body):
    print(f" [x] Received {body}")

# 连接到 RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明队列(确保存在)
channel.queue_declare(queue='hello')

# 消费消息
channel.basic_consume(
    queue='hello',         # 监听的队列
    on_message_callback=callback,  # 消息处理函数
    auto_ack=True         # 自动确认消息
)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()  # 开始阻塞监听

运行测试

  1. 启动消费者:
    python consumer.py
    
  2. 启动生产者:
    python producer.py
    
  3. 消费者会输出:
    [x] Received b'Hello World!'

示例 2:使用交换机(Topic 模式)

生产者(topic_producer.py)

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明一个 Topic 类型的交换机
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

# 发送消息到特定路由键
routing_key = 'user.notification.email'
message = 'Email sent to user!'
channel.basic_publish(
    exchange='topic_logs',
    routing_key=routing_key,
    body=message
)
print(f" [x] Sent {routing_key}:{message}")
connection.close()

消费者(topic_consumer.py)

import pika
import sys

def callback(ch, method, properties, body):
    print(f" [x] {method.routing_key}:{body}")

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明交换机
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

# 创建临时队列(关闭连接后自动删除)
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

# 绑定队列到交换机,监听特定路由键
routing_key = 'user.notification.*'  # 通配符匹配
channel.queue_bind(
    exchange='topic_logs',
    queue=queue_name,
    routing_key=routing_key
)

channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True
)

print(' [*] Waiting for logs. To exit press CTRL+C')
channel.start_consuming()

运行测试

  1. 启动消费者(监听 user.notification.*):
    python topic_consumer.py
    
  2. 启动生产者(发送 user.notification.email):
    python topic_producer.py
    
  3. 消费者会收到匹配路由键的消息。

示例 3:消息持久化与手动确认

生产者(持久化消息)

channel.queue_declare(queue='task_queue', durable=True)  # 队列持久化
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body='This is a persistent task',
    properties=pika.BasicProperties(
        delivery_mode=2,  # 消息持久化
    )
)

消费者(手动 ACK)

def callback(ch, method, properties, body):
    print(f" [x] Processing {body}")
    # 模拟耗时任务
    import time
    time.sleep(5)
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)  # 手动确认

channel.basic_consume(
    queue='task_queue',
    on_message_callback=callback,
    auto_ack=False  # 关闭自动确认
)

代码结合:

可用把上面的通信代码写成一个项目,把RabbitMQ的部分封装成一个连接类:

https://gitee.com/aiyimu/python/tree/master/RabbitMQ_Practice/RabbitMQ_Practice

3. 关键注意事项

  1. 连接管理
    • 生产环境中使用连接池(如 pika.BlockingConnection 的复用)。
  2. 错误处理
    • 捕获 pika.exceptions.AMQPConnectionError 并实现重连逻辑。
  3. 性能优化
    • 通过 channel.basic_qos(prefetch_count=1) 限制消费者每次只处理一条消息。
  4. 监控
    • 访问 http://localhost:15672(默认账号 guest/guest)查看队列状态。


网站公告

今日签到

点亮在社区的每一天
去签到