RabbitMQ章节介绍
一、RabbitMQ概述
- RabbitMQ学习内容: 本章节将学习RabbitMQ的概念、安装启动、管理后台、代码实操、交换机工作模式以及Spring Boot整合RabbitMQ。
- 消息队列定义: 消息队列是一种用于在分布式系统中传递消息的机制。
- 消息队列特性: 消息队列具有异步、解耦、削峰填谷等特性。
- 消息队列好处: 使用消息队列可以提高系统的可用性和伸缩性,降低系统间的耦合度。
- 核心组件和概念: RabbitMQ的核心组件和概念是学习的重点内容,后续会通过图示方法详细讲解。
二、RabbitMQ安装与启动
- 安装环境: RabbitMQ可以在Linux、macOS和Windows下安装,但最推荐在Linux环境下安装。
- 启动管理后台: 安装完成后,需要启动RabbitMQ的管理后台,以便进行后续的操作和管理。
- 管理后台功能: 管理后台可以查看消息流量、交换机、队列等,还可以对用户进行添加、删除、配置权限等操作。
三、RabbitMQ代码实操
- 实操内容: 将进行案例编写,难度逐渐增加,从简单的发消息到多个消费者的场景变化。
- 实操目的: 通过代码实操,加深对RabbitMQ的理解和掌握其使用方法。
四、交换机工作模式
- 交换机特色: RabbitMQ的一个特色是支持多种交换机,不同的交换机有不同的工作模式。
- 主流交换机: 将介绍主流的交换机,并通过代码演示其工作模式。
五、Spring Boot整合RabbitMQ
- 整合步骤: 在Spring Boot中引入RabbitMQ相关依赖,进行配置,编写生产者和消费者,实现消息发送和接收。
- 整合目的: 通过Spring Boot整合RabbitMQ,实现更加便捷和高效的消息队列应用。
知识小结
知识点 | 核心内容 | 考试重点/易混淆点 | 难度系数 |
---|---|---|---|
Rabbit MQ概念 | 讲解什么是消息队列,消息队列的特性及好处 | 消息队列的基本概念、特性 | 🌟 |
核心组件和概念 | 核心组件和概念的识别与理解 | 核心组件和概念的识别与理解 | 🌟🌟 |
Rabbit MQ安装与启动 | Linux、macOS、Windows下的安装过程 | Linux环境下的安装步骤 | 🌟🌟 |
管理后台的启动与操作 | 管理后台的功能与使用 | 管理后台的操作方法 | 🌟🌟🌟 |
Rabbit MQ管理后台 | 查看消息流量、交换机、队列 | 管理后台的操作方法 | 🌟🌟 |
用户管理 | 添加、删除、配置权限 | 用户管理的操作步骤与权限配置 | 🌟🌟🌟 |
Rabbit MQ代码实操 | 案例编写:简单发消息、多消费者变化 | 代码实操的流程与逻辑 | 🌟🌟🌟🌟 |
交换机工作模式 | 主流交换机的介绍 | 不同交换机的特性与适用场景 | 🌟🌟🌟🌟 |
交换机工作模式的代码演示 | 代码演示的理解与实践 | 代码演示的理解与实践 | 🌟🌟🌟🌟🌟 |
Spring Boot整合Rabbit MQ | 引入依赖、配置、编写生产者和消费者 | Spring Boot与Rabbit MQ的整合步骤 | 🌟🌟🌟🌟 |
实现通信 | 通信机制的理解与实现 | 通信机制的理解与实现 | 🌟🌟🌟🌟🌟 |
初识RabbitMQ
一、RabbitMQ官方描述
- 描述: RabbitMQ是部署最广泛的开源消息代理,拥有数万用户,是全球范围内从小型初创公司到大型企业广泛使用的消息中间件。
- 特点: RabbitMQ轻量级,易于在本地和云端部署,支持多种消息协议,可以分布式和联合配置以满足高扩展性、高可用性需求,运行于多种操作系统和云环境,并为多数流行语言提供广泛的开发工具。
二、RabbitMQ特性
- 异步消息: 支持异步消息传递,提升系统响应速度和吞吐量。
- 分布式部署: 可以跨多个可用区和区域联合部署,增强系统的可扩展性和容错性。
- 多语言支持: 为Java、.NET、PHP、Python、JavaScript、Ruby、Go等流行语言提供开发工具,方便多语言环境下的集成。
- 企业和云就绪: 提供可插拔的认证、授权,支持TLS和LDAP,以及持续集成、运营指标等工具,方便与企业系统集成和云环境部署。
- 灵活插件方法: 支持通过插件扩展RabbitMQ功能,满足定制化需求。
三、下载和安装RabbitMQ
- 安装方式: RabbitMQ支持多种安装方式,包括Windows、Debian和Ubuntu、RPM-based Linux、Homebrew、EC2等。
- Docker镜像: 提供社区Docker镜像,方便快速试验和部署。
- 依赖: RabbitMQ依赖于Erlang语言,安装前需确保Erlang环境已配置。
- 课程安装: 课程中会演示一种相对简单清晰的安装方式。
四、RabbitMQ教程
- 教程内容: 教程覆盖使用RabbitMQ创建消息应用程序的基础,包括“Hello World!”、工作队列、发布/订阅等模式。
- 语言支持: 教程提供多种语言的示例代码,包括Python、Java、Ruby等。
- 课程参考: 课程中的消息发送及交换机模式部分参考了教程中的思路和代码。
- 主要关注: 我们主要关注的是JAVA语言的相关内容,后续课程会详细展开介绍。
消息队列的概念
一、消息队列
- 消息队列: 接收并转发消息,类似于“快递公司”。
- 生产者(Producer): 生产消息并发送给消息队列。
- 消费者(Consumer): 从消息队列中获取并消费消息。
- 队列特性: 先进先出的数据结构,一个队列可以对应多个消费者。
二、为什么要用消息队列
- 代码解耦: 消息队列使得生产者和消费者之间不需要直接依赖,提高了系统的灵活性。
- 应对流量高峰: 消息队列可以缓冲请求,降低流量冲击。
- 异步执行: 提高系统响应速度。
三、消息队列的特性
- 高性能: 消息队列的处理能力通常远大于数据库或业务系统的处理能力,适用于高并发场景。
- 基础组件: 消息队列是通用的系统组件,不受业务限制,广泛应用于各种系统中。
- 消息确认: 支持消息确认机制,确保消息在未被处理前不会丢失,适用于重要数据的传输。
知识小结
知识点 | 核心内容 | 考试重点/易混淆点 | 难度系数 |
---|---|---|---|
RabbitMQ概念 | RabbitMQ是部署最广泛的开源消息代理,支持多语言,提供广泛开发工具 | RabbitMQ的基本概念和应用场景 | 🌟 |
RabbitMQ官网内容 | 官网介绍了RabbitMQ的广泛应用、更新情况、多语言支持、特性、下载和安装教程等 | 官网的结构和内容布局,重点关注下载和安装教程、教程中的示例代码 | 🌟🌟 |
消息队列的基本概念 | 消息队列的核心理解是接收并转发消息,涉及生产者、消费者和队列 | 生产者、消费者和队列的定义及关系 | 🌟 |
消息队列的好处 | 1. 代码解偶,提高系统稳定性 2. 应对流量高峰,降低流量冲击 3. 实现异步执行,提高系统响应速度 |
消息队列在解偶、流量控制和异步执行方面的应用 | 🌟🌟🌟 |
消息队列的特性 | 1. 性能优越,处理能力强大 2. 作为基础组件,通用性强 3. 支持消息确认,保证数据一致性 |
消息队列的性能、通用性和数据一致性保障 | 🌟🌟 |
RabbitMQ的特点
一、RabbitMQ的特点
- 路由能力灵活强大
- 路由能力: RabbitMQ的路由能力指的是消息从生产者发送到消费者的路径配置。可以灵活配置消息发送给一个或多个消费者。
- 开源免费
- 开源免费: RabbitMQ是开源的,可以免费使用。
- 支持多种编程语言
- 支持语言: RabbitMQ支持的编程语言包括JAVA、ruby、PHP、c#、JavaScript、go、objective c、swift等主流语言。
- 应用广泛,社区活跃
- 应用广泛: RabbitMQ部署广泛,是开源MQ中应用最广泛的之一。
- 社区活跃: 拥有一个活跃的社区,遇到问题时可以寻求社区帮助。
- 有监控和管理后台
- 监控和管理后台: RabbitMQ提供了开箱即用的监控和管理后台,方便对性能和消息进行全盘掌握。
RabbitMQ核心概念
一、消息的生产者、消息、交换机
- 消息生产者(Producer): 生产消息的一方,可以是一个或多个,不限制数量。
- 消息(Message): 由生产者产生的,需要被传递和处理的数据。
- 交换机(Exchange): 接收生产者发送的消息,并根据路由键(Routing Key)将消息路由到一个或多个队列。
- 路由键(Routing Key): 用于决定交换机如何将消息路由到队列的规则或标识。
- 绑定(Binding): 交换机和队列之间的连接关系,通过路由键进行绑定。
二、服务端、虚拟主机
- 服务端(Broker): RabbitMQ的服务实例,相当于Redis中的Redis Server,负责消息的中转和处理。
- 虚拟主机(Virtual Host): 用于隔离不同服务的消息,类似于域名,不同虚拟主机之间的消息不会相互干扰。
三、生产者、队列、交换机
- 队列(Queue): 消息的中转或存储地点,消息从交换机发送到队列,再由消费者从队列中消费。
- 消息流程:
- 生产者生产消息并发送到交换机。
- 交换机根据路由键将消息路由到队列。
- 消费者从队列中消费消息。
- 连接(Connection)与信道(Channel):
- 连接是客户端和服务端之间的通信链路。
- 信道是在连接内部用于读写数据的通道。
- 消息从生产者经过交换机,通过绑定关系路由到队列,最终由消费者消费。
知识小结
知识点 | 核心内容 | 考试重点/易混淆点 | 难度系数 |
---|---|---|---|
RabbitMQ特性 | 路由能力灵活强大;开源免费;支持多编程语言;应用广泛,社区活跃;有监控和管理后台 | 路由能力与其他消息队列的区别;开源免费的优势;多语言支持的范围 | 🌟🌟🌟 |
核心概念/组件 | 生产者、消息、交换机、队列、绑定、路由键、连接、信道、消费者、Broker、Virtual Host | 交换机与队列的绑定关系;Broker与Virtual Host的作用 | 🌟🌟🌟 |
消息流程 | 生产者生产消息 → 交换机 → 队列 → 消费者消费消息 | 在交换机与队列之间的流转过程;消费者如何消费消息 | 🌟🌟 |
架构理解 | 整体架构图的理解,包括生产者、交换机、队列、消费者等的位置和关系 | 架构图中各组件的连接方式和作用;Virtual Host的隔离作用 | 🌟🌟🌟 |
实操演示 | 监控和管理后台的使用 | 如何在后台进行性能监控和消息管理 | 🌟 |
# RabbitMQ安装与配置课程笔记
## 一、RabbitMQ安装
### 1. Linux下安装RabbitMQ
#### 1.1 环境配置
- **环境配置命令**:
```bash
echo "export LC_ALL=en_US.UTF-8" >> /etc/profile
source /etc/profile
这一步是为了将编码设置为UTF-8,以便更好地显示中文。
1.2 安装方法一
- 配置RabbitMQ源:
curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash
- 配置Erlang源:
curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
- 安装RabbitMQ:
安装过程中会下载相关依赖包,如erlang和socat。sudo yum install rabbitmq-server-3.8.2-1.el7.noarch
1.3 启动RabbitMQ
- 启动命令:
systemctl start rabbitmq-server
- 检查状态:
rabbitmqctl status
1.4 常用命令
- 启动:
systemctl start rabbitmq-server
- 停止:
rabbitmqctl stop
- 设置开机启动:
systemctl enable rabbitmq-server
- 查看状态:
rabbitmqctl status
- 添加用户:
rabbitmqctl add_user <username> <password>
- 设置用户标签:
rabbitmqctl set_user_tags <username> <tag>
(如administrator) - 启用管理插件:
rabbitmq-plugins enable rabbitmq_management
1.5 安装后的检查与配置
- 检查服务状态:
systemctl status rabbitmq-server
- 配置阿里云安全组: 打开15672端口以访问RabbitMQ的管理界面。
- 访问管理界面: 在浏览器中访问
http://ip:15672
,使用添加的管理员账号(如admin)和密码登录。
二、知识小结
知识点 | 核心内容 | 考试重点/易混淆点 | 难度系数 |
---|---|---|---|
RabbitMQ安装准备 | 介绍如何安装RabbitMQ,提供教辅资料下载 | 识别操作系统版本(CentOS 7.6) 确保不使用额外配置(如YUM换源) |
🌟 |
环境配置 | 设置编码为UTF-8,使用source命令生效 | 编码设置对中文显示的影响 source命令的作用 |
🌟🌟 |
安装方法选择 | 主推方法一,方法二作为备选 | 方法一与方法二的区别 何时选择方法二 |
🌟 |
方法一安装步骤 | 配置RabbitMQ源、配置Erlang源、安装RabbitMQ及依赖 | 每一步的具体作用 安装过程中的提示信息 |
🌟🌟🌟 |
安装完成后的操作 | 查看安装状态,启动RabbitMQ服务 | 如何确认安装成功 启动和查看状态的命令 |
🌟🌟 |
启动与检查 | 使用命令启动RabbitMQ,检查服务器状态 | 启动命令 状态检查命令及返回信息解读 |
🌟🌟🌟 |
后续操作指引 | 查看常用命令,包括开启界面、停止、开机启动等 | 常用命令的查找与使用 不同操作系统下的安装差异 |
🌟 |
三、RabbitMQ的管理后台
3.1 添加管理插件
- 命令:
rabbitmq-plugins enable rabbitmq_management
- 结果: 成功启用后,会显示
started 3 plugins
。
3.2 添加用户
- 命令:
rabbitmqctl add_user admin password
- 设置管理员:
rabbitmqctl set_user_tags admin administrator
3.3 概览界面
- 访问地址:
http://服务器IP:15672
- 登录信息: 用户名为admin,密码为设置时的password。
- 概览内容: 显示主要信息,如消息积压情况、消息频次、交换机和队列数量等。
3.4 交换机相关情况
- 默认交换机: RabbitMQ默认创建7个交换机。
- 交换机类型: direct、fanout、headers、topic等。
- 消息发送: 在交换机详情页面进行消息的发送,用于调试。
3.5 用户相关情况
- 用户列表: 显示所有用户,包括系统默认的guest和自己创建的admin。
- 添加用户: 通过管理后台的Add User功能添加新用户,并设置其密码和权限标签。
- 权限标签: 如admin代表超级管理员,monitoring代表可以登录管理后台查看相关信息。
3.6 虚拟主机相关情况
- 虚拟主机用途: 用于区分不同的服务。
- 默认虚拟主机: 通常使用/作为默认虚拟主机。
- 权限配置: 新添加的用户admin需要配置对虚拟主机的访问权限。
3.7 总结
- 主要步骤: 启用管理后台插件、访问管理后台、查看概览信息、配置用户权限。
四、知识小结
知识点 | 核心内容 | 考试重点/易混淆点 | 难度系数 |
---|---|---|---|
管理后台介绍 | RabbitMQ的管理后台是其亮点和特性之一 | 概览界面显示主要信息,如消息积压、频次等 | 🌟 |
添加用户 | 默认用户只有guest,需添加admin用户 | admin用户与guest用户的区别及权限 | 🌟🌟 |
创建虚拟主机 | 虚拟主机用于区分不同服务 | 虚拟主机的配置和权限设置 | 🌟🌟 |
管理后台配置 | 通过命令行启动管理后台并添加admin用户 | 命令行的使用和管理后台的访问地址 | 🌟🌟 |
启动管理后台 | 使用 rabbitmq-plugins enable rabbitmq_management 命令 |
命令的准确性和执行效果 | 🌟 |
添加admin用户 | 使用 rabbitmqctl add_user admin password 命令 |
用户添加后的权限设置 | 🌟 |
设置管理员权限 | 使用 rabbitmqctl set_user_tags admin administrator 命令 |
管理员权限的具体内容 | 🌟 |
访问管理后台 | 通过浏览器访问,默认端口15672 | 访问地址、端口号及安全组设置 | 🌟 |
管理后台功能 | 查看交换机、队列、用户等信息 | 各功能模块的用途和操作 | 🌟 |
五、实战案例:Hello World
5.1 新建项目
- 项目新建: 使用IDEA新建Maven项目,命名为ruby-mq。
- 添加依赖: 引入amqp-client和slf4j-nop依赖。
- 新建包: 在src/main/java下新建包helloworld。
- 新建发送类: 编写Send类,用于发送消息到RabbitMQ。
5.2 发送类编写
- 建立工厂: 使用ConnectionFactory类创建连接工厂实例。
- 设置RabbitMQ地址: 通过setHost方法设置服务器IP地址,设置用户名和密码。
- 建立连接: 使用newConnection方法建立连接。
- 获得信道: 通过connection的createChannel方法获得信道。
- 声明队列: 使用channel的queueDeclare方法声明队列。
- 发布消息: 使用channel的basicPublish方法发布消息。
- 关闭链接: 先关闭信道,再关闭连接。
5.3 接收类编写
- 类名: Recv
- 接收消息: 使用basicConsume方法接收消息。
- 处理消息: 重写handleDelivery方法处理接收到的消息。
- 打印消息: 将接收到的byte数组转换为字符串并打印。
5.4 运行与测试
- 运行Send类: 检查是否能成功打印出"发送了消息:Hello World!"。
- 运行Recv类: 检查是否能成功接收并打印消息。
六、知识小结
知识点 | 核心内容 | 考试重点/易混淆点 | 难度系数 |
---|---|---|---|
RabbitMQ基础 | RabbitMQ的特点和API丰富性 | RabbitMQ支持多语言,API丰富 | 🌟 |
项目创建 | 使用IDEA新建Maven项目,命名为ruby-mq | 项目命名、Maven依赖引入 | 🌟🌟 |
依赖导入 | 引入amqp-client和slf4j-nop依赖 | 依赖版本选择 | 🌟🌟 |
发送类编写 | 创建Send类,用于发送消息到RabbitMQ | 连接工厂创建、设置RabbitMQ地址、用户认证 | 🌟🌟🌟 |
队列声明 | 在Send类中声明队列,指定队列名字为"hello" | 队列名字、持久化设置、独占设置、自动删除设置 | 🌟🌟 |
消息发送 | 使用basicPublish方法发送消息,内容为"hello world" | 交换机概念、routing key设置、消息体编码 | 🌟🌟🌟 |
连接与信道管理 | 建立连接、获取信道,并在使用完毕后关闭 | 异常处理、资源释放 | 🌟🌟 |
RabbitMQ服务端配置 | 阿里云实例IP配置、端口开放(5672)、用户权限设置 | IP地址修改、端口开放步骤、用户权限配置 | 🌟🌟🌟 |
前置条件 | RabbitMQ服务端启动、端口开放、用户添加及权限设置 | 服务端状态检查、端口配置、用户及权限验证 | 🌟🌟🌟 |
运行与测试 | 运行Send类,检查消息是否成功发送 | 连接失败处理、消息发送验证 | 🌟🌟 |
七、工作队列与多个消费者
7.1 工作队列
- 任务类: NewTask用于发送耗时任务到RabbitMQ队列。
- 工人类: Worker用于从RabbitMQ队列接收并处理耗时任务。
7.2 多个消费者
- 允许并行: 在IDEA中配置允许并行运行。
- 添加消费者: 启动多个worker实例,提高处理效率。
- 不公平派遣: 默认按顺序分配消息。
- 公平派遣: 根据消费者的压力分配消息。
- 设置prefetchCount:
channel.basicQos(1)
- 手动确认消息:
channel.basicAck
- 设置prefetchCount:
7.3 效果展示
- 公平派遣效果: 根据消费者的繁忙程度分配任务,实现工作量上的平衡。
八、知识小结
知识点 | 核心内容 | 考试重点/易混淆点 | 难度系数 |
---|---|---|---|
多个消费者处理 | 使用多个worker提高消息处理速度 | 如何配置允许并行运行 | 🌟 |
JAVA包和类 | 在JAVA中新建work cues包和new task类 | 包和类的命名、作用及关系 | 🌟 |
任务耗时和多个任务 | 任务会有所耗时,并且会有多个任务 | 任务耗时和多个任务的概念 | 🌟 |
连接RabbitMQ | 连接到RabbitMQ,包括localhost和guest用户登录 | 连接RabbitMQ的步骤和注意事项 | 🌟 |
声明队列 | 声明一个名为task_queue的队列,并设置其属性 | 队列的命名、属性和声明方法 | 🌟 |
发送消息 | 通过for循环发送10个消息到队列中 | 消息的构建、发送和打印 | 🌟 |
关闭连接 | 发送完消息后,关闭连接和信道 | 连接和信道的关闭方法 | 🌟 |
接收消息 | 在worker类中接收和处理消息 | 消息的接收、处理和打印 | 🌟 |
消息处理逻辑 | 根据消息内容是否包含点来决定处理时间 | 消息处理的方法、逻辑和延迟处理 | 🌟 |
运行程序 | 预判程序运行结果,并实际运行验证 | 程序运行的预期结果和实际验证 | 🌟 |
公平派遣 | 实现公平派遣,避免消息堆积 | basicQos与basicAck的使用 | 🌟 |
手动确认重要性 | 避免消息积累,释放内存 | 忘记手动确认的严重后果 | 🌟 |
RabbitMQ交换机工作模式课程笔记
一、交换机工作模式概述
1.1 四种工作模式
- Fan Out: 广播模式,所有绑定的队列都会收到相同的消息。
- Direct: 直接模式,根据路由键选择性地转发消息。
- Topic: 主题模式,支持模糊匹配路由键。
- Headers: 根据消息头属性匹配,较少使用。
二、Fan Out模式
2.1 模式特点
- 广播: 所有绑定的队列都会收到相同的消息。
- 无需路由键: 不需要设置路由键,只需将队列绑定到交换机。
2.2 应用场景
- 日志系统: 多个消费者需要接收所有日志消息。
2.3 代码示例
发送者类:
EmitLog
public class EmitLog { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String message = "Info: Hello World!"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8)); System.out.println("Sent '" + message + "'"); } } }
接收者类:
ReceiveLogs
public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println("Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("Received '" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {}); } } }
2.4 运行效果
- 多个消费者: 每个消费者都会收到相同的消息。
- 临时队列: 队列在消费者断开连接时自动删除。
三、Direct模式
3.1 模式特点
- 路由键匹配: 根据路由键选择性地转发消息。
- 灵活控制: 可以将相同路由键绑定到多个队列。
3.2 应用场景
- 日志系统升级: 只接收特定类型的消息(如错误日志)。
3.3 代码示例
发送者类:
EmitLogDirect
public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String severity = getSeverity(args); String message = severity + ": Hello World!"; channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println("Sent '" + message + "'"); } } private static String getSeverity(String[] args) { if (args.length < 1) return "info"; return args[0]; } }
接收者类:
ReceiveLogsDirect
public class ReceiveLogsDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String queueName = channel.queueDeclare().getQueue(); if (args.length < 1) { System.err.println("Usage: ReceiveLogsDirect [info|warning|error]"); System.exit(1); } for (String severity : args) { channel.queueBind(queueName, EXCHANGE_NAME, severity); } System.out.println("Waiting for messages"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("Received '" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {}); } } }
3.4 运行效果
- 选择性接收: 消费者只接收其绑定的路由键的消息。
- 多个绑定: 同一路由键可以绑定到多个队列。
四、Topic模式
4.1 模式特点
- 模糊匹配: 支持使用
*
(单个单词)和#
(零个或多个单词)进行路由键匹配。 - 多条件匹配: 可以根据多个条件进行消息路由。
4.2 应用场景
- 复杂日志系统: 根据多个条件(如模块和日志级别)进行消息路由。
4.3 代码示例
发送者类:
EmitLogTopic
public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String routingKey = getRoutingKey(args); String message = "Animal World"; channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println("Sent routingKey='" + routingKey + "': " + message); } } private static String getRoutingKey(String[] args) { if (args.length < 1) return "anonymous.info"; return args[0]; } }
接收者类:
ReceiveLogsTopic
public class ReceiveLogsTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = channel.queueDeclare().getQueue(); if (args.length < 1) { System.err.println("Usage: ReceiveLogsTopic '<routingKey>'"); System.exit(1); } for (String bindingKey : args) { channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); } System.out.println("Waiting for messages"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("Received '" + delivery.getEnvelope().getRoutingKey() + "': " + message); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {}); } } }
4.4 运行效果
- 模糊匹配: 消费者可以根据模式匹配接收消息。
- 多绑定: 一个消费者可以绑定多个路由键模式。
五、Headers模式
5.1 模式特点
- 消息头匹配: 根据消息头属性进行匹配。
- 较少使用: 生产环境中较少使用。
5.2 应用场景
- 特定属性匹配: 当需要根据消息头中的特定属性进行路由时。
六、知识小结
模式 | 特点 | 应用场景 | 难度系数 |
---|---|---|---|
Fan Out | 广播,所有绑定队列收到相同消息,无需路由键 | 日志系统 | 🌟 |
Direct | 根据路由键选择性转发消息 | 升级的日志系统 | 🌟🌟 |
Topic | 支持模糊匹配路由键,多条件匹配 | 复杂日志系统 | 🌟🌟🌟 |
Headers | 根据消息头属性匹配 | 特定属性匹配 | 🌟 |
Spring Boot与RabbitMQ整合课程笔记
一、课程目标
- 整合Spring Boot与RabbitMQ: 学习如何在Spring Boot项目中使用RabbitMQ进行消息的发送和接收。
二、项目创建
2.1 创建Spring Boot项目
- 新建项目: 使用Spring Initializr创建两个项目,一个作为生产者(producer),一个作为消费者(consumer)。
- 项目配置:
- Group:
com.imook
- Artifact:
- 生产者:
spring-boot-rabbitmq-producer
- 消费者:
spring-boot-rabbitmq-consumer
- 生产者:
- Group:
- 依赖引入: 在
pom.xml
中引入RabbitMQ相关依赖。<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2.2 配置Spring Boot项目
- 版本设置: 将Spring Boot版本设置为2.1.x。
- 自动导入依赖: 启用自动导入依赖功能,确保所有依赖正确加载。
三、配置RabbitMQ
3.1 生产者配置
application.properties
:server.port=8080 spring.application.name=producer spring.rabbitmq.addresses=127.0.0.1:5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/ spring.rabbitmq.connection-timeout=5000
3.2 消费者配置
application.properties
:server.port=8081 spring.application.name=consumer spring.rabbitmq.addresses=127.0.0.1:5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/ spring.rabbitmq.connection-timeout=5000
四、代码编写
4.1 生产者代码
4.1.1 配置类
TopicRabbitConfig
: 配置交换机和队列。@Configuration public class TopicRabbitConfig { @Bean public Queue queueOne() { return new Queue("queue_one"); } @Bean public Queue queueTwo() { return new Queue("queue_two"); } @Bean public TopicExchange exchange() { return new TopicExchange("boot_exchange"); } @Bean public Binding bindingQueueOne(Queue queueOne, TopicExchange exchange) { return BindingBuilder.bind(queueOne).to(exchange).with("dog.red"); } @Bean public Binding bindingQueueTwo(Queue queueTwo, TopicExchange exchange) { return BindingBuilder.bind(queueTwo).to(exchange).with("dog.*"); } }
4.1.2 消息发送类
MessageSender
: 发送消息到RabbitMQ。@Component public class MessageSender { @Autowired private RabbitTemplate rabbitTemplate; public void sendOne() { String message = "This is message one, routing key: dog.red"; System.out.println("Sending message: " + message); rabbitTemplate.convertAndSend("boot_exchange", "dog.red", message); } public void sendTwo() { String message = "This is message two, routing key: dog.black"; System.out.println("Sending message: " + message); rabbitTemplate.convertAndSend("boot_exchange", "dog.black", message); } }
4.1.3 测试类
MessageSenderTest
: 测试消息发送。@SpringBootTest public class MessageSenderTest { @Autowired private MessageSender messageSender; @Test public void testSendOne() { messageSender.sendOne(); } @Test public void testSendTwo() { messageSender.sendTwo(); } }
4.2 消费者代码
4.2.1 消息接收类
ReceiverOne
: 接收消息。@Component public class ReceiverOne { @RabbitListener(queues = "queue_one") public void process(String message) { System.out.println("ReceiverOne received: " + message); } }
ReceiverTwo
: 接收消息。@Component public class ReceiverTwo { @RabbitListener(queues = "queue_two") public void process(String message) { System.out.println("ReceiverTwo received: " + message); } }
五、运行与测试
5.1 启动消费者
- 启动消费者项目: 确保消费者项目先于生产者项目启动,以便能够接收消息。
5.2 发送消息
- 运行测试类: 执行
MessageSenderTest
中的测试方法,发送消息。 - 查看结果: 消费者项目控制台将打印接收到的消息。
六、总结
- 整合步骤: 创建Spring Boot项目,配置RabbitMQ,编写生产者和消费者代码。
- 发送消息: 使用
RabbitTemplate
的convertAndSend
方法发送消息。 - 接收消息: 使用
@RabbitListener
注解监听队列,并处理接收到的消息。 - Topic模式: 通过不同的路由键实现灵活的消息分发。
以上内容详细介绍了如何在Spring Boot中整合RabbitMQ,并通过代码示例展示了消息的发送和接收过程。
RabbitMQ总结
一、RabbitMQ概述
- 消息队列定义: 一种在分布式系统中传递消息的机制。
- RabbitMQ特点: 支持多语言、高性能、灵活的交换机模式、易用的管理后台。
- 架构图: 核心组件包括生产者、交换机、队列和消费者。
二、核心概念
- 生产者 (Producer): 发送消息到RabbitMQ。
- 消息 (Message): 要传递的数据内容。
- 交换机 (Exchange): 负责将消息路由到一个或多个队列。
- 队列 (Queue): 消息的存储和转发中心。
- 消费者 (Consumer): 从队列中接收并处理消息。
三、安装与启动
- 支持操作系统: Linux、macOS、Windows。
- 安装步骤:
- 配置RabbitMQ源。
- 安装Erlang运行环境。
- 安装RabbitMQ服务。
- 启动命令:
systemctl start rabbitmq-server
- 管理后台: 提供Web界面进行监控和管理,默认端口为15672。
四、实战案例演示
- 发送消息:
- 使用
RabbitTemplate
发送消息到指定交换机。 - 示例代码:
rabbitTemplate.convertAndSend("exchangeName", "routingKey", "message");
- 使用
- 多消费者分配:
- 使用工作队列模式实现消息的负载均衡。
- 公平派遣通过
basicQos
和手动确认消息实现。
- 交换机模式:
- Fan Out: 广播模式,所有绑定队列接收相同消息。
- Direct: 根据路由键选择性转发。
- Topic: 支持模糊匹配路由键。
- Headers: 根据消息头属性匹配。
五、官网资源
- 文档板块: 提供详细的API文档和使用指南。
- 博客板块: 社区贡献的使用经验和最佳实践。
六、Spring Boot整合
- 依赖引入: 在
pom.xml
中添加spring-boot-starter-amqp
。 - 配置文件: 设置RabbitMQ连接信息。
- 代码实现:
- 发送消息: 使用
RabbitTemplate
。 - 接收消息: 使用
@RabbitListener
注解。
- 发送消息: 使用
七、总结
- 基础学习: 掌握RabbitMQ的核心概念和基本操作。
- 实战应用: 通过案例理解消息队列在分布式系统中的作用。
- 进阶准备: 为后续在Spring Cloud中的应用打下基础。
以上内容总结了RabbitMQ的核心知识点及其在Spring Boot中的应用,为后续的进阶学习提供了坚实的基础。