kafka:replica正本同步机制
[](https://www.jianshu.com/u/897...
1 前言
Kafka的风行归功于它设计和操作简略、存储系统高效、充分利用磁盘程序读写等个性、非常适合在线日志收集等高吞吐场景。
Kafka个性之一是它的复制协定。复制协定是保障kafka高可靠性的要害。对于单个集群中每个Broker不同工作负载状况下,如何主动调优Kafka正本的工作形式是比拟有挑战的。它的挑战之一是要晓得如何防止follower进入和退出同步正本列表(即ISR)。从用户的角度来看,如果生产者发送一大批海量音讯,可能会引起Kafka Broker很多正告。这些警报表明一些topics处于“under replicated”状态,这些正本处于同步失败或生效状态,更意味着数据没有被复制到足够数量Broker从而减少数据失落的概率。因而Kafka集群中处于“under replicated”中Partition数要亲密监控。这个正告应该来自于Broker生效,减慢或暂停等状态而不是生产者写不同大小音讯引起的。
2 Kafka的正本机制
Kafka中主题的每个Partition有一个预写式日志文件,每个Partition都由一系列有序的、不可变的音讯组成,这些音讯被间断的追加到Partition中,Partition中的每个音讯都有一个间断的序列号叫做offset, 确定它在分区日志中惟一的地位。
image
Kafka每个topic的partition有N个正本,其中N是topic的复制因子。Kafka通过多正本机制实现故障主动转移,当Kafka集群中一个Broker生效状况下依然保障服务可用。在Kafka中产生复制时确保partition的预写式日志有序地写到其余节点上。N个replicas中。其中一个replica为leader,其余都为follower,leader解决partition的所有读写申请,与此同时,follower会被动定期地去复制leader上的数据。
如下图所示,Kafka集群中有4个broker, 某topic有3个partition,且复制因子即正本个数也为3:
Kafka提供了数据复制算法保障,如果leader产生故障或挂掉,一个新leader被选举并被承受客户端的音讯胜利写入。Kafka确保从同步正本列表中选举一个正本为leader,或者说follower追赶leader数据。leader负责保护和跟踪ISR(In-Sync Replicas的缩写,示意正本同步队列,具体可参考下节)中所有follower滞后的状态。当producer发送一条音讯到broker后,leader写入音讯并复制到所有follower。音讯提交之后才被胜利复制到所有的同步正本。音讯复制提早受最慢的follower限度,重要的是疾速检测慢正本,如果follower“落后”太多或者生效,leader将会把它从ISR中删除。
正本同步队列(ISR)
所谓同步,必须满足如下两个条件:
- 正本节点必须能与zookeeper放弃会话(心跳机制)
- 副本能复制leader上的所有写操作,并且不能落后太多。(卡住或滞后的正本管制是由 replica.lag.time.max.ms 配置)
默认状况下Kafka对应的topic的replica数量为1,即每个partition都有一个惟一的leader,为了确保音讯的可靠性,通常利用中将其值(由broker的参数offsets.topic.replication.factor指定)大小设置为大于1,比方3。 所有的正本(replicas)统称为Assigned Replicas,即AR。ISR是AR中的一个子集,由leader保护ISR列表,follower从leader同步数据有一些提早。任意一个超过阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新退出的follower也会先寄存在OSR中。AR=ISR+OSR。
上一节中的HW俗称高水位,是HighWatermark的缩写,取一个partition对应的ISR中最小的LEO作为HW,consumer最多只能生产到HW所在的地位。另外每个replica都有HW,leader和follower各自负责更新本人的HW的状态。对于leader新写入的音讯,consumer不能立即生产,leader会期待该音讯被所有ISR中的replicas同步后更新HW,此时音讯能力被consumer生产。这样就保障了如果leader所在的broker生效,该音讯依然能够从新选举的leader中获取。对于来自外部broKer的读取申请,没有HW的限度。
下图具体的阐明了当producer生产音讯至broker后,ISR以及HW和LEO的流转过程:
由此可见,Kafka的复制机制既不是齐全的同步复制,也不是单纯的异步复制。事实上,同步复制要求所有能工作的follower都复制完,这条音讯才会被commit,这种复制形式极大的影响了吞吐率。而异步复制形式下,follower异步的从leader复制数据,数据只有被leader写入log就被认为曾经commit,这种状况下如果follower都还没有复制完,落后于leader时,忽然leader宕机,则会失落数据。而Kafka的这种应用ISR的形式则很好的平衡了确保数据不失落以及吞吐率。
- Controller来保护:Kafka集群中的其中一个Broker会被选举为Controller,次要负责Partition治理和正本状态治理,也会执行相似于重调配partition之类的治理工作。在合乎某些特定条件下,Controller下的LeaderSelector会选举新的leader,ISR和新的leader_epoch及controller_epoch写入Zookeeper的相干节点中。同时发动LeaderAndIsrRequest告诉所有的replicas。
- leader来保护:leader有独自的线程定期检测ISR中follower是否脱离ISR, 如果发现ISR变动,则会将新的ISR的信息返回到Zookeeper的相干节点中。
正本不同步的异常情况
- 慢正本:在肯定周期时间内follower不能追赶上leader。最常见的起因之一是I / O瓶颈导致follower追加复制音讯速度慢于从leader拉取速度。
- 卡住正本:在肯定周期时间内follower进行从leader拉取申请。follower replica卡住了是因为GC暂停或follower生效或死亡。
- 新启动正本:当用户给主题减少正本因子时,新的follower不在同步正本列表中,直到他们齐全赶上了leader日志。
3 Follower向leader拉取数据的过程
3.1 replica fetcher 线程何时启动
broker 调配的任何一个 partition 都是以 Replica 对象实例的模式存在,而 Replica 在 Kafka 上是有两个角色: leader 和 follower,只有这个 Replica 是 follower,它便会向 leader 进行数据同步。
反映在 ReplicaManager 上就是如果 Broker 的本地正本被选举为 follower,那么它将会启动正本同步线程,其具体实现如下所示:
image
简略来说,makeFollowers() 的处理过程如下:
- 先从本地记录 leader partition 的汇合中将这些 partition 移除,因为这些 partition 曾经被选举为了 follower;
- 将这些 partition 的本地正本设置为 follower,前面就不会接管对于这个 partition 的 Produce 申请了,如果仍然有 client 在向这台 broker 发送数据,那么它将会返回相应的谬误;
- 先进行对于这些 partition 的正本同步线程(如果本地正本之前是 follower 当初还是 follower,先敞开的起因是:这个 partition 的 leader 产生了变动,如果 leader 没有发生变化,那么 makeFollower办法返回的是 False,这个 Partition 就不会被增加到 partitionsToMakeFollower 汇合中),这样的话能够保障这些 partition 的本地正本将不会再有新的数据追加;
- 对这些 partition 本地正本日志文件进行截断操作并进行 checkpoint 操作;
- 实现那些提早解决的 Produce 和 Fetch 申请;
- 如果本地的 broker 没有掉线,那么向这些 partition 新选举进去的 leader 启动正本同步线程。
对于第6步,并不一定会为每一个 partition 都启动一个 fetcher 线程,对于一个目标 broker,只会启动 num.replica.fetchers
个线程,具体这个 topic-partition 会调配到哪个 fetcher 线程上,是依据 topic 名和 partition id 进行计算失去,实现所示:
image
3.2 replica fetcher 线程启动
如上所示,在 ReplicaManager 调用 makeFollowers() 启动 replica fetcher 线程后,它实际上是通过 ReplicaFetcherManager 实例进行相干 topic-partition 同步线程的启动和敞开,其启动过程分为上面两步:
- ReplicaFetcherManager 调用 addFetcherForPartitions() 增加对这些 topic-partition 的数据同步流程;
- ReplicaFetcherManager 调用 createFetcherThread() 初始化相应的 ReplicaFetcherThread 线程。
addFetcherForPartitions()
的具体实现如下所示:
image
这个办法其实是做了上面这几件事:
- 先计算这个 topic-partition 对应的 fetcher id;
- 依据 leader 和 fetcher id 获取对应的 replica fetcher 线程,如果没有找到,就调用 createFetcherThread() 创立一个新的 fetcher 线程;
- 如果是新启动的 replica fetcher 线程,那么就启动这个线程;
- 将 topic-partition 记录到 fetcherThreadMap 中,这个变量记录每个 replica fetcher 线程要同步的 topic-partition 列表。
ReplicaFetcherManager 创立 replica Fetcher 线程的实现如下:
image
3.3 replica fetcher 线程处理过程
replica fetcher 线程在启动之后就开始进行失常数据同步流程了,这个过程都是在 ReplicaFetcherThread 线程中实现的。
ReplicaFetcherThread 的 doWork()
办法是始终在这个线程中的 run()
中调用的,实现办法如下:
image
在 doWork() 办法中次要做了两件事:
- 结构相应的 Fetch 申请(buildFetchRequest());
- 通过 processFetchRequest() 办法发送 Fetch 申请,并对其后果进行相应的解决。
processFetchRequest()
这个办法的作用是发送 Fetch 申请,并对返回的后果进行解决,最终写入到本地正本的 Log 实例中,其具体实现:
其处理过程简略总结一下:
- 通过 fetch() 办法,发送 Fetch 申请,获取相应的 response(如果遇到异样,那么在下次发送 Fetch 申请之前,会 sleep 一段时间再发);
- 如果返回的后果 不为空,并且 Fetch 申请的 offset 信息与返回后果的 offset 信息对得上,那么就会调用 processPartitionData() 办法将拉取到的数据追加本地正本的日志文件中,如果返回后果有错误信息,那么就对相应谬误进行相应的解决;
- 对在 Fetch 过程中遇到异样或返回谬误的 topic-partition,会进行 delay 操作,下次 Fetch 申请的产生至多要距离 replica.fetch.backoff.ms 工夫。
fetch()
办法作用是发送 Fetch 申请,并返回相应的后果,其具体的实现,如下:
image
processPartitionData
这个办法的作用是,解决 Fetch 申请的具体数据内容,简略来说就是:检查一下数据大小是否超过限度、将数据追加到本地正本的日志文件中、更新本地正本的 hw 值。
image
3.3 正本同步异常情况的解决
在正本同步的过程中,会遇到哪些异常情况呢?
大家肯定会想到对于 offset 的问题,在 Kafka 中,对于 offset 的解决,无论是 producer 端、consumer 端还是其余中央,offset 仿佛都是一个如影随行的问题。在正本同步时,对于 offset,会遇到什么问题呢?上面举两个异样的场景:
- 如果以后本地(id:1)的正本当初是 leader,其 LEO 假如为1000,而另一个在 isr 中的正本(id:2)其 LEO 为800,此时呈现网络抖动,id 为1 的机器掉线后又上线了,然而此时正本的 leader 实际上曾经变成了 2,而2的 LEO 为800,这时候1启动正本同步线程去2上拉取数据,心愿从 offset=1000 的中央开始拉取,然而2上最大的 offset 才是800,这种状况该如何解决呢?
- 假如一个 replica (id:1)其 LEO 是10,它曾经掉线好几天,这个 partition leader 的 offset 范畴是 [100, 800],那么 1 重启启动时,它心愿从 offset=10 的中央开始拉取数据时,这时候产生了 OutOfRange,不过跟下面不同的是这里是小于了 leader offset 的范畴,这种状况又该怎么解决?
以上两种状况都是 offset OutOfRange 的状况,只不过:一是 Fetch Offset 超过了 leader 的 LEO,二是 Fetch Offset 小于 leader 最小的 offset
在介绍 Kafka 解决方案之前,咱们先来本人思考一下这两种状况应该怎么解决?
- 如果 fetch offset 超过 leader 的 offset,这时候正本应该是回溯到 leader 的 LEO 地位(超过这个值的数据删除),而后再去进行正本同步,当然这种解决方案其实是无奈保障 leader 与 follower 数据的完全一致,再次发生 leader 切换时,可能会导致数据的可见性不统一,但既然用户容许了脏选举的产生,其实咱们是能够认为用户是能够接管这种状况产生的;
- 这种就比拟容易解决,首先清空本地的数据,因为本地的数据都曾经过期了,而后从 leader 的最小 offset 地位开始拉取数据。
下面是咱们比拟容易想出的解决方案,而在 Kafka 中,其解决方案也很相似,不过遇到状况比下面咱们列出的两种状况多了一些简单,其解决方案如下:
image
针对第一种状况,在 Kafka 中,实际上还会产生这样一种状况,1 在收到 OutOfRange 谬误时,这时去 leader 上获取的 LEO 值与最小的 offset 值,这时候却发现 leader 的 LEO 曾经从 800 变成了 1100(这个 topic-partition 的数据量增长得比拟快),再依照下面的解决方案就不太正当,Kafka 这边的解决方案是:遇到这种状况,进行重试就能够了,下次同步时就会失常了,然而仍然会有下面说的那个问题。
3.4 replica fetcher 线程的敞开
replica fetcher 线程敞开的条件,在三种状况下会敞开对这个 topic-partition 的拉取操作:
- stopReplica():broker 收到了 controller 发来的 StopReplica 申请,这时会开始敞开对指定 topic-partition 的同步线程;
- makeLeaders:这些 partition 的本地正本被选举成了 leader,这时候就会先进行对这些 topic-partition 正本同步线程;
- makeFollowers():后面曾经介绍过,这里实际上进行正本同步,而后再开启正本同步线程,因为这些 topic-partition 的 leader 可能产生了切换。
这里间接说线程敞开,其实不是很精确,因为每个 replica fetcher 线程操作的是多个 topic-partition,而在敞开的粒度是 partition 级别,只有这个线程调配的 partition 全副敞开后,这个线程才会真正被敞开。
stopReplica
StopReplica 的申请实际上是 Controller 发送过去的,这个在 controller 局部会讲述,它触发的条件有多种,比方:broker 下线、partition replica 迁徙等等。
makeLeaders
makeLeaders()
办法的调用是在 broker 上这个 partition 的正本被设置为 leader 时触发的,其实现如下:
image
调用 ReplicaFetcherManager 的 removeFetcherForPartitions()
删除对这些 topic-partition 的正本同步设置,这里在实现时,会遍历所有的 replica fetcher 线程,都执行 removePartitions()
办法来移除对应的 topic-partition 汇合。
image
removePartitions
这个办法的作用是:ReplicaFetcherThread 将这些 topic-partition 从本人要拉取的 partition 列表中移除。
image
ReplicaFetcherThread的敞开
后面介绍那么多,仿佛还是没有真正去敞开,那么 ReplicaFetcherThread 真正敞开是哪里操作的呢?
实际上 ReplicaManager 每次解决完 LeaderAndIsr 申请后,都会调用 ReplicaFetcherManager 的 shutdownIdleFetcherThreads()
办法,如果 fetcher 线程要拉取的 topic-partition 汇合为空,那么就会敞开掉对应的 fetcher 线程。
image
作者:恋情小傻蛋
链接:https://www.jianshu.com/p/f9a...
起源:简书
著作权归作者所有。商业转载请分割作者取得受权,非商业转载请注明出处。