关于kafka:深度剖析-Kafka-Producer-的缓冲池机制图解-源码分析

8次阅读

共计 5411 个字符,预计需要花费 14 分钟才能阅读完成。

上次跟大家分享的文章「Kafka Producer 异步发送音讯竟然也会阻塞?」中提到了缓冲池,前面再通过一番浏览源码后,发现了这个缓冲池设计的很棒,被它的设计思维优雅到了,所以忍不住跟大家持续分享一波。

在新版的 Kafka Producer 中,设计了一个音讯缓冲池,在创立 Producer 时会默认创立一个大小为 32M 的缓冲池,也能够通过 buffer.memory 参数指定缓冲池的大小,同时缓冲池被切分成多个内存块,内存块的大小就是咱们创立 Producer 时传的 batch.size 大小,默认大小 16384,而每个 Batch 都会蕴含一个 batch.size 大小的内存块,音讯就是寄存在内存块当中。整个缓冲池的构造如下图所示:

客户端将音讯追加到对应主题分区的某个 Batch 中,如果 Batch 曾经满了,则会新建一个 Batch,同时向缓冲池(RecordAccumulator)申请一块大小为 batch.size 的内存块用于存储音讯。

当 Batch 的音讯被发到了 Broker 后,Kafka Producer 就会移除该 Batch,既然 Batch 持有某个内存块,那必然就会波及到 GC 问题,如下:

以上,频繁的申请内存,用完后就抛弃,必然导致频繁的 GC,造成重大的性能问题。那么,Kafka 是怎么做到防止频繁 GC 的呢?

后面说过了,缓冲池在设计逻辑下面被切分成一个个大小相等的内存块,当音讯发送结束,归还给缓冲池不就能够防止被回收了吗?

缓冲池的内存持有类是 BufferPool,咱们先来看下 BufferPool 都有哪些成员:

public class BufferPool {
  // 总的内存大小
  private final long totalMemory;
  // 每个内存块大小,即 batch.size
  private final int poolableSize;
  // 申请、偿还内存的办法的同步锁
  private final ReentrantLock lock;
  // 闲暇的内存块
  private final Deque<ByteBuffer> free;
  // 须要期待闲暇内存块的事件
  private final Deque<Condition> waiters;
  /** Total available memory is the sum of nonPooledAvailableMemory and the number of byte buffers in free * poolableSize.  */
  // 缓冲池还未调配的闲暇内存,新申请的内存块就是从这里获取内存值
  private long nonPooledAvailableMemory;
    // ...
}

从 BufferPool 的成员可看出,缓冲池实际上由一个个 ByteBuffer 组成的,BufferPool 持有这些内存块,并保留在成员 free 中,free 的总大小由 totalMemory 作限度,而 nonPooledAvailableMemory 则示意还剩下缓冲池还剩下多少内存还未被调配。

当 Batch 的音讯发送结束后,就会将它持有的内存块偿还到 free 中,以便前面的 Batch 申请内存块时不再创立新的 ByteBuffer,从 free 中取就能够了,从而防止了内存块被 JVM 回收的问题。

接下来跟大家一起剖析申请内存和偿还内存是如何实现的。

1、申请内存

申请内存的入口:

org.apache.kafka.clients.producer.internals.BufferPool#allocate

1)内存足够的状况

当用户申请申请内存时,如果发现 free 中有闲暇的内存,则间接从中取:

if (size == poolableSize && !this.free.isEmpty()){return this.free.pollFirst(); 
}

这里的 size 即申请的内存大小,它等于 Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));

即如果你的音讯大小小于 batchSize,则申请的内存大小为 batchSize,那么下面的逻辑就是如果申请的内存大小等于 batchSize 并且 free 不闲暇,则间接从 free 中获取。

咱们无妨想一下,为什么 Kafka 肯定要申请内存大小等于 batchSize,能力从 free 获取闲暇的内存块呢?

后面也说过,缓冲池的内存块大小是固定的,它等于 batchSize,如果申请的内存比 batchSize 还大,阐明一条音讯所须要寄存的内存空间比内存块的内存空间还要大,因而不满足需要,不满组需要怎么办呢?咱们接着往下剖析:

// now check if the request is immediately satisfiable with the
// memory on hand or if we need to block
int freeListSize = freeSize() * this.poolableSize;
if (this.nonPooledAvailableMemory + freeListSize >= size) {
  // we have enough unallocated or pooled memory to immediately
  // satisfy the request, but need to allocate the buffer
  freeUp(size);
  this.nonPooledAvailableMemory -= size;
}

freeListSize:指的是 free 中曾经调配好并且曾经回收的闲暇内存块总大小;

nonPooledAvailableMemory:缓冲池还未调配的闲暇内存,新申请的内存块就是从这里获取内存值;

this.nonPooledAvailableMemory + freeListSize:即缓冲池中总的闲暇内存空间。

如果缓冲池的内存空间比申请内存大小要大,则调用 freeUp(size); 办法,接着将闲暇的内存大小减去申请的内存大小。

private void freeUp(int size) {while (!this.free.isEmpty() && this.nonPooledAvailableMemory < size)
    this.nonPooledAvailableMemory += this.free.pollLast().capacity();
}

freeUp 这个办法很乏味,它的思维是这样的:

如果未调配的内存大小比申请的内存还要小,那只能从已调配的内存列表 free 中将内存空间要回来,直到 nonPooledAvailableMemory 比申请内存大为止。

2)内存不足的状况

在我的「Kafka Producer 异步发送音讯竟然也会阻塞?」这篇文章当中也提到了,当缓冲池的内存块用完后,音讯追加调用将会被阻塞,直到有闲暇的内存块。

阻塞期待的逻辑是怎么实现的呢?

// we are out of memory and will have to block
int accumulated = 0;
Condition moreMemory = this.lock.newCondition();
try {long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
  this.waiters.addLast(moreMemory);
  // loop over and over until we have a buffer or have reserved
  // enough memory to allocate one
  while (accumulated < size) {long startWaitNs = time.nanoseconds();
    long timeNs;
    boolean waitingTimeElapsed;
    try {waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
    } finally {long endWaitNs = time.nanoseconds();
      timeNs = Math.max(0L, endWaitNs - startWaitNs);
      recordWaitTime(timeNs);
    }

    if (waitingTimeElapsed) {throw new TimeoutException("Failed to allocate memory within the configured max blocking time" + maxTimeToBlockMs + "ms.");
    }

    remainingTimeToBlockNs -= timeNs;

    // check if we can satisfy this request from the free list,
    // otherwise allocate memory
    if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
      // just grab a buffer from the free list
      buffer = this.free.pollFirst();
      accumulated = size;
    } else {
      // we'll need to allocate memory, but we may only get
      // part of what we need on this iteration
      freeUp(size - accumulated);
      int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);
      this.nonPooledAvailableMemory -= got;
      accumulated += got;
    }
  }

以上源码的大抵逻辑:

首先创立一个本次期待 Condition,并且把它增加到类型为 Deque 的 waiters 中(前面在偿还内存中会唤醒),while 循环不断收集闲暇的内存,直到内存比申请内存大时退出,在 while 循环过程中,调用 Condition#await 办法进行阻塞期待,偿还内存时会被唤醒,唤醒后会判断以后申请内存是否大于 batchSize,如果等与 batchSize 则间接将偿还的内存返回即可,如果以后申请的内存大于 大于 batchSize,则须要调用 freeUp 办法从 free 中开释闲暇的内存进去,而后进行累加,直到大于申请的内存为止。

2、偿还内存

申请内存的入口:

org.apache.kafka.clients.producer.internals.BufferPool#deallocate(java.nio.ByteBuffer, int)

public void deallocate(ByteBuffer buffer, int size) {lock.lock();
  try {if (size == this.poolableSize && size == buffer.capacity()) {buffer.clear();
      this.free.add(buffer);
    } else {this.nonPooledAvailableMemory += size;}
    Condition moreMem = this.waiters.peekFirst();
    if (moreMem != null)
      moreMem.signal();} finally {lock.unlock();
  }
}

偿还内存块的逻辑比较简单:

如果偿还的内存块大小等于 batchSize,则将其清空后增加到缓冲池的 free 中,行将其归还给缓冲池,防止了 JVM GC 回收该内存块。如果不等于呢?间接将内存大小累加到未调配并且闲暇的内存大小值中即可,内存就无需偿还了,期待 JVM GC 回收掉,最初唤醒正在期待闲暇内存的线程。

通过以上的源码剖析之后,给大家指出须要留神的一个问题,如果设置不当,会给 Producer 端带来重大的性能影响:

如果你的音讯大小比 batchSize 还要大,则不会从 free 中循环获取已调配好的内存块,而是从新创立一个新的 ByteBuffer,并且该 ByteBuffer 不会被偿还到缓冲池中(JVM GC 回收),如果此时 nonPooledAvailableMemory 比音讯体还要小,还会将 free 中闲暇的内存块销毁(JVM GC 回收),以便缓冲池中有足够的内存空间提供给用户申请,这些动作都会导致频繁 GC 的问题呈现。

因而,须要依据业务音讯的大小,适当调整 batch.size 的大小,防止频繁 GC。

作者简介

作者张乘辉,善于消息中间件技能,负责公司百万 TPS 级别 Kafka 集群的保护,作者保护的公号「后端进阶」不定期分享 Kafka、RocketMQ 系列不讲概念间接真刀真枪的实战总结以及细节上的源码剖析;同时作者也是阿里开源分布式事务框架 Seata Contributor,因而也会分享对于 Seata 的相干常识;当然公号也会分享 WEB 相干常识比方 Spring 全家桶等。内容不肯定八面玲珑,但肯定让你感触到作者对于技术的谋求是认真的!

公众号:后端进阶

技术博客:https://objcoding.com/

GitHub:https://github.com/objcoding/

正文完
 0