conusmer分区调配是通过组治理协定来施行的:具体如下:
consumer group外面的各个consumer都向 group coordinator发送JoinGroup申请,这样group coordinator就有了所有consumer的成员信息,于是它从中选出一个consumer作为Leader consumer,并通知Leader consumer说:你拿着这些成员信息和我给你的topic分区信息去安顿一下哪些consumer负责生产哪些分区吧
接下来,Leader consumer就依据咱们配置的调配策略(由参数partition.assignment.strategy指定)为各个consumer计算好了各自待生产的分区。于是,各个consumer向 group coordinator 发送SyncGroup申请,但只有Leader consumer的申请中有分区调配策略,group coordinator 收到leader consumer的分区调配计划后,把该计划下发给各个consumer。画个图,就是上面这样的:
而在失常状况下 ,当有consumer进出consumer group时就会触发rebalance,所谓rebalance就是从新制订一个分区调配计划。而制订好了分区调配计划,就得及时告知各个consumer,这就与 heartbeat.interval.ms参数无关了。具体说来就是:每个consumer 都会依据 heartbeat.interval.ms 参数指定的工夫周期性地向group coordinator发送 hearbeat,group coordinator会给各个consumer响应,若产生了 rebalance,各个consumer收到的响应中会蕴含 REBALANCE_IN_PROGRESS 标识,这样各个consumer就晓得曾经产生了rebalance,同时 group coordinator也晓得了各个consumer的存活状况。
那为什么要把 heartbeat.interval.ms 与 session.timeout.ms 进行比照呢?session.timeout.ms是指:group coordinator检测consumer产生解体所需的工夫。一个consumer group外面的某个consumer挂掉了,最长须要 session.timeout.ms 秒检测进去。
举个示例session.timeout.ms=10,heartbeat.interval.ms=3,session.timeout.ms是个"逻辑"指标,它指定了一个阈值---10秒,在这个阈值内如果coordinator未收到consumer的任何音讯,那coordinator就认为consumer挂了。而heartbeat.interval.ms是个"物理"指标,它通知consumer要每3秒给coordinator发一个心跳包,heartbeat.interval.ms越小,发的心跳包越多,它是会影响发TCP包的数量的,产生了理论的影响,这也是我为什么将之称为"物理"指标的起因。
如果group coordinator在一个heartbeat.interval.ms周期内未收到consumer的心跳,就把该consumer移出group,这有点说不过去。就如同consumer犯了一个小错,就一棍子把它打死了。事实上,有可能网络延时,有可能consumer呈现了一次长时间GC,影响了心跳包的达到,说不定下一个heartbeat就失常了。
而heartbeat.interval.ms必定是要小于session.timeout.ms的,如果consumer group产生了rebalance,通过心跳包外面的REBALANCE_IN_PROGRESS,consumer就能及时晓得产生了rebalance,从而更新consumer可生产的分区。而如果超过了session.timeout.ms,group coordinator都认为consumer挂了,那也当然不必把 rebalance信息通知该consumer了。
在while true循环中执行consumer.poll拉取音讯这个过程中,其实背地是有2个线程的,即一个kafka consumer实例蕴含2个线程:一个是heartbeat 线程,另一个是processing线程,processing线程可了解为调用consumer.poll办法执行音讯解决逻辑的线程,而heartbeat线程是一个后盾线程,对程序员是"暗藏不见"的。如果音讯解决逻辑很简单,比如说须要解决5min,那么 max.poll.interval.ms可设置成比5min大一点的值。而heartbeat 线程则和下面提到的参数 heartbeat.interval.ms无关,heartbeat线程 每隔heartbeat.interval.ms向coordinator发送一个心跳包,证实本人还活着。
在用户线程中,个别会做一些失败的重试解决。原本生产解决须要很长的工夫,如果某个consumer解决超时:音讯解决逻辑的时长大于max.poll.interval.ms (或者音讯处理过程中产生了异样),被coordinator移出了consumer组,这时因为失败的重试解决,主动从线程池中拿出一个新线程作为消费者去订阅topic,那么意味着有新消费者退出group,就会引发 rebalance