I. Context
The 《Kafka-Controller选举》 post analyzed how the Controller is elected and noted that the winner then executes onControllerFailover(). Let's now look at the responsibilities the Controller role takes on.
II. Registering Listeners
Before reading state from ZooKeeper, the controller registers listeners so that it receives callbacks for broker/topic changes.
val childChangeHandlers = Seq(brokerChangeHandler, topicChangeHandler, topicDeletionHandler, logDirEventNotificationHandler,
isrChangeNotificationHandler)
//register these handlers one by one
//watch for child-node changes
childChangeHandlers.foreach(zkClient.registerZNodeChildChangeHandler)
val nodeChangeHandlers = Seq(preferredReplicaElectionHandler, partitionReassignmentHandler)
//watch for node changes
nodeChangeHandlers.foreach(zkClient.registerZNodeChangeHandlerAndCheckExistence)
III. Initializing the ControllerContext
1. Fetching all brokers
This simply reads the ids under the brokers/ids/ path to obtain the broker list.
val curBrokerAndEpochs = zkClient.getAllBrokerAndEpochsInCluster
def getAllBrokerAndEpochsInCluster: Map[Broker, Long] = {
//fetch all broker ids under brokers/ids, sorted
val brokerIds = getSortedBrokerList
//wrap a GetDataRequest for each broker id
val getDataRequests = brokerIds.map(brokerId => GetDataRequest(BrokerIdZNode.path(brokerId), ctx = Some(brokerId)))
val getDataResponses = retryRequestsUntilConnected(getDataRequests)
getDataResponses.flatMap { getDataResponse =>
val brokerId = getDataResponse.ctx.get.asInstanceOf[Int]
getDataResponse.resultCode match {
case Code.OK =>
// decode: build a BrokerInfo from the JSON and the broker id
//{
// "version":5,
// "host":"localhost",
// "port":9092,
// "jmx_port":9999,
// "timestamp":"2233345666",
// "endpoints":["CLIENT://host1:9092", "REPLICATION://host1:9093"],
// "rack":"dc1",
// "features": {"feature": {"min_version":1, "first_active_version":2, "max_version":3}}
// }
Some((BrokerIdZNode.decode(brokerId, getDataResponse.data).broker, getDataResponse.stat.getCzxid))
case Code.NONODE => None
case _ => throw getDataResponse.resultException.get
}
}.toMap
}
This step yields a Map[Broker, Long].
A Broker carries the broker id together with the broker's connection info and rack info. At this point the Controller knows which brokers it needs to manage and can establish communication with them.
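The Long in this map is the broker epoch (the czxid of the broker's ephemeral znode), which only grows each time a broker re-registers. A minimal sketch of how such an epoch can be used to fence requests from a stale broker session; class and method names here are illustrative, not Kafka's:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative fencing check: a request carrying an older epoch than the
// one cached for that broker id is treated as coming from a stale session.
class BrokerEpochCache {
    private final Map<Integer, Long> epochs = new HashMap<>();

    // Called when a broker (re-)registers; the znode's czxid only ever grows.
    void register(int brokerId, long epoch) {
        epochs.merge(brokerId, epoch, Math::max);
    }

    boolean isStale(int brokerId, long requestEpoch) {
        Long current = epochs.get(brokerId);
        return current != null && requestEpoch < current;
    }
}

public class BrokerEpochDemo {
    public static void main(String[] args) {
        BrokerEpochCache cache = new BrokerEpochCache();
        cache.register(1, 100L);   // first registration
        cache.register(1, 250L);   // broker bounced, new ephemeral znode
        System.out.println(cache.isStale(1, 100L)); // pre-bounce epoch is stale
        System.out.println(cache.isStale(1, 250L)); // current epoch is not
    }
}
```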
2. Checking broker feature compatibility
val (compatibleBrokerAndEpochs, incompatibleBrokerAndEpochs) = partitionOnFeatureCompatibility(curBrokerAndEpochs)
In the returned result:
compatibleBrokerAndEpochs is the map of compatible brokers
incompatibleBrokerAndEpochs is the map of incompatible brokers
So how is a broker judged compatible? Let's look at the code below:
private def partitionOnFeatureCompatibility(brokersAndEpochs: Map[Broker, Long]): (Map[Broker, Long], Map[Broker, Long]) = {
//the partition method:
//returns a pair of collections: first, all elements that satisfy the predicate p, then all elements that do not.
//these two collections correspond to the results of filter and filterNot respectively.
//the default implementation traverses the collection twice; strict collections override partition in StrictOptimizedIterableOps to need only a single traversal.
brokersAndEpochs.partition {
case (broker, _) =>
!config.isFeatureVersioningSupported ||
!featureCache.getFeatureOption.exists(
latestFinalizedFeatures =>
BrokerFeatures.hasIncompatibleFeatures(broker.features,
latestFinalizedFeatures.finalizedFeatures().asScala.
map(kv => (kv._1, kv._2.toShort)).toMap))
}
}
def isFeatureVersioningSupported = interBrokerProtocolVersion.isFeatureVersioningSupported
public enum MetadataVersion {
IBP_0_8_0(-1, "0.8.0", ""),
//.....
IBP_2_7_IV0(-1, "2.7", "IV0"),
public boolean isFeatureVersioningSupported() {
return this.isAtLeast(IBP_2_7_IV0);
}
}
MetadataVersion enumerates the Kafka protocol versions; 2.7 (IBP_2_7_IV0) is the first version that supports feature versioning. When versioning is supported, a broker whose advertised features are incompatible with the cluster's finalized features is judged incompatible.
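The partition predicate above can be modeled in plain Java: split brokers into compatible and incompatible buckets by comparing a version carried by each broker against the cluster's finalized version. This is a simplified sketch; the single-version-per-broker model is an assumption, since Kafka actually compares whole feature maps via BrokerFeatures.hasIncompatibleFeatures:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Sketch of the compatibility split: brokers whose max supported feature
// version is below the cluster's finalized version land in the
// "incompatible" bucket. Names are illustrative, not Kafka's.
public class FeatureCompatibility {
    static Map<Boolean, List<Integer>> partitionByCompatibility(
            Map<Integer, Short> brokerMaxVersion, short finalizedVersion) {
        return brokerMaxVersion.entrySet().stream()
                .collect(Collectors.partitioningBy(
                        e -> e.getValue() >= finalizedVersion,
                        Collectors.mapping(Map.Entry::getKey, Collectors.toList())));
    }

    public static void main(String[] args) {
        // broker 1 supports up to v3, broker 2 only v1; finalized version is 2
        Map<Integer, Short> brokers = Map.of(1, (short) 3, 2, (short) 1);
        Map<Boolean, List<Integer>> split = partitionByCompatibility(brokers, (short) 2);
        System.out.println("compatible: " + split.get(true));
        System.out.println("incompatible: " + split.get(false));
    }
}
```

Like Scala's partition, Collectors.partitioningBy returns both buckets in one pass.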
3. Marking compatible brokers as live
controllerContext.setLiveBrokers(compatibleBrokerAndEpochs)
class ControllerContext extends ControllerChannelContext {
private val liveBrokers = mutable.Set.empty[Broker]
private val liveBrokerEpochs = mutable.Map.empty[Int, Long]
def setLiveBrokers(brokerAndEpochs: Map[Broker, Long]): Unit = {
clearLiveBrokers()
addLiveBrokers(brokerAndEpochs)
}
def addLiveBrokers(brokerAndEpochs: Map[Broker, Long]): Unit = {
liveBrokers ++= brokerAndEpochs.keySet
liveBrokerEpochs ++= brokerAndEpochs.map { case (broker, brokerEpoch) => (broker.id, brokerEpoch) }
}
}
Internally this just maintains a set (liveBrokers) holding the live brokers, plus a map (liveBrokerEpochs) from broker id to broker epoch.
4. Fetching all topics
controllerContext.setAllTopics(zkClient.getAllTopicsInCluster(true))
def getAllTopicsInCluster(registerWatch: Boolean = false): Set[String] = {
//list the children under brokers/topics
val getChildrenResponse = retryRequestUntilConnected(
GetChildrenRequest(TopicsZNode.path, registerWatch))
getChildrenResponse.resultCode match {
case Code.OK => getChildrenResponse.children.toSet
case Code.NONODE => Set.empty
case _ => throw getChildrenResponse.resultException.get
}
}
This fetches all topics from the brokers/topics path in ZooKeeper.
val allTopics = mutable.Set.empty[String]
def setAllTopics(topics: Set[String]): Unit = {
allTopics.clear()
allTopics ++= topics
}
A set is maintained, and all topics are put into it.
5. Watching for partition changes under each topic
registerPartitionModificationsHandlers(controllerContext.allTopics.toSeq)
private def registerPartitionModificationsHandlers(topics: Seq[String]): Unit = {
topics.foreach { topic =>
//register a handler per topic so that topic changes are queued into the ControllerEventManager
val partitionModificationsHandler = new PartitionModificationsHandler(eventManager, topic)
partitionModificationsHandlers.put(topic, partitionModificationsHandler)
}
partitionModificationsHandlers.values.foreach(zkClient.registerZNodeChangeHandler)
}
A topic node holds topic_id, partitions, adding_replicas, removing_replicas, and so on. When these change, Kafka also has to adjust the producers and consumers that are using them; see processPartitionModifications() for details.
6. Fetching TopicPartition replica assignments
val replicaAssignmentAndTopicIds = zkClient.getReplicaAssignmentAndTopicIdForTopics(controllerContext.allTopics.toSet)
def getReplicaAssignmentAndTopicIdForTopics(topics: Set[String]): Set[TopicIdReplicaAssignment] = {
val getDataRequests = topics.map(topic => GetDataRequest(TopicZNode.path(topic), ctx = Some(topic)))
val getDataResponses = retryRequestsUntilConnected(getDataRequests.toSeq)
getDataResponses.map { getDataResponse =>
val topic = getDataResponse.ctx.get.asInstanceOf[String]
getDataResponse.resultCode match {
case Code.OK => TopicZNode.decode(topic, getDataResponse.data)
case Code.NONODE => TopicIdReplicaAssignment(topic, None, Map.empty[TopicPartition, ReplicaAssignment])
case _ => throw getDataResponse.resultException.get
}
}.toSet
}
The topic znodes under brokers/topics hold each partition's replica assignment, so at this point we can derive the TopicPartition -> replica assignment mapping and write the latest mapping into the ControllerContext.
replicaAssignmentAndTopicIds.foreach { case TopicIdReplicaAssignment(_, _, assignments) =>
assignments.foreach { case (topicPartition, replicaAssignment) =>
//update the full replica assignment for the partition
controllerContext.updatePartitionFullReplicaAssignment(topicPartition, replicaAssignment)
if (replicaAssignment.isBeingReassigned)
controllerContext.partitionsBeingReassigned.add(topicPartition)
}
}
7. Watching for broker changes
registerBrokerModificationsHandler(controllerContext.liveOrShuttingDownBrokerIds)
private def registerBrokerModificationsHandler(brokerIds: Iterable[Int]): Unit = {
debug(s"Register BrokerModifications handler for $brokerIds")
//for each broker, register a handler; when the broker znode changes, processBrokerModification(brokerId) is triggered
brokerIds.foreach { brokerId =>
val brokerModificationsHandler = new BrokerModificationsHandler(eventManager, brokerId)
zkClient.registerZNodeChangeHandlerAndCheckExistence(brokerModificationsHandler)
brokerModificationsHandlers.put(brokerId, brokerModificationsHandler)
}
}
As we learned in 《Kafka-Controller选举》, the controller has an internal queue plus a loop thread that keeps processing the events in it. The broker, partition, and topic changes analyzed above all fire events, and those events are put into this queue for the corresponding handling.
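That queue-plus-loop-thread pattern can be sketched in a few lines; this is an illustrative model, not Kafka's ControllerEventManager:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Minimal sketch of the controller's single-threaded event loop: all
// watcher callbacks enqueue events, and one thread drains the queue, so
// event handling never races with itself.
public class EventLoop {
    private static final Runnable POISON = () -> {};
    private final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final Thread worker = new Thread(this::run, "event-loop");

    private void run() {
        try {
            while (true) {
                Runnable event = queue.take();
                if (event == POISON) return;
                event.run(); // handlers execute strictly one at a time
            }
        } catch (InterruptedException ignored) { }
    }

    public void start() { worker.start(); }
    public void put(Runnable event) { queue.add(event); }
    public void shutdown() throws InterruptedException { queue.add(POISON); worker.join(); }

    public static void main(String[] args) throws InterruptedException {
        EventLoop loop = new EventLoop();
        List<String> handled = new ArrayList<>();
        loop.start();
        loop.put(() -> handled.add("BrokerChange"));
        loop.put(() -> handled.add("TopicChange"));
        loop.shutdown();
        System.out.println(handled); // events handled in submission order
    }
}
```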
8. Updating the leader and ISR cache for all existing partitions
updateLeaderAndIsrCache()
private def updateLeaderAndIsrCache(partitions: Seq[TopicPartition] = controllerContext.allPartitions.toSeq): Unit = {
//TopicPartitionStateZNode.decode: the JSON contains leader, leader_epoch, isr, leader_recovery_state, controller_epoch
//returns a Map[TopicPartition, LeaderIsrAndControllerEpoch]
//LeaderIsrAndControllerEpoch carries leader, ISR, LeaderEpoch, ControllerEpoch, ZkVersion, LeaderRecoveryState
val leaderIsrAndControllerEpochs = zkClient.getTopicPartitionStates(partitions)
leaderIsrAndControllerEpochs.forKeyValue { (partition, leaderIsrAndControllerEpoch) =>
controllerContext.putPartitionLeadershipInfo(partition, leaderIsrAndControllerEpoch)
}
}
class ControllerContext extends ControllerChannelContext {
private val partitionLeadershipInfo = mutable.Map.empty[TopicPartition, LeaderIsrAndControllerEpoch]
def putPartitionLeadershipInfo(partition: TopicPartition,
leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch): Unit = {
//record the partition's leader info
val previous = partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
val replicaAssignment = partitionFullReplicaAssignment(partition)
updatePreferredReplicaImbalanceMetric(partition, Some(replicaAssignment), previous,
Some(replicaAssignment), Some(leaderIsrAndControllerEpoch))
}
}
The ControllerContext maintains a map (TopicPartition -> LeaderIsrAndControllerEpoch) storing each partition's leader and ISR.
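A trimmed model of such a cache, with an added guard that illustrates why leader epochs matter. The putIfNewer guard is illustrative only; the real putPartitionLeadershipInfo simply overwrites the entry and updates a metric:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of a leader/ISR cache keyed by (topic, partition). Real Kafka also
// tracks controllerEpoch and zkVersion; this trimmed model only keeps the
// fields needed to show the idea of refusing stale leader-epoch updates.
public class LeaderCache {
    record TopicPartition(String topic, int partition) {}
    record LeaderAndIsr(int leader, List<Integer> isr, int leaderEpoch) {}

    private final Map<TopicPartition, LeaderAndIsr> cache = new HashMap<>();

    // Accept the update only if it carries a newer leader epoch.
    boolean putIfNewer(TopicPartition tp, LeaderAndIsr state) {
        LeaderAndIsr prev = cache.get(tp);
        if (prev != null && state.leaderEpoch() <= prev.leaderEpoch()) return false;
        cache.put(tp, state);
        return true;
    }

    LeaderAndIsr get(TopicPartition tp) { return cache.get(tp); }

    public static void main(String[] args) {
        LeaderCache cache = new LeaderCache();
        TopicPartition tp = new TopicPartition("orders", 0);
        cache.putIfNewer(tp, new LeaderAndIsr(1, List.of(1, 2, 3), 7));
        // a stale update with an older leader epoch is rejected
        boolean accepted = cache.putIfNewer(tp, new LeaderAndIsr(2, List.of(2, 3), 6));
        System.out.println(accepted + " leader=" + cache.get(tp).leader());
    }
}
```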
9. Establishing communication with every broker
controllerChannelManager.startup(controllerContext.liveOrShuttingDownBrokers)
class ControllerChannelManager(...){
protected val brokerStateInfo = new mutable.HashMap[Int, ControllerBrokerStateInfo]
def startup(initialBrokers: Set[Broker]):Unit = {
//the controller connects to every broker; this step only populates brokerStateInfo
initialBrokers.foreach(addNewBroker)
//start one thread per broker to handle its connection
brokerLock synchronized {
brokerStateInfo.foreach(brokerState => startRequestSendThread(brokerState._1))
}
}
private def addNewBroker(broker: Broker): Unit = {
//......
val requestThread = new RequestSendThread(config.brokerId, controllerEpoch, messageQueue, networkClient,brokerNode, config, time, requestRateAndQueueTimeMetrics, stateChangeLogger, threadName)
requestThread.setDaemon(false)
brokerStateInfo.put(broker.id, ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue,requestThread, queueSizeGauge, requestRateAndQueueTimeMetrics, reconfigurableChannelBuilder))
}
private def startRequestSendThread(brokerId: Int): Unit = {
val requestThread = brokerStateInfo(brokerId).requestSendThread
if (requestThread.getState == Thread.State.NEW)
requestThread.start()
}
}
Next, let's look at what RequestSendThread does.
RequestSendThread extends ShutdownableThread, which calls doWork() in a loop;
override def doWork(): Unit = {
def backoff(): Unit = pause(100, TimeUnit.MILLISECONDS)
val QueueItem(apiKey, requestBuilder, callback, enqueueTimeMs) = queue.take()
requestRateAndQueueTimeMetrics.update(time.milliseconds() - enqueueTimeMs, TimeUnit.MILLISECONDS)
var clientResponse: ClientResponse = null
try {
var isSendSuccessful = false
while (isRunning && !isSendSuccessful) {
// if a broker is down for a long time, the controller's ZooKeeper listener will eventually fire removeBroker, which calls shutdown() on this thread; at that point we stop retrying.
try {
if (!brokerReady()) {
isSendSuccessful = false
backoff()
}
else {
val clientRequest = networkClient.newClientRequest(brokerNode.idString, requestBuilder,
time.milliseconds(), true)
clientResponse = NetworkClientUtils.sendAndReceive(networkClient, clientRequest, time)
isSendSuccessful = true
}
} catch {
case e: Throwable => //if the send failed, reconnect to the broker and resend the message
networkClient.close(brokerNode.idString)
isSendSuccessful = false
backoff()
}
}
if (clientResponse != null) {
val requestHeader = clientResponse.requestHeader
//the controller talks to brokers with these three APIs:
// ApiKeys.LEADER_AND_ISR, ApiKeys.STOP_REPLICA, ApiKeys.UPDATE_METADATA
val api = requestHeader.apiKey
if (api != ApiKeys.LEADER_AND_ISR && api != ApiKeys.STOP_REPLICA && api != ApiKeys.UPDATE_METADATA)
throw new KafkaException(s"Unexpected apiKey received: $apiKey")
val response = clientResponse.responseBody
stateChangeLogger.withControllerEpoch(controllerEpoch()).trace(s"Received response " +
s"$response for request $api with correlation id " +
s"${requestHeader.correlationId} sent to broker $brokerNode")
if (callback != null) {
callback(response)
}
}
} catch {
case e: Throwable =>
//any socket error (e.g. a socket timeout) means the connection is no longer usable and must be recreated.
networkClient.close(brokerNode.idString)
}
}
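The retry skeleton of doWork() can be distilled as follows. The Sender interface stands in for NetworkClientUtils.sendAndReceive, and maxAttempts stands in for the isRunning flag that the real thread checks; both are simplifications:

```java
import java.util.concurrent.TimeUnit;

// Sketch of doWork()'s retry loop: keep retrying the send with a short
// backoff until it succeeds or we are asked to stop.
public class RetrySend {
    interface Sender { void send() throws Exception; }

    // Returns the number of attempts the send took, or -1 if stopped first.
    static int sendWithRetries(Sender sender, int maxAttempts, long backoffMs) {
        int attempts = 0;
        while (attempts < maxAttempts) {
            attempts++;
            try {
                sender.send();
                return attempts;            // success
            } catch (Exception e) {
                try {                       // reconnect + backoff, then retry
                    TimeUnit.MILLISECONDS.sleep(backoffMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return -1;
                }
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int[] failures = {2};               // fail twice, then succeed
        int attempts = sendWithRetries(() -> {
            if (failures[0]-- > 0) throw new Exception("broker not ready");
        }, 10, 1);
        System.out.println(attempts);
    }
}
```

The real loop has no attempt cap: it retries as long as isRunning is true, relying on removeBroker to shut the thread down when the broker is gone for good.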
IV. Filtering and Deleting Topics
//topicsToBeDeleted: topics to delete
//topicsIneligibleForDeletion: topics not eligible for deletion
val (topicsToBeDeleted, topicsIneligibleForDeletion) = fetchTopicDeletionsInProgress()
info("Initializing topic deletion manager")
//initialize the topic deletion state machine.
topicDeletionManager.init(topicsToBeDeleted, topicsIneligibleForDeletion)
private def fetchTopicDeletionsInProgress(): (Set[String], Set[String]) = {
///admin/delete_topics holds the topics marked for deletion
// i.e. deleting a topic just marks it under this ZooKeeper path
val topicsToBeDeleted = zkClient.getTopicDeletions.toSet
val topicsWithOfflineReplicas = controllerContext.allTopics.filter { topic => {
//get the topic's replicas (as TopicPartition + replica) from the controllerContext
val replicasForTopic = controllerContext.replicasForTopic(topic)
//check whether the topic has any replica that is not online
replicasForTopic.exists(r => !controllerContext.isReplicaOnline(r.replica, r.topicPartition))
}}
//topics with a reassignment in progress
val topicsForWhichPartitionReassignmentIsInProgress = controllerContext.partitionsBeingReassigned.map(_.topic)
// what makes a topic ineligible for deletion?
// if any replica is offline, or a reassignment is in progress, the topic is marked ineligible
val topicsIneligibleForDeletion = topicsWithOfflineReplicas | topicsForWhichPartitionReassignmentIsInProgress
//topics to be deleted
info(s"List of topics to be deleted: ${topicsToBeDeleted.mkString(",")}")
//topics ineligible for deletion
info(s"List of topics ineligible for deletion: ${topicsIneligibleForDeletion.mkString(",")}")
(topicsToBeDeleted, topicsIneligibleForDeletion)
}
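The set arithmetic in fetchTopicDeletionsInProgress() boils down to a union and a difference. Here is a standalone sketch; readyToDelete is an illustrative shortcut, since in Kafka the deletion manager receives both sets and skips the ineligible topics itself:

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of the eligibility computation: ineligible = topics with an
// offline replica ∪ topics being reassigned; a topic actually proceeds to
// deletion only if marked for deletion and not ineligible.
public class DeletionEligibility {
    static Set<String> ineligible(Set<String> offlineReplicaTopics, Set<String> reassigningTopics) {
        Set<String> result = new HashSet<>(offlineReplicaTopics);
        result.addAll(reassigningTopics);   // union, like Scala's `|`
        return result;
    }

    static Set<String> readyToDelete(Set<String> markedForDeletion, Set<String> ineligible) {
        Set<String> result = new HashSet<>(markedForDeletion);
        result.removeAll(ineligible);
        return result;
    }

    public static void main(String[] args) {
        Set<String> marked = Set.of("a", "b", "c");
        Set<String> offline = Set.of("b");       // topic b has an offline replica
        Set<String> reassigning = Set.of("c", "d"); // topics c and d are being reassigned
        Set<String> inel = ineligible(offline, reassigning);
        System.out.println(readyToDelete(marked, inel)); // only "a" proceeds
    }
}
```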
V. Updating Metadata
sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty)
private[controller] def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicPartition]): Unit = {
try {
brokerRequestBatch.newBatch()
brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers, partitions)
//send the following requests to each broker via the API:
//ApiKeys.LEADER_AND_ISR
//ApiKeys.UPDATE_METADATA
//ApiKeys.STOP_REPLICA
brokerRequestBatch.sendRequestsToBrokers(epoch)
} catch {
case e: IllegalStateException =>
handleIllegalState(e)
}
}
ApiKeys.LEADER_AND_ISR: analyzed in 《Kafka-确定broker中的分区是leader还是follower》, it makes each broker start the leader or follower role for the partitions it hosts and begin the work that role is responsible for.
ApiKeys.UPDATE_METADATA: synchronizes the latest metadata to every broker.
VI. Starting the Replica and Partition State Machines
val replicaStateMachine: ReplicaStateMachine = new ZkReplicaStateMachine(config, stateChangeLogger, controllerContext, zkClient,new ControllerBrokerRequestBatch(config, controllerChannelManager, eventManager, controllerContext, stateChangeLogger))
val partitionStateMachine: PartitionStateMachine = new ZkPartitionStateMachine(config, stateChangeLogger, controllerContext, zkClient,new ControllerBrokerRequestBatch(config, controllerChannelManager, eventManager, controllerContext, stateChangeLogger))
replicaStateMachine.startup()
partitionStateMachine.startup()
1. Replica state machine
It defines the states a replica can be in and the valid previous states for each transition. The different states a replica may be in are:
1. NewReplica
The controller can create new replicas during partition reassignment. In this state, a replica can only receive a become-follower state change request. Valid previous state: NonExistentReplica
2. OnlineReplica
Once a replica is started and is part of the assigned replicas of its partition, it is in this state. In this state, it can receive either a become-leader or a become-follower state change request. Valid previous states: NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible
3. OfflineReplica
If a replica dies, it moves to this state. This happens when the broker hosting the replica goes down. Valid previous states: NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible
4. ReplicaDeletionStarted
When replica deletion starts, the replica moves to this state. Valid previous state: OfflineReplica
5. ReplicaDeletionSuccessful
If a replica responds to a delete-replica request with no error code, it moves to this state. Valid previous state: ReplicaDeletionStarted
6. ReplicaDeletionIneligible
If replica deletion fails, the replica moves to this state. Valid previous states: ReplicaDeletionStarted, OfflineReplica
7. NonExistentReplica
If a replica is successfully deleted, it moves to this state. Valid previous state: ReplicaDeletionSuccessful
2. Partition state machine
It defines the states a partition can be in and the valid previous states for each transition. The different states a partition may be in are:
1. NonExistentPartition
This state indicates that the partition was never created, or was created and then deleted. Its valid previous state, if one exists, is OfflinePartition
2. NewPartition
After creation, the partition is in the NewPartition state. In this state, the partition should have replicas assigned to it, but no leader/isr yet. Valid previous state: NonExistentPartition
3. OnlinePartition
Once a leader is elected for the partition, it is in the OnlinePartition state. Valid previous states: NewPartition/OfflinePartition
4. OfflinePartition
If the partition's leader dies after a successful leader election, the partition moves to the OfflinePartition state. Valid previous states: NewPartition/OnlinePartition
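The "valid previous states" rules above can be captured as a transition table. A sketch following the four partition states just described; state names are shortened here, and this is not Kafka's PartitionStateMachine code:

```java
import java.util.Map;
import java.util.Set;

// Sketch of the partition state machine's transition table: each target
// state lists its valid previous states, and a transition is legal only
// if the current state appears in the target's list.
public class PartitionStates {
    enum State { NON_EXISTENT, NEW, ONLINE, OFFLINE }

    static final Map<State, Set<State>> VALID_PREVIOUS = Map.of(
        State.NON_EXISTENT, Set.of(State.OFFLINE),
        State.NEW, Set.of(State.NON_EXISTENT),
        State.ONLINE, Set.of(State.NEW, State.OFFLINE),
        State.OFFLINE, Set.of(State.NEW, State.ONLINE)
    );

    static boolean isValidTransition(State from, State to) {
        return VALID_PREVIOUS.get(to).contains(from);
    }

    public static void main(String[] args) {
        System.out.println(isValidTransition(State.NEW, State.ONLINE));          // leader elected
        System.out.println(isValidTransition(State.NON_EXISTENT, State.ONLINE)); // illegal jump
    }
}
```

Encoding transitions as data rather than scattered if-statements is exactly what makes invalid state jumps easy to reject and log.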
VII. Partition Reassignment
initializePartitionReassignments()
Future partition reassignments are driven by watching /admin/reassign_partitions for changes.
private def initializePartitionReassignments(): Unit = {
//while the controller was failing over, new reassignments may have been submitted via ZooKeeper
val zkPartitionsResumed = processZkPartitionReassignment()
// we may also have some API-based reassignments that need to be restarted
maybeResumeReassignments { (tp, _) =>
!zkPartitionsResumed.contains(tp)
}
}
VIII. Electing Leaders from Preferred Replicas
val pendingPreferredReplicaElections = fetchPendingPreferredReplicaElections()
//try to elect a replica as leader for each of the given partitions.
onReplicaElection(pendingPreferredReplicaElections, ElectionType.PREFERRED, ZkTriggered)
private def fetchPendingPreferredReplicaElections(): Set[TopicPartition] = {
// fetch the preferred replicas from admin/preferred_replica_election in zk
// PreferredReplicaElectionZNode.decode
// admin/preferred_replica_election/partitions/topic
// admin/preferred_replica_election/partitions/partition
// wrapped into TopicPartition(topic, partition)
val partitionsUndergoingPreferredReplicaElection = zkClient.getPreferredReplicaElection
// check whether the election already completed or the topic was deleted
val partitionsThatCompletedPreferredReplicaElection = partitionsUndergoingPreferredReplicaElection.filter { partition =>
val replicas = controllerContext.partitionReplicaAssignment(partition)
val topicDeleted = replicas.isEmpty
val successful =
if (!topicDeleted) controllerContext.partitionLeadershipInfo(partition).get.leaderAndIsr.leader == replicas.head else false
successful || topicDeleted
}
val pendingPreferredReplicaElectionsIgnoringTopicDeletion = partitionsUndergoingPreferredReplicaElection -- partitionsThatCompletedPreferredReplicaElection
val pendingPreferredReplicaElectionsSkippedFromTopicDeletion = pendingPreferredReplicaElectionsIgnoringTopicDeletion.filter(partition => topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic))
val pendingPreferredReplicaElections = pendingPreferredReplicaElectionsIgnoringTopicDeletion -- pendingPreferredReplicaElectionsSkippedFromTopicDeletion
//partitions undergoing preferred replica election
info(s"Partitions undergoing preferred replica election: ${partitionsUndergoingPreferredReplicaElection.mkString(",")}")
//partitions that completed preferred replica election
info(s"Partitions that completed preferred replica election: ${partitionsThatCompletedPreferredReplicaElection.mkString(",")}")
//partitions whose preferred replica election is skipped due to topic deletion
info(s"Skipping preferred replica election for partitions due to topic deletion: ${pendingPreferredReplicaElectionsSkippedFromTopicDeletion.mkString(",")}")
//partitions whose preferred replica election is resumed
info(s"Resuming preferred replica election for partitions: ${pendingPreferredReplicaElections.mkString(",")}")
pendingPreferredReplicaElections
}
IX. Starting the Scheduler
private[controller] val kafkaScheduler = new KafkaScheduler(1)
kafkaScheduler.startup()
public void startup() {
log.debug("Initializing task scheduler.");
synchronized (this) {
if (isStarted())
throw new IllegalStateException("This scheduler has already been started.");
//initialize the thread pool
//background.threads in KafkaConfig defaults to 10 (i.e. threads = 10), but here threads is 1
// number of threads used for various background tasks
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(threads);
executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
executor.setRemoveOnCancelPolicy(true);
executor.setThreadFactory(runnable ->
new KafkaThread(threadNamePrefix + schedulerThreadId.getAndIncrement(), runnable, daemon));
this.executor = executor;
}
}
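The effect of the three policy setters is easiest to see by reproducing the executor configuration standalone: periodic and delayed tasks are dropped on shutdown instead of running to completion, and cancelled tasks are removed from the queue immediately rather than lingering until their scheduled time.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;

// The same executor configuration as KafkaScheduler.startup(), reproduced
// standalone so the shutdown/cancel behavior is explicit.
public class SchedulerConfig {
    static ScheduledThreadPoolExecutor build(int threads) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(threads);
        executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        executor.setRemoveOnCancelPolicy(true);
        return executor;
    }

    public static void main(String[] args) {
        // the controller's KafkaScheduler uses a single thread
        ScheduledThreadPoolExecutor executor = build(1);
        System.out.println("core pool size: " + executor.getCorePoolSize());
        executor.shutdown();
    }
}
```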
X. Starting the Automatic Leader Rebalance Task
auto.leader.rebalance.enable defaults to true; set it to false to disable the feature.
It enables automatic leader balancing: a background thread periodically checks the distribution of partition leaders (the first check here is scheduled 5 seconds after failover). If leadership has drifted away from the preferred leaders beyond the configured threshold, leaders are rebalanced back to each partition's preferred leader.
if (config.autoLeaderRebalanceEnable) {
//partition leader rebalance task
scheduleAutoLeaderRebalanceTask(delay = 5, unit = TimeUnit.SECONDS)
}
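The check behind this task compares each partition's current leader with its preferred leader, which is the first replica in the assignment. Below is a simplified model of the imbalance ratio; it is illustrative only, ignoring the per-broker grouping and the leader.imbalance.per.broker.percentage threshold that real Kafka applies:

```java
import java.util.List;

// Sketch of the imbalance check behind the rebalance task: the preferred
// leader of a partition is the first replica in its assignment, and the
// imbalance ratio is the fraction of partitions led by some other broker.
public class LeaderImbalance {
    record PartitionInfo(List<Integer> replicas, int currentLeader) {
        int preferredLeader() { return replicas.get(0); }
    }

    static double imbalanceRatio(List<PartitionInfo> partitions) {
        if (partitions.isEmpty()) return 0.0;
        long notPreferred = partitions.stream()
                .filter(p -> p.currentLeader() != p.preferredLeader())
                .count();
        return (double) notPreferred / partitions.size();
    }

    public static void main(String[] args) {
        List<PartitionInfo> partitions = List.of(
            new PartitionInfo(List.of(1, 2), 1),  // preferred leader in place
            new PartitionInfo(List.of(2, 3), 3)   // leader moved off broker 2
        );
        System.out.println(imbalanceRatio(partitions)); // half the partitions are off-preferred
    }
}
```

When the ratio for a broker exceeds the threshold, the controller triggers a preferred replica election to move leadership back.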
XI. Watching for Controller Changes
As mentioned in 《Kafka-Controller选举》, KafkaController registers two events at startup: RegisterBrokerAndReelect and Startup.
The RegisterBrokerAndReelect event elects a Controller from among the brokers as they start up, while the Startup event watches ZooKeeper's controller node and re-runs the election. Since the Controller is the core of the cluster, a new one must be elected whenever the current one fails.
private def processStartup(): Unit = {
zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
elect()
}