一、borker
broker 可以理解成一个 kafka 服务 node,是一个运行的 kafka 服务。broker 与 broker 之间是平等的关系,任意 broker 都可以 down 机而不影响其他 broker 正常工作。kafaka 在启动的时候,会将自己的信息同步到 zk 上。数据存储使用了 zk 的零时节点,broker 需要通过心跳机制维护与 zk 的注册关系,一旦 broker 宕机,zk 上面对应的零时节点也会被删除。
kafka Controller
我们可以把 controller 当成集群的管理者,集群中 borker 启动时如果没有 controller 回主动的去 zk 上注册 znode 节点来抢夺 controller 的位置,注册成功的 broker 会被当选为 kafka controller。若当前的 controller 宕机其他 borker 会从重新进入 controller 争抢流程,从而选出新的 controller。controller 主要的功能如下:
UpdateMetadataRequest:更新元数据请求。topic 分区状态经常会发生变更 (比如 leader 重新选举了或副本集合变化了等)。由于当前 clients 只能与分区的 leader broker 进行交互,那么一旦发生变更,controller 会将最新的元数据广播给所有存活的 broker。具体方式就是给所有 broker 发送 UpdateMetadataRequest 请求
CreateTopics: 创建 topic 请求。当前不管是通过 API 方式、脚本方式抑或是 CreateTopics 请求方式来创建 topic,做法几乎都是在 Zookeeper 的 /brokers/topics 下创建 znode 来触发创建逻辑,而 controller 会监听该 path 下的变更来执行真正的“创建 topic”逻辑。
DeleteTopics:删除 topic 请求。和 CreateTopics 类似,也是通过创建 Zookeeper 下的 /admin/delete_topics/<topic> 节点来触发删除 topic,controller 执行真正的逻辑
分区重分配:即 kafka-reassign-partitions 脚本做的事情。同样是与 Zookeeper 结合使用,脚本写入 /admin/reassign_partitions 节点来触发,controller 负责按照方案分配分区
Preferred leader 分配:preferred leader 选举当前有两种触发方式:1. 自动触发 (auto.leader.rebalance.enable = true);2. kafka-preferred-replica-election 脚本触发。两者“玩法”相同,向 Zookeeper 的 /admin/preferred_replica_election 写数据,controller 提取数据执行 preferred leader 分配
分区扩展:即增加 topic 分区数。标准做法也是通过 kafka-reassign-partitions 脚本完成,不过用户可直接往 Zookeeper 中写数据来实现,比如直接把新增分区的副本集合写入到 /brokers/topics/<topic> 下,然后 controller 会为你自动地选出 leader 并增加分区
集群扩展:新增 broker 时 Zookeeper 中 /brokers/ids 下会新增 znode,controller 自动完成服务发现的工作
broker 崩溃处理:同样地,controller 通过 Zookeeper 可实时侦测 broker 状态。一旦有 broker 挂掉了,controller 可立即感知并为受影响分区选举新的 leader
ControlledShutdown:broker 除了崩溃,还能“优雅”地退出。broker 一旦自行终止,controller 会接收到一个 ControlledShudownRequest 请求,然后 controller 会妥善处理该请求并执行各种收尾工作
Controller leader 选举:controller 必然要提供自己的 leader 选举以防这个全局唯一的组件崩溃宕机导致服务中断。这个功能也是通过 Zookeeper 的帮助实现的。
二、topic & partiton
Topic 相当于传统消息系统 MQ 中的一个队列 queue,可以把 topic 当成是消息的分类。partiton 可以看成是 topic 的一个分区, 目的是突破 IO 瓶颈。kafka 在存储 topic 日志的时候,将 topic 分开存储,这样就能将同一个消息的写压力分配到不同的分区,这样可以提升 kafka 的整体吞吐能力。为了保证数据的高可用,kafka 使用 partiton-Replica 进行数据备份,若 partition leader 挂了,kafka controller 会自动从 partiton-Replica 选举新的 leader。提到备份不得不提到 ISR,这是一个同步备份列表,每当用户添加新的消息时,分区 leader 成功写入日志后,后必须保证 ISR 列表里面的备份也成功写入日志后,才能给客户端相应成功。因此 ISR 列表的备份的日志总是和 leader 保持一致,在 leader 宕机的时候,可以使用 ISR 列表的备份取代 leader 的位置。
三、log & segment
kafka 最终的数据承载是通过 log 的方式进行,kafka 会按照请求的顺序将消息存储到 log 中。我们知道一个 topic 可能会被分配到到个分区 partiton 来减轻单点负载。每个 partiton 实际上在写 log 的时候也会存在,单个文件大小物理极限的问题。因此 kafka 引入了 segment 解决方案,即将日志分段存储。不同的 segment log 组合起来的数据就是分区的存储消息数据。为了方便通过 offset 定位消息,segment log 使用 first-offset 格式进行文件命名,first-offset 是该文件存储的第一条消息的 offset。这样就能通过消费者提供的 offset 很快定位到文件,然后通过 offset 偏移量可以快速定位消息的存储位置。
四、producer 消息 / 数据生产者
生产者负责消息的发送,生产者需要指定消息的 topic 来区分不同消息。kafka 收到消息后通过 loadbalance 策略,使用 hash(message) % topic 分片数 决定将数据存储到哪一个分片。然后将 message 发送到制定分片的 leader,leader 收到消息后,将消息保存下来,接着等待 ISR(a set of in-sync replicas,该列表的备份数据时刻保持和 leader 数据一致) 中的 replica 消费消息并发送 ack,若 ISR 列表中的备份分区都已经确认收到消息并保存成功后,leader 将成功的消息返回给 producer 以表明,消息被妥善保存。
五、consumer [group] 消息 / 数据消费者 &offset
与其他消息系统不同的是:kafka 不会复制去保存客服端之前消费了那条消息,以及下一条应当消费那条消息,kafka 将这些工作交给了消费客服端来做,因此 kafka 在消息消费可以做到无状态。offset 就是用来保存某个消费组(consumer group)消费的在当前分区日志下的偏移量的。通常情况下,多个客服端在同时消费同一个消息分区消息的时候会存在并发问题,对于 offset 的控制就会出现问题,这样就会出现消费重复的情况,kafka 使用无锁机制解决这个问题。kafka 规定,同一个分区(partition)下的数据只能被通一个 consumer group 中的一个线程消费,这样就避免了不同线程之间争夺通一个资源,通过这种设计 kafka 做到了无锁,这样可以避免锁竞争造成效率下降。因此建议 consumer group 里面的线程数应当和分区数保持一致,这样做可以有效的利用线程资源,线程多了会被浪费掉,少了一个线程可能会处理多个分区的数据。如果你需要多个业务消费同一个消息,由于不同的 consumer group 对同意主题分区的 offset 是分开存储的,我们可以创建多个 consumer group 实现多个线程来消费同一个消息的目的。
kafaka 如何常量时间复杂度?
写数据:通过上面消息的存储过程可以发现,除了数据存储和备份操作,并没有其他耗时操作。路由分区 ->leader 写数据 -> 数据复制,这些操作都和现有数据规模没有任何关系。每次写数据只会在原来的基础上做追加存储。由于 kafka 使用了顺序存储而不是非随机存储(据说磁盘的顺序存储效率远高于磁盘的随机存储、有时候甚至比内存的随机写效率还高),同时 kafka 还使用了批量存储的方式减少了对 io 的操作,提升了 io 效率。读数据:consumer 在消费某个 topic 的时候, 消费者会将所有的分区数据消费完,kafka 要求,同一时刻对同一分区的数据只会被一个线程消费,这样避免了锁操作。同时通过 consumer group 提供的 offset 数据,通过 kafka 的文件存储机制可以快速的定位到一个 segment 文件,并且通过计算 offset 偏移量可以快速定位到数据。从整个消费流程来看,数据规模对每个过程效率是不敏感的。
kafaka 如何做到高可用的 & 动态扩展
高可用的解决方案通常是采用数据冗余以及快速恢复来解决的。kafka 通过分区数据备份(partition replica)& 分区数据分散到不同的机器以及 kafka controller 可以快速检测到宕机节点,通过读取节点的分区数据,可以快速重新选取分区 leader,以恢复故障。同时在故障的处理过程中,就算该分区不可用,不往分区写入数据即可,对 kafka 的数据读取也是没有影响的。kafka 使用 hash 取余的目的在于均衡负载,并不在于为了通过 message 可以快速的查找到这个 message 所在位置,这个不是 kafka 关注的业务。kafka 通过数据复制和快速恢复做到了高可用,同时基于 message 不关注通过某个具体 message 的具体存存储位置,因此在扩展 kafka 的时候,或者在扩展消息分区的时候,不需要进行额为的数据复制操作,降低了扩展时候的成本。
引用
Kafka HA Kafka 一致性重要机制之 ISR(kafka replica)
Kafka 史上最详细原理总结
Kafka controller 重设计
kafka document disn
更多文章可以访问 jframe.cn