关于java:Kafka高性能架构设计

7次阅读

共计 7775 个字符,预计需要花费 20 分钟才能阅读完成。

1、概述

Kafka 是大数据畛域无处不在的消息中间件,目前宽泛应用在企业外部的实时数据管道,并帮忙企业构建本人的流计算应用程序。Kafka 尽管是基于磁盘做的数据存储,但却具备高性能、高吞吐、低延时的特点,其吞吐量动辄几万、几十上百万,这其中的原由值得咱们一探到底,让咱们一起探索 Kafka 各种精美的设计。

2、Kafka 高性能剖析

1、Kafka 零碎架构

上图是 Kafka 的架构图,Producer 生产音讯,以 Partition 的维度,依照肯定的路由策略,提交音讯到 Broker 集群中各 Partition 的 Leader 节点,Consumer 以 Partition 的维度,从 Broker 中的 Leader 节点拉取音讯并生产音讯。

    Producer 生产音讯会波及大量的音讯网络传输,如果 Producer 每生产一个音讯就发送到 Broker 会造成大量的网络耗费,重大影响到 Kafka 的性能。为了解决这个问题,Kafka 应用了批量发送的形式。Broker 在长久化音讯、读取音讯的时候,如果采纳传统的 IO 读写形式,会重大影响 Kafka 的性能,为了解决这个问题,Kafka 采纳了程序写 + 零拷贝的形式。上面别离从批量发送音讯、长久化音讯、零拷贝三个角度介绍 Kafka 如何进步性能。

2、批量发送音讯

Producer 生成音讯发送到 Broker,波及到大量的网络传输,如果一次网络传输只发送一条音讯,会带来重大的网络耗费。为了解决这个问题,Kafka 采纳批量发送的形式。上面介绍 Producer 生产音讯发送到 Broker 的过程。

2.1 Parition

Kafka 的音讯是一个一个的键值对,键能够设置为默认的 null。键有两个用处,能够作为音讯的附加信息,也能够用来决定该音讯被写入到哪个 Partition。Topic 的数据被分成一个或多个 Partition,Partition 是音讯的汇合,Partition 是 Consumer 生产的最小粒度。

Kafka 通过将 Topic 划分成多个 Partition,Producer 将音讯散发到多个本地 Partition 的音讯队列中,每个 Partition 音讯队列中的音讯会写入到不同的 Leader 节点。如上图所示,音讯通过路由策略,被散发到不同的 Partition 对应的本地队列,而后再批量发送到 Partition 对应的 Leader 节点。

2.2. 音讯路由

Kafka 中 Topic 有多个 Partition,那么音讯调配到某个 Partition 的策略称为路由策略。Kafka 的路由策略次要有三种:

  • Round Robin:Producer 将音讯平衡地调配到各 Partition 本地队列上,是最罕用的分区策略。

  • 散列:Kafka 对音讯的 key 进行散列,依据散列值将音讯路由到特定的 Parttion 上,键雷同的音讯总是被路由到雷同的 Partition 上。
  • 自定义分区策略:Kafka 反对自定义分区策略,能够将某一系列的音讯映射到雷同的 Partition。

    2.3 发送流程

上图是 Producer 生产音讯到发送到 Broker 的主流程。Producer 先生产音讯、序列化音讯并压缩音讯后,追加到本地的记录收集器(RecordAccumulator),Sender 一直轮询记录收集器,当满足肯定条件时,将队列中的数据发送到 Partition Leader 节点。Sender 发送数据到 Broker 的条件有两个:

  • 音讯大小达到阈值
  • 音讯期待发送的工夫达到阈值
    Producer 会为每个 Partition 都创立一个双端队列来缓存客户端音讯,队列的每个元素是一个批记录(ProducerBatch),批记录应用 createdMs 示意批记录的创立工夫(批记录中第一条音讯退出的工夫),topicPartion 示意对应的 Partition 元数据。当 Producer 生产的音讯通过序列化,会被先写入到 recordsBuilder 对象中。一旦队列中有批记录的大小达到阈值,就会被 Sender 发送到 Partition 对应的 Leader 节点;若批记录期待发送的工夫达到阈值,音讯也会被发送到 Partition 对应的 Leader 节点中。

追加音讯时首先要获取 Partition 所属的队列,而后取队列中最初一个批记录,如果队列中不存在批记录或者批记录的大小达到阈值,应该创立新的批记录,并且退出队列的尾部。这里先创立的批记录最先被音讯填满,后创立的批记录示意最新的音讯,追加音讯时总是往队列尾部的批记录中追加。记录收集器用来缓存客户端的音讯,还须要通过 Sender 能力将音讯发送到 Partition 对应的 Leader 节点。

   Sender 读取记录收集器,失去每个 Leader 节点对应的批记录列表,找出筹备好的 Broker 节点并建设连贯,而后将各个 Partition 的批记录发送到 Leader 节点。Sender 的外围代码如下:
//Sender 读取记录收集器,依照节点分组,创立客户端申请,发送申请
public void run(long now) {Cluster cluster = metadata.fetch();
  // 获取筹备发送的所有分区
  ReadCheckResult result = accumulator.ready(cluster, now);
  // 建设到 Leader 节点的网络连接,移除还没有筹备好的节点
  Iterator<Node> iter = result.readyNodes.iterator();
  while(iter.hasNext()) {Node node = iter.next();
    if (!this.client.read(node, now)) {iter.remove();
    }
    // 读取记录收集器,返回的每个 Leader 节点对应的批记录列表,每个批记录对应一个分区
    Map<Integer, List<RecordBatch>> batches = accumulator.drain(cluster, result.readyNodes, 
                                                                this.maxRequestSize, now);
    // 以节点为级别的生产申请列表,即每个节点只有一个客户端申请
    List<ClientRequest> requests = createProduceRequests(batches, now);
    for (ClientRequest request : requests) {client.send(request, now);
    }
    // 这里才会执行真正的网络读写,比方将下面的客户端申请发送进来
    this.client.poll(pollTimeout, now);
  }
}

具体的步骤如下:

  1. 音讯被记录收集器收集,并依照 Partition 追加到队列尾部一个批记录中。
  2. Sender 通过 ready()从记录收集器中找出曾经筹备好的服务端节点,规定是 Partition 期待发送的音讯大小和期待发送的工夫达到阈值。
  3. 节点曾经筹备好,如果客户端还没有和它们建设连贯,通过 connect()建设到服务端的连贯。
  4. Sender 通过 drain()从记录收集器获取依照节点整顿好的每个 Partition 的批记录。
  5. Sender 失去每个节点的批记录后,为每个节点创立客户端申请,并将音讯发送到服务端。

    3、音讯长久化

    3.1 随机 IO 和程序 IO

上图是磁盘的繁难模型图。磁盘上的数据由柱面号、盘片号、扇区号标识。当须要从磁盘读数据时,零碎会将数据逻辑地址传给磁盘,磁盘的控制电路依照寻址逻辑地址翻译成物理地址,即确定要读的数据在哪个磁道,哪个扇区。

   为了实现读取这个扇区的数据,须要将磁头放到这个扇区上方,为了实现这一点:
  • 首先必须找到柱面,即磁头须要挪动对准相应磁道,这个过程叫做寻道或定位;
  • 盘面确定当前,盘片开始旋转,将指标扇区旋转到磁头下
    因而一次读数据申请实现过程由三个动作组成:
  • 寻道:磁头挪动定位到指定磁道,这部分工夫代价最高,最大可达到 0.1s 左右;
  • 旋转提早:期待指定扇区旋转至磁头下。与硬盘本身性能无关,xxxx 转 / 分;
  • 数据传输:数据通过系统总线从磁盘传送到内存的工夫。
    对于从磁盘中读取数据的操作,叫做 IO 操作,这里有两种状况:
  • 假如咱们所须要的数据是随机扩散在磁盘的不同盘片的不同扇区中的,那么找到相应的数据须要等到磁臂通过寻址作用旋转到指定的盘片,而后盘片寻找到对应的扇区,能力找到咱们所须要的一块数据,一次进行此过程直到找完所有数据,称为随机 IO,读取数据速度较慢。
  • 假如咱们曾经找到了第一块数据,并且其余所需的数据就在这一块数据后边,那么就不须要从新寻址,能够顺次拿到咱们所需的数据,称为程序 IO。
    程序 IO 绝对于随机 IO,缩小了大量的磁盘寻址过程,进步了数据的查问效率。

    3.2 Broker 写音讯

    Broker 中须要将大量的音讯做长久化,而且存在大量的音讯查问场景,如果采纳传统的 IO 操作,会带来大量的磁盘寻址,影响音讯的查问速度,限度了 Kafka 的性能。为了解决这个问题,Kafka 采纳程序写的形式来做音讯长久化。

       Producer 传递到 Broker 的音讯集中的每条音讯都会调配一个程序值,用来标记 Producer 所生产音讯的程序,每一批音讯的程序值都从 0 开始。下图给出一个例子,Producer 写到 Partition 的音讯有 3 条音讯,对应的程序值是[0,1,2]。


Producer 创立的音讯集中每条音讯的程序值只是绝对于本批次的序号,所以这个值不能间接存储在日志文件中。服务端会将每条音讯的程序值转换成相对偏移量 (Broker 从 Partition 维度来标记音讯的程序,用于管制 Consumer 生产音讯的程序)。Kafka 通过 nextOffset(下一个偏移量) 来记录存储在日志中最近一条音讯的偏移量。

Message 相对偏移量 程序值
Message0 900 0
Message1 901 1
Message2 902 2

如上表所示,音讯写入前,nextOffset 是 899,Message0、Message1、Message2 是间断写入的三条音讯,音讯被写入后其相对偏移量别离是 900、901、902,对应的程序值别离是 0、1、2,nextOffset 变成 902。

    Broker 将每个 Partition 的音讯追加到日志中,是以日志分段 (Segment) 为单位的。当 Segment 的大小达到阈值 (默认是 1G) 时,会新创建一个 Segment 保留新的音讯,每个 Segment 都有一个基准偏移量 (baseOffset,每个 Segment 保留的第一个音讯的相对偏移量),通过这个基准偏移量,就能够计算出每条音讯在 Partition 中的相对偏移量。每个日志分段由数据文件和索引文件组,数据文件(文件名以 log 结尾) 保留了音讯集的具体内容,索引文件 (文件名以 index 结尾) 保留了音讯偏移量到物理地位的索引。如下图所示:

Broker 中通过下一个偏移量元数据(nextOffsetMetaData),指定以后写入日志的音讯的起始偏移值,在追加音讯后,更新 nextOffsetMetaData,作为下一批音讯的起始偏移量。外围代码如下所示:

@volatile var nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(), 
                                                        activeSegment.baseOffset, activeSegment.size.toInt);
def append(messages:ByteBufferMessageSet, assignOffsets:Boolean) = {
    //LogAppendInfo 对象,代表这批音讯的概要信息,而后对音讯进行验证
    var appendInfo = analyzeAndValidateMessageSet(messages)
    var validMessages = trimInvalidBytes(messages, appendInfo)
    // 获取最新的”下一个偏移量“作为第一条音讯的相对偏移量
    appendInfo.firstOffset = nextOffsetMetadata.messageOffset
    if (assignOffsets) { // 如果每条音讯的偏移量都是递增的
      // 音讯的起始偏移量来自于最新的”下一个偏移量“,而不是音讯自带的程序值
      var offset = new AtomicLong(nextOffsetMetadata.messageOffset);
      // 基于起始偏移量,为无效的音讯集的每条音讯重新分配相对偏移量
      validMessages = validMessages.validateMessagesAndAssignOffsets(offset);
      appendInfo.lastOffset = offset.get - 1 // 最初一条音讯的相对偏移量
    }
    var segment = maybeRoll(validMessages.sizeInBytes) // 如果达到 Segment 大小的阈值,须要创立新的 Segment
    segment.append(appendInfo.firstOffset,validMessages) // 追加音讯到以后分段
    updateLogEndOffset(appendInfo.lastOffset + 1) // 批改最新的”下一个偏移量“if (unflushedMessages >= config.flushInterval) {flush() // 如果没有刷新的音讯数大于配置的,那么将音讯刷入到磁盘
    }
}
// 更新日志的”最近的偏移量“,传入的参数个别是最初一条音讯的偏移量加上 1
// 应用发须要获取日志的”最近的量“时,就不须要再做加一的操作了
private def updateLogEndOffset(messageOffset:Long) {nextOffsetMetadata = new LogOffsetMetadata(messageOffset, activeSegment.baseOffset,activeSegment.size.toInt)
}

nextOffsetMetaData 的读写操作产生在长久化和读取音讯中,具体流程如下所示:
1、Producer 发送音讯集到 Broker,Broker 将这一批音讯追加到日志;
2、每条音讯须要指定相对偏移量,Broker 会用 nextOffsetMetaData 的值作为起始偏移值;
3、Broker 将每条带有偏移量的音讯写入到日志分段中;
4、Broker 获取这一批音讯中最初一条音讯的偏移量,加 1 后更新 nextOffsetMetaData;
5、Consumer 依据这个变量的最新值拉取音讯。一旦这个值发生变化,Consumer 就能拉取到新写入的音讯。

    因为写入到日志分段中的音讯集,都是以 nextOffsetMetaData 作为起始的相对偏移量。因为这个起始偏移量总是递增,所以每一批音讯的偏移量也放弃递增,而且每一个 Partition 的所有日志分段中,所有音讯的偏移量都是递增。如下图所示,新创建日志分段的基准偏移量,比之前的分段的基准偏移量要大,同一个日志分段中,新音讯的偏移量也比之前音讯的偏移量要大。


建设索引文件的目标:疾速定位指定偏移量音讯在数据文件中的物理地位。其中索引文件保留的是一部分音讯的绝对偏移量到物理地位的映射,应用绝对偏移量而不是相对偏移量是为了节约内存。

3.3 基于索引文件查问

Kafka 通过索引文件进步对磁盘上音讯的查问效率。

如上图所示:假如有 1000 条音讯,每 100 条音讯写满了一个日志分段,一共会有 10 个日志分段。客户端要查问偏移量为 938 的音讯内容,如果没有索引文件,咱们必须从第一个日志分段的数据文件中,从第一条音讯始终往前读,直到找到偏移量为 999 的音讯。有了索引文件后,咱们能够在最初一个日志分段的索引文件中,首先应用相对偏移量 999 减去基准偏移量 900 失去绝对偏移量 99,而后找到最靠近绝对偏移量 99 的索引数据 90,绝对偏移量 90 对应的物理地址是 1365,而后再到数据文件中,从文件物理地位 1365 开始往后读音讯,直到找到偏移量为 999 的音讯。

  Kafka 的索引文件的个性:
  • 索引文件映射偏移量到文件的物理地位,它不会对每条音讯都建设索引,所以是稠密的。
  • 索引条目标偏移量存储的是绝对于“基准偏移量”的“绝对偏移量”,不是音讯的“相对偏移量”。
  • 偏移量是有序的,查问指定的偏移量时,应用二分查找能够疾速确定偏移量的地位。
  • 指定偏移量如果在索引文件中不存在,能够找到小于等于指定偏移量的最大偏移量。
  • 稠密索引能够通过内存映射形式,将整个索引文件都放入内存,放慢偏移量的查问。

因为 Broker 是将音讯长久化到以后日志的最初一个分段中,写入文件的形式是追加写,采纳了对磁盘文件的程序写。对磁盘的程序写以及索引文件放慢了 Broker 查问音讯的速度。

4、零拷贝

Kafka 中存在大量的网络数据长久化到磁盘 (Producer 到 Broker) 和磁盘文件通过网络发送 (Broker 到 Consumer) 的过程。这一过程的性能间接影响到 Kafka 的整体性能。Kafka 采纳零拷贝这一通用技术解决该问题。

    零拷贝技术能够缩小数据拷贝和共享总线操作的次数,打消传输数据在存储器之间不必要的两头拷贝次数,缩小用户应用程序地址空间和操作系统内核地址空间之间因为上下文切换而带来的开销,从而无效地进步数据传输效率。以将磁盘文件通过网络发送为例。上面展现了传统形式下读取数据后并通过网络发送所产生的数据拷贝:

    • 一个读操作产生后,DMA 执行了一次数据拷贝,数据从磁盘拷贝到内核空间;
    • cpu 将数据从内核空间拷贝至用户空间
    • 调用 send(),cpu 产生第三次数据拷贝,由 cpu 将数据从用户空间拷贝至内核空间(socket 缓冲区)
    • send()执行完结后,DMA 执行第四次数据拷贝,将数据从内核拷贝至协定引擎
      Linux 2.4+ 内核通过 sendfile 零碎调用,提供了零拷贝。数据通过 DMA 拷贝到内核态 Buffer 后,间接通过 DMA 拷贝到 NIC Buffer,无需 CPU 拷贝,这也是零拷贝这一说法的起源。除了缩小数据拷贝外,因为整个读文件 - 网络发送由一个 sendfile 调用实现,整个过程只有两次上下文切换,没有 cpu 数据拷贝,因而大大提高了性能。零拷贝过程如下图所示。

      • sendfile()通过 DMA 将文件内容拷贝到一个读取缓冲区,而后由内核将数据拷贝到与输入套接字相关联的内核缓冲区。

      从具体实现来看,Kafka 的数据传输通过 TransportLayer 来实现,其子类 PlaintextTransportLayer 通过 Java NIO 的 FileChannel 的 transferTo()和 transferFrom()办法实现零拷贝。transferTo()和 transferFrom()并不保障肯定能应用零拷贝,实际上是否能应用零拷贝与操作系统相干,如果操作系统提供 sendfile 这样的零拷贝零碎调用,则这两个办法会通过这样的零碎调用充分利用零拷贝的劣势,否则并不能通过这两个办法自身实现零拷贝。

    参考文献

    《Kafka 技术底细:图文详解 Kafka 源码设计与实现》
    《Kafka 权威指南》
    Kafka 的高性能磁盘读写实现原
    Kafka 日志存储

    正文完
     0