springcloud 项目优雅重启(二):eureka-client

49次阅读

共计 25309 个字符,预计需要花费 64 分钟才能阅读完成。

启动

eureka-client启动后,会向 eureka-server 注册,同时会定时续约 (renew);为了提升性能,eureka-client启用了本地缓存,缓存存在 localRegionApps 里,定时更新缓存。
eureka-client的核心类是DiscoveryClient

/**
 * Builds the discovery client: stores the collaborators, seeds the local registry cache,
 * creates the scheduler and worker pools, performs the initial registry fetch and finally
 * starts the background tasks.
 *
 * When the configuration says to neither register nor fetch, no executors or transport are
 * created and the constructor returns early.
 *
 * @param applicationInfoManager source of this instance's InstanceInfo
 * @param config                 client configuration consulted throughout initialization
 * @param args                   optional handlers and listeners; may be null
 * @param backupRegistryProvider provider stored for later use as a registry fallback
 */
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider) {if (args != null) {
        this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
        this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
        this.eventListeners.addAll(args.getEventListeners());
        this.preRegistrationHandler = args.preRegistrationHandler;
    } else {
        // Without optional args, every optional handler stays null.
        this.healthCheckCallbackProvider = null;
        this.healthCheckHandlerProvider = null;
        this.preRegistrationHandler = null;
    }
  
    this.applicationInfoManager = applicationInfoManager;
    InstanceInfo myInfo = applicationInfoManager.getInfo();
    clientConfig = config;
    staticClientConfig = clientConfig;
    transportConfig = config.getTransportConfig();
    instanceInfo = myInfo;
    // appPathIdentifier ("<appName>/<id>") is only derived when instance info is available.
    if (myInfo != null) {appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();} else {logger.warn("Setting instanceInfo to a passed in null value");
    }
    this.backupRegistryProvider = backupRegistryProvider;
    this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
    // The local registry cache starts out as an empty Applications object.
    localRegionApps.set(new Applications());
    fetchRegistryGeneration = new AtomicLong(0);
    // Remote regions are kept both as the raw comma-separated string and as the split array.
    remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
    remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
    // Staleness monitors are real metrics only for the features that are enabled; otherwise no-ops.
    if (config.shouldFetchRegistry()) {this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
    } else {this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;}
    if (config.shouldRegisterWithEureka()) {this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
    } else {this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;}
    logger.info("Initializing Eureka in region {}", clientConfig.getRegion());
    // Neither registering nor fetching: leave all network-related fields null and return early.
    if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {logger.info("Client configured to neither register nor query for data.");
        scheduler = null;
        heartbeatExecutor = null;
        cacheRefreshExecutor = null;
        eurekaTransport = null;
        instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());

        // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
        // to work with DI'd DiscoveryClient
        DiscoveryManager.getInstance().setDiscoveryClient(this);
        DiscoveryManager.getInstance().setEurekaClientConfig(config);

        initTimestampMs = System.currentTimeMillis();
        logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                initTimestampMs, this.getApplications().size());
        return;  // no need to setup up an network tasks and we are done
    }
    // Any failure while building the executors, transport or region mapper aborts construction.
    try {
        // default size of 2 - 1 each for heartbeat and cacheRefresh
        scheduler = Executors.newScheduledThreadPool(2,
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-%d")
                        .setDaemon(true)
                        .build());
        heartbeatExecutor = new ThreadPoolExecutor(1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                        .setDaemon(true)
                        .build());  // use direct handoff


        cacheRefreshExecutor = new ThreadPoolExecutor(1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                        .setDaemon(true)
                        .build());  // use direct handoff


        eurekaTransport = new EurekaTransport();
        scheduleServerEndpointTask(eurekaTransport, args);


        // The AZ-to-region mapper is DNS-based or property-based depending on configuration.
        AzToRegionMapper azToRegionMapper;
        if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
        } else {azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
        }
        if (null != remoteRegionsToFetch.get()) {azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
        }
        instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
    } catch (Throwable e) {throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
    }


    // Initial (non-forced) registry fetch; if it fails, fall back to the backup registry.
    if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {fetchRegistryFromBackup();
    }


    // call and execute the pre registration handler before all background tasks (inc registration) is started
    if (this.preRegistrationHandler != null) {this.preRegistrationHandler.beforeRegistration();
    }
    // Article note: clientConfig.shouldEnforceRegistrationAtInit() defaults to false, so this
    // eager registration is normally skipped. When enabled, a failed registration aborts construction.
    if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
        try {if (!register() ) {throw new IllegalStateException("Registration error at startup. Invalid server response.");
            }
        } catch (Throwable th) {logger.error("Registration error at startup: {}", th.getMessage());
            throw new IllegalStateException(th);
        }
    }


    // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
    initScheduledTasks();


    // Monitor registration is best-effort: a failure is logged and otherwise ignored.
    try {Monitors.registerObject(this);
    } catch (Throwable e) {logger.warn("Cannot register timers", e);
    }


    // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
    // to work with DI'd DiscoveryClient
    DiscoveryManager.getInstance().setDiscoveryClient(this);
    DiscoveryManager.getInstance().setEurekaClientConfig(config);


    initTimestampMs = System.currentTimeMillis();
    logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
            initTimestampMs, this.getApplications().size());

}

DiscoveryClient初始化时,在本地缓存放入了空列表,创建了定时任务启动器和线程池,启动定时任务;由于 clientConfig.shouldEnforceRegistrationAtInit() 默认为false,所以在初始化时并不会执行注册方法register()

/**
 * Starts the client's background work: the registry cache refresh timer, the heartbeat
 * timer, the instance-info replicator and (optionally) the local status-change listener.
 *
 * The cache refresh part runs only when registry fetching is enabled; everything else
 * runs only when registration with Eureka is enabled.
 */
private void initScheduledTasks() {
    if (clientConfig.shouldFetchRegistry()) {
        // Registry cache refresh: the fetch interval serves as initial delay and as the
        // supervisor's timeout.
        int fetchIntervalSecs = clientConfig.getRegistryFetchIntervalSeconds();
        int fetchBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        TimedSupervisorTask cacheRefreshTask = new TimedSupervisorTask(
                "cacheRefresh", scheduler, cacheRefreshExecutor,
                fetchIntervalSecs, TimeUnit.SECONDS, fetchBackOffBound,
                new CacheRefreshThread());
        scheduler.schedule(cacheRefreshTask, fetchIntervalSecs, TimeUnit.SECONDS);
    }

    if (!clientConfig.shouldRegisterWithEureka()) {
        logger.info("Not registering with Eureka server per configuration");
        return;
    }

    // Heartbeat: the lease renewal interval serves as initial delay and as the supervisor's timeout.
    int heartbeatIntervalSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
    int heartbeatBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
    logger.info("Starting heartbeat executor:" + "renew interval is: {}", heartbeatIntervalSecs);
    TimedSupervisorTask heartbeatTask = new TimedSupervisorTask(
            "heartbeat", scheduler, heartbeatExecutor,
            heartbeatIntervalSecs, TimeUnit.SECONDS, heartbeatBackOffBound,
            new HeartbeatThread());
    scheduler.schedule(heartbeatTask, heartbeatIntervalSecs, TimeUnit.SECONDS);

    // Replicator that pushes instance info to the server; the last argument is the burst size.
    instanceInfoReplicator = new InstanceInfoReplicator(
            this,
            instanceInfo,
            clientConfig.getInstanceInfoReplicationIntervalSeconds(),
            2);

    // Listener that logs local status changes and triggers an on-demand replication.
    statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
        @Override
        public String getId() {
            return "statusChangeListener";
        }

        @Override
        public void notify(StatusChangeEvent statusChangeEvent) {
            boolean downInvolved = InstanceStatus.DOWN == statusChangeEvent.getStatus()
                    || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus();
            if (downInvolved) {
                // DOWN on either side of the transition is worth a warning.
                logger.warn("Saw local status change event {}", statusChangeEvent);
            } else {
                logger.info("Saw local status change event {}", statusChangeEvent);
            }
            instanceInfoReplicator.onDemandUpdate();
        }
    };

    // The listener is always created but only registered when on-demand updates are enabled.
    if (clientConfig.shouldOnDemandUpdateStatusChange()) {
        applicationInfoManager.registerStatusChangeListener(statusChangeListener);
    }

    instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
}
/**
 * Registers this instance with the Eureka server through the registration client.
 *
 * @return true when the response status code equals {@code Status.NO_CONTENT}
 * @throws Throwable the exception raised by the registration call, rethrown after being logged
 */
boolean register() throws Throwable {
    logger.info(PREFIX + "{}: registering service...", appPathIdentifier);

    EurekaHttpResponse<Void> response;
    try {
        response = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception registrationFailure) {
        logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier,
                registrationFailure.getMessage(), registrationFailure);
        throw registrationFailure;
    }

    if (logger.isInfoEnabled()) {
        logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, response.getStatusCode());
    }

    boolean accepted = response.getStatusCode() == Status.NO_CONTENT.getStatusCode();
    return accepted;
}

在初始化定时任务方法中,启动定时任务刷新缓存和执行心跳。
主要是三个定时任务:刷新本地缓存 cacheRefresh、心跳 heartbeat、定时上报服务信息 InstanceInfoReplicator,再加上一个状态监听 StatusChangeListener。其中 cacheRefresh 和 heartbeat 共用一个 scheduler,并且都套了一层 TimedSupervisorTask。默认每 30 秒刷新一次缓存,如果任务超时则重新设置间隔,新的间隔最大不能超过默认值的 10 倍;默认每 30 秒发送一次心跳,如果任务超时同样重新设置间隔,新的间隔最大不能超过默认值的 10 倍。这 4 个值都可以通过配置来改变。


/**
 * Supervises a task by running it on a worker pool and waiting for it with a timeout.
 *
 * After every run the supervisor re-schedules itself on the scheduler. A timed-out run
 * doubles the delay before the next run (capped at {@code timeout * expBackOffBound});
 * a successful run resets the delay to the base timeout.
 */
public class TimedSupervisorTask extends TimerTask {
    private static final Logger logger = LoggerFactory.getLogger(TimedSupervisorTask.class);

    private final Counter successCounter;
    private final Counter timeoutCounter;
    private final Counter rejectedCounter;
    private final Counter throwableCounter;
    private final LongGauge threadPoolLevelGauge;

    private final ScheduledExecutorService scheduler;
    private final ThreadPoolExecutor executor;
    private final long timeoutMillis;
    private final Runnable task;

    private final AtomicLong delay;
    private final long maxDelay;

    /**
     * @param name            name under which this supervisor is registered with the monitors
     * @param scheduler       scheduler used to re-schedule this supervisor after each run
     * @param executor        pool on which the supervised task actually runs
     * @param timeout         base timeout, also used as the base delay between runs
     * @param timeUnit        unit of {@code timeout}
     * @param expBackOffBound multiplier applied to the base timeout to get the maximum delay
     * @param task            the task being supervised
     */
    public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,
                               int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {
        this.scheduler = scheduler;
        this.executor = executor;
        this.timeoutMillis = timeUnit.toMillis(timeout);
        this.task = task;
        this.delay = new AtomicLong(timeoutMillis);
        this.maxDelay = timeoutMillis * expBackOffBound;

        // Create the metrics, then register this object under the given name.
        successCounter = Monitors.newCounter("success");
        timeoutCounter = Monitors.newCounter("timeouts");
        rejectedCounter = Monitors.newCounter("rejectedExecutions");
        throwableCounter = Monitors.newCounter("throwables");
        threadPoolLevelGauge = new LongGauge(MonitorConfig.builder("threadPoolUsed").build());
        Monitors.registerObject(name, this);
    }

    /**
     * Runs the supervised task once, adjusts the delay according to the outcome and
     * re-schedules this supervisor unless the scheduler has been shut down.
     */
    @Override
    public void run() {
        Future<?> pending = null;
        try {
            pending = executor.submit(task);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
            // Wait for completion, but no longer than the base timeout.
            pending.get(timeoutMillis, TimeUnit.MILLISECONDS);
            delay.set(timeoutMillis);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
            successCounter.increment();
        } catch (TimeoutException timedOut) {
            logger.warn("task supervisor timed out", timedOut);
            timeoutCounter.increment();

            // Back off: double the delay, never exceeding the configured maximum.
            long previousDelay = delay.get();
            delay.compareAndSet(previousDelay, Math.min(maxDelay, previousDelay * 2));
        } catch (RejectedExecutionException rejected) {
            if (isShuttingDown()) {
                logger.warn("task supervisor shutting down, reject the task", rejected);
            } else {
                logger.warn("task supervisor rejected the task", rejected);
            }
            rejectedCounter.increment();
        } catch (Throwable failure) {
            if (isShuttingDown()) {
                logger.warn("task supervisor shutting down, can't accept the task");
            } else {
                logger.warn("task supervisor threw an exception", failure);
            }
            throwableCounter.increment();
        } finally {
            if (pending != null) {
                pending.cancel(true);
            }
            if (!scheduler.isShutdown()) {
                scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
            }
        }
    }

    /** Returns true when either the worker pool or the scheduler has been shut down. */
    private boolean isShuttingDown() {
        return executor.isShutdown() || scheduler.isShutdown();
    }
}

TimedSupervisorTask 通过“线程池 + Future”的方式防止任务超时:如果任务超时,则将下个任务的间隔时间延长一倍;如果没有超时,则恢复成最初的间隔。
前面有提到,DiscoveryClient初始化时默认不会调用 register 方法来注册信息,那什么时候会注册?通过源码可以看到有几个地方会调用 register 方法。
一、在前面提到的初始化定时任务里,有个上报节点信息的定时任务,如果发现当前节点状态 Dirty,就会调用注册方法,而InstanceInfoReplicator 在刚启动时,会将 Dirty 设为 true,就是为了服务刚启动时,向eureka-server 注册信息。

/**
 * Periodically pushes the local instance info to the Eureka server, registering again
 * whenever the instance info is marked dirty.
 */
class InstanceInfoReplicator implements Runnable {

    /**
     * Starts the replicator once; later calls are no-ops.
     *
     * @param initialDelayMs delay before the first run; note that it is passed to the
     *                       scheduler with {@code TimeUnit.SECONDS} despite its name
     */
    public void start(int initialDelayMs) {
        if (!started.compareAndSet(false, true)) {
            return;
        }
        // Mark the instance dirty so that the first run performs the initial registration.
        instanceInfo.setIsDirty();
        Future firstRun = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(firstRun);
    }

    /**
     * Refreshes the instance info, registers when it is dirty, and always schedules the
     * next run, even when this run failed.
     */
    public void run() {
        try {
            discoveryClient.refreshInstanceInfo();

            Long dirtySince = instanceInfo.isDirtyWithTime();
            if (dirtySince != null) {
                discoveryClient.register();
                instanceInfo.unsetIsDirty(dirtySince);
            }
        } catch (Throwable problem) {
            logger.warn("There was a problem with the instance info replicator", problem);
        } finally {
            Future followUp = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(followUp);
        }
    }

}

二、在心跳 / 续约时 (renew),如果eureka-server 返回服务不存在,也会重新注册服务。

public class DiscoveryClient implements EurekaClient {
    /**
     * The heartbeat task that renews the lease in the given intervals.
     */
    private class HeartbeatThread implements Runnable {public void run() {if (renew()) {lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
            }
        }
    }
    
    boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
            if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {REREGISTER_COUNTER.increment();
                logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
                long timestamp = instanceInfo.setIsDirtyWithTime();
                boolean success = register();
                if (success) {instanceInfo.unsetIsDirty(timestamp);
                }
                return success;
            }
            return httpResponse.getStatusCode() == Status.OK.getStatusCode();
        } catch (Throwable e) {logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
            return false;
        }
    }

}    

了解了服务注册之后,再了解下本地缓存原理:缓存刷新线程 CacheRefreshThread 调用的是 DiscoveryClient 的 refreshRegistry 方法。


/**
 * Cache refresh task: each run simply delegates to {@code refreshRegistry()}.
 */
class CacheRefreshThread implements Runnable {
    @Override
    public void run() {
        refreshRegistry();
    }
}
/**
 * Refreshes the local registry cache: picks up a changed remote-regions setting, fetches
 * the registry and, on success, records the registry size and the fetch timestamp.
 * Any failure is logged and swallowed.
 */
@VisibleForTesting
void refreshRegistry() {
    try {
        boolean fetchingRemoteRegions = isFetchingRemoteRegionRegistries();

        boolean regionsChanged = false;
        // Honor a dynamic change of the remote regions to fetch.
        String configuredRegions = clientConfig.fetchRegistryForRemoteRegions();
        if (configuredRegions != null) {
            String knownRegions = remoteRegionsToFetch.get();
            if (configuredRegions.equals(knownRegions)) {
                // Nothing changed: only refresh the mapping to reflect any DNS/property change.
                instanceRegionChecker.getAzToRegionMapper().refreshMapping();
            } else {
                // remoteRegionsToFetch and AzToRegionMapper.regionsToFetch must stay in sync.
                synchronized (instanceRegionChecker.getAzToRegionMapper()) {
                    if (!remoteRegionsToFetch.compareAndSet(knownRegions, configuredRegions)) {
                        logger.info("Remote regions to fetch modified concurrently," +
                                "ignoring change from {} to {}", knownRegions, configuredRegions);
                    } else {
                        String[] regionList = configuredRegions.split(",");
                        remoteRegionsRef.set(regionList);
                        instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(regionList);
                        regionsChanged = true;
                    }
                }
            }
        }

        // A change of remote regions forces a full registry fetch.
        if (fetchRegistry(regionsChanged)) {
            registrySize = localRegionApps.get().size();
            lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
        }

        if (logger.isDebugEnabled()) {
            StringBuilder summary = new StringBuilder();
            summary.append("Local region apps hashcode:")
                    .append(localRegionApps.get().getAppsHashCode())
                    .append(", is fetching remote regions?")
                    .append(fetchingRemoteRegions);
            for (Map.Entry<String, Applications> regionApps : remoteRegionVsApps.entrySet()) {
                summary.append(", Remote region:")
                        .append(regionApps.getKey())
                        .append(", apps hashcode:")
                        .append(regionApps.getValue().getAppsHashCode());
            }
            logger.debug("Completed cache refresh task for discovery. All Apps hash code is {}",
                    summary);
        }
    } catch (Throwable failure) {
        logger.error("Cannot fetch registry from server", failure);
    }
}

/**
 * Fetches the registry from the Eureka server, either in full or as a delta.
 *
 * A full fetch is performed when delta is disabled, a single-VIP refresh address is
 * configured, the caller forces it, or the local cache is missing, empty or has version -1.
 * Otherwise only the delta is fetched and applied.
 *
 * @param forceFullRegistryFetch true to fetch the full registry regardless of local state
 * @return true when the fetch succeeded; false when any exception occurred (it is logged)
 */
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
    try {
        // If the delta is disabled or if it is the first time, get all
        // applications
        Applications applications = getApplications();
        if (clientConfig.shouldDisableDelta()
                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                || forceFullRegistryFetch
                || (applications == null)
                || (applications.getRegisteredApplications().size() == 0)
                || (applications.getVersion() == -1)) { // Client application does not have latest library supporting delta
            logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
            logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
            logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
            logger.info("Application is null : {}", (applications == null));
            // This branch is reachable with applications == null, so the remaining diagnostics
            // must not dereference it; otherwise the NPE would abort the full fetch itself.
            logger.info("Registered Applications size is zero : {}",
                    (applications != null && applications.getRegisteredApplications().size() == 0));
            logger.info("Application version is -1: {}",
                    (applications != null && applications.getVersion() == -1));
            getAndStoreFullRegistry();
        } else {
            getAndUpdateDelta(applications);
        }
        // NOTE: 'applications' is the reference obtained before the fetch; non-null behavior is
        // kept as-is, only the null case is guarded.
        if (applications != null) {
            applications.setAppsHashCode(applications.getReconcileHashCode());
        }
        logTotalInstances();
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
        return false;
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }
    // Notify about cache refresh before updating the instance remote status
    onCacheRefreshed();
    // Update remote status based on refreshed data held in the cache
    updateInstanceRemoteStatus();
    // registry was fetched successfully, so return true
    return true;
}

fetchRegistry 方法可以看到,如果配置了 禁用增量刷新 / 开启强制全量刷新 / 当前没有数据 / 当前没有版本号 时,会全量刷新,否则是增量刷新。

/**
 * Fetches the full registry (all applications, or a single VIP when one is configured)
 * and stores it in the local cache, unless another thread advanced the fetch generation
 * in the meantime.
 *
 * @throws Throwable on any error raised by the query client
 */
private void getAndStoreFullRegistry() throws Throwable {
    long generationBeforeFetch = fetchRegistryGeneration.get();

    logger.info("Getting all instance registry info from the eureka server");

    EurekaHttpResponse<Applications> response;
    if (clientConfig.getRegistryRefreshSingleVipAddress() == null) {
        response = eurekaTransport.queryClient.getApplications(remoteRegionsRef.get());
    } else {
        response = eurekaTransport.queryClient.getVip(
                clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
    }

    // Only an OK response carries a usable entity.
    Applications fetched = null;
    if (response.getStatusCode() == Status.OK.getStatusCode()) {
        fetched = response.getEntity();
    }
    logger.info("The response status is {}", response.getStatusCode());

    if (fetched == null) {
        logger.error("The application is null for some reason. Not storing this information");
        return;
    }
    if (!fetchRegistryGeneration.compareAndSet(generationBeforeFetch, generationBeforeFetch + 1)) {
        logger.warn("Not updating applications as another thread is updating it already");
        return;
    }
    localRegionApps.set(this.filterAndShuffle(fetched));
    logger.debug("Got full registry with apps hashcode {}", fetched.getAppsHashCode());
}

全量刷新就是去 eureka-server 获取全部信息,并且覆盖掉本地缓存 localRegionApps 已有的值。

/**
 * Fetches the registry delta and applies it to the local cache. Falls back to a full
 * fetch when no delta is returned, and reconciles against the server when the hash codes
 * differ after the update.
 *
 * @param applications the local applications used to compute the reconcile hash code
 * @throws Throwable on any error raised by the query client or the follow-up fetches
 */
private void getAndUpdateDelta(Applications applications) throws Throwable {
    long generationBeforeFetch = fetchRegistryGeneration.get();

    EurekaHttpResponse<Applications> response = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
    Applications delta = null;
    if (response.getStatusCode() == Status.OK.getStatusCode()) {
        delta = response.getEntity();
    }

    if (delta == null) {
        logger.warn("The server does not allow the delta revision to be applied because it is not safe."
                + "Hence got the full registry.");
        getAndStoreFullRegistry();
        return;
    }

    if (!fetchRegistryGeneration.compareAndSet(generationBeforeFetch, generationBeforeFetch + 1)) {
        logger.warn("Not updating application delta as another thread is updating it already");
        logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
        return;
    }

    logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
    // Stays empty when the update lock cannot be acquired.
    String localHashCode = "";
    if (!fetchRegistryUpdateLock.tryLock()) {
        logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
    } else {
        try {
            updateDelta(delta);
            localHashCode = getReconcileHashCode(applications);
        } finally {
            fetchRegistryUpdateLock.unlock();
        }
    }

    // A mismatch means the instance counts differ for some reason; reconciling makes a remote call.
    boolean hashMismatch = !localHashCode.equals(delta.getAppsHashCode());
    if (hashMismatch || clientConfig.shouldLogDeltaDiff()) {
        reconcileAndLogDifference(delta, localHashCode);
    }
}

增量更新是调用 apps/delta 接口获取最近变更的数据(详细怎么更新见 eureka-server 篇),并根据 InstanceInfo 的 ActionType 属性来做增量修改。
appsHashCode是数据一致性哈希码,eureka会将本地数据(client)计算的哈希码和接口(server)返回的哈希码做比较,如果比较结果不一致,就重新全量获取数据reconcileAndLogDifference()

/**
 * Replaces the local cache with the full registry from the server after the client and
 * server reconcile hash codes were found to differ.
 *
 * @param delta             the delta whose hash code and version came from the server
 * @param reconcileHashCode the hash code computed on the client side
 * @throws Throwable on any error raised by the query client
 */
private void reconcileAndLogDifference(Applications delta, String reconcileHashCode) throws Throwable {
    logger.debug("The Reconcile hashcodes do not match, client : {}, server : {}. Getting the full registry",
            reconcileHashCode, delta.getAppsHashCode());

    RECONCILE_HASH_CODES_MISMATCH.increment();

    long generationBeforeFetch = fetchRegistryGeneration.get();

    EurekaHttpResponse<Applications> response;
    if (clientConfig.getRegistryRefreshSingleVipAddress() == null) {
        response = eurekaTransport.queryClient.getApplications(remoteRegionsRef.get());
    } else {
        response = eurekaTransport.queryClient.getVip(
                clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
    }

    Applications fullRegistry = response.getEntity();
    if (fullRegistry == null) {
        logger.warn("Cannot fetch full registry from the server; reconciliation failure");
        return;
    }

    if (!fetchRegistryGeneration.compareAndSet(generationBeforeFetch, generationBeforeFetch + 1)) {
        logger.warn("Not setting the applications map as another thread has advanced the update generation");
        return;
    }

    localRegionApps.set(this.filterAndShuffle(fullRegistry));
    // Carry over the version reported with the delta.
    getApplications().setVersion(delta.getVersion());
    logger.debug("The Reconcile hashcodes after complete sync up, client : {}, server : {}.",
            getApplications().getReconcileHashCode(),
            delta.getAppsHashCode());
}

appsHashCode(数据一致性哈希码)的生成规则是:“状态 1_状态 1 对应的服务数量_状态 2_状态 2 对应的服务数量_…”。

public class Applications {
    private static final String STATUS_DELIMITER = "_";

    /**
     * Computes the reconcile hash code of the registered applications: for every instance
     * status, in sorted order, the status name and its instance count, each followed by
     * the delimiter.
     *
     * @return the reconcile hash code string
     */
    @JsonIgnore
    public String getReconcileHashCode() {
        // A TreeMap keeps the statuses sorted by name.
        TreeMap<String, AtomicInteger> countsByStatus = new TreeMap<String, AtomicInteger>();
        populateInstanceCountMap(countsByStatus);
        return getReconcileHashCode(countsByStatus);
    }

    /**
     * Adds, per instance status name, the number of instances of all registered
     * applications to the given map.
     *
     * @param instanceCountMap map from status name to instance count, updated in place
     */
    public void populateInstanceCountMap(Map<String, AtomicInteger> instanceCountMap) {
        for (Application application : this.getRegisteredApplications()) {
            for (InstanceInfo instance : application.getInstancesAsIsFromEureka()) {
                instanceCountMap
                        .computeIfAbsent(instance.getStatus().name(), status -> new AtomicInteger(0))
                        .incrementAndGet();
            }
        }
    }

    /**
     * Renders a status-to-count map as {@code STATUS_count_STATUS_count_...}, in the
     * iteration order of the map.
     *
     * @param instanceCountMap map from status name to instance count
     * @return the concatenated hash code; empty when the map is empty
     */
    public static String getReconcileHashCode(Map<String, AtomicInteger> instanceCountMap) {
        StringBuilder hashCode = new StringBuilder(75);
        for (Map.Entry<String, AtomicInteger> statusCount : instanceCountMap.entrySet()) {
            hashCode.append(statusCount.getKey());
            hashCode.append(STATUS_DELIMITER);
            hashCode.append(statusCount.getValue().get());
            hashCode.append(STATUS_DELIMITER);
        }
        return hashCode.toString();
    }

}

注销

public class DiscoveryClient implements EurekaClient {
    @PreDestroy
    @Override
    public synchronized void shutdown() {if (isShutdown.compareAndSet(false, true)) {logger.info("Shutting down DiscoveryClient ...");
            if (statusChangeListener != null && applicationInfoManager != null) {applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
            }
            cancelScheduledTasks();
            // If APPINFO was registered
            if (applicationInfoManager != null
                    && clientConfig.shouldRegisterWithEureka()
                    && clientConfig.shouldUnregisterOnShutdown()) {applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
                unregister();}
            if (eurekaTransport != null) {eurekaTransport.shutdown();
            }
            heartbeatStalenessMonitor.shutdown();
            registryStalenessMonitor.shutdown();
            logger.info("Completed shut down of DiscoveryClient");
        }
    }
    private void cancelScheduledTasks() {if (instanceInfoReplicator != null) {instanceInfoReplicator.stop();
        }
        if (heartbeatExecutor != null) {heartbeatExecutor.shutdownNow();
        }
        if (cacheRefreshExecutor != null) {cacheRefreshExecutor.shutdownNow();
        }
        if (scheduler != null) {scheduler.shutdownNow();
        }
    } 
    /**
     * unregister w/ the eureka service.
     */
    void unregister() {
        // It can be null if shouldRegisterWithEureka == false
        if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
            try {logger.info("Unregistering ...");
                EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
                logger.info(PREFIX + "{} - deregister  status: {}", appPathIdentifier, httpResponse.getStatusCode());
            } catch (Exception e) {logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
            }
        }
    }
    private static final class EurekaTransport {
        private ClosableResolver bootstrapResolver;
        private TransportClientFactory transportClientFactory;
        private EurekaHttpClient registrationClient;
        private EurekaHttpClientFactory registrationClientFactory;
        private EurekaHttpClient queryClient;
        private EurekaHttpClientFactory queryClientFactory;
        void shutdown() {if (registrationClientFactory != null) {registrationClientFactory.shutdown();
            }
            if (queryClientFactory != null) {queryClientFactory.shutdown();
            }
            if (registrationClient != null) {registrationClient.shutdown();
            }
            if (queryClient != null) {queryClient.shutdown();
            }
            if (transportClientFactory != null) {transportClientFactory.shutdown();
            }
            if (bootstrapResolver != null) {bootstrapResolver.shutdown();
            }
        }
    }  
}    

从代码里可以看到,注销步骤很简单:

  1. 停止几个定时任务。
  2. 调用 cancel 接口向 eureka-server 发送注销请求。
  3. 停止一些 client 和 monitor。

状态变更

最后我们再来看下 eureka-client 的状态变更。
InstanceInfo刚创建的时候,状态是InstanceStatus.STARTING

public class ApplicationInfoManager {

    /**
     * Stores the instance configuration and builds the instance info from it.
     *
     * @param config the instance configuration to initialize from
     * @throws RuntimeException wrapping any failure raised while building the instance info
     */
    public void initComponent(EurekaInstanceConfig config) {
        try {
            this.config = config;
            EurekaConfigBasedInstanceInfoProvider infoProvider = new EurekaConfigBasedInstanceInfoProvider(config);
            this.instanceInfo = infoProvider.get();
        } catch (Throwable cause) {
            throw new RuntimeException("Failed to initialize ApplicationInfoManager", cause);
        }
    }
}

之后会变成InstanceStatus.UP(以下代码是 springcloud-eureka-client)。

public class EurekaServiceRegistry implements ServiceRegistry<EurekaRegistration> {

   /**
    * Registers the application with Eureka: initializes the client, sets the instance
    * status to the configured initial status and, when a health check handler is
    * available, registers it with the Eureka client.
    *
    * @param reg the registration carrying the info manager, instance config and client
    */
   @Override
   public void register(EurekaRegistration reg) {
      maybeInitializeClient(reg);

      if (log.isInfoEnabled()) {
         // Separating spaces added: without them the message reads
         // "Registering application<name>with eureka with status<status>".
         log.info("Registering application " + reg.getApplicationInfoManager().getInfo().getAppName()
               + " with eureka with status "
               + reg.getInstanceConfig().getInitialStatus());
      }

      reg.getApplicationInfoManager()
            .setInstanceStatus(reg.getInstanceConfig().getInitialStatus());

      reg.getHealthCheckHandler().ifAvailable(healthCheckHandler ->
            reg.getEurekaClient().registerHealthCheck(healthCheckHandler));
   }
}

服务停止时会从 InstanceStatus.UP 变成InstanceStatus.DOWN(以下代码是 springcloud-eureka-client)。

public class EurekaServiceRegistry implements ServiceRegistry<EurekaRegistration> {
    @Override
    public void deregister(EurekaRegistration reg) {if (reg.getApplicationInfoManager().getInfo() != null) {if (log.isInfoEnabled()) {log.info("Unregistering application" + reg.getApplicationInfoManager().getInfo().getAppName()
                        + "with eureka with status DOWN");
            }
            reg.getApplicationInfoManager().setInstanceStatus(InstanceInfo.InstanceStatus.DOWN);
            //shutdown of eureka client should happen with EurekaRegistration.close()
            //auto registration will create a bean which will be properly disposed
            //manual registrations will need to call close()}
    }
}

流程图

定时任务简易流程:

正文完
 0