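Eureka Client is a Java client that simplifies interaction with Eureka Server. It also has a built-in load balancer that uses a round-robin algorithm.
After the application starts, it sends heartbeats to Eureka Server (every 30 seconds by default). If Eureka Server receives no heartbeat from a node for several heartbeat periods (90 seconds by default), it removes that service node from the service registry. Eureka Client also caches registry information, so even if every Eureka Server goes down, the client can still use the cached information to consume the APIs of other services. Below is a flow chart of the client-side operations.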
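1. Starting from the startup class
Start with the @EnableDiscoveryClient annotation on the startup class and follow the call flow. Opening EnableDiscoveryClient, its comment says that its purpose is to activate DiscoveryClient:
First, the annotation declaration uses @Import to pull in EnableDiscoveryClientImportSelector. The main job of that class is to bring in AutoServiceRegistrationConfiguration.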
@Order(Ordered.LOWEST_PRECEDENCE - 100)
public class EnableDiscoveryClientImportSelector
extends SpringFactoryImportSelector<EnableDiscoveryClient> {
@Override
public String[] selectImports(AnnotationMetadata metadata) {
// Call the parent method to get the fully qualified class names it wants to import
String[] imports = super.selectImports(metadata);
// Get all attributes of this annotation (@EnableDiscoveryClient)
AnnotationAttributes attributes = AnnotationAttributes.fromMap(metadata.getAnnotationAttributes(getAnnotationClass().getName(), true));
// Read the autoRegister attribute, which defaults to true
boolean autoRegister = attributes.getBoolean("autoRegister");
// Decide from the annotation setting whether to import the auto-configuration class below
if (autoRegister) {List<String> importsList = new ArrayList<>(Arrays.asList(imports));
importsList.add("org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationConfiguration");
imports = importsList.toArray(new String[0]);
} else {Environment env = getEnvironment();
if(ConfigurableEnvironment.class.isInstance(env)) {ConfigurableEnvironment configEnv = (ConfigurableEnvironment)env;
LinkedHashMap<String, Object> map = new LinkedHashMap<>();
map.put("spring.cloud.service-registry.auto-registration.enabled", false);
MapPropertySource propertySource = new MapPropertySource("springCloudDiscoveryClient", map);
configEnv.getPropertySources().addLast(propertySource);
}
}
return imports;
}
@Override
protected boolean isEnabled() {return getEnvironment().getProperty("spring.cloud.discovery.enabled", Boolean.class, Boolean.TRUE);
}
@Override
protected boolean hasDefaultFactory() {return true;}
}
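The branch on autoRegister in the selector above can be sketched in plain Java. This is only an illustration, not Spring code: the class ImportSelectionSketch and the Map standing in for the Environment are hypothetical, and putIfAbsent is used to approximate the lowest-precedence effect of addLast.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the selector's decision; not part of Spring Cloud.
final class ImportSelectionSketch {
    static final String AUTO_REGISTRATION_CONFIG =
            "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationConfiguration";
    static final String AUTO_REGISTRATION_PROPERTY =
            "spring.cloud.service-registry.auto-registration.enabled";

    static String[] selectImports(String[] parentImports, boolean autoRegister,
                                  Map<String, Object> environment) {
        if (autoRegister) {
            // autoRegister = true: append the auto-registration configuration class.
            List<String> imports = new ArrayList<>(Arrays.asList(parentImports));
            imports.add(AUTO_REGISTRATION_CONFIG);
            return imports.toArray(new String[0]);
        }
        // autoRegister = false: switch auto-registration off with the lowest
        // precedence, so a value that is already present wins.
        environment.putIfAbsent(AUTO_REGISTRATION_PROPERTY, false);
        return parentImports;
    }

    public static void main(String[] args) {
        Map<String, Object> env = new LinkedHashMap<>();
        System.out.println(Arrays.toString(selectImports(new String[] {"a.B"}, true, env)));
        System.out.println(Arrays.toString(selectImports(new String[] {"a.B"}, false, env)) + " " + env);
    }
}
```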
The ultimate goal here is to bring in AutoServiceRegistrationConfiguration. Let's look at the auto-configuration class that imports it and what it does:
@Configuration
@Import(AutoServiceRegistrationConfiguration.class)
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
public class AutoServiceRegistrationAutoConfiguration {@Autowired(required = false)
private AutoServiceRegistration autoServiceRegistration;
@Autowired
private AutoServiceRegistrationProperties properties;
@PostConstruct
protected void init() {if (autoServiceRegistration == null && this.properties.isFailFast()) {throw new IllegalStateException("Auto Service Registration has been requested, but there is no AutoServiceRegistration bean");
}
}
}
From this, the main purpose is to make two beans available: AutoServiceRegistration and AutoServiceRegistrationProperties. So what are these two beans for? Look at where they are used:
@Configuration
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@Import(DiscoveryClientOptionalArgsConfiguration.class)
@ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
@AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class,
CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
@AutoConfigureAfter(name = {"org.springframework.cloud.autoconfigure.RefreshAutoConfiguration",
"org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",
"org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration"})
public class EurekaClientAutoConfiguration {
......
......
......
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient, CloudEurekaInstanceConfig instanceConfig, ApplicationInfoManager applicationInfoManager, ObjectProvider<HealthCheckHandler> healthCheckHandler) {return EurekaRegistration.builder(instanceConfig)
.with(applicationInfoManager)
.with(eurekaClient)
.with(healthCheckHandler)
.build();}
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
public EurekaAutoServiceRegistration eurekaAutoServiceRegistration(ApplicationContext context, EurekaServiceRegistry registry, EurekaRegistration registration) {return new EurekaAutoServiceRegistration(context, registry, registration);
}
......
......
......
}
The reason is that they serve as preconditions when the beans here are instantiated.
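The property precondition used on these beans (matchIfMissing = true) can be modeled roughly as follows. This is a minimal sketch that assumes no explicit havingValue is set, in which case Spring Boot treats a present property as a match unless it equals false. PropertyConditionSketch is a hypothetical name, and the Map stands in for the Environment.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a property condition without havingValue.
final class PropertyConditionSketch {
    static boolean matches(Map<String, String> properties, String name, boolean matchIfMissing) {
        String value = properties.get(name);
        if (value == null) {
            // Property absent: the outcome is decided by matchIfMissing.
            return matchIfMissing;
        }
        // Property present: it matches unless it is "false".
        return !"false".equalsIgnoreCase(value);
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        String key = "spring.cloud.service-registry.auto-registration.enabled";
        System.out.println(matches(props, key, true));
        props.put(key, "false");
        System.out.println(matches(props, key, true));
    }
}
```

With matchIfMissing = true, auto-registration stays on until the property is explicitly set to false, which is exactly what the import selector does when autoRegister is false.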
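EurekaClientAutoConfiguration is one of the most important classes so far. Its main jobs include:
- Registering EurekaClientConfigBean, which initializes the client-side configuration;
- Registering EurekaInstanceConfigBean, which initializes the client instance information;
- Initializing EurekaRegistration, EurekaServiceRegistry, and EurekaAutoServiceRegistration, which implement automatic registration with Eureka;
- Initializing EurekaClient and ApplicationInfoManager. The default implementation of EurekaClient is DiscoveryClient, which is the focus of the analysis that follows;
- Initializing EurekaHealthIndicator, which supplies Eureka information to the /health endpoint, mainly the current instance status (Status) and the service list (applications).
Next, see where EurekaClientAutoConfiguration itself is used: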
@ConditionalOnClass(ConfigServicePropertySourceLocator.class)
@ConditionalOnProperty(value = "spring.cloud.config.discovery.enabled", matchIfMissing = false)
@Configuration
@Import({ EurekaDiscoveryClientConfiguration.class, // this emulates @EnableDiscoveryClient, the import selector doesn't run before the bootstrap phase
EurekaClientAutoConfiguration.class })
public class EurekaDiscoveryClientConfigServiceBootstrapConfiguration {}
It is imported by the EurekaDiscoveryClientConfigServiceBootstrapConfiguration class.
EurekaDiscoveryClientConfigServiceBootstrapConfiguration, in turn, is referenced in a rather special place: a configuration file (META-INF/spring.factories).
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceBootstrapConfiguration
The key of this entry corresponds to an annotation class, BootstrapConfiguration:
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface BootstrapConfiguration {
/**
 * Exclude specific auto-configuration classes such that they will never be applied.
 */
Class<?>[] exclude() default {};}
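How a key in that configuration file maps to a list of class names can be sketched with java.util.Properties, which already understands the trailing backslash as a line continuation. This is a simplified illustration, not the SpringFactoriesLoader implementation. FactoriesSketch is a hypothetical name, and the file content is passed in as a string instead of being read from the classpath.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical, simplified reader for a factories-style properties file.
final class FactoriesSketch {
    static List<String> loadFactoryNames(String fileContent, String key) {
        Properties props = new Properties();
        try {
            // Properties.load joins lines that end with a backslash.
            props.load(new StringReader(fileContent));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> names = new ArrayList<>();
        for (String name : props.getProperty(key, "").split(",")) {
            if (!name.trim().isEmpty()) {
                names.add(name.trim());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        String content = "org.springframework.cloud.bootstrap.BootstrapConfiguration=\\\n"
                + "org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceBootstrapConfiguration\n";
        System.out.println(loadFactoryNames(content,
                "org.springframework.cloud.bootstrap.BootstrapConfiguration"));
    }
}
```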
It is used at line 164 of BootstrapApplicationListener, where the fully qualified class names are obtained; the classes are then loaded at line 186.
public class BootstrapApplicationListener
implements ApplicationListener<ApplicationEnvironmentPreparedEvent>, Ordered {
......
@Override
public void onApplicationEvent(ApplicationEnvironmentPreparedEvent event) {ConfigurableEnvironment environment = event.getEnvironment();
......
ConfigurableApplicationContext context = null;
String configName = environment
.resolvePlaceholders("${spring.cloud.bootstrap.name:bootstrap}");
for (ApplicationContextInitializer<?> initializer : event.getSpringApplication()
.getInitializers()) {if (initializer instanceof ParentContextApplicationContextInitializer) {
context = findBootstrapContext((ParentContextApplicationContextInitializer) initializer,
configName);
}
}
if (context == null) {
// This is where it is called
context = bootstrapServiceContext(environment, event.getSpringApplication(),
configName);
}
apply(context, event.getSpringApplication(), environment);
}
private ConfigurableApplicationContext bootstrapServiceContext(
ConfigurableEnvironment environment, final SpringApplication application,
String configName) {StandardEnvironment bootstrapEnvironment = new StandardEnvironment();
MutablePropertySources bootstrapProperties = bootstrapEnvironment
.getPropertySources();
for (PropertySource<?> source : bootstrapProperties) {bootstrapProperties.remove(source.getName());
}
String configLocation = environment
.resolvePlaceholders("${spring.cloud.bootstrap.location:}");
Map<String, Object> bootstrapMap = new HashMap<>();
bootstrapMap.put("spring.config.name", configName);
// if an app (or test) uses spring.main.web-application-type=reactive, bootstrap will fail
// force the environment to use none, because if though it is set below in the builder
// the environment overrides it
bootstrapMap.put("spring.main.web-application-type", "none");
if (StringUtils.hasText(configLocation)) {bootstrapMap.put("spring.config.location", configLocation);
}
bootstrapProperties.addFirst(new MapPropertySource(BOOTSTRAP_PROPERTY_SOURCE_NAME, bootstrapMap));
for (PropertySource<?> source : environment.getPropertySources()) {if (source instanceof StubPropertySource) {continue;}
bootstrapProperties.addLast(source);
}
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
// This is where the class names registered under the BootstrapConfiguration key are loaded
List<String> names = new ArrayList<>(SpringFactoriesLoader
.loadFactoryNames(BootstrapConfiguration.class, classLoader));
for (String name : StringUtils.commaDelimitedListToStringArray(environment.getProperty("spring.cloud.bootstrap.sources", ""))) {names.add(name);
}
// TODO: is it possible or sensible to share a ResourceLoader?
SpringApplicationBuilder builder = new SpringApplicationBuilder()
.profiles(environment.getActiveProfiles()).bannerMode(Mode.OFF)
.environment(bootstrapEnvironment)
// Don't use the default properties in this builder
.registerShutdownHook(false).logStartupInfo(false)
.web(WebApplicationType.NONE);
if (environment.getPropertySources().contains("refreshArgs")) {
// If we are doing a context refresh, really we only want to refresh the
// Environment, and there are some toxic listeners (like the
// LoggingApplicationListener) that affect global static state, so we need a
// way to switch those off.
builder.application()
.setListeners(filterListeners(builder.application().getListeners()));
}
List<Class<?>> sources = new ArrayList<>();
for (String name : names) {Class<?> cls = ClassUtils.resolveClassName(name, null);
try {cls.getDeclaredAnnotations();
}
catch (Exception e) {continue;}
sources.add(cls);
}
AnnotationAwareOrderComparator.sort(sources);
builder.sources(sources.toArray(new Class[sources.size()]));
final ConfigurableApplicationContext context = builder.run();
// gh-214 using spring.application.name=bootstrap to set the context id via
// `ContextIdApplicationContextInitializer` prevents apps from getting the actual
// spring.application.name
// during the bootstrap phase.
context.setId("bootstrap");
// Make the bootstrap context a parent of the app context
addAncestorInitializer(application, context);
// It only has properties in it now that we don't want in the parent so remove
// it (and it will be added back later)
bootstrapProperties.remove(BOOTSTRAP_PROPERTY_SOURCE_NAME);
mergeDefaultProperties(environment.getPropertySources(), bootstrapProperties);
return context;
}
......
......
}
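BootstrapApplicationListener implements ApplicationListener<ApplicationEnvironmentPreparedEvent>, so it is loaded as a listener when the project starts. Spring Boot publishes events at several points during application startup, including these four:
· ApplicationStartedEvent: fired when Spring Boot begins to start (recent Spring Boot versions name this event ApplicationStartingEvent);
· ApplicationEnvironmentPreparedEvent: the Environment is ready, but the context has not been created yet;
· ApplicationPreparedEvent: the context has been created, but the beans have not been fully loaded;
· ApplicationFailedEvent: fired when startup fails with an exception.
In other words, BootstrapApplicationListener runs once the environment has been prepared at startup, before any beans are created.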
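At this point we have walked the whole initialization chain of the EnableDiscoveryClient annotation. The rough sequence diagram is shown below:
To summarize, the part analyzed above does two main things:
1. Initializes the configuration;
2. Activates DiscoveryClient.
Next, let's analyze what DiscoveryClient does.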
2. DiscoveryClient
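When the client starts, the startup log shows that service registration is also issued from the DiscoveryClient class:
This suggests that the class does some important work during service registration. Let's analyze the implementation.
2.1 Service registration
DiscoveryClient (the Spring Cloud one) is an interface. Looking at its implementation classes, each has a DESCRIPTION field that states what the class does.
- EurekaDiscoveryClient: the main client implementation;
- CompositeDiscoveryClient: holds other discovery clients and queries them in order;
- NoopDiscoveryClient: deprecated;
- SimpleDiscoveryClient: obtains service instances from the SimpleDiscoveryProperties configuration.
Judging by the descriptions, EurekaDiscoveryClient is the main client implementation. In EurekaDiscoveryClient, service instances are looked up through EurekaClient: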
@Override
public List<ServiceInstance> getInstances(String serviceId) {List<InstanceInfo> infos = this.eurekaClient.getInstancesByVipAddress(serviceId,false);
List<ServiceInstance> instances = new ArrayList<>();
for (InstanceInfo info : infos) { instances.add(new EurekaServiceInstance(info));
}
return instances;
}
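DiscoveryClient (com.netflix.discovery.DiscoveryClient, not the Spring Cloud interface of the same name) is the implementation class of EurekaClient. It has one very important constructor: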
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,Provider<BackupRegistry> backupRegistryProvider) {if (args != null) {
this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
this.eventListeners.addAll(args.getEventListeners());
this.preRegistrationHandler = args.preRegistrationHandler;
} else {
this.healthCheckCallbackProvider = null;
this.healthCheckHandlerProvider = null;
this.preRegistrationHandler = null;
}
this.applicationInfoManager = applicationInfoManager;
InstanceInfo myInfo = applicationInfoManager.getInfo();
clientConfig = config;
staticClientConfig = clientConfig;
transportConfig = config.getTransportConfig();
instanceInfo = myInfo;
if (myInfo != null) {appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();} else {logger.warn("Setting instanceInfo to a passed in null value");
}
this.backupRegistryProvider = backupRegistryProvider;
this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
localRegionApps.set(new Applications());
fetchRegistryGeneration = new AtomicLong(0);
remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
// Everything above mainly initializes fields
// If shouldFetchRegistry = true, set up the registry staleness monitor
if (config.shouldFetchRegistry()) {this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;}
// If shouldRegisterWithEureka = true, set up the heartbeat staleness monitor
if (config.shouldRegisterWithEureka()) {this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;}
logger.info("Initializing Eureka in region {}", clientConfig.getRegion());
// If shouldRegisterWithEureka = false && shouldFetchRegistry = false,
// skip the rest of the initialization and return directly
if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {logger.info("Client configured to neither register nor query for data.");
scheduler = null;
heartbeatExecutor = null;
cacheRefreshExecutor = null;
eurekaTransport = null;
instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, this.getApplications().size());
return; // no need to setup up an network tasks and we are done
}
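// From here on, create the thread pools for the various tasks
try {
// default size of 2 - 1 each for heartbeat and cacheRefresh
// Scheduled thread pool with 2 threads: one for keeping the heartbeat going, one for refreshing the cached registry of other Eureka client instances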
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
// Thread pool that sends the heartbeats (core size 1; maximum size comes from configuration, 2 by default)
heartbeatExecutor = new ThreadPoolExecutor(1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()); // use direct handoff
// Thread pool that refreshes the cached registry of other Eureka client instances (core size 1; maximum size comes from configuration, 2 by default)
cacheRefreshExecutor = new ThreadPoolExecutor(1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()); // use direct handoff
eurekaTransport = new EurekaTransport();
scheduleServerEndpointTask(eurekaTransport, args);
......
......
......
} catch (Throwable e) {throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
}
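// Fetch the remote registry. The argument to fetchRegistry() is false here; it says whether to force a full fetch of all registered instances
// There are two ways to fetch the registry, full and delta; delta is the default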
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
// Fetching is enabled but the remote fetch failed, so fall back to the backup registry
fetchRegistryFromBackup();}
// call and execute the pre registration handler before all background tasks (inc registration) is started
if (this.preRegistrationHandler != null) {this.preRegistrationHandler.beforeRegistration();
}
// If the client is configured to register with Eureka Server and registration is enforced at init, register now; by default registration is not enforced at init
if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
try {if (!register() ) {throw new IllegalStateException("Registration error at startup. Invalid server response.");
}
} catch (Throwable th) {logger.error("Registration error at startup: {}", th.getMessage());
throw new IllegalStateException(th);
}
}
// finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
// Initialize the scheduled tasks that keep the heartbeat going and refresh the cached registry
initScheduledTasks();
try {Monitors.registerObject(this);
} catch (Throwable e) {logger.warn("Cannot register timers", e);
}
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, this.getApplications().size());
}
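Initialization mainly does two things:
- It creates three thread pools: scheduler for scheduled tasks, heartbeatExecutor for heartbeats (lease renewal), and cacheRefreshExecutor for fetching the registry;
- It calls initScheduledTasks() to start the pools, submitting the corresponding tasks to the three pools above. It then creates an instanceInfoReplicator (a Runnable task) and calls InstanceInfoReplicator.start, which schedules that task on a scheduled thread pool (service registration and updates).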
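The "direct handoff" pools created in the constructor can be reproduced with the JDK alone. The sketch below is an approximation under assumptions: it replaces Guava's ThreadFactoryBuilder with a hand-written daemon thread factory, and HandoffPoolSketch and its method names are hypothetical.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical JDK-only version of the executor setup shown above.
final class HandoffPoolSketch {
    static ThreadPoolExecutor newDirectHandoffPool(String namePrefix, int maxThreads) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = runnable -> {
            Thread thread = new Thread(runnable, namePrefix + "-" + counter.getAndIncrement());
            thread.setDaemon(true); // daemon threads do not block JVM shutdown
            return thread;
        };
        // SynchronousQueue holds no tasks: each task is handed straight to a thread.
        return new ThreadPoolExecutor(1, maxThreads, 0, TimeUnit.SECONDS,
                new SynchronousQueue<>(), factory);
    }

    // Runs one task on the pool and reports which thread executed it.
    static String threadNameOf(ThreadPoolExecutor pool) {
        try {
            return pool.submit(() -> Thread.currentThread().getName()).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newDirectHandoffPool("DiscoveryClient-HeartbeatExecutor", 2);
        System.out.println(threadNameOf(pool));
        pool.shutdown();
    }
}
```

Because the queue has no capacity, a task submitted while all threads are busy is rejected rather than queued, so a slow heartbeat cannot pile up behind another one.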
Next, see what initScheduledTasks does:
private void initScheduledTasks() {
// Fetch the service list
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
// Registry fetch interval, 30s by default
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
// If a cache refresh times out, the next delay doubles each time, capped at registryFetchIntervalSeconds times this bound (10 by default)
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
// Schedule CacheRefreshThread, the task that refreshes the cached service list
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
// Register with Eureka Server
if (clientConfig.shouldRegisterWithEureka()) {
// Lease renewal interval, 30s by default
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
// If a heartbeat times out, the next delay doubles each time, capped at renewalIntervalInSecs times this bound (10 by default)
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor:" + "renew interval is: {}", renewalIntervalInSecs);
// Heartbeat timer
// Schedule HeartbeatThread, which sends the heartbeat
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()),
renewalIntervalInSecs, TimeUnit.SECONDS);
// Replicator for the client instance information
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
// Status change listener
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {return "statusChangeListener";}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();}
};
if (clientConfig.shouldOnDemandUpdateStatusChange()) {applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
// Start the instance info replicator
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {logger.info("Not registering with Eureka server per configuration");
}
}
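In summary, initScheduledTasks() does the following:
· If shouldFetchRegistry=true, i.e. the service list is to be fetched from Eureka Server:
start the scheduled thread that refreshes the service list (DiscoveryClient-CacheRefreshExecutor-%d). It runs every registryFetchIntervalSeconds=30s by default, and its task is CacheRefreshThread, which fetches the service list from Eureka Server and refreshes the client cache.
· If shouldRegisterWithEureka=true, i.e. the client is to register with Eureka Server:
start the heartbeat scheduled thread (DiscoveryClient-HeartbeatExecutor-%d). It renews the lease every renewalIntervalInSecs=30s by default, and its task is HeartbeatThread, which sends the heartbeat from the client to Eureka Server;
start the InstanceInfo replicator scheduled thread (DiscoveryClient-InstanceInfoReplicator-%d), which periodically checks the current instance's DataCenterInfo, LeaseInfo, and InstanceStatus, and calls discoveryClient.register() when it detects a change so that the instance information is synchronized to the server.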
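The backoff behavior described in the code comments (the delay doubles after a timeout, up to the interval multiplied by expBackOffBound) can be written down as a small function. This is a sketch of the rule only, not the TimedSupervisorTask source. BackoffSketch is a hypothetical name, and resetting to the base interval after a successful run is an assumption.

```java
// Hypothetical model of the exponential backoff rule for the scheduled tasks.
final class BackoffSketch {
    static long nextDelayMillis(long currentDelayMillis, long baseIntervalMillis,
                                int expBackOffBound, boolean timedOut) {
        long maxDelayMillis = baseIntervalMillis * expBackOffBound;
        if (timedOut) {
            // Double the delay after a timeout, but never exceed the cap.
            return Math.min(maxDelayMillis, currentDelayMillis * 2);
        }
        // Assumption: a successful run goes back to the base interval.
        return baseIntervalMillis;
    }

    public static void main(String[] args) {
        long delay = 30_000L;
        for (int i = 0; i < 5; i++) {
            delay = nextDelayMillis(delay, 30_000L, 10, true);
            System.out.println(delay);
        }
    }
}
```

With the defaults mentioned above (30s interval, bound of 10), repeated timeouts stretch the delay to at most 300s.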
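One point worth attention above is InstanceInfoReplicator. It periodically refreshes the client instance's latest information: the current instance data, the lease information, and the instance status. InstanceInfoReplicator is a Runnable; focus on its run() method: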
public void run() {
try {
/**
 * Refresh InstanceInfo:
 * 1. refresh DataCenterInfo
 * 2. refresh LeaseInfo (lease information)
 * 3. obtain InstanceStatus from the HealthCheckHandler and update it; a status change triggers all StatusChangeListeners
 */
discoveryClient.refreshInstanceInfo();
// If the instance changed during the refresh and the server has not been told yet, register again
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
// Register
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {logger.warn("There was a problem with the instance info replicator", t);
} finally {Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
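The isDirtyWithTime() / unsetIsDirty(timestamp) pair in run() follows a timestamped dirty-flag pattern: the flag is cleared only if nothing changed after the timestamp that was read before registering. The sketch below illustrates the pattern under that assumption. DirtyFlagSketch and its method names are hypothetical, and timestamps are passed in explicitly to keep the example deterministic.

```java
// Hypothetical illustration of a timestamped dirty flag.
final class DirtyFlagSketch {
    private boolean dirty;
    private long lastDirtyTimestamp;

    // Marks the instance as changed at the given time.
    synchronized long markDirty(long now) {
        dirty = true;
        lastDirtyTimestamp = now;
        return now;
    }

    // Returns the time of the last change, or null when nothing is pending.
    synchronized Long dirtySince() {
        return dirty ? lastDirtyTimestamp : null;
    }

    // Clears the flag only if no newer change arrived after the given timestamp.
    synchronized void clearIfUnchangedSince(long timestamp) {
        if (lastDirtyTimestamp <= timestamp) {
            dirty = false;
        }
    }

    public static void main(String[] args) {
        DirtyFlagSketch info = new DirtyFlagSketch();
        info.markDirty(100L);
        Long seen = info.dirtySince();   // read before "registering"
        info.markDirty(200L);            // a change arrives during registration
        info.clearIfUnchangedSince(seen);
        System.out.println(info.dirtySince() != null); // still dirty, so the next run registers again
    }
}
```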
Now look at the implementation of register():
/**
 * Register with the eureka service by making the appropriate REST call.
 * (Registration with the Eureka service goes over HTTP.)
 */
boolean register() throws Throwable {logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
EurekaHttpResponse<Void> httpResponse;
try {httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == 204;}
Tracing further down leads to the RestTemplateEurekaHttpClient class:
public class RestTemplateEurekaHttpClient implements EurekaHttpClient {protected final Log logger = LogFactory.getLog(getClass());
private RestTemplate restTemplate;
private String serviceUrl;
public RestTemplateEurekaHttpClient(RestTemplate restTemplate, String serviceUrl) {
this.restTemplate = restTemplate;
this.serviceUrl = serviceUrl;
if (!serviceUrl.endsWith("/")) {this.serviceUrl = this.serviceUrl+"/";}
}
@Override
public EurekaHttpResponse<Void> register(InstanceInfo info) {String urlPath = serviceUrl + "apps/" + info.getAppName();
HttpHeaders headers = new HttpHeaders();
headers.add(HttpHeaders.ACCEPT_ENCODING, "gzip");
headers.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
ResponseEntity<Void> response = restTemplate.exchange(urlPath, HttpMethod.POST,
new HttpEntity<>(info, headers), Void.class);
return anEurekaHttpResponse(response.getStatusCodeValue())
.headers(headersOf(response)).build();}
......
......
......
}
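It wraps RestTemplate, the HTTP client template, and sends a POST request to the server. So this is where the registration request goes out to the server when the client starts.
2.2 Lease renewal
The entry point for lease renewal is the heartbeat timer task in the initScheduledTasks() method of DiscoveryClient: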
// Heartbeat timer
// Schedule a task that sends a heartbeat request every 30s
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()),
renewalIntervalInSecs, TimeUnit.SECONDS);
/**
 * The heartbeat task that renews the lease in the given intervals.
 */
private class HeartbeatThread implements Runnable {public void run() {if (renew()) {lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
/**
 * Renew with the eureka service by making the appropriate REST call
 */
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == 404) {REREGISTER_COUNTER.increment();
logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
long timestamp = instanceInfo.setIsDirtyWithTime();
boolean success = register();
if (success) {instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == 200;} catch (Throwable e) {logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
return false;
}
}
@Override
public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id,
InstanceInfo info, InstanceStatus overriddenStatus) {
String urlPath = serviceUrl + "apps/" + appName + '/' + id + "?status="
+ info.getStatus().toString() + "&lastDirtyTimestamp="
+ info.getLastDirtyTimestamp().toString() + (overriddenStatus != null
? "&overriddenstatus=" + overriddenStatus.name() : "");
ResponseEntity<InstanceInfo> response = restTemplate.exchange(urlPath,
HttpMethod.PUT, null, InstanceInfo.class);
EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder = anEurekaHttpResponse(response.getStatusCodeValue(), InstanceInfo.class)
.headers(headersOf(response));
if (response.hasBody())
eurekaResponseBuilder.entity(response.getBody());
return eurekaResponseBuilder.build();}
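The code above is the complete call chain for the client's heartbeat request. Every 30s the client sends a request to the server to renew its lease; if the server answers 404 (it does not know the instance), the client registers itself again.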
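The decision inside renew() comes down to three cases: 200 means the lease was renewed, 404 triggers a new registration, and anything else (including an exception) counts as a failed heartbeat. A minimal sketch with the HTTP calls replaced by functional stubs follows. RenewSketch and its parameters are hypothetical, and the dirty-flag bookkeeping of the real method is left out.

```java
import java.util.function.BooleanSupplier;
import java.util.function.IntSupplier;

// Hypothetical reduction of renew() to its status-code decision.
final class RenewSketch {
    static boolean renew(IntSupplier sendHeartbeat, BooleanSupplier register) {
        try {
            int status = sendHeartbeat.getAsInt();
            if (status == 404) {
                // The server does not know this instance: register again.
                return register.getAsBoolean();
            }
            return status == 200;
        } catch (RuntimeException e) {
            // A failed request counts as a missed heartbeat.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(renew(() -> 200, () -> false));
        System.out.println(renew(() -> 404, () -> true));
    }
}
```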
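2.3 Service deregistration
Deregistration is easy to understand: when the service shuts down, it cancels its local scheduled tasks and sends a request telling the server that it is going offline.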
/**
 * Shuts down Eureka Client. Also sends a deregistration request to the
 * eureka server.
 */
@PreDestroy
@Override
public synchronized void shutdown() {if (isShutdown.compareAndSet(false, true)) {logger.info("Shutting down DiscoveryClient ...");
if (statusChangeListener != null && applicationInfoManager != null) {applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
}
// Cancel the scheduled tasks
cancelScheduledTasks();
// If APPINFO was registered
if (applicationInfoManager != null
&& clientConfig.shouldRegisterWithEureka()
&& clientConfig.shouldUnregisterOnShutdown()) {applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
// Send a request telling the server that this instance is going offline
unregister();}
if (eurekaTransport != null) {eurekaTransport.shutdown();
}
// Shut down the monitors
heartbeatStalenessMonitor.shutdown();
registryStalenessMonitor.shutdown();
logger.info("Completed shut down of DiscoveryClient");
}
}
/**
 * unregister w/ the eureka service.
 */
void unregister() {
// It can be null if shouldRegisterWithEureka == false
if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
try {logger.info("Unregistering ...");
EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
logger.info(PREFIX + "{} - deregister status: {}", appPathIdentifier, httpResponse.getStatusCode());
} catch (Exception e) {logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
}
}
}
@Override
public EurekaHttpResponse<Void> cancel(String appName, String id) {
String urlPath = serviceUrl + "apps/" + appName + '/' + id;
ResponseEntity<Void> response = restTemplate.exchange(urlPath, HttpMethod.DELETE,
null, Void.class);
return anEurekaHttpResponse(response.getStatusCodeValue())
.headers(headersOf(response)).build();}
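Putting the three RestTemplateEurekaHttpClient methods side by side, registration, heartbeat, and cancellation differ only in HTTP method and path. The helper below rebuilds those request lines as plain strings for comparison. EurekaPathsSketch is a hypothetical name, and the service URL, application name, and instance id used in main are made-up example values.

```java
// Hypothetical helper that mirrors the URL construction shown in the listings.
final class EurekaPathsSketch {
    static String normalize(String serviceUrl) {
        return serviceUrl.endsWith("/") ? serviceUrl : serviceUrl + "/";
    }

    // Registration: POST apps/{appName}
    static String register(String serviceUrl, String appName) {
        return "POST " + normalize(serviceUrl) + "apps/" + appName;
    }

    // Heartbeat: PUT apps/{appName}/{id}?status=...&lastDirtyTimestamp=...
    static String heartbeat(String serviceUrl, String appName, String id,
                            String status, long lastDirtyTimestamp) {
        return "PUT " + normalize(serviceUrl) + "apps/" + appName + '/' + id
                + "?status=" + status + "&lastDirtyTimestamp=" + lastDirtyTimestamp;
    }

    // Cancellation: DELETE apps/{appName}/{id}
    static String cancel(String serviceUrl, String appName, String id) {
        return "DELETE " + normalize(serviceUrl) + "apps/" + appName + '/' + id;
    }

    public static void main(String[] args) {
        String url = "http://localhost:8761/eureka"; // example value only
        System.out.println(register(url, "DEMO-APP"));
        System.out.println(heartbeat(url, "DEMO-APP", "host-1:demo-app:8080", "UP", 1L));
        System.out.println(cancel(url, "DEMO-APP", "host-1:demo-app:8080"));
    }
}
```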
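2.4 Fetching and refreshing the registry
When the service starts, it does a full fetch of all other registered client instances from the server. Delta fetches then run every 30s from the task scheduled in initScheduledTasks().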
private void initScheduledTasks() {if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
......
......
......
}
/**
 * The task that fetches the registry information at specified intervals.
 *
 */
class CacheRefreshThread implements Runnable {public void run() {refreshRegistry();
}
}
@VisibleForTesting
void refreshRegistry() {
try {boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();
boolean remoteRegionsModified = false;
// This makes sure that a dynamic change to remote regions to fetch is honored.
String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
......
......
......
boolean success = fetchRegistry(remoteRegionsModified);
if (success) {registrySize = localRegionApps.get().size();
lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();}
......
......
......
} catch (Throwable e) {logger.error("Cannot fetch registry from server", e);
}
}
/**
 * Fetches the registry information.
 *
 * <p>
 * This method tries to get only deltas after the first fetch unless there
 * is an issue in reconciling eureka server and client registry information.
 * </p>
 *
 * @param forceFullRegistryFetch Forces a full registry fetch.
 *
 * @return true if the registry was fetched
 */
private boolean fetchRegistry(boolean forceFullRegistryFetch) {Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
// If delta fetching is disabled, or this is the first fetch, pull all applications
Applications applications = getApplications();
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) // Client application does not have latest library supporting delta
{logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
logger.info("Application is null : {}", (applications == null));
logger.info("Registered Applications size is zero : {}",
(applications.getRegisteredApplications().size() == 0));
logger.info("Application version is -1: {}", (applications.getVersion() == -1));
getAndStoreFullRegistry();} else {getAndUpdateDelta(applications);
}
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();} catch (Throwable e) {logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
return false;
} finally {if (tracer != null) {tracer.stop();
}
}
// Notify about cache refresh before updating the instance remote status
onCacheRefreshed();
// Update remote status based on refreshed data held in the cache
updateInstanceRemoteStatus();
// registry was fetched successfully, so return true
return true;
}
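The long condition at the top of fetchRegistry() decides between a full fetch and a delta fetch. Reduced to a predicate, it looks roughly like this. FetchModeSketch is a hypothetical name, and a null registeredAppCount is used here to model the applications == null case.

```java
// Hypothetical predicate mirroring the full-vs-delta decision in fetchRegistry().
final class FetchModeSketch {
    static boolean needsFullFetch(boolean disableDelta, String singleVipAddress,
                                  boolean forceFullRegistryFetch,
                                  Integer registeredAppCount, long version) {
        return disableDelta
                || (singleVipAddress != null && !singleVipAddress.isEmpty())
                || forceFullRegistryFetch
                || registeredAppCount == null   // no local registry yet
                || registeredAppCount == 0      // first fetch: nothing cached
                || version == -1;               // client cannot handle deltas
    }

    public static void main(String[] args) {
        System.out.println(needsFullFetch(false, null, false, 0, 0L)); // first fetch
        System.out.println(needsFullFetch(false, null, false, 3, 5L)); // later refresh
    }
}
```

This is why the first fetch at startup is always a full one even though delta is the default mode: the local registry is still empty.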
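The server retains the change records it serves for delta fetches for 3 minutes. Although the Eureka client fetches deltas, what it receives may be the same as what it fetched 30 seconds earlier, so the Eureka client has to handle duplicate information itself.
Also note this line in fetchRegistry():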
applications.setAppsHashCode(applications.getReconcileHashCode());
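Every delta response from the server carries a hash code used for reconciliation. A delta fetch actually returns the changes Eureka server recorded in the last three minutes, so if a Eureka client goes more than three minutes without a delta fetch (because of a network problem, for example), its data diverges from the server's. Normally, after many delta updates the client's service list should match the server's, but an error along the way can leave the two inconsistent. To expose this, Eureka server includes the hash code in every delta response. The hash code the client computes from its local service list should equal the one the server returned; if they differ, the delta updates have gone wrong and the two service lists no longer match, so a full fetch is required.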
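The comparison can be illustrated with a hash built from per-status instance counts. The string format used below (status, underscore, count, underscore, in sorted order, for example DOWN_1_UP_2_) is an assumption about how such a reconcile hash could look, not a quotation of the Eureka source. ReconcileHashSketch and its methods are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical reconcile hash based on instance counts per status.
final class ReconcileHashSketch {
    static String reconcileHash(List<String> instanceStatuses) {
        // TreeMap keeps the statuses sorted so both sides build the same string.
        Map<String, Integer> counts = new TreeMap<>();
        for (String status : instanceStatuses) {
            counts.merge(status, 1, Integer::sum);
        }
        StringBuilder hash = new StringBuilder();
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            hash.append(entry.getKey()).append('_').append(entry.getValue()).append('_');
        }
        return hash.toString();
    }

    // A mismatch means the local list drifted and a full fetch is needed.
    static boolean needsFullFetch(String serverHash, List<String> localStatuses) {
        return !serverHash.equals(reconcileHash(localStatuses));
    }

    public static void main(String[] args) {
        List<String> local = Arrays.asList("UP", "DOWN", "UP");
        System.out.println(reconcileHash(local));
        System.out.println(needsFullFetch("UP_3_", local));
    }
}
```

A hash of this kind is cheap to compute and detects a missed registration or status change, although two different lists with the same counts would still compare equal.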
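That concludes the analysis of the client code. It can be approached from two angles:
1. Start from the startup class and see what gets initialized;
2. Start from the startup log and see what the startup class does.
Finally, here is a summary diagram of the Eureka Server and Client startup flows in a microservice setup: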