前言

音讯沉积是消息中间件的一大特色,消息中间件的流量削峰、冗余存储等性能正是得益于消息中间件的音讯沉积能力。然而音讯沉积其实是一把亦正亦邪的双刃剑,如果利用场合不失当反而会对上下游的业务造成不必要的麻烦,比方音讯沉积势必会影响上下游整个调用链的时效性,有些中间件如RabbitMQ在产生音讯沉积时在某些状况下还会影响本身的性能。对于Kafka而言,尽管音讯沉积不会对其本身性能带来多大的困扰,但不免不会影响上下游的业务,沉积过多有可能会造成磁盘爆满,或者触发日志革除策略而造成音讯失落的状况。如何利用好消息沉积这把双刃剑,监控是最为要害的一步。

注释

音讯沉积是生产滞后(Lag)的一种表现形式,消息中间件服务端中所留存的音讯与生产掉的音讯之间的差值即为音讯沉积量,也称之为生产滞后(Lag)量。对于Kafka而言,音讯被发送至Topic中,而Topic又分成了多个分区(Partition),每一个Partition都有一个预写式的日志文件,尽管Partition能够持续细分为若干个段文件(Segment),然而对于下层利用来说能够将Partition看成最小的存储单元(一个由多个Segment文件拼接的“巨型文件”)。每个Partition都由一系列有序的、不可变的音讯组成,这些音讯被间断的追加到Partition中。咱们来看下图,其就是Partition的一个真实写照:

上图中有四个概念:

  1. LogStartOffset:示意一个Partition的起始位移,初始为0,尽管音讯的减少以及日志革除策略的影响,这个值会阶段性的增大。
  2. ConsumerOffset:生产位移,示意Partition的某个消费者生产到的位移地位。
  3. HighWatermark:简称HW,代表生产端所能“察看”到的Partition的最高日志位移,HW大于等于ConsumerOffset的值。
  4. LogEndOffset:简称LEO, 代表Partition的最高日志位移,其值对消费者不可见。比方在ISR(In-Sync-Replicas)正本数等于3的状况下(如下图所示),音讯发送到Leader A之后会更新LEO的值,Follower B和Follower C也会实时拉取Leader A中的音讯来更新本人,HW就示意A、B、C三者同时达到的日志位移,也就是A、B、C三者中LEO最小的那个值。因为B、C拉取A音讯之间延时问题,所以HW必然不会始终与Leader的LEO相等,即LEO>=HW。

要计算Kafka中某个消费者的滞后量很简略,首先看看其生产了几个Topic,而后针对每个Topic来计算其中每个Partition的Lag,每个Partition的Lag计算就显得十分的简略了,参考下图:

由图可知生产Lag=HW - ConsumerOffset。对于这里大家有可能有个误区,就是认为Lag应该是LEO与ConsumerOffset之间的差值,笔者在这之前也犯过这样的谬误认知,具体能够参考《如何应用JMX监控Kafka》。LEO是对消费者不可见的,既然不可见何来生产滞后一说。

那么这里就引入了一个新的问题,HW和ConsumerOffset的值如何获取呢?

首先来说说ConsumerOffset,Kafka中有两处能够存储,一个是Zookeeper,而另一个是"__consumer_offsets这个外部topic中,前者是0.8.x版本中的应用形式,然而随着版本的迭代更新,当初越来越趋向于后者。就拿1.0.0版本来说,尽管默认是存储在"__consumer_offsets"中,然而保不齐用于就将其存储在了Zookeeper中了。这个问题倒也不难解决,针对两种形式都去拉取,而后哪个有值的取哪个。不过这里还有一个问题,对于生产位移来说,其个别不会实时的更新,而更多的是定时更新,这样能够进步整体的性能。那么这个定时的工夫距离就是ConsumerOffset的误差区间之一。

再来说说HW,其也是Kafka中Partition的一个状态。有可能你会察觉到在Kafka的JMX中能够看到“kafka.log:type=Log,name=LogEndOffset,topic=[topic_name],partition=[partition_num]”这样一个属性,然而这个值不是LEO而是HW。

那么怎么正确的计算生产的Lag呢?对Kafka相熟的同学可能会想到Kafka中自带的kafka-consumer_groups.sh脚本中就有Lag的信息,示例如下:

[root@node2 kafka_2.12-1.0.0]# bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group CONSUMER_GROUP_IDTOPIC                PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                   CLIENT-IDtopic-test1          0          1648            1648            0          CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261    /192.168.92.1          CLIENT_IDtopic-test1          1          1648            1648            0          CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261    /192.168.92.1          CLIENT_IDtopic-test1          2          1648            1648            0          CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261    /192.168.92.1          CLIENT_IDtopic-test1          3          1648            1648            0          CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261    /192.168.92.1          CLIENT_ID

咱们深究一下kafka-consumer_groups.sh脚本,发现只有一句代码:

exec $(dirname $0)/kafka-run-class.sh kafka.admin.ConsumerGroupCommand "$@"

其含意就是执行kafka.admin.ConsumerGroupCommand而已。进一步深究,在ConsumerGroupCommand外部抓住了2句要害代码:

val consumerGroupService = new KafkaConsumerGroupService(opts)val (state, assignments) = consumerGroupService.describeGroup()

代码详解:consumerGroupService的类型是ConsumerGroupServicesealed trait类型),而KafkaConsumerGroupService只是ConsumerGroupService的一种实现,还有一种实现是ZkConsumerGroupService,别离对应新版的生产形式(生产位移存储在__consumer_offsets中)和旧版的生产形式(生产位移存储在zk中),具体计算步骤参考下一段落的内容。opt参数是指“ --describe --bootstrap-server localhost:9092 --group CONSUMER_GROUP_ID”等参数。第2句代码是调用describeGroup()办法来获取具体的信息,即二元组中的assignments,这个assignments中保留了下面打印信息中的所有内容。

Scala小常识:
在Scala中trait(特色)相当于Java的接口,实际上它比接口更大弱小。与Java中的接口不同的是,它还能够定义属性和办法的实现(JDK8起的接口默认办法)。个别状况下Scala中的类只能继承繁多父类,然而如果是trait的话就能够继承多个,从后果来看是实现了多重继承。被sealed申明的trait仅能被同一文件的类继承。

ZkConsumerGroupService中计算生产lag的步骤如下:

  1. 通过zk获取一些根本信息,对应下面打印信息中的:TOPIC、PARTITION、CONSUMER-ID等,不过不会有HOST和CLIENT-ID。
  2. 通过OffsetFetchRequest申请获取生产位移(offset),如果获取失败则在通过zk获取。
  3. 通过OffsetReuqest申请获取分区的LogEndOffset(简称为LEO,可见的LEO)。
  4. 计算LogEndOffset与生产位移的差值来获取lag。

KafkaConsumerGroupService中计算生产lag的步骤如下:

  1. 通过DescibeGroupsRequest申请获取一些根本信息,不仅包含TOPIC、PARTITION、CONSUMER-ID,还有HOST和CLIENT-ID。其实还有通过
    FindCoordinatorRequest申请来获取coordinator信息,如果不理解coordinator在这里也没影响。
  2. 通过OffsetFetchRequest申请获取生产位移。
  3. 通过OffsetReuqest申请获取分区的LogEndOffset(简称为LEO)。
  4. 计算LogEndOffset与生产位移的差值来获取lag。

能够看到KafkaConsumerGroupService与ZkConsumerGroupService的计算Lag的形式都差不多,然而KafkaConsumerGroupService能获取更多生产详情,并且ZkConsumerGroupService也被标注为@Deprecated的了,前面内容都针对KafkaConsumerGroupService来做阐明。既然Kafka曾经为咱们提供了线程的办法来获取Lag,那么咱们有何必再反复造轮子,这里笔者写了一个调用的KafkaConsumerGroupService的示例(KafkaConsumerGroupService是应用Scala语言编写的,在Java的程序里应用相似scala.collection.Seq这样的全名称以避免混同):

String[] agrs = {"--describe", "--bootstrap-server", brokers, "--group", groupId};ConsumerGroupCommand.ConsumerGroupCommandOptions opts =        new ConsumerGroupCommand.ConsumerGroupCommandOptions(agrs);ConsumerGroupCommand.KafkaConsumerGroupService kafkaConsumerGroupService =        new ConsumerGroupCommand.KafkaConsumerGroupService(opts);scala.Tuple2<scala.Option<String>, scala.Option<scala.collection.Seq<ConsumerGroupCommand        .PartitionAssignmentState>>> res = kafkaConsumerGroupService.describeGroup();scala.collection.Seq<ConsumerGroupCommand.PartitionAssignmentState> pasSeq = res._2.get();scala.collection.Iterator<ConsumerGroupCommand.PartitionAssignmentState> iterable = pasSeq.iterator();while (iterable.hasNext()) {    ConsumerGroupCommand.PartitionAssignmentState pas = iterable.next();    System.out.println(String.format("n%-30s %-10s %-15s %-15s %-10s %-50s%-30s %s",            pas.topic().get(), pas.partition().get(), pas.offset().get(),            pas.logEndOffset().get(), pas.lag().get(), pas.consumerId().get(),            pas.host().get(), pas.clientId().get()));}

在应用时,你能够封装一下这段代码而后返回一个相似List<ConsumerGroupCommand.PartitionAssignmentState>的货色给下层业务代码做进一步的应用。ConsumerGroupCommand.PartitionAssignmentState的代码如下:

 case class PartitionAssignmentState(    group: String, coordinator: Option[Node], topic: Option[String],    partition: Option[Int], offset: Option[Long], lag: Option[Long],    consumerId: Option[String], host: Option[String],    clientId: Option[String], logEndOffset: Option[Long])
Scala小常识:
对于case class, 在这里你能够简略的把它看成是一个JavaBean,然而它远比JavaBean弱小,比方它会主动生成equals、hashCode、toString、copy、伴生对象、apply、unapply等等货色。在 scala 中,对爱护(Protected)成员的拜访比 java 更严格一些。因为它只容许爱护成员在定义了该成员的的类的子类中被拜访。而在java中,用protected关键字润饰的成员,除了定义了该成员的类的子类能够拜访,同一个包里的其余类也能够进行拜访。Scala中,如果没有指定任何的修饰符,则默认为 public。这样的成员在任何中央都能够被拜访。

如果你正在试着运行下面一段程序,你会发现编译失败,报错:cannot access ‘kafka.admin.ConsumerGroupCommand.PartitionAssignmentState’ in 'kafka.admin.ConsumerGroupCommand‘。这时候须要将所引入的kafka.core包中的kafka.admin.ConsumerGroupCommand中的PartitionAssignmentState类后面的protected修饰符去掉能力编译通过。

现实情况下你可能并不想变更kafka-core的代码而后再从新打包,而是寻求间接可能调用的货色,至于到底怎么实现将会在下一篇文章中介绍,如果你比拟猴急,能够先预览一下代码的实现,具体参见:https://github.com/hiddenzzh/...。具体的逻辑解析敬请期待…

欢送跳转到本文的原文链接:https://honeypps.com/mq/myth-and-correct-implementation-of-kafka-lag/


参考链接:https://blog.csdn.net/u013256...