文章目录
一、环境搭建
1.1 创建SpringBoot项目
1.1.1 创建一个SpringRabbitMQ项目,打包类型为pom
1.1.2 添加依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.syh</groupId>
<artifactId>SpringRabbitMQ</artifactId>
<version>1.0-SNAPSHOT</version>
<modules>
<module>publisher</module>
</modules>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.3</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<packaging>pom</packaging>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
1.2.3 创建子模块publisher
消息的生产者
server:
port: 8081
spring:
rabbitmq:
host: 47.120.37.156
port: 5672
username: shan
password: 123456
virtual-host: syh
1.2.4 创建子模块consumer
消息的消费者
server:
port: 8082
spring:
rabbitmq:
host: 47.120.37.156
port: 5672
username: shan
password: 123456
virtual-host: syh
1.2.5 最终项目结构
二、简单(Simple)模式
2.1 消费者模块声明队列
package com.syh.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author shan
* @date 2024/5/19 14:30
*/
@Configuration
public class MQConfig {
@Bean
public Queue queue() {
return new Queue("simple.queue");
}
}
2.2 在生产者模块模拟发送消息
@RestController
@RequiredArgsConstructor
public class PublisherController {
private final RabbitTemplate rabbitTemplate;
@GetMapping("/publish/{message}")
public String publish(@PathVariable("message") String message) {
System.out.println("publish message: " + message);
rabbitTemplate.convertAndSend("simple.queue", message);
return "success";
}
}
测试:
查看mq控制台
发送成功
2.3 在消费者模块模拟接受消息
package com.syh.mq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author shan
* @date 2024/5/19 14:58
*/
@Component
public class MQListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消费者接收到消息:【" + msg + "】");
}
}
测试:
三、工作(Work)模式
需要创建一个队列多个消费者
队列中的消息只能被一个消费者消费,消费完即丢失了。
如果有多个消费者消费同一队列的消息
默认是采取的是公平模式,每人一半消息,不管性能高低,
通过设置prefetch来控制消费者预取的消息数量
,设置每个消费者最多取一条消息,处理完即可拿下一条消息。能者多劳(处理快的处理多)
Work queues,也被称为(Task queues),任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。
场景:如果订单量庞大,一个消费者花费时间较长,那么可以让多个消费者同时消费这些订单消息
3.1 创建队列
package com.syh.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author shan
* @date 2024/5/19 14:30
*/
@Configuration
public class MQConfig {
@Bean
public Queue simplerQueue() {
return new Queue("simple.queue");
}
@Bean
public Queue workQueue1() {
return new Queue("work.queue");
}
}
3.2 生产者模拟发送多条消息
@GetMapping("/publish2/{message}")
public String publishMessage(@PathVariable("message") String message) {
System.out.println("publish message: " + message);
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend("work.queue", message +"_" + i);
}
return "success";
}
测试
查看
发送成功
3.2 消费者模块模拟接受消息
需要多个消费者
package com.syh.mq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author shan
* @date 2024/5/19 14:58
*/
@Component
public class MQListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消费者接收到消息:【" + msg + "】");
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueueMessage1(String msg) throws InterruptedException {
System.out.println("spring 消费者1--接收到消息:【" + msg + "】");
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueueMessage2(String msg) throws InterruptedException {
System.err.println("spring 消费者2--接收到消息:【" + msg + "】");
}
}
测试:
通过效果能看出:
两个消费者各自消费了10个消息
配置消费者最多拿取消息的数量
- 模拟两个消费者处理消息的时长不同
@RabbitListener(queues = "work.queue")
public void listenWorkQueueMessage1(String msg) throws InterruptedException {
System.out.println("spring 消费者1--接收到消息:【" + msg + "】");
Thread.sleep(20);
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueueMessage2(String msg) throws InterruptedException {
System.err.println("spring 消费者2--接收到消息:【" + msg + "】");
Thread.sleep(200);
}
如果不设置 prefatch
每个消费者各自处理10条消息
- 设置
server:
port: 8081
spring:
rabbitmq:
host: 47.120.37.156
port: 5672
username: shan
password: 123456
virtual-host: syh
listener:
simple:
prefetch: 1 #设置每次从RabbitMQ接收的消息数量, 每次只能获取一条消息,处理完成才能再次接收
消费者1处理快,处理的数据多
四、发布订阅(Publish/Subscribe)模式
● 类型:fanout
● 特点:Fanout—发布与订阅模式,是一种广播机制,它是没有路由key的模式。
可以看到,在订阅模型中,多了一个x(exchange)交换机角色
场景:用户下单后,可能会调用多个微服务,多个微服务中都需要获取改订单信息进行处理。
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失
我们的计划是这样的:
● 创建一个交换机 exchange.fanout,类型是Fanout
● 创建两个队列fanout.queue1和fanout.queue2,绑定到交换机itcast.fanout
● 在模拟两个消费者消费消息
4.1 创建一个交换机和两个队列
@Bean
public FanoutExchange fanoutExchange(){
// ExchangeBuilder builder = ExchangeBuilder.fanoutExchange("exchange.fanout");
// return builder.build();
return new FanoutExchange("exchange.fanout");
}
@Bean
public Queue fanoutQueue1() {
// Queue queue = QueueBuilder.durable("simpler").build();
return new Queue("fanout.queue1");
}
@Bean
public Queue fanoutQueue2() {
return new Queue("fanout.queue2");
}
4.2 交换机和队列的绑定
@Bean
public Binding bindingExchangeFanoutQueue1() {
return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
}
@Bean
public Binding bindingExchangeFanoutQueue2() {
return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
}
或者
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "fanout.queue1"),
exchange = @Exchange(name = "exchange.fanout", type = ExchangeTypes.FANOUT)
))
4.3 生产者模拟发送消息
@GetMapping("/publish3/{message}")
public String publish3(@PathVariable("message") String message) {
System.out.println("publish message: " + message);
String exchangeName = "exchange.fanout";
rabbitTemplate.convertAndSend(exchangeName,"", message);
return "success";
}
测试发送消息
查看
但是此时看不到消息:
因为交换机与队列此时并没有绑定!
4.4 消费者模拟消费消息
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "fanout.queue1"),
exchange = @Exchange(name = "exchange.fanout", type = ExchangeTypes.FANOUT)
))
public void listenDirectQueue1(String msg){
System.out.println("消费者接收到fanout.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "fanout.queue2"),
exchange = @Exchange(name = "exchange.fanout", type = ExchangeTypes.FANOUT)
))
public void listenDirectQueue2(String msg){
System.out.println("消费者接收到fanout.queue2的消息:【" + msg + "】");
}
此时监听的同时 让交换机与队列进行了绑定
测试发送消息
能够接受并消费
五、路由(Routing)模式
● 类型:direct
● 特点:Direct模式是fanout模式上的一种叠加,增加了路由RoutingKey的模式。
描述下Direct交换机与Fanout交换机的差异?
● Fanout交换机将消息路由给每一个与之绑定的队列
● Direct交换机根据RoutingKey判断路由给哪个队列
● 如果多个队列具有相同的RoutingKey,则与Fanout功能类似
基于@RabbitListener注解声明队列和交换机有哪些常见注解?
● @Queue
● @Exchange
在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下:
● 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
● 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
● Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息
5.1 声明队列和交换机
@Bean
public DirectExchange directExchange(){
return new DirectExchange("exchange.direct");
}
@Bean
public Queue directQueue1() {
return new Queue("direct.queue1");
}
@Bean
public Queue directQueue2() {
return new Queue("direct.queue2");
}
@Bean
public Queue directQueue3() {
return new Queue("direct.queue3");
}
5.2 生产者模拟发送个消息
@GetMapping("/publish4/{message}/{routeKey}")
public String publish4(@PathVariable("message") String message,
@PathVariable("routeKey") String routeKey) {
System.out.println("publish message: " + message);
System.out.println("routeKey: " + routeKey);
String exchangeName = "exchange.direct";
rabbitTemplate.convertAndSend(exchangeName,routeKey, message);
return "success";
}
注意:发送消息是带了routingKey
5.3 消费者模拟消费消息
//------------------------------------------
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "exchange.direct", type = ExchangeTypes.DIRECT),
key = {"a","c"}
))
public void listenRabbitListener1(String msg) throws InterruptedException {
System.out.println("spring 消费者1--接收到消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "exchange.direct", type = ExchangeTypes.DIRECT),
key = {"b","c"}
))
public void listenRabbitListener2(String msg) throws InterruptedException {
System.out.println("spring 消费者2--接收到消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue3"),
exchange = @Exchange(name = "exchange.direct", type = ExchangeTypes.DIRECT),
key = {"a","b"}
))
public void listenRabbitListener3(String msg) throws InterruptedException {
System.out.println("spring 消费者3--接收到消息:【" + msg + "】");
}
5.4 测试
6、主题(Topic)模式
● 类型:topic
● 特点:Topic模式是direct模式上的一种叠加,增加了模糊路由RoutingKey的模式。
描述下Direct交换机与Topic交换机的差异?
● Topic交换机接收的消息RoutingKey必须是多个单词,以 **.**
分割
● Topic交换机与队列绑定时的bindingKey可以指定通配符
● #
:代表0个或多个词
● *
:代表1个词
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
#
:匹配一个或多个词
*
:匹配不多不少恰好1个词
举例:
item.#
:能够匹配item.spu.insert
或者 item.spu
item.*
:只能匹配item.spu
解释:
● Queue1:绑定的是china.# ,因此凡是以 china.开头的routing key 都会被匹配到。包括china.news和china.weather
● Queue2:绑定的是#.news ,因此凡是以 .news结尾的 routing key 都会被匹配。包括china.news和japan.news
6.1 声明队列和交换机
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("exchange.topic");
}
@Bean
public Queue topicQueue1() {
return new Queue("topic.queue1");
}
@Bean
public Queue topicQueue2() {
return new Queue("topic.queue2");
}
@Bean
public Queue topicQueue3() {
return new Queue("topic.queue3");
}
6.2 生产者发送消息
@GetMapping("/publish5/{message}/{routeKey}")
public String publish5(@PathVariable("message") String message,
@PathVariable("routeKey") String routeKey) {
System.out.println("publish message: " + message);
System.out.println("routeKey: " + routeKey);
String exchangeName = "exchange.topic";
rabbitTemplate.convertAndSend(exchangeName,routeKey, message);
return "success";
}
注意:交换机名字
6.3 消费者获取消息
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "exchange.topic", type = ExchangeTypes.TOPIC),
key = "*.a.*"
)
)
public void listenTopicQueue1(String msg) throws InterruptedException {
System.out.println("spring 消费者1--接收到消息:【" + msg + "】");
}
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "exchange.topic", type = ExchangeTypes.TOPIC),
key = "*.*.b"
)
)
public void listenTopicQueue2(String msg) throws InterruptedException {
System.out.println("spring 消费者2--接收到消息:【" + msg + "】");
}
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(name = "topic.queue3"),
exchange = @Exchange(name = "exchange.topic", type = ExchangeTypes.TOPIC),
key = "c.#"
)
)
public void listenTopicQueue3(String msg) throws InterruptedException {
System.out.println("spring 消费者3--接收到消息:【" + msg + "】");
}
6.4 测试
7、消息转换器
Spring会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。
只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:
● 数据体积过大
● 有安全漏洞
● 可读性差
我们来测试一下。
测试默认转换器
发送消息后查看控制台:
配置JSON转换器【重要】
显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。
在publisher和consumer两个服务中都引入依赖:
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
如果是SpringBoot项目中导入了web启动器就不需要导依赖了
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
@GetMapping("/publish6/{message}")
public String publish6(@PathVariable("message") String message) {
System.out.println("publish message: " + message);
Map<String, Object> map = new HashMap<>();
map.put("name", "syh");
map.put("age", 22);
map.put("message", message);
rabbitTemplate.convertAndSend("object.queue", map);
return "success";
}