RocketMQ从部署到spring boot使用完整教程

发布于:2025-04-04 ⋅ 阅读:(36) ⋅ 点赞:(0)

一、RocketMQ

RocketMQ官网文档

RocketMQ 是由阿里巴巴开源的分布式消息中间件,现为 Apache 顶级项目,专为高并发、高可靠场景设计,广泛应用于电商、金融、物流等领域。其核心架构由 ​NameServer、Broker、Producer 和 ​Consumer 四大组件构成,支持异步解耦、流量削峰、顺序事务等复杂业务需求。

1.核心组件与架构

​NameServer
轻量级元数据管理服务,负责 Broker 的注册与路由信息维护,无状态设计支持横向扩展。多个 NameServer 节点独立运行,避免单点故障。

​Broker
消息存储与转发的核心节点,采用主从架构(Master-Slave)实现高可用。数据通过顺序写盘(CommitLog)和索引(ConsumeQueue)结合的方式持久化,支持同步/异步复制,确保消息零丢失。

​Producer/Consumer
​Producer:支持同步、异步、单向发送模式,可根据哈希或轮询策略将消息分发到不同队列。
​Consumer:支持集群消费(负载均衡)和广播消费(全量订阅),消费失败时触发重试机制,最终进入死信队列(DLQ)。

2.消息模型与核心概念

​主题(Topic)​
消息的逻辑分类容器,例如电商场景中的订单、支付等业务各对应独立主题。每个主题由多个队列(MessageQueue)组成,支持水平扩展。

​队列(MessageQueue)​
消息存储的最小物理单元,队列内消息按写入顺序存储,天然支持顺序消费。生产者通过轮询或哈希算法将消息分发到不同队列,消费者组内实例并行处理不同队列以提升吞吐量。

​标签(Tag)​
主题下的次级分类,用于精细化消息过滤。例如订单主题中可通过 TagA 标识支付成功消息,消费者仅订阅特定标签以减少无效数据传输。

3.关键特性

​高性能与高可靠:单机吞吐量达十万级,支持万亿级消息堆积且性能不衰减。通过同步刷盘、主从冗余、事务消息等机制保障数据可靠。

​丰富的消息类型:支持普通消息、顺序消息(FIFO)、延时消息(精确到秒级)、事务消息(分布式事务一致性)等。

​云原生与弹性扩展:腾讯云、火山引擎等平台提供 Serverless 化部署,支持按消息量弹性扩缩容,存储按实际使用计费,降低运维成本。

4.典型应用场景

​异步解耦:订单系统与库存、支付系统通过消息队列解耦,提升系统响应速度。

​流量削峰:秒杀场景中,请求先写入队列,下游服务按处理能力消费,避免系统过载。

​日志收集:实时采集应用日志,推送至大数据平台(如 Flink、ELK)进行分析。

RocketMQ 凭借其金融级稳定性(历经阿里双十一万亿级流量验证),已成为企业构建分布式系统的核心中间件

5.模型图

官网模型图如下:
在这里插入图片描述


二、RocketMQ使用

注:(本文基于Windows安装并采用 JDK8 + Spring Boot 2.7.6 + ​rocketmq-spring-boot-starter:2.3.1 + ​RocketMQ 5.3.0环境)

1.下载

官网下载

按需下载自己需要的版本即可。将下载的zip文件进行解压,然后配置环境变量。
在这里插入图片描述
在这里插入图片描述

2.配置环境变量

右键“此电脑”->“属性”-》“高级系统设置”-》“环境变量”

2.1.在系统变量里新增ROCKETMQ_HOME

ROCKETMQ_HOME
值为解压路径:D:\MY_CODE\RocketMQ\rocketmq-all-5.3.0-bin-release

在这里插入图片描述

2.2.在系统变量Path中新增

%ROCKETMQ_HOME%\bin

在这里插入图片描述

3.启动

RocketMQ 需要启动两个服务:NameServer 和 Broker。先启动NameServer后启动Broker。打开CMD直接命令启动。

3.1.启动NameServer

start mqnamesrv.cmd

启动成功如下图所示:(boot success即可)

在这里插入图片描述

3.2.启动Broker

start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

启动成功如下图所示:(boot success即可)
在这里插入图片描述

NameServer和Broker都启动成功代表服务启动成功了,可使用可视化工具RocketMQ-Dashboard进行连接判断服务是否可用。

3.3.脚本启动

每次启动服务都需要到CMD启动NameServer和Broker太麻烦了,整合成一个.bat脚本启动比较方便。

因为NameServer和Broker有先后关系,所以等NameServer启动后延迟500ms再启动Broker。

@echo off
setlocal enabledelayedexpansion

REM 启动 NameServer(后台运行)
echo [INFO] 正在启动 NameServer...
start /B mqnamesrv.cmd

REM 生成并执行 500ms 延迟脚本
echo [INFO] 等待 NameServer 初始化(500ms)...
echo WScript.Sleep 500 > delay.vbs
cscript //nologo delay.vbs
del /f /q delay.vbs >nul 2>&1

REM 启动 Broker(指定 NameServer 地址)
echo [INFO] 正在启动 Broker...
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

exit

在这里插入图片描述

在这里插入图片描述


三、可视化工具RocketMQ-Dashboard1.0.0使用

下载RocketMQ-Dashboard源码本地运行,也可打包成jar来运行。

1.下载

官网下载

目前有两个版本,可按需下载。
在这里插入图片描述

2.运行

下载源码后解压。使用IDEA打开源码,装载pom文件,修改rocketmq.config.namesrvAddr为你自己服务地址,启动服务,访问http://localhost:8080/(端口可自行修改)即可。

2.1.修改application.properties文件

server.port默认为8080,可修改,防止冲突。

rocketmq.config.namesrvAddr填上自己的服务地址。

在这里插入图片描述

2.2.启动

在这里插入图片描述

2.3.访问界面

http://localhost:8080/(http://localhost:9091/)

具体使用就不详细讲述了,可自行下载使用。
在这里插入图片描述

2.4.打包jar使用

使用IDEA中maven功能进行打包,打包后在target中找到jar包。
在这里插入图片描述
在这里插入图片描述

将jar复制出来,写.bat脚本运行jar。后续直接启动脚本即可。
在这里插入图片描述

.bat脚本内容如下:

@echo off
setlocal enabledelayedexpansion

REM 配置参数(可修改)
set DASHBOARD_PORT=9091
set NAMESRV_ADDR=127.0.0.1:9876

echo 正在启动 RocketMQ Dashboard...
echo 控制台端口: %DASHBOARD_PORT%
echo NameServer地址: %NAMESRV_ADDR%

java -Dserver.port=%DASHBOARD_PORT% ^
     -Drocketmq.config.namesrvAddr=%NAMESRV_ADDR% ^
     -jar rocketmq-dashboard-1.0.0.jar

if %errorlevel% neq 0 (
    echo 启动失败,请检查:
    echo 1. 当前目录是否存在 rocketmq-dashboard-1.0.0.jar
    echo 2. Java环境变量是否配置正确
)
pause

四、Spring Boot使用RocketMQ

1.pom引入关键jar

<properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>2.7.6</spring-boot.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>


        <!-- Spring Boot Starter for RocketMQ -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.3.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.rocketmq</groupId>
                    <artifactId>rocketmq-client</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- RocketMQ Client -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>5.3.0</version>
        </dependency>

        <!-- Lombok 简化代码 -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- 引入 FastJSON 依赖 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>

    </dependencies>

2.yml文件引入配置

# rocketMQ配置
rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: demo-producer-group  # 生产者组
    send-message-timeout: 3000  # 消息发送超时时长,默认3s
    retry-times-when-send-failed: 3 # 同步发送消息失败重试次数,默认2
    retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2

  consumer:
    group: demo-consumer-group  # 消费者组
    max-reconsume-times: 3       # 消费失败最大重试次数

3.消息体创建

@Data
@AllArgsConstructor
@NoArgsConstructor
public class DemoMessage {
    private String id;
    // 消息内容
    private String content;
    // 时间戳
    private LocalDateTime timestamp;
}

4.Producer实现

举例了普通消息、同步消息、异步消息、事务消息四种类型,还有很多包括延时消息、单向消息等就没一一列举了。

其中普通消息、同步消息、异步消息使用同一个demo-topic,采用Tag来过滤消费。

**特别注意:**在消息头中设置Tag不生效,可采用topic拼接Tag的方式来添加( “topic:TagA”)。

@Slf4j
@Service
public class ProducerService {
    // 普通消息主题
    private static final String DEMO_TOPIC = "demo-topic";
    // 事务消息主题
    private static final String TX_DEMO_TOPIC = "tx-demo-topic";


    @Autowired
    private RocketMQTemplate rocketMQTemplate;


    /**
     * 普通发送
     */
    public void send(DemoMessage demoMessage) {
        Message<DemoMessage> msg = MessageBuilder
                .withPayload(demoMessage)
                .setHeader(RocketMQHeaders.KEYS, demoMessage.getId())
//                .setHeader(RocketMQHeaders.TAGS, "TagD")  // 设置Tag过滤
                .build();
        // 消息头设置Tag的方式无效。采用主题连接Tag的方式。
        rocketMQTemplate.send(DEMO_TOPIC + ":TagD", msg);
        log.info("普通消息发送成功");
    }

    /**
     * 同步发送消息
     *
     * @param demoMessage
     * @return
     */
    public String sendSyncMessage(DemoMessage demoMessage) {
        // 构造消息,指定Topic和Tag
        Message<DemoMessage> msg = MessageBuilder
                .withPayload(demoMessage)
                .setHeader(RocketMQHeaders.KEYS, demoMessage.getId())
//                .setHeader(RocketMQHeaders.TAGS, "TagA")  // 设置Tag过滤
                .build();
        SendResult result = rocketMQTemplate.syncSend(DEMO_TOPIC + ":TagA", msg);
        log.info("同步发送成功,消息ID:{}", result.getMsgId());
        return result.getMsgId();
    }

    /**
     * 异步发送消息
     *
     * @param demoMessage
     * @return
     */
    public void sendAsyncMessage(DemoMessage demoMessage) {
        Message<DemoMessage> msg = MessageBuilder
                .withPayload(demoMessage)
                .setHeader(RocketMQHeaders.KEYS, demoMessage.getId())
//                .setHeader(RocketMQHeaders.TAGS, "TagB")  // 设置Tag过滤
                .build();
        // 异步发送并注册回调
        rocketMQTemplate.asyncSend(DEMO_TOPIC + ":TagB", msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("异步发送成功:" + sendResult.getMsgId());
            }

            @Override
            public void onException(Throwable e) {
                log.info("异步发送失败:" + e.getMessage());
            }
        });
    }

    /**
     * 发送事务消息
     *
     * @param demoMessage
     * @return
     */
    public LocalTransactionState sendTransactionMessage(DemoMessage demoMessage) {
        Message<DemoMessage> msg = MessageBuilder
                .withPayload(demoMessage)
                .setHeader(RocketMQHeaders.KEYS, demoMessage.getId())
//                .setHeader(RocketMQHeaders.TAGS, "TagC")  // 设置Tag过滤
                .build();
        // 发送事务消息
        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(TX_DEMO_TOPIC + ":TagC", msg, null);
        log.info("事务消息状态:{}", result.getLocalTransactionState());
        return result.getLocalTransactionState();
    }
}

5.PushConsumer实现

@Slf4j
@Component
@RocketMQMessageListener(
        consumerGroup = "${rocketmq.consumer.group}",
        topic = "demo-topic",
        selectorExpression = "TagA || TagB",  // 只消费TagA和TagB
        consumeMode = ConsumeMode.CONCURRENTLY,     // 并发消费
        messageModel = MessageModel.CLUSTERING      // 集群模式
)
public class PushConsumer implements RocketMQListener<DemoMessage> {
    @Override
    public void onMessage(DemoMessage message) {
        log.info("PushConsumer收到消息:" + message);
        if (message.getContent().contains("error")) {
            throw new RuntimeException("模拟消费失败,触发重试");
        }
    }
}

6.TransactionConsumer实现

@Slf4j
@Component
@RocketMQMessageListener(
        consumerGroup = "tx-consumer-group",
        topic = "tx-demo-topic",
        selectorExpression = "TagC",
        consumeMode = ConsumeMode.CONCURRENTLY
)
public class TransactionConsumer implements RocketMQListener<DemoMessage> {
    @Override
    public void onMessage(DemoMessage message) {
        log.info("事务消息消费成功:" + message);
    }
}

7.事务消息监听器实现类

@RocketMQTransactionListener
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {

    /**
     * 执行本地事务
     * @param msg RocketMQ 事务消息对象(包含业务数据)
     * @param arg 发送事务消息时传入的附加参数
     * @return RocketMQLocalTransactionState 事务状态(COMMIT/ROLLBACK/UNKNOWN)
     * @throws RuntimeException 当本地事务执行失败时抛出异常
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 模拟本地事务(如数据库操作)
        return RocketMQLocalTransactionState.COMMIT;
    }

    /**
     * 检查本地事务状态(用于事务回查)
     * @param msg RocketMQ 事务消息对象(包含事务ID等信息)
     * @return RocketMQLocalTransactionState 事务最终状态
     * @throws RuntimeException 当无法确认事务状态时抛出异常
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        // 事务回查逻辑
        return RocketMQLocalTransactionState.COMMIT;
    }
}

8.Controller测试发送

@RequestMapping("/rocketmq")
@RestController
public class RocketMQController {

    @Autowired
    private ProducerService producerService;

    /**
     * 普通发送消息
     *
     * @param message
     * @return
     */
    @GetMapping("/send/common")
    public String sendCommonMessage(@RequestParam(value = "message") String message) {
        DemoMessage demoMessage = new DemoMessage(
                UUID.randomUUID().toString(),
                message,
                LocalDateTime.now()
        );
        producerService.send(demoMessage);
        return "消息已发送";
    }


    /**
     * 同步发送消息
     *
     * @param message
     * @return
     */
    @GetMapping("/send/sync")
    public String sendSyncMessage(@RequestParam(value = "message") String message) {
        DemoMessage demoMessage = new DemoMessage(
                UUID.randomUUID().toString(),
                message,
                LocalDateTime.now()
        );
        String id = producerService.sendSyncMessage(demoMessage);
        return "同步发送成功,消息ID:" + id;
    }

    /**
     * 异步发送消息
     *
     * @param message
     * @return
     */
    @GetMapping("/send/async")
    public String sendAsyncMessage(@RequestParam(value = "message") String message) {
        DemoMessage demoMessage = new DemoMessage(
                UUID.randomUUID().toString(),
                message,
                LocalDateTime.now()
        );
        producerService.sendAsyncMessage(demoMessage);
        return "异步发送已触发";
    }

    /**
     * 发送事务消息
     *
     * @param message
     * @return
     */
    @GetMapping("/send/transaction")
    public String sendTransactionMessage(@RequestParam(value = "message") String message) {
        DemoMessage demoMessage = new DemoMessage(
                UUID.randomUUID().toString(),
                message,
                LocalDateTime.now()
        );
        LocalTransactionState status = producerService.sendTransactionMessage(demoMessage);
        return "事务消息状态:" + status;
    }

}

9.测试验证

简单发送消息测试一下,看看生产者和消费者是否打印日志。到可视化工具查看消息情况。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述


总结

1、以上就是RocketMQ结合Spring Boot的使用案例,整体还是比较简单,没有举例那种比较复杂的(分布式事务实现等)。
2、SimpleConsumer、PullConsumer没有举例了,大家可自行拓展。
3、顺序消费也没有进行举例,顺序消费让生产者将顺序消息丢到同一个队列里面去消费即可。
4、重复消费,重复消费的话需要保持幂等性,可以借助redis或者mysql来实现。举例mysql可以设置消息key为唯一索引,每次消费消息前将key写入数据库,若已经消费则插入数据库错误,直接返回即可。
5、还有回溯消费、分布式事务等等,大家可拓展学习。