共计 4812 个字符,预计需要花费 13 分钟才能阅读完成。
大家好,我是不才陈某~
明天分享一道有意思的面试题。
面试官:RocketMQ 音讯积压了,减少消费者有用吗?
我:这个要看具体的场景,不同的场景下状况是不一样的。
面试官:能够具体说一下吗?
我:如果消费者的数量小于 MessageQueue 的数量,减少消费者能够放慢音讯生产速度,缩小音讯积压。比方一个 Topic 有 4 个 MessageQueue,2 个消费者进行生产,如果减少一个消费者,明细能够放慢拉取音讯的频率。如下图:
关注公众号:码猿技术专栏,回复关键词:1111 获取阿里外部 Java 性能调优手册!
如果消费者的数量大于等于 MessageQueue 的数量,减少消费者是没有用的。比方一个 Topic 有 4 个 MessageQueue,并且有 4 个消费者进行生产。如下图
面试官:你说的第一种状况,减少消费者肯定能放慢音讯生产的速度吗?
我:这 …,个别状况下是能够的。
面试官:有非凡的状况吗?
我:当然有。消费者音讯拉取的速度也取决于本地音讯的生产速度,如果本地音讯生产的慢,就会提早一段时间后再去拉取。
面试官:在什么状况下消费者会提早一段时间后后再去拉取呢?
我:消费者拉取的音讯存在 ProcessQueue,消费者是有流量管制的,如果呈现上面三种状况,就不会被动去拉取:
- ProcessQueue 保留的音讯数量超过阈值(默认 1000,能够配置);
- ProcessQueue 保留的音讯大小超过阈值(默认 100M,能够配置);
- 对于非程序生产的场景,ProcessQueue 中保留的最初一条和第一条音讯偏移量之差超过阈值(默认 2000,能够配置)。
这部分源码请参考类:org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl。
面试官:还有其余状况吗?
我:对于程序生产的场景,ProcessQueue 加锁失败,也会提早拉取,这个延迟时间是 3s。
面试官:消费者提早拉取音讯,个别可能是什么起因导致的呢?
我 :其实提早拉取的 实质就是消费者生产慢,导致下次去拉取的时候 ProcessQueue 中积压的音讯超过阈值。以上面这张架构图为例:
消费者生产慢,可是能上面的起因:
- 消费者解决的业务逻辑简单,耗时很长;
- 消费者有慢查问,或者数据库负载高导致响应慢;
- 缓存等中间件响应慢,比方 Redis 响应慢;
- 调用内部服务接口响应慢。
面试官:对于内部接口响应慢的状况,有什么应答措施吗?
我:这个要分状况探讨。
如果调用内部零碎 只是一个告诉,或者调用内部接口的后果并不解决,能够采纳异步的形式,异步逻辑里采纳重试的形式保障接口调胜利。
如果内部接口返回后果必须要解决,能够思考接口返回的后果是否能够缓存默认值(要思考业务可行),在调用失败后采纳疾速降级的形式,应用默认值代替返回接口返回值。
如果这个接口返回后果必须要解决,并且不能缓存,能够把拉取到的音讯存入本地而后给 Broker 间接返回 CONSUME_SUCCESS。等内部零碎恢复正常后再从本地取出来进行解决。
面试官:如果消费者数小于 MessageQueue 数量,并且内部零碎响应失常,为了疾速生产积压音讯而减少消费者,有什么须要思考的吗?
我:内部零碎尽管响应失常,然而减少多个消费者后,内部零碎的接口调用量会突增,如果达到吞吐量下限,内部零碎会响应变慢,甚至被打挂。
同时也要思考本地数据库、缓存的压力,如果数据库响应变慢,解决音讯的速度就会变慢,起不到缓解音讯积压的作用。
面试官:新减少了消费者后,怎么给它调配 MessageQueue 呢?
我:Consumer 在拉取音讯之前,须要对 MessageQueue 进行负载操作。RocketMQ 应用一个定时器来实现负载操作,默认每距离 20s 从新负载一次。
面试官:能具体说一下都有哪些负载策略吗?
我:RocketMQ 提供了 6 种负载策略,顺次来看一下。
均匀负载策略:
- 把消费者进行排序;
- 计算每个消费者能够平均分配的 MessageQueue 数量;
- 如果消费者数量大于 MessageQueue 数量,多出的消费者就分不到;
- 如果不能够平分,就应用 MessageQueue 总数量对消费者数量求余数 mod;
- 对前 mod 数量消费者,每个消费者加一个,这样就获取到了每个消费者调配的 MessageQueue 数量。
比方 4 个 MessageQueue 和 3 个消费者的状况:
源代码的逻辑非常简单,如下:
// AllocateMessageQueueAveragely 这个类
// 4 个 MessageQueue 和 3 个消费者的状况,如果第一个,index = 0
int index = cidAll.indexOf(currentCID);
// mod = 1
int mod = mqAll.size() % cidAll.size();
// averageSize = 2
int averageSize =
mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
+ 1 : mqAll.size() / cidAll.size());
// startIndex = 0
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
// range = 2, 所以第一个消费者调配到了 2 个
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
循环调配策略:
这个很容易了解,遍历消费者,把 MessageQueue 分一个给遍历到的消费者,如果 MessageQueue 数量比消费者多,须要进行屡次遍历,遍历次数等于(MessageQueue 数量 / 消费者数量),还是以 4 个 MessageQueue 和 3 个消费者的状况,如下图:
源代码如下:
//AllocateMessageQueueAveragelyByCircle 这个类
//4 个 MessageQueue 和 3 个消费者的状况,如果第一个,index = 0
int index = cidAll.indexOf(currentCID);
for (int i = index; i < mqAll.size(); i++) {if (i % cidAll.size() == index) {
//i == 0 或者 i == 3 都会走到这里
result.add(mqAll.get(i));
}
}
自定义调配策略:
这种策略在消费者启动的时候能够指定生产哪些 MessageQueue。能够参考上面代码:
AllocateMessageQueueByConfig allocateMessageQueueByConfig = new AllocateMessageQueueByConfig();
// 绑定生产 messageQueue1
allocateMessageQueueByConfig.setMessageQueueList(Arrays.asList(new MessageQueue("messageQueue1","broker1",0)));
consumer.setAllocateMessageQueueStrategy(allocateMessageQueueByConfig);
consumer.start();
依照机房调配策略:
这种形式 Consumer 只生产指定机房的 MessageQueue,如下图:Consumer0、Consumer1、Consumer2 绑定 room1 和 room2 这两个机房,而 room3 这个机房没有消费者。
Consumer 启动的时候须要绑定机房名称。能够参考上面代码:
AllocateMessageQueueByMachineRoom allocateMessageQueueByMachineRoom = new AllocateMessageQueueByMachineRoom();
// 绑定生产 room1 和 room2 这两个机房
allocateMessageQueueByMachineRoom.setConsumeridcs(new HashSet<>(Arrays.asList("room1","room2")));
consumer.setAllocateMessageQueueStrategy(allocateMessageQueueByMachineRoom);
consumer.start();
这种策略 broker 的命名必须依照格局:机房名 @brokerName,因为消费者调配队列的时候,首先依照机房名称过滤出所有的 MessageQueue,而后 再依照平均分配策略进行调配。
//AllocateMessageQueueByMachineRoom 这个类
List<MessageQueue> premqAll = new ArrayList<MessageQueue>();
for (MessageQueue mq : mqAll) {String[] temp = mq.getBrokerName().split("@");
if (temp.length == 2 && consumeridcs.contains(temp[0])) {premqAll.add(mq);
}
}
// 下面依照机房名称过滤出所有的 MessageQueue 放入 premqAll,前面就是平均分配策略
依照机房就近调配:
跟依照机房分配原则相比,就近调配的益处是能够对没有消费者的机房进行调配。如下图,机房 3 的 MessageQueue 也调配到了消费者:
如果一个机房没有消费者,则会把这个机房的 MessageQueue 调配给集群中所有的消费者。
源码所在类:AllocateMachineRoomNearby。
一致性 Hash 算法策略:
把所有的消费者通过 Hash 计算散布到 Hash 环上,对所有的 MessageQueue 进行 Hash 计算,找到顺时针方向最近的消费者节点进行绑定。如下图:
源代码如下:
// 所在类 AllocateMessageQueueConsistentHash
Collection<ClientNode> cidNodes = new ArrayList<ClientNode>();
for (String cid : cidAll) {cidNodes.add(new ClientNode(cid));
}
// 应用消费者构建 Hash 环,把消费者散布在 Hash 环节点上
final ConsistentHashRouter<ClientNode> router; //for building hash ring
if (customHashFunction != null) {router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt, customHashFunction);
} else {router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt);
}
// 对 MessageQueue 做 Hash 运算,找到环上间隔最近的消费者
List<MessageQueue> results = new ArrayList<MessageQueue>();
for (MessageQueue mq : mqAll) {ClientNode clientNode = router.routeNode(mq.toString());
if (clientNode != null && currentCID.equals(clientNode.getKey())) {results.add(mq);
}
}
面试官:祝贺你,通过了。