RabbitMQ简述
RabbitMQ 是一个开源的 消息代理(Message Broker) 软件,实现了 高级消息队列协议(AMQP),用于在分布式系统中存储、转发消息,支持异步通信、解耦服务、负载均衡和消息缓冲。
核心概念
Producer(生产者):发送消息的应用。
Consumer(消费者):接收消息的应用。
Queue(队列):存储消息的缓冲区,遵循 FIFO(先进先出)。
Exchange(交换机):接收生产者消息并路由到队列(根据规则)。
Binding(绑定):定义交换机和队列之间的关联规则。
Message(消息):包含有效载荷(数据)和元数据(如路由键、头信息)。
交换机类型(Routing Strategies)
直连交换机(Direct Exchange)
Direct:精确匹配路由键(如点对点通信)。
- 根据消息的routing key精确匹配队列
- 常用于单播(unicast)消息路由
- 典型应用场景:订单处理(不同订单类型路由到不同队列)
扇形交换机(Fanout Exchange)
Fanout:广播到所有绑定的队列(发布/订阅模式)。
- 将消息广播到所有绑定的队列
- 忽略routing key
- 典型应用场景:广播通知、事件发布
主题交换机(Topic Exchange)
Topic:基于通配符匹配路由键(灵活的路由)。
- 根据通配符匹配routing key
- 支持*(匹配一个单词)和#(匹配零个或多个单词)
- 典型应用场景:基于多维度路由(如日志级别.应用名称)
头交换机(Headers Exchange)
Headers:通过消息头属性路由(而非路由键)。
- 根据消息头(header)属性匹配
- 忽略routing key
- 支持x-match参数(all需全部匹配,any只需匹配一个)
交换机属性
创建交换机时可设置以下主要属性:
Name:交换机名称
Type:交换机类型(direct, fanout, topic, headers)
Durability:是否持久化(重启后是否保留)
Auto-delete:当所有队列都解除绑定后是否自动删除
Arguments:额外参数(如alternate-exchange等)
模式
模式 | 交换机类型 | 核心机制 | 典型应用场景 |
---|---|---|---|
简单模式 | 默认交换机 | 直接队列绑定 | 单任务异步处理 |
工作队列 | 默认交换机 | 多消费者竞争 | 并行任务处理 |
发布/订阅 | Fanout | 广播到所有队列 | 事件通知 |
路由模式 | Direct | 精确匹配路由键 | 选择性日志分发 |
主题模式 | Topic | 通配符匹配路由键 | 多维度消息分类 |
RPC | 默认交换机 | 回调队列+关联ID | 同步远程调用 |
头部交换机 | Headers | 键值对匹配 | 复杂条件路由 |
死信队列 | 任意类型(DLX) | TTL/拒绝触发 | 异常消息处理 |
延迟队列 | Delayed Message插件 | 延迟投递 | 定时任务/超时控制 |
简单模式
简单队列不介绍,直接看工作队列
工作队列
创建队列
创建一个名为xiri.queue的队列
消费者代码
模拟2个消费者互相抢消息
@Component
public class SpringRabbitListener
{
@RabbitListener(queues = {"xiri.queue"})
public void listener1(String mes)
{
System.out.println("消费者1接受消息:"+mes);
}
@RabbitListener(queues = {"xiri.queue"})
public void listener2(String mes)
{
System.out.println("消费者2接受消息:"+mes);
}
}
生产者代码
模拟50条消息
@SpringBootTest
public class ProducerTest
{
@Autowired
RabbitTemplate rabbitTemplate;
@Test
void WorkQueueSent()
{
//队列名称
String queueName = "xiri.queue";
for (int i = 1; i <= 50; i++) {
//发送消息
rabbitTemplate.convertAndSend(queueName,"消息-"+i);
}
}
}
运行结果
由此发现默认情况下,是轮询投递消息,并没有考虑到消费者已经处理完了消息,造成消息堆积
消息堆积处理方案(能者多劳)
设置每次只能给消费者投递1次消息,处理完成后才能获取下一个消息
- 修改yml配置文件
spring:
rabbitmq:
host: 127.0.0.1 #ip
port: 5672 #端口
virtual-host: /xiri #虚拟主机
username: xiri #账号
password: 123 #密码
listener:
simple:
prefetch: 1 #每次只能获取一条消息,处理完成才能获取下一个消息
- 修改消费者代码
给代码加上等待时间进行模拟测试
@Component
public class SpringRabbitListener
{
@RabbitListener(queues = {"xiri.queue"})
public void listener1(String mes) throws InterruptedException {
System.out.println("消费者1接受消息:"+mes);
Thread.sleep(20);
}
@RabbitListener(queues = {"xiri.queue"})
public void listener2(String mes) throws InterruptedException
{
System.out.println("消费者2接受消息:"+mes);
Thread.sleep(100);
}
}
- 测试结果
消费者1处理消息快,处理消息多,实现能者多劳
发布/订阅
控制台设置
设置一个fanout交换机
设置两个队列
绑定2个队列
消费者
在消费者服务写两个消费者方法模拟,分别监听队列1和队列2
@Component
public class SpringRabbitListener
{
@RabbitListener(queues = {"xiri.queue1"})
public void listener1(String mes) throws InterruptedException {
System.out.println("消费者1接受消息:"+mes);
}
@RabbitListener(queues = {"xiri.queue2"})
public void listener2(String mes) throws InterruptedException
{
System.out.println("消费者2接受消息:"+mes);
}
}
生产者
生产者向交换机发送消息
@SpringBootTest
public class ProducerTest
{
@Autowired
RabbitTemplate rabbitTemplate;
@Test
void sent()
{
//队列名称
String exchange = "xiri.fanout";
//发送消息
rabbitTemplate.convertAndSend(exchange,null,"消息");//routingKey没有设置,可以为空
}
}
运行结果
路由模式
直连交换机(Direct Exchange)会根据规则路由到指定的队列
控制台设置
创建类型为direct,名称为xiri.direct交换机
创建2个队列,名字分别为direct.queue1、direct.queue2
进行绑定
direct.queue1绑定key1
direct.queue2绑定key2
消费者
@Component
public class SpringRabbitListener
{
@RabbitListener(queues = {"direct.queue1"})
public void listener1(String mes) throws InterruptedException {
System.out.println("消费者1接受消息:"+mes);
}
@RabbitListener(queues = {"direct.queue2"})
public void listener2(String mes) throws InterruptedException
{
System.out.println("消费者2接受消息:"+mes);
}
}
生产者
@SpringBootTest
public class ProducerTest
{
@Autowired
RabbitTemplate rabbitTemplate;
@Test
void sent()
{
//队列名称
String exchange = "xiri.direct";
//发送消息
rabbitTemplate.convertAndSend(exchange,"key1","消息1");
rabbitTemplate.convertAndSend(exchange,"key1","消息2");
rabbitTemplate.convertAndSend(exchange,"key2","消息3");
}
}
运行结果
主题模式
直连交换机(Direct Exchange) 和 主题交换机(Topic Exchange)类似,区别在于Routing key可以是多个单词列表以 .(点) 分割
控制台设置
创建类型为topic,名为xiri.topic的交换机
创建队列
交换机绑定队列
消费者
@Component
public class SpringRabbitListener
{
@RabbitListener(queues = {"topic.queue1"})
public void listener1(String mes) throws InterruptedException {
System.out.println("消费者1接受消息:"+mes);
}
@RabbitListener(queues = {"topic.queue2"})
public void listener2(String mes) throws InterruptedException
{
System.out.println("消费者2接受消息:"+mes);
}
}
生产者
@SpringBootTest
public class ProducerTest
{
@Autowired
RabbitTemplate rabbitTemplate;
@Test
void sent()
{
//队列名称
String exchange = "xiri.topic";
//发送消息
rabbitTemplate.convertAndSend(exchange,"topic.key1","消息1");
rabbitTemplate.convertAndSend(exchange,"topic.key2","消息2");
rabbitTemplate.convertAndSend(exchange,"topic.key.node1","消息3");
}
}
运行结果
根据通配符发到消费者
注解声明队列和交换机
在消费者端,通过 @RabbitListener 注解自动声明队列并绑定到交换机
@Component
public class SpringRabbitListener
{
//基于注解来声明队列和交换机,并且绑定
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = "direct.queue1", durable = "true"), //设置队列,并且持久化
exchange = @Exchange(value = "xiri.direct",type = ExchangeTypes.DIRECT), //设置交换机和类型
key = {"key1"} //设置路由
)
})
public void listener1(String mes) throws InterruptedException {
System.out.println("消费者1接受消息:"+mes);
}
}
消息转换器
转换成json格式传输
消费者和生产者都需要创建bean
创建Bean
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConverter
{
@Bean
public MessageConverter messageConverter()
{
return new Jackson2JsonMessageConverter();
}
}
这样数据就是以JSON格式传输的
获取消息
消息丢失问题
生产者发送消息丢失
生产者发送消息到 RabbitMQ 服务器时,由于网络问题或 RabbitMQ 服务崩溃,消息未到达交换机
解决方案
1. 生产者重试机制
通过配置重试机制,但是SpringAMQP是阻塞的,如果对性能有要求不能使用,这个只是对连接进行的重试,而不是消息失败的重试
spring:
rabbitmq:
host: 127.0.0.1 #ip
port: 5672 #端口
virtual-host: /xiri #虚拟主机
username: xiri #账号
password: 123 #密码
connection-timeout: 1s #超时时间
template:
retry:
enabled: true #开启超时重试机制
initial-interval: 1000ms #失败后初始等待时间
multiplier: 1 #失败后下次等待时长倍数=initial-interval*multiplier
max-attempts: 3 #最大重试次数
测试效果,故意将网络故障,造成超时重试3次
2. 生产者确认
RabbitMQ 提供 ConfirmCallback 机制,确认消息是否成功到达交换机。
如果对消息可靠性要求不高,不需要开启确认机制,因为会影响性能
生产者yml文件配置
spring:
rabbitmq:
host: 127.0.0.1 #ip
port: 5672 #端口
virtual-host: /xiri #虚拟主机
username: xiri #账号
password: 123 #密码
connection-timeout: 1s #超时时间
template:
retry:
enabled: true #开启超时重试机制
initial-interval: 1000ms #失败后初始等待时间
multiplier: 1 #失败后下次等待时长倍数=initial-interval*multiplier
max-attempts: 3 #最大重试次数
publisher-confirm-type: correlated # 开启异步确认
publisher-returns: true # 开启路由失败回调
以下为以上内容中关键的配置信息
publisher-confirm-type: correlated # 开启异步确认
publisher-returns: true # 开启路由失败回调
生产者代码配置
// Spring AMQP 配置
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory)
{
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true); // 开启强制回调
// 设置 ConfirmCallback
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
System.out.println("消息到达交换机,ID: " + correlationData.getId());
} else {
System.err.println("消息未到达交换机,原因: " + cause);
}
});
// 设置 ReturnsCallback
rabbitTemplate.setReturnsCallback(returned -> {
System.err.println("消息未路由到队列: " + returned.getMessage());
});
return rabbitTemplate;
}
生产者测试
@SpringBootTest
public class ProducerTest
{
@Autowired
RabbitTemplate rabbitTemplate;
@Test
void sent()
{
//队列名称
String exchange = "xiri.direct";
//设置消息唯一编号
CorrelationData id = new CorrelationData(UUID.randomUUID().toString());
//发送消息
rabbitTemplate.convertAndSend(exchange,"key","消息",id);
}
}
消息结果
3. 数据持久化
RabbitMQ默认将数据保存在内存当中,如果宕机了,消息就会丢失,还会造成内存积压,引发阻塞问题
实现数据持久化三个方面:交换机持久化、队列持久化、消息持久化
spring发送消息默认就是持久的
设置非持久化
@SpringBootTest
public class ProducerTest
{
@Autowired
RabbitTemplate rabbitTemplate;
@Test
void sent()
{
//队列名称
String exchange = "xiri.direct";
//设置消息唯一编号
CorrelationData id = new CorrelationData(UUID.randomUUID().toString());
//发送消息
rabbitTemplate.convertAndSend(exchange, "key", "消息", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);//发送持久化消息
return message;
}
});
}
}
以下已经设置为1,表示非持久化模式
4. lazy queue
Lazy Queue 是 RabbitMQ 的一种特殊队列模式,它会尽可能将消息存储在磁盘,而不是内存中,从而减少内存使用,适合处理大量消息且消费较慢的场景
3.6.0(初始引入)
首次支持 Lazy Queue,允许消息直接存储到磁盘,减少内存占用。
3.12.0(默认模式)
从该版本开始,Lazy Queue 成为所有队列的默认模式,官方推荐升级到该版本或手动启用 Lazy 模式1。
消费者消息丢失问题
解决方案
1.确认机制
SpringAMQP消息确认机制有三种处理方式:
- none 不处理
- manual 手动处理,需要在业务代码中调用api
- auto 自动处理,利用aop处理,对代码没有破坏性
当业务出现异常时,会自动返回nack
如果是消息处理或校验异常,自动返回reject
开启消费者确认机制为auto,有spring确认消息处理完成后返回ack,异常返回nack
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto #none:关闭ack,manual:手动ack,auto:自动ack
2.重试机制
在 Spring AMQP 的 RabbitMQ 配置中,stateless 是消费者重试机制(retry)的一个参数,用于控制重试时的状态管理方式
stateless=true(默认)
- 每次重试都是无状态的,即不保留前一次尝试的上下文(如数据库事务、Spring Session 等)。
- 适用场景:普通消息处理,无需依赖前一次重试的状态。
- 性能更好:因为不需要维护状态。
stateless=false
- 重试时会保留状态(如事务、Session 等),确保多次重试在同一个上下文中执行。
- 适用场景:需要事务一致性的操作(如支付处理)。
- 性能较低:因为需要维护状态。
开启重试机制
spring:
rabbitmq:
listener:
simple:
prefetch: 1 #每次只能获取一条消息,处理完成才能获取下一个消息
retry:
enabled: true #开启消费者重试机制
initial-interval: 1000ms #失败后初始等待时间
multiplier: 1 #失败后下次等待时长倍数=initial-interval*multiplier
max-attempts: 3 #最大重试次数
stateless: true #true为无状态,false为有状态。决定重试时是否保持消费者状态(如事务、Session等)
重试多次依然失败处理策略
在开启重试模式后,重试次数耗尽依然失败,则需要有MessageRecoverer接口来处理,它有三种实现:
实现类 | 行为 | 适用场景 |
---|---|---|
RejectAndDontRequeueRecoverer(默认) | 直接拒绝消息(reject),且不重新入队,消息可能丢失或进入死信队列(若配置)13 | 非关键消息,允许丢弃 |
ImmediateRequeueMessageRecoverer | 立即将消息重新放回队列(nack + requeue=true),可能导致无限循环 | 临时性错误(如网络抖动) |
RepublishMessageRecoverer(推荐) | 将消息重新发布到指定的异常交换机和队列,供人工或后续处理 | 关键业务,需 |
使用第三种方式演示
开启消费者失败重试机制,并设置MessageRecoverer,多次重试无效后将消息投递到异常交换机,交由人工处理问题
消费者ymy配置
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true #开启消费者重试机制
消费者配置
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
//这个配置需要开启重试机制才会开启
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry",name="enabled",havingValue = "true")
public class ErrorConfig
{
@Bean
public DirectExchange errorExchange()
{
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue()
{
return new Queue("error.queue");
}
@Bean
public Binding errorBinding(DirectExchange errorExchange,Queue errorQueue)
{
//队列绑定交换机
return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");
}
@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate)
{
return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");
}
}
效果
将异常信息和消息全部转到了error.queue
业务层幂等设计
- 数据库唯一约束
- 例如订单表对 order_id 设置唯一索引,重复插入会报错。
Redis 原子操作 - 用 SETNX 或分布式锁标记已处理的消息。
- 消息去重
- 生产者生成唯一 ID,发送消息时携带 correlationId,消费者记录已处理的 ID。
- 消费者记录消息 ID,用 Redis 或数据库存储已处理的消息 ID。
延迟消息
1.死信交换机
利用死信队列(DLX)+ TTL 实现延迟消息
死信队列(DLX):死信会被路由到指定的死信交换机(DLX),再进入死信队列,由消费者处理
消息设置 TTL(Time To Live):消息或队列可以设置过期时间(TTL),到期后消息会变成“死信”
消费者
声明队列
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DlxConfig
{
@Bean
public DirectExchange xiriExchange()
{
return new DirectExchange("xiri.direct",true,false);
}
@Bean
public Queue xiriQueue()
{
return QueueBuilder.durable("xiri.queue")
.withArgument("x-dead-letter-exchange", "dlx.direct")
.withArgument("x-dead-letter-routing-key","dlx.key")
.build();
}
@Bean
public Binding xiriBinding()
{
return BindingBuilder.bind(xiriQueue())
.to(xiriExchange())
.with("xiri.key");
}
}
消费
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
@Component
public class SpringRabbitListener
{
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(name = "dlx.queue", durable = "true"),
exchange = @Exchange(name = "dlx.direct", type = ExchangeTypes.DIRECT),
key = "dlx.key" // 死信路由键
)
)
public void listener(String mes) throws InterruptedException
{
System.out.println(LocalDateTime.now() +" 死信接受消息:"+mes);
}
}
生产者
import org.junit.jupiter.api.Test;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.time.LocalDateTime;
@SpringBootTest
public class ProducerTest
{
@Autowired
RabbitTemplate rabbitTemplate;
@Test
void sent()
{
//队列名称
String exchange = "xiri.direct";
//发送消息
rabbitTemplate.convertAndSend(exchange, "xiri.key", "消息", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//设置过期时间(5秒)
message.getMessageProperties().setExpiration("5000");
return message;
}
});
System.out.println(LocalDateTime.now() +" 发送消息");
}
}
结果
5秒后收到消息
缺点: 消息排序问题:如果队列中有不同 TTL 的消息,RabbitMQ 只会检查队头消息的 TTL,可能导致后进队的消息先过期
2.RabbitMQ延迟插件
使用 rabbitmq-delayed-message-exchange 插件
RabbitMQ 官方提供的插件,通过 自定义交换机类型(x-delayed-message) 实现真正的延迟投递,消息按延迟时间排序,到期后才会被路由到目标队列
- 下载插件(需匹配 RabbitMQ 版本):
插件下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases - 将下载的文件放到RabbitMQ的plugins目录里面
- 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
- 重启RabbitMQ
关闭
rabbitmq-service.bat stop
启动
rabbitmq-server start
消费者
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
@Component
public class SpringRabbitListener
{
//延迟队列,关键点在交换机设置delayed属性为true
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(name = "xiri.queue", durable = "true"),
exchange = @Exchange(name = "xiri.direct", type = ExchangeTypes.DIRECT,delayed = "true"),
key = "xiri.key"
)
)
public void listener1(String mes) throws InterruptedException {
System.out.println(LocalDateTime.now()+" 消费者接受消息:"+mes);
}
}
生产者
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.time.LocalDateTime;
@SpringBootTest
public class ProducerTest
{
@Autowired
RabbitTemplate rabbitTemplate;
@Test
void sent()
{
//队列名称
String exchange = "xiri.direct";
//发送消息
rabbitTemplate.convertAndSend(exchange,"xiri.key","消息",message -> {
message.getMessageProperties().setDelayLong(5000L);//设置5秒过去
return message;
});
System.out.println(LocalDateTime.now()+" 发送消息");
}
}
结果