什么是协调器

协调器是用于协调多个消费者之间可能正确地工作的一个角色, 比方计算生产的分区调配策略,又或者消费者的退出组与来到组的解决逻辑, 有一点相似Kafka种的控制器的角色。

协调器的作用

协调器分为 生产组协调器 和 消费者协调器两种

生产组协调器

组协调器(GroupCoordinator)能够了解为各个消费者协调器的一个中央处理器, 每个消费者的所有交互都是和组协调器(GroupCoordinator)进行的。

  1. 选举Leader消费者客户端
  2. 解决申请加入组的客户端
  3. 再均衡后同步新的调配计划
  4. 保护与客户端的心跳检测
  5. 治理消费者已生产偏移量,并存储至__consumer_offset中

消费者协调器

每个客户端都会有一个消费者协调器, 他的次要作用就是向组协调器发动申请做交互, 以及解决回调逻辑

  1. 向组协调器发动入组申请
  2. 向组协调器发动同步组申请(如果是Leader客户端,则还会计算调配策略数据放到入参传入)
  3. 发动离组申请
  4. 放弃跟组协调器的心跳线程
  5. 向组协调器发送提交已生产偏移量的申请

协调器运行流程

组协调器抉择逻辑

生产组协调器的抉择

kafka上的组协调器(GroupCoordinator)协调器有很多, 跟Controller不一样的是, Controller只有一个, 而组协调器(GroupCoordinator)是依据外部Topic __consumer_offset数量决定的,有多少个 __consumer_offset分区, 那么就有多少个组协调器(GroupCoordinator)。

然而每个分区可能有多个正本, 那么每个组协调器应该调配在哪里呢?

每个__consumer_offset分区的Leader正本在哪个Broer上, 那么对应协调器就在哪里。

具体请看:寻找协调器FindCoordinatorRequest申请流程

如何确定每个生产组对应哪个协调器

默认状况下, __consumer_offset有50个分区, 每个生产组都会对应其中的一个分区,对应的逻辑为 hash(group.id)%分区数;

消费者退出组流程JoinGroup

客户端启动的时候, 或者重连的时候会发动JoinGroup的申请来申请加入的组中。

JoinGroup时序图

  1. 消费者客户端发动第一次申请, 协调器给它计算一个MemberId返回
  2. 消费者客户端发动第二次申请,MemberId是刚刚失去的。
  3. 生产组协调器解决申请,构建新的MemberMetadata元信息缓存到Group中。
  4. 生产组协调器将状态流转为 PreparingRebalance
  5. 初始化Generation数据,比方generationId+1, Group状态流转为 CompletingRebalance,当然如果以后Group的member是空的,则流转为 Empty ;
  6. 将下面的数据组装一下为JoinGroupResult,返回给所有的Member, 当然如果是Leader Member的话, 还会额定给他所有Member的元信息(因为它要用这些数据去计算新的分区调配的数据。)
  7. 消费者客户端拿到数据之后, 像生产组协调器发动SyncGroupRequest的申请, 如果是Leader Member的话, 则会依据分区策略去计算一下新的调配策略,并把数据带上发动SyncGroupRequest的申请。对于SyncGroupRequest请看: Kafka消费者 SyncGroupRequest 流程解析

详情请看:Kafka消费者JoinGroupRequest流程解析

组协调器同步流程SyncGroup

以后客户端都曾经实现JoinGroup之后, 客户端会收到JoinGroup的回调, 而后客户端会再次向组协调器发动SyncGroup的申请来获取新的调配计划。

然而在这一个过程中,新的调配计划是由Leader 客户端计算出来的,并且会同步给组协调器。

而后组协调器再把这些后果回调给泛滥客户端。

消费者离组流程LeaveGroup

当消费者客户端关机/异样 时, 会触发离组LeaveGroup申请。

组协调器始终会有针对每个客户端的心跳检测, 如果监测失败,则就会将这个客户端踢出Group、

这个提出的流程也很简略

就是触发一下 再均衡

心跳检测

客户端退出组内后, 会始终放弃一个心跳线程,来放弃跟组协调器的一个感知。

并且组协调器会针对每个退出组的客户端做一个心跳监测,如果监测到过期, 则会将其踢出组内并再均衡。

分区调配策略

Q&A

1. 如果有多个客户端配置了不同的调配策略, 那么会以哪个配置失效呢?

必定是须要生产组上面的所有成员都应用同一种调配策略来进行调配。所以GroupCoordinator就面临着抉择哪个调配策略。

抉择的逻辑如下

  1. 抉择所有Member都反对的调配策略
  2. 在1的根底上,优先选择每个partition.assignment.strategy配置靠前的策略。

请看上面的2个例子

Case-1

  1. 所有反对的调配策略为:roundrobin,rang
  2. 每个consumer都在1的根底上,给本人排最后面的投票, consumer-0投roundrobin, consumer-1投rang, consumer-3投roundrobin;这样算下来 roundrobin是有2票的, 那么久抉择roundrobin为调配策略;

Case-2

  1. 所有反对的调配策略为:rang
  2. 都不必投票, 间接抉择rang入选

如果新Member退出Group的时候, 带上的调配策略跟现有Group中所有Member(Group有Member的状况下)都反对的协定都不穿插

那么就会抛出异样:INCONSISTENT_GROUP_PROTOCOL

2022-09-08 14:34:12,508] INFO [Consumer clientId=client2, groupId=consumer0] Rebalance failed. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)org.apache.kafka.common.errors.InconsistentGroupProtocolException: The group member's supported protocols are incompatible with those of existing members or first group member tried to join with empty protocol type or empty protocol list.[2022-09-08 14:34:12,511] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)

2. 消费者生产并提交了之后, 其余消费者是如果晓得我曾经生产了,从而不会从新生产的呢?

一个分区只会被同一个生产组内的消费者就行生产, 当消费者生产了之后会把生产的偏移量offset提交给组协调器进行存储。

存储中央是 kafka的外部Topic __consumer_offset, 存储的数据结构是如下:

Key:

Value:

能够看到Key的构造是 group+topic+partition,当产生再均衡的时候,就算该分区调配给了别的消费者的话,它也是通过这个key来寻找以后曾经生产到的offset了。

简略来说就是:

  1. 同一个生产组下,一个分区只会被一个消费者生产
  2. 音讯被生产之后 会存储到外部Topic __consumer_offset中, 并且过期策略是 compact(压缩)
  3. 存储offset的时候,key的构造是 group+topic+partition,所以就算重均衡后, 雷同组内不同的消费者它去读取到的offset是上一个消费者提交之后的。

3. 消费者生产的offset存储构造是什么样子的呢?

存储构造请看:图解Kafka生产组偏移量_consumer_offset的数据结构

4. 理解到offset存储构造,如果让你去重置offset,你感觉应该如果操作呢?

既然消费者生产过的偏移量offset是存储在外部Topic __consumer_offset中, 消费者生产的时候先去读取这个Topic的最新值,key的构造是 group+topic+partition

如果咱们想要批改offset,只须要扭转这个值的数据大小就行了。

又因为它的过期策略是 compact(压缩), 那么咱们只须要这个Topic __consumer_offset 的指定key( group+topic+partition)发送一个新的offset值就行了。

如果想要重置offset, 给这个topic发送一个墓碑音讯让它音讯就行。

5. 如果 __consumer_offset扩容的话,offset记录会失落吗?