kafka在音讯指定好分区后,并没有间接把音讯收回去,而是把音讯寄存在内存中,再分批收回去,这样就缩小了网络传输的次数,缩小了网络的开销,整体的效率就会失去了很大的晋升。治理音讯的内存叫做RecordAccumulator,即音讯累加器。
batches
上一章咱们曾经晓得了音讯是往哪个partition发的,所以topic和partition就会组成一个TopicPartition对象。
如果一个topic有3个partition,那就会有3个TopicPartition,这3个TopicPartition在音讯累加器里,就有对应的3个队列Deque,这个信息,寄存在数据结构为CopyOnWriteMap的batches中。
当然RecordAccumulator一开始的时候,必定是没有对应的Deque,比方topic1+0的TopicPartition刚筹备往Deque扔数据的时候,就会看看是否有这个TopicPartition对应的Deque,如果有,就拿进去用,如果没有,就会创立一个。
Deque和是TopicPartition在RecordAccumulator是以map来做映射关系的,因为get的时候比拟多,put的时候时候比拟少,是一个写少读多的 场景,所以这个map是CopyOnWriteMap的构造,在put的时候会先复制再写,导致内存有肯定的耗费,然而对于get来说,因为没有加锁,而且是线程平安的,性能就大大的晋升。
RecordBatch
Deque队列里寄存的并不是音讯自身,而是一个个封装音讯的RecordBatch,每个RecordBatch默认大小为16k,由batch.size
进行设置。
如果第一个音讯封装后有4k,第二个音讯封装后有10k,那这两个音讯是放在同一个RecordBatch的。
如果音讯的大小超过了默认的16k,那这个RecordBatch的大小就是以理论的大小为准。
BufferPool
下面的内存空间并不是能够始终申请的,他有一个总的空间,默认大小为32M,由buffer.memory
进行设置。
内存空间通过BufferPool进行治理,每次申请一个新的RecordBatch,都要看看以后已用的空间+以后申请的RecordBatch是不是小于32M,如果小,那就能够始终申请。
BufferPool外面也有一个Deque队列,当大小为16k的RecordBatch用完的时候,就会把这个空间寄存在BufferPool的Deque里,当申请内存的时候,就能够间接从Deque里拿进去。这样做的目标是为了避免反复的创建对象,进而引起垃圾回收。
如果申请的时候,32M的内存都被耗费完了,此时就会阻塞在这里,等有空间偿还了,会被唤醒持续申请内存了。
如果RecordBatch的大小并不等于16k,那他此时是没有方法回收到BufferPool的Deque里,所以只能等着垃圾回收,所以咱们应用kafka的时候,batch.size
的大小要依据业务须要进行扩充,比方咱们根本都是发送500k的数据,BufferPool的Deque实际上并没有起到什么作用,JVM还会始终进行垃圾回收。