上一篇提到,因为音讯队列负载机制,会往pullRequestQueue队列放入一个个的PullRequest。

这些PullRequest会有一个专门的线程,把它取出来并封装成服务端的一个Request,发送给broker。

在发送服务端之前,须要晓得broker的地址在哪,须要从什么偏移量开始拉取(偏移量集群模式存broker,播送模式存本地),一次性拉多少数据。

broker收到申请后,就会从commitlog中,依据偏移量把所须要的音讯给取出来,因为broker的主从同步,这边返回音讯的时候,也会告知下一次拉取是从主broker拉取还是slave的broker拉取数据。

从commit拉取音讯的时候,会有这几个状况(上面疏忽offsetCheckInSlave判断):

  1. 以后音讯队列并没有音讯,则下次拉取音讯的时候,如果这个broker是主节点,还是这个偏移量。如果这个broker是从节点,则下次间接从0开始。

  1. 以后音讯队列有音讯,然而咱们拉取的偏移量比队列里最小的偏移量还小,比方咱们须要拉取100的数据,然而音讯队列的最小偏移量是500。则下次拉取音讯的时候,如果这个broker是主节点,还是这个偏移量。如果这个broker是从节点,则下次间接从500开始。

  1. 如果咱们咱们拉取的偏移量刚好等于音讯队列里最大的偏移量,所以咱们也没有数据能够生产了,那下次拉取的时候,还是100这个偏移量。

  1. 如果咱们咱们拉取的偏移量大于音讯队列里最大的偏移量,所以咱们也没有数据能够生产了,那下次拉取的时候,这里还要判断音讯队列里最小的偏移量是否等于0。

  1. 失常状况下,咱们须要的音讯是在音讯队列里最小的偏移量和最大的偏移量之间,那就间接把数据从commitlog中取出来并返回。

生产端接管broker响应后,就会依据下面各个状况的偏移量进行更新,并且把音讯存入PullRequest的processQueue中。

并且再把PullRequest放入pullRequestQueue队列中,期待下次拉取。

processQueue的音讯如果来不及生产,会始终的沉积,所以PullRequest在拉取音讯的时候,会先判断processQueue里的音讯数量是否曾经超过1000,如果超过了,则以后不拉取,并放入pullRequestQueue队列中,50ms后才能够持续拉取。

此外还会判断processQueue中音讯的大小、processQueue中队列最大偏移量与最小偏离量的间距,如果超过了阈值,也会放入pullRequestQueue队列中,50ms后才能够持续拉取。