共计 1291 个字符,预计需要花费 4 分钟才能阅读完成。
上一篇曾经晓得元数据在什么时候拉取的,咱们这里看看他整个拉取的流程。
topics
元素的拉取,是依据生产者的发送的 topic 来拉取的,并不是拉取所有的元数据,所以在发送音讯的时候,就会把这个 topic 寄存在 topics 中,这个 topics 就是记录了以后已有的 topic,数据类型是 map,他的 key 就是 topic,value 刚开始默认为 -1。
所以 topic1 发送的时候,就会在 topics 新增一个 topic1->- 1 的键值对。
版本号和更新标识
因为元数据的拉取,是由 Sender 线程来执行的(这个前面流程会讲到),所以须要一个更新标识 needUpdate,来告知 Sender 线程,我须要拉取元数据。
同时,Sender 线程在胜利拉取元数据后,也须要告知其余线程,这里就须要一个版本号 version,每次胜利拉取,就进行累加,所以其余线程就会拿本人的版本号跟内存里的版本号进行比照,发现内存的版本号比本人大,那就是拉取元数据胜利了。
在拉取的时候,此时 version=0,以后线程保留的 version 也是 0,needUpdate 为 true,阐明须要拉取元数据。
sender 线程
sender 线程有一个死循环,他会始终运行,元数据的发送以及音讯的发送,都要通过 ender 线程。
版本号和更新标识筹备好后,就开始唤醒 sender 线程,实际上 sender 线程最终会唤醒 NIO 的 Selector 上的线程。
这里波及到网络传输局部,咱们就简要的讲一下,网络传输等前面再讲。简略的说,就是 sender 线程会把申请交给 NIO 的 Selector,而后再解决 Selector 收到的申请。
sender 线程解决完音讯后,就会更新元数据里的信息,而后版本号累加,needUpdate 改为 false。
下面 topics 中,topic1 对应的 value 是 1,此时曾经拉取到元数据了,这里的 value 就要改为以后工夫 +5 60 1000,即以后工夫 + 5 分钟。意思是当某个 topic 超过 5 分钟没有发送音讯,就会从 topics 中移除,下次更新的时候,就不会拉取这个 topic 的元数据信息。
休眠
sender 线程辛辛苦苦拉取元数据的时候,发送音讯的线程在干什么呢?
他进入了休眠,也就是 wait,被唤醒有两种状况,一个是 sender 线程更新完元数据再唤醒他,另外一个是休眠工夫到了,他会有一个最大等待时间,默认 60s。
被唤醒后,就会查看以后拉取的工夫是不是超过了,如果超过 60s 还没有拉取到元数据,此时就要抛异样的。
另外还要判断,以后保留的版本号是不是小于内存的版本号,如果小于,他就晓得更新胜利了,反之,阐明还没更新胜利,他就会持续休眠,期待下一次唤醒。
缓存
Sender 拉取元数据后,是保留在内存中的,这样下次发送音讯的时候,就间接从内存拿了,并不会每次都向下面的流程一样,一次次的从 Kafka 拉取元数据,既升高了生产者发送音讯的效率,也加大了 Kafka 的压力。
如果某个 topic 在 5 分钟内会至多发一条音讯,那个这个 topic 就会保留在 topics 中。生产者客户端每 5 分钟就会更新元数据,所以继续发送的音讯,他在缓存中的元数据都会始终的更新。