关于java:kafka原理剖析2producer元数据的获取

5次阅读

共计 2599 个字符,预计需要花费 7 分钟才能阅读完成。

1 整体流程

(1)自定义音讯拦截器,个别没啥用。(2)同步期待,拉取元数据。第一次发 topic 须要拉元数据,是懒加载思维。拉取的是 cluster 的信息。cluster 蕴含了集群 topic-broker-partition 等信息。(3)对 topic 和 key 和 value 进行序列化,转化成 byte[]数组(4)依据 Partitioner 对 key 和 value 计算,失去要发送到哪个分区(5)判断音讯大小,不能大于单条申请限度大小和缓冲区大小。(6)绑定音讯回调函数和拦截器回调函数(7)发送音讯到 Accumulator(8)欢送 sender 线程。如果 batchisfull 代表一个批次已满,或者有了新批次,都代表有批次可发,都会唤醒 sender 线程。

2 元数据的获取
发消息的时候,producer 只晓得 topic 而已,第一次发的时候元数据是不晓得的。

先看 producer 的 send 办法
(1)首先要获取 topic 的元数据,真正 dosend 的第一步 waitOnMetadata,进行阻塞,直到取得元数据。
ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
(2)把须要发送的 topic 记录在 metadata 的 topic 的 map 中。
(3)先从 metadata 外面拿到集群信息,如果第一次发这个 topic 信息,那么拿不到元数据。前面再来就能拿到间接返回了。
(4)把 metadata 的 needupdate 设置为 true,并且记录下以后的元数据 version,为当前比照用。
(5)唤醒 sender 线程,而后本人就 awaite 阻塞了。awaitUpdate(final int lastVersion, final long maxWaitMs)
await 过程比较简单,就是个 while 循环,依据配置的超时工夫,计算出还剩的工夫,而后 wait 期待,要么两头 sender 线程唤醒,要么到工夫本人醒过来,而后看看版本更新没,更新了阐明数据拉到了,没更新。

留神,整个过程里,producer 都治理个超时工夫,计算剩下的工夫,一旦超过,就报超时。

producer 的 send 期待了,sender 线程在干什么,怎么唤醒 producer 的 send 线程呢?看看 sender 线程:
(1)sender 自身也是个线程,在 kafkaProducer 启动的时候一起启动起来,外面是个 while 死循环跑 run 办法。
(2)this.client.ready,检查和 broker 是否建设好连贯,没建设就发动连贯。

    查看连贯:connectionStates.canConnect(node.idString(), now))
    发动连贯:initiateConnect(Node node, long now)
    一些要害参数:// 连贯非阻塞
         socketChannel.configureBlocking(false);
         // keepalive 2 小时主动探活,socket.setKeepAlive(true);
         // 敞开 nagle 算法,不组合小数据包发送,升高提早
         socket.setTcpNoDelay(true);

因为建设连贯是非阻塞的,这里发动连贯间接就走,前面有中央期待连贯实现。

(3)因为启动连贯都没有,两头很多过程能够省略,间接看 sender 的 run 的最初,
this.client.poll(pollTimeout, now) -> metadataUpdater.maybeUpdate(now); 这里是封装一个拉取元数据的申请,
个别状况只针对咱们发送的 topic 拉取元数据信息,封装一个 clientRequest, 调用 dosend 办法,目标是把这个元数据申请放入 inFlightRequests 队列,并退出到 kafka 本人封装的 Selectable 的 kafkaChannel 的发送对象中,kafkaChannel 一次只会发一个申请,这个组件在服务端也会用到。牵强附会,申请放入 kafkachannel,那么前面必定有 java 的 channel 进行下一步发送。

doSend 追下去,还有个重要的局部,把对应的 connect 关注 op_write 事件,:

(4)this.client.poll(pollTimeout, now)
-> this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
-> pollSelectKeys
kafka 本人封装的 selector 间接解决了各种场景,通过辨别 selectkey 关注的事件,解决不同场景,在这里先看连贯场景:

通过 finneshConnect, 期待连贯建设实现(因为上一步发动连贯是非阻塞的没后果,要用连贯就得等着实现),同时通过底层组件 TransportLayer 把 selectkey 勾销 connect 事件,减少 op_read 事件,
因为第三步,增加了 op_write 事件,所以这里连贯实现后,也会进入前面的 write 分支,把方才封装好的元数据 send 申请,通过底层 cannel 收回去,并且记录在 completedSends, 阐明发送胜利。

(5)现实状况下,过一段时间,服务器返回 response 了,那么理当走到 op_read 的逻辑,读取 response 数据放入 stageReceives 队列里,

(6)回到 NetworkClient 的外围 poll,第一个 maybeupdate 封装了元数据申请 request,poll 负责发送和承受,
前面会对申请和响应进行下一步解决,这里只关怀元数据,能够先只看
handleCompletedSends -> handleResponse -> this.metadata.update(cluster,now),
留神这里是集群信息更新,最重要的是 versioin+1,而后就能够回到 producer 的 send 过程里 awaiteUpdate 的中央,因为在一遍一遍的等新版本,这里新版原本了就能够往下真正发消息了。失去元数据的过程,原本也是一个发申请和承受申请的过程,这个路和发消息的过程是统一的,是对 nio 的封装和多层形象,网络组件和业务组件拆散,通过一些两头队列通信,值得思考学习。

正文完
 0