共计 3954 个字符,预计需要花费 10 分钟才能阅读完成。
该博客迁移到 github:https://github.com/dackh/blog
基础架构及术语
主题和分区
topic 是发布记录的类别或订阅源名称。Kafka 的 topic 总是多用户; 也就是说,一个主题可以有零个,一个或多个消费者订阅写入它的数据。
对于每个主题,Kafka 群集都维护一个如下所示的分区日志:
每个分区都是一个有序的,不可变的记录序列,不断附加到结构化的提交日志中。分区中的记录每个都分配了一个称为 offset 的顺序 ID 号,它唯一地标识分区中的每个记录。
Kafka 集群持久保存所有已发布的记录 – 无论是否已使用 – 使用可配置的保留期。例如,如果保留策略设置为两天,则在发布记录后的两天内,它可供使用,之后将被丢弃以释放空间。Kafka 的性能在数据大小方面实际上是恒定的,因此长时间存储数据不是问题。
实际上,基于每个消费者保留的唯一元数据是该消费者在日志中的偏移或位置。这种偏移由消费者控制:通常消费者在读取记录时会线性地提高其偏移量,但事实上,由于该位置由消费者控制,因此它可以按照自己喜欢的任何顺序消费记录。例如,消费者可以重置为较旧的偏移量来重新处理过去的数据,或者跳到最近的记录并从“现在”开始消费。(offset 由 comsumer 决定,comsumer 可以决定 offset 的数据)。
生产者和消费者
生产者 将数据发布到他们选择的 topic。生产者负责选择分配给 topic 中哪个 partition 的记录。这可以以 round-robin 方式完成,仅仅是为了 balance load,或者可以根据一些语义分区功能(例如基于记录中的某些键)来完成。
消费者 使用消费者组名称标记自己,并且发布到主题的每个记录被传递到每个订阅消费者组中的一个消费者实例。消费者实例可以在不同的进程中,也可以在不同的机器。
如果所有使用者实例具有相同的使用者组,则记录将有效地在使用者实例上进行负载平衡。
如果所有消费者实例具有不同的消费者组,则每个记录将广播到所有消费者进程。
broker 和集群
一个独立的 Kafka 服务器被称为 broker,broker 是集群的组成部分,每个集群都有一个 broker 充当了集群控制者的角色(自动从集群活跃成员中选举出来),控制者负责管理工作,包括将分区分配给 broker 和监控 broker。
容错
日志的分区分布在 Kafka 集群中的服务器上,每个服务器处理数据并请求分区的共享。每个分区都在可配置数量的服务器上进行复制,以实现容错。(多个服务器保存 partition 数据,实现容错)
每个分区都有一个服务器充当“领导者”,零个或多个服务器充当“追随者”。领导者处理分区的所有读取和写入请求,而关注者被动地复制领导者。如果领导者失败,其中一个粉丝将自动成为新的领导者。每个服务器都充当其某些分区的领导者和其他服务器的追随者,因此负载在群集中得到很好的平衡。(高性能实现)
再均衡
在 kafka 中,当消费者发生崩溃或者有新的消费者加入时,将会触发 再均衡。
消费者会像叫做_consumer_offset 的特殊主题发送消息,消息内包含灭个分区的偏移量。
再均衡之后,消费者可能分配到新的分区,为了能够继续之前的工作,消费者需要读取每个分区最后一次提交的 offset,但如果提交的 offset 小于客户端处理的最后一个 offset,那么消息将会被重复处理。
KafkaConsumer API 提供了很多种方式来提交偏移量。
自动提交
最简单的提交方式,将 enable.auto.commit 设为 true,提交时间为 auto.commit.interval.ms 设置的值,默认为 5s。每过 5s,消费者会自动把从 poll()方法接收的最大 offset 提交上去。
这种方式虽然简单,但是并没有避免重复处理消息的问题。(在 5s 内发生再均衡)
提交当前 offset
消费者 API 提供了另一种提交偏移量的方式,开发者可以在必要的时候提交当前的偏移量而不是基于时间间隔。通过调用 commitSync()
或者 commitAsync()
方法进行提交。
commitSync()
在 broker 回应之前一直阻塞,限制了应用程序的吞吐量。commitAsync()
是异步提交的方式,但是 commitAsync()无法保证一定成功,commitSync()在成功提交或者遇到无法恢复的错误之前会一直重试,但是 commitAsync()不会。之所以不进行重试是因为它收到响应之前可能另一个更大的 offset 提交成功了。commitAsync 同时也支持回调。
集群成员之间的关系
kafka 通过 zookeeper 来维护集群成员之间的关系,每个 broker 都有一个唯一标识符,这个标识符可以配置里指定,也可以自动生成。在 broker 启动的时候,通过创建 zookeeper 的临时节点把自己的 ID 注册到 zookeeper。kafka 组件订阅 zookeeper 的 /brokers/ids 路径,当有 broker 加入集群或者退出集群时,这个组件就会获得通知。
在 broker 停机、出现网络分区或长时间垃圾回收停顿时,broker 会从 zookeeper 上断开连接,此时 broker 在启动时创建的临时节点会自动从 zookeeper 移除,监听 broker 列表的 kafka 组件会被告知该 broker 已移除。
在 broker 关闭之后,它的节点会消息,但是 ID 会继续存在于其他数据结构中,在之后如果使用相同 ID 启动一个全新的 broker,它会立刻加入集群并且拥有与旧 broker 相同的分区和主题。
控制器
控制器也是 broker,同时还负责 分区首领的选举。集群中第一个启动的 broker 通过在 zookeeper 里创建一个临时节点 /controller 让自己成为控制器。其他节点在该节点上创建 watch 对象,通过这种方式确保集群里只有一个控制器存在。
如果控制器断开连接,其他监听的节点收到变更通知之后将会尝试创建 controller 节点成为控制器,只有第一个创建成功的才能成为控制器。
分区复制
kafka 中的每个分区都有多个副本,这些副本保存在 broker 中,副本可以分为:
- 首领副本(leader):所有生产者消费者请求都会经过这个副本。
- 跟随者副本(follower):从首领复制消息,保持与首领一致。
leader 通过查看每个 follower 请求的最新 offset 了解 follower 进度,如果 follower 在 10s(通过 replica.lag.time.max.ms 配值)内没有请求最新消息,那么它被认为不同步。只有同步的 follower 才能在当前 leader 宕机时被选定为新的 leader。
每个分区都会有一个首选 leader——创建主题时选定的首领就是分区的首选首领,默认情况下,kafka 的 auto.leader.rebalance.enable 被设为 true,它会检查首选首领是不是当前首领,如果不是,并且该副本同步,那么就会触发首领选举,让首选首领成为当前首领。
幂等性
为了实现 Producer 的幂等性,kafka 引入了 Producer ID(即 PID)和 Sequence Number
PID
每个 producer 在初始化的时候都会被分配一个唯一的 PID,这个 PID 对应用透明,完全没有暴露给用户,对于一个给定的 PID,sequence number 将会从 0 开始自增,每个 topic-partition 都会有一个独立的 sequence number,producer 在发送数据时,将会给 msg 标识一个 sequence number,Server 也就是通过这个验证数据是否重复,这里的 PID 是全局唯一,producer 故障后重新启动后会被分配一个新的 PID,这也是幂等性无法做到夸会话的一个原因。
Sequence Number
有 PID 之后,在 PID+topic-partition 级别上添加一个 sequence numbers 信息,就可以实现 producer 的幂等性。
幂等性前后对比
前
后
持久化
kafka 很大程度上依赖文件系统来存储和缓存消息,有一普遍的认识:磁盘很慢。但是其实顺序的磁盘读写比任意内存读写都快。
基于 JVM 内存有一下缺点:
- 对象的内存开销很大,通常会让存储数据的大小加倍
- 随着堆内数据的增加,GC 的速度越来越慢,而且可能导致错误
基于 OS 的文件系统设计有一下好处:
- 可以通过 os 的 pagecache 来有效利用主内存空间,由于数据紧凑,可以 cache 大量数据,并且没有 gc 的压力
- 即使服务重启,缓存中的数据也是热的(不需要预热)。而基于进程的缓存,需要程序进行预热,而且会消耗很长的时间。(10G 大概需要 10 分钟)
- 大大简化了代码。因为在缓存和文件系统之间保持一致性的所有逻辑都在 OS 中。以上建议和设计使得代码实现起来十分简单,不需要尽力想办法去维护内存中的数据,数据会立即写入磁盘。
数据持久化
- 发现线性的访问磁盘(即:按顺序的访问磁盘),很多时候比随机的内存访问快得多,而且有利于持久化
- 传统的使用内存做为磁盘的缓存
- Kafka 直接将数据写入到日志文件中,以追加的形式写入
日志数据持久化
- 写操作:通过将数据追加到文件中实现
- 读操作:读的时候从文件中读就好了
优势
- 读操作不会阻塞写操作和其他操作(因为读和写都是追加的形式,都是顺序的,不会乱,所以不会发生阻塞),数据大小不对性能产生影响;
- 没有容量限制(相对于内存来说)的硬盘空间建立消息系统;
- 线性访问磁盘,速度快,可以保存任意一段时间!
kafka 为什么这么快
- 为什么 kafka 那么快
- 什么是 Zero-Copy?
- Kafka 副本同步机制理解
- Kafka 深度解析
参考
- kafka 权威指南
- http://kafka.apache.org/intro
- Kafka 的内部机制深入(持久化,分布式,通讯协议)