启动
eureka-client
启动后,会向eureka-server
注册,同时会定时续约 (renew);为了提升性能,eureka-client
启用了本地缓存,缓存存在localRegionApps
里,定时更新缓存。eureka-client
的核心类是DiscoveryClient
。
@InjectDiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider) { if (args != null) { this.healthCheckHandlerProvider = args.healthCheckHandlerProvider; this.healthCheckCallbackProvider = args.healthCheckCallbackProvider; this.eventListeners.addAll(args.getEventListeners()); this.preRegistrationHandler = args.preRegistrationHandler; } else { this.healthCheckCallbackProvider = null; this.healthCheckHandlerProvider = null; this.preRegistrationHandler = null; } this.applicationInfoManager = applicationInfoManager; InstanceInfo myInfo = applicationInfoManager.getInfo(); clientConfig = config; staticClientConfig = clientConfig; transportConfig = config.getTransportConfig(); instanceInfo = myInfo; if (myInfo != null) { appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId(); } else { logger.warn("Setting instanceInfo to a passed in null value"); } this.backupRegistryProvider = backupRegistryProvider; this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo); localRegionApps.set(new Applications()); fetchRegistryGeneration = new AtomicLong(0); remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions()); remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(",")); if (config.shouldFetchRegistry()) { this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L}); } else { this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; } if (config.shouldRegisterWithEureka()) { this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L}); } else { this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; } logger.info("Initializing Eureka in region {}", clientConfig.getRegion()); if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) { logger.info("Client configured to neither register nor query for data."); scheduler = null; heartbeatExecutor = null; cacheRefreshExecutor = null; eurekaTransport = null; instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion()); // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance() // to work with DI'd DiscoveryClient DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); initTimestampMs = System.currentTimeMillis(); logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", initTimestampMs, this.getApplications().size()); return; // no need to setup up an network tasks and we are done } try { // default size of 2 - 1 each for heartbeat and cacheRefresh scheduler = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-%d") .setDaemon(true) .build()); heartbeatExecutor = new ThreadPoolExecutor( 1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d") .setDaemon(true) .build() ); // use direct handoff cacheRefreshExecutor = new ThreadPoolExecutor( 1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d") .setDaemon(true) .build() ); // use direct handoff eurekaTransport = new EurekaTransport(); scheduleServerEndpointTask(eurekaTransport, args); AzToRegionMapper azToRegionMapper; if (clientConfig.shouldUseDnsForFetchingServiceUrls()) { azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig); } else { azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig); } if (null != remoteRegionsToFetch.get()) { azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(",")); } instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion()); } catch (Throwable e) { throw new RuntimeException("Failed to initialize DiscoveryClient!", e); } if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) { fetchRegistryFromBackup(); } // call and execute the pre registration handler before all background tasks (inc registration) is started if (this.preRegistrationHandler != null) { this.preRegistrationHandler.beforeRegistration(); } // 这里clientConfig.shouldEnforceRegistrationAtInit()默认为false if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) { try { if (!register() ) { throw new IllegalStateException("Registration error at startup. Invalid server response."); } } catch (Throwable th) { logger.error("Registration error at startup: {}", th.getMessage()); throw new IllegalStateException(th); } } // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch initScheduledTasks(); try { Monitors.registerObject(this); } catch (Throwable e) { logger.warn("Cannot register timers", e); } // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance() // to work with DI'd DiscoveryClient DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); initTimestampMs = System.currentTimeMillis(); logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", initTimestampMs, this.getApplications().size());}
DiscoveryClient
初始化时,在本地缓存放入了空列表,创建了定时任务启动器和线程池,启动定时任务;由于clientConfig.shouldEnforceRegistrationAtInit()
默认为false
,所以在初始化时并不会执行注册方法register()
。
/** * Initializes all scheduled tasks. */private void initScheduledTasks() { if (clientConfig.shouldFetchRegistry()) { // registry cache refresh timer int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); scheduler.schedule( new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread() ), registryFetchIntervalSeconds, TimeUnit.SECONDS); } if (clientConfig.shouldRegisterWithEureka()) { int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs); // Heartbeat timer scheduler.schedule( new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ), renewalIntervalInSecs, TimeUnit.SECONDS); // InstanceInfo replicator instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; } @Override public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { // log at warn level if DOWN was involved logger.warn("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", statusChangeEvent); } instanceInfoReplicator.onDemandUpdate(); } }; if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); } instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration"); }}/** * Register with the eureka service by making the appropriate REST call. */boolean register() throws Throwable { logger.info(PREFIX + "{}: registering service...", appPathIdentifier); EurekaHttpResponse<Void> httpResponse; try { httpResponse = eurekaTransport.registrationClient.register(instanceInfo); } catch (Exception e) { logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e); throw e; } if (logger.isInfoEnabled()) { logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode()); } return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();}
在初始化定时任务方法中,启动定时任务刷新缓存和执行心跳。
主要是三个定时任务:刷新本地缓存cacheRefresh
、心跳heartbeat
、定时上报服务信息InstanceInfoReplicator
,再加上一个状态监听StatusChangeListener
。其中cacheRefresh
和heartbeat
共用一个scheduler
,并且都套了一层TimedSupervisorTask
,默认每30秒
刷新一次缓存。如果超时重新设置间隔,新的间隔最大不能超过默认值10倍;每10秒
一次心跳,如果超时重新设置间隔,则新的间隔,最大不能超过默认值10倍。这4个值都是可以通过配置来改变。
public class TimedSupervisorTask extends TimerTask { private static final Logger logger = LoggerFactory.getLogger(TimedSupervisorTask.class); private final Counter successCounter; private final Counter timeoutCounter; private final Counter rejectedCounter; private final Counter throwableCounter; private final LongGauge threadPoolLevelGauge; private final ScheduledExecutorService scheduler; private final ThreadPoolExecutor executor; private final long timeoutMillis; private final Runnable task; private final AtomicLong delay; private final long maxDelay; public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor, int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) { this.scheduler = scheduler; this.executor = executor; this.timeoutMillis = timeUnit.toMillis(timeout); this.task = task; this.delay = new AtomicLong(timeoutMillis); this.maxDelay = timeoutMillis * expBackOffBound; // Initialize the counters and register. successCounter = Monitors.newCounter("success"); timeoutCounter = Monitors.newCounter("timeouts"); rejectedCounter = Monitors.newCounter("rejectedExecutions"); throwableCounter = Monitors.newCounter("throwables"); threadPoolLevelGauge = new LongGauge(MonitorConfig.builder("threadPoolUsed").build()); Monitors.registerObject(name, this); } @Override public void run() { Future<?> future = null; try { future = executor.submit(task); threadPoolLevelGauge.set((long) executor.getActiveCount()); future.get(timeoutMillis, TimeUnit.MILLISECONDS); // block until done or timeout delay.set(timeoutMillis); threadPoolLevelGauge.set((long) executor.getActiveCount()); successCounter.increment(); } catch (TimeoutException e) { logger.warn("task supervisor timed out", e); timeoutCounter.increment(); long currentDelay = delay.get(); long newDelay = Math.min(maxDelay, currentDelay * 2); delay.compareAndSet(currentDelay, newDelay); } catch (RejectedExecutionException e) { if (executor.isShutdown() || scheduler.isShutdown()) { logger.warn("task supervisor shutting down, reject the task", e); } else { logger.warn("task supervisor rejected the task", e); } rejectedCounter.increment(); } catch (Throwable e) { if (executor.isShutdown() || scheduler.isShutdown()) { logger.warn("task supervisor shutting down, can't accept the task"); } else { logger.warn("task supervisor threw an exception", e); } throwableCounter.increment(); } finally { if (future != null) { future.cancel(true); } if (!scheduler.isShutdown()) { scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS); } } }}
TimedSupervisorTask
通过通过线程池+Future
方式防止任务超时,如果任务超时,则将下个任务的间隔时间延长一倍,如果没有超时则恢复成最初的间隔。
前面有提到,DiscoveryClient
初始化时默认不会调用register
方法来注册信息,那什么时候会注册?通过源码可以看到有几个地方会调用register
方法。
一、 在前面提到的初始化定时任务里,有个上报节点信息的定时任务,如果发现当前节点状态Dirty
,就会调用注册方法,而InstanceInfoReplicator
在刚启动时,会将Dirty
设为true
,就是为了服务刚启动时,向eureka-server
注册信息。
class InstanceInfoReplicator implements Runnable { public void start(int initialDelayMs) { if (started.compareAndSet(false, true)) { instanceInfo.setIsDirty(); // for initial register Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } } public void run() { try { discoveryClient.refreshInstanceInfo(); Long dirtyTimestamp = instanceInfo.isDirtyWithTime(); if (dirtyTimestamp != null) { discoveryClient.register(); instanceInfo.unsetIsDirty(dirtyTimestamp); } } catch (Throwable t) { logger.warn("There was a problem with the instance info replicator", t); } finally { Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } }}
二、 在心跳/续约时(renew),如果eureka-server
返回服务不存在,也会重新注册服务。
public class DiscoveryClient implements EurekaClient { /** * The heartbeat task that renews the lease in the given intervals. */ private class HeartbeatThread implements Runnable { public void run() { if (renew()) { lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis(); } } } boolean renew() { EurekaHttpResponse<InstanceInfo> httpResponse; try { httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null); logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode()); if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) { REREGISTER_COUNTER.increment(); logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName()); long timestamp = instanceInfo.setIsDirtyWithTime(); boolean success = register(); if (success) { instanceInfo.unsetIsDirty(timestamp); } return success; } return httpResponse.getStatusCode() == Status.OK.getStatusCode(); } catch (Throwable e) { logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e); return false; } }}
了解了服务注册之后,再了解下本地缓存原理,缓存刷新线程CacheRefreshThread
是调用DiscoveryClient
的refreshRegistry
方法。
class CacheRefreshThread implements Runnable { public void run() { refreshRegistry(); }}@VisibleForTestingvoid refreshRegistry() { try { boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries(); boolean remoteRegionsModified = false; // This makes sure that a dynamic change to remote regions to fetch is honored. String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions(); if (null != latestRemoteRegions) { String currentRemoteRegions = remoteRegionsToFetch.get(); if (!latestRemoteRegions.equals(currentRemoteRegions)) { // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync synchronized (instanceRegionChecker.getAzToRegionMapper()) { if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) { String[] remoteRegions = latestRemoteRegions.split(","); remoteRegionsRef.set(remoteRegions); instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions); remoteRegionsModified = true; } else { logger.info("Remote regions to fetch modified concurrently," + " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions); } } } else { // Just refresh mapping to reflect any DNS/Property change instanceRegionChecker.getAzToRegionMapper().refreshMapping(); } } boolean success = fetchRegistry(remoteRegionsModified); if (success) { registrySize = localRegionApps.get().size(); lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis(); } if (logger.isDebugEnabled()) { StringBuilder allAppsHashCodes = new StringBuilder(); allAppsHashCodes.append("Local region apps hashcode: "); allAppsHashCodes.append(localRegionApps.get().getAppsHashCode()); allAppsHashCodes.append(", is fetching remote regions? "); allAppsHashCodes.append(isFetchingRemoteRegionRegistries); for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) { allAppsHashCodes.append(", Remote region: "); allAppsHashCodes.append(entry.getKey()); allAppsHashCodes.append(" , apps hashcode: "); allAppsHashCodes.append(entry.getValue().getAppsHashCode()); } logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ", allAppsHashCodes); } } catch (Throwable e) { logger.error("Cannot fetch registry from server", e); }}private boolean fetchRegistry(boolean forceFullRegistryFetch) { Stopwatch tracer = FETCH_REGISTRY_TIMER.start(); try { // If the delta is disabled or if it is the first time, get all // applications Applications applications = getApplications(); if (clientConfig.shouldDisableDelta() || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) || forceFullRegistryFetch || (applications == null) || (applications.getRegisteredApplications().size() == 0) || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta { logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta()); logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress()); logger.info("Force full registry fetch : {}", forceFullRegistryFetch); logger.info("Application is null : {}", (applications == null)); logger.info("Registered Applications size is zero : {}", (applications.getRegisteredApplications().size() == 0)); logger.info("Application version is -1: {}", (applications.getVersion() == -1)); getAndStoreFullRegistry(); } else { getAndUpdateDelta(applications); } applications.setAppsHashCode(applications.getReconcileHashCode()); logTotalInstances(); } catch (Throwable e) { logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e); return false; } finally { if (tracer != null) { tracer.stop(); } } // Notify about cache refresh before updating the instance remote status onCacheRefreshed(); // Update remote status based on refreshed data held in the cache updateInstanceRemoteStatus(); // registry was fetched successfully, so return true return true;}
从fetchRegistry
方法可以看到,如果配置了禁用增量刷新/开启强制全量刷新/当前没有数据/当前没有版本号
时,会全量刷新,否则是增量刷新。
private void getAndStoreFullRegistry() throws Throwable { long currentUpdateGeneration = fetchRegistryGeneration.get(); logger.info("Getting all instance registry info from the eureka server"); Applications apps = null; EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { apps = httpResponse.getEntity(); } logger.info("The response status is {}", httpResponse.getStatusCode()); if (apps == null) { logger.error("The application is null for some reason. Not storing this information"); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { localRegionApps.set(this.filterAndShuffle(apps)); logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode()); } else { logger.warn("Not updating applications as another thread is updating it already"); }}
全量刷新就是去eureka-server
获取全部信息,并且覆盖掉本地缓存localRegionApps
已有的值。
private void getAndUpdateDelta(Applications applications) throws Throwable { long currentUpdateGeneration = fetchRegistryGeneration.get(); Applications delta = null; EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { delta = httpResponse.getEntity(); } if (delta == null) { logger.warn("The server does not allow the delta revision to be applied because it is not safe. " + "Hence got the full registry."); getAndStoreFullRegistry(); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode()); String reconcileHashCode = ""; if (fetchRegistryUpdateLock.tryLock()) { try { updateDelta(delta); reconcileHashCode = getReconcileHashCode(applications); } finally { fetchRegistryUpdateLock.unlock(); } } else { logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta"); } // There is a diff in number of instances for some reason if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) { reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall } } else { logger.warn("Not updating application delta as another thread is updating it already"); logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode()); }}
增量更新是调用apps/delta
接口获取最新更新的数据(详细怎么更新在eureka-server
篇),并根据InstanceInfo
的ActionType
属性来做增量修改。appsHashCode
是数据一致性哈希码,eureka
会将本地数据(client)计算的哈希码和接口(server)返回的哈希码做比较,如果比较结果不一致,就重新全量获取数据reconcileAndLogDifference()
。
private void reconcileAndLogDifference(Applications delta, String reconcileHashCode) throws Throwable { logger.debug("The Reconcile hashcodes do not match, client : {}, server : {}. Getting the full registry", reconcileHashCode, delta.getAppsHashCode()); RECONCILE_HASH_CODES_MISMATCH.increment(); long currentUpdateGeneration = fetchRegistryGeneration.get(); EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get()); Applications serverApps = httpResponse.getEntity(); if (serverApps == null) { logger.warn("Cannot fetch full registry from the server; reconciliation failure"); return; } if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { localRegionApps.set(this.filterAndShuffle(serverApps)); getApplications().setVersion(delta.getVersion()); logger.debug( "The Reconcile hashcodes after complete sync up, client : {}, server : {}.", getApplications().getReconcileHashCode(), delta.getAppsHashCode()); } else { logger.warn("Not setting the applications map as another thread has advanced the update generation"); }}
appsHashCode
是数据一致性哈希码的规则是:“状态1_状态1对应的服务数量_状态2_状态2对应的服务数量_...”。
public class Applications { private static final String STATUS_DELIMITER = "_"; @JsonIgnore public String getReconcileHashCode() { TreeMap<String, AtomicInteger> instanceCountMap = new TreeMap<String, AtomicInteger>(); populateInstanceCountMap(instanceCountMap); return getReconcileHashCode(instanceCountMap); } public void populateInstanceCountMap(Map<String, AtomicInteger> instanceCountMap) { for (Application app : this.getRegisteredApplications()) { for (InstanceInfo info : app.getInstancesAsIsFromEureka()) { AtomicInteger instanceCount = instanceCountMap.computeIfAbsent(info.getStatus().name(), k -> new AtomicInteger(0)); instanceCount.incrementAndGet(); } } } public static String getReconcileHashCode(Map<String, AtomicInteger> instanceCountMap) { StringBuilder reconcileHashCode = new StringBuilder(75); for (Map.Entry<String, AtomicInteger> mapEntry : instanceCountMap.entrySet()) { reconcileHashCode.append(mapEntry.getKey()).append(STATUS_DELIMITER).append(mapEntry.getValue().get()) .append(STATUS_DELIMITER); } return reconcileHashCode.toString(); }}
注销
public class DiscoveryClient implements EurekaClient { @PreDestroy @Override public synchronized void shutdown() { if (isShutdown.compareAndSet(false, true)) { logger.info("Shutting down DiscoveryClient ..."); if (statusChangeListener != null && applicationInfoManager != null) { applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId()); } cancelScheduledTasks(); // If APPINFO was registered if (applicationInfoManager != null && clientConfig.shouldRegisterWithEureka() && clientConfig.shouldUnregisterOnShutdown()) { applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN); unregister(); } if (eurekaTransport != null) { eurekaTransport.shutdown(); } heartbeatStalenessMonitor.shutdown(); registryStalenessMonitor.shutdown(); logger.info("Completed shut down of DiscoveryClient"); } } private void cancelScheduledTasks() { if (instanceInfoReplicator != null) { instanceInfoReplicator.stop(); } if (heartbeatExecutor != null) { heartbeatExecutor.shutdownNow(); } if (cacheRefreshExecutor != null) { cacheRefreshExecutor.shutdownNow(); } if (scheduler != null) { scheduler.shutdownNow(); } } /** * unregister w/ the eureka service. */ void unregister() { // It can be null if shouldRegisterWithEureka == false if(eurekaTransport != null && eurekaTransport.registrationClient != null) { try { logger.info("Unregistering ..."); EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId()); logger.info(PREFIX + "{} - deregister status: {}", appPathIdentifier, httpResponse.getStatusCode()); } catch (Exception e) { logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e); } } } private static final class EurekaTransport { private ClosableResolver bootstrapResolver; private TransportClientFactory transportClientFactory; private EurekaHttpClient registrationClient; private EurekaHttpClientFactory registrationClientFactory; private EurekaHttpClient queryClient; private EurekaHttpClientFactory queryClientFactory; void shutdown() { if (registrationClientFactory != null) { registrationClientFactory.shutdown(); } if (queryClientFactory != null) { queryClientFactory.shutdown(); } if (registrationClient != null) { registrationClient.shutdown(); } if (queryClient != null) { queryClient.shutdown(); } if (transportClientFactory != null) { transportClientFactory.shutdown(); } if (bootstrapResolver != null) { bootstrapResolver.shutdown(); } } } }
从代码里可以看到,注销步骤很简答:
- 停止几个定时任务。
- 调用
cancel
接口向eureka-server
发送注销请求。 - 停止一些
client
和monitor
。
状态变更
最后我们再来看下eureka-client
的状态变更。InstanceInfo
刚创建的时候,状态是InstanceStatus.STARTING
。
public class ApplicationInfoManager { public void initComponent(EurekaInstanceConfig config) { try { this.config = config; this.instanceInfo = new EurekaConfigBasedInstanceInfoProvider(config).get(); } catch (Throwable e) { throw new RuntimeException("Failed to initialize ApplicationInfoManager", e); } }}
之后会变成InstanceStatus.UP
(以下代码是springcloud-eureka-client)。
public class EurekaServiceRegistry implements ServiceRegistry<EurekaRegistration> { @Override public void register(EurekaRegistration reg) { maybeInitializeClient(reg); if (log.isInfoEnabled()) { log.info("Registering application " + reg.getApplicationInfoManager().getInfo().getAppName() + " with eureka with status " + reg.getInstanceConfig().getInitialStatus()); } reg.getApplicationInfoManager() .setInstanceStatus(reg.getInstanceConfig().getInitialStatus()); reg.getHealthCheckHandler().ifAvailable(healthCheckHandler -> reg.getEurekaClient().registerHealthCheck(healthCheckHandler)); }}
服务停止时会从InstanceStatus.UP
变成InstanceStatus.DOWN
(以下代码是springcloud-eureka-client)。
public class EurekaServiceRegistry implements ServiceRegistry<EurekaRegistration> { @Override public void deregister(EurekaRegistration reg) { if (reg.getApplicationInfoManager().getInfo() != null) { if (log.isInfoEnabled()) { log.info("Unregistering application " + reg.getApplicationInfoManager().getInfo().getAppName() + " with eureka with status DOWN"); } reg.getApplicationInfoManager().setInstanceStatus(InstanceInfo.InstanceStatus.DOWN); //shutdown of eureka client should happen with EurekaRegistration.close() //auto registration will create a bean which will be properly disposed //manual registrations will need to call close() } }}
流程图
定时任务简易流程: