Kafka Source Code Notes: KafkaApis LEADER_AND_ISR


Original post: https://fxbing.github.io/2021…
The source code in this article is from Kafka 0.10.2.

Whenever the controller's state changes, it sends a LeaderAndIsrRequest to the brokers by calling sendRequestsToBrokers. This article walks through how the Kafka server handles that request.

LEADER_AND_ISR

Overall flow

case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)

When the server receives a LEADER_AND_ISR request, it calls handleLeaderAndIsrRequest to process it; the flow of that method is walked through below.

Source code

handleLeaderAndIsrRequest

The logic of handleLeaderAndIsrRequest consists of the following parts:

  1. Build the callback onLeadershipChange, which calls back into the coordinator to handle newly added leader or follower partitions (a sketch of the coordinator side follows the code below)
  2. Check authorization; if it passes, call replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, metadataCache, onLeadershipChange) for the actual processing [this is the main path of the handler]; otherwise return the error code Errors.CLUSTER_AUTHORIZATION_FAILED.code directly
def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
    // ensureTopicExists is only for client facing requests
    // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
    // stop serving data to clients for the topic being deleted
    val correlationId = request.header.correlationId
    val leaderAndIsrRequest = request.body.asInstanceOf[LeaderAndIsrRequest]

    try {
      def onLeadershipChange(updatedLeaders: Iterable[Partition], updatedFollowers: Iterable[Partition]) {
        // for each new leader or follower, call coordinator to handle consumer group migration.
        // this callback is invoked under the replica state change lock to ensure proper order of
        // leadership changes
        updatedLeaders.foreach { partition =>
          if (partition.topic == Topic.GroupMetadataTopicName)
            coordinator.handleGroupImmigration(partition.partitionId)
        }
        updatedFollowers.foreach { partition =>
          if (partition.topic == Topic.GroupMetadataTopicName)
            coordinator.handleGroupEmigration(partition.partitionId)
        }
      }

      val leaderAndIsrResponse =
        if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
          val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, metadataCache, onLeadershipChange)
          new LeaderAndIsrResponse(result.errorCode, result.responseMap.mapValues(new JShort(_)).asJava)
        } else {
          val result = leaderAndIsrRequest.partitionStates.asScala.keys.map((_, new JShort(Errors.CLUSTER_AUTHORIZATION_FAILED.code))).toMap
          new LeaderAndIsrResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.code, result.asJava)
        }

      requestChannel.sendResponse(new Response(request, leaderAndIsrResponse))
    } catch {
      case e: KafkaStorageException =>
        fatal("Disk error during leadership change.", e)
        Runtime.getRuntime.halt(1)
    }
  }
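
As a side note, the coordinator side of the onLeadershipChange callback is small. Below is a minimal sketch of the two GroupCoordinator entry points as I recall them from the 0.10.2 source; treat the exact delegate names (loadGroupsForPartition, removeGroupsForPartition and their callbacks) as approximations rather than a verbatim quote.

// GroupCoordinator (0.10.2) -- sketch of the two callback targets
def handleGroupImmigration(offsetTopicPartitionId: Int) {
  // this broker just became leader of an __consumer_offsets partition:
  // load the group metadata and offsets stored in it into the coordinator's cache
  groupManager.loadGroupsForPartition(offsetTopicPartitionId, onGroupLoaded)
}

def handleGroupEmigration(offsetTopicPartitionId: Int) {
  // this broker just became a follower: evict the groups owned by that partition
  groupManager.removeGroupsForPartition(offsetTopicPartitionId, onGroupUnloaded)
}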

becomeLeaderOrFollower

ReplicaManager does the heavy lifting here, in the following steps (the comments in the code below mark where each one happens):

  1. Validate the epochs: requests from a stale controller epoch are rejected, and per partition only state with a newer leader epoch, for a partition whose replica list contains this broker, is accepted
  2. Call makeLeaders and makeFollowers to build the sets of partitions that become leaders and followers on this broker [the main logic, covered in the following sections]
  3. On the first such request, start the thread that periodically checkpoints the high watermarks (HW); a sketch of this call follows the code below
  4. Shut down idle fetcher threads
  5. Invoke the callback so the coordinator can handle the new leader and follower partitions
def becomeLeaderOrFollower(correlationId: Int,
                           leaderAndISRRequest: LeaderAndIsrRequest,
                           metadataCache: MetadataCache,
                           onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): BecomeLeaderOrFollowerResult = {
    leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
        stateChangeLogger.trace("Broker %d received LeaderAndIsr request %s correlation id %d from controller %d epoch %d for partition [%s,%d]"
                                .format(localBrokerId, stateInfo, correlationId,
                                        leaderAndISRRequest.controllerId, leaderAndISRRequest.controllerEpoch, topicPartition.topic, topicPartition.partition))
    }
    // main logic: build the result while holding the replica state change lock
    replicaStateChangeLock synchronized {
        val responseMap = new mutable.HashMap[TopicPartition, Short]
        // if the controller epoch is stale, return Errors.STALE_CONTROLLER_EPOCH.code right away
        if (leaderAndISRRequest.controllerEpoch < controllerEpoch) {
            stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d since " +
                                    "its controller epoch %d is old. Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.controllerId,
                                                                                                                  correlationId, leaderAndISRRequest.controllerEpoch, controllerEpoch))
            BecomeLeaderOrFollowerResult(responseMap, Errors.STALE_CONTROLLER_EPOCH.code)
        } else {
            val controllerId = leaderAndISRRequest.controllerId
            controllerEpoch = leaderAndISRRequest.controllerEpoch

            // First check partition's leader epoch
            // Validate every partition's state; there are three cases:
            // 1. the leader epoch is newer but this broker is not in the replica list:
            //    respond Errors.UNKNOWN_TOPIC_OR_PARTITION.code
            // 2. the leader epoch is newer and this broker holds a replica: the state is accepted
            // 3. the leader epoch is not newer than the local one:
            //    respond Errors.STALE_CONTROLLER_EPOCH.code
            val partitionState = new mutable.HashMap[Partition, PartitionState]()
            leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
                val partition = getOrCreatePartition(topicPartition)
                val partitionLeaderEpoch = partition.getLeaderEpoch
                // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
                // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
                if (partitionLeaderEpoch < stateInfo.leaderEpoch) {
                    if (stateInfo.replicas.contains(localBrokerId))
                        partitionState.put(partition, stateInfo)
                    else {
                        stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " +
                                                "epoch %d for partition [%s,%d] as itself is not in assigned replica list %s")
                                               .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
                                                       topicPartition.topic, topicPartition.partition, stateInfo.replicas.asScala.mkString(",")))
                        responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
                    }
                } else {
                    // Otherwise record the error code in response
                    stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " +
                                            "epoch %d for partition [%s,%d] since its associated leader epoch %d is not higher than the current leader epoch %d")
                                           .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
                                                   topicPartition.topic, topicPartition.partition, stateInfo.leaderEpoch, partitionLeaderEpoch))
                    responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH.code)
                }
            }

            // split the accepted partitions into leaders and followers; the resulting sets
            // partitionsBecomeLeader / partitionsBecomeFollower are handed to the callback
            // at the end (for the coordinator to process)
            val partitionsTobeLeader = partitionState.filter { case (_, stateInfo) =>
                stateInfo.leader == localBrokerId
            }
            val partitionsToBeFollower = partitionState -- partitionsTobeLeader.keys

            val partitionsBecomeLeader = if (partitionsTobeLeader.nonEmpty)
                // main call
                makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId, responseMap)
            else
                Set.empty[Partition]
            val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
                // main call
                makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap, metadataCache)
            else
                Set.empty[Partition]

            // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
            // have been completely populated before starting the checkpointing there by avoiding weird race conditions
            // i.e. after the first request, start the scheduler that periodically checkpoints the HW
            if (!hwThreadInitialized) {
                startHighWaterMarksCheckPointThread()
                hwThreadInitialized = true
            }
            // the metadata above changed, so shut down fetcher threads that are no longer needed
            replicaFetcherManager.shutdownIdleFetcherThreads()
            // callback
            onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
            BecomeLeaderOrFollowerResult(responseMap, Errors.NONE.code)
        }
    }
}
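
For context, startHighWaterMarksCheckPointThread (step 3 above) only arms a periodic task once per broker lifetime. The sketch below is reconstructed from memory of the 0.10.2 ReplicaManager, so the flag and config names are approximations:

// ReplicaManager (0.10.2) -- sketch: an AtomicBoolean guards the scheduling so the
// periodic HW checkpoint task is registered at most once
def startHighWaterMarksCheckPointThread() = {
  if (highWatermarkCheckPointThreadStarted.compareAndSet(false, true))
    scheduler.schedule("highwatermark-checkpoint", checkpointHighWatermarks,
      period = config.replicaHighWatermarkCheckpointIntervalMs, unit = TimeUnit.MILLISECONDS)
}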

makeLeaders

Handles the partitions for which this broker becomes the leader:

  1. Stop the fetcher threads for these partitions
  2. Update these partitions' metadata so they become leaders
  3. Build and return the set of partitions that newly became leaders
private def makeLeaders(controllerId: Int,
                        epoch: Int,
                        partitionState: Map[Partition, PartitionState],
                        correlationId: Int,
                        responseMap: mutable.Map[TopicPartition, Short]): Set[Partition] = {
    // pre-fill the response expected by becomeLeaderOrFollower
    for (partition <- partitionState.keys)
      responseMap.put(partition.topicPartition, Errors.NONE.code)

    val partitionsToMakeLeaders: mutable.Set[Partition] = mutable.Set()

    try {
      // First stop fetchers for all the partitions
      // i.e. stop the fetcher threads
      replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(_.topicPartition))
      // Update the partition information to be the leader
      // and build the set of partitions that newly became leaders
      partitionState.foreach { case (partition, partitionStateInfo) =>
        if (partition.makeLeader(controllerId, partitionStateInfo, correlationId))
          partitionsToMakeLeaders += partition
        else
          stateChangeLogger.info(("Broker %d skipped the become-leader state change after marking its partition as leader with correlation id %d from " +
            "controller %d epoch %d for partition %s since it is already the leader for the partition.")
            .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition))
      }
    } catch {
      case e: Throwable =>
        partitionState.keys.foreach { partition =>
          val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d " +
            "epoch %d for partition %s").format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition)
          stateChangeLogger.error(errorMsg, e)
        }
        // Re-throw the exception for it to be caught in KafkaApis
        throw e
    }

    partitionsToMakeLeaders
  }

partition.makeLeader(controllerId, partitionStateInfo, correlationId) updates the partition's metadata and its high watermark. At the end it calls maybeIncrementLeaderHW, which advances the HW conservatively: the HW moves to the minimum log end offset of the replicas that are still keeping up with the leader, and only if that value is larger than the old HW. Holding the HW back like this gives lagging replicas a chance to catch up and re-enter the ISR.
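
A simplified sketch of that HW rule, paraphrased from the 0.10.2 Partition code (the exact lag predicate and ordering details are approximations):

// Partition.maybeIncrementLeaderHW -- simplified sketch, not the verbatim source
private def maybeIncrementLeaderHW(leaderReplica: Replica): Boolean = {
  val curTime = time.milliseconds
  // consider replicas in the ISR, plus replicas that fetched recently enough
  // (within replica.lag.time.max.ms) that they may still catch up
  val candidateLogEndOffsets = assignedReplicas.filter { replica =>
    inSyncReplicas.contains(replica) ||
      curTime - replica.lastCaughtUpTimeMs <= replicaManager.config.replicaLagTimeMaxMs
  }.map(_.logEndOffset)
  // the new HW is the minimum LEO over that set, and the HW only ever moves forward
  val newHighWatermark = candidateLogEndOffsets.minBy(_.messageOffset)
  if (leaderReplica.highWatermark.messageOffset < newHighWatermark.messageOffset) {
    leaderReplica.highWatermark = newHighWatermark
    true
  } else
    false
}

The full makeLeader is below: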

def makeLeader(controllerId: Int, partitionStateInfo: PartitionState, correlationId: Int): Boolean = {
    val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
      val allReplicas = partitionStateInfo.replicas.asScala.map(_.toInt)
      // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
      // to maintain the decision maker controller's epoch in the zookeeper path
      controllerEpoch = partitionStateInfo.controllerEpoch
      // add replicas that are new
      // i.e. build the new ISR
      allReplicas.foreach(replica => getOrCreateReplica(replica))
      val newInSyncReplicas = partitionStateInfo.isr.asScala.map(r => getOrCreateReplica(r)).toSet
      // remove assigned replicas that have been removed by the controller
      // i.e. drop every replica that is no longer in the assigned replica list
      (assignedReplicas.map(_.brokerId) -- allReplicas).foreach(removeReplica)
      inSyncReplicas = newInSyncReplicas
      leaderEpoch = partitionStateInfo.leaderEpoch
      zkVersion = partitionStateInfo.zkVersion
      // is this broker becoming the leader of this partition for the first time?
      val isNewLeader =
        if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == localBrokerId) {
          false
        } else {
          leaderReplicaIdOpt = Some(localBrokerId)
          true
        }
      val leaderReplica = getReplica().get
      val curLeaderLogEndOffset = leaderReplica.logEndOffset.messageOffset
      val curTimeMs = time.milliseconds
      // initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and lastFetchLeaderLogEndOffset.
      // i.e. the new leader resets the fetch state of its follower replicas
      (assignedReplicas - leaderReplica).foreach { replica =>
        val lastCaughtUpTimeMs = if (inSyncReplicas.contains(replica)) curTimeMs else 0L
        replica.resetLastCaughtUpTime(curLeaderLogEndOffset, curTimeMs, lastCaughtUpTimeMs)
      }
      // we may need to increment high watermark since ISR could be down to 1
      if (isNewLeader) {
        // construct the high watermark metadata for the new leader replica
        leaderReplica.convertHWToLocalOffsetMetadata()
        // reset log end offset for remote replicas
        assignedReplicas.filter(_.brokerId != localBrokerId).foreach(_.updateLogReadResult(LogReadResult.UnknownLogReadResult))
      }
      // try to advance the HW: it moves to the minimum LEO of the replicas that are keeping up,
      // and only if that is larger than the old HW, so lagging replicas can catch up
      (maybeIncrementLeaderHW(leaderReplica), isNewLeader)
    }
    // some delayed operations may be unblocked after HW changed
    // i.e. a HW update may complete pending produce/fetch requests
    if (leaderHWIncremented)
      tryCompleteDelayedRequests()
    isNewLeader
  }
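
The closing tryCompleteDelayedRequests() call is what lets a HW advance unblock waiting clients. Its body is tiny; the sketch below is how I recall it from the 0.10.2 Partition code, with the delegate names being assumptions:

// Partition.tryCompleteDelayedRequests (0.10.2) -- sketch: a HW move may satisfy both
// produce requests waiting for acks and fetch requests waiting for data
private def tryCompleteDelayedRequests() {
  val requestKey = new TopicPartitionOperationKey(topicPartition)
  replicaManager.tryCompleteDelayedFetch(requestKey)
  replicaManager.tryCompleteDelayedProduce(requestKey)
}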

makeFollowers

Handles the partitions for which this broker becomes a follower:

  1. Remove these partitions from the set of partitions led by this broker
  2. Mark the replicas as followers so that producer requests are no longer accepted
  3. Remove the fetcher threads for these partitions
  4. Truncate these partitions' local logs to the HW
  5. Try to complete pending produce and fetch requests
  6. If the broker is not shutting down, start fetching from the new leaders
private def makeFollowers(controllerId: Int,
                          epoch: Int,
                          partitionState: Map[Partition, PartitionState],
                          correlationId: Int,
                          responseMap: mutable.Map[TopicPartition, Short],
                          metadataCache: MetadataCache) : Set[Partition] = {
    partitionState.keys.foreach { partition =>
        stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +
                                 "starting the become-follower transition for partition %s")
                                .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition))
    }

    // pre-fill the response expected by becomeLeaderOrFollower
    for (partition <- partitionState.keys)
      responseMap.put(partition.topicPartition, Errors.NONE.code)

    val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set()

    try {

        // TODO: Delete leaders from LeaderAndIsrRequest
        partitionState.foreach { case (partition, partitionStateInfo) =>
            val newLeaderBrokerId = partitionStateInfo.leader
            metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
                // Only change partition state when the leader is available
                case Some(_) =>
                    // collect the partitions that actually changed to follower
                    if (partition.makeFollower(controllerId, partitionStateInfo, correlationId))
                        partitionsToMakeFollower += partition
                    else
                        stateChangeLogger.info(("Broker %d skipped the become-follower state change after marking its partition as follower with correlation id %d from " +
                                                "controller %d epoch %d for partition %s since the new leader %d is the same as the old leader")
                                               .format(localBrokerId, correlationId, controllerId, partitionStateInfo.controllerEpoch,
                                                       partition.topicPartition, newLeaderBrokerId))
                case None =>
                    // The leader broker should always be present in the metadata cache.
                    // If not, we should record the error message and abort the transition process for this partition
                    stateChangeLogger.error(("Broker %d received LeaderAndIsrRequest with correlation id %d from controller " +
                                             "%d epoch %d for partition %s but cannot become follower since the new leader %d is unavailable.")
                                            .format(localBrokerId, correlationId, controllerId, partitionStateInfo.controllerEpoch,
                                                    partition.topicPartition, newLeaderBrokerId))
                    // Create the local replica even if the leader is unavailable. This is required to ensure that we include
                    // the partition's high watermark in the checkpoint file (see KAFKA-1647)
                    partition.getOrCreateReplica()
            }
        }

        // remove the fetcher threads for these partitions
        replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(_.topicPartition))
        // truncate the local logs to the high watermark
        logManager.truncateTo(partitionsToMakeFollower.map { partition =>
            (partition.topicPartition, partition.getOrCreateReplica().highWatermark.messageOffset)
        }.toMap)
        // the HW may have changed, so try to complete pending requests
        partitionsToMakeFollower.foreach { partition =>
            val topicPartitionOperationKey = new TopicPartitionOperationKey(partition.topicPartition)
            tryCompleteDelayedProduce(topicPartitionOperationKey)
            tryCompleteDelayedFetch(topicPartitionOperationKey)
        }

        if (isShuttingDown.get()) {
            partitionsToMakeFollower.foreach { partition =>
                stateChangeLogger.trace(("Broker %d skipped the adding-fetcher step of the become-follower state change with correlation id %d from " +
                                         "controller %d epoch %d for partition %s since it is shutting down").format(localBrokerId, correlationId,
                                                                                                                     controllerId, epoch, partition.topicPartition))
            }
        }
        else {
            // we do not need to check if the leader exists again since this has been done at the beginning of this process
            // reset the fetch position and register the partitions with the fetcher threads
            val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>
                partition.topicPartition -> BrokerAndInitialOffset(
                    metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerListenerName),
                    partition.getReplica().get.logEndOffset.messageOffset)).toMap
            replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
        }
    } catch {
        case e: Throwable =>
            val errorMsg = ("Error on broker %d while processing LeaderAndIsr request with correlationId %d received from controller %d " +
                            "epoch %d").format(localBrokerId, correlationId, controllerId, epoch)
            stateChangeLogger.error(errorMsg, e)
            // Re-throw the exception for it to be caught in KafkaApis
            throw e
    }

    partitionsToMakeFollower
}