关于eureka:Eureka-服务发现

43次阅读

共计 5830 个字符,预计需要花费 15 分钟才能阅读完成。

在 Eureka – Client 服务启动咱们看到,注册表获取的中央有两个,一个是 EurekaClient 构造函数,一个是定时器每隔 30 秒去获取。咱们先看看定时器 TimedSupervisorTask 的 run 办法

定时器

这个办法有三个比拟重要的参数,timeoutMillis、delay、maxDelay。比方频率是 30s,那这个 maxDelay 就是 30*10=300s(这个 10 的起源参考上一篇),delay 在这里是每次都翻倍,然而不能比 maxDelay 大。
整个设计思路是,如果调用 30s 超时,那就用 60 秒,如果再超时,就始终翻,然而不能超过 300s。如果在前面的调用中失常了,那 delay 就复原到 30s,下次超时持续翻倍。

@Override
public void run() {
    Future<?> future = null;
    try {
        // 执行工作
        future = executor.submit(task);
        threadPoolLevelGauge.set((long) executor.getActiveCount());
        // 指定超时工夫
        future.get(timeoutMillis, TimeUnit.MILLISECONDS);  // block until done or timeout
        // 设置 delay
        delay.set(timeoutMillis);
        threadPoolLevelGauge.set((long) executor.getActiveCount());
        successCounter.increment();} catch (TimeoutException e) {logger.warn("task supervisor timed out", e);
        timeoutCounter.increment();
        // 获取 delay
        long currentDelay = delay.get();
        // maxDelay 和 currentDelay 的 2 倍中取最小值
        long newDelay = Math.min(maxDelay, currentDelay * 2);
        // 设置为下面最小值
        delay.compareAndSet(currentDelay, newDelay);

    } catch (RejectedExecutionException e) {
        // 其余的略
        rejectedCounter.increment();} catch (Throwable e) {
        // 其余的略
        throwableCounter.increment();} finally {if (future != null) {future.cancel(true);
        }
        // 把工作放入定时器,工夫是 delay
        if (!scheduler.isShutdown()) {scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
        }
    }
}

服务发现

判断增量获取还是全量获取

private boolean fetchRegistry(boolean forceFullRegistryFetch) {Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
    try {Applications applications = getApplications();
        // 禁用增量、强制全量更新、没有注册信息的时候,进行全量更新
        if (clientConfig.shouldDisableDelta()
                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                || forceFullRegistryFetch
                || (applications == null)
                || (applications.getRegisteredApplications().size() == 0)
                || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
        {
            // 其余日志打印略
            getAndStoreFullRegistry();} else {
            // 增量更新
            getAndUpdateDelta(applications);
        }
        // 计算 hash
        applications.setAppsHashCode(applications.getReconcileHashCode());
        logTotalInstances();} catch (Throwable e) {
        // 其余日志打印略
        return false;
    } finally {if (tracer != null) {tracer.stop();
        }
    }
    // 其余略
    return true;
}

全量获取

全量获取,调用注册核心地址 +apps/ 获取。

private void getAndStoreFullRegistry() throws Throwable {
    // 其余略
    Applications apps = null;
    // 调用注册核心地址 +apps/ 获取
    EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
            ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
            : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {apps = httpResponse.getEntity();
    }
    // 其余略
}

增量获取

增量更新,获取后,通过 hash 要判断是否和服务器统一,不统一就全量

private void getAndUpdateDelta(Applications applications) throws Throwable {long currentUpdateGeneration = fetchRegistryGeneration.get();

    Applications delta = null;
    // 调用注册核心地址 +apps/delta 获取
    EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {delta = httpResponse.getEntity();
    }

    if (delta == null) {
        // 获取不到数据,全力更新
        getAndStoreFullRegistry();} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
        String reconcileHashCode = "";
        // 加锁
        if (fetchRegistryUpdateLock.tryLock()) {
            try {
                // 和本地的合并
                updateDelta(delta);
                // 计算 hash
                reconcileHashCode = getReconcileHashCode(applications);
            } finally {
                // 解锁
                fetchRegistryUpdateLock.unlock();}
        } else {logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
        }
        // There is a diff in number of instances for some reason
        // 和服务器的 hash 比照
        if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
            // 不统一阐明和服务器的不统一,间接全量
            reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
        }
    } else {// 其余略}
}

和本地数据的合并,包含新增、批改、删除。

private void updateDelta(Applications delta) {
    int deltaCount = 0;
    // 遍历所有增量
    for (Application app : delta.getRegisteredApplications()) {
        // 遍历所有实例
        for (InstanceInfo instance : app.getInstances()) {
            // 本地缓存
            Applications applications = getApplications();
            String instanceRegion = instanceRegionChecker.getInstanceRegion(instance);
            // 是否为同一个 region,如果不是,则取同一个 region 的 applications
            if (!instanceRegionChecker.isLocalRegion(instanceRegion)) {Applications remoteApps = remoteRegionVsApps.get(instanceRegion);
                if (null == remoteApps) {remoteApps = new Applications();
                    remoteRegionVsApps.put(instanceRegion, remoteApps);
                }
                applications = remoteApps;
            }

            ++deltaCount;
            if (ActionType.ADDED.equals(instance.getActionType())) {
                // 新增解决
                Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                if (existingApp == null) {applications.addApplication(app);
                }
                logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion);
                applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
            } else if (ActionType.MODIFIED.equals(instance.getActionType())) {
                // 批改解决
                Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                if (existingApp == null) {applications.addApplication(app);
                }
                logger.debug("Modified instance {} to the existing apps", instance.getId());
                // 和新增都是调用 addInstance 办法,因为 Application#addInstance 里是先移除再新增,所有批改也能够调用
                applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);

            } else if (ActionType.DELETED.equals(instance.getActionType())) {
                // 删除解决
                Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                if (existingApp != null) {logger.debug("Deleted instance {} to the existing apps", instance.getId());
                    existingApp.removeInstance(instance);
                    if (existingApp.getInstancesAsIsFromEureka().isEmpty()) {applications.removeApplication(existingApp);
                    }
                }
            }
        }
    }
    logger.debug("The total number of instances fetched by the delta processor : {}", deltaCount);
    
    getApplications().setVersion(delta.getVersion());
    getApplications().shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());

    for (Applications applications : remoteRegionVsApps.values()) {applications.setVersion(delta.getVersion());
        applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
    }
}

正文完
 0