@[TOC]

目录

思考几个问题

  1. 什么是分区状态机?
  2. 创立Topic的时候如何选举Leader?
  3. 分区的所有正本都不在线, 这个时候启动一台之前不在ISR内的正本,它会入选为Leader吗?
  4. 当所有正本都不在线,而后一个一个重启Broker上正本上线,谁会入选为Leader?谁先启动就谁入选吗?
  5. Broker下线了,Leader切换给了其余正本, 当Broker重启的时候,Leader会还给之前的正本吗?
  6. 选举胜利的那一刻, 生产者和生产着都做了哪些事件?
  7. Leader选举期间对分区的影响

<font color=blue>为更好的浏览体验,和及时的勘误</font>
请拜访原文链接:你想晓得的所有对于Kafka Leader选举流程和选举策略都在这(内含12张高清图,倡议珍藏)

分区Leader选举流程剖析

在开始源码剖析之前, 大家先看上面这张图, 好让本人对Leader选举有一个十分清晰的认知,而后再去看前面的源码剖析文章,会更容易了解。

整个流程分为三大块

  1. 触发选举场景 图左
  2. 执行选举流程 图中
  3. Leader选举策略 图右

分区状态机

首先大家得理解两个状态机

1. 分区状态机 管制分区状态流转

2. 正本状态机 管制正本状态流转

这里咱们次要解说分区状态机,这张图示意的是分区状态机

  1. NonExistentPartition :分区在将要被创立之前的初始状态是这个,示意不存在
  2. NewPartition: 示意正在创立新的分区, 是一个中间状态, 这个时候只是在Controller的内存中存了状态信息
  3. OnlinePartition: 在线状态, 失常的分区就应该是这种状态,只有在线的分区才可能提供服务
  4. OfflinePartition: 下线状态, 分区可能因为Broker宕机或者删除Topic等起因流转到这个状态, 下线了就不能提供服务了
  5. NonExistentPartition: 分区不存在的状态, 当Topic删除实现胜利之后, 就会流转到这个状态, 当还处在删除中的时候,还是停留在下线状态。

咱们明天要讲的Leader选举
就是在之前状态=>OnlinePartition状态的时候产生的。

Leader选举流程剖析

源码入口:

PartitionStateMachine#electLeaderForPartitions

 /** * 状态机流转 解决 **/  private def doHandleStateChanges(    partitions: Seq[TopicPartition],    targetState: PartitionState,    partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]  ): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = {    //不相干的代码省略了        targetState match {     // 不相干的代码省略了, 这里状态流程到 OnlinePartition      case OnlinePartition =>        // 分区状态是 OfflinePartition 或者  OnlinePartition 的话 就都须要执行一下选举策略        if (partitionsToElectLeader.nonEmpty) {          // 依据选举策略 进行选举。这里只是找出          val electionResults = electLeaderForPartitions(            partitionsToElectLeader,            partitionLeaderElectionStrategyOpt.getOrElse(              throw new IllegalArgumentException("Election strategy is a required field when the target state is OnlinePartition")            )          )         }        }

能够看到 咱们最终是调用了doElectLeaderForPartitions 执行分区Leader选举。

PartitionStateMachine#doElectLeaderForPartitions

  // 删除了局部无关代码  private def doElectLeaderForPartitions(    partitions: Seq[TopicPartition],    partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy  ): (Map[TopicPartition, Either[Exception, LeaderAndIsr]], Seq[TopicPartition]) = {     // 去zookeeper节点 /broker/topics/{topic名称}/partitions/{分区号}/state 节点读取根本信息。    val getDataResponses = try {      zkClient.getTopicPartitionStatesRaw(partitions)    }         val failedElections = mutable.Map.empty[TopicPartition, Either[Exception, LeaderAndIsr]]    val validLeaderAndIsrs = mutable.Buffer.empty[(TopicPartition, LeaderAndIsr)]      // 遍历从zk中获取的数据返回信息    getDataResponses.foreach { getDataResponse =>      val partition = getDataResponse.ctx.get.asInstanceOf[TopicPartition]      // 以后分区状态      val currState = partitionState(partition)      if (getDataResponse.resultCode == Code.OK) {        TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat) match {          case Some(leaderIsrAndControllerEpoch) =>            if (leaderIsrAndControllerEpoch.controllerEpoch > controllerContext.epoch) {              //...            } else {              // 把通过校验的leaderandisr信息 保留到列表              validLeaderAndIsrs += partition -> leaderIsrAndControllerEpoch.leaderAndIsr            }          case None =>            //...        }      } else if (getDataResponse.resultCode == Code.NONODE) {       //...      } else {       //...      }    }    // 如果没有 无效的分区,则间接返回    if (validLeaderAndIsrs.isEmpty) {      return (failedElections.toMap, Seq.empty)    }    // 依据入参 传入的 选举策略 来抉择Leader    val (partitionsWithoutLeaders, partitionsWithLeaders) = partitionLeaderElectionStrategy match {           // 离线分区 策略 (allowUnclean 示意的是是否容许脏正本参加选举, 如果这里是true,则疏忽topic自身的unclean.leader.election.enable 配置,如果是false,则会思考 unclean.leader.election.enable 的配置。 )      case OfflinePartitionLeaderElectionStrategy(allowUnclean) =>        // 这里就是判断allowUnclean的参数,如果这里是true,则疏忽topic自身的unclean.leader.election.enable 配置,如果是false,则会思考 unclean.leader.election.enable 的配置。因为每个topic的配置可能不一样,所以这里组装每个分区的信息和allowUnclean 返回        val partitionsWithUncleanLeaderElectionState = collectUncleanLeaderElectionState(          validLeaderAndIsrs,          allowUnclean        )        // 去抉择一个适合的正本 来入选 leader。这里只是计算失去了一个值们还没有真的入选哈        leaderForOffline(controllerContext, partitionsWithUncleanLeaderElectionState).partition(_.leaderAndIsr.isEmpty)      // 分区正本重调配Leader 选举策略        case ReassignPartitionLeaderElectionStrategy =>       // 去抉择一个适合的正本 来入选 leader。这里只是计算失去了一个值们还没有真的入选哈        leaderForReassign(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty)            // 优先正本选举策略      case PreferredReplicaPartitionLeaderElectionStrategy =>        // 去抉择一个适合的正本 来入选 leader。这里只是计算失去了一个值们还没有真的入选哈        leaderForPreferredReplica(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty)                // 受控关机策略      case ControlledShutdownPartitionLeaderElectionStrategy =>         // 去抉择一个适合的正本 来入选 leader。这里只是计算失去了一个值们还没有真的入选哈        leaderForControlledShutdown(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty)    }    // 这里是下面策略 没有找到Leader的所有分区,遍历一下,打一个异样日志。    partitionsWithoutLeaders.foreach { electionResult =>      val partition = electionResult.topicPartition      val failMsg = s"Failed to elect leader for partition $partition under strategy $partitionLeaderElectionStrategy"      failedElections.put(partition, Left(new StateChangeFailedException(failMsg)))    }        // 整顿一下下面计算失去哦的后果    val recipientsPerPartition = partitionsWithLeaders.map(result => result.topicPartition -> result.liveReplicas).toMap    val adjustedLeaderAndIsrs = partitionsWithLeaders.map(result => result.topicPartition -> result.leaderAndIsr.get).toMap    // 这里去把leader和isr的信息写入到zk中去啦  节点 /broker/topics/{topic名称}/partitions/{分区号}/state     val UpdateLeaderAndIsrResult(finishedUpdates, updatesToRetry) = zkClient.updateLeaderAndIsr(      adjustedLeaderAndIsrs, controllerContext.epoch, controllerContext.epochZkVersion)         // 遍历更新实现的分区, 而后更新Controller外面的分区leader和isr的内存信息 并发送LeaderAndISR申请    finishedUpdates.foreach { case (partition, result) =>      result.right.foreach { leaderAndIsr =>        val replicaAssignment = controllerContext.partitionFullReplicaAssignment(partition)        val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)        // 更新内存        controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)        // 发送LeaderAndIsr申请      controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(recipientsPerPartition(partition), partition,          leaderIsrAndControllerEpoch, replicaAssignment, isNew = false)      }    }    (finishedUpdates ++ failedElections, updatesToRetry)  }

总结一下下面的源码

  1. 去zookeeper节点 /broker/topics/{topic名称}/partitions/{分区号}/state 节点读取根本信息。
  2. 遍历从zk中获取的leaderIsrAndControllerEpoch信息,做一些简略的校验:zk中获取的数据的controllerEpoch必须<=以后的Controller的controller_epoch。最终失去 validLeaderAndIsrs, controller_epoch 就是用来避免脑裂的, 当有两个Controller入选的时候,他们的epoch必定不一样, 那么最新的epoch才是真的Controller
  3. 如果没有获取到无效的validLeaderAndIsrs 信息 则间接返回
  4. 依据入参partitionLeaderElectionStrategy 来匹配不同的Leader选举策略。来选出适合的Leader和ISR信息
  5. 依据下面的选举策略选出的 LeaderAndIsr 信息进行遍历, 将它们一个个写入到zookeeper节点 /broker/topics/{topic名称}/partitions/{分区号}/state 中。 (当然如果下面没有抉择出适合的leader,那么久不会有这个过程了)
  6. 遍历下面写入zk胜利的分区, 而后更新Controller外面的分区leader和isr的内存信息 并发送LeaderAndISR申请,告诉对应的Broker Leader更新了。

看下面的Leader选举策略是不是很简略, 然而两头到底是如何抉择Leader的?
这个是依据传入的策略类型, 来做不同的抉择

那么有哪些策略呢?以及什么时候触发这些选举呢?

分区的几种策略以及对应的触发场景

1. OfflinePartitionLeaderElectionStrategy

遍历分区的AR, 找到第一个满足以下条件的正本:

  1. 正本在线
  2. 在ISR中

如果找不到满足条件的正本,那么再依据 传入的参数allowUnclean判断

  1. allowUnclean=true:AR程序中所有在线正本中的第一个正本。
  2. allowUnclean=false: 须要去查问配置 unclean.leader.election.enable 的值。
    若=true ,则跟下面 1一样 。
    若=false,间接返回None,没有找到适合的Leader。

源码地位:

Election#leaderForOffline

 case OfflinePartitionLeaderElectionStrategy(allowUnclean) =>        // 这里是组装所有分区的信息啊, 返回的对象是 1. 分区 2. leader、isr and controller epoc 3. allow unclean 是否容许脏正本参加竞选        val partitionsWithUncleanLeaderElectionState = collectUncleanLeaderElectionState(          validLeaderAndIsrs,          allowUnclean        )        // 调用leader选举        leaderForOffline(controllerContext, partitionsWithUncleanLeaderElectionState).partition(_.leaderAndIsr.isEmpty) private def leaderForOffline(partition: TopicPartition,                               leaderAndIsrOpt: Option[LeaderAndIsr],                               uncleanLeaderElectionEnabled: Boolean,                               controllerContext: ControllerContext): ElectionResult = {    // 以后分区的AR     val assignment = controllerContext.partitionReplicaAssignment(partition)    // 所有在线的正本    val liveReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica, partition))    leaderAndIsrOpt match {      case Some(leaderAndIsr) =>        val isr = leaderAndIsr.isr        // 找到 第一个满足条件:正本在线 && 在 ISR中的正本。 如果没有满足条件的 则判断入参uncleanLeaderElectionEnabled的配置        // 如果是true,则从不在isr中的存活正本中获取正本作为leader        val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(          assignment, isr, liveReplicas.toSet, uncleanLeaderElectionEnabled, controllerContext)        val newLeaderAndIsrOpt = leaderOpt.map { leader =>          val newIsr = if (isr.contains(leader)) isr.filter(replica => controllerContext.isReplicaOnline(replica, partition))          else List(leader)          leaderAndIsr.newLeaderAndIsr(leader, newIsr)        }        ElectionResult(partition, newLeaderAndIsrOpt, liveReplicas)      case None =>        ElectionResult(partition, None, liveReplicas)    }  }// 找到 第一个满足条件:正本在线 && 在 ISR中的正本。 如果没有满足条件的 则判断入参allowUnclean的配置,如果是true,则从不在isr中的存活正本中获取正本作为leaderobject PartitionLeaderElectionAlgorithms {  def offlinePartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int], uncleanLeaderElectionEnabled: Boolean, controllerContext: ControllerContext): Option[Int] = {    assignment.find(id => liveReplicas.contains(id) && isr.contains(id)).orElse {      if (uncleanLeaderElectionEnabled) {        val leaderOpt = assignment.find(liveReplicas.contains)        if (leaderOpt.isDefined)          controllerContext.stats.uncleanLeaderElectionRate.mark()        leaderOpt      } else {        None      }    }  }````1.  先组装所有给定的 **validLeaderAndIsrs** 的信息  其实次要还是要去获取每个Topic的对应的`unclean.leader.election.enable` 属性值。   默认状况下,咱们调用到这里的时候 这个入参`allowUnclean=false`.   **如果是false** 那咱们须要去查问一下指定的topic它的属性`unclean.leader.election.enable` 是什么        **如果是true** 则示意间接笼罩了`unclean.leader.election.enable`的配置为true。      ![在这里插入图片形容](/img/bVcXkG8)2. 找到 第一个满足条件:**正本在线** && 在 **ISR中的正本**。 3. 如果没有满足条件的 则判断入**uncleanLeaderElectionEnabled**的配置如果是true,则从不在isr中的存活正本中获取正本作为leader。当然这个**uncleanLeaderElectionEnabled** 参数是上 步骤1中决定的。#### 触发场景:Controller 从新加载> Controller 入选的时候会启动 **分区状态机** `partitionStateMachine`, 启动的时候会从新加载所有分区的状态到内存中, 并触发 对处于 **NewPartition** 或 **OfflinePartition** 状态的所有分区尝试变更为  **OnlinePartition** 状态的状态。把新创建的分区和离线的分区触发一下选举流程啊>  触发源码入口:  **KafkaController#onControllerFailover**   `partitionStateMachine.startup()`

partitionStateMachine.triggerOnlinePartitionStateChange()

<br>加szzdzhp001,支付全副kafka常识图谱#### 触发场景:脚本执行脏选举> 当执行 `kafka-leader-election.sh` 的时候并且 模式抉择的是`UNCLEAN` . 则会触发这个模式。> 这里留神一下,入参`allowUnclean` = (electionTrigger == AdminClientTriggered) > 意思是: 当触发的场景是**AdminClientTriggered**的时候, 则`allowUnclean=true`,示意 不关怀配置参数 `unclean.leader.election.enable` 是什么, 如果没有找到符合条件的Leader, 则就去非ISR 列表找Leader。> 刚好 我能脚本执行的时候 触发器就是 **AdminClientTriggered**> 其余触发器有:> AutoTriggered : 定时主动触发。> ZkTriggered:Controller切换的时候触发的(zk节点/controller 的变更便是Controller角色的切换)> AdminClientTriggered:客户端被动触发。<br>#### 触发场景:Controller 监听到有Broker启动了> 同上。> >  触发源码入口:  **KafkaController#processBrokerChange#onBrokerStartup**

partitionStateMachine.triggerOnlinePartitionStateChange()

#### 触发场景:Controller 监听 LeaderAndIsrResponseReceived申请> 同上。当Controller向对应的Broker发动 **LeaderAndIsrRequest** 申请的时候.   有一个回调函数callback, 这个回调函数会向Controller发动一个事件为  **LeaderAndIsrResponseReceived** 申请。具体源码在:  **ControllerChannelManager#sendLeaderAndIsrRequest**![在这里插入图片形容](/img/bVcXkG9)Controller收到这个事件的申请之后,依据返回的 **leaderAndIsrResponse** 数据   会判断一下有没有**新减少**的离线正本(个别都是因为磁盘拜访有问题)  如果有新的离线正本,则须要将这个离线正本标记为Offline状态源码入口:**KafkaController#onReplicasBecomeOffline**

partitionStateMachine.triggerOnlinePartitionStateChange()

<br>加szzdzhp001,支付全副kafka常识图谱#### 触发场景:Controller 监听 UncleanLeaderElectionEnable申请> 当咱们在批改动静配置的时候, 将动静配置:`unclean.leader.election.enable`设置为 true 的时候    会触发向Controller发动**UncleanLeaderElectionEnable**的申请,这个时候则须要触发一下。触发申请**同上**。 触发源码入口:  **KafkaController#processTopicUncleanLeaderElectionEnable**

partitionStateMachine.triggerOnlinePartitionStateChange(topic)

 下面的触发调用的代码就是上面的接口对处于 **NewPartition** 或 **OfflinePartition** 状态的所有分区尝试变更为      **OnlinePartition** 的状态。 状态的流程触发了Leader选举。   

/**

  • 此 API 对处于 NewPartition 或 OfflinePartition 状态的所有分区尝试变更为
  • OnlinePartition 状态的状态。 这在胜利的控制器选举和代理更改时调用
    */

def triggerOnlinePartitionStateChange(): Unit = {

// 获取所有 OfflinePartition 、NewPartition 的分区状态val partitions = controllerContext.partitionsInStates(Set(OfflinePartition, NewPartition))triggerOnlineStateChangeForPartitions(partitions)

}

private def triggerOnlineStateChangeForPartitions(partitions: collection.Set[TopicPartition]): Unit = {

// 尝试将 所有 NewPartition or OfflinePartition 状态的分区全副转别成 OnlinePartition状态,//然而除了那个分区所对应的Topic正在被删除的所有分区val partitionsToTrigger = partitions.filter { partition =>  !controllerContext.isTopicQueuedUpForDeletion(partition.topic)}.toSeq// 分区状态机进行状态流转 应用 OfflinePartitionLeaderElectionStrategy 选举策略(allowUnclean =false 不容许 不在isr中的正本参加选举)handleStateChanges(partitionsToTrigger, OnlinePartition, Some(OfflinePartitionLeaderElectionStrategy(false)))

}

1.  获取所有 OfflinePartition 、NewPartition 的分区状态2. 尝试将 所有 NewPartition or OfflinePartition 状态的分区全副转别成 OnlinePartition状态,    然而如果对应的Topic正在删除中,则会被排除掉3. 分区状态机进行状态流转 应用 **OfflinePartitionLeaderElectionStrategy** 选举策略(`allowUnclean=true` 示意如果从isr中没有选出leader,则容许从非isr列表中选举leader ,`allowUnclean=false` 示意如果从isr中没有选出leader, 则须要去读取配置文件的配置 `unclean.leader.election.enable` 来决定是否容许从非ISR列表中选举Leader。 )加szzdzhp001,支付全副kafka常识图谱### 2. ReassignPartitionLeaderElectionStrategy><font color=red><b>分区正本重调配选举策略:</b> </font>>当执行分区正本重调配的时候, 原来的Leader可能有变更, 则须要触发一下 Leader选举。> 1. **只有当之前的Leader正本在通过重调配之后不存在了**。    例如: [2,0] ==> [1,0] 。 原来2是Leader正本,通过重调配之后变成了 [1,0]。2曾经不复存在了,所以须要从新选举Leader。> 2. **当原来的分区Leader正本 因为某些异样,下线了**。须要从新选举Leader![](/img/bVcXkHa)**分区正本重调配产生的Leader选举.**Election#leaderForReassign

private def leaderForReassign(partition: TopicPartition,

                            leaderAndIsr: LeaderAndIsr,                            controllerContext: ControllerContext): ElectionResult = {// 从Controller的内存中获取以后分区的分配情况, 而后跟 removingReplicas(示意以后重调配须要移除掉的正本) 取差集。也就获取当重调配之后剩下的所有正本分配情况了。                           val targetReplicas = controllerContext.partitionFullReplicaAssignment(partition).targetReplicas// 过滤一下不在线的正本。val liveReplicas = targetReplicas.filter(replica => controllerContext.isReplicaOnline(replica, partition))// 这里的isr 是从内部传参进来的, 是去zk节点 /brokers/topics/{topic名称}/partitions/{分区号}/state 中拿取的数据,而不是以后内存中拿到的val isr = leaderAndIsr.isr// 在下面的targetReplicas中找到符合条件的第一个元素:正本必须在线, 正本必须在ISR中。val leaderOpt = PartitionLeaderElectionAlgorithms.reassignPartitionLeaderElection(targetReplicas, isr, liveReplicas.toSet)// 结构一下 下面拿到的Leader参数, 组装成一个LeaderAndIsr对象,对象多组装了例如:leaderEpoch+1, zkVersion 等等val newLeaderAndIsrOpt = leaderOpt.map(leader => leaderAndIsr.newLeader(leader))ElectionResult(partition, newLeaderAndIsrOpt, targetReplicas)

}

// 这个算法就是找到 第一个 符合条件:正本在线,正本在ISR中 的正本。用于遍历的reassignment就是咱们下面的targetReplicas,是从内存中获取的。也就是变更后的正本程序了。那么就是获取了第一个正本啦
def reassignPartitionLeaderElection(reassignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int]): Option[Int] = {

reassignment.find(id => liveReplicas.contains(id) && isr.contains(id))

}

总结:> 从以后的正本调配列表中,获取**正本在线**&&**正本在ISR中**的 第一个正本,遍历的程序是以后正本的调配形式(**AR**),跟ISR的程序没有什么关系。加szzdzhp001,支付全副kafka常识图谱#### 触发场景:分区正本重调配> 并不是每次执行分区正本重调配都会触发这个Leader选举策略, 上面两种状况才会触发> 1. **只有当之前的Leader正本在通过重调配之后不存在了**。例如: [2,0] ==> [1,0] 。 原来2是Leader正本,通过重调配之后变成了 [1,0]。2曾经不复存在了,所以须要从新选举Leader。> 2. **当原来的分区Leader正本 因为某些异样,下线了**。须要从新选举Leader对应的判断条件代码如下:**KafkaController#moveReassignedPartitionLeaderIfRequired**

private def moveReassignedPartitionLeaderIfRequired(topicPartition: TopicPartition,

                                                  newAssignment: ReplicaAssignment): Unit = {// 重调配之后的所有正本                                                 val reassignedReplicas = newAssignment.replicas//以后的分区Leader是哪个val currentLeader = controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader//  如果调配后的正本不蕴含以后Leader正本,则须要从新选举if (!reassignedReplicas.contains(currentLeader)) {  //触发Leader重选举,策略是ReassignPartitionLeaderElectionStrategy  partitionStateMachine.handleStateChanges(Seq(topicPartition), OnlinePartition, Some(ReassignPartitionLeaderElectionStrategy))} else if (controllerContext.isReplicaOnline(currentLeader, topicPartition)) {  // 下面2种状况都不合乎, 那么就没有必要leader重选举了, 更新一下leaderEpoch就行 了  updateLeaderEpochAndSendRequest(topicPartition, newAssignment)} else {  //触发Leader重选举,策略是ReassignPartitionLeaderElectionStrategy  partitionStateMachine.handleStateChanges(Seq(topicPartition), OnlinePartition, Some(ReassignPartitionLeaderElectionStrategy))}

}

![在这里插入图片形容](/img/bVcXkHe)点击查看[分区重调配的源码解析](https://www.szzdzhp.com/kafka/Source_code/source-code-pr.html)### 3. PreferredReplicaPartitionLeaderElectionStrategy> 优先正本选举策略, 必须满足三个条件:  > <font color=red>**是第一个正本&&正本在线&&正本在ISR列表中。** </font>  > 满足下面三个条件才会入选leader,不满足则不会做变更。![优先正本选举 (点击浏览原文看高清大图)](/img/bVcXkHf)

def leaderForPreferredReplica(controllerContext: ControllerContext,

                            leaderAndIsrs: Seq[(TopicPartition, LeaderAndIsr)]): Seq[ElectionResult] = {leaderAndIsrs.map { case (partition, leaderAndIsr) =>  leaderForPreferredReplica(partition, leaderAndIsr, controllerContext)}

}

private def leaderForPreferredReplica(partition: TopicPartition,

                                    leaderAndIsr: LeaderAndIsr,                                    controllerContext: ControllerContext): ElectionResult = {// AR列表                                    val assignment = controllerContext.partitionReplicaAssignment(partition)// 在线正本val liveReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica, partition))val isr = leaderAndIsr.isr// 找出第一个正本 是否在线 并且在ISR中。val leaderOpt = PartitionLeaderElectionAlgorithms.preferredReplicaPartitionLeaderElection(assignment, isr, liveReplicas.toSet)// 组装leaderandisr返回 ,留神这里是没有批改ISR信息的val newLeaderAndIsrOpt = leaderOpt.map(leader => leaderAndIsr.newLeader(leader))ElectionResult(partition, newLeaderAndIsrOpt, assignment)

}

def preferredReplicaPartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int]): Option[Int] = {

assignment.headOption.filter(id => liveReplicas.contains(id) && isr.contains(id))

}

<br>1. 从内存中获取TopicPartition的调配形式2. 过滤不在线的正本3. 找到第一个正本判断一下是否在线&&在ISR列表中。如果满足,则选他为leader,如果不满足,也不会再找其余正本了。4. 返回leaderAndIsr信息, 这里的ISR是没有做批改的。加szzdzhp001,支付全副kafka常识图谱#### 触发场景:主动定时执行优先正本选举工作> Controller 启动的时候,会启动一个定时工作 。每隔一段时间就去执行 **优先正本选举**工作。**与之相干配置:**

如果为true示意会创立定时工作去执行 优先正本选举,为false则不会创立

auto.leader.rebalance.enable=true

每隔多久执行一次 ; 默认300秒;

leader.imbalance.check.interval.seconds partition = 300

标识每个 Broker 失去平衡的比率,如果超过该比率,则执行从新选举 Broker 的 leader;默认比例是10%;

这个比率的算法是 :broker不均衡率=非优先正本的leader个数/总分区数,

如果一个topic有3个分区[0,1,2],并且有3个正本 ,失常状况下,[0,1,2]别离都为一个leader正本; 这个时候 0/3=0%;

leader.imbalance.per.broker.percentage = 10

<br>#### 触发场景: Controller 从新加载的时候> 在这个触发之前还有执行  `partitionStateMachine.startup()`   > 相当于是先把 OfflinePartition、NewPartition状态的分区执行了**OfflinePartitionLeaderElectionStrategy** 策略。       > 而后又执行了    >**PreferredReplicaPartitionLeaderElectionStrategy**策略>这里是从zk节点 `/admin/preferred_replica_election` 读取数据, 来进行判断是否有须要执行Leader选举的分区         > 它是在执行`kafka-preferred-replica-election` 命令的时候会创立这个zk节点           > 然而这个曾经被标记为废除了,并且在3.0的时候间接移除了。源码地位:**KafkaController#onControllerFailover**

// 从zk节点/admin/preferred_replica_election找到哪些符合条件须要执行优先正本选举的分区
val pendingPreferredReplicaElections = fetchPendingPreferredReplicaElections()
// 这里的触发类型 是 ZkTriggered
onReplicaElection(pendingPreferredReplicaElections, ElectionType.PREFERRED, ZkTriggered)

private def fetchPendingPreferredReplicaElections(): Set[TopicPartition] = {

// 去zk读取节点  /admin/preferred_replica_electionval partitionsUndergoingPreferredReplicaElection = zkClient.getPreferredReplicaElection// 如果指定分区的 leader 曾经是AR的第一个正本 或者 topic被删除了,则 过滤掉这个分区(没有必要执行leader选举了)val partitionsThatCompletedPreferredReplicaElection = partitionsUndergoingPreferredReplicaElection.filter { partition =>  val replicas = controllerContext.partitionReplicaAssignment(partition)  val topicDeleted = replicas.isEmpty  val successful =    if (!topicDeleted) controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader == replicas.head else false  successful || topicDeleted}// 将zk获取到的分区数据 - 刚刚须要疏忽的数据 = 还须要执行选举的数据val pendingPreferredReplicaElectionsIgnoringTopicDeletion = partitionsUndergoingPreferredReplicaElection -- partitionsThatCompletedPreferredReplicaElection// 找到哪些分区正在删除val pendingPreferredReplicaElectionsSkippedFromTopicDeletion = pendingPreferredReplicaElectionsIgnoringTopicDeletion.filter(partition => topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic))// 待删除的分区也过滤掉val pendingPreferredReplicaElections = pendingPreferredReplicaElectionsIgnoringTopicDeletion -- pendingPreferredReplicaElectionsSkippedFromTopicDeletion// 返回最终须要执行优先正本选举的数据。pendingPreferredReplicaElections

}

<br>#### 触发场景:执行优先正本选举脚本的时候> 执行脚本` kafka-leader-election.sh` 并且抉择的模式是 `PREFERRED` (优先正本选举)> 则会抉择  **PreferredReplicaPartitionLeaderElectionStrategy** 策略选举<br>### 4. ControlledShutdownPartitionLeaderElectionStrategy> **受控关机选举策略** :  当Broker关机的过程中,会向Controller发动一个申请, 让它从新发动一次选举, 把在所有正在关机(也就是发动申请的那个Broker,或其它同时正在关机的Broker) 的Broker外面的正本给剔除掉。> > ---> > 依据算法算出leader:找到第一个满足条件的正本:  <font color=red>正本在线</font> && <font color=red>正本在ISR中 </font>  && <font color=red>正本所在的Broker不在正在敞开的Broker汇合中</font> 。>> 结构新的ISR列表: 在之前的isr列表中将  <font color=red>正在被敞开的Broker外面的正本</font> 给剔除掉![受控关机Leader选举策略 (点击浏览原文查看高清大图)](/img/bVcXkHg)加szzdzhp001,支付全副kafka常识图谱**Election#leaderForControlledShutdown**

/**
** 为以后领导者正在敞开的分区选举领导者。

  • 参数:
  • controllerContext – 集群以后状态的上下文
  • leaderAndIsrs – 示意须要选举的分区及其各自的领导者/ISR 状态的元组序列
  • 返回:选举后果
    **/
    def leaderForControlledShutdown(controllerContext: ControllerContext,

                            leaderAndIsrs: Seq[(TopicPartition, LeaderAndIsr)]): Seq[ElectionResult] = {// 以后正在敞开的 BrokerID                              val shuttingDownBrokerIds = controllerContext.shuttingDownBrokerIds.toSet// 依据策略选出leaderleaderAndIsrs.map { case (partition, leaderAndIsr) =>

    leaderForControlledShutdown(partition, leaderAndIsr, shuttingDownBrokerIds, controllerContext)

    }

    }

}

private def leaderForControlledShutdown(partition: TopicPartition,

                                      leaderAndIsr: LeaderAndIsr,                                      shuttingDownBrokerIds: Set[Int],                                      controllerContext: ControllerContext): ElectionResult = {// 以后分区正本分配情况                                  val assignment = controllerContext.partitionReplicaAssignment(partition)// 找到以后分区所有存活的正本(正在敞开中的Broker外面的正本也要算进去)val liveOrShuttingDownReplicas = assignment.filter(replica =>  controllerContext.isReplicaOnline(replica, partition, includeShuttingDownBrokers = true))val isr = leaderAndIsr.isr// 依据算法算出leader:找到第一个满足条件的正本: 正本在线&& 正本在ISR中 && 正本所在的Broker不在正在敞开的Broker汇合中。val leaderOpt = PartitionLeaderElectionAlgorithms.controlledShutdownPartitionLeaderElection(assignment, isr,  liveOrShuttingDownReplicas.toSet, shuttingDownBrokerIds)//结构新的ISR列表,在之前的isr列表中将 正在被敞开的Broker 外面的正本给剔除掉val newIsr = isr.filter(replica => !shuttingDownBrokerIds.contains(replica))//结构leaderAndIsr  加上 zkVersion 和 leader_epochval newLeaderAndIsrOpt = leaderOpt.map(leader => leaderAndIsr.newLeaderAndIsr(leader, newIsr))ElectionResult(partition, newLeaderAndIsrOpt, liveOrShuttingDownReplicas)

}

// 依据算法算出leader:找到第一个正本条件的正本: 正本在线&& 正本在ISR中 && 正本所在的Broker不在正在敞开的Broker汇合中。
def controlledShutdownPartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int], shuttingDownBrokers: Set[Int]): Option[Int] = {

assignment.find(id => liveReplicas.contains(id) && isr.contains(id) && !shuttingDownBrokers.contains(id))

}

<br>#### 触发场景:Broker关机的时候> 当Broker敞开的时候, 会向Controller发一起一个`ControlledShutdownRequest`申请,  Controller收到这个申请会针对性的做一些善后事件。比如说 **执行Leader重选举** 等等之类的。源码地位:**KafkaServer#controlledShutdown**Controller收到申请的源码地位:**KafkaController#doControlledShutdown**与之相干的配置有:

controlled.shutdown.enable : 是否启用受控敞开操作
controlled.shutdown.max.retries 受控关机操作 最大重试的次数
controlled.shutdown.retry.backoff.ms 失败后等等多久再次重试

![在这里插入图片形容](/img/bVcXkHh)## 其余场景###  新创建的Topic Leader选举策略> 创立新的Topic的时候,并没有产生Leader选举的操作, 而是默认从分区对应的所有在线正本中抉择第一个为leader, 而后isr就为 所有在线正本,再组装一下以后的**controller_epoch**信息,写入到zk节点`/brokers/topics/{Topic名称}/partitions/{分区号}/state`中。  > 最初发动  **LeaderAndIsrRequest** 申请,告诉 leader 的变更。具体看看源码:**PartitionStateMachine#doHandleStateChanges**     分区状态从 `NewPartition`流转到`OnlinePartition`

/**

  • 上面省略了局部不重要代码
  • 初始化 leader 和 isr 的值 并写入zk中
  • @param partitions 所有须要初始化的分区
  • @return 返回胜利初始化的分区
    */

private def initializeLeaderAndIsrForPartitions(partitions: Seq[TopicPartition]): Seq[TopicPartition] = {

val successfulInitializations = mutable.Buffer.empty[TopicPartition]// 从以后Controller内存中获取所有分区对应的正本状况val replicasPerPartition = partitions.map(partition => partition -> controllerContext.partitionReplicaAssignment(partition))// 过滤一下 不在线的正本(有可能正本所在的Broker宕机了,或者网络拥挤、或者磁盘脱机等等因素造成正本下线了)val liveReplicasPerPartition = replicasPerPartition.map { case (partition, replicas) =>    val liveReplicasForPartition = replicas.filter(replica => controllerContext.isReplicaOnline(replica, partition))    partition -> liveReplicasForPartition}val (partitionsWithoutLiveReplicas, partitionsWithLiveReplicas) = liveReplicasPerPartition.partition { case (_, liveReplicas) => liveReplicas.isEmpty }partitionsWithoutLiveReplicas.foreach { case (partition, replicas) =>  val failMsg = s"Controller $controllerId epoch ${controllerContext.epoch} encountered error during state change of " +    s"partition $partition from New to Online, assigned replicas are " +    s"[${replicas.mkString(",")}], live brokers are [${controllerContext.liveBrokerIds}]. No assigned " +    "replica is alive."  logFailedStateChange(partition, NewPartition, OnlinePartition, new StateChangeFailedException(failMsg))}// 拿到所有分区对应的leader 和 isr和 Controller epoch的信息; leader是取所有在线正本的第一个正本val leaderIsrAndControllerEpochs = partitionsWithLiveReplicas.map { case (partition, liveReplicas) =>  val leaderAndIsr = LeaderAndIsr(liveReplicas.head, liveReplicas.toList)  val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)  partition -> leaderIsrAndControllerEpoch}.toMap// 将下面失去的信息 写入zk的节点中/brokers/topics/{Topic名称}/partitions/{分区号}/stateval createResponses = try {  zkClient.createTopicPartitionStatesRaw(leaderIsrAndControllerEpochs, controllerContext.epochZkVersion)} catch {  case e: ControllerMovedException =>    error("Controller moved to another broker when trying to create the topic partition state znode", e)    throw e  case e: Exception =>    partitionsWithLiveReplicas.foreach { case (partition,_) => logFailedStateChange(partition, partitionState(partition), NewPartition, e) }    Seq.empty}createResponses.foreach { createResponse =>  val code = createResponse.resultCode  val partition = createResponse.ctx.get.asInstanceOf[TopicPartition]  val leaderIsrAndControllerEpoch = leaderIsrAndControllerEpochs(partition)  if (code == Code.OK) {    controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)    controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(leaderIsrAndControllerEpoch.leaderAndIsr.isr,      partition, leaderIsrAndControllerEpoch, controllerContext.partitionFullReplicaAssignment(partition), isNew = true)    successfulInitializations += partition  } else {    logFailedStateChange(partition, NewPartition, OnlinePartition, code)  }}successfulInitializations

}

<br>1.  从以后的Controller 内存中获取所有入参的分区对应的正本信息2. 过滤那些曾经下线的正本( Broker宕机、网络异样、磁盘脱机、等等都有可能造成正本下线) 。3. 每个分区对应的所有在线正本信息 为 ISR 信息,而后取ISR的第一个正本为leader分区。当然特地留神一下, 这个时候获取的isr信息的程序就是 分区创立时候调配好的AR程序, 获取第一个在线的。(因为在其余状况下 ISR的程序跟AR的程序并不统一)4. 组装 下面的 `isr`、`leader`、`controller_epoch` 等信息 写入到zk节点 <font color=red> /brokers/topics/{Topic名称}/partitions/{分区号}/state</font>例如上面所示
{"controller_epoch":1,"leader":0,"version":1,"leader_epoch":0,"isr":[0,1,2]}```
  1. 而后向其余相干Broker 发动 LeaderAndIsrRequest 申请,告诉他们Leader和Isr信息曾经变更了,去做一下想要的解决。比方去新的leader发动Fetcher申请同步数据。

能够看看之前咱们剖析过的 Topic创立的源码解析 的原理图 如下

重点看:

答复下面的问题

当初,看齐全文之后,我想你应该对上面的问题很分明了吧!

什么是分区状态机

所有的分区状态的流转都是通过分区状态机来进行的, 对立治理! 每个分区状态的流转 都是有严格限度并且固定的,流转到不同状态须要执行的操作不一样, 例如 当分区状态流转到 <font color=blue>OnlinePartition </font> 的时候, 就须要判断是否须要执行 Leader选举 ,

创立Topic的时候如何选举Leader?

创立Topic的时候并没有产生 Leader选举, 而是默认将 在线的第一个正本设置为Leader,所有在线的正本列表 为 ISR 列表。 写入到了zookeeper中。

加szzdzhp001,支付全副kafka常识图谱

分区的所有正本都不在线, 这个时候启动一台之前不在ISR内的正本的Broker,它会入选为Leader吗?

视状况而定。
首先, 启动一台Broker, 会用什么策略选举?
看下面的图,咱们能够晓得是
<font color=blue>OfflinePartitionLeaderElectionStrategy </font>


而后看下这个策略是如何选举的?

那么最终后果就是:
所有正本不在线,那么一个Leader的候选者都入选不了
那么这个时候就会判断 unclean.leader.election.enable 配置是否为true.
如果是true, 则以后在线的正本就是只有本人这个刚启动的在线正本,自然而然就会入选Leader了。
如果是fase, 则没有正本可能以后Leader, 次数处于一个无Leader的状态。


当所有正本都不在线,而后一个一个重启Broker上正本上线,谁会入选为Leader?谁先启动就谁入选吗?

不是, 跟上一个问题同理
依据 unclean.leader.election.enable 配置决定。
如果是true, 则谁先启动,谁就入选(会失落局部数据)
如果是false,则第一个在ISR列表中的正本入选。
顺便再提一句, 尽管在这里可能不是AR中的第一个正本入选Leader。

然而最终还是会主动执行Leader平衡的,主动平衡应用的策略是
<font color=blue> PreferredReplicaPartitionLeaderElectionStrategy </font>
(前提是开启了主动平衡: auto.leader.rebalance.enable=true)


加szzdzhp001,支付全副kafka常识图谱

Broker下线了,Leader切换给了其余正本, 当Broker重启的时候,Leader会还给之前的正本吗?

依据配置 auto.leader.rebalance.enable=true 决定。
true: 会主动执行Leader平衡, 主动平衡策略是<font color=blue> PreferredReplicaPartitionLeaderElectionStrategy </font>策略
false: 不执行主动平衡。 那么久不会还回去。
对于更具体的 Leader平衡机制请看 Leader 平衡机制

加szzdzhp001,支付全副kafka常识图谱

Leader选举期间对分区的影响

Leader的选举基本上不会造成什么影响, Leader的切换十分快, 每个分区不可用的工夫在几毫秒内。


获取全套kafka技术大全请跳转: 全网最全kafka技术大全(蕴含运维与实战) 公布