Eureka Client 是一个Java 客户端,用于简化与Eureka Server的交互,客户端同时也具备一个内置的、应用轮询负载算法的负载均衡器。

在利用启动后,将会向Eureka Server发送心跳(默认周期为30秒),如果Eureka Server在多个心跳周期没有收到某个节点的心跳,Eureka Server 将会从服务注册表中把这个服务节点移除(默认90秒)。Eureka Client具备缓存的机制,即便所有的Eureka Server 都挂掉的话,客户端仍然能够利用缓存中的信息生产其它服务的API。上面一起来看Client客户端相干操作的流程图。

1.从启动类动手

从启动类的@EnableDiscoveryClient注解动手看调用流程。进入 EnableDiscoveryClient 之后,通过正文晓得它的作用是为了激活 DiscoveryClient:

首先是在类头应用了 import 注解引入了:EnableDiscoveryClientImportSelector。该类的次要作用是实例化:AutoServiceRegistrationConfiguration。

@Order(Ordered.LOWEST_PRECEDENCE - 100)public class EnableDiscoveryClientImportSelector extends SpringFactoryImportSelector<EnableDiscoveryClient> {  @Override public String[] selectImports(AnnotationMetadata metadata) {     **//调用父类的办法,拿到通过父类办法要注入的全门路类名数组** String[] imports = super.selectImports(metadata); **//取得该注解(@EnableDiscoveryClient)的所有属性参数** AnnotationAttributes attributes = AnnotationAttributes.fromMap(  metadata.getAnnotationAttributes(getAnnotationClass().getName(), true));  **//取得属性autoRegister的值,该值默认是true的** boolean autoRegister = attributes.getBoolean("autoRegister"); **//依据注解配置来判断是否要实例化上面的那个主动配置类** if (autoRegister) {  List<String> importsList = new ArrayList<>(Arrays.asList(imports));  importsList.add("org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationConfiguration");  imports = importsList.toArray(new String[0]); } else {  Environment env = getEnvironment();  if(ConfigurableEnvironment.class.isInstance(env)) {  ConfigurableEnvironment configEnv = (ConfigurableEnvironment)env;  LinkedHashMap<String, Object> map = new LinkedHashMap<>();  map.put("spring.cloud.service-registry.auto-registration.enabled", false);  MapPropertySource propertySource = new MapPropertySource(   "springCloudDiscoveryClient", map);  configEnv.getPropertySources().addLast(propertySource);  } }  return imports; } @Override protected boolean isEnabled() { return getEnvironment().getProperty(  "spring.cloud.discovery.enabled", Boolean.class, Boolean.TRUE); } @Override protected boolean hasDefaultFactory() { return true; }}

这里最终的目标是想实例化:AutoServiceRegistrationConfiguration,咱们来看他做了什么:

@Configuration@Import(AutoServiceRegistrationConfiguration.class)@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)public class AutoServiceRegistrationAutoConfiguration {  @Autowired(required = false) private AutoServiceRegistration autoServiceRegistration; @Autowired private AutoServiceRegistrationProperties properties; @PostConstruct protected void init() { if (autoServiceRegistration == null && this.properties.isFailFast()) {  throw new IllegalStateException("Auto Service Registration has been requested, but there is no AutoServiceRegistration bean"); } }}

从这里看次要目标是为了实例化:AutoServiceRegistration,AutoServiceRegistrationProperties这两个类。那么初始化这两个bean的作用是什么呢,查看调用的中央:

@Configuration@EnableConfigurationProperties@ConditionalOnClass(EurekaClientConfig.class)@Import(DiscoveryClientOptionalArgsConfiguration.class)@ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class)@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)@AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class, CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })@AutoConfigureAfter(name = {"org.springframework.cloud.autoconfigure.RefreshAutoConfiguration", "org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration", "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration"})public class EurekaClientAutoConfiguration {  ......  ......  ......  @Bean @ConditionalOnBean(AutoServiceRegistrationProperties.class) @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true) public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient, CloudEurekaInstanceConfig instanceConfig, ApplicationInfoManager applicationInfoManager, ObjectProvider<HealthCheckHandler> healthCheckHandler) { return EurekaRegistration.builder(instanceConfig)  .with(applicationInfoManager)  .with(eurekaClient)  .with(healthCheckHandler)  .build(); }  @Bean @ConditionalOnBean(AutoServiceRegistrationProperties.class) @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true) public EurekaAutoServiceRegistration eurekaAutoServiceRegistration(ApplicationContext context, EurekaServiceRegistry registry, EurekaRegistration registration) { return new EurekaAutoServiceRegistration(context, registry, registration); }    ......  ......  ......}

起因是在这里实例化bean的时候被作为前置条件。

EurekaClientAutoConfiguration 算是到目前为止比拟重要的一个类,次要做的事件包含:

  1. 注册 EurekaClientConfigBean ,初始化client端配置信息;
  2. 注册 EurekaInstanceConfigBean ,初始化客户端实例信息;
  3. 初始化 EurekaRegistration,EurekaServiceRegistry,EurekaAutoServiceRegistration实现Eureka服务主动注册;
  4. 初始化 EurekaClient ,ApplicationInfoManager。EurekaClient 的默认实现是 DiscoveryClient,是咱们接下来要剖析的重点;
  5. 初始化 EurekaHealthIndicator,为/health 端点提供Eureka相干信息,次要有Status以后实例状态和applications服务列表。

持续看 EurekaClientAutoConfiguration 又在哪里被应用:

@ConditionalOnClass(ConfigServicePropertySourceLocator.class)@ConditionalOnProperty(value = "spring.cloud.config.discovery.enabled", matchIfMissing = false)@Configuration@Import({ EurekaDiscoveryClientConfiguration.class, **// this emulates @EnableDiscoveryClient, the import selector doesn't run before the bootstrap phase** EurekaClientAutoConfiguration.class })public class EurekaDiscoveryClientConfigServiceBootstrapConfiguration {}

在 EurekaDiscoveryClientConfigServiceBootstrapConfiguration 类中被作为注入的对象。

而 EurekaDiscoveryClientConfigServiceBootstrapConfiguration 被援用的分中央就比拟非凡,被配置在配置文件中。

org.springframework.cloud.bootstrap.BootstrapConfiguration=\org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceBootstrapConfiguration

这个配置的Key局部对应着一个注解类 BootstrapConfiguration:

@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@Documentedpublic @interface BootstrapConfiguration { **/**** \* Exclude specific auto-configuration classes such that they will never be applied. */ Class<?>[] exclude() default {};}

他被应用的中央是:BootstrapApplicationListener 的 164行,在这里拿到类的全门路之后,186行进行加载类。

public class BootstrapApplicationListener implements ApplicationListener<ApplicationEnvironmentPreparedEvent>, Ordered {    ......  @Override public void onApplicationEvent(ApplicationEnvironmentPreparedEvent event) { ConfigurableEnvironment environment = event.getEnvironment(); ...... ConfigurableApplicationContext context = null; String configName = environment  .resolvePlaceholders("${spring.cloud.bootstrap.name:bootstrap}"); for (ApplicationContextInitializer<?> initializer : event.getSpringApplication()  .getInitializers()) {  if (initializer instanceof ParentContextApplicationContextInitializer) {  context = findBootstrapContext(   (ParentContextApplicationContextInitializer) initializer,   configName);  } } if (context == null) {      **//在这里被调用**  context = bootstrapServiceContext(environment, event.getSpringApplication(),   configName); } apply(context, event.getSpringApplication(), environment); }    private ConfigurableApplicationContext bootstrapServiceContext(    ConfigurableEnvironment environment, final SpringApplication application,    String configName) {    StandardEnvironment bootstrapEnvironment = new StandardEnvironment();    MutablePropertySources bootstrapProperties = bootstrapEnvironment      .getPropertySources();    for (PropertySource<?> source : bootstrapProperties) {      bootstrapProperties.remove(source.getName());    }    String configLocation = environment      .resolvePlaceholders("${spring.cloud.bootstrap.location:}");    Map<String, Object> bootstrapMap = new HashMap<>();    bootstrapMap.put("spring.config.name", configName);    **// if an app (or test) uses spring.main.web-application-type=reactive, bootstrap will fail**    **// force the environment to use none, because if though it is set below in the builder**   **// the environment overrides it**    bootstrapMap.put("spring.main.web-application-type", "none");    if (StringUtils.hasText(configLocation)) {      bootstrapMap.put("spring.config.location", configLocation);    }    bootstrapProperties.addFirst(      new MapPropertySource(BOOTSTRAP_PROPERTY_SOURCE_NAME, bootstrapMap));    for (PropertySource<?> source : environment.getPropertySources()) {      if (source instanceof StubPropertySource) {        continue;      }      bootstrapProperties.addLast(source);    }    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();    **// 在这里扫描BootstrapConfiguration注解**    List<String> names = new ArrayList<>(SpringFactoriesLoader                       .loadFactoryNames(BootstrapConfiguration.class, classLoader));    for (String name : StringUtils.commaDelimitedListToStringArray(      environment.getProperty("spring.cloud.bootstrap.sources", ""))) {     names.add(name);    }    **//** **TODO:** **is it possible or sensible to share a ResourceLoader?**    SpringApplicationBuilder builder = new SpringApplicationBuilder()      .profiles(environment.getActiveProfiles()).bannerMode(Mode.OFF)      .environment(bootstrapEnvironment)      **// Don't use the default properties in this builder**      .registerShutdownHook(false).logStartupInfo(false)      .web(WebApplicationType.NONE);    if (environment.getPropertySources().contains("refreshArgs")) {      **// If we are doing a context refresh, really we only want to refresh the**      **// Environment, and there are some toxic listeners (like the**      **// LoggingApplicationListener) that affect global static state, so we need a**      **// way to switch those off.**      builder.application()        .setListeners(filterListeners(builder.application().getListeners()));    }    List<Class<?>> sources = new ArrayList<>();    for (String name : names) {      Class<?> cls = ClassUtils.resolveClassName(name, null);      try {        cls.getDeclaredAnnotations();      }      catch (Exception e) {        continue;      }      sources.add(cls);    }    AnnotationAwareOrderComparator.sort(sources);    builder.sources(sources.toArray(new Class[sources.size()]));    final ConfigurableApplicationContext context = builder.run();    **// gh-214 using spring.application.name=bootstrap to set the context id via**    **// `ContextIdApplicationContextInitializer` prevents apps from getting the actual**    **// spring.application.name**    **// during the bootstrap phase.**    context.setId("bootstrap");    **// Make the bootstrap context a parent of the app context**    addAncestorInitializer(application, context);    **// It only has properties in it now that we don't want in the parent so remove**    **// it (and it will be added back later)**    bootstrapProperties.remove(BOOTSTRAP_PROPERTY_SOURCE_NAME);    mergeDefaultProperties(environment.getPropertySources(), bootstrapProperties);    return context;  }      ......  ......   }

BootstrapApplicationListener 实现了ApplicationEnvironmentPreparedEvent,作为监听器在我的项目启动的时候被加载。Spring依据利用启动的过程,提供了四种事件供咱们应用:

· ApplicationStartedEvent :Spring Boot启动开始时执行的事件;

· ApplicationEnvironmentPreparedEvent:Spring Boot 对应Enviroment曾经筹备结束,但此时上下文context还没有创立;

· ApplicationPreparedEvent:Spring Boot 上下文context创立实现,但此时spring中的bean是没有齐全加载实现的;

· ApplicationFailedEvent:Spring Boot 启动异样时执行事件。

即这里的BootstrapApplicationListener 是在我的项目启动加载环境变量实现,还没有创立bean的时候去加载的。

剖析到这里,咱们把整个的EnableDiscoveryClient注解的初始化链路都走了一遍。大抵时序图如下图所示:


总结下面剖析的局部次要两个作用:

1. 初始化配置文件;

2. 激活 DiscoveryClient。

上面就开始剖析DiscoveryClient的作用。

2. DiscoveryClient

启动客户端的时候查看启动日志你会看到服务注册也是从 DiscoveryClient 类中收回的:

足以见得这个类在服务注册过程中应该做了一些重要的事件。上面一起来剖析一下具体实现。

2.1 服务注册

DiscoveryClient 是一个接口,持续观看它的实现类,能够看到每个实现类中都有一个:DESCRIPTION字段,这个字段明确形容了以后类的作用。

  1. EurekaDiscoveryClient:client 的次要实现逻辑类;
  2. CompositeDiscoveryClient:会装载别的服务注册客户端,程序查找;
  3. NoopDiscoveryClient:曾经被废除;
  4. SimpleDiscoveryClient:具体的服务实例从 SimpleDiscoveryProperties 配置中获取。

从形容上看 EurekaDiscoveryClient 是 client 的次要实现类。而在 EurekaDiscoveryClient 中,获取client实例次要是从 EurekaClient 中查找的:

@Overridepublic List<ServiceInstance> getInstances(String serviceId) {  List<InstanceInfo> infos = this.eurekaClient.getInstancesByVipAddress(serviceId,false);  List<ServiceInstance> instances = new ArrayList<>();  for (InstanceInfo info : infos) {    instances.add(new EurekaServiceInstance(info));  }  return instances;}

DiscoveryClient 是 EurekaClient 的惟一实现类,他有一个很重要的构造方法:

@InjectDiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,Provider<BackupRegistry> backupRegistryProvider) {  if (args != null) {    this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;    this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;    this.eventListeners.addAll(args.getEventListeners());    this.preRegistrationHandler = args.preRegistrationHandler;  } else {    this.healthCheckCallbackProvider = null;    this.healthCheckHandlerProvider = null;    this.preRegistrationHandler = null;  }   this.applicationInfoManager = applicationInfoManager;  InstanceInfo myInfo = applicationInfoManager.getInfo();   clientConfig = config;  staticClientConfig = clientConfig;  transportConfig = config.getTransportConfig();  instanceInfo = myInfo;  if (myInfo != null) {    appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();  } else {    logger.warn("Setting instanceInfo to a passed in null value");  }   this.backupRegistryProvider = backupRegistryProvider;   this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);  localRegionApps.set(new Applications());   fetchRegistryGeneration = new AtomicLong(0);   remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());  remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(",")); **//下面次要是初始化一些参数**  **//如果 shouldFetchRegistry= true,注册监控**  if (config.shouldFetchRegistry()) {    this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});  } else {    this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;  } **//如果shouldRegisterWithEureka=true,注册监控**  if (config.shouldRegisterWithEureka()) {   this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});  } else {    this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;  }   logger.info("Initializing Eureka in region {}", clientConfig.getRegion()); **//如果shouldRegisterWithEureka = false && shouldFetchRegistry=false**  **//就不做初始化的工作,间接返回**  if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {    logger.info("Client configured to neither register nor query for data.");    scheduler = null;    heartbeatExecutor = null;    cacheRefreshExecutor = null;    eurekaTransport = null;    instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());     **// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()**    **// to work with DI'd DiscoveryClient**    DiscoveryManager.getInstance().setDiscoveryClient(this);    DiscoveryManager.getInstance().setEurekaClientConfig(config);     initTimestampMs = System.currentTimeMillis();    logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",          initTimestampMs, this.getApplications().size());     return; **// no need to setup up an network tasks and we are done**  } **//从这里开始创立各种工作的线程池**  try {    **// default size of 2 - 1 each for heartbeat and cacheRefresh**    **//创立定时线程池,线程数量为2个,别离用来维持心跳连贯和刷新其余eureka client实例缓存**    scheduler = Executors.newScheduledThreadPool(2,                           new ThreadFactoryBuilder()                           .setNameFormat("DiscoveryClient-%d")                           .setDaemon(true)                          .build()); **//创立一个线程池,线程池大小默认为2个,用来维持心跳连贯**   heartbeatExecutor = new ThreadPoolExecutor(      1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,      new SynchronousQueue<Runnable>(),     new ThreadFactoryBuilder()      .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")     .setDaemon(true)      .build()    ); **// use direct handoff** **//创立一个线程池,线程池大小默认为2个,用来刷新其余eureka client实例缓存**    cacheRefreshExecutor = new ThreadPoolExecutor(      1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,      new SynchronousQueue<Runnable>(),      new ThreadFactoryBuilder()      .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")      .setDaemon(true)      .build()   ); **// use direct handoff**     eurekaTransport = new EurekaTransport();    scheduleServerEndpointTask(eurekaTransport, args); ......    ......    ......  } catch (Throwable e) {   throw new RuntimeException("Failed to initialize DiscoveryClient!", e);  }   **//抓取近程实例注册信息,fetchRegistry()办法里的参数,这里为false,意思是要不要强制抓取所有实例注册信息**  **//这里获取注册信息,分两种形式,一种是全量获取,另一种是增量获取,默认是增量获取**  if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {    **//如果配置的是要获取实例注册信息,然而从近程获取失败,从备份获取实例注册信息**    fetchRegistryFromBackup();  }   **// call and execute the pre registration handler before all background tasks (inc registration) is started**  if (this.preRegistrationHandler != null) {    this.preRegistrationHandler.beforeRegistration();  }    **//如果client配置注册到eureka server 且 强制 初始化就注册到eureka 那么就注册到eureka server,默认是不初始化就注册到eureka**  if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {    try {      if (!register() ) {        throw new IllegalStateException("Registration error at startup. Invalid server response.");      }    } catch (Throwable th) {      logger.error("Registration error at startup: {}", th.getMessage());      throw new IllegalStateException(th);    }  }   **// finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch**  **//初始化维持心跳连贯、更新注册信息缓存的定时工作**  initScheduledTasks();   try {    Monitors.registerObject(this);  } catch (Throwable e) {    logger.warn("Cannot register timers", e);  }   **// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()**  **// to work with DI'd DiscoveryClient**  DiscoveryManager.getInstance().setDiscoveryClient(this);  DiscoveryManager.getInstance().setEurekaClientConfig(config);   initTimestampMs = System.currentTimeMillis();  logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",        initTimestampMs, this.getApplications().size());}

初始化的过程次要做了两件事:

  1. 创立了 scheduler 定时工作的线程池,heartbeatExecutor 心跳查看线程池(服务续约),cacheRefreshExecutor 服务获取线程池 ;
  2. 调用 initScheduledTasks()办法开启线程池,往上面3个线程池别离增加相应工作。而后创立了一个instanceInfoReplicator(Runnable工作),而后调用InstanceInfoReplicator.start办法,把这个工作放进下面scheduler定时工作线程池(服务注册并更新)。

接着看 initScheduledTasks做了哪些事件 :

private void initScheduledTasks() {  **//获取服务列表信息**  if (clientConfig.shouldFetchRegistry()) {    **// registry cache refresh timer**    **//获取默认的注册频率信息,默认30S**    int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();    **//如果缓存刷新超时,下一次执行的delay最大是registryFetchIntervalSeconds的几倍(默认10),默认每次执行是上一次的2倍**    int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();    **//执行CacheRefreshThread,服务列表缓存刷新工作**    scheduler.schedule(      new TimedSupervisorTask(        "cacheRefresh",        scheduler,       cacheRefreshExecutor,        registryFetchIntervalSeconds,       TimeUnit.SECONDS,       expBackOffBound,       new CacheRefreshThread()     ),     registryFetchIntervalSeconds, TimeUnit.SECONDS);  } **//注册到eureka server**  if (clientConfig.shouldRegisterWithEureka()) {   **//续租工夫距离,默认30s**    int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();    **// 如果心跳工作超时,下一次执行的delay最大是renewalIntervalInSecs的几倍(默认10),默认每次执行是上一次的2倍**    int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();    logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);     **// Heartbeat timer**    **//执行HeartbeatThread,发送心跳数据**    scheduler.schedule(      new TimedSupervisorTask(        "heartbeat",        scheduler,        heartbeatExecutor,        renewalIntervalInSecs,        TimeUnit.SECONDS,        expBackOffBound,        new HeartbeatThread()      ),      renewalIntervalInSecs, TimeUnit.SECONDS);     **// 客户端实例信息复制**    instanceInfoReplicator = new InstanceInfoReplicator(      this,      instanceInfo,      clientConfig.getInstanceInfoReplicationIntervalSeconds(),      2); **// burstSize** **//注册监听器**    statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {      @Override      public String getId() {        return "statusChangeListener";      }       @Override      public void notify(StatusChangeEvent statusChangeEvent) {        if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||         InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {         **// log at warn level if DOWN was involved**          logger.warn("Saw local status change event {}", statusChangeEvent);        } else {          logger.info("Saw local status change event {}", statusChangeEvent);        }        instanceInfoReplicator.onDemandUpdate();      }    };     if (clientConfig.shouldOnDemandUpdateStatusChange()) {      applicationInfoManager.registerStatusChangeListener(statusChangeListener);    } **//进行服务刷新**    instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());  } else {    logger.info("Not registering with Eureka server per configuration");  }}

总的来说initScheduledTasks()做了以下几件事:

· 如果shouldFetchRegistry=true,即要从Eureka Server获取服务列表:

启动刷新服务列表定时线程(DiscoveryClient-CacheRefreshExecutor-%d),默认registryFetchIntervalSeconds=30s执行一次,工作为CacheRefreshThread,即从Eureka Server获取服务列表,也刷新客户端缓存。

· 如果shouldRegisterWithEureka=true,即要注册到Eureka Server。

启动heartbeat心跳定时线程(DiscoveryClient-HeartbeatExecutor-%d),默认renewalIntervalInSecs=30s续约一次,工作为HeartbeatThread,即客户端向Eureka Server发送心跳;

启动InstanceInfo复制器定时线程(DiscoveryClient-InstanceInfoReplicator-%d),开启定时线程查看以后Instance的DataCenterInfo、LeaseInfo、InstanceStatus,如果发现变更就执行discoveryClient.register(),将实例信息同步到Server端。

下面有一个须要关注的点是:InstanceInfoReplicator。它会去定时刷新客户端实例的最新信息:以后实例最新数据,租约信息,实例状态。InstanceInfoReplicator 是一个线程类,关注 run()办法:

public void run() {  try {    **/****     \* 刷新 InstanceInfo     \* 1、刷新 DataCenterInfo     \* 2、刷新 LeaseInfo 租约信息     \* 3、依据HealthCheckHandler获取InstanceStatus,并更新,如果状态发生变化会触发所有StatusChangeListener     */    discoveryClient.refreshInstanceInfo(); **//刷新完之后,以后服务有变更,还未同步给server,发动注册**    Long dirtyTimestamp = instanceInfo.isDirtyWithTime();    if (dirtyTimestamp != null) {      **//发动注册**      discoveryClient.register();      instanceInfo.unsetIsDirty(dirtyTimestamp);    }  } catch (Throwable t) {    logger.warn("There was a problem with the instance info replicator", t);  } finally {    Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);    scheduledPeriodicRef.set(next);  }}

看一下register()的实现:

 **/****   \* Register with the eureka service by making the appropriate REST call.   \* 应用http的形式注册eureka服务   */boolean register() throws Throwable {  logger.info(PREFIX + "{}: registering service...", appPathIdentifier);  EurekaHttpResponse<Void> httpResponse;  try {    httpResponse = eurekaTransport.registrationClient.register(instanceInfo);  } catch (Exception e) {    logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);    throw e;  }  if (logger.isInfoEnabled()) {    logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());  }  return httpResponse.getStatusCode() == 204;}

往下跟踪到 RestTemplateEurekaHttpClient类:

public class RestTemplateEurekaHttpClient implements EurekaHttpClient {  protected final Log logger = LogFactory.getLog(getClass());  private RestTemplate restTemplate; private String serviceUrl;  public RestTemplateEurekaHttpClient(RestTemplate restTemplate, String serviceUrl) { this.restTemplate = restTemplate; this.serviceUrl = serviceUrl; if (!serviceUrl.endsWith("/")) {  this.serviceUrl = this.serviceUrl+"/"; } }    @Override public EurekaHttpResponse<Void> register(InstanceInfo info) { String urlPath = serviceUrl + "apps/" + info.getAppName();  HttpHeaders headers = new HttpHeaders(); headers.add(HttpHeaders.ACCEPT_ENCODING, "gzip"); headers.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);  ResponseEntity<Void> response = restTemplate.exchange(urlPath, HttpMethod.POST,  new HttpEntity<>(info, headers), Void.class);  return anEurekaHttpResponse(response.getStatusCodeValue())  .headers(headersOf(response)).build(); }    ......  ......  ......   }

封装了RestTemplate http client 模板办法,给 server 端发送一个post 申请。所以启动 client 的时候,向服务端发送注册申请的中央就在这里。

2.2 服务续约

服务续约的入口在DiscoveryClient 类initScheduledTasks()办法的heartBeat timer定时器工作中:

// Heartbeat timer****//开启定时工作每隔30s发送一次 心跳申请**scheduler.schedule(  new TimedSupervisorTask(    "heartbeat",    scheduler,    heartbeatExecutor,    renewalIntervalInSecs,    TimeUnit.SECONDS,    expBackOffBound,    new HeartbeatThread()  ),  renewalIntervalInSecs, TimeUnit.SECONDS);  **/****   \* The heartbeat task that renews the lease in the given intervals.   */private class HeartbeatThread implements Runnable {   public void run() {    if (renew()) {      lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();    }  }}  **/****   \* Renew with the eureka service by making the appropriate REST call   */boolean renew() {  EurekaHttpResponse<InstanceInfo> httpResponse;  try {    httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);    logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());    if (httpResponse.getStatusCode() == 404) {      REREGISTER_COUNTER.increment();      logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());      long timestamp = instanceInfo.setIsDirtyWithTime();      boolean success = register();      if (success) {        instanceInfo.unsetIsDirty(timestamp);      }      return success;    }    return httpResponse.getStatusCode() == 200;  } catch (Throwable e) {    logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);    return false;  }} @Overridepublic EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id,                           InstanceInfo info, InstanceStatus overriddenStatus) {  String urlPath = serviceUrl + "apps/" + appName + '/' + id + "?status="    \+ info.getStatus().toString() + "&lastDirtyTimestamp="   \+ info.getLastDirtyTimestamp().toString() + (overriddenStatus != null                           ? "&overriddenstatus=" + overriddenStatus.name() : "");   ResponseEntity<InstanceInfo> response = restTemplate.exchange(urlPath,                                HttpMethod.PUT, null, InstanceInfo.class);   EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder = anEurekaHttpResponse(    response.getStatusCodeValue(), InstanceInfo.class)    .headers(headersOf(response));   if (response.hasBody())    eurekaResponseBuilder.entity(response.getBody());   return eurekaResponseBuilder.build();}

下面贴出来了客户端发送心跳申请的残缺调用过程,每隔30s客户端向服务端发送一次申请,向服务端从新注册本人。

2.3 服务下线

服务下线比拟好了解,在服务敞开的时候勾销本机的各种定时工作,给服务端发送申请告知本人下线。

     **/****   \* Shuts down Eureka Client. Also sends a deregistration request to the   \* eureka server.   */@PreDestroy@Overridepublic synchronized void shutdown() {  if (isShutdown.compareAndSet(false, true)) {    logger.info("Shutting down DiscoveryClient ...");     if (statusChangeListener != null && applicationInfoManager != null) {      applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());    } **//勾销各种定时工作**    cancelScheduledTasks();     **// If APPINFO was registered**    if (applicationInfoManager != null      && clientConfig.shouldRegisterWithEureka()      && clientConfig.shouldUnregisterOnShutdown()) {      applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);      **//向服务端发送申请告知本人下线**      unregister();    }     if (eurekaTransport != null) {      eurekaTransport.shutdown();    } **//敞开监控**    heartbeatStalenessMonitor.shutdown();    registryStalenessMonitor.shutdown();     logger.info("Completed shut down of DiscoveryClient");  }}   **/****   \* unregister w/ the eureka service.   */void unregister() {  **// It can be null if shouldRegisterWithEureka == false**  if(eurekaTransport != null && eurekaTransport.registrationClient != null) {    try {      logger.info("Unregistering ...");      EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());      logger.info(PREFIX + "{} - deregister status: {}", appPathIdentifier, httpResponse.getStatusCode());    } catch (Exception e) {      logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);    }  }}   @Overridepublic EurekaHttpResponse<Void> cancel(String appName, String id) {  String urlPath = serviceUrl + "apps/" + appName + '/' + id;   ResponseEntity<Void> response = restTemplate.exchange(urlPath, HttpMethod.DELETE,                             null, Void.class);   return anEurekaHttpResponse(response.getStatusCodeValue())    .headers(headersOf(response)).build();}

2.4 服务获取 和 服务刷新

服务启动的时候会去服务端全量拉取所有曾经注册过的其余client实例信息,增量的时候就是在initScheduledTasks() 办法中每30s增量跑一次。

private void initScheduledTasks() {  if (clientConfig.shouldFetchRegistry()) {    **// registry cache refresh timer**    int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();    int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();    scheduler.schedule(      new TimedSupervisorTask(        "cacheRefresh",        scheduler,        cacheRefreshExecutor,        registryFetchIntervalSeconds,        TimeUnit.SECONDS,        expBackOffBound,        new CacheRefreshThread()      ),      registryFetchIntervalSeconds, TimeUnit.SECONDS);  }    ......  ......  ......}  **/****   \* The task that fetches the registry information at specified intervals.   *   */class CacheRefreshThread implements Runnable {  public void run() {    refreshRegistry();  }} @VisibleForTestingvoid refreshRegistry() {  try {    boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();     boolean remoteRegionsModified = false;    **// This makes sure that a dynamic change to remote regions to fetch is honored.**    String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();    ......      ......      ......       boolean success = fetchRegistry(remoteRegionsModified);    if (success) {      registrySize = localRegionApps.get().size();      lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();    }     ......      ......      ......  } catch (Throwable e) {    logger.error("Cannot fetch registry from server", e);  }    }   **/****   \* Fetches the registry information.   *   \* <p>   \* This method tries to get only deltas after the first fetch unless there   \* is an issue in reconciling eureka server and client registry information.   \* </p>   *   \* @param forceFullRegistryFetch Forces a full registry fetch.   *   \* @return true if the registry was fetched   */private boolean fetchRegistry(boolean forceFullRegistryFetch) {  Stopwatch tracer = FETCH_REGISTRY_TIMER.start();   try {    **// 如果当初增量服务获取不可用,或者是第一次获取服务的时候,拉去所有的利用**    Applications applications = getApplications();     if (clientConfig.shouldDisableDelta()      || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))      || forceFullRegistryFetch      || (applications == null)      || (applications.getRegisteredApplications().size() == 0)      || (applications.getVersion() == -1)) **//Client application does not have latest library supporting delta**    {      logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());      logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());      logger.info("Force full registry fetch : {}", forceFullRegistryFetch);     logger.info("Application is null : {}", (applications == null));      logger.info("Registered Applications size is zero : {}",            (applications.getRegisteredApplications().size() == 0));      logger.info("Application version is -1: {}", (applications.getVersion() == -1));      getAndStoreFullRegistry();    } else {      getAndUpdateDelta(applications);    }    applications.setAppsHashCode(applications.getReconcileHashCode());    logTotalInstances();  } catch (Throwable e) {    logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);    return false;  } finally {    if (tracer != null) {      tracer.stop();    }  }   **// Notify about cache refresh before updating the instance remote status**  onCacheRefreshed();   **// Update remote status based on refreshed data held in the cache**  updateInstanceRemoteStatus();   **// registry was fetched successfully, so return true**  return true;}

客户端拉取服务端保留的所有客户端节点信息保留工夫为3分钟,Eureka client获得的数据尽管是增量更新,依然可能和30秒前取的数据一样,所以Eureka client要本人来解决反复信息。

另外,留神到在fetchRegistry()办法中:

 applications.setAppsHashCode(applications.getReconcileHashCode());

每次增量更新,服务端都会带过去一个一致性hash码。Eureka client的增量更新,其实获取的是Eureka server最近三分钟内的变更,如果Eureka client有超过三分钟没有做增量更新的话(例如网络问题),这就造成了Eureka server和Eureka client之间的数据不统一。失常状况下,Eureka client屡次增量更新后,最终的服务列表数据应该Eureka server保持一致,但如果期间产生异样,可能导致和Eureka server的数据不统一,为了裸露这个问题,Eureka server每次返回的增量更新数据中,会带有一致性哈希码,Eureka client用本地服务列表数据算出的一致性哈希码应该和Eureka server返回的统一,若不统一就证实增量更新出了问题导致Eureka client和Eureka server上的服务列表信息不统一了,此时须要全量更新。

对于客户端的代码剖析就到这里,可从以下两个角度去剖析:

1. 从启动类动手,查看初始化了什么;

2. 从启动日志动手,查看启动类做了什么。

最初,放上在微服务中Eureka的Server与Client启动流程汇总图,图示过程如下图所示: