指标

  • soul http长轮询 形式数据同步原理及源码剖析

上一篇咱们对Soul网关的 zookeeper 数据同步形式做了简略的剖析,理解了一下 zookeeper 同步的根本流程。接下来咱们看一下Soul网关的http长轮询数据同步形式。

同步原理

Soul网关 http同步原理:

Soul 借鉴了 ApolloNacos 的设计思维,取其精华,本人实现了 http 长轮询数据同步性能。留神,这里并非传统的 Ajax长轮询。http 长轮询机制如下图所示:

soul-web 网关申请 soul-admin 的配置服务,读取超时工夫为 90s,意味着网关层申请配置服务最多会期待 90s,这样便于 admin 配置服务及时响应变更数据,从而实现准实时推送。

soul-web的 HTTP 申请达到 sou-admin 之后,并非立马响应数据,而是利用 Servlet3.0 的异步机制,将长轮询申请工作扔到 BlocingQueue 中,并且开启调度工作,60s 后执行。这样做的目标是 60s 后将该长轮询申请移除队列,即使是这段时间内没有产生配置数据变更,也得让网关晓得。而且网关申请配置服务时,也有 90s 的超时工夫。

Soul网关开启 http长轮询 同步:

  • soul-bootstrap新增如下依赖:

     <!--soul data sync start use http--><dependency>    <groupId>org.dromara</groupId>    <artifactId>soul-spring-boot-starter-sync-data-http</artifactId>    <version>2.2.1</version></dependency>
  • application.yml增加相干配置

    soul :   sync:       http:            url: http://localhost:9095 #url: 配置成你的zk地址,集群环境请应用(,)分隔

soul-admin 配置,或在 soul-admin 启动参数中设置 --soul.sync.http.enabled=true,而后重启服务

  sync:    http:      enabled: true

源码剖析

soul-admin 数据同步

soul-admin 的数据变更告诉,Soul 网关的三种数据同步形式webscoket、zookeeper、http长轮询原理都是一样的,只是不同的数据同步配置对应的事件处理器不一样,上一篇zookeeper数据同步已做了剖析,这里就不在赘述。

  • htttp长轮询 监听器源码剖析

咱们后面开启了soul.sync.http.enabled=true,那么在我的项目中必定会有读取配置的中央。通过`soul.sync.http搜寻发现数据同步的配置类DataSyncConfiguration,上面是http长轮询的配置代码:

   /**     * http long polling.     */    @Configuration    @ConditionalOnProperty(name = "soul.sync.http.enabled", havingValue = "true")    @EnableConfigurationProperties(HttpSyncProperties.class)    static class HttpLongPollingListener {        @Bean        @ConditionalOnMissingBean(HttpLongPollingDataChangedListener.class)        public HttpLongPollingDataChangedListener httpLongPollingDataChangedListener(final HttpSyncProperties httpSyncProperties) {            return new HttpLongPollingDataChangedListener(httpSyncProperties);        }    }

HttpLongPollingDataChangedListener类为DataChangedListener接口的具体实现,解决http长轮询的数据推送,外围代码如下:

 /**     * If the configuration data changes, the group information for the change is immediately responded.     * Otherwise, the client's request thread is blocked until any data changes or the specified timeout is reached.     *     * @param request  the request     * @param response the response     */    public void doLongPolling(final HttpServletRequest request, final HttpServletResponse response) {        //查看客户端是否须要更新缓存,md5不统一,立刻响应        //留神:这里返回的是配置分组信息,网关收到响应信息之后,只晓得是哪个 Group 产生了配置变更,还须要再次申请该 Group 的配置数据        List<ConfigGroupEnum> changedGroup = compareChangedGroup(request);        String clientIp = getRemoteIp(request);        // response immediately.        if (CollectionUtils.isNotEmpty(changedGroup)) {            this.generateResponse(response, changedGroup);            log.info("send response with the changed group, ip={}, group={}", clientIp, changedGroup);            return;        }        // listen for configuration changed.        //获取servlet3.0的异步解决HTTP申请        final AsyncContext asyncContext = request.startAsync();        // AsyncContext.settimeout() does not timeout properly, so you have to control it yourself        asyncContext.setTimeout(0L);        // block client's thread.        //阻止客户端线程60秒        scheduler.execute(new LongPollingClient(asyncContext, clientIp, HttpConstants.SERVER_MAX_HOLD_TIMEOUT));    }        /******************LongPollingClient外围代码*****************************/        public void run() {             // 退出定时工作,如果60s之内没有配置变更,则60s后执行,响应http申请            this.asyncTimeoutFuture = scheduler.schedule(() -> {                //将以后长轮询从队列中移除( clients是阻塞队列,保留了来自soul-web的申请信息)                clients.remove(LongPollingClient.this);                //获取以后申请中的配置分组信息                List<ConfigGroupEnum> changedGroups                     = compareChangedGroup((HttpServletRequest) asyncContext.getRequest());                //发送响应申请                sendResponse(changedGroups);            }, timeoutTime, TimeUnit.MILLISECONDS);            clients.add(this);        }    /**     * Send response datagram.发送响应数据包     *     * @param response      the response     * @param changedGroups the changed groups     */    private void generateResponse(final HttpServletResponse response, final List<ConfigGroupEnum> changedGroups) {        try {            response.setHeader("Pragma", "no-cache");            response.setDateHeader("Expires", 0);            response.setHeader("Cache-Control", "no-cache,no-store");            response.setContentType(MediaType.APPLICATION_JSON_VALUE);            response.setStatus(HttpServletResponse.SC_OK);            response.getWriter().println(GsonUtils.getInstance().toJson(SoulAdminResult.success(SoulResultMessage.SUCCESS, changedGroups)));        } catch (IOException ex) {            log.error("Sending response failed.", ex);      

至此,soul-admin曾经实现了数据发送。

soul-bootstrap 网关数据同步

开启http长轮询同步,须要在soul-bootstrap中引入soul-spring-boot-starter-sync-data-http,在我的项目中找到对应的自定义spring-boot-starter,发现了HttpSyncDataConfiguration配置类。

/** * Http sync data configuration for spring boot. * * @author xiaoyu(Myth) */@Configuration@ConditionalOnClass(HttpSyncDataService.class)@ConditionalOnProperty(prefix = "soul.sync.http", name = "url")@Slf4jpublic class HttpSyncDataConfiguration {    /**     * Http sync data service.     *     * @param httpConfig        the http config     * @param pluginSubscriber the plugin subscriber     * @param metaSubscribers   the meta subscribers     * @param authSubscribers   the auth subscribers     * @return the sync data service     */    @Bean    public SyncDataService httpSyncDataService(final ObjectProvider<HttpConfig> httpConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,                                           final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {        log.info("you use http long pull sync soul data");        return new HttpSyncDataService(Objects.requireNonNull(httpConfig.getIfAvailable()), Objects.requireNonNull(pluginSubscriber.getIfAvailable()),                metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));    }    /**     * Http config http config.     *     * @return the http config     */    @Bean    @ConfigurationProperties(prefix = "soul.sync.http")    public HttpConfig httpConfig() {        return new HttpConfig();    }}

HttpSyncDataService为实现http长轮询的外围类,上面是外围代码:

   private void start() {        // RUNNING = new AtomicBoolean(false),默认为false        // compareAndSet:如果以后状态值等于预期值,则以原子形式将同步状态设置为给定的更新值        if (RUNNING.compareAndSet(false, true)) {            // fetch all group configs.            //申请configs/fetch,获取最新的配置信息            this.fetchGroupConfig(ConfigGroupEnum.values());            int threadSize = serverList.size();            this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,                    new LinkedBlockingQueue<>(),                    SoulThreadFactory.create("http-long-polling", true));            // start long polling, each server creates a thread to listen for changes.            // 开启轮询,若配置多个soul-admin服务器,每个服务器都会开启轮询            this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));        } else {            log.info("soul http long polling was started, executor=[{}]", executor);        }    }  /****************** HttpLongPollingTask 外围代码*****************************/     //分组申请soul-admin/configs/listener监听接口     private void doLongPolling(final String server) {        //每组缓存最初更新工夫        MultiValueMap<String, String> params = new LinkedMultiValueMap<>(8);        for (ConfigGroupEnum group : ConfigGroupEnum.values()) {            ConfigData<?> cacheConfig = factory.cacheConfigData(group);            String value = String.join(",", cacheConfig.getMd5(), String.valueOf(cacheConfig.getLastModifyTime()));            params.put(group.name(), Lists.newArrayList(value));        }        HttpHeaders headers = new HttpHeaders();        headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);        HttpEntity httpEntity = new HttpEntity(params, headers);        String listenerUrl = server + "/configs/listener";        log.debug("request listener configs: [{}]", listenerUrl);        JsonArray groupJson = null;        try {            //执行申请            String json = this.httpClient.postForEntity(listenerUrl, httpEntity, String.class).getBody();            groupJson = GSON.fromJson(json, JsonObject.class).getAsJsonArray("data");        } catch (RestClientException e) {            String message = String.format("listener configs fail, server:[%s], %s", server, e.getMessage());            throw new SoulException(message, e);        }        if (groupJson != null) {            // 异步获取组配置信息            ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson, ConfigGroupEnum[].class);            if (ArrayUtils.isNotEmpty(changedGroups)) {                //分组申请soul-admin/configs/listener监听接口,获取最新配置信息                this.doFetchGroupConfig(server, changedGroups);            }        }    }   //分组申请soul-admin/configs/listener接口,获取最新配置信息   private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) {        StringBuilder params = new StringBuilder();        for (ConfigGroupEnum groupKey : groups) {            params.append("groupKeys").append("=").append(groupKey.name()).append("&");        }        //组装申请 url 地址        String url = server + "/configs/fetch?" + StringUtils.removeEnd(params.toString(), "&");        String json = null;        try {            //获取最新配置信息            json = this.httpClient.getForObject(url, String.class);        } catch (RestClientException e) {            log.warn(message);            throw new SoulException(message, e);        }        //更新本地缓存        boolean updated = this.updateCacheWithJson(json);        ThreadUtils.sleep(TimeUnit.SECONDS, 30);    }   /**     * 更新本地缓存     * @param json the response from config server.     * @return true: the local cache was updated. false: not updated.     */    private boolean updateCacheWithJson(final String json) {        JsonObject jsonObject = GSON.fromJson(json, JsonObject.class);        JsonObject data = jsonObject.getAsJsonObject("data");        // if the config cache will be updated?        return factory.executor(data);    }    /**     * 执行更新本地缓存操作     *     * @param data the data     * @return the boolean     */    public boolean executor(final JsonObject data) {        final boolean[] success = {false};        ENUM_MAP.values().parallelStream().forEach(dataRefresh -> success[0] = dataRefresh.refresh(data));        return success[0];    }

为什么先告诉 Group 产生了配置变更,通过 Group 申请soul-admin的 /configs/fetch 接口获取具体配置信息,而不是间接将变更的数据写出?

因为 http长轮询机制只能保障准实时,如果在网关层解决不及时,或者管理员频繁更新配置,很有可能便错过了某个配置变更的推送,平安起见,只告知某个 Group 信息产生了变更。

至此,http长轮询数据同步源码剖析实现。