What Does the Kafka Controller Role Do?

Published: 2024-11-23

I. Context

In the post 《Kafka-Controller Election》 we analyzed how the Controller gets elected, and saw that the newly elected Controller then runs onControllerFailover(). Now let's look at the responsibilities the Controller role carries.

II. Registering Listeners

Before reading any state from ZooKeeper, the controller registers listeners so that it receives callbacks for broker/topic changes.

    val childChangeHandlers = Seq(brokerChangeHandler, topicChangeHandler, topicDeletionHandler, logDirEventNotificationHandler,
      isrChangeNotificationHandler)
    //register each of these handlers in turn
    //they watch for child-node changes
    childChangeHandlers.foreach(zkClient.registerZNodeChildChangeHandler)

    val nodeChangeHandlers = Seq(preferredReplicaElectionHandler, partitionReassignmentHandler)
    //these watch for data changes on a single node
    nodeChangeHandlers.foreach(zkClient.registerZNodeChangeHandlerAndCheckExistence)
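
For context, here is roughly what one of these handlers looks like, as a hedged sketch modeled on kafka.controller.BrokerChangeHandler (simplified): the handler body only enqueues an event, and the single controller event thread does the real work.

    class BrokerChangeHandler(eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
      //watch the children of /brokers/ids
      override val path: String = BrokerIdsZNode.path
      //a child change (broker added/removed) becomes a BrokerChange event on the queue
      override def handleChildChange(): Unit = eventManager.put(BrokerChange)
    }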

III. Initializing the ControllerContext

1. Fetch all brokers

In effect this just lists the ids under ZooKeeper's /brokers/ids path to obtain the broker list:

val curBrokerAndEpochs = zkClient.getAllBrokerAndEpochsInCluster
  def getAllBrokerAndEpochsInCluster: Map[Broker, Long] = {
    //fetch all broker ids under /brokers/ids, sorted
    val brokerIds = getSortedBrokerList
    //wrap each broker id in a GetDataRequest
    val getDataRequests = brokerIds.map(brokerId => GetDataRequest(BrokerIdZNode.path(brokerId), ctx = Some(brokerId)))
    val getDataResponses = retryRequestsUntilConnected(getDataRequests)
    getDataResponses.flatMap { getDataResponse =>
      val brokerId = getDataResponse.ctx.get.asInstanceOf[Int]
      getDataResponse.resultCode match {
        case Code.OK =>
          // decode: build a BrokerInfo from the brokerId plus the JSON payload below
          //{
          //    "version":5,
          //    "host":"localhost",
          //    "port":9092,
          //    "jmx_port":9999,
          //    "timestamp":"2233345666",
          //    "endpoints":["CLIENT://host1:9092", "REPLICATION://host1:9093"],
          //    *"rack":"dc1",
          //    "features": {"feature": {"min_version":1, "first_active_version":2, "max_version":3}}
          //   }
          Some((BrokerIdZNode.decode(brokerId, getDataResponse.data).broker, getDataResponse.stat.getCzxid))
        case Code.NONODE => None
        case _ => throw getDataResponse.resultException.get
      }
    }.toMap
  }

This step yields a Map[Broker, Long] (broker → broker epoch).

Each Broker carries the broker id, its connection information, and its rack. At this point the Controller knows exactly which brokers it needs to manage and can establish communication with them.

2. Check which brokers are compatible

val (compatibleBrokerAndEpochs, incompatibleBrokerAndEpochs) = partitionOnFeatureCompatibility(curBrokerAndEpochs)

In the result:

compatibleBrokerAndEpochs is the map of compatible brokers;

incompatibleBrokerAndEpochs is the map of incompatible brokers.

So how is a broker judged compatible? Let's look at the code below:

  private def partitionOnFeatureCompatibility(brokersAndEpochs: Map[Broker, Long]): (Map[Broker, Long], Map[Broker, Long]) = {
    //the partition method splits the map into a pair:
    //first, all elements that satisfy the predicate; second, all that do not.
    //The two collections correspond to the results of filter and filterNot.
    //The default implementation traverses the collection twice; strict collections
    //override partition in StrictOptimizedIterableOps to do it in a single pass.
    brokersAndEpochs.partition {
      case (broker, _) =>
        !config.isFeatureVersioningSupported ||
        !featureCache.getFeatureOption.exists(
          latestFinalizedFeatures =>
            BrokerFeatures.hasIncompatibleFeatures(broker.features,
              latestFinalizedFeatures.finalizedFeatures().asScala.
                map(kv => (kv._1, kv._2.toShort)).toMap))
    }
  }
def isFeatureVersioningSupported = interBrokerProtocolVersion.isFeatureVersioningSupported
public enum MetadataVersion {

    IBP_0_8_0(-1, "0.8.0", ""),
    //.....
    IBP_2_7_IV0(-1, "2.7", "IV0"),

    public boolean isFeatureVersioningSupported() {
        return this.isAtLeast(IBP_2_7_IV0);
    }
}

MetadataVersion enumerates the Kafka protocol versions; IBP_2_7_IV0 (2.7) is the first version that supports feature versioning. If the inter-broker protocol version is below 2.7, the check is skipped entirely and every broker counts as compatible; otherwise, a broker whose advertised feature ranges conflict with the cluster's finalized features is judged incompatible.
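
To make the predicate concrete, here is a minimal, hypothetical sketch of the incompatibility test (the names and shapes are ours, not Kafka's; BrokerFeatures.hasIncompatibleFeatures implements the real check): a broker is incompatible if any finalized feature level falls outside the version range the broker says it supports.

    //hypothetical illustration, not Kafka's implementation:
    //supported: feature -> (minVersion, maxVersion) advertised by the broker
    //finalized: feature -> finalized level chosen for the cluster
    def isIncompatible(supported: Map[String, (Short, Short)],
                       finalized: Map[String, Short]): Boolean =
      finalized.exists { case (feature, level) =>
        supported.get(feature) match {
          case Some((min, max)) => level < min || level > max //level outside the supported range
          case None             => true                       //broker does not know the feature at all
        }
      }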

3. Mark the compatible brokers as live

controllerContext.setLiveBrokers(compatibleBrokerAndEpochs)
class ControllerContext extends ControllerChannelContext {

  private val liveBrokers = mutable.Set.empty[Broker]
  private val liveBrokerEpochs = mutable.Map.empty[Int, Long]

  def setLiveBrokers(brokerAndEpochs: Map[Broker, Long]): Unit = {
    clearLiveBrokers()
    addLiveBrokers(brokerAndEpochs)
  }

  //shown for completeness: clearLiveBrokers just empties both collections
  private def clearLiveBrokers(): Unit = {
    liveBrokers.clear()
    liveBrokerEpochs.clear()
  }

  def addLiveBrokers(brokerAndEpochs: Map[Broker, Long]): Unit = {
    liveBrokers ++= brokerAndEpochs.keySet
    liveBrokerEpochs ++= brokerAndEpochs.map { case (broker, brokerEpoch) => (broker.id, brokerEpoch) }
  }

}

In other words, the context internally keeps a set of live Brokers (liveBrokers) and a map of broker id → epoch (liveBrokerEpochs), and puts every surviving broker into them.

4. Fetch all topics

controllerContext.setAllTopics(zkClient.getAllTopicsInCluster(true))
  def getAllTopicsInCluster(registerWatch: Boolean = false): Set[String] = {
    //list the children of /brokers/topics
    val getChildrenResponse = retryRequestUntilConnected(
      GetChildrenRequest(TopicsZNode.path, registerWatch))
    getChildrenResponse.resultCode match {
      case Code.OK => getChildrenResponse.children.toSet
      case Code.NONODE => Set.empty
      case _ => throw getChildrenResponse.resultException.get
    }
  }

This reads all topic names from ZooKeeper's /brokers/topics path.

  val allTopics = mutable.Set.empty[String]

  def setAllTopics(topics: Set[String]): Unit = {
    allTopics.clear()
    allTopics ++= topics
  }

A set (allTopics) is maintained to hold every topic.

5. Watch for partition changes under each topic

  registerPartitionModificationsHandlers(controllerContext.allTopics.toSeq)

  private def registerPartitionModificationsHandlers(topics: Seq[String]): Unit = {
    topics.foreach { topic =>
      //register one handler per topic; changes are pushed as events onto the ControllerEventManager queue
      val partitionModificationsHandler = new PartitionModificationsHandler(eventManager, topic)
      partitionModificationsHandlers.put(topic, partitionModificationsHandler)
    }
    partitionModificationsHandlers.values.foreach(zkClient.registerZNodeChangeHandler)
  }

A topic znode carries topic_id, partitions, adding_replicas, removing_replicas, and similar information. When any of it changes, Kafka also has to adjust the producers and consumers currently using the topic; see processPartitionModifications() for the details.
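
A hedged sketch of the handler registered above, modeled on kafka.controller.PartitionModificationsHandler (simplified):

    class PartitionModificationsHandler(eventManager: ControllerEventManager, topic: String) extends ZNodeChangeHandler {
      //watch the topic's own znode, /brokers/topics/<topic>
      override val path: String = TopicZNode.path(topic)
      //a data change (e.g. partitions added) becomes a PartitionModifications event
      override def handleDataChange(): Unit = eventManager.put(PartitionModifications(topic))
    }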

6. Fetch the replica assignment of every TopicPartition

val replicaAssignmentAndTopicIds = zkClient.getReplicaAssignmentAndTopicIdForTopics(controllerContext.allTopics.toSet)
  def getReplicaAssignmentAndTopicIdForTopics(topics: Set[String]): Set[TopicIdReplicaAssignment] = {
    val getDataRequests = topics.map(topic => GetDataRequest(TopicZNode.path(topic), ctx = Some(topic)))
    val getDataResponses = retryRequestsUntilConnected(getDataRequests.toSeq)
    getDataResponses.map { getDataResponse =>
      val topic = getDataResponse.ctx.get.asInstanceOf[String]
      getDataResponse.resultCode match {
        case Code.OK => TopicZNode.decode(topic, getDataResponse.data)
        case Code.NONODE => TopicIdReplicaAssignment(topic, None, Map.empty[TopicPartition, ReplicaAssignment])
        case _ => throw getDataResponse.resultException.get
      }
    }.toSet
  }

Under /brokers/topics/&lt;topic&gt;, ZooKeeper stores each partition's replica information, so at this point we obtain the TopicPartition → replica-assignment mapping and write the latest mapping into the ControllerContext:

    replicaAssignmentAndTopicIds.foreach { case TopicIdReplicaAssignment(_, _, assignments) =>
      assignments.foreach { case (topicPartition, replicaAssignment) =>
        //record the partition's full replica assignment
        controllerContext.updatePartitionFullReplicaAssignment(topicPartition, replicaAssignment)
        if (replicaAssignment.isBeingReassigned)
          controllerContext.partitionsBeingReassigned.add(topicPartition)
      }
    }
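
ReplicaAssignment itself is worth a quick look; a hedged sketch of its shape (simplified from kafka.controller.ReplicaAssignment):

    case class ReplicaAssignment(replicas: Seq[Int],           //all replicas, in assignment order
                                 addingReplicas: Seq[Int],     //replicas being added by a reassignment
                                 removingReplicas: Seq[Int]) { //replicas being removed by a reassignment
      //a partition is mid-reassignment whenever adds or removes are pending,
      //which is exactly what the loop above checks
      def isBeingReassigned: Boolean = addingReplicas.nonEmpty || removingReplicas.nonEmpty
    }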

7. Watch for broker changes

  registerBrokerModificationsHandler(controllerContext.liveOrShuttingDownBrokerIds)

  private def registerBrokerModificationsHandler(brokerIds: Iterable[Int]): Unit = {
    debug(s"Register BrokerModifications handler for $brokerIds")
    //for each broker, a data change on its znode triggers processBrokerModification(brokerId)
    brokerIds.foreach { brokerId =>
      val brokerModificationsHandler = new BrokerModificationsHandler(eventManager, brokerId)
      zkClient.registerZNodeChangeHandlerAndCheckExistence(brokerModificationsHandler)
      brokerModificationsHandlers.put(brokerId, brokerModificationsHandler)
    }
  }

As we saw in 《Kafka-Controller Election》, the controller has an internal queue plus a loop thread that continuously processes the events in it. The broker, partition, and topic changes analyzed above all fire events, and each of them lands on this queue to be handled in turn.

8. Refresh the leader and ISR cache for all existing partitions

updateLeaderAndIsrCache()
  private def updateLeaderAndIsrCache(partitions: Seq[TopicPartition] = controllerContext.allPartitions.toSeq): Unit = {
    //TopicPartitionStateZNode.decode: the JSON holds leader, leader_epoch, isr, leader_recovery_state, controller_epoch
    //returns a Map[TopicPartition, LeaderIsrAndControllerEpoch]
    //LeaderIsrAndControllerEpoch bundles leader, ISR, leaderEpoch, controllerEpoch, zkVersion, leaderRecoveryState
    val leaderIsrAndControllerEpochs = zkClient.getTopicPartitionStates(partitions)
    leaderIsrAndControllerEpochs.forKeyValue { (partition, leaderIsrAndControllerEpoch) =>
      controllerContext.putPartitionLeadershipInfo(partition, leaderIsrAndControllerEpoch)
    }
  }
class ControllerContext extends ControllerChannelContext {

  private val partitionLeadershipInfo = mutable.Map.empty[TopicPartition, LeaderIsrAndControllerEpoch]

  def putPartitionLeadershipInfo(partition: TopicPartition,
                                 leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch): Unit = {
    //record the partition's leadership info
    val previous = partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
    val replicaAssignment = partitionFullReplicaAssignment(partition)
    updatePreferredReplicaImbalanceMetric(partition, Some(replicaAssignment), previous,
      Some(replicaAssignment), Some(leaderIsrAndControllerEpoch))
  }
}

The ControllerContext maintains a map (TopicPartition → LeaderIsrAndControllerEpoch) that stores every partition's leader and ISR.
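
A hedged sketch of the value type in that map (simplified; the exact field set varies across Kafka versions, and newer versions also carry a leaderRecoveryState):

    case class LeaderAndIsr(leader: Int,         //broker id of the current leader
                            leaderEpoch: Int,    //bumped on every leader change
                            isr: List[Int],      //broker ids currently in sync
                            partitionEpoch: Int) //bumped on every leader/ISR write (zkVersion in older versions)

    case class LeaderIsrAndControllerEpoch(leaderAndIsr: LeaderAndIsr,
                                           controllerEpoch: Int) //epoch of the controller that last wrote the state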

9. Establish communication with every broker

controllerChannelManager.startup(controllerContext.liveOrShuttingDownBrokers)
class ControllerChannelManager(...){

  protected val brokerStateInfo = new mutable.HashMap[Int, ControllerBrokerStateInfo]

  def startup(initialBrokers: Set[Broker]):Unit = {
    //the controller connects to every broker; this step only populates brokerStateInfo
    initialBrokers.foreach(addNewBroker)

    //start one request-send thread per broker
    brokerLock synchronized {
      brokerStateInfo.foreach(brokerState => startRequestSendThread(brokerState._1))
    }
  }

  private def addNewBroker(broker: Broker): Unit = {
    //......
    val requestThread = new RequestSendThread(config.brokerId, controllerEpoch, messageQueue, networkClient,brokerNode, config, time, requestRateAndQueueTimeMetrics, stateChangeLogger, threadName)
    requestThread.setDaemon(false)


    brokerStateInfo.put(broker.id, ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue,requestThread, queueSizeGauge, requestRateAndQueueTimeMetrics, reconfigurableChannelBuilder))
  }

  private def startRequestSendThread(brokerId: Int): Unit = {
    val requestThread = brokerStateInfo(brokerId).requestSendThread
    if (requestThread.getState == Thread.State.NEW)
      requestThread.start()
  }
}

Now let's look at what RequestSendThread actually does.

RequestSendThread extends ShutdownableThread, which keeps invoking doWork() in a loop.
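
A simplified sketch of that loop (the real ShutdownableThread also manages start/shutdown latches and error logging):

    override def run(): Unit = {
      //keep calling doWork() until shutdown is initiated, so each doWork()
      //invocation only has to process a single queue item
      while (isRunning)
        doWork()
    }

With that loop in mind, here is doWork():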

  override def doWork(): Unit = {

    def backoff(): Unit = pause(100, TimeUnit.MILLISECONDS)

    val QueueItem(apiKey, requestBuilder, callback, enqueueTimeMs) = queue.take()
    requestRateAndQueueTimeMetrics.update(time.milliseconds() - enqueueTimeMs, TimeUnit.MILLISECONDS)

    var clientResponse: ClientResponse = null
    try {
      var isSendSuccessful = false
      while (isRunning && !isSendSuccessful) {
        // If the broker is down for a long time, the controller's ZooKeeper listener will eventually fire removeBroker, which calls shutdown() on this thread; at that point we stop retrying.
        try {
          if (!brokerReady()) {
            isSendSuccessful = false
            backoff()
          }
          else {
            val clientRequest = networkClient.newClientRequest(brokerNode.idString, requestBuilder,
              time.milliseconds(), true)
            clientResponse = NetworkClientUtils.sendAndReceive(networkClient, clientRequest, time)
            isSendSuccessful = true
          }
        } catch {
          case e: Throwable => //if the send fails, reconnect to the broker and resend the request
            networkClient.close(brokerNode.idString)
            isSendSuccessful = false
            backoff()
        }
      }
      if (clientResponse != null) {
        val requestHeader = clientResponse.requestHeader
        //the controller talks to the brokers through exactly three APIs:
        //ApiKeys.LEADER_AND_ISR, ApiKeys.STOP_REPLICA, ApiKeys.UPDATE_METADATA
        val api = requestHeader.apiKey
        if (api != ApiKeys.LEADER_AND_ISR && api != ApiKeys.STOP_REPLICA && api != ApiKeys.UPDATE_METADATA)
          throw new KafkaException(s"Unexpected apiKey received: $apiKey")

        val response = clientResponse.responseBody

        stateChangeLogger.withControllerEpoch(controllerEpoch()).trace(s"Received response " +
          s"$response for request $api with correlation id " +
          s"${requestHeader.correlationId} sent to broker $brokerNode")

        if (callback != null) {
          callback(response)
        }
      }
    } catch {
      case e: Throwable =>
        //any socket error (e.g. a socket timeout) means the connection is no longer usable and must be recreated
        networkClient.close(brokerNode.idString)
    }
  }
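
For completeness, the producing side of messageQueue is ControllerChannelManager.sendRequest; a hedged sketch of its shape (simplified, e.g. the real method also warns when the broker is unknown):

    def sendRequest(brokerId: Int,
                    request: AbstractControlRequest.Builder[_ <: AbstractControlRequest],
                    callback: AbstractResponse => Unit = null): Unit = {
      brokerLock synchronized {
        brokerStateInfo.get(brokerId) match {
          case Some(stateInfo) =>
            //enqueue; the broker's RequestSendThread picks it up in doWork()
            stateInfo.messageQueue.put(QueueItem(request.apiKey, request, callback, time.milliseconds()))
          case None => //broker unknown (e.g. already removed): the request is dropped
        }
      }
    }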

IV. Filtering and Deleting Topics

    //topicsToBeDeleted: topics queued for deletion
    //topicsIneligibleForDeletion: topics that currently cannot be deleted
    val (topicsToBeDeleted, topicsIneligibleForDeletion) = fetchTopicDeletionsInProgress()
    info("Initializing topic deletion manager")
    //initialize the topic-deletion state machine
    topicDeletionManager.init(topicsToBeDeleted, topicsIneligibleForDeletion)
  private def fetchTopicDeletionsInProgress(): (Set[String], Set[String]) = {
    //topics marked for deletion under /admin/delete_topics —
    //deleting a topic starts with a marker at that ZooKeeper path
    val topicsToBeDeleted = zkClient.getTopicDeletions.toSet
    val topicsWithOfflineReplicas = controllerContext.allTopics.filter { topic => {
      //fetch the topic's replicas (per TopicPartition) from the controllerContext
      val replicasForTopic = controllerContext.replicasForTopic(topic)
      //does the topic have at least one replica that is not online?
      replicasForTopic.exists(r => !controllerContext.isReplicaOnline(r.replica, r.topicPartition))
    }}
    //topics with a partition reassignment in progress
    val topicsForWhichPartitionReassignmentIsInProgress = controllerContext.partitionsBeingReassigned.map(_.topic)
    //what makes a topic ineligible for deletion?
    //having an offline replica, or being in the middle of a reassignment
    val topicsIneligibleForDeletion = topicsWithOfflineReplicas | topicsForWhichPartitionReassignmentIsInProgress
    info(s"List of topics to be deleted: ${topicsToBeDeleted.mkString(",")}")
    info(s"List of topics ineligible for deletion: ${topicsIneligibleForDeletion.mkString(",")}")
    (topicsToBeDeleted, topicsIneligibleForDeletion)
  }
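
Marking a topic for deletion is itself just a ZooKeeper write; a hedged sketch, assuming KafkaZkClient exposes createDeleteTopicPath (the call the admin layer uses to write the marker; the topic name here is hypothetical):

    //deleting a topic is asynchronous: the admin layer only creates a marker znode
    //at /admin/delete_topics/<topic>; the controller's TopicDeletionManager later
    //does the real work and removes the marker once the topic is fully gone
    zkClient.createDeleteTopicPath("my-topic")                   //hypothetical topic name
    val pending: Set[String] = zkClient.getTopicDeletions.toSet  //the same read used above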

V. Updating Metadata

sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty)
  private[controller] def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicPartition]): Unit = {
    try {
      brokerRequestBatch.newBatch()
      brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers, partitions)
      //sendRequestsToBrokers can push three request types to the brokers:
      //ApiKeys.LEADER_AND_ISR
      //ApiKeys.UPDATE_METADATA
      //ApiKeys.STOP_REPLICA
      brokerRequestBatch.sendRequestsToBrokers(epoch)
    } catch {
      case e: IllegalStateException =>
        handleIllegalState(e)
    }
  }

ApiKeys.LEADER_AND_ISR: as analyzed in 《Kafka: Determining Whether a Broker's Partitions Are Leaders or Followers》, this makes every broker take up the leader or follower role for the partitions it hosts and start the work that role is responsible for.

ApiKeys.UPDATE_METADATA: synchronizes the latest metadata to every broker.

VI. Starting the Replica and Partition State Machines

  val replicaStateMachine: ReplicaStateMachine = new ZkReplicaStateMachine(config, stateChangeLogger, controllerContext, zkClient,new ControllerBrokerRequestBatch(config, controllerChannelManager, eventManager, controllerContext, stateChangeLogger))
  val partitionStateMachine: PartitionStateMachine = new ZkPartitionStateMachine(config, stateChangeLogger, controllerContext, zkClient,new ControllerBrokerRequestBatch(config, controllerChannelManager, eventManager, controllerContext, stateChangeLogger))

 replicaStateMachine.startup()
 partitionStateMachine.startup()

1. The replica state machine

It defines the states a replica can be in, and for each transition the valid previous states. A replica can be in one of the following states:

1. NewReplica

        The controller can create new replicas during partition reassignment. In this state a replica can only receive a become-follower state change request. Valid previous state: NonExistentReplica.

2. OnlineReplica

        Once a replica is started and is part of the assigned replicas for its partition, it is in this state. Here it can receive either a become-leader or a become-follower state change request. Valid previous states: NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible.

3. OfflineReplica

        A replica moves to this state when it dies, which happens when the broker hosting it goes down. Valid previous states: NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible.

4. ReplicaDeletionStarted

        A replica moves to this state when its deletion begins. Valid previous state: OfflineReplica.

5. ReplicaDeletionSuccessful

        A replica moves to this state when it responds to a delete-replica request with no error code. Valid previous state: ReplicaDeletionStarted.

6. ReplicaDeletionIneligible

        A replica moves to this state when its deletion fails. Valid previous states: ReplicaDeletionStarted, OfflineReplica.

7. NonExistentReplica

        A replica moves to this state once it has been successfully deleted. Valid previous state: ReplicaDeletionSuccessful.

2. The partition state machine

It defines the states a partition can be in, and for each transition the valid previous states. A partition can be in one of the following states:

1. NonExistentPartition

        The partition was never created, or was created and then deleted. Valid previous state (if one exists): OfflinePartition.

2. NewPartition

        After creation the partition is in the NewPartition state: it has replicas assigned, but no leader/ISR yet. Valid previous state: NonExistentPartition.

3. OnlinePartition

        Once a leader has been elected for the partition, it is in the OnlinePartition state. Valid previous states: NewPartition, OfflinePartition.

4. OfflinePartition

        If the partition's leader dies after a successful leader election, the partition moves to the OfflinePartition state. Valid previous states: NewPartition, OnlinePartition.
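
Both state machines follow the same pattern: each state is an object carrying the set of states it may legally be entered from, and every transition is validated against that set. Below is a minimal Scala sketch of that pattern for the partition side, using exactly the transitions listed above (the real kafka.controller.PartitionState is richer, e.g. it also encodes a byte value per state for the ZooKeeper payload):

    sealed trait PartitionState {
      //the set of states a transition into this state may come from
      def validPreviousStates: Set[PartitionState]
    }

    case object NonExistentPartition extends PartitionState {
      val validPreviousStates: Set[PartitionState] = Set(OfflinePartition)
    }
    case object NewPartition extends PartitionState {
      val validPreviousStates: Set[PartitionState] = Set(NonExistentPartition)
    }
    case object OnlinePartition extends PartitionState {
      val validPreviousStates: Set[PartitionState] = Set(NewPartition, OfflinePartition)
    }
    case object OfflinePartition extends PartitionState {
      val validPreviousStates: Set[PartitionState] = Set(NewPartition, OnlinePartition)
    }

    //a transition is legal only if the current state is a valid predecessor of the target
    def isValidTransition(from: PartitionState, to: PartitionState): Boolean =
      to.validPreviousStates.contains(from)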

VII. Partition Reassignment

initializePartitionReassignments()

Partition reassignments are driven by watching the /admin/reassign_partitions znode for changes.

  private def initializePartitionReassignments(): Unit = {
    //when the controller fails over, new reassignments may already have been
    //submitted through ZooKeeper
    val zkPartitionsResumed = processZkPartitionReassignment()
    //there may also be API-based reassignments that need to be restarted
    maybeResumeReassignments { (tp, _) =>
      !zkPartitionsResumed.contains(tp)
    }
  }
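
For reference, the /admin/reassign_partitions znode stores the reassignment request as JSON in the well-known format; a hypothetical example (topic name and broker ids invented):

    {"version":1,
     "partitions":[
       {"topic":"orders","partition":0,"replicas":[2,3,4]}
     ]}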

VIII. Electing Leaders from the Replicas

val pendingPreferredReplicaElections = fetchPendingPreferredReplicaElections()
//try to elect one replica of each of the given partitions as its leader
onReplicaElection(pendingPreferredReplicaElections, ElectionType.PREFERRED, ZkTriggered)

 private def fetchPendingPreferredReplicaElections(): Set[TopicPartition] = {
    //read pending preferred-replica elections from ZooKeeper's
    // /admin/preferred_replica_election znode; PreferredReplicaElectionZNode.decode
    //turns the stored JSON into a set of TopicPartition(topic, partition)
    val partitionsUndergoingPreferredReplicaElection = zkClient.getPreferredReplicaElection
    //check which elections already completed, or whose topic has been deleted
    val partitionsThatCompletedPreferredReplicaElection = partitionsUndergoingPreferredReplicaElection.filter { partition =>
      val replicas = controllerContext.partitionReplicaAssignment(partition)
      val topicDeleted = replicas.isEmpty
      val successful =
        if (!topicDeleted) controllerContext.partitionLeadershipInfo(partition).get.leaderAndIsr.leader == replicas.head else false
      successful || topicDeleted
    }
    val pendingPreferredReplicaElectionsIgnoringTopicDeletion = partitionsUndergoingPreferredReplicaElection -- partitionsThatCompletedPreferredReplicaElection
    val pendingPreferredReplicaElectionsSkippedFromTopicDeletion = pendingPreferredReplicaElectionsIgnoringTopicDeletion.filter(partition => topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic))
    val pendingPreferredReplicaElections = pendingPreferredReplicaElectionsIgnoringTopicDeletion -- pendingPreferredReplicaElectionsSkippedFromTopicDeletion
    info(s"Partitions undergoing preferred replica election: ${partitionsUndergoingPreferredReplicaElection.mkString(",")}")
    info(s"Partitions that completed preferred replica election: ${partitionsThatCompletedPreferredReplicaElection.mkString(",")}")
    info(s"Skipping preferred replica election for partitions due to topic deletion: ${pendingPreferredReplicaElectionsSkippedFromTopicDeletion.mkString(",")}")
    info(s"Resuming preferred replica election for partitions: ${pendingPreferredReplicaElections.mkString(",")}")
    pendingPreferredReplicaElections
  }
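
The /admin/preferred_replica_election znode uses a similar JSON shape; a hypothetical example (topic name invented). Each entry asks the controller to move the partition's leadership back to the first replica in its assignment, the "preferred" replica:

    {"version":1,
     "partitions":[
       {"topic":"orders","partition":0}
     ]}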

IX. Starting the Scheduler

private[controller] val kafkaScheduler = new KafkaScheduler(1)

kafkaScheduler.startup()

    public void startup() {
        log.debug("Initializing task scheduler.");
        synchronized (this) {
            if (isStarted())
                throw new IllegalStateException("This scheduler has already been started.");
            //initialize the thread pool used for background tasks.
            //the broker-wide scheduler is sized by background.threads (default 10),
            //but the controller creates this scheduler with threads = 1
            ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(threads);
            executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
            executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
            executor.setRemoveOnCancelPolicy(true);
            executor.setThreadFactory(runnable ->
                new KafkaThread(threadNamePrefix + schedulerThreadId.getAndIncrement(), runnable, daemon));
            this.executor = executor;
        }
    }

X. Starting the Automatic Leader Rebalance Task

auto.leader.rebalance.enable defaults to true; set it to false to disable the feature.

When enabled, a background thread periodically checks how partition leaders are distributed. The first check is scheduled 5 seconds after the controller starts (the hard-coded delay below); after that, checks run every leader.imbalance.check.interval.seconds (default 300), and when a broker's leader imbalance exceeds leader.imbalance.per.broker.percentage (default 10%), leadership is rebalanced back to each partition's preferred leader.

    if (config.autoLeaderRebalanceEnable) {
      //schedule the partition leader rebalance task
      scheduleAutoLeaderRebalanceTask(delay = 5, unit = TimeUnit.SECONDS)
    }
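
A hedged sketch of the scheduling helper, simplified from KafkaController (the exact KafkaScheduler call signature varies across versions): the scheduled task does nothing but enqueue an AutoPreferredReplicaLeaderElection event, so the balancing work itself still happens on the single controller event thread.

      private def scheduleAutoLeaderRebalanceTask(delay: Long, unit: TimeUnit): Unit =
        kafkaScheduler.scheduleOnce("auto-leader-rebalance-task",
          () => eventManager.put(AutoPreferredReplicaLeaderElection),
          unit.toMillis(delay))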

XI. Watching for Controller Changes

As mentioned in 《Kafka-Controller Election》, KafkaController enqueues two events at startup: RegisterBrokerAndReelect and Startup.

The RegisterBrokerAndReelect event elects a Controller from among the brokers as they start up, while the Startup event watches ZooKeeper's controller znode and re-runs the election when needed. Since the Controller is the core of the cluster, a new one must be elected promptly whenever the current one fails.

  private def processStartup(): Unit = {
    //watch the /controller znode so that a controller failure triggers re-election
    zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
    //then attempt to become the controller
    elect()
  }