前景回顾

【mq】从零开始实现 mq-01-生产者、消费者启动

【mq】从零开始实现 mq-02-如何实现生产者调用消费者?

【mq】从零开始实现 mq-03-引入 broker 中间人

【mq】从零开始实现 mq-04-启动检测与实现优化

【mq】从零开始实现 mq-05-实现优雅停机

【mq】从零开始实现 mq-06-消费者心跳检测 heartbeat

【mq】从零开始实现 mq-07-负载平衡 load balance

为什么须要负载平衡

大家好,我是老马。

这一节让咱们看一下如何实现 MQ 的负载平衡。

为什么须要负载平衡呢?

作用

负载平衡最外围的作用:

(1)能够防止单点故障

(2)能够让申请均分的扩散到每一个节点

实现思路

负载平衡实现的形式比拟多,最简略的就是随机抉择一个。

拓展浏览:

从零手写实现负载平衡 http://houbb.github.io/2020/0...

MQ 中用到负载平衡的中央

生产者发送

生产者发送音讯时,能够发送给任一 broker。

broker 推送给消费者

broker 接管到音讯当前,在推送给消费者时,也能够任一抉择一个。

消费者的生产 ACK

消费者生产完,状态回执给 broker,能够抉择任一一个。

音讯黏连

有些音讯比拟非凡,比方须要保障生产的有序性,能够通过 shardingKey 的形式,在负载的时候固定到指定的片区。

代码实现

生产者发送

对立调整获取 channel 的办法。

@Overridepublic Channel getChannel(String key) {    // 期待启动实现    while (!statusManager.status()) {        log.debug("期待初始化实现...");        DateUtil.sleep(100);    }    RpcChannelFuture rpcChannelFuture = RandomUtils.loadBalance(this.loadBalance,            channelFutureList, key);    return rpcChannelFuture.getChannelFuture().channel();}

工具类实现为外围实现:

/** * 负载平衡 * * @param list 列表 * @param key 分片键 * @return 后果 * @since 0.0.7 */public static <T extends IServer> T loadBalance(final ILoadBalance<T> loadBalance,                                                final List<T> list, String key) {    if(CollectionUtil.isEmpty(list)) {        return null;    }    if(StringUtil.isEmpty(key)) {        LoadBalanceContext<T> loadBalanceContext = LoadBalanceContext.<T>newInstance()                .servers(list);        return loadBalance.select(loadBalanceContext);    }    // 获取 code    int hashCode = Objects.hash(key);    int index = hashCode % list.size();    return list.get(index);}

如果指定了 shardingKey,那么依据 shadringKey 进行 hash 判断。

如果没有,则进行默认的负载平衡策略。

Broker 音讯推送给消费者

消费者订阅列表的获取:

@Overridepublic List<Channel> getSubscribeList(MqMessage mqMessage) {    final String topicName = mqMessage.getTopic();    Set<ConsumerSubscribeBo> set = subscribeMap.get(topicName);    if(CollectionUtil.isEmpty(set)) {        return Collections.emptyList();    }    //2. 获取匹配的 tag 列表    final List<String> tagNameList = mqMessage.getTags();    Map<String, List<ConsumerSubscribeBo>> groupMap = new HashMap<>();    for(ConsumerSubscribeBo bo : set) {        String tagRegex = bo.getTagRegex();        if(hasMatch(tagNameList, tagRegex)) {            //TODO: 这种设置模式,对立增加解决 haven            String groupName = bo.getGroupName();            List<ConsumerSubscribeBo> list = groupMap.get(groupName);            if(list == null) {                list = new ArrayList<>();            }            list.add(bo);            groupMap.put(groupName, list);        }    }    //3. 依照 groupName 分组之后,每一组只随机返回一个。最好应该调整为以 shardingkey 抉择    final String shardingKey = mqMessage.getShardingKey();    List<Channel> channelList = new ArrayList<>();    for(Map.Entry<String, List<ConsumerSubscribeBo>> entry : groupMap.entrySet()) {        List<ConsumerSubscribeBo> list = entry.getValue();        ConsumerSubscribeBo bo = RandomUtils.loadBalance(loadBalance, list, shardingKey);        final String channelId = bo.getChannelId();        BrokerServiceEntryChannel entryChannel = registerMap.get(channelId);        if(entryChannel == null) {            log.warn("channelId: {} 对应的通道信息为空", channelId);            continue;        }        channelList.add(entryChannel.getChannel());    }    return channelList;}

外围逻辑:RandomUtils.loadBalance(loadBalance, list, shardingKey); 获取,其余的放弃不变。

消费者 ACK

消费者也是相似的,获取 channel 的形式调整如下:

public Channel getChannel(String key) {    // 期待启动实现    while (!statusManager.status()) {        log.debug("期待初始化实现...");        DateUtil.sleep(100);    }    RpcChannelFuture rpcChannelFuture = RandomUtils.loadBalance(loadBalance,            channelFutureList, key);    return rpcChannelFuture.getChannelFuture().channel();}

小结

负载平衡在分布式服务中,是必备的个性之一。实现的原理并不算简单。

心愿本文对你有所帮忙,如果喜爱,欢送点赞珍藏转发一波。

我是老马,期待与你的下次重逢。

开源地址

The message queue in java.(java 繁难版本 mq 实现) https://github.com/houbb/mq

拓展浏览

rpc-从零开始实现 rpc https://github.com/houbb/rpc