关于java:Kafka导致重复消费原因和解决方案

39次阅读

共计 3382 个字符,预计需要花费 9 分钟才能阅读完成。

问题剖析

导致 kafka 的反复生产问题起因在于,曾经生产了数据,然而 offset 没来得及提交(比方 Kafka 没有或者不晓得该数据曾经被生产)。
总结以下场景导致 Kakfa 反复生产:

起因 1:强行 kill 线程,导致生产后的数据,offset 没有提交(生产零碎宕机、重启等)。
起因 2:设置 offset 为主动提交,敞开 kafka 时,如果在 close 之前,调用 consumer.unsubscribe() 则有可能局部 offset 没提交,下次重启会反复生产。
例如:

try {consumer.unsubscribe();
} catch (Exception e) {
}

try {consumer.close();
} catch (Exception e) {}

下面代码会导致局部 offset 没提交,下次启动时会反复生产。

解决办法:设置 offset 主动提交为 false

整合了 Spring 配置的批改如下配置
spring 配置:

spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=latest

整合了 API 形式的批改 enable.auto.commit 为 false
API 配置:

 Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");

一旦设置了 enable.auto.commit 为 true,Kafka 会保障在开始调用 poll 办法时,提交上次 poll 返回的所有音讯。从程序上来说,poll 办法的逻辑是先提交上一批音讯的位移,再解决下一批音讯,因而它能保障不呈现生产失落的状况。

起因 3:(反复生产最常见的起因):生产后的数据,当 offset 还没有提交时,partition 就断开连接。比方,通常会遇到生产的数据,解决很耗时,导致超过了 Kafka 的 session timeout 工夫(0.10.x 版本默认是 30 秒),那么就会 re-blance 重均衡,此时有肯定几率 offset 没提交,会导致重均衡后反复生产。

起因 4:当消费者重新分配 partition 的时候,可能呈现从头开始生产的状况,导致重发问题。

起因 5:当消费者生产的速度很慢的时候,可能在一个 session 周期内还未实现,导致心跳机制检测报告出问题。

起因 6:并发很大,可能在规定的工夫(session.time.out 默认 30s)内没有生产完,就会可能导致 reblance 重均衡,导致一部分 offset 主动提交失败,而后重均衡后反复生产

问题形容:
咱们零碎压测过程中呈现上面问题:异样 rebalance,而且均匀距离 3 到 5 分钟就会触发 rebalance,剖析日志发现比较严重。谬误日志如下:

08-09 11:01:11 131 pool-7-thread-3 ERROR [] - 
commit failed 
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:713) ~[MsgAgent-jar-with-dependencies.jar:na]
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:596) ~[MsgAgent-jar-with-dependencies.jar:na]
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1218) ~[MsgAgent-jar-with-dependencies.jar:na]
        at com.today.eventbus.common.MsgConsumer.run(MsgConsumer.java:121) ~[MsgAgent-jar-with-dependencies.jar:na]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_161]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_161]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]

这个谬误的意思是,消费者在解决完一批 poll 的音讯后,在同步提交偏移量给 broker 时报的错。初步剖析日志是因为以后消费者线程生产的分区曾经被 broker 给回收了,因为 kafka 认为这个消费者死了,那么为什么呢?

问题剖析:

这里就波及到问题是消费者在创立时会有一个属性 max.poll.interval.ms(默认间隔时间为 300s),
该属性意思为 kafka 消费者在每一轮 poll() 调用之间的最大提早, 消费者在获取更多记录之前能够闲暇的工夫量的下限。如果此超时工夫期满之前 poll() 没有被再次调用,则消费者被视为失败,并且分组将从新均衡,以便将分区重新分配给别的成员。

解决反复数据

因为 offset 此时曾经不精确,生产环境不能间接去批改 offset 偏移量。
所以从新指定了一个生产组(group.id=order_consumer_group),而后指定 auto-offset-reset=latest 这样我就只须要重启我的服务了,而不须要动 kafka 和 zookeeper 了!

#consumer
spring.kafka.consumer.group-id=order_consumer_group
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=latest

注:如果你想要消费者从头开始生产某个 topic 的全量数据,能够从新指定一个全新的 group.id=new_group,而后指定 auto-offset-reset=earliest 即可

正文完
 0