RocketMq学习笔记
本文记录作者基于RocketMq 4.9x版本对RocketMq部分功能特性的学习,并尝试从源码角度分析其实现原理。
相关文章
RocketMq 5.0 proxy的引入:https://juejin.cn/post/7293788137662758946
RocketMq Docker 集群搭建:https://www.cnblogs.com/xiao987334176/p/16771899.html
RocketMq 整体分析:https://www.cnblogs.com/makemylife/p/17459288.html
RocketMq 存储分析:
1、https://developer.aliyun.com/article/974434
2、https://zhuanlan.zhihu.com/p/509234031
RocketMq 重平衡:https://blog.csdn.net/u010785969/article/details/140284607
存储设计
CommitLog
物理存储结构
- 所有消息以 顺序追加(Append-Only) 方式写入 CommitLog 文件,不同 Topic/Queue 的消息混合存储。
- 文件默认大小为 1GB,命名格式为
00000000000000000000
(起始偏移量),由MappedFileQueue
管理文件队列。 - 依赖 内存映射(Mmap) 技术(
MappedFile
类)提升 IO 效率,通过FileChannel.map()
创建映射缓冲区。
消息格式
每条消息的二进制结构包含:┌────────────┬───────────┬───────────┬───────────┬───────────┐ | Total Size | Magic Code | Body CRC | Queue ID | Body | ... └────────────┴───────────┴───────────┴───────────┴───────────┘
- 关键字段:消息长度、魔数、CRC校验、队列ID、消息体等。
消息写入流程
CommitLog#putMessage()
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
// 1. 序列化消息
byte[] serializedBody = msg.getBody();
final int bodyLength = serializedBody.length;
// 2. 构建消息存储缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(calMsgLength(msg));
this.appendMessageCallback.doAppend(...); // 填充消息头、体等
// 上锁
putMessageLock.lock();
// 3. 追加到 MappedFile
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
result = mappedFile.appendMessage(byteBuffer.array(), 0, byteBuffer.limit());
// 4. 刷盘处理(同步/异步)
handleDiskFlush(result, putMessageResult, msg);
// 5. 返回写入结果
return putMessageResult;
}
独占锁实现顺序写
如何保证单机存储写 CommitLog 的顺序性,直观的想法就是对写入动作加独占锁保护,即同一时刻只允许一个线程加锁成功,那么该选什么样的锁实现才合适呢?RocketMQ 目前实现了两种方式。1. 基于 AQS 的 ReentrantLock 2. 基于 CAS 的 SpinLock。
那么什么时候选取 spinlock,什么时候选取 reentranlock?回忆下两种锁的实现,对于 ReentrantLock,底层 AQS 抢不到锁的话会休眠,但是 SpinLock 会一直抢锁,造成明显的 CPU 占用。SpinLock 在 trylock 失败时,可以预期持有锁的线程会很快退出临界区,死循环的忙等待很可能要比进程挂起等待更高效。这也是为什么在高并发下为了保持 CPU 平稳占用而采用方式一,单次请求响应时间短的场景下采用方式二能够减少 CPU 开销。
// 默认useReentrantLockWhenPutMessage==false ,使用SpinLock
this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
文件追加
MappedFile#appendMessages()
public AppendMessageResult appendMessage(final byte[] data) {
// 获取当前写指针位置
long currentPos = this.wrotePosition.get();
// 检查文件剩余空间,不足则创建新文件
if (currentPos + data.length > this.fileSize) {
createNewMappedFile(); // 创建新文件并切换
}
// 写入内存映射缓冲区
this.mappedByteBuffer.put(data);
this.wrotePosition.addAndGet(data.length);
return new AppendMessageResult(AppendMessageStatus.PUT_OK);
}
刷盘策略
同步刷盘(GroupCommitService)
由GroupCommitService
线程处理,等待数据持久化到磁盘后返回:public void run() { CommitLog.log.info(this.getServiceName() + " service started"); // 主循环:持续处理刷盘请求 while (!this.isStopped()) { try { // 1. 等待任务或超时(10ms) this.waitForRunning(10); // 2. 执行刷盘提交 this.doCommit(); } catch (Exception e) { CommitLog.log.warn(this.getServiceName() + " service has exception. ", e); } } // 服务关闭前的兜底处理 try { Thread.sleep(10); // 等待可能的最后一批请求 } catch (InterruptedException e) { CommitLog.log.warn("GroupCommitService Exception, ", e); } // 3. 交换请求队列,确保未处理请求被消费 synchronized (this) { this.swapRequests(); } // 4. 最后一次刷盘提交 this.doCommit(); CommitLog.log.info(this.getServiceName() + " service end"); }
**刷盘请求为啥还要分读写两个列表呢?**这是用来做读写分离用的,Producer 发送消息的请求量是非常大的,GroupCommitService 的刷盘操作是同步的,刷盘期间仍然会有大量的刷盘请求被提交进来,拆分成两个读写列表,请求提交到写列表,刷盘时处理读列表,刷盘结束交换列表,循环往复,两者可以同时进行。
异步刷盘(FlushCommitLogService)
FlushRealTimeService
线程周期性(默认500ms)刷盘,确保数据从 MappedFile 及时 flush 到磁盘。CommitRealTimeService
先将数据从 Transient Store Pool 提交到 MappedFile,再由后续的 flush 操作完成刷盘
ConsumeQueue
核心作用
- 逻辑队列索引:
ConsumeQueue 是 Topic 下每个队列(MessageQueue)的 逻辑索引,记录消息在 CommitLog 中的物理位置。 - 加速消息定位:
消费者通过 ConsumeQueue 快速定位消息在 CommitLog 中的偏移量,避免全量扫描 CommitLog。 - 支持顺序消费:
通过维护队列的消费进度(Consumer Offset),保证消息的顺序性。
Broker 端的后台服务线程会不停地分发请求并从commitLog读取数据异步构建 consumequeue(消费文件)和 indexfile(索引文件)
每个 consumequeue 文件包含 30 万个条目,每个条目大小是 20 个字节,每个文件的大小是 30 万 * 20 = 60万字节,每个文件大小约 5.72M 。
和 commitlog 文件类似,consumequeue 文件的名称也是以偏移量来命名的,可以通过消息的逻辑偏移量定位消息位于哪一个文件里。
消费者从 Broker 获取订阅消息数据时,不用遍历整个 commitlog 文件,只需要根据逻辑偏移量从 consumequeue 文件查询消息偏移量 , 最后通过定位到 commitlog 文件, 获取真正的消息数据。
CommitLog分发
源码路径:
CommitLog.ReputMessageService
public void run() { while (!isStopped()) { long reputOffset = commitLog.getReputFromOffset(); // 获取待处理偏移量 SelectMappedBufferResult buffer = commitLog.getData(reputOffset); // 读取 CommitLog 数据 DispatchRequest dispatchRequest = commitLog.checkMessageAndReturnSize(buffer); // 解析消息 // 分发到 ConsumeQueue 和 IndexFile DefaultMessageStore.this.doDispatch(dispatchRequest); reputOffset += buffer.getSize(); commitLog.setReputFromOffset(reputOffset); // 更新处理指针 } }
Indexfile
核心作用
加速消息检索
基于消息的 Key(如 MessageID 或用户自定义 Key) 或 时间范围 快速定位消息在 CommitLog 中的物理存储位置,避免全量扫描 CommitLog。IndexFile 存储结构
textCopy Code| Header (40B) | | Slot Table (500W * 4B) | // 哈希槽数组 | Index Entries (2000W * 20B) | // 哈希链表结构
消费拉取方式
推送模式
特点
- 自动拉取消息 :在推送模式下,消费者不需要显式地去拉取消息,而是由 RocketMQ 的 Broker 主动将消息推送给消费者。
- 异步消费 :消息的拉取和消费是异步进行的,消费者只需要注册一个监听器(
MessageListener
),当有新消息到达时,RocketMQ 会自动调用监听器来处理消息。 - 高吞吐量 :由于消息的拉取和消费是异步的,推送模式可以实现较高的吞吐量。
- 负载均衡 :推送模式支持集群消费和广播消费,并且会自动进行负载均衡。
使用场景
- 典型场景 :需要高吞吐量的场景,如日志收集、订单处理等。
- 示例 :消费者只需要注册一个监听器,RocketMQ 会自动将消息推送给消费者并触发消费逻辑。
实现原理
- 长轮询机制 :虽然叫“推送模式”,但实际上 RocketMQ 使用的是长轮询机制。消费者会向 Broker 发起拉取请求,如果当前没有消息,Broker 会挂起请求,直到有新消息到达或超时。
- 异步回调 :当消息到达时,Broker 会通知消费者,消费者通过回调函数(
MessageListener
)来处理消息。 - 负载均衡 :推送模式会自动进行负载均衡,确保消息队列均匀分配给消费者组内的各个消费者实例。
DefaultMQPushConsumerImpl
PullMessageService
是 RocketMQ 中用于管理消息拉取的核心组件,主要负责以下任务:
- 维护拉取请求队列 :
PullMessageService
维护了一个拉取请求队列,所有的拉取请求都会被放入这个队列中。 - 执行拉取操作 :
PullMessageService
从队列中取出拉取请求,并调用底层的网络通信模块向服务端发送拉取消息的请求。 - 触发回调 :当消息从服务端返回时,
PullMessageService
会将消息交给消费者注册的监听器进行处理。
PullMessageService
是一个后台线程,在消费者启动时会被创建并启动。它的核心逻辑在 run()
方法中实现。
public void run() {
while (!this.isStopped()) {
try {
// 从拉取请求队列中获取一个拉取请求
PullRequest pullRequest = this.pullRequestQueue.take();
// 执行拉取操作
this.pullMessage(pullRequest);
} catch (InterruptedException e) {
log.error("PullMessageService interrupted", e);
}
}
}
pullRequestQueue
:这是一个阻塞队列,存储了所有的拉取请求。pullMessage(pullRequest)
:这是执行拉取操作的核心方法。
pullMessage(pullRequest)
方法会根据 PullRequest
中的信息向服务端发送拉取消息的请求。
private void pullMessage(final PullRequest pullRequest) {
// 构造拉取消息的请求
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
// 处理拉取到的消息
processPullResult(pullRequest, pullResult);
}
@Override
public void onException(Throwable e) {
// 处理异常
log.error("Pull message exception", e);
}
};
// 调用底层通信模块发送拉取请求
this.mQClientFactory.getMQClientAPIImpl().pullMessage(
brokerAddr,
requestHeader,
timeoutMillis,
communicationMode,
pullCallback
);
}
当服务端返回消息后,PullMessageService
会调用 processPullResult()
方法处理拉取结果。
private void processPullResult(PullRequest pullRequest, PullResult pullResult) {
switch (pullResult.getPullStatus()) {
case FOUND:
// 消息找到,提交给消费者处理
List<MessageExt> messages = pullResult.getMsgFoundList();
this.consumeMessage(messages);
break;
case NO_NEW_MSG:
// 没有新消息,重新发起拉取请求
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_NO_MESSAGE);
break;
case NO_MATCHED_MSG:
// 没有匹配的消息,重新发起拉取请求
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_NO_MATCHED_MESSAGE);
break;
case OFFSET_ILLEGAL:
// 偏移量非法,重新调整偏移量
this.updateOffset(pullRequest);
break;
}
}
PullRequestHoldService
PullRequestHoldService
是一个非常重要的后台服务,主要用于支持 长轮询(Long Polling) 机制。它是 RocketMQ 实现 Push 模式的核心组件之一,通过该服务,消费者可以在没有新消息时挂起请求,直到有新消息到达或超时为止。
核心流程如下:
1、Broker 端接收到消费者的拉取消息请求后,拉取消息处理器开始处理请求,根据拉取请求查询消息存储 ;
2、从消息存储中获取消息数据 ,若存在新消息 ,则将消息数据通过网络返回给消费者。若无新消息,则将拉取请求放入到拉取请求表 pullRequestTable 。
3、长轮询请求管理服务 pullRequestHoldService 每隔 5 秒从拉取请求表中判断拉取消息请求的队列是否有新的消息。
判定标准是:拉取消息请求的偏移量是否小于当前消费队列最大偏移量,如果条件成立则说明有新消息了。
若存在新的消息 , 长轮询请求管理服务会触发拉取消息处理器重新处理该拉取消息请求。
4、当 commitlog 中新增了新的消息,消息分发服务会构建消费文件和索引文件,并且会通知长轮询请求管理服务,触发拉取消息处理器重新处理该拉取消息请求。
suspendPullRequest()
方法
public void suspendPullRequest(String topic, int queueId, PullRequest pullRequest) {
String key = this.buildKey(topic, queueId);
ManyPullRequest mpr = this.pullRequestTable.get(key);
if (null == mpr) {
mpr = new ManyPullRequest();
ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
if (prev != null) {
mpr = prev;
}
}
mpr.addPullRequest(pullRequest);
}
otifyMessageArriving()
方法
public void notifyMessageArriving(String topic, int queueId, long maxOffset) {
String key = this.buildKey(topic, queueId);
ManyPullRequest mpr = this.pullRequestTable.get(key);
if (mpr != null) {
List<PullRequest> requestList = mpr.cloneListAndClear();
for (PullRequest request : requestList) {
// 消息匹配返回
if (match) {
this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
request.getRequestCommand());
}
// 超过最长等待时间返回为空
if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
}
}
}
}
手动拉取模式
特点
- 手动拉取消息 :在手动拉取模式下,消费者需要显式地调用
pull()
方法从 Broker 拉取消息。 - 同步消费 :消费者需要主动控制消息的拉取和消费过程,适合对消息消费有精细控制需求的场景。
- 灵活性高 :由于消费者完全控制消息的拉取和消费,因此可以灵活地调整拉取频率、批量大小等参数。
- 无自动负载均衡 :手动拉取模式不会自动进行负载均衡,开发者需要自行管理消息队列的分配。
使用场景
- 典型场景 :需要对消息消费进行精细控制的场景,如批处理任务、数据迁移等。
- 示例 :消费者需要显式地调用
pull()
方法来拉取消息,并处理返回的消息列表。
实现原理
- 手动拉取 :消费者需要显式地调用
pull()
方法从指定的消息队列中拉取消息。 - 无自动负载均衡 :开发者需要自行管理消息队列的分配和偏移量的维护。
- 灵活性 :开发者可以灵活地控制拉取的频率、批量大小等参数,适合对消息消费有精细控制需求的场景。
MQPullConsumerImpl
DefaultMQPullConsumerImpl被废弃了,DefaultLitePullConsumerImpl替代了它,被废弃的主要原因在于其使用复杂、缺乏自动化以及性能瓶颈等问题。
在以下几个方面进行了显著改进:
- 自动管理偏移量 :开发者无需手动管理消费进度,简化了使用。
- 自动负载均衡 :内置负载均衡机制,减少了开发者的负担。
- 简化 API :提供了更简单的 API,降低了使用复杂度。
- 性能优化 :支持批量拉取和异步拉取,提升了系统的吞吐量。
在 RocketMQ 中,poll
消费模式主要由以下核心类实现:
重平衡
RebalanceService
RebalanceService
是重平衡的实现入口
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
this.waitForRunning(waitInterval);
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}
触发时机
RebalanceService
是一个后台线程,默认每 20 秒 执行一次重平衡检查。- 当消费者启动时,会立即触发一次重平衡,以完成队列的初始分配。
- 源码入口:
DefaultMQPushConsumerImpl#start
→MQClientInstance#start
→RebalanceService#start
- 源码入口:
- 消费者停止时(
shutdown()
)- Broker 发送请求解除注册
- Broker 处理之后再向每个消费者发送消费者数量变化消息
- 所有消费者收到后唤醒重平衡线程进行重平衡
重平衡策略
- 平均分配策略
实现类: AllocateMessageQueueAveragely
特点:
将 MessageQueue 均分给所有消费者
当队列数无法整除消费者数时,前序消费者多分配一个队列
适用场景: 消费者节点性能均衡的集群环境
示例: 8队列3消费者 → 分配结果为 3,3,2
轮询平均分配
实现类: AllocateMessageQueueAveragelyByCircle
特点:
使用轮询算法分配队列
分配结果与平均分配类似但顺序不同
示例: 8队列3消费者 → 分配结果为 3,3,2机房临近优先分配
实现类: AllocateMachineRoomNearby
特点:
优先分配同机房 Broker 的队列
需配合 MachineRoomResolver 使用
适用场景: 跨机房部署时减少网络延迟一致性哈希分配
实现类: AllocateMessageQueueConsistentHash
特点:
使用虚拟节点哈希环分配队列
消费者变动时最小化队列重新分配
优势: 节点上下线时队列分配波动小- 算法实现
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
List<MessageQueue> result = new ArrayList<MessageQueue>();
if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
return result;
}
Collection<ClientNode> cidNodes = new ArrayList<ClientNode>();
for (String cid : cidAll) {
cidNodes.add(new ClientNode(cid));
}
final ConsistentHashRouter<ClientNode> router; //for building hash ring
if (customHashFunction != null) {
router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt, customHashFunction);
} else {
router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt);
}
List<MessageQueue> results = new ArrayList<MessageQueue>();
for (MessageQueue mq : mqAll) {
ClientNode clientNode = router.routeNode(mq.toString());
if (clientNode != null && currentCID.equals(clientNode.getKey())) {
results.add(mq);
}
}
return results;
}
分配过程说明
配置分配策略
实现类: AllocateMessageQueueByConfig
特点:
通过配置指定消费者固定消费某些队列
需要预先配置 messageQueueList
适用场景: 需要固定消费关系的特殊场景机房分配策略
实现类: AllocateMessageQueueByMachineRoom
特点:
将指定机房 Broker 的队列分配给消费者
需要配置 consumeridcs 指定目标机房
注意: 需配合特定机房部署架构使用
消费方式
并发消费
并发消费的核心是 允许多个线程同时处理消息 ,以提高消息消费的吞吐量。它的主要特点包括:
- 无序性 :并发消费不要求消息按照发送顺序被处理,因此可以充分利用多线程的优势。
- 高吞吐量 :由于多个线程可以并行处理消息,并发消费能够显著提升系统的吞吐量。
- 负载均衡 :RocketMQ 的负载均衡机制会将消息队列均匀分配给消费者组内的各个消费者实例,每个实例内部再通过多线程并行处理消息。
@Override
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispatchToConsume) {
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
// 消息小于批次数量,直接提交消费
if (msgs.size() <= consumeBatchSize) {
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
this.submitConsumeRequestLater(consumeRequest);
}
} else {
// 从消息中分割出批次大小进行循环提交消费
for (int total = 0; total < msgs.size(); ) {
List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
for (int i = 0; i < consumeBatchSize; i++, total++) {
if (total < msgs.size()) {
msgThis.add(msgs.get(total));
} else {
break;
}
}
ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
for (; total < msgs.size(); total++) {
msgThis.add(msgs.get(total));
}
this.submitConsumeRequestLater(consumeRequest);
}
}
}
}
消息根据consumeBatchSize分割需要的大小,直接提交线程池(线程池的大小通过consumeThreadMax
控制)进行消息的处理
处理消费结果
消息被消费后将会返回消费结果,消费成功则更新消费进度,失败则进行消息重试
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
) {
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
// 失败则进行消息重试
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
}
// 移除本地消费队列,更新消费进度
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}
updateOffset
在广播模式下,每个消费者实例独立消费所有的消息队列,因此偏移量通常存储在本地,调用
updateOffset()
方法时,偏移量会被更新到本地文件中,而不会提交到 Broker。在集群模式下,消费者组内的多个消费者实例共同消费消息队列,因此偏移量需要存储在 Broker 上,以便所有消费者实例共享消费进度。调用
updateOffset()
方法时,偏方法会将偏移量更新到内存中,并定期提交到 Broker上。// this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.persistAllConsumerOffset(); } catch (Exception e) { log.error("ScheduledTask persistAllConsumerOffset exception", e); } } }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
如果客户端在这个间隔时间内发生了重启,就可能产生消息重复消费的问题,所以业务必须要保证消息消费的幂等性
顺序消费
顺序消费的核心是 保证消息的消费顺序与消息的发送顺序一致 。
分区顺序消费(Partition Orderly)
- 在同一个消息队列(MessageQueue)中,消息按照发送顺序被消费(消息的存储本身就是按顺序存储的,所以保证只有一个线程从队列一个一个消费即可)。
- 不同的消息队列之间没有顺序性要求。
public void run() { if (this.processQueue.isDropped()) { log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue); return; } // 消费前需要拿到消费队列的锁 final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue); synchronized (objLock) { ... } }
因为多个消费队列的消费仍然是不保证顺序的,所以如果真的要使用顺序消费,你如果保证业务上的多个同组消费被放到同一个消费队列
具体做法是:你需要为每组消息定义一个唯一的标识(如订单 ID、用户 ID 等),并确保同一组消息使用相同的标识。
即通过
MessageQueueSelector
指定消息发送到哪个队列。例如:// 定义消息分组的 Key String orderId = "order_123"; // 发送消息时指定队列选择器 SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { // 根据业务 Key 计算队列索引 String key = (String) arg; int queueIndex = Math.abs(key.hashCode()) % mqs.size(); return mqs.get(queueIndex); } }, orderId); // 将 orderId 作为参数传递
全局顺序消费(Global Orderly)
- 所有消息都按照发送顺序被消费。
- 全局顺序消费通常通过将所有消息发送到一个单独的消息队列来实现。
实际上RocketMq本身并没有这两种概念的区分,全局顺序消费实际上是单线程生产者发送消息,单消费队列消费消息的分区顺序消费的特例
消息重试
消息重试的实现本身依赖于RocketMq的另一大功能,延迟消息,前面者的基本原理是将消息发会broker并将消息暂存入延时topic(SCHEDULE_TOPIC_XXXX
),后者在延迟指定时间后写入重试 Topic(%RETRY%<ConsumerGroup>
)而后被重新消费。
消费者只需订阅原始 Topic,RocketMQ 会自动处理订阅重试Topic,并在消息处理前将重试topic转换会原始topic
消息发回
重新设置topic为 %RETRY%,并记录原始topic
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try {
String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
: RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, brokerName, msg,
this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
} catch (Exception e) {
log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);
// 重新设置topic为 %RETRY%<ConsumerGroup>
Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
String originMsgId = MessageAccessor.getOriginMessageId(msg);
MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
newMsg.setFlag(msg.getFlag());
MessageAccessor.setProperties(newMsg, msg.getProperties());
// 记录原始topic
MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
MessageAccessor.clearProperty(newMsg, MessageConst.PROPERTY_TRANSACTION_PREPARED);
newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
// 发回
this.mQClientFactory.getDefaultMQProducer().send(newMsg);
} finally {
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
}
}
转入延时topic
延时有18个等级,而重试最大16次,所以从第三个延时等级开始算作为重试的第一次
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
|| delayLevel < 0) {
newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % DLQ_NUMS_PER_GROUP;
// 超过最大重试次数,写入死信队列
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP,
PermName.PERM_WRITE | PermName.PERM_READ, 0);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return CompletableFuture.completedFuture(response);
}
msgExt.setDelayTimeLevel(0);
} else {
//
if (0 == delayLevel) {
delayLevel = 3 + msgExt.getReconsumeTimes();
}
msgExt.setDelayTimeLevel(delayLevel);
}
// 写入commitLog
CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
隐式订阅
在 DefaultMQPushConsumerImpl
的 copySubscription()
方法中,客户端会为消费者组自动添加重试 Topic 的订阅:
private void copySubscription() {
try {
// 用户显式订阅的原始 Topic(如 "OrderTopic")
for (Map.Entry<String, String> entry : this.subscriptions.entrySet()) {
String topic = entry.getKey();
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, entry.getValue());
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
}
// 自动订阅重试 Topic(%RETRY% + ConsumerGroup)
String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
SubscriptionData retrySubscriptionData = FilterAPI.buildSubscriptionData(retryTopic, "*");
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, retrySubscriptionData);
} catch (Exception e) {
throw new RuntimeException("subscription exception", e);
}
}
消息隐式转换
消息在被提交消费处理前回把重试topic的topic还原会原始topic
public void resetRetryAndNamespace(final List<MessageExt> msgs, String consumerGroup) {
final String groupTopic = MixAll.getRetryTopic(consumerGroup);
for (MessageExt msg : msgs) {
String retryTopic = msg.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (retryTopic != null && groupTopic.equals(msg.getTopic())) {
msg.setTopic(retryTopic);
}
if (StringUtils.isNotEmpty(this.defaultMQPushConsumer.getNamespace())) {
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
}
}
}
延时消息
后文再续