共计 20316 个字符,预计需要花费 51 分钟才能阅读完成。
RocketMQ 音讯投递策略
- 作者: 博学谷狂野架构师
-
GitHub:GitHub 地址(有我精心筹备的 130 本电子书 PDF)
只分享干货、不吹水,让咱们一起加油!😄
前言
RocketMQ 的音讯投递分分为两种:一种是 生产者
往 MQ Broker 中投递;另外一种则是 MQ broker 往 消费者
投递(这种 投递
的说法是从消息传递的角度论述的,实际上底层是 消费者
从 MQ broker 中 Pull 拉取的)。本文将从模型的角度来论述这两种机制。
RocketMQ 的音讯模型
RocketMQ 的音讯模型整体并不简单,如下图所示:
一个 Topic(音讯主题)
可能对应多个理论的音讯队列(MessgeQueue)
在底层实现上,为了进步 MQ 的可用性和灵活性,一个 Topic 在理论存储的过程中,采纳了多队列的形式,具体模式如上图所示。每个音讯队列在应用中该当保障 先入先出(FIFO,First In First Out)的形式进行生产。
那么,基于这种模型,就会引申出两个问题:
- 生产者 在发送雷同 Topic 的音讯时,音讯体该当被搁置到哪一个音讯队列(MessageQueue) 中?
- 消费者 在生产音讯时,该当从哪些音讯队列中拉取音讯?
音讯的零碎间传递时,会逾越不同的网络载体,这会导致音讯的流传无奈保障其有序请
生产者投递策略
轮询算法投递
默认投递形式:基于
Queue 队列
轮询算法投递
默认状况下,采纳了最简略的轮询算法,这种算法有个很好的个性就是,保障每一个 Queue 队列
的音讯投递数量尽可能平均,算法如下图所示:
COPY/**
* 依据 TopicPublishInfo Topic 公布信息对象中保护的 index,每次抉择队列时,都会递增
* 而后依据 index % queueSize 进行取余,达到轮询的成果
*
*/
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {return tpInfo.selectOneMessageQueue(lastBrokerName);
}
/**
* TopicPublishInfo Topic 公布信息对象中
*/
public class TopicPublishInfo {
// 基于线程上下文的计数递增,用于轮询目标
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {if (lastBrokerName == null) {return selectOneMessageQueue();
} else {int index = this.sendWhichQueue.getAndIncrement();
for (int i = 0; i < this.messageQueueList.size(); i++) {
// 轮询计算
int pos = Math.abs(index++) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {return mq;}
}
return selectOneMessageQueue();}
}
public MessageQueue selectOneMessageQueue() {int index = this.sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
return this.messageQueueList.get(pos);
}
}
代码示例
RocketMQ 默认采纳轮询投递策略
COPY/**
* 轮询投递策略
*/
public class PollingProducer {public static void main(String[] args) throws Exception {
// 创立一个音讯生产者,并设置一个音讯生产者组
DefaultMQProducer producer = new DefaultMQProducer("rocket_test_consumer_group");
// 指定 NameServer 地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 初始化 Producer,整个利用生命周期内只须要初始化一次
producer.start();
for (int i = 0; i < 10; i++) {
// 创立一条音讯对象,指定其主题、标签和音讯内容
Message msg = new Message(
/* 音讯主题名 */
"topicTest",
/* 音讯标签 */
"TagA",
/* 音讯内容 */
("Hello Java demo RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 发送音讯并返回后果
SendResult sendResult = producer.send(msg);
System.out.println("product: 发送状态:" + sendResult.getSendStatus() + ", 存储 queue:" + sendResult.getMessageQueue().getQueueId() + ", 音讯索引:" + i);
}
// 一旦生产者实例不再被应用则将其敞开,包含清理资源,敞开网络连接等
producer.shutdown();}
}
打印后果
COPYproduct: 发送状态:SEND_OK, 存储 queue:0, 音讯索引:0
product: 发送状态:SEND_OK, 存储 queue:1, 音讯索引:1
product: 发送状态:SEND_OK, 存储 queue:2, 音讯索引:2
product: 发送状态:SEND_OK, 存储 queue:3, 音讯索引:3
product: 发送状态:SEND_OK, 存储 queue:0, 音讯索引:4
product: 发送状态:SEND_OK, 存储 queue:1, 音讯索引:5
product: 发送状态:SEND_OK, 存储 queue:2, 音讯索引:6
product: 发送状态:SEND_OK, 存储 queue:3, 音讯索引:7
product: 发送状态:SEND_OK, 存储 queue:0, 音讯索引:8
product: 发送状态:SEND_OK, 存储 queue:1, 音讯索引:9
音讯投递提早最小策略
默认投递形式的加强:基于
Queue 队列
轮询算法和音讯投递提早最小
的策略投递
默认的投递形式比较简单,然而也裸露了一个问题,就是有些 Queue 队列
可能因为本身数量积压等起因,可能在投递的过程比拟长,对于这样的 Queue 队列
会影响后续投递的成果。
基于这种景象,RocketMQ 在每发送一个 MQ 音讯后,都会统计一下音讯投递的 时间延迟
,依据这个 时间延迟
,能够晓得往哪些Queue 队列
投递的速度快。
在这种场景下,会优先应用 音讯投递提早最小
的策略,如果没有失效,再应用 Queue 队列轮询
的形式。
COPYpublic class MQFaultStrategy {
/**
* 依据 TopicPublishInfo 外部保护的 index, 在每次操作时,都会递增,* 而后依据 index % queueList.size(), 应用了轮询的根底算法
*
*/
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {if (this.sendLatencyFaultEnable) {
try {
// 从 queueid 为 0 开始,顺次验证 broker 是否无效,如果无效
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
// 基于 index 和队列数量取余,确定地位
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
return mq;
}
}
// 从提早容错 broker 列表中筛选一个容错性最好的一个 broker
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
// 取余筛选其中一个队列
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
}
return mq;
} else {latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {log.error("Error occurred when selecting message queue", e);
}
// 取余筛选其中一个队列
return tpInfo.selectOneMessageQueue();}
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
}
程序投递策略
上述两种投递形式属于对音讯投递的时序性没有要求的场景,这种投递的速度和效率比拟高。而在有些场景下,须要保障同类型音讯投递和生产的程序性。
例如,假如当初有 TOPIC topicTest
, 该 Topic 下有 4 个 Queue 队列
,该 Topic 用于传递订单的状态变迁,假如订单有状态: 未领取
、 已领取
、 发货中 (解决中)
、 发货胜利
、 发货失败
。
在时序上,生产者从时序上能够生成如下几个音讯:
COPY 订单 T0000001: 未领取 --> 订单 T0000001: 已领取 --> 订单 T0000001: 发货中(解决中) --> 订单 T0000001: 发货失败
音讯发送到 MQ 中之后,可能因为轮询投递的起因,音讯在 MQ 的存储可能如下:
这种状况下,咱们心愿 消费者
生产音讯的程序和咱们发送是统一的,然而,有上述 MQ 的投递和生产机制,咱们无奈保障程序是正确的,对于程序异样的音讯,消费者
即便有肯定的状态容错,也不能齐全解决好这么多种随机呈现组合状况。
基于上述的状况,RockeMQ
采纳了这种实现计划:对于雷同订单号的音讯,通过肯定的策略,将其搁置在一个 queue 队列中
,而后 消费者
再采纳肯定的策略(一个线程独立解决一个queue
, 保障解决音讯的程序性),可能保障生产的程序性
至于消费者是如何保障生产的程序行的,后续再具体开展,咱们先看 生产者
是如何能将雷同订单号的音讯发送到同一个 queue 队列
的:
生产者在音讯投递的过程中,应用了 MessageQueueSelector
作为队列抉择的策略接口,其定义如下:
COPYpublic interface MessageQueueSelector {
/**
* 依据音讯体和参数,从一批音讯队列中挑选出一个适合的音讯队列
* @param mqs 待抉择的 MQ 队列抉择列表
* @param msg 待发送的音讯体
* @param arg 附加参数
* @return 抉择后的队列
*/
MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}
相应地,目前 RocketMQ 提供了如下几种实现:
默认实现
投递策略 | 策略实现类 | 阐明 |
---|---|---|
随机调配策略 | SelectMessageQueueByRandom | 应用了简略的随机数抉择算法 |
基于 Hash 调配策略 | SelectMessageQueueByHash | 依据附加参数的 Hash 值,依照音讯队列列表的大小取余数,失去音讯队列的 index |
基于机器机房地位调配策略 | SelectMessageQueueByMachineRoom | 开源的版本没有具体的实现,根本的目标应该是机器的就近准则调配 |
当初大略看下策略的代码实现:
COPYpublic class SelectMessageQueueByHash implements MessageQueueSelector {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {int value = arg.hashCode();
if (value < 0) {value = Math.abs(value);
}
value = value % mqs.size();
return mqs.get(value);
}
}
代码示例
理论的操作代码样例如下,通过订单号作为 hash 运算对象,就能保障雷同订单号的音讯可能落在雷同的
queue 队列上
。
COPYpublic class OrderProducer {private static final List<ProductOrder> orderList = new ArrayList<>();
static {orderList.add(new ProductOrder("XXX001", "订单创立"));
orderList.add(new ProductOrder("XXX001", "订单付款"));
orderList.add(new ProductOrder("XXX001", "订单实现"));
orderList.add(new ProductOrder("XXX002", "订单创立"));
orderList.add(new ProductOrder("XXX002", "订单付款"));
orderList.add(new ProductOrder("XXX002", "订单实现"));
orderList.add(new ProductOrder("XXX003", "订单创立"));
orderList.add(new ProductOrder("XXX003", "订单付款"));
orderList.add(new ProductOrder("XXX003", "订单实现"));
}
public static void main(String[] args) throws Exception {
// 创立一个音讯生产者,并设置一个音讯生产者组
DefaultMQProducer producer = new DefaultMQProducer("rocket_test_consumer_group");
// 指定 NameServer 地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 初始化 Producer,整个利用生命周期内只须要初始化一次
producer.start();
for (int i = 0; i < orderList.size(); i++) {
// 获取以后 order
ProductOrder order = orderList.get(i);
// 创立一条音讯对象,指定其主题、标签和音讯内容
Message message = new Message(
/* 音讯主题名 */
"topicTest",
/* 音讯标签 */
order.getOrderId(),
/* 音讯内容 */
(order.toString()).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 发送音讯并返回后果 应用 hash 抉择策略
SendResult sendResult = producer.send(message, new SelectMessageQueueByHash(), order.getOrderId());
System.out.println("product: 发送状态:" + sendResult.getSendStatus() + ", 存储 queue:" + sendResult.getMessageQueue().getQueueId() + ",orderID:" + order.getOrderId() + ",type:" + order.getType());
}
// 一旦生产者实例不再被应用则将其敞开,包含清理资源,敞开网络连接等
producer.shutdown();}
}
打印后果如下
COPYproduct: 发送状态:SEND_OK, 存储 queue:3,orderID:XXX001,type: 订单创立
product: 发送状态:SEND_OK, 存储 queue:3,orderID:XXX001,type: 订单付款
product: 发送状态:SEND_OK, 存储 queue:3,orderID:XXX001,type: 订单实现
product: 发送状态:SEND_OK, 存储 queue:2,orderID:XXX002,type: 订单创立
product: 发送状态:SEND_OK, 存储 queue:2,orderID:XXX002,type: 订单付款
product: 发送状态:SEND_OK, 存储 queue:2,orderID:XXX002,type: 订单实现
product: 发送状态:SEND_OK, 存储 queue:1,orderID:XXX003,type: 订单创立
product: 发送状态:SEND_OK, 存储 queue:1,orderID:XXX003,type: 订单付款
product: 发送状态:SEND_OK, 存储 queue:1,orderID:XXX003,type: 订单实现
消费者调配队列
如何为消费者调配
queue 队列
?RocketMQ 对于消费者生产音讯有两种模式:
BROADCASTING
: 广播式生产,这种模式下,一个音讯会被告诉到每一个消费者
CLUSTERING
: 集群式生产,这种模式下,一个音讯最多只会被投递到一个消费者
上进行生产
模式如下:
广播式
的音讯模式比较简单,上面咱们介绍下 集群式
。对于应用了生产模式为MessageModel.CLUSTERING
进行生产时,须要保障一个 音讯
在整个集群中只须要被生产一次 。实际上,在 RoketMQ 底层,音讯指定调配给消费者的实现,是通过queue 队列
调配给 消费者
的形式实现的:也就是说,音讯
调配的单位是音讯所在的queue 队列
。即:
将
queue 队列
指定给特定的消费者
后,queue 队列
内的所有音讯将会被指定到消费者
进行生产。
RocketMQ 定义了策略接口 AllocateMessageQueueStrategy
,对于给定的 消费者分组
, 和 音讯队列列表
、 消费者列表
, 以后消费者
该当被调配到哪些queue 队列
,定义如下:
COPY/**
* 为消费者调配 queue 的策略算法接口
*/
public interface AllocateMessageQueueStrategy {
/**
* Allocating by consumer id
*
* @param consumerGroup 以后 consumer 群组
* @param currentCID 以后 consumer id
* @param mqAll 以后 topic 的所有 queue 实例援用
* @param cidAll 以后 consumer 群组下所有的 consumer id set 汇合
* @return 依据策略给以后 consumer 调配的 queue 列表
*/
List<MessageQueue> allocate(
final String consumerGroup,
final String currentCID,
final List<MessageQueue> mqAll,
final List<String> cidAll
);
/**
* 算法名称
*
* @return The strategy name
*/
String getName();}
相应地,RocketMQ 提供了如下几种实现:
算法名称 | 含意 |
---|---|
AllocateMessageQueueAveragely | 平均分配算法 |
AllocateMessageQueueAveragelyByCircle | 基于环形平均分配算法 |
AllocateMachineRoomNearby | 基于机房邻近准则算法 |
AllocateMessageQueueByMachineRoom | 基于机房调配算法 |
AllocateMessageQueueConsistentHash | 基于一致性 hash 算法 |
AllocateMessageQueueByConfig | 基于配置调配算法 |
为了讲述分明上述算法的基本原理,咱们先假如一个例子,上面所有的算法将基于这个例子解说。
假如以后同一个 topic 下有 queue 队列 10
个,消费者共有 4
个, 如下图所示:
上面顺次介绍其原理:
平均分配算法
这里所谓的平均分配算法,并不是指的严格意义上的齐全均匀,如下面的例子中,10 个 queue,而消费者只有 4 个,无奈是整除关系,除了整除之外的多进去的 queue, 将顺次依据消费者的程序均摊。
依照上述例子来看,10/4=2
,即示意每个 消费者
均匀均摊 2 个 queue;而 10%4=2
,即除了均摊之外,多进去 2 个queue
还没有调配,那么,依据消费者的程序 consumer-1
、consumer-2
、consumer-3
、consumer-4
, 则多进去的 2 个queue
将别离给 consumer-1
和consumer-2
。
最终,摊派关系如下:
consumer-1
:3 个consumer-2
:3 个consumer-3
:2 个consumer-4
:2 个
其代码实现非常简单:
COPYpublic class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {private final InternalLogger log = ClientLogger.getLog();
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {if (currentCID == null || currentCID.length() < 1) {throw new IllegalArgumentException("currentCID is empty");
}
if (mqAll == null || mqAll.isEmpty()) {throw new IllegalArgumentException("mqAll is null or mqAll empty");
}
if (cidAll == null || cidAll.isEmpty()) {throw new IllegalArgumentException("cidAll is null or cidAll empty");
}
List<MessageQueue> result = new ArrayList<MessageQueue>();
if (!cidAll.contains(currentCID)) {log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
consumerGroup,
currentCID,
cidAll);
return result;
}
int index = cidAll.indexOf(currentCID);
int mod = mqAll.size() % cidAll.size();
int averageSize =
mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
+ 1 : mqAll.size() / cidAll.size());
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;
}
@Override
public String getName() {return "AVG";}
}
演示成果
消费者 A
COPYConsumer- 线程名称 =[32], 接管 queueId:[0], 接管工夫:[1608171677558], 音讯 =[Hello Java demo RocketMQ 2]
Consumer- 线程名称 =[34], 接管 queueId:[1], 接管工夫:[1608171677580], 音讯 =[Hello Java demo RocketMQ 3]
Consumer- 线程名称 =[36], 接管 queueId:[0], 接管工夫:[1608171677655], 音讯 =[Hello Java demo RocketMQ 6]
Consumer- 线程名称 =[38], 接管 queueId:[1], 接管工夫:[1608171677679], 音讯 =[Hello Java demo RocketMQ 7]
消费者 B
COPYConsumer- 线程名称 =[35], 接管 queueId:[2], 接管工夫:[1608171677508], 音讯 =[Hello Java demo RocketMQ 0]
Consumer- 线程名称 =[36], 接管 queueId:[3], 接管工夫:[1608171677535], 音讯 =[Hello Java demo RocketMQ 1]
Consumer- 线程名称 =[37], 接管 queueId:[2], 接管工夫:[1608171677609], 音讯 =[Hello Java demo RocketMQ 4]
Consumer- 线程名称 =[38], 接管 queueId:[3], 接管工夫:[1608171677635], 音讯 =[Hello Java demo RocketMQ 5]
Consumer- 线程名称 =[39], 接管 queueId:[2], 接管工夫:[1608171677709], 音讯 =[Hello Java demo RocketMQ 8]
Consumer- 线程名称 =[40], 接管 queueId:[3], 接管工夫:[1608171677734], 音讯 =[Hello Java demo RocketMQ 9]
基于环形均匀算法
环形均匀算法,是指依据消费者的程序,顺次在由
queue 队列
组成的环形图中一一调配。具体流程如下所示:
这种算法最终调配的后果是:
consumer-1
: #0,#4,#8consumer-2
: #1, #5, # 9consumer-3
: #2,#6consumer-4
: #3,#7
其代码实现如下所示:
COPY/**
* Cycle average Hashing queue algorithm
*/
public class AllocateMessageQueueAveragelyByCircle implements AllocateMessageQueueStrategy {private final InternalLogger log = ClientLogger.getLog();
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {if (currentCID == null || currentCID.length() < 1) {throw new IllegalArgumentException("currentCID is empty");
}
if (mqAll == null || mqAll.isEmpty()) {throw new IllegalArgumentException("mqAll is null or mqAll empty");
}
if (cidAll == null || cidAll.isEmpty()) {throw new IllegalArgumentException("cidAll is null or cidAll empty");
}
List<MessageQueue> result = new ArrayList<MessageQueue>();
if (!cidAll.contains(currentCID)) {log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
consumerGroup,
currentCID,
cidAll);
return result;
}
int index = cidAll.indexOf(currentCID);
for (int i = index; i < mqAll.size(); i++) {if (i % cidAll.size() == index) {result.add(mqAll.get(i));
}
}
return result;
}
@Override
public String getName() {return "AVG_BY_CIRCLE";}
}
演示成果
设置算法
COPY// 设置应用环形 hash 算法
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(null, "rocket_test_consumer_group", null, new AllocateMessageQueueAveragelyByCircle());
消费者 A
COPYConsumer- 线程名称 =[35], 接管 queueId:[0], 接管工夫:[1608171903364], 音讯 =[Hello Java demo RocketMQ 1]
Consumer- 线程名称 =[38], 接管 queueId:[2], 接管工夫:[1608171903411], 音讯 =[Hello Java demo RocketMQ 3]
Consumer- 线程名称 =[39], 接管 queueId:[0], 接管工夫:[1608171903459], 音讯 =[Hello Java demo RocketMQ 5]
Consumer- 线程名称 =[40], 接管 queueId:[2], 接管工夫:[1608171903508], 音讯 =[Hello Java demo RocketMQ 7]
Consumer- 线程名称 =[41], 接管 queueId:[0], 接管工夫:[1608171903562], 音讯 =[Hello Java demo RocketMQ 9]
消费者 B
COPYConsumer- 线程名称 =[28], 接管 queueId:[3], 接管工夫:[1608171903346], 音讯 =[Hello Java demo RocketMQ 0]
Consumer- 线程名称 =[30], 接管 queueId:[1], 接管工夫:[1608171903393], 音讯 =[Hello Java demo RocketMQ 2]
Consumer- 线程名称 =[32], 接管 queueId:[3], 接管工夫:[1608171903443], 音讯 =[Hello Java demo RocketMQ 4]
Consumer- 线程名称 =[34], 接管 queueId:[1], 接管工夫:[1608171903490], 音讯 =[Hello Java demo RocketMQ 6]
Consumer- 线程名称 =[36], 接管 queueId:[3], 接管工夫:[1608171903540], 音讯 =[Hello Java demo RocketMQ 8]
一致性 hash 调配算法
应用这种算法,会将 consumer 消费者
作为 Node 节点结构成一个 hash 环,而后 queue 队列
通过这个 hash 环来决定被调配给哪个consumer 消费者
。
其基本模式如下:
一致性 hash 算法用于在分布式系统中,保证数据的一致性而提出的一种基于 hash 环实现的算法
算法实现上也不简单,如下图所示:
COPYpublic List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
// 省略局部代码
List<MessageQueue> result = new ArrayList<MessageQueue>();
if (!cidAll.contains(currentCID)) {log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
consumerGroup,
currentCID,
cidAll);
return result;
}
Collection<ClientNode> cidNodes = new ArrayList<ClientNode>();
for (String cid : cidAll) {cidNodes.add(new ClientNode(cid));
}
// 应用 consumer id 结构 hash 环
final ConsistentHashRouter<ClientNode> router; //for building hash ring
if (customHashFunction != null) {router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt, customHashFunction);
} else {router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt);
}
// 顺次为 队列调配 consumer
List<MessageQueue> results = new ArrayList<MessageQueue>();
for (MessageQueue mq : mqAll) {ClientNode clientNode = router.routeNode(mq.toString());
if (clientNode != null && currentCID.equals(clientNode.getKey())) {results.add(mq);
}
}
return results;
}
演示成果
设置算法
COPY// 设置应用环形 hash 算法
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(null, "rocket_test_consumer_group", null, new AllocateMessageQueueConsistentHash());
消费者 A
COPYConsumer- 线程名称 =[29], 接管 queueId:[0], 接管工夫:[1608172067310], 音讯 =[Hello Java demo RocketMQ 0]
Consumer- 线程名称 =[31], 接管 queueId:[1], 接管工夫:[1608172067323], 音讯 =[Hello Java demo RocketMQ 1]
Consumer- 线程名称 =[33], 接管 queueId:[2], 接管工夫:[1608172067345], 音讯 =[Hello Java demo RocketMQ 2]
Consumer- 线程名称 =[37], 接管 queueId:[0], 接管工夫:[1608172067395], 音讯 =[Hello Java demo RocketMQ 4]
Consumer- 线程名称 =[39], 接管 queueId:[1], 接管工夫:[1608172067418], 音讯 =[Hello Java demo RocketMQ 5]
Consumer- 线程名称 =[40], 接管 queueId:[2], 接管工夫:[1608172067443], 音讯 =[Hello Java demo RocketMQ 6]
Consumer- 线程名称 =[41], 接管 queueId:[0], 接管工夫:[1608172067494], 音讯 =[Hello Java demo RocketMQ 8]
Consumer- 线程名称 =[42], 接管 queueId:[1], 接管工夫:[1608172067518], 音讯 =[Hello Java demo RocketMQ 9]
消费者 B
COPYConsumer- 线程名称 =[28], 接管 queueId:[3], 接管工夫:[1608172067383], 音讯 =[Hello Java demo RocketMQ 3]
Consumer- 线程名称 =[30], 接管 queueId:[3], 接管工夫:[1608172067475], 音讯 =[Hello Java demo RocketMQ 7]
机房邻近调配算法
该算法应用了 装璜者设计模式
,对调配策略进行了加强。个别在生产环境,如果是微服务架构下,RocketMQ 集群的部署可能是在不同的机房中部署,其根本构造可能如下图所示:
对于跨机房的场景,会存在网络、稳定性和隔离心的起因,该算法会依据 queue
的部署机房地位和 消费者 consumer
的地位,过滤出以后 消费者 consumer
雷同机房的 queue 队列
,而后再联合上述的算法,如基于平均分配算法在queue 队列
子集的根底上再筛选。相干代码实现如下:
COPY@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
// 省略局部代码
List<MessageQueue> result = new ArrayList<MessageQueue>();
// 将 MQ 依照 机房进行分组
Map<String/*machine room */, List<MessageQueue>> mr2Mq = new TreeMap<String, List<MessageQueue>>();
for (MessageQueue mq : mqAll) {String brokerMachineRoom = machineRoomResolver.brokerDeployIn(mq);
if (StringUtils.isNoneEmpty(brokerMachineRoom)) {if (mr2Mq.get(brokerMachineRoom) == null) {mr2Mq.put(brokerMachineRoom, new ArrayList<MessageQueue>());
}
mr2Mq.get(brokerMachineRoom).add(mq);
} else {throw new IllegalArgumentException("Machine room is null for mq" + mq);
}
}
// 将消费者 依照机房进行分组
Map<String/*machine room */, List<String/*clientId*/>> mr2c = new TreeMap<String, List<String>>();
for (String cid : cidAll) {String consumerMachineRoom = machineRoomResolver.consumerDeployIn(cid);
if (StringUtils.isNoneEmpty(consumerMachineRoom)) {if (mr2c.get(consumerMachineRoom) == null) {mr2c.put(consumerMachineRoom, new ArrayList<String>());
}
mr2c.get(consumerMachineRoom).add(cid);
} else {throw new IllegalArgumentException("Machine room is null for consumer id" + cid);
}
}
List<MessageQueue> allocateResults = new ArrayList<MessageQueue>();
//1. 过滤出以后机房内的 MQ 队列子集,在此基础上应用调配算法筛选
String currentMachineRoom = machineRoomResolver.consumerDeployIn(currentCID);
List<MessageQueue> mqInThisMachineRoom = mr2Mq.remove(currentMachineRoom);
List<String> consumerInThisMachineRoom = mr2c.get(currentMachineRoom);
if (mqInThisMachineRoom != null && !mqInThisMachineRoom.isEmpty()) {allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mqInThisMachineRoom, consumerInThisMachineRoom));
}
//2. 不在同一机房,依照个别策略进行操作
for (String machineRoom : mr2Mq.keySet()) {if (!mr2c.containsKey(machineRoom)) { // no alive consumer in the corresponding machine room, so all consumers share these queues
allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mr2Mq.get(machineRoom), cidAll));
}
}
return allocateResults;
}
基于机房调配算法
该算法实用于属于同一个机房外部的音讯,去调配 queue。这种形式十分明确,基于下面的 机房邻近调配算法
的场景,这种更彻底,间接指定基于机房生产的策略。这种形式具备强约定性,比方 broker
名称依照机房的名称进行拼接,在算法中通过约定解析进行调配。
其代码实现如下:
COPY/**
* Computer room Hashing queue algorithm, such as Alipay logic room
*/
public class AllocateMessageQueueByMachineRoom implements AllocateMessageQueueStrategy {
private Set<String> consumeridcs;
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {List<MessageQueue> result = new ArrayList<MessageQueue>();
int currentIndex = cidAll.indexOf(currentCID);
if (currentIndex < 0) {return result;}
List<MessageQueue> premqAll = new ArrayList<MessageQueue>();
for (MessageQueue mq : mqAll) {String[] temp = mq.getBrokerName().split("@");
if (temp.length == 2 && consumeridcs.contains(temp[0])) {premqAll.add(mq);
}
}
int mod = premqAll.size() / cidAll.size();
int rem = premqAll.size() % cidAll.size();
int startIndex = mod * currentIndex;
int endIndex = startIndex + mod;
for (int i = startIndex; i < endIndex; i++) {result.add(mqAll.get(i));
}
if (rem > currentIndex) {result.add(premqAll.get(currentIndex + mod * cidAll.size()));
}
return result;
}
@Override
public String getName() {return "MACHINE_ROOM";}
public Set<String> getConsumeridcs() {return consumeridcs;}
public void setConsumeridcs(Set<String> consumeridcs) {this.consumeridcs = consumeridcs;}
}
基于配置调配算法
这种算法单纯基于配置的,非常简单,理论应用中可能用处不大。代码如下:
COPYpublic class AllocateMessageQueueByConfig implements AllocateMessageQueueStrategy {
private List<MessageQueue> messageQueueList;
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {return this.messageQueueList;}
@Override
public String getName() {return "CONFIG";}
public List<MessageQueue> getMessageQueueList() {return messageQueueList;}
public void setMessageQueueList(List<MessageQueue> messageQueueList) {this.messageQueueList = messageQueueList;}
}
消费者如何指定调配算法
消费者构造方法
在 DefaultMQPushConsumer 构造方法中能够传入调配策略
默认状况下,消费者应用的是
AllocateMessageQueueAveragely
算法,也能够本人指定:
COPYpublic class DefaultMQPushConsumer{
/**
* Default constructor.
*/
public DefaultMQPushConsumer() {this(MixAll.DEFAULT_CONSUMER_GROUP, null, new AllocateMessageQueueAveragely());
}
/**
* Constructor specifying consumer group, RPC hook and message queue allocating algorithm.
*
* @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy message queue allocating algorithm.
*/
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.consumerGroup = consumerGroup;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}
}
咱们看到默认应用了
AllocateMessageQueueAveragely
平均分配策略
应用其余调配策略
如果须要应用其余调配策略,应用形式如下
本文由
传智教育博学谷狂野架构师
教研团队公布。如果本文对您有帮忙,欢送
关注
和点赞
;如果您有任何倡议也可留言评论
或私信
,您的反对是我保持创作的能源。转载请注明出处!