1 整体发消息流程
(1)第一步,等元数据拉取,上一回说过。
(2)元数据到位,对topic和key进行序列化。
(3)选取partition,3种状况
a 如果音讯里指定了patition的序号,先用指定的。但个别不会这么 b 没指定key,就用个原子int自增,和size取模抉择partition,相当于轮询。 C 指定了key,那就把序列化后的key+topic,转化成hash,再取模。
(4)对音讯做大小校验,包含音讯自身大小是否超过单条限度,是否超过缓冲区大小。
(5)音讯进入缓冲区,Acumulator,重点,前面sender线程会操作缓冲区,进行网络发送/接管
。
(6)如果batch满了,或者有新batch了,唤醒send,筹备发老batch。
2 缓冲区RecordAcumulator里退出音讯-概览
(1)RecordAccumulator的数据结构,重点是batches这个map,topic->deque队列->batchs的三层构造
(2)发消息的第一步,就是把音讯放入这个accumulator里,外面的deque,batch,batch对应的buffer,都须要初始化过程,初始化实现当前,只须要把对应音讯放到某个topic的batch里就行了。
(3)每个组件的初始化,都保障了线程平安。
3 RecordAccumulator退出音讯细节解析
依据上图,看看实现细节:
(1)getOrCreateDeque, 第一次来是空的,建设一个退出batches,上图看到batches是个concurrentHashMap,key是topicPartition,对应的hash值是topic + "-" + partition,就算是并发send,这里也能保障线程平安。
(2)tryappend, 尝试写入音讯,
a 能够看到,tryappend是拿到topic+partition的最近一个recordbatch,退出音讯。b RecordBatch外面有MemoryRecords,封装了底层buffer。c 写入数据的过程,其实是通过compressor组件,把音讯格局解析成offset|size|crc|magic|attributes|timestamp|key size|key| value size | value 固定的kafka报文格式,而后通过输出流写入对应的buffer。
对于个别的异步写音讯过程,其实这里写入胜利了send就返回了。
由此可见,一个topic+patition,对应一个发送队列,对应n个发送批次,同时也对应n个发送缓冲区。
(3)第一次发这个topic-partition的音讯,是没有batch的,也就没有对应的内存缓冲区,对应上图右上角返回null的时候,就要去申请内存缓冲buffer
。
(4)失去buffer当前,buffer封装到MemoryRecords外面,而后封装到RecordBatch代表一个批次,而后调用该batch的tryAppend,这次就能够写入了,最初把batch放入acummulator对应的topic-partition队列里。
4 申请内存缓冲区的过程
(1)第一次筹备往某个topic的partition发消息的时候,RecordAccumulator里的对应的deque里必定没有它的缓冲队列batch,那么就须要申请batch批次。bach批次的实质是封装的ByteBuffer
,这又须要从机器内存申请。
(2)申请内存,须要从RecordAccumulator的BufferPool来申请,这是个buffer池,记录了buffer队列Deque<ByteBuffer> , 还保护了缓冲区的总内存totalMemory,可用内存availableMemory,每个batch的buffer大小poolableSize等。
(3)申请缓存,如果残余缓存够,那么分两种状况
a 如果来申请的内存就是批次大小,并且buffer池里有现成的,那么间接poll一个返回
b 如果申请的大小大于一个批次大小,并且残余缓存够,那么就从buffer池里开释一些缓存进去,调配一个大的缓存buffer进来。freeup就是while循环开释buffer池的buffer,直到残余缓存够调配buffer为止。
(4)如果残余缓存不够申请的,那就阻塞,等到别的线程开释出内存资源再来唤醒本线程,这里的唤醒可能是发送完一个batch的时候,batch空了就把占用的buffer还回队列里。
假如唤醒了,这里要查看两个方面,一个是唤醒后残余缓存或buffer池有没有空余了,一个是等待时间有没有超时(这里工夫指的是producer的send的超时工夫)。而后从buffer池里调配一个或者内存池里调配一份buffer进去
。
5 发消息缓冲区的数据结构
由此可见,简化版的发送缓冲区的数据结构,集成了音讯批次划分,缓冲区暂存和零碎内存调配三个性能