RabbitMQ五种消息模型

发布于:2025-03-14 ⋅ 阅读:(21) ⋅ 点赞:(0)

RabbitMQ 是一款基于 AMQP 协议的高性能消息中间件,广泛应用于分布式系统中,用于实现服务之间的异步通信、解耦和负载均衡。RabbitMQ 提供了五种常见的消息模型,每种模型都有其独特的特点和适用场景。本文将详细介绍这五种消息模型,帮助读者更好地理解和使用 RabbitMQ。

一、简单模式(Simple Queue)

1.1 模型介绍

简单模式是最基础的消息传递模型,包含一个生产者、一个队列和一个消费者。生产者将消息发送到队列,消费者从队列中接收消息。这种模式适用于一对一的通信场景。

1.2 工作流程

  1. 生产者将消息发送到指定的队列。

  2. 消费者监听队列,获取并处理消息。

  3. 消息被消费后从队列中删除。

1.3 应用场景

适用于简单的任务分配场景,例如日志记录、邮件通知等。

创建一个工程demo,两个子模块consumer和publisher

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.itcast.demo</groupId>
    <artifactId>mq-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>publisher</module>
        <module>consumer</module>
    </modules>
    <packaging>pom</packaging>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.12</version>
        <relativePath/>
    </parent>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!--单元测试-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>
</project>

配置:我是把mq安装到的windows

spring:
  rabbitmq:
    host: localhost # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: /yyf # 虚拟主机
    username: yyf # 用户名
    password: 123456 # 密码

 根基mq配置

生产消息

package com.itfly;


import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

/**
 * Unit test for simple App.
 */
@SpringBootTest
public class SpringAmqpTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSimpleQueue() {
        // 队列名称
        String queueName = "test.queue";
        // 消息
        String message = "hello, spring amqp! yyf";
        // 发送消息
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

 接收消息:在消费者模块

package com.itfly.controller;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class SpringRabbitListener {
        // 利用RabbitListener来声明要监听的队列信息
    // 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。
    // 可以看到方法体中接收的就是消息体的内容
    @RabbitListener(queues = "test.queue")
    public void listenSimpleQueueMessage(String msg) throws InterruptedException {
        System.out.println("spring 消费者接收到消息:【" + msg + "】");
    }
}

启动消费者服务,再启动测试类

二、工作模式(Work Queue)

2.1 模型介绍

工作模式用于在多个消费者之间分配任务。一个生产者将消息发送到队列,多个消费者可以并发地从队列中获取任务并处理。这种模式可以实现任务的负载均衡。

一条消息只能被一个消费者处理

2.2 工作流程

  1. 生产者将消息发送到队列。

  2. 多个消费者监听同一个队列,竞争获取消息。

  3. 消费者处理完消息后,消息从队列中删除。

2.3 应用场景

适用于需要并发处理的任务分配场景,例如批量处理订单、视频转码等。

 也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。导致1个消费者空闲,另一个消费者忙的不可开交。没有充分利用每一个消费者的能力,最终消息处理的耗时远远超过了1秒。这样显然是有问题的。

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

可以发现,由于消费者1处理速度较快,所以处理了更多的消息;消费者2处理速度较慢,只处理了6条消息。而最终总的执行耗时也在1秒左右,大大提升。

正所谓能者多劳,这样充分利用了每一个消费者的处理能力,可以有效避免消息积压问题。

 

三、发布/订阅模式(Publish/Subscribe)

发布/订阅模式是一种一对多的消息传递模型,生产者将消息发送到交换机(Exchange),交换机根据绑定规则将消息分发到多个队列,从而实现消息的广播。

交换机的作用是什么?

  • 接收publisher发送的消息

  • 将消息按照规则路由到与之绑定的队列

  • 不能缓存消息,路由失败,消息丢失

  • FanoutExchange的会将消息路由到每个绑定的队列

3.1 广播模式(Fanout Exchange)

Fanout 交换机将消息广播到所有绑定的队列,不关心消息的路由键。

 

@Test
public void testFanoutExchange() {
    // 交换机名称
    String exchangeName = "test.fanout";
    // 消息
    String message = "hello, everyone!";
    rabbitTemplate.convertAndSend(exchangeName, "", message);//三个参数
}

3.2 路由模式(Direct Exchange)

Direct 交换机根据路由键将消息发送到指定的队列。生产者在发送消息时指定路由键,队列在绑定交换机时也指定路由键,只有匹配的队列才能接收消息。

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)

  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey

  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

  • @Test
    public void testSendDirectExchange() {
        // 交换机名称
        String exchangeName = "test.direct";
        // 消息
        String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName, "red", message);
    }

    消息只会被路由到Routingkey为red的队列

3.3 主题模式(Topic Exchange)

Topic 交换机允许使用通配符匹配路由键,从而实现更灵活的消息订阅。例如,路由键可以使用 *#,其中 * 表示一个单词,# 表示零个或多个单词。

3.4 应用场景

适用于需要一对多消息传递的场景,例如日志收集、事件通知等。

 

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。

只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符!

BindingKey 一般都是有一个或多个单词组成,多个单词之间以.分割,例如: item.insert

通配符规则:

  • #:匹配一个或多个词

  • *:匹配不多不少恰好1个词

举例:

  • item.#:能够匹配item.spu.insert 或者 item.spu

  • item.*:只能匹配item.spu

四、RPC 模式(Remote Procedure Call)

RPC 模式用于实现远程过程调用。客户端发送请求消息到队列,服务端处理请求后将响应消息发送到另一个队列,客户端从该队列中获取响应。

4.1 工作流程

  1. 客户端发送请求消息到请求队列。

  2. 服务端处理请求,将响应消息发送到响应队列。

  3. 客户端从响应队列中获取响应消息。

4.2 应用场景

适用于需要同步调用的场景,例如远程接口调用。

五、延迟队列模式(Delayed Queue)

延迟队列用于实现消息的延迟处理。生产者发送消息时可以指定延迟时间,消息在达到延迟时间后才会被消费者消费。

5.1 工作流程

  1. 生产者发送消息到队列,并指定延迟时间。

  2. 消息在队列中等待指定的延迟时间。

  3. 延迟时间到达后,消费者从队列中获取消息并处理。

5.2 应用场景

适用于需要延迟处理的场景,例如定时任务、订单超时处理等。