乐趣区

关于kafka:聊聊-Kafka-Consumer-源码解析之-Consumer-如何加入-Consumer-Group

一、前言

明天这一篇咱们来说一下 Consumer 是如何退出 Consumer Group 的,咱们后面有一篇 Kafka 的架构文章有说到,Consumer 有生产组(Consumer Group)的概念,而 Producer 没有生产组的概念。所以说 Consumer 侧会比 Producer 侧简单点,除了消费者有生产组的概念,还须要保护治理 offset 偏移量、反复生产等问题。

与生产组相干的两个组件,一个是消费者客户端的 ConsumerCoordinator,一个是 Kafka Broker 服务端的 GroupCoordinator。ConsumerCoordinator 负责与 GroupCoordinator 通信,Broker 启动的时候,都会启动一个 GroupCoordinator 实例,而一个集群中,会有多个 Broker,那么如何确定一个新的 Consumer 退出 Consumer Group 后,到底和哪个 Broker 上的 GroupCoordinator 进行交互呢?

别急,聪慧的程序员必定是有方法的,咱们还是先来说一下 GroupCoordinator 吧。

二、GroupCoordinator

别问,问就是有相应的算法和策略。那咱们就来看下是啥算法和策略实现 Consumer 正确找到 GroupCoordinator 的,这就和 Kafka 外部的 Topic __consumer_offsets 有关系了。

2.1 __consumer_offsets

__consumer_offsets 这个外部 Topic,专门用来存储 Consumer Group 生产的状况,默认状况下有 50 个 partition,每个 partition 默认三个正本。如下图所示:

2.2 Consumer 如何找到 GroupCoordinator 的?

每个 Consumer Group 都有其对应的 GroupCoordinator,当一个新的 Consumer 要寻找和它交互的 GroupCoordinator 时,须要先对它的 GroupId 进行 hash,而后取模 __consumer_offsets 的 partition 数量,最初失去的值就是对应 partition,那么这个 partition 的 leader 所在的 broker 即为这个 Consumer Group 要交互的 GroupCoordinator 所在的节点。获取 partition 公式如下:

abs(GroupId.hashCode()) % NumPartitions

举个例子,假如一个 GroupId 计算出来的 hashCode 是 8,之后取模 50 失去 8。那么 partition-8 的 leader 所在的 broker 就是咱们要找的那个节点。这个 Consumer Group 前面都会间接和该 broker 上的 GroupCoordinator 交互。

三、Group 状态变更

说 Consumer 退出 Consumer Group 流程之前,老周感觉有必要先说一下 Consumer Group 的状态变更。

3.1 生产端

在协调器 AbstractCoordinator 中的外部类 MemberState 中咱们能够看到协调器的四种状态,别离是未注册、重调配后没收到响应、重调配后收到响应但还没有收到调配、稳固状态。

上述生产端的四种状态的转换如下图所示:

3.2 服务端

对于 Kafka 服务端的组则有五种状态 Empty、PreparingRebalance、CompletingRebalance、Stable、Dead。他们的状态转换如下图所示:


四、Consumer 退出 Consumer Group 流程

说 Consumer 如何退出 Consumer Group 之前,咱们还是先来回顾下上一篇音讯生产的测试案例。


外围办法是 poll() 办法,咱们这里简略提一下,前面咱们会具体介绍 Consumer 对于 poll 的网络模型。

Consumer 如何退出 Consumer Group 的,咱们得来看啥时候与 GroupCoordinator 交互通信的,不难发现在音讯拉取申请做筹备 updateAssignmentMetadataIfNeeded() 这个办法里。

而后对于对 ConsumerCoordinator 的解决都集中在 coordinator.poll() 办法中。

咱们来跟一下这两个办法:

org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#poll(org.apache.kafka.common.utils.Timer, boolean) 办法中,具体能够分为以下几个步骤:

  • 检测心跳线程运行是否失常 (须要定时向 GroupCoordinator 发送心跳,在建设连贯之后,建设连贯之前不会做任何事件)
  • 如果不存在协调器和协调器已断开连接,则返回 false,完结本次拉取。如果 coordinator 未知,就初始化 ConsumerCoordinator (在 ensureCoordinatorReady() 中实现)
  • 判断是否须要触发重均衡,即生产组内的所有消费者重新分配 topic 中的分区信息。
  • 通过 ensureActiveGroup() 发送 join-group、sync-group 申请,退出 group 并获取其 assign 的 TopicPartition list。
  • 如果须要更新元数据,并且还没有分区筹备好,则同步阻塞期待元数据更新结束。
  • 如果开启了主动提交生产进度,并且已到下一次提交工夫,则提交。

其中,有几个中央须要具体介绍,那就是 ensureCoordinatorReady() 办法、rejoinNeededOrPending() 办法和 ensureActiveGroup() 办法。

五、ensureCoordinatorReady()

这个办法的作用是:抉择一个连接数最小的 broker,向其发送 GroupCoordinator 申请,并建设相应的 TCP 连贯。

  • 办法调用流程是:ensureCoordinatorReady() -> lookupCoordinator() -> sendFindCoordinatorRequest()。
  • 如果 client 获取到 Server response,那么就会与 GroupCoordinator 建设连贯。

5.1 org.apache.kafka.clients.consumer.internals.AbstractCoordinator#ensureCoordinatorReady

5.2 org.apache.kafka.clients.consumer.internals.AbstractCoordinator#lookupCoordinator

5.3 org.apache.kafka.clients.consumer.internals.AbstractCoordinator#sendFindCoordinatorRequest

5.4 小结

  • 抉择一个连贯最小的节点,发送 FindCoordinator request 申请,并对 response 进行解决。
  • FindCoordinatorRequest 这个申请会应用 group id 通过 ConsumerNetworkClient.send() 来查找对应的 GroupCoordinator 节点。(当然 ConsumerNetworkClient.send() 也是采纳的 Java NIO 的机制,咱们后面的文章有说到过)
  • 如果正确获取 GroupCoordinator 时(会返回其对应的 node id、host 和 port 信息),建设连贯,并更新心跳工夫。

六、rejoinNeededOrPending()


对于 rejoin, 下列几种状况会触发再平衡 reblance 操作

  • 新的消费者退出生产组 (第一次进行生产也属于这种状况)

    • 消费者宕机下线 (长时间未发送心跳包)
    • 消费者被动退出生产组,比方调用 unsubscrible() 办法勾销对主题的订阅
    • 生产组对应的 GroupCoordinator 节点产生了变动
    • 生产组内所订阅的任一主题或者主题的分区数量产生了变动

七、ensureActiveGroup()

当初咱们曾经晓得了 GroupCoordinator 节点,并建设了连贯。ensureActiveGroup() 这个办法的次要作用是向 GroupCoordinator 发送 join-group、sync-group 申请,获取 assign 的 TopicPartition list。

  • 办法调用流程是:ensureActiveGroup() -> ensureCoordinatorReady() -> startHeartbeatThreadIfNeeded() -> joinGroupIfNeeded()
  • joinGroupIfNeeded() 办法中最重要的办法是 initiateJoinGroup(),它的调用流程是 sendJoinGroupRequest() -> JoinGroupResponseHandler.handle() -> onJoinLeader()、onJoinFollower() -> sendSyncGroupRequest()
/**
 * 确保 Group 是 active,并且退出该 group。* Ensure the group is active (i.e., joined and synced)
 *
 * @param timer Timer bounding how long this method can block
 * @throws KafkaException if the callback throws exception
 * @return true iff the group is active
 */
boolean ensureActiveGroup(final Timer timer) {
    // always ensure that the coordinator is ready because we may have been disconnected
    // when sending heartbeats and does not necessarily require us to rejoin the group.
    // 确保 GroupCoordinator 曾经连贯
    if (!ensureCoordinatorReady(timer)) {return false;}

    // 启动心跳发送线程(并不一定发送心跳, 满足条件后才会发送心跳)startHeartbeatThreadIfNeeded();
    // 发送 JoinGroup 申请,并对返回的信息进行解决。return joinGroupIfNeeded(timer);
}

7.1 joinGroupIfNeeded()

join-group 的申请是在 joinGroupIfNeeded() 中实现的。

7.2 initiateJoinGroup()

joinGroupIfNeeded() 办法中最重要的办法是 initiateJoinGroup(),咱们来看下:

private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
    // we store the join future in case we are woken up by the user after beginning the
    // rebalance in the call to poll below. This ensures that we do not mistakenly attempt
    // to rejoin before the pending rebalance has completed.
    if (joinFuture == null) {
        // 状态标记为 rebalance
        state = MemberState.PREPARING_REBALANCE;
        // a rebalance can be triggered consecutively if the previous one failed,
        // in this case we would not update the start time.
        if (lastRebalanceStartMs == -1L)
            lastRebalanceStartMs = time.milliseconds();
        // 发送 JoinGroup 申请
        joinFuture = sendJoinGroupRequest();
        joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
            @Override
            public void onSuccess(ByteBuffer value) {// do nothing since all the handler logic are in SyncGroupResponseHandler already}

            @Override
            public void onFailure(RuntimeException e) {
                // we handle failures below after the request finishes. if the join completes
                // after having been woken up, the exception is ignored and we will rejoin;
                // this can be triggered when either join or sync request failed
                synchronized (AbstractCoordinator.this) {sensors.failedRebalanceSensor.record();
                }
            }
        });
    }
    return joinFuture;
}

7.3 sendJoinGroupRequest():join-group 申请

持续跟 sendJoinGroupRequest() 办法


sendJoinGroupRequest():向 GroupCoordinator 发送 join-group 申请

  • 如果 group 是新的 group.id,那么此时 group 初始化的状态为 Empty
  • 当 GroupCoordinator 接管到 consumer 的 join-group 申请后,因为此时这个 group 的 member 列表还是空(group 是新建的,每个 consumer 实例被称为这个 group 的一个 member),第一个退出的 member 将被选为 leader,也就是说,对于一个新的 consumer group 而言,当第一个 consumer 实例退出后将会被选为 leader。
  • 如果 GroupCoordinator 接管到 leader 发送 join-group 申请,将会触发 rebalance,group 的状态变为 PreparingRebalance
  • 此时,GroupCoordinator 将会期待肯定的工夫,如果在肯定工夫内,接管到 join-group 申请的 consumer 将被认为是仍然存活的,此时 group 会变为 AwaitSync 状态,并且 GroupCoordinator 会向这个 group 的所有 member 返回其 response。
  • consumer 在接管到 GroupCoordinator 的 response 后,如果这个 consumer 是 group 的 leader,那么这个 consumer 将会负责为整个 group assign partition 订阅安顿(默认是按 range 的策略,目前也可选 RoundRobin),而后 leader 将调配后的信息以 sendSyncGroupRequest() 申请的形式发给 GroupCoordinator,而作为 follower 的 consumer 实例会发送一个空列表。
  • GroupCoordinator 在接管到 leader 发来的申请后,会将 assign 的后果返回给所有曾经发送 sync-group 申请的 consumer 实例,并且 group 的状态将会转变为 Stable,如果后续再收到 sync-group 申请,因为 group 的状态曾经是 Stable,将会间接返回其调配后果。

7.4 sendSyncGroupRequest():sync-group 申请

sync-group 发送申请外围代码如下:

7.5 onJoinComplete()

通过下面的步骤,一个 consumer 实例就曾经退出 group 胜利了,退出 group 胜利后,将会触发 ConsumerCoordinator 的 onJoinComplete() 办法,其作用就是:更新订阅的 tp 列表以及更新其对应的 metadata。


至此,一个 consumer 实例算是真正上意义上退出 group 胜利。而后消费者就进入失常工作状态,同时消费者也通过向 GroupCoordinator 发送心跳来维持它们与消费者的从属关系以及它们对分区的所有权关系。只有以失常的距离发送心跳,就被认为是沉闷的,然而如果 GroupCoordinator 没有响应,那么就会发送 LeaveGroup 申请退出生产组。

退出移动版