上篇讲到音讯从broker拉取后,会把音讯存入PullRequest的processQueue中。此时就会有其余线程,对这些音讯进行生产。
其余线程包含程序生产和并发生产。
并发生产
并发生产中,用于生产音讯的线程叫做ConsumeRequest,每一个ConsumeRequest默认生产32条音讯,如果须要生产的音讯超过32条,就会创立多个ConsumeRequest。这些ConsumeRequest就会放入线程池中期待运行。
ConsumeRequest线程运行的时候,会调用监听器,拿到音讯生产的后果,并发生产的后果有两种,RECONSUME_LATER和CONSUME_SUCCESS。
RECONSUME_LATER阐明须要从新生产,但播送模式下只打印正告级别的日志,集群模式下,从新封装音讯成ConsumeRequest,再放入线程池,5s后再进行生产。
同时,也须要跟broker说,这个音讯生产失败了。broker创立重试主题,并设置音讯重试次数,当音讯拉取超过重试次数后,就会进入DLQ队列中。
有哪些音讯须要从新生产?比方这次音讯有10个,ackIndex是2,那下标为3到9的音讯,都要从新生产。如果这个后果是CONSUME_LATER,ackIndex就会被设置为-1,那就是所有的音讯,都要从新生产。
NSUME_SUCCESS阐明生产胜利,依据ackIndex计算一下胜利和失败的信息。
对于RECONSUME_LATER或NSUME_SUCCESS的音讯解决完后,就要把音讯从ProcessQueue中移除,并更新OffsetStore里的音讯生产进度。
OffsetStore里的音讯生产进度会由一个定时工作,每5s会提交给broker,而后broker再更新偏移量。所以即使音讯生产胜利了,然而还没提交给broker的时候,宕机了,那下次还是从之前的偏移量开始读取数据,导致音讯反复生产。
程序生产
程序生产中,也是封装一个ConsumeRequest,这个ConsumeRequest和并发生产的ConsumeRequest不同,他并没有音讯列表,所以音讯间接从ProcessQueue拿。
线程生产的时候,首先会获取一个锁。这个锁的粒度是音讯队列,也就是说,一个音讯生产队列,只能有一个线程进行生产。
获取锁后,就从ProcessQueue把音讯拿进去。
如果ProcessQueue有音讯,那还须要拿到ProcessQueue的ConsumeLock。
而后进行音讯生产,并通过监听器拿到生产后果的状态,并开释锁。
如果后果是SUCCESS,则更新ProcessQueue中的音讯生产信息。
如果后果是SUSPEND_CURRENT_QUEUE_A_MOMENT,则会查看音讯的重试次数。
如果这个重试次数超过容许的最大重试次数,就会把这个音讯发送给broker,且寄存在死信队列中。
如果没有超过,则会持续从新生产,并累加生产次数。
最初,再更新OffsetStore里的音讯生产进度。
OffsetStore的解决,和并发生产一样。
线程拿到messageQueueLock锁后,并不会始终持有这个锁,默认生产60s,就会开释这个锁。