共计 1312 个字符,预计需要花费 4 分钟才能阅读完成。
kafka 在音讯指定好分区后,并没有间接把音讯收回去,而是把音讯寄存在内存中,再分批收回去,这样就缩小了网络传输的次数,缩小了网络的开销,整体的效率就会失去了很大的晋升。治理音讯的内存叫做 RecordAccumulator,即音讯累加器。
batches
上一章咱们曾经晓得了音讯是往哪个 partition 发的,所以 topic 和 partition 就会组成一个 TopicPartition 对象。
如果一个 topic 有 3 个 partition,那就会有 3 个 TopicPartition,这 3 个 TopicPartition 在音讯累加器里,就有对应的 3 个队列 Deque,这个信息,寄存在数据结构为 CopyOnWriteMap 的 batches 中。
当然 RecordAccumulator 一开始的时候,必定是没有对应的 Deque,比方 topic1+ 0 的 TopicPartition 刚筹备往 Deque 扔数据的时候,就会看看是否有这个 TopicPartition 对应的 Deque,如果有,就拿进去用,如果没有,就会创立一个。
Deque 和是 TopicPartition 在 RecordAccumulator 是以 map 来做映射关系的,因为 get 的时候比拟多,put 的时候时候比拟少,是一个写少读多的 场景,所以这个 map 是 CopyOnWriteMap 的构造,在 put 的时候会先复制再写,导致内存有肯定的耗费,然而对于 get 来说,因为没有加锁,而且是线程平安的,性能就大大的晋升。
RecordBatch
Deque 队列里寄存的并不是音讯自身,而是一个个封装音讯的 RecordBatch,每个 RecordBatch 默认大小为 16k,由 batch.size
进行设置。
如果第一个音讯封装后有 4k,第二个音讯封装后有 10k,那这两个音讯是放在同一个 RecordBatch 的。
如果音讯的大小超过了默认的 16k,那这个 RecordBatch 的大小就是以理论的大小为准。
BufferPool
下面的内存空间并不是能够始终申请的,他有一个总的空间,默认大小为 32M,由 buffer.memory
进行设置。
内存空间通过 BufferPool 进行治理,每次申请一个新的 RecordBatch,都要看看以后已用的空间 + 以后申请的 RecordBatch 是不是小于 32M,如果小,那就能够始终申请。
BufferPool 外面也有一个 Deque 队列,当大小为 16k 的 RecordBatch 用完的时候,就会把这个空间寄存在 BufferPool 的 Deque 里,当申请内存的时候,就能够间接从 Deque 里拿进去。这样做的目标是为了避免反复的创建对象,进而引起垃圾回收。
如果申请的时候,32M 的内存都被耗费完了,此时就会阻塞在这里,等有空间偿还了,会被唤醒持续申请内存了。
如果 RecordBatch 的大小并不等于 16k,那他此时是没有方法回收到 BufferPool 的 Deque 里,所以只能等着垃圾回收,所以咱们应用 kafka 的时候,batch.size
的大小要依据业务须要进行扩充,比方咱们根本都是发送 500k 的数据,BufferPool 的 Deque 实际上并没有起到什么作用,JVM 还会始终进行垃圾回收。