乐趣区

关于java:Soul网关探秘http数据同步变更通知机制

引言

上一篇,梳理除了 soul-admin 在收回数据变更告诉前的解决脉络,本篇开始探索 http 同步策略的变更告诉机制,

不同数据变更的告诉机制该当是统一的,故本篇以 selector 配置变更告诉为切入点进行深刻。

配置操作入口

找到 ConfigController,这是配置操作的入口

其持有一个 HttpLongPollingDataChangedListener 援用,通过 HttpLongPollingDataChangedListener 实现配置变更告诉订阅和配置获取。

告诉订阅:

@PostMapping(value = "/listener")
public void listener(final HttpServletRequest request, final HttpServletResponse response) {longPollingListener.doLongPolling(request, response);
}

配置获取:

@GetMapping("/fetch")
public SoulAdminResult fetchConfigs(@NotNull final String[] groupKeys) {Map<String, ConfigData<?>> result = Maps.newHashMap();
    for (String groupKey : groupKeys) {ConfigData<?> data = longPollingListener.fetchConfig(ConfigGroupEnum.valueOf(groupKey));
        result.put(groupKey, data);
    }
    return SoulAdminResult.success(SoulResultMessage.SUCCESS, result);
}

告诉订阅实现

应用 HttpLongPollingDataChangedListener#doLongPolling 实现告诉订阅

public void doLongPolling(final HttpServletRequest request, final HttpServletResponse response) {
    // 比拟配置组 md5
    List<ConfigGroupEnum> changedGroup = compareChangedGroup(request);
    String clientIp = getRemoteIp(request);
    // 发现配置组变动则立刻响应
    if (CollectionUtils.isNotEmpty(changedGroup)) {this.generateResponse(response, changedGroup);
        log.info("send response with the changed group, ip={}, group={}", clientIp, changedGroup);
        return;
    }
    // 监听配置变动
    final AsyncContext asyncContext = request.startAsync();
    asyncContext.setTimeout(0L);
    // 阻塞客户端线程
    scheduler.execute(new LongPollingClient(asyncContext, clientIp, HttpConstants.SERVER_MAX_HOLD_TIMEOUT));
}

通过比拟 MD5 查看配置组是否产生变更,若配置组产生变更则立刻响应,否则阻塞客户端线程。

此处 compareChangedGroup 实现不做深究,持续看 LongPollingClient 具体解决:

@Override
public void run() {this.asyncTimeoutFuture = scheduler.schedule(() -> {clients.remove(LongPollingClient.this);
        List<ConfigGroupEnum> changedGroups = compareChangedGroup((HttpServletRequest) asyncContext.getRequest());
        sendResponse(changedGroups);
    }, timeoutTime, TimeUnit.MILLISECONDS);
    clients.add(this);
}

这里将 client 退出 clients 的同时,开启了一个定时工作,负责超时移除 client 并返回发生变化的配置组信息。

超时工夫为结构时传入的 HttpConstants.SERVER_MAX_HOLD_TIMEOUT = 60s

配置获取实现

应用 AbstractDataChangedListener#fetchConfig 实现配置获取

public ConfigData<?> fetchConfig(final ConfigGroupEnum groupKey) {ConfigDataCache config = CACHE.get(groupKey.name());
    switch (groupKey) {
        case APP_AUTH:
            ...
        case PLUGIN:
            ...
        case RULE:
            ...
        case SELECTOR:
            List<SelectorData> selectorList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<SelectorData>>() {}.getType());
            return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), selectorList);
        case META_DATA:
            ...
        default:
            throw new IllegalStateException("Unexpected groupKey:" + groupKey);
    }
}

这里从 CACHE 缓存获取对应配置组信息,包装成 ConfigData 并返回。

建设订阅关系

soul-web 端通过 HttpSyncDataConfiguration 初始化 HttpSyncDataService 并注入 spring 容器。

HttpSyncDataService#start 办法在初始化时实现配置获取和订阅:

private void start() {
    // It could be initialized multiple times, so you need to control that.
    if (RUNNING.compareAndSet(false, true)) {
        // fetch all group configs.
        this.fetchGroupConfig(ConfigGroupEnum.values());
        int threadSize = serverList.size();
        this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(),
                SoulThreadFactory.create("http-long-polling", true));
        // start long polling, each server creates a thread to listen for changes.
        this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));
    } else {log.info("soul http long polling was started, executor=[{}]", executor);
    }
}

1)配置获取

private void fetchGroupConfig(final ConfigGroupEnum... groups) throws SoulException {for (int index = 0; index < this.serverList.size(); index++) {String server = serverList.get(index);
        try {this.doFetchGroupConfig(server, groups);
            break;
        } catch (SoulException e) {
            // no available server, throw exception.
            if (index >= serverList.size() - 1) {throw e;}
            log.warn("fetch config fail, try another one: {}", serverList.get(index + 1));
        }
    }
}

doFetchGroupConfig 外部发动配置获取申请并更新本地缓存

private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) {
    ...
    String url = server + "/configs/fetch?" + StringUtils.removeEnd(params.toString(), "&");
    ...
    try {json = this.httpClient.getForObject(url, String.class);
    } catch (RestClientException e) {...}
    // update local cache
    boolean updated = this.updateCacheWithJson(json);
    ...
}

2)配置订阅

借助 HttpLongPollingTask 实现

@Override
public void run() {while (RUNNING.get()) {for (int time = 1; time <= retryTimes; time++) {
            try {doLongPolling(server);
            } catch (Exception e) {...}
        }
    }
    log.warn("Stop http long polling.");
}

HttpLongPollingTask 一直循环 doLongPolling,此处有 retry 操作

private void doLongPolling(final String server) {
    ...
    String listenerUrl = server + "/configs/listener";
    ...
    try {String json = this.httpClient.postForEntity(listenerUrl, httpEntity, String.class).getBody();
        log.debug("listener result: [{}]", json);
        groupJson = GSON.fromJson(json, JsonObject.class).getAsJsonArray("data");
    } catch (RestClientException e) {...}
    if (groupJson != null) {
        // fetch group configuration async.
        ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson, ConfigGroupEnum[].class);
        if (ArrayUtils.isNotEmpty(changedGroups)) {log.info("Group config changed: {}", Arrays.toString(changedGroups));
            this.doFetchGroupConfig(server, changedGroups);
        }
    }
}

doLongPolling 外部发动 post 申请订阅配置变更,若产生变更则从新获取配置。

至此,告诉订阅解决脉络已清晰:

  1. soul-web 端通过 http 发动订阅申请
  2. soul-admin 端收到申请,通过比拟 MD5 查看配置组是否存在变更

    • 若存在变更,则立刻响应变更组信息
    • 若无变更,则阻塞客户端线程,并开启定时工作 60s 后从新比拟配置组变更并返回响应
  3. soul-web 端收到响应,判断配置组是否存在变更

    • 若存在变更,则发动获取配置申请获取最新配置信息
  4. soul-web 从新发动订阅申请

配置变更

上回咱们说到 AbstractDataChangedListener 的 onSelectorChanged 实现:

public void onSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {if (CollectionUtils.isEmpty(changed)) {return;}
    // 更新 selector 缓存
    this.updateSelectorCache();
    // selector 变更后处理,实现具体的变更告诉
    this.afterSelectorChanged(changed, eventType);
}

这里 selector 变更解决是先更缓存后发告诉,持续看 afterSelectorChanged 实现。

HttpLongPollingDataChangedListener 真正实现了 AbstractDataChangedListener 的 afterSelectorChanged:

@Override
protected void afterSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {scheduler.execute(new DataChangeTask(ConfigGroupEnum.SELECTOR));
}

由定时工作反复执行 DataChangeTask,DataChangeTask 具体解决如下:

@Override
public void run() {for (Iterator<LongPollingClient> iter = clients.iterator(); iter.hasNext();) {LongPollingClient client = iter.next();
        iter.remove();
        client.sendResponse(Collections.singletonList(groupKey));
        log.info("send response with the changed group,ip={}, group={}, changeTime={}", client.ip, groupKey, changeTime);
    }
}

DataChangeTask 负责从 clients 顺次移除 LongPollingClient 并将 groupKey 作为响应返回,sendResponse 外部解决如下:

void sendResponse(final List<ConfigGroupEnum> changedGroups) {
    // cancel scheduler
    if (null != asyncTimeoutFuture) {asyncTimeoutFuture.cancel(false);
    }
    generateResponse((HttpServletResponse) asyncContext.getResponse(), changedGroups);
    asyncContext.complete();}

负责生成响应报文并异步响应客户端,留神有个 asyncTimeoutFuture.cancel 操作,勾销之前的 60s 超时响应。

总结

本篇梳理和剖析了 soul-web 端到 soul-admin 端的配置变更告诉订阅关系建设过程,配合上配置获取接口,实现了整个 http 数据同步策略的变更告诉机制。

下篇,将探索 http 同步策略的 web 端解决变更告诉,期待惊喜。

集体知识库

高性能微服务 API 网关 -Soul

退出移动版