1 整体流程
(1)自定义音讯拦截器,个别没啥用。(2)同步期待,拉取元数据。第一次发topic须要拉元数据,是懒加载思维。拉取的是cluster的信息。cluster蕴含了集群topic-broker-partition等信息。(3)对topic和key和value进行序列化,转化成byte[]数组(4)依据Partitioner对key和value计算,失去要发送到哪个分区(5)判断音讯大小,不能大于单条申请限度大小和缓冲区大小。(6)绑定音讯回调函数和拦截器回调函数(7)发送音讯到Accumulator(8)欢送sender线程。如果batchisfull代表一个批次已满,或者有了新批次,都代表有批次可发,都会唤醒sender线程。
2 元数据的获取
发消息的时候,producer只晓得topic而已,第一次发的时候元数据是不晓得的。
先看producer的send办法
(1) 首先要获取topic的元数据,真正dosend的第一步waitOnMetadata,进行阻塞,直到取得元数据。
ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
(2)把须要发送的topic记录在metadata的topic的map中。
(3)先从metadata外面拿到集群信息,如果第一次发这个topic信息,那么拿不到元数据。前面再来就能拿到间接返回了。
(4)把metadata的needupdate设置为true,并且记录下以后的元数据version,为当前比照用。
(5)唤醒sender线程,而后本人就awaite阻塞了。awaitUpdate(final int lastVersion, final long maxWaitMs)
await过程比较简单,就是个while循环,依据配置的超时工夫,计算出还剩的工夫 ,而后wait期待,要么两头sender线程唤醒,要么到工夫本人醒过来,而后看看版本更新没,更新了阐明数据拉到了,没更新。
留神,整个过程里,producer都治理个超时工夫,计算剩下的工夫,一旦超过,就报超时。
producer的send期待了,sender线程在干什么,怎么唤醒producer的send线程呢?看看sender线程:
(1)sender自身也是个线程,在kafkaProducer启动的时候一起启动起来,外面是个while死循环跑run办法。
(2)this.client.ready ,检查和broker是否建设好连贯,没建设就发动连贯。
查看连贯:connectionStates.canConnect(node.idString(), now)) 发动连贯:initiateConnect(Node node, long now) 一些要害参数: //连贯非阻塞 socketChannel.configureBlocking(false); // keepalive 2小时主动探活, socket.setKeepAlive(true); //敞开nagle算法,不组合小数据包发送,升高提早 socket.setTcpNoDelay(true);
因为建设连贯是非阻塞的,这里发动连贯间接就走,前面有中央期待连贯实现。
(3)因为启动连贯都没有,两头很多过程能够省略,间接看sender的run的最初,
this.client.poll(pollTimeout, now) -> metadataUpdater.maybeUpdate(now);这里是封装一个拉取元数据的申请,
个别状况只针对咱们发送的topic拉取元数据信息,封装一个clientRequest,调用dosend办法,目标是把这个元数据申请放入inFlightRequests队列,并退出到kafka本人封装的Selectable的kafkaChannel的发送对象中,kafkaChannel一次只会发一个申请,这个组件在服务端也会用到。牵强附会,申请放入kafkachannel,那么前面必定有java的channel进行下一步发送。
doSend追下去,还有个重要的局部,把对应的connect关注op_write事件,:
(4)this.client.poll(pollTimeout, now)
-> this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
-> pollSelectKeys
kafka本人封装的selector 间接解决了各种场景,通过辨别selectkey关注的事件,解决不同场景,在这里先看连贯场景:
通过finneshConnect,期待连贯建设实现(因为上一步发动连贯是非阻塞的没后果,要用连贯就得等着实现),同时通过底层组件TransportLayer把selectkey勾销connect事件,减少op_read事件,
因为第三步,增加了op_write事件,所以这里连贯实现后,也会进入前面的write分支,把方才封装好的元数据send申请,通过底层cannel收回去,并且记录在completedSends,阐明发送胜利。
(5)现实状况下,过一段时间,服务器返回response了,那么理当走到op_read的逻辑,读取response数据放入stageReceives队列里,
(6)回到NetworkClient的外围poll,第一个maybeupdate封装了元数据申请request,poll负责发送和承受,
前面会对申请和响应进行下一步解决,这里只关怀元数据,能够先只看
handleCompletedSends -> handleResponse -> this.metadata.update(cluster,now) ,
留神这里是集群信息更新,最重要的是versioin+1,而后就能够回到producer的send过程里awaiteUpdate的中央,因为在一遍一遍的等新版本,这里新版原本了就能够往下真正发消息了。失去元数据的过程,原本也是一个发申请和承受申请的过程,这个路和发消息的过程是统一的,是对nio的封装和多层形象,网络组件和业务组件拆散,通过一些两头队列通信,值得思考学习。