共计 9431 个字符,预计需要花费 24 分钟才能阅读完成。
在上篇文章中, 我们简单介绍了 EurekaServer 自动装配及启动流程解析,本篇文章则继续研究 EurekaClient 的相关代码
老规矩,先看 spring.factories
文件,其中引入了一个配置类EurekaDiscoveryClientConfigServiceBootstrapConfiguration
@ConditionalOnClass(ConfigServicePropertySourceLocator.class)
@ConditionalOnProperty(value = "spring.cloud.config.discovery.enabled", matchIfMissing = false)
@Configuration
@Import({ EurekaDiscoveryClientConfiguration.class,
EurekaClientAutoConfiguration.class })
public class EurekaDiscoveryClientConfigServiceBootstrapConfiguration {}
上方两个注解则是这个配置类是否能够开启的条件,这里就不再展开,直接看它引入的配置类吧
EurekaDiscoveryClientConfiguration
- 细心的读者可能会发现这里又注册了一个
Marker
类,可以猜测也是某个地方的开关 -
EurekaClientConfigurationRefresher
这个类看名字就知道这是当配置被动态刷新时的一个处理器,这里也不再展开了 -
EurekaHealthCheckHandlerConfiguration
这里面注册了一个 Eureka 健康检查的处理类,这个健康检查相关的原理分析可以参考这篇文章:SpringBoot 健康检查实现原理
EurekaClientAutoConfiguration
这个类里面全是重点,也是我们本文的核心
注解
@Configuration
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@Import(DiscoveryClientOptionalArgsConfiguration.class)
@ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
@AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class,
CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
@AutoConfigureAfter(name = {"org.springframework.cloud.autoconfigure.RefreshAutoConfiguration",
"org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",
"org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration"})
首先可以看到这个类一共包含这些注解,我们来一一解析比较重要的几个注解吧
@Import(DiscoveryClientOptionalArgsConfiguration.class)
引入了两个 bean,RestTemplateDiscoveryClientOptionalArgs
和MutableDiscoveryClientOptionalArgs
,这两个类的作用暂且不说
@ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class)
刚才说的 Marker
类的作用出来了
@AutoConfigureBefore
既然必须在这三个类完成自动装配之后才能进行装配,那就代表着这三个类肯定大有用途,研究一下吧
NoopDiscoveryClientAutoConfiguration
故名思意,负责服务发现的类,咱们重点关注一下其中的几个方法
init
@PostConstruct
public void init() {
String host = "localhost";
try {host = InetAddress.getLocalHost().getHostName();}
catch (UnknownHostException e) {log.warn("Cannot get host info: (" + e.getMessage() + ")");
}
int port = findPort();
this.serviceInstance = new DefaultServiceInstance(this.environment.getProperty("spring.application.name", "application"),
host, port, false);
}
这里构造了一个 DefaultServiceInstance
对象,这个对象包含了当前项目的 ip+ 端口 + 项目名称
- 注入 bean
NoopDiscoveryClient
@Bean
public DiscoveryClient discoveryClient() {return new NoopDiscoveryClient(this.serviceInstance);
}
再深入看一下这个类
public class NoopDiscoveryClient implements DiscoveryClient {public NoopDiscoveryClient(ServiceInstance instance) { }
@Override
public String description() {return "Spring Cloud No-op DiscoveryClient";}
@Override
public List<ServiceInstance> getInstances(String serviceId) {return Collections.emptyList();
}
@Override
public List<String> getServices() {return Collections.emptyList();
}
}
这个类包含了获取当前实例以及当前服务的方法,但是返回的都是空,那么是不是会在后面的某个地方被覆盖呢?
CommonsClientAutoConfiguration
进去深入了解一下,哎哟,注册了几个 bean:DiscoveryClientHealthIndicator
、DiscoveryCompositeHealthIndicator
。原来是健康检查相关的东西,那就忽略了
ServiceRegistryAutoConfiguration
这个配置类中主要注册了一个 bean:ServiceRegistryEndpoint
这个类主要是对外提供对与 Eureka 状态的检查
@ReadOperation
public ResponseEntity getStatus() {if (this.registration == null) {return ResponseEntity.status(HttpStatus.NOT_FOUND).body("no registration found");
}
return ResponseEntity.ok().body(this.serviceRegistry.getStatus(this.registration));
}
而 Eureka 的状态则是通过 serviceRegistry
对象获取的,这个对象会再下方详细分析
注册 bean
接着来看这个类注入的几个 bean
EurekaClientConfigBean
@Bean
@ConditionalOnMissingBean(value = EurekaClientConfig.class, search = SearchStrategy.CURRENT)
public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) {EurekaClientConfigBean client = new EurekaClientConfigBean();
if ("bootstrap".equals(this.env.getProperty("spring.config.name"))) {client.setRegisterWithEureka(false);
}
return client;
}
这个 bean 中包含了 eureka.client.xxx
系列的一些配置,详细的配置信息可以参考这里:https://github.com/shiyujun/s…
EurekaInstanceConfigBean
这个 bean 中主要是包含 eureka 实例(eureka.instance.xxx
系列)的一些配置信息,详细的配置信息同上
RefreshableEurekaClientConfiguration.DiscoveryClient
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
@org.springframework.cloud.context.config.annotation.RefreshScope
@Lazy
public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config, EurekaInstanceConfig instance) {manager.getInfo(); // force initialization
return new CloudEurekaClient(manager, config, this.optionalArgs,
this.context);
}
其中 CloudEurekaClient
是DiscoveryClient
的子类,而 DiscoveryClient
则是 EurekaClient 的核心类
public CloudEurekaClient(ApplicationInfoManager applicationInfoManager,
EurekaClientConfig config,
AbstractDiscoveryClientOptionalArgs<?> args,
ApplicationEventPublisher publisher) {
// 这里会调用父类 DiscoveryClient 的构造方法
super(applicationInfoManager, config, args);
this.applicationInfoManager = applicationInfoManager;
this.publisher = publisher;
this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class, "eurekaTransport");
ReflectionUtils.makeAccessible(this.eurekaTransportField);
}
父类的构造方法中执行的代码块比较长,一些赋值操作等就忽略了, 这里只摘出比较重要的部分
- 初始化拉取监控和心跳监控
if (config.shouldFetchRegistry()) {this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;}
if (config.shouldRegisterWithEureka()) {this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;}
- 当当前实例不需要注册到 EurekaServer 时,构造方法走到这里就结束了
if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {logger.info("Client configured to neither register nor query for data.");
scheduler = null;
heartbeatExecutor = null;
cacheRefreshExecutor = null;
eurekaTransport = null;
instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, this.getApplications().size());
return;
}
- 初始化心跳线程和刷新线程以及它们的调度器
try {
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
heartbeatExecutor = new ThreadPoolExecutor(1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build());
cacheRefreshExecutor = new ThreadPoolExecutor(1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build());
- 从 EurekaServer 拉取注册信息
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {fetchRegistryFromBackup();
}
这里 fetchRegistry
是第一次拉取注册信息,如果拉取不成功的话则执行 fetchRegistryFromBackup
从备份注册中心获取,同样,拉取的信息会放在之后的文章中
- 注册之前的扩展点
if (this.preRegistrationHandler != null) {this.preRegistrationHandler.beforeRegistration();
}
这里是个空的实现,可以通过实现 PreRegistrationHandler
接口做些什么操作
- 向 EurekaServer 发起注册
if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
try {if (!register() ) {throw new IllegalStateException("Registration error at startup. Invalid server response.");
}
} catch (Throwable th) {logger.error("Registration error at startup: {}", th.getMessage());
throw new IllegalStateException(th);
}
}
注册方法为register
,同样这里先不展开
- 初始化几个定时任务
initScheduledTasks();
private void initScheduledTasks() {
// 从 EurekaServer 拉取注册信息
if (clientConfig.shouldFetchRegistry()) {int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
// 向 EurekaServer 发送续租心跳
if (clientConfig.shouldRegisterWithEureka()) {int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor:" + "renew interval is:" + renewalIntervalInSecs);
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()),
renewalIntervalInSecs, TimeUnit.SECONDS);
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2);
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {return "statusChangeListener";}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {logger.warn("Saw local status change event {}", statusChangeEvent);
} else {logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();}
};
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
// 注册状态监听器
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {logger.info("Not registering with Eureka server per configuration");
}
}
至此,EurekaClient 的自动装配与启动流程就解析完毕了