预祝 2019 年元旦节快乐!2018 年最后一周,分享些 Kafka 的知识点。
Kafka 数据分区和消费者的关系
分区(partition)
topic 是逻辑概念,分区(partition)是物理概念,对于用户来说是透明的。producer 只需要关心消息网哪个 topic 发送,而 consumer 之关系自己订阅哪个 topic,不需要关心每条消息存于整个集群的哪个 Broker。
为了性能考虑,如果 topic 的消息都放在一个 Broker,这个 Broker 必然称为瓶颈,而且无法做到水平扩展。所以 topic 内的数据分布到整个集群就是个自然而然的设计了。分区的引入就是解决水平扩展问题的一个解决方案。
Kafka 尽量将所有的分区均匀分配到整个集群上。基本算法如下:
将所有存货的 N 个 Broker 和待分配的分区排序。
将第 i 个分区分配到第(i mon n)个 Broker 上,这个分区的第一个副本(Replica)存在于这个分配的 Broker 上,并且会作为分区的优先副本。
将第 i 个分区的第 j 个副本分配到第(i+j)mod n 个 Broker 上。
但实际情况 Kafka 的算法是上述基础上再加些,看 Kafka 的函数 assignReplicasToBrokers。变了两点:
限制了副本因子(replication factor)不得大于 Broker 的个数。因为当 j 大于 n 时,就会存在一个 Broker 有两个副本,这没意义且浪费。
起始位置不是第 0 个 Broker,是第 rand.nextInt(Brokers.size)个。那是因为(i mon n)造成的问题就是,永远都会有 0,未必有 n。所以必须加上随机数,也就是第 i 个分区分配到第(i mon n)+rand.nextInt(Brokers.size)个 Broker 上。
如果考虑多机架问题,那么 Broker 顺序就未必是 0,1,2,3,4,5。而是如果 0,1,2 是机架 A,3,4,5 是机架 B。则 Broker 的顺序为 0,3,1,4,2,5。错开,每个机架依次选一次。所以当副本因子为 3 时,保证每一个分区在各个机架都至少有一个副本。
分区中副本一般都是 Leader,其余的都是 Follow 副本。生产者消费者都固定在 Leader 进行生产和消费。
分区与生产者
负载均衡(Load balancing)
生产者直接发送数据到 Broker,不需要任何的中间路由层,而接受的 Broker 是该分区的 Leader。为了帮助生产者实现这一点,所有 Broker 都可以回答关于哪些是可用服务器的元数据的请求,以及在任何给定的时间内,某个主题的分区的 Leader 是否允许生产者适当地发送它的请求。客户端可以控制往哪个分区生产消息。这可以随机地进行,实现一种随机的负载平衡,或者可以通过一些语义分区函数来实现负载平衡。Kafka 提供了语义分区的接口,允许用户指定一个分区的 key,并使用这个 key 来做 hash 到一个分区(如果需要的话,也是可以复写这分区功能的)。例如,我们选择 user 的 id 作为可用,则所以该用户的信息都会发送到同样的分区。
异步发送(Asynchronus send)
批处理是效率的主要驱动因素之一,为了能够批处理,Kafka 的生产者会尝试在内存中积累数据,然后在一起在一个请求中以大批量的形式发送出去。批处理这个可以设置按固定的消息数量或按特定的延迟(64k 或 10ms)。这允许累积更多字节的发送出去,这样只是在服务器上做少量的大 IO 操作。这种缓冲是可配置的,这样提供了一种机制来以额外的延迟来提高吞吐量。具体的配置)和生产者的 api 可以在这文档中找到。
分区与消费者
消费者的工作方式是,向分区的 Leader 发送“fetch”请求。在每个请求中消费者指定日志的偏移量(position),然后接受回一大块从偏移量开始的日志。因此,消费者对偏移量有重要的控制权,如果需要,可以重置偏移量来重新消费数据。
Push 和 pull
我们首先考虑的一个问题是,消费者应该是从 Broker 拉取消息,还是应该是 Broker 把消息推送给消费者。在这方面,Kafka 遵循了一种更传统的设计,大多数消息队列系统也会用的,那就是数据是从生产者 push 到 Broker,消费者是从 Broker 拉取数据。一些日志集中系统,如 Scribe 和 Apache Flume,遵循一个非常不同的,基于推送的路径,将数据被推到下游。这两种方法都由利弊,在基于推送的系统,由于是 Broker 得控制数据传输的速率,不同消费者可能要不同的速率。然而消费者一般的目的都是让消费者自己能够以最大的速度进行消费,但在基于 push 的系统,当消费速率低于生产效率时,消费者就不知道该怎么办好了(本质上就是一种拒绝服务攻击(DOS))。一个基于 pull 的系统就拥有很好的熟悉,消费者可以简单的调控速率。
基于 pull 的系统的另一个优点是,它可以对发送给消费者的数据进行聚合的批处理。基于推送的系统必须选择立即发送请求或积累更多数据,然后在不知道下游用户是否能够立即处理它的情况下发送它。
基于 pull 的系统的缺点是,如果 Broker 没数据,则消费者可能会不停的轮训。为了避免这一点,我们在 pull 请求上提供了参数,允许消费者在“长轮训”中阻塞,直到数据达到(并且可以选择等待,直到一定数量的自己可以,确保传输的大小)。
消费者的 Position(Consumer Position)
令人惊讶的是,跟踪消息是否使用了,是消息队列系统的关键性能点之一。很多消息队列系统在 Broker 中保存了关于什么消息是被消费了的元数据。也就是说,当消息队列给消费者时,Broker 要么立即记录信息到本地,要么就是等待消费者的确认。这是一个相当直观的选择,而且对于一台机器服务器来说,很清楚地知道这些消息的状态。由于许多消息队列系统中用于存储的数据结构都很糟糕,因此这(记录消息状态)也是一个实用的选择——因为 Broker 知道什么是已经被消费的,所以可以立即删除它,保持数据的大小。
让 Broker 和消费者就已经消费的东西达成一致,这可不是小问题。如果一条消息发送到网络上,Broker 就把它置为已消费,但消费者可能处理这条消息失败了(或许是消费者挂了,也或许是请求超时等),这条消息就会丢失了。为了解决这个问题,很多消息队列系统增加了确认机制。当消息被发送时,是被标志为已发送,而不是已消费;这是 Broker 等待消费者发来特定的确认信息,则将消息置为已消费。这个策略虽然解决了消息丢失的问题,但却带来了新的问题。第一,如果消费者在发送确认信息之前,在处理完消息之后,消费者挂了,则会导致此消息会被处理两次。第二个问题是关于性能,Broker 必须保存每个消息的不同状态(首先先锁住消息以致于不会让它发送第二次,其次标志位已消费从而可以删除它)。还有些棘手的问题要处理。如消息被发送出去,但其确认信息一直没返回。
Kafka 处理则不一样。我们的主题被分为一个有序分区的集合,且每个分区在任何给定的时间内只会被订阅它的消费者组中的一个消费者给使用。这意味着每个分区中的消费者的 position 仅仅是一个整数,这是下一次消费时,消息的偏移量。这使状态(记录是否被消费)非常小,每个分区只有一个数字。这个状态可以被定期检查。这样确认一条消息是否被消费的成本就很低。
这样还附加了一个好处。消费者可以重置其最先的 position 从而重新消费数据。这虽然违反了队列的公共契约,但它却变成关键功能给许多消费者。例如,如果消费者代码有一个 bug,并且在一些消息被消费后才被发现,那么当 bug 被修复后,消费者就可以重新使用这些消息。
消费组
每群消费者都会被标志有消费组名。有消费组这个概念,Kafka 就可以实现类似与工作队列(Worke Queues)模式和发布 / 订阅(Publish/Subscribe)。如果消费者都在同一个消费组,则消息则会负载均衡的分配每个消费者,一条消息不会分配个两个及以上的消费者。如果消费者不在同一个组,则消息会被广播到每一个消费组中。
Kafka 的数据 offset 读取流程
每个消息在分区中都是被分配一个有序的 ID 数字,而这数字,我们称之为偏移量(offset)。在一个分区上,offset 唯一标识一个消息。由每个消费者维护 offset。
在 Kafka 文件存储中,同一个 topic 下有多个不同分区,每个分区为一个目录,分区命名规则为 topic 名称 + 有序序号,第一个分区序号从 0 开始,序号最大值为分区数量减 1。
partition 物理上由多个大小相等的 segment 组成。segment 由 2 大部分组成,分别为 index file 和 data file,此 2 个文件一一对应,成对出现,后缀 ”.index” 和“.log”分别表示为 segment 索引文件、数据文件.segment 文件命名规则:partion 全局的第一个 segment 从 0 开始,后续每个 segment 文件名为上一个 segment 文件最后一条消息的 offset 值。数值最大为 64 位 long 大小,19 位数字字符长度,没有数字用 0 填充。
00000000000000000.index
00000000000000000.log
00000000000368769.index
00000000000368769.log
00000000000737337.index
00000000000737337.log
00000000001105814.index
00000000001105814.log
index file 的结构:
1,0
3,497
6,1407
8,1686
….
N,position
index file 结构是两个数字两个数字一组,N,position。N 用于查找相对于当前文件名的 offset 值的 N 个消息。如 00000000000368769.index 的 3,497,则为 368769+3= 第 368772 个消息。而 position 497 是指 data file 的偏移量 497。
data file 由许多 message 组成,message 物理结构如下:
8 byte offset
4 byte message size
4 byte CRC32
1 byte “magic”
1 byte “attributes”
4 byte key length
K byte key
4 byte payload length
value bytes payload
这样的结构,配合 index file,很快就可以知道某条消息的大小。
在 partition 中如何通过 offset 查找 message
例如读取 offset=368776 的 message,需要通过下面 2 个步骤查找。
第一步查找 segment file
上述为例,其中 00000000000000000000.index 表示最开始的文件,起始偏移量 (offset) 为 0. 第二个文件 00000000000000368769.index 的消息量起始偏移量为 368770 = 368769 + 1. 同样,第三个文件 00000000000000737337.index 的起始偏移量为 737338=737337 + 1,其他后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据 offset 二分查找文件列表,就可以快速定位到具体文件。当 offset=368776 时定位到 00000000000000368769.index|log
第二步通过 segment file 查找 message
通过第一步定位到 segment file,当 offset=368776 时,依次定位到 00000000000000368769.index 的元数据物理位置和 00000000000000368769.log 的物理偏移地址,然后再通过 00000000000000368769.log 顺序查找直到 offset=368776 为止。
从上述图 3 可知这样做的优点,segment index file 采取稀疏索引存储方式,它减少索引文件大小,通过 mmap 可以直接内存操作,稀疏索引为数据文件的每个对应 message 设置一个元数据指针, 它比稠密索引节省了更多的存储空间,但查找起来需要消耗更多的时间。
Kafka 高效文件存储设计特点
Kafka 把 topic 中一个 parition 大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。
通过索引信息可以快速定位 message 和确定 response 的最大大小。
通过 index 元数据全部映射到 memory,可以避免 segment file 的 IO 磁盘操作。
通过索引文件稀疏存储,可以大幅降低 index 文件元数据占用空间大小。
注:稀疏索引类似于带一级索引的跳表,但是一级索引是数组可以使用二分法查找。
注:mmap()函数是 Linux 的文件空间映射函数,用来将文件或设备空间映射到内存中,可以通过映射后的内存空间存取来获得与存取文件一致的控制方式,不必再使用 read(),write()函数。mmap 和常规文件操作的区别回顾一下常规文件系统操作(调用 read/fread 等类函数)中,函数的调用过程:
进程发起读文件请求。
内核通过查找进程文件符表,定位到内核已打开文件集上的文件信息,从而找到此文件的 inode。
inode 在 address_space 上查找要请求的文件页是否已经缓存在页缓存中。如果存在,则直接返回这片文件页的内容。
如果不存在,则通过 inode 定位到文件磁盘地址,将数据从磁盘复制到页缓存。之后再次发起读页面过程,进而将页缓存中的数据发给用户进程。
总结来说,常规文件操作为了提高读写效率和保护磁盘,使用了页缓存机制。这样造成读文件时需要先将文件页从磁盘拷贝到页缓存中,由于页缓存处在内核空间,不能被用户进程直接寻址,所以还需要将页缓存中数据页再次拷贝到内存对应的用户空间中。这样,通过了两次数据拷贝过程,才能完成进程对文件内容的获取任务。写操作也是一样,待写入的 buffer 在内核空间不能直接访问,必须要先拷贝至内核空间对应的主存,再写回磁盘中(延迟写回),也是需要两次数据拷贝。而使用 mmap 操作文件中,创建新的虚拟内存区域和建立文件磁盘地址和虚拟内存区域映射这两步,没有任何文件拷贝操作。而之后访问数据时发现内存中并无数据而发起的缺页异常过程,可以通过已经建立好的映射关系,只使用一次数据拷贝,就从磁盘中将数据传入内存的用户空间中,供进程使用。总而言之,常规文件操作需要从磁盘到页缓存再到用户主存的两次数据拷贝。而 mmap 操控文件,只需要从磁盘到用户主存的一次数据拷贝过程。说白了,mmap 的关键点是实现了用户空间和内核空间的数据直接交互而省去了空间不同数据不通的繁琐过程。因此 mmap 效率更高。
函数原型
void *mmap(void *start, size_t length, int prot, int flags, int fd, off_t offset);
也就是可以将大数据的文件,局部映射到内存中,在内存中进行此部分文件的操作。对此内存操作,都不涉及到内核空间到用户空间之间交互。直接操作内存,内存直接写入(读取)文件。就只有一次 IO。如果是普通文件操作,则需要文件复制到内核,再由内核复制到用户空间,用户空间才能操作。从而达到零拷贝。换句话说,但凡是需要用磁盘空间代替内存的时候,mmap 都可以发挥其功效。
Kafka 内部如何保证顺序,结合外部组建如何保证消费者的顺序
Kafka 中每个分区都是有序,由于 Kafka 的消息是不可变的,所以都是追加的形式有序的往上加消息。这个结构体叫 结构化提交日志(a structured commit log)。
首先就要考虑是否真的需要所有消息在队列都得有序。一般情况,不止一般,而是很大一部分,是可以无序的。就跟分布式一样。有很多业务,看起来是同步的,静下来慢慢思考,就会发现很多东西是可以异步执行的。如果实在有这样保证顺序的需要,保证生产者需将有序地提交给一个分区,首先是生产者不能提交错顺序。其次,消费者组就不能拥有两个或以上消费者实例了。连两个或以上的消费者组也不能有。
持久化
Kafka 会根据保留时间这参数,持久化所有已经收到的消息。虽然可以设置保留时间这参数,但是 Kafka 优秀的性能,添加删除都是常量级的性能,所以理论上,数据保存很长时间也不成问题。
参考: http://kafka.apache.org/https://www.zhihu.com/questio…https://blog.csdn.net/yangyut…http://www.cnblogs.com/huxiao…https://www.cnblogs.com/ITtan…
稀疏索引:https://blog.csdn.net/qq_2223… 跳表:https://www.jianshu.com/p/dc2…