RabbitMQ 是一款基于 AMQP 协议的高性能消息中间件,广泛应用于分布式系统中,用于实现服务之间的异步通信、解耦和负载均衡。RabbitMQ 提供了五种常见的消息模型,每种模型都有其独特的特点和适用场景。本文将详细介绍这五种消息模型,帮助读者更好地理解和使用 RabbitMQ。
一、简单模式(Simple Queue)
1.1 模型介绍
简单模式是最基础的消息传递模型,包含一个生产者、一个队列和一个消费者。生产者将消息发送到队列,消费者从队列中接收消息。这种模式适用于一对一的通信场景。
1.2 工作流程
生产者将消息发送到指定的队列。
消费者监听队列,获取并处理消息。
消息被消费后从队列中删除。
1.3 应用场景
适用于简单的任务分配场景,例如日志记录、邮件通知等。
创建一个工程demo,两个子模块consumer和publisher
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.itcast.demo</groupId>
<artifactId>mq-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<modules>
<module>publisher</module>
<module>consumer</module>
</modules>
<packaging>pom</packaging>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.12</version>
<relativePath/>
</parent>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--单元测试-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
</project>
配置:我是把mq安装到的windows
spring:
rabbitmq:
host: localhost # 你的虚拟机IP
port: 5672 # 端口
virtual-host: /yyf # 虚拟主机
username: yyf # 用户名
password: 123456 # 密码
根基mq配置
生产消息
package com.itfly;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
* Unit test for simple App.
*/
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
// 队列名称
String queueName = "test.queue";
// 消息
String message = "hello, spring amqp! yyf";
// 发送消息
rabbitTemplate.convertAndSend(queueName, message);
}
}
接收消息:在消费者模块
package com.itfly.controller;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class SpringRabbitListener {
// 利用RabbitListener来声明要监听的队列信息
// 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。
// 可以看到方法体中接收的就是消息体的内容
@RabbitListener(queues = "test.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消费者接收到消息:【" + msg + "】");
}
}
启动消费者服务,再启动测试类
二、工作模式(Work Queue)
2.1 模型介绍
工作模式用于在多个消费者之间分配任务。一个生产者将消息发送到队列,多个消费者可以并发地从队列中获取任务并处理。这种模式可以实现任务的负载均衡。
一条消息只能被一个消费者处理
2.2 工作流程
生产者将消息发送到队列。
多个消费者监听同一个队列,竞争获取消息。
消费者处理完消息后,消息从队列中删除。
2.3 应用场景
适用于需要并发处理的任务分配场景,例如批量处理订单、视频转码等。
也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。导致1个消费者空闲,另一个消费者忙的不可开交。没有充分利用每一个消费者的能力,最终消息处理的耗时远远超过了1秒。这样显然是有问题的。
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
可以发现,由于消费者1处理速度较快,所以处理了更多的消息;消费者2处理速度较慢,只处理了6条消息。而最终总的执行耗时也在1秒左右,大大提升。
正所谓能者多劳,这样充分利用了每一个消费者的处理能力,可以有效避免消息积压问题。
三、发布/订阅模式(Publish/Subscribe)
发布/订阅模式是一种一对多的消息传递模型,生产者将消息发送到交换机(Exchange),交换机根据绑定规则将消息分发到多个队列,从而实现消息的广播。
交换机的作用是什么?
接收publisher发送的消息
将消息按照规则路由到与之绑定的队列
不能缓存消息,路由失败,消息丢失
FanoutExchange的会将消息路由到每个绑定的队列
3.1 广播模式(Fanout Exchange)
Fanout 交换机将消息广播到所有绑定的队列,不关心消息的路由键。
@Test
public void testFanoutExchange() {
// 交换机名称
String exchangeName = "test.fanout";
// 消息
String message = "hello, everyone!";
rabbitTemplate.convertAndSend(exchangeName, "", message);//三个参数
}
3.2 路由模式(Direct Exchange)
Direct 交换机根据路由键将消息发送到指定的队列。生产者在发送消息时指定路由键,队列在绑定交换机时也指定路由键,只有匹配的队列才能接收消息。
在Direct模型下:
队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey
(路由key)消息的发送方在 向 Exchange发送消息时,也必须指定消息的
RoutingKey
。Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key
进行判断,只有队列的Routingkey
与消息的Routing key
完全一致,才会接收到消息@Test public void testSendDirectExchange() { // 交换机名称 String exchangeName = "test.direct"; // 消息 String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "red", message); }
消息只会被路由到
Routingkey为red的队列
3.3 主题模式(Topic Exchange)
Topic 交换机允许使用通配符匹配路由键,从而实现更灵活的消息订阅。例如,路由键可以使用 *
和 #
,其中 *
表示一个单词,#
表示零个或多个单词。
3.4 应用场景
适用于需要一对多消息传递的场景,例如日志收集、事件通知等。
Topic
类型的Exchange
与Direct
相比,都是可以根据RoutingKey
把消息路由到不同的队列。
只不过Topic
类型Exchange
可以让队列在绑定BindingKey
的时候使用通配符!
BindingKey
一般都是有一个或多个单词组成,多个单词之间以.
分割,例如: item.insert
通配符规则:
#
:匹配一个或多个词*
:匹配不多不少恰好1个词
举例:
item.#
:能够匹配item.spu.insert
或者item.spu
item.*
:只能匹配item.spu
四、RPC 模式(Remote Procedure Call)
RPC 模式用于实现远程过程调用。客户端发送请求消息到队列,服务端处理请求后将响应消息发送到另一个队列,客户端从该队列中获取响应。
4.1 工作流程
客户端发送请求消息到请求队列。
服务端处理请求,将响应消息发送到响应队列。
客户端从响应队列中获取响应消息。
4.2 应用场景
适用于需要同步调用的场景,例如远程接口调用。
五、延迟队列模式(Delayed Queue)
延迟队列用于实现消息的延迟处理。生产者发送消息时可以指定延迟时间,消息在达到延迟时间后才会被消费者消费。
5.1 工作流程
生产者发送消息到队列,并指定延迟时间。
消息在队列中等待指定的延迟时间。
延迟时间到达后,消费者从队列中获取消息并处理。
5.2 应用场景
适用于需要延迟处理的场景,例如定时任务、订单超时处理等。