Kafka存在丢音讯的问题,音讯失落会产生在Broker,Producer和Consumer三种。

Broker

Broker失落音讯是因为Kafka自身的起因造成的,kafka为了失去更高的性能和吞吐量,将数据异步批量的存储在磁盘中。音讯的刷盘过程,为了进步性能,缩小刷盘次数,kafka采纳了批量刷盘的做法。即,依照肯定的音讯量,和工夫距离进行刷盘。这种机制也是因为linux操作系统决定的。将数据存储到linux操作系统种,会先存储到页缓存(Page cache)中,依照工夫或者其余条件进行刷盘(从page cache到file),或者通过fsync命令强制刷盘。数据在page cache中时,如果零碎挂掉,数据会失落。

Broker在linux服务器上高速读写以及同步到Replica

上图简述了broker写数据以及同步的一个过程。broker写数据只写到PageCache中,而pageCache位于内存。这部分数据在断电后是会失落的。pageCache的数据通过linux的flusher程序进行刷盘。刷盘触发条件有三:

  • 被动调用sync或fsync函数
  • 可用内存低于阀值
  • dirty data工夫达到阀值。dirty是pagecache的一个标识位,当有数据写入到pageCache时,pagecache被标注为dirty,数据刷盘当前,dirty标记革除。

Broker配置刷盘机制,是通过调用fsync函数接管了刷盘动作。从单个Broker来看,pageCache的数据会失落。

Kafka没有提供同步刷盘的形式。同步刷盘在RocketMQ中有实现,实现原理是将异步刷盘的流程进行阻塞,期待响应,相似ajax的callback或者是java的future。上面是一段rocketmq的源码。

GroupCommitRequest request = new  GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());service.putRequest(request);boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); // 刷盘

也就是说,实践上,要齐全让kafka保障单个broker不失落音讯是做不到的,只能通过调整刷盘机制的参数缓解该状况。比方,缩小刷盘距离,缩小刷盘数据量大小。工夫越短,性能越差,可靠性越好(尽可能牢靠)。这是一个选择题。

为了解决该问题,kafka通过producer和broker协同解决单个broker失落参数的状况。一旦producer发现broker音讯失落,即可主动进行retry。除非retry次数超过阀值(可配置),音讯才会失落。此时须要生产者客户端手动解决该状况。那么producer是如何检测到数据失落的呢?是通过ack机制,相似于http的三次握手的形式。

The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are allowed: acks=0 If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the retries configuration will not take effect (as the client won’t generally know of any failures). The offset given back for each record will always be set to -1. acks=1 This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost. acks=allThis means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting.

http://kafka.apache.org/20/do...

以上的援用是kafka官网对于参数acks的解释(在老版本中,该参数是request.required.acks)。

  • acks=0,producer不期待broker的响应,效率最高,然而音讯很可能会丢。
  • acks=1,leader broker收到音讯后,不期待其余follower的响应,即返回ack。也能够了解为ack数为1。此时,如果follower还没有收到leader同步的音讯leader就挂了,那么音讯会失落。依照上图中的例子,如果leader收到音讯,胜利写入PageCache后,会返回ack,此时producer认为音讯发送胜利。但此时,依照上图,数据还没有被同步到follower。如果此时leader断电,数据会失落。
  • acks=-1,leader broker收到音讯后,挂起,期待所有ISR列表中的follower返回后果后,再返回ack。-1等效与all。这种配置下,只有leader写入数据到pagecache是不会返回ack的,还须要所有的ISR返回“胜利”才会触发ack。如果此时断电,producer能够晓得音讯没有被发送胜利,将会从新发送。如果在follower收到数据当前,胜利返回ack,leader断电,数据将存在于原来的follower中。在从新选举当前,新的leader会持有该局部数据。数据从leader同步到follower,须要2步:
    1. 数据从pageCache被刷盘到disk。因为只有disk中的数据能力被同步到replica。
    2. 数据同步到replica,并且replica胜利将数据写入PageCache。在producer失去ack后,哪怕是所有机器都停电,数据也至多会存在于leader的磁盘内。

下面第三点提到了ISR的列表的follower,须要配合另一个参数能力更好的保障ack的有效性。ISR是Broker保护的一个“牢靠的follower列表”,in-sync Replica列表,broker的配置蕴含一个参数:min.insync.replicas。该参数示意ISR中起码的正本数。如果不设置该值,ISR中的follower列表可能为空。此时相当于acks=1。

如上图中:

  • acks=0,总耗时f(t) = f(1)。
  • acks=1,总耗时f(t) = f(1) + f(2)。
  • acks=-1,总耗时f(t) = f(1) + max( f(A) , f(B) ) + f(2)。

性能顺次递加,可靠性顺次升高。

Producer

Producer失落音讯,产生在生产者客户端。

为了晋升效率,缩小IO,producer在发送数据时能够将多个申请进行合并后发送。被合并的申请咋发送一线缓存在本地buffer中。缓存的形式和前文提到的刷盘相似,producer能够将申请打包成“块”或者依照工夫距离,将buffer中的数据收回。通过buffer咱们能够将生产者革新为异步的形式,而这能够晋升咱们的发送效率。

然而,buffer中的数据就是危险的。在失常状况下,客户端的异步调用能够通过callback来解决音讯发送失败或者超时的状况,然而,一旦producer被非法的进行了,那么buffer中的数据将失落,broker将无奈收到该局部数据。又或者,当Producer客户端内存不够时,如果采取的策略是抛弃音讯(另一种策略是block阻塞),音讯也会被失落。抑或,音讯产生(异步产生)过快,导致挂起线程过多,内存不足,导致程序解体,音讯失落。

producer

依据上图,能够想到几个解决的思路:

  • 异步发送音讯改为同步发送消。或者service产生音讯时,应用阻塞的线程池,并且线程数有肯定下限。整体思路是管制音讯产生速度。
  • 扩充Buffer的容量配置。这种形式能够缓解该状况的呈现,但不能杜绝。
  • service不间接将音讯发送到buffer(内存),而是将音讯写到本地的磁盘中(数据库或者文件),由另一个(或大量)生产线程进行音讯发送。相当于是在buffer和service之间又加了一层空间更加富裕的缓冲层。

Consumer

Consumer生产音讯有上面几个步骤:

  1. 接管音讯
  2. 解决音讯
  3. 反馈“处理完毕”(commited)

Consumer的生产形式次要分为两种:

  • 主动提交offset,Automatic Offset Committing
  • 手动提交offset,Manual Offset Control

Consumer主动提交的机制是依据肯定的工夫距离,将收到的音讯进行commit。commit过程和生产音讯的过程是异步的。也就是说,可能存在生产过程未胜利(比方抛出异样),commit音讯曾经提交了。此时音讯就失落了。

Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test");// 主动提交开关props.put("enable.auto.commit", "true");// 主动提交的工夫距离,此处是1sprops.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("foo", "bar"));while (true) {        // 调用poll后,1000ms后,音讯状态会被改为 committed ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records)  insertIntoDB(record); // 将音讯入库,工夫可能会超过1000ms}

下面的示例是主动提交的例子。如果此时,insertIntoDB(record)产生异样,音讯将会呈现失落。接下来是手动提交的例子:

Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test");// 敞开主动提交,改为手动提交props.put("enable.auto.commit", "false");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("foo", "bar"));final int minBatchSize = 200;List<ConsumerRecord<String, String>> buffer = new ArrayList<>();while (true) {        // 调用poll后,不会进行auto commit ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) {  buffer.add(record); } if (buffer.size() >= minBatchSize) {  insertIntoDb(buffer);                // 所有音讯生产结束当前,才进行commit操作  consumer.commitSync();  buffer.clear(); }}

将提交类型改为手动当前,能够保障音讯“至多被生产一次”(at least once)。但此时可能呈现反复生产的状况,反复生产不属于本篇探讨范畴。

下面两个例子,是间接应用Consumer的High level API,客户端对于offset等管制是通明的。也能够采纳Low level API的形式,手动管制offset,也能够保障音讯不丢,不过会更加简单。

 try {     while(running) {         ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);         for (TopicPartition partition : records.partitions()) {             List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);             for (ConsumerRecord<String, String> record : partitionRecords) {                 System.out.println(record.offset() + ": " + record.value());             }             long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();             // 准确管制offset             consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));         }     } } finally {   consumer.close(); }

起源:https://blog.dogchao.cn/?p=305

近期热文举荐:

1.Java 15 正式公布, 14 个新个性,刷新你的认知!!

2.终于靠开源我的项目弄到 IntelliJ IDEA 激活码了,真香!

3.我用 Java 8 写了一段逻辑,共事直呼看不懂,你试试看。。

4.吊打 Tomcat ,Undertow 性能很炸!!

5.《Java开发手册(嵩山版)》最新公布,速速下载!

感觉不错,别忘了顺手点赞+转发哦!