乐趣区

关于kafka:kafka之七幂等性

kafka 中的 exactly once semantics

kafka0.11.0.0 版本正式反对准确一次解决语义(exactly once semantics,EOS),EOS 次要体现在 3 个方面:

  • 幂等 producer: 保障发送单个分区的音讯只会发送一次,不会呈现反复音讯
  • 事务:保障原子性地写入到多个分区,即写入到多个分区的音讯要么全胜利,要么失败回滚
  • 流解决 EOS:流解决实质上可看成是‘读取 - 解决 - 写入’的管道。此 EOS 保障整个过程的操作是原子性。留神,这只实用 kafka streams。

下面 3 种 EOS 语义有着不同的利用范畴,幂等 producer 只能保障单分区上无反复音讯;事务能够保障多分区写入音讯的完整性;而流解决 EOS 保障的是端到端(E2E)音讯解决的 EOS。不同的配置如下:

  • 启用幂等 producer:在 producer 程序中设置属性 enable.idempotence=true,留神不要设置 transactional.id,不要设置成 null 也不要设置成空字符串
  • 启用带伤反对:在 producer 程序中设置属性 transcational.id 为一个指定字符串,同时设置 enable.idempotence=true
  • 启用流式解决 EOS:在 kafka streams 程序中设置 processing.guarantee=exactly_once

接下来聊聊 producer 幂等性

producer 幂等性

producer 幂等性指的是当发送同一条音讯时,数据在 server 端只会被长久化一次,数据不丢不重,然而这里的幂等是有条件的:

  • 只能保障 producer 在单个会话内不丢不重,如果 producer 呈现重启是无奈保障幂等性的(在设置了幂等的状况下,是无奈获取之前的状态信息的,因而无奈做到跨会话级别的不丢不重)
  • 幂等性不能跨多个 topic-partition,只能保障单个 partition 内的幂等性,当波及多个 topic-partition 时,这两头的状态并没有同步。

如果须要跨会话、跨多个 topic-partition 的状况,须要应用 kafka 的事务性来实现。

幂等示例

Properties props = new Properties();
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put("acks", "all"); // 当 enable.idempotence 为 true,这里默认为 all
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer producer = new KafkaProducer(props);

producer.send(new ProducerRecord(topic, "test");

幂等实现原理

kafka proudcer 在实现幂等时有两个重要机制:

  1. PID(Producer ID),用来标识每个 producer client
  2. sequence numbers,client 发送的每条音讯都会带相应的 sn,server 端再依据这个值来判断数据是否反复

PID

每个 producer 在初始化时都会被调配一个惟一的 PID,这个 PID 对用户是通明的,没有裸露给用户。对于一个给定的 PID,sequence number 将会从 0 开始自增,每个 topic-partition 都会有一个独立的 sequence number。producer 在发送数据时,将会给每条音讯标识一个 sn,而后 server 以此来验重。这里的 PID 是全局惟一的,如果 producer 重启后会被调配一个新的 PID,这也是幂等性无奈做到跨会话的一个起因。

PID 申请与治理

producer 向 broker 发送一个申请获取 PID(server 端会抉择一台连贯数量起码的 broker 进行解决),broker 收到申请 PID 的申请后,会尝试在 ZK 创立一个 /latest_producer_id_block 节点,每个 broker 向 ZK 申请一个 PID 段后,都会将本人申请的 PID 段信息写入到这个节点,这样当其余 broker 再申请 PID 段时,会首先读取这个节点的信息,而后依据 block_end 抉择一个 PID 段,最初再将信息写回到 ZK 的这个节点里,这个节点信息内容如下:

{"version":1,"broker":35,"block_start":"4000","block_end":"4999"}

broker 与 ZK 交互流程:

  1. 先从 ZK 的 /latest_producer_id_block 节点读取最新曾经调配的 PID 段信息
  2. 如果该节点不存在,间接从 0 开始调配,抉择 0 -1000 的 PID 段(PidBlockSize 默认为 1000)
  3. 如果该节点存在,读取其中数据,依据 block_end 抉择这个 PID 段
  4. 在抉择了相应的 PID 段后,将这个 PID 段信息写回到 ZK 的这个节点中,如果写入胜利,证实 PID 段申请胜利;如果失败证实此时可能其它的 broker 曾经更新了这个节点,就须要从步骤 1 从新开始执行

针对第 4 点再阐明下,在回写 ZK 节点时会判断以后节点的 zkVersion 是否与第 1 点里获取的 zkVersion 雷同,如果雷同才能够写入胜利,不同阐明这个节点被批改了则会写入失败。

client 幂等时发送流程

java producer(区别于 scala producer)是双线程设计,分为用户主线程与 sender 线程,前者调用 send 办法将音讯写入到 producer 的内存缓冲区,即 RecordAccumulator 中,后者会定期从 RecordAccumulator 中获取音讯并将音讯纳入不同的 batch 中发送到对应的 broker 上。在幂等 producer 中,用户主线程的逻辑变动不大。send 办法仍然是将音讯写入到 RecordAccumulator。而 Sender 线程却有着很大的改支。

  1. 用户线程调用 send 办法将数据增加到 RecordAccumulator 中,增加时会判断是否须要新建一个 ProducerBatch,这时的 ProducerBatch 还是没有 PID 和 sequence number
  2. sender 线程在执行时,判断以后 PID 是否须要重置:如果有音讯重试屡次人失败最初因为超时而被移除,这时的 seqeunce number 有局部曾经调配进来,那这是不容许发送的。
  3. sender 线程阻塞获取 PID
  4. 在 ProducerBatch 里设置相应的 PID 与 sequence number,进行发送。

server 端解决 producer 申请

server 端会次要测验申请里是否有 PID 信息,校验 batch 是否反复:

  1. 如果 PID 不存在,那么判断 sequence number 是否从 0 开始,是的话,在缓存中记录 PID 的 meta 信息(PID,epoch,sequence number),并执行写入操作,否则返回 UnknowProducerIdException(PID 在 server 端曾经过期或这个 PID 写的数据曾经过期,但 producer 还在接着上次的 sn 发送数据)
  2. 如果 PID 存在,先查看 PID epoch 与 server 端记录的是否雷同
  3. 如果不同,且 sn 不是从 0 开始,那么返回 OutOfOrderSequenceException 异样
  4. 如果雷同,那么依据缓存中记录的最近一次 sn(currentLastSeq)查看是否为连贯,不连贯的状况下那么返回 OutOfOrderSequenceException 异样

最初总结一下幂等音讯解决流程:

  1. 每个 producer 会被调配一个 PID,SN
  2. producer 与 broker 端都有 <PID,PartitionID> 与 SN 的映射关系
  3. producer 每发送一条音讯后就将对应的分区序列号加一
  4. broker 会比拟序列号, 如果 new SN < old SN+1,阐明是过期的,会摈弃这条数据;如果 new SN > old SN+1, 阐明有音讯失落了,抛出异样。

参考文章:
Kafka 事务性之幂等性实现
Kafka 幂等性及事务
对于 Kafka 幂等 producer 的探讨

退出移动版