Original post: https://fxbing.github.io/2021...
The source code in this article is based on Kafka 0.10.2.

Whenever the controller goes through a state change, it sends a LeaderAndIsrRequest by calling the sendRequestsToBrokers method. This article walks through how the Kafka broker handles that request.
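
Before diving into the broker-side handling, it helps to recall what the request carries. The sketch below is for illustration only (simplified types, not the actual org.apache.kafka.common.requests.LeaderAndIsrRequest classes): for every partition the controller sends the new leader, the leader and controller epochs, the ISR, and the full replica assignment.

// Illustration only: simplified per-partition state of a LeaderAndIsrRequest (0.10.x).
object LeaderAndIsrSketch {
  case class PartitionStateSketch(
    controllerEpoch: Int,   // epoch of the controller that made the decision
    leader: Int,            // broker id of the new leader
    leaderEpoch: Int,       // bumped on every leadership change
    isr: List[Int],         // current in-sync replica set
    zkVersion: Int,         // version of the partition state znode
    replicas: Set[Int]      // full assigned replica list
  )

  def main(args: Array[String]): Unit = {
    // e.g. a partition assigned to brokers 1, 2, 3 with broker 1 as leader and all replicas in sync
    val state = PartitionStateSketch(controllerEpoch = 5, leader = 1, leaderEpoch = 7,
      isr = List(1, 2, 3), zkVersion = 12, replicas = Set(1, 2, 3))
    println(state)
  }
}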

LEADER_AND_ISR

Overall flow

case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)

After the broker receives a LEADER_AND_ISR request, it calls handleLeaderAndIsrRequest to process it. The processing flow of this method is shown in the figure:

Source code

handleLeaderAndIsrRequest

The logic of handleLeaderAndIsrRequest consists of the following parts:

  1. Construct the callback function onLeadershipChange, which calls back into the coordinator to handle partitions that have newly become leaders or followers
  2. Check the request's authorization. If it passes, call replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, metadataCache, onLeadershipChange) for the actual processing (this is the main path of the function); otherwise, return the error code Errors.CLUSTER_AUTHORIZATION_FAILED.code directly
def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
    // ensureTopicExists is only for client facing requests
    // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
    // stop serving data to clients for the topic being deleted
    val correlationId = request.header.correlationId
    val leaderAndIsrRequest = request.body.asInstanceOf[LeaderAndIsrRequest]

    try {
      def onLeadershipChange(updatedLeaders: Iterable[Partition], updatedFollowers: Iterable[Partition]) {
        // for each new leader or follower, call coordinator to handle consumer group migration.
        // this callback is invoked under the replica state change lock to ensure proper order of
        // leadership changes
        updatedLeaders.foreach { partition =>
          if (partition.topic == Topic.GroupMetadataTopicName)
            coordinator.handleGroupImmigration(partition.partitionId)
        }
        updatedFollowers.foreach { partition =>
          if (partition.topic == Topic.GroupMetadataTopicName)
            coordinator.handleGroupEmigration(partition.partitionId)
        }
      }

      val leaderAndIsrResponse =
        if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
          val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, metadataCache, onLeadershipChange)
          new LeaderAndIsrResponse(result.errorCode, result.responseMap.mapValues(new JShort(_)).asJava)
        } else {
          val result = leaderAndIsrRequest.partitionStates.asScala.keys.map((_, new JShort(Errors.CLUSTER_AUTHORIZATION_FAILED.code))).toMap
          new LeaderAndIsrResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.code, result.asJava)
        }

      requestChannel.sendResponse(new Response(request, leaderAndIsrResponse))
    } catch {
      case e: KafkaStorageException =>
        fatal("Disk error during leadership change.", e)
        Runtime.getRuntime.halt(1)
    }
  }
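
As the callback shows, only partitions of the internal __consumer_offsets topic (Topic.GroupMetadataTopicName) trigger coordinator work: a consumer group's coordinator is the broker that leads the group's offsets partition, so when leadership of such a partition moves, group metadata has to be loaded or unloaded on this broker. Below is a small sketch of the group-to-partition mapping; partitionFor here mirrors GroupMetadataManager.partitionFor, and the default partition count of 50 (offsets.topic.num.partitions) is an assumption for illustration.

// Illustrative sketch: which __consumer_offsets partition owns a consumer group.
// The broker leading that partition acts as the group's coordinator.
object GroupToOffsetsPartitionSketch {
  def partitionFor(groupId: String, offsetsTopicPartitionCount: Int = 50): Int =
    (groupId.hashCode & 0x7fffffff) % offsetsTopicPartitionCount  // non-negative modulo, like Utils.abs

  def main(args: Array[String]): Unit =
    println(partitionFor("my-consumer-group"))  // prints the owning __consumer_offsets partition id
}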

becomeLeaderOrFollower

The main work done in ReplicaManager consists of the following parts; the corresponding code locations are marked by the comments in the code below:

  1. Validate the controller epoch; only requests with a newer epoch are processed, and only for topic-partitions that have a replica on this broker
  2. Call makeLeaders and makeFollowers to build the sets of newly added leader partitions and follower partitions (this is the core logic; the following sections cover it in detail)
  3. On the first LeaderAndIsr request, start the thread that periodically checkpoints the high watermark (HW)
  4. Shut down idle fetcher threads
  5. Invoke the callback so the coordinator handles the newly added leader and follower partitions
def becomeLeaderOrFollower(correlationId: Int,
                           leaderAndISRRequest: LeaderAndIsrRequest,
                           metadataCache: MetadataCache,
                           onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): BecomeLeaderOrFollowerResult = {
    leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
      stateChangeLogger.trace("Broker %d received LeaderAndIsr request %s correlation id %d from controller %d epoch %d for partition [%s,%d]"
                              .format(localBrokerId, stateInfo, correlationId,
                                      leaderAndISRRequest.controllerId, leaderAndISRRequest.controllerEpoch, topicPartition.topic, topicPartition.partition))
    }
    // main logic: build the result
    replicaStateChangeLock synchronized {
      val responseMap = new mutable.HashMap[TopicPartition, Short]
      // if the controller epoch is stale, return Errors.STALE_CONTROLLER_EPOCH.code directly
      if (leaderAndISRRequest.controllerEpoch < controllerEpoch) {
        stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d since " +
                                "its controller epoch %d is old. Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.controllerId,
                                                                                                              correlationId, leaderAndISRRequest.controllerEpoch, controllerEpoch))
        BecomeLeaderOrFollowerResult(responseMap, Errors.STALE_CONTROLLER_EPOCH.code)
      } else {
        val controllerId = leaderAndISRRequest.controllerId
        controllerEpoch = leaderAndISRRequest.controllerEpoch

        // First check partition's leader epoch
        // Validate every partition's state; there are three cases:
        // 1. the request's leader epoch is higher but this broker is not in the replica list: return Errors.UNKNOWN_TOPIC_OR_PARTITION.code
        // 2. the request's leader epoch is higher and this broker holds a replica: the state is accepted
        // 3. the request's leader epoch is not higher than the local one: return Errors.STALE_CONTROLLER_EPOCH.code
        val partitionState = new mutable.HashMap[Partition, PartitionState]()
        leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
          val partition = getOrCreatePartition(topicPartition)
          val partitionLeaderEpoch = partition.getLeaderEpoch
          // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
          // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
          if (partitionLeaderEpoch < stateInfo.leaderEpoch) {
            if(stateInfo.replicas.contains(localBrokerId))
              partitionState.put(partition, stateInfo)
            else {
              stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " +
                                      "epoch %d for partition [%s,%d] as itself is not in assigned replica list %s")
                                     .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
                                             topicPartition.topic, topicPartition.partition, stateInfo.replicas.asScala.mkString(",")))
              responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
            }
          } else {
            // Otherwise record the error code in response
            stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " +
                                    "epoch %d for partition [%s,%d] since its associated leader epoch %d is not higher than the current leader epoch %d")
                                   .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
                                           topicPartition.topic, topicPartition.partition, stateInfo.leaderEpoch, partitionLeaderEpoch))
            responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH.code)
          }
        }

        // handle leader & follower replicas: build partitionsBecomeLeader and partitionsBecomeFollower for the callback (coordinator)
        val partitionsTobeLeader = partitionState.filter { case (_, stateInfo) =>
          stateInfo.leader == localBrokerId
        }
        val partitionsToBeFollower = partitionState -- partitionsTobeLeader.keys

        val partitionsBecomeLeader = if (partitionsTobeLeader.nonEmpty)
          // main call
          makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId, responseMap)
        else
          Set.empty[Partition]
        val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
          // main call
          makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap, metadataCache)
        else
          Set.empty[Partition]

        // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
        // have been completely populated before starting the checkpointing there by avoiding weird race conditions
        // after the first request, start the scheduler that periodically checkpoints the HW
        if (!hwThreadInitialized) {
          startHighWaterMarksCheckPointThread()
          hwThreadInitialized = true
        }
        // the metadata above was updated, so shut down fetcher threads that are no longer needed
        replicaFetcherManager.shutdownIdleFetcherThreads()
        // callback
        onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
        BecomeLeaderOrFollowerResult(responseMap, Errors.NONE.code)
      }
    }
  }

makeLeaders

Handle the partitions for which this broker is becoming leader:

  1. Stop the fetcher threads for these partitions
  2. Update these partitions' metadata
  3. Build the set of newly added leader partitions
private def makeLeaders(controllerId: Int,
                        epoch: Int,
                        partitionState: Map[Partition, PartitionState],
                        correlationId: Int,
                        responseMap: mutable.Map[TopicPartition, Short]): Set[Partition] = {
    // build the per-partition result needed by becomeLeaderOrFollower
    for (partition <- partitionState.keys)
      responseMap.put(partition.topicPartition, Errors.NONE.code)

    val partitionsToMakeLeaders: mutable.Set[Partition] = mutable.Set()

    try {
      // First stop fetchers for all the partitions
      // stop the fetcher threads
      replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(_.topicPartition))
      // Update the partition information to be the leader
      // build the set of newly added leader partitions
      partitionState.foreach{ case (partition, partitionStateInfo) =>
        if (partition.makeLeader(controllerId, partitionStateInfo, correlationId))
          partitionsToMakeLeaders += partition
        else
          stateChangeLogger.info(("Broker %d skipped the become-leader state change after marking its partition as leader with correlation id %d from " +
            "controller %d epoch %d for partition %s since it is already the leader for the partition.")
            .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition))
      }
    } catch {
      case e: Throwable =>
        partitionState.keys.foreach { partition =>
          val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d" +
            " epoch %d for partition %s").format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition)
          stateChangeLogger.error(errorMsg, e)
        }
        // Re-throw the exception for it to be caught in KafkaApis
        throw e
    }

    partitionsToMakeLeaders
  }

partition.makeLeader(controllerId, partitionStateInfo, correlationId) updates the partition's metadata and the HW. It calls maybeIncrementLeaderHW, which tries to advance the HW: the candidate HW is the minimum LEO over the ISR plus any replica that is not lagging too far behind the leader, and the HW only moves when that candidate is larger than the previous HW. Counting the nearly-in-sync replicas holds the HW back a little, giving them a chance to catch up and rejoin the ISR.

def makeLeader(controllerId: Int, partitionStateInfo: PartitionState, correlationId: Int): Boolean = {
    val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
      val allReplicas = partitionStateInfo.replicas.asScala.map(_.toInt)
      // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
      // to maintain the decision maker controller's epoch in the zookeeper path
      controllerEpoch = partitionStateInfo.controllerEpoch
      // add replicas that are new
      // build the new ISR
      allReplicas.foreach(replica => getOrCreateReplica(replica))
      val newInSyncReplicas = partitionStateInfo.isr.asScala.map(r => getOrCreateReplica(r)).toSet
      // remove assigned replicas that have been removed by the controller
      // drop replicas that are no longer in the assigned replica list
      (assignedReplicas.map(_.brokerId) -- allReplicas).foreach(removeReplica)
      inSyncReplicas = newInSyncReplicas
      leaderEpoch = partitionStateInfo.leaderEpoch
      zkVersion = partitionStateInfo.zkVersion

      // is this broker becoming leader of the partition for the first time?
      val isNewLeader =
        if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == localBrokerId) {
          false
        } else {
          leaderReplicaIdOpt = Some(localBrokerId)
          true
        }
      val leaderReplica = getReplica().get
      val curLeaderLogEndOffset = leaderReplica.logEndOffset.messageOffset
      val curTimeMs = time.milliseconds
      // initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and lastFetchLeaderLogEndOffset.
      // initialize the new leader's view of the other replicas
      (assignedReplicas - leaderReplica).foreach { replica =>
        val lastCaughtUpTimeMs = if (inSyncReplicas.contains(replica)) curTimeMs else 0L
        replica.resetLastCaughtUpTime(curLeaderLogEndOffset, curTimeMs, lastCaughtUpTimeMs)
      }
      // we may need to increment high watermark since ISR could be down to 1
      if (isNewLeader) {
        // construct the high watermark metadata for the new leader replica
        leaderReplica.convertHWToLocalOffsetMetadata()
        // reset log end offset for remote replicas
        assignedReplicas.filter(_.brokerId != localBrokerId).foreach(_.updateLogReadResult(LogReadResult.UnknownLogReadResult))
      }
      // try to advance the HW: the candidate is the min LEO over the ISR plus replicas that are not far behind the leader,
      // and the HW only moves if the candidate is larger than the previous HW, so lagging replicas get a chance to rejoin
      (maybeIncrementLeaderHW(leaderReplica), isNewLeader)
    }
    // some delayed operations may be unblocked after HW changed
    // after the HW moves, some pending requests may become completable
    if (leaderHWIncremented)
      tryCompleteDelayedRequests()
    isNewLeader
  }
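
The source of maybeIncrementLeaderHW is not quoted here; below is a simplified, self-contained sketch of the rule it applies (ReplicaView and maybeIncrementLeaderHw are made-up names for illustration, not the actual 0.10.2 code): the candidate HW is the minimum LEO across replicas that are in the ISR or have caught up within replica.lag.time.max.ms, and the leader's HW only advances when the candidate exceeds the current HW.

// Simplified illustration of the HW-advancement rule used on the leader.
object MaybeIncrementHwSketch {
  case class ReplicaView(brokerId: Int, logEndOffset: Long, lastCaughtUpTimeMs: Long)

  def maybeIncrementLeaderHw(assigned: Seq[ReplicaView],
                             isr: Set[Int],
                             currentHw: Long,
                             nowMs: Long,
                             replicaLagTimeMaxMs: Long): Long = {
    // Consider ISR members plus any replica that caught up to the leader recently;
    // including the nearly-in-sync replicas holds the HW back so they can re-enter the ISR.
    // The leader itself is always in the ISR, so the set below is never empty.
    val eligibleLeos = assigned
      .filter(r => isr.contains(r.brokerId) || nowMs - r.lastCaughtUpTimeMs <= replicaLagTimeMaxMs)
      .map(_.logEndOffset)
    val candidateHw = eligibleLeos.min
    if (candidateHw > currentHw) candidateHw else currentHw   // HW never moves backwards here
  }

  def main(args: Array[String]): Unit = {
    val replicas = Seq(ReplicaView(1, 100, 0), ReplicaView(2, 95, 0), ReplicaView(3, 90, 0))
    // Broker 3 is out of the ISR but caught up 2s ago, so it still caps the HW at 90.
    println(maybeIncrementLeaderHw(replicas, isr = Set(1, 2), currentHw = 80,
      nowMs = 2000, replicaLagTimeMaxMs = 10000)) // prints 90
  }
}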

makeFollowers

Handle the partitions for which this broker is becoming a follower:

  1. Remove these partitions from the set of leader partitions
  2. Mark them as followers so that producer requests are no longer served
  3. Remove their fetcher threads
  4. Truncate the local logs of these partitions to the HW (illustrated by the sketch after the code below)
  5. Try to complete pending (delayed) produce and fetch requests
  6. If the broker is not shutting down, start fetching data from the new leader
private def makeFollowers(controllerId: Int,
                          epoch: Int,
                          partitionState: Map[Partition, PartitionState],
                          correlationId: Int,
                          responseMap: mutable.Map[TopicPartition, Short],
                          metadataCache: MetadataCache) : Set[Partition] = {
    partitionState.keys.foreach { partition =>
      stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +
                               "starting the become-follower transition for partition %s")
                              .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition))
    }

    // build the per-partition result needed by becomeLeaderOrFollower
    for (partition <- partitionState.keys)
      responseMap.put(partition.topicPartition, Errors.NONE.code)

    val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set()

    try {
      // TODO: Delete leaders from LeaderAndIsrRequest
      partitionState.foreach{ case (partition, partitionStateInfo) =>
        val newLeaderBrokerId = partitionStateInfo.leader
        metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
          // Only change partition state when the leader is available
          case Some(_) =>
            // build the result set
            if (partition.makeFollower(controllerId, partitionStateInfo, correlationId))
              partitionsToMakeFollower += partition
            else
              stateChangeLogger.info(("Broker %d skipped the become-follower state change after marking its partition as follower with correlation id %d from " +
                                      "controller %d epoch %d for partition %s since the new leader %d is the same as the old leader")
                                     .format(localBrokerId, correlationId, controllerId, partitionStateInfo.controllerEpoch,
                                             partition.topicPartition, newLeaderBrokerId))
          case None =>
            // The leader broker should always be present in the metadata cache.
            // If not, we should record the error message and abort the transition process for this partition
            stateChangeLogger.error(("Broker %d received LeaderAndIsrRequest with correlation id %d from controller" +
                                     " %d epoch %d for partition %s but cannot become follower since the new leader %d is unavailable.")
                                    .format(localBrokerId, correlationId, controllerId, partitionStateInfo.controllerEpoch,
                                            partition.topicPartition, newLeaderBrokerId))
            // Create the local replica even if the leader is unavailable. This is required to ensure that we include
            // the partition's high watermark in the checkpoint file (see KAFKA-1647)
            partition.getOrCreateReplica()
        }
      }

      // remove the fetcher threads
      replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(_.topicPartition))

      // truncate the local logs to the HW
      logManager.truncateTo(partitionsToMakeFollower.map { partition =>
        (partition.topicPartition, partition.getOrCreateReplica().highWatermark.messageOffset)
      }.toMap)

      // the HW changed; try to complete pending requests
      partitionsToMakeFollower.foreach { partition =>
        val topicPartitionOperationKey = new TopicPartitionOperationKey(partition.topicPartition)
        tryCompleteDelayedProduce(topicPartitionOperationKey)
        tryCompleteDelayedFetch(topicPartitionOperationKey)
      }

      if (isShuttingDown.get()) {
        partitionsToMakeFollower.foreach { partition =>
          stateChangeLogger.trace(("Broker %d skipped the adding-fetcher step of the become-follower state change with correlation id %d from " +
                                   "controller %d epoch %d for partition %s since it is shutting down").format(localBrokerId, correlationId,
                                                                                                               controllerId, epoch, partition.topicPartition))
        }
      }
      else {
        // we do not need to check if the leader exists again since this has been done at the beginning of this process
        // reset the fetch position and add the partitions to the fetcher
        val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>
          partition.topicPartition -> BrokerAndInitialOffset(
            metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerListenerName),
            partition.getReplica().get.logEndOffset.messageOffset)).toMap
        replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
      }
    } catch {
      case e: Throwable =>
        val errorMsg = ("Error on broker %d while processing LeaderAndIsr request with correlationId %d received from controller %d " +
                        "epoch %d").format(localBrokerId, correlationId, controllerId, epoch)
        stateChangeLogger.error(errorMsg, e)
        // Re-throw the exception for it to be caught in KafkaApis
        throw e
    }

    partitionsToMakeFollower
  }
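
One step worth a closer look is the truncation to the HW: a follower's log may extend past the HW with messages that were never committed and may differ from the new leader's log, so in 0.10.x the follower truncates to its HW before it starts fetching. Below is a tiny self-contained sketch of the idea (the Log class is invented for illustration, not Kafka's log implementation).

// Why a new follower truncates to its high watermark before fetching from the new leader:
// entries beyond the HW were never committed and may diverge from the new leader's log.
object TruncateToHwSketch {
  case class Log(entries: Vector[String]) {
    def leo: Long = entries.size                        // log end offset
    def truncateTo(offset: Long): Log = Log(entries.take(offset.toInt))
  }

  def main(args: Array[String]): Unit = {
    val followerLog = Log(Vector("m0", "m1", "m2", "m3")) // LEO = 4
    val highWatermark = 2L                                // only m0 and m1 are committed
    val truncated = followerLog.truncateTo(highWatermark)
    println(s"LEO before=${followerLog.leo}, after=${truncated.leo}") // 4 -> 2
  }
}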