关于kafka:聊聊-KafkaKafka-消息重复的场景以及最佳实践

37次阅读

共计 6568 个字符,预计需要花费 17 分钟才能阅读完成。

一、前言

上一篇咱们讲了 聊聊 Kafka:Kafka 音讯失落的场景以及最佳实际,这一篇咱们来说一说 Kafka 音讯反复的场景以及最佳实际。

咱们上面会从以下两个方面来说一下 Kafka 音讯反复的场景以及最佳实际。

  • 生产者反复音讯
  • 消费者反复音讯

二、Kafka 音讯反复的场景

2.1 生产者反复音讯

2.1.1 根本原因

生产者发送的音讯没有收到 Broker 正确的响应,导致生产者重试。

生产者收回一条音讯,Broker 落盘当前因为网络等种种原因,发送端失去一个发送失败的响应或者网
络中断,而后生产者收到一个可复原的 Exception 重试音讯导致音讯反复。

2.1.2 重试过程


次要流程:

  • new KafkaProducer() 后创立一个后盾线程 KafkaThread 扫描 RecordAccumulator 中是否有音讯;
  • 调用 KafkaProducer.send() 发送音讯,实际上只是把音讯保留到 RecordAccumulator 中;
  • 后盾线程 KafkaThread 扫描到 RecordAccumulator 中有音讯后,将音讯发送到 Kafka 集群;
  • 如果发送胜利,那么返回胜利;
  • 如果发送失败,那么判断是否容许重试。如果不容许重试,那么返回失败的后果;如果容许重

试,把音讯再保留到 RecordAccumulator 中,期待后盾线程 KafkaThread 扫描再次发送。

具体的重试流程见下图:

2.1.3 可复原异样阐明

从上面源码中不难发现异常是 RetriableException 类型或者音讯是事务类型 TransactionManager 容许重试;

常见的 RetriableException 类继承关系如下:

2.1.4 记录程序问题

如果设置 max.in.flight.requests.per.connection > 1(默认 5,单个连贯上发送的未确认申请的最大数量,示意上一个收回的申请没有确认下一个申请又收回了)。大于1 可能会扭转记录的程序,因为如果将两个 batch 发送到单个分区,第一个 batch 解决失败并重试,然而第二个 batch 解决胜利,那么第二个 batch 解决中的记录可能先呈现被生产。

设置 max.in.flight.requests.per.connection = 1,可能会影响吞吐量,能够解决单个生产者发送程序问题。如果多个生产者,生产者 1 先发送一个申请,生产者 2 后发送申请,此时生产者 1 返回可复原异样,重试肯定次数胜利了。尽管生产者 1 先发送音讯,但生产者 2 发送的音讯会被先生产。

2.2 消费者反复音讯

2.2.1 根本原因

数据生产完没有及时提交 offset 到 Broker

2.2.2 业务场景

音讯生产端在生产过程中挂掉没有及时提交 offset 到 Broker,另一个生产端启动拿之前记录的 offset 开始生产,因为 offset 的滞后性可能会导致新启动的客户端有大量反复生产。

三、Kafka 的三种音讯语义

上一篇咱们提到 Kafka 的三种音讯语义,有一种仅有一次传递语义,能够保障音讯不会失落,也不会被反复发送。

忘了的我这里再说一下:

  • 最多一次(At most once):音讯可能会失落,但绝不会被反复发送。
  • 至多一次(At least once):音讯不会失落,但有可能被反复发送。
  • 仅有一次(Exactly once):音讯不会失落,也不会被反复发送。

那你可能就会问了,那就间接用这个 Exactly once 语义不就行了吗?

Kafka 反对的 Exactly once 和消息传递的服务质量规范 Exactly once 是不一样的。它是 Kafka 提供的另外一个个性,Kafka 中反对的事务也和咱们通常意义了解的事务有肯定的差别。在 Kafka 中,事务和 Excactly once 次要是为了配合流计算应用的个性。

刚开始我也认为 Kafka 像 MQTT 一样能保障消息传递的仅有一次语义,起初才发现须要本人去实现,不得不说下 Kafka 的营销伎俩还是不错的。

既然 Kafka 无奈保障音讯不反复,那就须要咱们的生产代码可能承受“音讯是可能会反复的”这一现状,而后,通过一些办法来打消反复音讯对业务的影响。

Kafka 实际上通过两种机制来确保音讯生产的准确一次:

  • 幂等性(Idempotence)
  • 事务(Transaction)

四、幂等性

4.1 什么是幂等性

保障在音讯重发的时候,消费者不会反复解决。即便在消费者收到反复音讯的时候,反复解决,也要保障最终后果的一致性。

所谓幂等性,数学概念就是: f(f(x)) = f(x)。f 函数示意对音讯的解决。

比方,银行转账,如果失败,须要重试。不论重试多少次,都要保障最终后果肯定是统一的。

4.2 引入幂等性之前

Producer 向 Broker 发送音讯,而后 Broker 将音讯追加到音讯流中后再给 Producer 返回 Ack 信号值。实现流程如下:

生产中,会呈现各种不确定的因素,比方在 Producer 在发送给 Broker 的时候呈现网络异样。比方以下这种异常情况的呈现:


上图这种状况,当 Producer 第一次发送音讯给 Broker 时,Broker 将音讯 (x2,y2) 追加到了音讯流中,然而在返回 Ack 信号给 Producer 时失败了(比方网络异样)。此时,Producer 端 触发重试机制,将音讯 (x2,y2) 从新发送给 Broker,Broker 接管到音讯后,再次将该音讯追加到音讯流中,而后胜利返回 Ack 信号给 Producer。这样下来,音讯流中就被反复追加了两条雷同的 (x2,y2) 的音讯。

4.3 引入幂等性之后

Kafka 为了实现幂等性,在 0.11.0 版本之后,它在底层设计架构中引入了 ProducerIDSequenceNumber

  • ProducerID:在每个新的 Producer 初始化时,会被调配一个惟一的 ProducerID,这个 ProducerID 对客户端使用者是不可见的。
  • SequenceNumber:对于每个 ProducerID,Producer 发送数据的每个 Topic 和 Partition 都对应一个从 0 开始枯燥递增的 SequenceNumber 值。


同样,这是一种现实状态下的发送流程。理论状况下,会有很多不确定的因素,比方 Broker 在发送 Ack 信号给 Producer 时呈现网络异样,导致发送失败。异常情况如下图所示:


当 Producer 发送音讯 (x2,y2) 给 Broker 时,Broker 接管到音讯并将其追加到音讯流中。此时,Broker 返回 Ack 信号给 Producer 时,产生异样导致 Producer 接管 Ack 信号失败。对于 Producer 来说,会触发重试机制,将音讯 (x2,y2) 再次发送,然而,因为引入了幂等性 ,在每条音讯中附带 PID(ProducerID) 和 SequenceNumber。雷同的 PID 和 SequenceNumber 发送给 Broker, 而之前 Broker 缓存过之前发送的雷同的音讯,那么在音讯流中的音讯就只有一条 (x2,y2),不会呈现反复发送的状况。

4.4 源码剖析

间接看 org.apache.kafka.clients.producer.internals.Sender#run 办法,而后跟进 runOnce 办法:


细心地读者可能会问了,老周啊,你下面不是说的 SequenceNumber 么,怎么 ProducerIdAndEpoch 里是 epoch 啊?没错,老周这里的版本是 2.7.0 了,跟以前的老版本有点不一样,但意思是一样的,之前是序列号,当初的是代。

4.5 留神

咱们下面说了在每个新的 Producer 初始化时,会调配一个 PID,音讯发送到的每一个分区都有对应的代号,这些代号从 0 开始枯燥递增。生产者每发送一条音讯就会将 <PID, 分区 > 对应的代号值加 1。

Broker 端在内存中为每一对 <PID, 分区 > 保护一个序列号 epoch_old。针对生产者发送来的每一条音讯,对其代号 epoch_new 进行判断,并作相应解决。

  • 只有 epoch_new 比 epoch_old 大 1 时,即 epoch_new = epoch_old + 1 时,Broker 才会承受这条音讯;
  • epoch_new < epoch_old + 1,阐明音讯被反复写入,Broker 间接抛弃该条音讯;
  • epoch_new > epoch_old + 1,阐明两头有数据尚未写入,呈现了音讯乱序,可能存在音讯失落的景象,对应的生产者会抛出 OutOfOrderSequenceException 异样。

代号针对 <PID, 分区 >,这意味着幂等生产者只能保障单个主题的繁多分区内音讯不反复;其次,它只能实现单会话上的幂等性,不能实现跨会话的幂等性,这里的会话即能够了解为:Producer 过程的一次运行。当重启了 Producer 过程之后,则幂等性保障就生效了。

那么你可能会问,如果我想实现多分区以及多会话上的音讯无反复,应该怎么做呢?答案就是事务(transaction)或者依赖事务型 Producer。这也是幂等性 Producer 和事务型 Producer 的最大区别!

五、事务

幂等性不能实现多分区以及多会话上的音讯无反复,而 Kafka 事务则能够补救这个缺点,Kafka 自 0.11 版本开始也提供了对事务的反对,目前次要是在 read committed 隔离级别上做事件。它能保障多条音讯原子性地写入到指标分区,同时也能保障 Consumer 只能看到事务胜利提交的音讯。上面咱们就来看看 Kafka 中的事务型 Producer。

事务型 Producer 可能保障将音讯原子性地写入到多个分区中。这批音讯要么全副写入胜利,要么全副失败。另外,事务型 Producer 也不惧过程的重启。Producer 重启回来后,Kafka 仍然保障它们发送音讯的准确一次解决。

六、最佳实际

6.1 生产端

6.1.1 幂等性 Producer

幂等性 Producer 只适宜单个主题的繁多分区内音讯不反复,其次,它只能实现单会话上的幂等性。

  • 设置 enable.idempotence=true,生产者将确保在流中精确地写入每个音讯的一个正本。
  • 设置 acks = all。代表了你对“已提交”音讯的定义。如果设置成 all,则表明所有正本 Broker 都要接管到音讯,该音讯才算是“已提交”。这是最高等级的“已提交”定义。
  • 设置 max.in.flight.requests.per.connection < 5,客户端将在单个连贯上发送的未确认申请的最大数量。

    具体设置多少,能够参考上文 2.1.4 记录程序问题,依据本人的业务状况来设置。
  • 设置 retries = 3,当呈现网络的刹时抖动时,音讯发送可能会失败,此时配置了 retries > 0 的 Producer 可能主动重试音讯发送,防止音讯失落。

    如果重试达到设定的次数,那么生产者就会放弃重试并返回异样。不过并不是所有的异样都是能够通过重试来解决的,比方音讯太大,超过 max.request.size 参数配置的值时,这种形式就不可行了。
  • 设置 retry.backoff.ms = 300,正当估算重试的工夫距离,能够防止有效的频繁重试。

    它用来设定两次重试之间的工夫距离,防止有效的频繁重试。在配置 retriesretry.backoff.ms之前,最好先估算一下可能的异样复原工夫,这样能够设定总的重试工夫大于这个异样复原工夫,以此来防止生产者过早地放弃重试。

6.1.2 事务型 Producer

能实现多分区以及多会话上的音讯无反复,即便过程的重启。Producer 重启回来后,Kafka 仍然保障它们发送音讯的准确一次解决。

其它参数和下面的幂等性 Producer 参数统一,多加一个:

  • 设置 transactional.id。这就反对了跨多个生产者会话的可靠性语义,因为它容许客户端在启动任何新事务之前确保应用雷同 TransactionalId 的事务曾经实现,最好为其设置一个有意义的名字。

事务型 Producer 代码应该这样写:

producer.initTransactions();
try {producer.beginTransaction();
    producer.send(record1);
    producer.send(record2);
    producer.commitTransaction();} catch (KafkaException e) {producer.abortTransaction();
}

和一般 Producer 代码相比,事务型 Producer 的显著特点是调用了一些事务 API,如 initTransaction、beginTransaction、commitTransaction 和 abortTransaction,它们别离对应事务的初始化、事务开始、事务提交以及事务终止。

这段代码可能保障 record1 和 record2 被当作一个事务对立提交到 Kafka,要么它们全副提交胜利,要么全副写入失败。

6.1.3 不在乎数据失落的场景

你可能会问了,我的零碎存在这种场景,我不在乎数据的失落,该怎么配呢?你不会让我配置下面那些幂等性、事务型的参数吧?比方我的可能是日志收集的零碎,这会导致系统吞吐量重大升高。

别着急,针对这种场景也有有有相应的计划的,那下面的参数别管了,

  • 设置 ack=0,不须要 Broker 接管到音讯的确认也不须要重试。

6.2 生产端

6.2.1 生产事务型 Producer 音讯

设置 isolation.level=read_committed 参数的值即可

  • read_uncommitted:这是默认值,表明 Consumer 可能读取到 Kafka 写入的任何音讯,不管事务型 Producer 提交事务还是终止事务,其写入的音讯都能够读取。很显然,如果你用了事务型 Producer,那么对应的 Consumer 就不要应用这个值。
  • read_committed:表明 Consumer 只会读取事务型 Producer 胜利提交事务写入的音讯。当然了,它也能看到非事务型 Producer 写入的所有音讯。

6.2.2 通用设置

确保音讯生产实现再提交。最好把它设置成 enable.auto.commit = false,并采纳手动提交位移的形式。这对于单 Consumer 多线程解决的场景而言是至关重要的。

6.2.3 上游生产端做幂等

6.2.3.1 利用数据库的惟一束缚实现幂等

比方金融畛域,有一张流水表,表里有三个字段:转账单 ID、账户 ID 和变更金额,对于每个转账单每个账户只能够执行一次变更操作,咱们能够给转账单 ID 和账户 ID 这两个字段联结起来创立一个惟一束缚,这样对于雷同的转账单 ID 和账户 ID,表里至少只能存在一条记录。


基于这个思路,不光是能够应用关系型数据库,只有是反对相似“INSERT IF NOT EXIST”语义的存储类零碎都能够用于实现幂等,比方,你能够用 Redis 的 SETNX 命令来代替数据库中的惟一束缚,来实现幂等生产。

6.2.3.2 设置前置条件

给数据变更设置一个前置条件,如果满足条件就更新数据,否则回绝更新数据,在更新数据的时候,同时变更前置条件中须要判断的数据。这样,反复执行这个操作时,因为第一次更新数据的时候曾经变更了前置条件中须要判断的数据,不满足前置条件,则不会反复执行更新数据操作。

然而,如果咱们要更新的数据不是数值,或者咱们要做一个比较复杂的更新操作怎么办?用什么作为前置判断条件呢?更加通用的办法是,给你的数据减少一个版本号属性,每次更数据前,比拟以后数据的版本号是否和音讯中的版本号统一,如果不统一就回绝更新数据,更新数据的同时将版本号 +1,一样能够实现幂等更新。

不晓得小伙伴们有没有发现,这个思维就是赫赫有名 CAS 乐观锁机制,其实很多高大上的货色并没有那么难,思维往往很简略。

6.2.3.3 记录并查看操作

如果下面两种办法都不实用你,这里还有一种通用性更强的办法,就是给每条音讯都记录一个全局惟一 ID,生产时,先依据这个全局惟一 ID 查看这条音讯是否有被生产过,如果没有生产过,才更新数据,而后将生产状态置为已生产。

是不是感觉很简略?但外面的坑很大,说到底还是数据一致性不好保障,在“查看生产状态,而后更新数据并且设置生产状态”中,三个操作必须作为一组操作保障原子性,能力真正实现幂等,否则就会呈现 Bug。

好了,这就是 Kafka 音讯反复的场景以及最佳实际的所有内容了,心愿对你实际有所参考意义,咱们下期再见。

正文完
 0