上一篇曾经晓得元数据在什么时候拉取的,咱们这里看看他整个拉取的流程。

topics

元素的拉取,是依据生产者的发送的topic来拉取的,并不是拉取所有的元数据,所以在发送音讯的时候,就会把这个topic寄存在topics中,这个topics就是记录了以后已有的topic,数据类型是map,他的key就是topic,value刚开始默认为-1。
所以topic1发送的时候,就会在topics新增一个topic1->-1的键值对。

版本号和更新标识

因为元数据的拉取,是由Sender线程来执行的(这个前面流程会讲到),所以须要一个更新标识needUpdate,来告知Sender线程,我须要拉取元数据。

同时,Sender线程在胜利拉取元数据后,也须要告知其余线程,这里就须要一个版本号version,每次胜利拉取,就进行累加,所以其余线程就会拿本人的版本号跟内存里的版本号进行比照,发现内存的版本号比本人大,那就是拉取元数据胜利了。

在拉取的时候,此时version=0,以后线程保留的version也是0,needUpdate为true,阐明须要拉取元数据。

sender线程

sender线程有一个死循环,他会始终运行,元数据的发送以及音讯的发送,都要通过ender线程。

版本号和更新标识筹备好后,就开始唤醒sender线程,实际上sender线程最终会唤醒NIO的Selector上的线程。

这里波及到网络传输局部,咱们就简要的讲一下,网络传输等前面再讲。简略的说,就是sender线程会把申请交给NIO的Selector,而后再解决Selector收到的申请。

sender线程解决完音讯后,就会更新元数据里的信息,而后版本号累加,needUpdate改为false。

下面topics中,topic1对应的value是1,此时曾经拉取到元数据了,这里的value就要改为以后工夫+5 60 1000,即以后工夫+5分钟。意思是当某个topic超过5分钟没有发送音讯,就会从topics中移除,下次更新的时候,就不会拉取这个topic的元数据信息。

休眠

sender线程辛辛苦苦拉取元数据的时候,发送音讯的线程在干什么呢?

他进入了休眠,也就是wait,被唤醒有两种状况,一个是sender线程更新完元数据再唤醒他,另外一个是休眠工夫到了,他会有一个最大等待时间,默认60s。

被唤醒后,就会查看以后拉取的工夫是不是超过了,如果超过60s还没有拉取到元数据,此时就要抛异样的。

另外还要判断,以后保留的版本号是不是小于内存的版本号,如果小于,他就晓得更新胜利了,反之,阐明还没更新胜利,他就会持续休眠,期待下一次唤醒。

缓存

Sender拉取元数据后,是保留在内存中的,这样下次发送音讯的时候,就间接从内存拿了,并不会每次都向下面的流程一样,一次次的从Kafka拉取元数据,既升高了生产者发送音讯的效率,也加大了Kafka的压力。

如果某个topic在5分钟内会至多发一条音讯,那个这个topic就会保留在topics中。生产者客户端每5分钟就会更新元数据,所以继续发送的音讯,他在缓存中的元数据都会始终的更新。