关于kafka:深入解析Kafka的offset管理

49次阅读

共计 19698 个字符,预计需要花费 50 分钟才能阅读完成。

1、为什么会用到 kafka(音讯队列的作用)

缓冲和削峰:上游数据时有突发流量,上游可能扛不住,或者上游没有足够多的机器来保障冗余,kafka 在两头能够起到一个缓冲的作用,把音讯暂存在 kafka 中,上游服务就能够依照本人的节奏进行缓缓解决。

解耦和扩展性:我的项目开始的时候,并不能确定具体需要。音讯队列能够作为一个接口层,解耦重要的业务流程。只须要恪守约定,针对数据编程即可获取扩大能力。

冗余:能够采纳一对多的形式,一个生产者公布音讯,能够被多个订阅 topic 的服务生产到,供多个毫无关联的业务应用。

健壮性:音讯队列能够沉积申请,所以生产端业务即便短时间死掉,也不会影响次要业务的失常进行。

异步通信:很多时候,用户不想也不须要立刻解决音讯。音讯队列提供了异步解决机制,容许用户把一个音讯放入队列,但并不立刻解决它。想向队列中放入多少音讯就放多少,而后在须要的时候再去解决它们。

2、Kafka 为什么这么快

利用 Partition 实现并行处理

不同 Partition 可位于不同机器,因而能够充分利用集群劣势,实现机器间的并行处理。另一方面,因为 Partition 在物理上对应一个文件夹,即便多个 Partition 位于同一个节点,也可通过配置让同一节点上的不同 Partition 置于不同的磁盘上,从而实现磁盘间的并行处理,充分发挥多磁盘的劣势。

利用了古代操作系统分页存储 Page Cache 来利用内存进步 I/O 效率

程序写

  • kafka 的音讯是一直追加到文件中的,这个个性使 kafka 能够充分利用磁盘的程序读写性能
    因为古代的操作系统提供了预读和写技术,磁盘的程序写大多数状况下比随机写内存还要快。
    程序读写不须要硬盘磁头的寻道工夫,只需很少的扇区旋转工夫,所以速度远快于随机读写
  • Zero-copy 零拷技术缩小拷贝次数
  • 数据批量解决。合并小的申请,而后以流的形式进行交互,直顶网络下限。
    在很多状况下,零碎的瓶颈不是 CPU 或磁盘,而是网络 IO。
    因而,除了操作系统提供的低级批处理之外,Kafka 的客户端和 broker 还会在通过网络发送数据之前,大数据培训在一个批处理中累积多条记录 (包含读和写)。记录的批处理摊派了网络往返的开销,应用了更大的数据包从而进步了带宽利用率。
  • Pull 拉模式 应用拉模式进行音讯的获取生产,与生产端解决能力相符。
  • 数据压缩
    Kafka 还反对对音讯汇合进行压缩,Producer 能够通过 GZIP、Snappy、LZ4 格局对音讯汇合进行压缩,数据压缩个别都是和批处理配套应用来作为优化伎俩的。
    压缩的益处就是缩小传输的数据量,加重对网络传输的压力
    Producer 压缩之后,在 Consumer 需进行解压,尽管减少了 CPU 的工作,但在对大数据处理上,瓶颈在网络上而不是 CPU,所以这个老本很值得

3、Kafka 名词解释以及工作形式

  • Broker:一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker 能够包容多个 topic。
  • Producer:音讯生产者,向 kafka broker 发送音讯的客户端。
  • Consumer:音讯消费者,向 kafka broker 取音讯的客户端。
  • Topic:队列,生产者和消费者通过此进行对接。
  • Consumer Group(CG):若干个 Consumer 组成的汇合。这是 kafka 用来实现一个 topic 音讯的播送(发给所有的 consumer)和单播(发给任意一个 consumer)的伎俩。一个 topic 能够有多个 CG。topic 的音讯会复制(不是真的复制,是概念上的)到所有的 CG,但每个 CG 只会把音讯发给该 CG 中的一个 consumer。如果须要实现播送,只有每个 consumer 有一个独立的 CG 就能够了。要实现单播只有所有的 consumer 在同一个 CG。用 CG 还能够将 consumer 进行自在的分组而不须要屡次发送音讯到不同的 topic。
  • Partition:分区,为了实现扩展性,一个 topic 能够散布在多个 broker 上,一个 topic 能够分为多个 partition,每个 partition 都是一个有序的队列。partition 中的每条音讯都会被调配一个有序的 id(offset)。kafka 只保障同一个 partition 中的音讯程序,不保障一个 topic 的整体(多个 partition 之间)的程序。生产者和消费者应用时能够指定 topic 中的具体 partition。
  • 正本:在 kafka 中,每个主题能够有多个分区,每个分区又能够有多个正本。这多个正本中,只有一个是 leader,而其余的都是 follower 正本。仅有 leader 正本能够对外提供服务。多个 follower 正本通常寄存在和 leader 正本不同的 broker 中。通过这样的机制实现了高可用,当某台机器挂掉后,其余 follower 正本也能迅速”转正“,开始对外提供服务。
  • offset:生产偏移量,topic 中的每个分区都是有序且程序不可变的记录集,并且一直地追加到结构化的 log 文件。分区中的每一个记录都会调配一个 id 号来示意程序,咱们称之为 offset,offset 用来惟一的标识分区中每一条记录。能够设置为“主动提交”与“手动提交”。

4、Kafka 中的 AR、ISR、OSR 代表什么?HW、LEO、LSO 等别离代表什么?

  • AR:Assigned Replicas 指以后分区中的所有正本。
  • ISR:In-Sync Replicas 正本同步队列。ISR 中包含 Leader 和 Foller。如果 Leader 过程挂掉,会在 ISR 队列中抉择一个服务作为新的 Leader。有 replica.lag.max.message(提早条数)和 replica.lag.time.max.ms(延迟时间)两个参数决定一台服务器是否能够退出 ISR 正本队列,在 0.10 版本之后移除了 replica.lag.max.message(提早条数)参数,防治服务频繁的进出队列。任意一个维度超过阈值都会把 Follower 踢出 ISR,存入 OSR(Outof-Sync Replicas)列表,新退出的 Follower 也会先寄存在 OSR 中。
  • OSR:(Out-of-Sync Replicas)非同步正本队列。与 leader 正本同步滞后过多的正本(不包含 leader 正本)组成 OSR。如果 OSR 汇合中有 follower 正本“追上”了 leader 正本,那么 leader 正本会把它从 OSR 汇合转移至 ISR 汇合。默认状况下,当 leader 正本产生故障时,只有在 ISR 汇合中的正本才有资格被选举为新的 leader,而在 OSR 汇合中的正本则没有任何机会(不过这个准则也能够通过批改 unclean.leader.election.enable 参数配置来扭转)。

unclean.leader.election.enable 为 true 的话,意味着非 ISR 汇合的 broker 也能够参加选举,这样就有可能产生数据失落和数据不统一的状况,Kafka 的可靠性就会升高;而如果 unclean.leader.election.enable 参数设置为 false,Kafka 的可用性就会升高。

  • ISR 的伸缩:
    1)Leader 跟踪保护 ISR 中 follower 滞后状态,落后太多或生效时,leade 把他们从 ISR 剔除。
    2)OSR 中 follower“追上”Leader,在 ISR 中才有资格选举 leader。

LEO(Log End Offset),标识以后日志文件中下一条待写入的音讯的 offset。上图中 offset 为 9 的地位即为以后日志文件的 LEO,LEO 的大小相当于以后日志分区中最初一条音讯的 offset 值加 1. 分区 ISR 汇合中的每个正本都会保护本身的 LEO,而 ISR 汇合中最小的 LEO 即为分区的 HW,对消费者而言只能生产 HW 之前的音讯。

HW:replica 高水印值,正本中最新一条已提交音讯的位移。leader 的 HW 值也就是理论已提交音讯的范畴,每个 replica 都有 HW 值,但仅仅 leader 中的 HW 能力作为标示信息。什么意思呢,就是说当依照参数规范胜利实现音讯备份(胜利同步给 follower replica 后)才会更新 HW 的值,代表音讯实践上曾经不会失落,能够认为“已提交”。

5、ISR 膨胀性:

启动 Kafka 时候主动开启的两个定时工作,“isr-expiration” 和”isr-change-propagation”。

isr-expiration:isr-expiration 工作会周期性的检测每个分区是否须要缩减其 ISR 汇合,相当于一个纪检委员,巡逻尖子班时候发现有学生睡觉打牌看小说,就把它的座位移除尖子班,缩减 ISR,宁缺毋滥。同样情理,如果 follower 数据同步赶上 leader,那么该 follower 就能进入 ISR 尖子班,裁减。

下面对于 ISR 尖子班人员的所见,都会记录到 isrChangeSet 中,设想成是一个名单列表,谁能进,谁要出,都记录在案。

isr-change-propagation:作用就是查看 isrChangeSet,依照名单上的信息移除和迁入,个别是 2500ms 查看一次,然而为了避免频繁膨胀裁减影响性能,不是每次都能做变动,必须满足:

1、上一次 ISR 汇合发生变化间隔当初曾经超过 5 秒,

2、上一次写入 zookeeper 的时候间隔当初曾经超过 60 秒。这两个条件都满足,那么就开始换座位!这两个条件能够由咱们来配置。

Kafka 应用这种 ISR 膨胀的形式无效的衡量了数据可靠性与性能之间的关系。

6、kafka follower 如何与 leader 同步数据

Kafka 的复制机制既不是齐全的同步复制,也不是单纯的异步复制。齐全同步复制要求 All Alive Follower 都复制完,这条音讯才会被认为 commit,这种复制形式极大的影响了吞吐率。而异步复制形式下,Follower 异步的从 Leader 复制数据,数据只有被 Leader 写入 log 就被认为曾经 commit,这种状况下,如果 leader 挂掉,会失落数据,kafka 应用 ISR 的形式很好的平衡了确保数据不失落以及吞吐率。Follower 能够批量的从 Leader 复制数据,而且 Leader 充分利用磁盘程序读以及 send file(zero copy)机制,这样极大的进步复制性能,外部批量写磁盘,大幅缩小了 Follower 与 Leader 的音讯量差。

7、Zookeeper 在 Kafka 中的作用(晚期)

zookeeper 是一个分布式的协调组件,晚期版本的 kafka 用 zk 做 meta 信息存储,consumer 的生产状态,group 的治理以及 offset 的值。思考到 zk 自身的一些因素以及整个架构较大概率存在单点问题,新版本中逐步弱化了 zookeeper 的作用。新的 consumer 应用了 kafka 外部的 group coordination 协定,也缩小了对 zookeeper 的依赖,

然而 broker 仍然依赖于 ZK,zookeeper 在 kafka 中还用来选举 controller 和 检测 broker 是否存活等等。

1、Broker 注册

Broker 是分布式部署并且相互独立,此时须要有一个注册零碎可能将整个集群中的 Broker 治理起来,此时就用到的 Zookeeper。

在 Zookeeper 上会有一个专门用来进行 Broker 服务器列表记录的节点:/brokes/ids

2、Topic 注册

在 kafka 中,同一个 Topic 的音讯会被分成多个分区并将其散布在多个 Broker 上,这些分区信息以及与 Broker 的对应关系也都是邮件 Zookeeper 保护,由专门的节点记录:/brokers/topics

3、消费者注册

消费者服务器在初始化启动时退出消费者分组的步骤如下:
注册到消费者分组。每个消费者服务器启动时,都会到 Zookeeper 的指定节点下创立一个属于本人的消费者节点,例如 /consumer/[group_id]/ids/[consumer_id],实现节点创立后,消费者就会将本人订阅的 Topic 信息写入该长期节点。

对消费者分组中的消费者的变动注册监听:每个消费者都须要关注所属消费者分组中的其余消费者服务器的变动状况,即对 /consumer/[group_id]/ids 节点注册子节点变动的 Watcher 监听,一旦发现消费者新增或缩小,就触发消费者的负载平衡。

对 Broker 服务器变动注册监听:消费者须要对 /broker/ids[0-N]中的节点进行监听,如果发现 Broker 服务器列表发生变化,那么就依据具体情况来决定是否须要进行消费者负载平衡。

进行消费者负载平衡:为了让同一个 Topic 下不同分区的音讯尽量平衡地被多个消费者生产而进行消费者与音讯分区调配的过程,通常对于一个消费者分组,如果组内的消费者服务器产生变更或 Broker 服务器产生变更,会进行消费者负载平衡。

Offset 记录

在消费者对指定音讯分区进行生产的过程中,须要定时地将分区音讯的生产进度 Offset 记录到 Zookeeper 上,以便对该消费者进行重启或者其余消费者从新接管该音讯分区的音讯生产后,可能从之前的进度持续进行音讯生产。Offset 在 Zookeeper 中由一个专门节点进行记录,其节点门路为:

/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
节点内容就是 Offset 的值。

4. 生产者负载平衡

因为同一个 Topic 音讯会被分区并将其散布在多个 Broker 上,因而生产者须要将音讯正当地发送到这些分布式的 Broker 上,那么如何实现生产者的负载平衡,Kafka 反对传统的四层负载平衡,也反对 Zookeeper 形式实现负载平衡。

四层负载平衡:依据生产者的 IP 地址和端口来为其圈定一个相关联的 Broker。通常,一个生产者只会对应单个 Broker,而后该生产者产生的音讯都发送到该 Broker。这种形式逻辑简略,每个生产者不须要同其余零碎建设额定的 TCP 链接,只须要和 Broker 保护单个 TCP 连贯即可。然而无奈做到真正的负载平衡,因为理论零碎中的每个生产者产生的音讯量及每个 Broker 的音讯存储量都是不一样的,如果有些生产者产生的音讯远多于其余生产者的话,那么会导致不同的 Broker 接管到的音讯总数差别微小,同时,生产者也无奈实时感知到 Broker 的新增和删除。

应用 Zookeeper 进行负载平衡,因为每个 Broker 启动时,都会实现 Broker 注册过程,生产者会通过该节点的变动来动静地感知到 Broker 服务器列表的变更,这样就能够实现动静的负载平衡机制。

5. 消费者负载平衡

与生产者类似,Kafka 中的消费者同样须要进行负载平衡来实现多个消费者正当地从对应的 Broker 服务器上接管音讯,每个消费者分组蕴含若干消费者,每条音讯都只会发送给分组中的一个消费者,不同的消费者分组生产本人特定的 Topic 上面的音讯,互不烦扰。

6. 分区与消费者的关系

生产组 consumer group 下有多个 Consumer(消费者)。

对于每个消费者组(consumer group),Kafka 都会为其调配一个全局惟一的 Group ID,Group 外部的所有消费者共享该 ID。订阅的 topic 下的每个分区只能调配给某个 group 下的一个 consumer(当然该分区还能够被调配给其余 group)
同时,kafka 为每个消费者调配一个 Consumer ID,通常采纳“Hostname:UUID”模式示意。
在 kafka 中,规定了每个音讯分区只能被同组的一个消费者进行生产,因而,须要在 zookeeper 上记录音讯分区与 Consumer 之间的关系,每个消费者一旦确定了对一个生产分区的生产权力,须要将其 Consumer ID 写入到平 Zookeeper 对应音讯分区的长期节点上,例如:/consumers/[group_id]/owners/topic/[broker_id-partition_id]
其中,[broker_id-partition_id]就是一个音讯分区的示意,节点内容就是该音讯分区上消费者的 Consumer ID。

7. 补充

晚期版本的 kafka 用 zk 做 meta 信息存储,consumer 的生产状态,group 的治理以及 offse t 的值。思考到 zk 自身的一些因素以及整个架构较大概率存在单点问题,新版本中的确逐步弱化了 zookeeper 的作用。新的 consumer 应用了 kafka 外部的 group coordination 协定,也缩小了对 zookeeper 的依赖

8、Kafka 如何保证数据不失落

Kafka 存在丢音讯的问题,次要产生在 Broker、Producer、Consumer 三种。

  1. Broker
    broker 写数据时首先写到 PageCache 中,pageCache 的数据通过 linux 的 flusher 程序异步批量存储至磁盘中,此过程称为刷盘。而 pageCache 位于内存。这部分数据会在断电后失落。
    刷盘触发条件有三:

被动调用 sync 或 fsync 函数
可用内存低于阀值
dirty data 工夫达到阀值。dirty 是 pagecache 的一个标识位,当有数据写入到 pageCache 时,pagecache 被标注为 dirty,数据刷盘当前,dirty 标记革除。

kafka 没有提供同步刷盘的形式,也就是说实践上要齐全让 kafka 保障单个 broker 不失落音讯是做不到的,只能通过调整刷盘机制的参数缓解该状况,比方:

缩小刷盘距离 log.flush.interval.ms(在刷新到磁盘之前,任何 topic 中的音讯保留在内存中的最长工夫)
缩小刷盘数据量大小 log.flush.interval.messages(在将音讯刷新到磁盘之前,在日志分区上累积的音讯数量)。

工夫越短,数据量越小,性能越差,然而失落的数据会变少,可靠性越好。这是一个选择题。
同时,Kafka 通过 producer 和 broker 协同解决音讯失落的状况,一旦 producer 发现 broker 音讯失落,即可主动进行 retry。retry 次数可依据参数 retries 进行配置,超过指定次数会,此条音讯才会被判断失落。
producer 和 broker 之间,通过 ack 机制来判断音讯是否失落。

acks=0,producer 不期待 broker 的响应,效率最高,然而音讯很可能会丢。
acks=1,leader broker 收到音讯后,不期待其余 follower 的响应,即返回 ack。也能够了解为 ack 数为 1。此时,如果 follower 还没有收到 leader 同步的音讯 leader 就挂了,那么音讯会失落。依照上图中的例子,如果 leader 收到音讯,胜利写入 PageCache 后,会返回 ack,此时 producer 认为音讯发送胜利。但此时,依照上图,数据还没有被同步到 follower。如果此时 leader 断电,数据会失落。
acks=-1,leader broker 收到音讯后,挂起,期待所有 ISR 列表中的 follower 返回后果后,再返回 ack。- 1 等效与 all。这种配置下,只有 leader 写入数据到 pagecache 是不会返回 ack 的,还须要所有的 ISR 返回“胜利”才会触发 ack。如果此时断电,producer 能够晓得音讯没有被发送胜利,将会从新发送。如果在 follower 收到数据当前,胜利返回 ack,leader 断电,数据将存在于原来的 follower 中。
在从新选举当前,新的 leader 会持有该局部数据。数据从 leader 同步到 follower,须要 2 步:
数据从 pageCache 被刷盘到 disk。因为只有 disk 中的数据能力被同步到 replica。
数据同步到 replica,并且 replica 胜利将数据写入 PageCache。在 producer 失去 ack 后,哪怕是所有机器都停电,数据也至多会存在于 leader 的磁盘内。
下面第三点提到了 ISR 的列表的 follower,须要配合另一个参数能力更好的保障 ack 的有效性。ISR 是 Broker 保护的一个“牢靠的 follower 列表”,in-sync Replica 列表,broker 的配置蕴含一个参数:min.insync.replicas。
该参数示意 ISR 中起码的正本数。如果不设置该值,ISR 中的 follower 列表可能为空。此时相当于 acks=1。

  1. Producer
    producer 在发送数据时能够将多个申请进行合并后异步发送,合并后的申请首先缓存在本地 buffer 中,失常状况下,producer 客户端的异步调用能够通过 callback 回调函数来解决音讯发送失败或者超时的状况,然而当呈现以下状况,将会呈现数据失落

producer 异常中断,buffer 中的数据将失落。
producer 客户端内存不足,如果采取的策略是抛弃音讯(另一种策略是 block 阻塞),音讯也会失落。
音讯产生(异步)过快,导致挂起线程过多,内存不足,导致程序解体,音讯失落。

针对以上状况,能够有以下解决思路。

producer 采纳同步形式发送音讯,或者生产数据时采纳阻塞的线程池,并且线程数不宜过多。整体思路就是管制音讯产生速度。
扩充 buffer 的容量配置,配置项为:buffer.memory。这种办法能够缓解数据失落的状况,但不能杜绝。

3.Consumer

Consumer 生产音讯有以下几个步骤:

接管音讯
解决音讯
反馈处理结果

生产形式次要分为两种

主动提交 offset,Automatic Offset Committing(enable.auto.commit=true)
手动提交 offset,Manual Offset Control(enable.auto.commit=false)

Consumer 主动提交机制是依据肯定的工夫距离,将收到的音讯进行 commit,具体配置为:auto.commit.interval.ms。commit 和生产的过程是异步的,也就是说可能存在生产过程未胜利,commit 音讯就曾经提交,此时就会呈现音讯失落。
咱们可将提交类型改为手动提交,在生产实现后再进行提交,这样能够保障音讯“至多被生产一次”(at least once),但如果生产实现后在提交过程中呈现故障,则会呈现反复生产的状况,本章不探讨,下章解说。

9、简述 Kafka 的 Rebalance 机制

什么是 Rebalance

Rebalance 实质上是一种协定,规定了一个 Consumer Group 下的所有 consumer 如何达成统一,来调配订阅
Topic 的每个分区。
例如:某 Group 下有 20 个 consumer 实例,它订阅了一个具备 100 个 partition 的 Topic。失常状况下,kafka 会为每个 Consumer 均匀的调配 5 个分区。这个调配的过程就是 Rebalance。

触发 Rebalance 的机会

Rebalance 的触发条件有 3 个。

组成员个数发生变化。例如有新的 consumer 实例退出该生产组或者来到组。
订阅的 Topic 个数发生变化。
订阅 Topic 的分区数发生变化。

Rebalance 产生时,Group 下所有 consumer 实例都会协调在一起独特参加,kafka 可能保障尽量达到最偏心的调配。然而
Rebalance 过程对 consumer group 会造成比较严重的影响。在 Rebalance 的过程中 consumer group 下的所有消费者实例都会进行工作,期待 Rebalance 过程实现。

Rebalance 过程

Rebalance 过程分为两步:JoinGroup 申请和 SyncGroup 申请。
JoinGroup :JoinGroup 申请的次要作用是将组成员订阅信息发送给领导者消费者,待领导者制订好调配计划后,重均衡流程进入到 SyncGroup 申请阶段。
SyncGroup:SyncGroup 申请的次要目标,就是让协调者把领导者制订的调配计划下发给各个组内成员。当所有成员都胜利接管到调配计划后,消费者组进入到 Stable 状态,即开始失常的生产工作。

10、如何保障音讯不被反复生产(保障音讯的幂等性)

幂等性:就是用户对于同一操作发动的一次申请或者屡次申请的后果是统一的,不会因为屡次点击而产生了副作用。

呈现起因:

起因 1:Consumer 在生产过程中,被强行 kill 掉消费者线程或异常中断(生产零碎宕机、重启等),导致理论生产后的数据,offset 没有提交。
起因 2:设置 offset 为主动提交,敞开 kafka 时,如果在 close 之前,调用 consumer.unsubscribe() 则有可能局部 offset 没提交,下次重启会反复生产。
起因 3:生产超时导致消费者与集群断开连接,offset 尚未提交,导致重均衡后反复生产。个别生产超时(session.time.out)有以下起因:并发过大,消费者忽然宕机,解决超时等。

解决思路:

进步生产能力,进步单条音讯的处理速度,例如对音讯解决中比 较耗时的步骤可通过异步的形式进行解决、利用多线程解决等。在缩短单条音讯生产时常的同时,依据理论场景可将 session.time.out(Consumer 心跳超时工夫)和 max.poll.interval.ms(consumer 两次 poll 的最大工夫距离)值设置大一点,防止不必要的 rebalance,此外可适当减小 max.poll.records 的值(示意每次生产的时候,获取多少条音讯),默认值是 500,可依据理论音讯速率适当调小。这种思路可解决因生产工夫过长导致的反复生产问题,对代码改变较小,但无奈相对防止反复生产问题。
依据业务状况制订:引入独自去重机制,例如生成音讯时,在音讯中退出惟一标识符如主键 id。写入时依据逐步主键判断 update 还是 insert。如果写 redis,则每次依据主键 id 进行 set 即可,人造幂等性。或者应用 redis 作为缓冲,将 id 首先写入 redis 进行反复判断,而后在进行后续操作。

11、为什么 Kafka 不反对读写拆散?

在 Kafka 中,生产者写入音讯、消费者读取音讯的操作都是与 leader 正本进行交互的,从 而实现的是一种主写主读的生产生产模型。
Kafka 并不反对主写从读,因为主写从读有 2 个很明 显的毛病:

数据一致性问题。数据从主节点转到从节点必然会有一个延时的工夫窗口,这个工夫 窗口会导致主从节点之间的数据不统一。某一时刻,在主节点和从节点中 A 数据的值都为 X,之后将主节点中 A 的值批改为 Y,那么在这个变更告诉到从节点之前,利用读取从节点中的 A 数据的值并不为最新的 Y,由此便产生了数据不统一的问题。
延时问题。相似 Redis 这种组件,数据从写入主节点到同步至从节点中的过程须要经 历网络→主节点内存→网络→从节点内存这几个阶段,整个过程会消耗肯定的工夫。而在 Kafka 中,主从同步会比 Redis 更加耗时,它须要经验网络→主节点内存→主节点磁盘→网络→从节 点内存→从节点磁盘这几个阶段。对延时敏感的利用而言,主写从读的性能并不太实用。

12、Kafka 选举机制

Kafka 选举次要分为以下三种:

控制器(Broker)选举机制

分区正本选举机制

生产组选举机制

控制器选举

控制器是 Kafka 的外围组件,它的次要作用是在 Zookeeper 的帮忙下治理和协调整个 Kafka 集群包含所有分区与正本的状态。集群中任意一个 Broker 都能充当控制器的角色,但在运行过程中,只能有一个 Broker 成为控制器。

集群中第一个启动的 Broker 会通过在 Zookeeper 中创立长期节点 /controller 来让本人成为控制器,其余 Broker 启动时也会在 zookeeper 中创立长期节点,然而发现节点曾经存在,所以它们会收到一个异样,意识到控制器曾经存在,那么就会在 Zookeeper 中创立 watch 对象,便于它们收到控制器变更的告诉。

如果控制器与 Zookeeper 断开连接或异样退出,其余 broker 通过 watch 收到控制器变更的告诉,就会尝试创立长期节点 /controller,如果有一个 Broker 创立胜利,那么其余 broker 就会收到创立异样告诉,代表控制器曾经选举胜利,其余 Broker 只需创立 watch 对象即可。
控制器作用

主题治理:
创立、删除 Topic,以及减少 Topic 分区等操作都是由控制器执行。

分区重调配:

执行 Kafka 的 reassign 脚本对 Topic 分区重调配的操作,也是由控制器实现。

如果集群中有一个 Broker 异样退出,控制器会查看这个 broker 是否有分区的正本 leader,如果有那么这个分区就须要一个新的 leader,此时控制器就会去遍历其余正本,决定哪一个成为新的 leader,同时更新分区的 ISR 汇合。

如果有一个 Broker 退出集群中,那么控制器就会通过 Broker ID 去判断新退出的 Broker 中是否含有现有分区的正本,如果有,就会从分区正本中去同步数据。

Preferred leader 选举:

因为在 Kafka 集群长时间运行中,broker 的宕机或解体是不可避免的,leader 就会产生转移,即便 broker 从新回来,也不会是 leader 了。在泛滥 leader 的转移过程中,就会产生 leader 不平衡景象,可能一小部分 broker 上有大量的 leader,影响了整个集群的性能,所以就须要把 leader 调整回最后的 broker 上,这就须要 Preferred leader 选举。

集群成员治理:

控制器可能监控新 broker 的减少,broker 的被动敞开与被动宕机,进而做其余工作。这也是利用 Zookeeper 的 ZNode 模型和 Watcher 机制,控制器会监听 Zookeeper 中 /brokers/ids 下长期节点的变动。同时对 broker 中的 leader 节点进行调整。

比方,控制器组件会利用 Watch 机制查看 ZooKeeper 的 /brokers/ids 节点下的子节点数量变更。目前,当有新 Broker 启动后,它会在 /brokers 下创立专属的 znode 节点。一旦创立结束,ZooKeeper 会通过 Watch 机制将音讯告诉推送给控制器,这样,控制器就能主动地感知到这个变动,进而开启后续的新增 Broker 作业。

侦测 Broker 存活性则是依赖于刚刚提到的另一个机制:长期节点。每个 Broker 启动后,会在 /brokers/ids 下创立一个长期 znode。当 Broker 宕机或被动敞开后,该 Broker 与 ZooKeeper 的会话完结,这个 znode 会被主动删除。同理,ZooKeeper 的 Watch 机制将这一变更推送给控制器,这样控制器就能晓得有 Broker 敞开或宕机了,从而进行“善后”。

元数据服务:

控制器上保留了最全的集群元数据信息,其余所有 broker 会定期接管控制器发来的元数据更新申请,从而更新其内存中的缓存数据。

分区正本选举机制
产生正本选举的状况:

创立主题
减少分区
分区下线(分区中原先的 leader 正本下线,此时分区须要选举一个新的 leader 上线来对外提供服务)
分区重调配

分区 leader 正本的选举由 Kafka 控制器负责具体实施。次要过程如下:

从 Zookeeper 中读取以后分区的所有 ISR(in-sync replicas)汇合。
调用配置的分区抉择算法抉择分区的 leader。

分区正本分为 ISR(同步正本)和 OSR(非同步正本),当 leader 产生故障时,只有“同步正本”才能够被选举为 leader。选举时依照汇合中正本的程序查找第一个存活的正本,并且这个正本在 ISR 汇合中。

同时 kafka 反对 OSR(非同步正本)也加入选举,Kafka broker 端提供了一个参数 unclean.leader.election.enable,用于管制是否容许非同步正本参加 leader 选举;如果开启,则当 ISR 为空时就会从这些正本中选举新的 leader,这个过程称为 Unclean leader 选举。
能够依据理论的业务场景抉择是否开启 Unclean leader 选举。开启 Unclean 领导者选举可能会造成数据失落,但益处是,它使得分区 Leader 正本始终存在,不至于进行对外提供服务,因而晋升了高可用性。个别倡议是敞开 Unclean leader 选举,因为通常数据的一致性要比可用性重要。

生产组(Consumer Group)选主

在 Kafka 的生产端,会有一个消费者协调器以及生产组,组协调器(Group Coordinator)须要为生产组内的消费者选举出一个生产组的 leader。

如果生产组内还没有 leader,那么第一个退出生产组的消费者即为生产组的 leader,如果某一个时刻 leader 消费者因为某些起因退出了生产组,那么就会从新选举 leader,选举源码如下:

private val members = new mutable.HashMap[String, MemberMetadata]
leaderId = members.keys.headOption

在组协调器中消费者的信息是以 HashMap 的模式存储的,其中 key 为消费者的 member_id,而 value 是消费者相干的元数据信息。而 leader 的取值为 HashMap 中的第一个键值对的 key(这种选举形式等同于随机)。
生产组的 Leader 和 Coordinator 没有关联。生产组的 leader 负责 Rebalance 过程中生产调配计划的制订。

13、脑裂问题

controller 挂掉后,Kafka 集群会从新选举一个新的 controller。这外面存在一个问题,很难确定之前的 controller 节点是挂掉还是只是短暂性的故障。如果之前挂掉的 controller 又失常了,他并不知道本人曾经被取代了,那么此时集群中会呈现两台 controller。

其实这种状况是很容易产生。比方,某个 controller 因为 GC 而被认为曾经挂掉,并抉择了一个新的 controller。在 GC 的状况下,在最后的 controller 眼中,并没有扭转任何货色,该 Broker 甚至不晓得它曾经暂停了。因而,它将持续充当以后 controller,这是分布式系统中的常见状况,称为脑裂。

如果,处于沉闷状态的 controller 进入了长时间的 GC 暂停。它的 ZooKeeper 会话过期了,之前注册的 /controller 节点被删除。集群中其余 Broker 会收到 zookeeper 的这一告诉。

因为集群中必须存在一个 controller Broker,所以当初每个 Broker 都试图尝试成为新的 controller。假如 Broker 2 速度比拟快,成为了最新的 controller Broker。此时,每个 Broker 会收到 Broker2 成为新的 controller 的告诉,因为 Broker3 正在进行 ”stop the world” 的 GC,可能不会收到 Broker2 成为最新的 controller 的告诉。

等到 Broker3 的 GC 实现之后,仍会认为本人是集群的 controller,在 Broker3 的眼中如同什么都没有产生一样。

当初,集群中呈现了两个 controller,它们可能一起收回具备抵触的命令,就会呈现脑裂的景象。如果对这种状况不加以解决,可能会导致重大的不统一。所以须要一种办法来辨别谁是集群以后最新的 Controller。

Kafka 是通过应用 epoch number(纪元编号,也称为隔离令牌)来实现的。epoch number 只是枯燥递增的数字,第一次选出 Controller 时,epoch number 值为

1,如果再次选出新的 Controller,则 epoch number 将为

2,顺次枯燥递增。

每个新选出的 controller 通过 Zookeeper 的条件递增操作取得一个全新的、数值更大的 epoch number。其余 Broker 在晓得以后 epoch number 后,如果收到由 controller 收回的蕴含较旧 (较小)epoch number 的音讯,就会疏忽它们,即 Broker 依据最大的 epoch number 来辨别以后最新的 controller。

上图,Broker3 向 Broker1 收回命令: 让 Broker1 上的某个分区正本成为 leader,该音讯的 epoch number 值为 1。于此同时,Broker2 也向 Broker1 发送了雷同的命令,不同的是,该音讯的 epoch number 值为 2,此时 Broker1 只服从 Broker2 的命令(因为其 epoch number 较大),会疏忽 Broker3 的命令,从而防止脑裂的产生。

14、如何为 Kafka 集群抉择适合的 Topics/Partitions 数量

在 kafka 中,单个 patition 是 kafka 并行操作的最小单元。在 producer 和 broker 端,向每一个分区写入数据是能够齐全并行化的,此时,能够通过加大硬件资源的利用率来晋升零碎的吞吐量,例如对数据进行压缩。在 consumer 段,kafka 只容许单个 partition 的数据被一个 consumer 线程生产。因而,在 consumer 端,每一个 Consumer Group 外部的 consumer 并行度齐全依赖于被生产的分区数量。综上所述,通常状况下,在一个 Kafka 集群中,partition 的数量越多,意味着能够达到的吞吐量越大。

咱们能够粗略地通过吞吐量来计算 kafka 集群的分区数量。假如对于单个 partition,producer 端的可达吞吐量为 p,Consumer 端的可达吞吐量为 c,冀望的指标吞吐量为 t,那么集群所须要的 partition 数量至多为 max(t/p,t/c)。在 producer 端,单个分区的吞吐量大小会受到批量大小、数据压缩办法、确认类型(同步 / 异步)、复制因子等配置参数的影响。通过测试,在 producer 端,单个 partition 的吞吐量通常是在 10MB/ s 左右。在 consumer 端,单个 partition 的吞吐量依赖于 consumer 端每个音讯的应用逻辑处理速度。因而,咱们须要对 consumer 端的吞吐量进行测量。

15、Kafka 分区数能够减少或缩小吗?为什么?

kafka 反对分区数减少

例如咱们能够应用 bin/kafka-topics.sh -alter –topic –topic topic-name –partitions 3 命令将本来分区数为 1 得 topic-name 设置为 3。

当主题中的音讯蕴含有 key 时(即 key 不为 null),依据 key 来计算分区的行为就会有所影响。当 topic-config 的分区数为 1 时,不论音讯的 key 为何值,音讯都会发往这一个分区中;当分区数减少到 3 时,那么就会依据音讯的 key 来计算分区号,本来发往分区 0 的音讯当初有可能会发往分区 1 或者分区 2 中。如此还会影响既定音讯的程序,所以在减少分区数时肯定要三思而后行。对于基于 key 计算的主题而言,倡议在一开始就设置好分区数量,防止当前对其进行调整。

Kafka 不反对缩小分区数。

依照 Kafka 现有的代码逻辑而言,此性能齐全能够实现,不过也会使得代码的复杂度急剧增大。
实现此性能须要思考的因素很多,比方删除掉的分区中的音讯该作何解决?如果随着分区一起隐没则音讯的可靠性得不到保障;

如果须要保留则又须要思考如何保留。间接存储到现有分区的尾部,音讯的工夫戳就不会递增,如此对于 Spark、Flink 这类须要音讯工夫戳 (事件工夫) 的组件将会受到影响;

如果扩散插入到现有的分区中,那么在音讯量很大的时候,外部的数据复制会占用很大的资源,而且在复制期间,此主题的可用性又如何失去保障?

与此同时,程序性问题、事务性问题、以及分区和正本的状态机切换问题都是不得不面对的。反观这个性能的收益点却是很低,如果真的须要实现此类的性能,齐全能够从新创立一个分区数较小的主题,而后将现有主题中的音讯依照既定的逻辑复制过来即可。

17、谈谈你对 Kafka 幂等的理解?

Kafka 幂等性次要针对生产者而言。防止生产者数据反复提交至 Kafka broker 中并落盘。

在失常状况下,Producer 向 Broker 发送音讯,Broker 将音讯追加写到对应的流(即某一 Topic 的某一 Partition)中并落盘,并向 Producer 返回 ACK 信号,示意确认收到。

然而 Producer 和 Broker 之间的通信总有可能出现异常,如果音讯曾经写入,但 ACK 在半途失落了,Producer 就会进行 retry 操作再次发送该音讯,造成反复写入。

为了实现 Producer 的幂等性,Kafka 引入了 Producer ID(即 PID)和 Sequence Number。

PID。每个新的 Producer 在初始化的时候会被调配一个惟一的 PID,这个 PID 对用户是不可见的。

Sequence Numbler。对于每个 PID,该 Producer 发送数据的每个都对应一个从 0 开始枯燥递增的 Sequence Number

Broker 端在缓存中保留了这 seq number, 对于接管的每条音讯, 如果其序号比 Broker 缓存中序号大于 1 则承受它, 否则将其抛弃, 这样就能够实现了音讯反复提交了. 然而只能保障单个 Producer 对于同一个的 Exactly Once 语义


Producer 应用幂等性的示例非常简单, 与失常状况下 Producer 应用相比变动不大, 只须要
把 Producer 的配置 enable.idempotence 设置为 true 即可, 如下所示:


Properties props = new Properties();
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
// 当 enable.idempotence 为 true 时 acks 默认为 all
// props.put("acks", "all"); 
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer producer = new KafkaProducer(props);
producer.send(new ProducerRecord(topic, "test");

Prodcuer 幂等性对外保留的接口非常简单,其底层的实现对下层利用做了很好的封装,应用层并不需要去关怀具体的实现细节,对用户十分敌对

Kafka 的幂等性实现了对于单个 Producer 会话、单个 TopicPartition 级别的不重不漏,也就是最细粒度的保障。深圳大数据培训如果 Producer 重启(PID 发生变化),或者写入是跨 Topic、跨 Partition 的,单纯的幂等性就会生效,须要更高级别的事务性来解决了。当然事务性的原理更加简单

18、谈谈你对 Kafka 事务的理解?
幂等性能够保障单个 Producer 会话、单个 TopicPartition、单个会话 session 的不重不漏,如果 Producer 重启,或者是写入跨 Topic、跨 Partition 的音讯,幂等性无奈保障。此时须要用到 Kafka 事务。
Kafka 的事务处理,次要是容许利用能够把生产和生产的 batch 解决(波及多个 Partition)在一个原子单元内实现,操作要么全副实现、要么全副失败。为了实现这种机制,咱们须要利用能提供一个惟一 id,即便故障复原后也不会扭转,这个 id 就是 TransactionnalId(也叫 txn.id),txn.id 能够跟外部的 PID 1:1 调配,它们不同的是 txn.id 是用户提供的,而 PID 是 Producer 外部主动生成的(并且故障复原后这个 PID 会变动),有了 txn.id 这个机制,就能够实现多 partition、跨会话的 EOS 语义。
当用户应用 Kafka 的事务性时,Kafka 能够做到的保障:

跨会话的幂等性写入:即便两头故障,复原后仍然能够放弃幂等性;
跨会话的事务复原:如果一个利用实例挂了,启动的下一个实例仍然能够保障上一个事务实现(commit 或者 abort);
跨多个 Topic-Partition 的幂等性写入,Kafka 能够保障跨多个 Topic-Partition 的数据要么全副写入胜利,要么全副失败,不会呈现中间状态。

事务性示例
Kafka 事务性的应用办法也非常简单,用户只须要在 Producer 的配置中配置 transactional.id,通过 initTransactions() 初始化事务状态信息,再通过 beginTransaction() 标识一个事务的开始,而后通过 commitTransaction() 或 abortTransaction() 对事务进行 commit 或 abort,示例如下所示:
生产者:

Properties props = new Properties();
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "ProducerTranscationnalExample");
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "test-transactional");
props.put("acks", "all");
KafkaProducer producer = new KafkaProducer(props);
producer.initTransactions();
try {
    String msg = "matt test";
    producer.beginTransaction();
    producer.send(new ProducerRecord(topic, "0", msg.toString()));
    producer.send(new ProducerRecord(topic, "1", msg.toString()));
    producer.send(new ProducerRecord(topic, "2", msg.toString()));
    producer.commitTransaction();} catch (ProducerFencedException e1) {e1.printStackTrace();
    producer.close();} catch (KafkaException e2) {e2.printStackTrace();
    producer.abortTransaction();}
producer.close();

消费者:
消费者应该设置提交事务的隔离级别

properties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_committed");

Kafka 中只有两种事务隔离级别:read_committed、read_uncommitted
设置为 read_committed 时候是生产者事务已提交的数据能力读取到。在执行 commitTransaction() 或 abortTransaction() 办法前,设置为“read_committed”的生产端利用是生产不到这些音讯的,不过在 KafkaConsumer 外部会缓存这些音讯,直到生产者执行 commitTransaction() 办法之后它能力将这些音讯推送给生产端利用。同时 KafkaConsumer 会依据分区对数据进行整合,推送时依照分区程序进行推送。而不是依照数据发送程序。
反之,如果生产者执行了 abortTransaction() 办法,那么 KafkaConsumer 会将这些缓存的音讯抛弃而不推送给生产端利用。
设置为 read_uncommitted 时候能够读取到未提交的数据(报错终止前的数据)

正文完
 0