Spring AMQP 入门与实践:整合 RabbitMQ 构建可靠消息系统

发布于:2025-08-09 ⋅ 阅读:(11) ⋅ 点赞:(0)

Spring AMQP 入门与实践:整合 RabbitMQ 构建可靠消息系统

一、Spring AMQP 是什么?

Spring AMQP(Application Messaging Protocol)是 Spring 官方提供的对 AMQP 协议的封装,其核心模块有两个:

  • spring-amqp: 提供 AMQP 抽象封装
  • spring-rabbit: RabbitMQ 的具体实现

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系
  • 基于注解的监听器模式,异步接收消息
  • 封装了RabbitTemplate工具,用于发送消息

常见的场景包括:

  • 微服务之间的异步通信
  • 秒杀系统削峰
  • 用户注册发送邮件/短信通知
  • 分布式事务的最终一致性方案

二、Spring Boot 集成 RabbitMQ

2.1. 引入依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

它会自动引入 spring-rabbit 和 spring-amqp 模块。

2.2. 配置 RabbitMQ

spring:
  rabbitmq:
    host: 192.168.184.101 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: /hmall # 虚拟主机
    username: hmall # 用户名
    password: 123 # 密码

三、快速构建消息系统

  • 一个消息队列
  • 一个消息发送者
  • 一个消息监听者(消费者)

构建示例项目:

  • mq-demo:父工程,管理项目依赖
  • publisher:消息的发送者
  • consumer:消息的消费者
    在这里插入图片描述
    引入依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.itcast.demo</groupId>
    <artifactId>mq-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>publisher</module>
        <module>consumer</module>
    </modules>
    <packaging>pom</packaging>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.12</version>
        <relativePath/>
    </parent>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!--单元测试-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>
</project>

3.1.消息发送

publisher服务中编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送:

package com.itheima.publisher.amqp;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class SpringAmqpTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSimpleQueue() {
        // 队列名称
        String queueName = "simple.queue";
        // 消息
        String message = "hello, spring amqp!";
        // 发送消息
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

3.2.消息接收

consumer服务的com.itheima.consumer.listener包中新建一个类SpringRabbitListener,代码如下:

package com.itheima.consumer.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class SpringRabbitListener {
        // 利用RabbitListener来声明要监听的队列信息
    // 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。
    // 可以看到方法体中接收的就是消息体的内容
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) throws InterruptedException {
        System.out.println("spring 消费者接收到消息:【" + msg + "】");
    }
}

四、WorkQueues模型

4.1. 介绍

Work Queues(工作队列)又叫 任务队列(Task Queues),主要用于将一个任务分发给多个消费者(工作线程)处理,每个任务只会被一个消费者处理。

核心思想是:生产者只管发送任务,多个消费者竞争获取任务并处理,达到并发消费、分担压力的目的
在这里插入图片描述

  • Producer(生产者):发送任务消息。
  • Queue(队列):缓存任务。
  • Consumer(消费者):从队列中获取任务并处理。

每个任务只会被一个消费者处理,多个消费者之间互不干扰。

4.2. 消息发送

publisher服务中的SpringAmqpTest类中添加一个测试方法实现循环发送:

/**
     * workQueue
     * 向队列中不停发送消息,模拟消息堆积。
     */
@Test
public void testWorkQueue() throws InterruptedException {
    // 队列名称
    String queueName = "simple.queue";
    // 消息
    String message = "hello, message_";
    for (int i = 0; i < 50; i++) {
        // 发送消息,每20毫秒发送一次,相当于每秒发送50条消息
        rabbitTemplate.convertAndSend(queueName, message + i);
        Thread.sleep(20);
    }
}

4.3. 消息接收

@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
    System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(20);
}

@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
    System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(200);
}

多个消费者监听同一个队列,消息将被平均分配(默认轮询方式)。

4.4. 公平分发 vs 轮询分发

🔁 默认行为:轮询分发
RabbitMQ 默认采用 Round-Robin(轮询) 分发方式,消费者不论是否处理完当前消息,下一条消息仍然会发给它。

这可能导致:处理慢的消费者积压任务,处理快的消费者反而闲着

✅ 公平分发(prefetch)
设置每个消费者的最大未确认消息数,让 RabbitMQ 只向空闲的消费者发送消息。

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1  # 每个消费者同一时间只能处理1条消息

网站公告

今日签到

点亮在社区的每一天
去签到