启动

eureka-client启动后,会向eureka-server注册,同时会定时续约 (renew);为了提升性能,eureka-client启用了本地缓存,缓存存在localRegionApps里,定时更新缓存。
eureka-client的核心类是DiscoveryClient

@InjectDiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,                Provider<BackupRegistry> backupRegistryProvider) {    if (args != null) {        this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;        this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;        this.eventListeners.addAll(args.getEventListeners());        this.preRegistrationHandler = args.preRegistrationHandler;    } else {        this.healthCheckCallbackProvider = null;        this.healthCheckHandlerProvider = null;        this.preRegistrationHandler = null;    }      this.applicationInfoManager = applicationInfoManager;    InstanceInfo myInfo = applicationInfoManager.getInfo();    clientConfig = config;    staticClientConfig = clientConfig;    transportConfig = config.getTransportConfig();    instanceInfo = myInfo;    if (myInfo != null) {        appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();    } else {        logger.warn("Setting instanceInfo to a passed in null value");    }    this.backupRegistryProvider = backupRegistryProvider;    this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);    localRegionApps.set(new Applications());    fetchRegistryGeneration = new AtomicLong(0);    remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());    remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));    if (config.shouldFetchRegistry()) {        this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});    } else {        this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;    }    if (config.shouldRegisterWithEureka()) {        this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});    } else {        this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;    }    logger.info("Initializing Eureka in region {}", clientConfig.getRegion());    if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {        logger.info("Client configured to neither register nor query for data.");        scheduler = null;        heartbeatExecutor = null;        cacheRefreshExecutor = null;        eurekaTransport = null;        instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());        // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()        // to work with DI'd DiscoveryClient        DiscoveryManager.getInstance().setDiscoveryClient(this);        DiscoveryManager.getInstance().setEurekaClientConfig(config);        initTimestampMs = System.currentTimeMillis();        logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",                initTimestampMs, this.getApplications().size());        return;  // no need to setup up an network tasks and we are done    }    try {        // default size of 2 - 1 each for heartbeat and cacheRefresh        scheduler = Executors.newScheduledThreadPool(2,                new ThreadFactoryBuilder()                        .setNameFormat("DiscoveryClient-%d")                        .setDaemon(true)                        .build());        heartbeatExecutor = new ThreadPoolExecutor(                1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,                new SynchronousQueue<Runnable>(),                new ThreadFactoryBuilder()                        .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")                        .setDaemon(true)                        .build()        );  // use direct handoff        cacheRefreshExecutor = new ThreadPoolExecutor(                1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,                new SynchronousQueue<Runnable>(),                new ThreadFactoryBuilder()                        .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")                        .setDaemon(true)                        .build()        );  // use direct handoff        eurekaTransport = new EurekaTransport();        scheduleServerEndpointTask(eurekaTransport, args);        AzToRegionMapper azToRegionMapper;        if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {            azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);        } else {            azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);        }        if (null != remoteRegionsToFetch.get()) {            azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));        }        instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());    } catch (Throwable e) {        throw new RuntimeException("Failed to initialize DiscoveryClient!", e);    }    if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {        fetchRegistryFromBackup();    }    // call and execute the pre registration handler before all background tasks (inc registration) is started    if (this.preRegistrationHandler != null) {        this.preRegistrationHandler.beforeRegistration();    }    // 这里clientConfig.shouldEnforceRegistrationAtInit()默认为false    if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {        try {            if (!register() ) {                throw new IllegalStateException("Registration error at startup. Invalid server response.");            }        } catch (Throwable th) {            logger.error("Registration error at startup: {}", th.getMessage());            throw new IllegalStateException(th);        }    }    // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch    initScheduledTasks();    try {        Monitors.registerObject(this);    } catch (Throwable e) {        logger.warn("Cannot register timers", e);    }    // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()    // to work with DI'd DiscoveryClient    DiscoveryManager.getInstance().setDiscoveryClient(this);    DiscoveryManager.getInstance().setEurekaClientConfig(config);    initTimestampMs = System.currentTimeMillis();    logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",            initTimestampMs, this.getApplications().size());}

DiscoveryClient初始化时,在本地缓存放入了空列表,创建了定时任务启动器和线程池,启动定时任务;由于clientConfig.shouldEnforceRegistrationAtInit()默认为false,所以在初始化时并不会执行注册方法register()

/** * Initializes all scheduled tasks. */private void initScheduledTasks() {    if (clientConfig.shouldFetchRegistry()) {        // registry cache refresh timer        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();        scheduler.schedule(                new TimedSupervisorTask(                        "cacheRefresh",                        scheduler,                        cacheRefreshExecutor,                        registryFetchIntervalSeconds,                        TimeUnit.SECONDS,                        expBackOffBound,                        new CacheRefreshThread()                ),                registryFetchIntervalSeconds, TimeUnit.SECONDS);    }    if (clientConfig.shouldRegisterWithEureka()) {        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();        logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);        // Heartbeat timer        scheduler.schedule(                new TimedSupervisorTask(                        "heartbeat",                        scheduler,                        heartbeatExecutor,                        renewalIntervalInSecs,                        TimeUnit.SECONDS,                        expBackOffBound,                        new HeartbeatThread()                ),                renewalIntervalInSecs, TimeUnit.SECONDS);        // InstanceInfo replicator        instanceInfoReplicator = new InstanceInfoReplicator(                this,                instanceInfo,                clientConfig.getInstanceInfoReplicationIntervalSeconds(),                2); // burstSize        statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {            @Override            public String getId() {                return "statusChangeListener";            }            @Override            public void notify(StatusChangeEvent statusChangeEvent) {                if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||                        InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {                    // log at warn level if DOWN was involved                    logger.warn("Saw local status change event {}", statusChangeEvent);                } else {                    logger.info("Saw local status change event {}", statusChangeEvent);                }                instanceInfoReplicator.onDemandUpdate();            }        };        if (clientConfig.shouldOnDemandUpdateStatusChange()) {            applicationInfoManager.registerStatusChangeListener(statusChangeListener);        }        instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());    } else {        logger.info("Not registering with Eureka server per configuration");    }}/** * Register with the eureka service by making the appropriate REST call. */boolean register() throws Throwable {    logger.info(PREFIX + "{}: registering service...", appPathIdentifier);    EurekaHttpResponse<Void> httpResponse;    try {        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);    } catch (Exception e) {        logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);        throw e;    }    if (logger.isInfoEnabled()) {        logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());    }    return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();}

在初始化定时任务方法中,启动定时任务刷新缓存和执行心跳。
主要是三个定时任务:刷新本地缓存cacheRefresh、心跳heartbeat、定时上报服务信息InstanceInfoReplicator,再加上一个状态监听StatusChangeListener。其中cacheRefreshheartbeat共用一个scheduler,并且都套了一层TimedSupervisorTask,默认每30秒刷新一次缓存。如果超时重新设置间隔,新的间隔最大不能超过默认值10倍;每10秒一次心跳,如果超时重新设置间隔,则新的间隔,最大不能超过默认值10倍。这4个值都是可以通过配置来改变。

public class TimedSupervisorTask extends TimerTask {    private static final Logger logger = LoggerFactory.getLogger(TimedSupervisorTask.class);    private final Counter successCounter;    private final Counter timeoutCounter;    private final Counter rejectedCounter;    private final Counter throwableCounter;    private final LongGauge threadPoolLevelGauge;    private final ScheduledExecutorService scheduler;    private final ThreadPoolExecutor executor;    private final long timeoutMillis;    private final Runnable task;    private final AtomicLong delay;    private final long maxDelay;    public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,                               int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {        this.scheduler = scheduler;        this.executor = executor;        this.timeoutMillis = timeUnit.toMillis(timeout);        this.task = task;        this.delay = new AtomicLong(timeoutMillis);        this.maxDelay = timeoutMillis * expBackOffBound;        // Initialize the counters and register.        successCounter = Monitors.newCounter("success");        timeoutCounter = Monitors.newCounter("timeouts");        rejectedCounter = Monitors.newCounter("rejectedExecutions");        throwableCounter = Monitors.newCounter("throwables");        threadPoolLevelGauge = new LongGauge(MonitorConfig.builder("threadPoolUsed").build());        Monitors.registerObject(name, this);    }    @Override    public void run() {        Future<?> future = null;        try {            future = executor.submit(task);            threadPoolLevelGauge.set((long) executor.getActiveCount());            future.get(timeoutMillis, TimeUnit.MILLISECONDS);  // block until done or timeout            delay.set(timeoutMillis);            threadPoolLevelGauge.set((long) executor.getActiveCount());            successCounter.increment();        } catch (TimeoutException e) {            logger.warn("task supervisor timed out", e);            timeoutCounter.increment();            long currentDelay = delay.get();            long newDelay = Math.min(maxDelay, currentDelay * 2);            delay.compareAndSet(currentDelay, newDelay);        } catch (RejectedExecutionException e) {            if (executor.isShutdown() || scheduler.isShutdown()) {                logger.warn("task supervisor shutting down, reject the task", e);            } else {                logger.warn("task supervisor rejected the task", e);            }            rejectedCounter.increment();        } catch (Throwable e) {            if (executor.isShutdown() || scheduler.isShutdown()) {                logger.warn("task supervisor shutting down, can't accept the task");            } else {                logger.warn("task supervisor threw an exception", e);            }            throwableCounter.increment();        } finally {            if (future != null) {                future.cancel(true);            }            if (!scheduler.isShutdown()) {                scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);            }        }    }}

TimedSupervisorTask通过通过线程池+Future方式防止任务超时,如果任务超时,则将下个任务的间隔时间延长一倍,如果没有超时则恢复成最初的间隔。
前面有提到,DiscoveryClient初始化时默认不会调用register方法来注册信息,那什么时候会注册?通过源码可以看到有几个地方会调用register方法。
一、 在前面提到的初始化定时任务里,有个上报节点信息的定时任务,如果发现当前节点状态Dirty,就会调用注册方法,而InstanceInfoReplicator在刚启动时,会将Dirty设为true,就是为了服务刚启动时,向eureka-server注册信息。

class InstanceInfoReplicator implements Runnable {    public void start(int initialDelayMs) {        if (started.compareAndSet(false, true)) {            instanceInfo.setIsDirty(); // for initial register            Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);            scheduledPeriodicRef.set(next);        }    }        public void run() {        try {            discoveryClient.refreshInstanceInfo();            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();            if (dirtyTimestamp != null) {                discoveryClient.register();                instanceInfo.unsetIsDirty(dirtyTimestamp);            }        } catch (Throwable t) {            logger.warn("There was a problem with the instance info replicator", t);        } finally {            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);            scheduledPeriodicRef.set(next);        }    }}

二、 在心跳/续约时(renew),如果eureka-server返回服务不存在,也会重新注册服务。

public class DiscoveryClient implements EurekaClient {    /**     * The heartbeat task that renews the lease in the given intervals.     */    private class HeartbeatThread implements Runnable {        public void run() {            if (renew()) {                lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();            }        }    }        boolean renew() {        EurekaHttpResponse<InstanceInfo> httpResponse;        try {            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);            logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());            if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {                REREGISTER_COUNTER.increment();                logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());                long timestamp = instanceInfo.setIsDirtyWithTime();                boolean success = register();                if (success) {                    instanceInfo.unsetIsDirty(timestamp);                }                return success;            }            return httpResponse.getStatusCode() == Status.OK.getStatusCode();        } catch (Throwable e) {            logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);            return false;        }    }}    

了解了服务注册之后,再了解下本地缓存原理,缓存刷新线程CacheRefreshThread是调用DiscoveryClientrefreshRegistry方法。

class CacheRefreshThread implements Runnable {    public void run() {        refreshRegistry();    }}@VisibleForTestingvoid refreshRegistry() {    try {        boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();        boolean remoteRegionsModified = false;        // This makes sure that a dynamic change to remote regions to fetch is honored.        String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();        if (null != latestRemoteRegions) {            String currentRemoteRegions = remoteRegionsToFetch.get();            if (!latestRemoteRegions.equals(currentRemoteRegions)) {                // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync                synchronized (instanceRegionChecker.getAzToRegionMapper()) {                    if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {                        String[] remoteRegions = latestRemoteRegions.split(",");                        remoteRegionsRef.set(remoteRegions);                        instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);                        remoteRegionsModified = true;                    } else {                        logger.info("Remote regions to fetch modified concurrently," +                                " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);                    }                }            } else {                // Just refresh mapping to reflect any DNS/Property change                instanceRegionChecker.getAzToRegionMapper().refreshMapping();            }        }        boolean success = fetchRegistry(remoteRegionsModified);        if (success) {            registrySize = localRegionApps.get().size();            lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();        }        if (logger.isDebugEnabled()) {            StringBuilder allAppsHashCodes = new StringBuilder();            allAppsHashCodes.append("Local region apps hashcode: ");            allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());            allAppsHashCodes.append(", is fetching remote regions? ");            allAppsHashCodes.append(isFetchingRemoteRegionRegistries);            for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) {                allAppsHashCodes.append(", Remote region: ");                allAppsHashCodes.append(entry.getKey());                allAppsHashCodes.append(" , apps hashcode: ");                allAppsHashCodes.append(entry.getValue().getAppsHashCode());            }            logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",                    allAppsHashCodes);        }    } catch (Throwable e) {        logger.error("Cannot fetch registry from server", e);    }}private boolean fetchRegistry(boolean forceFullRegistryFetch) {    Stopwatch tracer = FETCH_REGISTRY_TIMER.start();    try {        // If the delta is disabled or if it is the first time, get all        // applications        Applications applications = getApplications();        if (clientConfig.shouldDisableDelta()                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))                || forceFullRegistryFetch                || (applications == null)                || (applications.getRegisteredApplications().size() == 0)                || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta        {            logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());            logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());            logger.info("Force full registry fetch : {}", forceFullRegistryFetch);            logger.info("Application is null : {}", (applications == null));            logger.info("Registered Applications size is zero : {}",                    (applications.getRegisteredApplications().size() == 0));            logger.info("Application version is -1: {}", (applications.getVersion() == -1));            getAndStoreFullRegistry();        } else {            getAndUpdateDelta(applications);        }        applications.setAppsHashCode(applications.getReconcileHashCode());        logTotalInstances();    } catch (Throwable e) {        logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);        return false;    } finally {        if (tracer != null) {            tracer.stop();        }    }    // Notify about cache refresh before updating the instance remote status    onCacheRefreshed();    // Update remote status based on refreshed data held in the cache    updateInstanceRemoteStatus();    // registry was fetched successfully, so return true    return true;}

fetchRegistry方法可以看到,如果配置了禁用增量刷新/开启强制全量刷新/当前没有数据/当前没有版本号时,会全量刷新,否则是增量刷新。

private void getAndStoreFullRegistry() throws Throwable {    long currentUpdateGeneration = fetchRegistryGeneration.get();    logger.info("Getting all instance registry info from the eureka server");    Applications apps = null;    EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null            ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())            : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {        apps = httpResponse.getEntity();    }    logger.info("The response status is {}", httpResponse.getStatusCode());    if (apps == null) {        logger.error("The application is null for some reason. Not storing this information");    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {        localRegionApps.set(this.filterAndShuffle(apps));        logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());    } else {        logger.warn("Not updating applications as another thread is updating it already");    }}

全量刷新就是去eureka-server获取全部信息,并且覆盖掉本地缓存localRegionApps已有的值。

private void getAndUpdateDelta(Applications applications) throws Throwable {    long currentUpdateGeneration = fetchRegistryGeneration.get();    Applications delta = null;    EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {        delta = httpResponse.getEntity();    }    if (delta == null) {        logger.warn("The server does not allow the delta revision to be applied because it is not safe. "                + "Hence got the full registry.");        getAndStoreFullRegistry();    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {        logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());        String reconcileHashCode = "";        if (fetchRegistryUpdateLock.tryLock()) {            try {                updateDelta(delta);                reconcileHashCode = getReconcileHashCode(applications);            } finally {                fetchRegistryUpdateLock.unlock();            }        } else {            logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");        }        // There is a diff in number of instances for some reason        if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {            reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall        }    } else {        logger.warn("Not updating application delta as another thread is updating it already");        logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());    }}

增量更新是调用apps/delta接口获取最新更新的数据(详细怎么更新在eureka-server篇),并根据InstanceInfoActionType属性来做增量修改。
appsHashCode是数据一致性哈希码,eureka会将本地数据(client)计算的哈希码和接口(server)返回的哈希码做比较,如果比较结果不一致,就重新全量获取数据reconcileAndLogDifference()

private void reconcileAndLogDifference(Applications delta, String reconcileHashCode) throws Throwable {    logger.debug("The Reconcile hashcodes do not match, client : {}, server : {}. Getting the full registry",            reconcileHashCode, delta.getAppsHashCode());    RECONCILE_HASH_CODES_MISMATCH.increment();    long currentUpdateGeneration = fetchRegistryGeneration.get();    EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null            ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())            : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());    Applications serverApps = httpResponse.getEntity();    if (serverApps == null) {        logger.warn("Cannot fetch full registry from the server; reconciliation failure");       return;   }    if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {        localRegionApps.set(this.filterAndShuffle(serverApps));        getApplications().setVersion(delta.getVersion());       logger.debug(                "The Reconcile hashcodes after complete sync up, client : {}, server : {}.",                getApplications().getReconcileHashCode(),                delta.getAppsHashCode());    } else {        logger.warn("Not setting the applications map as another thread has advanced the update generation");    }}

appsHashCode是数据一致性哈希码的规则是:“状态1_状态1对应的服务数量_状态2_状态2对应的服务数量_...”。

public class Applications {    private static final String STATUS_DELIMITER = "_";    @JsonIgnore    public String getReconcileHashCode() {        TreeMap<String, AtomicInteger> instanceCountMap = new TreeMap<String, AtomicInteger>();        populateInstanceCountMap(instanceCountMap);        return getReconcileHashCode(instanceCountMap);    }    public void populateInstanceCountMap(Map<String, AtomicInteger> instanceCountMap) {        for (Application app : this.getRegisteredApplications()) {            for (InstanceInfo info : app.getInstancesAsIsFromEureka()) {                AtomicInteger instanceCount = instanceCountMap.computeIfAbsent(info.getStatus().name(),                        k -> new AtomicInteger(0));                instanceCount.incrementAndGet();            }        }    }    public static String getReconcileHashCode(Map<String, AtomicInteger> instanceCountMap) {        StringBuilder reconcileHashCode = new StringBuilder(75);        for (Map.Entry<String, AtomicInteger> mapEntry : instanceCountMap.entrySet()) {            reconcileHashCode.append(mapEntry.getKey()).append(STATUS_DELIMITER).append(mapEntry.getValue().get())                    .append(STATUS_DELIMITER);        }        return reconcileHashCode.toString();    }}

注销

public class DiscoveryClient implements EurekaClient {    @PreDestroy    @Override    public synchronized void shutdown() {        if (isShutdown.compareAndSet(false, true)) {            logger.info("Shutting down DiscoveryClient ...");            if (statusChangeListener != null && applicationInfoManager != null) {                applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());            }            cancelScheduledTasks();            // If APPINFO was registered            if (applicationInfoManager != null                    && clientConfig.shouldRegisterWithEureka()                    && clientConfig.shouldUnregisterOnShutdown()) {                applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);                unregister();            }            if (eurekaTransport != null) {                eurekaTransport.shutdown();            }            heartbeatStalenessMonitor.shutdown();            registryStalenessMonitor.shutdown();            logger.info("Completed shut down of DiscoveryClient");        }    }    private void cancelScheduledTasks() {        if (instanceInfoReplicator != null) {            instanceInfoReplicator.stop();        }        if (heartbeatExecutor != null) {            heartbeatExecutor.shutdownNow();        }        if (cacheRefreshExecutor != null) {            cacheRefreshExecutor.shutdownNow();        }        if (scheduler != null) {            scheduler.shutdownNow();        }    }     /**     * unregister w/ the eureka service.     */    void unregister() {        // It can be null if shouldRegisterWithEureka == false        if(eurekaTransport != null && eurekaTransport.registrationClient != null) {            try {                logger.info("Unregistering ...");                EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());                logger.info(PREFIX + "{} - deregister  status: {}", appPathIdentifier, httpResponse.getStatusCode());            } catch (Exception e) {                logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);            }        }    }    private static final class EurekaTransport {        private ClosableResolver bootstrapResolver;        private TransportClientFactory transportClientFactory;        private EurekaHttpClient registrationClient;        private EurekaHttpClientFactory registrationClientFactory;        private EurekaHttpClient queryClient;        private EurekaHttpClientFactory queryClientFactory;        void shutdown() {            if (registrationClientFactory != null) {                registrationClientFactory.shutdown();            }            if (queryClientFactory != null) {                queryClientFactory.shutdown();            }            if (registrationClient != null) {                registrationClient.shutdown();            }            if (queryClient != null) {                queryClient.shutdown();            }            if (transportClientFactory != null) {                transportClientFactory.shutdown();            }            if (bootstrapResolver != null) {                bootstrapResolver.shutdown();            }        }    }  }    

从代码里可以看到,注销步骤很简答:

  1. 停止几个定时任务。
  2. 调用cancel接口向eureka-server发送注销请求。
  3. 停止一些clientmonitor

状态变更

最后我们再来看下eureka-client的状态变更。
InstanceInfo刚创建的时候,状态是InstanceStatus.STARTING

public class ApplicationInfoManager {    public void initComponent(EurekaInstanceConfig config) {        try {            this.config = config;            this.instanceInfo = new EurekaConfigBasedInstanceInfoProvider(config).get();        } catch (Throwable e) {            throw new RuntimeException("Failed to initialize ApplicationInfoManager", e);        }    }}

之后会变成InstanceStatus.UP(以下代码是springcloud-eureka-client)。

public class EurekaServiceRegistry implements ServiceRegistry<EurekaRegistration> {   @Override   public void register(EurekaRegistration reg) {      maybeInitializeClient(reg);      if (log.isInfoEnabled()) {         log.info("Registering application " + reg.getApplicationInfoManager().getInfo().getAppName()               + " with eureka with status "               + reg.getInstanceConfig().getInitialStatus());      }      reg.getApplicationInfoManager()            .setInstanceStatus(reg.getInstanceConfig().getInitialStatus());      reg.getHealthCheckHandler().ifAvailable(healthCheckHandler ->            reg.getEurekaClient().registerHealthCheck(healthCheckHandler));   }}

服务停止时会从InstanceStatus.UP变成InstanceStatus.DOWN(以下代码是springcloud-eureka-client)。

public class EurekaServiceRegistry implements ServiceRegistry<EurekaRegistration> {    @Override    public void deregister(EurekaRegistration reg) {        if (reg.getApplicationInfoManager().getInfo() != null) {            if (log.isInfoEnabled()) {                log.info("Unregistering application " + reg.getApplicationInfoManager().getInfo().getAppName()                        + " with eureka with status DOWN");            }            reg.getApplicationInfoManager().setInstanceStatus(InstanceInfo.InstanceStatus.DOWN);            //shutdown of eureka client should happen with EurekaRegistration.close()            //auto registration will create a bean which will be properly disposed            //manual registrations will need to call close()        }    }}

流程图

定时任务简易流程: