在《Eureka - Client服务启动》一文中我们看到,获取注册表的地方有两个:一个是EurekaClient的构造函数,一个是定时器每隔30秒去获取。我们先看看定时器TimedSupervisorTask的run方法。

定时器

这个方法有三个比较重要的参数:timeoutMillis、delay、maxDelay。比如频率是30s,那这个maxDelay就是30*10=300s(这个10的来源参考上一篇),delay在每次超时后都会翻倍,但是不能比maxDelay大。
整个设计思路是:如果调用30s超时,那下一次的延迟就用60s;如果再超时,就一直翻倍,但是不能超过300s。如果在后面的调用中恢复正常了,那delay就恢复到30s,下次超时再继续翻倍。

@Overridepublic void run() {    Future<?> future = null;    try {        // 执行工作        future = executor.submit(task);        threadPoolLevelGauge.set((long) executor.getActiveCount());        // 指定超时工夫        future.get(timeoutMillis, TimeUnit.MILLISECONDS);  // block until done or timeout        //设置delay        delay.set(timeoutMillis);        threadPoolLevelGauge.set((long) executor.getActiveCount());        successCounter.increment();    } catch (TimeoutException e) {        logger.warn("task supervisor timed out", e);        timeoutCounter.increment();        // 获取delay        long currentDelay = delay.get();        // maxDelay和currentDelay的2倍中取最小值        long newDelay = Math.min(maxDelay, currentDelay * 2);        // 设置为下面最小值        delay.compareAndSet(currentDelay, newDelay);    } catch (RejectedExecutionException e) {        // 其余的略        rejectedCounter.increment();    } catch (Throwable e) {        // 其余的略        throwableCounter.increment();    } finally {        if (future != null) {            future.cancel(true);        }        // 把工作放入定时器,工夫是delay        if (!scheduler.isShutdown()) {            scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);        }    }}

服务发现

判断增量获取还是全量获取

private boolean fetchRegistry(boolean forceFullRegistryFetch) {    Stopwatch tracer = FETCH_REGISTRY_TIMER.start();    try {        Applications applications = getApplications();        // 禁用增量、强制全量更新、没有注册信息的时候,进行全量更新        if (clientConfig.shouldDisableDelta()                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))                || forceFullRegistryFetch                || (applications == null)                || (applications.getRegisteredApplications().size() == 0)                || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta        {            // 其余日志打印略            getAndStoreFullRegistry();        } else {            //增量更新            getAndUpdateDelta(applications);        }        // 计算hash        applications.setAppsHashCode(applications.getReconcileHashCode());        logTotalInstances();    } catch (Throwable e) {        // 其余日志打印略        return false;    } finally {        if (tracer != null) {            tracer.stop();        }    }    // 其余略    return true;}

全量获取

全量获取:调用注册中心地址+apps/获取。

private void getAndStoreFullRegistry() throws Throwable {    // 其余略    Applications apps = null;    // 调用注册核心地址+apps/获取    EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null            ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())            : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {        apps = httpResponse.getEntity();    }    // 其余略}

增量获取

增量更新:获取增量数据后,要通过hash判断本地数据是否和服务器一致,不一致就进行全量获取。

private void getAndUpdateDelta(Applications applications) throws Throwable {    long currentUpdateGeneration = fetchRegistryGeneration.get();    Applications delta = null;    // 调用注册核心地址+apps/delta获取    EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {        delta = httpResponse.getEntity();    }    if (delta == null) {        //获取不到数据,全力更新        getAndStoreFullRegistry();    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {        logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());        String reconcileHashCode = "";        //加锁        if (fetchRegistryUpdateLock.tryLock()) {            try {                // 和本地的合并                updateDelta(delta);                // 计算hash                reconcileHashCode = getReconcileHashCode(applications);            } finally {                //解锁                fetchRegistryUpdateLock.unlock();            }        } else {            logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");        }        // There is a diff in number of instances for some reason        // 和服务器的hash比照        if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {            // 不统一阐明和服务器的不统一,间接全量            reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall        }    } else {        // 其余略    }}

和本地数据的合并,包括新增、修改、删除。

private void updateDelta(Applications delta) {    int deltaCount = 0;    // 遍历所有增量    for (Application app : delta.getRegisteredApplications()) {        // 遍历所有实例        for (InstanceInfo instance : app.getInstances()) {            // 本地缓存            Applications applications = getApplications();            String instanceRegion = instanceRegionChecker.getInstanceRegion(instance);            // 是否为同一个region,如果不是,则取同一个region的applications            if (!instanceRegionChecker.isLocalRegion(instanceRegion)) {                Applications remoteApps = remoteRegionVsApps.get(instanceRegion);                if (null == remoteApps) {                    remoteApps = new Applications();                    remoteRegionVsApps.put(instanceRegion, remoteApps);                }                applications = remoteApps;            }            ++deltaCount;            if (ActionType.ADDED.equals(instance.getActionType())) {                // 新增解决                Application existingApp = applications.getRegisteredApplications(instance.getAppName());                if (existingApp == null) {                    applications.addApplication(app);                }                logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion);                applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);            } else if (ActionType.MODIFIED.equals(instance.getActionType())) {                // 批改解决                Application existingApp = applications.getRegisteredApplications(instance.getAppName());                if (existingApp == null) {                    applications.addApplication(app);                }                logger.debug("Modified instance {} to the existing apps ", instance.getId());                // 和新增都是调用addInstance办法,因为Application#addInstance里是先移除再新增,所有批改也能够调用                applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);            } else if 
(ActionType.DELETED.equals(instance.getActionType())) {                // 删除解决                Application existingApp = applications.getRegisteredApplications(instance.getAppName());                if (existingApp != null) {                    logger.debug("Deleted instance {} to the existing apps ", instance.getId());                    existingApp.removeInstance(instance);                    if (existingApp.getInstancesAsIsFromEureka().isEmpty()) {                        applications.removeApplication(existingApp);                    }                }            }        }    }    logger.debug("The total number of instances fetched by the delta processor : {}", deltaCount);        getApplications().setVersion(delta.getVersion());    getApplications().shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());    for (Applications applications : remoteRegionVsApps.values()) {        applications.setVersion(delta.getVersion());        applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());    }}