RabbitMQ练习(Hello World)

发布于:2024-08-22 ⋅ 阅读:(43) ⋅ 点赞:(0)

1、RabbitMQ教程

《RabbitMQ Tutorials》icon-default.png?t=N7T8https://www.rabbitmq.com/tutorials

RabbitMQ是一个消息代理,它接受并转发消息。你可以将其想象成一个邮局:当你将需要邮寄的信件放入邮筒时,你可以确信邮递员最终会将邮件投递给你的收件人。在这个类比中,RabbitMQ既是邮筒,也是邮局,还是邮递员。

RabbitMQ与邮局的主要区别在于它不处理纸张,而是接受、存储和转发数据的二进制块(binary blobs of data)——即消息

RabbitMQ以及消息传递通常使用一些专业术语。以下是一些常见的术语和它们的含义:

  1. 消息(Message):消息是发送者发送和接收者接收的数据单元。

  2. 生产者(Producer):生产者是发送消息的一方。

  3. 消费者(Consumer):消费者是接收和处理消息的一方。

  4. 队列(Queue):队列是消息的缓冲区,用于存储等待被消费者处理的消息。

  5. 交换机(Exchange):交换机接收来自生产者的消息,并将它们路由到一个或多个队列。

  6. 绑定(Binding)绑定是将队列和交换机连接在一起的规则,定义了交换机应将哪些消息发送到哪个队列

  7. 路由键(Routing Key):路由键是消息的一部分,用于决定消息应该发送到哪个队列。

  8. 虚拟主机(Virtual Host)虚拟主机是RabbitMQ中的一个命名空间,它拥有自己的队列、交换机和绑定

  9. 持久性(Durability):持久性是指消息或队列在RabbitMQ重启后仍然保持存在的特性。

  10. 确认(Acknowledgment):确认是消费者向RabbitMQ发送的一个信号,表明消息已经被成功接收和处理。

  11. 死信队列(Dead Letter Exchange):死信队列是用于存储无法被正常处理的消息的队列。

  12. 消息模式(Message Pattern):消息模式是消息传递中使用的设计模式,如发布/订阅、请求/响应等。

2、环境准备

2.1 准备虚机

可参考《VMware Workstation安装Ubuntu 22.04笔记》

2.2 安装Docker

可参考《Ubuntu 22.04 Docker安装笔记》

root@k0test1:~# ip a
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000
    link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
    inet 127.0.0.1/8 scope host lo
       valid_lft forever preferred_lft forever
    inet6 ::1/128 scope host 
       valid_lft forever preferred_lft forever
2: ens32: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc fq_codel state UP group default qlen 1000
    link/ether 00:50:56:39:7e:a9 brd ff:ff:ff:ff:ff:ff
    altname enp2s0
    inet 10.0.20.70/24 brd 10.0.20.255 scope global ens32
       valid_lft forever preferred_lft forever
    inet6 fe80::250:56ff:fe39:7ea9/64 scope link 
       valid_lft forever preferred_lft forever
3: docker0: <NO-CARRIER,BROADCAST,MULTICAST,UP> mtu 1500 qdisc noqueue state DOWN group default 
    link/ether 02:42:40:29:d1:63 brd ff:ff:ff:ff:ff:ff
    inet 172.17.0.1/16 brd 172.17.255.255 scope global docker0
       valid_lft forever preferred_lft forever
root@k0test1:~# docker --version
Docker version 27.1.2, build d01f264
root@k0test1:~# 

2.3 使用Dockerfile创建Docker images

可参考《Dockerfile创建Docker image练习》,创建Docker images ubtest:22.04

root@k0test1:~# pwd
/root
root@k0test1:~# cat Dockerfile 
FROM ubuntu:22.04
RUN apt-get -qq update \
        && apt-get -qq install vim -y \
        && apt-get -qq install iproute2 -y \
        && apt-get -qq install iputils-ping -y  \
        && apt-get -qq install openssh-server -y \
        && rm -rf /var/lib/apt/lists/*
RUN mkdir /var/run/sshd
RUN echo "root:openstack" | chpasswd
RUN echo 'PermitRootLogin yes' >> /etc/ssh/sshd_config
CMD ["/usr/sbin/sshd", "-D"]
root@k0test1:~# docker images
REPOSITORY    TAG               IMAGE ID       CREATED         SIZE
ubtest        22.04             4a9576459c4c   9 hours ago     234MB
ubuntu        22.04             8a3cdc4d1ad3   7 weeks ago     77.9MB
hello-world   latest            d2c94e258dcb   15 months ago   13.3kB
root@k0test1:~#

2.4 Docker安装RabbitMQ Server

尝试使用 RabbitMQ,可以使用社区提供的 Docker 镜像来快速部署:

root@k0test1:~# docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management
  • --rm: 这个选项意味着当容器退出时,自动清理容器文件系统。

  • --name rabbitmq: 这为容器指定了一个名称 rabbitmq,方便后续的管理和引用。

  • -p 5672:5672: 这将容器内部的 5672 端口映射到宿主机的 5672 端口,5672 端口是 RabbitMQ 服务默认的 AMQP 协议端口。

  • -p 15672:15672: 这将容器内部的 15672 端口映射到宿主机的 15672 端口,15672 端口是 RabbitMQ 管理界面的 HTTP 端口。

  • rabbitmq:3.13-management: 这是指定要运行的 Docker 镜像的名称和标签。3.13-management 标签表明这是一个 RabbitMQ 3.13 版本的镜像,且包含了管理插件。

安装过程中的部分信息:

部分安装信息:
...
2024-08-17 22:25:33.847883+00:00 [info] <0.742.0> Management plugin: HTTP (non-TLS) listener started on port 15672
2024-08-17 22:25:33.848083+00:00 [info] <0.772.0> Statistics database started.
2024-08-17 22:25:33.848604+00:00 [info] <0.771.0> Starting worker pool 'management_worker_pool' with 3 processes in it
2024-08-17 22:25:33.859781+00:00 [info] <0.790.0> Prometheus metrics: HTTP (non-TLS) listener started on port 15692
2024-08-17 22:25:33.860550+00:00 [info] <0.676.0> Ready to start client connection listeners
2024-08-17 22:25:33.862729+00:00 [info] <0.834.0> started TCP listener on [::]:5672
 completed with 5 plugins.
2024-08-17 22:25:33.938357+00:00 [info] <0.676.0> Server startup complete; 5 plugins started.
2024-08-17 22:25:33.938357+00:00 [info] <0.676.0>  * rabbitmq_prometheus
2024-08-17 22:25:33.938357+00:00 [info] <0.676.0>  * rabbitmq_federation
2024-08-17 22:25:33.938357+00:00 [info] <0.676.0>  * rabbitmq_management
2024-08-17 22:25:33.938357+00:00 [info] <0.676.0>  * rabbitmq_management_agent
2024-08-17 22:25:33.938357+00:00 [info] <0.676.0>  * rabbitmq_web_dispatch
2024-08-17 22:25:34.093106+00:00 [info] <0.9.0> Time to start RabbitMQ: 7726 ms
  1. Management Plugin HTTP Listener: 管理插件的 HTTP (非TLS) 监听器已经在端口 15672 上启动,可以通过访问这个端口来使用 RabbitMQ 的 Web 管理界面。

  2. Prometheus Metrics: 服务器已成功启动 Prometheus 指标的 HTTP (非TLS) 监听器,监听端口为 15692。这意味着 RabbitMQ 可以为 Prometheus 提供监控数据,便于系统监控和性能分析。

  3. Client Connection Listeners: 服务器已准备好启动客户端连接监听器。

  4. TCP Listener: 服务器已在端口 5672 上启动了 TCP 监听器,这是 AMQP 客户端连接到 RabbitMQ 服务器的标准端口

  5. Plugins Started: 服务器启动了 5 个插件,包括:

    • rabbitmq_prometheus: 用于 Prometheus 监控的插件。
    • rabbitmq_federation: 用于跨服务器消息传递的插件。
    • rabbitmq_management: 提供管理界面和HTTP API的插件。
    • rabbitmq_management_agent: 管理代理插件,支持管理界面。
    • rabbitmq_web_dispatch: 处理HTTP请求的插件。
  6. Server Startup Complete: RabbitMQ 服务器启动完成,总共耗时 7726 毫秒。

这些日志条目表明 RabbitMQ 服务器已成功启动,并且所有必要的插件都已加载。如果需要访问或管理 RabbitMQ 服务器,可以通过管理界面(默认端口为 15672)进行操作。

Web登录RabbitMQ(缺省username/password: guest/guest):

 2.5 使用Docker容器准备两台测试机器

使用Docker容器准备两台测试机器,名称分别为:senderreceiver

root@k0test1:~# docker run --name sender --hostname sender -d ubtest:22.04          
4a86598c28928cffebbe2e61a86305b54176319006e26fc8bf69201cda6b8748
root@k0test1:~# docker run --name receiver --hostname receiver -d ubtest:22.04
17d06c4aca4a7c4133fd786a0f849354b3c48bc824443c7bde9a979a05dac37f
root@k0test1:~# 

root@k0test1:~# docker ps 
CONTAINER ID   IMAGE                      COMMAND                  CREATED         STATUS         PORTS                                                                                                                                                 NAMES
17d06c4aca4a   ubtest:22.04               "/usr/sbin/sshd -D"      2 minutes ago   Up 2 minutes                                                                                                                                                         receiver
4a86598c2892   ubtest:22.04               "/usr/sbin/sshd -D"      2 minutes ago   Up 2 minutes                                                                                                                                                         sender
cb4bd3e88529   rabbitmq:3.13-management   "docker-entrypoint.s…"   2 hours ago     Up 2 hours     4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp   rabbitmq
root@k0test1:~# 

2.6 在测试机器安装RabbitMQ客户端软件

RabbitMQ 支持多种消息协议,其中包括 AMQP 0-9-1,这是一个开放的、通用的消息传递协议。对于不同编程语言的开发者来说,RabbitMQ 提供了多种客户端库,以便能够与 RabbitMQ 服务器进行交互。

在 Python 中,pika 是一个广泛使用的 RabbitMQ 客户端库,它允许 Python 应用程序作为消息的发送者(生产者)和接收者(消费者)

在两台测试机器分别安装python3、pip3和pika软件:

sender:

root@k0test1:~# docker exec -it sender /bin/bash
root@sender:/# apt update
root@sender:/# apt install python3
root@sender:/# apt install python3-pip

root@sender:/# python3 -m pip install pika --upgrade
Collecting pika
  Downloading pika-1.3.2-py3-none-any.whl (155 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 155.4/155.4 KB 1.2 MB/s eta 0:00:00
Installing collected packages: pika
Successfully installed pika-1.3.2
WARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager. It is recommended to use a virtual environment instead: https://pip.pypa.io/warnings/venv
root@sender:/# 

receiver:

root@k0test1:~# docker exec -it receiver /bin/bash
root@receiver:/# apt update
root@receiver:/# apt install python3
root@receiver:/# apt install python3-pip
root@receiver:/# python3 -m pip install pika --upgrade
Collecting pika
  Downloading pika-1.3.2-py3-none-any.whl (155 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 155.4/155.4 KB 824.8 kB/s eta 0:00:00
Installing collected packages: pika
Successfully installed pika-1.3.2
WARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager. It is recommended to use a virtual environment instead: https://pip.pypa.io/warnings/venv
root@receiver:/#

2.7 网络拓扑

3、Hello world练习

3.1 概述

一个典型的消息传递系统中的组件和它们之间的关系如下图所示,P是生产者(Producer),C是消费者(Consumer),中间是一个队列(Queue)——一个消息缓冲区(原文:In the diagram below, "P" is our producer and "C" is our consumer. The box in the middle is a queue - a message buffer that RabbitMQ keeps on behalf of the consumer.):

  1. 生产者(Producer):这里的"P"代表生产者,也就是发送消息的一方。生产者生成数据或信息,然后将其发送到消息系统。

  2. 消费者(Consumer):"C"代表消费者,即接收和处理消息的一方。消费者从消息系统中获取数据,并对其进行处理。

  3. 队列(Queue):中间的"盒子"指的是队列,它是RabbitMQ用来存储消息的缓冲区。队列在消息传递中扮演着至关重要的角色,它确保即使在消费者暂时无法接收消息的情况下,消息也不会丢失。

  4. 消息缓冲(Message Buffer)队列作为消息缓冲区,可以存储生产者发送的消息,直到消费者准备好接收它们。

  5. RabbitMQ:RabbitMQ是一个消息代理,它负责维护队列和消息的传递。它接受生产者发送的消息,并将其存储在队列中,然后根据消费者的请求将消息传递给消费者。

  6. 代表消费者(on behalf of the consumer):RabbitMQ维护队列是为了消费者的利益。这意味着RabbitMQ确保消息在消费者准备接收之前被安全地存储,并且在消费者请求时能够可靠地传递给消费者。

总结来说,一个消息传递的基本流程:生产者发送消息到RabbitMQ,RabbitMQ将消息存储在队列中,消费者从队列中接收消息并进行处理。队列作为消息的临时存储,确保了消息传递的可靠性和灵活性。

在本练习中,将用Python编写两个小程序:一个生产者(发送者)发送一条消息,和一个消费者(接收者)接收消息并打印出来。

在本网络环境中,生产者位于Sender容器,消费者位于Receiver容器

3.2 Sending

进入sender容器,vi编写send.py:

root@k0test1:~# docker exec -it sender /bin/bash
root@sender:/# pwd 
/
root@sender:/# vi send.py
root@sender:/# cat send.py
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='172.17.0.2'))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
root@sender:/# 

这段Python代码是一个生产者脚本,用于向RabbitMQ发送消息。下面是代码的具体解释:

  1. import pika:导入pika库,这是一个用于与RabbitMQ交互的Python客户端库。

  2. 创建到RabbitMQ服务器的连接:

    • connection = pika.BlockingConnection(pika.ConnectionParameters(host='172.17.0.2')):这行代码创建了一个连接,连接到IP地址为172.17.0.2的RabbitMQ服务器。这里的host参数应该设置为RabbitMQ服务运行的主机地址。
  3. 创建一个通道:

    • channel = connection.channel():在连接上创建一个新的通道,用于消息的发送和接收。
  4. 声明一个队列:

    • channel.queue_declare(queue='hello'):声明一个名为hello的队列。如果该队列不存在,RabbitMQ将自动创建它。在发送消息之前确保接收队列存在是非常重要的,因为如果消息发送到不存在的位置,RabbitMQ将会丢弃该消息。
  5. 发布消息:

    • channel.basic_publish(exchange='', routing_key='hello', body='Hello World!'):通过通道发布一条消息。消息内容为'Hello World!'。这里没有指定交换机(exchange),所以消息将直接发送到指定的队列(routing_key也设置为hello,与队列名相同)。
    • 在RabbitMQ中,消息不能直接发送到队列,而是必须通过一个交换机(Exchange)。这里使用了默认交换机,这个交换机可以通过一个空字符串来标识。默认交换机是RabbitMQ中一个特殊的实体,它允许我们直接指定消息应该发送到哪个队列。这是通过在发送消息时设置routing_key参数来实现的,routing_key应该与目标队列的名称相匹配。

  6. 打印消息确认:

    • print(" [x] Sent 'Hello World!'"):在控制台打印一条消息,表示消息已经成功发送。
  7. 关闭连接:

    • connection.close():关闭与RabbitMQ的连接,释放资源。

这个脚本是一个简单的生产者示例,用于演示如何使用pika库向RabbitMQ发送消息。在实际应用中,你可能需要处理连接异常、消息确认等更复杂的逻辑。

3.3 Receiving

进入receiver容器,vi编写receive.py:

root@k0test1:~# docker exec -it receiver /bin/bash
root@receiver:/# pwd
/
root@receiver:/# vi receive.py
root@receiver:/# cat receive.py
import pika, sys, os

def main():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='172.17.0.2'))
    channel = connection.channel()

    channel.queue_declare(queue='hello')

    def callback(ch, method, properties, body):
        print(f" [x] Received {body}")

    channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)

    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)
root@receiver:/# 

这段Python代码是一个使用pika库与RabbitMQ交互的消费者脚本。下面是代码的逐行解释:

  1. import pika, sys, os:导入所需的模块,pika用于RabbitMQ通信,sys用于访问与Python解释器相关的变量和函数,os用于操作系统功能。

  2. 定义main()函数:这是脚本的主要入口点。

  3. 创建到RabbitMQ服务器的连接:

    • connection = pika.BlockingConnection(pika.ConnectionParameters(host='172.17.0.2')):创建一个连接到IP地址为172.17.0.2的RabbitMQ服务器。
  4. 创建一个通道:

    • channel = connection.channel():在连接上创建一个新的通道。
  5. 声明一个队列:

    • channel.queue_declare(queue='hello'):声明一个名为hello的队列,如果该队列不存在,RabbitMQ将自动创建它。
    • 在send.py代码中已经声明了队列,但在某些情况下,或者为了代码的健壮性和可维护性,重复声明队列是一种好的实践。
  6. 定义回调函数callback

    • def callback(ch, method, properties, body):定义一个函数,当消息到达时会被调用。参数包括通道(ch)、方法(method)、属性(properties)和消息体(body)。
    • print(f" [x] Received {body}"):在控制台打印接收到的消息内容。
  7. 设置消息消费:

    • channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True):设置消费者开始监听名为hello的队列,当有新消息到达时调用callback函数。auto_ack=True表示自动发送确认给RabbitMQ,表明消息已被接收。
  8. 打印等待消息的提示:

    • print(' [*] Waiting for messages. To exit press CTRL+C'):通知用户脚本正在等待消息,并告知如何退出。
  9. 开始接收消息:

    • channel.start_consuming():启动消息接收循环。
  10. 检查是否作为主程序运行:

    • if __name__ == '__main__'::如果这个脚本是作为主程序运行的,而不是被导入到其他脚本中。
  11. 捕获KeyboardInterrupt异常:

    • try...except KeyboardInterrupt::捕获CTRL+C中断信号,允许用户通过键盘中断退出脚本。
  12. 退出程序:

    • sys.exit(0):尝试正常退出程序。
    • except SystemExit: os._exit(0):如果sys.exit()抛出SystemExit异常,使用os._exit(0)强制退出程序。

这个脚本作为一个RabbitMQ消费者,监听名为hello的队列,并在控制台打印出接收到的消息。使用CTRL+C可以中断消息接收循环并退出程序。

 3.4 开始测试

1、在receive容器,运行消费者程序receive.py

消费者程序将开始运行并等待消息。消费者程序将持续运行并监听指定的队列。只要它在运行,它就会接收并处理发送到队列的消息。

root@receiver:/# python3 receive.py
 [*] Waiting for messages. To exit press CTRL+C

2、此时,在rabbitmq容器,查看队列:

root@0840a29d9e2f:/# rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages
hello   0
root@0840a29d9e2f:/# 

这里是命令输出的解释:

  • root@0840a29d9e2f:/#:这是你的命令行提示符,显示你当前以root用户登录到一个容器或者某个系统的shell中。

  • rabbitmqctl list_queues:这是你执行的命令,用来列出RabbitMQ中所有的队列。

  • Timeout: 60.0 seconds ...:这个信息表明rabbitmqctl命令设置了60秒的超时时间。

  • Listing queues for vhost / ...:这表明命令正在列出RabbitMQ的默认虚拟主机(vhost)下的队列。

  • name messages:这是列标题,表示输出将显示队列的名称和消息数量。

  • hello 0:这是列出的一个队列,名为hello,当前有0条消息。

根据这个输出,可以得出以下结论:

  • RabbitMQ服务器上至少有一个名为hello的队列。
  • 这个队列目前没有任何消息。

 3、在sender容器,启动生产者程序send.py发送消息

root@sender:/# python3 send.py
 [x] Sent 'Hello World!'
root@sender:/# 

4、观察消费者响应

一旦生产者发送了消息,消费者应该会接收到消息,并在控制台中打印出消息内容。

root@receiver:/# python3 receive.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received b'Hello World!'   <-----消费者收到的生产者发生的消息

从这个输出可以得出以下结论:

  1. 消费者程序正在运行,并且已经成功连接到RabbitMQ服务器。
  2. 消费者程序监听的队列接收到了消息。
  3. 接收到的消息内容是 'Hello World!'

 5、重复第3步,生产者多次发送消息, 消费者多次接收消息:

root@sender:/# python3 send.py
 [x] Sent 'Hello World!'
root@sender:/# python3 send.py
 [x] Sent 'Hello World!'
root@sender:/# python3 send.py
 [x] Sent 'Hello World!'
root@sender:/# python3 send.py
 [x] Sent 'Hello World!'
root@sender:/# python3 send.py
 [x] Sent 'Hello World!'
root@sender:/# 

root@receiver:/# python3 receive.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received b'Hello World!'
 [x] Received b'Hello World!'
 [x] Received b'Hello World!'
 [x] Received b'Hello World!'
 [x] Received b'Hello World!'

6、停止消费者程序

要停止消费者程序,你可以在运行它的终端中按下CTRL+C。程序应该会捕捉到这个中断信号,并优雅地关闭通道和连接,然后退出。

4、Wireshark抓包

4.1 抓包方式

在虚机上运行wireshark,捕获docker0端口流量:

root@k0test1:~# wireshark &
[1] 2562

 4.2 抓包信息

4.3 典型数据包

1、生产者向RabbitMQ Server发送消息

Frame 42: 88 bytes on wire (704 bits), 88 bytes captured (704 bits) on interface docker0, id 0
Ethernet II, Src: 02:42:ac:11:00:03 (02:42:ac:11:00:03), Dst: 02:42:ac:11:00:02 (02:42:ac:11:00:02)
Internet Protocol Version 4, Src: 172.17.0.3 (172.17.0.3), Dst: 172.17.0.2 (172.17.0.2)
Transmission Control Protocol, Src Port: 48864, Dst Port: 5672, Seq: 382, Ack: 597, Len: 22
Advanced Message Queuing Protocol
    Type: Method (1)
    Channel: 1
    Length: 14
    Class: Basic (60)
    Method: Publish (40)
    Arguments
        Ticket: 0
        Exchange: 
        Routing-Key: hello
        .... ...0 = Mandatory: False
        .... ..0. = Immediate: False

 这段 Wireshark 捕获的信息描述了一个使用 AMQP 协议的 "Publish" 方法调用,它用于在 RabbitMQ 等消息代理中发送消息到一个交换机。以下是对这些信息的详细解释:

  1. Advanced Message Queuing Protocol: 表示这是 AMQP 协议的数据包。

  2. Type: Method (1): 表示这是一个方法类型的消息,用于执行操作。

  3. Channel: 1: 指定了 AMQP 通道号,通道是 AMQP 连接中的一个虚拟通道,用于隔离消息。

  4. Length: 14: 表示方法帧有效载荷的长度是 14 字节。

  5. Class: Basic (60): 表示使用的是 AMQP 协议的基础类(Basic Class),这个类定义了基本操作,如消息的发布和接收。

  6. Method: Publish (40): 表示执行的操作是发布消息(Publish)。这是客户端用来将消息发送到交换机的方法。

  7. Arguments:

    • 包含了执行 Publish 方法所需的参数。
  8. Ticket: 0: 指定了用于访问限制的票据ID。

  9. Exchange:: 指定了消息要发送到的交换机名称。在这个例子中,交换机名称没有给出,可能使用默认交换机。

  10. Routing-Key: hello: 指定了路由键,用于决定消息如何从交换机路由到队列。在这个例子中,路由键是 "hello",这意味着消息将被发送到名为 "hello" 的队列(如果交换机配置正确)。

  11. Mandatory: False: 表示如果消息无法被路由到任何队列,不强制客户端接收到一个返回(否定确认)。

  12. Immediate: False: 表示消息应该在所有绑定的队列为空时被路由,而不是直接返回给发送者。

这个数据帧表示客户端通过 AMQP 协议的 Publish 方法向交换机发送了一个消息,消息将根据指定的路由键 "hello" 被路由到相应的队列。如果使用的是默认交换机,并且存在一个名为 "hello" 的队列,消息将被投递到那个队列中。

 2、RabbitMQ 服务器向消费者发送消息

Frame 48: 175 bytes on wire (1400 bits), 175 bytes captured (1400 bits) on interface docker0, id 0
Ethernet II, Src: 02:42:ac:11:00:02 (02:42:ac:11:00:02), Dst: 02:42:ac:11:00:04 (02:42:ac:11:00:04)
Internet Protocol Version 4, Src: 172.17.0.2 (172.17.0.2), Dst: 172.17.0.4 (172.17.0.4)
Transmission Control Protocol, Src Port: 5672, Dst Port: 47890, Seq: 648, Ack: 446, Len: 109
Advanced Message Queuing Protocol
    Type: Method (1)
    Channel: 1
    Length: 59
    Class: Basic (60)
    Method: Deliver (60)
    Arguments
        Consumer-Tag: ctag1.a0f530198eb74e1a85390418796baf1a
        Delivery-Tag: 1
        .... ...0 = Redelivered: False
        Exchange: 
        Routing-Key: hello
Advanced Message Queuing Protocol
    Type: Content header (2)
    Channel: 1
    Length: 14
    Class ID: Basic (60)
    Weight: 0
    Body size: 12
    Property flags: 0x0000
    Properties
Advanced Message Queuing Protocol
    Type: Content body (3)
    Channel: 1
    Length: 12
    Payload: 48656c6c6f20576f726c6421

这段 Wireshark 捕获的信息描述了一个使用 AMQP 协议的 "Deliver" 方法调用,这通常表示 RabbitMQ 服务器正在向消费者发送消息。以下是对这些信息的详细解释:

  1. Advanced Message Queuing Protocol: 表示这是 AMQP 协议的数据包。

  2. Type: Method (1): 表示这是一个方法类型的消息,用于执行操作。

  3. Channel: 1: 指定了 AMQP 通道号,通道是 AMQP 连接中的一个虚拟通道,用于隔离消息。

  4. Length: 59: 表示方法帧有效载荷的长度是 59 字节。

  5. Class: Basic (60): 表示使用的是 AMQP 协议的基础类(Basic Class),这个类定义了基本操作,如消息的发布和接收。

  6. Method: Deliver (60): 表示执行的操作是消息分派(Deliver)。这是服务器用来向消费者发送消息的方法。

  7. Arguments:

    • 包含了执行 Deliver 方法所需的参数。
  8. Consumer-Tag: ctag1.a0f530198eb74e1a85390418796baf1a: 指定了消费者的标签,用于区分不同的消费者。每个消费者在订阅队列时会被分配一个唯一的标签。

  9. Delivery-Tag: 1: 指定了分派给消费者的消息的序列号。这是消费者用来确认消息的标识符。

  10. Redelivered: False: 表示这条消息不是重新分派的。如果消息由于某些原因未能被消费者处理,它可能会被重新分派,此时该标志会设置为 True

  11. Exchange: 指定了消息最初发送到的交换机名称。在这个例子中,交换机名称没有给出,可能使用默认交换机。

  12. Routing-Key: hello: 指定了消息的路由键,用于决定消息如何从交换机路由到队列。在这个例子中,路由键是 "hello"。

这个数据帧表示 RabbitMQ 服务器正在向一个消费者分派一条消息,这条消息最初是发送到默认交换机,并且使用路由键 "hello" 被路由到一个队列。消费者通过其唯一的消费者标签 ctag1.a0f530198eb74e1a85390418796baf1a 来接收这条消息。如果消费者成功接收并准备好处理这条消息,它将发送一个针对这个 Delivery-Tag 的确认回执给服务器。如果消费者设置为手动确认模式,那么它必须显式地发送确认,否则消息可能会被重新分派或在消费者断开连接时返回给队列。

 4.4 流量图

5、小结

通过本次练习了解了如何在 RabbitMQ 中使用命名队列发送和接收消息。这里是这个过程的简要概述:

  1. 发送消息到命名队列

    • 使用AMQP协议和RabbitMQ客户端库(如pika for Python)连接到RabbitMQ服务器。
    • 声明一个命名队列(例如hello),如果它尚不存在,RabbitMQ将自动创建它。
    • 通过交换机将消息发送到队列,当不使用特定交换机时,可以使用默认交换机,并将队列名作为路由键。
  2. 接收来自命名队列的消息

    • 同样,首先建立与RabbitMQ服务器的连接,并创建通道。
    • 声明相同的命名队列,确保队列存在。
    • 使用basic_consume方法订阅队列,并提供一个回调函数来处理接收到的消息。
    • 调用start_consuming方法开始接收消息,并等待回调函数被触发。
  3. 消息确认

    • 在手动确认模式下,消费者在处理完每条消息后需要发送一个确认回执给RabbitMQ,告知消息已被成功处理。
    • 在自动确认模式下,RabbitMQ会在消息被接收后自动标记消息为已确认。