RabbitMQ简述

发布于:2025-07-26 ⋅ 阅读:(16) ⋅ 点赞:(0)

RabbitMQ简述

RabbitMQ 是一个开源的 消息代理(Message Broker) 软件,实现了 高级消息队列协议(AMQP),用于在分布式系统中存储、转发消息,支持异步通信、解耦服务、负载均衡和消息缓冲。

核心概念

Producer(生产者):发送消息的应用。
Consumer(消费者):接收消息的应用。
Queue(队列):存储消息的缓冲区,遵循 FIFO(先进先出)。
Exchange(交换机):接收生产者消息并路由到队列(根据规则)。
Binding(绑定):定义交换机和队列之间的关联规则。
Message(消息):包含有效载荷(数据)和元数据(如路由键、头信息)。

交换机类型(Routing Strategies)

直连交换机(Direct Exchange)

Direct:精确匹配路由键(如点对点通信)。

  • 根据消息的routing key精确匹配队列
  • 常用于单播(unicast)消息路由
  • 典型应用场景:订单处理(不同订单类型路由到不同队列)

扇形交换机(Fanout Exchange)

Fanout:广播到所有绑定的队列(发布/订阅模式)。

  • 将消息广播到所有绑定的队列
  • 忽略routing key
  • 典型应用场景:广播通知、事件发布

主题交换机(Topic Exchange)

Topic:基于通配符匹配路由键(灵活的路由)。

  • 根据通配符匹配routing key
  • 支持*(匹配一个单词)和#(匹配零个或多个单词)
  • 典型应用场景:基于多维度路由(如日志级别.应用名称)

头交换机(Headers Exchange)

Headers:通过消息头属性路由(而非路由键)。

  • 根据消息头(header)属性匹配
  • 忽略routing key
  • 支持x-match参数(all需全部匹配,any只需匹配一个)

交换机属性

创建交换机时可设置以下主要属性:
Name:交换机名称
Type:交换机类型(direct, fanout, topic, headers)
Durability:是否持久化(重启后是否保留)
Auto-delete:当所有队列都解除绑定后是否自动删除
Arguments:额外参数(如alternate-exchange等)

模式

模式 交换机类型 核心机制 典型应用场景
简单模式 默认交换机 直接队列绑定 单任务异步处理
工作队列 默认交换机 多消费者竞争 并行任务处理
发布/订阅 Fanout 广播到所有队列 事件通知
路由模式 Direct 精确匹配路由键 选择性日志分发
主题模式 Topic 通配符匹配路由键 多维度消息分类
RPC 默认交换机 回调队列+关联ID 同步远程调用
头部交换机 Headers 键值对匹配 复杂条件路由
死信队列 任意类型(DLX) TTL/拒绝触发 异常消息处理
延迟队列 Delayed Message插件 延迟投递 定时任务/超时控制

简单模式

在这里插入图片描述
简单队列不介绍,直接看工作队列

工作队列

在这里插入图片描述

创建队列

创建一个名为xiri.queue的队列

在这里插入图片描述

消费者代码

模拟2个消费者互相抢消息

@Component
public class SpringRabbitListener 
{
	@RabbitListener(queues = {"xiri.queue"})
	public void listener1(String mes)
	{
		System.out.println("消费者1接受消息:"+mes);
	}
	
	@RabbitListener(queues = {"xiri.queue"})
	public void listener2(String mes)
	{
		System.out.println("消费者2接受消息:"+mes);
	}
}

生产者代码

模拟50条消息

@SpringBootTest
public class ProducerTest 
{
	@Autowired
	RabbitTemplate rabbitTemplate;
	
	@Test
	void WorkQueueSent()
	{
		//队列名称
		String queueName = "xiri.queue";
		for (int i = 1; i <= 50; i++) {
			//发送消息
			rabbitTemplate.convertAndSend(queueName,"消息-"+i);	
		}
	}
}

运行结果

由此发现默认情况下,是轮询投递消息,并没有考虑到消费者已经处理完了消息,造成消息堆积
在这里插入图片描述

消息堆积处理方案(能者多劳)

设置每次只能给消费者投递1次消息,处理完成后才能获取下一个消息

  1. 修改yml配置文件
spring:
  rabbitmq:
    host: 127.0.0.1   #ip
    port: 5672  #端口
    virtual-host: /xiri   #虚拟主机
    username: xiri  #账号
    password: 123   #密码
    listener:
      simple:
        prefetch: 1 #每次只能获取一条消息,处理完成才能获取下一个消息
  1. 修改消费者代码
    给代码加上等待时间进行模拟测试
@Component
public class SpringRabbitListener 
{
	@RabbitListener(queues = {"xiri.queue"})
	public void listener1(String mes) throws InterruptedException {
		System.out.println("消费者1接受消息:"+mes);
		Thread.sleep(20);
	}
	
	@RabbitListener(queues = {"xiri.queue"})
	public void listener2(String mes) throws InterruptedException 
	{
		System.out.println("消费者2接受消息:"+mes);
		Thread.sleep(100);
	}
}
  1. 测试结果
    消费者1处理消息快,处理消息多,实现能者多劳
    在这里插入图片描述

发布/订阅

在这里插入图片描述

控制台设置

设置一个fanout交换机
在这里插入图片描述
设置两个队列
在这里插入图片描述
绑定2个队列
在这里插入图片描述

消费者

在消费者服务写两个消费者方法模拟,分别监听队列1和队列2

@Component
public class SpringRabbitListener 
{
	@RabbitListener(queues = {"xiri.queue1"})
	public void listener1(String mes) throws InterruptedException {
		System.out.println("消费者1接受消息:"+mes);
	}
	
	@RabbitListener(queues = {"xiri.queue2"})
	public void listener2(String mes) throws InterruptedException 
	{
		System.out.println("消费者2接受消息:"+mes);
	}
}

生产者

生产者向交换机发送消息

@SpringBootTest
public class ProducerTest 
{
	@Autowired
	RabbitTemplate rabbitTemplate;
	
	@Test
	void sent()
	{
		//队列名称
		String exchange = "xiri.fanout";
		//发送消息
		rabbitTemplate.convertAndSend(exchange,null,"消息");//routingKey没有设置,可以为空
	}
}

运行结果

在这里插入图片描述

路由模式

直连交换机(Direct Exchange)会根据规则路由到指定的队列

控制台设置

创建类型为direct,名称为xiri.direct交换机
在这里插入图片描述
创建2个队列,名字分别为direct.queue1、direct.queue2
在这里插入图片描述
进行绑定
direct.queue1绑定key1
direct.queue2绑定key2
在这里插入图片描述

消费者

@Component
public class SpringRabbitListener 
{
	@RabbitListener(queues = {"direct.queue1"})
	public void listener1(String mes) throws InterruptedException {
		System.out.println("消费者1接受消息:"+mes);
	}
	
	@RabbitListener(queues = {"direct.queue2"})
	public void listener2(String mes) throws InterruptedException 
	{
		System.out.println("消费者2接受消息:"+mes);
	}
}

生产者

@SpringBootTest
public class ProducerTest 
{
	@Autowired
	RabbitTemplate rabbitTemplate;
	
	@Test
	void sent()
	{
		//队列名称
		String exchange = "xiri.direct";
		//发送消息
		rabbitTemplate.convertAndSend(exchange,"key1","消息1");
		rabbitTemplate.convertAndSend(exchange,"key1","消息2");
		rabbitTemplate.convertAndSend(exchange,"key2","消息3");
	}
}

运行结果

在这里插入图片描述

主题模式

直连交换机(Direct Exchange) 和 主题交换机(Topic Exchange)类似,区别在于Routing key可以是多个单词列表以 .(点) 分割

控制台设置

创建类型为topic,名为xiri.topic的交换机
在这里插入图片描述
创建队列
在这里插入图片描述
交换机绑定队列在这里插入图片描述

消费者

@Component
public class SpringRabbitListener 
{
	@RabbitListener(queues = {"topic.queue1"})
	public void listener1(String mes) throws InterruptedException {
		System.out.println("消费者1接受消息:"+mes);
	}
	
	@RabbitListener(queues = {"topic.queue2"})
	public void listener2(String mes) throws InterruptedException 
	{
		System.out.println("消费者2接受消息:"+mes);
	}
}

生产者

@SpringBootTest
public class ProducerTest 
{
	@Autowired
	RabbitTemplate rabbitTemplate;
	
	@Test
	void sent()
	{
		//队列名称
		String exchange = "xiri.topic";
		//发送消息
		rabbitTemplate.convertAndSend(exchange,"topic.key1","消息1");
		rabbitTemplate.convertAndSend(exchange,"topic.key2","消息2");
		rabbitTemplate.convertAndSend(exchange,"topic.key.node1","消息3");
	}
}

运行结果

根据通配符发到消费者
在这里插入图片描述

注解声明队列和交换机

在消费者端,通过 @RabbitListener 注解自动声明队列并绑定到交换机

@Component
public class SpringRabbitListener 
{
	//基于注解来声明队列和交换机,并且绑定
	@RabbitListener(bindings = {
			@QueueBinding(
					value = @Queue(value = "direct.queue1", durable = "true"),	//设置队列,并且持久化
					exchange = @Exchange(value = "xiri.direct",type = ExchangeTypes.DIRECT),	//设置交换机和类型
					key = {"key1"}	//设置路由
			)
	})
	public void listener1(String mes) throws InterruptedException {
		System.out.println("消费者1接受消息:"+mes);
	}
}

消息转换器

转换成json格式传输
消费者和生产者都需要创建bean

创建Bean

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConverter 
{
	@Bean
	public MessageConverter messageConverter()
	{
		return new Jackson2JsonMessageConverter();
	}
}

这样数据就是以JSON格式传输的
获取消息
在这里插入图片描述

消息丢失问题

生产者发送消息丢失

生产者发送消息到 RabbitMQ 服务器时,由于网络问题或 RabbitMQ 服务崩溃,消息未到达交换机

解决方案

1. 生产者重试机制

通过配置重试机制,但是SpringAMQP是阻塞的,如果对性能有要求不能使用,这个只是对连接进行的重试,而不是消息失败的重试

spring:
  rabbitmq:
    host: 127.0.0.1   #ip
    port: 5672  #端口
    virtual-host: /xiri   #虚拟主机
    username: xiri  #账号
    password: 123   #密码
    connection-timeout: 1s #超时时间
    template:
      retry:
        enabled: true #开启超时重试机制
        initial-interval: 1000ms #失败后初始等待时间
        multiplier: 1 #失败后下次等待时长倍数=initial-interval*multiplier
        max-attempts: 3 #最大重试次数

测试效果,故意将网络故障,造成超时重试3次
在这里插入图片描述

2. 生产者确认

RabbitMQ 提供 ConfirmCallback 机制,确认消息是否成功到达交换机。
如果对消息可靠性要求不高,不需要开启确认机制,因为会影响性能
生产者yml文件配置

spring:
  rabbitmq:
    host: 127.0.0.1   #ip
    port: 5672  #端口
    virtual-host: /xiri   #虚拟主机
    username: xiri  #账号
    password: 123   #密码
    connection-timeout: 1s #超时时间
    template:
      retry:
        enabled: true #开启超时重试机制
        initial-interval: 1000ms #失败后初始等待时间
        multiplier: 1 #失败后下次等待时长倍数=initial-interval*multiplier
        max-attempts: 3 #最大重试次数
    publisher-confirm-type: correlated  # 开启异步确认
    publisher-returns: true # 开启路由失败回调

以下为以上内容中关键的配置信息

publisher-confirm-type: correlated  # 开启异步确认
publisher-returns: true # 开启路由失败回调

生产者代码配置

// Spring AMQP 配置
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) 
{
	RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
	rabbitTemplate.setMandatory(true); // 开启强制回调
	
	// 设置 ConfirmCallback
	rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
		if (ack) {
			System.out.println("消息到达交换机,ID: " + correlationData.getId());
		} else {
			System.err.println("消息未到达交换机,原因: " + cause);
		}
	});

	// 设置 ReturnsCallback
	rabbitTemplate.setReturnsCallback(returned -> {
		System.err.println("消息未路由到队列: " + returned.getMessage());
	});

	return rabbitTemplate;
}

生产者测试

@SpringBootTest
public class ProducerTest 
{
	@Autowired
	RabbitTemplate rabbitTemplate;
	
	@Test
	void sent()
	{
		//队列名称
		String exchange = "xiri.direct";
		//设置消息唯一编号
		CorrelationData id = new CorrelationData(UUID.randomUUID().toString());
		//发送消息
		rabbitTemplate.convertAndSend(exchange,"key","消息",id);
	}
}

消息结果
在这里插入图片描述

3. 数据持久化

RabbitMQ默认将数据保存在内存当中,如果宕机了,消息就会丢失,还会造成内存积压,引发阻塞问题
实现数据持久化三个方面:交换机持久化、队列持久化、消息持久化
spring发送消息默认就是持久的
设置非持久化

@SpringBootTest
public class ProducerTest 
{
	@Autowired
	RabbitTemplate rabbitTemplate;
	
	@Test
	void sent() 
	{
		//队列名称
		String exchange = "xiri.direct";
		//设置消息唯一编号
		CorrelationData id = new CorrelationData(UUID.randomUUID().toString());
		//发送消息
		rabbitTemplate.convertAndSend(exchange, "key", "消息", new MessagePostProcessor() {
			@Override
			public Message postProcessMessage(Message message) {
				message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);//发送持久化消息
				return message;
			}
		});
	}
}

以下已经设置为1,表示非持久化模式
在这里插入图片描述

4. lazy queue

Lazy Queue 是 RabbitMQ 的一种特殊队列模式,它会尽可能将消息存储在磁盘,而不是内存中,从而减少内存使用,适合处理大量消息且消费较慢的场景
3.6.0(初始引入)
首次支持 Lazy Queue,允许消息直接存储到磁盘,减少内存占用。
3.12.0(默认模式)
从该版本开始,Lazy Queue 成为所有队列的默认模式,官方推荐升级到该版本或手动启用 Lazy 模式1。

消费者消息丢失问题

解决方案

1.确认机制

SpringAMQP消息确认机制有三种处理方式:

  1. none 不处理
  2. manual 手动处理,需要在业务代码中调用api
  3. auto 自动处理,利用aop处理,对代码没有破坏性
    当业务出现异常时,会自动返回nack
    如果是消息处理或校验异常,自动返回reject

开启消费者确认机制为auto,有spring确认消息处理完成后返回ack,异常返回nack

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto  #none:关闭ack,manual:手动ack,auto:自动ack
2.重试机制

在 Spring AMQP 的 RabbitMQ 配置中,stateless 是消费者重试机制(retry)的一个参数,用于控制重试时的状态管理方式
stateless=true(默认)

  1. 每次重试都是无状态的,即不保留前一次尝试的上下文(如数据库事务、Spring Session 等)。
  2. 适用场景:普通消息处理,无需依赖前一次重试的状态。
  3. 性能更好:因为不需要维护状态。

stateless=false

  1. 重试时会保留状态(如事务、Session 等),确保多次重试在同一个上下文中执行。
  2. 适用场景:需要事务一致性的操作(如支付处理)。
  3. 性能较低:因为需要维护状态。

开启重试机制

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 #每次只能获取一条消息,处理完成才能获取下一个消息
        retry:
          enabled: true #开启消费者重试机制
          initial-interval: 1000ms #失败后初始等待时间
          multiplier: 1 #失败后下次等待时长倍数=initial-interval*multiplier
          max-attempts: 3 #最大重试次数
          stateless: true #true为无状态,false为有状态。决定重试时是否保持消费者状态(如事务、Session等)

重试多次依然失败处理策略
在开启重试模式后,重试次数耗尽依然失败,则需要有MessageRecoverer接口来处理,它有三种实现:

实现类 行为 适用场景
RejectAndDontRequeueRecoverer(默认) 直接拒绝消息(reject),且不重新入队,消息可能丢失或进入死信队列(若配置)13 非关键消息,允许丢弃
ImmediateRequeueMessageRecoverer 立即将消息重新放回队列(nack + requeue=true),可能导致无限循环 临时性错误(如网络抖动)
RepublishMessageRecoverer(推荐) 将消息重新发布到指定的异常交换机和队列,供人工或后续处理 关键业务,需

使用第三种方式演示
开启消费者失败重试机制,并设置MessageRecoverer,多次重试无效后将消息投递到异常交换机,交由人工处理问题
消费者ymy配置

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true #开启消费者重试机制

消费者配置

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
//这个配置需要开启重试机制才会开启
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry",name="enabled",havingValue = "true")
public class ErrorConfig
{
	@Bean
	public DirectExchange errorExchange()
	{
		return new DirectExchange("error.direct");
	}
	
	@Bean
	public Queue errorQueue()
	{
		return new Queue("error.queue");
	}
	
	@Bean
	public Binding errorBinding(DirectExchange errorExchange,Queue errorQueue)
	{
		//队列绑定交换机
		return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");
	}
	
	@Bean
	public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate)
	{
		return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");
	}
}

效果
将异常信息和消息全部转到了error.queue
在这里插入图片描述

业务层幂等设计

  1. 数据库唯一约束
  • 例如订单表对 order_id 设置唯一索引,重复插入会报错。
    Redis 原子操作
  • 用 SETNX 或分布式锁标记已处理的消息。
  1. 消息去重
  • 生产者生成唯一 ID,发送消息时携带 correlationId,消费者记录已处理的 ID。
  • 消费者记录消息 ID,用 Redis 或数据库存储已处理的消息 ID。

延迟消息

1.死信交换机

利用死信队列(DLX)+ TTL 实现延迟消息
死信队列(DLX):死信会被路由到指定的死信交换机(DLX),再进入死信队列,由消费者处理
消息设置 TTL(Time To Live):消息或队列可以设置过期时间(TTL),到期后消息会变成“死信”

在这里插入图片描述

消费者

声明队列

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DlxConfig 
{
	@Bean
	public DirectExchange xiriExchange()
	{
		return new DirectExchange("xiri.direct",true,false);
	}

	@Bean
	public Queue xiriQueue()
	{
		return QueueBuilder.durable("xiri.queue")
				.withArgument("x-dead-letter-exchange", "dlx.direct")
				.withArgument("x-dead-letter-routing-key","dlx.key")
				.build();
	}

	@Bean
	public Binding xiriBinding() 
	{
		return BindingBuilder.bind(xiriQueue())
				.to(xiriExchange())
				.with("xiri.key");
	}
}

消费

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

@Component
public class SpringRabbitListener 
{
	@RabbitListener(
			bindings = @QueueBinding(
					value = @Queue(name = "dlx.queue", durable = "true"),
					exchange = @Exchange(name = "dlx.direct", type = ExchangeTypes.DIRECT),
					key = "dlx.key" // 死信路由键
			)
	)
	public void listener(String mes) throws InterruptedException 
	{
		System.out.println(LocalDateTime.now() +" 死信接受消息:"+mes);
	}
}
生产者
import org.junit.jupiter.api.Test;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.time.LocalDateTime;

@SpringBootTest
public class ProducerTest 
{
	@Autowired
	RabbitTemplate rabbitTemplate;
	
	@Test
	void sent()
	{
		//队列名称
		String exchange = "xiri.direct";
		//发送消息
		rabbitTemplate.convertAndSend(exchange, "xiri.key", "消息", new MessagePostProcessor() {
			@Override
			public Message postProcessMessage(Message message) throws AmqpException {
				//设置过期时间(5秒)
				message.getMessageProperties().setExpiration("5000");
				return message;
			}
		});
		System.out.println(LocalDateTime.now() +" 发送消息");
	}
}
结果

在这里插入图片描述
5秒后收到消息
在这里插入图片描述

缺点: 消息排序问题:如果队列中有不同 TTL 的消息,RabbitMQ 只会检查队头消息的 TTL,可能导致后进队的消息先过期

2.RabbitMQ延迟插件

使用 rabbitmq-delayed-message-exchange 插件
RabbitMQ 官方提供的插件,通过 自定义交换机类型(x-delayed-message) 实现真正的延迟投递,消息按延迟时间排序,到期后才会被路由到目标队列

  1. 下载插件(需匹配 RabbitMQ 版本):
    插件下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
  2. 将下载的文件放到RabbitMQ的plugins目录里面
  3. 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  1. 重启RabbitMQ

关闭

rabbitmq-service.bat stop

启动

rabbitmq-server start

消费者

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

@Component
public class SpringRabbitListener 
{
	//延迟队列,关键点在交换机设置delayed属性为true
	@RabbitListener(
			bindings = @QueueBinding(
					value = @Queue(name = "xiri.queue", durable = "true"),
					exchange = @Exchange(name = "xiri.direct", type = ExchangeTypes.DIRECT,delayed = "true"),
					key = "xiri.key"
			)
	)
	public void listener1(String mes) throws InterruptedException {
		System.out.println(LocalDateTime.now()+" 消费者接受消息:"+mes);
	}
}

生产者

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.time.LocalDateTime;

@SpringBootTest
public class ProducerTest 
{
	@Autowired
	RabbitTemplate rabbitTemplate;
	
	@Test
	void sent()
	{
		//队列名称
		String exchange = "xiri.direct";
		//发送消息
		rabbitTemplate.convertAndSend(exchange,"xiri.key","消息",message -> {
			message.getMessageProperties().setDelayLong(5000L);//设置5秒过去
			return message;
		});
		System.out.println(LocalDateTime.now()+" 发送消息");
	}
}

结果

在这里插入图片描述
在这里插入图片描述


网站公告

今日签到

点亮在社区的每一天
去签到