乐趣区

关于kafka:Kafka又出问题了

写在后面

预计运维年前没有祭拜服务器,Nginx 的问题修复了,Kafka 又不行了。明天,原本想再睡会,后果,电话又响了。还是经营,“喂,冰河,到公司了吗?连忙看看服务器吧,又出问题了“。“在路上了,运维那哥们儿还没下班吗”?“还在休假。。。”,我:“。。。”。哎,这哥们儿是跑路了吗?先不论他,问题还是要解决。

问题重现

到公司后,放下我专用的双肩包,拿出我的利器——笔记本电脑,关上后迅速登录监控零碎,发现次要业务零碎没啥问题。一个非核心服务收回了告警,并且监控零碎中显示这个服务频繁的抛出如下异样。

2021-02-28 22:03:05 131 pool-7-thread-3 ERROR [] - 
commit failed 
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:713) ~[MsgAgent-jar-with-dependencies.jar:na]
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:596) ~[MsgAgent-jar-with-dependencies.jar:na]
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1218) ~[MsgAgent-jar-with-dependencies.jar:na]
        at com.today.eventbus.common.MsgConsumer.run(MsgConsumer.java:121) ~[MsgAgent-jar-with-dependencies.jar:na]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_161]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_161]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]

从下面输入的异样信息,大略能够判断出零碎呈现的问题:Kafka 消费者在解决完一批 poll 音讯后,在同步提交偏移量给 broker 时报错了。大略就是因为以后消费者线程的分区被 broker 给回收了,因为 Kafka 认为这个消费者挂掉了,咱们能够从上面的输入信息中能够看出这一点。

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

Kafka 外部触发了 Rebalance 机制,明确了问题,接下来,咱们就开始剖析问题了。

剖析问题

既然 Kafka 触发了 Rebalance 机制,那我就来说说 Kafka 触发 Rebalance 的机会。

什么是 Rebalance

举个具体点的例子,比方某个分组下有 10 个 Consumer 实例,这个分组订阅了一个 50 个分区的主题。失常状况下,Kafka 会为每个消费者调配 5 个分区。这个调配的过程就是 Rebalance。

触发 Rebalance 的机会

当 Kafka 中满足如下条件时,会触发 Rebalance:

  • 组内成员的个数产生了变动,比方有新的消费者退出生产组,或者来到生产组。组成员来到生产组蕴含组成员解体或者被动来到生产组。
  • 订阅的主题个数产生了变动。
  • 订阅的主题分区数产生了变动。

前面两种状况咱们能够人为的防止,在理论工作过程中,对于 Kafka 产生 Rebalance 最常见的起因是生产组成员的变动。

消费者成员失常的增加和停掉导致 Rebalance,这种状况无奈防止,然而时在某些状况下,Consumer 实例会被 Coordinator 谬误地认为“已进行”从而被“踢出”Group,导致 Rebalance。

当 Consumer Group 实现 Rebalance 之后,每个 Consumer 实例都会定期地向 Coordinator 发送心跳申请,表明它还存活着。如果某个 Consumer 实例不能及时地发送这些心跳申请,Coordinator 就会认为该 Consumer 曾经“死”了,从而将其从 Group 中移除,而后开启新一轮 Rebalance。这个工夫能够通过 Consumer 端的参数 session.timeout.ms 进行配置。默认值是 10 秒。

除了这个参数,Consumer 还提供了一个管制发送心跳申请频率的参数,就是 heartbeat.interval.ms。这个值设置得越小,Consumer 实例发送心跳申请的频率就越高。频繁地发送心跳申请会额定耗费带宽资源,但益处是可能更加疾速地通晓以后是否开启 Rebalance,因为,目前 Coordinator 告诉各个 Consumer 实例开启 Rebalance 的办法,就是将 REBALANCE_NEEDED 标记封装进心跳申请的响应体中。

除了以上两个参数,Consumer 端还有一个参数,用于管制 Consumer 理论生产能力对 Rebalance 的影响,即 max.poll.interval.ms 参数。它限定了 Consumer 端应用程序两次调用 poll 办法的最大工夫距离。它的默认值是 5 分钟,示意 Consumer 程序如果在 5 分钟之内无奈生产完 poll 办法返回的音讯,那么 Consumer 会被动发动“来到组”的申请,Coordinator 也会开启新一轮 Rebalance。

通过下面的剖析,咱们能够看一下那些 rebalance 是能够防止的:

第一类非必要 Rebalance 是因为未能及时发送心跳,导致 Consumer 被“踢出”Group 而引发的 。这种状况下咱们能够设置 session.timeout.ms 和 heartbeat.interval.ms 的值,来尽量避免 rebalance 的呈现。( 以下的配置是在网上找到的最佳实际,临时还没测试过

  • 设置 session.timeout.ms = 6s。
  • 设置 heartbeat.interval.ms = 2s。
  • 要保障 Consumer 实例在被断定为“dead”之前,可能发送至多 3 轮的心跳申请,即 session.timeout.ms >= 3 * heartbeat.interval.ms

将 session.timeout.ms 设置成 6s 次要是为了让 Coordinator 可能更快地定位曾经挂掉的 Consumer,早日把它们踢出 Group。

第二类非必要 Rebalance 是 Consumer 生产工夫过长导致的。此时,max.poll.interval.ms 参数值的设置显得尤为要害。如果要防止非预期的 Rebalance,最好将该参数值设置得大一点,比上游最大解决工夫稍长一点。

总之,要为业务解决逻辑留下短缺的工夫。这样,Consumer 就不会因为解决这些音讯的工夫太长而引发 Rebalance。

拉取偏移量与提交偏移量

kafka的偏移量 (offset) 是由消费者进行治理的,偏移量有两种,拉取偏移量 (position) 与提交偏移量 (committed)。拉取偏移量代表以后消费者分区生产进度。每次音讯生产后,须要提交偏移量。在提交偏移量时,kafka 会应用 拉取偏移量 的值作为分区的 提交偏移量 发送给协调者。

如果没有提交偏移量,下一次消费者从新与 broker 连贯后,会从以后消费者 group 已提交到 broker 的偏移量处开始生产。

所以,问题就在这里,当咱们解决音讯工夫太长时, 曾经被 broker 剔除,提交偏移量又会报错。所以拉取偏移量没有提交到 broker,分区又 rebalance。下一次重新分配分区时,消费者会从最新的已提交偏移量处开始生产。这里就呈现了反复生产的问题。

异样日志提醒的计划

其实,说了这么多,Kafka 消费者输入的异样日志中也给出了相应的解决方案。

接下来,咱们说说 Kafka 中的拉取偏移量和提交偏移量。

其实,从输入的日志信息中,也大略给出了解决问题的形式,简略点来说,就是能够通过减少 max.poll.interval.ms 时长和 session.timeout.ms 时长,缩小 max.poll.records的配置值,并且生产端在解决完音讯时要及时提交偏移量。

问题解决

通过之前的剖析,咱们应该晓得如何解决这个问题了。这里须要说一下的是,我在集成 Kafka 的时候,应用的是 SpringBoot 和 Kafka 生产监听器,生产端的次要代码构造如下所示。

@KafkaListener(topicPartitions = {@TopicPartition(topic = KafkaConstants.TOPIC_LOGS, partitions = { "0"}) }, groupId = "kafka-consumer", containerFactory = "kafkaListenerContainerFactory")
public void consumerReceive (ConsumerRecord<?, ?> record, Acknowledgment ack){logger.info("topic is {}, offset is {}, value is {} n", record.topic(), record.offset(), record.value());
    try {Object value = record.value();
        logger.info(value.toString());
        ack.acknowledge();} catch (Exception e) {logger.error("日志生产端异样: {}", e);
    }
}

上述代码逻辑比较简单,就是获取到 Kafka 中的音讯后间接打印输出到日志文件中。

尝试解决

这里,我先依据异样日志的提示信息进行配置,所以,我在 SpringBoot 的 application.yml 文件中新增了如下配置信息。

spring:
  kafka:
    consumer:
    properties:
     max.poll.interval.ms: 3600000
     max.poll.records: 50
     session.timeout.ms: 60000
     heartbeat.interval.ms: 3000

配置实现后,再次测试消费者逻辑,发现还是抛出 Rebalance 异样。

最终解决

咱们从另一个角度来看下 Kafka 消费者所产生的问题:一个 Consumer 在生产音讯,另一个 Consumer 在生产它的音讯,它们不能在同一个 groupId 上面,更改其中一个的 groupId 即可。

这里,咱们的业务我的项目是分模块和子系统进行开发的,例如模块 A 在生产音讯,模块 B 生产模块 A 生产的音讯。此时,批改配置参数,例如 session.timeout.ms: 60000,基本不起作用,还是抛出Rebalance 异样。

此时,我尝试批改下消费者分组的 groupId,将上面的代码

@KafkaListener(topicPartitions = {@TopicPartition(topic = KafkaConstants.TOPIC_LOGS, partitions = { "0"}) }, groupId = "kafka-consumer", containerFactory = "kafkaListenerContainerFactory")
public void consumerReceive (ConsumerRecord<?, ?> record, Acknowledgment ack){

批改为如下所示的代码。

@KafkaListener(topicPartitions = {@TopicPartition(topic = KafkaConstants.TOPIC_LOGS, partitions = { "0"}) }, groupId = "kafka-consumer-logs", containerFactory = "kafkaListenerContainerFactory")
public void consumerReceive (ConsumerRecord<?, ?> record, Acknowledgment ack){

再次测试,问题解决~~

这次解决的问题真是个奇葩啊!!接下来写个【Kafka 系列】专题,具体介绍 Kafka 的原理、源码解析和实战等内容,小伙伴们你们感觉呢?欢送文末留言探讨~~

举荐浏览

  • 为何你进不了大厂?
  • 新年下班第一天生产环境分布式文件系统崩了!!
  • 冰河公开了进大厂必备的外围技能,服了!
  • 万字长文带你图解计算机网络(超全)!!
  • 在一个执行力极差的团队工作是一种怎么的体验?
  • 肝了三天三夜整顿出这份 36 万字全网最牛的开源高并发编程 PDF!!
  • 【高并发】高并发秒杀零碎架构解密,不是所有的秒杀都是秒杀!
  • 【高并发】高并发分布式锁架构解密,不是所有的锁都是分布式锁(升级版)!!

好了,明天就到这儿吧,我是冰河,大家有啥问题能够在下方留言,也能够加我微信:sun_shine_lyz,我拉你进群,一起交换技术,一起进阶,一起牛逼~~

退出移动版