1 概述

Kakfa起初是由LinkedIn公司开发的一个分布式的音讯零碎,后成为Apache的一部分,它应用Scala编写,以可程度扩大和高吞吐率而被宽泛应用。目前越来越多的开源分布式解决零碎如Cloudera、Apache Storm、Spark等都反对与Kafka集成。

Kafka凭借着本身的劣势,越来越受到互联网企业的青眼,唯品会也采纳Kafka作为其外部外围音讯引擎之一。Kafka作为一个商业级消息中间件,音讯可靠性的重要性可想而知。如何确保音讯的准确传输?如何确保音讯的精确存储?如何确保音讯的正确生产?这些都是须要思考的问题。本文首先从Kafka的架构着手,先理解下Kafka的基本原理,而后通过对kakfa的存储机制、复制原理、同步原理、可靠性和持久性保障等等一步步对其可靠性进行剖析,最初通过benchmark来加强对Kafka高可靠性的认知。


2 Kafka体系架构

如上图所示,一个典型的Kafka体系架构包含若干Producer(能够是服务器日志,业务数据,页面前端产生的page view等等),若干broker(Kafka反对程度扩大,个别broker数量越多,集群吞吐率越高),若干Consumer (Group),以及一个Zookeeper集群。Kafka通过Zookeeper治理集群配置,选举leader,以及在consumer group发生变化时进行rebalance。Producer应用push(推)模式将音讯公布到broker,Consumer应用pull(拉)模式从broker订阅并生产音讯。

名词解释:

名称解释
Broker消息中间件解决节点,一个Kafka节点就是一个broker,一个或者多个Broker能够组成一个Kafka集群
TopicKafka依据topic对音讯进行归类,公布到Kafka集群的每条音讯都须要指定一个topic
Producer音讯生产者,向Broker发送音讯的客户端
Consumer音讯消费者,从Broker读取音讯的客户端
ConsumerGroup每个Consumer属于一个特定的Consumer Group,一条音讯能够发送到多个不同的Consumer Group,然而一个Consumer Group中只能有一个Consumer可能生产该音讯
Partition物理上的概念,一个topic能够分为多个partition,每个partition外部是有序的

2.1 Topic & Partition

一个topic能够认为一个一类音讯,每个topic将被分成多个partition,每个partition在存储层面是append log文件。任何公布到此partition的音讯都会被追加到log文件的尾部,每条音讯在文件中的地位称为offset(偏移量),offset为一个long型的数字,它惟一标记一条音讯。每条音讯都被append到partition中,是程序写磁盘,因而效率十分高(教训证,程序写磁盘效率比随机写内存还要高,这是Kafka高吞吐率的一个很重要的保障)。

每一条音讯被发送到broker中,会依据partition规定抉择被存储到哪一个partition。如果partition规定设置的正当,所有音讯能够均匀分布到不同的partition里,这样就实现了程度扩大。(如果一个topic对应一个文件,那这个文件所在的机器I/O将会成为这个topic的性能瓶颈,而partition解决了这个问题)。在创立topic时能够在$KAFKA_HOME/config/server.properties中指定这个partition的数量(如下所示),当然能够在topic创立之后去批改partition的数量。

# The default number of log partitions per topic. More partitions allow greater# parallelism for consumption, but this will also result in more files across# the brokers.num.partitions=3

在发送一条音讯时,能够指定这个音讯的key,producer依据这个key和partition机制来判断这个音讯发送到哪个partition。partition机制能够通过指定producer的partition.class这一参数来指定,该class必须实现kafka.producer.Partitioner接口。

无关Topic与Partition的更多细节,能够参考上面的“Kafka文件存储机制”这一节。


3 高可靠性存储剖析

Kafka的高可靠性的保障来源于其强壮的正本(replication)策略。通过调节其正本相干参数,能够使得Kafka在性能和可靠性之间运行的熟能生巧。Kafka从0.8.x版本开始提供partition级别的复制,replication的数量能够在$KAFKA_HOME/config/server.properties中配置(default.replication.refactor)。

这里先从Kafka文件存储机制动手,从最底层理解Kafka的存储细节,进而对其的存储有个宏观的认知。之后通过Kafka复制原理和同步形式来论述宏观层面的概念。最初从ISR,HW,leader选举以及数据可靠性和持久性保障等等各个维度来丰盛对Kafka相干知识点的认知。

3.1 Kafka文件存储机制

Kafka中音讯是以topic进行分类的,生产者通过topic向Kafka broker发送音讯,消费者通过topic读取数据。然而topic在物理层面又能以partition为分组,一个topic能够分成若干个partition,那么topic以及partition又是怎么存储的呢?partition还能够细分为segment,一个partition物理上由多个segment组成,那么这些segment又是什么呢?上面咱们来一一揭晓。

为了便于阐明问题,假如这里只有一个Kafka集群,且这个集群只有一个Kafka broker,即只有一台物理机。在这个Kafka broker中配置($KAFKA_HOME/config/server.properties中)log.dirs=/tmp/kafka-logs,以此来设置Kafka音讯文件存储目录,与此同时创立一个topic:topic_zzh_test,partition的数量为4($KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 4 --topic topic_zzh_test --replication-factor 1)。那么咱们此时能够在/tmp/kafka-logs目录中能够看到生成了4个目录:

drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_zzh_test-0drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_zzh_test-1drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_zzh_test-2drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_zzh_test-3

在Kafka文件存储中,同一个topic下有多个不同的partition,每个partiton为一个目录,partition的名称规定为:topic名称+有序序号,第一个序号从0开始计,最大的序号为partition数量减1,partition是理论物理上的概念,而topic是逻辑上的概念。

下面提到partition还能够细分为segment,这个segment又是什么?如果就以partition为最小存储单位,咱们能够设想当Kafka producer一直发送音讯,必然会引起partition文件的有限扩张,这样对于音讯文件的保护以及曾经被生产的音讯的清理带来重大的影响,所以这里以segment为单位又将partition细分。每个partition(目录)相当于一个巨型文件被平均分配到多个大小相等的segment(段)数据文件中(每个segment 文件中音讯数量不肯定相等)这种个性也不便old segment的删除,即不便已被生产的音讯的清理,进步磁盘的利用率。每个partition只须要反对程序读写就行,segment的文件生命周期由服务端配置参数(log.segment.bytes,log.roll.{ms,hours}等若干参数)决定。

segment文件由两局部组成,别离为“.index”文件和“.log”文件,别离示意为segment索引文件和数据文件。这两个文件的命令规定为:partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最初一条音讯的offset值,数值大小为64位,20位数字字符长度,没有数字用0填充,如下:

00000000000000000000.index00000000000000000000.log00000000000000170410.index00000000000000170410.log00000000000000239430.index00000000000000239430.log`

以下面的segment文件为例,展现出segment:00000000000000170410的“.index”文件和“.log”文件的对应的关系,如下图:

如上图,“.index”索引文件存储大量的元数据,“.log”数据文件存储大量的音讯,索引文件中的元数据指向对应数据文件中message的物理偏移地址。其中以“.index”索引文件中的元数据[3, 348]为例,在“.log”数据文件示意第3个音讯,即在全局partition中示意170410+3=170413个音讯,该音讯的物理偏移地址为348。

那么如何从partition中通过offset查找message呢?
以上图为例,读取offset=170418的音讯,首先查找segment文件,其中00000000000000000000.index为最开始的文件,第二个文件为00000000000000170410.index(起始偏移为170410+1=170411),而第三个文件为00000000000000239430.index(起始偏移为239430+1=239431),所以这个offset=170418就落到了第二个文件之中。其余后续文件能够顺次类推,以其实偏移量命名并排列这些文件,而后依据二分查找法就能够疾速定位到具体文件地位。其次依据00000000000000170410.index文件中的[8,1325]定位到00000000000000170410.log文件中的1325的地位进行读取。

要是读取offset=170418的音讯,从00000000000000170410.log文件中的1325的地位进行读取,那么怎么晓得何时读完本条音讯,否则就读到下一条音讯的内容了?
这个就须要分割到音讯的物理构造了,音讯都具备固定的物理构造,包含:offset(8 Bytes)、音讯体的大小(4 Bytes)、crc32(4 Bytes)、magic(1 Byte)、attributes(1 Byte)、key length(4 Bytes)、key(K Bytes)、payload(N Bytes)等等字段,能够确定一条音讯的大小,即读取到哪里截止。

3.2 复制原理和同步形式

Kafka中topic的每个partition有一个预写式的日志文件,尽管partition能够持续细分为若干个segment文件,然而对于下层利用来说能够将partition看成最小的存储单元(一个有多个segment文件拼接的“巨型”文件),每个partition都由一些列有序的、不可变的音讯组成,这些音讯被间断的追加到partition中。

上图中有两个新名词:HW和LEO。这里先介绍下LEO,LogEndOffset的缩写,示意每个partition的log最初一条Message的地位。HW是HighWatermark的缩写,是指consumer可能看到的此partition的地位,这个波及到多正本的概念,这里先提及一下,下节再详表。

言归正传,为了进步音讯的可靠性,Kafka每个topic的partition有N个正本(replicas),其中N(大于等于1)是topic的复制因子(replica fator)的个数。Kafka通过多正本机制实现故障主动转移,当Kafka集群中一个broker生效状况下依然保障服务可用。在Kafka中产生复制时确保partition的日志能有序地写到其余节点上,N个replicas中,其中一个replica为leader,其余都为follower, leader解决partition的所有读写申请,与此同时,follower会被动定期地去复制leader上的数据。

如下图所示,Kafka集群中有4个broker, 某topic有3个partition,且复制因子即正本个数也为3:

Kafka提供了数据复制算法保障,如果leader产生故障或挂掉,一个新leader被选举并被承受客户端的音讯胜利写入。Kafka确保从同步正本列表中选举一个正本为leader,或者说follower追赶leader数据。leader负责保护和跟踪ISR(In-Sync Replicas的缩写,示意正本同步队列,具体可参考下节)中所有follower滞后的状态。当producer发送一条音讯到broker后,leader写入音讯并复制到所有follower。音讯提交之后才被胜利复制到所有的同步正本。音讯复制提早受最慢的follower限度,重要的是疾速检测慢正本,如果follower“落后”太多或者生效,leader将会把它从ISR中删除。

3.3 ISR

上节咱们波及到ISR (In-Sync Replicas),这个是指正本同步队列。正本数对Kafka的吞吐率是有肯定的影响,但极大的加强了可用性。默认状况下Kafka的replica数量为1,即每个partition都有一个惟一的leader,为了确保音讯的可靠性,通常利用中将其值(由broker的参数offsets.topic.replication.factor指定)大小设置为大于1,比方3。 所有的正本(replicas)统称为Assigned Replicas,即AR。ISR是AR中的一个子集,由leader保护ISR列表,follower从leader同步数据有一些提早(包含延迟时间replica.lag.time.max.ms和提早条数replica.lag.max.messages两个维度, 以后最新的版本0.10.x中只反对replica.lag.time.max.ms这个维度),任意一个超过阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新退出的follower也会先寄存在OSR中。AR=ISR+OSR。

Kafka 0.9.0.0版本后移除了replica.lag.max.messages参数,只保留了replica.lag.time.max.ms作为ISR中正本治理的参数。为什么这样做呢?replica.lag.max.messages示意以后某个正本落后leader的音讯数量超过了这个参数的值,那么leader就会把follower从ISR中删除。假如设置replica.lag.max.messages=4,那么如果producer一次传送至broker的音讯数量都小于4条时,因为在leader承受到producer发送的音讯之后而follower正本开始拉取这些音讯之前,follower落后leader的音讯数不会超过4条音讯,故此没有follower移出ISR,所以这时候replica.lag.max.message的设置仿佛是正当的。然而producer发动刹时顶峰流量,producer一次发送的音讯超过4条时,也就是超过replica.lag.max.messages,此时follower都会被认为是与leader正本不同步了,从而被踢出了ISR。但实际上这些follower都是存活状态的且没有性能问题。那么在之后追上leader,并被重新加入了ISR。于是就会呈现它们一直地剔出ISR而后从新回归ISR,这无疑减少了无谓的性能损耗。而且这个参数是broker全局的。设置太大了,影响真正“落后”follower的移除;设置的太小了,导致follower的频繁进出。无奈给定一个适合的replica.lag.max.messages的值,故此,新版本的Kafka移除了这个参数。

注:ISR中包含:leader和follower。

下面一节还波及到一个概念,即HW。HW俗称高水位,HighWatermark的缩写,取一个partition对应的ISR中最小的LEO作为HW,consumer最多只能生产到HW所在的地位。另外每个replica都有HW,leader和follower各自负责更新本人的HW的状态。对于leader新写入的音讯,consumer不能立即生产,leader会期待该音讯被所有ISR中的replicas同步后更新HW,此时音讯能力被consumer生产。这样就保障了如果leader所在的broker生效,该音讯依然能够从新选举的leader中获取。对于来自外部broker的读取申请,没有HW的限度。

下图具体的阐明了当producer生产音讯至broker后,ISR以及HW和LEO的流转过程:

由此可见,Kafka的复制机制既不是齐全的同步复制,也不是单纯的异步复制。事实上,同步复制要求所有能工作的follower都复制完,这条音讯才会被commit,这种复制形式极大的影响了吞吐率。而异步复制形式下,follower异步的从leader复制数据,数据只有被leader写入log就被认为曾经commit,这种状况下如果follower都还没有复制完,落后于leader时,忽然leader宕机,则会失落数据。而Kafka的这种应用ISR的形式则很好的平衡了确保数据不失落以及吞吐率。

Kafka的ISR的治理最终都会反馈到Zookeeper节点上。具体位置为:/brokers/topics/[topic]/partitions/[partition]/state。目前有两个中央会对这个Zookeeper的节点进行保护:

  1. Controller来保护:Kafka集群中的其中一个Broker会被选举为Controller,次要负责Partition治理和正本状态治理,也会执行相似于重调配partition之类的治理工作。在合乎某些特定条件下,Controller下的LeaderSelector会选举新的leader,ISR和新的leader_epoch及controller_epoch写入Zookeeper的相干节点中。同时发动LeaderAndIsrRequest告诉所有的replicas。
  2. leader来保护:leader有独自的线程定期检测ISR中follower是否脱离ISR, 如果发现ISR变动,则会将新的ISR的信息返回到Zookeeper的相干节点中。

3.4 数据可靠性和持久性保障

当producer向leader发送数据时,能够通过request.required.acks参数来设置数据可靠性的级别:

  • 1(默认):这意味着producer在ISR中的leader已胜利收到数据并失去确认。如果leader宕机了,则会失落数据。
  • 0:这意味着producer无需期待来自broker的确认而持续发送下一批音讯。这种状况下数据传输效率最高,然而数据可靠性确是最低的。
  • -1:producer须要期待ISR中的所有follower都确认接管到数据后才算一次发送实现,可靠性最高。然而这样也不能保证数据不失落,比方当ISR中只有leader时(后面ISR那一节讲到,ISR中的成员因为某些状况会减少也会缩小,起码就只剩一个leader),这样就变成了acks=1的状况。

如果要进步数据的可靠性,在设置request.required.acks=-1的同时,也要min.insync.replicas这个参数(能够在broker或者topic层面进行设置)的配合,这样能力施展最大的效用。min.insync.replicas这个参数设定ISR中的最小正本数是多少,默认值为1,当且仅当request.required.acks参数设置为-1时,此参数才失效。如果ISR中的正本数少于min.insync.replicas配置的数量时,客户端会返回异样:org.apache.kafka.common.errors.NotEnoughReplicasExceptoin: Messages are rejected since there are fewer in-sync replicas than required。

接下来对acks=1和-1的两种状况进行详细分析:

1. request.required.acks=1

producer发送数据到leader,leader写本地日志胜利,返回客户端胜利;此时ISR中的正本还没有来得及拉取该音讯,leader就宕机了,那么此次发送的音讯就会失落。

2. request.required.acks=-1

同步(Kafka默认为同步,即producer.type=sync)的发送模式,replication.factor>=2且min.insync.replicas>=2的状况下,不会失落数据。

有两种典型状况。acks=-1的状况下(如无非凡阐明,以下acks都示意为参数request.required.acks),数据发送到leader, ISR的follower全副实现数据同步后,leader此时挂掉,那么会选举出新的leader,数据不会失落。

acks=-1的状况下,数据发送到leader后 ,局部ISR的正本同步,leader此时挂掉。比方follower1和follower2都有可能变成新的leader, producer端会失去返回异样,producer端会从新发送数据,数据可能会反复。

当然上图中如果在leader crash的时候,follower2还没有同步到任何数据,而且follower2被选举为新的leader的话,这样音讯就不会反复。

注:Kafka只解决fail/recover问题,不解决Byzantine问题。

3.5 对于HW的进一步探讨

思考上图(即acks=-1,局部ISR正本同步)中的另一种状况,如果在Leader挂掉的时候,follower1同步了音讯4,5,follower2同步了音讯4,与此同时follower2被选举为leader,那么此时follower1中的多出的音讯5该做如何解决呢?

这里就须要HW的协同配合了。如前所述,一个partition中的ISR列表中,leader的HW是所有ISR列表里正本中最小的那个的LEO。相似于木桶原理,水位取决于最低那块短板。

如上图,某个topic的某partition有三个正本,别离为A、B、C。A作为leader必定是LEO最高,B紧随其后,C机器因为配置比拟低,网络比拟差,故而同步最慢。这个时候A机器宕机,这时候如果B成为leader,如果没有HW,在A从新复原之后会做同步(makeFollower)操作,在宕机时log文件之后间接做追加操作,而如果B的LEO曾经达到了A的LEO,会产生数据不统一的状况,所以应用HW来防止这种状况。
A在做同步操作的时候,先将log文件截断到之前本人的HW的地位,即3,之后再从B中拉取音讯进行同步。

如果失败的follower恢复过来,它首先将本人的log文件截断到上次checkpointed时刻的HW的地位,之后再从leader中同步音讯。leader挂掉会从新选举,新的leader会发送“指令”让其余的follower截断至本身的HW的地位而后再拉取新的音讯。

当ISR中的个正本的LEO不统一时,如果此时leader挂掉,选举新的leader时并不是依照LEO的高下进行选举,而是依照ISR中的程序选举。

3.6 Leader选举

一条音讯只有被ISR中的所有follower都从leader复制过来才会被认为已提交。这样就防止了局部数据被写进了leader,还没来得及被任何follower复制就宕机了,而造成数据失落。而对于producer而言,它能够抉择是否期待音讯commit,这能够通过request.required.acks来设置。这种机制确保了只有ISR中有一个或者以上的follower,一条被commit的音讯就不会失落。

有一个很重要的问题是当leader宕机了,怎么在follower中选举出新的leader,因为follower可能落后很多或者间接crash了,所以必须确保抉择“最新”的follower作为新的leader。一个根本的准则就是,如果leader不在了,新的leader必须领有原来的leader commit的所有音讯。这就须要做一个折中,如果leader在一个音讯被commit前期待更多的follower确认,那么在它挂掉之后就有更多的follower能够成为新的leader,但这也会造成吞吐率的降落。

一种十分罕用的选举leader的形式是“多数遵从少数”,Kafka并不是采纳这种形式。这种模式下,如果咱们有2f+1个正本,那么在commit之前必须保障有f+1个replica复制完音讯,同时为了保障能正确选举出新的leader,失败的正本数不能超过f个。这种形式有个很大的劣势,零碎的提早取决于最快的几台机器,也就是说比方正本数为3,那么提早就取决于最快的那个follower而不是最慢的那个。“多数遵从少数”的形式也有一些劣势,为了保障leader选举的失常进行,它所能容忍的失败的follower数比拟少,如果要容忍1个follower挂掉,那么至多要3个以上的正本,如果要容忍2个follower挂掉,必须要有5个以上的正本。也就是说,在生产环境下为了保障较高的容错率,必须要有大量的正本,而大量的正本又会在大数据量下导致性能的急剧下降。这种算法更多用在Zookeeper这种共享集群配置的零碎中而很少在须要大量数据的零碎中应用的起因。HDFS的HA性能也是基于“多数遵从少数”的形式,然而其数据存储并不是采纳这样的形式。

实际上,leader选举的算法十分多,比方Zookeeper的Zab、Raft以及Viewstamped Replication。而Kafka所应用的leader选举算法更像是微软的PacificA算法。

Kafka在Zookeeper中为每一个partition动静的保护了一个ISR,这个ISR里的所有replica都跟上了leader,只有ISR里的成员能力有被选为leader的可能(unclean.leader.election.enable=false)。在这种模式下,对于f+1个正本,一个Kafka topic能在保障不失落曾经commit音讯的前提下容忍f个正本的失败,在大多数应用场景下,这种模式是非常无利的。事实上,为了容忍f个正本的失败,“多数遵从少数”的形式和ISR在commit前须要期待的正本的数量是一样的,然而ISR须要的总的正本的个数简直是“多数遵从少数”的形式的一半。

上文提到,在ISR中至多有一个follower时,Kafka能够确保曾经commit的数据不失落,但如果某一个partition的所有replica都挂了,就无奈保证数据不失落了。这种状况下有两种可行的计划:

  1. 期待ISR中任意一个replica“活”过去,并且选它作为leader
  2. 抉择第一个“活”过去的replica(并不一定是在ISR中)作为leader

这就须要在可用性和一致性当中作出一个简略的抉择。如果肯定要期待ISR中的replica“活”过去,那不可用的工夫就可能会绝对较长。而且如果ISR中所有的replica都无奈“活”过去了,或者数据失落了,这个partition将永远不可用。抉择第一个“活”过去的replica作为leader,而这个replica不是ISR中的replica,那即便它并不保障曾经蕴含了所有已commit的音讯,它也会成为leader而作为consumer的数据源。默认状况下,Kafka采纳第二种策略,即unclean.leader.election.enable=true,也能够将此参数设置为false来启用第一种策略。

unclean.leader.election.enable这个参数对于leader的选举、零碎的可用性以及数据的可靠性都有至关重要的影响。上面咱们来剖析下几种典型的场景。

如果上图所示,假如某个partition中的正本数为3,replica-0, replica-1, replica-2别离寄存在broker0, broker1和broker2中。AR=(0,1,2),ISR=(0,1)。
设置request.required.acks=-1, min.insync.replicas=2,unclean.leader.election.enable=false。这里将broker0中的正本也称之为broker0起初broker0为leader,broker1为follower。

  • 当ISR中的replica-0呈现crash的状况时,broker1选举为新的leader[ISR=(1)],因为受min.insync.replicas=2影响,write不能服务,然而read能持续失常服务。此种状况复原计划:

    1. 尝试复原(重启)replica-0,如果能起来,零碎失常;
    2. 如果replica-0不能复原,须要将min.insync.replicas设置为1,复原write性能。
  • 当ISR中的replica-0呈现crash,紧接着replica-1也呈现了crash, 此时[ISR=(1),leader=-1],不能对外提供服务,此种状况复原计划:

    1. 尝试复原replica-0和replica-1,如果都能起来,则零碎恢复正常;
    2. 如果replica-0起来,而replica-1不能起来,这时候依然不能选出leader,因为当设置unclean.leader.election.enable=false时,leader只能从ISR中选举,当ISR中所有正本都生效之后,须要ISR中最初生效的那个副本能复原之后能力选举leader, 即replica-0先生效,replica-1后生效,须要replica-1复原后能力选举leader。激进的计划倡议把unclean.leader.election.enable设置为true,然而这样会有失落数据的状况产生,这样能够复原read服务。同样须要将min.insync.replicas设置为1,复原write性能;
    3. replica-1复原,replica-0不能复原,这个状况下面遇到过,read服务可用,须要将min.insync.replicas设置为1,复原write性能;
    4. replica-0和replica-1都不能复原,这种状况能够参考情景2.
  • 当ISR中的replica-0, replica-1同时宕机,此时[ISR=(0,1)],不能对外提供服务,此种状况复原计划:尝试复原replica-0和replica-1,当其中任意一个正本恢复正常时,对外能够提供read服务。直到2个正本恢复正常,write性能能力复原,或者将将min.insync.replicas设置为1。

3.7 Kafka的发送模式

Kafka的发送模式由producer端的配置参数producer.type来设置,这个参数指定了在后盾线程中音讯的发送形式是同步的还是异步的,默认是同步的形式,即producer.type=sync。如果设置成异步的模式,即producer.type=async,能够是producer以batch的模式push数据,这样会极大的进步broker的性能,然而这样会减少失落数据的危险。如果须要确保音讯的可靠性,必须要将producer.type设置为sync。

对于异步模式,还有4个配套的参数,如下:

PropertyDescription
queue.buffering.max.ms默认值:5000。启用异步模式时,producer缓存音讯的工夫。比方咱们设置成1000时,它会缓存1s的数据再一次发送进来,这样能够极大的减少broker吞吐量,但也会造成时效性的升高。
queue.buffering.max.messages默认值:10000。启用异步模式时,producer缓存队列里最大缓存的音讯数量,如果超过这个值,producer就会阻塞或者丢掉音讯。
queue.enqueue.timeout.ms默认值:-1。当达到下面参数时producer会阻塞期待的工夫。如果设置为0,buffer队列满时producer不会阻塞,音讯间接被丢掉;若设置为-1,producer会被阻塞,不会丢音讯。
batch.num.messages默认值:200。启用异步模式时,一个batch缓存的音讯数量。达到这个数值时,producer才会发送音讯。(每次批量发送的数量)
以batch的形式推送数据能够极大的进步解决效率,kafka producer能够将音讯在内存中累计到肯定数量后作为一个batch发送申请。batch的数量大小能够通过producer的参数(batch.num.messages)管制。通过减少batch的大小,能够缩小网络申请和磁盘IO的次数,当然具体参数设置须要在效率和时效性方面做一个衡量。在比拟新的版本中还有batch.size这个参数。

4 高可靠性应用剖析

4.1 音讯传输保障

后面曾经介绍了Kafka如何进行无效的存储,以及理解了producer和consumer如何工作。接下来探讨的是Kafka如何确保音讯在producer和consumer之间传输。有以下三种可能的传输保障(delivery guarantee):

  • At most once: 音讯可能会丢,但绝不会反复传输
  • At least once:音讯绝不会丢,但可能会反复传输
  • Exactly once:每条音讯必定会被传输一次且仅传输一次

Kafka的音讯传输保障机制十分直观。当producer向broker发送音讯时,一旦这条音讯被commit,因为正本机制(replication)的存在,它就不会失落。然而如果producer发送数据给broker后,遇到的网络问题而造成通信中断,那producer就无奈判断该条音讯是否曾经提交(commit)。尽管Kafka无奈确定网络故障期间产生了什么,然而producer能够retry屡次,确保音讯曾经正确传输到broker中,所以目前Kafka实现的是at least once。

consumer从broker中读取音讯后,能够抉择commit,该操作会在Zookeeper中存下该consumer在该partition下读取的音讯的offset。该consumer下一次再读该partition时会从下一条开始读取。如未commit,下一次读取的开始地位会跟上一次commit之后的开始地位雷同。当然也能够将consumer设置为autocommit,即consumer一旦读取到数据立刻主动commit。如果只探讨这一读取音讯的过程,那Kafka是确保了exactly once, 然而如果因为后面producer与broker之间的某种原因导致音讯的反复,那么这里就是at least once。

思考这样一种状况,当consumer读完音讯之后先commit再解决音讯,在这种模式下,如果consumer在commit后还没来得及解决音讯就crash了,下次从新开始工作后就无奈读到刚刚已提交而未解决的音讯,这就对应于at most once了。

读完音讯先解决再commit。这种模式下,如果解决完了音讯在commit之前consumer crash了,下次从新开始工作时还会解决刚刚未commit的音讯,实际上该音讯曾经被解决过了,这就对应于at least once。

要做到exactly once就须要引入音讯去重机制。

4.2 音讯去重

如上一节所述,Kafka在producer端和consumer端都会呈现音讯的反复,这就须要去重解决。

Kafka文档中提及GUID(Globally Unique Identifier)的概念,通过客户端生成算法失去每个音讯的unique id,同时可映射至broker上存储的地址,即通过GUID便可查问提取音讯内容,也便于发送方的幂等性保障,须要在broker上提供此去重解决模块,目前版本尚不反对。

针对GUID, 如果从客户端的角度去重,那么须要引入集中式缓存,必然会减少依赖复杂度,另外缓存的大小难以界定。

不只是Kafka, 相似RabbitMQ以及RocketMQ这类商业级中间件也只保障at least once, 且也无奈从本身去进行音讯去重。所以咱们倡议业务方依据本身的业务特点进行去重,比方业务音讯自身具备幂等性,或者借助Redis等其余产品进行去重解决。

4.3 高可靠性配置

Kafka提供了很高的数据冗余弹性,对于须要数据高可靠性的场景,咱们能够减少数据冗余备份数(replication.factor),调高最小写入正本数的个数(min.insync.replicas)等等,然而这样会影响性能。反之,性能进步而可靠性则升高,用户须要本身业务个性在彼此之间做一些衡量性抉择。

要保证数据写入到Kafka是平安的,高牢靠的,须要如下的配置:

  • topic的配置:replication.factor>=3,即正本数至多是3个;2<=min.insync.replicas<=replication.factor
  • broker的配置:leader的选举条件unclean.leader.election.enable=false
  • producer的配置:request.required.acks=-1(all),producer.type=sync
    • *

5 BenchMark

Kafka在唯品会有着很深的历史渊源,依据唯品会消息中间件团队(VMS团队)所把握的材料显示,在VMS团队运行的Kafka集群中所撑持的topic数已靠近2000,每天的申请量也已达千亿级。这里就以Kafka的高可靠性为基准点来探索几种不同场景下的行为表现,以此来加深对Kafka的认知,为大家在当前高效的应用Kafka时提供一份根据。

5.1 测试环境

Kafka broker用到了4台机器,别离为broker[0/1/2/3]配置如下:

  • CPU: 24core/2.6GHZ
  • Memory: 62G
  • Network: 4000Mb
  • OS/kernel: CentOs release 6.6 (Final)
  • Disk: 1089G
  • Kafka版本:0.10.1.0

broker端JVM参数设置:
-Xmx8G -Xms8G -server -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true -Xloggc:/apps/service/kafka/bin/…/logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9999

客户端机器配置:

  • CPU: 24core/2.6GHZ
  • Memory: 3G
  • Network: 1000Mb
  • OS/kernel: CentOs release 6.3 (Final)
  • Disk: 240G

5.2 不同场景测试

场景1:测试不同的正本数、min.insync.replicas策略以及request.required.acks策略(以下简称acks策略)对于发送速度(TPS)的影响。

具体配置:一个producer;发送形式为sync;音讯体大小为1kB;partition数为12。正本数为:1/2/4;min.insync.replicas别离为1/2/4;acks别离为-1(all)/1/0。

具体测试数据如下表(min.insync.replicas只在acks=-1时无效):

acksreplicasmin.insync.replicasretriesTPS
-111028511.3
-121022359.5
-122022927.4
-141016193.9
-142016599.9
-144016680.3
01N/A045353.8
02N/A046426.5
04N/A046764.2
11N/A033950.3
12N/A032192.2
14N/A032275.9

测试后果剖析:

  • 客户端的acks策略对发送的TPS有较大的影响,TPS:acks_0 > acks_1 > ack_-1;
  • 正本数越高,TPS越低;正本数统一时,min.insync.replicas不影响TPS;
  • acks=0/1时,TPS与min.insync.replicas参数以及正本数无关,仅受acks策略的影响。

上面将partition的个数设置为1,来进一步确认下不同的acks策略、不同的min.insync.replicas策略以及不同的正本数对于发送速度的影响,具体请看情景2和情景3。

场景2:在partition个数固定为1,测试不同的正本数和min.insync.replicas策略对发送速度的影响。

具体配置:一个producer;发送形式为sync;音讯体大小为1kB;producer端acks=-1(all)。变换正本数:2/3/4; min.insync.replicas设置为:1/2/4。

测试后果如下:

replicasmin.insync.replicasTPS
219738.8
229701.6
318999.7
329243.1
419005.8
428216.9
449092.4

测试后果剖析:正本数越高,TPS越低(这点与场景1的测试论断吻合),然而当partition数为1时差距甚微。min.insync.replicas不影响TPS。

场景3:在partition个数固定为1,测试不同的acks策略和正本数对发送速度的影响。

具体配置:一个producer;发送形式为sync;音讯体大小为1kB;min.insync.replicas=1。topic正本数为:1/2/4;acks: 0/1/-1。

测试后果如下:

replicasacksTPS
1076696
2057503
4059367
1119489
2120404
4118365
1-118641
2-19739
4-19006

测试后果剖析(与情景1统一):

  • 正本数越多,TPS越低;
  • 客户端的acks策略对发送的TPS有较大的影响,TPS:acks_0 > acks_1 > ack_-1。

场景4:测试不同partition数对发送速率的影响

具体配置:一个producer;音讯体大小为1KB;发送形式为sync;topic正本数为2;min.insync.replicas=2;acks=-1。partition数量设置为1/2/4/8/12。

测试后果:

测试后果剖析:partition的不同会影响TPS,随着partition的个数的增长TPS会有所增长,但并不是始终成正比关系,达到肯定临界值时,partition数量的减少反而会使TPS稍微升高。

场景5:通过将集群中局部broker设置成不可服务状态,测试对客户端以及音讯落盘的影响。

具体配置:一个producer;音讯体大小1KB;发送形式为sync;topic正本数为4;min.insync.replicas设置为2;acks=-1;retries=0/100000000;partition数为12。

具体测试数据如下表:

acksreplicasmin.insync.replicasretries测试方法TPS数据落盘呈现谬误
-1420发送过程中kill两台broker12840统一(局部数据可落盘,局部失败)谬误1
-142100000000发送过程中kill两台broker13870统一(音讯有反复落盘)谬误2
-142100000000发送过程中kill三台broker,之后重启N/A统一(音讯有反复落盘)谬误2、3、4

出错信息:

  • 谬误1:客户端返回异样,局部数据可落盘,局部失败:org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
  • 谬误2:[WARN]internals.Sender - Got error produce response with correlation id 19369 on topic-partition default_channel_replicas_4_1-3, retrying (999999999 attempts left). Error: NETWORK_EXCEPTION
  • 谬误3: [WARN]internals.Sender - Got error produce response with correlation id 77890 on topic-partition default_channel_replicas_4_1-8, retrying (999999859 attempts left). Error: NOT_ENOUGH_REPLICAS
  • 谬误4: [WARN]internals.Sender - Got error produce response with correlation id 77705 on topic-partition default_channel_replicas_4_1-3, retrying (999999999 attempts left). Error: NOT_ENOUGH_REPLICAS_AFTER_APPEND

测试后果剖析:

  • kill两台broker后,客户端能够持续发送。broker缩小后,partition的leader散布在残余的两台broker上,造成了TPS的减小;
  • kill三台broker后,客户端无奈持续发送。Kafka的主动重试性能开始起作用,当大于等于min.insync.replicas数量的broker复原后,能够持续发送;
  • 当retries不为0时,音讯有反复落盘;客户端胜利返回的音讯都胜利落盘,异样时局部音讯能够落盘。

场景6:测试单个producer的发送提早,以及端到端的提早。

具体配置::一个producer;音讯体大小1KB;发送形式为sync;topic正本数为4;min.insync.replicas设置为2;acks=-1;partition数为12。

测试数据及后果(单位为ms):

发送端(avg)发送端(min)发送端(max)发送端(99%)发送端(99.99%)生产端(avg)生产端(min)生产端(max)生产端(99%)生产端(99.99%)
1.71511573291.6461288472

各场景测试总结

  • 当acks=-1时,Kafka发送端的TPS受限于topic的正本数量(ISR中),正本越多TPS越低;
  • acks=0时,TPS最高,其次为1,最差为-1,即TPS:acks_0 > acks_1 > ack_-1;
  • min.insync.replicas参数不影响TPS;
  • partition的不同会影响TPS,随着partition的个数的增长TPS会有所增长,但并不是始终成正比关系,达到肯定临界值时,partition数量的减少反而会使TPS稍微升高;
  • Kafka在acks=-1,min.insync.replicas>=1时,具备高可靠性,所有胜利返回的音讯都能够落盘。