关于java:深入剖析-RocketMQ-源码-负载均衡机制

一、引言

RocketMQ是一款优良的分布式消息中间件,在各方面的性能都比目前已有的音讯队列要好,RocketMQ默认采纳长轮询的拉模式, 单机反对千万级别的音讯沉积,能够十分好的利用在海量音讯零碎中。

RocketMQ次要由 Producer、Broker、Consumer、Namesvr 等组件组成,其中Producer 负责生产音讯,Consumer 负责生产音讯,Broker 负责存储音讯,Namesvr负责存储元数据,各组件的次要性能如下:

  • 音讯生产者(Producer):负责生产音讯,个别由业务零碎负责生产音讯。一个音讯生产者会把业务利用零碎里产生的音讯发送到Broker服务器。RocketMQ提供多种发送形式,同步发送、异步发送、程序发送、单向发送。同步和异步形式均须要Broker返回确认信息,单向发送不须要。
  • 音讯消费者(Consumer):负责生产音讯,个别是后盾零碎负责异步生产。一个音讯消费者会从Broker服务器拉取音讯、并将其提供给应用程序。从用户利用的角度而言提供了两种生产模式:拉取式生产、推动式生产。
  • 代理服务器(Broker Server):音讯直达角色,负责存储音讯、转发音讯。代理服务器在RocketMQ零碎中负责接管从生产者发送来的音讯并存储、同时为消费者的拉取申请作筹备。代理服务器也存储音讯相干的元数据,包含消费者组、生产进度偏移和主题和队列音讯等。
  • 名字服务(Name Server):名称服务充当路由音讯的提供者。生产者或消费者可能通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但互相独立,没有信息替换。
  • 生产者组(Producer Group):同一类Producer的汇合,这类Producer发送同一类音讯且发送逻辑统一。如果发送的是事务音讯且原始生产者在发送之后解体,则Broker服务器会分割同一生产者组的其余生产者实例以提交或回溯生产。
  • 消费者组(Consumer Group):同一类Consumer的汇合,这类Consumer通常生产同一类音讯且生产逻辑统一。消费者组使得在音讯生产方面,实现负载平衡和容错的指标变得非常容易。

RocketMQ整体音讯解决逻辑上以Topic维度进行生产生产、物理上会存储到具体的Broker上的某个MessageQueue当中,正因为一个Topic会存在多个Broker节点上的多个MessageQueue,所以自然而然就产生了音讯生产生产的负载平衡需要。

本篇文章剖析的外围在于介绍RocketMQ的音讯生产者(Producer)和音讯消费者(Consumer)在整个音讯的生产生产过程中如何实现负载平衡以及其中的实现细节。

二、RocketMQ的整体架构

(图片来自于Apache RocketMQ)

RocketMQ架构上次要分为四局部,如上图所示:

  • Producer:音讯公布的角色,反对分布式集群形式部署。Producer通过MQ的负载平衡模块抉择相应的Broker集群队列进行音讯投递,投递的过程反对疾速失败并且低提早。
  • Consumer:音讯生产的角色,反对分布式集群形式部署。反对以push推,pull拉两种模式对音讯进行生产。同时也反对集群形式和播送形式的生产,它提供实时音讯订阅机制,能够满足大多数用户的需要。
  • NameServer:NameServer是一个非常简单的Topic路由注册核心,反对分布式集群形式部署,其角色相似Dubbo中的zookeeper,反对Broker的动静注册与发现。
  • BrokerServer:Broker次要负责音讯的存储、投递和查问以及服务高可用保障,反对分布式集群形式部署。

RocketMQ的Topic的物理散布如上图所示:

Topic作为音讯生产和生产的逻辑概念,具体的音讯存储散布在不同的Broker当中。

Broker中的Queue是Topic对应音讯的物理存储单元。

在RocketMQ的整体设计理念当中,音讯的生产生产以Topic维度进行,每个Topic会在RocketMQ的集群中的Broker节点创立对应的MessageQueue。

producer生产音讯的过程实质上就是抉择Topic在Broker的所有的MessageQueue并依照肯定的规定抉择其中一个进行音讯发送,失常状况的策略是轮询。

consumer生产音讯的过程实质上就是一个订阅同一个Topic的consumerGroup下的每个consumer依照肯定的规定负责Topic下一部分MessageQueue进行生产。

在RocketMQ整个音讯的生命周期内,不论是生产音讯还是生产音讯都会波及到负载平衡的概念,音讯的生成过程中次要波及到Broker抉择的负载平衡,音讯的生产过程次要波及多consumer和多Broker之间的负责平衡。

三、producer音讯生产过程

producer音讯生产过程:

  • producer首先拜访namesvr获取路由信息,namesvr存储Topic维度的所有路由信息(包含每个topic在每个Broker的队列散布状况)。
  • producer解析路由信息生成本地的路由信息,解析Topic在Broker队列信息并转化为本地的音讯生产的路由信息。
  • producer依据本地路由信息向Broker发送音讯,抉择本地路由中具体的Broker进行音讯发送。

3.1 路由同步过程

public class MQClientInstance {
 
    public boolean updateTopicRouteInfoFromNameServer(final String topic) {
        return updateTopicRouteInfoFromNameServer(topic, false, null);
    }
 
 
    public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) {
        try {
            if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    TopicRouteData topicRouteData;
                    if (isDefault && defaultMQProducer != null) {
                        // 省略对应的代码
                    } else {
                        // 1、负责查问指定的Topic对应的路由信息
                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                    }
 
                    if (topicRouteData != null) {
                        // 2、比拟路由数据topicRouteData是否产生变更
                        TopicRouteData old = this.topicRouteTable.get(topic);
                        boolean changed = topicRouteDataIsChange(old, topicRouteData);
                        if (!changed) {
                            changed = this.isNeedUpdateTopicRouteInfo(topic);
                        }
                        // 3、解析路由信息转化为生产者的路由信息和消费者的路由信息
                        if (changed) {
                            TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
 
                            for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                                this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                            }
 
                            // 生成生产者对应的Topic信息
                            {
                                TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                                publishInfo.setHaveTopicRouterInfo(true);
                                Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                                while (it.hasNext()) {
                                    Entry<String, MQProducerInner> entry = it.next();
                                    MQProducerInner impl = entry.getValue();
                                    if (impl != null) {
                                        impl.updateTopicPublishInfo(topic, publishInfo);
                                    }
                                }
                            }
                            // 保留到本地生产者路由表当中
                            this.topicRouteTable.put(topic, cloneTopicRouteData);
                            return true;
                        }
                    }
                } finally {
                    this.lockNamesrv.unlock();
                }
            } else {
            }
        } catch (InterruptedException e) {
        }
 
        return false;
    }
}

路由同步过程

  • 路由同步过程是音讯生产者发送音讯的前置条件,没有路由的同步就无奈感知具体发往那个Broker节点。
  • 路由同步次要负责查问指定的Topic对应的路由信息,比拟路由数据topicRouteData是否产生变更,最终解析路由信息转化为生产者的路由信息和消费者的路由信息。
public class TopicRouteData extends RemotingSerializable {
    private String orderTopicConf;
    // 依照broker维度保留的Queue信息
    private List<QueueData> queueDatas;
    // 依照broker维度保留的broker信息
    private List<BrokerData> brokerDatas;
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
 
 
public class QueueData implements Comparable<QueueData> {
    // broker的名称
    private String brokerName;
    // 读队列大小
    private int readQueueNums;
    // 写队列大小
    private int writeQueueNums;
    // 读写权限
    private int perm;
    private int topicSynFlag;
}
 
 
public class BrokerData implements Comparable<BrokerData> {
    // broker所属集群信息
    private String cluster;
    // broker的名称
    private String brokerName;
    // broker对应的ip地址信息
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
    private final Random random = new Random();
}
 
 
--------------------------------------------------------------------------------------------------
 
 
public class TopicPublishInfo {
    private boolean orderTopic = false;
    private boolean haveTopicRouterInfo = false;
    // 最细粒度的队列信息
    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    private TopicRouteData topicRouteData;
}
 
public class MessageQueue implements Comparable<MessageQueue>, Serializable {
    private static final long serialVersionUID = 6191200464116433425L;
    // Topic信息
    private String topic;
    // 所属的brokerName信息
    private String brokerName;
    // Topic下的队列信息Id
    private int queueId;
}

路由解析过程:

  • TopicRouteData外围变量QueueData保留每个Broker的队列信息,BrokerData保留Broker的地址信息。
  • TopicPublishInfo外围变量MessageQueue保留最细粒度的队列信息。
  • producer负责将从namesvr获取的TopicRouteData转化为producer本地的TopicPublishInfo。
public class MQClientInstance {
 
    public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
 
        TopicPublishInfo info = new TopicPublishInfo();
 
        info.setTopicRouteData(route);
        if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
          // 省略相干代码
        } else {
 
            List<QueueData> qds = route.getQueueDatas();
 
            // 依照brokerName进行排序
            Collections.sort(qds);
 
            // 遍历所有broker生成队列维度信息
            for (QueueData qd : qds) {
                // 具备写能力的QueueData可能用于队列生成
                if (PermName.isWriteable(qd.getPerm())) {
                    // 遍历取得指定brokerData进行异样条件过滤
                    BrokerData brokerData = null;
                    for (BrokerData bd : route.getBrokerDatas()) {
                        if (bd.getBrokerName().equals(qd.getBrokerName())) {
                            brokerData = bd;
                            break;
                        }
                    }
                    if (null == brokerData) {
                        continue;
                    }
                    if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
                        continue;
                    }
 
                    // 遍历QueueData的写队列的数量大小,生成MessageQueue保留指定TopicPublishInfo
                    for (int i = 0; i < qd.getWriteQueueNums(); i++) {
                        MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
                        info.getMessageQueueList().add(mq);
                    }
                }
            }
 
            info.setOrderTopic(false);
        }
 
        return info;
    }
}

路由生成过程:

  • 路由生成过程次要是依据QueueData的BrokerName和writeQueueNums来生成MessageQueue 对象。
  • MessageQueue是音讯发送过程中抉择的最细粒度的可发送音讯的队列。
{
    "TBW102": [{
        "brokerName": "broker-a",
        "perm": 7,
        "readQueueNums": 8,
        "topicSynFlag": 0,
        "writeQueueNums": 8
    }, {
        "brokerName": "broker-b",
        "perm": 7,
        "readQueueNums": 8,
        "topicSynFlag": 0,
        "writeQueueNums": 8
    }]
}

路由解析举例:

  • topic(TBW102)在broker-a和broker-b上存在队列信息,其中读写队列个数都为8。
  • 先依照broker-a、broker-b的名字程序针对broker信息进行排序。
  • 针对broker-a会生成8个topic为TBW102的MessageQueue对象,queueId别离是0-7。
  • 针对broker-b会生成8个topic为TBW102的MessageQueue对象,queueId别离是0-7。
  • topic(名为TBW102)的TopicPublishInfo整体蕴含16个MessageQueue对象,其中有8个broker-a的MessageQueue,有8个broker-b的MessageQueue。
  • 音讯发送过程中的路由抉择就是从这16个MessageQueue对象当中获取一个进行音讯发送。

3.2 负载平衡过程

public class DefaultMQProducerImpl implements MQProducerInner {
 
    private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        
        // 1、查问音讯发送的TopicPublishInfo信息
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
 
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
             
            String[] brokersSent = new String[timesTotal];
            // 依据重试次数进行音讯发送
            for (; times < timesTotal; times++) {
                // 记录上次发送失败的brokerName
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                // 2、从TopicPublishInfo获取发送音讯的队列
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        // 3、执行发送并判断发送后果,如果发送失败依据重试次数抉择音讯队列进行从新发送
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                        switch (communicationMode) {
                            case SYNC:
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }
 
                                return sendResult;
                            default:
                                break;
                        }
                    } catch (MQBrokerException e) {
                        // 省略相干代码
                    } catch (InterruptedException e) {
                        // 省略相干代码
                    }
                } else {
                    break;
                }
            }
 
            if (sendResult != null) {
                return sendResult;
            }
        }
    }
}

音讯发送过程:

  • 查问Topic对应的路由信息对象TopicPublishInfo。
  • 从TopicPublishInfo中通过selectOneMessageQueue获取发送音讯的队列,该队列代表具体落到具体的Broker的queue队列当中。
  • 执行发送并判断发送后果,如果发送失败依据重试次数抉择音讯队列进行从新发送,从新抉择队列会避开上一次发送失败的Broker的队列。
public class TopicPublishInfo {
 
    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            // 依照轮询进行抉择发送的MessageQueue
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int index = this.sendWhichQueue.getAndIncrement();
                int pos = Math.abs(index) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                // 避开上一次上一次发送失败的MessageQueue
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            return selectOneMessageQueue();
        }
    }
}

路由抉择过程:

  • MessageQueue的抉择依照轮询进行抉择,通过全局保护索引进行累加取模抉择发送队列。
  • MessageQueue的抉择过程中会避开上一次发送失败Broker对应的MessageQueue。

Producer音讯发送示意图

  • 某Topic的队列散布为Broker\_A\_Queue1、Broker\_A\_Queue2、Broker\_B\_Queue1、Broker\_B\_Queue2、Broker\_C\_Queue1、Broker\_C\_Queue2,依据轮询策略顺次进行抉择。
  • 发送失败的场景下如Broker\_A\_Queue1发送失败那么就会跳过Broker\_A抉择Broker\_B_Queue1进行发送。

四、consumer音讯生产过程

consumer音讯生产过程

  • consumer拜访namesvr同步topic对应的路由信息。
  • consumer在本地解析近程路由信息并保留到本地。
  • consumer在本地进行Reblance负载平衡确定本节点负责生产的MessageQueue。
  • consumer拜访Broker生产指定的MessageQueue的音讯。

4.1 路由同步过程

public class MQClientInstance {
 
    // 1、启动定时工作从namesvr定时同步路由信息
    private void startScheduledTask() {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
 
            @Override
            public void run() {
                try {
                    MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                } catch (Exception e) {
                    log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                }
            }
        }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
    }
 
    public void updateTopicRouteInfoFromNameServer() {
        Set<String> topicList = new HashSet<String>();
 
        // 遍历所有的consumer订阅的Topic并从namesvr获取路由信息
        {
            Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
            while (it.hasNext()) {
                Entry<String, MQConsumerInner> entry = it.next();
                MQConsumerInner impl = entry.getValue();
                if (impl != null) {
                    Set<SubscriptionData> subList = impl.subscriptions();
                    if (subList != null) {
                        for (SubscriptionData subData : subList) {
                            topicList.add(subData.getTopic());
                        }
                    }
                }
            }
        }
 
        for (String topic : topicList) {
            this.updateTopicRouteInfoFromNameServer(topic);
        }
    }
 
    public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) {
 
        try {
            if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    TopicRouteData topicRouteData;
                    if (isDefault && defaultMQProducer != null) {
                        // 省略代码
                    } else {
                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                    }
 
                    if (topicRouteData != null) {
                        TopicRouteData old = this.topicRouteTable.get(topic);
                        boolean changed = topicRouteDataIsChange(old, topicRouteData);
                        if (!changed) {
                            changed = this.isNeedUpdateTopicRouteInfo(topic);
                        }
 
                        if (changed) {
                            TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
 
                            for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                                this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                            }
 
                            // 构建consumer侧的路由信息
                            {
                                Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                                Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
                                while (it.hasNext()) {
                                    Entry<String, MQConsumerInner> entry = it.next();
                                    MQConsumerInner impl = entry.getValue();
                                    if (impl != null) {
                                        impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                                    }
                                }
                            }
     
                            this.topicRouteTable.put(topic, cloneTopicRouteData);
                            return true;
                        }
                    }
                } finally {
                    this.lockNamesrv.unlock();
                }
            }
        } catch (InterruptedException e) {
        }
 
        return false;
    }
}

路由同步过程

  • 路由同步过程是音讯消费者生产音讯的前置条件,没有路由的同步就无奈感知具体待生产的音讯的Broker节点。
  • consumer节点通过定时工作定期从namesvr同步该生产节点订阅的topic的路由信息。
  • consumer通过updateTopicSubscribeInfo将同步的路由信息构建老本地的路由信息并用以后续的负责平衡。

4.2 负载平衡过程

public class RebalanceService extends ServiceThread {
 
    private static long waitInterval =
        Long.parseLong(System.getProperty(
            "rocketmq.client.rebalance.waitInterval", "20000"));
 
    private final MQClientInstance mqClientFactory;
 
    public RebalanceService(MQClientInstance mqClientFactory) {
        this.mqClientFactory = mqClientFactory;
    }
 
    @Override
    public void run() {
 
        while (!this.isStopped()) {
            this.waitForRunning(waitInterval);
            this.mqClientFactory.doRebalance();
        }
 
    }
}

负载平衡过程

  • consumer通过RebalanceService来定期进行从新负载平衡。
  • RebalanceService的外围在于实现MessageQueue和consumer的分配关系。
public abstract class RebalanceImpl {
 
    private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            case BROADCASTING: {
                // 省略相干代码
                break;
            }
            case CLUSTERING: { // 集群模式下的负载平衡
                // 1、获取topic下所有的MessageQueue
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
 
                // 2、获取topic下该consumerGroup下所有的consumer对象
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                 
                // 3、开始重新分配进行rebalance
                if (mqSet != null && cidAll != null) {
                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                    mqAll.addAll(mqSet);
 
                    Collections.sort(mqAll);
                    Collections.sort(cidAll);
 
                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
 
                    List<MessageQueue> allocateResult = null;
                    try {
                        // 4、通过调配策略从新进行调配
                        allocateResult = strategy.allocate(
                            this.consumerGroup,
                            this.mQClientFactory.getClientId(),
                            mqAll,
                            cidAll);
                    } catch (Throwable e) {
 
                        return;
                    }
 
                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                    if (allocateResult != null) {
                        allocateResultSet.addAll(allocateResult);
                    }
                    // 5、依据调配后果执行真正的rebalance动作
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                    if (changed) {
                        this.messageQueueChanged(topic, mqSet, allocateResultSet);
                    }
                }
                break;
            }
            default:
                break;
        }
    }

重新分配流程

  • 获取topic下所有的MessageQueue。
  • 获取topic下该consumerGroup下所有的consumer的cid(如192.168.0.8@15958)。
  • 针对mqAll和cidAll进行排序,mqAll排序程序依照先BrokerName后BrokerId,cidAll排序依照字符串排序。
  • 通过调配策略
  • AllocateMessageQueueStrategy重新分配。
  • 依据调配后果执行真正的rebalance动作。
public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
    private final InternalLogger log = ClientLogger.getLog();
 
    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {
         
        List<MessageQueue> result = new ArrayList<MessageQueue>();
         
        // 外围逻辑计算开始
 
        // 计算以后cid的下标
        int index = cidAll.indexOf(currentCID);
         
        // 计算多余的模值
        int mod = mqAll.size() % cidAll.size();
 
        // 计算均匀大小
        int averageSize =
            mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                + 1 : mqAll.size() / cidAll.size());
        // 计算起始下标
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        // 计算范畴大小
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        // 组装后果
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }
    // 外围逻辑计算完结
 
    @Override
    public String getName() {
        return "AVG";
    }
}
 
------------------------------------------------------------------------------------
 
rocketMq的集群存在3个broker,别离是broker_a、broker_b、broker_c。
 
rocketMq上存在名为topic_demo的topic,writeQueue写队列数量为3,散布在3个broker。
排序后的mqAll的大小为9,顺次为
[broker_a_0  broker_a_1  broker_a_2  broker_b_0  broker_b_1  broker_b_2  broker_c_0  broker_c_1  broker_c_2]
 
rocketMq存在蕴含4个consumer的consumer_group,排序后cidAll顺次为
[192.168.0.6@15956  192.168.0.7@15957  192.168.0.8@15958  192.168.0.9@15959]
 
192.168.0.6@15956 的调配MessageQueue结算过程
index:0
mod:9%4=1
averageSize:9 / 4 + 1 = 3
startIndex:0
range:3
messageQueue:[broker_a_0、broker_a_1、broker_a_2]
 
 
192.168.0.6@15957 的调配MessageQueue结算过程
index:1
mod:9%4=1
averageSize:9 / 4 = 2
startIndex:3
range:2
messageQueue:[broker_b_0、broker_b_1]
 
 
192.168.0.6@15958 的调配MessageQueue结算过程
index:2
mod:9%4=1
averageSize:9 / 4 = 2
startIndex:5
range:2
messageQueue:[broker_b_2、broker_c_0]
 
 
192.168.0.6@15959 的调配MessageQueue结算过程
index:3
mod:9%4=1
averageSize:9 / 4 = 2
startIndex:7
range:2
messageQueue:[broker_c_1、broker_c_2]

调配策略剖析:

  • 整体调配策略能够参考上图的具体例子,能够更好的了解调配的逻辑。

consumer的调配

  • 同一个consumerGroup下的consumer对象会调配到同一个Topic下不同的MessageQueue。
  • 每个MessageQueue最终会调配到具体的consumer当中。

五、RocketMQ指定机器生产设计思路

日常测试环境当中会存在多台consumer进行生产,但理论开发当中某台consumer新上了性能后心愿音讯只由该机器进行生产进行逻辑笼罩,这个时候consumerGroup的集群模式就会给咱们造成困扰,因为生产负载平衡的起因不确定音讯具体由那台consumer进行生产。当然咱们能够通过染指consumer的负载平衡机制来实现指定机器生产。

public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
    private final InternalLogger log = ClientLogger.getLog();
 
    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {
        
 
        List<MessageQueue> result = new ArrayList<MessageQueue>();
        // 通过改写这部分逻辑,减少判断是否是指定IP的机器,如果不是间接返回空列表示意该机器不负责生产
        if (!cidAll.contains(currentCID)) {
            return result;
        }
 
        int index = cidAll.indexOf(currentCID);
        int mod = mqAll.size() % cidAll.size();
        int averageSize =
            mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                + 1 : mqAll.size() / cidAll.size());
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }
}

consumer负载平衡策略改写

  • 通过改写负载平衡策略AllocateMessageQueueAveragely的allocate机制保障只有指定IP的机器可能进行生产。
  • 通过IP进行判断是基于RocketMQ的cid格局是192.168.0.6@15956,其中后面的IP地址就是对于的生产机器的ip地址,整个计划可行且能够理论落地。

六、小结

本文次要介绍了RocketMQ在生产和生产过程中的负载平衡机制,联合源码和理论案例力求给读者一个易于了解的技术遍及,心愿能对读者有参考和借鉴价值。囿于文章篇幅,有些方面未波及,也有很多技术细节未具体论述,如有疑难欢送持续交换。

作者:vivo互联网服务器团队-Wang Zhi

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理