关于后端:聊聊-Kafka-Consumer-那点事

3次阅读

共计 11219 个字符,预计需要花费 29 分钟才能阅读完成。

作者:王江华 原文:https://mp.weixin.qq.com/s/jS…

在上一篇中咱们具体聊了对于 Kafka Producer 外部的底层原理设计思维和细节, 本篇咱们次要来聊聊 Kafka Consumer 即消费者的外部底层原理设计思维。

1、Consumer 之总体概述

在 Kafka 中, 咱们把生产音讯的一方称为 Consumer 即 消费者, 它是 Kafka 的外围组件之一。它的次要性能是将 Producer 生产的音讯进行生产解决,实现生产工作。 那么这些 Producer 产生的音讯是怎么被 Consumer 生产的呢?又是基于何种生产形式进行生产,分区调配策略都有哪些,消费者组以及重均衡机制是如何解决的,偏移量如何提交和存储,生产进度如何监控,如何保障生产解决实现?接下来会逐个解说阐明。

2、Consumer 之生产形式详解

咱们晓得音讯队列个别有两种实现形式,(1)Push(推模式) (2)Pull(拉模式),那么 Kafka Consumer 到底采纳哪种形式进行生产的呢?其实 Kafka Consumer 采纳的是被动拉取 Broker 数据进行生产的即 Pull 模式。 这两种形式各有优劣,咱们来剖析一下:

1)、为什么不采纳 Push 模式? 如果是抉择 Push 模式最大毛病就是 Broker 不分明 Consumer 的生产速度,且推送速率是 Broker 进行管制的,这样很容易造成音讯沉积,如果 Consumer 中执行的工作操作是比拟耗时的,那么 Consumer 就会解决的很慢,重大状况可能会导致系统 Crash。

2)、为什么采纳 Pull 模式? 如果抉择 Pull 模式,这时 Consumer 能够依据本人的状况和状态来拉取数据, 也能够进行提早解决。然而 Pull 模式也有有余,Kafka 又是如何解决这一问题?如果 Kafka Broker 没有音讯,这时每次 Consumer 拉取的都是空数据, 可能会始终循环返回空数据。针对这个问题,Consumer 在每次调用 Poll() 生产数据的时候,顺带一个 timeout 参数,当返回空数据的时候,会在 Long Polling 中进行阻塞,期待 timeout 再去生产,直到数据达到。

3、Consumer 之初始化

聊完 Consumer 生产形式和优缺点以及 Kafka 针对毛病又是如何衡量解决的,接下来咱们来聊聊 Consumer 初始化都做了什么?

首先看一下 Kafka consumer 初始化代码:

从代码能够看出初始化 Consumer 有 4 步:

  • 1、结构 Propertity 对象,进行 Consumer 相干的配置;
  • 2、创立 KafkaConsumer 的对象 Consumer;
  • 3、订阅相应的 Topic 列表;
  • 4、调用 Consumer 的 poll() 办法拉取订阅的音讯

Kafka consumer 生产流程图如下:

4、Consumer 之消费者组机制

4.1 Consumer Group 机制

聊完 Consumer 的初始化流程,接下来咱们来聊聊 Consumer 消费者组机制,为什么 Kafka 要设计 Consumer Group, 只有 Consumer 不能够吗? 咱们晓得 Kafka 是一款高吞吐量,低提早,高并发, 高可扩展性的音讯队列产品,那么如果某个 Topic 领有数百万到数千万的数据量,仅仅依附 Consumer 过程生产,生产速度可想而知,所以须要一个扩展性较好的机制来保障生产进度,这个时候 Consumer Group 应运而生, Consumer Group 是 Kafka 提供的可扩大且具备容错性的消费者机制。

Kafka Consumer Group 特点如下:

  • 1、每个 Consumer Group 有一个或者多个 Consumer
  • 2、每个 Consumer Group 领有一个公共且惟一的 Group ID
  • 3、Consumer Group 在生产 Topic 的时候,Topic 的每个 Partition 只能调配给组内的某个 Consumer,只有被任何 Consumer 生产一次, 那么这条数据就能够认为被以后 Consumer Group 生产胜利

4.2 Partition 调配策略机制

咱们晓得一个 Consumer Group 中有多个 Consumer,一个 Topic 也有多个 Partition,所以必然会波及到 Partition 的调配问题: 确定哪个 Partition 由哪个 Consumer 来生产的问题。

Kafka 客户端提供了 3 种分区调配策略:RangeAssignor、RoundRobinAssignor 和 StickyAssignor,前两种调配计划绝对简略一些 StickyAssignor 调配计划绝对简单一些。

4.2.1 RangeAssignor

RangeAssignor 是 Kafka 默认的分区调配算法,它是依照 Topic 的维度进行调配的,对于每个 Topic,首先对 Partition 依照分区 ID 进行排序,而后对订阅这个 Topic 的 Consumer Group 的 Consumer 再进行排序,之后尽量平衡的依照范畴区段将分区调配给 Consumer。此时可能会造成先调配分区的 Consumer 过程的工作过重(分区数无奈被消费者数量整除)。

分区调配场景剖析如下图所示(同一个消费者组下的多个 consumer):

论断:这种调配形式显著的问题就是随着消费者订阅的 Topic 的数量的减少,不平衡的问题会越来越重大。

4.2.2 RoundRobinAssignor

RoundRobinAssignor 的分区调配策略是将 Consumer Group 内订阅的所有 Topic 的 Partition 及所有 Consumer 进行排序后依照程序尽量平衡的一个一个进行调配。如果 Consumer Group 内,每个 Consumer 订阅都订阅了雷同的 Topic,那么调配后果是平衡的。如果订阅 Topic 是不同的,那么调配后果是不保障“尽量平衡”的,因为某些 Consumer 可能不参加一些 Topic 的调配。

分区调配场景剖析如下图所示:
1) 当组内每个 Consumer 订阅的 Topic 是雷同状况:

2) 当组内每个订阅的 Topic 是不同状况,这样就可能会造成分区订阅的歪斜:

4.2.3 StickyAssignor

StickyAssignor 分区调配算法是 Kafka Java 客户端提供的调配策略中最简单的一种,能够通过 partition.assignment.strategy 参数去设置,从 0.11 版本开始引入,目标就是在执行新调配时,尽量在上一次调配后果上少做调整,其次要实现了以下 2 个指标:

1)、Topic Partition 的调配要尽量平衡。

2)、当 Rebalance(重调配,前面会详细分析) 产生时,尽量与上一次调配后果保持一致。

留神:当两个指标发生冲突的时候,优先保障第一个指标,这样能够使调配更加平均,其中第一个指标是 3 种调配策略都尽量去尝试实现的,而第二个指标才是该算法的精华所在。

上面咱们举例来聊聊 RoundRobinAssignor 跟 StickyAssignor 的区别。

分区调配场景剖析如下图所示:

1)组内每个 Consumer 订阅的 Topic 是雷同状况,RoundRobinAssignor 跟 StickyAssignor 调配统一:

当上述情况产生 Rebalance 状况后,可能调配会不太一样,如果这时候 C1 产生故障下线:

RoundRobinAssignor:

而 StickyAssignor:

论断: 从下面 Rebalance 后的后果能够看出,尽管两种调配策略最初都是平均调配的,然而 RoundRoubinAssignor 齐全是重新分配了一遍,而 StickyAssignor 则是在原先的根底上达到了平均的状态。

2) 当组内每个 Consumer 订阅的 Topic 是不同状况:

RoundRobinAssignor:

StickyAssignor:

当上述情况产生 Rebalance 状况后,可能调配会不太一样,如果这时候 C1 产生故障下线:

RoundRobinAssignor:

StickyAssignor:

从下面后果能够看出,RoundRoubin 的调配策略在 Rebalance (重调配)之后造成了重大的调配歪斜。因而在生产环境上如果想要缩小重调配带来的开销,能够选用 StickyAssignor 的分区调配策略。

5、Consumer 之消费者组重分配机制

下面聊完消费者组以及分区调配策略后,咱们来聊聊 Consumer Group 中 Rebalance (重调配) 机制,对于 Consumer Group 来说,可能随时都会有 Consumer 退出或退出,那么 Consumer 列表的变动必定会引起 Partition 的重新分配。咱们将这个调配过程叫做 Consumer Rebalance,然而这个调配过程须要借助 Broker 端的 Coordinator 协调者组件,在 Coordinator 的帮忙下实现整个消费者组的分区重调配,也是通过监听 ZooKeeper 的 /admin/reassign_partitions 节点触发的。

5.1 Rebalance 触发与告诉

Rebalance 的触发条件有三种:

  • 1、当 Consumer Group 组成员数量发生变化(被动退出或者被动离组,故障下线等)
  • 2、当订阅主题数量发生变化
  • 3、当订阅主题的分区数发生变化

Rebalance 如何告诉其余 consumer 过程?

Rebalance 的告诉机制就是靠 Consumer 端的心跳线程,它会定期发送心跳申请到 Broker 端的 Coordinator, 当协调者决定开启 Rebalance 后,它会将“REBALANCE_IN_PROGRESS”封装进心跳申请的响应中发送给 Consumer , 当 Consumer 发现心跳响应中蕴含了“REBALANCE_IN_PROGRESS”,就晓得 Rebalance 开始了。

5.2 协定 (protocol) 阐明

其实 Rebalance 实质上也是一组协定。Consumer Group 与 Coordinator 独特应用它来实现 Consumer Group 的 Rebalance。上面我看看这 5 种协定都是什么,实现了什么性能:

  • 1、Heartbeat 申请:Consumer 须要定期给 Coordinator 发送心跳来证实本人还活着。
  • 2、LeaveGroup 申请:被动通知 Coordinator 要来到 Consumer Group
  • 3、SyncGroup 申请:Group Leader Consumer 把调配计划通知组内所有成员
  • 4、JoinGroup 申请:成员申请退出组
  • 5、DescribeGroup 申请:显示组的所有信息,包含成员信息,协定名称,调配计划,订阅信息等。通常该申请是给管理员应用。

Coordinator 在 Rebalance 的时候次要用到了后面 4 种申请

5.3 Consumer Group 状态机

Rebalance 一旦产生,必定会波及到 Consumer Group 的状态流转,此时 Kafka 为咱们设计了一套残缺的状态机机制,来帮忙 Broker Coordinator 实现整个重均衡流程。理解整个状态流转过程能够帮忙咱们深刻了解 Consumer Group 的设计原理。

5 种状态,定义别离如下:

Empty 状态

 Empty 状态示意以后组内无成员,然而可能存在 Consumer Group 已提交的位移数据,且未过期,这种状态只能响应 JoinGroup 申请。

Dead 状态

Dead 状态示意组内曾经没有任何成员的状态,组内的元数据曾经被 Broker Coordinator 移除,这种状态响应各种申请都是一个 Response:UNKNOWN_MEMBER_ID。

PreparingRebalance 状态

 PreparingRebalance 状态示意筹备开始新的 Rebalance, 期待组内所有成员重新加入组内。

CompletingRebalance 状态

 CompletingRebalance 状态示意组内成员都曾经退出胜利,正在期待调配计划,旧版本中叫“AwaitingSync”。

Stable 状态

 Stable 状态示意 Rebalance 曾经实现,组内 Consumer 能够开始生产了。

5 种状态流转图如下:

5.4 Rebalance 流程剖析

接下来咱们看看 Rebalance 的流程,通过下面 5 种状态能够看出,Rebalance 次要分为两个步骤:退出组 (对应 JoinGroup 申请) 和期待 Leader Consumer 调配计划(SyncGroup 申请)。

1)、JoinGroup 申请: 组内所有成员向 Coordinator 发送 JoinGroup 申请,申请退出组,顺带会上报本人订阅的 Topic,这样 Coordinator 就能收集到所有成员的 JoinGroup 申请和订阅 Topic 信息,Coordinator 就会从这些成员中抉择一个负责这个 Consumer Group 的 Leader(个别状况下,第一个发送申请的 Consumer 会成为 Leader),这里说的 Leader 是指具体的某一个 consumer,它的工作就是收集所有成员的订阅 Topic 信息,而后制订具体的生产分区调配计划。 待选出 Leader 后,Coordinator 会把 Consumer Group 的订阅 Topic 信息封装进 JoinGroup 申请的 Response 中,而后发给 Leader,而后由 Leader 对立做出调配计划后,进入到下一步,如下图:

2)、SyncGroup 申请: Leader 开始调配生产计划,即哪个 Consumer 负责生产哪些 Topic 的哪些 Partition。 一旦实现调配,Leader 会将这个调配计划封装进 SyncGroup 申请中发给 Coordinator,其余成员也会发 SyncGroup 申请,只是内容为空,待 Coordinator 接管到调配计划之后会把计划封装进 SyncGroup 的 Response 中发给组内各成员, 这样各自就晓得应该生产哪些 Partition 了,如下图:

5.5 Rebalance 场景剖析

刚刚具体的聊了对于 Rebalance 的状态流转与流程剖析,接下来咱们通过时序图来重点剖析几个场景来加深对 Rebalance 的了解。

场景一:新成员 (c1) 退出组

场景二:成员 (c2) 被动离组

场景三:成员 (c2) 超时被踢出组

场景四:成员 (c2) 提交位移数据

6、Consumer 之位移提交机制

6.1 位移提交 Offset 概念了解

下面聊完消费者组 Rebalance 机制后,咱们来聊聊 Consumer 的位移提交机制,在聊位移提交之前,咱们回顾一下 位移 和 消费者位移 之间的区别。通常所说的位移是指 Topic Partition 在 Broker 端的存储偏移量,而消费者位移则是指某个 Consumer Group 在不同 Topic Partition 下面的生产偏移量(也能够了解为生产进度),它记录了 Consumer 要生产的下一条音讯的位移。

Consumer 须要向 Kafka 上报本人的位移数据信息,咱们将这个上报过程叫做提交位移(Committing Offsets)。它是为了保障 Consumer 的生产进度失常,当 Consumer 产生故障重启后,能够间接从之前提交的 Offset 地位开始进行生产而不必重头再来一遍(Kafka 认为小于提交的 Offset 的音讯都曾经胜利生产了),Kafka 设计了这个机制来保障生产进度。咱们晓得 Consumer 能够同时去生产多个分区的数据,所以位移提交是依照分区的粒度进行上报的,也就是说 Consumer 须要为调配给它的每个分区提交各自的位移数据。

6.2 多种提交形式剖析

Kafka Consumer 提供了多种提交形式,从用户角度来说:位移提交能够分为主动提交和手动提交,但从 Consumer 的角度来说,位移提交能够分为同步提交和异步提交,接下来咱们就来聊聊主动提交和手动提交形式:

主动提交
主动提交是指 Kafka Consumer 在后盾默默地帮咱们提交位移,用户不须要关怀这个事件。启用主动提交位移,在 初始化 KafkaConsumer 的时候,通过设置参数 enable.auto.commit = true (默认为 true),开启之后还须要另外一个参数进行配合即 auto.commit.interval.ms,这个参数示意 Kafka Consumer 每隔 X 秒主动提交一次位移,这个值默认是 5 秒。

主动提交看起来是挺美妙的, 那么主动提交会不会呈现生产数据失落的状况呢?在设置了 enable.auto.commit = true 的时候,Kafka 会保障在开始调用 Poll() 办法时,提交上一批音讯的位移,再解决下一批音讯, 因而它能保障不呈现生产失落的状况。但主动提交位移也有设计缺点,那就是它可能会呈现反复生产。就是在主动提交距离之间产生 Rebalance 的时候,此时 Offset 还未提交,待 Rebalance 实现后,所有 Consumer 须要将产生 Rebalance 前的音讯进行从新生产一次。

手动提交
与主动提交绝对应的就是手动提交了。开启手动提交位移的办法就是在初始化 KafkaConsumer 的时候设置参数 enable.auto.commit = false,然而只设置为 false 还不行,它只是通知 Kafka Consumer 不必主动提交位移了,你还须要在解决完音讯之后调用相应的 Consumer API 手动进行提交位移,对于手动提交位移,又分为同步提交和异步提交。

1)、同步提交 API:

KafkaConsumer#commitSync(),该办法会提交由 KafkaConsumer#poll() 办法返回的最新位移值,它是一个同步操作,会始终阻塞期待直到位移被胜利提交才返回,如果提交的过程中出现异常,该办法会将异样抛出。这里咱们晓得在调用 commitSync() 办法的机会是在解决完 Poll() 办法返回所有音讯之后进行提交,如果过早的提交了位移就会呈现生产数据失落的状况。

2)、异步提交 API:

KafkaConsumer#commitAsync(),该办法是异步形式进行提交的,调用 commitAsync() 之后,它会立刻返回,并不会阻塞,因而不会影响 Consumer 的 TPS。另外 Kafka 针对它提供了 callback,不便咱们来实现提交之后的逻辑,比方记录日志或异样解决等等。因为它是一个异步操作,如果呈现问题是不会进行重试的,这时候重试位移值可能已不是最新值,所以重试无意义。

3)、混合提交模式:

从下面剖析能够得出 commitSync 和 commitAsync 都有本人的缺点,咱们须要将 commitSync 和 commitAsync 组合应用能力达到最现实的成果,既不影响 Consumer TPS,又能利用 commitSync 的主动重试性能来防止一些刹时谬误(网络抖动,GC,Rebalance 问题),在生产环境中倡议大家应用混合提交模式来进步 Consumer 的健壮性。

7、Consumer 之__consumer_offsets 存储

7.1 __consumer_offsets 揭秘

下面聊完 Consumer 位移提交,咱们晓得 Consumer 生产完数据后须要进行位移提交,那么提交的位移数据到底存储在哪里,又是以何种形式进行存储的,接下来咱们就看看新旧版本 Kafka 对于 Offset 存储形式。

咱们晓得 Kafka 旧版本(0.8 版本之前)是重度依赖 Zookeeper 来实现各种各样的协调治理,当然旧版本的 Consumer Group 是把位移保留在 ZooKeeper 中,缩小 Broker 端状态存储开销,鉴于 Zookeeper 的存储架构设计来说,它不适宜频繁写更新,而 Consumer Group 的位移提交又是高频写操作,这样会拖慢 ZooKeeper 集群的性能,于是在新版 Kafka 中,社区从新设计了 Consumer Group 的位移治理形式,采纳了将位移保留在 Kafka 外部(这是因为 Kafka Topic 人造反对高频写且长久化),这就是所谓赫赫有名的__consumer_offsets。

__consumer_offsets:用来保留 Kafka Consumer 提交的位移信息,另外它是由 Kafka 主动创立的,和一般的 Topic 雷同,它的音讯格局也是 Kafka 本人定义的,咱们无奈进行批改。这里咱们很好奇它的音讯格局到底是怎么样的,让咱们来一起剖析并揭开它的神秘面纱吧。

__consumer_offsets 音讯格局剖析揭秘:

  • 1、所谓的音讯格局咱们能够简略了解为是一个 KV 对。Key 和 Value 别离示意音讯的键值和音讯体。
  • 2、那么 Key 存什么呢?既然是存储 Consumer 的位移信息,在 Kafka 中,Consumer 数量会很多,那么必须有字段来标识这个位移数据是属于哪个 Consumer 的,怎么来标识 Consumer 字段呢?后面在解说 Consumer Group 的时候咱们晓得它共享一个公共且惟一的 Group ID,那么只保留它就能够了吗?咱们晓得 Consumer 提交位移是在分区的维度进行的,很显然,key 中还应该保留 Consumer 要提交位移的分区。
  • 3、总结:位移主题的 Key 中应该保留 3 局部内容:<Group ID,主题名,分区号 >
  • 4、value 能够简略认为存储的是 offset 值,当然底层还存储其余一些元数据,帮忙 Kafka 来实现一些其余操作,比方删除过期位移数据等。

__consumer_offsets 音讯格局示意图:



7.2 __consumer_offsets 创立过程

聊完音讯格局后,咱们来聊聊 __consumer_offsets 是怎么被创立进去的呢?当 Kafka 集群中的第一个 Consumer 启动时,Kafka 会主动创立__consumer_offsets。后面说过,它就是一般的 Topic,它也有对应的分区数,如果由 Kafka 主动创立的,那么分区数又是怎么设置的呢?这个依赖 Broker 端参数 offsets.topic.num.partitions (默认值是 50),因而 Kafka 会主动创立一个有 50 个分区的__consumer_offsets。这就是咱们在 Kafka 日志门路下看到有很多 __consumer_offsets-xxx 这样的目录的起因。既然有分区数,必然就会有对应的正本数,这个是依赖 Broker 端另一个参数 offsets.topic.replication.factor(默认值为 3)。总结一下,如果__consumer_offsets 由 Kafka 主动创立的,那么该 Topic 的分区数是 50,正本数是 3,而具体 Group 的生产状况要存储到哪个 Partition,依据 abs(GroupId.hashCode()) % NumPartitions 来计算的,这样就能够保障 Consumer Offset 信息与 Consumer Group 对应的 Coordinator 处于同一个 Broker 节点上。

7.3 查看__consumer_offsets 数据

Kafka 默认提供了脚本供用户查看 Consumer 信息, 具体的查看形式如下:

//1. 查看 kafka 消费者组列表:./bin/kafka-consumer-groups.sh --bootstrap-server <kafka-ip>:9092 --list

//2. 查看 kafka 中某一个消费者组 (test-group-1) 的生产状况:./bin/kafka-consumer-groups.sh --bootstrap-server <kafka-ip>:9092 --group test-group-1 --describe

//3. 计算 group.id 对应的 partition 的公式为:abs(GroupId.hashCode()) % NumPartitions // 其中 GroupId:test-group-1 NumPartitions:50

//3. 找到 group.id 对应的 partition 后,就能够指定分区生产了
//kafka 0.11 当前
./bin/kafka-console-consumer.sh --bootstrap-server message-1:9092 --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --partition xx
//kafka 0.11 以前
./bin/kafka-console-consumer.sh --bootstrap-server message-1:9092 --topic __consumer_offsets --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --partition xx

//4. 获取指定 consumer group 的位移信息 
//kafka 0.11 当前
kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition xx --broker-list <kafka-ip>:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
//kafka 0.11 以前
kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition xx --broker-list <kafka-ip>:9092 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"

//5. 脚本执行后输入的元数据信息 
// 格局:[消费者组 : 生产的 topic : 生产的分区] :: [offset 位移], [offset 提交工夫], [元数据过期工夫]
[order-group-1,topic-order,0]::[OffsetMetadata[36672,NO_METADATA],CommitTime 1633694193000,ExpirationTime 1633866993000]

8、Consumer 之生产进度监控

下面聊完 Consumer 的各个实现细节,咱们来聊聊对于 Consumer 来说,最重要的事件即生产进度的监控, 或者说监控其滞后水平(Consumer 以后落后于 Producer 的水平),这里有个专业名词叫 Consumer Lag。举例说明: Kafka Producer 向某 Topic 胜利生产了 1000 万条音讯,这时 Consumer 以后生产了 900 万条音讯,那么能够认为 Consumer 滞后了 100 万条音讯,即 Lag 等于 100 万。

对 Consumer 来说,Lag 应该算是最重要的监控指标了。它间接反映了一个 Consumer 的运行状况。Lag 值越小示意该 Consumer 可能及时的生产 Producer 生产进去的音讯,滞后水平很小;如果该值有增大的趋势阐明可能会有沉积,重大会拖慢上游的处理速度。

对于这么重要的指标,咱们该怎么监控它呢?次要有 以下几 种办法:

  • 1、应用 Kafka 自带的命令行工具 kafka-consumer-groups 脚本
  • 2、应用 Kafka Java Consumer API 编程
  • 3、应用 Kafka 自带的 JMX 监控指标
  • 4、如果是云产品的话,能够间接应用云产品自带的监控性能

9、Consumer 之总结

至此曾经跟大家全面深刻的分析了 Kafka Consumer 外部底层原理设计的方方面面, kafka 原理相干篇章到此告一段落,后续会针对 Kafka 细节技术点进行专题和源码剖析, 大家敬请期待 …

保持总结, 继续输入高质量文章 关注我: 华仔聊技术

正文完
 0