关于kafka:什么是Kafka消费组协调器

42次阅读

共计 3606 个字符,预计需要花费 10 分钟才能阅读完成。

什么是协调器

协调器是用于协调多个消费者之间可能正确地工作的一个角色, 比方计算 生产的分区调配策略,又或者消费者的退出组与来到组的解决逻辑, 有一点相似 Kafka 种的控制器的角色。

协调器的作用

协调器分为 生产组协调器  和  消费者协调器 两种

生产组协调器

组协调器 (GroupCoordinator) 能够了解为各个消费者协调器的一个中央处理器, 每个消费者的所有交互都是和组协调器 (GroupCoordinator) 进行的。

  1. 选举 Leader 消费者客户端
  2. 解决申请加入组的客户端
  3. 再均衡后同步新的调配计划
  4. 保护与客户端的心跳检测
  5. 治理消费者已生产偏移量, 并存储至__consumer_offset 中

消费者协调器

每个客户端都会有一个消费者协调器, 他的次要作用就是向组协调器发动申请做交互, 以及解决回调逻辑

  1. 向组协调器发动入组申请
  2. 向组协调器发动同步组申请(如果是 Leader 客户端, 则还会计算调配策略数据放到入参传入)
  3. 发动离组申请
  4. 放弃跟组协调器的心跳线程
  5. 向组协调器发送提交已生产偏移量的申请

协调器运行流程

组协调器抉择逻辑

生产组协调器的抉择

kafka 上的组协调器 (GroupCoordinator) 协调器有很多, 跟 Controller 不一样的是, Controller 只有一个, 而组协调器 (GroupCoordinator) 是依据外部 Topic __consumer_offset 数量决定的,有多少个 __consumer_offset 分区, 那么就有多少个组协调器(GroupCoordinator)。

然而每个分区可能有多个正本, 那么每个组协调器应该调配在哪里呢?

每个__consumer_offset 分区的 Leader 正本在哪个 Broer 上, 那么对应协调器就在哪里。

具体请看: 寻找协调器 FindCoordinatorRequest 申请流程

如何确定每个生产组对应哪个协调器

默认状况下, __consumer_offset 有 50 个分区, 每个生产组都会对应其中的一个分区,对应的逻辑为 hash(group.id)% 分区数;

消费者退出组流程 JoinGroup

客户端启动的时候, 或者重连的时候会发动 JoinGroup 的申请来申请加入的组中。

JoinGroup 时序图

  1. 消费者客户端发动第一次申请, 协调器给它计算一个 MemberId 返回
  2. 消费者客户端发动第二次申请,MemberId 是刚刚失去的。
  3. 生产组协调器解决申请, 构建新的 MemberMetadata 元信息缓存到 Group 中。
  4. 生产组协调器将状态流转为 PreparingRebalance
  5. 初始化 Generation 数据, 比方 generationId+1, Group 状态流转为 CompletingRebalance,当然如果以后 Group 的 member 是空的,则流转为 Empty ;
  6. 将下面的数据组装一下为JoinGroupResult, 返回给所有的 Member, 当然如果是 Leader Member 的话, 还会额定给他所有 Member 的元信息(因为它要用这些数据去计算新的分区调配的数据。)
  7. 消费者客户端拿到数据之后, 像生产组协调器发动 SyncGroupRequest 的申请, 如果是 Leader Member 的话, 则会依据分区策略去计算一下新的调配策略, 并把数据带上发动 SyncGroupRequest 的申请。对于 SyncGroupRequest 请看: Kafka 消费者 SyncGroupRequest 流程解析

详情请看:Kafka 消费者 JoinGroupRequest 流程解析

组协调器同步流程 SyncGroup

以后客户端都曾经实现 JoinGroup 之后, 客户端会收到 JoinGroup 的回调, 而后客户端会再次向组协调器发动 SyncGroup 的申请来获取新的调配计划。

然而在这一个过程中, 新的调配计划是由 Leader 客户端计算出来的, 并且会同步给组协调器。

而后组协调器再把这些后果回调给泛滥客户端。

消费者离组流程 LeaveGroup

当消费者客户端关机 / 异样 时, 会触发离组 LeaveGroup 申请。

组协调器始终会有针对每个客户端的心跳检测, 如果监测失败, 则就会将这个客户端踢出 Group、

这个提出的流程也很简略

就是触发一下 再均衡

心跳检测

客户端退出组内后, 会始终放弃一个心跳线程, 来放弃跟组协调器的一个感知。

并且组协调器会针对每个退出组的客户端做一个心跳监测,如果监测到过期, 则会将其踢出组内并再均衡。

分区调配策略

Q&A

1. 如果有多个客户端配置了不同的调配策略, 那么会以哪个配置失效呢?

必定是须要生产组上面的所有成员都应用同一种调配策略来进行调配。所以 GroupCoordinator 就面临着抉择哪个调配策略。

抉择的逻辑如下

  1. 抉择所有 Member 都反对的调配策略
  2. 在 1 的根底上, 优先选择每个 partition.assignment.strategy 配置靠前的策略。

请看上面的 2 个例子

Case-1

  1. 所有反对的调配策略为:roundrobin,rang
  2. 每个 consumer 都在 1 的根底上, 给本人排最后面的投票, consumer- 0 投 roundrobin, consumer- 1 投 rang, consumer- 3 投 roundrobin;这样算下来 roundrobin 是有 2 票的, 那么久抉择 roundrobin 为调配策略;

Case-2

  1. 所有反对的调配策略为:rang
  2. 都不必投票, 间接抉择 rang 入选

如果新 Member 退出 Group 的时候, 带上的调配策略跟现有 Group 中所有 Member(Group 有 Member 的状况下)都反对的协定都不穿插

那么就会抛出异样:INCONSISTENT_GROUP_PROTOCOL

2022-09-08 14:34:12,508] INFO [Consumer clientId=client2, groupId=consumer0] Rebalance failed. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
org.apache.kafka.common.errors.InconsistentGroupProtocolException: The group member's supported protocols are incompatible with those of existing members or first group member tried to join with empty protocol type or empty protocol list.
[2022-09-08 14:34:12,511] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)

2. 消费者生产并提交了之后, 其余消费者是如果晓得我曾经生产了, 从而不会从新生产的呢?

一个分区只会被同一个生产组内的消费者就行生产, 当消费者生产了之后会把生产的偏移量 offset 提交给组协调器进行存储。

存储中央是 kafka 的外部 Topic __consumer_offset, 存储的数据结构是如下:

Key:

Value:

能够看到 Key 的构造是 group+topic+partition,当产生再均衡的时候,就算该分区调配给了别的消费者的话,它也是通过这个 key 来寻找以后曾经生产到的 offset 了。

简略来说就是:

  1. 同一个生产组下, 一个分区只会被一个消费者生产
  2. 音讯被生产之后 会存储到外部 Topic __consumer_offset 中, 并且过期策略是 compact(压缩)
  3. 存储 offset 的时候,key 的构造是 group+topic+partition, 所以就算重均衡后, 雷同组内不同的消费者它去读取到的 offset 是上一个消费者提交之后的。

3. 消费者生产的 offset 存储构造是什么样子的呢?

存储构造请看:图解 Kafka 生产组偏移量_consumer_offset 的数据结构

4. 理解到 offset 存储构造, 如果让你去重置 offset, 你感觉应该如果操作呢?

既然消费者生产过的偏移量 offset 是存储在外部 Topic __consumer_offset 中, 消费者生产的时候先去读取这个 Topic 的最新值,key 的构造是 group+topic+partition

如果咱们想要批改 offset,只须要扭转这个值的数据大小就行了。

又因为它的过期策略是 compact(压缩), 那么咱们只须要这个 Topic __consumer_offset 的指定 key(group+topic+partition)发送一个新的 offset 值就行了。

如果想要重置 offset, 给这个 topic 发送一个墓碑音讯让它音讯就行。

5. 如果 __consumer_offset 扩容的话,offset 记录会失落吗?

正文完
 0