1、为什么会用到kafka(音讯队列的作用)

缓冲和削峰:上游数据时有突发流量,上游可能扛不住,或者上游没有足够多的机器来保障冗余,kafka在两头能够起到一个缓冲的作用,把音讯暂存在kafka中,上游服务就能够依照本人的节奏进行缓缓解决。

解耦和扩展性:我的项目开始的时候,并不能确定具体需要。音讯队列能够作为一个接口层,解耦重要的业务流程。只须要恪守约定,针对数据编程即可获取扩大能力。

冗余:能够采纳一对多的形式,一个生产者公布音讯,能够被多个订阅topic的服务生产到,供多个毫无关联的业务应用。

健壮性:音讯队列能够沉积申请,所以生产端业务即便短时间死掉,也不会影响次要业务的失常进行。

异步通信:很多时候,用户不想也不须要立刻解决音讯。音讯队列提供了异步解决机制,容许用户把一个音讯放入队列,但并不立刻解决它。想向队列中放入多少音讯就放多少,而后在须要的时候再去解决它们。

2、Kafka为什么这么快

利用 Partition 实现并行处理

不同 Partition 可位于不同机器,因而能够充分利用集群劣势,实现机器间的并行处理。另一方面,因为 Partition 在物理上对应一个文件夹,即便多个 Partition 位于同一个节点,也可通过配置让同一节点上的不同 Partition 置于不同的磁盘上,从而实现磁盘间的并行处理,充分发挥多磁盘的劣势。

利用了古代操作系统分页存储 Page Cache 来利用内存进步 I/O 效率

程序写

  • kafka的音讯是一直追加到文件中的,这个个性使kafka能够充分利用磁盘的程序读写性能
    因为古代的操作系统提供了预读和写技术,磁盘的程序写大多数状况下比随机写内存还要快。
    程序读写不须要硬盘磁头的寻道工夫,只需很少的扇区旋转工夫,所以速度远快于随机读写
  • Zero-copy 零拷技术缩小拷贝次数
  • 数据批量解决。合并小的申请,而后以流的形式进行交互,直顶网络下限。
    在很多状况下,零碎的瓶颈不是 CPU 或磁盘,而是网络IO。
    因而,除了操作系统提供的低级批处理之外,Kafka 的客户端和 broker 还会在通过网络发送数据之前,大数据培训在一个批处理中累积多条记录 (包含读和写)。记录的批处理摊派了网络往返的开销,应用了更大的数据包从而进步了带宽利用率。
  • Pull 拉模式 应用拉模式进行音讯的获取生产,与生产端解决能力相符。
  • 数据压缩
    Kafka还反对对音讯汇合进行压缩,Producer能够通过GZIP、Snappy、LZ4格局对音讯汇合进行压缩,数据压缩个别都是和批处理配套应用来作为优化伎俩的。
    压缩的益处就是缩小传输的数据量,加重对网络传输的压力
    Producer压缩之后,在Consumer需进行解压,尽管减少了CPU的工作,但在对大数据处理上,瓶颈在网络上而不是CPU,所以这个老本很值得

3、Kafka名词解释以及工作形式

  • Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker能够包容多个topic。
  • Producer:音讯生产者,向kafka broker发送音讯的客户端。
  • Consumer:音讯消费者,向kafka broker取音讯的客户端。
  • Topic:队列,生产者和消费者通过此进行对接。
  • Consumer Group (CG):若干个Consumer组成的汇合。这是kafka用来实现一个topic音讯的播送(发给所有的consumer)和单播(发给任意一个consumer)的伎俩。一个topic能够有多个CG。topic的音讯会复制(不是真的复制,是概念上的)到所有的CG,但每个CG只会把音讯发给该CG中的一个consumer。如果须要实现播送,只有每个consumer有一个独立的CG就能够了。要实现单播只有所有的consumer在同一个CG。用CG还能够将consumer进行自在的分组而不须要屡次发送音讯到不同的topic。
  • Partition:分区,为了实现扩展性,一个topic能够散布在多个broker上,一个topic能够分为多个partition,每个partition都是一个有序的队列。partition中的每条音讯都会被调配一个有序的id(offset)。kafka只保障同一个partition中的音讯程序,不保障一个topic的整体(多个partition之间)的程序。生产者和消费者应用时能够指定topic中的具体partition。
  • 正本:在kafka中,每个主题能够有多个分区,每个分区又能够有多个正本。这多个正本中,只有一个是leader,而其余的都是follower正本。仅有leader正本能够对外提供服务。多个follower正本通常寄存在和leader正本不同的broker中。通过这样的机制实现了高可用,当某台机器挂掉后,其余follower正本也能迅速”转正“,开始对外提供服务。
  • offset:生产偏移量,topic中的每个分区都是有序且程序不可变的记录集,并且一直地追加到结构化的log文件。分区中的每一个记录都会调配一个id号来示意程序,咱们称之为offset,offset用来惟一的标识分区中每一条记录。能够设置为“主动提交”与“手动提交”。

4、Kafka中的AR、ISR、OSR代表什么?HW、LEO、LSO等别离代表什么?

  • AR:Assigned Replicas 指以后分区中的所有正本。
  • ISR:In-Sync Replicas 正本同步队列。ISR中包含Leader和Foller。如果Leader过程挂掉,会在ISR队列中抉择一个服务作为新的Leader。有replica.lag.max.message(提早条数)和replica.lag.time.max.ms(延迟时间)两个参数决定一台服务器是否能够退出ISR正本队列,在0.10版本之后移除了replica.lag.max.message(提早条数)参数,防治服务频繁的进出队列。任意一个维度超过阈值都会把Follower踢出ISR,存入OSR(Outof-Sync Replicas)列表,新退出的Follower也会先寄存在OSR中。
  • OSR:(Out-of-Sync Replicas)非同步正本队列。与leader正本同步滞后过多的正本(不包含leader正本)组成OSR。如果OSR汇合中有follower正本“追上”了leader正本,那么leader正本会把它从OSR汇合转移至ISR汇合。默认状况下,当leader正本产生故障时,只有在ISR汇合中的正本才有资格被选举为新的leader,而在OSR汇合中的正本则没有任何机会(不过这个准则也能够通过批改unclean.leader.election.enable参数配置来扭转)。

unclean.leader.election.enable 为true的话,意味着非ISR汇合的broker 也能够参加选举,这样就有可能产生数据失落和数据不统一的状况,Kafka的可靠性就会升高;而如果unclean.leader.election.enable参数设置为false,Kafka的可用性就会升高。

  • ISR的伸缩:
    1)Leader跟踪保护ISR中follower滞后状态,落后太多或生效时,leade把他们从ISR剔除。
    2)OSR中follower“追上”Leader,在ISR中才有资格选举leader。

LEO (Log End Offset),标识以后日志文件中下一条待写入的音讯的offset。上图中offset为9的地位即为以后日志文件的 LEO,LEO 的大小相当于以后日志分区中最初一条音讯的offset值加1.分区 ISR 汇合中的每个正本都会保护本身的 LEO ,而 ISR 汇合中最小的 LEO 即为分区的 HW,对消费者而言只能生产 HW 之前的音讯。

HW:replica高水印值,正本中最新一条已提交音讯的位移。leader 的HW值也就是理论已提交音讯的范畴,每个replica都有HW值,但仅仅leader中的HW能力作为标示信息。什么意思呢,就是说当依照参数规范胜利实现音讯备份(胜利同步给follower replica后)才会更新HW的值,代表音讯实践上曾经不会失落,能够认为“已提交”。

5、ISR膨胀性:

启动 Kafka时候主动开启的两个定时工作,“isr-expiration"和”isr-change-propagation"。

isr-expiration:isr-expiration工作会周期性的检测每个分区是否须要缩减其ISR汇合,相当于一个纪检委员,巡逻尖子班时候发现有学生睡觉打牌看小说,就把它的座位移除尖子班,缩减ISR,宁缺毋滥。同样情理,如果follower数据同步赶上leader,那么该follower就能进入ISR尖子班,裁减。

下面对于ISR尖子班人员的所见,都会记录到isrChangeSet中,设想成是一个名单列表,谁能进,谁要出,都记录在案。

isr-change-propagation:作用就是查看isrChangeSet,依照名单上的信息移除和迁入,个别是2500ms查看一次,然而为了避免频繁膨胀裁减影响性能,不是每次都能做变动,必须满足:

1、上一次ISR汇合发生变化间隔当初曾经超过5秒,

2、上一次写入zookeeper的时候间隔当初曾经超过60秒。这两个条件都满足,那么就开始换座位!这两个条件能够由咱们来配置。

Kafka应用这种ISR膨胀的形式无效的衡量了数据可靠性与性能之间的关系。

6、kafka follower如何与leader同步数据

Kafka的复制机制既不是齐全的同步复制,也不是单纯的异步复制。齐全同步复制要求All Alive Follower都复制完,这条音讯才会被认为commit,这种复制形式极大的影响了吞吐率。而异步复制形式下,Follower异步的从Leader复制数据,数据只有被Leader写入log就被认为曾经commit,这种状况下,如果leader挂掉,会失落数据,kafka应用ISR的形式很好的平衡了确保数据不失落以及吞吐率。Follower能够批量的从Leader复制数据,而且Leader充分利用磁盘程序读以及send file(zero copy)机制,这样极大的进步复制性能,外部批量写磁盘,大幅缩小了Follower与Leader的音讯量差。

7、Zookeeper 在 Kafka 中的作用(晚期)

zookeeper 是一个分布式的协调组件,晚期版本的kafka用zk做meta信息存储,consumer的生产状态,group的治理以及 offset的值。思考到zk自身的一些因素以及整个架构较大概率存在单点问题,新版本中逐步弱化了zookeeper的作用。新的consumer应用了kafka外部的group coordination协定,也缩小了对zookeeper的依赖,

然而broker仍然依赖于ZK,zookeeper 在kafka中还用来选举controller 和 检测broker是否存活等等。

1、Broker注册

Broker是分布式部署并且相互独立,此时须要有一个注册零碎可能将整个集群中的Broker治理起来,此时就用到的Zookeeper。

在Zookeeper上会有一个专门用来进行Broker服务器列表记录的节点:/brokes/ids

2、Topic注册

在kafka中,同一个Topic的音讯会被分成多个分区并将其散布在多个Broker上,这些分区信息以及与Broker的对应关系也都是邮件Zookeeper保护,由专门的节点记录:/brokers/topics

3、消费者注册

消费者服务器在初始化启动时退出消费者分组的步骤如下:
注册到消费者分组。每个消费者服务器启动时,都会到Zookeeper的指定节点下创立一个属于本人的消费者节点,例如/consumer/[group_id]/ids/[consumer_id],实现节点创立后,消费者就会将本人订阅的Topic信息写入该长期节点。

对消费者分组中的消费者的变动注册监听:每个消费者都须要关注所属消费者分组中的其余消费者服务器的变动状况,即对/consumer/[group_id]/ids节点注册子节点变动的Watcher监听,一旦发现消费者新增或缩小,就触发消费者的负载平衡。

对Broker服务器变动注册监听:消费者须要对/broker/ids[0-N]中的节点进行监听,如果发现Broker服务器列表发生变化,那么就依据具体情况来决定是否须要进行消费者负载平衡。

进行消费者负载平衡:为了让同一个Topic下不同分区的音讯尽量平衡地被多个消费者生产而进行消费者与音讯分区调配的过程,通常对于一个消费者分组,如果组内的消费者服务器产生变更或Broker服务器产生变更,会进行消费者负载平衡。

Offset记录

在消费者对指定音讯分区进行生产的过程中,须要定时地将分区音讯的生产进度Offset记录到Zookeeper上,以便对该消费者进行重启或者其余消费者从新接管该音讯分区的音讯生产后,可能从之前的进度持续进行音讯生产。Offset在Zookeeper中由一个专门节点进行记录,其节点门路为:

/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
节点内容就是Offset的值。

4.生产者负载平衡

因为同一个Topic音讯会被分区并将其散布在多个Broker上,因而生产者须要将音讯正当地发送到这些分布式的Broker上,那么如何实现生产者的负载平衡,Kafka反对传统的四层负载平衡,也反对Zookeeper形式实现负载平衡。

四层负载平衡:依据生产者的IP地址和端口来为其圈定一个相关联的Broker。通常,一个生产者只会对应单个Broker,而后该生产者产生的音讯都发送到该Broker。这种形式逻辑简略,每个生产者不须要同其余零碎建设额定的TCP链接,只须要和Broker保护单个TCP连贯即可。然而无奈做到真正的负载平衡,因为理论零碎中的每个生产者产生的音讯量及每个Broker的音讯存储量都是不一样的,如果有些生产者产生的音讯远多于其余生产者的话,那么会导致不同的Broker接管到的音讯总数差别微小,同时,生产者也无奈实时感知到Broker的新增和删除。

应用Zookeeper进行负载平衡,因为每个Broker启动时,都会实现Broker注册过程,生产者会通过该节点的变动来动静地感知到Broker服务器列表的变更,这样就能够实现动静的负载平衡机制。

5.消费者负载平衡

与生产者类似,Kafka中的消费者同样须要进行负载平衡来实现多个消费者正当地从对应的Broker服务器上接管音讯,每个消费者分组蕴含若干消费者,每条音讯都只会发送给分组中的一个消费者,不同的消费者分组生产本人特定的Topic上面的音讯,互不烦扰。

6.分区与消费者的关系

生产组consumer group下有多个Consumer(消费者)。

对于每个消费者组(consumer group),Kafka都会为其调配一个全局惟一的Group ID,Group外部的所有消费者共享该ID。订阅的topic下的每个分区只能调配给某个group下的一个consumer(当然该分区还能够被调配给其余group)
同时,kafka为每个消费者调配一个Consumer ID,通常采纳“Hostname:UUID”模式示意。
在kafka中,规定了每个音讯分区只能被同组的一个消费者进行生产,因而,须要在zookeeper上记录音讯分区与Consumer之间的关系,每个消费者一旦确定了对一个生产分区的生产权力,须要将其Consumer ID写入到平Zookeeper对应音讯分区的长期节点上,例如:/consumers/[group_id]/owners/topic/[broker_id-partition_id]
其中,[broker_id-partition_id]就是一个音讯分区的示意,节点内容就是该音讯分区上消费者的Consumer ID。

7.补充

晚期版本的 kafka 用 zk 做 meta 信息存储,consumer 的生产状态,group 的治理以及 offse t的值。思考到zk自身的一些因素以及整个架构较大概率存在单点问题,新版本中的确逐步弱化了zookeeper的作用。新的consumer应用了kafka外部的group coordination协定,也缩小了对zookeeper的依赖

8、Kafka如何保证数据不失落

Kafka存在丢音讯的问题,次要产生在Broker、Producer、Consumer三种。

  1. Broker
    broker写数据时首先写到PageCache中,pageCache的数据通过linux的flusher程序异步批量存储至磁盘中,此过程称为刷盘。而pageCache位于内存。这部分数据会在断电后失落。
    刷盘触发条件有三:

被动调用sync或fsync函数
可用内存低于阀值
dirty data工夫达到阀值。dirty是pagecache的一个标识位,当有数据写入到pageCache时,pagecache被标注为dirty,数据刷盘当前,dirty标记革除。

kafka没有提供同步刷盘的形式,也就是说实践上要齐全让kafka保障单个broker不失落音讯是做不到的,只能通过调整刷盘机制的参数缓解该状况,比方:

缩小刷盘距离log.flush.interval.ms(在刷新到磁盘之前,任何topic中的音讯保留在内存中的最长工夫)
缩小刷盘数据量大小log.flush.interval.messages(在将音讯刷新到磁盘之前,在日志分区上累积的音讯数量)。

工夫越短,数据量越小,性能越差,然而失落的数据会变少,可靠性越好。这是一个选择题。
同时,Kafka通过producer和broker协同解决音讯失落的状况,一旦producer发现broker音讯失落,即可主动进行retry。retry次数可依据参数retries进行配置,超过指定次数会,此条音讯才会被判断失落。
producer和broker之间,通过ack机制来判断音讯是否失落。

acks=0,producer不期待broker的响应,效率最高,然而音讯很可能会丢。
acks=1,leader broker收到音讯后,不期待其余follower的响应,即返回ack。也能够了解为ack数为1。此时,如果follower还没有收到leader同步的音讯leader就挂了,那么音讯会失落。依照上图中的例子,如果leader收到音讯,胜利写入PageCache后,会返回ack,此时producer认为音讯发送胜利。但此时,依照上图,数据还没有被同步到follower。如果此时leader断电,数据会失落。
acks=-1,leader broker收到音讯后,挂起,期待所有ISR列表中的follower返回后果后,再返回ack。-1等效与all。这种配置下,只有leader写入数据到pagecache是不会返回ack的,还须要所有的ISR返回“胜利”才会触发ack。如果此时断电,producer能够晓得音讯没有被发送胜利,将会从新发送。如果在follower收到数据当前,胜利返回ack,leader断电,数据将存在于原来的follower中。
在从新选举当前,新的leader会持有该局部数据。数据从leader同步到follower,须要2步:
数据从pageCache被刷盘到disk。因为只有disk中的数据能力被同步到replica。
数据同步到replica,并且replica胜利将数据写入PageCache。在producer失去ack后,哪怕是所有机器都停电,数据也至多会存在于leader的磁盘内。
下面第三点提到了ISR的列表的follower,须要配合另一个参数能力更好的保障ack的有效性。ISR是Broker保护的一个“牢靠的follower列表”,in-sync Replica列表,broker的配置蕴含一个参数:min.insync.replicas。
该参数示意ISR中起码的正本数。如果不设置该值,ISR中的follower列表可能为空。此时相当于acks=1。

  1. Producer
    producer在发送数据时能够将多个申请进行合并后异步发送,合并后的申请首先缓存在本地buffer中,失常状况下,producer客户端的异步调用能够通过callback回调函数来解决音讯发送失败或者超时的状况,然而当呈现以下状况,将会呈现数据失落

producer异常中断,buffer中的数据将失落。
producer客户端内存不足,如果采取的策略是抛弃音讯(另一种策略是block阻塞),音讯也会失落。
音讯产生(异步)过快,导致挂起线程过多,内存不足,导致程序解体,音讯失落。

针对以上状况,能够有以下解决思路。

producer采纳同步形式发送音讯,或者生产数据时采纳阻塞的线程池,并且线程数不宜过多。整体思路就是管制音讯产生速度。
扩充buffer的容量配置,配置项为:buffer.memory。这种办法能够缓解数据失落的状况,但不能杜绝。

3.Consumer

Consumer生产音讯有以下几个步骤:

接管音讯
解决音讯
反馈处理结果

生产形式次要分为两种

主动提交offset,Automatic Offset Committing (enable.auto.commit=true)
手动提交offset,Manual Offset Control(enable.auto.commit=false)

Consumer主动提交机制是依据肯定的工夫距离,将收到的音讯进行commit,具体配置为:auto.commit.interval.ms。commit和生产的过程是异步的,也就是说可能存在生产过程未胜利,commit音讯就曾经提交,此时就会呈现音讯失落。
咱们可将提交类型改为手动提交,在生产实现后再进行提交,这样能够保障音讯“至多被生产一次”(at least once),但如果生产实现后在提交过程中呈现故障,则会呈现反复生产的状况,本章不探讨,下章解说。

9、简述Kafka的Rebalance机制

什么是 Rebalance

Rebalance 实质上是一种协定,规定了一个 Consumer Group 下的所有 consumer 如何达成统一,来调配订阅
Topic 的每个分区。
例如:某 Group 下有 20 个 consumer 实例,它订阅了一个具备 100 个 partition 的 Topic。失常状况下,kafka 会为每个 Consumer 均匀的调配 5 个分区。这个调配的过程就是 Rebalance。

触发 Rebalance 的机会

Rebalance 的触发条件有3个。

组成员个数发生变化。例如有新的 consumer 实例退出该生产组或者来到组。
订阅的 Topic 个数发生变化。
订阅 Topic 的分区数发生变化。

Rebalance 产生时,Group 下所有 consumer 实例都会协调在一起独特参加,kafka 可能保障尽量达到最偏心的调配。然而
Rebalance 过程对 consumer group 会造成比较严重的影响。在 Rebalance 的过程中 consumer group 下的所有消费者实例都会进行工作,期待 Rebalance 过程实现。

Rebalance 过程

Rebalance 过程分为两步:JoinGroup 申请和 SyncGroup 申请。
JoinGroup :JoinGroup 申请的次要作用是将组成员订阅信息发送给领导者消费者,待领导者制订好调配计划后,重均衡流程进入到 SyncGroup 申请阶段。
SyncGroup:SyncGroup 申请的次要目标,就是让协调者把领导者制订的调配计划下发给各个组内成员。当所有成员都胜利接管到调配计划后,消费者组进入到 Stable 状态,即开始失常的生产工作。

10、如何保障音讯不被反复生产(保障音讯的幂等性)

幂等性:就是用户对于同一操作发动的一次申请或者屡次申请的后果是统一的,不会因为屡次点击而产生了副作用。

呈现起因:

起因1:Consumer在生产过程中,被强行kill掉消费者线程或异常中断(生产零碎宕机、重启等),导致理论生产后的数据,offset没有提交。
起因2:设置offset为主动提交,敞开kafka时,如果在close之前,调用 consumer.unsubscribe() 则有可能局部offset没提交,下次重启会反复生产。
起因3:生产超时导致消费者与集群断开连接,offset尚未提交,导致重均衡后反复生产。个别生产超时(session.time.out)有以下起因:并发过大,消费者忽然宕机,解决超时等。

解决思路:

进步生产能力,进步单条音讯的处理速度,例如对音讯解决中比 较耗时的步骤可通过异步的形式进行解决、利用多线程解决等。在缩短单条音讯生产时常的同时,依据理论场景可将session.time.out(Consumer心跳超时工夫)和max.poll.interval.ms(consumer两次poll的最大工夫距离)值设置大一点,防止不必要的rebalance,此外可适当减小max.poll.records的值( 示意每次生产的时候,获取多少条音讯),默认值是500,可依据理论音讯速率适当调小。这种思路可解决因生产工夫过长导致的反复生产问题, 对代码改变较小,但无奈相对防止反复生产问题。
依据业务状况制订:引入独自去重机制,例如生成音讯时,在音讯中退出惟一标识符如主键id。写入时依据逐步主键判断update还是insert。如果写redis,则每次依据主键id进行set即可,人造幂等性。或者应用redis作为缓冲,将id首先写入redis进行反复判断,而后在进行后续操作。

11、为什么Kafka不反对读写拆散?

在 Kafka 中,生产者写入音讯、消费者读取音讯的操作都是与 leader 正本进行交互的,从 而实现的是一种主写主读的生产生产模型。
Kafka 并不反对主写从读,因为主写从读有 2 个很明 显的毛病:

数据一致性问题。数据从主节点转到从节点必然会有一个延时的工夫窗口,这个工夫 窗口会导致主从节点之间的数据不统一。某一时刻,在主节点和从节点中 A 数据的值都为 X, 之后将主节点中 A 的值批改为 Y,那么在这个变更告诉到从节点之前,利用读取从节点中的 A 数据的值并不为最新的 Y,由此便产生了数据不统一的问题。
延时问题。相似 Redis 这种组件,数据从写入主节点到同步至从节点中的过程须要经 历网络→主节点内存→网络→从节点内存这几个阶段,整个过程会消耗肯定的工夫。而在 Kafka 中,主从同步会比 Redis 更加耗时,它须要经验网络→主节点内存→主节点磁盘→网络→从节 点内存→从节点磁盘这几个阶段。对延时敏感的利用而言,主写从读的性能并不太实用。

12、Kafka选举机制

Kafka选举次要分为以下三种:

控制器(Broker)选举机制

分区正本选举机制

生产组选举机制

控制器选举

控制器是Kafka的外围组件,它的次要作用是在Zookeeper的帮忙下治理和协调整个Kafka集群包含所有分区与正本的状态。集群中任意一个Broker都能充当控制器的角色,但在运行过程中,只能有一个Broker成为控制器。

集群中第一个启动的Broker会通过在Zookeeper中创立长期节点/controller来让本人成为控制器,其余Broker启动时也会在zookeeper中创立长期节点,然而发现节点曾经存在,所以它们会收到一个异样,意识到控制器曾经存在,那么就会在Zookeeper中创立watch对象,便于它们收到控制器变更的告诉。

如果控制器与Zookeeper断开连接或异样退出,其余broker通过watch收到控制器变更的告诉,就会尝试创立长期节点/controller,如果有一个Broker创立胜利,那么其余broker就会收到创立异样告诉,代表控制器曾经选举胜利,其余Broker只需创立watch对象即可。
控制器作用

主题治理:
创立、删除Topic,以及减少Topic分区等操作都是由控制器执行。

分区重调配:

执行Kafka的reassign脚本对Topic分区重调配的操作,也是由控制器实现。

如果集群中有一个Broker异样退出,控制器会查看这个broker是否有分区的正本leader,如果有那么这个分区就须要一个新的leader,此时控制器就会去遍历其余正本,决定哪一个成为新的leader,同时更新分区的ISR汇合。

如果有一个Broker退出集群中,那么控制器就会通过Broker ID去判断新退出的Broker中是否含有现有分区的正本,如果有,就会从分区正本中去同步数据。

Preferred leader选举:

因为在Kafka集群长时间运行中,broker的宕机或解体是不可避免的,leader就会产生转移,即便broker从新回来,也不会是leader了。在泛滥leader的转移过程中,就会产生leader不平衡景象,可能一小部分broker上有大量的leader,影响了整个集群的性能,所以就须要把leader调整回最后的broker上,这就须要Preferred leader选举。

集群成员治理:

控制器可能监控新broker的减少,broker的被动敞开与被动宕机,进而做其余工作。这也是利用Zookeeper的ZNode模型和Watcher机制,控制器会监听Zookeeper中/brokers/ids下长期节点的变动。同时对broker中的leader节点进行调整。

比方,控制器组件会利用 Watch 机制查看 ZooKeeper 的 /brokers/ids 节点下的子节点数量变更。目前,当有新 Broker 启动后,它会在 /brokers 下创立专属的 znode 节点。一旦创立结束,ZooKeeper 会通过 Watch 机制将音讯告诉推送给控制器,这样,控制器就能主动地感知到这个变动,进而开启后续的新增 Broker 作业。

侦测 Broker 存活性则是依赖于刚刚提到的另一个机制:长期节点。每个 Broker 启动后,会在 /brokers/ids 下创立一个长期 znode。当 Broker 宕机或被动敞开后,该 Broker 与 ZooKeeper 的会话完结,这个 znode 会被主动删除。同理,ZooKeeper 的 Watch 机制将这一变更推送给控制器,这样控制器就能晓得有 Broker 敞开或宕机了,从而进行“善后”。

元数据服务:

控制器上保留了最全的集群元数据信息,其余所有broker会定期接管控制器发来的元数据更新申请,从而更新其内存中的缓存数据。

分区正本选举机制
产生正本选举的状况:

创立主题
减少分区
分区下线(分区中原先的leader正本下线,此时分区须要选举一个新的leader上线来对外提供服务)
分区重调配

分区leader正本的选举由Kafka控制器负责具体实施。次要过程如下:

从Zookeeper中读取以后分区的所有ISR(in-sync replicas)汇合。
调用配置的分区抉择算法抉择分区的leader。

分区正本分为ISR(同步正本)和OSR(非同步正本),当leader产生故障时,只有“同步正本”才能够被选举为leader。选举时依照汇合中正本的程序查找第一个存活的正本,并且这个正本在ISR汇合中。

同时kafka反对OSR(非同步正本)也加入选举,Kafka broker端提供了一个参数unclean.leader.election.enable,用于管制是否容许非同步正本参加leader选举;如果开启,则当 ISR为空时就会从这些正本中选举新的leader,这个过程称为 Unclean leader选举。
能够依据理论的业务场景抉择是否开启Unclean leader选举。开启 Unclean 领导者选举可能会造成数据失落,但益处是,它使得分区 Leader 正本始终存在,不至于进行对外提供服务,因而晋升了高可用性。个别倡议是敞开Unclean leader选举,因为通常数据的一致性要比可用性重要。

生产组(Consumer Group)选主

在Kafka的生产端,会有一个消费者协调器以及生产组,组协调器(Group Coordinator)须要为生产组内的消费者选举出一个生产组的leader。

如果生产组内还没有leader,那么第一个退出生产组的消费者即为生产组的leader,如果某一个时刻leader消费者因为某些起因退出了生产组,那么就会从新选举leader,选举源码如下:

private val members = new mutable.HashMap[String, MemberMetadata]leaderId = members.keys.headOption

在组协调器中消费者的信息是以HashMap的模式存储的,其中key为消费者的member_id,而value是消费者相干的元数据信息。而leader的取值为HashMap中的第一个键值对的key(这种选举形式等同于随机)。
生产组的Leader和Coordinator没有关联。生产组的leader负责Rebalance过程中生产调配计划的制订。

13、脑裂问题

controller挂掉后,Kafka集群会从新选举一个新的controller。这外面存在一个问题,很难确定之前的controller节点是挂掉还是只是短暂性的故障。如果之前挂掉的controller又失常了,他并不知道本人曾经被取代了,那么此时集群中会呈现两台controller。

其实这种状况是很容易产生。比方,某个controller因为GC而被认为曾经挂掉,并抉择了一个新的controller。在GC的状况下,在最后的controller眼中,并没有扭转任何货色,该Broker甚至不晓得它曾经暂停了。因而,它将持续充当以后controller,这是分布式系统中的常见状况,称为脑裂。

如果,处于沉闷状态的controller进入了长时间的GC暂停。它的ZooKeeper会话过期了,之前注册的/controller节点被删除。集群中其余Broker会收到zookeeper的这一告诉。

因为集群中必须存在一个controller Broker,所以当初每个Broker都试图尝试成为新的controller。假如Broker 2速度比拟快,成为了最新的controller Broker。此时,每个Broker会收到Broker2成为新的controller的告诉,因为Broker3正在进行"stop the world"的GC,可能不会收到Broker2成为最新的controller的告诉。

等到Broker3的GC实现之后,仍会认为本人是集群的controller,在Broker3的眼中如同什么都没有产生一样。

当初,集群中呈现了两个controller,它们可能一起收回具备抵触的命令,就会呈现脑裂的景象。如果对这种状况不加以解决,可能会导致重大的不统一。所以须要一种办法来辨别谁是集群以后最新的Controller。

Kafka是通过应用epoch number(纪元编号,也称为隔离令牌)来实现的。epoch number只是枯燥递增的数字,第一次选出Controller时,epoch number值为

1,如果再次选出新的Controller,则epoch number将为

2,顺次枯燥递增。

每个新选出的controller通过Zookeeper 的条件递增操作取得一个全新的、数值更大的epoch number 。其余Broker 在晓得以后epoch number 后,如果收到由controller收回的蕴含较旧(较小)epoch number的音讯,就会疏忽它们,即Broker依据最大的epoch number来辨别以后最新的controller。

上图,Broker3向Broker1收回命令:让Broker1上的某个分区正本成为leader,该音讯的epoch number值为1。于此同时,Broker2也向Broker1发送了雷同的命令,不同的是,该音讯的epoch number值为2,此时Broker1只服从Broker2的命令(因为其epoch number较大),会疏忽Broker3的命令,从而防止脑裂的产生。

14、如何为Kafka集群抉择适合的Topics/Partitions数量

在kafka中,单个patition是kafka并行操作的最小单元。在producer和broker端,向每一个分区写入数据是能够齐全并行化的,此时,能够通过加大硬件资源的利用率来晋升零碎的吞吐量,例如对数据进行压缩。在consumer段,kafka只容许单个partition的数据被一个consumer线程生产。因而,在consumer端,每一个Consumer Group外部的consumer并行度齐全依赖于被生产的分区数量。综上所述,通常状况下,在一个Kafka集群中,partition的数量越多,意味着能够达到的吞吐量越大。

咱们能够粗略地通过吞吐量来计算kafka集群的分区数量。假如对于单个partition,producer端的可达吞吐量为p,Consumer端的可达吞吐量为c,冀望的指标吞吐量为t,那么集群所须要的partition数量至多为max(t/p,t/c)。在producer端,单个分区的吞吐量大小会受到批量大小、数据压缩办法、 确认类型(同步/异步)、复制因子等配置参数的影响。通过测试,在producer端,单个partition的吞吐量通常是在10MB/s左右。在consumer端,单个partition的吞吐量依赖于consumer端每个音讯的应用逻辑处理速度。因而,咱们须要对consumer端的吞吐量进行测量。

15、Kafka 分区数能够减少或缩小吗?为什么?

kafka反对分区数减少

例如咱们能够应用 bin/kafka-topics.sh -alter --topic --topic topic-name --partitions 3 命令将本来分区数为1得topic-name设置为3。

当主题中的音讯蕴含有key时(即key不为null),依据key来计算分区的行为就会有所影响。当topic-config的分区数为1时,不论音讯的key为何值,音讯都会发往这一个分区中;当分区数减少到3时,那么就会依据音讯的key来计算分区号,本来发往分区0的音讯当初有可能会发往分区1或者分区2中。如此还会影响既定音讯的程序,所以在减少分区数时肯定要三思而后行。对于基于key计算的主题而言,倡议在一开始就设置好分区数量,防止当前对其进行调整。

Kafka 不反对缩小分区数。

依照Kafka现有的代码逻辑而言,此性能齐全能够实现,不过也会使得代码的复杂度急剧增大。
实现此性能须要思考的因素很多,比方删除掉的分区中的音讯该作何解决?如果随着分区一起隐没则音讯的可靠性得不到保障;

如果须要保留则又须要思考如何保留。间接存储到现有分区的尾部,音讯的工夫戳就不会递增,如此对于Spark、Flink这类须要音讯工夫戳(事件工夫)的组件将会受到影响;

如果扩散插入到现有的分区中,那么在音讯量很大的时候,外部的数据复制会占用很大的资源,而且在复制期间,此主题的可用性又如何失去保障?

与此同时,程序性问题、事务性问题、以及分区和正本的状态机切换问题都是不得不面对的。反观这个性能的收益点却是很低,如果真的须要实现此类的性能,齐全能够从新创立一个分区数较小的主题,而后将现有主题中的音讯依照既定的逻辑复制过来即可。

17、谈谈你对 Kafka 幂等的理解?

Kafka幂等性次要针对生产者而言。防止生产者数据反复提交至Kafka broker中并落盘。

在失常状况下,Producer向Broker发送音讯,Broker将音讯追加写到对应的流(即某一Topic的某一Partition)中并落盘,并向Producer返回ACK信号,示意确认收到。

然而Producer和Broker之间的通信总有可能出现异常,如果音讯曾经写入,但ACK在半途失落了,Producer就会进行retry操作再次发送该音讯,造成反复写入。

为了实现Producer的幂等性,Kafka引入了Producer ID(即PID)和Sequence Number。

PID。每个新的Producer在初始化的时候会被调配一个惟一的PID,这个PID对用户是不可见的。

Sequence Numbler。对于每个PID,该Producer发送数据的每个都对应一个从0开始枯燥递增的Sequence Number

Broker端在缓存中保留了这seq number,对于接管的每条音讯,如果其序号比Broker缓存中序号大于1则承受它,否则将其抛弃,这样就能够实现了音讯反复提交了.然而只能保障单个Producer对于同一个的Exactly Once语义


Producer应用幂等性的示例非常简单,与失常状况下Producer应用相比变动不大,只须要
把Producer的配置enable.idempotence设置为true即可,如下所示:

Properties props = new Properties();props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");//当enable.idempotence为true时acks默认为 all// props.put("acks", "all"); props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer producer = new KafkaProducer(props);producer.send(new ProducerRecord(topic, "test");

Prodcuer 幂等性对外保留的接口非常简单,其底层的实现对下层利用做了很好的封装,应用层并不需要去关怀具体的实现细节,对用户十分敌对

Kafka的幂等性实现了对于单个Producer会话、单个TopicPartition级别的不重不漏,也就是最细粒度的保障。深圳大数据培训如果Producer重启(PID发生变化),或者写入是跨Topic、跨Partition的,单纯的幂等性就会生效,须要更高级别的事务性来解决了。当然事务性的原理更加简单

18、谈谈你对 Kafka 事务的理解?
幂等性能够保障单个Producer会话、单个TopicPartition、单个会话session的不重不漏,如果Producer重启,或者是写入跨Topic、跨Partition的音讯,幂等性无奈保障。此时须要用到Kafka事务。
Kafka 的事务处理,次要是容许利用能够把生产和生产的 batch 解决(波及多个 Partition)在一个原子单元内实现,操作要么全副实现、要么全副失败。为了实现这种机制,咱们须要利用能提供一个惟一 id,即便故障复原后也不会扭转,这个 id 就是 TransactionnalId(也叫 txn.id),txn.id 能够跟外部的 PID 1:1 调配,它们不同的是 txn.id 是用户提供的,而 PID 是 Producer 外部主动生成的(并且故障复原后这个 PID 会变动),有了 txn.id 这个机制,就能够实现多 partition、跨会话的 EOS 语义。
当用户应用 Kafka 的事务性时,Kafka 能够做到的保障:

跨会话的幂等性写入:即便两头故障,复原后仍然能够放弃幂等性;
跨会话的事务复原:如果一个利用实例挂了,启动的下一个实例仍然能够保障上一个事务实现(commit 或者 abort);
跨多个 Topic-Partition 的幂等性写入,Kafka 能够保障跨多个 Topic-Partition 的数据要么全副写入胜利,要么全副失败,不会呈现中间状态。

事务性示例
Kafka 事务性的应用办法也非常简单,用户只须要在 Producer 的配置中配置 transactional.id,通过 initTransactions() 初始化事务状态信息,再通过 beginTransaction() 标识一个事务的开始,而后通过 commitTransaction() 或 abortTransaction() 对事务进行 commit 或 abort,示例如下所示:
生产者:

Properties props = new Properties();props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("client.id", "ProducerTranscationnalExample");props.put("bootstrap.servers", "localhost:9092");props.put("transactional.id", "test-transactional");props.put("acks", "all");KafkaProducer producer = new KafkaProducer(props);producer.initTransactions();try {    String msg = "matt test";    producer.beginTransaction();    producer.send(new ProducerRecord(topic, "0", msg.toString()));    producer.send(new ProducerRecord(topic, "1", msg.toString()));    producer.send(new ProducerRecord(topic, "2", msg.toString()));    producer.commitTransaction();} catch (ProducerFencedException e1) {    e1.printStackTrace();    producer.close();} catch (KafkaException e2) {    e2.printStackTrace();    producer.abortTransaction();}producer.close();

消费者:
消费者应该设置提交事务的隔离级别

properties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_committed");

Kafka中只有两种事务隔离级别:read_committed、read_uncommitted
设置为read_committed时候是生产者事务已提交的数据能力读取到。在执行 commitTransaction() 或 abortTransaction() 办法前,设置为“read_committed”的生产端利用是生产不到这些音讯的,不过在 KafkaConsumer 外部会缓存这些音讯,直到生产者执行 commitTransaction() 办法之后它能力将这些音讯推送给生产端利用。同时KafkaConsumer会依据分区对数据进行整合,推送时依照分区程序进行推送。而不是依照数据发送程序。
反之,如果生产者执行了 abortTransaction() 办法,那么 KafkaConsumer 会将这些缓存的音讯抛弃而不推送给生产端利用。
设置为read_uncommitted时候能够读取到未提交的数据(报错终止前的数据)