关于kafka:聊聊-Kafka如何避免消费组的-Rebalance

76次阅读

共计 9530 个字符,预计需要花费 24 分钟才能阅读完成。

一、前言

咱们上一篇聊了 Rebalance 机制,置信你对生产组的重均衡有个整体的意识。这里再简略回顾一下,Rebalance 就是让一个 Consumer Group 下所有的 Consumer 实例就如何生产订阅主题的所有分区达成共识的过程。在 Rebalance 过程中,Consumer Group 下所有的 Consumer 实例独特参加,在 Coordinator 协调者组件的帮忙下,实现订阅主题分区的调配。然而,在整个过程中,所有实例都不能生产任何音讯,因而它对 Consumer 的 TPS 影响很大。像不像 JVM 的 GC?咱们晓得 JVM 频繁 GC 的话,对时延敏感的业务来说,几乎是噩梦,所以咱们会针对 GC 进行相应的调优,让 JVM 不那么频繁的产生 STW。

Kafka 生产组的重均衡也是相似的,生产组产生重均衡,Consumer 就很慢,对于实时性不敏感的业务,慢一点也能承受,就怕 Consumer 解决业务超时了,生产组把 Consumer 踢出去了,业务设置重试机制,主动从线程池中拿出一个新线程作为消费者去订阅 topic,那么意味着有新消费者退出 Consumer Group,又会引发 Rebalance,新的消费者还是来不及解决完所有音讯,又被移出 Consumer Group。如此循环,就产生了频繁的 Rebalance 景象。

二、Rebalance 的弊病

  • Rebalance 影响 Consumer 端 TPS,Coordinator 协调者组件实现订阅主题分区的调配的过程,该生产组下所有实例都不能生产任何音讯。
  • 如果你的组成员消费者实例很多的话,Rebalance 很慢,对业务会造成肯定的影响。
  • Rebalance 效率不高。以后 Kafka 的设计机制决定了每次 Rebalance 时,Consumer Group 下的所有成员都要参加进来,而且通常不会思考局部性原理,但局部性原理对晋升零碎性能是特地重要的。

对于第三点,你是不是感觉,Kafka 社区让所有成员都要参加进来很不合理啊,应该把那个退出生产组的消费者负责的分区随机调配给其它消费者,其它消费者的分区调配策略不变,这样就最大限度地缩小 Rebalance 对残余 Consumer 成员的冲击。

没错,你想到的社区也想到了,社区于 0.11.0.0 版本推出了 StickyAssignor,即有粘性的分区调配策略。所谓的有粘性,是指每次 Rebalance 时,该策略会尽可能地保留之前的调配计划,尽量实现分区调配的最小变动。不过有些遗憾的是,这个策略目前还有一些 bug,而且须要降级到 0.11.0.0 能力应用,因而在理论生产环境中用得还不是很多。

你可能会问了,社区对下面的弊病有没有什么解决办法?没有,特地是 Rebalance 慢这个问题,Kafka 社区对此无能为力。设计就是这样的话,那咱们是不是能够尽可能去躲避 Rebalance 呢,特地是那些不必要的 Rebalance。

三、触发 Rebalance 机制的机会

要防止 Rebalance,还是要从触发 Rebalance 机制的机会动手。咱们在后面说过,触发 Rebalance 机制的机会次要有以下几个:

  • 有新的 Consumer 退出 Consumer Group
  • 有 Consumer 宕机下线。Consumer 并不一定须要真正下线,例如遇到长时间的 GC、网络提早导致消费者长时间未向 GroupCoordinator 发送 HeartbeatRequest 时,GroupCoordinator 会认为 Consumer 下线。
  • 有 Consumer 被动退出 Consumer Group(发送 LeaveGroupRequest 申请)。比方客户端调用了 unsubscribe() 办法勾销对某些主题的订阅。
  • Consumer 生产超时,没有在指定工夫内提交 offset 偏移量。
  • Consumer Group 所对应的 GroupCoordinator 节点产生了变更。
  • Consumer Group 所订阅的任一主题或者主题的分区数量发生变化。

四、Rebalance 实战

有点形象是不是?没有关系,上面来看个例子,老周简略来模仿一下 Rebalance。

4.1 生产者

/**
 * @author: 微信公众号【老周聊架构】*/
public class KafkaProducerRebalanceTest {public static void main(String[] args) {Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        KafkaProducer<String,String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {ProducerRecord record = new ProducerRecord<>("topic_test", "userName", "riemann_" + i);
            producer.send(record);
        }
        producer.close();}
}

4.2 消费者

/**
 * @author: 微信公众号【老周聊架构】*/
public class KafkaConsumerRebalanceTest {public static void main(String[] args) {Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer_group_test");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        String topic = "topic_test";
        long pollTimeout = 100;
        long sleep = TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);

        consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {System.out.println("Partition Revoked");
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {System.out.println("New assignment :" + collection.size() + "partitions");
            }
        });

        while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(pollTimeout));
            for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value());
                try {Thread.sleep(sleep);
                } catch (InterruptedException e) {e.printStackTrace();
                }
            }
        }
    }
}

4.3 终端日志

消费者日志

[Consumer clientId=consumer-consumer_group_test-1, groupId=consumer_group_test] Member consumer-consumer_group_test-1-7d64e140-f0e3-49d2-8230-2621ba1d2061 sending LeaveGroup request to coordinator 127.0.0.1:9092 (id: 2147483643 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)


服务端日志


那生产组为啥会呈现 Rebalance 呢?从消费者日志能够看出,生产超时,导致生产线程长时间无奈向 Coordinator 节点发送心跳,Coordinator 节点认为 Consumer 曾经宕机,Coordinator 于是将 Consumer 节点从生产组中剔除,并触发了 Rebalance 机制。

这其实和 Consumer 的心跳发送机制也有关系,在大多数中间件的设计中,都会拆散业务线程和心跳发送线程,而 Kafka 却没有这样做,其目标可能是为了实现简略。如果消费者生产业务的确须要十分长时间,咱们能够通过参数 max.poll.interval.ms 配置,它代表生产两次 poll 最大的工夫距离,默认是 300000 ms,也就是 5 分钟,5 分钟都还超时,那能够再调大一点;或者咱们能够缩小 consumer 每次从 broker 拉取的数据量,能够通过参数 max.poll.records 配置,consumer 默认拉取 500 条,咱们能够将其批改了 200 条。

Kafka 在 0.10.1 版本中修改了 Consumer 的心跳发送机制,将心跳发送的工作交给了专门的 HeartbeatThread,而不是像晚期版本那样依赖于用户利用线程来定期轮询。这个设计被证实是相当辣手的调整,减少会话超时将为音讯解决提供更多的工夫,但消费者组也将花更多的工夫来检测过程解体等故障。Kafka 消费者 0.10.1 引入了 max.poll.interval.ms 来解耦解决超时和会话超时。这个 max.poll.interval.ms 参数还是很有意义的,因为即便心跳发送失常,那也只能证实 Consumer 是存活状态,然而 Consumer 可能处于假死状态,比方 Consumer 遇到了死锁导致长时间期待超过了 poll 设定的工夫距离 max.poll.interval.ms

五、Rebalance 问题解决思路

咱们下面第三点说的触发 Rebalance 机制的机会有好几点,其实次要就三大类:

  • 组成员数量发生变化
  • 订阅主题数量发生变化
  • 订阅主题的分区数发生变化

前面两个通常都是运维的被动操作,所以它们引发的 Rebalance 大都是不可避免的。接下来,咱们次要来说说不必要的 Rebalance 该如何防止,也就是组成员数量变动而引发的 Rebalance 该如何防止。

如果 Consumer Group 下的 Consumer 实例数量发生变化,就肯定会引发 Rebalance。这是 Rebalance 产生的最常见的起因。

Consumer 实例减少的状况很好了解,当咱们启动一个配置有雷同 group.id 值的 Consumer 程序时,实际上就向这个 Consumer Group 增加了一个新的 Consumer 实例。此时,Coordinator 会接收这个新实例,将其退出到组中,并重新分配分区。通常来说,减少 Consumer 实例的操作都是打算内的,可能是出于减少 TPS 或进步伸缩性的须要。总之,它不属于咱们要躲避的那类“不必要 Rebalance”。

咱们更在意的是 Consumer Group 下实例数缩小这件事。如果你就是要停掉某些 Consumer 实例,那自不必说,要害是在某些状况下,Consumer 实例会被 Coordinator 谬误地认为“已进行”从而被“踢出”Consumer Group。如果是这个起因导致的 Rebalance,咱们就不能不论了。

Coordinator 会在什么状况下认为某个 Consumer 实例已挂从而要退组呢?那就要来看看消费者端配置的几个参数:

  • session.timeout.ms 设置了超时工夫
  • heartbeat.interval.ms 心跳工夫距离
  • max.poll.interval.ms 每次生产的解决工夫
  • max.poll.records 每次生产的音讯数

5.1 session.timeout.ms

Consumer 与 Broker 的心跳超时工夫,默认 10s,Broker 如果超过 session.timeout.ms 设定的值依然没有收到心跳,Broker 端将会将该消费者移除,并触发 Rebalance。

这个值必须设置在 Broker 配置中的 group.min.session.timeout.msgroup.max.session.timeout.ms 之间。

该参数和 heartbeat.interval.ms 这两个参数能够适当的管制 Rebalance 的频率。

5.2 heartbeat.interval.ms

心跳间隔时间。心跳是在 Consumer 与 Coordinator 之间进行的。心跳用来放弃 Consumer 的会话,并且在有 Consumer 退出或者来到 Consumer Group 时帮忙进行 Rebalance。

这个值必须设置的小于 session.timeout.ms,因为:当 Consumer 因为某种原因不能发 Heartbeat 到 Coordinator 时,并且工夫超过 session.timeout.ms 时,就会认为该 Consumer 已退出,它所订阅的 Partition 会调配到同一 Consumer Group 内的其它的 Consumer 上。

通常设置的值要低于 session.timeout.ms1/3。默认值是:3s

5.3 max.poll.interval.ms

两次 poll 办法调用的最大间隔时间,单位毫秒,默认为 5 分钟。如果生产端在该距离内没有发动 poll 操作,该消费者将被剔除,触发重均衡,将该消费者调配的队列调配给其余消费者。

Kafka 中有一个专门的心跳线程来实现发送心跳的动作,所以存在 Consumer Client 仍旧能够无效的发送心跳,但 Consumer 理论却处于 livelock(活锁)状态,从而导致无奈无效的进行数据处理,所以基于此 Kafka 通过参数 max.poll.interval.ms 来躲避该问题。

5.4 max.poll.records

Consumer 每次调用 poll() 时取到的 records 的最大数。每执行一次 poll 办法所拉取的最大数据量;是基于所调配的所有 Partition 而言的数据总和,而非每个 Partition 上拉去的最大数据量;默认值为 500

艰深点讲示意每次生产的时候,获取多少条音讯。获取的音讯条数越多,须要解决的工夫越长。所以每次拉取的音讯数不能太多,须要保障在 max.poll.interval.ms 设置的工夫内能生产完,否则会产生 rebalance。

六、如何防止 Rebalance

简略来说,非必要 Rebalance 有上面两个点:

  • 消费者心跳超时,导致 Rebalance。
  • 消费者解决工夫过长,导致 Rebalance。

6.1 消费者心跳超时

咱们晓得消费者是通过心跳和协调者放弃通信的,如果协调者收不到心跳,那么协调者会认为这个消费者死亡了,从而发动 Rebalance。

这里给一下业界支流举荐的值,能够依据本人的业务可做相应的调整:

  • 设置 session.timeout.ms = 6s
  • 设置 heartbeat.interval.ms = 2s
  • 要保障 Consumer 实例在被断定为“dead”之前,可能发送至多 3 轮的心跳申请,即 session.timeout.ms >= 3 * heartbeat.interval.ms

这里你可能会问了,为啥要 session.timeout.ms >= 3 * heartbeat.interval.ms,而不是 5 或者 10 呢?

session.timeout.ms 是个 ” 逻辑 ” 指标,比方它指定了一个阈值 6 秒,在这个阈值内如果 Coordinator 未收到 Consumer 的任何音讯,那 Coordinator 就认为 Consumer 挂了。而 heartbeat.interval.ms 是个 ” 物理 ” 指标,它通知 Consumer 要每 2 秒给 Coordinator 发一个心跳包,heartbeat.interval.ms 越小,发的心跳包越多,它是会影响发 TCP 包的数量的,产生了理论的影响,这也是我为什么将之称为 ” 物理 ” 指标的起因。

如果 Coordinator 在一个 heartbeat.interval.ms 周期内未收到 Consumer 的心跳,就把该 Consumer 移出 Consumer Group,这有点说不过去。就如同 Consumer 犯了一个小错,就一棍子把它打死了。事实上,有可能网络延时,有可能 Consumer 呈现了一次长时间 GC,影响了心跳包的达到,说不定下一个 Heartbeat 就失常了。

heartbeat.interval.ms 必定是要小于 session.timeout.ms 的,如果 Consumer Group 产生了 Rebalance,通过心跳包外面的 REBALANCE_IN_PROGRESS,Consumer 就能及时晓得产生了 Rebalance,从而更新 Consumer 可生产的分区。而如果超过了 session.timeout.ms,Coordinator 都认为 Consumer 挂了,那也当然不必把 Rebalance 信息通知该 Consumer 了。

Kafka 0.10.1 之后 的版本中,将 session.timeout.msmax.poll.interval.ms 解耦了。也就是说:new KafkaConsumer 对象后,在 while true 循环中执行 consumer.poll 拉取音讯这个过程中,其实背地是有 2 个线程的,即一个 Consumer 实例蕴含 2 个线程:一个是 Heartbeat 线程,另一个是 Processing 线程。Processing 线程可了解为调用 consumer.poll 办法执行音讯解决逻辑的线程,而 Heartbeat 线程是一个后盾线程,对程序员是 ” 暗藏不见 ” 的。如果音讯解决逻辑很简单,比如说须要解决 5 min,那么 max.poll.interval.ms 可设置成比 5 min 大一点的值。而 Heartbeat 线程则和下面提到的参数 heartbeat.interval.ms 无关,Heartbeat 线程每隔 heartbeat.interval.ms 向 Coordinator 发送一个心跳包,证实本人还活着。只有 Heartbeat 线程 在 session.timeout.ms 工夫外向 Coordinator 发送过心跳包,那么 Coordinator 就认为以后的 Consumer 是活着的。

Kafka 0.10.1 之前,发送心跳包和音讯解决逻辑这 2 个过程是耦合在一起的,试想:如果一条音讯解决时长要 5 min,而 session.timeout.ms=3000ms,那么等 Consumer 解决完音讯,Coordinator 早就将 Consumer 移出 Consumer Group 了,因为只有一个线程,在音讯处理过程中就无奈向 Coordinator 发送心跳包,超过 3000ms 未发送心跳包,Coordinator 就将该 Consumer 移出 Consumer Group 了。而将二者离开,一个 Processing 线程负责执行音讯解决逻辑,一个 Heartbeat 线程负责发送心跳包。那么就算一条音讯须要解决 5min,只有 Heartbeat 线程在 session.timeout.ms 工夫外向 Coordinator 发送了心跳包,那 Consumer 能够持续解决音讯,而不必放心被移出 Consumer Group 了。另一个益处是:如果 Consumer 出了问题,那么在 session.timeout.ms 内就能检测进去,而不必等到 max.poll.interval.ms 时长后能力检测进去。

为啥要 session.timeout.ms >= 3 * heartbeat.interval.ms,我感觉是社区做的测试得进去的最优值吧,因为 heartbeat.interval.ms 越小,发的心跳包越频繁,节约没必要的流量;而设置越大,Consumer 挂了很久能力检测到,显著也不合理。

6.2 消费者解决工夫过长

如果消费者解决工夫过长,那么同样会导致协调者认为该 Consumer 死亡了,从而发动重均衡。

而 Kafka 的消费者参数设置中,跟生产解决的两个参数为:

  • max.poll.interval.ms 每次生产的解决最大工夫
  • max.poll.records 每次生产的音讯数

对于这种状况,一般来说就是减少消费者解决的工夫(即进步 max.poll.interval.ms 的值),缩小每次解决的音讯数(即缩小 max.poll.records 的值)。

咱们下面的那个例子就是这个场景触发的 Rebalance,max.poll.interval.ms 每次生产的解决最大工夫设置的是 60000ms,也就是 1min。而我在 consumer.poll 办法里休眠了 2min 来模仿解决业务的工夫,解决业务的工夫大于 max.poll.interval.ms,导致 Rebalance。

6.3 Consumer 端的 GC 体现

如果下面两种从 Kafka 层面还无奈防止 Rebalance,那我倡议你去排查下 Consumer 端的 GC 体现,比方是否呈现了频繁的 Full GC 导致的长时间进展,从而引发了 Rebalance。

正文完
 0