共计 3189 个字符,预计需要花费 8 分钟才能阅读完成。
前景回顾
【mq】从零开始实现 mq-01- 生产者、消费者启动
【mq】从零开始实现 mq-02- 如何实现生产者调用消费者?
【mq】从零开始实现 mq-03- 引入 broker 中间人
【mq】从零开始实现 mq-04- 启动检测与实现优化
【mq】从零开始实现 mq-05- 实现优雅停机
【mq】从零开始实现 mq-06- 消费者心跳检测 heartbeat
【mq】从零开始实现 mq-07- 负载平衡 load balance
为什么须要负载平衡
大家好,我是老马。
这一节让咱们看一下如何实现 MQ 的负载平衡。
为什么须要负载平衡呢?
作用
负载平衡最外围的作用:
(1)能够防止单点故障
(2)能够让申请均分的扩散到每一个节点
实现思路
负载平衡实现的形式比拟多,最简略的就是随机抉择一个。
拓展浏览:
从零手写实现负载平衡 http://houbb.github.io/2020/0…
MQ 中用到负载平衡的中央
生产者发送
生产者发送音讯时,能够发送给任一 broker。
broker 推送给消费者
broker 接管到音讯当前,在推送给消费者时,也能够任一抉择一个。
消费者的生产 ACK
消费者生产完,状态回执给 broker,能够抉择任一一个。
音讯黏连
有些音讯比拟非凡,比方须要保障生产的有序性,能够通过 shardingKey 的形式,在负载的时候固定到指定的片区。
代码实现
生产者发送
对立调整获取 channel 的办法。
@Override
public Channel getChannel(String key) {
// 期待启动实现
while (!statusManager.status()) {log.debug("期待初始化实现...");
DateUtil.sleep(100);
}
RpcChannelFuture rpcChannelFuture = RandomUtils.loadBalance(this.loadBalance,
channelFutureList, key);
return rpcChannelFuture.getChannelFuture().channel();
}
工具类实现为外围实现:
/**
* 负载平衡
*
* @param list 列表
* @param key 分片键
* @return 后果
* @since 0.0.7
*/
public static <T extends IServer> T loadBalance(final ILoadBalance<T> loadBalance,
final List<T> list, String key) {if(CollectionUtil.isEmpty(list)) {return null;}
if(StringUtil.isEmpty(key)) {LoadBalanceContext<T> loadBalanceContext = LoadBalanceContext.<T>newInstance()
.servers(list);
return loadBalance.select(loadBalanceContext);
}
// 获取 code
int hashCode = Objects.hash(key);
int index = hashCode % list.size();
return list.get(index);
}
如果指定了 shardingKey,那么依据 shadringKey 进行 hash 判断。
如果没有,则进行默认的负载平衡策略。
Broker 音讯推送给消费者
消费者订阅列表的获取:
@Override
public List<Channel> getSubscribeList(MqMessage mqMessage) {final String topicName = mqMessage.getTopic();
Set<ConsumerSubscribeBo> set = subscribeMap.get(topicName);
if(CollectionUtil.isEmpty(set)) {return Collections.emptyList();
}
//2. 获取匹配的 tag 列表
final List<String> tagNameList = mqMessage.getTags();
Map<String, List<ConsumerSubscribeBo>> groupMap = new HashMap<>();
for(ConsumerSubscribeBo bo : set) {String tagRegex = bo.getTagRegex();
if(hasMatch(tagNameList, tagRegex)) {
//TODO: 这种设置模式,对立增加解决 haven
String groupName = bo.getGroupName();
List<ConsumerSubscribeBo> list = groupMap.get(groupName);
if(list == null) {list = new ArrayList<>();
}
list.add(bo);
groupMap.put(groupName, list);
}
}
//3. 依照 groupName 分组之后,每一组只随机返回一个。最好应该调整为以 shardingkey 抉择
final String shardingKey = mqMessage.getShardingKey();
List<Channel> channelList = new ArrayList<>();
for(Map.Entry<String, List<ConsumerSubscribeBo>> entry : groupMap.entrySet()) {List<ConsumerSubscribeBo> list = entry.getValue();
ConsumerSubscribeBo bo = RandomUtils.loadBalance(loadBalance, list, shardingKey);
final String channelId = bo.getChannelId();
BrokerServiceEntryChannel entryChannel = registerMap.get(channelId);
if(entryChannel == null) {log.warn("channelId: {} 对应的通道信息为空", channelId);
continue;
}
channelList.add(entryChannel.getChannel());
}
return channelList;
}
外围逻辑:RandomUtils.loadBalance(loadBalance, list, shardingKey);
获取,其余的放弃不变。
消费者 ACK
消费者也是相似的,获取 channel 的形式调整如下:
public Channel getChannel(String key) {
// 期待启动实现
while (!statusManager.status()) {log.debug("期待初始化实现...");
DateUtil.sleep(100);
}
RpcChannelFuture rpcChannelFuture = RandomUtils.loadBalance(loadBalance,
channelFutureList, key);
return rpcChannelFuture.getChannelFuture().channel();
}
小结
负载平衡在分布式服务中,是必备的个性之一。实现的原理并不算简单。
心愿本文对你有所帮忙,如果喜爱,欢送点赞珍藏转发一波。
我是老马,期待与你的下次重逢。
开源地址
The message queue in java.(java 繁难版本 mq 实现) https://github.com/houbb/mq
拓展浏览
rpc- 从零开始实现 rpc https://github.com/houbb/rpc