共计 3465 个字符,预计需要花费 9 分钟才能阅读完成。
kafka 中的 exactly once semantics
kafka0.11.0.0 版本正式反对准确一次解决语义(exactly once semantics,EOS),EOS 次要体现在 3 个方面:
- 幂等 producer: 保障发送单个分区的音讯只会发送一次,不会呈现反复音讯
- 事务:保障原子性地写入到多个分区,即写入到多个分区的音讯要么全胜利,要么失败回滚
- 流解决 EOS:流解决实质上可看成是‘读取 - 解决 - 写入’的管道。此 EOS 保障整个过程的操作是原子性。留神,这只实用 kafka streams。
下面 3 种 EOS 语义有着不同的利用范畴,幂等 producer 只能保障单分区上无反复音讯;事务能够保障多分区写入音讯的完整性;而流解决 EOS 保障的是端到端(E2E)音讯解决的 EOS。不同的配置如下:
- 启用幂等 producer:在 producer 程序中设置属性 enable.idempotence=true,留神不要设置 transactional.id,不要设置成 null 也不要设置成空字符串
- 启用带伤反对:在 producer 程序中设置属性 transcational.id 为一个指定字符串,同时设置 enable.idempotence=true
- 启用流式解决 EOS:在 kafka streams 程序中设置 processing.guarantee=exactly_once
接下来聊聊 producer 幂等性
producer 幂等性
producer 幂等性指的是当发送同一条音讯时,数据在 server 端只会被长久化一次,数据不丢不重,然而这里的幂等是有条件的:
- 只能保障 producer 在单个会话内不丢不重,如果 producer 呈现重启是无奈保障幂等性的(在设置了幂等的状况下,是无奈获取之前的状态信息的,因而无奈做到跨会话级别的不丢不重)
- 幂等性不能跨多个 topic-partition,只能保障单个 partition 内的幂等性,当波及多个 topic-partition 时,这两头的状态并没有同步。
如果须要跨会话、跨多个 topic-partition 的状况,须要应用 kafka 的事务性来实现。
幂等示例
Properties props = new Properties();
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put("acks", "all"); // 当 enable.idempotence 为 true,这里默认为 all
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer producer = new KafkaProducer(props);
producer.send(new ProducerRecord(topic, "test");
幂等实现原理
kafka proudcer 在实现幂等时有两个重要机制:
- PID(Producer ID),用来标识每个 producer client
- sequence numbers,client 发送的每条音讯都会带相应的 sn,server 端再依据这个值来判断数据是否反复
PID
每个 producer 在初始化时都会被调配一个惟一的 PID,这个 PID 对用户是通明的,没有裸露给用户。对于一个给定的 PID,sequence number 将会从 0 开始自增,每个 topic-partition 都会有一个独立的 sequence number。producer 在发送数据时,将会给每条音讯标识一个 sn,而后 server 以此来验重。这里的 PID 是全局惟一的,如果 producer 重启后会被调配一个新的 PID,这也是幂等性无奈做到跨会话的一个起因。
PID 申请与治理
producer 向 broker 发送一个申请获取 PID(server 端会抉择一台连贯数量起码的 broker 进行解决),broker 收到申请 PID 的申请后,会尝试在 ZK 创立一个 /latest_producer_id_block 节点,每个 broker 向 ZK 申请一个 PID 段后,都会将本人申请的 PID 段信息写入到这个节点,这样当其余 broker 再申请 PID 段时,会首先读取这个节点的信息,而后依据 block_end 抉择一个 PID 段,最初再将信息写回到 ZK 的这个节点里,这个节点信息内容如下:
{"version":1,"broker":35,"block_start":"4000","block_end":"4999"}
broker 与 ZK 交互流程:
- 先从 ZK 的 /latest_producer_id_block 节点读取最新曾经调配的 PID 段信息
- 如果该节点不存在,间接从 0 开始调配,抉择 0 -1000 的 PID 段(PidBlockSize 默认为 1000)
- 如果该节点存在,读取其中数据,依据 block_end 抉择这个 PID 段
- 在抉择了相应的 PID 段后,将这个 PID 段信息写回到 ZK 的这个节点中,如果写入胜利,证实 PID 段申请胜利;如果失败证实此时可能其它的 broker 曾经更新了这个节点,就须要从步骤 1 从新开始执行
针对第 4 点再阐明下,在回写 ZK 节点时会判断以后节点的 zkVersion 是否与第 1 点里获取的 zkVersion 雷同,如果雷同才能够写入胜利,不同阐明这个节点被批改了则会写入失败。
client 幂等时发送流程
java producer(区别于 scala producer)是双线程设计,分为用户主线程与 sender 线程,前者调用 send 办法将音讯写入到 producer 的内存缓冲区,即 RecordAccumulator 中,后者会定期从 RecordAccumulator 中获取音讯并将音讯纳入不同的 batch 中发送到对应的 broker 上。在幂等 producer 中,用户主线程的逻辑变动不大。send 办法仍然是将音讯写入到 RecordAccumulator。而 Sender 线程却有着很大的改支。
- 用户线程调用 send 办法将数据增加到 RecordAccumulator 中,增加时会判断是否须要新建一个 ProducerBatch,这时的 ProducerBatch 还是没有 PID 和 sequence number
- sender 线程在执行时,判断以后 PID 是否须要重置:如果有音讯重试屡次人失败最初因为超时而被移除,这时的 seqeunce number 有局部曾经调配进来,那这是不容许发送的。
- sender 线程阻塞获取 PID
- 在 ProducerBatch 里设置相应的 PID 与 sequence number,进行发送。
server 端解决 producer 申请
server 端会次要测验申请里是否有 PID 信息,校验 batch 是否反复:
- 如果 PID 不存在,那么判断 sequence number 是否从 0 开始,是的话,在缓存中记录 PID 的 meta 信息(PID,epoch,sequence number),并执行写入操作,否则返回 UnknowProducerIdException(PID 在 server 端曾经过期或这个 PID 写的数据曾经过期,但 producer 还在接着上次的 sn 发送数据)
- 如果 PID 存在,先查看 PID epoch 与 server 端记录的是否雷同
- 如果不同,且 sn 不是从 0 开始,那么返回 OutOfOrderSequenceException 异样
- 如果雷同,那么依据缓存中记录的最近一次 sn(currentLastSeq)查看是否为连贯,不连贯的状况下那么返回 OutOfOrderSequenceException 异样
最初总结一下幂等音讯解决流程:
- 每个 producer 会被调配一个 PID,SN
- producer 与 broker 端都有 <PID,PartitionID> 与 SN 的映射关系
- producer 每发送一条音讯后就将对应的分区序列号加一
- broker 会比拟序列号, 如果 new SN < old SN+1,阐明是过期的,会摈弃这条数据;如果 new SN > old SN+1, 阐明有音讯失落了,抛出异样。
参考文章:
Kafka 事务性之幂等性实现
Kafka 幂等性及事务
对于 Kafka 幂等 producer 的探讨