SpringBoot牵手RabbitMQ

发布于:2025-03-14 ⋅ 阅读:(14) ⋅ 点赞:(0)


一、环境搭建

1.1 创建SpringBoot项目

1.1.1 创建一个SpringRabbitMQ项目,打包类型为pom

在这里插入图片描述

1.1.2 添加依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.syh</groupId>
    <artifactId>SpringRabbitMQ</artifactId>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>publisher</module>
    </modules>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.3</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <packaging>pom</packaging>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

1.2.3 创建子模块publisher

消息的生产者

server:
  port: 8081
spring:
  rabbitmq:
    host: 47.120.37.156
    port: 5672
    username: shan
    password: 123456
    virtual-host: syh

1.2.4 创建子模块consumer

消息的消费者

server:
  port: 8082
spring:
  rabbitmq:
    host: 47.120.37.156
    port: 5672
    username: shan
    password: 123456
    virtual-host: syh

1.2.5 最终项目结构

在这里插入图片描述

二、简单(Simple)模式

在这里插入图片描述

2.1 消费者模块声明队列

package com.syh.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author shan
 * @date 2024/5/19 14:30
 */
@Configuration
public class MQConfig {

    @Bean
    public Queue queue() {
        return new Queue("simple.queue");
    }
}

2.2 在生产者模块模拟发送消息

@RestController
@RequiredArgsConstructor
public class PublisherController {

    private final RabbitTemplate rabbitTemplate;

    @GetMapping("/publish/{message}")
    public String publish(@PathVariable("message") String message) {
        System.out.println("publish message: " + message);
        rabbitTemplate.convertAndSend("simple.queue", message);
        return "success";
    }
}

测试:
在这里插入图片描述
查看mq控制台
在这里插入图片描述
发送成功

2.3 在消费者模块模拟接受消息

package com.syh.mq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author shan
 * @date 2024/5/19 14:58
 */
@Component
public class MQListener {
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) throws InterruptedException {
        System.out.println("spring 消费者接收到消息:【" + msg + "】");
    }
}

测试:
在这里插入图片描述

三、工作(Work)模式

在这里插入图片描述
需要创建一个队列多个消费者
队列中的消息只能被一个消费者消费,消费完即丢失了。

如果有多个消费者消费同一队列的消息
默认是采取的是公平模式,每人一半消息,不管性能高低,
通过设置prefetch来控制消费者预取的消息数量
,设置每个消费者最多取一条消息,处理完即可拿下一条消息。能者多劳(处理快的处理多)

Work queues,也被称为(Task queues),任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。

场景:如果订单量庞大,一个消费者花费时间较长,那么可以让多个消费者同时消费这些订单消息

3.1 创建队列

package com.syh.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author shan
 * @date 2024/5/19 14:30
 */
@Configuration
public class MQConfig {

    @Bean
    public Queue simplerQueue() {
        return new Queue("simple.queue");
    }
    @Bean
    public Queue workQueue1() {
        return new Queue("work.queue");
    }
}

3.2 生产者模拟发送多条消息

@GetMapping("/publish2/{message}")
public String publishMessage(@PathVariable("message") String message) {
    System.out.println("publish message: " + message);
    for (int i = 0; i < 20; i++) {
        rabbitTemplate.convertAndSend("work.queue", message +"_" + i);
    }
    return "success";
}

测试
在这里插入图片描述

查看
在这里插入图片描述
发送成功

3.2 消费者模块模拟接受消息

需要多个消费者

package com.syh.mq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author shan
 * @date 2024/5/19 14:58
 */
@Component
public class MQListener {
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) throws InterruptedException {
        System.out.println("spring 消费者接收到消息:【" + msg + "】");
    }

    @RabbitListener(queues = "work.queue")
    public void listenWorkQueueMessage1(String msg) throws InterruptedException {
        System.out.println("spring 消费者1--接收到消息:【" + msg + "】");
    }
    @RabbitListener(queues = "work.queue")
    public void listenWorkQueueMessage2(String msg) throws InterruptedException {
        System.err.println("spring 消费者2--接收到消息:【" + msg + "】");
    }
}

测试:
在这里插入图片描述
通过效果能看出:
两个消费者各自消费了10个消息

配置消费者最多拿取消息的数量

  • 模拟两个消费者处理消息的时长不同

@RabbitListener(queues = "work.queue")
public void listenWorkQueueMessage1(String msg) throws InterruptedException {
    System.out.println("spring 消费者1--接收到消息:【" + msg + "】");
    Thread.sleep(20);
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueueMessage2(String msg) throws InterruptedException {
    System.err.println("spring 消费者2--接收到消息:【" + msg + "】");
    Thread.sleep(200);
}

如果不设置 prefatch
在这里插入图片描述
每个消费者各自处理10条消息

  • 设置
server:
  port: 8081
spring:
  rabbitmq:
    host: 47.120.37.156
    port: 5672
    username: shan
    password: 123456
    virtual-host: syh
    listener:
      simple:
        prefetch: 1 #设置每次从RabbitMQ接收的消息数量, 每次只能获取一条消息,处理完成才能再次接收

在这里插入图片描述
消费者1处理快,处理的数据多

四、发布订阅(Publish/Subscribe)模式

● 类型:fanout
● 特点:Fanout—发布与订阅模式,是一种广播机制,它是没有路由key的模式。
在这里插入图片描述
可以看到,在订阅模型中,多了一个x(exchange)交换机角色
场景:用户下单后,可能会调用多个微服务,多个微服务中都需要获取改订单信息进行处理。
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失

我们的计划是这样的:
● 创建一个交换机 exchange.fanout,类型是Fanout
● 创建两个队列fanout.queue1和fanout.queue2,绑定到交换机itcast.fanout
● 在模拟两个消费者消费消息

4.1 创建一个交换机和两个队列

@Bean
public FanoutExchange fanoutExchange(){
    //        ExchangeBuilder builder = ExchangeBuilder.fanoutExchange("exchange.fanout");
//        return builder.build();
    return new FanoutExchange("exchange.fanout");
}
@Bean
public Queue fanoutQueue1() {
    // Queue queue = QueueBuilder.durable("simpler").build();
    return new Queue("fanout.queue1");
}
@Bean
public Queue fanoutQueue2() {
    return new Queue("fanout.queue2");
}

4.2 交换机和队列的绑定

@Bean
public Binding bindingExchangeFanoutQueue1() {
    return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
}
@Bean
public Binding bindingExchangeFanoutQueue2() {
    return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
}

或者

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "fanout.queue1"),
        exchange = @Exchange(name = "exchange.fanout", type = ExchangeTypes.FANOUT)
))

4.3 生产者模拟发送消息

@GetMapping("/publish3/{message}")
public String publish3(@PathVariable("message") String message) {
    System.out.println("publish message: " + message);
    String exchangeName = "exchange.fanout";
    rabbitTemplate.convertAndSend(exchangeName,"", message);
    return "success";
}

测试发送消息
在这里插入图片描述
查看
在这里插入图片描述
但是此时看不到消息:
因为交换机与队列此时并没有绑定!

4.4 消费者模拟消费消息

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "fanout.queue1"),
        exchange = @Exchange(name = "exchange.fanout", type = ExchangeTypes.FANOUT)
))
public void listenDirectQueue1(String msg){
    System.out.println("消费者接收到fanout.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "fanout.queue2"),
        exchange = @Exchange(name = "exchange.fanout", type = ExchangeTypes.FANOUT)
))
public void listenDirectQueue2(String msg){
    System.out.println("消费者接收到fanout.queue2的消息:【" + msg + "】");
}

此时监听的同时 让交换机与队列进行了绑定

测试发送消息
在这里插入图片描述
能够接受并消费

五、路由(Routing)模式

类型:direct
特点:Direct模式是fanout模式上的一种叠加,增加了路由RoutingKey的模式。

描述下Direct交换机与Fanout交换机的差异?
● Fanout交换机将消息路由给每一个与之绑定的队列
● Direct交换机根据RoutingKey判断路由给哪个队列
● 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

基于@RabbitListener注解声明队列和交换机有哪些常见注解?
● @Queue
● @Exchange

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在这里插入图片描述
在Direct模型下:
● 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
● 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
● Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

5.1 声明队列和交换机

@Bean
public DirectExchange directExchange(){
    return new DirectExchange("exchange.direct");
}
@Bean
public Queue directQueue1() {
    return new Queue("direct.queue1");
}
@Bean
public Queue directQueue2() {
    return new Queue("direct.queue2");
}
@Bean
public Queue directQueue3() {
    return new Queue("direct.queue3");
}

5.2 生产者模拟发送个消息

@GetMapping("/publish4/{message}/{routeKey}")
public String publish4(@PathVariable("message") String message, 
                       @PathVariable("routeKey") String routeKey) {
    System.out.println("publish message: " + message);
    System.out.println("routeKey: " + routeKey);
    String exchangeName = "exchange.direct";
    rabbitTemplate.convertAndSend(exchangeName,routeKey, message);
    return "success";
}

注意:发送消息是带了routingKey

5.3 消费者模拟消费消息

//------------------------------------------
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue1"),
        exchange = @Exchange(name = "exchange.direct", type = ExchangeTypes.DIRECT),
        key = {"a","c"}
))
public void listenRabbitListener1(String msg) throws InterruptedException {
    System.out.println("spring 消费者1--接收到消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue2"),
        exchange = @Exchange(name = "exchange.direct", type = ExchangeTypes.DIRECT),
        key = {"b","c"}
))
public void listenRabbitListener2(String msg) throws InterruptedException {
    System.out.println("spring 消费者2--接收到消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue3"),
        exchange = @Exchange(name = "exchange.direct", type = ExchangeTypes.DIRECT),
        key = {"a","b"}
))
public void listenRabbitListener3(String msg) throws InterruptedException {
    System.out.println("spring 消费者3--接收到消息:【" + msg + "】");
}

5.4 测试

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

6、主题(Topic)模式

● 类型:topic
● 特点:Topic模式是direct模式上的一种叠加,增加了模糊路由RoutingKey的模式。

描述下Direct交换机与Topic交换机的差异?
● Topic交换机接收的消息RoutingKey必须是多个单词,以 **.** 分割
● Topic交换机与队列绑定时的bindingKey可以指定通配符
#:代表0个或多个词
*:代表1个词

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

通配符规则:
#:匹配一个或多个词
*:匹配不多不少恰好1个词

举例:
item.#:能够匹配item.spu.insert 或者 item.spu
item.*:只能匹配item.spu

在这里插入图片描述
解释:
● Queue1:绑定的是china.# ,因此凡是以 china.开头的routing key 都会被匹配到。包括china.news和china.weather
● Queue2:绑定的是#.news ,因此凡是以 .news结尾的 routing key 都会被匹配。包括china.news和japan.news

在这里插入图片描述

6.1 声明队列和交换机

@Bean
public TopicExchange topicExchange(){
    return new TopicExchange("exchange.topic");
}
@Bean
public Queue topicQueue1() {
    return new Queue("topic.queue1");
}
@Bean
public Queue topicQueue2() {
    return new Queue("topic.queue2");
}
@Bean
public Queue topicQueue3() {
    return new Queue("topic.queue3");
}   

6.2 生产者发送消息

@GetMapping("/publish5/{message}/{routeKey}")
public String publish5(@PathVariable("message") String message,
                       @PathVariable("routeKey") String routeKey) {
    System.out.println("publish message: " + message);
    System.out.println("routeKey: " + routeKey);
    String exchangeName = "exchange.topic";
    rabbitTemplate.convertAndSend(exchangeName,routeKey, message);
    return "success";
}

注意:交换机名字

6.3 消费者获取消息

@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(name = "topic.queue1"),
                exchange = @Exchange(name = "exchange.topic", type = ExchangeTypes.TOPIC),
                key = "*.a.*"
        )
)
public void listenTopicQueue1(String msg) throws InterruptedException {
    System.out.println("spring 消费者1--接收到消息:【" + msg + "】");
}
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(name = "topic.queue2"),
                exchange = @Exchange(name = "exchange.topic", type = ExchangeTypes.TOPIC),
                key = "*.*.b"
        )
)
public void listenTopicQueue2(String msg) throws InterruptedException {
    System.out.println("spring 消费者2--接收到消息:【" + msg + "】");
}

@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(name = "topic.queue3"),
                exchange = @Exchange(name = "exchange.topic", type = ExchangeTypes.TOPIC),
                key = "c.#"
        )
)
public void listenTopicQueue3(String msg) throws InterruptedException {
    System.out.println("spring 消费者3--接收到消息:【" + msg + "】");
}

6.4 测试

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

7、消息转换器

Spring会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。
只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:

● 数据体积过大
● 有安全漏洞
● 可读性差

我们来测试一下。

测试默认转换器
发送消息后查看控制台:
在这里插入图片描述

配置JSON转换器【重要】
显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。

在publisher和consumer两个服务中都引入依赖:

<dependency>
  <groupId>com.fasterxml.jackson.dataformat</groupId>
  <artifactId>jackson-dataformat-xml</artifactId>
  <version>2.9.10</version>
</dependency>

如果是SpringBoot项目中导入了web启动器就不需要导依赖了

@Bean
public MessageConverter messageConverter() {
    return new Jackson2JsonMessageConverter();
}
@GetMapping("/publish6/{message}")
public String publish6(@PathVariable("message") String message) {
    System.out.println("publish message: " + message);
    Map<String, Object> map = new HashMap<>();
    map.put("name", "syh");
    map.put("age", 22);
    map.put("message", message);
    rabbitTemplate.convertAndSend("object.queue", map);
    return "success";
}

在这里插入图片描述