共计 2016 个字符,预计需要花费 6 分钟才能阅读完成。
1 整体发消息流程
(1)第一步,等元数据拉取,上一回说过。
(2)元数据到位,对 topic 和 key 进行序列化。
(3)选取 partition,3 种状况
a 如果音讯里指定了 patition 的序号,先用指定的。但个别不会这么
b 没指定 key,就用个原子 int 自增,和 size 取模抉择 partition,相当于轮询。C 指定了 key,那就把序列化后的 key+topic,转化成 hash,再取模。
(4)对音讯做大小校验,包含音讯自身大小是否超过单条限度,是否超过缓冲区大小。
(5)音讯进入缓冲区,Acumulator, 重点,前面sender 线程会操作缓冲区,进行网络发送 / 接管
。
(6)如果 batch 满了,或者有新 batch 了,唤醒 send,筹备发老 batch。
2 缓冲区 RecordAcumulator 里退出音讯 - 概览
(1)RecordAccumulator 的数据结构,重点是 batches 这个 map,topic->deque 队列 ->batchs 的三层构造
(2)发消息的第一步,就是把音讯放入这个 accumulator 里,外面的 deque,batch,batch 对应的 buffer,都须要初始化过程,初始化实现当前,只须要把对应音讯放到某个 topic 的 batch 里就行了。
(3)每个组件的初始化,都保障了线程平安。
3 RecordAccumulator 退出音讯细节解析
依据上图,看看实现细节:
(1)getOrCreateDeque, 第一次来是空的,建设一个退出 batches,上图看到 batches 是个 concurrentHashMap,key 是 topicPartition, 对应的 hash 值是 topic + “-” + partition,就算是并发 send,这里也能保障线程平安。
(2)tryappend,尝试写入音讯,
a 能够看到,tryappend 是拿到 topic+partition 的最近一个 recordbatch,退出音讯。b RecordBatch 外面有 MemoryRecords, 封装了底层 buffer。c 写入数据的过程,其实是通过 compressor 组件,把音讯格局解析成 offset|size|crc|magic|attributes|timestamp|key size|key| value size | value 固定的 kafka 报文格式,而后通过输出流写入对应的 buffer。
对于个别的异步写音讯过程,其实这里写入胜利了 send 就返回了。
由此可见,一个 topic+patition,对应一个发送队列,对应 n 个发送批次,同时也对应 n 个发送缓冲区。
(3)第一次发这个 topic-partition 的音讯,是没有 batch 的,也就没有对应的内存缓冲区,对应上图右上角返回 null 的时候,就要去 申请内存缓冲 buffer
。
(4)失去 buffer 当前,buffer 封装到 MemoryRecords 外面,而后封装到 RecordBatch 代表一个批次,而后调用该 batch 的 tryAppend, 这次就能够写入了,最初把 batch 放入 acummulator 对应的 topic-partition 队列里。
4 申请内存缓冲区的过程
(1)第一次筹备往某个 topic 的 partition 发消息的时候,RecordAccumulator 里的对应的 deque 里必定没有它的缓冲队列 batch,那么就须要申请 batch 批次。bach 批次的实质是封装的 ByteBuffer
, 这又须要从机器内存申请。
(2)申请内存,须要从 RecordAccumulator 的 BufferPool 来申请,这是个 buffer 池,记录了 buffer 队列 Deque<ByteBuffer> , 还保护了缓冲区的总内存 totalMemory,可用内存 availableMemory,每个 batch 的 buffer 大小 poolableSize 等。
(3)申请缓存,如果残余缓存够,那么分两种状况
a 如果来申请的内存就是批次大小,并且 buffer 池里有现成的,那么间接 poll 一个返回
b 如果申请的大小大于一个批次大小,并且残余缓存够,那么就从 buffer 池里开释一些缓存进去,调配一个大的缓存 buffer 进来。freeup 就是 while 循环开释 buffer 池的 buffer,直到残余缓存够调配 buffer 为止。
(4)如果残余缓存不够申请的,那就阻塞,等到别的线程开释出内存资源再来唤醒本线程,这里的唤醒可能是发送完一个 batch 的时候,batch 空了就把占用的 buffer 还回队列里。
假如唤醒了,这里要查看两个方面,一个是唤醒后残余缓存或 buffer 池有没有空余了,一个是等待时间有没有超时(这里工夫指的是 producer 的 send 的超时工夫)。而后从buffer 池里调配一个或者内存池里调配一份 buffer 进去
。
5 发消息缓冲区的数据结构
由此可见,简化版的发送缓冲区的数据结构,集成了音讯批次划分,缓冲区暂存和零碎内存调配三个性能