目录
一、mq的作用和使用场景
作用:
RabbitMQ 是一个开源的消息代理和队列服务器,主要用于实现应用程序之间的异步通信和解耦。它的主要作用包括:
应用解耦:将相互依赖的应用系统分离,降低系统间的耦合度
异步通信:实现系统间的非实时、异步消息传递
流量削峰:缓冲突发流量,避免系统被压垮
消息分发:支持多种消息路由模式(点对点、发布/订阅等)
可靠传递:提供消息持久化、确认机制等保证消息可靠传输
使用场景:
使用场景 | 场景示例 | 优势 |
---|---|---|
异步任务处理 | 用户注册后发送欢迎邮件/短信 | 主流程快速响应,耗时操作异步执行 |
订单支付成功后通知物流系统 | ||
图片/视频上传后的处理任务 | ||
应用解耦 | 电商系统中订单系统与库存系统分离 | 系统可独立开发、部署和扩展 |
主业务系统与数据分析系统解耦 | ||
流量削峰 | 秒杀/抢购活动中的请求排队 | 平滑流量波动,保护后端系统 |
突发性大流量日志收集 | ||
分布式系统通信 | 微服务间的事件通知 | 提供标准协议支持多语言客户端(如Java、Python等) |
跨语言系统间的数据交换 | ||
数据同步 | 主数据库变更同步到缓存 | 可靠的消息传递机制 |
多个系统间的数据一致性保证 |
1、抢购活动,削峰填谷,防止系统崩塌。
2、延迟信息处理,比如 10 分钟之后给下单未付款的用户发送邮件提醒。
3、解耦系统,对于新增的功能可以单独写模块扩展,比如用户确认评价之后,新增了给用户返积分的功能,这个时候不用在业务代码里添加新增积分的功能,只需要把新增积分的接口订阅确认评价的消息队列即可,后面再添加任何功能只需要订阅对应的消息队列即可。
二、mq的优点
消息队列(MQ,Message Queue)是分布式系统中重要的中间件,具有很多优点,能够有效提升系统的解耦、可扩展性、可靠性和性能。以下是消息队列的主要优点:
1. 解耦系统模块
生产者与消费者解耦:发送消息的一方(生产者)不需要知道接收消息的一方(消费者)是否存在,也不需要等待其处理完成。
提升系统可维护性:模块之间通过消息通信,降低直接依赖,便于独立开发、测试和维护。
2. 异步处理,提升性能
异步非阻塞通信:将耗时操作通过消息队列异步处理,避免同步调用阻塞主线程,显著提高系统响应速度和吞吐量。
适用于高并发场景:如秒杀、订单处理、日志收集等,可以削峰填谷,缓解系统压力。
3. 流量削峰填谷
在高并发场景下,消息队列可以作为缓冲区,将突发的大量请求暂存起来,消费者按自己的处理能力逐步消费,避免系统被压垮。
4. 保障消息可靠传递
多数消息队列支持消息持久化、确认机制(Ack)、重试机制等,确保消息不丢失、不重复。
支持消息顺序性、事务消息等高级特性,满足不同业务场景需求。
5. 支持广播/发布订阅模式
消息可以被多个消费者消费,适用于通知、广播、事件驱动等场景。
6. 实现最终一致性
在分布式系统中,通过消息队列可以实现跨服务的数据同步,保证多个服务之间的数据最终一致性。
7. 易于扩展
消息队列支持横向扩展,可以通过增加消费者来提升处理能力,适用于大数据量、高并发的系统。
8. 丰富的生态支持
常见的 MQ 中间件如 RabbitMQ、Kafka、RocketMQ、ActiveMQ 等,都有成熟的生态和社区支持,适配各种业务场景。
📌 总结一句话:
消息队列通过解耦、异步、缓存和可靠传递等机制,提升了系统的可扩展性、稳定性和响应能力,是构建高并发、分布式系统的重要组件。
三、mq的缺点
消息队列(MQ)作为分布式系统中实现异步通信、削峰填谷、解耦的重要组件,虽然带来了诸多优势,但也存在一些缺点和挑战。以下是使用 MQ(如 RabbitMQ、Kafka、RocketMQ 等)时可能遇到的主要问题:
1. 系统复杂度增加
引入新的组件:使用 MQ 后,系统架构中新增了一个中间件,需要考虑其部署、配置、监控、容错等问题。
需要处理消息顺序性、幂等性、重复消费等复杂问题,增加了开发和维护成本。
2. 消息丢失风险
如果未正确配置持久化、确认机制等,消息可能在以下环节丢失:
生产端发送失败;
Broker 存储失败;
消费端处理失败且未正确 Ack。
需要额外机制(如 Confirm、Ack、持久化)来保障可靠性。
3. 消息重复消费
网络异常、Ack 超时、消费者宕机等情况可能导致消息被重复投递。
消费者必须实现幂等性处理逻辑,避免重复消费造成数据错误。
4. 消息顺序性难以保证
多队列、多分区、多线程消费时,消息的顺序性难以保证。
对于需要严格顺序的业务(如订单状态变更),需要额外处理逻辑。
5. 系统延迟增加
异步处理虽然提高了吞吐量,但也引入了额外的网络传输和队列排队时间。
对实时性要求高的系统(如金融交易),可能不适合使用 MQ。
6. 运维成本高
需要对 MQ 集群进行监控、扩容、备份、故障转移等操作。
不同 MQ 的运维方式不同,学习成本高。
7. 资源消耗
消息堆积时,会占用大量内存和磁盘资源。
消息压缩、持久化、复制等操作也会带来一定的 CPU 和 I/O 消耗。
8. 消息堆积问题
当消费速度跟不上生产速度时,会导致消息堆积,影响系统性能。
需要合理设置消费者的并发数、限流策略或自动扩容机制。
9. 一致性问题
使用 MQ 后,生产端和消费端的数据一致性不再是本地事务,需要引入事务消息、补偿机制、分布式事务等方案,增加了系统复杂性。
10. 安全性和权限管理
需要对消息队列进行访问控制、权限管理、加密传输等,防止数据泄露或非法访问。
总结:
缺点类型 |
说明 |
---|---|
复杂度增加 |
架构更复杂,需额外维护 MQ 组件 |
消息丢失 |
需配置确认机制、持久化等 |
消息重复 |
消费端需实现幂等 |
顺序性问题 |
多线程/多分区下难保证顺序 |
延迟增加 |
异步带来额外延迟 |
运维成本 |
需监控、扩容、备份等 |
资源消耗 |
内存、磁盘、CPU 占用 |
消息堆积 |
消费能力不足导致堆积 |
数据一致性 |
需要事务或补偿机制 |
安全问题 |
需权限控制、加密等 |
建议:
在使用 MQ 之前,应根据实际业务场景权衡利弊。对于高可用、高吞吐、低实时性要求的系统,MQ 是非常合适的选择;而对于强一致性、低延迟、简单业务的系统,可能不需要引入 MQ。
四、mq相关产品,每种产品的特点
4.1开源产品
1. RabbitMQ
特点 | 说明 |
---|---|
协议支持 | 支持AMQP、MQTT、STOMP等多种协议 |
消息路由 | 提供4种交换机类型,路由策略最灵活 |
可靠性 | 支持持久化、确认机制、事务(性能低) |
管理界面 | 提供功能完善的Web管理控制台 |
适用场景 | 企业级应用、需要复杂路由的中等规模消息处理 |
2. Apache Kafka
特点 | 说明 |
---|---|
高吞吐 | 设计用于日志处理,单机可达百万级TPS |
持久化 | 消息持久化到磁盘,支持长期保存 |
分区顺序 | 保证分区内消息严格有序 |
流处理 | 与Kafka Streams深度集成 |
适用场景 | 日志收集、流数据处理、大数据管道 |
3. Apache RocketMQ
特点 | 说明 |
---|---|
事务消息 | 提供完整的事务消息解决方案 |
延迟消息 | 原生支持18个级别的延迟消息 |
消息轨迹 | 可追踪消息全生命周期 |
双主双从 | 高可用架构设计 |
适用场景 | 金融支付、电商交易等对一致性要求高的场景 |
4. Apache Pulsar
特点 | 说明 |
---|---|
分层架构 | 计算与存储分离,支持无限扩展 |
多租户 | 原生支持多租户隔离 |
消息模型 | 统一队列和流处理模型 |
地理复制 | 内置多机房同步机制 |
适用场景 | 云原生环境、多租户SaaS应用、全球分布式系统 |
5. ActiveMQ
特点 | 说明 |
---|---|
JMS支持 | 完整实现JMS 1.1规范 |
协议支持 | 支持OpenWire、STOMP、AMQP等 |
嵌入式 | 可作为嵌入式消息系统使用 |
适用场景 | 传统Java EE应用、需要JMS规范的项目 |
4.2 商业/云产品
1. AWS SQS/SNS
特点 | 说明 |
---|---|
全托管 | 无需运维基础设施 |
弹性扩展 | 自动扩展处理能力 |
与AWS集成 | 深度集成Lambda、EC2等服务 |
适用场景 | 运行在AWS上的应用,需要轻量级消息服务 |
2. Azure Service Bus
特点 | 说明 |
---|---|
企业级特性 | 支持会话、死信队列、计划消息等 |
高级消息模式 | 提供主题、队列和中继三种模式 |
Geo-DR | 内置异地灾难恢复 |
适用场景 | 企业级应用集成,特别是微软技术栈项目 |
3. Google Cloud Pub/Sub
特点 | 说明 |
---|---|
全球消息传递 | 消息可在全球任何区域生产和消费 |
强一致性 | 保证至少一次投递,支持精确一次处理 |
实时分析集成 | 无缝对接BigQuery等数据分析服务 |
适用场景 | 全球分布式系统、实时分析管道 |
4. Alibaba Cloud MQ
特点 | 说明 |
---|---|
多协议支持 | 兼容RabbitMQ、RocketMQ和Kafka协议 |
金融级可靠 | 提供金融级消息可靠性保证 |
全链路追踪 | 支持消息生产、存储、消费全链路追踪 |
适用场景 | 阿里云上的金融、电商等关键业务系统 |
需要复杂路由 → RabbitMQ
超高吞吐日志 → Kafka
金融交易场景 → RocketMQ
云原生/多租户 → Pulsar
全托管服务 → AWS SQS/Azure Service Bus
传统Java EE → ActiveMQ
五、rabbitmq的搭建过程
docker下安装配置rabbitmq
1、拉取镜像
docker pull rabbitmq:latest
docker pull rabbitmq:3.9.0-management
2、创建并启动容器
使用 Docker 运行一个名为 rabbitmq
的容器,基于 rabbitmq:3.9.0-management
镜像,映射 5672(AMQP协议端口) 和 15672(管理界面端口) 到宿主机,并将容器内的 /etc/rabbitmq
配置目录挂载到宿主机的 /etc/rabbitmq
,以后台模式(-d
)启动,同时分配一个交互式终端(-i
)以保持容器运行。
docker run -id --name rabbitmq -p 5672:5672 -p 15672:15672 -v /etc/rabbimq:/etc/rabbitmq rabbitmq:3.9.0-management
3、查看容器状态
docker ps
如果容器状态显示为Up,并且端口映射正确,那么RabbitMQ服务已经成功启动。
4、查看容器日志
docker logs rabbitmq
访问RabbitMQ管理界面
5、创建用户赋权
创建用户:添加一个名为
admin
、密码为123456
的 RabbitMQ 用户;赋予角色:将
admin
用户标记为administrator
(超级管理员),拥有所有管理权限;设置权限:在默认虚拟主机
/
中,授予admin
用户对所有资源(交换机、队列、绑定)的 配置、写入、读取 全部权限(".*"
通配符表示全部)。
# 进入容器
docker exec -it rabbitmq bash
#启用 RabbitMQ 的管理控制台插件
rabbitmq-plugins enable rabbitmq_management
# 在容器内执行
rabbitmqctl add_user admin 123456
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
6、使用浏览器打开RabbitMQ管理界面。
默认情况下,管理界面端口为15672。在浏览器地址栏输入以下URL验证rabbitmq搭建成功:
http://<宿主机IP地址>:15672
六、rabbitmq相关角色
RabbitMQ 中重要的角色有:生产者、消费者和代理:
生产者:消息的创建者,负责创建和推送数据到消息服务器;
消费者:消息的接收方,用于处理数据和确认消息;
代理:就是 RabbitMQ 本身,用于扮演“快递”的角色,本身不生产消息,只是扮演“快递”的角色。
角色 | 描述 | 关键特点 |
---|---|---|
生产者 (Producer) | 创建并发送消息到 RabbitMQ 服务器 | 不直接发送到队列,而是发送到交换机;可设置消息属性(持久化、优先级等) |
消费者 (Consumer) | 从队列接收并处理消息 | 可订阅多个队列;支持手动确认(ACK)或自动确认 |
队列 (Queue) | 存储消息的缓冲区,是消息的最终目的地 | 多个消费者可竞争消费;支持消息持久化、优先级、TTL(存活时间)等 |
交换机 (Exchange) | 接收生产者消息,并根据规则路由到队列 | 四种类型:direct (直连)、topic (主题)、fanout (扇出)、headers (头匹配) |
绑定 (Binding) | 定义交换机和队列之间的路由规则 | 可基于 routing key (路由键)或 headers 进行匹配 |
虚拟主机 (VHost) | 提供逻辑隔离的环境,类似命名空间 | 包含独立的交换机、队列、绑定和权限;不同 VHost 互不影响 |
管理员 (Admin) | 管理 RabbitMQ 服务器,负责用户、权限、虚拟主机等配置 | 可创建/删除 VHost、用户;监控集群状态 |
七、rabbitmq内部组件
1、ConnectionFactory(连接管理器):应用程序与Rabbit之间建立连接的管理器,程序代码中使用。
2、Channel(信道):消息推送使用的通道。
3、Exchange(交换器):用于接受、分配消息。
4、Queue(队列):用于存储生产者的消息。
5、RoutingKey(路由键):用于把生成者的数据分配到交换器上。
6、BindingKey(绑定键):用于把交换器的消息绑定到队列上。
组件 | 描述 | 核心作用 |
---|---|---|
Erlang VM | RabbitMQ 基于 Erlang 运行时构建 | 提供高并发、分布式和容错能力(OTP 框架支撑) |
AMQP 0-9-1 协议实现 | 实现 AMQP 标准协议的核心模块 | 定义消息格式、通信规则(如连接、信道、交换机和队列的交互) |
消息存储(Msg Store) | 负责消息的持久化存储(针对持久化消息) | 使用磁盘存储,确保消息不丢失(依赖 message_store 插件) |
队列管理器(Queue Manager) | 管理队列的创建、销毁和消息路由 | 处理消息入队/出队、TTL、死信等逻辑 |
交换机管理器(Exchange Manager) | 管理所有交换机及其绑定关系 | 根据类型(direct/topic/fanout/headers)路由消息到队列 |
连接管理器(Connection Manager) | 维护客户端(生产者/消费者)的 TCP 连接 | 处理连接认证、心跳检测和资源分配 |
信道(Channel) | 在单个连接上复用的轻量级逻辑链路 | 减少 TCP 连接开销,支持多线程操作(每个信道独立工作) |
权限控制器(Access Control) | 基于用户和 VHost 的权限管理 | 控制读写权限(如用户对交换机/队列的操作权限) |
集群协调器(Cluster Coordinator) | 管理 RabbitMQ 节点间的数据同步和故障转移 | 确保镜像队列、元数据的一致性(依赖 Erlang 分布式协议) |
插件系统(Plugin System) | 支持扩展功能(如 MQTT、STOMP、管理界面等) | 通过插件增强协议支持或管理能力(如 rabbitmq-management 提供 Web UI) |
死信交换机(DLX) | 处理无法被消费的消息(Dead Letter Exchange) | 将拒绝/过期的消息路由到指定队列,用于异常处理 |
八、生产者发送消息的过程?
发送消息的核心步骤
建立连接
创建通道
声明交换机/队列(可选)
发布消息
处理确认(可选)
关闭连接
在 RabbitMQ 中,生产者发送消息的过程通常包括以下步骤:
1、首先,生产者与 RabbitMQ Broker 建立一个 TCP 连接(Connection),然后通过该连接创建一个信道(Channel);
2、接着,生产者通过信道将消息发送到指定的交换机(Exchange),并指定路由键(Routing Key);
3、RabbitMQ 接收到消息后,根据交换机的类型和绑定规则,将消息路由到一个或多个对应的队列(Queue)中;
4、如果开启了确认机制(如 publisher confirm),RabbitMQ 还会返回确认信息给生产者,确保消息已成功投递。整个过程通过 AMQP 协议完成,确保了消息的可靠传输。
九、消费者接收消息过程?
建立连接和通道
声明队列(可选)
设置QoS(服务质量控制)
注册消费者
消息处理
发送确认(ACK/NACK)
处理失败消息
在 RabbitMQ 中,消费者接收消息的过程如下:
1、消费者首先通过已建立的 TCP 连接(Connection)创建一个信道(Channel),然后通过该信道订阅(consume)一个或多个队列(Queue);
2、RabbitMQ 会将队列中的消息主动推送给消费者(push 模式),或者消费者主动从队列中拉取消息(pull 模式,如 basic.get);
3、消费者接收到消息后进行业务处理,处理完成后通常会发送一个确认(ack)回执给 RabbitMQ,告知该消息已被正确消费;
4、RabbitMQ 收到确认后,才会从队列中删除这条消息;如果未收到确认或消费者断开连接,RabbitMQ 可以将消息重新入队并转发给其他消费者处理,确保消息不会丢失。
5、整个过程可以通过设置 QoS(服务质量)来控制消费者的预取数量,实现流量控制和负载均衡。
十、springboot项目中如何使用mq?
上图的解说:
在Spring Boot项目中使用消息队列(MQ),如图所示,主要涉及生产者、消息中间件(以RabbitMQ为例)和消费者三个部分。
1. 生产者:在Spring Boot应用中,生产者通过配置与RabbitMQ Broker建立TCP连接(Connection),并通过该连接创建多个信道(Channel)。每个信道可以独立发送消息。生产者将消息发送到指定的交换机(Exchange),并附带路由键(Routing Key),用于指导消息如何被路由到正确的队列(Queue)。
2. 消息中间件(Broker - RabbitMQ Server):作为消息的中介,RabbitMQ接收来自生产者的消息,并根据交换机类型和绑定规则(Binding)将消息路由到一个或多个队列中。虚拟主机(Virtual Host)提供了隔离环境,确保不同应用之间的消息不会相互干扰。队列是存储消息的地方,直到它们被消费者消费。
3. 消费者:同样地,消费者也通过TCP连接与RabbitMQ Broker建立通信,并创建自己的信道来订阅感兴趣的队列。当队列中有新消息时,RabbitMQ会根据消费者的订阅情况,将消息推送给消费者进行处理。消费者处理完消息后,通常需要向RabbitMQ发送确认(ack),表示消息已被成功消费,这样RabbitMQ才会从队列中移除这条消息。
整个过程在Spring Boot中可以通过配置文件和注解简化实现,例如使用`@EnableRabbit`启用RabbitMQ支持,通过`@RabbitListener`监听特定队列的消息等。此外,还可以利用Spring Boot的自动配置功能,简化与RabbitMQ的集成工作,使得开发者能够更专注于业务逻辑的实现。
具体实现细节:(详情参考rabbitmq02)
3.1引入jar包
spring-boot-starter-amqp.jarspring-boot-starter-web.jar
3.2Springboot自动装配原理,进行application.yml配置
连接rabbitmq下相关信息
host
port
username
password
virtualHost
3.3使用rabbitmq模版工具类 rabbitTemplate amqpTemplate
send发送消息方法,需要指定交换机,路由key,交换机的名称
receive接收消息 注解controller控制层类上,添加注解可以实时接收消息@RabbitListener在注解里指定队列的名字实时监听队列信息
3.4细化,创建一个RabbitmqConfig配置类
项目启动时,自动创建交换机、创建队列、指定交换机和队列的绑定
十一、如何保障消息不丢失?
1、发送阶段:发送阶段保障消息到达交换机 事务机制|confirm确认机制
2、存储阶段:持久化机制 交换机持久化、队列的持久化、消息内容的持久化
3、消费阶段:消息的确认机制 自动ack|手动ack
接收方消息确认机制
自动ack|手动ack
spring:
rabbitmq:
host: 主机号
port: 5672
username: admin
password: 123456
virtual-host: /yan3
listener:
simple:
acknowledge-mode: manual
direct:
acknowledge-mode: manual
package com.hl.rabbitmq01.web;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.io.IOException;
@RestController
@RequestMapping("/c")
public class ConsumerController {
@RabbitListener(queues = {"topicQueue01"})
public void receive(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
System.out.println(msg);
//业务逻辑 比如传入订单id,根据订单id,减少库存、支付等,
// 如果操作成功,确认消息(从队列移除),如果操作失败,手动拒绝消息
if(msg.length() >= 5){
//确认消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}else{
//拒绝消息 not ack
// 第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
// channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
}
}
}
消息的持久化机制
交换机的持久化
队列的持久化
消息内容的持久化
package com.hl.rabbitmq01.direct;
import com.hl.rabbitmq01.util.MQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*
生产者 javaSE方式简单测试
发布订阅-------direct模型
生产者----消息队列----消费者
*/
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1、创建连接
Connection connection = MQUtil.getConnection();
//2、基于连接,创建信道
Channel channel = connection.createChannel();
//3、基于信道,创建队列
/*
参数:
1. queue:队列名称,如果没有一个名字叫simpleQueue01的队列,则会创建该队列,如果有则不会创建
2. durable:是否持久化,当mq重启之后,消息还在
3. exclusive:
* 是否独占。只能有一个消费者监听这队列
4。当Connection关闭时,是否删除队列
autoDelete:是否自动删除。当没有Consumer时,自动删除掉
5. arguments:参数。
*/
channel.queueDeclare("directQueue01", true, false, false, null);
channel.queueDeclare("directQueue02", false, false, false, null);
/*
声明交换机
参数1:交换机名称
参数2:交换机类型
*/
channel.exchangeDeclare("directExchange01", BuiltinExchangeType.DIRECT,true);
/*
绑定交换机和队列
参数1:队列名
参数2:交换机名称
参数3:路由key 广播模型 不支持路由key ""
*/
channel.queueBind("directQueue01","directExchange01","error");
channel.queueBind("directQueue02","directExchange01","error");
channel.queueBind("directQueue02","directExchange01","info");
channel.queueBind("directQueue02","directExchange01","trace");
//发送消息到消息队列
/*
参数:
1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
2. routingKey:路由名称,简单模式下路由名称使用消息队列名称
3. props:配置信息
4. body:发送消息数据
*/
channel.basicPublish("directExchange01","user", MessageProperties.PERSISTENT_TEXT_PLAIN,("Hello World ").getBytes());
//4、关闭信道,断开连接
channel.close();
connection.close();
}
}
package com.hl.rabbitmq01.web;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@RestController
@RequestMapping("/p")
public class ProducerController {
@Autowired
private AmqpTemplate amqpTemplate;
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/send")
public void send(@RequestParam(defaultValue = "user") String key,
@RequestParam(defaultValue = "hello") String msg) throws IOException {
//amqpTemplate.convertAndSend("topicExchange", key, msg);
// rabbitTemplate.convertAndSend("topicExchange",key,msg);
Channel channel = rabbitTemplate
.getConnectionFactory().
createConnection()
.createChannel(false); //false 非事务模式运行 无需手动提交
channel.basicPublish(
"topicExchange", key,
MessageProperties.PERSISTENT_TEXT_PLAIN,
msg.getBytes());
}
}
/*
创建交换机
*/
@Bean
public TopicExchange topicExchange(){
return ExchangeBuilder
.topicExchange("topicExchange")
.durable(true) //是否支持持久化机制
.build();
}
/*
创建队列
*/
@Bean
public Queue queue(){
return QueueBuilder.durable("topicQueue01").build();
}
发送方的消息确认机制
1、事务机制
消耗资源
RabbitMQ中与事务有关的主要有三个方法:
txSelect() 开始事务
txCommit() 提交事务
txRollback() 回滚事务
txSelect主要用于将当前channel设置成transaction模式,txCommit用于提交事务,txRollback用于回滚事务。
当我们使用txSelect提交开始事务之后,我们就可以发布消息给Broke代理服务器,如果txCommit提交成功了,则消息一定到达了Broke了,如果在txCommit执行之前Broker出现异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback方法进行回滚事务了。
示例
@RestController
public class RabbitMQController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/send")
public String sendMessage(String message){
rabbitTemplate.setChannelTransacted(true); //开启事务操作
rabbitTemplate.execute(channel -> {
try {
channel.txSelect();//开启事务
channel.basicPublish("Fanout_Exchange","",null,message.getBytes());
int i = 5/0;
channel.txCommit();//没有问题提交事务
}catch (Exception e){
e.printStackTrace();
channel.txRollback();//有问题回滚事务
}
return null;
});
return "success";
}
}
消费者没有任何变化。
通过测试会发现,发送消息时只要Broker出现异常崩溃或者由于其他原因抛出异常,就会捕获异常通过txRollback方法进行回滚事务了,则消息不会发送,消费者就获取不到消息。
2、confirm确认机制
推荐
同步通知
channel.confirmSelect(); //开始confirm操作
channel.basicPublish("Fanout_Exchange","",null,message.getBytes());
if (channel.waitForConfirms()){
System.out.println("发送成功");
}else{
//进行消息重发
System.out.println("消息发送失败,进行消息重发");
}
异步通知
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
//消息正确到达broker,就会发送一条ack消息
@Override
public void handleAck(long l, boolean b) throws IOException {
System.out.println("发送消息成功");
}
//RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息
@Override
public void handleNack(long l, boolean b) throws IOException {
System.out.println("发送消息失败,重新发送消息");
}
});
channel.basicPublish("Fanout_Exchange","",null,message.getBytes());
十二、死信交换机和死信队列
在实际开发项目时,在较为重要的业务场景中,要确保未被消费的消息不被丢弃(例如:订单业务),那为了保证消息数据的不丢失,可以使用RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入到死信队列中进行处理。
死信队列:RabbitMQ中并不是直接声明一个公共的死信队列,然后死信消息就会跑到死信队列中。而是为每个需要使用死信的消息队列配置一个死信交换机,当消息成为死信后,可以被重新发送到死信交换机,然后再发送给使用死信的消息队列。
死信交换机:英文缩写:DLX 。Dead Letter Exchange(死信交换机),死信交换机其实就是普通的交换机,通过给队列设置参数: x-dead-letter-exchange 和x-dead-letter-routing-key,来指向死信交换机
RabbitMQ规定消息符合以下某种情况时,将会成为死信
队列消息长度到达限制(队列消息个数限制);
消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
原队列存在消息过期设置,消息到达超时时间未被消费;
死信消息会被RabbitMQ特殊处理,如果配置了死信队列,则消息会被丢到死信队列中,如果没有配置死信队列,则消息会被丢弃。
Map<String,Object> map = new HashMap<>();
map.put("x-dead-letter-exchange","deadExchange");//当前队列和死信交换机绑定
map.put("x-dead-letter-routing-key","user.#");//当前队列和死信交换机绑定的路由规则
// map.put("x-max-length",2);//队列长度
map.put("x-message-ttl",10000);//队列消息过期时间,时间ms
// return QueueBuilder.durable("topicQueue01").build();
return QueueBuilder.durable("topicQueue").withArguments(map).build();
十三、延迟队列简介
延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
RabbitMQ中没有延迟队列,但是可以用ttl+死信队列方式和延迟插件两种方式来实现
ttl+死信队列
ttl+死信队列代码在讲死信队列时已经实现,这个不再阐述。
延迟插件
人们一直在寻找用RabbitMQ实现延迟消息的传递方法,到目前为止,公认的解决方案是混合使用TTL和DLX。rabbitmq_delayed_message_exchange插件就是基于此来实现的,RabbitMQ延迟消息插件新增了一种新的交换器类型,消息通过这种交换器路由就可以实现延迟发送。
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
十四、RabbitMQ消息重复消费
RabbitMQ消息重复消费问题_rabbitmq重复消费的问题解决-CSDN博客
业务背景 消息队列在数据传输的过程中,为了保证消息传递的可靠性,一般会对消息采用ack确认机制,如果消息传递失败,消息队列会进行重试,此时便可能存在消息重复消费的问题。
比如,用户到银行取钱后会收到扣款通知短信,如果用户收到多条扣款信息通知则会有困惑。
解决方法一:send if not exist 首先将 RabbitMQ 的消息自动确认机制改为手动确认,然后每当有一条消息消费成功了,就把该消息的唯一ID记录在Redis 上,然后每次发送消息时,都先去 Redis 上查看是否有该消息的 ID,如果有,表示该消息已经消费过了,不再处理,否则再去处理。
2.1 利用数据库唯一约束实现幂等
解决方法二:insert if not exist 可以通过给消息的某一些属性设置唯一约束,比如增加唯一uuid,添加的时候查询是否存对应的uuid,存在不操作,不存在则添加,那样对于相同的uuid只会存在一条数据
解决方法三:sql的乐观锁 比如给用户发送短信,变成如果该用户未发送过短信,则给用户发送短信,此时的操作则是幂等性操作。但在实际上,对于一个问题如何获取前置条件往往比较复杂,此时可以通过设置版本号version,每修改一次则版本号+1,在更新时则通过判断两个数据的版本号是否一致。
十五、RabbitMQ消息积压
RabbitMq——消息积压分析和解决思路_rabbitmq消息积压-CSDN博客
消息积压产生的原因 正常而言,一般的消息从消息产生到消息消费需要经过以下几种阶段。
以Direct模式为例:
消息由生产者产生,比如新订单的创建等,经过交换机,将消息发送至指定的队列中,然后提供给对应的消费者进行消费。
在这个链路中,存在消息积压的原因大致分为以下几种:
1、消费者宕机,导致消息队列中的消息无法及时被消费,出现积压。 2、消费者没有宕机,但因为本身逻辑处理数据耗时,导致消费者消费能力不足,引起队列消息积压。 3、消息生产方单位时间内产生消息过多,比如“双11大促活动”,导致消费者处理不过来。 消息积压问题解决 针对上面消息积压问题的出现,大致进行了分析,那么根据分析则能制定相关的应对方法。如下所示:
1、大促活动等,导致生产者流量过大,引起积压问题。
提前增加服务器的数量,增加消费者数目,提升消费者针对指定队列消息处理的效率。
2、上线更多的消费者,处理消息队列中的数据。(和1中的大致类似)
3、如果成本有限,则可以专门针对这个队列,编写一个另类的消费者。
当前另类消费者,不进行复杂逻辑处理,只将消息从队列中取出,存放至数据库中,然后basicAck反馈给消息队列。
十六、消息入库(消息补偿)
如果RabbitMQ收到消息还没来得及将消息持久化到硬盘时,RabbitMQ挂了,这样消息还是丢失了,或者RabbitMQ在发送确认消息给生产端的过程中,由于网络故障而导致生产端没有收到确认消息,这样生产端就不知道RabbitMQ到底有没有收到消息,这样也不太好进行处理。所以为了避免RabbitMQ持久化失败而导致数据丢失,我们自己也要做一些消息补偿机制,以应对一些极端情况。
在使用消息队列(Message Queue)时,消息的补偿机制是一种处理消息处理失败或异常情况的方法。当消息消费者无法成功处理消息时,补偿机制允许系统将消息重新发送或执行其他操作,以确保消息的可靠传递和处理。
补偿机制通常涉及以下几个方面:
重试机制:当消息处理失败时,补偿机制会尝试重新发送消息给消费者,以便重新处理。重试间隔和重试次数可以根据具体情况进行配置,以避免重复投递导致的消息处理失败。
延时队列:补偿机制还可以使用延时队列来处理无法立即处理的消息。当某个消息处理失败时,可以将该消息放入到延时队列中,在一定的延时之后再次尝试发送给消费者进行处理。
死信队列:当消息无法被成功处理时,可以将这些无法处理的消息发送到死信队列(Dead Letter Queue)。死信队列通常用于存储无法被消费者处理的消息,以便后续进行排查和处理。
可视化监控和报警:补偿机制还可以包括对消息队列的监控和报警功能,以便及时发现和处理异常情况。通过可视化监控工具可以实时查看消息队列的状态和处理情况,及时发现问题并采取相应的补救措施。
补偿机制的设计和实现密切依赖于具体的消息中间件和使用场景,不同的消息队列系统可能提供不同的补偿机制。因此,在选择和使用消息队列时,需要根据自身的需求和系统特点来选择适合的消息补偿机制。