Eureka Source Code Analysis: The Client Startup Flow

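Eureka Client is a Java client that simplifies interaction with Eureka Server. It also ships with a built-in load balancer that uses a round-robin algorithm.

After the application starts, it sends heartbeats to Eureka Server (every 30 seconds by default). If Eureka Server receives no heartbeat from a node for several heartbeat periods (90 seconds by default), it removes that node from the service registry. Eureka Client also caches registry data, so even if every Eureka Server goes down, the client can still use the cached information to consume other services' APIs. Let's walk through the flow of the client-side operations.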
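The 30-second heartbeat and 90-second eviction defaults can be pictured with a small model. This is only a sketch: LeaseSketch and its methods are hypothetical names for illustration, not Eureka classes, and the real server-side eviction involves more than this single comparison.

```java
import java.time.Duration;
import java.time.Instant;

/** Illustrative model of a lease; hypothetical, not a Eureka class. */
final class LeaseSketch {
    // Defaults quoted in the text: heartbeat every 30 s, removal after 90 s of silence.
    static final Duration RENEWAL_INTERVAL = Duration.ofSeconds(30);
    static final Duration LEASE_DURATION = Duration.ofSeconds(90);

    private Instant lastRenewal;

    LeaseSketch(Instant registeredAt) {
        this.lastRenewal = registeredAt;
    }

    /** Called whenever a heartbeat arrives. */
    void renew(Instant now) {
        lastRenewal = now;
    }

    /** True once no heartbeat has arrived for longer than the lease duration. */
    boolean isExpired(Instant now) {
        return Duration.between(lastRenewal, now).compareTo(LEASE_DURATION) > 0;
    }
}
```

With these defaults a node can miss two consecutive heartbeats and still be considered alive; only the third missed period pushes it past 90 seconds.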

1. Starting from the main class

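Start from the @EnableDiscoveryClient annotation on the application's main class and follow the call chain. The Javadoc of EnableDiscoveryClient says that its purpose is to activate a DiscoveryClient implementation.

First, the annotation declaration carries an @Import that pulls in EnableDiscoveryClientImportSelector. The main job of this class is to import AutoServiceRegistrationConfiguration: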

@Order(Ordered.LOWEST_PRECEDENCE - 100)
public class EnableDiscoveryClientImportSelector
        extends SpringFactoryImportSelector<EnableDiscoveryClient> {

    @Override
    public String[] selectImports(AnnotationMetadata metadata) {
        // Call the parent method to get the fully qualified names of the classes it wants to import
        String[] imports = super.selectImports(metadata);

        // Read all attributes of this annotation (@EnableDiscoveryClient)
        AnnotationAttributes attributes = AnnotationAttributes.fromMap(
                metadata.getAnnotationAttributes(getAnnotationClass().getName(), true));

        // Read the autoRegister attribute; it defaults to true
        boolean autoRegister = attributes.getBoolean("autoRegister");

        // Decide from the annotation settings whether to import the auto-configuration class below
        if (autoRegister) {
            List<String> importsList = new ArrayList<>(Arrays.asList(imports));
            importsList.add("org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationConfiguration");
            imports = importsList.toArray(new String[0]);
        } else {
            Environment env = getEnvironment();
            if (ConfigurableEnvironment.class.isInstance(env)) {
                ConfigurableEnvironment configEnv = (ConfigurableEnvironment) env;
                LinkedHashMap<String, Object> map = new LinkedHashMap<>();
                map.put("spring.cloud.service-registry.auto-registration.enabled", false);
                MapPropertySource propertySource = new MapPropertySource("springCloudDiscoveryClient", map);
                configEnv.getPropertySources().addLast(propertySource);
            }
        }

        return imports;
    }

    @Override
    protected boolean isEnabled() {
        return getEnvironment().getProperty("spring.cloud.discovery.enabled", Boolean.class, Boolean.TRUE);
    }

    @Override
    protected boolean hasDefaultFactory() {
        return true;
    }

}
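To make the two branches of selectImports easier to see, here is a plain-Java imitation with no Spring dependency. It is a sketch under assumptions: ImportSelectionSketch, select and the overrides map are hypothetical stand-ins (the map plays the role of the MapPropertySource added to the environment); only the two string constants come from the listing above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/** Plain-Java imitation of the autoRegister branch; all names here are hypothetical. */
final class ImportSelectionSketch {
    static final String AUTO_REGISTRATION_CONFIG =
            "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationConfiguration";
    static final String AUTO_REGISTRATION_PROPERTY =
            "spring.cloud.service-registry.auto-registration.enabled";

    /**
     * @param parentImports class names returned by the parent selector
     * @param autoRegister  value of the annotation's autoRegister attribute
     * @param overrides     stands in for the property source added to the environment
     */
    static String[] select(String[] parentImports, boolean autoRegister,
                           Map<String, Object> overrides) {
        if (autoRegister) {
            // Append the auto-registration configuration to whatever the parent selected.
            List<String> imports = new ArrayList<>(Arrays.asList(parentImports));
            imports.add(AUTO_REGISTRATION_CONFIG);
            return imports.toArray(new String[0]);
        }
        // Auto-registration is off: keep the imports and disable the feature by property.
        overrides.put(AUTO_REGISTRATION_PROPERTY, false);
        return parentImports;
    }
}
```

Either way the decision ends up visible to later configuration classes: as an extra import when autoRegister is true, or as a property set to false otherwise.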

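The end goal here is to bring in AutoServiceRegistrationConfiguration. It is imported by AutoServiceRegistrationAutoConfiguration, so let's see what that class does: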

@Configuration
@Import(AutoServiceRegistrationConfiguration.class)
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
public class AutoServiceRegistrationAutoConfiguration {

    @Autowired(required = false)
    private AutoServiceRegistration autoServiceRegistration;

    @Autowired
    private AutoServiceRegistrationProperties properties;

    @PostConstruct
    protected void init() {
        if (autoServiceRegistration == null && this.properties.isFailFast()) {
            throw new IllegalStateException("Auto Service Registration has been requested, but there is no AutoServiceRegistration bean");
        }
    }

}

The class mainly works with two beans, AutoServiceRegistration and AutoServiceRegistrationProperties. What are these two beans for? Look at where they are used:

@Configuration
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@Import(DiscoveryClientOptionalArgsConfiguration.class)
@ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
@AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class,
        CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
@AutoConfigureAfter(name = {"org.springframework.cloud.autoconfigure.RefreshAutoConfiguration",
        "org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",
        "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration"})
public class EurekaClientAutoConfiguration {

    ......

    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
    public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient,
            CloudEurekaInstanceConfig instanceConfig,
            ApplicationInfoManager applicationInfoManager,
            ObjectProvider<HealthCheckHandler> healthCheckHandler) {
        return EurekaRegistration.builder(instanceConfig)
                .with(applicationInfoManager)
                .with(eurekaClient)
                .with(healthCheckHandler)
                .build();
    }

    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
    public EurekaAutoServiceRegistration eurekaAutoServiceRegistration(ApplicationContext context,
            EurekaServiceRegistry registry, EurekaRegistration registration) {
        return new EurekaAutoServiceRegistration(context, registry, registration);
    }

    ......

}

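The reason is that they serve as preconditions (see @ConditionalOnBean) when the beans here are instantiated.

EurekaClientAutoConfiguration is the most important class we have met so far. Its main responsibilities are:

  1. Register EurekaClientConfigBean and initialize the client-side configuration;
  2. Register EurekaInstanceConfigBean and initialize the client instance information;
  3. Initialize EurekaRegistration, EurekaServiceRegistry and EurekaAutoServiceRegistration, which implement automatic registration with Eureka;
  4. Initialize EurekaClient and ApplicationInfoManager. The default implementation of EurekaClient is DiscoveryClient, which is the focus of the analysis below;
  5. Initialize EurekaHealthIndicator, which supplies Eureka information to the /health endpoint, mainly the current instance status (Status) and the service list (applications).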

Next, look at where EurekaClientAutoConfiguration itself is used:

@ConditionalOnClass(ConfigServicePropertySourceLocator.class)
@ConditionalOnProperty(value = "spring.cloud.config.discovery.enabled", matchIfMissing = false)
@Configuration
@Import({ EurekaDiscoveryClientConfiguration.class, // this emulates @EnableDiscoveryClient, the import selector doesn't run before the bootstrap phase
        EurekaClientAutoConfiguration.class })
public class EurekaDiscoveryClientConfigServiceBootstrapConfiguration {
}

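It is imported by the EurekaDiscoveryClientConfigServiceBootstrapConfiguration class.

EurekaDiscoveryClientConfigServiceBootstrapConfiguration, in turn, is referenced from a more unusual place: it is listed in a configuration file (META-INF/spring.factories).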

org.springframework.cloud.bootstrap.BootstrapConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceBootstrapConfiguration
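The entry above uses the ordinary Java properties format: the key names a marker type and the value lists class names, with a trailing backslash continuing the line. The sketch below reads such text with java.util.Properties only. FactoriesEntrySketch and classNamesFor are hypothetical names; Spring's own loader does more than this (for example, it merges entries from every jar on the classpath).

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical helper that reads one key of properties-formatted text. */
final class FactoriesEntrySketch {
    /** Returns the comma-separated class names listed under the given key. */
    static List<String> classNamesFor(String key, String factoriesText) {
        Properties props = new Properties();
        try {
            // java.util.Properties joins lines that end with a backslash.
            props.load(new StringReader(factoriesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> names = new ArrayList<>();
        for (String name : props.getProperty(key, "").split(",")) {
            if (!name.trim().isEmpty()) {
                names.add(name.trim());
            }
        }
        return names;
    }
}
```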

The key of this entry corresponds to an annotation type, BootstrapConfiguration:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface BootstrapConfiguration {

    /**
     * Exclude specific auto-configuration classes such that they will never be applied.
     */
    Class<?>[] exclude() default {};

}

It is used in BootstrapApplicationListener: line 164 obtains the fully qualified class names, and line 186 loads the classes.

public class BootstrapApplicationListener
        implements ApplicationListener<ApplicationEnvironmentPreparedEvent>, Ordered {

    ......

    @Override
    public void onApplicationEvent(ApplicationEnvironmentPreparedEvent event) {
        ConfigurableEnvironment environment = event.getEnvironment();
        ......
        ConfigurableApplicationContext context = null;
        String configName = environment
                .resolvePlaceholders("${spring.cloud.bootstrap.name:bootstrap}");
        for (ApplicationContextInitializer<?> initializer : event.getSpringApplication()
                .getInitializers()) {
            if (initializer instanceof ParentContextApplicationContextInitializer) {
                context = findBootstrapContext(
                        (ParentContextApplicationContextInitializer) initializer,
                        configName);
            }
        }
        if (context == null) {
            // bootstrapServiceContext is called here
            context = bootstrapServiceContext(environment, event.getSpringApplication(),
                    configName);
        }
        apply(context, event.getSpringApplication(), environment);
    }

    private ConfigurableApplicationContext bootstrapServiceContext(
            ConfigurableEnvironment environment, final SpringApplication application,
            String configName) {
        StandardEnvironment bootstrapEnvironment = new StandardEnvironment();
        MutablePropertySources bootstrapProperties = bootstrapEnvironment
                .getPropertySources();
        for (PropertySource<?> source : bootstrapProperties) {
            bootstrapProperties.remove(source.getName());
        }
        String configLocation = environment
                .resolvePlaceholders("${spring.cloud.bootstrap.location:}");
        Map<String, Object> bootstrapMap = new HashMap<>();
        bootstrapMap.put("spring.config.name", configName);
        // if an app (or test) uses spring.main.web-application-type=reactive, bootstrap will fail
        // force the environment to use none, because if though it is set below in the builder
        // the environment overrides it
        bootstrapMap.put("spring.main.web-application-type", "none");
        if (StringUtils.hasText(configLocation)) {
            bootstrapMap.put("spring.config.location", configLocation);
        }
        bootstrapProperties.addFirst(new MapPropertySource(BOOTSTRAP_PROPERTY_SOURCE_NAME, bootstrapMap));
        for (PropertySource<?> source : environment.getPropertySources()) {
            if (source instanceof StubPropertySource) {
                continue;
            }
            bootstrapProperties.addLast(source);
        }
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        // Load the class names registered under the BootstrapConfiguration key here
        List<String> names = new ArrayList<>(SpringFactoriesLoader
                .loadFactoryNames(BootstrapConfiguration.class, classLoader));
        for (String name : StringUtils.commaDelimitedListToStringArray(
                environment.getProperty("spring.cloud.bootstrap.sources", ""))) {
            names.add(name);
        }
        // TODO: is it possible or sensible to share a ResourceLoader?
        SpringApplicationBuilder builder = new SpringApplicationBuilder()
                .profiles(environment.getActiveProfiles()).bannerMode(Mode.OFF)
                .environment(bootstrapEnvironment)
                // Don't use the default properties in this builder
                .registerShutdownHook(false).logStartupInfo(false)
                .web(WebApplicationType.NONE);
        if (environment.getPropertySources().contains("refreshArgs")) {
            // If we are doing a context refresh, really we only want to refresh the
            // Environment, and there are some toxic listeners (like the
            // LoggingApplicationListener) that affect global static state, so we need a
            // way to switch those off.
            builder.application()
                    .setListeners(filterListeners(builder.application().getListeners()));
        }
        List<Class<?>> sources = new ArrayList<>();
        for (String name : names) {
            Class<?> cls = ClassUtils.resolveClassName(name, null);
            try {
                cls.getDeclaredAnnotations();
            }
            catch (Exception e) {
                continue;
            }
            sources.add(cls);
        }
        AnnotationAwareOrderComparator.sort(sources);
        builder.sources(sources.toArray(new Class[sources.size()]));
        final ConfigurableApplicationContext context = builder.run();
        // gh-214 using spring.application.name=bootstrap to set the context id via
        // `ContextIdApplicationContextInitializer` prevents apps from getting the actual
        // spring.application.name
        // during the bootstrap phase.
        context.setId("bootstrap");
        // Make the bootstrap context a parent of the app context
        addAncestorInitializer(application, context);
        // It only has properties in it now that we don't want in the parent so remove
        // it (and it will be added back later)
        bootstrapProperties.remove(BOOTSTRAP_PROPERTY_SOURCE_NAME);
        mergeDefaultProperties(environment.getPropertySources(), bootstrapProperties);
        return context;
    }

    ......

}

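BootstrapApplicationListener implements ApplicationListener<ApplicationEnvironmentPreparedEvent>, so it is loaded as a listener while the project starts up. Spring Boot publishes events at different stages of application startup, including these four:

· ApplicationStartedEvent: published when Spring Boot startup begins (this is the Spring Boot 1.x meaning; Spring Boot 2.x uses ApplicationStartingEvent for this moment);

· ApplicationEnvironmentPreparedEvent: the Environment is ready, but the application context has not been created yet;

· ApplicationPreparedEvent: the application context has been created, but the beans in it are not fully loaded yet;

· ApplicationFailedEvent: published when startup fails with an exception.

In other words, BootstrapApplicationListener runs once the environment has been loaded at startup, before any beans are created.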

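At this point we have walked the whole initialization chain of the EnableDiscoveryClient annotation. A rough sequence diagram is shown below:

[Figure: sequence diagram of the @EnableDiscoveryClient initialization chain]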


To summarize, the part analyzed above has two main effects:

1. It initializes the configuration;

2. It activates DiscoveryClient.

Next we analyze what DiscoveryClient does.

2. DiscoveryClient

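If you look at the startup log when the client starts, you will see that the service registration messages are also emitted by the DiscoveryClient class.

That is a good sign that this class does important work during service registration. Let's analyze the implementation.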

2.1 Service registration

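DiscoveryClient (the Spring Cloud type, org.springframework.cloud.client.discovery.DiscoveryClient) is an interface. Looking at its implementations, each one has a DESCRIPTION field that states what the class is for.

  1. EurekaDiscoveryClient: the main implementation of the client logic;
  2. CompositeDiscoveryClient: holds other discovery clients and queries them in order;
  3. NoopDiscoveryClient: deprecated;
  4. SimpleDiscoveryClient: reads the service instances from the SimpleDiscoveryProperties configuration.

Judging from the descriptions, EurekaDiscoveryClient is the main implementation. Inside EurekaDiscoveryClient, service instances are looked up through EurekaClient: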

@Override
public List<ServiceInstance> getInstances(String serviceId) {
    List<InstanceInfo> infos = this.eurekaClient.getInstancesByVipAddress(serviceId, false);
    List<ServiceInstance> instances = new ArrayList<>();
    for (InstanceInfo info : infos) {
        instances.add(new EurekaServiceInstance(info));
    }
    return instances;
}

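On the Netflix side, com.netflix.discovery.DiscoveryClient (a different type from the Spring Cloud interface above) is the default implementation of EurekaClient. It has one very important constructor: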

@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config,
                AbstractDiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider) {
    if (args != null) {
        this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
        this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
        this.eventListeners.addAll(args.getEventListeners());
        this.preRegistrationHandler = args.preRegistrationHandler;
    } else {
        this.healthCheckCallbackProvider = null;
        this.healthCheckHandlerProvider = null;
        this.preRegistrationHandler = null;
    }

    this.applicationInfoManager = applicationInfoManager;
    InstanceInfo myInfo = applicationInfoManager.getInfo();

    clientConfig = config;
    staticClientConfig = clientConfig;
    transportConfig = config.getTransportConfig();
    instanceInfo = myInfo;
    if (myInfo != null) {
        appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
    } else {
        logger.warn("Setting instanceInfo to a passed in null value");
    }

    this.backupRegistryProvider = backupRegistryProvider;

    this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
    localRegionApps.set(new Applications());

    fetchRegistryGeneration = new AtomicLong(0);

    remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
    remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));

    // Everything above mainly initializes fields
    // If shouldFetchRegistry = true, register the registry staleness monitor
    if (config.shouldFetchRegistry()) {
        this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
    } else {
        this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
    }

    // If shouldRegisterWithEureka = true, register the heartbeat staleness monitor
    if (config.shouldRegisterWithEureka()) {
        this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
    } else {
        this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
    }

    logger.info("Initializing Eureka in region {}", clientConfig.getRegion());

    // If shouldRegisterWithEureka = false && shouldFetchRegistry = false,
    // skip the rest of the initialization and return directly
    if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
        logger.info("Client configured to neither register nor query for data.");
        scheduler = null;
        heartbeatExecutor = null;
        cacheRefreshExecutor = null;
        eurekaTransport = null;
        instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());

        // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
        // to work with DI'd DiscoveryClient
        DiscoveryManager.getInstance().setDiscoveryClient(this);
        DiscoveryManager.getInstance().setEurekaClientConfig(config);

        initTimestampMs = System.currentTimeMillis();
        logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                initTimestampMs, this.getApplications().size());

        return; // no need to setup up an network tasks and we are done
    }

    // From here on, create the thread pools for the various tasks
    try {
        // default size of 2 - 1 each for heartbeat and cacheRefresh
        // Create a scheduled thread pool with 2 threads: one for keeping the heartbeat alive,
        // one for refreshing the cache of other eureka client instances
        scheduler = Executors.newScheduledThreadPool(2,
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-%d")
                        .setDaemon(true)
                        .build());

        // Create a thread pool (maximum size 2 by default) that sends the heartbeats
        heartbeatExecutor = new ThreadPoolExecutor(
                1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                        .setDaemon(true)
                        .build()
        );  // use direct handoff

        // Create a thread pool (maximum size 2 by default) that refreshes the cache of
        // other eureka client instances
        cacheRefreshExecutor = new ThreadPoolExecutor(
                1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                        .setDaemon(true)
                        .build()
        );  // use direct handoff

        eurekaTransport = new EurekaTransport();
        scheduleServerEndpointTask(eurekaTransport, args);

        ......
    } catch (Throwable e) {
        throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
    }

    // Fetch the remote instance registry. The argument of fetchRegistry() is false here;
    // it says whether to force a fetch of the full registry.
    // There are two ways to fetch: full and incremental (delta). Delta is the default.
    if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
        // The client is configured to fetch the registry but the remote fetch failed,
        // so fall back to the backup registry
        fetchRegistryFromBackup();
    }

    // call and execute the pre registration handler before all background tasks (inc registration) is started
    if (this.preRegistrationHandler != null) {
        this.preRegistrationHandler.beforeRegistration();
    }

    // If the client is configured to register with eureka server AND to enforce registration
    // at init, register right now. By default registration is not enforced at init.
    if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
        try {
            if (!register()) {
                throw new IllegalStateException("Registration error at startup. Invalid server response.");
            }
        } catch (Throwable th) {
            logger.error("Registration error at startup: {}", th.getMessage());
            throw new IllegalStateException(th);
        }
    }

    // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
    // Initialize the scheduled tasks that keep the heartbeat alive and refresh the registry cache
    initScheduledTasks();

    try {
        Monitors.registerObject(this);
    } catch (Throwable e) {
        logger.warn("Cannot register timers", e);
    }

    // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
    // to work with DI'd DiscoveryClient
    DiscoveryManager.getInstance().setDiscoveryClient(this);
    DiscoveryManager.getInstance().setEurekaClientConfig(config);

    initTimestampMs = System.currentTimeMillis();
    logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
            initTimestampMs, this.getApplications().size());
}

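Initialization mainly does two things:

  1. It creates the scheduler thread pool for timed tasks, the heartbeatExecutor thread pool for heartbeats (lease renewal), and the cacheRefreshExecutor thread pool for fetching the registry;
  2. It calls initScheduledTasks() to put the pools to work, submitting the corresponding tasks to the three pools above. It then creates an instanceInfoReplicator (a Runnable) and calls InstanceInfoReplicator.start, which places that task on the scheduler pool above (service registration and updates).

Next, let's see what initScheduledTasks does: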

private void initScheduledTasks() {
    // Fetch the service list
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        // Registry fetch interval, 30s by default
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        // If a cache refresh times out, the delay of the next run doubles each time, up to
        // registryFetchIntervalSeconds multiplied by this bound (10 by default)
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        // Run CacheRefreshThread, the task that refreshes the cached service list
        scheduler.schedule(
                new TimedSupervisorTask(
                        "cacheRefresh",
                        scheduler,
                        cacheRefreshExecutor,
                        registryFetchIntervalSeconds,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new CacheRefreshThread()
                ),
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }

    // Register with eureka server
    if (clientConfig.shouldRegisterWithEureka()) {
        // Lease renewal interval, 30s by default
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        // If a heartbeat times out, the delay of the next run doubles each time, up to
        // renewalIntervalInSecs multiplied by this bound (10 by default)
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor:" + "renew interval is: {}", renewalIntervalInSecs);

        // Heartbeat timer
        // Run HeartbeatThread, which sends the heartbeat
        scheduler.schedule(
                new TimedSupervisorTask(
                        "heartbeat",
                        scheduler,
                        heartbeatExecutor,
                        renewalIntervalInSecs,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new HeartbeatThread()
                ),
                renewalIntervalInSecs, TimeUnit.SECONDS);

        // Replicator for the client's instance information
        instanceInfoReplicator = new InstanceInfoReplicator(
                this,
                instanceInfo,
                clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                2); // burstSize

        // Register the status change listener
        statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
            @Override
            public String getId() {
                return "statusChangeListener";
            }

            @Override
            public void notify(StatusChangeEvent statusChangeEvent) {
                if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                        InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                    // log at warn level if DOWN was involved
                    logger.warn("Saw local status change event {}", statusChangeEvent);
                } else {
                    logger.info("Saw local status change event {}", statusChangeEvent);
                }
                instanceInfoReplicator.onDemandUpdate();
            }
        };

        if (clientConfig.shouldOnDemandUpdateStatusChange()) {
            applicationInfoManager.registerStatusChangeListener(statusChangeListener);
        }

        // Start the instance info replication
        instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}

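In summary, initScheduledTasks() does the following:

· If shouldFetchRegistry=true, i.e. the client should fetch the service list from Eureka Server:

It starts the timed registry refresh thread (DiscoveryClient-CacheRefreshExecutor-%d). The task runs every registryFetchIntervalSeconds (30s by default) and executes CacheRefreshThread, which fetches the service list from Eureka Server and refreshes the client cache.

· If shouldRegisterWithEureka=true, i.e. the client should register with Eureka Server:

It starts the timed heartbeat thread (DiscoveryClient-HeartbeatExecutor-%d). The task renews the lease every renewalIntervalInSecs (30s by default) and executes HeartbeatThread, which sends the client's heartbeat to Eureka Server;

It starts the timed InstanceInfo replicator thread (DiscoveryClient-InstanceInfoReplicator-%d). The task periodically checks the current instance's DataCenterInfo, LeaseInfo and InstanceStatus and, if anything has changed, calls discoveryClient.register() to sync the instance information to the server.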
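The backoff rule mentioned in the comments (the delay doubles after a timeout, capped at the interval multiplied by expBackOffBound) can be sketched in a few lines. This is an illustration of that rule only: BackoffSketch is a hypothetical class, and the reset-on-success behaviour is an assumption about how the supervising task recovers, not a copy of TimedSupervisorTask.

```java
/** Hypothetical sketch of a capped, doubling delay; not the Eureka implementation. */
final class BackoffSketch {
    private final long baseDelayMs;
    private final long maxDelayMs;
    private long currentDelayMs;

    BackoffSketch(long intervalSeconds, int expBackOffBound) {
        this.baseDelayMs = intervalSeconds * 1000L;
        // The delay may grow to at most interval * bound.
        this.maxDelayMs = baseDelayMs * expBackOffBound;
        this.currentDelayMs = baseDelayMs;
    }

    /** Assumed behaviour: a successful run restores the normal interval. */
    long onSuccess() {
        currentDelayMs = baseDelayMs;
        return currentDelayMs;
    }

    /** A timed-out run doubles the delay, up to the cap. */
    long onTimeout() {
        currentDelayMs = Math.min(maxDelayMs, currentDelayMs * 2);
        return currentDelayMs;
    }
}
```

With the defaults from the text (30s interval, bound 10), repeated timeouts give delays of 60s, 120s and 240s, after which the delay stays at the 300s cap.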

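One thing above deserves attention: InstanceInfoReplicator. It periodically refreshes the client instance's latest information: the current instance data, the lease information and the instance status. InstanceInfoReplicator implements Runnable, so focus on its run() method: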

public void run() {
    try {
        /*
         * Refresh InstanceInfo:
         * 1. Refresh DataCenterInfo
         * 2. Refresh LeaseInfo (the lease information)
         * 3. Get InstanceStatus from HealthCheckHandler and update it; a status change
         *    triggers every StatusChangeListener
         */
        discoveryClient.refreshInstanceInfo();

        // After the refresh, if the instance has changes not yet synced to the server, register
        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {
            // Register
            discoveryClient.register();
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}
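The isDirtyWithTime() / unsetIsDirty(timestamp) pair in run() is a dirty flag guarded by a timestamp: the flag is cleared only if nothing newer was marked while the registration call was in flight. The sketch below shows that idea under assumptions; DirtyFlagSketch and its method names are hypothetical, and the comparison rule is my reading of the pattern rather than a quote of InstanceInfo.

```java
/** Hypothetical timestamp-guarded dirty flag; not the Eureka InstanceInfo class. */
final class DirtyFlagSketch {
    private boolean dirty;
    private long lastDirtyTimestamp;

    /** Marks the state as changed at the given time and returns that time. */
    synchronized long markDirty(long now) {
        dirty = true;
        lastDirtyTimestamp = now;
        return now;
    }

    /** Returns the time of the last change if unsynced changes exist, otherwise null. */
    synchronized Long dirtySince() {
        return dirty ? lastDirtyTimestamp : null;
    }

    /** Clears the flag only if no newer change happened after the given timestamp. */
    synchronized void clearIfNotNewer(long syncedTimestamp) {
        if (lastDirtyTimestamp <= syncedTimestamp) {
            dirty = false;
        }
    }
}
```

The guard matters when a status change arrives while register() is still running: the older timestamp no longer matches, the flag stays set, and the next replication run sends the newer state.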

Now look at the implementation of register():

/**
 * Register with the eureka service by making the appropriate REST call.
 * (Registration with the eureka service goes over HTTP.)
 */
boolean register() throws Throwable {
    logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
    EurekaHttpResponse<Void> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {
        logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
    }
    return httpResponse.getStatusCode() == 204;
}

Following the call further down leads to the RestTemplateEurekaHttpClient class:

public class RestTemplateEurekaHttpClient implements EurekaHttpClient {

    protected final Log logger = LogFactory.getLog(getClass());

    private RestTemplate restTemplate;
    private String serviceUrl;

    public RestTemplateEurekaHttpClient(RestTemplate restTemplate, String serviceUrl) {
        this.restTemplate = restTemplate;
        this.serviceUrl = serviceUrl;
        if (!serviceUrl.endsWith("/")) {
            this.serviceUrl = this.serviceUrl + "/";
        }
    }

    @Override
    public EurekaHttpResponse<Void> register(InstanceInfo info) {
        String urlPath = serviceUrl + "apps/" + info.getAppName();

        HttpHeaders headers = new HttpHeaders();
        headers.add(HttpHeaders.ACCEPT_ENCODING, "gzip");
        headers.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);

        ResponseEntity<Void> response = restTemplate.exchange(urlPath, HttpMethod.POST,
                new HttpEntity<>(info, headers), Void.class);

        return anEurekaHttpResponse(response.getStatusCodeValue())
                .headers(headersOf(response)).build();
    }

    ......

}

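This class wraps RestTemplate, the HTTP client template, and sends a POST request to the server. So this is the place where the registration request goes out to the server when the client starts.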
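For readers who want to see the shape of that request without Spring on the classpath, here is a sketch built with the JDK's java.net.http API (Java 11+). It only builds the request and never sends it. RegisterRequestSketch is a hypothetical name, the JSON body is whatever the caller passes in, and the service URL used in the usage note is an assumed local address; the path, headers and 204 check mirror the listings above.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical builder for a registration request; it only constructs, never sends. */
final class RegisterRequestSketch {
    /** Builds POST {serviceUrl}/apps/{appName} with the headers shown in the listing. */
    static HttpRequest build(String serviceUrl, String appName, String instanceJson) {
        // Same normalization as the constructor above: ensure a trailing slash.
        String base = serviceUrl.endsWith("/") ? serviceUrl : serviceUrl + "/";
        return HttpRequest.newBuilder(URI.create(base + "apps/" + appName))
                .header("Accept-Encoding", "gzip")
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(instanceJson))
                .build();
    }

    /** register() in the listing treats HTTP 204 as success. */
    static boolean isRegistered(int statusCode) {
        return statusCode == 204;
    }
}
```

For example, build("http://localhost:8761/eureka", "DEMO-SERVICE", "{}") targets http://localhost:8761/eureka/apps/DEMO-SERVICE; both the address and the application name are made up for illustration.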

2.2 Lease renewal

The entry point for lease renewal is the heartbeat timer task in the initScheduledTasks() method of DiscoveryClient:

// Heartbeat timer
// Start a timed task that sends a heartbeat request every 30s
scheduler.schedule(
        new TimedSupervisorTask(
                "heartbeat",
                scheduler,
                heartbeatExecutor,
                renewalIntervalInSecs,
                TimeUnit.SECONDS,
                expBackOffBound,
                new HeartbeatThread()
        ),
        renewalIntervalInSecs, TimeUnit.SECONDS);

/**
 * The heartbeat task that renews the lease in the given intervals.
 */
private class HeartbeatThread implements Runnable {
    public void run() {
        if (renew()) {
            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }
}

/**
 * Renew with the eureka service by making the appropriate REST call
 */
boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
        if (httpResponse.getStatusCode() == 404) {
            REREGISTER_COUNTER.increment();
            logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
            long timestamp = instanceInfo.setIsDirtyWithTime();
            boolean success = register();
            if (success) {
                instanceInfo.unsetIsDirty(timestamp);
            }
            return success;
        }
        return httpResponse.getStatusCode() == 200;
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
        return false;
    }
}

@Override
public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id,
        InstanceInfo info, InstanceStatus overriddenStatus) {
    String urlPath = serviceUrl + "apps/" + appName + '/' + id + "?status="
            + info.getStatus().toString() + "&lastDirtyTimestamp="
            + info.getLastDirtyTimestamp().toString() + (overriddenStatus != null
                    ? "&overriddenstatus=" + overriddenStatus.name() : "");

    ResponseEntity<InstanceInfo> response = restTemplate.exchange(urlPath,
            HttpMethod.PUT, null, InstanceInfo.class);

    EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder = anEurekaHttpResponse(
            response.getStatusCodeValue(), InstanceInfo.class)
            .headers(headersOf(response));

    if (response.hasBody())
        eurekaResponseBuilder.entity(response.getBody());

    return eurekaResponseBuilder.build();
}

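The code above is the complete call chain of the client's heartbeat request. Every 30s the client sends a PUT request to the server to renew its lease; if the server answers 404 (it no longer knows the instance), the client registers itself again.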
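The decision inside renew() reduces to a small function of the HTTP status code, which the following sketch isolates. RenewSketch and handleHeartbeatStatus are hypothetical names, and the re-registration is passed in as a callback so that the example stays free of any Eureka types.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical distillation of the status handling in renew(). */
final class RenewSketch {
    /**
     * @param statusCode HTTP status returned for the heartbeat
     * @param reRegister callback that performs registration and reports success
     * @return true if the lease is considered renewed
     */
    static boolean handleHeartbeatStatus(int statusCode, BooleanSupplier reRegister) {
        if (statusCode == 404) {
            // The server does not know this instance: register again.
            return reRegister.getAsBoolean();
        }
        // Only 200 counts as a successful renewal; anything else is a failure.
        return statusCode == 200;
    }
}
```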

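2.3 Service deregistration

Deregistration is easy to understand: when the service shuts down, it cancels its local scheduled tasks and sends a request to the server to announce that it is going offline.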

/**
 * Shuts down Eureka Client. Also sends a deregistration request to the
 * eureka server.
 */
@PreDestroy
@Override
public synchronized void shutdown() {
    if (isShutdown.compareAndSet(false, true)) {
        logger.info("Shutting down DiscoveryClient ...");

        if (statusChangeListener != null && applicationInfoManager != null) {
            applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
        }

        // Cancel the scheduled tasks
        cancelScheduledTasks();

        // If APPINFO was registered
        if (applicationInfoManager != null
                && clientConfig.shouldRegisterWithEureka()
                && clientConfig.shouldUnregisterOnShutdown()) {
            applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
            // Tell the server that this instance is going offline
            unregister();
        }

        if (eurekaTransport != null) {
            eurekaTransport.shutdown();
        }

        // Shut down the monitors
        heartbeatStalenessMonitor.shutdown();
        registryStalenessMonitor.shutdown();

        logger.info("Completed shut down of DiscoveryClient");
    }
}

/**
 * unregister w/ the eureka service.
 */
void unregister() {
    // It can be null if shouldRegisterWithEureka == false
    if (eurekaTransport != null && eurekaTransport.registrationClient != null) {
        try {
            logger.info("Unregistering ...");
            EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
            logger.info(PREFIX + "{} - deregister status: {}", appPathIdentifier, httpResponse.getStatusCode());
        } catch (Exception e) {
            logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
        }
    }
}

@Override
public EurekaHttpResponse<Void> cancel(String appName, String id) {
    String urlPath = serviceUrl + "apps/" + appName + '/' + id;

    ResponseEntity<Void> response = restTemplate.exchange(urlPath, HttpMethod.DELETE,
            null, Void.class);

    return anEurekaHttpResponse(response.getStatusCodeValue())
            .headers(headersOf(response)).build();
}
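One detail of shutdown() worth isolating is the isShutdown.compareAndSet(false, true) guard, which makes the shutdown sequence run at most once even if the method is called repeatedly. A minimal sketch, assuming nothing beyond the JDK: ShutdownSketch is a hypothetical class, and the recorded step strings merely summarize the order of operations in the listing above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical illustration of a run-once shutdown sequence. */
final class ShutdownSketch {
    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
    private final List<String> steps = new ArrayList<>();

    synchronized void shutdown() {
        // Only the first caller flips the flag from false to true and runs the steps.
        if (isShutdown.compareAndSet(false, true)) {
            steps.add("cancel scheduled tasks");
            steps.add("set instance status DOWN");
            steps.add("DELETE apps/{appName}/{id}");
            steps.add("shut down transport and monitors");
        }
    }

    /** Returns a copy of the steps executed so far. */
    synchronized List<String> steps() {
        return new ArrayList<>(steps);
    }
}
```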

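2.4 Service fetch and refresh

At startup the client pulls the full set of already registered client instances from the server. After that, the incremental refresh runs every 30s through the task set up in initScheduledTasks().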

private void initScheduledTasks() {
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        scheduler.schedule(
                new TimedSupervisorTask(
                        "cacheRefresh",
                        scheduler,
                        cacheRefreshExecutor,
                        registryFetchIntervalSeconds,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new CacheRefreshThread()
                ),
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }

    ......
}

/**
 * The task that fetches the registry information at specified intervals.
 *
 */
class CacheRefreshThread implements Runnable {
    public void run() {
        refreshRegistry();
    }
}

@VisibleForTesting
void refreshRegistry() {
    try {
        boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();

        boolean remoteRegionsModified = false;
        // This makes sure that a dynamic change to remote regions to fetch is honored.
        String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
        ......

        boolean success = fetchRegistry(remoteRegionsModified);
        if (success) {
            registrySize = localRegionApps.get().size();
            lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
        }

        ......
    } catch (Throwable e) {
        logger.error("Cannot fetch registry from server", e);
    }
}

/**
 * Fetches the registry information.
 *
 * <p>
 * This method tries to get only deltas after the first fetch unless there
 * is an issue in reconciling eureka server and client registry information.
 * </p>
 *
 * @param forceFullRegistryFetch Forces a full registry fetch.
 *
 * @return true if the registry was fetched
 */
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

    try {
        // If delta fetching is unavailable, or this is the first fetch, pull all applications
        Applications applications = getApplications();

        if (clientConfig.shouldDisableDelta()
                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                || forceFullRegistryFetch
                || (applications == null)
                || (applications.getRegisteredApplications().size() == 0)
                || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
        {
            logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
            logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
            logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
            logger.info("Application is null : {}", (applications == null));
            logger.info("Registered Applications size is zero : {}",
                    (applications.getRegisteredApplications().size() == 0));
            logger.info("Application version is -1: {}", (applications.getVersion() == -1));
            getAndStoreFullRegistry();
        } else {
            getAndUpdateDelta(applications);
        }
        applications.setAppsHashCode(applications.getReconcileHashCode());
        logTotalInstances();
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
        return false;
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }

    // Notify about cache refresh before updating the instance remote status
    onCacheRefreshed();

    // Update remote status based on refreshed data held in the cache
    updateInstanceRemoteStatus();

    // registry was fetched successfully, so return true
    return true;
}
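The long condition at the top of fetchRegistry() decides between a full fetch and a delta fetch. Pulled out as a pure predicate it is easier to reason about. The sketch below is a restatement under assumptions: FetchModeSketch is a hypothetical class, and the local cache is reduced to two numbers (application count and version) instead of an Applications object.

```java
/** Hypothetical predicate mirroring the full-versus-delta condition in fetchRegistry(). */
final class FetchModeSketch {
    /**
     * @param deltaDisabled    corresponds to shouldDisableDelta()
     * @param singleVipAddress corresponds to getRegistryRefreshSingleVipAddress(), may be null
     * @param forceFull        corresponds to the forceFullRegistryFetch argument
     * @param localAppCount    number of applications currently cached locally
     * @param localVersion     version of the local cache, -1 if delta is unsupported
     */
    static boolean needsFullFetch(boolean deltaDisabled, String singleVipAddress,
                                  boolean forceFull, int localAppCount, long localVersion) {
        return deltaDisabled
                || (singleVipAddress != null && !singleVipAddress.isEmpty())
                || forceFull
                || localAppCount == 0
                || localVersion == -1;
    }
}
```

This also explains why the first fetch at startup is always a full one even though delta is the default mode: the local cache is still empty, so the application count is zero.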

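The server keeps its record of recent registry changes for 3 minutes. Although the Eureka client fetches incremental updates, the data it receives may still be the same as what it fetched 30 seconds earlier, so the Eureka client has to handle duplicate information itself.

Also note this line in fetchRegistry():

    applications.setAppsHashCode(applications.getReconcileHashCode());

With every incremental update the server also sends a hash code used as a consistency check. An incremental update actually returns the changes that Eureka server recorded during the last three minutes. If the Eureka client goes more than three minutes without an incremental update (because of a network problem, for example), the data on Eureka server and Eureka client diverges. Normally, after many incremental updates the client's service list should match the one on Eureka server, but if something goes wrong along the way, the two can drift apart. To expose this problem, every delta response from Eureka server carries the hash code, and the hash that the Eureka client computes from its local service list should equal it. If the two differ, the incremental update has gone wrong and the service lists on Eureka client and Eureka server are out of sync, so a full update is needed.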
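The reconciliation check can be illustrated with a toy hash. The sketch assumes the hash is a string built from the number of instances per status, in sorted status order (for example "DOWN_1_UP_2_"), which is my understanding of what getReconcileHashCode produces; treat the exact format as an assumption. ReconcileHashSketch and its methods are hypothetical names.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical reconcile hash: per-status instance counts in sorted order. */
final class ReconcileHashSketch {
    /** Builds a string such as "DOWN_1_UP_2_" from the statuses of all known instances. */
    static String hashOf(List<String> instanceStatuses) {
        // TreeMap keeps the statuses sorted so both sides build the same string.
        Map<String, Integer> counts = new TreeMap<>();
        for (String status : instanceStatuses) {
            counts.merge(status, 1, Integer::sum);
        }
        StringBuilder hash = new StringBuilder();
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            hash.append(entry.getKey()).append('_').append(entry.getValue()).append('_');
        }
        return hash.toString();
    }

    /** A mismatch between server and local hash means the delta went wrong. */
    static boolean needsFullFetch(String serverHash, List<String> localStatuses) {
        return !serverHash.equals(hashOf(localStatuses));
    }
}
```

A hash of this kind is cheap to compute and compare, but it only detects differences in the counts per status, so it is a consistency hint rather than proof that both registries are identical.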

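That concludes the analysis of the client code. It can be approached from two angles:

1. Start from the main class and see what gets initialized;

2. Start from the startup log and see what the startup does.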

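Finally, here is a summary diagram of the Eureka Server and Client startup flows in a microservice setup:

[Figure: summary of the Eureka Server and Client startup flows]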

