kafka中的exactly once semantics

kafka0.11.0.0版本正式反对准确一次解决语义(exactly once semantics,EOS),EOS次要体现在3个方面:

  • 幂等producer:保障发送单个分区的音讯只会发送一次,不会呈现反复音讯
  • 事务:保障原子性地写入到多个分区,即写入到多个分区的音讯要么全胜利,要么失败回滚
  • 流解决EOS:流解决实质上可看成是‘读取-解决-写入’的管道。此EOS保障整个过程的操作是原子性。留神,这只实用kafka streams。

下面3种EOS语义有着不同的利用范畴,幂等producer只能保障单分区上无反复音讯;事务能够保障多分区写入音讯的完整性;而流解决EOS保障的是端到端(E2E)音讯解决的EOS。不同的配置如下:

  • 启用幂等producer:在producer程序中设置属性enable.idempotence=true,留神不要设置transactional.id,不要设置成null也不要设置成空字符串
  • 启用带伤反对:在producer程序中设置属性transcational.id为一个指定字符串,同时设置enable.idempotence=true
  • 启用流式解决EOS:在kafka streams程序中设置processing.guarantee=exactly_once

接下来聊聊producer幂等性

producer幂等性

producer幂等性指的是当发送同一条音讯时,数据在server端只会被长久化一次,数据不丢不重,然而这里的幂等是有条件的:

  • 只能保障producer在单个会话内不丢不重,如果producer呈现重启是无奈保障幂等性的(在设置了幂等的状况下,是无奈获取之前的状态信息的,因而无奈做到跨会话级别的不丢不重)
  • 幂等性不能跨多个topic-partition,只能保障单个partition内的幂等性,当波及多个topic-partition时,这两头的状态并没有同步。

如果须要跨会话、跨多个topic-partition的状况,须要应用kafka的事务性来实现。

幂等示例

Properties props = new Properties();props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");props.put("acks", "all"); // 当 enable.idempotence 为 true,这里默认为 allprops.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer producer = new KafkaProducer(props);producer.send(new ProducerRecord(topic, "test");

幂等实现原理

kafka proudcer在实现幂等时有两个重要机制:

  1. PID(Producer ID),用来标识每个producer client
  2. sequence numbers,client发送的每条音讯都会带相应的sn,server端再依据这个值来判断数据是否反复

PID

每个producer在初始化时都会被调配一个惟一的PID,这个PID对用户是通明的,没有裸露给用户。对于一个给定的PID,sequence number将会从0开始自增,每个topic-partition都会有一个独立的sequence number。producer在发送数据时,将会给每条音讯标识一个sn,而后server以此来验重。这里的PID是全局惟一的,如果producer重启后会被调配一个新的PID,这也是幂等性无奈做到跨会话的一个起因。

PID申请与治理

producer向broker发送一个申请获取PID(server端会抉择一台连贯数量起码的broker进行解决),broker收到申请PID的申请后,会尝试在ZK创立一个/latest_producer_id_block节点,每个broker向ZK申请一个PID段后,都会将本人申请的PID段信息写入到这个节点,这样当其余broker再申请PID段时,会首先读取这个节点的信息,而后依据block_end抉择一个PID段,最初再将信息写回到ZK的这个节点里,这个节点信息内容如下:

{"version":1,"broker":35,"block_start":"4000","block_end":"4999"}

broker与ZK交互流程:

  1. 先从ZK的/latest_producer_id_block节点读取最新曾经调配的PID段信息
  2. 如果该节点不存在,间接从0开始调配,抉择0-1000的PID段(PidBlockSize默认为1000)
  3. 如果该节点存在,读取其中数据,依据block_end抉择这个PID段
  4. 在抉择了相应的PID段后,将这个PID段信息写回到ZK的这个节点中,如果写入胜利,证实PID段申请胜利;如果失败证实此时可能其它的broker曾经更新了这个节点,就须要从步骤1从新开始执行

针对第4点再阐明下,在回写ZK节点时会判断以后节点的zkVersion是否与第1点里获取的zkVersion雷同,如果雷同才能够写入胜利,不同阐明这个节点被批改了则会写入失败。

client幂等时发送流程

java producer(区别于scala producer)是双线程设计,分为用户主线程与sender线程,前者调用send办法将音讯写入到producer的内存缓冲区,即RecordAccumulator中,后者会定期从RecordAccumulator中获取音讯并将音讯纳入不同的batch中发送到对应的broker上。在幂等producer中,用户主线程的逻辑变动不大。send办法仍然是将音讯写入到RecordAccumulator。而Sender线程却有着很大的改支。

  1. 用户线程调用send办法将数据增加到RecordAccumulator中,增加时会判断是否须要新建一个ProducerBatch,这时的ProducerBatch还是没有PID和sequence number
  2. sender线程在执行时,判断以后PID是否须要重置:如果有音讯重试屡次人失败最初因为超时而被移除,这时的seqeunce number有局部曾经调配进来,那这是不容许发送的。
  3. sender线程阻塞获取PID
  4. 在ProducerBatch里设置相应的PID与sequence number,进行发送。

server端解决producer申请

server端会次要测验申请里是否有PID信息,校验batch是否反复:

  1. 如果PID不存在,那么判断sequence number是否从0开始,是的话,在缓存中记录PID的meta信息(PID,epoch,sequence number),并执行写入操作,否则返回UnknowProducerIdException(PID在server端曾经过期或这个PID写的数据曾经过期,但producer还在接着上次的sn发送数据)
  2. 如果PID存在,先查看PID epoch与server端记录的是否雷同
  3. 如果不同,且sn不是从0开始,那么返回OutOfOrderSequenceException异样
  4. 如果雷同,那么依据缓存中记录的最近一次sn(currentLastSeq)查看是否为连贯,不连贯的状况下那么返回OutOfOrderSequenceException异样

最初总结一下幂等音讯解决流程:

  1. 每个producer会被调配一个PID,SN
  2. producer与broker端都有<PID,PartitionID>与SN的映射关系
  3. producer每发送一条音讯后就将对应的分区序列号加一
  4. broker会比拟序列号,如果new SN < old SN+1,阐明是过期的,会摈弃这条数据;如果new SN > old SN+1,阐明有音讯失落了,抛出异样。

参考文章:
Kafka 事务性之幂等性实现
Kafka幂等性及事务
对于Kafka幂等producer的探讨