关于java:Kafka消费者客户端心跳请求

1. 发动心跳申请

在Consumer客户端启动的时候,就会构建心跳监测线程HeartbeatThread并启动,

心跳监测线程名:kafka-coordinator-heartbeat-thread|group.id

例如:kafka-coordinator-heartbeat-thread | consumer0


    /**
    * Java学习材料
    * wx: javataozi888
    **/
    private boolean enabled = false;

    private synchronized void startHeartbeatThreadIfNeeded() {
        if (heartbeatThread == null) {
            heartbeatThread = new HeartbeatThread();
            heartbeatThread.start();
        }
    }

尽管这个时候启动了, 然而run办法外面有个逻辑标记为enabled=false,实际上这个时候并不会收回心跳监测的。

它会依据整个生产组的状态变动而变动。

1.1 启动心跳线程

比方, 当咱们的消费者客户端发动JoinGroupRequest并胜利回调, 则就会设置enabled=true

JoinGroupResponseHandler#handle

从上面的代码能够看到, JoinGroupRequest回调的时候,把客户端的状态流转为了 COMPLETING_REBALANCE,并启动的监测线程

1.2 暂停心跳线程

  1. 当客户端的状态变更为 UNJOINED 或者 PREPARING_REBALANCE 的时候
  2. 又或者心跳线程有异样的时候

那么心跳线程就会临时进行, 因为 UNJOINED 或者 PREPARING_REBALANCE 的状态 自身并不需要去定时查看协调器在不在线, 并不关怀。

1.3 发动心跳申请

有个相干的配置如下

相熟形容默认值heartbeat.interval.ms消费者协调器与消费者协调器之间的心跳间隔时间,心跳用于确保消费者的会话放弃沉闷,并在新的消费者退出或者来到Group的时候促成Rebalance, 该值必须设置为低于session.timeout.ms,但通常应该设置为不高于该值的1/3, 也能够设置得更低3000(3 秒)


    synchronized RequestFuture<Void> sendHeartbeatRequest() {
        log.debug("Sending Heartbeat request with generation {} and member id {} to coordinator {}",
            generation.generationId, generation.memberId, coordinator);
        HeartbeatRequest.Builder requestBuilder =
                new HeartbeatRequest.Builder(new HeartbeatRequestData()
                        .setGroupId(rebalanceConfig.groupId)
                        .setMemberId(this.generation.memberId)
                        .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
                        .setGenerationId(this.generation.generationId));
        return client.send(coordinator, requestBuilder)
                .compose(new HeartbeatResponseHandler(generation));
    }

把客户端的根本信息带上发动申请

1.4 发动LeaveGroup(离组)申请

当客户端检测到以后时候超过了 session.timeout.ms的时候,会断定会话超时,这个时候将客户端持有的消费者协调器标记为空,须要从新寻找协调器去。

当客户端检测到以后工夫具体上一次客户端poll音讯超过了max.poll.interval.ms默认值300000(5 分钟)的时候, 就会执行LeaveGroup离组申请

AbstractCoordinator# maybeLeaveGroup


    public synchronized RequestFuture<Void> maybeLeaveGroup(String leaveReason) {
        RequestFuture<Void> future = null;

        // 从 2.3 开始,只有动静成员才会向 broker 发送 LeaveGroupRequest,group.instance.id 无效的 consumer 被视为从不发送 LeaveGroup 的动态成员,成员过期仅受 session timeout 管制
        if (isDynamicMember() && !coordinatorUnknown() &&
            state != MemberState.UNJOINED && generation.hasMemberId()) {
            // this is a minimal effort attempt to leave the group. we do not
            // attempt any resending if the request fails or times out.
            log.info("Member {} sending LeaveGroup request to coordinator {} due to {}",
                generation.memberId, coordinator, leaveReason);
            LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder(
                rebalanceConfig.groupId,
                Collections.singletonList(new MemberIdentity().setMemberId(generation.memberId))
            );

            future = client.send(coordinator, request).compose(new LeaveGroupResponseHandler(generation));
            client.pollNoWakeup();
        }
  // 重置状态为UNJOINED 
        resetGenerationOnLeaveGroup();

        return future;
    }
  1. 在2.3之后, 只有动静成员才会向 broker 发送 LeaveGroupRequest,group.instance.id 无效的 consumer 被视为从不发送 LeaveGroup 的动态成员,成员过期仅受 session.timeout.ms 管制
  2. 重置客户端状态为 UNJOINED

具体的就不剖析了。客户端向GroupCoordinator发送LeaveGroupRequest之后,协调器做的是

3.移除Member,尝试重均衡

2. GroupCoordinator解决申请

上面的代码看着很多, 其实也没有很简单,基本上都是一些校验逻辑。

GroupCoordinator#handleHeartbeat


  def handleHeartbeat(groupId: String,
                      memberId: String,
                      groupInstanceId: Option[String],
                      generationId: Int,
                      responseCallback: Errors => Unit): Unit = {
    validateGroupStatus(groupId, ApiKeys.HEARTBEAT).foreach { error =>
      if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS)
        // the group is still loading, so respond just blindly
        responseCallback(Errors.NONE)
      else
        responseCallback(error)
      return
    }

    groupManager.getGroup(groupId) match {
      case None =>
        responseCallback(Errors.UNKNOWN_MEMBER_ID)

      case Some(group) => group.inLock {
        if (group.is(Dead)) {
          // if the group is marked as dead, it means some other thread has just removed the group
          // from the coordinator metadata; this is likely that the group has migrated to some other
          // coordinator OR the group is in a transient unstable phase. Let the member retry
          // finding the correct coordinator and rejoin.
          responseCallback(Errors.COORDINATOR_NOT_AVAILABLE)
        } else if (group.isStaticMemberFenced(memberId, groupInstanceId, "heartbeat")) {
          responseCallback(Errors.FENCED_INSTANCE_ID)
        } else if (!group.has(memberId)) {
          responseCallback(Errors.UNKNOWN_MEMBER_ID)
        } else if (generationId != group.generationId) {
          responseCallback(Errors.ILLEGAL_GENERATION)
        } else {
          group.currentState match {
            case Empty =>
              responseCallback(Errors.UNKNOWN_MEMBER_ID)

            case CompletingRebalance =>
              // consumers may start sending heartbeat after join-group response, in which case
              // we should treat them as normal hb request and reset the timer
              val member = group.get(memberId)
              completeAndScheduleNextHeartbeatExpiration(group, member)
              responseCallback(Errors.NONE)

            case PreparingRebalance =>
                val member = group.get(memberId)
                completeAndScheduleNextHeartbeatExpiration(group, member)
                responseCallback(Errors.REBALANCE_IN_PROGRESS)

            case Stable =>
                val member = group.get(memberId)
                completeAndScheduleNextHeartbeatExpiration(group, member)
                responseCallback(Errors.NONE)

            case Dead =>
              throw new IllegalStateException(s"Reached unexpected condition for Dead group $groupId")
          }
        }
      }
    }
  }

简略来说就是

  1. 校验一下Group协调器存不存在以后Member
  2. 校验 Group的状态是否Dead,如果是Dead的话则客户端要从新寻找新的GroupCoordinator并JoinGroup
  3. 判断客户端和GroupCoordinator是否在同一个年代,如果不是一个年代阐明客户端须要从新JoinGroup了。
  4. 如果GroupCoordinator 以后状态是 PreparingRebalance的话, 客户端会判断本身如果是STABLE的话,则会从新JoinGroup。
  5. 如果GroupCoordinator以后状态是CompletingRebalanceStable , 则会清理一下GroupCoordinator设置的提早过期工作, 并从新设置一个新的工作。这个工作执行的工夫是配置 session.timeout.ms 之后。如果假如没有心跳线程申请过去了, 那么这个工作就会被执行。如果执行了会有啥问题呢?请持续看上面的生产组协调器超时工作

对于JoinGroupRequest, 是客户端发动退出到生产组的申请。

具体解说请看:Kafka消费者JoinGroupRequest流程解析

2.1 生产组协调器超时工作

如果在session.timeout.ms 期间始终没有收到客户端来的信跳申请,那么生产组协调器超时工作就会被执行


  def onExpireHeartbeat(group: GroupMetadata, memberId: String, isPending: Boolean): Unit = {
    group.inLock {
      if (group.is(Dead)) {

  // 如果以后心跳监测到Group协调器曾经Dead了,仅仅只是打印一下日志, 因为它本身可能曾经不是组协调器了,他曾经不能再被容许做什么了
        info(s"Received notification of heartbeat expiration for member $memberId after group ${group.groupId} had already been unloaded or deleted.")

      } else if (isPending) {
        info(s"Pending member $memberId in group ${group.groupId} has been removed after session timeout expiration.")
  // 客户端在发动第一次JoinGroup申请的时候,并没有带上memberId,然而Group会生成一个给客户端返回
  // 这个时候这个member就是Pending状态的,属于待退出状态, 因为它还会发动第二次JoinGroup申请并带上这个memberId,才算是真的Join了Group
  // 在这里 间接把这个memberId从Pending缓存中移除了,因为它心跳监测过期了,这意味着客户端须要从新发动第一次Join
        removePendingMemberAndUpdateGroup(group, memberId)
      } else if (!group.has(memberId)) {
        debug(s"Member $memberId has already been removed from the group.")
      } else {
        val member = group.get(memberId)
        if (!member.hasSatisfiedHeartbeat) {
          info(s"Member ${member.memberId} in group ${group.groupId} has failed, removing it from the group")

          removeMemberAndUpdateGroup(group, member, s"removing member ${member.memberId} on heartbeat expiration")
        }
      }
    }
  }
  1. 如果Group状态是Dead,那么什么也不做,它都不是组协调器了,它什么也做不了
  2. 如果以后的member 是Pending状态,(首先理解一下Pending状态,Member第一次JoinGroup的时候因为没有带上memberId参数,组协调器会生成一个MemberId返回给客户端,并且组协调器会在本身放弃一份这个Member的数据,然而这个时候的Member是Pending状态的,意识是期待退出的状态, 因为它还会再发动第二次JoinGroup申请,并且带上这个MemberId。这个时候才是真正的JoinGroup。) 则把这个Member从Pending缓存中移除。也就意味着这个Member须要再次发动第一次JoinGroup申请。
  3. 其余状态就是确定期间没有心跳申请的话, 那么要把这个Member移除掉并更新Group元信息。①. 将这个Member在Group协调器缓存中移除 ②. 如果以后的状态是Stable | CompletingRebalance 的话, 间接走prepareRebalance流程

prepareRebalance 流程次要做的事件就是

  1. 将状态流转到PreparingRebalance
  2. 设置一个DelayedJoin超时过期工作,超时工夫是max.poll.interval.ms 默认300000(5 分钟)。
  3. 这个工作会在满足要求(所有Member都JoinGroup了)的时候,去执行onCompleteJoin。这个就是跟JoinGroup前面的流程是一样的。次要动作就是告诉所有的Member,你们都Join胜利了, 接下来你们该发动SyncGroup申请了。 具体请看:Kafka消费者JoinGroupRequest流程解析

3. 客户端解决返回数据

HeartbeatResponseHandler#handle

这一块的代码就不贴了,次要就是依据返回的异样做具体的事件。

如果没有异样的话啥也不干。

异样映射关系如下:

  1. COORDINATOR_NOT_AVAILABLE | NOT_COORDINATOR 该去从新寻找新的GroupCoordinator
  2. REBALANCE_IN_PROGRESS;以后组协调器正在Rebalance中, 如果以后客户端是STABLE状态,阐明它该从新发动JoinGroupRequest了,连忙的它也要去Join并参加调配了呢。
  3. ILLEGAL_GENERATION | UNKNOWN_MEMBER_ID | FENCED_INSTANCE_ID :将Member的状态设置为 UNJOINED,并从新JoinGroup

4. 心跳线程状态图

咱们能够先看一下 消费者客户端Member的状态流转图

理解了这个状态流转图, 也就能够晓得心跳线程状态流转图了

因为心跳线程的运行只有在 两个状态:COMPLETING_REBALANCESTABLE

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理