Spring AMQP 入门与实践:整合 RabbitMQ 构建可靠消息系统
一、Spring AMQP 是什么?
Spring AMQP(Application Messaging Protocol)是 Spring
官方提供的对 AMQP
协议的封装,其核心模块有两个:
spring-amqp
: 提供AMQP
抽象封装spring-rabbit
:RabbitMQ
的具体实现
SpringAMQP提供了三个功能:
- 自动声明队列、交换机及其绑定关系
- 基于注解的监听器模式,异步接收消息
- 封装了
RabbitTemplate
工具,用于发送消息
常见的场景包括:
- 微服务之间的异步通信
- 秒杀系统削峰
- 用户注册发送邮件/短信通知
- 分布式事务的最终一致性方案
二、Spring Boot 集成 RabbitMQ
2.1. 引入依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
它会自动引入 spring-rabbit 和 spring-amqp 模块。
2.2. 配置 RabbitMQ
spring:
rabbitmq:
host: 192.168.184.101 # 你的虚拟机IP
port: 5672 # 端口
virtual-host: /hmall # 虚拟主机
username: hmall # 用户名
password: 123 # 密码
三、快速构建消息系统
- 一个消息队列
- 一个消息发送者
- 一个消息监听者(消费者)
构建示例项目:
mq-demo
:父工程,管理项目依赖publisher
:消息的发送者consumer
:消息的消费者
引入依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.itcast.demo</groupId>
<artifactId>mq-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<modules>
<module>publisher</module>
<module>consumer</module>
</modules>
<packaging>pom</packaging>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.12</version>
<relativePath/>
</parent>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--单元测试-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
</project>
3.1.消息发送
在publisher
服务中编写测试类SpringAmqpTest
,并利用RabbitTemplate
实现消息发送:
package com.itheima.publisher.amqp;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, spring amqp!";
// 发送消息
rabbitTemplate.convertAndSend(queueName, message);
}
}
3.2.消息接收
在consumer
服务的com.itheima.consumer.listener
包中新建一个类SpringRabbitListener
,代码如下:
package com.itheima.consumer.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class SpringRabbitListener {
// 利用RabbitListener来声明要监听的队列信息
// 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。
// 可以看到方法体中接收的就是消息体的内容
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消费者接收到消息:【" + msg + "】");
}
}
四、WorkQueues模型
4.1. 介绍
Work Queues
(工作队列)又叫 任务队列(Task Queues),主要用于将一个任务分发给多个消费者(工作线程)处理,每个任务只会被一个消费者处理。
核心思想是:生产者只管发送任务,多个消费者竞争获取任务并处理,达到并发消费、分担压力的目的。
Producer
(生产者):发送任务消息。Queue
(队列):缓存任务。Consumer
(消费者):从队列中获取任务并处理。
每个任务只会被一个消费者处理,多个消费者之间互不干扰。
4.2. 消息发送
在publisher
服务中的SpringAmqpTest
类中添加一个测试方法实现循环发送:
/**
* workQueue
* 向队列中不停发送消息,模拟消息堆积。
*/
@Test
public void testWorkQueue() throws InterruptedException {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, message_";
for (int i = 0; i < 50; i++) {
// 发送消息,每20毫秒发送一次,相当于每秒发送50条消息
rabbitTemplate.convertAndSend(queueName, message + i);
Thread.sleep(20);
}
}
4.3. 消息接收
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(20);
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(200);
}
多个消费者监听同一个队列,消息将被平均分配(默认轮询方式)。
4.4. 公平分发 vs 轮询分发
🔁 默认行为:轮询分发
RabbitMQ 默认采用 Round-Robin(轮询) 分发方式,消费者不论是否处理完当前消息,下一条消息仍然会发给它。
这可能导致:处理慢的消费者积压任务,处理快的消费者反而闲着。
✅ 公平分发(prefetch)
设置每个消费者的最大未确认消息数,让 RabbitMQ 只向空闲的消费者发送消息。
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每个消费者同一时间只能处理1条消息