使用Python向RabbitMQ中推送/接收数据——消息队列MQ学习总结

发布于:2024-12-07 ⋅ 阅读:(34) ⋅ 点赞:(0)

什么是消息队列,为什么要使用消息队列

消息队列(Message Queue,简称MQ)是一种在不同应用程序之间的通信的方式。它允许应用程序发送消息到队列中,无需立即处理,而是可以存储起来,直到有其他应用程序准备好去处理它们,这种异步通信的方式可以在比较复杂的业务逻辑中实现各个应用程序之间有条不紊的通信,增加整个系统的稳定性和可维护性。

MQ基本概念

Broker:消息的处理中心,可以理解为一个RabbitMQ服务或是服务集群

Producer:消息生产者,即将消息推送到Broker中的角色

Consumer:消息消费者,即从 Broker中获取消息的角色

CentOS安装RabbitMQ

RabbitMQ依赖于Erlang,执行yum -y install erlang安装并查看自己的Erlang版本

如果yum安装不上的话可以去此地址下载对应版本的rpm包进行安装

rabbitmq/erlang - Packages · packagecloud

 到此地址查看那个RabbitMQ版本的RPM安装包适配26.2x的Erlang

Releases · rabbitmq/rabbitmq-server

发现4.0.3版本是兼容26.2xErlang环境的

下载第一个RPM安装包并上传到服务器

执行命令安装RPM包

rpm -ivh rabbitmq-server-4.0.3-1.el8.noarch.rpm

设置开机自启并启动RabbitMQ服务

systemctl enable rabbitmq-server
systemctl start rabbitmq-server

启用RabbitMQ可视化web管理页面

rabbitmq-plugins enable rabbitmq_management

添加RabbitMQ管理端用户dovir,密码为123

rabbitmqctl add_user dovir 123
rabbitmqctl set_permissions -p / dovir ".*" ".*" ".*"
rabbitmqctl set_user_tags dovir administrator
rabbitmqctl list_users

确保防火墙为关闭状态

输入http://服务器ip地址:15672/访问RabbitMQ可视化web管理页面用自己添加的账号登录

可以看到目前没有任何队列Queue

使用Python向消息队列中推送数据

RabbitMQ有多种工作模式,这里仅示范最简单的模式——Hello world模式,即一个producer发送message,另一个consumer接收message

producer发送message

import pika

# 连接到RabbitMQ服务器
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host = '192.168.160.132',
        credentials = pika.PlainCredentials('dovir', '123')
    )
)
# 向消息队列推送数据和从中拉取都是通过通道进行的,要先创建通道,用channel去推拉数据
channel = connection.channel()
# 声明(创建)一个名为mq_test的消息队列,已经存在则不会重复创建
channel.queue_declare(queue = 'mq_test')
# 向消息队列推送数据
channel.basic_publish(
    # 指定通过那个交换机去推送,为空则使用默认交换机
    exchange = '',
    # 指定数据推送到mq_test队列
    routing_key = 'mq_test',
    # 消息的内容
    body = 'This is a RabbitMQ test action'
)
# 推送结束后关闭与RabbitMQ服务器的连接,释放资源
connection.close()

执行后发现有了一个名为mq_test的队列 

点击队列,用Get messages查看队列中的内容

使用Python从消息队列中拉取数据

consumer接收message

import pika

# 连接到RabbitMQ服务器
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host = '192.168.160.132',
        credentials = pika.PlainCredentials('dovir', '123')
    )
)
# 向消息队列推送数据和从中拉取都是通过通道进行的,要先创建通道,用channel去推拉数据
channel = connection.channel()
# 声明(创建)一个名为mq_test的消息队列,已经存在则不会重复创建
channel.queue_declare(queue = 'mq_test')
# 定义成功拉取到消息后的回调函数
def callback(ch, method, properties, body):
    print('成功拉取消息:',body)
# 拉取消息
channel.basic_consume(
    # 拉取mq_test消息队列里面的消息
    queue = 'mq_test',
    # 成功拉取到消息后要执行的回调函数
    on_message_callback = callback,
    # 成功接收到
    auto_ack = True
)
# 开始循环等待,一直处于等待接收消息的状态
print('等待消息入队列中......')
channel.start_consuming()

 运行后成功拉取到消息


网站公告

今日签到

点亮在社区的每一天
去签到