Nacos 作为配置核心,当应用程序去拜访Nacos动静获取配置源之后,会缓存到本地内存以及磁盘中。
因为Nacos作为动静配置核心,意味着后续配置变更之后须要让所有相干的客户端感知,并更新本地内存!
那么这个性能是在哪里实现的呢? 以及它是采纳什么样的形式来实现配置的更新的呢? 咱们一起来摸索一下源码的实现!

客户端配置缓存更新

当客户端拿到配置后,须要动静刷新,从而保证数据和服务器端是统一的,这个过程是如何实现的呢?在这一大节中咱们来做一个详细分析。

Nacos采纳长轮训机制来实现数据变更的同步,原理如下!

整体工作流程如下:

  • 客户端发动长轮训申请
  • 服务端收到申请当前,先比拟服务端缓存中的数据是否雷同,如果不通,则间接返回
  • 如果雷同,则通过schedule提早29.5s之后再执行比拟
  • 为了保障当服务端在29.5s之内产生数据变动可能及时告诉给客户端,服务端采纳事件订阅的形式来监听服务端本地数据变动的事件,一旦收到事件,则触发DataChangeTask的告诉,并且遍历allStubs队列中的ClientLongPolling,把后果写回到客户端,就实现了一次数据的推送
  • 如果 DataChangeTask 工作实现了数据的 “推送” 之后,ClientLongPolling 中的调度工作又开始执行了怎么办呢?
    很简略,只有在进行 “推送” 操作之前,先将原来期待执行的调度工作勾销掉就能够了,这样就避免了推送操作写完响应数据之后,调度工作又去写响应数据,这时必定会报错的。所以,在ClientLongPolling办法中,最开始的一个步骤就是删除订阅事件

长轮训工作启动入口

在NacosConfigService的构造方法中,当这个类被实例化当前,有做一些事件

  • 初始化一个HttpAgent,这里又用到了装璜起模式,理论工作的类是ServerHttpAgent, MetricsHttpAgent外部也是调用了ServerHttpAgent的办法,减少了监控统计的信息
  • ClientWorker, 客户端的一个工作类,agent作为参数传入到clientworker,能够根本猜测到外面会用到agent做一些近程相干的事件
public NacosConfigService(Properties properties) throws NacosException {    ValidatorUtils.checkInitParam(properties);    String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);    if (StringUtils.isBlank(encodeTmp)) {        this.encode = Constants.ENCODE;    } else {        this.encode = encodeTmp.trim();    }    initNamespace(properties); //    this.configFilterChainManager = new ConfigFilterChainManager(properties);    //初始化网络通信组件    this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties));    this.agent.start();     //初始化ClientWorker    this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties);}

ClientWorker

在上述初始化代码中,咱们重点须要关注ClientWorker这个类,它的构造方法如下

public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,        final Properties properties) {    this.agent = agent;    this.configFilterChainManager = configFilterChainManager; //初始化配置过滤管理器        // Initialize the timeout parameter        init(properties); //初始化配置        //初始化一个定时调度的线程池,重写了threadfactory办法    this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {        @Override        public Thread newThread(Runnable r) {            Thread t = new Thread(r);            t.setName("com.alibaba.nacos.client.Worker." + agent.getName());            t.setDaemon(true);            return t;        }    });         //初始化一个定时调度的线程池,从外面的name名字来看,仿佛和长轮训有关系。而这个长轮训应该是和nacos服务端的长轮训    this.executorService = Executors            .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {                @Override                public Thread newThread(Runnable r) {                    Thread t = new Thread(r);                    t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());                    t.setDaemon(true);                    return t;                }            });    //设置定时工作的执行频率,并且调用checkConfigInfo这个办法,猜想是定时去检测配置是否产生了变动        //首次执行延迟时间为1毫秒、延迟时间为10毫秒    this.executor.scheduleWithFixedDelay(new Runnable() {        @Override        public void run() {            try {                checkConfigInfo();            } catch (Throwable e) {                LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);            }        }    }, 1L, 10L, TimeUnit.MILLISECONDS);}

能够看到 ClientWorker 除了将 HttpAgent 维持在本人外部,还创立了两个线程池:

  1. 第一个线程池是只领有一个线程用来执行定时工作的 executor,executor 每隔 10ms 就会执行一次 checkConfigInfo() 办法,从办法名上能够晓得是每 10 ms 查看一次配置信息。
  2. 第二个线程池是一个一般的线程池,从 ThreadFactory 的名称能够看到这个线程池是做长轮询的。

checkConfigInfo

ClientWorker结构初始化中,启动了一个定时工作去执行checkConfigInfo()办法,这个办法次要是定时查看本地配置和服务器上的配置的变更状况,这个办法定义如下.

public void checkConfigInfo() {    // Dispatch tasks.    int listenerSize = cacheMap.size(); //    // Round up the longingTaskCount.     // 向上取整为批数,监听的配置数量除以3000,失去一个整数,代表长轮训工作的数量    int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());     //currentLongingTaskCount示意以后的长轮训工作数量,如果小于计算的后果,则能够持续创立    if (longingTaskCount > currentLongingTaskCount) {        for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {            // The task list is no order.So it maybe has issues when changing.            executorService.execute(new LongPollingRunnable(i));        }        currentLongingTaskCount = longingTaskCount;    }}

这个办法次要的目标是用来查看服务端的配置信息是否产生了变动。如果有变动,则触发listener告诉

  • cacheMap: AtomicReference<Map<String, CacheData>> cacheMap 用来存储监听变更的缓存汇合。key是依据dataID/group/tenant(租户) 拼接的值。Value是对应存储在nacos服务器上的配置文件的内容。
  • 默认状况下,每个长轮训LongPullingRunnable工作默认解决3000个监听配置集。如果超过3000, 则须要启动多个LongPollingRunnable去执行。
  • currentLongingTaskCount保留已启动的LongPullingRunnable工作数
  • executorService就是在ClientWorker构造方法中初始化的线程池

LongPollingRunnable.run

LongPollingRunnable长轮训工作的实现逻辑,代码比拟长,咱们分段来剖析。

第一局部次要有两个逻辑

  1. 对工作依照批次分类
  2. 查看以后批次的缓存和本地文件的数据是否统一,如果产生了变动,则触发监听。
class LongPollingRunnable implements Runnable {        private final int taskId; //示意当前任务批次id        public LongPollingRunnable(int taskId) {        this.taskId = taskId;    }        @Override    public void run() {                List<CacheData> cacheDatas = new ArrayList<CacheData>();        List<String> inInitializingCacheList = new ArrayList<String>();        try {            // 遍历CacheMap,把CacheMap中和当前任务id雷同的缓存,保留到cacheDatas            // 通过checkLocalConfig办法            for (CacheData cacheData : cacheMap.values()) {                if (cacheData.getTaskId() == taskId) {                    cacheDatas.add(cacheData);                    try {                        checkLocalConfig(cacheData);                        if (cacheData.isUseLocalConfigInfo()) { //这里示意数据有变动,须要告诉监听器                            cacheData.checkListenerMd5(); //告诉所有针对以后配置设置了监听的监听器                        }                    } catch (Exception e) {                        LOGGER.error("get local config info error", e);                    }                }            }           //省略局部                    } catch (Throwable e) {                        // If the rotation training task is abnormal, the next execution time of the task will be punished            LOGGER.error("longPolling error : ", e);            executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS); //出现异常,到下一次taskPenaltyTime后从新执行工作        }    }}

checkLocalConfig

查看本地配置,这外面有三种状况

  • 如果isUseLocalConfigInfo为false,示意不应用本地配置,然而本地缓存门路的文件是存在的,于是把isUseLocalConfigInfo设置为true,并且更新cacheData的内容以及文件的更新工夫
  • 如果isUseLocalConfigInfo为true,示意应用本地配置文件,然而本地缓存文件不存在,则设置为false,不告诉监听器。
  • 如果isUseLocalConfigInfo为true,并且本地缓存文件也存在,然而缓存的的工夫和文件的更新工夫不统一,则更新cacheData中的内容,并且isUseLocalConfigInfo设置为true。
private void checkLocalConfig(CacheData cacheData) {    final String dataId = cacheData.dataId;    final String group = cacheData.group;    final String tenant = cacheData.tenant;    File path = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant);    // 没有 -> 有    if (!cacheData.isUseLocalConfigInfo() && path.exists()) {        String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);        final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);        cacheData.setUseLocalConfigInfo(true);        cacheData.setLocalConfigInfoVersion(path.lastModified());        cacheData.setContent(content);        String encryptedDataKey = LocalEncryptedDataKeyProcessor                .getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);        cacheData.setEncryptedDataKey(encryptedDataKey);                LOGGER.warn(                "[{}] [failover-change] failover file created. dataId={}, group={}, tenant={}, md5={}, content={}",                agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content));        return;    }     // 有 -> 没有。不告诉业务监听器,从server拿到配置后告诉。    // If use local config info, then it doesn't notify business listener and notify after getting from server.    if (cacheData.isUseLocalConfigInfo() && !path.exists()) {        cacheData.setUseLocalConfigInfo(false);        LOGGER.warn("[{}] [failover-change] failover file deleted. dataId={}, group={}, tenant={}", agent.getName(),                dataId, group, tenant);        return;    }         // 有变更    if (cacheData.isUseLocalConfigInfo() && path.exists() && cacheData.getLocalConfigInfoVersion() != path            .lastModified()) {        String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);        final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);        cacheData.setUseLocalConfigInfo(true);        cacheData.setLocalConfigInfoVersion(path.lastModified());        cacheData.setContent(content);        String encryptedDataKey = LocalEncryptedDataKeyProcessor                .getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);        cacheData.setEncryptedDataKey(encryptedDataKey);        LOGGER.warn(                "[{}] [failover-change] failover file changed. dataId={}, group={}, tenant={}, md5={}, content={}",                agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content));    }}

checkListenerMd5

遍历用户本人增加的监听器,如果发现数据的md5值不同,则发送告诉

void checkListenerMd5() {    for (ManagerListenerWrap wrap : listeners) {        if (!md5.equals(wrap.lastCallMd5)) {            safeNotifyListener(dataId, group, content, type, md5, wrap);        }    }}

查看服务端配置

在LongPollingRunnable.run中,先通过本地配置的读取和查看来判断数据是否发生变化从而实现变动的告诉

接着,以后的线程还须要去近程服务器上取得最新的数据,查看哪些数据产生了变动

  • 通过checkUpdateDataIds获取近程服务器上数据变更的dataid
  • 遍历这些变动的汇合,而后调用getServerConfig从近程服务器取得对应的内容
  • 更新本地的cache,设置为服务器端返回的内容
  • 最初遍历cacheDatas,找到变动的数据进行告诉
// check server config//从服务端获取发生变化的数据的DataID列表,保留在List<String>汇合中List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);if (!CollectionUtils.isEmpty(changedGroupKeys)) {    LOGGER.info("get changedGroupKeys:" + changedGroupKeys);} //遍历产生了变更的配置项for (String groupKey : changedGroupKeys) {    String[] key = GroupKey.parseKey(groupKey);    String dataId = key[0];    String group = key[1];    String tenant = null;    if (key.length == 3) {        tenant = key[2];    }    try {        //逐项依据这些配置项获取配置信息        ConfigResponse response = getServerConfig(dataId, group, tenant, 3000L);        //把配置信息保留到CacheData中        CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));        cache.setContent(response.getContent());        cache.setEncryptedDataKey(response.getEncryptedDataKey());        if (null != response.getConfigType()) {            cache.setType(response.getConfigType());        }        LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",                    agent.getName(), dataId, group, tenant, cache.getMd5(),                    ContentUtils.truncateContent(response.getContent()), response.getConfigType());    } catch (NacosException ioe) {        String message = String            .format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",                    agent.getName(), dataId, group, tenant);        LOGGER.error(message, ioe);    }}//再遍历CacheData这个汇合,找到发生变化的数据进行告诉for (CacheData cacheData : cacheDatas) {    if (!cacheData.isInitializing() || inInitializingCacheList        .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {        cacheData.checkListenerMd5();        cacheData.setInitializing(false);    }}inInitializingCacheList.clear(); //持续传递以后线程进行轮询executorService.execute(this);

checkUpdateDataIds

这个办法次要是向服务器端发动查看申请,判断本人本地的配置和服务端的配置是否统一。

  • 首先从cacheDatas汇合中找到isUseLocalConfigInfo为false的缓存
  • 把须要查看的配置项,拼接成一个字符串,调用checkUpdateConfigStr进行验证
/** * 从Server获取值变动了的DataID列表。返回的对象里只有dataId和group是无效的。 保障不返回NULL。 */List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws IOException {    StringBuilder sb = new StringBuilder();    for (CacheData cacheData : cacheDatas) { //把须要查看的配置项,拼接成一个字符串        if (!cacheData.isUseLocalConfigInfo()) { //找到isUseLocalConfigInfo=false的缓存            sb.append(cacheData.dataId).append(WORD_SEPARATOR);            sb.append(cacheData.group).append(WORD_SEPARATOR);            if (StringUtils.isBlank(cacheData.tenant)) {                sb.append(cacheData.getMd5()).append(LINE_SEPARATOR);            } else {                sb.append(cacheData.getMd5()).append(WORD_SEPARATOR);                sb.append(cacheData.getTenant()).append(LINE_SEPARATOR);            }            if (cacheData.isInitializing()) {//                // cacheData 首次呈现在cacheMap中&首次check更新                inInitializingCacheList                    .add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant));            }        }    }    boolean isInitializingCacheList = !inInitializingCacheList.isEmpty();    return checkUpdateConfigStr(sb.toString(), isInitializingCacheList);}

checkUpdateConfigStr

从Server获取值变动了的DataID列表。返回的对象里只有dataId和group是无效的。 保障不返回NULL。

List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws Exception {        //拼接参数和header    Map<String, String> params = new HashMap<String, String>(2);    params.put(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);    Map<String, String> headers = new HashMap<String, String>(2);    headers.put("Long-Pulling-Timeout", "" + timeout);        // told server do not hang me up if new initializing cacheData added in    if (isInitializingCacheList) {        headers.put("Long-Pulling-Timeout-No-Hangup", "true");    }        if (StringUtils.isBlank(probeUpdateString)) {//判断可能产生变更的字符串是否为空,如果是,则间接返回。        return Collections.emptyList();    }        try {        // In order to prevent the server from handling the delay of the client's long task,        // increase the client's read timeout to avoid this problem.        // 设置readTimeoutMs,也就是本次申请期待响应的超时工夫,默认是30s        long readTimeoutMs = timeout + (long) Math.round(timeout >> 1);        //发动近程调用        HttpRestResult<String> result = agent                .httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(),                        readTimeoutMs);                if (result.ok()) { //如果响应胜利            setHealthServer(true);            return parseUpdateDataIdResponse(result.getData()); //解析并更新数据,返回的是的确产生了数据变更的字符串:tenant/group/dataid。        } else {//如果响应失败            setHealthServer(false);            LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(),                    result.getCode());        }    } catch (Exception e) {        setHealthServer(false);        LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e);        throw e;    }    return Collections.emptyList();}

客户端缓存配置长轮训机制总结

整体实现的外围点就一下几个局部

  1. 对本地缓存的配置做工作拆分,每一个批次是3000条
  2. 针对每3000条创立一个线程去执行
  3. 先把每一个批次的缓存和本地磁盘文件中的数据进行比拟,

    1. 如果和本地配置不统一,则示意该缓存产生了更新,间接告诉客户端监听
    2. 如果本地缓存和磁盘数据统一,则须要发动近程申请查看配置变动
  4. 先以tenent/groupId/dataId拼接成字符串,发送到服务端进行查看,返回产生了变更的配置
  5. 客户端收到变更配置列表,再逐项遍历发送到服务端获取配置内容。

服务端配置更新的推送

剖析完客户端之后,随着好奇心的驱使,服务端是如何解决客户端的申请的?那么同样,咱们须要思考几个问题

  • 服务端是如何实现长轮训机制的
  • 客户端的超时工夫为什么要设置30s

客户端发动的申请地址是:/v1/cs/configs/listener,于是找到这个接口进行查看,代码如下。

//# ConfigController.java@PostMapping("/listener")@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)public void listener(HttpServletRequest request, HttpServletResponse response)        throws ServletException, IOException {    request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);    String probeModify = request.getParameter("Listening-Configs");    if (StringUtils.isBlank(probeModify)) {        throw new IllegalArgumentException("invalid probeModify");    }        probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);        Map<String, String> clientMd5Map;    try {        //解析客户端传递过去的可能发生变化的配置我的项目,转化为Map汇合(key=dataId,value=md5)        clientMd5Map = MD5Util.getClientMd5Map(probeModify);    } catch (Throwable e) {        throw new IllegalArgumentException("invalid probeModify");    }        // 开始执行长轮训。    inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());}

doPollingConfig

这个办法次要是用来做长轮训和短轮询的判断

  1. 如果是长轮训,间接走addLongPollingClient办法
  2. 如果是短轮询,间接比拟服务端的数据,如果存在md5不统一,间接把数据返回。
public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,        Map<String, String> clientMd5Map, int probeRequestSize) throws IOException {        // 判断以后申请是否反对长轮训。()    if (LongPollingService.isSupportLongPolling(request)) {        longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);        return HttpServletResponse.SC_OK + "";    }        //如果是短轮询,走上面的申请,上面的申请就是把客户端传过来的数据和服务端的数据逐项进行比拟,保留到changeGroups中。    // Compatible with short polling logic.    List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);        // Compatible with short polling result.    String oldResult = MD5Util.compareMd5OldResult(changedGroups);    String newResult = MD5Util.compareMd5ResultString(changedGroups);        String version = request.getHeader(Constants.CLIENT_VERSION_HEADER);    if (version == null) {        version = "2.0.0";    }    int versionNum = Protocol.getVersionNumber(version);        // Before 2.0.4 version, return value is put into header.    if (versionNum < START_LONG_POLLING_VERSION_NUM) {        response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);        response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);    } else {        request.setAttribute("content", newResult);    }        Loggers.AUTH.info("new content:" + newResult);        // Disable cache.    response.setHeader("Pragma", "no-cache");    response.setDateHeader("Expires", 0);    response.setHeader("Cache-Control", "no-cache,no-store");    response.setStatus(HttpServletResponse.SC_OK);    return HttpServletResponse.SC_OK + "";}

addLongPollingClient

把客户端的申请,保留到长轮训的执行引擎中。

public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,        int probeRequestSize) {    //获取客户端长轮训的超时工夫    String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);     //不容许断开的标记    String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);    //利用名称    String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);    //    String tag = req.getHeader("Vipserver-Tag");    //延期工夫,默认为500ms    int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);    // Add delay time for LoadBalance, and one response is returned 500 ms in advance to avoid client timeout.    // 提前500ms返回一个响应,防止客户端呈现超时    long timeout = Math.max(10000, Long.parseLong(str) - delayTime);    if (isFixedPolling()) {        timeout = Math.max(10000, getFixedPollingInterval());        // Do nothing but set fix polling timeout.    } else {        long start = System.currentTimeMillis();        //通过md5判断客户端申请过去的key是否有和服务器端有不统一的,如果有,则保留到changedGroups中。        List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);        if (changedGroups.size() > 0) { //如果发现有变更,则间接把申请返回给客户端            generateResponse(req, rsp, changedGroups);            LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant",                    RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,                    changedGroups.size());            return;        } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) { //如果noHangUpFlag为true,阐明不须要挂起客户端,所以间接返回。            LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",                    RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,                    changedGroups.size());            return;        }    }    //获取申请端的ip    String ip = RequestUtil.getRemoteIp(req);    // Must be called by http thread, or send response.    //把以后申请转化为一个异步申请(意味着此时tomcat线程被开释,也就是客户端的申请,须要通过asyncContext来手动触发返回,否则始终挂起)    final AsyncContext asyncContext = req.startAsync();    // AsyncContext.setTimeout() is incorrect, Control by oneself    asyncContext.setTimeout(0L); //设置异步申请超时工夫,    //执行长轮训申请    ConfigExecutor.executeLongPolling(            new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));}

ClientLongPolling

接下来咱们来剖析一下,clientLongPolling到底做了什么操作。或者说咱们能够先猜想一下应该会做什么事件

  • 这个工作要阻塞29.5s能力执行,因为立马执行没有任何意义,毕竟后面曾经执行过一次了
  • 如果在29.5s+之内,数据发生变化,须要提前告诉。须要有一种监控机制

基于这些猜测,咱们能够看看它的实现过程

从代码粗粒度来看,它的实现仿佛和咱们的猜测统一,在run办法中,通过scheduler.schedule实现了一个定时工作,它的delay工夫正好是后面计算的29.5s。在这个工作中,会通过MD5Util.compareMd5来进行计算

那另外一个,当数据发生变化当前,必定不能等到29.5s之后才告诉呀,那怎么办呢?咱们发现有一个allSubs的货色,它仿佛和公布订阅有关系。那是不是有可能以后的clientLongPolling订阅了数据变动的事件呢?

class ClientLongPolling implements Runnable {    @Override    public void run() {        //构建一个异步工作,延后29.5s执行        asyncTimeoutFuture = ConfigExecutor.scheduleLongPolling(new Runnable() {            @Override            public void run() { //如果达到29.5s,阐明这个期间没有做任何配置批改,则主动触发执行                try {                    getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());                    // Delete subsciber's relations.                    allSubs.remove(ClientLongPolling.this); //移除订阅关系                    if (isFixedPolling()) { //如果是固定距离的长轮训                        LogUtil.CLIENT_LOG                                .info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "fix",                                        RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),                                        "polling", clientMd5Map.size(), probeRequestSize);                        //比拟变更的key                        List<String> changedGroups = MD5Util                                .compareMd5((HttpServletRequest) asyncContext.getRequest(),                                        (HttpServletResponse) asyncContext.getResponse(), clientMd5Map);                        if (changedGroups.size() > 0) {//如果大于0,示意有变更,间接响应                            sendResponse(changedGroups);                        } else {                            sendResponse(null); //否则返回null                        }                    } else {                        LogUtil.CLIENT_LOG                                .info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "timeout",                                        RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),                                        "polling", clientMd5Map.size(), probeRequestSize);                        sendResponse(null);                    }                } catch (Throwable t) {                    LogUtil.DEFAULT_LOG.error("long polling error:" + t.getMessage(), t.getCause());                }            }        }, timeoutTime, TimeUnit.MILLISECONDS);        allSubs.add(this);  //把以后线程增加到订阅事件队列中    }}

allSubs

allSubs是一个队列,队列外面放了ClientLongPolling这个对象。这个队列仿佛和配置变更有某种关联关系。

那么这里必须要实现的是,当用户在nacos 控制台批改了配置之后,必须要从这个订阅关系中取出关注的客户端长连贯,而后把变更的后果返回。于是咱们去看LongPollingService的构造方法查找订阅关系

/** * 长轮询订阅关系 */final Queue<ClientLongPolling> allSubs;allSubs.add(this);

LongPollingService

在LongPollingService的构造方法中,应用了一个NotifyCenter订阅了一个事件,其中不难发现,如果这个事件的实例是LocalDataChangeEvent,也就是服务端数据产生变更的工夫,就会执行一个DataChangeTask的线程。

public LongPollingService() {    allSubs = new ConcurrentLinkedQueue<ClientLongPolling>();    ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, 10L, TimeUnit.SECONDS);    // Register LocalDataChangeEvent to NotifyCenter.    NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize);    //注册LocalDataChangeEvent订阅事件    NotifyCenter.registerSubscriber(new Subscriber() {        @Override        public void onEvent(Event event) {            if (isFixedPolling()) {                // Ignore.            } else {                if (event instanceof LocalDataChangeEvent) { //如果触发了LocalDataChangeEvent,则执行上面的代码                    LocalDataChangeEvent evt = (LocalDataChangeEvent) event;                    ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));                }            }        }        @Override        public Class<? extends Event> subscribeType() {            return LocalDataChangeEvent.class;        }    });}

DataChangeTask

数据变更事件线程,代码如下

class DataChangeTask implements Runnable {    @Override    public void run() {        try {            ConfigCacheService.getContentBetaMd5(groupKey); //            //遍历所有订阅事件表            for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {                ClientLongPolling clientSub = iter.next(); //失去ClientLongPolling                //判断以后的ClientLongPolling中,申请的key是否蕴含以后批改的groupKey                if (clientSub.clientMd5Map.containsKey(groupKey)) {                    // If published tag is not in the beta list, then it skipped.                    if (isBeta && !CollectionUtils.contains(betaIps, clientSub.ip)) { //如果是beta形式且betaIps不蕴含以后客户端ip,间接返回                        continue;                    }                    // If published tag is not in the tag list, then it skipped.                    if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {//如果配置了tag标签且不蕴含以后客户端的tag,间接返回                        continue;                    }                    //                    getRetainIps().put(clientSub.ip, System.currentTimeMillis());                    iter.remove(); // Delete subscribers' relationships. 移除以后客户端的订阅关系                    LogUtil.CLIENT_LOG                            .info("{}|{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - changeTime), "in-advance",                                    RequestUtil                                            .getRemoteIp((HttpServletRequest) clientSub.asyncContext.getRequest()),                                    "polling", clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);                    clientSub.sendResponse(Arrays.asList(groupKey)); //响应客户端申请。                }            }        } catch (Throwable t) {            LogUtil.DEFAULT_LOG.error("data change error: {}", ExceptionUtil.getStackTrace(t));        }    }}

原理总结

版权申明:本博客所有文章除特地申明外,均采纳 CC BY-NC-SA 4.0 许可协定。转载请注明来自 Mic带你学架构
如果本篇文章对您有帮忙,还请帮忙点个关注和赞,您的保持是我一直创作的能源。欢送关注同名微信公众号获取更多技术干货!