关于后端:Kafaka丢消息吗

32次阅读

共计 6105 个字符,预计需要花费 16 分钟才能阅读完成。

名词解释
置信做过数据处理的小伙伴们对于 kafka 必定是相熟的。根底的 kafka 常识这里就不过多陈说了。明天次要来讲一下 kafka 的几个个性,上面先简略解释下这几个个性的含意:

安全性:

数据从 producer 中写入到 kafka 以及 consumer 从 topic 中生产数据,数据都不会失落。

幂等性:

数据在 kafka 的流程中既不会被从新生产,也不会被反复生产。

这也是实现 exactly-once 语义的根底。

有序性:

因为 kafka 是程序生产,所以 kafka 的有序性次要体现在生产者写入 kafka 时数据的有序性。

应用场景
介绍完这几个定义后,上面来看看这几个个性的应用场景:

安全性

安全性这里就不多说了,毕竟作为一个消息中间件,数据失落是万万不可承受的。所以数据安全性的保障是 kafka 可能应用的根底。

幂等性

幂等性对于某些业务可能意义不是很大,然而对于一些业务却是非常重要的。就拿资金结算场景来说,业务方作为上游把数据打到结算平台,如果一份数据被计算、解决了屡次,产生的结果将会特地重大,到时候损失的就是真金白银了,而且很多其余业务也可能受到影响。

此外 kafka 的幂等性也是实现 kafka exactly-once 语义的根底,对于 kafka 来说还是很重要的。

有序性

有序性在数据有更新以及某些依赖数据程序的业务场景意义很大,因为数据乱序造成的数据净化或者业务出错显然是不可承受的。

家喻户晓,kafka 是程序读取数据的,所以 kafka 的有序性取决于生产者的生产数据的有序性。

性能实现

在理解了这几个个性的应用场景后,上面来看一下 kafka 是如何实现这些个性的:

安全性

kafka 的数据在绝大多数状况下平安的,然而某些极其的状况还是可能导致 kafka 丢数据的。比方上面这些状况:

上面具体说下这些场景:

producer 在生产数据和 receive ack 时会遇到上面的场景:

  • 发送数据后,疏忽 ack 间接确认胜利:即 kafka 的 at most once 语义。生产者认为数据生产胜利,然而 broker 数据处理失败。造成数据失落
    consumer 在生产数据和 commit offset 时会遇到上面的场景:
  • 先 commit offset,再执行业务逻辑:commit 胜利,数据处理失败。造成数据失落
    先执行业务逻辑,再 commit offset:commit 胜利,异步数据处理失败。造成数据失落
    还有一种状况就是 kafka 本身数据的安全性,在某些节点下线后,仍能对外提供服务,保证数据不失落。然而 kafka broker 本身的问题也会造成数据“失落”,这里的失落代表的意思是无奈对外提供服务,毕竟数据写入 kafka 后存储在磁盘上。思考下上面的场景:
  • kafka 数据无备份,某台 broker 挂掉后,这台 broker 上的数据将“失落”且无奈对外提供服务
    producer 数据发送到 broker 后,数据在 leader 节点写入胜利即返回 ack 胜利,在 leader 向 replica 节点同步数据时,leader 节点宕机。造成 producer 端认为数据生产胜利,而 broker 端数据“失落”

显然针对下面的场景,kafka 都有相干的预案。上面就从生产数据安全、生产数据安全以及 kafka 本身存储数据安全三个场景来阐明下这个问题:

生产者数据安全性

producer 的 acks 设置的批改,先看看 acks 的取值:

  • acks 为 0:这意味着 producer 发送数据后,不会期待 broker 确认,间接发送下一条数据,性能最好然而有失落数据的危险
  • acks 为 1:为 1 意味着 producer 发送数据后,须要期待 leader 正本确认接管后,才会发送下一条数据,性能稍差,然而安全性大大增高(还是有可能丢数据,上面 broker 丢数据场景具体解说)
  • acks 为 -1:这个代表的是 all,意味着发送的音讯写入所有的 ISR 汇合中的正本(留神不是全副正本)后,才会发送下一条数据,性能最差,但安全性最强,保障不会丢数据。
    从下面的取值能够看出,producer 端要是想不丢数据,acks 设置要从 0 起码改成 1,如果要求相对不能丢数据则须要设置成 -1

另外,producer 发送音讯的模式最好选用异步形式,能够晋升性能;并且通过回调函数的形式解决发送后的一些逻辑解决,如打印日志或者数据统计等。

消费者数据安全性

consumer 呈现丢数据的问题次要还是 enable.auto.commit 这个配置项。这个配置项的配置为 true 就是 kafka 主动 commit offset;而设置为 false 就是手动 commit offset。

主动提交只有程序出问题,基本上就会呈现丢数据的问题。而手动提交如果解决不好,也会存在丢数据的问题,上面咱们就来剖析下这两种提交形式的区别,以及正确的应用办法。

主动提交

在 Kafka 中默认的生产位移的提交形式是主动提交。这个由消费者客户端参数 enable.auto.commit 配置,默认值为 true。
这个默认的主动提交不是每生产一条音讯就提交一次,而是定期提交。这个定期的周期时间由客户端参数 auto.commit.interval.ms 配置,默认值为 5 秒。另外此参数失效的前提是 enable.auto.commit 参数为 true。
主动位移提交的动作是在 poll()办法的逻辑里实现的,在每次真正向服务端发动拉取申请之前会查看是否能够进行位移提交,如果能够,那么就会提交上一轮生产的位移。
手动提交

手动提交分为同步提交 (commitSync) 和异步提交 (commitAsync) 两种形式。

同步提交最大的问题是数据是批量解决的时候,当局部数据实现生产,还没来得及提交 offset 就被中断,则会使得下次生产会反复生产那局部曾经生产过的数据。而且同步提交会阻塞线程,造成整体性能受影响。

异步提交不会阻塞线程,比起同步提交 commitSync 具备更好的性能。然而须要解决回调函数,也须要在某些极其场景下如 rebalance 操作时有相应的解决。

综上,惯例的做法就是数据生产解决流程中应用异步提交的形式 commit offset;而消费者失常退出的场景,咱们能够应用,commitSync 同步提交,保障 offset 的正确。例子如下:

try {while(isRunning.get()) {
        //poll records and do some data processing .
        consumer.commitAsync() ;}
) finally {
    try {consumer.commitSync() ;
    ) finally {consumer.close() ;
}}

而 rebalance 时 offset 无奈失常提交,导致数据反复生产,在这种场景下咱们须要在监听到 rebalance 产生之前进行一次 offset 提交。例子如下:

//currentOffsets 须要保留该消费者生产的分区的最新的 offset
// 本段代码中没有体现,能够在生产数据之后 进行更新该对象,并在 rebalance 之后清空该对象
Map<TopicPartition , OffsetAndMetadata> currentOffsets =new HashMap<>() ;

consumer.subscribe(Arrays .asList( topic) , new ConsumerRebalanceListener () {
    // 产生在 rebalance 之前,并且消费者进行读取音讯的时候
    @Override
    public void onPartitionsRevoked(Collection<TopicPart ition> partitions) {consume.commitSync(currentOffsets) ;
        currentOffsets.clear();}

    @Override
     public void onPartitions Assigned(Collection<TopicPartition > partitions) {//do nothing .}
});

kafka 存储数据安全性

kafka 数据因为是存储在硬盘上,所以 kafka 自身的安全性次要是数据的高可用性上,所以针对 kafka 数据的安全性次要是数据备份以及数据同步时候相干的策略配置。

首先是 replication.factor 配置参数,这个配置决定了正本的数量,默认是 1。留神这个参数不能超过 broker 的数量。说这个参数其实是因为如果应用默认的 1,或者不在创立 topic 的时候指定正本数量(也就是正本数为 1),那么当一台机器呈现磁盘损坏或者服务宕机等状况,那么数据也就从 kafka 外面失落了。所以 replication.factor 这个参数最好是配置大于 1,比如说 3。
其次就是数据同步的时候导致的数据不可用。比方 leader 正本接管到数据,但还没同步给其余正本的时候就挂掉了,这时候数据也是失落了。针对这种场景能够通过配置 producer 端的 acks 设置为 - 1 能够解决问题,然而这样的代价是会影响 kafka 的局部性能以及吞吐量。
最初 kafka 有一个配置参数,min.insync.replicas,默认是 1(也就是只有 leader,理论生产应该调高),该属性规定了最小的 ISR 数。这意味着当 acks 为 -1(即 all)的时候,这个参数规定了必须写入的 ISR 集中的正本数,如果没达到,那么 producer 会产生异样。这个参数的作用是 leader 挂了之后,kakfa 能疾速从 ISR 中选出 leader,最快速度复原服务。

幂等性

幂等性蕴含生产数据的幂等性以及生产数据的幂等性。上面就从这两个方面来阐明:

producer 端的幂等性

幂等性 producer
kafka 从 0.11 版本当前开始反对 producer 端的幂等性,通过上面的配置项就能够实现幂等性 producer 的创立:

props.put("enable.idempotence", true)

开启幂等性的时候,acks 就主动配置成“all”了,如果这时候手动将 acks 设置为 0,程序那么会报错。

而幂等性的 producer 实现逻辑也比较简单,即 Kafka 减少了 pid 和 seq。Producer 中每个 RecordBatch 都有一个枯燥递增的 seq; Broker 上每个 topic 也会保护 pid-seq 的映射,并且每 Commit 都会更新 lastSeq。这样 recordBatch 到来时,broker 会先查看 RecordBatch 再保留数据:如果 batch 中 baseSeq(第一条音讯的 seq)比 Broker 保护的序号 (lastSeq) 大 1,则保留数据,否则不保留(inSequence 办法)。

尽管幂等性的 producer 解决了局部问题,然而还是有两个次要缺点:

幂等性的 producer 仅做到单分区上的幂等性,即单分区音讯不反复,多分区无奈保障幂等性。
只能放弃单会话的幂等性,无奈实现跨会话的幂等性,也就是说如果 producer 挂掉再重启,无奈保障两个会话间的幂等(新会话可能会重发)。因为 broker 端无奈获取之前的状态信息,所以无奈实现跨会话的幂等。
针对这种状况,kafka 提供了事务性的 producer 来解决上述问题。

事务性 producer
kafka 事务引入了 transactionId 和 Epoch,设置开启事务后,一个 transactionId 只对应一个 pid, 且 Server 端会记录最新的 Epoch 值。

这样有新的 producer 初始化时,会向 TransactionCoordinator 发送 InitPIDRequest 申请,TransactionCoordinator 曾经有了这个 transactionId 对应的 meta,会返回之前调配的 PID,并把 Epoch 自增 1 返回,这样当 old producer 恢复过来申请操作时,将被认为是有效 producer 抛出异样。

如果没有开启事务,TransactionCoordinator 会为新的 producer 返回 new pid,这样就起不到隔离成果,因而无奈实现多会话幂等。

上面就是开启事务性 producer 的办法:

// 初始化事务
producer.initTransactions();
try {
    // 开启一个事务
    producer.beginTransaction();
    producer.send(record1);
    producer.send(record2);
    // 提交
    producer.commitTransaction();} catch (KafkaException e) {
    // 出现异常的时候,终止事务
    producer.abortTransaction();}

下面就是对于 producer 端幂等性的实现。然而无论开启幂等还是事务的个性,都会对性能有肯定影响,这是必然的。所以 kafka 默认也并没有开启这两个个性,大家也须要依据业务来衡量是否须要开启。

consumer 端的幂等性

比拟遗憾的是,kafka 目前没有保障 consumer 幂等生产的措施,如果的确须要保障 consumer 的幂等,能够对每条音讯维持一个全局的 id,每次生产进行加锁去重。

至于消耗这么多的资源来实现 consumer 端的幂等性,进而实现 kafka 的 exactly once 的生产到底值不值,那就得根与业务进行衡量了。

上面一共一段伪代码:

if(cache.contain(msgId)){
  // cache 中蕴含 msgId,曾经解决过
  continue;
}else {lock.lock();
  cache.put(msgId,timeout);
  commitSync();
  lock.unLock();}

// 后续实现所有操作后,删除 cache 中的 msgId,只有 msgId 存在 cache 中,就认为曾经解决过。Note:须要给 cache 设置有音讯

有序性

首先 kafka 的有序性只能保障是单分区数据有序,无奈保障全局有序。

如果的确须要保障须要解决的数据有序,能够通过重写分区函数将须要程序解决的数据写入到同一个分区进行解决。分区选择器须要实现 org.apache.kafka.clients.producer.Partitioner 接口。而后通过配置项 ProducerConfig.PARTITIONER_CLASS_CONFIG 进行配置指定。

再回到单分区数据有序的实现形式。这其实是 kafka 幂等性实现的一个附带成果。新版本 kafka 设置 enable.idempotence=true 后可能动静调整 max-in-flight-request。该参数指定了生产者在收到服务端响应之前能够发送多少个 batch 音讯,默认值为 5。

当重试申请到来时,batch 会依据 seq 从新增加到队列的适合地位,并把 max.in.flight.requests.per.connection 设为 1,这样它后面的 batch 序号都比它小,只有后面的都发完了,它能力发。这样就在就义了局部性能和吞吐量的前提下,保障了数据的有序性。

最初路漫漫其修远兮,大数据之路还很漫长。如果想一起大数据的小伙伴,欢送点赞转发加关注,下次学习不迷路,咱们在大数据的路上共同前进!

本文由 mdnice 多平台公布

正文完
 0