一、RocketMQ
RocketMQ 是由阿里巴巴开源的分布式消息中间件,现为 Apache 顶级项目,专为高并发、高可靠场景设计,广泛应用于电商、金融、物流等领域。其核心架构由 NameServer、Broker、Producer 和 Consumer 四大组件构成,支持异步解耦、流量削峰、顺序事务等复杂业务需求。
1.核心组件与架构
NameServer
轻量级元数据管理服务,负责 Broker 的注册与路由信息维护,无状态设计支持横向扩展。多个 NameServer 节点独立运行,避免单点故障。
Broker
消息存储与转发的核心节点,采用主从架构(Master-Slave)实现高可用。数据通过顺序写盘(CommitLog)和索引(ConsumeQueue)结合的方式持久化,支持同步/异步复制,确保消息零丢失。
Producer/Consumer
Producer:支持同步、异步、单向发送模式,可根据哈希或轮询策略将消息分发到不同队列。
Consumer:支持集群消费(负载均衡)和广播消费(全量订阅),消费失败时触发重试机制,最终进入死信队列(DLQ)。
2.消息模型与核心概念
主题(Topic)
消息的逻辑分类容器,例如电商场景中的订单、支付等业务各对应独立主题。每个主题由多个队列(MessageQueue)组成,支持水平扩展。
队列(MessageQueue)
消息存储的最小物理单元,队列内消息按写入顺序存储,天然支持顺序消费。生产者通过轮询或哈希算法将消息分发到不同队列,消费者组内实例并行处理不同队列以提升吞吐量。
标签(Tag)
主题下的次级分类,用于精细化消息过滤。例如订单主题中可通过 TagA 标识支付成功消息,消费者仅订阅特定标签以减少无效数据传输。
3.关键特性
高性能与高可靠:单机吞吐量达十万级,支持万亿级消息堆积且性能不衰减。通过同步刷盘、主从冗余、事务消息等机制保障数据可靠。
丰富的消息类型:支持普通消息、顺序消息(FIFO)、延时消息(精确到秒级)、事务消息(分布式事务一致性)等。
云原生与弹性扩展:腾讯云、火山引擎等平台提供 Serverless 化部署,支持按消息量弹性扩缩容,存储按实际使用计费,降低运维成本。
4.典型应用场景
异步解耦:订单系统与库存、支付系统通过消息队列解耦,提升系统响应速度。
流量削峰:秒杀场景中,请求先写入队列,下游服务按处理能力消费,避免系统过载。
日志收集:实时采集应用日志,推送至大数据平台(如 Flink、ELK)进行分析。
RocketMQ 凭借其金融级稳定性(历经阿里双十一万亿级流量验证),已成为企业构建分布式系统的核心中间件
5.模型图
官网模型图如下:
二、RocketMQ使用
注:(本文基于Windows安装并采用 JDK8 + Spring Boot 2.7.6 + rocketmq-spring-boot-starter:2.3.1 + RocketMQ 5.3.0环境)
1.下载
按需下载自己需要的版本即可。将下载的zip文件进行解压,然后配置环境变量。
2.配置环境变量
右键“此电脑”->“属性”-》“高级系统设置”-》“环境变量”
2.1.在系统变量里新增ROCKETMQ_HOME
ROCKETMQ_HOME
值为解压路径:D:\MY_CODE\RocketMQ\rocketmq-all-5.3.0-bin-release
2.2.在系统变量Path中新增
%ROCKETMQ_HOME%\bin
3.启动
RocketMQ 需要启动两个服务:NameServer 和 Broker。先启动NameServer后启动Broker。打开CMD直接命令启动。
3.1.启动NameServer
start mqnamesrv.cmd
启动成功如下图所示:(boot success即可)
3.2.启动Broker
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
启动成功如下图所示:(boot success即可)
NameServer和Broker都启动成功代表服务启动成功了,可使用可视化工具RocketMQ-Dashboard进行连接判断服务是否可用。
3.3.脚本启动
每次启动服务都需要到CMD启动NameServer和Broker太麻烦了,整合成一个.bat脚本启动比较方便。
因为NameServer和Broker有先后关系,所以等NameServer启动后延迟500ms再启动Broker。
@echo off
setlocal enabledelayedexpansion
REM 启动 NameServer(后台运行)
echo [INFO] 正在启动 NameServer...
start /B mqnamesrv.cmd
REM 生成并执行 500ms 延迟脚本
echo [INFO] 等待 NameServer 初始化(500ms)...
echo WScript.Sleep 500 > delay.vbs
cscript //nologo delay.vbs
del /f /q delay.vbs >nul 2>&1
REM 启动 Broker(指定 NameServer 地址)
echo [INFO] 正在启动 Broker...
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
exit
三、可视化工具RocketMQ-Dashboard1.0.0使用
下载RocketMQ-Dashboard源码本地运行,也可打包成jar来运行。
1.下载
目前有两个版本,可按需下载。
2.运行
下载源码后解压。使用IDEA打开源码,装载pom文件,修改rocketmq.config.namesrvAddr为你自己服务地址,启动服务,访问http://localhost:8080/(端口可自行修改)即可。
2.1.修改application.properties文件
server.port默认为8080,可修改,防止冲突。
rocketmq.config.namesrvAddr填上自己的服务地址。
2.2.启动
2.3.访问界面
http://localhost:8080/(http://localhost:9091/)
具体使用就不详细讲述了,可自行下载使用。
2.4.打包jar使用
使用IDEA中maven功能进行打包,打包后在target中找到jar包。
将jar复制出来,写.bat脚本运行jar。后续直接启动脚本即可。
.bat脚本内容如下:
@echo off
setlocal enabledelayedexpansion
REM 配置参数(可修改)
set DASHBOARD_PORT=9091
set NAMESRV_ADDR=127.0.0.1:9876
echo 正在启动 RocketMQ Dashboard...
echo 控制台端口: %DASHBOARD_PORT%
echo NameServer地址: %NAMESRV_ADDR%
java -Dserver.port=%DASHBOARD_PORT% ^
-Drocketmq.config.namesrvAddr=%NAMESRV_ADDR% ^
-jar rocketmq-dashboard-1.0.0.jar
if %errorlevel% neq 0 (
echo 启动失败,请检查:
echo 1. 当前目录是否存在 rocketmq-dashboard-1.0.0.jar
echo 2. Java环境变量是否配置正确
)
pause
四、Spring Boot使用RocketMQ
1.pom引入关键jar
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-boot.version>2.7.6</spring-boot.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- Spring Boot Starter for RocketMQ -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.3.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- RocketMQ Client -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.3.0</version>
</dependency>
<!-- Lombok 简化代码 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- 引入 FastJSON 依赖 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
</dependencies>
2.yml文件引入配置
# rocketMQ配置
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: demo-producer-group # 生产者组
send-message-timeout: 3000 # 消息发送超时时长,默认3s
retry-times-when-send-failed: 3 # 同步发送消息失败重试次数,默认2
retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2
consumer:
group: demo-consumer-group # 消费者组
max-reconsume-times: 3 # 消费失败最大重试次数
3.消息体创建
@Data
@AllArgsConstructor
@NoArgsConstructor
public class DemoMessage {
private String id;
// 消息内容
private String content;
// 时间戳
private LocalDateTime timestamp;
}
4.Producer实现
举例了普通消息、同步消息、异步消息、事务消息四种类型,还有很多包括延时消息、单向消息等就没一一列举了。
其中普通消息、同步消息、异步消息使用同一个demo-topic,采用Tag来过滤消费。
**特别注意:**在消息头中设置Tag不生效,可采用topic拼接Tag的方式来添加( “topic:TagA”)。
@Slf4j
@Service
public class ProducerService {
// 普通消息主题
private static final String DEMO_TOPIC = "demo-topic";
// 事务消息主题
private static final String TX_DEMO_TOPIC = "tx-demo-topic";
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 普通发送
*/
public void send(DemoMessage demoMessage) {
Message<DemoMessage> msg = MessageBuilder
.withPayload(demoMessage)
.setHeader(RocketMQHeaders.KEYS, demoMessage.getId())
// .setHeader(RocketMQHeaders.TAGS, "TagD") // 设置Tag过滤
.build();
// 消息头设置Tag的方式无效。采用主题连接Tag的方式。
rocketMQTemplate.send(DEMO_TOPIC + ":TagD", msg);
log.info("普通消息发送成功");
}
/**
* 同步发送消息
*
* @param demoMessage
* @return
*/
public String sendSyncMessage(DemoMessage demoMessage) {
// 构造消息,指定Topic和Tag
Message<DemoMessage> msg = MessageBuilder
.withPayload(demoMessage)
.setHeader(RocketMQHeaders.KEYS, demoMessage.getId())
// .setHeader(RocketMQHeaders.TAGS, "TagA") // 设置Tag过滤
.build();
SendResult result = rocketMQTemplate.syncSend(DEMO_TOPIC + ":TagA", msg);
log.info("同步发送成功,消息ID:{}", result.getMsgId());
return result.getMsgId();
}
/**
* 异步发送消息
*
* @param demoMessage
* @return
*/
public void sendAsyncMessage(DemoMessage demoMessage) {
Message<DemoMessage> msg = MessageBuilder
.withPayload(demoMessage)
.setHeader(RocketMQHeaders.KEYS, demoMessage.getId())
// .setHeader(RocketMQHeaders.TAGS, "TagB") // 设置Tag过滤
.build();
// 异步发送并注册回调
rocketMQTemplate.asyncSend(DEMO_TOPIC + ":TagB", msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("异步发送成功:" + sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
log.info("异步发送失败:" + e.getMessage());
}
});
}
/**
* 发送事务消息
*
* @param demoMessage
* @return
*/
public LocalTransactionState sendTransactionMessage(DemoMessage demoMessage) {
Message<DemoMessage> msg = MessageBuilder
.withPayload(demoMessage)
.setHeader(RocketMQHeaders.KEYS, demoMessage.getId())
// .setHeader(RocketMQHeaders.TAGS, "TagC") // 设置Tag过滤
.build();
// 发送事务消息
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(TX_DEMO_TOPIC + ":TagC", msg, null);
log.info("事务消息状态:{}", result.getLocalTransactionState());
return result.getLocalTransactionState();
}
}
5.PushConsumer实现
@Slf4j
@Component
@RocketMQMessageListener(
consumerGroup = "${rocketmq.consumer.group}",
topic = "demo-topic",
selectorExpression = "TagA || TagB", // 只消费TagA和TagB
consumeMode = ConsumeMode.CONCURRENTLY, // 并发消费
messageModel = MessageModel.CLUSTERING // 集群模式
)
public class PushConsumer implements RocketMQListener<DemoMessage> {
@Override
public void onMessage(DemoMessage message) {
log.info("PushConsumer收到消息:" + message);
if (message.getContent().contains("error")) {
throw new RuntimeException("模拟消费失败,触发重试");
}
}
}
6.TransactionConsumer实现
@Slf4j
@Component
@RocketMQMessageListener(
consumerGroup = "tx-consumer-group",
topic = "tx-demo-topic",
selectorExpression = "TagC",
consumeMode = ConsumeMode.CONCURRENTLY
)
public class TransactionConsumer implements RocketMQListener<DemoMessage> {
@Override
public void onMessage(DemoMessage message) {
log.info("事务消息消费成功:" + message);
}
}
7.事务消息监听器实现类
@RocketMQTransactionListener
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
/**
* 执行本地事务
* @param msg RocketMQ 事务消息对象(包含业务数据)
* @param arg 发送事务消息时传入的附加参数
* @return RocketMQLocalTransactionState 事务状态(COMMIT/ROLLBACK/UNKNOWN)
* @throws RuntimeException 当本地事务执行失败时抛出异常
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 模拟本地事务(如数据库操作)
return RocketMQLocalTransactionState.COMMIT;
}
/**
* 检查本地事务状态(用于事务回查)
* @param msg RocketMQ 事务消息对象(包含事务ID等信息)
* @return RocketMQLocalTransactionState 事务最终状态
* @throws RuntimeException 当无法确认事务状态时抛出异常
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// 事务回查逻辑
return RocketMQLocalTransactionState.COMMIT;
}
}
8.Controller测试发送
@RequestMapping("/rocketmq")
@RestController
public class RocketMQController {
@Autowired
private ProducerService producerService;
/**
* 普通发送消息
*
* @param message
* @return
*/
@GetMapping("/send/common")
public String sendCommonMessage(@RequestParam(value = "message") String message) {
DemoMessage demoMessage = new DemoMessage(
UUID.randomUUID().toString(),
message,
LocalDateTime.now()
);
producerService.send(demoMessage);
return "消息已发送";
}
/**
* 同步发送消息
*
* @param message
* @return
*/
@GetMapping("/send/sync")
public String sendSyncMessage(@RequestParam(value = "message") String message) {
DemoMessage demoMessage = new DemoMessage(
UUID.randomUUID().toString(),
message,
LocalDateTime.now()
);
String id = producerService.sendSyncMessage(demoMessage);
return "同步发送成功,消息ID:" + id;
}
/**
* 异步发送消息
*
* @param message
* @return
*/
@GetMapping("/send/async")
public String sendAsyncMessage(@RequestParam(value = "message") String message) {
DemoMessage demoMessage = new DemoMessage(
UUID.randomUUID().toString(),
message,
LocalDateTime.now()
);
producerService.sendAsyncMessage(demoMessage);
return "异步发送已触发";
}
/**
* 发送事务消息
*
* @param message
* @return
*/
@GetMapping("/send/transaction")
public String sendTransactionMessage(@RequestParam(value = "message") String message) {
DemoMessage demoMessage = new DemoMessage(
UUID.randomUUID().toString(),
message,
LocalDateTime.now()
);
LocalTransactionState status = producerService.sendTransactionMessage(demoMessage);
return "事务消息状态:" + status;
}
}
9.测试验证
简单发送消息测试一下,看看生产者和消费者是否打印日志。到可视化工具查看消息情况。
总结
1、以上就是RocketMQ结合Spring Boot的使用案例,整体还是比较简单,没有举例那种比较复杂的(分布式事务实现等)。
2、SimpleConsumer、PullConsumer没有举例了,大家可自行拓展。
3、顺序消费也没有进行举例,顺序消费让生产者将顺序消息丢到同一个队列里面去消费即可。
4、重复消费,重复消费的话需要保持幂等性,可以借助redis或者mysql来实现。举例mysql可以设置消息key为唯一索引,每次消费消息前将key写入数据库,若已经消费则插入数据库错误,直接返回即可。
5、还有回溯消费、分布式事务等等,大家可拓展学习。