名词解释
置信做过数据处理的小伙伴们对于kafka必定是相熟的。根底的kafka常识这里就不过多陈说了。明天次要来讲一下kafka的几个个性,上面先简略解释下这几个个性的含意:
安全性:
数据从producer中写入到kafka以及consumer从topic中生产数据,数据都不会失落。
幂等性:
数据在kafka的流程中既不会被从新生产,也不会被反复生产。
这也是实现exactly-once语义的根底。
有序性:
因为kafka是程序生产,所以kafka的有序性次要体现在生产者写入kafka时数据的有序性。
应用场景
介绍完这几个定义后,上面来看看这几个个性的应用场景:
安全性
安全性这里就不多说了,毕竟作为一个消息中间件,数据失落是万万不可承受的。所以数据安全性的保障是kafka可能应用的根底。
幂等性
幂等性对于某些业务可能意义不是很大,然而对于一些业务却是非常重要的。就拿资金结算场景来说,业务方作为上游把数据打到结算平台,如果一份数据被计算、解决了屡次,产生的结果将会特地重大,到时候损失的就是真金白银了,而且很多其余业务也可能受到影响。
此外kafka的幂等性也是实现kafka exactly-once语义的根底,对于kafka来说还是很重要的。
有序性
有序性在数据有更新以及某些依赖数据程序的业务场景意义很大,因为数据乱序造成的数据净化或者业务出错显然是不可承受的。
家喻户晓,kafka是程序读取数据的,所以kafka的有序性取决于生产者的生产数据的有序性。
性能实现
在理解了这几个个性的应用场景后,上面来看一下kafka是如何实现这些个性的:
安全性
kafka的数据在绝大多数状况下平安的,然而某些极其的状况还是可能导致kafka丢数据的。比方上面这些状况:
上面具体说下这些场景:
producer在生产数据和receive ack时会遇到上面的场景:
- 发送数据后,疏忽ack间接确认胜利:即kafka的at most once语义。生产者认为数据生产胜利,然而broker数据处理失败。造成数据失落
consumer在生产数据和commit offset时会遇到上面的场景: - 先commit offset,再执行业务逻辑:commit胜利,数据处理失败。造成数据失落
先执行业务逻辑,再commit offset:commit胜利,异步数据处理失败。造成数据失落
还有一种状况就是kafka本身数据的安全性,在某些节点下线后,仍能对外提供服务,保证数据不失落。然而kafka broker本身的问题也会造成数据“失落”,这里的失落代表的意思是无奈对外提供服务,毕竟数据写入kafka后存储在磁盘上。思考下上面的场景: - kafka数据无备份,某台broker挂掉后,这台broker上的数据将“失落”且无奈对外提供服务
producer数据发送到broker后,数据在leader节点写入胜利即返回ack胜利,在leader向replica节点同步数据时,leader节点宕机。造成producer端认为数据生产胜利,而broker端数据“失落”
显然针对下面的场景,kafka都有相干的预案。上面就从生产数据安全、生产数据安全以及kafka本身存储数据安全三个场景来阐明下这个问题:
生产者数据安全性
producer的acks设置的批改,先看看acks的取值:
- acks为0:这意味着producer发送数据后,不会期待broker确认,间接发送下一条数据,性能最好然而有失落数据的危险
- acks为1:为1意味着producer发送数据后,须要期待leader正本确认接管后,才会发送下一条数据,性能稍差,然而安全性大大增高(还是有可能丢数据,上面broker丢数据场景具体解说)
- acks为-1:这个代表的是all,意味着发送的音讯写入所有的ISR汇合中的正本(留神不是全副正本)后,才会发送下一条数据,性能最差,但安全性最强,保障不会丢数据。
从下面的取值能够看出,producer端要是想不丢数据,acks设置要从0起码改成1,如果要求相对不能丢数据则须要设置成-1
另外,producer发送音讯的模式最好选用异步形式,能够晋升性能;并且通过回调函数的形式解决发送后的一些逻辑解决,如打印日志或者数据统计等。
消费者数据安全性
consumer呈现丢数据的问题次要还是enable.auto.commit这个配置项。这个配置项的配置为true就是kafka主动commit offset;而设置为false就是手动commit offset。
主动提交只有程序出问题,基本上就会呈现丢数据的问题。而手动提交如果解决不好,也会存在丢数据的问题,上面咱们就来剖析下这两种提交形式的区别,以及正确的应用办法。
主动提交
在Kafka 中默认的生产位移的提交形式是主动提交。这个由消费者客户端参数enable.auto.commit 配置,默认值为true。
这个默认的主动提交不是每生产一条音讯就提交一次,而是定期提交。这个定期的周期时间由客户端参数auto.commit.interval.ms配置,默认值为5秒。另外此参数失效的前提是enable.auto.commit参数为true。
主动位移提交的动作是在poll()办法的逻辑里实现的,在每次真正向服务端发动拉取申请之前会查看是否能够进行位移提交,如果能够,那么就会提交上一轮生产的位移。
手动提交
手动提交分为同步提交(commitSync)和异步提交(commitAsync)两种形式。
同步提交最大的问题是数据是批量解决的时候,当局部数据实现生产,还没来得及提交offset就被中断,则会使得下次生产会反复生产那局部曾经生产过的数据。而且同步提交会阻塞线程,造成整体性能受影响。
异步提交不会阻塞线程,比起同步提交commitSync具备更好的性能。然而须要解决回调函数,也须要在某些极其场景下如rebalance操作时有相应的解决。
综上,惯例的做法就是数据生产解决流程中应用异步提交的形式commit offset;而消费者失常退出的场景,咱们能够应用,commitSync同步提交,保障offset的正确。例子如下:
try { while(isRunning.get()) { //poll records and do some data processing . consumer.commitAsync() ; }) finally { try { consumer.commitSync() ; ) finally { consumer.close() ;}}
而rebalance时offset无奈失常提交,导致数据反复生产,在这种场景下咱们须要在监听到rebalance产生之前进行一次offset提交。例子如下:
//currentOffsets须要保留该消费者生产的分区的最新的offset//本段代码中没有体现,能够在生产数据之后 进行更新该对象,并在rebalance之后清空该对象Map<TopicPartition , OffsetAndMetadata> currentOffsets =new HashMap<>() ;consumer.subscribe(Arrays .asList( topic) , new ConsumerRebalanceListener () { //产生在rebalance之前,并且消费者进行读取音讯的时候 @Override public void onPartitionsRevoked(Collection<TopicPart ition> partitions) { consume.commitSync(currentOffsets) ; currentOffsets.clear(); } @Override public void onPartitions Assigned(Collection<TopicPartition > partitions) { //do nothing . }});
kafka存储数据安全性
kafka数据因为是存储在硬盘上,所以kafka自身的安全性次要是数据的高可用性上,所以针对kafka数据的安全性次要是数据备份以及数据同步时候相干的策略配置。
首先是replication.factor配置参数,这个配置决定了正本的数量,默认是1。留神这个参数不能超过broker的数量。说这个参数其实是因为如果应用默认的1,或者不在创立topic的时候指定正本数量(也就是正本数为1),那么当一台机器呈现磁盘损坏或者服务宕机等状况,那么数据也就从kafka外面失落了。所以replication.factor这个参数最好是配置大于1,比如说3。
其次就是数据同步的时候导致的数据不可用。比方leader正本接管到数据,但还没同步给其余正本的时候就挂掉了,这时候数据也是失落了。针对这种场景能够通过配置producer端的acks设置为-1能够解决问题,然而这样的代价是会影响kafka的局部性能以及吞吐量。
最初kafka有一个配置参数,min.insync.replicas,默认是1(也就是只有leader,理论生产应该调高),该属性规定了最小的ISR数。这意味着当acks为-1(即all)的时候,这个参数规定了必须写入的ISR集中的正本数,如果没达到,那么producer会产生异样。这个参数的作用是leader挂了之后,kakfa能疾速从ISR中选出leader,最快速度复原服务。
幂等性
幂等性蕴含生产数据的幂等性以及生产数据的幂等性。上面就从这两个方面来阐明:
producer端的幂等性
幂等性producer
kafka从0.11版本当前开始反对producer端的幂等性,通过上面的配置项就能够实现幂等性producer的创立:
props.put("enable.idempotence", true)
开启幂等性的时候,acks就主动配置成“all”了,如果这时候手动将acks设置为0,程序那么会报错。
而幂等性的producer实现逻辑也比较简单,即Kafka减少了pid和seq。Producer中每个RecordBatch都有一个枯燥递增的seq; Broker上每个topic也会保护pid-seq的映射,并且每Commit都会更新lastSeq。这样recordBatch到来时,broker会先查看RecordBatch再保留数据:如果batch中 baseSeq(第一条音讯的seq)比Broker保护的序号(lastSeq)大1,则保留数据,否则不保留(inSequence办法)。
尽管幂等性的producer解决了局部问题,然而还是有两个次要缺点:
幂等性的producer仅做到单分区上的幂等性,即单分区音讯不反复,多分区无奈保障幂等性。
只能放弃单会话的幂等性,无奈实现跨会话的幂等性,也就是说如果producer挂掉再重启,无奈保障两个会话间的幂等(新会话可能会重发)。因为broker端无奈获取之前的状态信息,所以无奈实现跨会话的幂等。
针对这种状况,kafka提供了事务性的producer来解决上述问题。
事务性producer
kafka事务引入了transactionId和Epoch,设置开启事务后,一个transactionId只对应一个pid, 且Server端会记录最新的Epoch值。
这样有新的producer初始化时,会向TransactionCoordinator发送InitPIDRequest申请,TransactionCoordinator曾经有了这个transactionId对应的meta,会返回之前调配的 PID,并把Epoch自增 1返回,这样当old producer恢复过来申请操作时,将被认为是有效producer抛出异样。
如果没有开启事务,TransactionCoordinator会为新的producer返回new pid,这样就起不到隔离成果,因而无奈实现多会话幂等。
上面就是开启事务性producer的办法:
//初始化事务producer.initTransactions();try { //开启一个事务 producer.beginTransaction(); producer.send(record1); producer.send(record2); //提交 producer.commitTransaction();} catch (KafkaException e) { //出现异常的时候,终止事务 producer.abortTransaction();}
下面就是对于producer端幂等性的实现。然而无论开启幂等还是事务的个性,都会对性能有肯定影响,这是必然的。所以kafka默认也并没有开启这两个个性,大家也须要依据业务来衡量是否须要开启。
consumer端的幂等性
比拟遗憾的是,kafka目前没有保障consumer幂等生产的措施,如果的确须要保障consumer的幂等,能够对每条音讯维持一个全局的id,每次生产进行加锁去重。
至于消耗这么多的资源来实现consumer端的幂等性,进而实现kafka的exactly once的生产到底值不值,那就得根与业务进行衡量了。
上面一共一段伪代码:
if(cache.contain(msgId)){ // cache中蕴含msgId,曾经解决过 continue;}else { lock.lock(); cache.put(msgId,timeout); commitSync(); lock.unLock();}
// 后续实现所有操作后,删除cache中的msgId,只有msgId存在cache中,就认为曾经解决过。Note:须要给cache设置有音讯
有序性
首先kafka的有序性只能保障是单分区数据有序,无奈保障全局有序。
如果的确须要保障须要解决的数据有序,能够通过重写分区函数将须要程序解决的数据写入到同一个分区进行解决。分区选择器须要实现org.apache.kafka.clients.producer.Partitioner接口。而后通过配置项ProducerConfig.PARTITIONER_CLASS_CONFIG进行配置指定。
再回到单分区数据有序的实现形式。这其实是kafka幂等性实现的一个附带成果。新版本kafka设置enable.idempotence=true后可能动静调整max-in-flight-request。该参数指定了生产者在收到服务端响应之前能够发送多少个batch音讯,默认值为5。
当重试申请到来时,batch会依据seq从新增加到队列的适合地位,并把max.in.flight.requests.per.connection设为1,这样它后面的batch序号都比它小,只有后面的都发完了,它能力发。这样就在就义了局部性能和吞吐量的前提下,保障了数据的有序性。
最初路漫漫其修远兮,大数据之路还很漫长。如果想一起大数据的小伙伴,欢送点赞转发加关注,下次学习不迷路,咱们在大数据的路上共同前进!
本文由mdnice多平台公布