关于前端:Kafka的Lag计算误区及正确实现

46次阅读

共计 6353 个字符,预计需要花费 16 分钟才能阅读完成。

前言

音讯沉积是消息中间件的一大特色,消息中间件的流量削峰、冗余存储等性能正是得益于消息中间件的音讯沉积能力。然而音讯沉积其实是一把亦正亦邪的双刃剑,如果利用场合不失当反而会对上下游的业务造成不必要的麻烦,比方音讯沉积势必会影响上下游整个调用链的时效性,有些中间件如 RabbitMQ 在产生音讯沉积时在某些状况下还会影响本身的性能。对于 Kafka 而言,尽管音讯沉积不会对其本身性能带来多大的困扰,但不免不会影响上下游的业务,沉积过多有可能会造成磁盘爆满,或者触发日志革除策略而造成音讯失落的状况。如何利用好消息沉积这把双刃剑,监控是最为要害的一步。

注释

音讯沉积是生产滞后 (Lag) 的一种表现形式,消息中间件服务端中所留存的音讯与生产掉的音讯之间的差值即为音讯沉积量,也称之为生产滞后 (Lag) 量。对于 Kafka 而言,音讯被发送至 Topic 中,而 Topic 又分成了多个分区(Partition),每一个 Partition 都有一个预写式的日志文件,尽管 Partition 能够持续细分为若干个段文件(Segment),然而对于下层利用来说能够将 Partition 看成最小的存储单元(一个由多个 Segment 文件拼接的“巨型文件”)。每个 Partition 都由一系列有序的、不可变的音讯组成,这些音讯被间断的追加到 Partition 中。咱们来看下图,其就是 Partition 的一个真实写照:

上图中有四个概念:

  1. LogStartOffset:示意一个 Partition 的起始位移,初始为 0,尽管音讯的减少以及日志革除策略的影响,这个值会阶段性的增大。
  2. ConsumerOffset:生产位移,示意 Partition 的某个消费者生产到的位移地位。
  3. HighWatermark:简称 HW,代表生产端所能“察看”到的 Partition 的最高日志位移,HW 大于等于 ConsumerOffset 的值。
  4. LogEndOffset:简称 LEO, 代表 Partition 的最高日志位移,其值对消费者不可见。比方在 ISR(In-Sync-Replicas)正本数等于 3 的状况下(如下图所示),音讯发送到 Leader A 之后会更新 LEO 的值,Follower B 和 Follower C 也会实时拉取 Leader A 中的音讯来更新本人,HW 就示意 A、B、C 三者同时达到的日志位移,也就是 A、B、C 三者中 LEO 最小的那个值。因为 B、C 拉取 A 音讯之间延时问题,所以 HW 必然不会始终与 Leader 的 LEO 相等,即 LEO>=HW。

要计算 Kafka 中某个消费者的滞后量很简略,首先看看其生产了几个 Topic,而后针对每个 Topic 来计算其中每个 Partition 的 Lag,每个 Partition 的 Lag 计算就显得十分的简略了,参考下图:

由图可知生产 Lag=HW – ConsumerOffset。对于这里大家有可能有个误区,就是认为 Lag 应该是 LEO 与 ConsumerOffset 之间的差值,笔者在这之前也犯过这样的谬误认知,具体能够参考《如何应用 JMX 监控 Kafka》。LEO 是对消费者不可见的,既然不可见何来生产滞后一说。

那么这里就引入了一个新的问题,HW 和 ConsumerOffset 的值如何获取呢?

首先来说说 ConsumerOffset,Kafka 中有两处能够存储,一个是 Zookeeper,而另一个是 ”__consumer_offsets 这个外部 topic 中,前者是 0.8.x 版本中的应用形式,然而随着版本的迭代更新,当初越来越趋向于后者。就拿 1.0.0 版本来说,尽管默认是存储在 ”__consumer_offsets” 中,然而保不齐用于就将其存储在了 Zookeeper 中了。这个问题倒也不难解决,针对两种形式都去拉取,而后哪个有值的取哪个。不过这里还有一个问题,对于生产位移来说,其个别不会实时的更新,而更多的是定时更新,这样能够进步整体的性能。那么这个定时的工夫距离就是 ConsumerOffset 的误差区间之一。

再来说说 HW,其也是 Kafka 中 Partition 的一个状态。有可能你会察觉到在 Kafka 的 JMX 中能够看到“kafka.log:type=Log,name=LogEndOffset,topic=[topic_name],partition=[partition_num]”这样一个属性,然而这个值不是 LEO 而是 HW。

那么怎么正确的计算生产的 Lag 呢?对 Kafka 相熟的同学可能会想到 Kafka 中自带的 kafka-consumer_groups.sh 脚本中就有 Lag 的信息,示例如下:

[root@node2 kafka_2.12-1.0.0]# bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group CONSUMER_GROUP_ID
TOPIC                PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                   CLIENT-ID
topic-test1          0          1648            1648            0          CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261    /192.168.92.1          CLIENT_ID
topic-test1          1          1648            1648            0          CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261    /192.168.92.1          CLIENT_ID
topic-test1          2          1648            1648            0          CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261    /192.168.92.1          CLIENT_ID
topic-test1          3          1648            1648            0          CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261    /192.168.92.1          CLIENT_ID

咱们深究一下 kafka-consumer_groups.sh 脚本,发现只有一句代码:

exec $(dirname $0)/kafka-run-class.sh kafka.admin.ConsumerGroupCommand "$@"

其含意就是执行 kafka.admin.ConsumerGroupCommand 而已。进一步深究,在 ConsumerGroupCommand 外部抓住了 2 句要害代码:

val consumerGroupService = new KafkaConsumerGroupService(opts)
val (state, assignments) = consumerGroupService.describeGroup()

代码详解:consumerGroupService 的类型是 ConsumerGroupServicesealed trait 类型),而 KafkaConsumerGroupService 只是 ConsumerGroupService 的一种实现,还有一种实现是 ZkConsumerGroupService,别离对应新版的生产形式(生产位移存储在__consumer_offsets 中)和旧版的生产形式(生产位移存储在 zk 中),具体计算步骤参考下一段落的内容。opt 参数是指“–describe –bootstrap-server localhost:9092 –group CONSUMER_GROUP_ID”等参数。第 2 句代码是调用 describeGroup()办法来获取具体的信息,即二元组中的 assignments,这个 assignments 中保留了下面打印信息中的所有内容。

Scala 小常识:
在 Scala 中 trait(特色)相当于 Java 的接口,实际上它比接口更大弱小。与 Java 中的接口不同的是,它还能够定义属性和办法的实现(JDK8 起的接口默认办法)。个别状况下 Scala 中的类只能继承繁多父类,然而如果是 trait 的话就能够继承多个,从后果来看是实现了多重继承。被 sealed 申明的 trait 仅能被同一文件的类继承。

ZkConsumerGroupService 中计算生产 lag 的步骤如下:

  1. 通过 zk 获取一些根本信息,对应下面打印信息中的:TOPIC、PARTITION、CONSUMER-ID 等,不过不会有 HOST 和 CLIENT-ID。
  2. 通过 OffsetFetchRequest 申请获取生产位移(offset),如果获取失败则在通过 zk 获取。
  3. 通过 OffsetReuqest 申请获取分区的 LogEndOffset(简称为 LEO,可见的 LEO)。
  4. 计算 LogEndOffset 与生产位移的差值来获取 lag。

KafkaConsumerGroupService 中计算生产 lag 的步骤如下:

  1. 通过 DescibeGroupsRequest 申请获取一些根本信息,不仅包含 TOPIC、PARTITION、CONSUMER-ID,还有 HOST 和 CLIENT-ID。其实还有通过
    FindCoordinatorRequest 申请来获取 coordinator 信息,如果不理解 coordinator 在这里也没影响。
  2. 通过 OffsetFetchRequest 申请获取生产位移。
  3. 通过 OffsetReuqest 申请获取分区的 LogEndOffset(简称为 LEO)。
  4. 计算 LogEndOffset 与生产位移的差值来获取 lag。

能够看到 KafkaConsumerGroupService 与 ZkConsumerGroupService 的计算 Lag 的形式都差不多,然而 KafkaConsumerGroupService 能获取更多生产详情,并且 ZkConsumerGroupService 也被标注为 @Deprecated 的了,前面内容都针对 KafkaConsumerGroupService 来做阐明。既然 Kafka 曾经为咱们提供了线程的办法来获取 Lag,那么咱们有何必再反复造轮子,这里笔者写了一个调用的 KafkaConsumerGroupService 的示例(KafkaConsumerGroupService 是应用 Scala 语言编写的,在 Java 的程序里应用相似 scala.collection.Seq 这样的全名称以避免混同):

String[] agrs = {"--describe", "--bootstrap-server", brokers, "--group", groupId};
ConsumerGroupCommand.ConsumerGroupCommandOptions opts =
        new ConsumerGroupCommand.ConsumerGroupCommandOptions(agrs);
ConsumerGroupCommand.KafkaConsumerGroupService kafkaConsumerGroupService =
        new ConsumerGroupCommand.KafkaConsumerGroupService(opts);
scala.Tuple2<scala.Option<String>, scala.Option<scala.collection.Seq<ConsumerGroupCommand
        .PartitionAssignmentState>>> res = kafkaConsumerGroupService.describeGroup();
scala.collection.Seq<ConsumerGroupCommand.PartitionAssignmentState> pasSeq = res._2.get();
scala.collection.Iterator<ConsumerGroupCommand.PartitionAssignmentState> iterable = pasSeq.iterator();
while (iterable.hasNext()) {ConsumerGroupCommand.PartitionAssignmentState pas = iterable.next();
    System.out.println(String.format("n%-30s %-10s %-15s %-15s %-10s %-50s%-30s %s",
            pas.topic().get(), pas.partition().get(), pas.offset().get(),
            pas.logEndOffset().get(), pas.lag().get(), pas.consumerId().get(),
            pas.host().get(), pas.clientId().get()));
}

在应用时,你能够封装一下这段代码而后返回一个相似 List<ConsumerGroupCommand.PartitionAssignmentState> 的货色给下层业务代码做进一步的应用。ConsumerGroupCommand.PartitionAssignmentState 的代码如下:

 case class PartitionAssignmentState(group: String, coordinator: Option[Node], topic: Option[String],
    partition: Option[Int], offset: Option[Long], lag: Option[Long],
    consumerId: Option[String], host: Option[String],
    clientId: Option[String], logEndOffset: Option[Long])

Scala 小常识:
对于 case class, 在这里你能够简略的把它看成是一个 JavaBean,然而它远比 JavaBean 弱小,比方它会主动生成 equals、hashCode、toString、copy、伴生对象、apply、unapply 等等货色。在 scala 中,对爱护(Protected)成员的拜访比 java 更严格一些。因为它只容许爱护成员在定义了该成员的的类的子类中被拜访。而在 java 中,用 protected 关键字润饰的成员,除了定义了该成员的类的子类能够拜访,同一个包里的其余类也能够进行拜访。Scala 中,如果没有指定任何的修饰符,则默认为 public。这样的成员在任何中央都能够被拜访。

如果你正在试着运行下面一段程序,你会发现编译失败,报错:cannot access‘kafka.admin.ConsumerGroupCommand.PartitionAssignmentState’in ‘kafka.admin.ConsumerGroupCommand‘。这时候须要将所引入的 kafka.core 包中的 kafka.admin.ConsumerGroupCommand 中的 PartitionAssignmentState 类后面的 protected 修饰符去掉能力编译通过。

现实情况下你可能并不想变更 kafka-core 的代码而后再从新打包,而是寻求间接可能调用的货色,至于到底怎么实现将会在下一篇文章中介绍,如果你比拟猴急,能够先预览一下代码的实现,具体参见:https://github.com/hiddenzzh/…。具体的逻辑解析敬请期待…

欢送跳转到本文的原文链接:https://honeypps.com/mq/myth-and-correct-implementation-of-kafka-lag/


参考链接:https://blog.csdn.net/u013256…

正文完
 0