关于kafka:kafka消费组

45次阅读

共计 2260 个字符,预计需要花费 6 分钟才能阅读完成。

conusmer 分区调配是通过组治理协定来施行的:具体如下:

consumer group 外面的各个 consumer 都向 group coordinator 发送 JoinGroup 申请,这样 group coordinator 就有了所有 consumer 的成员信息,于是它从中选出一个 consumer 作为 Leader consumer,并通知 Leader consumer 说:你拿着这些成员信息和我给你的 topic 分区信息去安顿一下哪些 consumer 负责生产哪些分区吧

接下来,Leader consumer 就依据咱们配置的调配策略 (由参数 partition.assignment.strategy 指定) 为各个 consumer 计算好了各自待生产的分区。于是,各个 consumer 向 group coordinator 发送 SyncGroup 申请,但只有 Leader consumer 的申请中有分区调配策略,group coordinator 收到 leader consumer 的分区调配计划后,把该计划下发给各个 consumer。画个图,就是上面这样的:

而在失常状况下,当有 consumer 进出 consumer group 时就会触发 rebalance,所谓 rebalance 就是从新制订一个分区调配计划。而制订好了分区调配计划,就得及时告知各个 consumer,这就与 heartbeat.interval.ms 参数无关了。具体说来就是:每个 consumer 都会依据 heartbeat.interval.ms 参数指定的工夫周期性地向 group coordinator 发送 hearbeat,group coordinator 会给各个 consumer 响应,若产生了 rebalance,各个 consumer 收到的响应中会蕴含 REBALANCE_IN_PROGRESS 标识,这样各个 consumer 就晓得曾经产生了 rebalance,同时 group coordinator 也晓得了各个 consumer 的存活状况。

那为什么要把 heartbeat.interval.ms 与 session.timeout.ms 进行比照呢?session.timeout.ms 是指:group coordinator 检测 consumer 产生解体所需的工夫。一个 consumer group 外面的某个 consumer 挂掉了,最长须要 session.timeout.ms 秒检测进去。

举个示例 session.timeout.ms=10,heartbeat.interval.ms=3,session.timeout.ms 是个 ” 逻辑 ” 指标,它指定了一个阈值 —10 秒,在这个阈值内如果 coordinator 未收到 consumer 的任何音讯,那 coordinator 就认为 consumer 挂了。而 heartbeat.interval.ms 是个 ” 物理 ” 指标,它通知 consumer 要每 3 秒给 coordinator 发一个心跳包,heartbeat.interval.ms 越小,发的心跳包越多,它是会影响发 TCP 包的数量的,产生了理论的影响,这也是我为什么将之称为 ” 物理 ” 指标的起因。

如果 group coordinator 在一个 heartbeat.interval.ms 周期内未收到 consumer 的心跳,就把该 consumer 移出 group,这有点说不过去。就如同 consumer 犯了一个小错,就一棍子把它打死了。事实上,有可能网络延时,有可能 consumer 呈现了一次长时间 GC,影响了心跳包的达到,说不定下一个 heartbeat 就失常了。

而 heartbeat.interval.ms 必定是要小于 session.timeout.ms 的,如果 consumer group 产生了 rebalance,通过心跳包外面的 REBALANCE_IN_PROGRESS,consumer 就能及时晓得产生了 rebalance,从而更新 consumer 可生产的分区。而如果超过了 session.timeout.ms,group coordinator 都认为 consumer 挂了,那也当然不必把 rebalance 信息通知该 consumer 了。

在 while true 循环中执行 consumer.poll 拉取音讯这个过程中,其实背地是有 2 个线程的,即一个 kafka consumer 实例蕴含 2 个线程:一个是 heartbeat 线程,另一个是 processing 线程,processing 线程可了解为调用 consumer.poll 办法执行音讯解决逻辑的线程,而 heartbeat 线程是一个后盾线程,对程序员是 ” 暗藏不见 ” 的。如果音讯解决逻辑很简单,比如说须要解决 5min,那么 max.poll.interval.ms 可设置成比 5min 大一点的值。而 heartbeat 线程则和下面提到的参数 heartbeat.interval.ms 无关,heartbeat 线程 每隔 heartbeat.interval.ms 向 coordinator 发送一个心跳包,证实本人还活着。

在用户线程中,个别会做一些失败的重试解决。原本生产解决须要很长的工夫,如果某个 consumer 解决超时:音讯解决逻辑的时长大于 max.poll.interval.ms (或者音讯处理过程中产生了异样),被 coordinator 移出了 consumer 组,这时因为失败的重试解决,主动从线程池中拿出一个新线程作为消费者去订阅 topic,那么意味着有新消费者退出 group,就会引发 rebalance

正文完
 0