共计 5830 个字符,预计需要花费 15 分钟才能阅读完成。
在 Eureka – Client 服务启动咱们看到,注册表获取的中央有两个,一个是 EurekaClient 构造函数,一个是定时器每隔 30 秒去获取。咱们先看看定时器 TimedSupervisorTask 的 run 办法
定时器
这个办法有三个比拟重要的参数,timeoutMillis、delay、maxDelay。比方频率是 30s,那这个 maxDelay 就是 30*10=300s(这个 10 的起源参考上一篇),delay 在这里是每次都翻倍,然而不能比 maxDelay 大。
整个设计思路是,如果调用 30s 超时,那就用 60 秒,如果再超时,就始终翻,然而不能超过 300s。如果在前面的调用中失常了,那 delay 就复原到 30s,下次超时持续翻倍。
@Override
public void run() {
Future<?> future = null;
try {
// 执行工作
future = executor.submit(task);
threadPoolLevelGauge.set((long) executor.getActiveCount());
// 指定超时工夫
future.get(timeoutMillis, TimeUnit.MILLISECONDS); // block until done or timeout
// 设置 delay
delay.set(timeoutMillis);
threadPoolLevelGauge.set((long) executor.getActiveCount());
successCounter.increment();} catch (TimeoutException e) {logger.warn("task supervisor timed out", e);
timeoutCounter.increment();
// 获取 delay
long currentDelay = delay.get();
// maxDelay 和 currentDelay 的 2 倍中取最小值
long newDelay = Math.min(maxDelay, currentDelay * 2);
// 设置为下面最小值
delay.compareAndSet(currentDelay, newDelay);
} catch (RejectedExecutionException e) {
// 其余的略
rejectedCounter.increment();} catch (Throwable e) {
// 其余的略
throwableCounter.increment();} finally {if (future != null) {future.cancel(true);
}
// 把工作放入定时器,工夫是 delay
if (!scheduler.isShutdown()) {scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
}
}
}
服务发现
判断增量获取还是全量获取
private boolean fetchRegistry(boolean forceFullRegistryFetch) {Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {Applications applications = getApplications();
// 禁用增量、强制全量更新、没有注册信息的时候,进行全量更新
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
// 其余日志打印略
getAndStoreFullRegistry();} else {
// 增量更新
getAndUpdateDelta(applications);
}
// 计算 hash
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();} catch (Throwable e) {
// 其余日志打印略
return false;
} finally {if (tracer != null) {tracer.stop();
}
}
// 其余略
return true;
}
全量获取
全量获取,调用注册核心地址 +apps/ 获取。
private void getAndStoreFullRegistry() throws Throwable {
// 其余略
Applications apps = null;
// 调用注册核心地址 +apps/ 获取
EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {apps = httpResponse.getEntity();
}
// 其余略
}
增量获取
增量更新,获取后,通过 hash 要判断是否和服务器统一,不统一就全量
private void getAndUpdateDelta(Applications applications) throws Throwable {long currentUpdateGeneration = fetchRegistryGeneration.get();
Applications delta = null;
// 调用注册核心地址 +apps/delta 获取
EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {delta = httpResponse.getEntity();
}
if (delta == null) {
// 获取不到数据,全力更新
getAndStoreFullRegistry();} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
String reconcileHashCode = "";
// 加锁
if (fetchRegistryUpdateLock.tryLock()) {
try {
// 和本地的合并
updateDelta(delta);
// 计算 hash
reconcileHashCode = getReconcileHashCode(applications);
} finally {
// 解锁
fetchRegistryUpdateLock.unlock();}
} else {logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
}
// There is a diff in number of instances for some reason
// 和服务器的 hash 比照
if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
// 不统一阐明和服务器的不统一,间接全量
reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall
}
} else {// 其余略}
}
和本地数据的合并,包含新增、批改、删除。
private void updateDelta(Applications delta) {
int deltaCount = 0;
// 遍历所有增量
for (Application app : delta.getRegisteredApplications()) {
// 遍历所有实例
for (InstanceInfo instance : app.getInstances()) {
// 本地缓存
Applications applications = getApplications();
String instanceRegion = instanceRegionChecker.getInstanceRegion(instance);
// 是否为同一个 region,如果不是,则取同一个 region 的 applications
if (!instanceRegionChecker.isLocalRegion(instanceRegion)) {Applications remoteApps = remoteRegionVsApps.get(instanceRegion);
if (null == remoteApps) {remoteApps = new Applications();
remoteRegionVsApps.put(instanceRegion, remoteApps);
}
applications = remoteApps;
}
++deltaCount;
if (ActionType.ADDED.equals(instance.getActionType())) {
// 新增解决
Application existingApp = applications.getRegisteredApplications(instance.getAppName());
if (existingApp == null) {applications.addApplication(app);
}
logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion);
applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
} else if (ActionType.MODIFIED.equals(instance.getActionType())) {
// 批改解决
Application existingApp = applications.getRegisteredApplications(instance.getAppName());
if (existingApp == null) {applications.addApplication(app);
}
logger.debug("Modified instance {} to the existing apps", instance.getId());
// 和新增都是调用 addInstance 办法,因为 Application#addInstance 里是先移除再新增,所有批改也能够调用
applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
} else if (ActionType.DELETED.equals(instance.getActionType())) {
// 删除解决
Application existingApp = applications.getRegisteredApplications(instance.getAppName());
if (existingApp != null) {logger.debug("Deleted instance {} to the existing apps", instance.getId());
existingApp.removeInstance(instance);
if (existingApp.getInstancesAsIsFromEureka().isEmpty()) {applications.removeApplication(existingApp);
}
}
}
}
}
logger.debug("The total number of instances fetched by the delta processor : {}", deltaCount);
getApplications().setVersion(delta.getVersion());
getApplications().shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
for (Applications applications : remoteRegionVsApps.values()) {applications.setVersion(delta.getVersion());
applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
}
}
正文完