RocketMq详解:六、RocketMq的负载均衡机制

发布于:2024-12-07 ⋅ 阅读:(31) ⋅ 点赞:(0)

上一章:《SpringBoot+Aop实现RocketMq的幂等》


1.背景

在RocketMQ中,它的负载均衡主要可以分为

  • Consumer订阅消息的负载均衡
  • Broker端的消息分发策略
  • Producer端的发送消息的负载均衡

其中在消费者端还有一个重量级的组件:Rebalance负载均衡组件,他负责相对均匀的给消费者分配需要拉取的队列信息。

在了解负载均衡组件之前,我们先看看什么是负载均衡以及为什么要使用负载均衡

1.1 什么是负载均衡

负载均衡在分布式服务里会频繁的出现,其主要是用来在多个资源 (一般是服务器)中分配负载,达到最优化资源使用,避免单台服务器过载

RocketMQ中的负载均衡,指的是如何将消息队列(Message Queue)均匀地分配给消费者组中的各个消费者实例

通过负载均衡机制,可以避免某些消费者实例处理过多消息队列而过载,或者某些实例没有消息可处理,从而提高系统的整体处理能力和资源利用率

1.2 负载均衡的意义

在这里插入图片描述

上图是 RocketMQ 的消息储存模型:消息是按照队列的方式分区有序储存的

RocketMQ 的队列模型使得生产者、消费者和读写队列都是多对多的映射关系,彼此之间都可以无限水平扩展

对比传统的消息队列如 RabbitMQ 是很大的优势。尤其是在流式处理场景下有天然优势,能够保证同一队列的消息被相同的消费者处理,对于批量处理、聚合处理更友好
在这里插入图片描述

消费者消费某个 topic 的消息等同于消费这个 topic 上所有队列的消息(上图中 Consumer A1 消费队列 1,Consumer A2 消费队列 2、3)。

所以,要保证每个消费者的负载尽量均衡,也就是要给这些消费者分配相同数量的队列,并保证在异常情况下(如客户端宕机)队列可以在不同消费者之间迁移

在具体了解RocketMQ的负载均衡策略之前,我们先了解一些RocketMq的整个消费逻辑,以便我们后期可以更好的理解

2.RocketMQ消息消费

2.1 消息的流转过程

RocketMQ 支持两种消费模式:集群消费( Clustering )和广播消费( Broadcasting )。

集群消费:同一 Topic 下的一条消息只会被同一消费组中的一个消费者消费。也就是说,消息被负载均衡到了同一个消费组的多个消费者实例上。
在这里插入图片描述

广播消费:当使用广播消费模式时,每条消息推送给集群内所有的消费者,保证消息至少被每个消费者消费一次
在这里插入图片描述

上面提到了两个名词:消费者组、消息队列
这两个名词在:《RocketMQ 介绍及基本概念》这篇文章中已进行说明,下面在简单概述一下

  • 消费者组

消费者组是 RocketMQ 中负载均衡的基本单位

一个主题(Topic)的消息队列可以被多个消费者组订阅,每个消费者组中的消费者实例会共享消息队列。

如果一个主题有 8 个队列,而一个消费者组有 4 个消费者实例,那么负载均衡机制会将这 8 个队列均匀分配给这 4 个消费者实例。

  • 消息队列

消息队列是 RocketMQ 中消息存储和消费的基本单位

一个主题可以有多个消息队列,这些队列中的消息会被消费者组中的消费者实例消费。通过将消息队列均匀地分配给各个消费者实例,RocketMQ 实现了负载均衡。

2.2 Consumer消费消息的流程

consumer消息消费过程:
在这里插入图片描述

因为广播模式所有的Consumer都会收到全量消息,所以RocketMQ的负载均衡只针对于Consumer集群消费的模式

3.RocketMq的负载均衡策略

3.1 Broker负载均衡

Broker是以group(消费者组)为单位提供服务。

一个group里面分master和slave,master和slave存储的数据一样,slave从master同步数据(同步双写或异步复制看配置)。

通过nameserver暴露给客户端后,只是客户端关心(注册或发送)一个个的topic路由信息。

路由信息中会细化为message queue的路由信息。而message queue会分布在不同的broker group。所以对于客户端来说,分布在不同broker group的message queue为成为一个服务集群,但客户端会把请求分摊到不同的queue。

而由于压力分摊到了不同的queue,不同的queue实际上分布在不同的Broker group,也就是说压力会分摊到不同的broker进程,这样消息的存储和转发均起到了负载均衡的作用。

Broker一旦需要横向扩展,只需要增加broker group,然后把对应的topic建上,客户端的message queue集合即会变大,这样对于broker的负载则由更多的broker group来进行分担。

并且由于每个group下面的topic的配置都是独立的,也就说可以让group1下面的那个topic的queue数量是4,其他group下的topic queue数量是2,这样group1则得到更大的负载。

  • commit log

虽然每个topic下面有很多message queue,但是message queue本身并不存储消息。真正的消息存储会写在SetCommitLog的文件ter,message queue只是存储CommitLog中对应的位置信息,方便通过message queue找到对应存储在CommitLog的消息。

不同的topic,message queue都是写到相同的CommitLog 文件,也就是说CommitLog完全的顺序写

具体如下图:
在这里插入图片描述

3.2 Producer发送消息负载均衡

Producer端,每个实例在发消息的时候,默认会轮询所有的message queue发送,以达到让消息平均落在不同的queue上。而由于queue可以散落在不同的broker,所以消息就发送到不同的broker下,如下图:
在这里插入图片描述

注: 另外多个队列可以部署在一台机器上,也可以分别部署在多台不同的机器上

上图所示的若干队列可以部署在一台机器上,也可以分别部署在不同的机器上,发送消息通过轮询队列的方式发送,每个队列接收平均的消息量。通过增加机器,可以水平扩展队列容量。另外也可以自定义方式选择发往哪个队列。

RocketMQ的顺序消息发送的时候,就要求我们自己实现队列选择器,根据消息唯一标识选择对应的队列进行发送。

3.3 消费端的负载均衡

3.3.1 Rebalance组件

消费端的负载均衡主要依赖于Rebalance组件,将 Broker 端中多个队列按照某种算法分配给同一个消费组中的不同消费者

Rebalance即再均衡,指的是将一个Topic下的多个Queue在同一个Consumer Group中的多个 Consumer间进行重新分配的过程,它能够提升消息的并行消费能力

RocketMQ 5.0以前是按照队列粒度进行负载均衡的,5.0以后提供了按消息粒度进行负载均衡。

对于4.x/3.x的版本,包括DefaultPushConsumer、DefaultPullConsumer、LitePullConsumer等,默认且仅能使用队列粒度负载均衡策略

  • 队列粒度负载均衡策略

队列粒度负载均衡策略中,同一消费者组内的多个消费者将按照队列粒度消费消息,每个队列只能被其中一个消费者消费

队列粒度负载均衡是在每个消费者端进行的,并不是由某个节点统一进行负载均衡之后将分配结果通知到每个消费者。

费者增加或者减少会影响消息队列的分配消,所以Broker需要感知消费者的上下线情况

消费者在启动时会向所有的Broker发送心跳包进行注册,通知Broker消费者上线,下线的时候也会向Broker发送取消注册的请求

Broker会维护消费者信息的注册信息,在消费者发生变更时会通知消费者进行负载均衡

由于负载均衡是每个客户端独立进行计算,那么何时触发呢?

3.3.2 Rebalance触发时机

  • 消费端启动时,立即进行负载均衡

消费者在启动时会进行一次负载均衡,为自己分配消息队列。

  • 消费端定时任务每隔 20 秒触发负载均衡

消费者本身也会定时执行负载均衡,默认是20s执行一次。
在这里插入图片描述

  • 消费者上下线,Broker 端通知消费者触发负载均衡

如果有消费者向Broker发送UNREGISTER_CLIENT取消注册请求,并且开启了允许通知变更,会触发变更事件。

变更事件同上,Broker会通知该消费者组下的所有消费者进行一次负载均衡。

比如我们动态添加了Consumer进行消费,那么此时肯定是要重新分配一下,也就是触发Rebalance再均衡。

例如,一个Topic下5个队列,在只有1个消费者的情况下,这个消费者将负责消费这5个队列的消息。如果此时其中一个消费者分配2个队列,给另一个分配3个队列,从而提升消息我们增加一个消费者,那么就可以给的并行消费能力。 如下图:
在这里插入图片描述

  • 消费者所订阅Topic的队列数量发生变化

比如我们动态调整了Topic对应的队列数量,那么此时肯定是要重新分配一下,也就是触发Rebalance再均衡。

例如一个Topic下5个队列,有2个消费者的情况下,那么就可以给其中一个消费者分配2个队列,给另一个分配3个队列,假设我们调整到Topic下有7个队列,还是2个消费者的情况下,那么就可以给其中一消费者分配4个队列,给另一个分配3个队列;从而提升消息的并行消费能力。如下图:
在这里插入图片描述
像Consumer Group扩容或缩容、Consumer与NameServer间发生网络异常、Consumer发生宕机等都会导致消费者组中消费者的数量发生变化。

需要注意的是,由于一个队列最多分配给一个消费者,因此当某个消费者组下的消费者实例数量大于队列的数量时,多余的消费者实例将分配不到任何队列,等于是多余的消费者什么都不做,白白浪费

3.3.3 负载均衡流程

1、发送心跳
消费者启动后,"它就会通过定时任务不断地向 RocketMQ 集群中的所有 Broker 实例发送心跳包消息消费分组名称、订阅关系集合、消息通信模式和客户端实例编号等信息

Broker 端在收到消费者的心跳消息后,会将它维护在 ConsumerManager 的本地缓存变量 consumerTable,同时并将封装后的客户端网络通道信息保存在本地缓存变量 channelinfoTable 中,为之后做消费端的负载均衡提供可以依据的元数据信息。

2、启动负载均衡服务
负载均衡核心代码:
负载均衡服务执行逻辑在doRebalance函数,里面会对每个消费者组执行负载均衡操作。

/* group */
private ConcurrentMap<String, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
 
public void doRebalance() {
    //每个消费者组都有负载均衡
    for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
        MQConsumerInner impl = entry.getValue();
        if (impl != null) {
            try {
                impl.doRebalance();
            } catch (Throwable e) {
                log.error("doRebalance exception", e);
            }
        }
    }
}

由于每个消费者组可能会消费很多topic,每个topic都有自己的不同队列,最终是按topic的维度进行负载均衡。

 public boolean doRebalance(boolean isOrder) {
        boolean balanced = true;
        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
        if (subTable != null) {
            Iterator var4 = subTable.entrySet().iterator();

            while(var4.hasNext()) {
                Map.Entry<String, SubscriptionData> entry = (Map.Entry)var4.next();
                String topic = (String)entry.getKey();

                try {
                    if (!this.clientRebalance(topic) && this.tryQueryAssignment(topic)) {
                        balanced = this.getRebalanceResultFromBroker(topic, isOrder);
                    } else {
                        balanced = this.rebalanceByTopic(topic, isOrder);
                    }
                } catch (Throwable var8) {
                    Throwable e = var8;
                    if (!topic.startsWith("%RETRY%")) {
                        log.warn("rebalance Exception", e);
                        balanced = false;
                    }
                }
            }
        }

        this.truncateMessageQueueNotMyTopic();
        return balanced;
    }

里面最核心的便是:
在这里插入图片描述

这段代码的逻辑如下:
1.clientRebalance(topic):首先调用clientRebalance方法,尝试对指定主题(topic)进行客户端再平衡。如果这个方法返回false,意味着客户端再平衡没有成功或不需要进行。

2.tryQueryAssignment(topic):接着调用tryQueryAssignment方法,尝试从代理服务器(broker)查询最新的订阅分配信息。这个方法通常会在客户端再平衡失败或者不需要进行时被调用,目的是检查是否可以从代理服务器获取新的分配信息。

3.如果上述两个条件都满足(即clientRebalance返回false且tryQueryAssignment成功),则会调用getRebalanceResultFromBroker(topic, isOrder)方法,从代理服务器获取再平衡的结果,并将结果赋值给balanced变量。这通常意味着需要根据来自代理服务器的信息来更新本地的消费队列分配。

4.否则,如果上述两个条件不同时满足,则调用rebalanceByTopic(topic, isOrder)方法,通过其他方式(可能是基于当前已有的分配信息)来进行再平衡,并将结果赋值给balanced变量。

总结来说,这段代码是在判断是否应该从代理服务器获取最新的分配信息来完成再平衡,还是基于现有的分配信息自行处理再平衡。

选择哪条路径取决于clientRebalancetryQueryAssignment方法的执行结果。这种设计允许RocketMQ灵活地应对不同的网络状况和系统状态,以确保消息能够高效、公平地分发给各个消费者。

最终最终负载均衡逻辑处理的实现在:

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic。

    private boolean rebalanceByTopic(String topic, boolean isOrder) {
        boolean balanced = true;
        Set mqSet;
        switch (this.messageModel) {
            //广播模式
            case BROADCASTING:
                mqSet = (Set)this.topicSubscribeInfoTable.get(topic);
                if (mqSet != null) {
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                    if (changed) {
                        this.messageQueueChanged(topic, mqSet, mqSet);
                        log.info("messageQueueChanged {} {} {} {}", new Object[]{this.consumerGroup, topic, mqSet, mqSet});
                    }

                    balanced = mqSet.equals(this.getWorkingMessageQueue(topic));
                } else {
                    this.messageQueueChanged(topic, Collections.emptySet(), Collections.emptySet());
                    log.warn("doRebalance, {}, but the topic[{}] not exist.", this.consumerGroup, topic);
                }
                break;
            //集群模式
            case CLUSTERING:
                mqSet = (Set)this.topicSubscribeInfoTable.get(topic);
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, this.consumerGroup);
                if (null == mqSet && !topic.startsWith("%RETRY%")) {
                    this.messageQueueChanged(topic, Collections.emptySet(), Collections.emptySet());
                    log.warn("doRebalance, {}, but the topic[{}] not exist.", this.consumerGroup, topic);
                }

                if (null == cidAll) {
                    log.warn("doRebalance, {} {}, get consumer id list failed", this.consumerGroup, topic);
                }

                if (mqSet != null && cidAll != null) {
                    List<MessageQueue> mqAll = new ArrayList();
                    mqAll.addAll(mqSet);
                    Collections.sort(mqAll);
                    Collections.sort(cidAll);
                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
                    List<MessageQueue> allocateResult = null;

                    try {
                        allocateResult = strategy.allocate(this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll);
                    } catch (Throwable var11) {
                        Throwable e = var11;
                        log.error("allocate message queue exception. strategy name: {}, ex: {}", strategy.getName(), e);
                        return false;
                    }

                    Set<MessageQueue> allocateResultSet = new HashSet();
                    if (allocateResult != null) {
                        allocateResultSet.addAll(allocateResult);
                    }

                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                    if (changed) {
                        log.info("client rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}", new Object[]{strategy.getName(), this.consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(), allocateResultSet.size(), allocateResultSet});
                        this.messageQueueChanged(topic, mqSet, allocateResultSet);
                    }

                    balanced = allocateResultSet.equals(this.getWorkingMessageQueue(topic));
                }
        }

        return balanced;
    }

我们一起来看一下这段代码:
负载均衡服务会根据消费模式为”广播模式”还是“集群模式”做不同的逻辑处理,这里主要来看下集群模式下的主要处理流程:
在这里插入图片描述

(1) 获取该主题下的消息消费队列集合;

(2) 查询 Broker 端获取该消费组下消费者 Id 列表;

(3) 先对 Topic 下的消息消费队列、消费者 Id 排序,然后用消息队列分配策略算法(默认为:消息队列的平均分配算法),计算出待拉取的消息队列;
在这里插入图片描述

这里的平均分配算法,类似于分页的算法,将所有 MessageQueue 排好序类似于记录,将所有消费端排好序类似页数,并求出每一页需要包含的平均 size 和每个页面记录的范围 range ,最后遍历整个 range 而计算出当前消费端应该分配到的记录。

(4) 分配到的消息队列集合与 processQueueTable 做一个过滤比对操作
在这里插入图片描述

消费者实例内 ,processQueueTable 对象存储着当前负载均衡的队列 ,以及该队列的消费快照。

标红的部分表示与分配到的消息队列集合互不包含,则需要将这些红色队列 Dropped 属性为 true , 然后从 processQueueTable 对象中移除。

绿色的部分表示与分配到的消息队列集合的交集,processQueueTable 对象中已经存在该队列。

黄色的部分表示这些队列需要添加到 processQueueTable 对象中,创建这些队列的消费快照。最后创建拉取消息请求列表,并将请求分发到消息拉取服务,进入拉取消息环节。

3.3.4 Queue分配算法

一个Topic中的Queue只能由Consumer Group中的一个Consumer进行消费,而一个Consumer可以同时消费多个Queue中的消息。

那么Queue与Consumer间的配对关系是如何确定的,即Queue要分配给哪个Consumer进行消费,也是有算法策略的。

负载均衡策略顶层接口:

/**
 * Strategy Algorithm for message allocating between consumers
 */
public interface AllocateMessageQueueStrategy {
 
    /**
     * Allocating by consumer id
     * 给消费者id分配消费队列
     */
    List<MessageQueue> allocate(
        final String consumerGroup, //消费者组
        final String currentCID, //当前消费者id
        final List<MessageQueue> mqAll, //所有的队列
        final List<String> cidAll //所有的消费者
    );
 
}

他默认共有7种负载均衡策略实现:
在这里插入图片描述

常见的有四种策略,分别是:平均分配策略环形平均策略一致性hash策略同机房策略

这些策略是通过在创建Consumer时的构造器传进去的。

(1)、平均分配策略 (默认)
该算法是根据
$[avg = QueueCount/ ConsumerCount] $的计算结果进行分配的,如果能够整除,则按顺序将avg个Queue逐个分配,如果不能整除,则将多余出的Queue按照Consumer顺序逐个分配
在这里插入图片描述

(2)、环形分配策略
环形平均算法是指,根据消费者的顺序,依次由Queue队列组成的环形图逐个分配,该方法不需要提前计算,如下图:
在这里插入图片描述

(3)、一致性哈希分配策略

该算法会将consumer的hash值作为Node节点存放到hash环上,然后将queue的hash值也放到hash环 上,通过顺时针方向,距离queue最近的那个consumer就是该queue要分配的consumer。
在这里插入图片描述

一致性哈希算法可以有效减少由于消费者组扩容或缩容所带来的大量的Rebalance,所以它适合用在Consume数量变化较频繁的场景,如下图:
在这里插入图片描述

但是一致性哈希算法也存在不足,就是分配效率较低,容易导致分配不均的情况。即每个消费者消费的队列数,有可能相差很大,这样就会造成个别消费者压力过大。

我们可以引入虚拟桶,让queue在hash环中尽可能分配均匀

在负载均衡的分配策略中,一致性哈希算法数见不鲜,感兴趣的同学可以移步至:《负载均衡的常见几种算法》
(4)、机房分配策略

该算法会根据queue的部署机房位置consumer的位置,过滤出当前consumer相同机房的queue。

然后按照平均分配策略或环形平均策略对同机房queue进行分配。如果没有同机房queue,则按照平均分配策略或环形平均策略对所有queue进行分配。如下图:
在这里插入图片描述

上面我们讲了那么多好处,但是没有什么事情都是能够十分完美的兼容所有,下面我们来辩证的讨论一下负载均衡对消费的影响

3.3.5 负载均衡对消费的影响

Rebalance的在提升消费能力的同时,也带来一些问题

a、消费暂停: 在只有一个Consumer时,其负责消费所有队列;在新增了一个Consumer后会触发 Rebalance的发生。此时原Consumer就需要暂停部分队列的消费,等到这些队列分配给新的Consumer后,这些暂停消费的队列才能继续被消费。

b、消费重复: Consumer在消费新分配给自己的队列时,必须接着之前Consumer 提交的消费进度的offset 继续消费。然而默认情况下,offset是异步提交的,这个异步性导致提交到Broker的offset与Consumer实际消费的消息并不一致。这个不一致的差值就是可能会重复消费的消息。

C、消费突刺: 由于Rebalance可能导致重复消费,如果需要重复消费的消息过多,或者因为Rebalance暂停时间过长从而导致积压了部分消息。那么有可能会导致在Rebalance结束之后瞬间需要消费很多消息
在这里插入图片描述

上图是真实的一个线上case,这两个时间点在进行应用发布,根据我们上文的分析某个消费者下线后同组的其他消费者感知这一变化需要一定时间,导致有秒级的消费延迟产生。在发布结束后消费者快速处理堆积的消息,可以发现消费速度有一个明显的上涨。

这个例子展示了下线时由于负载均衡带来了短暂的消息处理延迟,新的消费者会从服务端获取消费位点继续之前的消费进度。如果消费者异常宕机或者没有调用 shutdown 优雅下线,没有上传自己的最新消费位点,会使得新分配的消费者重复消费。

当某个客户端触发负载均衡时,就会出现:

  • 对于新分配的队列可能会重复消费,这也是官方要求消费要做好幂等的原因
  • 对于不再负责的队列会短时间消费停止,如果原本的消费 TPS 很高或者正好出现生产高峰就会造成消费毛刺。

为了避免这些影响,则需要我们在使用时注意:

  • 1.避免频繁上下线,为了避免负载均衡的影响应该尽量减少客户端的上下线,同时做好消费幂等;
  • 2.同时在有应用重启或下线前要调用 shutdown 方法,这样服务端在收到客户端的下线请求后会通知客户端及时触发负载均衡,减少消费延迟;
  • 3.选择合适的负载均衡策略;
    • 需要根据业务需要灵活选择负载均衡策略:
    • 需要保证客户端的负载尽可能的均衡:选择默认的平均分配策略;
    • 需要降低应用重启带来的消费延迟:选择一致性哈希的分配策略。
  • 4.RocketMQ 的负载均衡是每个客户端独立进行计算,所以务必要保证每个客户端的负载均衡算法和订阅语句一致:
    • 负载均衡策略不一致会导致多个客户端分配到相同队列或有客户端分不到队列;
    • 订阅语句不一致会导致有消息未能消费。

3.3.6 RocketMQ 5.0 消息级别负载均衡

为了彻底解决客户端负载均衡导致的重复消费和消费延迟问题,RocketMQ 5.0 提出了消息级别的负载均衡机制

同一个队列的消息可以由多个消费者消费,服务端会确保消息不重不漏的被客户端消费到:

消息粒度的负载均衡机制,是基于内部的单条消息确认语义实现的

消费者获取某条消息后,服务端会将该消息加锁,保证这条消息对其他消费者不可见,直到该消息消费成功或消费超时

因此,即使多个消费者同时消费同一队列的消息,服务端也可保证消息不会被多个消费者重复消费

在 4.x 的客户端中,顺序消费的实现强依赖于队列的分配

RocketMQ 5.0 在消息维度的负载均衡的基础上也实现了顺序消费的语意:不同消费者处理同一个消息组内的消息时,会严格按照先后顺序锁定消息状态,确保同一消息组的消息串行消费
在这里插入图片描述

如上图所述,队列 Queue1 中有 4 条顺序消息,这 4 条消息属于同一消息组 G1,存储顺序由 M1 到 M4。

在消费过程中,前面的消息 M1、M2 被 消费者Consumer A1 处理时,只要消费状态没有提交,消费者 A2 是无法并行消费后续的 M3、M4 消息的,必须等前面的消息提交消费状态后才能消费后面的消息。

4.RocketMQ指定机器消费设计思路

日常测试环境当中会存在多台consumer进行消费,但实际开发当中某台consumer新上了功能后希望消息只由该机器进行消费进行逻辑覆盖,这个时候consumerGroup的集群模式就会给我们造成困扰,因为消费负载均衡的原因不确定消息具体由哪台consumer进行消费。当然我们可以通过介入consumer的负载均衡机制来实现指定机器消费。

public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
    private final InternalLogger log = ClientLogger.getLog();
 
    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {
        
 
        List<MessageQueue> result = new ArrayList<MessageQueue>();
        // 通过改写这部分逻辑,增加判断是否是指定IP的机器,如果不是直接返回空列表表示该机器不负责消费
        if (!cidAll.contains(currentCID)) {
            return result;
        }
 
        int index = cidAll.indexOf(currentCID);
        int mod = mqAll.size() % cidAll.size();
        int averageSize =
            mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                + 1 : mqAll.size() / cidAll.size());
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }
}

网站公告

今日签到

点亮在社区的每一天
去签到