引言
上一篇,梳理除了 soul-admin
在收回数据变更告诉前的解决脉络,本篇开始探索 http 同步策略的变更告诉机制,
不同数据变更的告诉机制该当是统一的,故本篇以 selector 配置变更告诉为切入点进行深刻。
配置操作入口
找到 ConfigController,这是配置操作的入口
其持有一个 HttpLongPollingDataChangedListener 援用,通过 HttpLongPollingDataChangedListener 实现配置变更告诉订阅和配置获取。
告诉订阅:
@PostMapping(value = "/listener")public void listener(final HttpServletRequest request, final HttpServletResponse response) { longPollingListener.doLongPolling(request, response);}
配置获取:
@GetMapping("/fetch")public SoulAdminResult fetchConfigs(@NotNull final String[] groupKeys) { Map<String, ConfigData<?>> result = Maps.newHashMap(); for (String groupKey : groupKeys) { ConfigData<?> data = longPollingListener.fetchConfig(ConfigGroupEnum.valueOf(groupKey)); result.put(groupKey, data); } return SoulAdminResult.success(SoulResultMessage.SUCCESS, result);}
告诉订阅实现
应用 HttpLongPollingDataChangedListener#doLongPolling 实现告诉订阅
public void doLongPolling(final HttpServletRequest request, final HttpServletResponse response) { // 比拟配置组md5 List<ConfigGroupEnum> changedGroup = compareChangedGroup(request); String clientIp = getRemoteIp(request); // 发现配置组变动则立刻响应 if (CollectionUtils.isNotEmpty(changedGroup)) { this.generateResponse(response, changedGroup); log.info("send response with the changed group, ip={}, group={}", clientIp, changedGroup); return; } // 监听配置变动 final AsyncContext asyncContext = request.startAsync(); asyncContext.setTimeout(0L); // 阻塞客户端线程 scheduler.execute(new LongPollingClient(asyncContext, clientIp, HttpConstants.SERVER_MAX_HOLD_TIMEOUT));}
通过比拟 MD5 查看配置组是否产生变更,若配置组产生变更则立刻响应,否则阻塞客户端线程。
此处 compareChangedGroup 实现不做深究,持续看LongPollingClient 具体解决:
@Overridepublic void run() { this.asyncTimeoutFuture = scheduler.schedule(() -> { clients.remove(LongPollingClient.this); List<ConfigGroupEnum> changedGroups = compareChangedGroup((HttpServletRequest) asyncContext.getRequest()); sendResponse(changedGroups); }, timeoutTime, TimeUnit.MILLISECONDS); clients.add(this);}
这里将 client 退出 clients 的同时,开启了一个定时工作,负责超时移除 client 并返回发生变化的配置组信息。
超时工夫为结构时传入的 HttpConstants.SERVER_MAX_HOLD_TIMEOUT = 60s
配置获取实现
应用 AbstractDataChangedListener#fetchConfig 实现配置获取
public ConfigData<?> fetchConfig(final ConfigGroupEnum groupKey) { ConfigDataCache config = CACHE.get(groupKey.name()); switch (groupKey) { case APP_AUTH: ... case PLUGIN: ... case RULE: ... case SELECTOR: List<SelectorData> selectorList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<SelectorData>>() { }.getType()); return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), selectorList); case META_DATA: ... default: throw new IllegalStateException("Unexpected groupKey: " + groupKey); }}
这里从 CACHE 缓存获取对应配置组信息,包装成 ConfigData 并返回。
建设订阅关系
soul-web
端通过 HttpSyncDataConfiguration 初始化 HttpSyncDataService 并注入 spring容器。
HttpSyncDataService#start 办法在初始化时实现配置获取和订阅:
private void start() { // It could be initialized multiple times, so you need to control that. if (RUNNING.compareAndSet(false, true)) { // fetch all group configs. this.fetchGroupConfig(ConfigGroupEnum.values()); int threadSize = serverList.size(); this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), SoulThreadFactory.create("http-long-polling", true)); // start long polling, each server creates a thread to listen for changes. this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server))); } else { log.info("soul http long polling was started, executor=[{}]", executor); }}
1)配置获取
private void fetchGroupConfig(final ConfigGroupEnum... groups) throws SoulException { for (int index = 0; index < this.serverList.size(); index++) { String server = serverList.get(index); try { this.doFetchGroupConfig(server, groups); break; } catch (SoulException e) { // no available server, throw exception. if (index >= serverList.size() - 1) { throw e; } log.warn("fetch config fail, try another one: {}", serverList.get(index + 1)); } }}
doFetchGroupConfig 外部发动配置获取申请并更新本地缓存
private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) { ... String url = server + "/configs/fetch?" + StringUtils.removeEnd(params.toString(), "&"); ... try { json = this.httpClient.getForObject(url, String.class); } catch (RestClientException e) { ... } // update local cache boolean updated = this.updateCacheWithJson(json); ...}
2)配置订阅
借助 HttpLongPollingTask 实现
@Overridepublic void run() { while (RUNNING.get()) { for (int time = 1; time <= retryTimes; time++) { try { doLongPolling(server); } catch (Exception e) { ... } } } log.warn("Stop http long polling.");}
HttpLongPollingTask 一直循环 doLongPolling,此处有 retry 操作
private void doLongPolling(final String server) { ... String listenerUrl = server + "/configs/listener"; ... try { String json = this.httpClient.postForEntity(listenerUrl, httpEntity, String.class).getBody(); log.debug("listener result: [{}]", json); groupJson = GSON.fromJson(json, JsonObject.class).getAsJsonArray("data"); } catch (RestClientException e) { ... } if (groupJson != null) { // fetch group configuration async. ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson, ConfigGroupEnum[].class); if (ArrayUtils.isNotEmpty(changedGroups)) { log.info("Group config changed: {}", Arrays.toString(changedGroups)); this.doFetchGroupConfig(server, changedGroups); } }}
doLongPolling 外部发动 post 申请订阅配置变更,若产生变更则从新获取配置。
至此,告诉订阅解决脉络已清晰:
soul-web
端通过 http 发动订阅申请
soul-admin
端收到申请,通过比拟 MD5 查看配置组是否存在变更
- 若存在变更,则立刻响应变更组信息
- 若无变更,则阻塞客户端线程,并开启定时工作 60s 后从新比拟配置组变更并返回响应
soul-web
端收到响应,判断配置组是否存在变更
- 若存在变更,则发动获取配置申请获取最新配置信息
soul-web
从新发动订阅申请
配置变更
上回咱们说到AbstractDataChangedListener 的 onSelectorChanged 实现:
public void onSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) { if (CollectionUtils.isEmpty(changed)) { return; } // 更新 selector 缓存 this.updateSelectorCache(); // selector 变更后处理,实现具体的变更告诉 this.afterSelectorChanged(changed, eventType);}
这里 selector 变更解决是先更缓存后发告诉,持续看 afterSelectorChanged 实现。
HttpLongPollingDataChangedListener 真正实现了 AbstractDataChangedListener 的 afterSelectorChanged:
@Overrideprotected void afterSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) { scheduler.execute(new DataChangeTask(ConfigGroupEnum.SELECTOR));}
由定时工作反复执行 DataChangeTask,DataChangeTask 具体解决如下:
@Overridepublic void run() { for (Iterator<LongPollingClient> iter = clients.iterator(); iter.hasNext();) { LongPollingClient client = iter.next(); iter.remove(); client.sendResponse(Collections.singletonList(groupKey)); log.info("send response with the changed group,ip={}, group={}, changeTime={}", client.ip, groupKey, changeTime); }}
DataChangeTask 负责从 clients 顺次移除 LongPollingClient 并将 groupKey 作为响应返回,sendResponse 外部解决如下:
void sendResponse(final List<ConfigGroupEnum> changedGroups) { // cancel scheduler if (null != asyncTimeoutFuture) { asyncTimeoutFuture.cancel(false); } generateResponse((HttpServletResponse) asyncContext.getResponse(), changedGroups); asyncContext.complete();}
负责生成响应报文并异步响应客户端,留神有个 asyncTimeoutFuture.cancel 操作,勾销之前的 60s 超时响应。
总结
本篇梳理和剖析了 soul-web
端到 soul-admin
端的配置变更告诉订阅关系建设过程,配合上配置获取接口,实现了整个 http 数据同步策略的变更告诉机制。
下篇,将探索 http 同步策略的web端解决变更告诉,期待惊喜。
集体知识库
高性能微服务API网关-Soul