关于java:Spring-Cloud-Eureka源码分析之服务注册的流程与数据存储设计

4次阅读

共计 28416 个字符,预计需要花费 72 分钟才能阅读完成。

Spring Cloud 是一个生态,它提供了一套规范,这套规范能够通过不同的组件来实现,其中就蕴含服务注册 / 发现、熔断、负载平衡等,在 spring-cloud-common 这个包中,org.springframework.cloud.client.serviceregistry门路下,能够看到一个服务注册的接口定义ServiceRegistry。它就是定义了 spring cloud 中服务注册的一个接口。

public interface ServiceRegistry<R extends Registration> {void register(R registration);

    void deregister(R registration);

    void close();

    void setStatus(R registration, String status);

    <T> T getStatus(R registration);
}

咱们看一下它的类关系图,这个接口有一个惟一的实现EurekaServiceRegistry。示意采纳的是 Eureka Server 作为服务注册核心。

主动注册的触发机制

Eureka 主动注册,是通过 EurekaAutoServiceRegistration 这个对象来触发的。

在 Spring Boot 我的项目启动时,会基于主动拆卸机制,在 EurekaClientAutoConfiguration 这个配置类中,初始化一个 EurekaAutoServiceRegistration 这个 Bean 对象,代码如下。

public class EurekaClientAutoConfiguration {
    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    @ConditionalOnProperty(
        value = "spring.cloud.service-registry.auto-registration.enabled",
        matchIfMissing = true)
    public EurekaAutoServiceRegistration eurekaAutoServiceRegistration(
        ApplicationContext context, EurekaServiceRegistry registry,
        EurekaRegistration registration) {return new EurekaAutoServiceRegistration(context, registry, registration);
    }
}

EurekaAutoServiceRegistration 这个类的定义如下。

public class EurekaAutoServiceRegistration implements AutoServiceRegistration,
SmartLifecycle, Ordered, SmartApplicationListener {
    // 省略
    @Override
    public void start() {
        // only set the port if the nonSecurePort or securePort is 0 and this.port != 0
        if (this.port.get() != 0) {if (this.registration.getNonSecurePort() == 0) {this.registration.setNonSecurePort(this.port.get());
            }

            if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) {this.registration.setSecurePort(this.port.get());
            }
        }

        // only initialize if nonSecurePort is greater than 0 and it isn't already running
        // because of containerPortInitializer below
        if (!this.running.get() && this.registration.getNonSecurePort() > 0) {this.serviceRegistry.register(this.registration);

            this.context.publishEvent(new InstanceRegisteredEvent<>(this,
                                                                    this.registration.getInstanceConfig()));
            this.running.set(true);
        }
    }
    // 省略...
}

咱们发现,EurekaAutoServiceRegistration 实现了 SmartLifecycle 接口,当 Spring 容器加载完所有的 Bean 并且初始化之后,会持续回调实现了 SmartLifeCycle 接口的类中对应的办法,比方(start)。

SmartLifeCycle 常识拓展

我拓展一下 SmartLifeCycle 这块的常识,SmartLifeCycle 是一个接口,当 Spring 容器加载完所有的 Bean 并且初始化之后,会持续回调实现了 SmartLifeCycle 接口的类中对应的办法,比方(start)。

实际上咱们本人也能够拓展,比方在 springboot 工程的 main 办法同级目录下,写一个测试类,实现 SmartLifeCycle 接口,并且通过 @Service 申明为一个 bean,因为要被 spring 去加载,首先得是 bean。

@Service
public class TestSmartLifeCycle implements SmartLifecycle {
    @Override
    public void start() {System.out.println("start");
    }

    @Override
    public void stop() {System.out.println("stop");
    }

    @Override
    public boolean isRunning() {return false;}
}

接着,咱们启动 spring boot 利用后,能够看到控制台输入了 start 字符串。

咱们在 DefaultLifecycleProcessor.startBeans 办法上加一个 debug,能够很显著的看到咱们本人定义的 TestSmartLifeCycle 被扫描到了,并且最初会调用该 bean 的 start 办法。

在 startBeans 办法中,咱们能够看到它首先会取得所有实现了 SmartLifeCycle 的 Bean,而后会循环调用实现了 SmartLifeCycle 的 bean 的 start 办法,代码如下。

private void startBeans(boolean autoStartupOnly) {Map<String, Lifecycle> lifecycleBeans = this.getLifecycleBeans();
    Map<Integer, DefaultLifecycleProcessor.LifecycleGroup> phases = new HashMap();
    lifecycleBeans.forEach((beanName, bean) -> {if (!autoStartupOnly || bean instanceof SmartLifecycle && ((SmartLifecycle)bean).isAutoStartup()) {int phase = this.getPhase(bean);
            DefaultLifecycleProcessor.LifecycleGroup group = (DefaultLifecycleProcessor.LifecycleGroup)phases.get(phase);
            if (group == null) {group = new DefaultLifecycleProcessor.LifecycleGroup(phase, this.timeoutPerShutdownPhase, lifecycleBeans, autoStartupOnly);
                phases.put(phase, group);
            }

            group.add(beanName, bean);
        }

    });
    if (!phases.isEmpty()) {List<Integer> keys = new ArrayList(phases.keySet());
        Collections.sort(keys);
        Iterator var5 = keys.iterator();

        while(var5.hasNext()) {Integer key = (Integer)var5.next();
            ((DefaultLifecycleProcessor.LifecycleGroup)phases.get(key)).start(); // 循环调用实现了 SmartLifeCycle 接口的 start 办法。}
    }

}

SmartLifeCycle 接口的回调,是在 SpringBoot 启动时触发,具体的执行门路如下!

SpringApplication.run() -> this.refreshContext(context);->this.refresh(context);->ServletWebServerApplicationContext.refresh()->this.finishRefresh();->AbstractApplicationContext.finishRefresh->DefaultLifecycleProcessor.onRefresh()-> this.startBeans->this.start()->this.doStart()->

服务注册

因而,当 SpringBoot 启动时,会触发在 EurekaAutoServiceRegistration 中的 start 办法,代码如下。

public class EurekaAutoServiceRegistration implements AutoServiceRegistration,
SmartLifecycle, Ordered, SmartApplicationListener {
    // 省略
    @Override
    public void start() {
        // only set the port if the nonSecurePort or securePort is 0 and this.port != 0
        if (this.port.get() != 0) {if (this.registration.getNonSecurePort() == 0) {this.registration.setNonSecurePort(this.port.get());
            }

            if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) {this.registration.setSecurePort(this.port.get());
            }
        }

        // only initialize if nonSecurePort is greater than 0 and it isn't already running
        // because of containerPortInitializer below
        if (!this.running.get() && this.registration.getNonSecurePort() > 0) {
            // 实现服务注册。this.serviceRegistry.register(this.registration);
            // 公布一个事件
            this.context.publishEvent(new InstanceRegisteredEvent<>(this,
                                                                    this.registration.getInstanceConfig()));
            this.running.set(true);
        }
    }
    // 省略...
}

EurekaServiceRegistry.register

this.serviceRegistry.register(this.registration);,理论调用的是 EurekaServiceRegistry 这个对象中的 register 办法,代码如下。

public class EurekaServiceRegistry implements ServiceRegistry<EurekaRegistration> {private static final Log log = LogFactory.getLog(EurekaServiceRegistry.class);

    @Override
    public void register(EurekaRegistration reg) {maybeInitializeClient(reg);

        if (log.isInfoEnabled()) {
            log.info("Registering application"
                    + reg.getApplicationInfoManager().getInfo().getAppName()
                    + "with eureka with status"
                    + reg.getInstanceConfig().getInitialStatus());
        }
 // 设置以后实例的状态,一旦这个实例的状态发生变化,只有状态不是 DOWN,那么就会被监听器监听并且执行服务注册。reg.getApplicationInfoManager()
                .setInstanceStatus(reg.getInstanceConfig().getInitialStatus());
    // 设置健康检查的解决
        reg.getHealthCheckHandler().ifAvailable(healthCheckHandler -> reg
                .getEurekaClient().registerHealthCheck(healthCheckHandler));
    }
}

从上述代码来看,注册办法中并没有真正调用 Eureka 的办法去执行注册,而是仅仅设置了一个状态以及设置健康检查处理器。咱们持续看一下 reg.getApplicationInfoManager().setInstanceStatus 办法。

public synchronized void setInstanceStatus(InstanceStatus status) {InstanceStatus next = instanceStatusMapper.map(status);
    if (next == null) {return;}

    InstanceStatus prev = instanceInfo.setStatus(next);
    if (prev != null) {for (StatusChangeListener listener : listeners.values()) {
            try {listener.notify(new StatusChangeEvent(prev, next));
            } catch (Exception e) {logger.warn("failed to notify listener: {}", listener.getId(), e);
            }
        }
    }
}

在这个办法中,它会通过监听器来公布一个状态变更事件。ok,此时 listener 的实例是 StatusChangeListener,也就是调用StatusChangeListenernotify()办法。这个事件是触发一个服务状态变更,应该是有中央会监听这个事件,而后基于这个事件。

这个时候咱们认为找到了方向,而后点击进去一看,卞击,发现它是一个接口。而且咱们发现它是动态的外部接口,还无奈间接看到它的实现类。

依我多年源码浏览教训,于是又往回找,因为我基本上能猜测到肯定是在某个中央做了初始化的工作,于是,我想找到 EurekaServiceRegistry.register 办法中的 reg.getApplicationInfoManager 这个实例是什么,而且咱们发现 ApplicationInfoManager 是来自于 EurekaRegistration 这个类中的属性。

public class EurekaRegistration implements Registration {

    private final ApplicationInfoManager applicationInfoManager;

    private ObjectProvider<HealthCheckHandler> healthCheckHandler;

    private EurekaRegistration(CloudEurekaInstanceConfig instanceConfig,
                               EurekaClient eurekaClient, ApplicationInfoManager applicationInfoManager,
                               ObjectProvider<HealthCheckHandler> healthCheckHandler) {
        this.eurekaClient = eurekaClient;
        this.instanceConfig = instanceConfig;
        this.applicationInfoManager = applicationInfoManager;
        this.healthCheckHandler = healthCheckHandler;
    }
}

EurekaRegistration 又是在 EurekaAutoServiceRegistration 这个类中实例化的。

那咱们去 EurekaAutoServiceRegistration 这个配置类中,找一下 applicationInfoManager 的实例化过程,代码如下:

@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingRefreshScope
protected static class EurekaClientConfiguration {

    @Bean
    @ConditionalOnMissingBean(value = ApplicationInfoManager.class,
                              search = SearchStrategy.CURRENT)
    @org.springframework.cloud.context.config.annotation.RefreshScope
        @Lazy
        public ApplicationInfoManager eurekaApplicationInfoManager(EurekaInstanceConfig config) {InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
        return new ApplicationInfoManager(config, instanceInfo);  // 构建了一个 ApplicationInfoManager 实例。}

}

在 ApplicationInfoManager 的构造方法中,初始化了一个 listeners 对象,它是一个 ConcurrentHashMap 汇合,然而初始化的时候,这个汇合并没有赋值。

@Inject
public ApplicationInfoManager(EurekaInstanceConfig config, InstanceInfo instanceInfo, OptionalArgs optionalArgs) {
    this.config = config;
    this.instanceInfo = instanceInfo;
    this.listeners = new ConcurrentHashMap<String, StatusChangeListener>();
    if (optionalArgs != null) {this.instanceStatusMapper = optionalArgs.getInstanceStatusMapper();
    } else {this.instanceStatusMapper = NO_OP_MAPPER;}

    // Hack to allow for getInstance() to use the DI'd ApplicationInfoManager
    instance = this;
}

遇到这个问题,咱们先别慌,先来看一下 ApplicationInfoManager 这个类中对 listeners 赋值的办法如下。

public void registerStatusChangeListener(StatusChangeListener listener) {listeners.put(listener.getId(), listener);
}

这个办法惟一的调用方是:DiscoveryClient.initScheduledTasks 办法。

这个办法又是在哪里调用的呢?

DiscoveryClient

EurekaClientAutoConfiguration 这个主动配置类的动态外部类 EurekaClientConfiguration 中,通过 @Bean 注入了一个 CloudEurekaClient 实例,代码如下。

@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingRefreshScope
protected static class EurekaClientConfiguration {

    @Autowired
    private ApplicationContext context;

    @Autowired
    private AbstractDiscoveryClientOptionalArgs<?> optionalArgs;

    @Bean(destroyMethod = "shutdown")
    @ConditionalOnMissingBean(value = EurekaClient.class,
                              search = SearchStrategy.CURRENT)
    public EurekaClient eurekaClient(ApplicationInfoManager manager,
                                     EurekaClientConfig config) {
        return new CloudEurekaClient(manager, config, this.optionalArgs,
                                     this.context);
    }
}

从名字不难猜测出,EurekaClient 应该是专门负责和 EurekaServer 进行交互的客户端实现类,而这里返回的实例对象是CloudEurekaClient,结构代码如下。

public CloudEurekaClient(ApplicationInfoManager applicationInfoManager,
                         EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs<?> args,
                         ApplicationEventPublisher publisher) {super(applicationInfoManager, config, args);
    this.applicationInfoManager = applicationInfoManager;
    this.publisher = publisher;
    this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class,
                                                          "eurekaTransport");
    ReflectionUtils.makeAccessible(this.eurekaTransportField);
}

留神,在 CloudEurekaClient 这个类的构造方法中,传递了 ApplicationInfoManager 这个实例,后续会用到。

同时,该构造方法中会同步调用 super(applicationInfoManager, config, args);,也就是调用父类DiscoveryClient 的构造方法,代码如下。

public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args) {this(applicationInfoManager, config, args, ResolverUtils::randomize);
}

最终会调用 DiscoveryClient 中重载的如下办法,代码比拟长,把非关键代码省略。

DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
    // 省略....

    if (config.shouldFetchRegistry()) { // 是否要从 eureka server 上获取服务地址信息
        this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
    } else {this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;}
    // 是否要注册到 eureka server 上
    if (config.shouldRegisterWithEureka()) {this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
    } else {this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;}

    logger.info("Initializing Eureka in region {}", clientConfig.getRegion());
  // 如果不须要注册并且不须要更新服务地址
    if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {logger.info("Client configured to neither register nor query for data.");
        scheduler = null;
        heartbeatExecutor = null;
        cacheRefreshExecutor = null;
        eurekaTransport = null;
        instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());

        // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
        // to work with DI'd DiscoveryClient
        DiscoveryManager.getInstance().setDiscoveryClient(this);
        DiscoveryManager.getInstance().setEurekaClientConfig(config);

        initTimestampMs = System.currentTimeMillis();
        initRegistrySize = this.getApplications().size();
        registrySize = initRegistrySize;
        logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                    initTimestampMs, initRegistrySize);

        return;  // no need to setup up an network tasks and we are done
    }

    try {
        // default size of 2 - 1 each for heartbeat and cacheRefresh
        // 构建一个延期执行的线程池
        scheduler = Executors.newScheduledThreadPool(2,
                                                     new ThreadFactoryBuilder()
                                                     .setNameFormat("DiscoveryClient-%d")
                                                     .setDaemon(true)
                                                     .build());
        // 解决心跳的线程池
        heartbeatExecutor = new ThreadPoolExecutor(1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>(),
            new ThreadFactoryBuilder()
            .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
            .setDaemon(true)
            .build());  // use direct handoff
        // 解决缓存刷新的线程池
        cacheRefreshExecutor = new ThreadPoolExecutor(1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>(),
            new ThreadFactoryBuilder()
            .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
            .setDaemon(true)
            .build());  // use direct handoff

        eurekaTransport = new EurekaTransport();
        scheduleServerEndpointTask(eurekaTransport, args);

        AzToRegionMapper azToRegionMapper;
        if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
        } else {azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
        }
        if (null != remoteRegionsToFetch.get()) {azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
        }
        instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
    } catch (Throwable e) {throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
    }
   
    // 如果须要注册到 Eureka server 并且是开启了初始化的时候强制注册,则调用 register()发动服务注册
    if (clientConfig.shouldFetchRegistry()) {
        try {
            // 从 Eureka-Server 中拉去注册地址信息
            boolean primaryFetchRegistryResult = fetchRegistry(false);
            if (!primaryFetchRegistryResult) {logger.info("Initial registry fetch from primary servers failed");
            }
            // 从备用地址拉去服务注册信息
            boolean backupFetchRegistryResult = true;
            if (!primaryFetchRegistryResult && !fetchRegistryFromBackup()) {
                backupFetchRegistryResult = false;
                logger.info("Initial registry fetch from backup servers failed");
            }
            // 如果还是没有拉取到,并且配置了强制拉取注册表的话,就会抛异样
            if (!primaryFetchRegistryResult && !backupFetchRegistryResult && clientConfig.shouldEnforceFetchRegistryAtInit()) {throw new IllegalStateException("Fetch registry error at startup. Initial fetch failed.");
            }
        } catch (Throwable th) {logger.error("Fetch registry error at startup: {}", th.getMessage());
            throw new IllegalStateException(th);
        }
    }
    
    // call and execute the pre registration handler before all background tasks (inc registration) is started
    // 这里是判断一下有没有预注册处理器,有的话就执行一下
    if (this.preRegistrationHandler != null) {this.preRegistrationHandler.beforeRegistration();
    }
      // 如果须要注册到 Eureka server 并且是开启了初始化的时候强制注册,则调用 register()发动服务注册(默认状况下,shouldEnforceRegistrationAtInit 为 false)
    if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
        try {if (!register() ) {throw new IllegalStateException("Registration error at startup. Invalid server response.");
            }
        } catch (Throwable th) {logger.error("Registration error at startup: {}", th.getMessage());
            throw new IllegalStateException(th);
        }
    }

    // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
    // 初始化一个定时工作,负责心跳、实例数据更新
    initScheduledTasks();

    try {Monitors.registerObject(this);
    } catch (Throwable e) {logger.warn("Cannot register timers", e);
    }

    // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
    // to work with DI'd DiscoveryClient
    DiscoveryManager.getInstance().setDiscoveryClient(this);
    DiscoveryManager.getInstance().setEurekaClientConfig(config);

    initTimestampMs = System.currentTimeMillis();
    initRegistrySize = this.getApplications().size();
    registrySize = initRegistrySize;
    logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                initTimestampMs, initRegistrySize);
}

DiscoveryClient.initScheduledTasks

initScheduledTasks去启动一个定时工作。

  • 如果配置了开启从注册核心刷新服务列表,则会开启 cacheRefreshExecutor 这个定时工作
  • 如果开启了服务注册到 Eureka,则通过须要做几个事件.

    • 建设心跳检测机制
    • 通过外部类来实例化 StatusChangeListener 实例状态监控接口,这个就是后面咱们在剖析启动过程中所看到的,调用 notify 的办法,实际上会在这里体现。
private void initScheduledTasks() {
     // 如果配置了开启从注册核心刷新服务列表,则会开启 cacheRefreshExecutor 这个定时工作
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        //registryFetchIntervalSeconds:30s
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        //expBackOffBound:10
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        cacheRefreshTask = new TimedSupervisorTask(
            "cacheRefresh",
            scheduler,
            cacheRefreshExecutor,
            registryFetchIntervalSeconds,
            TimeUnit.SECONDS,
            expBackOffBound,
            new CacheRefreshThread());
        scheduler.schedule(
            cacheRefreshTask,
            registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }
      // 如果开启了服务注册到 Eureka,则通过须要做几个事件
    if (clientConfig.shouldRegisterWithEureka()) {int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor:" + "renew interval is: {}", renewalIntervalInSecs);

        // 开启一个心跳工作
        heartbeatTask = new TimedSupervisorTask(
            "heartbeat",
            scheduler,
            heartbeatExecutor,
            renewalIntervalInSecs,
            TimeUnit.SECONDS,
            expBackOffBound,
            new HeartbeatThread());
        scheduler.schedule(
            heartbeatTask,
            renewalIntervalInSecs, TimeUnit.SECONDS);

        // 创立一个 instanceInfoReplicator 实例信息复制器
        instanceInfoReplicator = new InstanceInfoReplicator(
            this,
            instanceInfo,
            clientConfig.getInstanceInfoReplicationIntervalSeconds(),
            2); // burstSize
        // 初始化一个状态变更监听器
        statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
            @Override
            public String getId() {return "statusChangeListener";}

            @Override
            public void notify(StatusChangeEvent statusChangeEvent) {logger.info("Saw local status change event {}", statusChangeEvent);
                instanceInfoReplicator.onDemandUpdate();}
        };
               // 注册实例状态变动的监听
        if (clientConfig.shouldOnDemandUpdateStatusChange()) {applicationInfoManager.registerStatusChangeListener(statusChangeListener); // 留神(case)}
  // 启动一个实例信息复制器,次要就是为了开启一个定时线程,每 40 秒判断实例信息是否变更,如果变更了则从新注册
        instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } else {logger.info("Not registering with Eureka server per configuration");
    }
}

在上述代码中,咱们发现了一个很重要的逻辑:applicationInfoManager.registerStatusChangeListener(statusChangeListener);

这个代码是注册一个 StatusChangeListener,保留到 ApplicationInfoManager 中的 listener 汇合中。(还记得后面源码剖析中的服务注册逻辑吗?当服务器启动或者进行时,会调用 ApplicationInfoManager.listener,一一遍历调用 listener.notify 办法),而这个listener 汇合中的对象是在 DiscoveryClient 初始化的时候实现的。

instanceInfoReplicator.onDemandUpdate()

这个办法的次要作用是依据实例数据是否发生变化,来触发服务注册核心的数据。

public boolean onDemandUpdate() {if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {    // 限流判断
        if (!scheduler.isShutdown()) { // 提交一个工作
            scheduler.submit(new Runnable() {
                @Override
                public void run() {logger.debug("Executing on-demand update of local InstanceInfo");
                   // 取出之前曾经提交的工作,也就是在 start 办法中提交的更新工作,如果工作还没有执行实现,则勾销之前的工作。Future latestPeriodic = scheduledPeriodicRef.get();
                    if (latestPeriodic != null && !latestPeriodic.isDone()) {logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
                        latestPeriodic.cancel(false);// 如果此工作未实现,就立刻勾销
                    }
                  // 通过调用 run 办法,令工作在延时后执行,相当于周期性工作中的一次
                    InstanceInfoReplicator.this.run();}
            });
            return true;
        } else {logger.warn("Ignoring onDemand update due to stopped scheduler");
            return false;
        }
    } else {logger.warn("Ignoring onDemand update due to rate limiter");
        return false;
    }
}

InstanceInfoReplicator.this.run();

run 办法调用 register 办法进行服务注册,并且在 finally 中,每 30s 会定时执行一下以后的 run 办法进行查看。

public void run() {
    try {
        // 刷新实例信息
        discoveryClient.refreshInstanceInfo();
        // 是否有状态更新过了,有的话获取更新的工夫
        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {// 有脏数据,要从新注册
            discoveryClient.register();
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        // 每隔 30s,执行一次以后的 `run` 办法
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}

DiscoveryClient.register

记功上述剖析后,最终咱们找到了 Eureka 的服务注册办法:eurekaTransport.registrationClient.register,最终调用的是 AbstractJerseyEurekaHttpClient#register(…)。

boolean register() throws Throwable {logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
    EurekaHttpResponse<Void> httpResponse;
    try {httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
    }
    return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}

AbstractJerseyEurekaHttpClient#register

很显然,这里是发动了一次 http 申请,拜访 Eureka-Server 的 apps/${APP_NAME}接口,将以后服务实例的信息发送到 Eureka Server 进行保留。

至此,咱们基本上曾经晓得 Spring Cloud Eureka 是如何在启动的时候把服务信息注册到 Eureka Server 上的了

public EurekaHttpResponse<Void> register(InstanceInfo info) {String urlPath = "apps/" + info.getAppName();
    ClientResponse response = null;
    try {Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
        addExtraHeaders(resourceBuilder);
        response = resourceBuilder
            .header("Accept-Encoding", "gzip")
            .type(MediaType.APPLICATION_JSON_TYPE)
            .accept(MediaType.APPLICATION_JSON)
            .post(ClientResponse.class, info);
        return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();} finally {if (logger.isDebugEnabled()) {logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                         response == null ? "N/A" : response.getStatus());
        }
        if (response != null) {response.close();
        }
    }
}

服务注册总结

服务注册的过程分两个步骤

  1. DiscoveryClient 这个对象,在初始化时,调用 initScheduledTask() 办法,构建一个 StatusChangeListener 监听。
  2. Spring Cloud 利用在启动时,基于 SmartLifeCycle 接口回调,触发 StatusChangeListener 事件告诉
  3. 在 StatusChangeListener 的回调办法中,通过调用 onDemandUpdate 办法,去更新客户端的地址信息,从而实现服务注册。

Eureka 注册信息如何存储?

Eureka Server 收到客户端的服务注册申请后,须要把信息存储到 Eureka Server 中,它的存储构造如下图所示。

EurekaServer 采纳了 ConcurrentHashMap 汇合的形式。来存储服务提供者的地址信息,其中,每个节点的实例信息的最终存储对象是 InstanceInfo。>

Eureka Server 接管申请解决

申请入口在:com.netflix.eureka.resources.ApplicationResource.addInstance()

大家能够发现,这里所提供的 REST 服务,采纳的是 jersey 来实现的。Jersey 是基于 JAX-RS 规范,提供 REST 的实现的反对,这里就不开展剖析了。

Eureka Server 端定义的服务注册接口实现如下:

@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
                            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
      
    // handle cases where clients may be registering with bad DataCenterInfo with missing data
    // 实例部署的数据中心,这里是 AWS 实现的数据相干的逻辑,这里不波及到,所以不须要去关怀
    DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
    if (dataCenterInfo instanceof UniqueIdentifier) {String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
        if (isBlank(dataCenterInfoId)) {boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
            if (experimental) {String entity = "DataCenterInfo of type" + dataCenterInfo.getClass() + "must contain a valid id";
                return Response.status(400).entity(entity).build();} else if (dataCenterInfo instanceof AmazonInfo) {AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                if (effectiveId == null) {amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                }
            } else {logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
            }
        }
    }
   // 在这里会调用服务注册办法,传递 `info`,示意客户端的服务实例信息。registry.register(info, "true".equals(isReplication));
    return Response.status(204).build();  // 204 to be backwards compatible}

PeerAwareInstanceRegistryImpl.register

咱们先来看 PeerAwareInstanceRegistryImpl 的类关系图,从类关系图能够看出,PeerAwareInstanceRegistry 的最顶层接口为 LeaseManager 与 LookupService,

  • 其中 LookupService 定义了最根本的发现实例的行为。
  • LeaseManager 定义了解决客户端注册,续约,登记等操作。

InstanceRegistry.register

接着进入到 InstanceRegistry 的 register 办法,在这个办法中,减少了一个 handleRegistration 办法的调用,这个办法用来公布一个 EurekaInstanceRegisteredEvent 事件。

@Override
public void register(final InstanceInfo info, final boolean isReplication) {handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication);
   super.register(info, isReplication);
}

父类的 register 办法

接着调用父类 PeerAwareInstanceRegistryImpl 的 register 办法,代码如下。

@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;  // 租约过期工夫
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) { // 如果客户端有本人定义心跳超时工夫,则采纳客户端的
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    super.register(info, leaseDuration, isReplication);  // 节点注册
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication); // 把注册信息同步到其余集群节点。}

其中:

  • leaseDuration 示意租约过期工夫,默认是 90s,也就是当服务端超过 90s 没有收到客户端的心跳,则被动剔除该节点
  • 调用 super.register 发动节点注册
  • 将信息复制到 Eureka Server 集群中的其余机器上,同步的实现也很简略,就是取得集群中的所有节点,而后一一发动注册

AbstractInstanceRegistry.register

最终在这个抽象类的实例注册类中实现服务注册的实现,代码如下。

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {read.lock();
    try {
        // 从 registry 中取得以后实例信息,依据 appName,registry 中保留了所有客户端的实例数据
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        REGISTER.increment(isReplication);  // 原子递增,做数据统计
        if (gMap == null) { // 如果 gMap 为空,阐明以后服务端没有保留该实例数据,则通过上面代码进行初始化
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {gMap = gNewMap;}
        } 
        // 从 gMap 中查问曾经存在的 Lease 信息,Lease 中文翻译为租约,实际上它把服务提供者的实例信息包装成了一个 lease,外面提供了对于改服务实例的租约治理
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
        //  当 instance 曾经存在时,和客户端的 instance 的信息做比拟,工夫最新的那个,为无效 instance 信息
        if (existingLease != null && (existingLease.getHolder() != null)) {Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
            logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

            // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
            // InstanceInfo instead of the server local copy.
              // 比拟 lastDirtyTimestamp,以 lastDirtyTimestamp 大的为准
            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater"+" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                registrant = existingLease.getHolder();  // 从新赋值 registrant 为服务端最新的实例信息}
        } else {
            // 如果 lease 不存在,则认为是一个新的实例信息,执行上面这段代码(后续独自剖析它的作用)synchronized (lock) {if (this.expectedNumberOfClientsSendingRenews > 0) {
                    // Since the client wants to register it, increase the number of clients sending renews
                    this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                    updateRenewsPerMinThreshold();}
            }
            logger.debug("No previous lease information found; it is new registration");
        }
        // 创立一个 Lease 租约信息
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {  // 当原来存在 Lease 的信息时,设置 serviceUpTimestamp, 保障服务启动的工夫始终是第一次注册的那个(防止状态变更影响到服务启动工夫)lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        gMap.put(registrant.getId(), lease);  // 把以后服务实例保留到 gMap 中。recentRegisteredQueue.add(new Pair<Long, String>(System.currentTimeMillis(),
            registrant.getAppName() + "(" + registrant.getId() + ")"));
        // This is where the initial state transfer of overridden status happens
        // 如果实例状态不等于 UNKNOWN,则把以后实例状态增加到 overriddenInstanceStatusMap 中
        if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the"
                         + "overrides", registrant.getOverriddenStatus(), registrant.getId());
            if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
            }
        }
        // 重写实例状态
        InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
        if (overriddenStatusFromMap != null) {logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
            registrant.setOverriddenStatus(overriddenStatusFromMap);
        }

        // Set the status based on the overridden status rules
        InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
        registrant.setStatusWithoutDirty(overriddenInstanceStatus);// 设置实例信息的状态,但不标记 dirty

        // If the lease is registered with UP status, set lease service up timestamp
        if (InstanceStatus.UP.equals(registrant.getStatus())) { // 如果服务实例信息为 UP 状态,则更新该实例的启动工夫。lease.serviceUp();}
        registrant.setActionType(ActionType.ADDED); // 设置注册类型为增加
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));  // 租约变更记录队列,记录了实例的每次变动,用于注册信息的增量获取
        registrant.setLastUpdatedTimestamp(); // 批改最初一次更新工夫
        // 让缓存生效
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
        logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
    } finally {read.unlock();
    }
}

EurekaServer 注册信息存储总结

至此,咱们就把服务注册在客户端和服务端的处理过程做了一个具体的剖析,实际上在 Eureka Server 端,会把客户端的地址信息保留到 ConcurrentHashMap 中存储。并且服务提供者和注册核心之间,会建设一个心跳检测机制。

用于监控服务提供者的衰弱状态。

版权申明:本博客所有文章除特地申明外,均采纳 CC BY-NC-SA 4.0 许可协定。转载请注明来自 Mic 带你学架构
如果本篇文章对您有帮忙,还请帮忙点个关注和赞,您的保持是我一直创作的能源。欢送关注同名微信公众号获取更多技术干货!

正文完
 0