一次 RocketMQ 程序生产提早的问题定位
问题背景与景象
昨晚收到了利用报警,发现线上某个业务生产音讯提早了 54s 多(从音讯发送到 MQ 到被生产的距离):
2021-06-30T23:12:46.756 message processing is incredibly delayed! (Current delay time: 54725, incredible delay count in 10 seconds: 5677)
查看 RocketMQ 的监控,发现的确产生了比拟多的音讯积压:
从 RocketMQ-Console 下面查看 Topic 的消费者:
这个 Topic,业务要求是 须要有序 的。所以在发送的时候,指定了 业务 Key,并且生产的时候,应用的是 程序生产模式。
咱们应用了 RocketMQ 集群,有三个 Broker,对于这个 Topic,每个 Broker 下面都有 8 个 ReadQueue 和 WriteQueue。这里简略提一下 ReadQueue 和 WriteQueue 的意思:
在 RocketMQ 中,音讯发送时应用 WriteQueue 个数返回路由信息 , 而音讯生产时依照 ReadQueue 个数返回路由信息 。在物理文件层面,只有 WriteQueue 才会创立文件。举个例子:设置 WriteQueueNum = 8,ReadQueueNum = 4,会创立 8 个文件夹,代表 0 1 2 3 4 5 6 7 这 8 个队列,但在音讯生产时,路由信息只返回 4,在具体拉取音讯时,就只会生产 0 1 2 3 这 4 个队列中的音讯,4 5 6 7 压根就没有被生产。反过来,如果设置 WriteQueueNum = 4,ReadQueueNum = 8,在生产音讯时只会往 0 1 2 3 中生产音讯,生产音讯时则会从 0 1 2 3 4 5 6 7 所有的队列中生产,当然 4 5 6 7 中压根就没有音讯,假如生产是 Group 生产,Group 中有两个消费者,事实上只有第一个消费者在真正的生产音讯(0 1 2 3),第二个消费者压根就生产不到音讯(4 5 6 7)。 个别咱们都会设置这两个值雷同,只有在须要缩容 topic 的队列数量的时候,才会设置他们不同。
问题剖析
首先联想到的是,是否是 生产线程卡住了呢 ? 线程卡住个别因为:
-
产生了 Stop-the-wolrd:
- GC 导致
- 其余 safepoint 起因导致(例如 jstack,定时进入 safepoint 等等,参考我的这篇文章 JVM 相干 – SafePoint 与 Stop The World 全解)
- 线程解决音讯工夫过长,可能有锁获取不到,可能卡在某些 IO
采集过后的 JFR(对于 JFR,请参考我的另一系列 JFR 全解),发现:
- 在这个时间段并没有产生 停滞工夫很长 的 GC 以及其余 Stop-the-world 的 safepoint 事件:
- 在这段时间,线程是 park 的,并且堆栈显示是 生产线程并没有音讯能够生产:
既然利用并没有什么问题,咱们来看看 RocketMQ 是否有什么问题。个别的 RocketMQ Broker 的日志咱们关怀:
- 音讯长久化的工夫耗费统计,如果这里产生异样,咱们须要调优 Java MMAP 相干的参数,请参考:
- 音讯长久化异样,查看 storeerr.log
- 锁异样,查看 lock.log
那到底应该去看哪一个 broker 呢?之前提到了,发送到这个 Topic 是指定了 hashKey 的,通过音讯的 hashKey 咱们能够定位到是哪个 broker:
int hashCode = "咱们的 hashKey".hashCode();
log.info("{}", Math.abs(hashCode % 24));
咱们找到了音讯的 hashKey,通过下面的代码,后果是 20,也就是队列 20,通过后面的形容,咱们晓得每个 broker 是 8 个队列,20 对应的就是 broker-2 下面的队列,也就是 broker-2 queueId = 5 这个队列。咱们来查看 broker-2 下面的日志定位问题。
咱们发现 lock.log 外面有异样,如下所示,相似的有很多条,并且继续了 54s 左右,和线程 park 工夫比拟吻合,也和音讯提早比拟吻合:
2021-07-01 07:11:47 WARN AdminBrokerThread_10 - tryLockBatch, message queue locked by other client. Group: 生产 group OtherClientId: 10.238.18.6@29 NewClientId: 10.238.18.122@29 MessageQueue [topic= 音讯 topic, brokerName=broker-2, queueId=5]
这个日志的意思是,10.238.18.122@29
这个实例尝试锁住 queueId = 5 失败,因为 10.238.18.6@29
正在持有这个锁。那么为什么会产生这种状况呢?
RocketMQ 多队列程序生产的原理
RocketMQ 想要实现多队列程序生产,首先须要指定 hashKey,通过 hashKey 音讯会被放入特定的队列,消费者生产这个队列的时候,如果指定了程序生产,是 单线程生产 的,这样就保障了同一队列内有序。
那么是如何保障每个队列是单线程生产的呢?每个 Broker 保护一个:
private final ConcurrentMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable =
new ConcurrentHashMap<String, ConcurrentHashMap<MessageQueue, LockEntry>>(1024);
他是一个 ConcurrentMap< 生产组名称, ConcurrentHashMap< 音讯队列, 锁对象 >>
。锁对象 LockEntry 包含:
RebalanceLockManager.java
:
// 读取 rocketmq.broker.rebalance.lockMaxLiveTime 这个环境变量,默认 60s
private final static long REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(System.getProperty("rocketmq.broker.rebalance.lockMaxLiveTime", "60000"));
static class LockEntry {
//RocketMQ 客户端惟一 id
private String clientId;
private volatile long lastUpdateTimestamp = System.currentTimeMillis();
// 省略 getter setter
public boolean isLocked(final String clientId) {boolean eq = this.clientId.equals(clientId);
return eq && !this.isExpired();}
public boolean isExpired() {
// 在 REBALANCE_LOCK_MAX_LIVE_TIME 这么长时间后过期
boolean expired =
(System.currentTimeMillis() - this.lastUpdateTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME;
return expired;
}
}
RocketMQ 客户端发送 LOCK_BATCH_MQ 申请到 Broker 下面,Broker 会将客户端申请封装成为 LockEntry 并尝试更新这个 Map,如果更新胜利就是获取到了锁,如果失败则没有获取这个锁。Broker 的具体更新逻辑是(感兴趣能够查看,也能够间接跳过,不影响了解,前面有便于了解的图片):
public boolean tryLock(final String group, final MessageQueue mq, final String clientId) {
// 判断没有曾经锁住
if (!this.isLocked(group, mq, clientId)) {
try {
// 获取锁,这个锁是实例内的,因为每个 broker 保护本人的队列锁表,并不共享
this.lock.lockInterruptibly();
try {
// 尝试获取,判断是否存在,存在就判断是否过期
ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
if (null == groupValue) {groupValue = new ConcurrentHashMap<>(32);
this.mqLockTable.put(group, groupValue);
}
LockEntry lockEntry = groupValue.get(mq);
if (null == lockEntry) {lockEntry = new LockEntry();
lockEntry.setClientId(clientId);
groupValue.put(mq, lockEntry);
log.info("tryLock, message queue not locked, I got it. Group: {} NewClientId: {} {}",
group,
clientId,
mq);
}
if (lockEntry.isLocked(clientId)) {lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
return true;
}
String oldClientId = lockEntry.getClientId();
if (lockEntry.isExpired()) {lockEntry.setClientId(clientId);
lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
log.warn("tryLock, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}",
group,
oldClientId,
clientId,
mq);
return true;
}
// 这里就是咱们刚刚看到的日志
log.warn("tryLock, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}",
group,
oldClientId,
clientId,
mq);
return false;
} finally {this.lock.unlock();
}
} catch (InterruptedException e) {log.error("putMessage exception", e);
}
} else { }
return true;
}
// 判断是否是曾经锁住了
private boolean isLocked(final String group, final MessageQueue mq, final String clientId) {
// 通过生产组名称获取
ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
// 如果不为 null
if (groupValue != null) {
// 尝试获取 lockEntry,看是否存在
LockEntry lockEntry = groupValue.get(mq);
if (lockEntry != null) {
// 如果存在,判断是否过期
boolean locked = lockEntry.isLocked(clientId);
if (locked) {lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
}
return locked;
}
}
return false;
}
每个 MQ 客户端,会定时发送 LOCK_BATCH_MQ 申请,并且在本地保护获取到锁的所有队列:
ProcessQueue.java
:
// 定时发送 **LOCK_BATCH_MQ** 距离
public final static long REBALANCE_LOCK_INTERVAL = Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockInterval", "20000"));
ConsumeMessageOrderlyService.java
:
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {ConsumeMessageOrderlyService.this.lockMQPeriodically();
}
}, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
}
流程图如下所示:
ConsumeMessageOrderlyService
在敞开的时候,会 unlock 所有的队列:
public void shutdown() {
this.stopped = true;
this.scheduledExecutorService.shutdown();
this.consumeExecutor.shutdown();
if (MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {this.unlockAllMQ();
}
}
问题呈现起因
咱们这里客户端定时发送 LOCK_BATCH_MQ 距离是默认的 20s,Broker 端锁过期的工夫也是默认的 60s。
咱们的集群容器编排应用了 k8s,并且有实例迁徙的性能。在集群压力大的时候,主动扩容新的 Node(能够了解为虚拟机)并将创立新的服务实例部署下来。集群某些服务压力小的时候,某些服务实例会缩容上来,这时候就不须要那么多 Node 了,就会回收一部分 Node,然而被回收的 Node 下面还有不能缩容的服务实例,这时候就须要将这些服务实例迁徙到其余 Node 下面。这里咱们的业务实例就是产生了这个状况。
在问题呈现的时候,产生了 迁徙 ,老的实例被敞开,然而没有期待 ConsumeMessageOrderlyService#shutdown 的执行, 导致锁没有被被动开释,而是期待 60s 的锁过期工夫后,新的实例才拿到队列锁开始生产。
问题解决
- 在下个版本,退出针对 RocketMQ 客户端的优雅敞开逻辑
- 所有服务实例(RocketMQ 客户端)配置
rocketmq.client.rebalance.lockInterval
缩短心跳工夫(5s),RocketMQ Broker 配置rocketmq.broker.rebalance.lockMaxLiveTime
缩短过期工夫(例如 15s),然而放弃过期工夫是心跳工夫的 3 倍(集群中的 3 倍设计公理)
微信搜寻“我的编程喵”关注公众号,每日一刷,轻松晋升技术,斩获各种 offer: