启动
eureka-client
启动后,会向 eureka-server
注册,同时会定时续约 (renew);为了提升性能,eureka-client
启用了本地缓存,缓存存在 localRegionApps
里,定时更新缓存。eureka-client
的核心类是DiscoveryClient
。
/**
 * Creates the discovery client.
 *
 * <p>Wires the optional handlers from {@code args}, seeds the local registry cache with an empty
 * {@code Applications}, and sets up the staleness monitors. When the configuration says the client
 * should neither register nor fetch, no executors or transport are created and the constructor
 * returns early. Otherwise it builds the scheduler and worker pools, performs the initial registry
 * fetch (falling back to the backup registry on failure), optionally registers right away, and
 * finally starts the scheduled tasks.
 */
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider) {
    if (args == null) {
        this.healthCheckCallbackProvider = null;
        this.healthCheckHandlerProvider = null;
        this.preRegistrationHandler = null;
    } else {
        this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
        this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
        this.eventListeners.addAll(args.getEventListeners());
        this.preRegistrationHandler = args.preRegistrationHandler;
    }

    this.applicationInfoManager = applicationInfoManager;
    InstanceInfo myInfo = applicationInfoManager.getInfo();

    clientConfig = config;
    staticClientConfig = clientConfig;
    transportConfig = config.getTransportConfig();
    instanceInfo = myInfo;
    if (myInfo == null) {
        logger.warn("Setting instanceInfo to a passed in null value");
    } else {
        appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
    }

    this.backupRegistryProvider = backupRegistryProvider;
    this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);

    // The local cache starts out as an empty application list.
    localRegionApps.set(new Applications());
    fetchRegistryGeneration = new AtomicLong(0);

    remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
    String initialRemoteRegions = remoteRegionsToFetch.get();
    remoteRegionsRef = new AtomicReference<>(initialRemoteRegions == null ? null : initialRemoteRegions.split(","));

    // Staleness monitors are only real when the corresponding feature is switched on.
    this.registryStalenessMonitor = config.shouldFetchRegistry()
            ? new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L})
            : ThresholdLevelsMetric.NO_OP_METRIC;
    this.heartbeatStalenessMonitor = config.shouldRegisterWithEureka()
            ? new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L})
            : ThresholdLevelsMetric.NO_OP_METRIC;

    logger.info("Initializing Eureka in region {}", clientConfig.getRegion());

    if (!(config.shouldRegisterWithEureka() || config.shouldFetchRegistry())) {
        logger.info("Client configured to neither register nor query for data.");
        scheduler = null;
        heartbeatExecutor = null;
        cacheRefreshExecutor = null;
        eurekaTransport = null;
        instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());

        // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
        // to work with DI'd DiscoveryClient
        DiscoveryManager.getInstance().setDiscoveryClient(this);
        DiscoveryManager.getInstance().setEurekaClientConfig(config);

        initTimestampMs = System.currentTimeMillis();
        logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                initTimestampMs, this.getApplications().size());
        return; // no network tasks to set up, so we are done
    }

    try {
        // default size of 2 - 1 each for heartbeat and cacheRefresh
        scheduler = Executors.newScheduledThreadPool(2,
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-%d")
                        .setDaemon(true)
                        .build());

        // Both worker pools use a SynchronousQueue, i.e. direct handoff.
        heartbeatExecutor = new ThreadPoolExecutor(
                1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                        .setDaemon(true)
                        .build());

        cacheRefreshExecutor = new ThreadPoolExecutor(
                1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                        .setDaemon(true)
                        .build());

        eurekaTransport = new EurekaTransport();
        scheduleServerEndpointTask(eurekaTransport, args);

        AzToRegionMapper azToRegionMapper = clientConfig.shouldUseDnsForFetchingServiceUrls()
                ? new DNSBasedAzToRegionMapper(clientConfig)
                : new PropertyBasedAzToRegionMapper(clientConfig);
        String regionsToFetch = remoteRegionsToFetch.get();
        if (regionsToFetch != null) {
            azToRegionMapper.setRegionsToFetch(regionsToFetch.split(","));
        }
        instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
    } catch (Throwable cause) {
        throw new RuntimeException("Failed to initialize DiscoveryClient!", cause);
    }

    // Initial fetch; fall back to the backup registry when the server cannot be reached.
    if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
        fetchRegistryFromBackup();
    }

    // call and execute the pre registration handler before all background tasks (inc registration) is started
    if (this.preRegistrationHandler != null) {
        this.preRegistrationHandler.beforeRegistration();
    }

    // clientConfig.shouldEnforceRegistrationAtInit() defaults to false here
    if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
        try {
            boolean registered = register();
            if (!registered) {
                throw new IllegalStateException("Registration error at startup. Invalid server response.");
            }
        } catch (Throwable th) {
            logger.error("Registration error at startup: {}", th.getMessage());
            throw new IllegalStateException(th);
        }
    }

    // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch)
    initScheduledTasks();

    try {
        Monitors.registerObject(this);
    } catch (Throwable cause) {
        logger.warn("Cannot register timers", cause);
    }

    // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
    // to work with DI'd DiscoveryClient
    DiscoveryManager.getInstance().setDiscoveryClient(this);
    DiscoveryManager.getInstance().setEurekaClientConfig(config);

    initTimestampMs = System.currentTimeMillis();
    logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
            initTimestampMs, this.getApplications().size());
}
DiscoveryClient
初始化时,在本地缓存放入了空列表,创建了定时任务启动器和线程池,启动定时任务;由于 clientConfig.shouldEnforceRegistrationAtInit()
默认为false
,所以在初始化时并不会执行注册方法register()
。
/**
 * Starts the client's background work.
 *
 * <p>When registry fetching is enabled, a supervised "cacheRefresh" task is scheduled. When
 * registration is enabled, a supervised "heartbeat" task is scheduled, the instance-info
 * replicator is created and started, and (if on-demand updates are enabled) a status change
 * listener is registered that triggers an on-demand replication.
 */
private void initScheduledTasks() {
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        TimedSupervisorTask cacheRefreshTask = new TimedSupervisorTask(
                "cacheRefresh",
                scheduler,
                cacheRefreshExecutor,
                registryFetchIntervalSeconds,
                TimeUnit.SECONDS,
                expBackOffBound,
                new CacheRefreshThread());
        scheduler.schedule(cacheRefreshTask, registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }

    if (!clientConfig.shouldRegisterWithEureka()) {
        logger.info("Not registering with Eureka server per configuration");
        return;
    }

    int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
    int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
    logger.info("Starting heartbeat executor:" + "renew interval is: {}", renewalIntervalInSecs);

    // Heartbeat timer
    TimedSupervisorTask heartbeatTask = new TimedSupervisorTask(
            "heartbeat",
            scheduler,
            heartbeatExecutor,
            renewalIntervalInSecs,
            TimeUnit.SECONDS,
            expBackOffBound,
            new HeartbeatThread());
    scheduler.schedule(heartbeatTask, renewalIntervalInSecs, TimeUnit.SECONDS);

    // InstanceInfo replicator
    instanceInfoReplicator = new InstanceInfoReplicator(
            this,
            instanceInfo,
            clientConfig.getInstanceInfoReplicationIntervalSeconds(),
            2); // burstSize

    statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
        @Override
        public String getId() {
            return "statusChangeListener";
        }

        @Override
        public void notify(StatusChangeEvent statusChangeEvent) {
            boolean downInvolved = InstanceStatus.DOWN == statusChangeEvent.getStatus()
                    || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus();
            if (downInvolved) {
                // log at warn level if DOWN was involved
                logger.warn("Saw local status change event {}", statusChangeEvent);
            } else {
                logger.info("Saw local status change event {}", statusChangeEvent);
            }
            // Every local status change triggers an on-demand replication.
            instanceInfoReplicator.onDemandUpdate();
        }
    };

    if (clientConfig.shouldOnDemandUpdateStatusChange()) {
        applicationInfoManager.registerStatusChangeListener(statusChangeListener);
    }

    instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
}
/**
 * Registers this instance with the eureka service by issuing the register REST call.
 *
 * @return {@code true} if the server answered with the {@code NO_CONTENT} status code
 * @throws Throwable if the REST call fails; the failure is logged and rethrown
 */
boolean register() throws Throwable {
    logger.info(PREFIX + "{}: registering service...", appPathIdentifier);

    EurekaHttpResponse<Void> response;
    try {
        response = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception failure) {
        logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, failure.getMessage(), failure);
        throw failure;
    }

    if (logger.isInfoEnabled()) {
        logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, response.getStatusCode());
    }
    boolean accepted = response.getStatusCode() == Status.NO_CONTENT.getStatusCode();
    return accepted;
}
在初始化定时任务方法中,启动定时任务刷新缓存和执行心跳。
主要是三个定时任务:刷新本地缓存 cacheRefresh
、心跳heartbeat
、定时上报服务信息InstanceInfoReplicator
,再加上一个状态监听StatusChangeListener
。其中cacheRefresh
和heartbeat
共用一个 scheduler
,并且都套了一层TimedSupervisorTask
,默认每30 秒
刷新一次缓存,如果任务超时则把下一次的间隔加倍,但新的间隔最大不能超过初始值的 10 倍;默认每 30 秒
一次心跳,超时后同样把间隔加倍,且最大不能超过初始值的 10 倍。这 4 个值都可以通过配置来改变。
/**
 * A self-rescheduling supervisor around a delegate task.
 *
 * <p>Each run submits the delegate to the given executor and waits for it up to the configured
 * timeout. On success the delay is reset to the timeout value; on timeout the delay is doubled,
 * capped at {@code timeout * expBackOffBound}. Unless the scheduler has been shut down, the
 * supervisor always schedules itself again using the current delay.
 */
public class TimedSupervisorTask extends TimerTask {
    private static final Logger logger = LoggerFactory.getLogger(TimedSupervisorTask.class);

    private final Counter successCounter;
    private final Counter timeoutCounter;
    private final Counter rejectedCounter;
    private final Counter throwableCounter;
    private final LongGauge threadPoolLevelGauge;

    private final ScheduledExecutorService scheduler;
    private final ThreadPoolExecutor executor;
    private final long timeoutMillis;
    private final Runnable task;

    private final AtomicLong delay;
    private final long maxDelay;

    /**
     * @param name            name under which the monitors of this task are registered
     * @param scheduler       scheduler used to re-schedule this supervisor after every run
     * @param executor        pool on which the delegate task is executed
     * @param timeout         time allowed for one run of the delegate; also the initial delay
     * @param timeUnit        unit of {@code timeout}
     * @param expBackOffBound multiplier applied to the timeout to obtain the maximum delay
     * @param task            the delegate task to supervise
     */
    public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,
                               int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {
        this.scheduler = scheduler;
        this.executor = executor;
        this.timeoutMillis = timeUnit.toMillis(timeout);
        this.task = task;
        this.delay = new AtomicLong(timeoutMillis);
        this.maxDelay = timeoutMillis * expBackOffBound;

        // Initialize the counters and register.
        successCounter = Monitors.newCounter("success");
        timeoutCounter = Monitors.newCounter("timeouts");
        rejectedCounter = Monitors.newCounter("rejectedExecutions");
        throwableCounter = Monitors.newCounter("throwables");
        threadPoolLevelGauge = new LongGauge(MonitorConfig.builder("threadPoolUsed").build());
        Monitors.registerObject(name, this);
    }

    @Override
    public void run() {
        Future<?> pending = null;
        try {
            pending = executor.submit(task);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
            pending.get(timeoutMillis, TimeUnit.MILLISECONDS); // block until done or timeout
            // Finished in time: go back to the regular interval.
            delay.set(timeoutMillis);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
            successCounter.increment();
        } catch (TimeoutException timedOut) {
            logger.warn("task supervisor timed out", timedOut);
            timeoutCounter.increment();

            // Double the delay, but never beyond the configured maximum.
            long previousDelay = delay.get();
            long backedOffDelay = Math.min(maxDelay, previousDelay * 2);
            delay.compareAndSet(previousDelay, backedOffDelay);
        } catch (RejectedExecutionException rejected) {
            if (isShuttingDown()) {
                logger.warn("task supervisor shutting down, reject the task", rejected);
            } else {
                logger.warn("task supervisor rejected the task", rejected);
            }
            rejectedCounter.increment();
        } catch (Throwable unexpected) {
            if (isShuttingDown()) {
                logger.warn("task supervisor shutting down, can't accept the task");
            } else {
                logger.warn("task supervisor threw an exception", unexpected);
            }
            throwableCounter.increment();
        } finally {
            if (pending != null) {
                pending.cancel(true);
            }
            if (!scheduler.isShutdown()) {
                scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
            }
        }
    }

    /** Tells whether either the worker pool or the scheduler has been shut down. */
    private boolean isShuttingDown() {
        return executor.isShutdown() || scheduler.isShutdown();
    }
}
TimedSupervisorTask
通过线程池 + Future
方式防止任务超时,如果任务超时,则将下个任务的间隔时间延长一倍,如果没有超时则恢复成最初的间隔。
前面有提到,DiscoveryClient
初始化时默认不会调用 register
方法来注册信息,那什么时候会注册?通过源码可以看到有几个地方会调用 register
方法。
一、在前面提到的初始化定时任务里,有个上报节点信息的定时任务,如果发现当前节点状态 Dirty
,就会调用注册方法,而InstanceInfoReplicator
在刚启动时,会将 Dirty
设为 true
,就是为了服务刚启动时,向eureka-server
注册信息。
/**
 * Periodically pushes the local instance info to the server when it is marked dirty.
 */
class InstanceInfoReplicator implements Runnable {

    /**
     * Starts the replicator once; later calls are no-ops.
     *
     * @param initialDelayMs delay before the first run. NOTE(review): despite the name, the value
     *                       is handed to the scheduler with {@code TimeUnit.SECONDS}.
     */
    public void start(int initialDelayMs) {
        if (!started.compareAndSet(false, true)) {
            return;
        }
        instanceInfo.setIsDirty(); // for initial register
        Future firstRun = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(firstRun);
    }

    /**
     * Refreshes the instance info and registers it if it is dirty, then schedules the next run
     * regardless of the outcome.
     */
    public void run() {
        try {
            discoveryClient.refreshInstanceInfo();

            Long dirtySince = instanceInfo.isDirtyWithTime();
            if (dirtySince != null) {
                discoveryClient.register();
                instanceInfo.unsetIsDirty(dirtySince);
            }
        } catch (Throwable problem) {
            logger.warn("There was a problem with the instance info replicator", problem);
        } finally {
            Future nextRun = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(nextRun);
        }
    }
}
二、在心跳 / 续约时 (renew),如果eureka-server
返回服务不存在,也会重新注册服务。
public class DiscoveryClient implements EurekaClient {
/**
* The heartbeat task that renews the lease in the given intervals.
*/
private class HeartbeatThread implements Runnable {public void run() {if (renew()) {lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {REREGISTER_COUNTER.increment();
logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
long timestamp = instanceInfo.setIsDirtyWithTime();
boolean success = register();
if (success) {instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == Status.OK.getStatusCode();
} catch (Throwable e) {logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
return false;
}
}
}
了解了服务注册之后,再了解下本地缓存原理,缓存刷新线程 CacheRefreshThread
是调用 DiscoveryClient
的refreshRegistry
方法。
/**
 * The task that refreshes the local registry cache; it simply delegates to
 * {@code refreshRegistry()}.
 */
class CacheRefreshThread implements Runnable {
    public void run() {
        refreshRegistry();
    }
}
/**
 * Runs one cache refresh cycle: picks up a changed remote-regions setting (or refreshes the
 * AZ-to-region mapping when it is unchanged), fetches the registry and records size and
 * timestamp on success. Any error is logged and not propagated.
 */
@VisibleForTesting
void refreshRegistry() {
    try {
        boolean fetchingRemoteRegions = isFetchingRemoteRegionRegistries();

        boolean remoteRegionsModified = false;
        // This makes sure that a dynamic change to remote regions to fetch is honored.
        String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
        if (latestRemoteRegions != null) {
            String currentRemoteRegions = remoteRegionsToFetch.get();
            if (latestRemoteRegions.equals(currentRemoteRegions)) {
                // Just refresh mapping to reflect any DNS/Property change
                instanceRegionChecker.getAzToRegionMapper().refreshMapping();
            } else {
                // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
                synchronized (instanceRegionChecker.getAzToRegionMapper()) {
                    if (!remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
                        logger.info("Remote regions to fetch modified concurrently," +
                                "ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
                    } else {
                        String[] remoteRegions = latestRemoteRegions.split(",");
                        remoteRegionsRef.set(remoteRegions);
                        instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
                        remoteRegionsModified = true;
                    }
                }
            }
        }

        // A change of remote regions forces a full fetch.
        boolean fetched = fetchRegistry(remoteRegionsModified);
        if (fetched) {
            registrySize = localRegionApps.get().size();
            lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
        }

        if (logger.isDebugEnabled()) {
            StringBuilder hashSummary = new StringBuilder();
            hashSummary.append("Local region apps hashcode:")
                    .append(localRegionApps.get().getAppsHashCode())
                    .append(", is fetching remote regions?")
                    .append(fetchingRemoteRegions);
            for (Map.Entry<String, Applications> regionApps : remoteRegionVsApps.entrySet()) {
                hashSummary.append(", Remote region:")
                        .append(regionApps.getKey())
                        .append(", apps hashcode:")
                        .append(regionApps.getValue().getAppsHashCode());
            }
            logger.debug("Completed cache refresh task for discovery. All Apps hash code is {}",
                    hashSummary);
        }
    } catch (Throwable failure) {
        logger.error("Cannot fetch registry from server", failure);
    }
}
/**
 * Refreshes the local registry cache, either with a full fetch or with a delta update.
 *
 * <p>A full fetch is used when delta is disabled, a single-VIP refresh address is configured,
 * {@code forceFullRegistryFetch} is set, or the current applications are missing, empty or
 * carry version {@code -1}; otherwise only the delta is applied.
 *
 * @param forceFullRegistryFetch forces a full fetch regardless of the other conditions
 * @return {@code true} if the refresh completed; {@code false} if any error was thrown
 *         (the error is logged, not propagated)
 */
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

    try {
        // If the delta is disabled or if it is the first time, get all
        // applications
        Applications applications = getApplications();

        if (clientConfig.shouldDisableDelta()
                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                || forceFullRegistryFetch
                || (applications == null)
                || (applications.getRegisteredApplications().size() == 0)
                || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
        {
            logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
            logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
            logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
            logger.info("Application is null : {}", (applications == null));
            // This branch is also taken when applications is null, so the two diagnostics
            // below must not dereference it unconditionally.
            if (applications != null) {
                logger.info("Registered Applications size is zero : {}",
                        (applications.getRegisteredApplications().size() == 0));
                logger.info("Application version is -1: {}", (applications.getVersion() == -1));
            }
            getAndStoreFullRegistry();
        } else {
            getAndUpdateDelta(applications);
        }
        // NOTE(review): 'applications' is the reference obtained before the fetch; after a full
        // fetch the cache presumably holds a different object -- confirm this is intended.
        if (applications != null) {
            applications.setAppsHashCode(applications.getReconcileHashCode());
        }
        logTotalInstances();
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
        return false;
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }

    // Notify about cache refresh before updating the instance remote status
    onCacheRefreshed();

    // Update remote status based on refreshed data held in the cache
    updateInstanceRemoteStatus();

    // registry was fetched successfully, so return true
    return true;
}
从 fetchRegistry
方法可以看到,如果配置了 禁用增量刷新 / 开启强制全量刷新 / 当前没有数据 / 当前没有版本号
时,会全量刷新,否则是增量刷新。
/**
 * Fetches the complete registry from the server and, if no other update advanced the fetch
 * generation in the meantime, replaces the local cache with the filtered and shuffled result.
 *
 * @throws Throwable on any error raised while talking to the server
 */
private void getAndStoreFullRegistry() throws Throwable {
    long generationAtStart = fetchRegistryGeneration.get();

    logger.info("Getting all instance registry info from the eureka server");

    // Query either the whole registry or only the configured single VIP address.
    EurekaHttpResponse<Applications> response;
    if (clientConfig.getRegistryRefreshSingleVipAddress() == null) {
        response = eurekaTransport.queryClient.getApplications(remoteRegionsRef.get());
    } else {
        response = eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
    }

    Applications fetched = null;
    if (response.getStatusCode() == Status.OK.getStatusCode()) {
        fetched = response.getEntity();
    }
    logger.info("The response status is {}", response.getStatusCode());

    if (fetched == null) {
        logger.error("The application is null for some reason. Not storing this information");
        return;
    }
    if (!fetchRegistryGeneration.compareAndSet(generationAtStart, generationAtStart + 1)) {
        logger.warn("Not updating applications as another thread is updating it already");
        return;
    }
    localRegionApps.set(this.filterAndShuffle(fetched));
    logger.debug("Got full registry with apps hashcode {}", fetched.getAppsHashCode());
}
全量刷新就是去 eureka-server
获取全部信息,并且覆盖掉本地缓存 localRegionApps
已有的值。
/**
 * Fetches the registry delta from the server and applies it to the local cache.
 *
 * <p>If the server returns no delta, a full fetch is performed instead. After applying the
 * delta, the locally computed reconcile hash code is compared with the one sent by the server;
 * on mismatch (or when delta-diff logging is enabled) the registry is reconciled.
 *
 * @param applications the applications used to compute the local reconcile hash code
 * @throws Throwable on any error raised while talking to the server
 */
private void getAndUpdateDelta(Applications applications) throws Throwable {
    long generationAtStart = fetchRegistryGeneration.get();

    Applications delta = null;
    EurekaHttpResponse<Applications> response = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
    if (response.getStatusCode() == Status.OK.getStatusCode()) {
        delta = response.getEntity();
    }

    if (delta == null) {
        logger.warn("The server does not allow the delta revision to be applied because it is not safe."
                + "Hence got the full registry.");
        getAndStoreFullRegistry();
        return;
    }

    if (!fetchRegistryGeneration.compareAndSet(generationAtStart, generationAtStart + 1)) {
        logger.warn("Not updating application delta as another thread is updating it already");
        logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
        return;
    }

    logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
    String localHashCode = "";
    if (fetchRegistryUpdateLock.tryLock()) {
        try {
            updateDelta(delta);
            localHashCode = getReconcileHashCode(applications);
        } finally {
            fetchRegistryUpdateLock.unlock();
        }
    } else {
        logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
    }

    // There is a diff in number of instances for some reason
    boolean hashMismatch = !localHashCode.equals(delta.getAppsHashCode());
    if (hashMismatch || clientConfig.shouldLogDeltaDiff()) {
        reconcileAndLogDifference(delta, localHashCode); // this makes a remoteCall
    }
}
增量更新是调用 apps/delta
接口获取最新更新的数据 (详细怎么更新在eureka-server
篇),并根据 InstanceInfo
的ActionType
属性来做增量修改。appsHashCode
是数据一致性哈希码,eureka
会将本地数据(client)计算的哈希码和接口(server)返回的哈希码做比较,如果比较结果不一致,就重新全量获取数据reconcileAndLogDifference()
。
/**
 * Re-fetches the full registry after a reconcile hash code mismatch and, if no other update
 * advanced the fetch generation in the meantime, replaces the local cache with it.
 *
 * @param delta             the delta last received from the server
 * @param reconcileHashCode the hash code computed on the client side
 * @throws Throwable on any error raised while talking to the server
 */
private void reconcileAndLogDifference(Applications delta, String reconcileHashCode) throws Throwable {
    logger.debug("The Reconcile hashcodes do not match, client : {}, server : {}. Getting the full registry",
            reconcileHashCode, delta.getAppsHashCode());

    RECONCILE_HASH_CODES_MISMATCH.increment();

    long generationAtStart = fetchRegistryGeneration.get();

    // Query either the whole registry or only the configured single VIP address.
    EurekaHttpResponse<Applications> response;
    if (clientConfig.getRegistryRefreshSingleVipAddress() == null) {
        response = eurekaTransport.queryClient.getApplications(remoteRegionsRef.get());
    } else {
        response = eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
    }

    Applications serverApps = response.getEntity();
    if (serverApps == null) {
        logger.warn("Cannot fetch full registry from the server; reconciliation failure");
        return;
    }

    if (!fetchRegistryGeneration.compareAndSet(generationAtStart, generationAtStart + 1)) {
        logger.warn("Not setting the applications map as another thread has advanced the update generation");
        return;
    }

    localRegionApps.set(this.filterAndShuffle(serverApps));
    getApplications().setVersion(delta.getVersion());
    logger.debug("The Reconcile hashcodes after complete sync up, client : {}, server : {}.",
            getApplications().getReconcileHashCode(),
            delta.getAppsHashCode());
}
appsHashCode
这个数据一致性哈希码的生成规则是:“状态 1_状态 1 对应的服务数量_状态 2_状态 2 对应的服务数量_…”。
public class Applications {
    private static final String STATUS_DELIMITER = "_";

    /**
     * Computes the reconcile hash code of the registered applications: the per-status instance
     * counts, rendered in the key order of a {@code TreeMap}.
     *
     * @return the hash code string, e.g. {@code STATUS_count_} repeated for every status
     */
    @JsonIgnore
    public String getReconcileHashCode() {
        TreeMap<String, AtomicInteger> countsByStatus = new TreeMap<String, AtomicInteger>();
        populateInstanceCountMap(countsByStatus);
        return getReconcileHashCode(countsByStatus);
    }

    /**
     * Adds one to the counter of the matching status name for every instance of every registered
     * application, creating counters on demand.
     *
     * @param instanceCountMap the map from status name to instance counter to fill
     */
    public void populateInstanceCountMap(Map<String, AtomicInteger> instanceCountMap) {
        for (Application app : this.getRegisteredApplications()) {
            for (InstanceInfo info : app.getInstancesAsIsFromEureka()) {
                String statusName = info.getStatus().name();
                instanceCountMap.computeIfAbsent(statusName, key -> new AtomicInteger(0))
                        .incrementAndGet();
            }
        }
    }

    /**
     * Renders the given counters as {@code key_count_} for each entry, in the iteration order of
     * the map.
     *
     * @param instanceCountMap the map from status name to instance counter
     * @return the concatenated hash code string; empty for an empty map
     */
    public static String getReconcileHashCode(Map<String, AtomicInteger> instanceCountMap) {
        StringBuilder hashCode = new StringBuilder(75);
        instanceCountMap.forEach((status, count) ->
                hashCode.append(status)
                        .append(STATUS_DELIMITER)
                        .append(count.get())
                        .append(STATUS_DELIMITER));
        return hashCode.toString();
    }
}
注销
public class DiscoveryClient implements EurekaClient {
@PreDestroy
@Override
public synchronized void shutdown() {if (isShutdown.compareAndSet(false, true)) {logger.info("Shutting down DiscoveryClient ...");
if (statusChangeListener != null && applicationInfoManager != null) {applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
}
cancelScheduledTasks();
// If APPINFO was registered
if (applicationInfoManager != null
&& clientConfig.shouldRegisterWithEureka()
&& clientConfig.shouldUnregisterOnShutdown()) {applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
unregister();}
if (eurekaTransport != null) {eurekaTransport.shutdown();
}
heartbeatStalenessMonitor.shutdown();
registryStalenessMonitor.shutdown();
logger.info("Completed shut down of DiscoveryClient");
}
}
private void cancelScheduledTasks() {if (instanceInfoReplicator != null) {instanceInfoReplicator.stop();
}
if (heartbeatExecutor != null) {heartbeatExecutor.shutdownNow();
}
if (cacheRefreshExecutor != null) {cacheRefreshExecutor.shutdownNow();
}
if (scheduler != null) {scheduler.shutdownNow();
}
}
/**
* unregister w/ the eureka service.
*/
void unregister() {
// It can be null if shouldRegisterWithEureka == false
if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
try {logger.info("Unregistering ...");
EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
logger.info(PREFIX + "{} - deregister status: {}", appPathIdentifier, httpResponse.getStatusCode());
} catch (Exception e) {logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
}
}
}
private static final class EurekaTransport {
private ClosableResolver bootstrapResolver;
private TransportClientFactory transportClientFactory;
private EurekaHttpClient registrationClient;
private EurekaHttpClientFactory registrationClientFactory;
private EurekaHttpClient queryClient;
private EurekaHttpClientFactory queryClientFactory;
void shutdown() {if (registrationClientFactory != null) {registrationClientFactory.shutdown();
}
if (queryClientFactory != null) {queryClientFactory.shutdown();
}
if (registrationClient != null) {registrationClient.shutdown();
}
if (queryClient != null) {queryClient.shutdown();
}
if (transportClientFactory != null) {transportClientFactory.shutdown();
}
if (bootstrapResolver != null) {bootstrapResolver.shutdown();
}
}
}
}
从代码里可以看到,注销步骤很简单:
- 停止几个定时任务。
- 调用
cancel
接口向eureka-server
发送注销请求。
- 停止一些
client
和monitor
。
状态变更
最后我们再来看下 eureka-client
的状态变更。InstanceInfo
刚创建的时候,状态是InstanceStatus.STARTING
。
public class ApplicationInfoManager {

    /**
     * Stores the given configuration and builds the instance info from it.
     *
     * @param config the instance configuration to initialize from
     * @throws RuntimeException wrapping any error raised during initialization
     */
    public void initComponent(EurekaInstanceConfig config) {
        try {
            this.config = config;
            EurekaConfigBasedInstanceInfoProvider infoProvider = new EurekaConfigBasedInstanceInfoProvider(config);
            this.instanceInfo = infoProvider.get();
        } catch (Throwable cause) {
            throw new RuntimeException("Failed to initialize ApplicationInfoManager", cause);
        }
    }
}
之后会变成InstanceStatus.UP
(以下代码是 springcloud-eureka-client)。
public class EurekaServiceRegistry implements ServiceRegistry<EurekaRegistration> {

    /**
     * Initializes the client if necessary, sets the instance status to the configured initial
     * status and, when a health check handler is available, registers it with the eureka client.
     *
     * @param reg the registration to process
     */
    @Override
    public void register(EurekaRegistration reg) {
        maybeInitializeClient(reg);

        if (log.isInfoEnabled()) {
            log.info("Registering application" + reg.getApplicationInfoManager().getInfo().getAppName()
                    + "with eureka with status"
                    + reg.getInstanceConfig().getInitialStatus());
        }

        // Moving to the configured initial status is what publishes the instance.
        reg.getApplicationInfoManager()
                .setInstanceStatus(reg.getInstanceConfig().getInitialStatus());

        reg.getHealthCheckHandler().ifAvailable(handler ->
                reg.getEurekaClient().registerHealthCheck(handler));
    }
}
服务停止时会从 InstanceStatus.UP
变成InstanceStatus.DOWN
(以下代码是 springcloud-eureka-client)。
public class EurekaServiceRegistry implements ServiceRegistry<EurekaRegistration> {
@Override
public void deregister(EurekaRegistration reg) {if (reg.getApplicationInfoManager().getInfo() != null) {if (log.isInfoEnabled()) {log.info("Unregistering application" + reg.getApplicationInfoManager().getInfo().getAppName()
+ "with eureka with status DOWN");
}
reg.getApplicationInfoManager().setInstanceStatus(InstanceInfo.InstanceStatus.DOWN);
//shutdown of eureka client should happen with EurekaRegistration.close()
//auto registration will create a bean which will be properly disposed
//manual registrations will need to call close()}
}
}
流程图
定时任务简易流程: