在Eureka - Client服务启动咱们看到,注册表获取的中央有两个,一个是EurekaClient构造函数,一个是定时器每隔30秒去获取。咱们先看看定时器TimedSupervisorTask的run办法
定时器
这个办法有三个比拟重要的参数,timeoutMillis、delay、maxDelay。比方频率是30s,那这个maxDelay就是30*10=300s(这个10的起源参考上一篇),delay在这里是每次都翻倍,然而不能比maxDelay大。
整个设计思路是,如果调用30s超时,那就用60秒,如果再超时,就始终翻,然而不能超过300s。如果在前面的调用中失常了,那delay就复原到30s,下次超时持续翻倍。
@Overridepublic void run() { Future<?> future = null; try { // 执行工作 future = executor.submit(task); threadPoolLevelGauge.set((long) executor.getActiveCount()); // 指定超时工夫 future.get(timeoutMillis, TimeUnit.MILLISECONDS); // block until done or timeout //设置delay delay.set(timeoutMillis); threadPoolLevelGauge.set((long) executor.getActiveCount()); successCounter.increment(); } catch (TimeoutException e) { logger.warn("task supervisor timed out", e); timeoutCounter.increment(); // 获取delay long currentDelay = delay.get(); // maxDelay和currentDelay的2倍中取最小值 long newDelay = Math.min(maxDelay, currentDelay * 2); // 设置为下面最小值 delay.compareAndSet(currentDelay, newDelay); } catch (RejectedExecutionException e) { // 其余的略 rejectedCounter.increment(); } catch (Throwable e) { // 其余的略 throwableCounter.increment(); } finally { if (future != null) { future.cancel(true); } // 把工作放入定时器,工夫是delay if (!scheduler.isShutdown()) { scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS); } }}
服务发现
判断增量获取还是全量获取
private boolean fetchRegistry(boolean forceFullRegistryFetch) { Stopwatch tracer = FETCH_REGISTRY_TIMER.start(); try { Applications applications = getApplications(); // 禁用增量、强制全量更新、没有注册信息的时候,进行全量更新 if (clientConfig.shouldDisableDelta() || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) || forceFullRegistryFetch || (applications == null) || (applications.getRegisteredApplications().size() == 0) || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta { // 其余日志打印略 getAndStoreFullRegistry(); } else { //增量更新 getAndUpdateDelta(applications); } // 计算hash applications.setAppsHashCode(applications.getReconcileHashCode()); logTotalInstances(); } catch (Throwable e) { // 其余日志打印略 return false; } finally { if (tracer != null) { tracer.stop(); } } // 其余略 return true;}
全量获取
全量获取,调用注册核心地址+apps/获取。
private void getAndStoreFullRegistry() throws Throwable { // 其余略 Applications apps = null; // 调用注册核心地址+apps/获取 EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { apps = httpResponse.getEntity(); } // 其余略}
增量获取
增量更新,获取后,通过hash要判断是否和服务器统一,不统一就全量
private void getAndUpdateDelta(Applications applications) throws Throwable { long currentUpdateGeneration = fetchRegistryGeneration.get(); Applications delta = null; // 调用注册核心地址+apps/delta获取 EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { delta = httpResponse.getEntity(); } if (delta == null) { //获取不到数据,全力更新 getAndStoreFullRegistry(); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode()); String reconcileHashCode = ""; //加锁 if (fetchRegistryUpdateLock.tryLock()) { try { // 和本地的合并 updateDelta(delta); // 计算hash reconcileHashCode = getReconcileHashCode(applications); } finally { //解锁 fetchRegistryUpdateLock.unlock(); } } else { logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta"); } // There is a diff in number of instances for some reason // 和服务器的hash比照 if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) { // 不统一阐明和服务器的不统一,间接全量 reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall } } else { // 其余略 }}
和本地数据的合并,包含新增、批改、删除。
private void updateDelta(Applications delta) { int deltaCount = 0; // 遍历所有增量 for (Application app : delta.getRegisteredApplications()) { // 遍历所有实例 for (InstanceInfo instance : app.getInstances()) { // 本地缓存 Applications applications = getApplications(); String instanceRegion = instanceRegionChecker.getInstanceRegion(instance); // 是否为同一个region,如果不是,则取同一个region的applications if (!instanceRegionChecker.isLocalRegion(instanceRegion)) { Applications remoteApps = remoteRegionVsApps.get(instanceRegion); if (null == remoteApps) { remoteApps = new Applications(); remoteRegionVsApps.put(instanceRegion, remoteApps); } applications = remoteApps; } ++deltaCount; if (ActionType.ADDED.equals(instance.getActionType())) { // 新增解决 Application existingApp = applications.getRegisteredApplications(instance.getAppName()); if (existingApp == null) { applications.addApplication(app); } logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion); applications.getRegisteredApplications(instance.getAppName()).addInstance(instance); } else if (ActionType.MODIFIED.equals(instance.getActionType())) { // 批改解决 Application existingApp = applications.getRegisteredApplications(instance.getAppName()); if (existingApp == null) { applications.addApplication(app); } logger.debug("Modified instance {} to the existing apps ", instance.getId()); // 和新增都是调用addInstance办法,因为Application#addInstance里是先移除再新增,所有批改也能够调用 applications.getRegisteredApplications(instance.getAppName()).addInstance(instance); } else if (ActionType.DELETED.equals(instance.getActionType())) { // 删除解决 Application existingApp = applications.getRegisteredApplications(instance.getAppName()); if (existingApp != null) { logger.debug("Deleted instance {} to the existing apps ", instance.getId()); existingApp.removeInstance(instance); if (existingApp.getInstancesAsIsFromEureka().isEmpty()) { applications.removeApplication(existingApp); } } } } } logger.debug("The total number of instances fetched by the delta processor : {}", deltaCount); getApplications().setVersion(delta.getVersion()); getApplications().shuffleInstances(clientConfig.shouldFilterOnlyUpInstances()); for (Applications applications : remoteRegionVsApps.values()) { applications.setVersion(delta.getVersion()); applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances()); }}