问题剖析
导致kafka的反复生产问题起因在于,曾经生产了数据,然而offset没来得及提交(比方Kafka没有或者不晓得该数据曾经被生产)。
总结以下场景导致Kakfa反复生产:
起因1:强行kill线程,导致生产后的数据,offset没有提交(生产零碎宕机、重启等)。
起因2:设置offset为主动提交,敞开kafka时,如果在close之前,调用 consumer.unsubscribe() 则有可能局部offset没提交,下次重启会反复生产。
例如:
try { consumer.unsubscribe();} catch (Exception e) {}try { consumer.close();} catch (Exception e) {}
下面代码会导致局部offset没提交,下次启动时会反复生产。
解决办法:设置offset主动提交为false
整合了Spring配置的批改如下配置
spring配置:
spring.kafka.consumer.enable-auto-commit=falsespring.kafka.consumer.auto-offset-reset=latest
整合了API形式的批改enable.auto.commit为false
API配置:
Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test");props.put("enable.auto.commit", "false");
一旦设置了 enable.auto.commit 为 true,Kafka 会保障在开始调用 poll 办法时,提交上次 poll 返回的所有音讯。从程序上来说,poll 办法的逻辑是先提交上一批音讯的位移,再解决下一批音讯,因而它能保障不呈现生产失落的状况。
起因3:(反复生产最常见的起因):生产后的数据,当offset还没有提交时,partition就断开连接。比方,通常会遇到生产的数据,解决很耗时,导致超过了Kafka的session timeout工夫(0.10.x版本默认是30秒),那么就会re-blance重均衡,此时有肯定几率offset没提交,会导致重均衡后反复生产。
起因4:当消费者重新分配partition的时候,可能呈现从头开始生产的状况,导致重发问题。
起因5:当消费者生产的速度很慢的时候,可能在一个session周期内还未实现,导致心跳机制检测报告出问题。
起因6:并发很大,可能在规定的工夫(session.time.out默认30s)内没有生产完,就会可能导致reblance重均衡,导致一部分offset主动提交失败,而后重均衡后反复生产
问题形容:
咱们零碎压测过程中呈现上面问题:异样rebalance,而且均匀距离3到5分钟就会触发rebalance,剖析日志发现比较严重。谬误日志如下:
08-09 11:01:11 131 pool-7-thread-3 ERROR [] - commit failed org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:713) ~[MsgAgent-jar-with-dependencies.jar:na] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:596) ~[MsgAgent-jar-with-dependencies.jar:na] at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1218) ~[MsgAgent-jar-with-dependencies.jar:na] at com.today.eventbus.common.MsgConsumer.run(MsgConsumer.java:121) ~[MsgAgent-jar-with-dependencies.jar:na] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_161] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_161] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]
这个谬误的意思是,消费者在解决完一批poll的音讯后,在同步提交偏移量给broker时报的错。初步剖析日志是因为以后消费者线程生产的分区曾经被broker给回收了,因为kafka认为这个消费者死了,那么为什么呢?
问题剖析:
这里就波及到问题是消费者在创立时会有一个属性max.poll.interval.ms(默认间隔时间为300s),
该属性意思为kafka消费者在每一轮poll()调用之间的最大提早,消费者在获取更多记录之前能够闲暇的工夫量的下限。如果此超时工夫期满之前poll()没有被再次调用,则消费者被视为失败,并且分组将从新均衡,以便将分区重新分配给别的成员。
解决反复数据
因为offset此时曾经不精确,生产环境不能间接去批改offset偏移量。
所以从新指定了一个生产组(group.id=order_consumer_group),而后指定auto-offset-reset=latest这样我就只须要重启我的服务了,而不须要动kafka和zookeeper了!
#consumerspring.kafka.consumer.group-id=order_consumer_groupspring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.kafka.consumer.enable-auto-commit=falsespring.kafka.consumer.auto-offset-reset=latest
注:如果你想要消费者从头开始生产某个topic的全量数据,能够从新指定一个全新的group.id=new_group,而后指定auto-offset-reset=earliest即可