什么是消息队列,为什么要使用消息队列
消息队列(Message Queue,简称MQ)是一种在不同应用程序之间的通信的方式。它允许应用程序发送消息到队列中,无需立即处理,而是可以存储起来,直到有其他应用程序准备好去处理它们,这种异步通信的方式可以在比较复杂的业务逻辑中实现各个应用程序之间有条不紊的通信,增加整个系统的稳定性和可维护性。
MQ基本概念
Broker:消息的处理中心,可以理解为一个RabbitMQ服务或是服务集群
Producer:消息生产者,即将消息推送到Broker中的角色
Consumer:消息消费者,即从 Broker中获取消息的角色
CentOS安装RabbitMQ
RabbitMQ依赖于Erlang,执行yum -y install erlang安装并查看自己的Erlang版本
如果yum安装不上的话可以去此地址下载对应版本的rpm包进行安装
rabbitmq/erlang - Packages · packagecloud
到此地址查看那个RabbitMQ版本的RPM安装包适配26.2x的Erlang
Releases · rabbitmq/rabbitmq-server
发现4.0.3版本是兼容26.2xErlang环境的
下载第一个RPM安装包并上传到服务器
执行命令安装RPM包
rpm -ivh rabbitmq-server-4.0.3-1.el8.noarch.rpm
设置开机自启并启动RabbitMQ服务
systemctl enable rabbitmq-server
systemctl start rabbitmq-server
启用RabbitMQ可视化web管理页面
rabbitmq-plugins enable rabbitmq_management
添加RabbitMQ管理端用户dovir,密码为123
rabbitmqctl add_user dovir 123
rabbitmqctl set_permissions -p / dovir ".*" ".*" ".*"
rabbitmqctl set_user_tags dovir administrator
rabbitmqctl list_users
确保防火墙为关闭状态
输入http://服务器ip地址:15672/访问RabbitMQ可视化web管理页面用自己添加的账号登录
可以看到目前没有任何队列Queue
使用Python向消息队列中推送数据
RabbitMQ有多种工作模式,这里仅示范最简单的模式——Hello world模式,即一个producer发送message,另一个consumer接收message
producer发送message
import pika
# 连接到RabbitMQ服务器
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host = '192.168.160.132',
credentials = pika.PlainCredentials('dovir', '123')
)
)
# 向消息队列推送数据和从中拉取都是通过通道进行的,要先创建通道,用channel去推拉数据
channel = connection.channel()
# 声明(创建)一个名为mq_test的消息队列,已经存在则不会重复创建
channel.queue_declare(queue = 'mq_test')
# 向消息队列推送数据
channel.basic_publish(
# 指定通过那个交换机去推送,为空则使用默认交换机
exchange = '',
# 指定数据推送到mq_test队列
routing_key = 'mq_test',
# 消息的内容
body = 'This is a RabbitMQ test action'
)
# 推送结束后关闭与RabbitMQ服务器的连接,释放资源
connection.close()
执行后发现有了一个名为mq_test的队列
点击队列,用Get messages查看队列中的内容
使用Python从消息队列中拉取数据
consumer接收message
import pika
# 连接到RabbitMQ服务器
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host = '192.168.160.132',
credentials = pika.PlainCredentials('dovir', '123')
)
)
# 向消息队列推送数据和从中拉取都是通过通道进行的,要先创建通道,用channel去推拉数据
channel = connection.channel()
# 声明(创建)一个名为mq_test的消息队列,已经存在则不会重复创建
channel.queue_declare(queue = 'mq_test')
# 定义成功拉取到消息后的回调函数
def callback(ch, method, properties, body):
print('成功拉取消息:',body)
# 拉取消息
channel.basic_consume(
# 拉取mq_test消息队列里面的消息
queue = 'mq_test',
# 成功拉取到消息后要执行的回调函数
on_message_callback = callback,
# 成功接收到
auto_ack = True
)
# 开始循环等待,一直处于等待接收消息的状态
print('等待消息入队列中......')
channel.start_consuming()
运行后成功拉取到消息