Original post: https://fxbing.github.io/2021…
The source code in this article is based on Kafka 0.10.2.

Whenever the controller's state changes, it sends a LeaderAndIsrRequest to the brokers by calling the sendRequestsToBrokers method. This article walks through how the Kafka server handles that request.
LEADER_AND_ISR
Overall flow
case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
When the server receives a LEADER_AND_ISR request, it calls handleLeaderAndIsrRequest to process it. The flow of this method is walked through below.
Source code
handleLeaderAndIsrRequest
The logic of the handleLeaderAndIsrRequest function breaks down into the following parts:

- Construct the callback onLeadershipChange, which calls back into the coordinator to handle partitions that just became leaders or followers (for the group-metadata topic __consumer_offsets this triggers consumer-group state migration).
- Check cluster-action authorization. If it passes, call replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, metadataCache, onLeadershipChange) for the actual processing (this is the main flow of the function); otherwise return the error code Errors.CLUSTER_AUTHORIZATION_FAILED.code directly.
def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
  // ensureTopicExists is only for client facing requests
  // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
  // stop serving data to clients for the topic being deleted
  val correlationId = request.header.correlationId
  val leaderAndIsrRequest = request.body.asInstanceOf[LeaderAndIsrRequest]

  try {
    def onLeadershipChange(updatedLeaders: Iterable[Partition], updatedFollowers: Iterable[Partition]) {
      // for each new leader or follower, call coordinator to handle consumer group migration.
      // this callback is invoked under the replica state change lock to ensure proper order of
      // leadership changes
      updatedLeaders.foreach { partition =>
        if (partition.topic == Topic.GroupMetadataTopicName)
          coordinator.handleGroupImmigration(partition.partitionId)
      }
      updatedFollowers.foreach { partition =>
        if (partition.topic == Topic.GroupMetadataTopicName)
          coordinator.handleGroupEmigration(partition.partitionId)
      }
    }

    val leaderAndIsrResponse =
      if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
        val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, metadataCache, onLeadershipChange)
        new LeaderAndIsrResponse(result.errorCode, result.responseMap.mapValues(new JShort(_)).asJava)
      } else {
        val result = leaderAndIsrRequest.partitionStates.asScala.keys.map((_, new JShort(Errors.CLUSTER_AUTHORIZATION_FAILED.code))).toMap
        new LeaderAndIsrResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.code, result.asJava)
      }

    requestChannel.sendResponse(new Response(request, leaderAndIsrResponse))
  } catch {
    case e: KafkaStorageException =>
      fatal("Disk error during leadership change.", e)
      Runtime.getRuntime.halt(1)
  }
}
becomeLeaderOrFollower
becomeLeaderOrFollower in ReplicaManager does the following (see the inline comments in the code for the exact locations):

- Validate the controller epoch: requests from a stale controller epoch are rejected, and per-partition state is accepted only when the request's leader epoch is newer than the local one and the local broker is in the partition's assigned replica list.
- Call makeLeaders and makeFollowers to build the sets of partitions that just became leaders or followers here (this is the core logic, covered in detail in the following subsections).
- On the first such request, start the thread that periodically checkpoints the high watermark (HW; a sketch follows the source below).
- Shut down idle fetcher threads.
- Invoke the callback so the coordinator can handle the new leader and follower partitions.
def becomeLeaderOrFollower(correlationId: Int,
                           leaderAndISRRequest: LeaderAndIsrRequest,
                           metadataCache: MetadataCache,
                           onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): BecomeLeaderOrFollowerResult = {
  leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
    stateChangeLogger.trace("Broker %d received LeaderAndIsr request %s correlation id %d from controller %d epoch %d for partition [%s,%d]"
      .format(localBrokerId, stateInfo, correlationId,
        leaderAndISRRequest.controllerId, leaderAndISRRequest.controllerEpoch, topicPartition.topic, topicPartition.partition))
  }
  // main logic: build the result under the replica state change lock
  replicaStateChangeLock synchronized {
    val responseMap = new mutable.HashMap[TopicPartition, Short]
    // if the controller epoch is stale, return Errors.STALE_CONTROLLER_EPOCH.code directly
    if (leaderAndISRRequest.controllerEpoch < controllerEpoch) {
      stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d since " +
        "its controller epoch %d is old. Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.controllerId,
        correlationId, leaderAndISRRequest.controllerEpoch, controllerEpoch))
      BecomeLeaderOrFollowerResult(responseMap, Errors.STALE_CONTROLLER_EPOCH.code)
    } else {
      val controllerId = leaderAndISRRequest.controllerId
      controllerEpoch = leaderAndISRRequest.controllerEpoch

      // First check partition's leader epoch
      // Validate each partition's state; there are three cases:
      // 1. the local broker is not in the assigned replica list: respond with Errors.UNKNOWN_TOPIC_OR_PARTITION.code
      // 2. the request's leader epoch is newer than the local one: the state is accepted
      // 3. the request's leader epoch is not newer: respond with Errors.STALE_CONTROLLER_EPOCH.code
      val partitionState = new mutable.HashMap[Partition, PartitionState]()
      leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
        val partition = getOrCreatePartition(topicPartition)
        val partitionLeaderEpoch = partition.getLeaderEpoch
        // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
        // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
        if (partitionLeaderEpoch < stateInfo.leaderEpoch) {
          if (stateInfo.replicas.contains(localBrokerId))
            partitionState.put(partition, stateInfo)
          else {
            stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " +
              "epoch %d for partition [%s,%d] as itself is not in assigned replica list %s")
              .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
                topicPartition.topic, topicPartition.partition, stateInfo.replicas.asScala.mkString(",")))
            responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
          }
        } else {
          // Otherwise record the error code in response
          stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " +
            "epoch %d for partition [%s,%d] since its associated leader epoch %d is not higher than the current leader epoch %d")
            .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
              topicPartition.topic, topicPartition.partition, stateInfo.leaderEpoch, partitionLeaderEpoch))
          responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH.code)
        }
      }

      // split the accepted partitions into leaders and followers; the resulting
      // partitionsBecomeLeader / partitionsBecomeFollower sets are handed to the callback (the coordinator)
      val partitionsTobeLeader = partitionState.filter { case (_, stateInfo) =>
        stateInfo.leader == localBrokerId
      }
      val partitionsToBeFollower = partitionState -- partitionsTobeLeader.keys

      val partitionsBecomeLeader = if (partitionsTobeLeader.nonEmpty)
        // the core call for new leaders
        makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId, responseMap)
      else
        Set.empty[Partition]
      val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
        // the core call for new followers
        makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap, metadataCache)
      else
        Set.empty[Partition]

      // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
      // have been completely populated before starting the checkpointing there by avoiding weird race conditions
      // after the first request, start the scheduler that periodically checkpoints the HW
      if (!hwThreadInitialized) {
        startHighWaterMarksCheckPointThread()
        hwThreadInitialized = true
      }
      // the metadata was updated above, so shut down fetcher threads that are no longer needed
      replicaFetcherManager.shutdownIdleFetcherThreads()
      // invoke the callback
      onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
      BecomeLeaderOrFollowerResult(responseMap, Errors.NONE.code)
    }
  }
}
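startHighWaterMarksCheckPointThread is not shown in this excerpt; in 0.10.2 it schedules ReplicaManager's HW-checkpointing task at a fixed interval. Below is a rough, self-contained model of what that periodic checkpointing does; all names here (HwCheckpointModel, highWatermarks) are illustrative, not the actual Kafka API.

```scala
import java.io.{File, PrintWriter}
import java.util.concurrent.{Executors, TimeUnit}

// Rough model of periodic HW checkpointing (illustrative names, not the real Kafka classes):
// a single-threaded scheduler rewrites a checkpoint file with every partition's latest HW,
// so that after a restart the broker knows how far each local log was committed.
object HwCheckpointModel {
  def start(highWatermarks: () => Map[String, Long], // topic-partition -> HW, sampled at each tick
            checkpointFile: File,
            periodMs: Long): Unit = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleAtFixedRate(new Runnable {
      def run(): Unit = {
        // rewrite the whole file each tick with one "partition offset" line per partition
        val writer = new PrintWriter(checkpointFile)
        try highWatermarks().foreach { case (tp, hw) => writer.println(s"$tp $hw") }
        finally writer.close()
      }
    }, periodMs, periodMs, TimeUnit.MILLISECONDS)
  }
}
```

This also explains the comment in the source above: the thread is only started after the first LeaderAndIsr request so that all partitions are populated before the first checkpoint is written.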
makeLeaders
Handles the partitions that become leaders on this broker:

- Stop the follower (fetcher) threads for these partitions
- Update the partition metadata for these partitions
- Build the set of newly added leader partitions
private def makeLeaders(controllerId: Int,
                        epoch: Int,
                        partitionState: Map[Partition, PartitionState],
                        correlationId: Int,
                        responseMap: mutable.Map[TopicPartition, Short]): Set[Partition] = {
  // pre-fill the per-partition result that becomeLeaderOrFollower returns
  for (partition <- partitionState.keys)
    responseMap.put(partition.topicPartition, Errors.NONE.code)

  val partitionsToMakeLeaders: mutable.Set[Partition] = mutable.Set()

  try {
    // First stop fetchers for all the partitions
    replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(_.topicPartition))
    // Update the partition information to be the leader
    // build the set of partitions that actually became leaders
    partitionState.foreach { case (partition, partitionStateInfo) =>
      if (partition.makeLeader(controllerId, partitionStateInfo, correlationId))
        partitionsToMakeLeaders += partition
      else
        stateChangeLogger.info(("Broker %d skipped the become-leader state change after marking its partition as leader with correlation id %d from " +
          "controller %d epoch %d for partition %s since it is already the leader for the partition.")
          .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition))
    }
  } catch {
    case e: Throwable =>
      partitionState.keys.foreach { partition =>
        val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d " +
          "epoch %d for partition %s").format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition)
        stateChangeLogger.error(errorMsg, e)
      }
      // Re-throw the exception for it to be caught in KafkaApis
      throw e
  }
  partitionsToMakeLeaders
}
partition.makeLeader(controllerId, partitionStateInfo, correlationId) updates the partition's metadata and the HW. It calls maybeIncrementLeaderHW, which tries to advance the HW: the new HW is the smallest LEO among the replicas that are in the ISR or have recently caught up with the leader, and it is applied only if it is larger than the current HW. Counting nearly-caught-up replicas toward this minimum slows HW growth a little and gives lagging replicas a better chance to rejoin the ISR (a sketch follows the makeLeader source below).
def makeLeader(controllerId: Int, partitionStateInfo: PartitionState, correlationId: Int): Boolean = {
  val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
    val allReplicas = partitionStateInfo.replicas.asScala.map(_.toInt)
    // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
    // to maintain the decision maker controller's epoch in the zookeeper path
    controllerEpoch = partitionStateInfo.controllerEpoch
    // add replicas that are new
    // build the new ISR
    allReplicas.foreach(replica => getOrCreateReplica(replica))
    val newInSyncReplicas = partitionStateInfo.isr.asScala.map(r => getOrCreateReplica(r)).toSet
    // remove assigned replicas that have been removed by the controller
    // (i.e. replicas that are no longer in the assigned replica list)
    (assignedReplicas.map(_.brokerId) -- allReplicas).foreach(removeReplica)
    inSyncReplicas = newInSyncReplicas
    leaderEpoch = partitionStateInfo.leaderEpoch
    zkVersion = partitionStateInfo.zkVersion

    // whether this broker becomes the leader of this partition for the first time
    val isNewLeader =
      if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == localBrokerId) {
        false
      } else {
        leaderReplicaIdOpt = Some(localBrokerId)
        true
      }
    val leaderReplica = getReplica().get
    val curLeaderLogEndOffset = leaderReplica.logEndOffset.messageOffset
    val curTimeMs = time.milliseconds
    // initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and lastFetchLeaderLogEndOffset.
    // initialization done by the new leader
    (assignedReplicas - leaderReplica).foreach { replica =>
      val lastCaughtUpTimeMs = if (inSyncReplicas.contains(replica)) curTimeMs else 0L
      replica.resetLastCaughtUpTime(curLeaderLogEndOffset, curTimeMs, lastCaughtUpTimeMs)
    }
    // we may need to increment high watermark since ISR could be down to 1
    if (isNewLeader) {
      // construct the high watermark metadata for the new leader replica
      leaderReplica.convertHWToLocalOffsetMetadata()
      // reset log end offset for remote replicas
      assignedReplicas.filter(_.brokerId != localBrokerId).foreach(_.updateLogReadResult(LogReadResult.UnknownLogReadResult))
    }
    // try to advance the HW: take the smallest LEO of the replicas that are in the ISR or
    // not lagging far behind the leader, and apply it only if it exceeds the current HW
    (maybeIncrementLeaderHW(leaderReplica), isNewLeader)
  }
  // some delayed operations may be unblocked after HW changed
  // after the HW advances, some pending requests can be completed
  if (leaderHWIncremented)
    tryCompleteDelayedRequests()
  isNewLeader
}
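maybeIncrementLeaderHW itself is not shown above. Here is a minimal self-contained sketch of the rule it implements; the ReplicaState type and the 10 s default lag bound are illustrative stand-ins, not the actual Kafka types.

```scala
// Minimal model of the HW-advance rule (illustrative types, not the real Kafka API).
case class ReplicaState(brokerId: Int, logEndOffset: Long, lastCaughtUpTimeMs: Long)

def maybeIncrementLeaderHW(assigned: Seq[ReplicaState], // must include the leader itself
                           isr: Set[Int],
                           currentHW: Long,
                           nowMs: Long,
                           replicaLagTimeMaxMs: Long = 10000L): Option[Long] = {
  // candidate HW = the smallest LEO among replicas that are in the ISR
  // or have caught up with the leader within replica.lag.time.max.ms
  val caughtUpLEOs = assigned.collect {
    case r if isr.contains(r.brokerId) || nowMs - r.lastCaughtUpTimeMs <= replicaLagTimeMaxMs =>
      r.logEndOffset
  }
  val newHW = caughtUpLEOs.min // non-empty: the leader always qualifies
  // the HW only ever moves forward
  if (newHW > currentHW) Some(newHW) else None
}
```

Because a replica slightly behind the leader still counts toward this minimum, the HW waits for it instead of racing ahead, which is the "slow down HW growth so lagging replicas can rejoin" behavior described above.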
makeFollowers
Handles the partitions that become followers on this broker:

- Remove these partitions from the leader-partitions set
- Mark them as followers so that producer requests are no longer served
- Remove their fetcher threads
- Truncate their local logs to the HW (see the sketch after the source below)
- Try to complete pending produce and fetch requests
- If the broker is not shutting down, start fetching from the new leader
private def makeFollowers(controllerId: Int,
                          epoch: Int,
                          partitionState: Map[Partition, PartitionState],
                          correlationId: Int,
                          responseMap: mutable.Map[TopicPartition, Short],
                          metadataCache: MetadataCache): Set[Partition] = {
  partitionState.keys.foreach { partition =>
    stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +
      "starting the become-follower transition for partition %s")
      .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition))
  }

  // pre-fill the per-partition result that becomeLeaderOrFollower returns
  for (partition <- partitionState.keys)
    responseMap.put(partition.topicPartition, Errors.NONE.code)

  val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set()

  try {
    // TODO: Delete leaders from LeaderAndIsrRequest
    partitionState.foreach { case (partition, partitionStateInfo) =>
      val newLeaderBrokerId = partitionStateInfo.leader
      metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
        // Only change partition state when the leader is available
        case Some(_) =>
          // build the set of partitions that actually became followers
          if (partition.makeFollower(controllerId, partitionStateInfo, correlationId))
            partitionsToMakeFollower += partition
          else
            stateChangeLogger.info(("Broker %d skipped the become-follower state change after marking its partition as follower with correlation id %d from " +
              "controller %d epoch %d for partition %s since the new leader %d is the same as the old leader")
              .format(localBrokerId, correlationId, controllerId, partitionStateInfo.controllerEpoch,
                partition.topicPartition, newLeaderBrokerId))
        case None =>
          // The leader broker should always be present in the metadata cache.
          // If not, we should record the error message and abort the transition process for this partition
          stateChangeLogger.error(("Broker %d received LeaderAndIsrRequest with correlation id %d from controller " +
            "%d epoch %d for partition %s but cannot become follower since the new leader %d is unavailable.")
            .format(localBrokerId, correlationId, controllerId, partitionStateInfo.controllerEpoch,
              partition.topicPartition, newLeaderBrokerId))
          // Create the local replica even if the leader is unavailable. This is required to ensure that we include
          // the partition's high watermark in the checkpoint file (see KAFKA-1647)
          partition.getOrCreateReplica()
      }
    }

    // remove the fetcher threads for these partitions
    replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(_.topicPartition))
    // truncate the local logs to the HW
    logManager.truncateTo(partitionsToMakeFollower.map { partition =>
      (partition.topicPartition, partition.getOrCreateReplica().highWatermark.messageOffset)
    }.toMap)
    // the HW may have changed: try to complete pending produce and fetch requests
    partitionsToMakeFollower.foreach { partition =>
      val topicPartitionOperationKey = new TopicPartitionOperationKey(partition.topicPartition)
      tryCompleteDelayedProduce(topicPartitionOperationKey)
      tryCompleteDelayedFetch(topicPartitionOperationKey)
    }

    if (isShuttingDown.get()) {
      partitionsToMakeFollower.foreach { partition =>
        stateChangeLogger.trace(("Broker %d skipped the adding-fetcher step of the become-follower state change with correlation id %d from " +
          "controller %d epoch %d for partition %s since it is shutting down").format(localBrokerId, correlationId,
          controllerId, epoch, partition.topicPartition))
      }
    }
    else {
      // we do not need to check if the leader exists again since this has been done at the beginning of this process
      // reset the fetch offset and register the partitions with the fetcher
      val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>
        partition.topicPartition -> BrokerAndInitialOffset(
          metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerListenerName),
          partition.getReplica().get.logEndOffset.messageOffset)).toMap
      replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
    }
  } catch {
    case e: Throwable =>
      val errorMsg = ("Error on broker %d while processing LeaderAndIsr request with correlationId %d received from controller %d " +
        "epoch %d").format(localBrokerId, correlationId, controllerId, epoch)
      stateChangeLogger.error(errorMsg, e)
      // Re-throw the exception for it to be caught in KafkaApis
      throw e
  }
  partitionsToMakeFollower
}
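Why truncate to the HW before fetching from the new leader? A follower's log may contain messages above the HW that were never committed and that the new leader may not have; dropping them first guarantees the follower's log is a prefix of the leader's. A toy illustration of the rule (not the actual LogManager API):

```scala
// Toy illustration: a follower keeps only the committed prefix of its log
// (offsets below the HW) before it starts fetching from the new leader.
def truncateToHW[A](log: Vector[A], highWatermark: Int): Vector[A] =
  log.take(highWatermark)

// the follower wrote m3 and m4, but only offsets 0..2 were committed (HW = 3);
// m3 and m4 may not exist on the new leader, so they are discarded
val follower = Vector("m0", "m1", "m2", "m3", "m4")
assert(truncateToHW(follower, 3) == Vector("m0", "m1", "m2"))
```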