关于java:Kafka实战三kafka数据可靠性深度解读

5次阅读

共计 20106 个字符,预计需要花费 51 分钟才能阅读完成。

1 概述

Kakfa 起初是由 LinkedIn 公司开发的一个分布式的音讯零碎,后成为 Apache 的一部分,它应用 Scala 编写,以可程度扩大和高吞吐率而被宽泛应用。目前越来越多的开源分布式解决零碎如 Cloudera、Apache Storm、Spark 等都反对与 Kafka 集成。

Kafka 凭借着本身的劣势,越来越受到互联网企业的青眼,唯品会也采纳 Kafka 作为其外部外围音讯引擎之一。Kafka 作为一个商业级消息中间件,音讯可靠性的重要性可想而知。如何确保音讯的准确传输?如何确保音讯的精确存储?如何确保音讯的正确生产?这些都是须要思考的问题。本文首先从 Kafka 的架构着手,先理解下 Kafka 的基本原理,而后通过对 kakfa 的存储机制、复制原理、同步原理、可靠性和持久性保障等等一步步对其可靠性进行剖析,最初通过 benchmark 来加强对 Kafka 高可靠性的认知。


2 Kafka 体系架构

如上图所示,一个典型的 Kafka 体系架构包含若干 Producer(能够是服务器日志,业务数据,页面前端产生的 page view 等等),若干 broker(Kafka 反对程度扩大,个别 broker 数量越多,集群吞吐率越高),若干 Consumer (Group),以及一个 Zookeeper 集群。Kafka 通过 Zookeeper 治理集群配置,选举 leader,以及在 consumer group 发生变化时进行 rebalance。Producer 应用 push(推)模式将音讯公布到 broker,Consumer 应用 pull(拉)模式从 broker 订阅并生产音讯。

名词解释:

名称 解释
Broker 消息中间件解决节点,一个 Kafka 节点就是一个 broker,一个或者多个 Broker 能够组成一个 Kafka 集群
Topic Kafka 依据 topic 对音讯进行归类,公布到 Kafka 集群的每条音讯都须要指定一个 topic
Producer 音讯生产者,向 Broker 发送音讯的客户端
Consumer 音讯消费者,从 Broker 读取音讯的客户端
ConsumerGroup 每个 Consumer 属于一个特定的 Consumer Group,一条音讯能够发送到多个不同的 Consumer Group,然而一个 Consumer Group 中只能有一个 Consumer 可能生产该音讯
Partition 物理上的概念,一个 topic 能够分为多个 partition,每个 partition 外部是有序的

2.1 Topic & Partition

一个 topic 能够认为一个一类音讯,每个 topic 将被分成多个 partition,每个 partition 在存储层面是 append log 文件。任何公布到此 partition 的音讯都会被追加到 log 文件的尾部,每条音讯在文件中的地位称为 offset(偏移量),offset 为一个 long 型的数字,它惟一标记一条音讯。每条音讯都被 append 到 partition 中,是程序写磁盘,因而效率十分高(教训证,程序写磁盘效率比随机写内存还要高,这是 Kafka 高吞吐率的一个很重要的保障)。

每一条音讯被发送到 broker 中,会依据 partition 规定抉择被存储到哪一个 partition。如果 partition 规定设置的正当,所有音讯能够均匀分布到不同的 partition 里,这样就实现了程度扩大。(如果一个 topic 对应一个文件,那这个文件所在的机器 I / O 将会成为这个 topic 的性能瓶颈,而 partition 解决了这个问题)。在创立 topic 时能够在 $KAFKA_HOME/config/server.properties 中指定这个 partition 的数量(如下所示),当然能够在 topic 创立之后去批改 partition 的数量。

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=3

在发送一条音讯时,能够指定这个音讯的 key,producer 依据这个 key 和 partition 机制来判断这个音讯发送到哪个 partition。partition 机制能够通过指定 producer 的 partition.class 这一参数来指定,该 class 必须实现 kafka.producer.Partitioner 接口。

无关 Topic 与 Partition 的更多细节,能够参考上面的“Kafka 文件存储机制”这一节。


3 高可靠性存储剖析

Kafka 的高可靠性的保障来源于其强壮的正本(replication)策略。通过调节其正本相干参数,能够使得 Kafka 在性能和可靠性之间运行的熟能生巧。Kafka 从 0.8.x 版本开始提供 partition 级别的复制,replication 的数量能够在 $KAFKA_HOME/config/server.properties 中配置(default.replication.refactor)。

这里先从 Kafka 文件存储机制动手,从最底层理解 Kafka 的存储细节,进而对其的存储有个宏观的认知。之后通过 Kafka 复制原理和同步形式来论述宏观层面的概念。最初从 ISR,HW,leader 选举以及数据可靠性和持久性保障等等各个维度来丰盛对 Kafka 相干知识点的认知。

3.1 Kafka 文件存储机制

Kafka 中音讯是以 topic 进行分类的,生产者通过 topic 向 Kafka broker 发送音讯,消费者通过 topic 读取数据。然而 topic 在物理层面又能以 partition 为分组,一个 topic 能够分成若干个 partition,那么 topic 以及 partition 又是怎么存储的呢?partition 还能够细分为 segment,一个 partition 物理上由多个 segment 组成,那么这些 segment 又是什么呢?上面咱们来一一揭晓。

为了便于阐明问题,假如这里只有一个 Kafka 集群,且这个集群只有一个 Kafka broker,即只有一台物理机。在这个 Kafka broker 中配置($KAFKA_HOME/config/server.properties 中)log.dirs=/tmp/kafka-logs,以此来设置 Kafka 音讯文件存储目录,与此同时创立一个 topic:topic_zzh_test,partition 的数量为 4($KAFKA_HOME/bin/kafka-topics.sh –create –zookeeper localhost:2181 –partitions 4 –topic topic_zzh_test –replication-factor 1)。那么咱们此时能够在 /tmp/kafka-logs 目录中能够看到生成了 4 个目录:

drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_zzh_test-0
drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_zzh_test-1
drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_zzh_test-2
drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_zzh_test-3

在 Kafka 文件存储中,同一个 topic 下有多个不同的 partition,每个 partiton 为一个目录,partition 的名称规定为:topic 名称 + 有序序号,第一个序号从 0 开始计,最大的序号为 partition 数量减 1,partition 是理论物理上的概念,而 topic 是逻辑上的概念。

下面提到 partition 还能够细分为 segment,这个 segment 又是什么?如果就以 partition 为最小存储单位,咱们能够设想当 Kafka producer 一直发送音讯,必然会引起 partition 文件的有限扩张,这样对于音讯文件的保护以及曾经被生产的音讯的清理带来重大的影响,所以这里以 segment 为单位又将 partition 细分。每个 partition(目录)相当于一个巨型文件被平均分配到多个大小相等的 segment(段)数据文件中(每个 segment 文件中音讯数量不肯定相等)这种个性也不便 old segment 的删除,即不便已被生产的音讯的清理,进步磁盘的利用率。每个 partition 只须要反对程序读写就行,segment 的文件生命周期由服务端配置参数(log.segment.bytes,log.roll.{ms,hours}等若干参数)决定。

segment 文件由两局部组成,别离为“.index”文件和“.log”文件,别离示意为 segment 索引文件和数据文件。这两个文件的命令规定为:partition 全局的第一个 segment 从 0 开始,后续每个 segment 文件名为上一个 segment 文件最初一条音讯的 offset 值,数值大小为 64 位,20 位数字字符长度,没有数字用 0 填充,如下:

00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log`

以下面的 segment 文件为例,展现出 segment:00000000000000170410 的“.index”文件和“.log”文件的对应的关系,如下图:

如上图,“.index”索引文件存储大量的元数据,“.log”数据文件存储大量的音讯,索引文件中的元数据指向对应数据文件中 message 的物理偏移地址。其中以“.index”索引文件中的元数据 [3, 348] 为例,在“.log”数据文件示意第 3 个音讯,即在全局 partition 中示意 170410+3=170413 个音讯,该音讯的物理偏移地址为 348。

那么如何从 partition 中通过 offset 查找 message 呢?
以上图为例,读取 offset=170418 的音讯,首先查找 segment 文件,其中 00000000000000000000.index 为最开始的文件,第二个文件为 00000000000000170410.index(起始偏移为 170410+1=170411),而第三个文件为 00000000000000239430.index(起始偏移为 239430+1=239431),所以这个 offset=170418 就落到了第二个文件之中。其余后续文件能够顺次类推,以其实偏移量命名并排列这些文件,而后依据二分查找法就能够疾速定位到具体文件地位。其次依据 00000000000000170410.index 文件中的 [8,1325] 定位到 00000000000000170410.log 文件中的 1325 的地位进行读取。

要是读取 offset=170418 的音讯,从 00000000000000170410.log 文件中的 1325 的地位进行读取,那么怎么晓得何时读完本条音讯,否则就读到下一条音讯的内容了?
这个就须要分割到音讯的物理构造了,音讯都具备固定的物理构造,包含:offset(8 Bytes)、音讯体的大小(4 Bytes)、crc32(4 Bytes)、magic(1 Byte)、attributes(1 Byte)、key length(4 Bytes)、key(K Bytes)、payload(N Bytes)等等字段,能够确定一条音讯的大小,即读取到哪里截止。

3.2 复制原理和同步形式

Kafka 中 topic 的每个 partition 有一个预写式的日志文件,尽管 partition 能够持续细分为若干个 segment 文件,然而对于下层利用来说能够将 partition 看成最小的存储单元(一个有多个 segment 文件拼接的“巨型”文件),每个 partition 都由一些列有序的、不可变的音讯组成,这些音讯被间断的追加到 partition 中。

上图中有两个新名词:HW 和 LEO。这里先介绍下 LEO,LogEndOffset 的缩写,示意每个 partition 的 log 最初一条 Message 的地位。HW 是 HighWatermark 的缩写,是指 consumer 可能看到的此 partition 的地位,这个波及到多正本的概念,这里先提及一下,下节再详表。

言归正传,为了进步音讯的可靠性,Kafka 每个 topic 的 partition 有 N 个正本(replicas),其中 N(大于等于 1)是 topic 的复制因子(replica fator)的个数。Kafka 通过多正本机制实现故障主动转移,当 Kafka 集群中一个 broker 生效状况下依然保障服务可用。在 Kafka 中产生复制时确保 partition 的日志能有序地写到其余节点上,N 个 replicas 中,其中一个 replica 为 leader,其余都为 follower, leader 解决 partition 的所有读写申请,与此同时,follower 会被动定期地去复制 leader 上的数据。

如下图所示,Kafka 集群中有 4 个 broker, 某 topic 有 3 个 partition, 且复制因子即正本个数也为 3:

Kafka 提供了数据复制算法保障,如果 leader 产生故障或挂掉,一个新 leader 被选举并被承受客户端的音讯胜利写入。Kafka 确保从同步正本列表中选举一个正本为 leader,或者说 follower 追赶 leader 数据。leader 负责保护和跟踪 ISR(In-Sync Replicas 的缩写,示意正本同步队列,具体可参考下节)中所有 follower 滞后的状态。当 producer 发送一条音讯到 broker 后,leader 写入音讯并复制到所有 follower。音讯提交之后才被胜利复制到所有的同步正本。音讯复制提早受最慢的 follower 限度,重要的是疾速检测慢正本,如果 follower“落后”太多或者生效,leader 将会把它从 ISR 中删除。

3.3 ISR

上节咱们波及到 ISR (In-Sync Replicas),这个是指正本同步队列。正本数对 Kafka 的吞吐率是有肯定的影响,但极大的加强了可用性。默认状况下 Kafka 的 replica 数量为 1,即每个 partition 都有一个惟一的 leader,为了确保音讯的可靠性,通常利用中将其值 (由 broker 的参数 offsets.topic.replication.factor 指定) 大小设置为大于 1,比方 3。所有的正本(replicas)统称为 Assigned Replicas,即 AR。ISR 是 AR 中的一个子集,由 leader 保护 ISR 列表,follower 从 leader 同步数据有一些提早(包含延迟时间 replica.lag.time.max.ms 和提早条数 replica.lag.max.messages 两个维度, 以后最新的版本 0.10.x 中只反对 replica.lag.time.max.ms 这个维度),任意一个超过阈值都会把 follower 剔除出 ISR, 存入 OSR(Outof-Sync Replicas)列表,新退出的 follower 也会先寄存在 OSR 中。AR=ISR+OSR。

Kafka 0.9.0.0 版本后移除了 replica.lag.max.messages 参数,只保留了 replica.lag.time.max.ms 作为 ISR 中正本治理的参数。为什么这样做呢?replica.lag.max.messages 示意以后某个正本落后 leader 的音讯数量超过了这个参数的值,那么 leader 就会把 follower 从 ISR 中删除。假如设置 replica.lag.max.messages=4,那么如果 producer 一次传送至 broker 的音讯数量都小于 4 条时,因为在 leader 承受到 producer 发送的音讯之后而 follower 正本开始拉取这些音讯之前,follower 落后 leader 的音讯数不会超过 4 条音讯,故此没有 follower 移出 ISR,所以这时候 replica.lag.max.message 的设置仿佛是正当的。然而 producer 发动刹时顶峰流量,producer 一次发送的音讯超过 4 条时,也就是超过 replica.lag.max.messages,此时 follower 都会被认为是与 leader 正本不同步了,从而被踢出了 ISR。但实际上这些 follower 都是存活状态的且没有性能问题。那么在之后追上 leader, 并被重新加入了 ISR。于是就会呈现它们一直地剔出 ISR 而后从新回归 ISR,这无疑减少了无谓的性能损耗。而且这个参数是 broker 全局的。设置太大了,影响真正“落后”follower 的移除;设置的太小了,导致 follower 的频繁进出。无奈给定一个适合的 replica.lag.max.messages 的值,故此,新版本的 Kafka 移除了这个参数。

注:ISR 中包含:leader 和 follower。

下面一节还波及到一个概念,即 HW。HW 俗称高水位,HighWatermark 的缩写,取一个 partition 对应的 ISR 中最小的 LEO 作为 HW,consumer 最多只能生产到 HW 所在的地位。另外每个 replica 都有 HW,leader 和 follower 各自负责更新本人的 HW 的状态。对于 leader 新写入的音讯,consumer 不能立即生产,leader 会期待该音讯被所有 ISR 中的 replicas 同步后更新 HW,此时音讯能力被 consumer 生产。这样就保障了如果 leader 所在的 broker 生效,该音讯依然能够从新选举的 leader 中获取。对于来自外部 broker 的读取申请,没有 HW 的限度。

下图具体的阐明了当 producer 生产音讯至 broker 后,ISR 以及 HW 和 LEO 的流转过程:

由此可见,Kafka 的复制机制既不是齐全的同步复制,也不是单纯的异步复制。事实上,同步复制要求所有能工作的 follower 都复制完,这条音讯才会被 commit,这种复制形式极大的影响了吞吐率。而异步复制形式下,follower 异步的从 leader 复制数据,数据只有被 leader 写入 log 就被认为曾经 commit,这种状况下如果 follower 都还没有复制完,落后于 leader 时,忽然 leader 宕机,则会失落数据。而 Kafka 的这种应用 ISR 的形式则很好的平衡了确保数据不失落以及吞吐率。

Kafka 的 ISR 的治理最终都会反馈到 Zookeeper 节点上。具体位置为:/brokers/topics/[topic]/partitions/[partition]/state。目前有两个中央会对这个 Zookeeper 的节点进行保护:

  1. Controller 来保护:Kafka 集群中的其中一个 Broker 会被选举为 Controller,次要负责 Partition 治理和正本状态治理,也会执行相似于重调配 partition 之类的治理工作。在合乎某些特定条件下,Controller 下的 LeaderSelector 会选举新的 leader,ISR 和新的 leader_epoch 及 controller_epoch 写入 Zookeeper 的相干节点中。同时发动 LeaderAndIsrRequest 告诉所有的 replicas。
  2. leader 来保护:leader 有独自的线程定期检测 ISR 中 follower 是否脱离 ISR, 如果发现 ISR 变动,则会将新的 ISR 的信息返回到 Zookeeper 的相干节点中。

3.4 数据可靠性和持久性保障

当 producer 向 leader 发送数据时,能够通过 request.required.acks 参数来设置数据可靠性的级别:

  • 1(默认):这意味着 producer 在 ISR 中的 leader 已胜利收到数据并失去确认。如果 leader 宕机了,则会失落数据。
  • 0:这意味着 producer 无需期待来自 broker 的确认而持续发送下一批音讯。这种状况下数据传输效率最高,然而数据可靠性确是最低的。
  • -1:producer 须要期待 ISR 中的所有 follower 都确认接管到数据后才算一次发送实现,可靠性最高。然而这样也不能保证数据不失落,比方当 ISR 中只有 leader 时(后面 ISR 那一节讲到,ISR 中的成员因为某些状况会减少也会缩小,起码就只剩一个 leader),这样就变成了 acks= 1 的状况。

如果要进步数据的可靠性,在设置 request.required.acks=- 1 的同时,也要 min.insync.replicas 这个参数 (能够在 broker 或者 topic 层面进行设置) 的配合,这样能力施展最大的效用。min.insync.replicas 这个参数设定 ISR 中的最小正本数是多少,默认值为 1,当且仅当 request.required.acks 参数设置为 - 1 时,此参数才失效。如果 ISR 中的正本数少于 min.insync.replicas 配置的数量时,客户端会返回异样:org.apache.kafka.common.errors.NotEnoughReplicasExceptoin: Messages are rejected since there are fewer in-sync replicas than required。

接下来对 acks= 1 和 - 1 的两种状况进行详细分析:

1. request.required.acks=1

producer 发送数据到 leader,leader 写本地日志胜利,返回客户端胜利;此时 ISR 中的正本还没有来得及拉取该音讯,leader 就宕机了,那么此次发送的音讯就会失落。

2. request.required.acks=-1

同步(Kafka 默认为同步,即 producer.type=sync)的发送模式,replication.factor>= 2 且 min.insync.replicas>= 2 的状况下,不会失落数据。

有两种典型状况。acks=- 1 的状况下(如无非凡阐明,以下 acks 都示意为参数 request.required.acks),数据发送到 leader, ISR 的 follower 全副实现数据同步后,leader 此时挂掉,那么会选举出新的 leader,数据不会失落。

acks=- 1 的状况下,数据发送到 leader 后,局部 ISR 的正本同步,leader 此时挂掉。比方 follower1 和 follower2 都有可能变成新的 leader, producer 端会失去返回异样,producer 端会从新发送数据,数据可能会反复。

当然上图中如果在 leader crash 的时候,follower2 还没有同步到任何数据,而且 follower2 被选举为新的 leader 的话,这样音讯就不会反复。

注:Kafka 只解决 fail/recover 问题, 不解决 Byzantine 问题。

3.5 对于 HW 的进一步探讨

思考上图(即 acks=-1, 局部 ISR 正本同步)中的另一种状况,如果在 Leader 挂掉的时候,follower1 同步了音讯 4,5,follower2 同步了音讯 4,与此同时 follower2 被选举为 leader,那么此时 follower1 中的多出的音讯 5 该做如何解决呢?

这里就须要 HW 的协同配合了。如前所述,一个 partition 中的 ISR 列表中,leader 的 HW 是所有 ISR 列表里正本中最小的那个的 LEO。相似于木桶原理,水位取决于最低那块短板。

如上图,某个 topic 的某 partition 有三个正本,别离为 A、B、C。A 作为 leader 必定是 LEO 最高,B 紧随其后,C 机器因为配置比拟低,网络比拟差,故而同步最慢。这个时候 A 机器宕机,这时候如果 B 成为 leader,如果没有 HW,在 A 从新复原之后会做同步 (makeFollower) 操作,在宕机时 log 文件之后间接做追加操作,而如果 B 的 LEO 曾经达到了 A 的 LEO,会产生数据不统一的状况,所以应用 HW 来防止这种状况。
A 在做同步操作的时候,先将 log 文件截断到之前本人的 HW 的地位,即 3,之后再从 B 中拉取音讯进行同步。

如果失败的 follower 恢复过来,它首先将本人的 log 文件截断到上次 checkpointed 时刻的 HW 的地位,之后再从 leader 中同步音讯。leader 挂掉会从新选举,新的 leader 会发送“指令”让其余的 follower 截断至本身的 HW 的地位而后再拉取新的音讯。

当 ISR 中的个正本的 LEO 不统一时,如果此时 leader 挂掉,选举新的 leader 时并不是依照 LEO 的高下进行选举,而是依照 ISR 中的程序选举。

3.6 Leader 选举

一条音讯只有被 ISR 中的所有 follower 都从 leader 复制过来才会被认为已提交。这样就防止了局部数据被写进了 leader,还没来得及被任何 follower 复制就宕机了,而造成数据失落。而对于 producer 而言,它能够抉择是否期待音讯 commit,这能够通过 request.required.acks 来设置。这种机制确保了只有 ISR 中有一个或者以上的 follower,一条被 commit 的音讯就不会失落。

有一个很重要的问题是当 leader 宕机了,怎么在 follower 中选举出新的 leader,因为 follower 可能落后很多或者间接 crash 了,所以必须确保抉择“最新”的 follower 作为新的 leader。一个根本的准则就是,如果 leader 不在了,新的 leader 必须领有原来的 leader commit 的所有音讯。这就须要做一个折中,如果 leader 在一个音讯被 commit 前期待更多的 follower 确认,那么在它挂掉之后就有更多的 follower 能够成为新的 leader,但这也会造成吞吐率的降落。

一种十分罕用的选举 leader 的形式是“多数遵从少数”,Kafka 并不是采纳这种形式。这种模式下,如果咱们有 2f+ 1 个正本,那么在 commit 之前必须保障有 f + 1 个 replica 复制完音讯,同时为了保障能正确选举出新的 leader,失败的正本数不能超过 f 个。这种形式有个很大的劣势,零碎的提早取决于最快的几台机器,也就是说比方正本数为 3,那么提早就取决于最快的那个 follower 而不是最慢的那个。“多数遵从少数”的形式也有一些劣势,为了保障 leader 选举的失常进行,它所能容忍的失败的 follower 数比拟少,如果要容忍 1 个 follower 挂掉,那么至多要 3 个以上的正本,如果要容忍 2 个 follower 挂掉,必须要有 5 个以上的正本。也就是说,在生产环境下为了保障较高的容错率,必须要有大量的正本,而大量的正本又会在大数据量下导致性能的急剧下降。这种算法更多用在 Zookeeper 这种共享集群配置的零碎中而很少在须要大量数据的零碎中应用的起因。HDFS 的 HA 性能也是基于“多数遵从少数”的形式,然而其数据存储并不是采纳这样的形式。

实际上,leader 选举的算法十分多,比方 Zookeeper 的 Zab、Raft 以及 Viewstamped Replication。而 Kafka 所应用的 leader 选举算法更像是微软的 PacificA 算法。

Kafka 在 Zookeeper 中为每一个 partition 动静的保护了一个 ISR,这个 ISR 里的所有 replica 都跟上了 leader,只有 ISR 里的成员能力有被选为 leader 的可能(unclean.leader.election.enable=false)。在这种模式下,对于 f + 1 个正本,一个 Kafka topic 能在保障不失落曾经 commit 音讯的前提下容忍 f 个正本的失败,在大多数应用场景下,这种模式是非常无利的。事实上,为了容忍 f 个正本的失败,“多数遵从少数”的形式和 ISR 在 commit 前须要期待的正本的数量是一样的,然而 ISR 须要的总的正本的个数简直是“多数遵从少数”的形式的一半。

上文提到,在 ISR 中至多有一个 follower 时,Kafka 能够确保曾经 commit 的数据不失落,但如果某一个 partition 的所有 replica 都挂了,就无奈保证数据不失落了。这种状况下有两种可行的计划:

  1. 期待 ISR 中任意一个 replica“活”过去,并且选它作为 leader
  2. 抉择第一个“活”过去的 replica(并不一定是在 ISR 中)作为 leader

这就须要在可用性和一致性当中作出一个简略的抉择。如果肯定要期待 ISR 中的 replica“活”过去,那不可用的工夫就可能会绝对较长。而且如果 ISR 中所有的 replica 都无奈“活”过去了,或者数据失落了,这个 partition 将永远不可用。抉择第一个“活”过去的 replica 作为 leader, 而这个 replica 不是 ISR 中的 replica, 那即便它并不保障曾经蕴含了所有已 commit 的音讯,它也会成为 leader 而作为 consumer 的数据源。默认状况下,Kafka 采纳第二种策略,即 unclean.leader.election.enable=true,也能够将此参数设置为 false 来启用第一种策略。

unclean.leader.election.enable 这个参数对于 leader 的选举、零碎的可用性以及数据的可靠性都有至关重要的影响。上面咱们来剖析下几种典型的场景。

如果上图所示,假如某个 partition 中的正本数为 3,replica-0, replica-1, replica- 2 别离寄存在 broker0, broker1 和 broker2 中。AR=(0,1,2),ISR=(0,1)。
设置 request.required.acks=-1, min.insync.replicas=2,unclean.leader.election.enable=false。这里将 broker0 中的正本也称之为 broker0 起初 broker0 为 leader,broker1 为 follower。

  • 当 ISR 中的 replica- 0 呈现 crash 的状况时,broker1 选举为新的 leader[ISR=(1)],因为受 min.insync.replicas= 2 影响,write 不能服务,然而 read 能持续失常服务。此种状况复原计划:

    1. 尝试复原(重启)replica-0,如果能起来,零碎失常;
    2. 如果 replica- 0 不能复原,须要将 min.insync.replicas 设置为 1,复原 write 性能。
  • 当 ISR 中的 replica- 0 呈现 crash,紧接着 replica- 1 也呈现了 crash, 此时[ISR=(1),leader=-1], 不能对外提供服务,此种状况复原计划:

    1. 尝试复原 replica- 0 和 replica-1,如果都能起来,则零碎恢复正常;
    2. 如果 replica- 0 起来,而 replica- 1 不能起来,这时候依然不能选出 leader,因为当设置 unclean.leader.election.enable=false 时,leader 只能从 ISR 中选举,当 ISR 中所有正本都生效之后,须要 ISR 中最初生效的那个副本能复原之后能力选举 leader, 即 replica- 0 先生效,replica- 1 后生效,须要 replica- 1 复原后能力选举 leader。激进的计划倡议把 unclean.leader.election.enable 设置为 true, 然而这样会有失落数据的状况产生,这样能够复原 read 服务。同样须要将 min.insync.replicas 设置为 1,复原 write 性能;
    3. replica- 1 复原,replica- 0 不能复原,这个状况下面遇到过,read 服务可用,须要将 min.insync.replicas 设置为 1,复原 write 性能;
    4. replica- 0 和 replica- 1 都不能复原,这种状况能够参考情景 2.
  • 当 ISR 中的 replica-0, replica- 1 同时宕机, 此时[ISR=(0,1)], 不能对外提供服务,此种状况复原计划:尝试复原 replica- 0 和 replica-1,当其中任意一个正本恢复正常时,对外能够提供 read 服务。直到 2 个正本恢复正常,write 性能能力复原,或者将将 min.insync.replicas 设置为 1。

3.7 Kafka 的发送模式

Kafka 的发送模式由 producer 端的配置参数 producer.type 来设置,这个参数指定了在后盾线程中音讯的发送形式是同步的还是异步的,默认是同步的形式,即 producer.type=sync。如果设置成异步的模式,即 producer.type=async,能够是 producer 以 batch 的模式 push 数据,这样会极大的进步 broker 的性能,然而这样会减少失落数据的危险。如果须要确保音讯的可靠性,必须要将 producer.type 设置为 sync。

对于异步模式,还有 4 个配套的参数,如下:

Property Description
queue.buffering.max.ms 默认值:5000。启用异步模式时,producer 缓存音讯的工夫。比方咱们设置成 1000 时,它会缓存 1s 的数据再一次发送进来,这样能够极大的减少 broker 吞吐量,但也会造成时效性的升高。
queue.buffering.max.messages 默认值:10000。启用异步模式时,producer 缓存队列里最大缓存的音讯数量,如果超过这个值,producer 就会阻塞或者丢掉音讯。
queue.enqueue.timeout.ms 默认值:-1。当达到下面参数时 producer 会阻塞期待的工夫。如果设置为 0,buffer 队列满时 producer 不会阻塞,音讯间接被丢掉;若设置为 -1,producer 会被阻塞,不会丢音讯。
batch.num.messages 默认值:200。启用异步模式时,一个 batch 缓存的音讯数量。达到这个数值时,producer 才会发送音讯。(每次批量发送的数量)

以 batch 的形式推送数据能够极大的进步解决效率,kafka producer 能够将音讯在内存中累计到肯定数量后作为一个 batch 发送申请。batch 的数量大小能够通过 producer 的参数(batch.num.messages)管制。通过减少 batch 的大小,能够缩小网络申请和磁盘 IO 的次数,当然具体参数设置须要在效率和时效性方面做一个衡量。在比拟新的版本中还有 batch.size 这个参数。


4 高可靠性应用剖析

4.1 音讯传输保障

后面曾经介绍了 Kafka 如何进行无效的存储,以及理解了 producer 和 consumer 如何工作。接下来探讨的是 Kafka 如何确保音讯在 producer 和 consumer 之间传输。有以下三种可能的传输保障(delivery guarantee):

  • At most once: 音讯可能会丢,但绝不会反复传输
  • At least once:音讯绝不会丢,但可能会反复传输
  • Exactly once:每条音讯必定会被传输一次且仅传输一次

Kafka 的音讯传输保障机制十分直观。当 producer 向 broker 发送音讯时,一旦这条音讯被 commit,因为正本机制(replication)的存在,它就不会失落。然而如果 producer 发送数据给 broker 后,遇到的网络问题而造成通信中断,那 producer 就无奈判断该条音讯是否曾经提交(commit)。尽管 Kafka 无奈确定网络故障期间产生了什么,然而 producer 能够 retry 屡次,确保音讯曾经正确传输到 broker 中,所以目前 Kafka 实现的是 at least once。

consumer 从 broker 中读取音讯后,能够抉择 commit,该操作会在 Zookeeper 中存下该 consumer 在该 partition 下读取的音讯的 offset。该 consumer 下一次再读该 partition 时会从下一条开始读取。如未 commit,下一次读取的开始地位会跟上一次 commit 之后的开始地位雷同。当然也能够将 consumer 设置为 autocommit,即 consumer 一旦读取到数据立刻主动 commit。如果只探讨这一读取音讯的过程,那 Kafka 是确保了 exactly once, 然而如果因为后面 producer 与 broker 之间的某种原因导致音讯的反复,那么这里就是 at least once。

思考这样一种状况,当 consumer 读完音讯之后先 commit 再解决音讯,在这种模式下,如果 consumer 在 commit 后还没来得及解决音讯就 crash 了,下次从新开始工作后就无奈读到刚刚已提交而未解决的音讯,这就对应于 at most once 了。

读完音讯先解决再 commit。这种模式下,如果解决完了音讯在 commit 之前 consumer crash 了,下次从新开始工作时还会解决刚刚未 commit 的音讯,实际上该音讯曾经被解决过了,这就对应于 at least once。

要做到 exactly once 就须要引入音讯去重机制。

4.2 音讯去重

如上一节所述,Kafka 在 producer 端和 consumer 端都会呈现音讯的反复,这就须要去重解决。

Kafka 文档中提及 GUID(Globally Unique Identifier)的概念,通过客户端生成算法失去每个音讯的 unique id,同时可映射至 broker 上存储的地址,即通过 GUID 便可查问提取音讯内容,也便于发送方的幂等性保障,须要在 broker 上提供此去重解决模块,目前版本尚不反对。

针对 GUID, 如果从客户端的角度去重,那么须要引入集中式缓存,必然会减少依赖复杂度,另外缓存的大小难以界定。

不只是 Kafka, 相似 RabbitMQ 以及 RocketMQ 这类商业级中间件也只保障 at least once, 且也无奈从本身去进行音讯去重。所以咱们倡议业务方依据本身的业务特点进行去重,比方业务音讯自身具备幂等性,或者借助 Redis 等其余产品进行去重解决。

4.3 高可靠性配置

Kafka 提供了很高的数据冗余弹性,对于须要数据高可靠性的场景,咱们能够减少数据冗余备份数(replication.factor),调高最小写入正本数的个数(min.insync.replicas)等等,然而这样会影响性能。反之,性能进步而可靠性则升高,用户须要本身业务个性在彼此之间做一些衡量性抉择。

要保证数据写入到 Kafka 是平安的,高牢靠的,须要如下的配置:

  • topic 的配置:replication.factor>=3, 即正本数至多是 3 个;2<=min.insync.replicas<=replication.factor
  • broker 的配置:leader 的选举条件 unclean.leader.election.enable=false
  • producer 的配置:request.required.acks=-1(all),producer.type=sync
    • *

5 BenchMark

Kafka 在唯品会有着很深的历史渊源,依据唯品会消息中间件团队(VMS 团队)所把握的材料显示,在 VMS 团队运行的 Kafka 集群中所撑持的 topic 数已靠近 2000,每天的申请量也已达千亿级。这里就以 Kafka 的高可靠性为基准点来探索几种不同场景下的行为表现,以此来加深对 Kafka 的认知,为大家在当前高效的应用 Kafka 时提供一份根据。

5.1 测试环境

Kafka broker 用到了 4 台机器,别离为 broker[0/1/2/3]配置如下:

  • CPU: 24core/2.6GHZ
  • Memory: 62G
  • Network: 4000Mb
  • OS/kernel: CentOs release 6.6 (Final)
  • Disk: 1089G
  • Kafka 版本:0.10.1.0

broker 端 JVM 参数设置:
-Xmx8G -Xms8G -server -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true -Xloggc:/apps/service/kafka/bin/…/logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9999

客户端机器配置:

  • CPU: 24core/2.6GHZ
  • Memory: 3G
  • Network: 1000Mb
  • OS/kernel: CentOs release 6.3 (Final)
  • Disk: 240G

5.2 不同场景测试

场景 1:测试不同的正本数、min.insync.replicas 策略以及 request.required.acks 策略(以下简称 acks 策略)对于发送速度(TPS)的影响。

具体配置:一个 producer;发送形式为 sync;音讯体大小为 1kB;partition 数为 12。正本数为:1/2/4;min.insync.replicas 别离为 1 /2/4;acks 别离为 -1(all)/1/0。

具体测试数据如下表(min.insync.replicas 只在 acks=- 1 时无效):

acks replicas min.insync.replicas retries TPS
-1 1 1 0 28511.3
-1 2 1 0 22359.5
-1 2 2 0 22927.4
-1 4 1 0 16193.9
-1 4 2 0 16599.9
-1 4 4 0 16680.3
0 1 N/A 0 45353.8
0 2 N/A 0 46426.5
0 4 N/A 0 46764.2
1 1 N/A 0 33950.3
1 2 N/A 0 32192.2
1 4 N/A 0 32275.9

测试后果剖析:

  • 客户端的 acks 策略对发送的 TPS 有较大的影响,TPS:acks_0 > acks_1 > ack_-1;
  • 正本数越高,TPS 越低;正本数统一时,min.insync.replicas 不影响 TPS;
  • acks=0/ 1 时,TPS 与 min.insync.replicas 参数以及正本数无关,仅受 acks 策略的影响。

上面将 partition 的个数设置为 1,来进一步确认下不同的 acks 策略、不同的 min.insync.replicas 策略以及不同的正本数对于发送速度的影响,具体请看情景 2 和情景 3。

场景 2:在 partition 个数固定为 1,测试不同的正本数和 min.insync.replicas 策略对发送速度的影响。

具体配置:一个 producer;发送形式为 sync;音讯体大小为 1kB;producer 端 acks=-1(all)。变换正本数:2/3/4;min.insync.replicas 设置为:1/2/4。

测试后果如下:

replicas min.insync.replicas TPS
2 1 9738.8
2 2 9701.6
3 1 8999.7
3 2 9243.1
4 1 9005.8
4 2 8216.9
4 4 9092.4

测试后果剖析:正本数越高,TPS 越低(这点与场景 1 的测试论断吻合),然而当 partition 数为 1 时差距甚微。min.insync.replicas 不影响 TPS。

场景 3:在 partition 个数固定为 1,测试不同的 acks 策略和正本数对发送速度的影响。

具体配置:一个 producer;发送形式为 sync;音讯体大小为 1kB;min.insync.replicas=1。topic 正本数为:1/2/4;acks:0/1/-1。

测试后果如下:

replicas acks TPS
1 0 76696
2 0 57503
4 0 59367
1 1 19489
2 1 20404
4 1 18365
1 -1 18641
2 -1 9739
4 -1 9006

测试后果剖析(与情景 1 统一):

  • 正本数越多,TPS 越低;
  • 客户端的 acks 策略对发送的 TPS 有较大的影响,TPS:acks_0 > acks_1 > ack_-1。

场景 4:测试不同 partition 数对发送速率的影响

具体配置:一个 producer;音讯体大小为 1KB;发送形式为 sync;topic 正本数为 2;min.insync.replicas=2;acks=-1。partition 数量设置为 1 /2/4/8/12。

测试后果:

测试后果剖析:partition 的不同会影响 TPS,随着 partition 的个数的增长 TPS 会有所增长,但并不是始终成正比关系,达到肯定临界值时,partition 数量的减少反而会使 TPS 稍微升高。

场景 5:通过将集群中局部 broker 设置成不可服务状态,测试对客户端以及音讯落盘的影响。

具体配置:一个 producer;音讯体大小 1KB; 发送形式为 sync;topic 正本数为 4;min.insync.replicas 设置为 2;acks=-1;retries=0/100000000;partition 数为 12。

具体测试数据如下表:

acks replicas min.insync.replicas retries 测试方法 TPS 数据落盘 呈现谬误
-1 4 2 0 发送过程中 kill 两台 broker 12840 统一(局部数据可落盘,局部失败) 谬误 1
-1 4 2 100000000 发送过程中 kill 两台 broker 13870 统一(音讯有反复落盘) 谬误 2
-1 4 2 100000000 发送过程中 kill 三台 broker,之后重启 N/A 统一(音讯有反复落盘) 谬误 2、3、4

出错信息:

  • 谬误 1:客户端返回异样,局部数据可落盘,局部失败:org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
  • 谬误 2:[WARN]internals.Sender – Got error produce response with correlation id 19369 on topic-partition default_channel_replicas_4_1-3, retrying (999999999 attempts left). Error: NETWORK_EXCEPTION
  • 谬误 3:[WARN]internals.Sender – Got error produce response with correlation id 77890 on topic-partition default_channel_replicas_4_1-8, retrying (999999859 attempts left). Error: NOT_ENOUGH_REPLICAS
  • 谬误 4:[WARN]internals.Sender – Got error produce response with correlation id 77705 on topic-partition default_channel_replicas_4_1-3, retrying (999999999 attempts left). Error: NOT_ENOUGH_REPLICAS_AFTER_APPEND

测试后果剖析:

  • kill 两台 broker 后,客户端能够持续发送。broker 缩小后,partition 的 leader 散布在残余的两台 broker 上,造成了 TPS 的减小;
  • kill 三台 broker 后,客户端无奈持续发送。Kafka 的主动重试性能开始起作用,当大于等于 min.insync.replicas 数量的 broker 复原后,能够持续发送;
  • 当 retries 不为 0 时,音讯有反复落盘;客户端胜利返回的音讯都胜利落盘,异样时局部音讯能够落盘。

场景 6:测试单个 producer 的发送提早,以及端到端的提早。

具体配置::一个 producer;音讯体大小 1KB;发送形式为 sync;topic 正本数为 4;min.insync.replicas 设置为 2;acks=-1;partition 数为 12。

测试数据及后果(单位为 ms):

发送端(avg) 发送端(min) 发送端(max) 发送端(99%) 发送端(99.99%) 生产端(avg) 生产端(min) 生产端(max) 生产端(99%) 生产端(99.99%)
1.715 1 157 3 29 1.646 1 288 4 72

各场景测试总结

  • 当 acks=- 1 时,Kafka 发送端的 TPS 受限于 topic 的正本数量(ISR 中),正本越多 TPS 越低;
  • acks= 0 时,TPS 最高,其次为 1,最差为 -1,即 TPS:acks_0 > acks_1 > ack_-1;
  • min.insync.replicas 参数不影响 TPS;
  • partition 的不同会影响 TPS,随着 partition 的个数的增长 TPS 会有所增长,但并不是始终成正比关系,达到肯定临界值时,partition 数量的减少反而会使 TPS 稍微升高;
  • Kafka 在 acks=-1,min.insync.replicas>= 1 时,具备高可靠性,所有胜利返回的音讯都能够落盘。
正文完
 0