共计 6617 个字符,预计需要花费 17 分钟才能阅读完成。
一、插件概述
插件定位
divide 插件是一个 http 正向代理插件,所有的 http 申请都由该插件进行负载平衡解决(具体的负载平衡策略在规定中指定)。
失效机会
当申请头的 rpcType = http 且插件开启时,它将依据申请参数匹配规定,最终交由上游插件进行响应式代理调用。
二、插件解决流程
1)先回顾下申请解决类插件的通用流程(AbstractSoulPlugin # execute):
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) { | |
// 获取插件数据 | |
String pluginName = named(); | |
final PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName); | |
if (pluginData != null && pluginData.getEnabled()) { | |
// 获取选择器数据 | |
final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName); | |
... | |
// 匹配选择器 | |
final SelectorData selectorData = matchSelector(exchange, selectors); | |
... | |
// 获取规定数据 | |
final List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId()); | |
... | |
// 匹配规定 | |
RuleData rule; | |
if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) { | |
//get last | |
rule = rules.get(rules.size() - 1); | |
} else {rule = matchRule(exchange, rules); | |
} | |
... | |
// 执行自定义解决 | |
return doExecute(exchange, chain, selectorData, rule); | |
} | |
// 继续执行插件链解决 | |
return chain.execute(exchange); | |
} |
AbstractSoulPlugin 先匹配到对应的选择器和规定,匹配通过则执行插件的自定义解决。
2)再来看看 divide 插件的自定义解决流程(DividePlugin # doExecute):
protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) { | |
... | |
// 筹备规定解决对象(外部持有:负载平衡算法名、重试次数以及超时工夫)final DivideRuleHandle ruleHandle = GsonUtils.getInstance().fromJson(rule.getHandle(), DivideRuleHandle.class); | |
// 获取选择器对应的可用服务列表 | |
final List<DivideUpstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId()); | |
... | |
// 抉择具体散发的服务实例 ip(负载平衡)final String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress(); | |
DivideUpstream divideUpstream = LoadBalanceUtils.selector(upstreamList, ruleHandle.getLoadBalance(), ip); | |
... | |
// 设置 http url、超时工夫以及重试次数 | |
String domain = buildDomain(divideUpstream); | |
String realURL = buildRealURL(domain, soulContext, exchange); | |
exchange.getAttributes().put(Constants.HTTP_URL, realURL); | |
exchange.getAttributes().put(Constants.HTTP_TIME_OUT, ruleHandle.getTimeout()); | |
exchange.getAttributes().put(Constants.HTTP_RETRY, ruleHandle.getRetry()); | |
// 继续执行插件链解决 | |
return chain.execute(exchange); | |
} |
DividePlugin 先获取到选择器对应的可用服务列表,而后进行负载平衡抉择行将散发的指标服务器实例 ip,最初设置最终的 url、超时工夫以及重试次数并交由插件链上游进行解决。
留神:
divide 插件本身只是负责依据选择器、规定和负载平衡策略选出待散发的服务器实例,并不间接向后端服务发动 http 申请。
三、主机探活
下面提到,divide 须要获取服务列表,看下获取的实现(UpstreamCacheManager # findUpstreamListBySelectorId):
public List<DivideUpstream> findUpstreamListBySelectorId(final String selectorId) {return UPSTREAM_MAP_TEMP.get(selectorId); | |
} |
外部通过 UPSTREAM_MAP_TEMP 获取存活服务列表。
UpstreamCacheManager 外部保护了两份散列表:
- UPSTREAM_MAP:
全量服务散列表,负责寄存全量的上游服务信息,key 为 选择器 id,value 为应用雷同选择器的服务列表。
- UPSTREAM_MAP_TEMP:
长期服务散列表,负责寄存流动的上游服务信息,key 为 选择器 id,value 为应用雷同选择器的服务列表。
后面章节咱们提到,数据同步时,submit 办法同时更新了 UPSTREAM_MAP 和 UPSTREAM_MAP_TEMP,但后续服务下线如何保护 UPSTREAM_MAP_TEMP 呢,所有还得从 ip 探活说起。
3.1 探活机会
探活机会得从 UpstreamCacheManager 初始化说起:
private UpstreamCacheManager() { | |
// 探活开关查看 | |
boolean check = Boolean.parseBoolean(System.getProperty("soul.upstream.check", "false")); | |
if (check) { | |
// 启动定时探活工作 | |
new ScheduledThreadPoolExecutor(1, SoulThreadFactory.create("scheduled-upstream-task", false)) | |
.scheduleWithFixedDelay(this::scheduled, | |
30, Integer.parseInt(System.getProperty("soul.upstream.scheduledTime", "30")), TimeUnit.SECONDS); | |
} | |
} |
UpstreamCacheManager 初始化时,若探活开关关上,则创立定时探活工作,此处默认 30 秒执行一次。
此处共波及到两个配置参数:
- soul.upstream.check 探活开关:默认为 ture,设置为 false 示意不检测
- soul.upstream.scheduledTime 探活工夫距离,默认 10 秒
3.2 探活工作
1)接下来看看探活工作实现(UpstreamCacheManager # scheduled):
private void scheduled() {if (UPSTREAM_MAP.size() > 0) {UPSTREAM_MAP.forEach((k, v) -> { | |
// 流动查看 | |
List<DivideUpstream> result = check(v); | |
if (result.size() > 0) {UPSTREAM_MAP_TEMP.put(k, result); | |
} else {UPSTREAM_MAP_TEMP.remove(k); | |
} | |
}); | |
} | |
} |
工作负责逐条遍历注销全量服务散列表,查看服务活性:
- 若存活数大于 0,则更新存活服务散列表
- 否则,移除存活服务散列表相应内容
2)持续看服务列表活性查看解决(UpstreamCacheManager # check):
private List<DivideUpstream> check(final List<DivideUpstream> upstreamList) {List<DivideUpstream> resultList = Lists.newArrayListWithCapacity(upstreamList.size()); | |
for (DivideUpstream divideUpstream : upstreamList) { | |
// 查看服务活性 | |
final boolean pass = UpstreamCheckUtils.checkUrl(divideUpstream.getUpstreamUrl()); | |
if (pass) { | |
// 更新服务状态 | |
if (!divideUpstream.isStatus()) {divideUpstream.setTimestamp(System.currentTimeMillis()); | |
divideUpstream.setStatus(true); | |
... | |
} | |
// 记录存活的服务 | |
resultList.add(divideUpstream); | |
} else { | |
// 更新服务状态 | |
divideUpstream.setStatus(false); | |
... | |
} | |
} | |
return resultList; | |
} |
负责遍历服务列表,依据 url 查看各服务活性并注销存活的服务。
3.3 活性查看
1)服务活性查看实现(UpstreamCheckUtils # checkUrl):
public static boolean checkUrl(final String url) { | |
... | |
// 查看 url 是否为 ip+ 端口格局 | |
if (checkIP(url)) { | |
// 解决 ip 和端口 | |
String[] hostPort; | |
if (url.startsWith(HTTP)) {final String[] http = StringUtils.split(url, "\\/\\/"); | |
hostPort = StringUtils.split(http[1], Constants.COLONS); | |
} else {hostPort = StringUtils.split(url, Constants.COLONS); | |
} | |
// 测试主机是否可连通 | |
return isHostConnector(hostPort[0], Integer.parseInt(hostPort[1])); | |
} else { | |
// 测试主机是否可达 | |
return isHostReachable(url); | |
} | |
} |
查看 url 是否为 ip + port 格局:
- 若为 ip + 端口格局,则测试主机是否可连贯
- 否则,测试主机是否可达
2)测试主机是否可连贯(UpstreamCheckUtils # isHostConnector):
private static boolean isHostConnector(final String host, final int port) {try (Socket socket = new Socket()) {socket.connect(new InetSocketAddress(host, port)); | |
} catch (IOException e) {return false;} | |
return true; | |
} |
通过 socket 的 connection 测试 ip 的连通性。
3)测试主机是否可达(UpstreamCheckUtils # isHostReachable):
private static boolean isHostReachable(final String host) { | |
try {return InetAddress.getByName(host).isReachable(1000); | |
} catch (IOException ignored) { } | |
return false; | |
} |
非 ip + 端口格局 url 尝试应用域名格局测试主机是否可达。
整体看下来,divide 插件从缓存里拿到的服务器信息,来源于数据同步,由探活工作定期被动更新。
四、负载平衡
下面提到,divide 通过负载平衡算法筛选最终散发的服务 ip,看下负载平衡的实现(LoadBalanceUtils # selector):
public static DivideUpstream selector(final List<DivideUpstream> upstreamList, final String algorithm, final String ip) {LoadBalance loadBalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getJoin(algorithm); | |
return loadBalance.select(upstreamList, ip); | |
} |
外部利用扩大加载器,通过算法名加载对应的负载平衡算法,执行负载平衡计算最终散发到的服务 ip。
soul 网关里默认反对三种负载平衡策略
- HASH(须要计算,可能存在不平衡的状况)
- RANDOM(最简略最快,大量申请下简直均匀)
- ROUND_ROBIN(须要记录状态,有肯定的影响,大数据量下随机和轮询并无太大后果上的差别)
默认为 RANDOM 随机算法,算法解决如下(RandomLoadBalance # doSelect):
public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {int totalWeight = calculateTotalWeight(upstreamList); | |
boolean sameWeight = isAllUpStreamSameWeight(upstreamList); | |
// 若权重不统一,则按总权重随机 | |
if (totalWeight > 0 && !sameWeight) {return random(totalWeight, upstreamList); | |
} | |
// 按服务数随机 | |
return random(upstreamList); | |
} |
判断服务列表内服务的权重是否统一:
- 若权重不统一,则按总权重随机
- 否则,按服务数随机
按总权重随机细节(RandomLoadBalance # random):
private DivideUpstream random(final int totalWeight, final List<DivideUpstream> upstreamList) { | |
// 按总权重取随机数 | |
int offset = RANDOM.nextInt(totalWeight); | |
// 确定随机值落在哪个段上 | |
for (DivideUpstream divideUpstream : upstreamList) {offset -= getWeight(divideUpstream); | |
if (offset < 0) {return divideUpstream;} | |
} | |
return upstreamList.get(0); | |
} |
五、小结
divide 插件解决流程:
-
获取可用服务列表
- 服务列表最后来自
soul-admin
数据同步 - 可用服务列表默认每 30 秒被动探活更新
- 服务列表最后来自
-
负载平衡
- 扩大加载器加载指标负载平衡算法
- 执行具体平衡策略
- 返回一个最终抉择的服务信息
- 设置最终服务的的 url 信息
- 交由插件链上游进行解决