关于java:Kafka-如果丢了消息怎么处理的

3次阅读

共计 6722 个字符,预计需要花费 17 分钟才能阅读完成。

Kafka 存在丢音讯的问题,音讯失落会产生在 Broker,Producer 和 Consumer 三种。

Broker

Broker 失落音讯是因为 Kafka 自身的起因造成的,kafka 为了失去更高的性能和吞吐量,将数据异步批量的存储在磁盘中。音讯的刷盘过程,为了进步性能,缩小刷盘次数,kafka 采纳了批量刷盘的做法。即,依照肯定的音讯量,和工夫距离进行刷盘。这种机制也是因为 linux 操作系统决定的。将数据存储到 linux 操作系统种,会先存储到页缓存(Page cache)中,依照工夫或者其余条件进行刷盘(从 page cache 到 file),或者通过 fsync 命令强制刷盘。数据在 page cache 中时,如果零碎挂掉,数据会失落。

Broker 在 linux 服务器上高速读写以及同步到 Replica

上图简述了 broker 写数据以及同步的一个过程。broker 写数据只写到 PageCache 中,而 pageCache 位于内存。这部分数据在断电后是会失落的。pageCache 的数据通过 linux 的 flusher 程序进行刷盘。刷盘触发条件有三:

  • 被动调用 sync 或 fsync 函数
  • 可用内存低于阀值
  • dirty data 工夫达到阀值。dirty 是 pagecache 的一个标识位,当有数据写入到 pageCache 时,pagecache 被标注为 dirty,数据刷盘当前,dirty 标记革除。

Broker 配置刷盘机制,是通过调用 fsync 函数接管了刷盘动作。从单个 Broker 来看,pageCache 的数据会失落。

Kafka 没有提供同步刷盘的形式。同步刷盘在 RocketMQ 中有实现,实现原理是将异步刷盘的流程进行阻塞,期待响应,相似 ajax 的 callback 或者是 java 的 future。上面是一段 rocketmq 的源码。

GroupCommitRequest request = new  GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); // 刷盘

也就是说,实践上,要齐全让 kafka 保障单个 broker 不失落音讯是做不到的,只能通过调整刷盘机制的参数缓解该状况。比方,缩小刷盘距离,缩小刷盘数据量大小。工夫越短,性能越差,可靠性越好(尽可能牢靠)。这是一个选择题。

为了解决该问题,kafka 通过 producer 和 broker 协同解决单个 broker 失落参数的状况。一旦 producer 发现 broker 音讯失落,即可主动进行 retry。除非 retry 次数超过阀值(可配置),音讯才会失落。此时须要生产者客户端手动解决该状况。那么 producer 是如何检测到数据失落的呢?是通过 ack 机制,相似于 http 的三次握手的形式。

The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are allowed: acks=0 If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the retries configuration will not take effect (as the client won’t generally know of any failures). The offset given back for each record will always be set to -1. acks=1 This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost. acks=allThis means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting.

http://kafka.apache.org/20/do…

以上的援用是 kafka 官网对于参数 acks 的解释(在老版本中,该参数是request.required.acks)。

  • acks=0,producer 不期待 broker 的响应,效率最高,然而音讯很可能会丢。
  • acks=1,leader broker 收到音讯后,不期待其余 follower 的响应,即返回 ack。也能够了解为 ack 数为 1。此时,如果 follower 还没有收到 leader 同步的音讯 leader 就挂了,那么音讯会失落。依照上图中的例子,如果 leader 收到音讯,胜利写入 PageCache 后,会返回 ack,此时 producer 认为音讯发送胜利。但此时,依照上图,数据还没有被同步到 follower。如果此时 leader 断电,数据会失落。
  • acks=-1,leader broker 收到音讯后,挂起,期待所有 ISR 列表中的 follower 返回后果后,再返回 ack。- 1 等效与 all。这种配置下,只有 leader 写入数据到 pagecache 是不会返回 ack 的,还须要所有的 ISR 返回“胜利”才会触发 ack。如果此时断电,producer 能够晓得音讯没有被发送胜利,将会从新发送。如果在 follower 收到数据当前,胜利返回 ack,leader 断电,数据将存在于原来的 follower 中。在从新选举当前,新的 leader 会持有该局部数据。数据从 leader 同步到 follower,须要 2 步:
    1. 数据从 pageCache 被刷盘到 disk。因为只有 disk 中的数据能力被同步到 replica。
    2. 数据同步到 replica,并且 replica 胜利将数据写入 PageCache。在 producer 失去 ack 后,哪怕是所有机器都停电,数据也至多会存在于 leader 的磁盘内。

下面第三点提到了 ISR 的列表的 follower,须要配合另一个参数能力更好的保障 ack 的有效性。ISR 是 Broker 保护的一个“牢靠的 follower 列表”,in-sync Replica 列表,broker 的配置蕴含一个参数:min.insync.replicas。该参数示意 ISR 中起码的正本数。如果不设置该值,ISR 中的 follower 列表可能为空。此时相当于 acks=1。

如上图中:

  • acks=0,总耗时 f(t) = f(1)。
  • acks=1,总耗时 f(t) = f(1) + f(2)。
  • acks=-1,总耗时 f(t) = f(1) + max(f(A) , f(B) ) + f(2)。

性能顺次递加,可靠性顺次升高。

Producer

Producer 失落音讯,产生在生产者客户端。

为了晋升效率,缩小 IO,producer 在发送数据时能够将多个申请进行合并后发送。被合并的申请咋发送一线缓存在本地 buffer 中。缓存的形式和前文提到的刷盘相似,producer 能够将申请打包成“块”或者依照工夫距离,将 buffer 中的数据收回。通过 buffer 咱们能够将生产者革新为异步的形式,而这能够晋升咱们的发送效率。

然而,buffer 中的数据就是危险的。在失常状况下,客户端的异步调用能够通过 callback 来解决音讯发送失败或者超时的状况,然而,一旦 producer 被非法的进行了,那么 buffer 中的数据将失落,broker 将无奈收到该局部数据。又或者,当 Producer 客户端内存不够时,如果采取的策略是抛弃音讯(另一种策略是 block 阻塞),音讯也会被失落。抑或,音讯产生(异步产生)过快,导致挂起线程过多,内存不足,导致程序解体,音讯失落。

producer

依据上图,能够想到几个解决的思路:

  • 异步发送音讯改为同步发送消。或者 service 产生音讯时,应用阻塞的线程池,并且线程数有肯定下限。整体思路是管制音讯产生速度。
  • 扩充 Buffer 的容量配置。这种形式能够缓解该状况的呈现,但不能杜绝。
  • service 不间接将音讯发送到 buffer(内存),而是将音讯写到本地的磁盘中(数据库或者文件),由另一个(或大量)生产线程进行音讯发送。相当于是在 buffer 和 service 之间又加了一层空间更加富裕的缓冲层。

Consumer

Consumer 生产音讯有上面几个步骤:

  1. 接管音讯
  2. 解决音讯
  3. 反馈“处理完毕”(commited)

Consumer 的生产形式次要分为两种:

  • 主动提交 offset,Automatic Offset Committing
  • 手动提交 offset,Manual Offset Control

Consumer 主动提交的机制是依据肯定的工夫距离,将收到的音讯进行 commit。commit 过程和生产音讯的过程是异步的。也就是说,可能存在生产过程未胜利(比方抛出异样),commit 音讯曾经提交了。此时音讯就失落了。

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
// 主动提交开关
props.put("enable.auto.commit", "true");
// 主动提交的工夫距离,此处是 1s
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
        // 调用 poll 后,1000ms 后,音讯状态会被改为 committed
 ConsumerRecords<String, String> records = consumer.poll(100);
 for (ConsumerRecord<String, String> record : records)
  insertIntoDB(record); // 将音讯入库,工夫可能会超过 1000ms
}

下面的示例是主动提交的例子。如果此时,insertIntoDB(record)产生异样,音讯将会呈现失落。接下来是手动提交的例子:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
// 敞开主动提交,改为手动提交
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
        // 调用 poll 后,不会进行 auto commit
 ConsumerRecords<String, String> records = consumer.poll(100);
 for (ConsumerRecord<String, String> record : records) {buffer.add(record);
 }
 if (buffer.size() >= minBatchSize) {insertIntoDb(buffer);
                // 所有音讯生产结束当前,才进行 commit 操作
  consumer.commitSync();
  buffer.clear();}
}

将提交类型改为手动当前,能够保障音讯“至多被生产一次”(at least once)。但此时可能呈现反复生产的状况,反复生产不属于本篇探讨范畴。

下面两个例子,是间接应用 Consumer 的 High level API,客户端对于 offset 等管制是通明的。也能够采纳 Low level API 的形式,手动管制 offset,也能够保障音讯不丢,不过会更加简单。

 try {while(running) {ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
         for (TopicPartition partition : records.partitions()) {List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
             for (ConsumerRecord<String, String> record : partitionRecords) {System.out.println(record.offset() + ":" + record.value());
             }
             long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
             // 准确管制 offset
             consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
         }
     }
 } finally {consumer.close();
 }

起源:https://blog.dogchao.cn/?p=305

近期热文举荐:

1.Java 15 正式公布,14 个新个性,刷新你的认知!!

2. 终于靠开源我的项目弄到 IntelliJ IDEA 激活码了,真香!

3. 我用 Java 8 写了一段逻辑,共事直呼看不懂,你试试看。。

4. 吊打 Tomcat,Undertow 性能很炸!!

5.《Java 开发手册(嵩山版)》最新公布,速速下载!

感觉不错,别忘了顺手点赞 + 转发哦!

正文完
 0