RocketMQ音讯投递策略

  • 作者: 博学谷狂野架构师
  • GitHub:GitHub地址 (有我精心筹备的130本电子书PDF)

    只分享干货、不吹水,让咱们一起加油!

前言

RocketMQ的音讯投递分分为两种:一种是生产者往MQ Broker中投递;另外一种则是MQ broker 往消费者 投递(这种投递的说法是从消息传递的角度论述的,实际上底层是消费者从MQ broker 中Pull拉取的)。本文将从模型的角度来论述这两种机制。

RocketMQ的音讯模型

RocketMQ 的音讯模型整体并不简单,如下图所示:

一个Topic(音讯主题)可能对应多个理论的音讯队列(MessgeQueue)

在底层实现上,为了进步MQ的可用性和灵活性,一个Topic在理论存储的过程中,采纳了多队列的形式,具体模式如上图所示。每个音讯队列在应用中该当保障先入先出(FIFO,First In First Out)的形式进行生产。

那么,基于这种模型,就会引申出两个问题:
  • 生产者 在发送雷同Topic的音讯时,音讯体该当被搁置到哪一个音讯队列(MessageQueue)中?
  • 消费者 在生产音讯时,该当从哪些音讯队列中拉取音讯?

音讯的零碎间传递时,会逾越不同的网络载体,这会导致音讯的流传无奈保障其有序请

生产者投递策略

轮询算法投递

默认投递形式:基于Queue队列轮询算法投递

默认状况下,采纳了最简略的轮询算法,这种算法有个很好的个性就是,保障每一个Queue队列的音讯投递数量尽可能平均,算法如下图所示:

COPY/***  依据 TopicPublishInfo Topic公布信息对象中保护的index,每次抉择队列时,都会递增*  而后依据 index % queueSize 进行取余,达到轮询的成果**/public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {        return tpInfo.selectOneMessageQueue(lastBrokerName);}/***  TopicPublishInfo Topic公布信息对象中*/public class TopicPublishInfo {    //基于线程上下文的计数递增,用于轮询目标    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();       public MessageQueue selectOneMessageQueue(final String lastBrokerName) {        if (lastBrokerName == null) {            return selectOneMessageQueue();        } else {            int index = this.sendWhichQueue.getAndIncrement();            for (int i = 0; i < this.messageQueueList.size(); i++) {                //轮询计算                int pos = Math.abs(index++) % this.messageQueueList.size();                if (pos < 0)                    pos = 0;                MessageQueue mq = this.messageQueueList.get(pos);                if (!mq.getBrokerName().equals(lastBrokerName)) {                    return mq;                }            }            return selectOneMessageQueue();        }    }    public MessageQueue selectOneMessageQueue() {        int index = this.sendWhichQueue.getAndIncrement();        int pos = Math.abs(index) % this.messageQueueList.size();        if (pos < 0)            pos = 0;        return this.messageQueueList.get(pos);    }}
代码示例
RocketMQ默认采纳轮询投递策略
COPY/** * 轮询投递策略 */public class PollingProducer {    public static void main(String[] args) throws Exception {        //创立一个音讯生产者,并设置一个音讯生产者组        DefaultMQProducer producer = new DefaultMQProducer("rocket_test_consumer_group");        //指定 NameServer 地址        producer.setNamesrvAddr("127.0.0.1:9876");        //初始化 Producer,整个利用生命周期内只须要初始化一次        producer.start();        for (int i = 0; i < 10; i++) {            //创立一条音讯对象,指定其主题、标签和音讯内容            Message msg = new Message(                    /* 音讯主题名 */                    "topicTest",                    /* 音讯标签 */                    "TagA",                    /* 音讯内容 */                    ("Hello Java demo RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)            );            //发送音讯并返回后果            SendResult sendResult = producer.send(msg);            System.out.println("product: 发送状态:" + sendResult.getSendStatus() + ",存储queue:" + sendResult.getMessageQueue().getQueueId() + ",音讯索引:" + i);        }        // 一旦生产者实例不再被应用则将其敞开,包含清理资源,敞开网络连接等        producer.shutdown();    }}
打印后果
COPYproduct: 发送状态:SEND_OK,存储queue:0,音讯索引:0product: 发送状态:SEND_OK,存储queue:1,音讯索引:1product: 发送状态:SEND_OK,存储queue:2,音讯索引:2product: 发送状态:SEND_OK,存储queue:3,音讯索引:3product: 发送状态:SEND_OK,存储queue:0,音讯索引:4product: 发送状态:SEND_OK,存储queue:1,音讯索引:5product: 发送状态:SEND_OK,存储queue:2,音讯索引:6product: 发送状态:SEND_OK,存储queue:3,音讯索引:7product: 发送状态:SEND_OK,存储queue:0,音讯索引:8product: 发送状态:SEND_OK,存储queue:1,音讯索引:9

音讯投递提早最小策略

默认投递形式的加强:基于Queue队列轮询算法和音讯投递提早最小的策略投递

默认的投递形式比较简单,然而也裸露了一个问题,就是有些Queue队列可能因为本身数量积压等起因,可能在投递的过程比拟长,对于这样的Queue队列会影响后续投递的成果。

基于这种景象,RocketMQ在每发送一个MQ音讯后,都会统计一下音讯投递的时间延迟,依据这个时间延迟,能够晓得往哪些Queue队列投递的速度快。

在这种场景下,会优先应用音讯投递提早最小的策略,如果没有失效,再应用Queue队列轮询的形式。

COPYpublic class MQFaultStrategy {    /**     * 依据 TopicPublishInfo 外部保护的index,在每次操作时,都会递增,     * 而后依据 index % queueList.size(),应用了轮询的根底算法     *     */    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {        if (this.sendLatencyFaultEnable) {            try {                // 从queueid 为 0 开始,顺次验证broker 是否无效,如果无效                int index = tpInfo.getSendWhichQueue().getAndIncrement();                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {                    //基于index和队列数量取余,确定地位                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();                    if (pos < 0)                        pos = 0;                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))                            return mq;                    }                }                                // 从提早容错broker列表中筛选一个容错性最好的一个 broker                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);                if (writeQueueNums > 0) {                     // 取余筛选其中一个队列                    final MessageQueue mq = tpInfo.selectOneMessageQueue();                    if (notBestBroker != null) {                        mq.setBrokerName(notBestBroker);                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);                    }                    return mq;                } else {                    latencyFaultTolerance.remove(notBestBroker);                }            } catch (Exception e) {                log.error("Error occurred when selecting message queue", e);            }          // 取余筛选其中一个队列            return tpInfo.selectOneMessageQueue();        }        return tpInfo.selectOneMessageQueue(lastBrokerName);    }}

程序投递策略

上述两种投递形式属于对音讯投递的时序性没有要求的场景,这种投递的速度和效率比拟高。而在有些场景下,须要保障同类型音讯投递和生产的程序性。

例如,假如当初有TOPIC topicTest,该 Topic下有4个Queue队列,该Topic用于传递订单的状态变迁,假如订单有状态:未领取已领取发货中(解决中)发货胜利发货失败

在时序上,生产者从时序上能够生成如下几个音讯:
COPY订单T0000001:未领取 --> 订单T0000001:已领取 --> 订单T0000001:发货中(解决中) --> 订单T0000001:发货失败

音讯发送到MQ中之后,可能因为轮询投递的起因,音讯在MQ的存储可能如下:

这种状况下,咱们心愿消费者生产音讯的程序和咱们发送是统一的,然而,有上述MQ的投递和生产机制,咱们无奈保障程序是正确的,对于程序异样的音讯,消费者 即便有肯定的状态容错,也不能齐全解决好这么多种随机呈现组合状况。

基于上述的状况,RockeMQ采纳了这种实现计划:对于雷同订单号的音讯,通过肯定的策略,将其搁置在一个 queue队列中,而后消费者再采纳肯定的策略(一个线程独立解决一个queue,保障解决音讯的程序性),可能保障生产的程序性

至于消费者是如何保障生产的程序行的,后续再具体开展,咱们先看生产者是如何能将雷同订单号的音讯发送到同一个queue队列的:

生产者在音讯投递的过程中,应用了 MessageQueueSelector 作为队列抉择的策略接口,其定义如下:

COPYpublic interface MessageQueueSelector {        /**         * 依据音讯体和参数,从一批音讯队列中挑选出一个适合的音讯队列         * @param mqs  待抉择的MQ队列抉择列表         * @param msg  待发送的音讯体         * @param arg  附加参数         * @return  抉择后的队列         */        MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);}
相应地,目前RocketMQ提供了如下几种实现:

默认实现
投递策略策略实现类阐明
随机调配策略SelectMessageQueueByRandom应用了简略的随机数抉择算法
基于Hash调配策略SelectMessageQueueByHash依据附加参数的Hash值,依照音讯队列列表的大小取余数,失去音讯队列的index
基于机器机房地位调配策略SelectMessageQueueByMachineRoom开源的版本没有具体的实现,根本的目标应该是机器的就近准则调配
当初大略看下策略的代码实现:
COPYpublic class SelectMessageQueueByHash implements MessageQueueSelector {    @Override    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {        int value = arg.hashCode();        if (value < 0) {            value = Math.abs(value);        }        value = value % mqs.size();        return mqs.get(value);    }}
代码示例
理论的操作代码样例如下,通过订单号作为hash运算对象,就能保障雷同订单号的音讯可能落在雷同的queue队列上
COPYpublic class OrderProducer {    private static final List<ProductOrder> orderList = new ArrayList<>();    static {        orderList.add(new ProductOrder("XXX001", "订单创立"));        orderList.add(new ProductOrder("XXX001", "订单付款"));        orderList.add(new ProductOrder("XXX001", "订单实现"));        orderList.add(new ProductOrder("XXX002", "订单创立"));        orderList.add(new ProductOrder("XXX002", "订单付款"));        orderList.add(new ProductOrder("XXX002", "订单实现"));        orderList.add(new ProductOrder("XXX003", "订单创立"));        orderList.add(new ProductOrder("XXX003", "订单付款"));        orderList.add(new ProductOrder("XXX003", "订单实现"));    }    public static void main(String[] args) throws Exception {        //创立一个音讯生产者,并设置一个音讯生产者组        DefaultMQProducer producer = new DefaultMQProducer("rocket_test_consumer_group");        //指定 NameServer 地址        producer.setNamesrvAddr("127.0.0.1:9876");        //初始化 Producer,整个利用生命周期内只须要初始化一次        producer.start();        for (int i = 0; i < orderList.size(); i++) {            //获取以后order            ProductOrder order = orderList.get(i);            //创立一条音讯对象,指定其主题、标签和音讯内容            Message message = new Message(                    /* 音讯主题名 */                    "topicTest",                    /* 音讯标签 */                    order.getOrderId(),                    /* 音讯内容 */                    (order.toString()).getBytes(RemotingHelper.DEFAULT_CHARSET)            );            //发送音讯并返回后果 应用hash抉择策略            SendResult sendResult = producer.send(message, new SelectMessageQueueByHash(), order.getOrderId());            System.out.println("product: 发送状态:" + sendResult.getSendStatus() + ",存储queue:" + sendResult.getMessageQueue().getQueueId() + ",orderID:" + order.getOrderId() + ",type:" + order.getType());        }        // 一旦生产者实例不再被应用则将其敞开,包含清理资源,敞开网络连接等        producer.shutdown();    }}
打印后果如下
COPYproduct: 发送状态:SEND_OK,存储queue:3,orderID:XXX001,type:订单创立product: 发送状态:SEND_OK,存储queue:3,orderID:XXX001,type:订单付款product: 发送状态:SEND_OK,存储queue:3,orderID:XXX001,type:订单实现product: 发送状态:SEND_OK,存储queue:2,orderID:XXX002,type:订单创立product: 发送状态:SEND_OK,存储queue:2,orderID:XXX002,type:订单付款product: 发送状态:SEND_OK,存储queue:2,orderID:XXX002,type:订单实现product: 发送状态:SEND_OK,存储queue:1,orderID:XXX003,type:订单创立product: 发送状态:SEND_OK,存储queue:1,orderID:XXX003,type:订单付款product: 发送状态:SEND_OK,存储queue:1,orderID:XXX003,type:订单实现

消费者调配队列

如何为消费者调配queue队列?

RocketMQ对于消费者生产音讯有两种模式:

  • BROADCASTING:广播式生产,这种模式下,一个音讯会被告诉到每一个消费者
  • CLUSTERING: 集群式生产,这种模式下,一个音讯最多只会被投递到一个消费者上进行生产
    模式如下:

广播式的音讯模式比较简单,上面咱们介绍下集群式。对于应用了生产模式为MessageModel.CLUSTERING进行生产时,须要保障一个音讯在整个集群中只须要被生产一次。实际上,在RoketMQ底层,音讯指定调配给消费者的实现,是通过queue队列调配给消费者的形式实现的:也就是说,音讯调配的单位是音讯所在的queue队列。即:

queue队列指定给特定的消费者后,queue队列内的所有音讯将会被指定到消费者进行生产。

RocketMQ定义了策略接口AllocateMessageQueueStrategy,对于给定的消费者分组,和音讯队列列表消费者列表以后消费者该当被调配到哪些queue队列,定义如下:

COPY/** * 为消费者调配queue的策略算法接口 */public interface AllocateMessageQueueStrategy {    /**     * Allocating by consumer id     *     * @param consumerGroup 以后 consumer群组     * @param currentCID 以后consumer id     * @param mqAll 以后topic的所有queue实例援用     * @param cidAll 以后 consumer群组下所有的consumer id set汇合     * @return 依据策略给以后consumer调配的queue列表     */    List<MessageQueue> allocate(        final String consumerGroup,        final String currentCID,        final List<MessageQueue> mqAll,        final List<String> cidAll    );    /**     * 算法名称     *     * @return The strategy name     */    String getName();}

相应地,RocketMQ提供了如下几种实现:

算法名称含意
AllocateMessageQueueAveragely平均分配算法
AllocateMessageQueueAveragelyByCircle基于环形平均分配算法
AllocateMachineRoomNearby基于机房邻近准则算法
AllocateMessageQueueByMachineRoom基于机房调配算法
AllocateMessageQueueConsistentHash基于一致性hash算法
AllocateMessageQueueByConfig基于配置调配算法
为了讲述分明上述算法的基本原理,咱们先假如一个例子,上面所有的算法将基于这个例子解说。

假如以后同一个topic下有queue队列 10个,消费者共有4个,如下图所示:

上面顺次介绍其原理:

平均分配算法

这里所谓的平均分配算法,并不是指的严格意义上的齐全均匀,如下面的例子中,10个queue,而消费者只有4个,无奈是整除关系,除了整除之外的多进去的queue,将顺次依据消费者的程序均摊。

依照上述例子来看,10/4=2,即示意每个消费者均匀均摊2个queue;而10%4=2,即除了均摊之外,多进去2个queue还没有调配,那么,依据消费者的程序consumer-1consumer-2consumer-3consumer-4,则多进去的2个queue将别离给consumer-1consumer-2

最终,摊派关系如下:
  • consumer-1:3个
  • consumer-2:3个
  • consumer-3:2个
  • consumer-4:2个

其代码实现非常简单:
COPYpublic class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {    private final InternalLogger log = ClientLogger.getLog();    @Override    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,        List<String> cidAll) {        if (currentCID == null || currentCID.length() < 1) {            throw new IllegalArgumentException("currentCID is empty");        }        if (mqAll == null || mqAll.isEmpty()) {            throw new IllegalArgumentException("mqAll is null or mqAll empty");        }        if (cidAll == null || cidAll.isEmpty()) {            throw new IllegalArgumentException("cidAll is null or cidAll empty");        }        List<MessageQueue> result = new ArrayList<MessageQueue>();        if (!cidAll.contains(currentCID)) {            log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",                consumerGroup,                currentCID,                cidAll);            return result;        }        int index = cidAll.indexOf(currentCID);        int mod = mqAll.size() % cidAll.size();        int averageSize =            mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()                + 1 : mqAll.size() / cidAll.size());        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;        int range = Math.min(averageSize, mqAll.size() - startIndex);        for (int i = 0; i < range; i++) {            result.add(mqAll.get((startIndex + i) % mqAll.size()));        }        return result;    }    @Override    public String getName() {        return "AVG";    }}
演示成果
消费者A
COPYConsumer-线程名称=[32],接管queueId:[0],接管工夫:[1608171677558],音讯=[Hello Java demo RocketMQ 2]Consumer-线程名称=[34],接管queueId:[1],接管工夫:[1608171677580],音讯=[Hello Java demo RocketMQ 3]Consumer-线程名称=[36],接管queueId:[0],接管工夫:[1608171677655],音讯=[Hello Java demo RocketMQ 6]Consumer-线程名称=[38],接管queueId:[1],接管工夫:[1608171677679],音讯=[Hello Java demo RocketMQ 7]
消费者B
COPYConsumer-线程名称=[35],接管queueId:[2],接管工夫:[1608171677508],音讯=[Hello Java demo RocketMQ 0]Consumer-线程名称=[36],接管queueId:[3],接管工夫:[1608171677535],音讯=[Hello Java demo RocketMQ 1]Consumer-线程名称=[37],接管queueId:[2],接管工夫:[1608171677609],音讯=[Hello Java demo RocketMQ 4]Consumer-线程名称=[38],接管queueId:[3],接管工夫:[1608171677635],音讯=[Hello Java demo RocketMQ 5]Consumer-线程名称=[39],接管queueId:[2],接管工夫:[1608171677709],音讯=[Hello Java demo RocketMQ 8]Consumer-线程名称=[40],接管queueId:[3],接管工夫:[1608171677734],音讯=[Hello Java demo RocketMQ 9]

基于环形均匀算法

环形均匀算法,是指依据消费者的程序,顺次在由queue队列组成的环形图中一一调配。具体流程如下所示:

这种算法最终调配的后果是:
  • consumer-1: #0,#4,#8
  • consumer-2: #1, #5, # 9
  • consumer-3: #2,#6
  • consumer-4: #3,#7
其代码实现如下所示:
COPY/** * Cycle average Hashing queue algorithm */public class AllocateMessageQueueAveragelyByCircle implements AllocateMessageQueueStrategy {    private final InternalLogger log = ClientLogger.getLog();    @Override    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,        List<String> cidAll) {        if (currentCID == null || currentCID.length() < 1) {            throw new IllegalArgumentException("currentCID is empty");        }        if (mqAll == null || mqAll.isEmpty()) {            throw new IllegalArgumentException("mqAll is null or mqAll empty");        }        if (cidAll == null || cidAll.isEmpty()) {            throw new IllegalArgumentException("cidAll is null or cidAll empty");        }        List<MessageQueue> result = new ArrayList<MessageQueue>();        if (!cidAll.contains(currentCID)) {            log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",                consumerGroup,                currentCID,                cidAll);            return result;        }        int index = cidAll.indexOf(currentCID);        for (int i = index; i < mqAll.size(); i++) {            if (i % cidAll.size() == index) {                result.add(mqAll.get(i));            }        }        return result;    }    @Override    public String getName() {        return "AVG_BY_CIRCLE";    }}
演示成果
设置算法
COPY//设置应用环形hash算法DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(null, "rocket_test_consumer_group", null, new AllocateMessageQueueAveragelyByCircle());
消费者A
COPYConsumer-线程名称=[35],接管queueId:[0],接管工夫:[1608171903364],音讯=[Hello Java demo RocketMQ 1]Consumer-线程名称=[38],接管queueId:[2],接管工夫:[1608171903411],音讯=[Hello Java demo RocketMQ 3]Consumer-线程名称=[39],接管queueId:[0],接管工夫:[1608171903459],音讯=[Hello Java demo RocketMQ 5]Consumer-线程名称=[40],接管queueId:[2],接管工夫:[1608171903508],音讯=[Hello Java demo RocketMQ 7]Consumer-线程名称=[41],接管queueId:[0],接管工夫:[1608171903562],音讯=[Hello Java demo RocketMQ 9]
消费者B
COPYConsumer-线程名称=[28],接管queueId:[3],接管工夫:[1608171903346],音讯=[Hello Java demo RocketMQ 0]Consumer-线程名称=[30],接管queueId:[1],接管工夫:[1608171903393],音讯=[Hello Java demo RocketMQ 2]Consumer-线程名称=[32],接管queueId:[3],接管工夫:[1608171903443],音讯=[Hello Java demo RocketMQ 4]Consumer-线程名称=[34],接管queueId:[1],接管工夫:[1608171903490],音讯=[Hello Java demo RocketMQ 6]Consumer-线程名称=[36],接管queueId:[3],接管工夫:[1608171903540],音讯=[Hello Java demo RocketMQ 8]

一致性hash调配算法

应用这种算法,会将consumer消费者作为Node节点结构成一个hash环,而后queue队列通过这个hash环来决定被调配给哪个consumer消费者

其基本模式如下:

一致性hash算法用于在分布式系统中,保证数据的一致性而提出的一种基于hash环实现的算法

算法实现上也不简单,如下图所示:
COPYpublic List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,                                   List<String> cidAll) {    //省略局部代码    List<MessageQueue> result = new ArrayList<MessageQueue>();    if (!cidAll.contains(currentCID)) {        log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",                 consumerGroup,                 currentCID,                 cidAll);        return result;    }    Collection<ClientNode> cidNodes = new ArrayList<ClientNode>();    for (String cid : cidAll) {        cidNodes.add(new ClientNode(cid));    }    //应用consumer id 结构hash环    final ConsistentHashRouter<ClientNode> router; //for building hash ring    if (customHashFunction != null) {        router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt, customHashFunction);    } else {        router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt);    }    //顺次为 队列调配 consumer    List<MessageQueue> results = new ArrayList<MessageQueue>();    for (MessageQueue mq : mqAll) {        ClientNode clientNode = router.routeNode(mq.toString());        if (clientNode != null && currentCID.equals(clientNode.getKey())) {            results.add(mq);        }    }    return results;}
演示成果
设置算法
COPY//设置应用环形hash算法DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(null, "rocket_test_consumer_group", null, new AllocateMessageQueueConsistentHash());
消费者A
COPYConsumer-线程名称=[29],接管queueId:[0],接管工夫:[1608172067310],音讯=[Hello Java demo RocketMQ 0]Consumer-线程名称=[31],接管queueId:[1],接管工夫:[1608172067323],音讯=[Hello Java demo RocketMQ 1]Consumer-线程名称=[33],接管queueId:[2],接管工夫:[1608172067345],音讯=[Hello Java demo RocketMQ 2]Consumer-线程名称=[37],接管queueId:[0],接管工夫:[1608172067395],音讯=[Hello Java demo RocketMQ 4]Consumer-线程名称=[39],接管queueId:[1],接管工夫:[1608172067418],音讯=[Hello Java demo RocketMQ 5]Consumer-线程名称=[40],接管queueId:[2],接管工夫:[1608172067443],音讯=[Hello Java demo RocketMQ 6]Consumer-线程名称=[41],接管queueId:[0],接管工夫:[1608172067494],音讯=[Hello Java demo RocketMQ 8]Consumer-线程名称=[42],接管queueId:[1],接管工夫:[1608172067518],音讯=[Hello Java demo RocketMQ 9]
消费者B
COPYConsumer-线程名称=[28],接管queueId:[3],接管工夫:[1608172067383],音讯=[Hello Java demo RocketMQ 3]Consumer-线程名称=[30],接管queueId:[3],接管工夫:[1608172067475],音讯=[Hello Java demo RocketMQ 7]

机房邻近调配算法

该算法应用了装璜者设计模式,对调配策略进行了加强。个别在生产环境,如果是微服务架构下,RocketMQ集群的部署可能是在不同的机房中部署,其根本构造可能如下图所示:

对于跨机房的场景,会存在网络、稳定性和隔离心的起因,该算法会依据queue的部署机房地位和消费者consumer的地位,过滤出以后消费者consumer雷同机房的queue队列,而后再联合上述的算法,如基于平均分配算法在queue队列子集的根底上再筛选。相干代码实现如下:

COPY@Overridepublic List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,                                   List<String> cidAll) {    //省略局部代码    List<MessageQueue> result = new ArrayList<MessageQueue>();    //将MQ依照 机房进行分组    Map<String/*machine room */, List<MessageQueue>> mr2Mq = new TreeMap<String, List<MessageQueue>>();    for (MessageQueue mq : mqAll) {        String brokerMachineRoom = machineRoomResolver.brokerDeployIn(mq);        if (StringUtils.isNoneEmpty(brokerMachineRoom)) {            if (mr2Mq.get(brokerMachineRoom) == null) {                mr2Mq.put(brokerMachineRoom, new ArrayList<MessageQueue>());            }            mr2Mq.get(brokerMachineRoom).add(mq);        } else {            throw new IllegalArgumentException("Machine room is null for mq " + mq);        }    }    //将消费者 依照机房进行分组    Map<String/*machine room */, List<String/*clientId*/>> mr2c = new TreeMap<String, List<String>>();    for (String cid : cidAll) {        String consumerMachineRoom = machineRoomResolver.consumerDeployIn(cid);        if (StringUtils.isNoneEmpty(consumerMachineRoom)) {            if (mr2c.get(consumerMachineRoom) == null) {                mr2c.put(consumerMachineRoom, new ArrayList<String>());            }            mr2c.get(consumerMachineRoom).add(cid);        } else {            throw new IllegalArgumentException("Machine room is null for consumer id " + cid);        }    }    List<MessageQueue> allocateResults = new ArrayList<MessageQueue>();    //1.过滤出以后机房内的MQ队列子集,在此基础上应用调配算法筛选    String currentMachineRoom = machineRoomResolver.consumerDeployIn(currentCID);    List<MessageQueue> mqInThisMachineRoom = mr2Mq.remove(currentMachineRoom);    List<String> consumerInThisMachineRoom = mr2c.get(currentMachineRoom);    if (mqInThisMachineRoom != null && !mqInThisMachineRoom.isEmpty()) {        allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mqInThisMachineRoom, consumerInThisMachineRoom));    }    //2.不在同一机房,依照个别策略进行操作    for (String machineRoom : mr2Mq.keySet()) {        if (!mr2c.containsKey(machineRoom)) { // no alive consumer in the corresponding machine room, so all consumers share these queues            allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mr2Mq.get(machineRoom), cidAll));        }    }    return allocateResults;}
基于机房调配算法

该算法实用于属于同一个机房外部的音讯,去调配queue。这种形式十分明确,基于下面的机房邻近调配算法的场景,这种更彻底,间接指定基于机房生产的策略。这种形式具备强约定性,比方broker名称依照机房的名称进行拼接,在算法中通过约定解析进行调配。

其代码实现如下:
COPY/** * Computer room Hashing queue algorithm, such as Alipay logic room */public class AllocateMessageQueueByMachineRoom implements AllocateMessageQueueStrategy {    private Set<String> consumeridcs;    @Override    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,        List<String> cidAll) {        List<MessageQueue> result = new ArrayList<MessageQueue>();        int currentIndex = cidAll.indexOf(currentCID);        if (currentIndex < 0) {            return result;        }        List<MessageQueue> premqAll = new ArrayList<MessageQueue>();        for (MessageQueue mq : mqAll) {            String[] temp = mq.getBrokerName().split("@");            if (temp.length == 2 && consumeridcs.contains(temp[0])) {                premqAll.add(mq);            }        }        int mod = premqAll.size() / cidAll.size();        int rem = premqAll.size() % cidAll.size();        int startIndex = mod * currentIndex;        int endIndex = startIndex + mod;        for (int i = startIndex; i < endIndex; i++) {            result.add(mqAll.get(i));        }        if (rem > currentIndex) {            result.add(premqAll.get(currentIndex + mod * cidAll.size()));        }        return result;    }    @Override    public String getName() {        return "MACHINE_ROOM";    }    public Set<String> getConsumeridcs() {        return consumeridcs;    }    public void setConsumeridcs(Set<String> consumeridcs) {        this.consumeridcs = consumeridcs;    }}

基于配置调配算法

这种算法单纯基于配置的,非常简单,理论应用中可能用处不大。代码如下:
COPYpublic class AllocateMessageQueueByConfig implements AllocateMessageQueueStrategy {    private List<MessageQueue> messageQueueList;    @Override    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,        List<String> cidAll) {        return this.messageQueueList;    }    @Override    public String getName() {        return "CONFIG";    }    public List<MessageQueue> getMessageQueueList() {        return messageQueueList;    }    public void setMessageQueueList(List<MessageQueue> messageQueueList) {        this.messageQueueList = messageQueueList;    }}

消费者如何指定调配算法

消费者构造方法

在DefaultMQPushConsumer构造方法中能够传入调配策略

默认状况下,消费者应用的是AllocateMessageQueueAveragely算法,也能够本人指定:

COPYpublic class DefaultMQPushConsumer{        /**     * Default constructor.     */    public DefaultMQPushConsumer() {        this(MixAll.DEFAULT_CONSUMER_GROUP, null, new AllocateMessageQueueAveragely());    }      /**     * Constructor specifying consumer group, RPC hook and message queue allocating algorithm.     *     * @param consumerGroup Consume queue.     * @param rpcHook RPC hook to execute before each remoting command.     * @param allocateMessageQueueStrategy message queue allocating algorithm.     */    public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,        AllocateMessageQueueStrategy allocateMessageQueueStrategy) {        this.consumerGroup = consumerGroup;        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;        defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);    }}
咱们看到默认应用了AllocateMessageQueueAveragely平均分配策略
应用其余调配策略
如果须要应用其余调配策略,应用形式如下

本文由传智教育博学谷狂野架构师教研团队公布。

如果本文对您有帮忙,欢送关注点赞;如果您有任何倡议也可留言评论私信,您的反对是我保持创作的能源。

转载请注明出处!