1 整体过程

(1)sender线程整体是个while循环,就是在一直轮询每个partition-batch外面有没有适合发送的batch数据。

(2)有要发送的batch数据,看看元数据有没有(没有的要做个标记,前面元数据到位能力发),有元数据才晓得往哪个具体机器发。晓得了机器,还要建设连贯。

(3)对要发送到对立机器的batch做一下聚合分组,聚合后的一组封装成一个request,不便一起发送节俭资源。

(4)对发送的数据依照kafka的协定标准进行二进制转化。

(5)通过for循环,用kafka的networkclient把一个个request放入本人的inFlightRequests列表,留神只是进了发送队列,还没真发。

(6)通过networkclient的网络读写事件,用java的nio进行真正收发数据。

2 收集可发送的batch批次

对照下面的accumulator组件的ready办法,这个组件就是作为producer发送数据的缓冲区,依照topic-patition-batch队列的数据结构进行存储,遍历后失去队列队列第一个batch,而后判断各种条件:
(1)如果batch是重试,没到重试距离(100ms默认),先不发。

(2)batch只有一个,而且满了,能够发。

(3)batch有多个,取第一个发。(多个是因为后面的满了,才会申请更多内存)

(4)没满,但到了每个batch的最大等待时间(配置是 linger.ms ),也要发。如果linger.ms 配置了0,那么在这只有有batch就肯定发。

(5)内存不足,强制flush发送等场景,都须要立即发。
满足了发送条件的batch,会被收集起来。

3 连贯建设过程

元数据拉取过程后面有独自的一篇介绍,这里是看连贯的建设过程,是networkClient的ready办法,次要通过一些连贯状态察看连贯的建设状况,通过kafka本人封装的selectable组件发动连贯,留神连贯是非阻塞的,放弃alive的,并且敞开nable算法(目标是放慢发送)。连贯对立管制在selectable中,组件关系为:
NetworkClient持有Selectable,Selectable持有各个机器的连贯。

4 外围:NetworkClient的poll

(1)networkClient的poll办法,其实实质上就是selectable的poll办法,下面说了,selectable持有了每个连贯,那么也就有selectKey,所以用nio的办法,解决各种网络事件,典型的就是建设连贯,读事件,写事件。
外围组件有:

inFlightRequests:正在发送或期待回应的音讯,下面看到sender线程,把可发送的batch调用了send办法,那里不是真发,而是放入本队列。socketSendBuffer,socketReveiceBuffer: 发送/接收缓冲区

(2)selectable的nio几个重要成员变量:

  a channels,nioselector:保留channel和解决nio的组件,封装在这个网络组件内,十分适合。  b List<String> connected :有连贯的node实例列表  c completedReveices : 曾经承受回来的响应  d completedSends : 曾经收回去的申请  e stagedReceive: 暂存的接管到的轻球

(3)selectable会轮询所有selectkey,失去有事件的key,而后解决三种事件

(4) ON_CONNECT 连贯事件 ,上文说过,发动的连贯是非阻塞的,不论连贯胜利没胜利间接往下走,这里就要调用finishConnect,真正阻塞期待连贯建设,节省时间,这个思路值得学习。

(5)OP_READ 连贯建设后,默认加上了读事件,通过定制的kafkachannel进行读取数据。通过NetworkReceive形象封装响应,kafka的音讯格局也是常见的 音讯长度(固定4字节)+音讯内容组成。而后kafkachannel会有固定的4个字节的buffer来读长度,而后依据音讯长度申请对应大小的内容buffer来下一步读取内容。

    有了这个模式,那么拆包问题能够解决:音讯长度4个字节的buffer没读满,阐明拆包了,这次掠过下次持续读,读到buffer满了才认为长度拿到;而后内容buffer没读完也阐明拆包了,同理下回持续读,读满为止。        读取进去的数据,会放入stageReceives队列里,外面存了channel和receive的对应关系,前面的办法会紧接着这里的receive退出到completedReceives外面。留神,这里一次只会把一个receive放入completeReceives外面,起因是如果一次读取进去了多个receive响应,心愿前面的逻辑能一条一条解决。

(6)OP_WRITE
op_write事件,封装了send对象,这个send对象就是最外层send线程往networkclient外面增加的分组好了的request,而后kafkachannel利用nio进行发送,如果遇到拆包没发完,那就屡次发送,发完了当前,移除channel的write事件,而后把收回去的申请,退出到selector的completeSends队列里。

所以综上所述,selectable的poll的流程,其实是应用nio解决对应的事件,而后把事件后果写入到selectable的多个队列里,不便networkclient间接读取队列失去不同场景的数据。这种封装解藕思路值得学习。

5 networkclient后续解决
还是参考上图,后续其实是想把返回的receive,调用胜利或失败的回调函数。
(1)handleCompletedSend, 其实是把ack=0的状况立即返回了,因为不必等网络申请,只有accumulator存进去根本就能够认为发送胜利了,而后封装个response放个列表。

(2)handleCompletedReceive,这里把收集到的receive封装个response,也放入列表

(3)handleTimeoutRequests, 对超时的申请标记下node状态,下回从新拉元数据

(4)最终,遍历所有response,调用producer胜利的response,发送正式结束。