1. 发动心跳申请
在 Consumer 客户端启动的时候, 就会构建心跳监测线程 HeartbeatThread 并启动,
心跳监测线程名:kafka-coordinator-heartbeat-thread|group.id
例如:kafka-coordinator-heartbeat-thread | consumer0
/**
* Java 学习材料
* wx: javataozi888
**/
private boolean enabled = false;
private synchronized void startHeartbeatThreadIfNeeded() {if (heartbeatThread == null) {heartbeatThread = new HeartbeatThread();
heartbeatThread.start();}
}
尽管这个时候启动了, 然而 run 办法外面有个逻辑标记为 enabled=false, 实际上这个时候并不会收回心跳监测的。
它会依据整个生产组的状态变动而变动。
1.1 启动心跳线程
比方, 当咱们的消费者客户端发动 JoinGroupRequest 并胜利回调, 则就会设置 enabled=true
JoinGroupResponseHandler#handle
从上面的代码能够看到, JoinGroupRequest 回调的时候, 把客户端的状态流转为了 COMPLETING_REBALANCE,并启动的监测线程
1.2 暂停心跳线程
- 当客户端的状态变更为 UNJOINED 或者 PREPARING_REBALANCE 的时候
- 又或者心跳线程有异样的时候
那么心跳线程就会临时进行, 因为 UNJOINED 或者 PREPARING_REBALANCE 的状态 自身并不需要去定时查看协调器在不在线, 并不关怀。
1.3 发动心跳申请
有个相干的配置如下
相熟形容默认值 heartbeat.interval.ms 消费者协调器与消费者协调器之间的心跳间隔时间, 心跳用于确保消费者的会话放弃沉闷, 并在新的消费者退出或者来到 Group 的时候促成 Rebalance, 该值必须设置为低于 session.timeout.ms,但通常应该设置为不高于该值的 1 /3, 也能够设置得更低 3000(3 秒)
synchronized RequestFuture<Void> sendHeartbeatRequest() {log.debug("Sending Heartbeat request with generation {} and member id {} to coordinator {}",
generation.generationId, generation.memberId, coordinator);
HeartbeatRequest.Builder requestBuilder =
new HeartbeatRequest.Builder(new HeartbeatRequestData()
.setGroupId(rebalanceConfig.groupId)
.setMemberId(this.generation.memberId)
.setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
.setGenerationId(this.generation.generationId));
return client.send(coordinator, requestBuilder)
.compose(new HeartbeatResponseHandler(generation));
}
把客户端的根本信息带上发动申请
1.4 发动 LeaveGroup(离组)申请
当客户端检测到以后时候超过了 session.timeout.ms 的时候,会断定会话超时, 这个时候将客户端持有的消费者协调器标记为空, 须要从新寻找协调器去。
当客户端检测到以后工夫具体上一次客户端 poll 音讯超过了 max.poll.interval.ms 默认值 300000(5 分钟)的时候, 就会 执行 LeaveGroup 离组申请
AbstractCoordinator# maybeLeaveGroup
public synchronized RequestFuture<Void> maybeLeaveGroup(String leaveReason) {
RequestFuture<Void> future = null;
// 从 2.3 开始,只有动静成员才会向 broker 发送 LeaveGroupRequest,group.instance.id 无效的 consumer 被视为从不发送 LeaveGroup 的动态成员,成员过期仅受 session timeout 管制
if (isDynamicMember() && !coordinatorUnknown() &&
state != MemberState.UNJOINED && generation.hasMemberId()) {
// this is a minimal effort attempt to leave the group. we do not
// attempt any resending if the request fails or times out.
log.info("Member {} sending LeaveGroup request to coordinator {} due to {}",
generation.memberId, coordinator, leaveReason);
LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder(
rebalanceConfig.groupId,
Collections.singletonList(new MemberIdentity().setMemberId(generation.memberId))
);
future = client.send(coordinator, request).compose(new LeaveGroupResponseHandler(generation));
client.pollNoWakeup();}
// 重置状态为 UNJOINED
resetGenerationOnLeaveGroup();
return future;
}
- 在 2.3 之后, 只有动静成员才会向 broker 发送 LeaveGroupRequest,group.instance.id 无效的 consumer 被视为从不发送 LeaveGroup 的动态成员,成员过期仅受 session.timeout.ms 管制
- 重置客户端状态为 UNJOINED
具体的就不剖析了。客户端向 GroupCoordinator 发送 LeaveGroupRequest 之后,协调器做的是
3. 移除 Member, 尝试重均衡
2. GroupCoordinator 解决申请
上面的代码看着很多, 其实也没有很简单, 基本上都是一些校验逻辑。
GroupCoordinator#handleHeartbeat
def handleHeartbeat(groupId: String,
memberId: String,
groupInstanceId: Option[String],
generationId: Int,
responseCallback: Errors => Unit): Unit = {validateGroupStatus(groupId, ApiKeys.HEARTBEAT).foreach { error =>
if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS)
// the group is still loading, so respond just blindly
responseCallback(Errors.NONE)
else
responseCallback(error)
return
}
groupManager.getGroup(groupId) match {
case None =>
responseCallback(Errors.UNKNOWN_MEMBER_ID)
case Some(group) => group.inLock {if (group.is(Dead)) {
// if the group is marked as dead, it means some other thread has just removed the group
// from the coordinator metadata; this is likely that the group has migrated to some other
// coordinator OR the group is in a transient unstable phase. Let the member retry
// finding the correct coordinator and rejoin.
responseCallback(Errors.COORDINATOR_NOT_AVAILABLE)
} else if (group.isStaticMemberFenced(memberId, groupInstanceId, "heartbeat")) {responseCallback(Errors.FENCED_INSTANCE_ID)
} else if (!group.has(memberId)) {responseCallback(Errors.UNKNOWN_MEMBER_ID)
} else if (generationId != group.generationId) {responseCallback(Errors.ILLEGAL_GENERATION)
} else {
group.currentState match {
case Empty =>
responseCallback(Errors.UNKNOWN_MEMBER_ID)
case CompletingRebalance =>
// consumers may start sending heartbeat after join-group response, in which case
// we should treat them as normal hb request and reset the timer
val member = group.get(memberId)
completeAndScheduleNextHeartbeatExpiration(group, member)
responseCallback(Errors.NONE)
case PreparingRebalance =>
val member = group.get(memberId)
completeAndScheduleNextHeartbeatExpiration(group, member)
responseCallback(Errors.REBALANCE_IN_PROGRESS)
case Stable =>
val member = group.get(memberId)
completeAndScheduleNextHeartbeatExpiration(group, member)
responseCallback(Errors.NONE)
case Dead =>
throw new IllegalStateException(s"Reached unexpected condition for Dead group $groupId")
}
}
}
}
}
简略来说就是
- 校验一下 Group 协调器存不存在以后 Member
- 校验 Group 的状态是否 Dead, 如果是 Dead 的话则客户端要从新寻找新的 GroupCoordinator 并 JoinGroup
- 判断客户端和 GroupCoordinator 是否在同一个年代, 如果不是一个年代阐明客户端须要从新 JoinGroup 了。
- 如果 GroupCoordinator 以后状态是 PreparingRebalance的话, 客户端会判断本身如果是 STABLE 的话, 则会从新 JoinGroup。
- 如果 GroupCoordinator 以后状态是 CompletingRebalance、Stable , 则会清理一下 GroupCoordinator 设置的提早过期工作, 并从新设置一个新的工作。这个工作执行的工夫是配置 session.timeout.ms 之后。如果假如没有心跳线程申请过去了, 那么这个工作就会被执行。如果执行了会有啥问题呢?请持续看上面的 生产组协调器超时工作
对于 JoinGroupRequest, 是客户端发动退出到生产组的申请。
具体解说请看:Kafka 消费者 JoinGroupRequest 流程解析
2.1 生产组协调器超时工作
如果在 session.timeout.ms 期间始终没有收到客户端来的信跳申请,那么 生产组协调器超时工作 就会被执行
def onExpireHeartbeat(group: GroupMetadata, memberId: String, isPending: Boolean): Unit = {
group.inLock {if (group.is(Dead)) {
// 如果以后心跳监测到 Group 协调器曾经 Dead 了,仅仅只是打印一下日志, 因为它本身可能曾经不是组协调器了,他曾经不能再被容许做什么了
info(s"Received notification of heartbeat expiration for member $memberId after group ${group.groupId} had already been unloaded or deleted.")
} else if (isPending) {info(s"Pending member $memberId in group ${group.groupId} has been removed after session timeout expiration.")
// 客户端在发动第一次 JoinGroup 申请的时候,并没有带上 memberId, 然而 Group 会生成一个给客户端返回
// 这个时候这个 member 就是 Pending 状态的, 属于待退出状态, 因为它还会发动第二次 JoinGroup 申请并带上这个 memberId,才算是真的 Join 了 Group
// 在这里 间接把这个 memberId 从 Pending 缓存中移除了, 因为它心跳监测过期了, 这意味着客户端须要从新发动第一次 Join
removePendingMemberAndUpdateGroup(group, memberId)
} else if (!group.has(memberId)) {debug(s"Member $memberId has already been removed from the group.")
} else {val member = group.get(memberId)
if (!member.hasSatisfiedHeartbeat) {info(s"Member ${member.memberId} in group ${group.groupId} has failed, removing it from the group")
removeMemberAndUpdateGroup(group, member, s"removing member ${member.memberId} on heartbeat expiration")
}
}
}
}
- 如果 Group 状态是 Dead,那么什么也不做, 它都不是组协调器了, 它什么也做不了
- 如果以后的 member 是 Pending 状态,(首先理解一下 Pending 状态,Member 第一次 JoinGroup 的时候因为没有带上 memberId 参数, 组协调器会生成一个 MemberId 返回给客户端, 并且组协调器会在本身放弃一份这个 Member 的数据, 然而这个时候的 Member 是 Pending 状态的, 意识是期待退出的状态, 因为它还会再发动第二次 JoinGroup 申请, 并且带上这个 MemberId。这个时候才是真正的 JoinGroup。) 则把这个 Member 从 Pending 缓存中移除。也就意味着这个 Member 须要再次发动第一次 JoinGroup 申请。
- 其余状态就是确定期间没有心跳申请的话, 那么要把这个 Member 移除掉并更新 Group 元信息。①. 将这个 Member 在 Group 协调器缓存中移除 ②. 如果以后的状态是Stable | CompletingRebalance 的话, 间接走 prepareRebalance 流程
prepareRebalance 流程次要做的事件就是
- 将状态流转到PreparingRebalance
- 设置一个 DelayedJoin 超时过期工作,超时工夫是 max.poll.interval.ms 默认 300000(5 分钟)。
- 这个工作会在满足要求 (所有 Member 都 JoinGroup 了) 的时候,去执行 onCompleteJoin。这个就是跟 JoinGroup 前面的流程是一样的。次要动作就是告诉所有的 Member, 你们都 Join 胜利了, 接下来你们该发动 SyncGroup 申请了。具体请看:Kafka 消费者 JoinGroupRequest 流程解析
3. 客户端解决返回数据
HeartbeatResponseHandler#handle
这一块的代码就不贴了, 次要就是依据返回的异样做具体的事件。
如果没有异样的话啥也不干。
异样映射关系如下:
- COORDINATOR_NOT_AVAILABLE | NOT_COORDINATOR 该去从新寻找新的 GroupCoordinator
- REBALANCE_IN_PROGRESS;以后组协调器正在 Rebalance 中, 如果以后客户端是 STABLE 状态, 阐明它该从新发动 JoinGroupRequest 了,连忙的它也要去 Join 并参加调配了呢。
- ILLEGAL_GENERATION | UNKNOWN_MEMBER_ID | FENCED_INSTANCE_ID:将 Member 的状态设置为 UNJOINED,并从新 JoinGroup
4. 心跳线程状态图
咱们能够先看一下 消费者客户端 Member 的状态流转图。
理解了这个状态流转图, 也就能够晓得心跳线程状态流转图了
因为心跳线程的运行只有在 两个状态:COMPLETING_REBALANCE、STABLE