在上篇文章中,我们简单介绍了EurekaServer自动装配及启动流程解析,本篇文章则继续研究EurekaClient的相关代码

老规矩,先看spring.factories文件,其中引入了一个配置类EurekaDiscoveryClientConfigServiceBootstrapConfiguration

@ConditionalOnClass(ConfigServicePropertySourceLocator.class)@ConditionalOnProperty(value = "spring.cloud.config.discovery.enabled", matchIfMissing = false)@Configuration@Import({ EurekaDiscoveryClientConfiguration.class,        EurekaClientAutoConfiguration.class })public class EurekaDiscoveryClientConfigServiceBootstrapConfiguration {}

上方两个注解则是这个配置类是否能够开启的条件,这里就不再展开,直接看它引入的配置类吧

EurekaDiscoveryClientConfiguration

  1. 细心的读者可能会发现这里又注册了一个Marker类,可以猜测也是某个地方的开关
  2. EurekaClientConfigurationRefresher这个类看名字就知道这是当配置被动态刷新时的一个处理器,这里也不再展开了
  3. EurekaHealthCheckHandlerConfiguration这里面注册了一个Eureka健康检查的处理类,这个健康检查相关的原理分析可以参考这篇文章:SpringBoot健康检查实现原理

EurekaClientAutoConfiguration

这个类里面全是重点,也是我们本文的核心

注解
@Configuration@EnableConfigurationProperties@ConditionalOnClass(EurekaClientConfig.class)@Import(DiscoveryClientOptionalArgsConfiguration.class)@ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class)@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)@AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class,        CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })@AutoConfigureAfter(name = {"org.springframework.cloud.autoconfigure.RefreshAutoConfiguration",        "org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",        "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration"})

首先可以看到这个类一共包含这些注解,我们来一一解析比较重要的几个注解吧

@Import(DiscoveryClientOptionalArgsConfiguration.class)

引入了两个bean,RestTemplateDiscoveryClientOptionalArgsMutableDiscoveryClientOptionalArgs ,这两个类的作用暂且不说

@ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class)

刚才说的Marker类的作用出来了

@AutoConfigureBefore

既然必须在这三个类完成自动装配之后才能进行装配,那就代表着这三个类肯定大有用途,研究一下吧

NoopDiscoveryClientAutoConfiguration

故名思意,负责服务发现的类,咱们重点关注一下其中的几个方法

  1. init
@PostConstruct    public void init() {        String host = "localhost";        try {            host = InetAddress.getLocalHost().getHostName();        }        catch (UnknownHostException e) {            log.warn("Cannot get host info: (" + e.getMessage() + ")");        }        int port = findPort();        this.serviceInstance = new DefaultServiceInstance(                this.environment.getProperty("spring.application.name", "application"),                host, port, false);    }

这里构造了一个DefaultServiceInstance对象,这个对象包含了当前项目的ip+端口+项目名称

  1. 注入beanNoopDiscoveryClient
@Bean    public DiscoveryClient discoveryClient() {        return new NoopDiscoveryClient(this.serviceInstance);    }

再深入看一下这个类

public class NoopDiscoveryClient implements DiscoveryClient {    public NoopDiscoveryClient(ServiceInstance instance) {    }    @Override    public String description() {        return "Spring Cloud No-op DiscoveryClient";    }    @Override    public List<ServiceInstance> getInstances(String serviceId) {        return Collections.emptyList();    }    @Override    public List<String> getServices() {        return Collections.emptyList();    }}

这个类包含了获取当前实例以及当前服务的方法,但是返回的都是空,那么是不是会在后面的某个地方被覆盖呢?

CommonsClientAutoConfiguration

进去深入了解一下,哎哟,注册了几个bean:DiscoveryClientHealthIndicatorDiscoveryCompositeHealthIndicator。原来是健康检查相关的东西,那就忽略了

ServiceRegistryAutoConfiguration

这个配置类中主要注册了一个bean:ServiceRegistryEndpoint这个类主要是对外提供对与Eureka状态的检查

@ReadOperation    public ResponseEntity getStatus() {        if (this.registration == null) {            return ResponseEntity.status(HttpStatus.NOT_FOUND).body("no registration found");        }        return ResponseEntity.ok().body(this.serviceRegistry.getStatus(this.registration));    }

而Eureka的状态则是通过serviceRegistry对象获取的,这个对象会再下方详细分析

注册bean

接着来看这个类注入的几个bean

EurekaClientConfigBean
@Bean    @ConditionalOnMissingBean(value = EurekaClientConfig.class, search = SearchStrategy.CURRENT)    public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) {        EurekaClientConfigBean client = new EurekaClientConfigBean();        if ("bootstrap".equals(this.env.getProperty("spring.config.name"))) {            client.setRegisterWithEureka(false);        }        return client;    }

这个bean中包含了eureka.client.xxx系列的一些配置,详细的配置信息可以参考这里:https://github.com/shiyujun/s...

EurekaInstanceConfigBean

这个bean中主要是包含eureka实例(eureka.instance.xxx系列)的一些配置信息,详细的配置信息同上

RefreshableEurekaClientConfiguration.DiscoveryClient
        @Bean(destroyMethod = "shutdown")        @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)        @org.springframework.cloud.context.config.annotation.RefreshScope        @Lazy        public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config, EurekaInstanceConfig instance) {            manager.getInfo(); // force initialization            return new CloudEurekaClient(manager, config, this.optionalArgs,                    this.context);        }

其中CloudEurekaClientDiscoveryClient的子类,而DiscoveryClient则是EurekaClient的核心类

       public CloudEurekaClient(ApplicationInfoManager applicationInfoManager,                             EurekaClientConfig config,                             AbstractDiscoveryClientOptionalArgs<?> args,                             ApplicationEventPublisher publisher) {                             //这里会调用父类DiscoveryClient的构造方法        super(applicationInfoManager, config, args);        this.applicationInfoManager = applicationInfoManager;        this.publisher = publisher;        this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class, "eurekaTransport");        ReflectionUtils.makeAccessible(this.eurekaTransportField);    }

父类的构造方法中执行的代码块比较长,一些赋值操作等就忽略了,这里只摘出比较重要的部分

  1. 初始化拉取监控和心跳监控
       if (config.shouldFetchRegistry()) {            this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});        } else {            this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;        }        if (config.shouldRegisterWithEureka()) {            this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});        } else {            this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;        }
  1. 当当前实例不需要注册到EurekaServer时,构造方法走到这里就结束了
        if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {            logger.info("Client configured to neither register nor query for data.");            scheduler = null;            heartbeatExecutor = null;            cacheRefreshExecutor = null;            eurekaTransport = null;            instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());            DiscoveryManager.getInstance().setDiscoveryClient(this);            DiscoveryManager.getInstance().setEurekaClientConfig(config);            initTimestampMs = System.currentTimeMillis();            logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",                    initTimestampMs, this.getApplications().size());            return;          }
  1. 初始化心跳线程和刷新线程以及它们的调度器
  try {            scheduler = Executors.newScheduledThreadPool(2,                    new ThreadFactoryBuilder()                            .setNameFormat("DiscoveryClient-%d")                            .setDaemon(true)                            .build());            heartbeatExecutor = new ThreadPoolExecutor(                    1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,                    new SynchronousQueue<Runnable>(),                    new ThreadFactoryBuilder()                            .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")                            .setDaemon(true)                            .build()            );              cacheRefreshExecutor = new ThreadPoolExecutor(                    1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,                    new SynchronousQueue<Runnable>(),                    new ThreadFactoryBuilder()                            .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")                            .setDaemon(true)                            .build()            ); 
  1. 从EurekaServer拉取注册信息
 if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {            fetchRegistryFromBackup();        }

这里fetchRegistry是第一次拉取注册信息,如果拉取不成功的话则执行fetchRegistryFromBackup从备份注册中心获取,同样,拉取的信息会放在之后的文章中

  1. 注册之前的扩展点
  if (this.preRegistrationHandler != null) {            this.preRegistrationHandler.beforeRegistration();        }

这里是个空的实现,可以通过实现PreRegistrationHandler接口做些什么操作

  1. 向EurekaServer发起注册
 if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {            try {                if (!register() ) {                    throw new IllegalStateException("Registration error at startup. Invalid server response.");                }            } catch (Throwable th) {                logger.error("Registration error at startup: {}", th.getMessage());                throw new IllegalStateException(th);            }        }

注册方法为register,同样这里先不展开

  1. 初始化几个定时任务
initScheduledTasks();private void initScheduledTasks() {   // 从 EurekaServer 拉取注册信息   if (clientConfig.shouldFetchRegistry()) {       int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();       int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();       scheduler.schedule(               new TimedSupervisorTask(                       "cacheRefresh",                       scheduler,                       cacheRefreshExecutor,                       registryFetchIntervalSeconds,                       TimeUnit.SECONDS,                       expBackOffBound,                       new CacheRefreshThread()               ),               registryFetchIntervalSeconds, TimeUnit.SECONDS);   }   // 向 EurekaServer 发送续租心跳   if (clientConfig.shouldRegisterWithEureka()) {       int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();       int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();       logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);       scheduler.schedule(               new TimedSupervisorTask(                       "heartbeat",                       scheduler,                       heartbeatExecutor,                       renewalIntervalInSecs,                       TimeUnit.SECONDS,                       expBackOffBound,                       new HeartbeatThread()               ),               renewalIntervalInSecs, TimeUnit.SECONDS);       instanceInfoReplicator = new InstanceInfoReplicator(               this,               instanceInfo,               clientConfig.getInstanceInfoReplicationIntervalSeconds(),               2);        statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {           @Override           public String getId() {               return "statusChangeListener";           }           @Override           public void notify(StatusChangeEvent statusChangeEvent) {               if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||                       InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {                   logger.warn("Saw local status change event {}", statusChangeEvent);               } else {                   logger.info("Saw local status change event {}", statusChangeEvent);               }               instanceInfoReplicator.onDemandUpdate();           }       };       if (clientConfig.shouldOnDemandUpdateStatusChange()) {       //注册状态监听器           applicationInfoManager.registerStatusChangeListener(statusChangeListener);       }       instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());   } else {       logger.info("Not registering with Eureka server per configuration");   }}

至此,EurekaClient的自动装配与启动流程就解析完毕了