乐趣区

关于java:小心丢失的消息RocketMQ投递策略帮你解决问题博学谷狂野架构师

RocketMQ 音讯投递策略

  • 作者: 博学谷狂野架构师
  • GitHub:GitHub 地址(有我精心筹备的 130 本电子书 PDF)

    只分享干货、不吹水,让咱们一起加油!😄

前言

RocketMQ 的音讯投递分分为两种:一种是 生产者 往 MQ Broker 中投递;另外一种则是 MQ broker 往 消费者 投递(这种 投递 的说法是从消息传递的角度论述的,实际上底层是 消费者 从 MQ broker 中 Pull 拉取的)。本文将从模型的角度来论述这两种机制。

RocketMQ 的音讯模型

RocketMQ 的音讯模型整体并不简单,如下图所示:

一个 Topic(音讯主题) 可能对应多个理论的音讯队列(MessgeQueue)

在底层实现上,为了进步 MQ 的可用性和灵活性,一个 Topic 在理论存储的过程中,采纳了多队列的形式,具体模式如上图所示。每个音讯队列在应用中该当保障 先入先出(FIFO,First In First Out)的形式进行生产。

那么,基于这种模型,就会引申出两个问题:

  • 生产者 在发送雷同 Topic 的音讯时,音讯体该当被搁置到哪一个音讯队列(MessageQueue) 中?
  • 消费者 在生产音讯时,该当从哪些音讯队列中拉取音讯?

音讯的零碎间传递时,会逾越不同的网络载体,这会导致音讯的流传无奈保障其有序请

生产者投递策略

轮询算法投递

默认投递形式:基于 Queue 队列 轮询算法投递

默认状况下,采纳了最简略的轮询算法,这种算法有个很好的个性就是,保障每一个 Queue 队列 的音讯投递数量尽可能平均,算法如下图所示:

COPY/**
*  依据 TopicPublishInfo Topic 公布信息对象中保护的 index,每次抉择队列时,都会递增
*  而后依据 index % queueSize 进行取余,达到轮询的成果
*
*/
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {return tpInfo.selectOneMessageQueue(lastBrokerName);
}

/**
*  TopicPublishInfo Topic 公布信息对象中
*/
public class TopicPublishInfo {
    // 基于线程上下文的计数递增,用于轮询目标
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
   

    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {if (lastBrokerName == null) {return selectOneMessageQueue();
        } else {int index = this.sendWhichQueue.getAndIncrement();
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                // 轮询计算
                int pos = Math.abs(index++) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                if (!mq.getBrokerName().equals(lastBrokerName)) {return mq;}
            }
            return selectOneMessageQueue();}
    }

    public MessageQueue selectOneMessageQueue() {int index = this.sendWhichQueue.getAndIncrement();
        int pos = Math.abs(index) % this.messageQueueList.size();
        if (pos < 0)
            pos = 0;
        return this.messageQueueList.get(pos);
    }
}
代码示例

RocketMQ 默认采纳轮询投递策略

COPY/**
 * 轮询投递策略
 */
public class PollingProducer {public static void main(String[] args) throws Exception {
        // 创立一个音讯生产者,并设置一个音讯生产者组
        DefaultMQProducer producer = new DefaultMQProducer("rocket_test_consumer_group");

        // 指定 NameServer 地址
        producer.setNamesrvAddr("127.0.0.1:9876");

        // 初始化 Producer,整个利用生命周期内只须要初始化一次
        producer.start();

        for (int i = 0; i < 10; i++) {
            // 创立一条音讯对象,指定其主题、标签和音讯内容
            Message msg = new Message(
                    /* 音讯主题名 */
                    "topicTest",
                    /* 音讯标签 */
                    "TagA",
                    /* 音讯内容 */
                    ("Hello Java demo RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );

            // 发送音讯并返回后果
            SendResult sendResult = producer.send(msg);

            System.out.println("product: 发送状态:" + sendResult.getSendStatus() + ", 存储 queue:" + sendResult.getMessageQueue().getQueueId() + ", 音讯索引:" + i);
        }

        // 一旦生产者实例不再被应用则将其敞开,包含清理资源,敞开网络连接等
        producer.shutdown();}
}

打印后果

COPYproduct: 发送状态:SEND_OK, 存储 queue:0, 音讯索引:0
product: 发送状态:SEND_OK, 存储 queue:1, 音讯索引:1
product: 发送状态:SEND_OK, 存储 queue:2, 音讯索引:2
product: 发送状态:SEND_OK, 存储 queue:3, 音讯索引:3
product: 发送状态:SEND_OK, 存储 queue:0, 音讯索引:4
product: 发送状态:SEND_OK, 存储 queue:1, 音讯索引:5
product: 发送状态:SEND_OK, 存储 queue:2, 音讯索引:6
product: 发送状态:SEND_OK, 存储 queue:3, 音讯索引:7
product: 发送状态:SEND_OK, 存储 queue:0, 音讯索引:8
product: 发送状态:SEND_OK, 存储 queue:1, 音讯索引:9

音讯投递提早最小策略

默认投递形式的加强:基于 Queue 队列 轮询算法和 音讯投递提早最小 的策略投递

默认的投递形式比较简单,然而也裸露了一个问题,就是有些 Queue 队列 可能因为本身数量积压等起因,可能在投递的过程比拟长,对于这样的 Queue 队列 会影响后续投递的成果。

基于这种景象,RocketMQ 在每发送一个 MQ 音讯后,都会统计一下音讯投递的 时间延迟 ,依据这个 时间延迟 ,能够晓得往哪些Queue 队列 投递的速度快。

在这种场景下,会优先应用 音讯投递提早最小 的策略,如果没有失效,再应用 Queue 队列轮询 的形式。

COPYpublic class MQFaultStrategy {
    /**
     * 依据 TopicPublishInfo 外部保护的 index, 在每次操作时,都会递增,* 而后依据 index % queueList.size(), 应用了轮询的根底算法
     *
     */
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {if (this.sendLatencyFaultEnable) {
            try {
                // 从 queueid 为 0 开始,顺次验证 broker 是否无效,如果无效
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    // 基于 index 和队列数量取余,确定地位
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }
                
                // 从提早容错 broker 列表中筛选一个容错性最好的一个 broker
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                     // 取余筛选其中一个队列
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {log.error("Error occurred when selecting message queue", e);
            }
          // 取余筛选其中一个队列
            return tpInfo.selectOneMessageQueue();}

        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }
}

程序投递策略

上述两种投递形式属于对音讯投递的时序性没有要求的场景,这种投递的速度和效率比拟高。而在有些场景下,须要保障同类型音讯投递和生产的程序性。

例如,假如当初有 TOPIC topicTest, 该 Topic 下有 4 个 Queue 队列,该 Topic 用于传递订单的状态变迁,假如订单有状态: 未领取 已领取 发货中 (解决中) 发货胜利 发货失败

在时序上,生产者从时序上能够生成如下几个音讯:

COPY 订单 T0000001: 未领取 --> 订单 T0000001: 已领取 --> 订单 T0000001: 发货中(解决中) --> 订单 T0000001: 发货失败

音讯发送到 MQ 中之后,可能因为轮询投递的起因,音讯在 MQ 的存储可能如下:

这种状况下,咱们心愿 消费者 生产音讯的程序和咱们发送是统一的,然而,有上述 MQ 的投递和生产机制,咱们无奈保障程序是正确的,对于程序异样的音讯,消费者 即便有肯定的状态容错,也不能齐全解决好这么多种随机呈现组合状况。

基于上述的状况,RockeMQ采纳了这种实现计划:对于雷同订单号的音讯,通过肯定的策略,将其搁置在一个 queue 队列中 ,而后 消费者 再采纳肯定的策略(一个线程独立解决一个queue, 保障解决音讯的程序性),可能保障生产的程序性

至于消费者是如何保障生产的程序行的,后续再具体开展,咱们先看 生产者 是如何能将雷同订单号的音讯发送到同一个 queue 队列 的:

生产者在音讯投递的过程中,应用了 MessageQueueSelector 作为队列抉择的策略接口,其定义如下:

COPYpublic interface MessageQueueSelector {
        /**
         * 依据音讯体和参数,从一批音讯队列中挑选出一个适合的音讯队列
         * @param mqs  待抉择的 MQ 队列抉择列表
         * @param msg  待发送的音讯体
         * @param arg  附加参数
         * @return  抉择后的队列
         */
        MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}

相应地,目前 RocketMQ 提供了如下几种实现:

默认实现
投递策略 策略实现类 阐明
随机调配策略 SelectMessageQueueByRandom 应用了简略的随机数抉择算法
基于 Hash 调配策略 SelectMessageQueueByHash 依据附加参数的 Hash 值,依照音讯队列列表的大小取余数,失去音讯队列的 index
基于机器机房地位调配策略 SelectMessageQueueByMachineRoom 开源的版本没有具体的实现,根本的目标应该是机器的就近准则调配

当初大略看下策略的代码实现:

COPYpublic class SelectMessageQueueByHash implements MessageQueueSelector {

    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {int value = arg.hashCode();
        if (value < 0) {value = Math.abs(value);
        }

        value = value % mqs.size();
        return mqs.get(value);
    }
}
代码示例

理论的操作代码样例如下,通过订单号作为 hash 运算对象,就能保障雷同订单号的音讯可能落在雷同的queue 队列上

COPYpublic class OrderProducer {private static final List<ProductOrder> orderList = new ArrayList<>();

    static {orderList.add(new ProductOrder("XXX001", "订单创立"));
        orderList.add(new ProductOrder("XXX001", "订单付款"));
        orderList.add(new ProductOrder("XXX001", "订单实现"));
        orderList.add(new ProductOrder("XXX002", "订单创立"));
        orderList.add(new ProductOrder("XXX002", "订单付款"));
        orderList.add(new ProductOrder("XXX002", "订单实现"));
        orderList.add(new ProductOrder("XXX003", "订单创立"));
        orderList.add(new ProductOrder("XXX003", "订单付款"));
        orderList.add(new ProductOrder("XXX003", "订单实现"));
    }

    public static void main(String[] args) throws Exception {
        // 创立一个音讯生产者,并设置一个音讯生产者组
        DefaultMQProducer producer = new DefaultMQProducer("rocket_test_consumer_group");

        // 指定 NameServer 地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 初始化 Producer,整个利用生命周期内只须要初始化一次
        producer.start();

        for (int i = 0; i < orderList.size(); i++) {
            // 获取以后 order
            ProductOrder order = orderList.get(i);
            // 创立一条音讯对象,指定其主题、标签和音讯内容
            Message message = new Message(
                    /* 音讯主题名 */
                    "topicTest",
                    /* 音讯标签 */
                    order.getOrderId(),
                    /* 音讯内容 */
                    (order.toString()).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );

            // 发送音讯并返回后果 应用 hash 抉择策略
            SendResult sendResult = producer.send(message, new SelectMessageQueueByHash(), order.getOrderId());

            System.out.println("product: 发送状态:" + sendResult.getSendStatus() + ", 存储 queue:" + sendResult.getMessageQueue().getQueueId() + ",orderID:" + order.getOrderId() + ",type:" + order.getType());
        }

        // 一旦生产者实例不再被应用则将其敞开,包含清理资源,敞开网络连接等
        producer.shutdown();}
}

打印后果如下

COPYproduct: 发送状态:SEND_OK, 存储 queue:3,orderID:XXX001,type: 订单创立
product: 发送状态:SEND_OK, 存储 queue:3,orderID:XXX001,type: 订单付款
product: 发送状态:SEND_OK, 存储 queue:3,orderID:XXX001,type: 订单实现
product: 发送状态:SEND_OK, 存储 queue:2,orderID:XXX002,type: 订单创立
product: 发送状态:SEND_OK, 存储 queue:2,orderID:XXX002,type: 订单付款
product: 发送状态:SEND_OK, 存储 queue:2,orderID:XXX002,type: 订单实现
product: 发送状态:SEND_OK, 存储 queue:1,orderID:XXX003,type: 订单创立
product: 发送状态:SEND_OK, 存储 queue:1,orderID:XXX003,type: 订单付款
product: 发送状态:SEND_OK, 存储 queue:1,orderID:XXX003,type: 订单实现

消费者调配队列

如何为消费者调配queue 队列?

RocketMQ 对于消费者生产音讯有两种模式:

  • BROADCASTING: 广播式生产,这种模式下,一个音讯会被告诉到每一个 消费者
  • CLUSTERING: 集群式生产,这种模式下,一个音讯最多只会被投递到一个 消费者 上进行生产
    模式如下:

广播式 的音讯模式比较简单,上面咱们介绍下 集群式 。对于应用了生产模式为MessageModel.CLUSTERING 进行生产时,须要保障一个 音讯 在整个集群中只须要被生产一次 。实际上,在 RoketMQ 底层,音讯指定调配给消费者的实现,是通过queue 队列 调配给 消费者 的形式实现的:也就是说,音讯 调配的单位是音讯所在的queue 队列。即:

queue 队列 指定给特定的 消费者 后,queue 队列 内的所有音讯将会被指定到 消费者 进行生产。

RocketMQ 定义了策略接口 AllocateMessageQueueStrategy,对于给定的 消费者分组 , 和 音讯队列列表 消费者列表 以后消费者 该当被调配到哪些queue 队列,定义如下:

COPY/**
 * 为消费者调配 queue 的策略算法接口
 */
public interface AllocateMessageQueueStrategy {

    /**
     * Allocating by consumer id
     *
     * @param consumerGroup 以后 consumer 群组
     * @param currentCID 以后 consumer id
     * @param mqAll 以后 topic 的所有 queue 实例援用
     * @param cidAll 以后 consumer 群组下所有的 consumer id set 汇合
     * @return 依据策略给以后 consumer 调配的 queue 列表
     */
    List<MessageQueue> allocate(
        final String consumerGroup,
        final String currentCID,
        final List<MessageQueue> mqAll,
        final List<String> cidAll
    );

    /**
     * 算法名称
     *
     * @return The strategy name
     */
    String getName();}

相应地,RocketMQ 提供了如下几种实现:

算法名称 含意
AllocateMessageQueueAveragely 平均分配算法
AllocateMessageQueueAveragelyByCircle 基于环形平均分配算法
AllocateMachineRoomNearby 基于机房邻近准则算法
AllocateMessageQueueByMachineRoom 基于机房调配算法
AllocateMessageQueueConsistentHash 基于一致性 hash 算法
AllocateMessageQueueByConfig 基于配置调配算法

为了讲述分明上述算法的基本原理,咱们先假如一个例子,上面所有的算法将基于这个例子解说。

假如以后同一个 topic 下有 queue 队列 10个,消费者共有 4 个, 如下图所示:

上面顺次介绍其原理:

平均分配算法

这里所谓的平均分配算法,并不是指的严格意义上的齐全均匀,如下面的例子中,10 个 queue,而消费者只有 4 个,无奈是整除关系,除了整除之外的多进去的 queue, 将顺次依据消费者的程序均摊。

依照上述例子来看,10/4=2,即示意每个 消费者 均匀均摊 2 个 queue;而 10%4=2,即除了均摊之外,多进去 2 个queue 还没有调配,那么,依据消费者的程序 consumer-1consumer-2consumer-3consumer-4, 则多进去的 2 个queue 将别离给 consumer-1consumer-2

最终,摊派关系如下:

  • consumer-1:3 个
  • consumer-2:3 个
  • consumer-3:2 个
  • consumer-4:2 个

其代码实现非常简单:

COPYpublic class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {private final InternalLogger log = ClientLogger.getLog();

    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {if (currentCID == null || currentCID.length() < 1) {throw new IllegalArgumentException("currentCID is empty");
        }
        if (mqAll == null || mqAll.isEmpty()) {throw new IllegalArgumentException("mqAll is null or mqAll empty");
        }
        if (cidAll == null || cidAll.isEmpty()) {throw new IllegalArgumentException("cidAll is null or cidAll empty");
        }

        List<MessageQueue> result = new ArrayList<MessageQueue>();
        if (!cidAll.contains(currentCID)) {log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
                consumerGroup,
                currentCID,
                cidAll);
            return result;
        }

        int index = cidAll.indexOf(currentCID);
        int mod = mqAll.size() % cidAll.size();
        int averageSize =
            mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                + 1 : mqAll.size() / cidAll.size());
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }

    @Override
    public String getName() {return "AVG";}
}
演示成果
消费者 A
COPYConsumer- 线程名称 =[32], 接管 queueId:[0], 接管工夫:[1608171677558], 音讯 =[Hello Java demo RocketMQ 2]
Consumer- 线程名称 =[34], 接管 queueId:[1], 接管工夫:[1608171677580], 音讯 =[Hello Java demo RocketMQ 3]
Consumer- 线程名称 =[36], 接管 queueId:[0], 接管工夫:[1608171677655], 音讯 =[Hello Java demo RocketMQ 6]
Consumer- 线程名称 =[38], 接管 queueId:[1], 接管工夫:[1608171677679], 音讯 =[Hello Java demo RocketMQ 7]
消费者 B
COPYConsumer- 线程名称 =[35], 接管 queueId:[2], 接管工夫:[1608171677508], 音讯 =[Hello Java demo RocketMQ 0]
Consumer- 线程名称 =[36], 接管 queueId:[3], 接管工夫:[1608171677535], 音讯 =[Hello Java demo RocketMQ 1]
Consumer- 线程名称 =[37], 接管 queueId:[2], 接管工夫:[1608171677609], 音讯 =[Hello Java demo RocketMQ 4]
Consumer- 线程名称 =[38], 接管 queueId:[3], 接管工夫:[1608171677635], 音讯 =[Hello Java demo RocketMQ 5]
Consumer- 线程名称 =[39], 接管 queueId:[2], 接管工夫:[1608171677709], 音讯 =[Hello Java demo RocketMQ 8]
Consumer- 线程名称 =[40], 接管 queueId:[3], 接管工夫:[1608171677734], 音讯 =[Hello Java demo RocketMQ 9]

基于环形均匀算法

环形均匀算法,是指依据消费者的程序,顺次在由 queue 队列 组成的环形图中一一调配。具体流程如下所示:

这种算法最终调配的后果是:

  • consumer-1: #0,#4,#8
  • consumer-2: #1, #5, # 9
  • consumer-3: #2,#6
  • consumer-4: #3,#7

其代码实现如下所示:

COPY/**
 * Cycle average Hashing queue algorithm
 */
public class AllocateMessageQueueAveragelyByCircle implements AllocateMessageQueueStrategy {private final InternalLogger log = ClientLogger.getLog();

    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {if (currentCID == null || currentCID.length() < 1) {throw new IllegalArgumentException("currentCID is empty");
        }
        if (mqAll == null || mqAll.isEmpty()) {throw new IllegalArgumentException("mqAll is null or mqAll empty");
        }
        if (cidAll == null || cidAll.isEmpty()) {throw new IllegalArgumentException("cidAll is null or cidAll empty");
        }

        List<MessageQueue> result = new ArrayList<MessageQueue>();
        if (!cidAll.contains(currentCID)) {log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
                consumerGroup,
                currentCID,
                cidAll);
            return result;
        }

        int index = cidAll.indexOf(currentCID);
        for (int i = index; i < mqAll.size(); i++) {if (i % cidAll.size() == index) {result.add(mqAll.get(i));
            }
        }
        return result;
    }

    @Override
    public String getName() {return "AVG_BY_CIRCLE";}
}
演示成果
设置算法
COPY// 设置应用环形 hash 算法
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(null, "rocket_test_consumer_group", null, new AllocateMessageQueueAveragelyByCircle());
消费者 A
COPYConsumer- 线程名称 =[35], 接管 queueId:[0], 接管工夫:[1608171903364], 音讯 =[Hello Java demo RocketMQ 1]
Consumer- 线程名称 =[38], 接管 queueId:[2], 接管工夫:[1608171903411], 音讯 =[Hello Java demo RocketMQ 3]
Consumer- 线程名称 =[39], 接管 queueId:[0], 接管工夫:[1608171903459], 音讯 =[Hello Java demo RocketMQ 5]
Consumer- 线程名称 =[40], 接管 queueId:[2], 接管工夫:[1608171903508], 音讯 =[Hello Java demo RocketMQ 7]
Consumer- 线程名称 =[41], 接管 queueId:[0], 接管工夫:[1608171903562], 音讯 =[Hello Java demo RocketMQ 9]
消费者 B
COPYConsumer- 线程名称 =[28], 接管 queueId:[3], 接管工夫:[1608171903346], 音讯 =[Hello Java demo RocketMQ 0]
Consumer- 线程名称 =[30], 接管 queueId:[1], 接管工夫:[1608171903393], 音讯 =[Hello Java demo RocketMQ 2]
Consumer- 线程名称 =[32], 接管 queueId:[3], 接管工夫:[1608171903443], 音讯 =[Hello Java demo RocketMQ 4]
Consumer- 线程名称 =[34], 接管 queueId:[1], 接管工夫:[1608171903490], 音讯 =[Hello Java demo RocketMQ 6]
Consumer- 线程名称 =[36], 接管 queueId:[3], 接管工夫:[1608171903540], 音讯 =[Hello Java demo RocketMQ 8]

一致性 hash 调配算法

应用这种算法,会将 consumer 消费者 作为 Node 节点结构成一个 hash 环,而后 queue 队列 通过这个 hash 环来决定被调配给哪个consumer 消费者

其基本模式如下:

一致性 hash 算法用于在分布式系统中,保证数据的一致性而提出的一种基于 hash 环实现的算法

算法实现上也不简单,如下图所示:

COPYpublic List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
                                   List<String> cidAll) {
    // 省略局部代码
    List<MessageQueue> result = new ArrayList<MessageQueue>();
    if (!cidAll.contains(currentCID)) {log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
                 consumerGroup,
                 currentCID,
                 cidAll);
        return result;
    }

    Collection<ClientNode> cidNodes = new ArrayList<ClientNode>();
    for (String cid : cidAll) {cidNodes.add(new ClientNode(cid));
    }
    // 应用 consumer id 结构 hash 环
    final ConsistentHashRouter<ClientNode> router; //for building hash ring
    if (customHashFunction != null) {router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt, customHashFunction);
    } else {router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt);
    }
    // 顺次为 队列调配 consumer
    List<MessageQueue> results = new ArrayList<MessageQueue>();
    for (MessageQueue mq : mqAll) {ClientNode clientNode = router.routeNode(mq.toString());
        if (clientNode != null && currentCID.equals(clientNode.getKey())) {results.add(mq);
        }
    }

    return results;

}
演示成果
设置算法
COPY// 设置应用环形 hash 算法
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(null, "rocket_test_consumer_group", null, new AllocateMessageQueueConsistentHash());
消费者 A
COPYConsumer- 线程名称 =[29], 接管 queueId:[0], 接管工夫:[1608172067310], 音讯 =[Hello Java demo RocketMQ 0]
Consumer- 线程名称 =[31], 接管 queueId:[1], 接管工夫:[1608172067323], 音讯 =[Hello Java demo RocketMQ 1]
Consumer- 线程名称 =[33], 接管 queueId:[2], 接管工夫:[1608172067345], 音讯 =[Hello Java demo RocketMQ 2]
Consumer- 线程名称 =[37], 接管 queueId:[0], 接管工夫:[1608172067395], 音讯 =[Hello Java demo RocketMQ 4]
Consumer- 线程名称 =[39], 接管 queueId:[1], 接管工夫:[1608172067418], 音讯 =[Hello Java demo RocketMQ 5]
Consumer- 线程名称 =[40], 接管 queueId:[2], 接管工夫:[1608172067443], 音讯 =[Hello Java demo RocketMQ 6]
Consumer- 线程名称 =[41], 接管 queueId:[0], 接管工夫:[1608172067494], 音讯 =[Hello Java demo RocketMQ 8]
Consumer- 线程名称 =[42], 接管 queueId:[1], 接管工夫:[1608172067518], 音讯 =[Hello Java demo RocketMQ 9]
消费者 B
COPYConsumer- 线程名称 =[28], 接管 queueId:[3], 接管工夫:[1608172067383], 音讯 =[Hello Java demo RocketMQ 3]
Consumer- 线程名称 =[30], 接管 queueId:[3], 接管工夫:[1608172067475], 音讯 =[Hello Java demo RocketMQ 7]

机房邻近调配算法

该算法应用了 装璜者设计模式,对调配策略进行了加强。个别在生产环境,如果是微服务架构下,RocketMQ 集群的部署可能是在不同的机房中部署,其根本构造可能如下图所示:

对于跨机房的场景,会存在网络、稳定性和隔离心的起因,该算法会依据 queue 的部署机房地位和 消费者 consumer的地位,过滤出以后 消费者 consumer雷同机房的 queue 队列,而后再联合上述的算法,如基于平均分配算法在queue 队列 子集的根底上再筛选。相干代码实现如下:

COPY@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
                                   List<String> cidAll) {
    // 省略局部代码
    List<MessageQueue> result = new ArrayList<MessageQueue>();

    // 将 MQ 依照 机房进行分组
    Map<String/*machine room */, List<MessageQueue>> mr2Mq = new TreeMap<String, List<MessageQueue>>();
    for (MessageQueue mq : mqAll) {String brokerMachineRoom = machineRoomResolver.brokerDeployIn(mq);
        if (StringUtils.isNoneEmpty(brokerMachineRoom)) {if (mr2Mq.get(brokerMachineRoom) == null) {mr2Mq.put(brokerMachineRoom, new ArrayList<MessageQueue>());
            }
            mr2Mq.get(brokerMachineRoom).add(mq);
        } else {throw new IllegalArgumentException("Machine room is null for mq" + mq);
        }
    }

    // 将消费者 依照机房进行分组
    Map<String/*machine room */, List<String/*clientId*/>> mr2c = new TreeMap<String, List<String>>();
    for (String cid : cidAll) {String consumerMachineRoom = machineRoomResolver.consumerDeployIn(cid);
        if (StringUtils.isNoneEmpty(consumerMachineRoom)) {if (mr2c.get(consumerMachineRoom) == null) {mr2c.put(consumerMachineRoom, new ArrayList<String>());
            }
            mr2c.get(consumerMachineRoom).add(cid);
        } else {throw new IllegalArgumentException("Machine room is null for consumer id" + cid);
        }
    }

    List<MessageQueue> allocateResults = new ArrayList<MessageQueue>();

    //1. 过滤出以后机房内的 MQ 队列子集,在此基础上应用调配算法筛选
    String currentMachineRoom = machineRoomResolver.consumerDeployIn(currentCID);
    List<MessageQueue> mqInThisMachineRoom = mr2Mq.remove(currentMachineRoom);
    List<String> consumerInThisMachineRoom = mr2c.get(currentMachineRoom);
    if (mqInThisMachineRoom != null && !mqInThisMachineRoom.isEmpty()) {allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mqInThisMachineRoom, consumerInThisMachineRoom));
    }

    //2. 不在同一机房,依照个别策略进行操作
    for (String machineRoom : mr2Mq.keySet()) {if (!mr2c.containsKey(machineRoom)) { // no alive consumer in the corresponding machine room, so all consumers share these queues
            allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mr2Mq.get(machineRoom), cidAll));
        }
    }

    return allocateResults;
}
基于机房调配算法

该算法实用于属于同一个机房外部的音讯,去调配 queue。这种形式十分明确,基于下面的 机房邻近调配算法 的场景,这种更彻底,间接指定基于机房生产的策略。这种形式具备强约定性,比方 broker 名称依照机房的名称进行拼接,在算法中通过约定解析进行调配。

其代码实现如下:

COPY/**
 * Computer room Hashing queue algorithm, such as Alipay logic room
 */
public class AllocateMessageQueueByMachineRoom implements AllocateMessageQueueStrategy {
    private Set<String> consumeridcs;

    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {List<MessageQueue> result = new ArrayList<MessageQueue>();
        int currentIndex = cidAll.indexOf(currentCID);
        if (currentIndex < 0) {return result;}
        List<MessageQueue> premqAll = new ArrayList<MessageQueue>();
        for (MessageQueue mq : mqAll) {String[] temp = mq.getBrokerName().split("@");
            if (temp.length == 2 && consumeridcs.contains(temp[0])) {premqAll.add(mq);
            }
        }

        int mod = premqAll.size() / cidAll.size();
        int rem = premqAll.size() % cidAll.size();
        int startIndex = mod * currentIndex;
        int endIndex = startIndex + mod;
        for (int i = startIndex; i < endIndex; i++) {result.add(mqAll.get(i));
        }
        if (rem > currentIndex) {result.add(premqAll.get(currentIndex + mod * cidAll.size()));
        }
        return result;
    }

    @Override
    public String getName() {return "MACHINE_ROOM";}

    public Set<String> getConsumeridcs() {return consumeridcs;}

    public void setConsumeridcs(Set<String> consumeridcs) {this.consumeridcs = consumeridcs;}
}

基于配置调配算法

这种算法单纯基于配置的,非常简单,理论应用中可能用处不大。代码如下:

COPYpublic class AllocateMessageQueueByConfig implements AllocateMessageQueueStrategy {
    private List<MessageQueue> messageQueueList;

    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {return this.messageQueueList;}

    @Override
    public String getName() {return "CONFIG";}

    public List<MessageQueue> getMessageQueueList() {return messageQueueList;}

    public void setMessageQueueList(List<MessageQueue> messageQueueList) {this.messageQueueList = messageQueueList;}
}

消费者如何指定调配算法

消费者构造方法

在 DefaultMQPushConsumer 构造方法中能够传入调配策略

默认状况下,消费者应用的是 AllocateMessageQueueAveragely 算法,也能够本人指定:

COPYpublic class DefaultMQPushConsumer{    
    /**
     * Default constructor.
     */
    public DefaultMQPushConsumer() {this(MixAll.DEFAULT_CONSUMER_GROUP, null, new AllocateMessageQueueAveragely());
    }
  

    /**
     * Constructor specifying consumer group, RPC hook and message queue allocating algorithm.
     *
     * @param consumerGroup Consume queue.
     * @param rpcHook RPC hook to execute before each remoting command.
     * @param allocateMessageQueueStrategy message queue allocating algorithm.
     */
    public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
        AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
        this.consumerGroup = consumerGroup;
        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
        defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
    }
}

咱们看到默认应用了 AllocateMessageQueueAveragely 平均分配策略

应用其余调配策略

如果须要应用其余调配策略,应用形式如下

本文由 传智教育博学谷狂野架构师 教研团队公布。

如果本文对您有帮忙,欢送 关注 点赞 ;如果您有任何倡议也可 留言评论 私信,您的反对是我保持创作的能源。

转载请注明出处!

退出移动版