乐趣区

关于kafka:kafka

kafka:replica 正本同步机制

[](https://www.jianshu.com/u/897…

1 前言

Kafka 的风行归功于它设计和操作简略、存储系统高效、充分利用磁盘程序读写等个性、非常适合在线日志收集等高吞吐场景。

Kafka 个性之一是它的复制协定。复制协定是保障 kafka 高可靠性的要害。对于单个集群中每个 Broker 不同工作负载状况下,如何主动调优 Kafka 正本的工作形式是比拟有挑战的。它的挑战之一是要晓得如何防止 follower 进入和退出同步正本列表(即 ISR)。从用户的角度来看,如果生产者发送一大批海量音讯,可能会引起 Kafka Broker 很多正告。这些警报表明一些 topics 处于“under replicated”状态,这些正本处于同步失败或生效状态,更意味着数据没有被复制到足够数量 Broker 从而减少数据失落的概率。因而 Kafka 集群中处于“under replicated”中 Partition 数要亲密监控。这个正告应该来自于 Broker 生效, 减慢或暂停等状态而不是生产者写不同大小音讯引起的。

2 Kafka 的正本机制

Kafka 中主题的每个 Partition 有一个预写式日志文件,每个 Partition 都由一系列有序的、不可变的音讯组成,这些音讯被间断的追加到 Partition 中,Partition 中的每个音讯都有一个间断的序列号叫做 offset,确定它在分区日志中惟一的地位。

image

Kafka 每个 topic 的 partition 有 N 个正本,其中 N 是 topic 的复制因子。Kafka 通过多正本机制实现故障主动转移,当 Kafka 集群中一个 Broker 生效状况下依然保障服务可用。在 Kafka 中产生复制时确保 partition 的预写式日志有序地写到其余节点上。N 个 replicas 中。其中一个 replica 为 leader,其余都为 follower,leader 解决 partition 的所有读写申请,与此同时,follower 会被动定期地去复制 leader 上的数据。

如下图所示,Kafka 集群中有 4 个 broker, 某 topic 有 3 个 partition, 且复制因子即正本个数也为 3:

Kafka 提供了数据复制算法保障,如果 leader 产生故障或挂掉,一个新 leader 被选举并被承受客户端的音讯胜利写入。Kafka 确保从同步正本列表中选举一个正本为 leader,或者说 follower 追赶 leader 数据。leader 负责保护和跟踪 ISR(In-Sync Replicas 的缩写,示意正本同步队列,具体可参考下节)中所有 follower 滞后的状态。当 producer 发送一条音讯到 broker 后,leader 写入音讯并复制到所有 follower。音讯提交之后才被胜利复制到所有的同步正本。音讯复制提早受最慢的 follower 限度,重要的是疾速检测慢正本,如果 follower“落后”太多或者生效,leader 将会把它从 ISR 中删除。

正本同步队列 (ISR)
所谓同步,必须满足如下两个条件:

  • 正本节点必须能与 zookeeper 放弃会话(心跳机制)
  • 副本能复制 leader 上的所有写操作,并且不能落后太多。(卡住或滞后的正本管制是由 replica.lag.time.max.ms 配置)

默认状况下 Kafka 对应的 topic 的 replica 数量为 1,即每个 partition 都有一个惟一的 leader,为了确保音讯的可靠性,通常利用中将其值 (由 broker 的参数 offsets.topic.replication.factor 指定) 大小设置为大于 1,比方 3。所有的正本(replicas)统称为 Assigned Replicas,即 AR。ISR 是 AR 中的一个子集,由 leader 保护 ISR 列表,follower 从 leader 同步数据有一些提早。任意一个超过阈值都会把 follower 剔除出 ISR, 存入 OSR(Outof-Sync Replicas)列表,新退出的 follower 也会先寄存在 OSR 中。AR=ISR+OSR。

上一节中的 HW 俗称高水位,是 HighWatermark 的缩写,取一个 partition 对应的 ISR 中最小的 LEO 作为 HW,consumer 最多只能生产到 HW 所在的地位。另外每个 replica 都有 HW,leader 和 follower 各自负责更新本人的 HW 的状态。对于 leader 新写入的音讯,consumer 不能立即生产,leader 会期待该音讯被所有 ISR 中的 replicas 同步后更新 HW,此时音讯能力被 consumer 生产。这样就保障了如果 leader 所在的 broker 生效,该音讯依然能够从新选举的 leader 中获取。对于来自外部 broKer 的读取申请,没有 HW 的限度。
下图具体的阐明了当 producer 生产音讯至 broker 后,ISR 以及 HW 和 LEO 的流转过程:

由此可见,Kafka 的复制机制既不是齐全的同步复制,也不是单纯的异步复制。事实上,同步复制要求所有能工作的 follower 都复制完,这条音讯才会被 commit,这种复制形式极大的影响了吞吐率。而异步复制形式下,follower 异步的从 leader 复制数据,数据只有被 leader 写入 log 就被认为曾经 commit,这种状况下如果 follower 都还没有复制完,落后于 leader 时,忽然 leader 宕机,则会失落数据。而 Kafka 的这种应用 ISR 的形式则很好的平衡了确保数据不失落以及吞吐率。

  • Controller 来保护:Kafka 集群中的其中一个 Broker 会被选举为 Controller,次要负责 Partition 治理和正本状态治理,也会执行相似于重调配 partition 之类的治理工作。在合乎某些特定条件下,Controller 下的 LeaderSelector 会选举新的 leader,ISR 和新的 leader_epoch 及 controller_epoch 写入 Zookeeper 的相干节点中。同时发动 LeaderAndIsrRequest 告诉所有的 replicas。
  • leader 来保护:leader 有独自的线程定期检测 ISR 中 follower 是否脱离 ISR, 如果发现 ISR 变动,则会将新的 ISR 的信息返回到 Zookeeper 的相干节点中。

正本不同步的异常情况

  • 慢正本:在肯定周期时间内 follower 不能追赶上 leader。最常见的起因之一是 I / O 瓶颈导致 follower 追加复制音讯速度慢于从 leader 拉取速度。
  • 卡住正本:在肯定周期时间内 follower 进行从 leader 拉取申请。follower replica 卡住了是因为 GC 暂停或 follower 生效或死亡。
  • 新启动正本:当用户给主题减少正本因子时,新的 follower 不在同步正本列表中,直到他们齐全赶上了 leader 日志。

3 Follower 向 leader 拉取数据的过程

3.1 replica fetcher 线程何时启动

broker 调配的任何一个 partition 都是以 Replica 对象实例的模式存在,而 Replica 在 Kafka 上是有两个角色:leader 和 follower,只有这个 Replica 是 follower,它便会向 leader 进行数据同步。

反映在 ReplicaManager 上就是如果 Broker 的本地正本被选举为 follower,那么它将会启动正本同步线程,其具体实现如下所示:

image

简略来说,makeFollowers() 的处理过程如下:

  1. 先从本地记录 leader partition 的汇合中将这些 partition 移除,因为这些 partition 曾经被选举为了 follower;
  2. 将这些 partition 的本地正本设置为 follower,前面就不会接管对于这个 partition 的 Produce 申请了,如果仍然有 client 在向这台 broker 发送数据,那么它将会返回相应的谬误;
  3. 先进行对于这些 partition 的正本同步线程(如果本地正本之前是 follower 当初还是 follower,先敞开的起因是:这个 partition 的 leader 产生了变动,如果 leader 没有发生变化,那么 makeFollower 办法返回的是 False,这个 Partition 就不会被增加到 partitionsToMakeFollower 汇合中),这样的话能够保障这些 partition 的本地正本将不会再有新的数据追加;
  4. 对这些 partition 本地正本日志文件进行截断操作并进行 checkpoint 操作;
  5. 实现那些提早解决的 Produce 和 Fetch 申请;
  6. 如果本地的 broker 没有掉线,那么向这些 partition 新选举进去的 leader 启动正本同步线程。

对于第 6 步,并不一定会为每一个 partition 都启动一个 fetcher 线程,对于一个目标 broker,只会启动 num.replica.fetchers 个线程,具体这个 topic-partition 会调配到哪个 fetcher 线程上,是依据 topic 名和 partition id 进行计算失去,实现所示:

image

3.2 replica fetcher 线程启动

如上所示,在 ReplicaManager 调用 makeFollowers() 启动 replica fetcher 线程后,它实际上是通过 ReplicaFetcherManager 实例进行相干 topic-partition 同步线程的启动和敞开,其启动过程分为上面两步:

  1. ReplicaFetcherManager 调用 addFetcherForPartitions() 增加对这些 topic-partition 的数据同步流程;
  2. ReplicaFetcherManager 调用 createFetcherThread() 初始化相应的 ReplicaFetcherThread 线程。

addFetcherForPartitions() 的具体实现如下所示:

image

这个办法其实是做了上面这几件事:

  1. 先计算这个 topic-partition 对应的 fetcher id;
  2. 依据 leader 和 fetcher id 获取对应的 replica fetcher 线程,如果没有找到,就调用 createFetcherThread() 创立一个新的 fetcher 线程;
  3. 如果是新启动的 replica fetcher 线程,那么就启动这个线程;
  4. 将 topic-partition 记录到 fetcherThreadMap 中,这个变量记录每个 replica fetcher 线程要同步的 topic-partition 列表。

ReplicaFetcherManager 创立 replica Fetcher 线程的实现如下:

image

3.3 replica fetcher 线程处理过程

replica fetcher 线程在启动之后就开始进行失常数据同步流程了,这个过程都是在 ReplicaFetcherThread 线程中实现的。

ReplicaFetcherThread 的 doWork() 办法是始终在这个线程中的 run() 中调用的,实现办法如下:

image

在 doWork() 办法中次要做了两件事:

  1. 结构相应的 Fetch 申请(buildFetchRequest());
  2. 通过 processFetchRequest() 办法发送 Fetch 申请,并对其后果进行相应的解决。

processFetchRequest() 这个办法的作用是发送 Fetch 申请,并对返回的后果进行解决,最终写入到本地正本的 Log 实例中,其具体实现:

其处理过程简略总结一下:

  1. 通过 fetch() 办法,发送 Fetch 申请,获取相应的 response(如果遇到异样,那么在下次发送 Fetch 申请之前,会 sleep 一段时间再发);
  2. 如果返回的后果 不为空,并且 Fetch 申请的 offset 信息与返回后果的 offset 信息对得上,那么就会调用 processPartitionData() 办法将拉取到的数据追加本地正本的日志文件中,如果返回后果有错误信息,那么就对相应谬误进行相应的解决;
  3. 对在 Fetch 过程中遇到异样或返回谬误的 topic-partition,会进行 delay 操作,下次 Fetch 申请的产生至多要距离 replica.fetch.backoff.ms 工夫。

fetch() 办法作用是发送 Fetch 申请,并返回相应的后果,其具体的实现,如下:

image

processPartitionData

这个办法的作用是,解决 Fetch 申请的具体数据内容,简略来说就是:检查一下数据大小是否超过限度、将数据追加到本地正本的日志文件中、更新本地正本的 hw 值。

image

3.3 正本同步异常情况的解决

在正本同步的过程中,会遇到哪些异常情况呢?

大家肯定会想到对于 offset 的问题,在 Kafka 中,对于 offset 的解决,无论是 producer 端、consumer 端还是其余中央,offset 仿佛都是一个如影随行的问题。在正本同步时,对于 offset,会遇到什么问题呢?上面举两个异样的场景:

  1. 如果以后本地(id:1)的正本当初是 leader,其 LEO 假如为 1000,而另一个在 isr 中的正本(id:2)其 LEO 为 800,此时呈现网络抖动,id 为 1 的机器掉线后又上线了,然而此时正本的 leader 实际上曾经变成了 2,而 2 的 LEO 为 800,这时候 1 启动正本同步线程去 2 上拉取数据,心愿从 offset=1000 的中央开始拉取,然而 2 上最大的 offset 才是 800,这种状况该如何解决呢?
  2. 假如一个 replica(id:1)其 LEO 是 10,它曾经掉线好几天,这个 partition leader 的 offset 范畴是 [100, 800],那么 1 重启启动时,它心愿从 offset=10 的中央开始拉取数据时,这时候产生了 OutOfRange,不过跟下面不同的是这里是小于了 leader offset 的范畴,这种状况又该怎么解决?

以上两种状况都是 offset OutOfRange 的状况,只不过:一是 Fetch Offset 超过了 leader 的 LEO,二是 Fetch Offset 小于 leader 最小的 offset

在介绍 Kafka 解决方案之前,咱们先来本人思考一下这两种状况应该怎么解决?

  1. 如果 fetch offset 超过 leader 的 offset,这时候正本应该是回溯到 leader 的 LEO 地位(超过这个值的数据删除),而后再去进行正本同步,当然这种解决方案其实是无奈保障 leader 与 follower 数据的完全一致,再次发生 leader 切换时,可能会导致数据的可见性不统一,但既然用户容许了脏选举的产生,其实咱们是能够认为用户是能够接管这种状况产生的;
  2. 这种就比拟容易解决,首先清空本地的数据,因为本地的数据都曾经过期了,而后从 leader 的最小 offset 地位开始拉取数据。

下面是咱们比拟容易想出的解决方案,而在 Kafka 中,其解决方案也很相似,不过遇到状况比下面咱们列出的两种状况多了一些简单,其解决方案如下:

image

针对第一种状况,在 Kafka 中,实际上还会产生这样一种状况,1 在收到 OutOfRange 谬误时,这时去 leader 上获取的 LEO 值与最小的 offset 值,这时候却发现 leader 的 LEO 曾经从 800 变成了 1100(这个 topic-partition 的数据量增长得比拟快),再依照下面的解决方案就不太正当,Kafka 这边的解决方案是:遇到这种状况,进行重试就能够了,下次同步时就会失常了,然而仍然会有下面说的那个问题。

3.4 replica fetcher 线程的敞开

replica fetcher 线程敞开的条件,在三种状况下会敞开对这个 topic-partition 的拉取操作:

  1. stopReplica():broker 收到了 controller 发来的 StopReplica 申请,这时会开始敞开对指定 topic-partition 的同步线程;
  2. makeLeaders:这些 partition 的本地正本被选举成了 leader,这时候就会先进行对这些 topic-partition 正本同步线程;
  3. makeFollowers():后面曾经介绍过,这里实际上进行正本同步,而后再开启正本同步线程,因为这些 topic-partition 的 leader 可能产生了切换。

这里间接说线程敞开,其实不是很精确,因为每个 replica fetcher 线程操作的是多个 topic-partition,而在敞开的粒度是 partition 级别,只有这个线程调配的 partition 全副敞开后,这个线程才会真正被敞开。

stopReplica

StopReplica 的申请实际上是 Controller 发送过去的,这个在 controller 局部会讲述,它触发的条件有多种,比方:broker 下线、partition replica 迁徙等等。

makeLeaders

makeLeaders() 办法的调用是在 broker 上这个 partition 的正本被设置为 leader 时触发的,其实现如下:

image

调用 ReplicaFetcherManager 的 removeFetcherForPartitions() 删除对这些 topic-partition 的正本同步设置,这里在实现时,会遍历所有的 replica fetcher 线程,都执行 removePartitions() 办法来移除对应的 topic-partition 汇合。

image

removePartitions

这个办法的作用是:ReplicaFetcherThread 将这些 topic-partition 从本人要拉取的 partition 列表中移除。

image

ReplicaFetcherThread 的敞开

后面介绍那么多,仿佛还是没有真正去敞开,那么 ReplicaFetcherThread 真正敞开是哪里操作的呢?

实际上 ReplicaManager 每次解决完 LeaderAndIsr 申请后,都会调用 ReplicaFetcherManager 的 shutdownIdleFetcherThreads() 办法,如果 fetcher 线程要拉取的 topic-partition 汇合为空,那么就会敞开掉对应的 fetcher 线程。

image


作者:恋情小傻蛋
链接:https://www.jianshu.com/p/f9a…
起源:简书
著作权归作者所有。商业转载请分割作者取得受权,非商业转载请注明出处。

退出移动版