关于java:kafka原理剖析4producer消息发送之缓冲区

2次阅读

共计 2590 个字符,预计需要花费 7 分钟才能阅读完成。

1 整体过程

(1)sender 线程整体是个 while 循环,就是在一直轮询每个 partition-batch 外面有没有适合发送的 batch 数据。

(2)有要发送的 batch 数据,看看元数据有没有(没有的要做个标记,前面元数据到位能力发),有元数据才晓得往哪个具体机器发。晓得了机器,还要建设连贯。

(3)对要发送到对立机器的 batch 做一下聚合分组,聚合后的一组封装成一个 request,不便一起发送节俭资源。

(4)对发送的数据依照 kafka 的协定标准进行二进制转化。

(5)通过 for 循环,用 kafka 的 networkclient 把一个个 request 放入本人的 inFlightRequests 列表,留神只是进了发送队列,还没真发。

(6)通过 networkclient 的网络读写事件,用 java 的 nio 进行真正收发数据。

2 收集可发送的 batch 批次

对照下面的 accumulator 组件的 ready 办法,这个组件就是作为 producer 发送数据的缓冲区,依照 topic-patition-batch 队列的数据结构进行存储,遍历后失去队列队列第一个 batch,而后判断各种条件:
(1)如果 batch 是重试,没到重试距离(100ms 默认),先不发。

(2)batch 只有一个,而且满了,能够发。

(3)batch 有多个,取第一个发。(多个是因为后面的满了,才会申请更多内存)

(4)没满,但到了每个 batch 的最大等待时间(配置是 linger.ms),也要发。如果 linger.ms 配置了 0,那么在这只有有 batch 就肯定发。

(5)内存不足,强制 flush 发送等场景,都须要立即发。
满足了发送条件的 batch,会被收集起来。

3 连贯建设过程

元数据拉取过程后面有独自的一篇介绍,这里是看连贯的建设过程,是 networkClient 的 ready 办法,次要通过一些连贯状态察看连贯的建设状况,通过 kafka 本人封装的 selectable 组件发动连贯,留神连贯是非阻塞的,放弃 alive 的,并且敞开 nable 算法(目标是放慢发送)。连贯对立管制在 selectable 中,组件关系为:
NetworkClient 持有 Selectable,Selectable 持有各个机器的连贯。

4 外围:NetworkClient 的 poll

(1)networkClient 的 poll 办法,其实实质上就是 selectable 的 poll 办法,下面说了,selectable 持有了每个连贯,那么也就有 selectKey, 所以用 nio 的办法,解决各种网络事件,典型的就是建设连贯,读事件,写事件。
外围组件有:

inFlightRequests:正在发送或期待回应的音讯,下面看到 sender 线程,把可发送的 batch 调用了 send 办法,那里不是真发,而是放入本队列。socketSendBuffer,socketReveiceBuffer: 发送 / 接收缓冲区

(2)selectable 的 nio 几个重要成员变量:

  a channels,nioselector:保留 channel 和解决 nio 的组件,封装在这个网络组件内,十分适合。b List<String> connected:有连贯的 node 实例列表
  c completedReveices : 曾经承受回来的响应
  d completedSends : 曾经收回去的申请
  e stagedReceive: 暂存的接管到的轻球

(3)selectable 会轮询所有 selectkey,失去有事件的 key,而后解决三种事件

(4)ON_CONNECT 连贯事件,上文说过,发动的连贯是非阻塞的,不论连贯胜利没胜利间接往下走,这里就要调用 finishConnect, 真正阻塞期待连贯建设,节省时间,这个思路值得学习。

(5)OP_READ 连贯建设后,默认加上了读事件,通过定制的 kafkachannel 进行读取数据。通过 NetworkReceive 形象封装响应,kafka 的音讯格局也是常见的 音讯长度(固定 4 字节)+ 音讯内容组成。而后 kafkachannel 会有固定的 4 个字节的 buffer 来读长度,而后依据音讯长度申请对应大小的内容 buffer 来下一步读取内容。

    有了这个模式,那么拆包问题能够解决:音讯长度 4 个字节的 buffer 没读满,阐明拆包了,这次掠过下次持续读,读到 buffer 满了才认为长度拿到;而后内容 buffer 没读完也阐明拆包了,同理下回持续读,读满为止。读取进去的数据,会放入 stageReceives 队列里,外面存了 channel 和 receive 的对应关系,前面的办法会紧接着这里的 receive 退出到 completedReceives 外面。留神,这里一次只会把一个 receive 放入 completeReceives 外面,起因是如果一次读取进去了多个 receive 响应,心愿前面的逻辑能一条一条解决。

(6)OP_WRITE
op_write 事件,封装了 send 对象,这个 send 对象就是最外层 send 线程往 networkclient 外面增加的分组好了的 request,而后 kafkachannel 利用 nio 进行发送,如果遇到拆包没发完,那就屡次发送,发完了当前,移除 channel 的 write 事件,而后把收回去的申请,退出到 selector 的 completeSends 队列里。

所以综上所述,selectable 的 poll 的流程,其实是应用 nio 解决对应的事件,而后把事件后果写入到 selectable 的多个队列里,不便 networkclient 间接读取队列失去不同场景的数据。这种封装解藕思路值得学习。

5 networkclient 后续解决
还是参考上图,后续其实是想把返回的 receive,调用胜利或失败的回调函数。
(1)handleCompletedSend, 其实是把 ack= 0 的状况立即返回了,因为不必等网络申请,只有 accumulator 存进去根本就能够认为发送胜利了,而后封装个 response 放个列表。

(2)handleCompletedReceive, 这里把收集到的 receive 封装个 response,也放入列表

(3)handleTimeoutRequests, 对超时的申请标记下 node 状态,下回从新拉元数据

(4)最终,遍历所有 response,调用 producer 胜利的 response,发送正式结束。

正文完
 0