乐趣区

关于java:深入剖析-RocketMQ-源码-负载均衡机制

一、引言

RocketMQ 是一款优良的分布式消息中间件,在各方面的性能都比目前已有的音讯队列要好,RocketMQ 默认采纳长轮询的拉模式,单机反对千万级别的音讯沉积,能够十分好的利用在海量音讯零碎中。

RocketMQ 次要由 Producer、Broker、Consumer、Namesvr 等组件组成,其中 Producer 负责生产音讯,Consumer 负责生产音讯,Broker 负责存储音讯,Namesvr 负责存储元数据,各组件的次要性能如下:

  • 音讯生产者(Producer):负责生产音讯,个别由业务零碎负责生产音讯。一个音讯生产者会把业务利用零碎里产生的音讯发送到 Broker 服务器。RocketMQ 提供多种发送形式,同步发送、异步发送、程序发送、单向发送。同步和异步形式均须要 Broker 返回确认信息,单向发送不须要。
  • 音讯消费者(Consumer):负责生产音讯,个别是后盾零碎负责异步生产。一个音讯消费者会从 Broker 服务器拉取音讯、并将其提供给应用程序。从用户利用的角度而言提供了两种生产模式:拉取式生产、推动式生产。
  • 代理服务器(Broker Server):音讯直达角色,负责存储音讯、转发音讯。代理服务器在 RocketMQ 零碎中负责接管从生产者发送来的音讯并存储、同时为消费者的拉取申请作筹备。代理服务器也存储音讯相干的元数据,包含消费者组、生产进度偏移和主题和队列音讯等。
  • 名字服务(Name Server):名称服务充当路由音讯的提供者。生产者或消费者可能通过名字服务查找各主题相应的 Broker IP 列表。多个 Namesrv 实例组成集群,但互相独立,没有信息替换。
  • 生产者组(Producer Group):同一类 Producer 的汇合,这类 Producer 发送同一类音讯且发送逻辑统一。如果发送的是事务音讯且原始生产者在发送之后解体,则 Broker 服务器会分割同一生产者组的其余生产者实例以提交或回溯生产。
  • 消费者组(Consumer Group):同一类 Consumer 的汇合,这类 Consumer 通常生产同一类音讯且生产逻辑统一。消费者组使得在音讯生产方面,实现负载平衡和容错的指标变得非常容易。

RocketMQ 整体音讯解决逻辑上以 Topic 维度进行生产生产、物理上会存储到具体的 Broker 上的某个 MessageQueue 当中,正因为一个 Topic 会存在多个 Broker 节点上的多个 MessageQueue,所以自然而然就产生了音讯生产生产的负载平衡需要。

本篇文章剖析的外围在于介绍 RocketMQ 的音讯生产者(Producer)和音讯消费者(Consumer)在整个音讯的生产生产过程中如何实现负载平衡以及其中的实现细节。

二、RocketMQ 的整体架构

(图片来自于 Apache RocketMQ)

RocketMQ 架构上次要分为四局部,如上图所示:

  • Producer:音讯公布的角色,反对分布式集群形式部署。Producer 通过 MQ 的负载平衡模块抉择相应的 Broker 集群队列进行音讯投递,投递的过程反对疾速失败并且低提早。
  • Consumer:音讯生产的角色,反对分布式集群形式部署。反对以 push 推,pull 拉两种模式对音讯进行生产。同时也反对集群形式和播送形式的生产,它提供实时音讯订阅机制,能够满足大多数用户的需要。
  • NameServer:NameServer 是一个非常简单的 Topic 路由注册核心,反对分布式集群形式部署,其角色相似 Dubbo 中的 zookeeper,反对 Broker 的动静注册与发现。
  • BrokerServer:Broker 次要负责音讯的存储、投递和查问以及服务高可用保障,反对分布式集群形式部署。

RocketMQ 的 Topic 的物理散布如上图所示:

Topic 作为音讯生产和生产的逻辑概念,具体的音讯存储散布在不同的 Broker 当中。

Broker 中的 Queue 是 Topic 对应音讯的物理存储单元。

在 RocketMQ 的整体设计理念当中,音讯的生产生产以 Topic 维度进行,每个 Topic 会在 RocketMQ 的集群中的 Broker 节点创立对应的 MessageQueue。

producer 生产音讯的过程实质上就是抉择 Topic 在 Broker 的所有的 MessageQueue 并依照肯定的规定抉择其中一个进行音讯发送,失常状况的策略是轮询。

consumer 生产音讯的过程实质上就是一个订阅同一个 Topic 的 consumerGroup 下的每个 consumer 依照肯定的规定负责 Topic 下一部分 MessageQueue 进行生产。

在 RocketMQ 整个音讯的生命周期内,不论是生产音讯还是生产音讯都会波及到负载平衡的概念,音讯的生成过程中次要波及到 Broker 抉择的负载平衡,音讯的生产过程次要波及多 consumer 和多 Broker 之间的负责平衡。

三、producer 音讯生产过程

producer 音讯生产过程:

  • producer 首先拜访 namesvr 获取路由信息,namesvr 存储 Topic 维度的所有路由信息(包含每个 topic 在每个 Broker 的队列散布状况)。
  • producer 解析路由信息生成本地的路由信息,解析 Topic 在 Broker 队列信息并转化为本地的音讯生产的路由信息。
  • producer 依据本地路由信息向 Broker 发送音讯,抉择本地路由中具体的 Broker 进行音讯发送。

3.1 路由同步过程

public class MQClientInstance {public boolean updateTopicRouteInfoFromNameServer(final String topic) {return updateTopicRouteInfoFromNameServer(topic, false, null);
    }
 
 
    public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) {
        try {if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    TopicRouteData topicRouteData;
                    if (isDefault && defaultMQProducer != null) {// 省略对应的代码} else {
                        // 1、负责查问指定的 Topic 对应的路由信息
                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                    }
 
                    if (topicRouteData != null) {
                        // 2、比拟路由数据 topicRouteData 是否产生变更
                        TopicRouteData old = this.topicRouteTable.get(topic);
                        boolean changed = topicRouteDataIsChange(old, topicRouteData);
                        if (!changed) {changed = this.isNeedUpdateTopicRouteInfo(topic);
                        }
                        // 3、解析路由信息转化为生产者的路由信息和消费者的路由信息
                        if (changed) {TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
 
                            for (BrokerData bd : topicRouteData.getBrokerDatas()) {this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                            }
 
                            // 生成生产者对应的 Topic 信息
                            {TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                                publishInfo.setHaveTopicRouterInfo(true);
                                Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                                while (it.hasNext()) {Entry<String, MQProducerInner> entry = it.next();
                                    MQProducerInner impl = entry.getValue();
                                    if (impl != null) {impl.updateTopicPublishInfo(topic, publishInfo);
                                    }
                                }
                            }
                            // 保留到本地生产者路由表当中
                            this.topicRouteTable.put(topic, cloneTopicRouteData);
                            return true;
                        }
                    }
                } finally {this.lockNamesrv.unlock();
                }
            } else {}} catch (InterruptedException e) { }
 
        return false;
    }
}

路由同步过程

  • 路由同步过程是音讯生产者发送音讯的前置条件,没有路由的同步就无奈感知具体发往那个 Broker 节点。
  • 路由同步次要负责查问指定的 Topic 对应的路由信息,比拟路由数据 topicRouteData 是否产生变更,最终解析路由信息转化为生产者的路由信息和消费者的路由信息。
public class TopicRouteData extends RemotingSerializable {
    private String orderTopicConf;
    // 依照 broker 维度保留的 Queue 信息
    private List<QueueData> queueDatas;
    // 依照 broker 维度保留的 broker 信息
    private List<BrokerData> brokerDatas;
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
 
 
public class QueueData implements Comparable<QueueData> {
    // broker 的名称
    private String brokerName;
    // 读队列大小
    private int readQueueNums;
    // 写队列大小
    private int writeQueueNums;
    // 读写权限
    private int perm;
    private int topicSynFlag;
}
 
 
public class BrokerData implements Comparable<BrokerData> {
    // broker 所属集群信息
    private String cluster;
    // broker 的名称
    private String brokerName;
    // broker 对应的 ip 地址信息
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
    private final Random random = new Random();}
 
 
--------------------------------------------------------------------------------------------------
 
 
public class TopicPublishInfo {
    private boolean orderTopic = false;
    private boolean haveTopicRouterInfo = false;
    // 最细粒度的队列信息
    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    private TopicRouteData topicRouteData;
}
 
public class MessageQueue implements Comparable<MessageQueue>, Serializable {
    private static final long serialVersionUID = 6191200464116433425L;
    // Topic 信息
    private String topic;
    // 所属的 brokerName 信息
    private String brokerName;
    // Topic 下的队列信息 Id
    private int queueId;
}

路由解析过程:

  • TopicRouteData 外围变量 QueueData 保留每个 Broker 的队列信息,BrokerData 保留 Broker 的地址信息。
  • TopicPublishInfo 外围变量 MessageQueue 保留最细粒度的队列信息。
  • producer 负责将从 namesvr 获取的 TopicRouteData 转化为 producer 本地的 TopicPublishInfo。
public class MQClientInstance {public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {TopicPublishInfo info = new TopicPublishInfo();
 
        info.setTopicRouteData(route);
        if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {// 省略相干代码} else {List<QueueData> qds = route.getQueueDatas();
 
            // 依照 brokerName 进行排序
            Collections.sort(qds);
 
            // 遍历所有 broker 生成队列维度信息
            for (QueueData qd : qds) {
                // 具备写能力的 QueueData 可能用于队列生成
                if (PermName.isWriteable(qd.getPerm())) {
                    // 遍历取得指定 brokerData 进行异样条件过滤
                    BrokerData brokerData = null;
                    for (BrokerData bd : route.getBrokerDatas()) {if (bd.getBrokerName().equals(qd.getBrokerName())) {
                            brokerData = bd;
                            break;
                        }
                    }
                    if (null == brokerData) {continue;}
                    if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {continue;}
 
                    // 遍历 QueueData 的写队列的数量大小,生成 MessageQueue 保留指定 TopicPublishInfo
                    for (int i = 0; i < qd.getWriteQueueNums(); i++) {MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
                        info.getMessageQueueList().add(mq);
                    }
                }
            }
 
            info.setOrderTopic(false);
        }
 
        return info;
    }
}

路由生成过程:

  • 路由生成过程次要是依据 QueueData 的 BrokerName 和 writeQueueNums 来生成 MessageQueue 对象。
  • MessageQueue 是音讯发送过程中抉择的最细粒度的可发送音讯的队列。
{
    "TBW102": [{
        "brokerName": "broker-a",
        "perm": 7,
        "readQueueNums": 8,
        "topicSynFlag": 0,
        "writeQueueNums": 8
    }, {
        "brokerName": "broker-b",
        "perm": 7,
        "readQueueNums": 8,
        "topicSynFlag": 0,
        "writeQueueNums": 8
    }]
}

路由解析举例:

  • topic(TBW102)在 broker- a 和 broker- b 上存在队列信息,其中读写队列个数都为 8。
  • 先依照 broker-a、broker- b 的名字程序针对 broker 信息进行排序。
  • 针对 broker- a 会生成 8 个 topic 为 TBW102 的 MessageQueue 对象,queueId 别离是 0 -7。
  • 针对 broker- b 会生成 8 个 topic 为 TBW102 的 MessageQueue 对象,queueId 别离是 0 -7。
  • topic(名为 TBW102)的 TopicPublishInfo 整体蕴含 16 个 MessageQueue 对象,其中有 8 个 broker- a 的 MessageQueue,有 8 个 broker- b 的 MessageQueue。
  • 音讯发送过程中的路由抉择就是从这 16 个 MessageQueue 对象当中获取一个进行音讯发送。

3.2 负载平衡过程

public class DefaultMQProducerImpl implements MQProducerInner {
 
    private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        
        // 1、查问音讯发送的 TopicPublishInfo 信息
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
 
        if (topicPublishInfo != null && topicPublishInfo.ok()) {String[] brokersSent = new String[timesTotal];
            // 依据重试次数进行音讯发送
            for (; times < timesTotal; times++) {
                // 记录上次发送失败的 brokerName
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                // 2、从 TopicPublishInfo 获取发送音讯的队列
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        // 3、执行发送并判断发送后果,如果发送失败依据重试次数抉择音讯队列进行从新发送
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                        switch (communicationMode) {
                            case SYNC:
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {continue;}
                                }
 
                                return sendResult;
                            default:
                                break;
                        }
                    } catch (MQBrokerException e) {// 省略相干代码} catch (InterruptedException e) {// 省略相干代码}
                } else {break;}
            }
 
            if (sendResult != null) {return sendResult;}
        }
    }
}

音讯发送过程:

  • 查问 Topic 对应的路由信息对象 TopicPublishInfo。
  • 从 TopicPublishInfo 中通过 selectOneMessageQueue 获取发送音讯的队列,该队列代表具体落到具体的 Broker 的 queue 队列当中。
  • 执行发送并判断发送后果,如果发送失败依据重试次数抉择音讯队列进行从新发送,从新抉择队列会避开上一次发送失败的 Broker 的队列。
public class TopicPublishInfo {public MessageQueue selectOneMessageQueue(final String lastBrokerName) {if (lastBrokerName == null) {return selectOneMessageQueue();
        } else {
            // 依照轮询进行抉择发送的 MessageQueue
            for (int i = 0; i < this.messageQueueList.size(); i++) {int index = this.sendWhichQueue.getAndIncrement();
                int pos = Math.abs(index) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                // 避开上一次上一次发送失败的 MessageQueue
                if (!mq.getBrokerName().equals(lastBrokerName)) {return mq;}
            }
            return selectOneMessageQueue();}
    }
}

路由抉择过程:

  • MessageQueue 的抉择依照轮询进行抉择,通过全局保护索引进行累加取模抉择发送队列。
  • MessageQueue 的抉择过程中会避开上一次发送失败 Broker 对应的 MessageQueue。

Producer 音讯发送示意图

  • 某 Topic 的队列散布为 Broker\_A\_Queue1、Broker\_A\_Queue2、Broker\_B\_Queue1、Broker\_B\_Queue2、Broker\_C\_Queue1、Broker\_C\_Queue2,依据轮询策略顺次进行抉择。
  • 发送失败的场景下如 Broker\_A\_Queue1 发送失败那么就会跳过 Broker\_A 抉择 Broker\_B_Queue1 进行发送。

四、consumer 音讯生产过程

consumer 音讯生产过程

  • consumer 拜访 namesvr 同步 topic 对应的路由信息。
  • consumer 在本地解析近程路由信息并保留到本地。
  • consumer 在本地进行 Reblance 负载平衡确定本节点负责生产的 MessageQueue。
  • consumer 拜访 Broker 生产指定的 MessageQueue 的音讯。

4.1 路由同步过程

public class MQClientInstance {
 
    // 1、启动定时工作从 namesvr 定时同步路由信息
    private void startScheduledTask() {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
 
            @Override
            public void run() {
                try {MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                } catch (Exception e) {log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                }
            }
        }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
    }
 
    public void updateTopicRouteInfoFromNameServer() {Set<String> topicList = new HashSet<String>();
 
        // 遍历所有的 consumer 订阅的 Topic 并从 namesvr 获取路由信息
        {Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
            while (it.hasNext()) {Entry<String, MQConsumerInner> entry = it.next();
                MQConsumerInner impl = entry.getValue();
                if (impl != null) {Set<SubscriptionData> subList = impl.subscriptions();
                    if (subList != null) {for (SubscriptionData subData : subList) {topicList.add(subData.getTopic());
                        }
                    }
                }
            }
        }
 
        for (String topic : topicList) {this.updateTopicRouteInfoFromNameServer(topic);
        }
    }
 
    public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) {
 
        try {if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    TopicRouteData topicRouteData;
                    if (isDefault && defaultMQProducer != null) {// 省略代码} else {topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                    }
 
                    if (topicRouteData != null) {TopicRouteData old = this.topicRouteTable.get(topic);
                        boolean changed = topicRouteDataIsChange(old, topicRouteData);
                        if (!changed) {changed = this.isNeedUpdateTopicRouteInfo(topic);
                        }
 
                        if (changed) {TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
 
                            for (BrokerData bd : topicRouteData.getBrokerDatas()) {this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                            }
 
                            // 构建 consumer 侧的路由信息
                            {Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                                Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
                                while (it.hasNext()) {Entry<String, MQConsumerInner> entry = it.next();
                                    MQConsumerInner impl = entry.getValue();
                                    if (impl != null) {impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                                    }
                                }
                            }
     
                            this.topicRouteTable.put(topic, cloneTopicRouteData);
                            return true;
                        }
                    }
                } finally {this.lockNamesrv.unlock();
                }
            }
        } catch (InterruptedException e) { }
 
        return false;
    }
}

路由同步过程

  • 路由同步过程是音讯消费者生产音讯的前置条件,没有路由的同步就无奈感知具体待生产的音讯的 Broker 节点。
  • consumer 节点通过定时工作定期从 namesvr 同步该生产节点订阅的 topic 的路由信息。
  • consumer 通过 updateTopicSubscribeInfo 将同步的路由信息构建老本地的路由信息并用以后续的负责平衡。

4.2 负载平衡过程

public class RebalanceService extends ServiceThread {
 
    private static long waitInterval =
        Long.parseLong(System.getProperty("rocketmq.client.rebalance.waitInterval", "20000"));
 
    private final MQClientInstance mqClientFactory;
 
    public RebalanceService(MQClientInstance mqClientFactory) {this.mqClientFactory = mqClientFactory;}
 
    @Override
    public void run() {while (!this.isStopped()) {this.waitForRunning(waitInterval);
            this.mqClientFactory.doRebalance();}
 
    }
}

负载平衡过程

  • consumer 通过 RebalanceService 来定期进行从新负载平衡。
  • RebalanceService 的外围在于实现 MessageQueue 和 consumer 的分配关系。
public abstract class RebalanceImpl {private void rebalanceByTopic(final String topic, final boolean isOrder) {switch (messageModel) {
            case BROADCASTING: {
                // 省略相干代码
                break;
            }
            case CLUSTERING: { // 集群模式下的负载平衡
                // 1、获取 topic 下所有的 MessageQueue
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
 
                // 2、获取 topic 下该 consumerGroup 下所有的 consumer 对象
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                 
                // 3、开始重新分配进行 rebalance
                if (mqSet != null && cidAll != null) {List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                    mqAll.addAll(mqSet);
 
                    Collections.sort(mqAll);
                    Collections.sort(cidAll);
 
                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
 
                    List<MessageQueue> allocateResult = null;
                    try {
                        // 4、通过调配策略从新进行调配
                        allocateResult = strategy.allocate(
                            this.consumerGroup,
                            this.mQClientFactory.getClientId(),
                            mqAll,
                            cidAll);
                    } catch (Throwable e) {return;}
 
                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                    if (allocateResult != null) {allocateResultSet.addAll(allocateResult);
                    }
                    // 5、依据调配后果执行真正的 rebalance 动作
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                    if (changed) {this.messageQueueChanged(topic, mqSet, allocateResultSet);
                    }
                }
                break;
            }
            default:
                break;
        }
    }

重新分配流程

  • 获取 topic 下所有的 MessageQueue。
  • 获取 topic 下该 consumerGroup 下所有的 consumer 的 cid(如 192.168.0.8@15958)。
  • 针对 mqAll 和 cidAll 进行排序,mqAll 排序程序依照先 BrokerName 后 BrokerId,cidAll 排序依照字符串排序。
  • 通过调配策略
  • AllocateMessageQueueStrategy 重新分配。
  • 依据调配后果执行真正的 rebalance 动作。
public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {private final InternalLogger log = ClientLogger.getLog();
 
    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {List<MessageQueue> result = new ArrayList<MessageQueue>();
         
        // 外围逻辑计算开始
 
        // 计算以后 cid 的下标
        int index = cidAll.indexOf(currentCID);
         
        // 计算多余的模值
        int mod = mqAll.size() % cidAll.size();
 
        // 计算均匀大小
        int averageSize =
            mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                + 1 : mqAll.size() / cidAll.size());
        // 计算起始下标
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        // 计算范畴大小
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        // 组装后果
        for (int i = 0; i < range; i++) {result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }
    // 外围逻辑计算完结
 
    @Override
    public String getName() {return "AVG";}
}
 
------------------------------------------------------------------------------------
 
rocketMq 的集群存在 3 个 broker,别离是 broker_a、broker_b、broker_c。rocketMq 上存在名为 topic_demo 的 topic,writeQueue 写队列数量为 3,散布在 3 个 broker。排序后的 mqAll 的大小为 9,顺次为
[broker_a_0  broker_a_1  broker_a_2  broker_b_0  broker_b_1  broker_b_2  broker_c_0  broker_c_1  broker_c_2]
 
rocketMq 存在蕴含 4 个 consumer 的 consumer_group,排序后 cidAll 顺次为
[192.168.0.6@15956  192.168.0.7@15957  192.168.0.8@15958  192.168.0.9@15959]
 
192.168.0.6@15956 的调配 MessageQueue 结算过程
index:0
mod:9%4=1
averageSize:9 / 4 + 1 = 3
startIndex:0
range:3
messageQueue:[broker_a_0、broker_a_1、broker_a_2]
 
 
192.168.0.6@15957 的调配 MessageQueue 结算过程
index:1
mod:9%4=1
averageSize:9 / 4 = 2
startIndex:3
range:2
messageQueue:[broker_b_0、broker_b_1]
 
 
192.168.0.6@15958 的调配 MessageQueue 结算过程
index:2
mod:9%4=1
averageSize:9 / 4 = 2
startIndex:5
range:2
messageQueue:[broker_b_2、broker_c_0]
 
 
192.168.0.6@15959 的调配 MessageQueue 结算过程
index:3
mod:9%4=1
averageSize:9 / 4 = 2
startIndex:7
range:2
messageQueue:[broker_c_1、broker_c_2]

调配策略剖析:

  • 整体调配策略能够参考上图的具体例子,能够更好的了解调配的逻辑。

consumer 的调配

  • 同一个 consumerGroup 下的 consumer 对象会调配到同一个 Topic 下不同的 MessageQueue。
  • 每个 MessageQueue 最终会调配到具体的 consumer 当中。

五、RocketMQ 指定机器生产设计思路

日常测试环境当中会存在多台 consumer 进行生产,但理论开发当中某台 consumer 新上了性能后心愿音讯只由该机器进行生产进行逻辑笼罩,这个时候 consumerGroup 的集群模式就会给咱们造成困扰,因为生产负载平衡的起因不确定音讯具体由那台 consumer 进行生产。当然咱们能够通过染指 consumer 的负载平衡机制来实现指定机器生产。

public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {private final InternalLogger log = ClientLogger.getLog();
 
    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {List<MessageQueue> result = new ArrayList<MessageQueue>();
        // 通过改写这部分逻辑,减少判断是否是指定 IP 的机器,如果不是间接返回空列表示意该机器不负责生产
        if (!cidAll.contains(currentCID)) {return result;}
 
        int index = cidAll.indexOf(currentCID);
        int mod = mqAll.size() % cidAll.size();
        int averageSize =
            mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                + 1 : mqAll.size() / cidAll.size());
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }
}

consumer 负载平衡策略改写

  • 通过改写负载平衡策略 AllocateMessageQueueAveragely 的 allocate 机制保障只有指定 IP 的机器可能进行生产。
  • 通过 IP 进行判断是基于 RocketMQ 的 cid 格局是 192.168.0.6@15956,其中后面的 IP 地址就是对于的生产机器的 ip 地址,整个计划可行且能够理论落地。

六、小结

本文次要介绍了 RocketMQ 在生产和生产过程中的负载平衡机制,联合源码和理论案例力求给读者一个易于了解的技术遍及,心愿能对读者有参考和借鉴价值。囿于文章篇幅,有些方面未波及,也有很多技术细节未具体论述,如有疑难欢送持续交换。

作者:vivo 互联网服务器团队 -Wang Zhi

退出移动版