1、概述
Kafka是大数据畛域无处不在的消息中间件,目前宽泛应用在企业外部的实时数据管道,并帮忙企业构建本人的流计算应用程序。Kafka尽管是基于磁盘做的数据存储,但却具备高性能、高吞吐、低延时的特点,其吞吐量动辄几万、几十上百万,这其中的原由值得咱们一探到底,让咱们一起探索Kafka各种精美的设计。
2、Kafka高性能剖析
1、Kafka零碎架构
上图是Kafka的架构图,Producer生产音讯,以Partition的维度,依照肯定的路由策略,提交音讯到Broker集群中各Partition的Leader节点,Consumer以Partition的维度,从Broker中的Leader节点拉取音讯并生产音讯。
Producer生产音讯会波及大量的音讯网络传输,如果Producer每生产一个音讯就发送到Broker会造成大量的网络耗费,重大影响到Kafka的性能。为了解决这个问题,Kafka应用了批量发送的形式。 Broker在长久化音讯、读取音讯的时候,如果采纳传统的IO读写形式,会重大影响Kafka的性能,为了解决这个问题,Kafka采纳了程序写+零拷贝的形式。 上面别离从批量发送音讯、长久化音讯、零拷贝三个角度介绍Kafka如何进步性能。
2、批量发送音讯
Producer生成音讯发送到Broker,波及到大量的网络传输,如果一次网络传输只发送一条音讯,会带来重大的网络耗费。为了解决这个问题,Kafka采纳批量发送的形式。上面介绍Producer生产音讯发送到Broker的过程。
2.1 Parition
Kafka的音讯是一个一个的键值对,键能够设置为默认的null。键有两个用处,能够作为音讯的附加信息,也能够用来决定该音讯被写入到哪个Partition。Topic的数据被分成一个或多个Partition,Partition是音讯的汇合,Partition是Consumer生产的最小粒度。
Kafka通过将Topic划分成多个Partition,Producer将音讯散发到多个本地Partition的音讯队列中,每个Partition音讯队列中的音讯会写入到不同的Leader节点。如上图所示,音讯通过路由策略,被散发到不同的Partition对应的本地队列,而后再批量发送到Partition对应的Leader节点。
2.2. 音讯路由
Kafka中Topic有多个Partition,那么音讯调配到某个Partition的策略称为路由策略。Kafka的路由策略次要有三种:
- Round Robin:Producer将音讯平衡地调配到各Partition本地队列上,是最罕用的分区策略。
- 散列:Kafka对音讯的key进行散列,依据散列值将音讯路由到特定的Parttion上,键雷同的音讯总是被路由到雷同的Partition上。
自定义分区策略:Kafka反对自定义分区策略,能够将某一系列的音讯映射到雷同的Partition。
2.3 发送流程
上图是Producer生产音讯到发送到Broker的主流程。Producer先生产音讯、序列化音讯并压缩音讯后,追加到本地的记录收集器(RecordAccumulator),Sender一直轮询记录收集器,当满足肯定条件时,将队列中的数据发送到Partition Leader节点。Sender发送数据到Broker的条件有两个:
- 音讯大小达到阈值
- 音讯期待发送的工夫达到阈值
Producer会为每个Partition都创立一个双端队列来缓存客户端音讯,队列的每个元素是一个批记录(ProducerBatch),批记录应用createdMs示意批记录的创立工夫(批记录中第一条音讯退出的工夫), topicPartion示意对应的Partition元数据。当Producer生产的音讯通过序列化,会被先写入到recordsBuilder对象中。一旦队列中有批记录的大小达到阈值,就会被Sender发送到Partition对应的Leader节点;若批记录期待发送的工夫达到阈值,音讯也会被发送到Partition对应的Leader节点中。
追加音讯时首先要获取Partition所属的队列,而后取队列中最初一个批记录,如果队列中不存在批记录或者批记录的大小达到阈值,应该创立新的批记录,并且退出队列的尾部。这里先创立的批记录最先被音讯填满,后创立的批记录示意最新的音讯,追加音讯时总是往队列尾部的批记录中追加。记录收集器用来缓存客户端的音讯,还须要通过Sender能力将音讯发送到Partition对应的Leader节点。
Sender读取记录收集器,失去每个Leader节点对应的批记录列表,找出筹备好的Broker节点并建设连贯,而后将各个Partition的批记录发送到Leader节点。Sender的外围代码如下:
//Sender读取记录收集器,依照节点分组,创立客户端申请,发送申请public void run(long now) { Cluster cluster = metadata.fetch(); //获取筹备发送的所有分区 ReadCheckResult result = accumulator.ready(cluster, now); //建设到Leader节点的网络连接,移除还没有筹备好的节点 Iterator<Node> iter = result.readyNodes.iterator(); while(iter.hasNext()) { Node node = iter.next(); if (!this.client.read(node, now)) { iter.remove(); } //读取记录收集器,返回的每个Leader节点对应的批记录列表,每个批记录对应一个分区 Map<Integer, List<RecordBatch>> batches = accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now); //以节点为级别的生产申请列表,即每个节点只有一个客户端申请 List<ClientRequest> requests = createProduceRequests(batches, now); for (ClientRequest request : requests) { client.send(request, now); } //这里才会执行真正的网络读写,比方将下面的客户端申请发送进来 this.client.poll(pollTimeout, now); }}
具体的步骤如下:
- 音讯被记录收集器收集,并依照Partition追加到队列尾部一个批记录中。
- Sender通过ready()从记录收集器中找出曾经筹备好的服务端节点,规定是Partition期待发送的音讯大小和期待发送的工夫达到阈值。
- 节点曾经筹备好,如果客户端还没有和它们建设连贯,通过connect()建设到服务端的连贯。
- Sender通过drain()从记录收集器获取依照节点整顿好的每个Partition的批记录。
Sender失去每个节点的批记录后,为每个节点创立客户端申请,并将音讯发送到服务端。
3、音讯长久化
3.1 随机IO和程序IO
上图是磁盘的繁难模型图。磁盘上的数据由柱面号、盘片号、扇区号标识。当须要从磁盘读数据时,零碎会将数据逻辑地址传给磁盘,磁盘的控制电路依照寻址逻辑地址翻译成物理地址,即确定要读的数据在哪个磁道,哪个扇区。
为了实现读取这个扇区的数据,须要将磁头放到这个扇区上方,为了实现这一点:
- 首先必须找到柱面,即磁头须要挪动对准相应磁道,这个过程叫做寻道或定位;
- 盘面确定当前,盘片开始旋转,将指标扇区旋转到磁头下
因而一次读数据申请实现过程由三个动作组成: - 寻道:磁头挪动定位到指定磁道,这部分工夫代价最高,最大可达到0.1s左右;
- 旋转提早:期待指定扇区旋转至磁头下。与硬盘本身性能无关,xxxx转/分;
- 数据传输:数据通过系统总线从磁盘传送到内存的工夫。
对于从磁盘中读取数据的操作,叫做IO操作,这里有两种状况: - 假如咱们所须要的数据是随机扩散在磁盘的不同盘片的不同扇区中的,那么找到相应的数据须要等到磁臂通过寻址作用旋转到指定的盘片,而后盘片寻找到对应的扇区,能力找到咱们所须要的一块数据,一次进行此过程直到找完所有数据,称为随机IO,读取数据速度较慢。
假如咱们曾经找到了第一块数据,并且其余所需的数据就在这一块数据后边,那么就不须要从新寻址,能够顺次拿到咱们所需的数据,称为程序IO。
程序IO绝对于随机IO,缩小了大量的磁盘寻址过程,进步了数据的查问效率。3.2 Broker写音讯
Broker中须要将大量的音讯做长久化,而且存在大量的音讯查问场景,如果采纳传统的IO操作,会带来大量的磁盘寻址,影响音讯的查问速度,限度了Kafka的性能。为了解决这个问题,Kafka采纳程序写的形式来做音讯长久化。
Producer传递到Broker的音讯集中的每条音讯都会调配一个程序值,用来标记Producer所生产音讯的程序,每一批音讯的程序值都从0开始。下图给出一个例子,Producer写到Partition的音讯有3条音讯,对应的程序值是[0,1,2]。
Producer创立的音讯集中每条音讯的程序值只是绝对于本批次的序号,所以这个值不能间接存储在日志文件中。服务端会将每条音讯的程序值转换成相对偏移量(Broker从Partition维度来标记音讯的程序,用于管制Consumer生产音讯的程序)。Kafka通过nextOffset(下一个偏移量)来记录存储在日志中最近一条音讯的偏移量。
Message | 相对偏移量 | 程序值 |
---|---|---|
Message0 | 900 | 0 |
Message1 | 901 | 1 |
Message2 | 902 | 2 |
如上表所示,音讯写入前,nextOffset是899,Message0、Message1、Message2是间断写入的三条音讯,音讯被写入后其相对偏移量别离是900、901、902,对应的程序值别离是0、1、2,nextOffset变成902。
Broker将每个Partition的音讯追加到日志中,是以日志分段(Segment)为单位的。当Segment的大小达到阈值(默认是1G)时,会新创建一个Segment保留新的音讯,每个Segment都有一个基准偏移量(baseOffset,每个Segment保留的第一个音讯的相对偏移量),通过这个基准偏移量,就能够计算出每条音讯在Partition中的相对偏移量。 每个日志分段由数据文件和索引文件组,数据文件(文件名以log结尾)保留了音讯集的具体内容,索引文件(文件名以index结尾)保留了音讯偏移量到物理地位的索引。如下图所示:
Broker中通过下一个偏移量元数据(nextOffsetMetaData),指定以后写入日志的音讯的起始偏移值,在追加音讯后,更新nextOffsetMetaData,作为下一批音讯的起始偏移量。外围代码如下所示:
@volatile var nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(), activeSegment.baseOffset, activeSegment.size.toInt);def append(messages:ByteBufferMessageSet, assignOffsets:Boolean) = { //LogAppendInfo对象,代表这批音讯的概要信息,而后对音讯进行验证 var appendInfo = analyzeAndValidateMessageSet(messages) var validMessages = trimInvalidBytes(messages, appendInfo) //获取最新的”下一个偏移量“作为第一条音讯的相对偏移量 appendInfo.firstOffset = nextOffsetMetadata.messageOffset if (assignOffsets) { //如果每条音讯的偏移量都是递增的 //音讯的起始偏移量来自于最新的”下一个偏移量“,而不是音讯自带的程序值 var offset = new AtomicLong(nextOffsetMetadata.messageOffset); //基于起始偏移量,为无效的音讯集的每条音讯重新分配相对偏移量 validMessages = validMessages.validateMessagesAndAssignOffsets(offset); appendInfo.lastOffset = offset.get - 1 //最初一条音讯的相对偏移量 } var segment = maybeRoll(validMessages.sizeInBytes) //如果达到Segment大小的阈值,须要创立新的Segment segment.append(appendInfo.firstOffset,validMessages) //追加音讯到以后分段 updateLogEndOffset(appendInfo.lastOffset + 1) //批改最新的”下一个偏移量“ if (unflushedMessages >= config.flushInterval) { flush() //如果没有刷新的音讯数大于配置的,那么将音讯刷入到磁盘 }}//更新日志的”最近的偏移量“,传入的参数个别是最初一条音讯的偏移量加上1//应用发须要获取日志的”最近的量“时,就不须要再做加一的操作了private def updateLogEndOffset(messageOffset:Long) { nextOffsetMetadata = new LogOffsetMetadata(messageOffset, activeSegment.baseOffset,activeSegment.size.toInt)}
nextOffsetMetaData的读写操作产生在长久化和读取音讯中,具体流程如下所示:
1、Producer发送音讯集到Broker,Broker将这一批音讯追加到日志;
2、每条音讯须要指定相对偏移量,Broker会用nextOffsetMetaData的值作为起始偏移值;
3、Broker将每条带有偏移量的音讯写入到日志分段中;
4、Broker获取这一批音讯中最初一条音讯的偏移量,加1后更新nextOffsetMetaData;
5、Consumer依据这个变量的最新值拉取音讯。一旦这个值发生变化,Consumer就能拉取到新写入的音讯。
因为写入到日志分段中的音讯集,都是以nextOffsetMetaData作为起始的相对偏移量。因为这个起始偏移量总是递增,所以每一批音讯的偏移量也放弃递增,而且每一个Partition的所有日志分段中,所有音讯的偏移量都是递增。如下图所示,新创建日志分段的基准偏移量,比之前的分段的基准偏移量要大,同一个日志分段中,新音讯的偏移量也比之前音讯的偏移量要大。
建设索引文件的目标:疾速定位指定偏移量音讯在数据文件中的物理地位。其中索引文件保留的是一部分音讯的绝对偏移量到物理地位的映射,应用绝对偏移量而不是相对偏移量是为了节约内存。
3.3 基于索引文件查问
Kafka通过索引文件进步对磁盘上音讯的查问效率。
如上图所示:假如有1000条音讯,每100条音讯写满了一个日志分段,一共会有10个日志分段。客户端要查问偏移量为938的音讯内容,如果没有索引文件,咱们必须从第一个日志分段的数据文件中,从第一条音讯始终往前读,直到找到偏移量为999的音讯。有了索引文件后,咱们能够在最初一个日志分段的索引文件中,首先应用相对偏移量999减去基准偏移量900失去绝对偏移量99,而后找到最靠近绝对偏移量99的索引数据90,绝对偏移量90对应的物理地址是1365,而后再到数据文件中,从文件物理地位1365开始往后读音讯,直到找到偏移量为999的音讯。
Kafka的索引文件的个性:
- 索引文件映射偏移量到文件的物理地位,它不会对每条音讯都建设索引,所以是稠密的。
- 索引条目标偏移量存储的是绝对于“基准偏移量”的“绝对偏移量” ,不是音讯的“相对偏移量” 。
- 偏移量是有序的,查问指定的偏移量时,应用二分查找能够疾速确定偏移量的地位。
- 指定偏移量如果在索引文件中不存在,能够找到小于等于指定偏移量的最大偏移量。
- 稠密索引能够通过内存映射形式,将整个索引文件都放入内存,放慢偏移量的查问。
因为Broker是将音讯长久化到以后日志的最初一个分段中,写入文件的形式是追加写,采纳了对磁盘文件的程序写。对磁盘的程序写以及索引文件放慢了Broker查问音讯的速度。
4、零拷贝
Kafka中存在大量的网络数据长久化到磁盘(Producer到Broker)和磁盘文件通过网络发送(Broker到Consumer)的过程。这一过程的性能间接影响到Kafka的整体性能。Kafka采纳零拷贝这一通用技术解决该问题。
零拷贝技术能够缩小数据拷贝和共享总线操作的次数,打消传输数据在存储器之间不必要的两头拷贝次数,缩小用户应用程序地址空间和操作系统内核地址空间之间因为上下文切换而带来的开销,从而无效地进步数据传输效率。 以将磁盘文件通过网络发送为例。上面展现了传统形式下读取数据后并通过网络发送所产生的数据拷贝:
- 一个读操作产生后,DMA执行了一次数据拷贝,数据从磁盘拷贝到内核空间;
- cpu将数据从内核空间拷贝至用户空间
- 调用send(),cpu产生第三次数据拷贝,由cpu将数据从用户空间拷贝至内核空间(socket缓冲区)
send()执行完结后,DMA执行第四次数据拷贝,将数据从内核拷贝至协定引擎
Linux 2.4+内核通过sendfile零碎调用,提供了零拷贝。数据通过DMA拷贝到内核态Buffer后,间接通过DMA拷贝到NIC Buffer,无需CPU拷贝,这也是零拷贝这一说法的起源。除了缩小数据拷贝外,因为整个读文件-网络发送由一个sendfile调用实现,整个过程只有两次上下文切换,没有cpu数据拷贝,因而大大提高了性能。零拷贝过程如下图所示。- sendfile()通过DMA将文件内容拷贝到一个读取缓冲区,而后由内核将数据拷贝到与输入套接字相关联的内核缓冲区。
从具体实现来看,Kafka的数据传输通过TransportLayer来实现,其子类PlaintextTransportLayer通过Java NIO的FileChannel的transferTo()和transferFrom()办法实现零拷贝。transferTo()和transferFrom()并不保障肯定能应用零拷贝,实际上是否能应用零拷贝与操作系统相干,如果操作系统提供sendfile这样的零拷贝零碎调用,则这两个办法会通过这样的零碎调用充分利用零拷贝的劣势,否则并不能通过这两个办法自身实现零拷贝。
参考文献
《Kafka技术底细:图文详解Kafka源码设计与实现》
《Kafka权威指南》
Kafka的高性能磁盘读写实现原
Kafka日志存储