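As the server side, Eureka Server provides the following functions:

  1. Replication (replicate): the nodes of a Eureka Server cluster synchronize data with one another and maintain eventual consistency.
  2. Eviction (evict): instances that have probably failed are removed from the registry.
  3. Handling client requests for service registration, service discovery, lease renewal, and service cancellation.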

EurekaServerAutoConfiguration

As we saw in "Eureka - Simple Example", the server side needs the @EnableEurekaServer annotation. EnableEurekaServer itself carries @Import(EurekaServerMarkerConfiguration.class).

    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Import(EurekaServerMarkerConfiguration.class)
    public @interface EnableEurekaServer {
    }

The main thing EurekaServerMarkerConfiguration does is register a Marker bean.

    @Configuration(proxyBeanMethods = false)
    public class EurekaServerMarkerConfiguration {

        @Bean
        public Marker eurekaServerMarkerBean() {
            return new Marker();
        }

        class Marker {
        }
    }

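In short, all @EnableEurekaServer does is register the Marker bean, and the Marker class is empty. So what is the Marker for?
Spring Boot auto-configuration reads the spring.factories file of Eureka Server, where we find the EurekaServerAutoConfiguration class. This class carries a @ConditionalOnBean annotation whose condition is exactly the Marker class above. Adding @EnableEurekaServer therefore creates the Marker bean, which in turn allows EurekaServerAutoConfiguration to be loaded.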

    @Configuration(proxyBeanMethods = false)
    @Import(EurekaServerInitializerConfiguration.class)
    @ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
    @EnableConfigurationProperties({ EurekaDashboardProperties.class,
            InstanceRegistryProperties.class })
    @PropertySource("classpath:/eureka/server.properties")
    public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
        // rest omitted
    }

EurekaServerConfig

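This loads EurekaServerConfig, the class that holds every setting under the eureka.server prefix. If this server is itself configured to register with Eureka, the number of retries for the initial registry sync is set to 5 here.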

    @Configuration(proxyBeanMethods = false)
    protected static class EurekaServerConfigBeanConfiguration {

        @Bean
        @ConditionalOnMissingBean
        public EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) {
            EurekaServerConfigBean server = new EurekaServerConfigBean();
            if (clientConfig.shouldRegisterWithEureka()) {
                // Set a sensible default if we are supposed to replicate
                server.setRegistrySyncRetries(5);
            }
            return server;
        }
    }

PeerAwareInstanceRegistry

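This is the registry used in cluster mode. Its constructor chain (down to AbstractInstanceRegistry) starts a timer thread that clears expired delta information.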

    @Bean
    public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
            ServerCodecs serverCodecs) {
        this.eurekaClient.getApplications(); // force initialization
        return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
                serverCodecs, this.eurekaClient,
                this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(),
                this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
    }

    protected AbstractInstanceRegistry(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs) {
        this.serverConfig = serverConfig;
        this.clientConfig = clientConfig;
        this.serverCodecs = serverCodecs;
        this.recentCanceledQueue = new CircularQueue<Pair<Long, String>>(1000);
        this.recentRegisteredQueue = new CircularQueue<Pair<Long, String>>(1000);
        this.renewsLastMin = new MeasuredRate(1000 * 60 * 1);
        // Clear expired delta information, every 30 seconds by default
        this.deltaRetentionTimer.schedule(getDeltaRetentionTask(),
                serverConfig.getDeltaRetentionTimerIntervalInMs(),
                serverConfig.getDeltaRetentionTimerIntervalInMs());
    }

    private TimerTask getDeltaRetentionTask() {
        return new TimerTask() {
            @Override
            public void run() {
                Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator();
                while (it.hasNext()) {
                    // Remove the entry if it is older than the time delta information is retained for clients
                    if (it.next().getLastUpdateTime() <
                            System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
                        it.remove();
                    } else {
                        break;
                    }
                }
            }
        };
    }
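
The pruning loop in getDeltaRetentionTask can be tried in isolation. The following is a minimal sketch, not Eureka code: DeltaRetentionSketch and prune are hypothetical names, and each queue entry is reduced to its last-update timestamp. It relies on the same assumption as the original loop, namely that entries are appended in time order, so the scan can stop at the first entry that is still fresh.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

// Hypothetical stand-in for the registry's recentlyChangedQueue cleanup.
class DeltaRetentionSketch {

    // Removes entries older than (nowMs - retentionMs) from the head of the queue
    // and returns how many were removed. Entries are last-update timestamps in ms.
    static int prune(Deque<Long> queue, long nowMs, long retentionMs) {
        int removed = 0;
        Iterator<Long> it = queue.iterator();
        while (it.hasNext()) {
            if (it.next() < nowMs - retentionMs) {
                it.remove();
                removed++;
            } else {
                break; // entries are in time order, so everything after this is fresh
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        Deque<Long> queue = new ArrayDeque<>();
        queue.add(1_000L);
        queue.add(2_000L);
        queue.add(9_000L);
        int removed = prune(queue, 10_000L, 3_000L);
        System.out.println(removed + " removed, remaining " + queue);
    }
}
```

With a retention of 3,000 ms and "now" at 10,000 ms, the cutoff is 7,000 ms, so the first two entries are dropped and the third is kept.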

PeerEurekaNodes

PeerEurekaNodes holds the information about the other nodes in the cluster and is used to synchronize data between them.

    @Bean
    @ConditionalOnMissingBean
    public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
            ServerCodecs serverCodecs,
            ReplicationClientAdditionalFilters replicationClientAdditionalFilters) {
        return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,
                this.eurekaClientConfig, serverCodecs, this.applicationInfoManager,
                replicationClientAdditionalFilters);
    }

EurekaServerContext

EurekaServerContext holds the context information of the Eureka Server.

    @Bean
    @ConditionalOnMissingBean
    public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
            PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
        return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
                registry, peerEurekaNodes, this.applicationInfoManager);
    }

When EurekaServerContext is loaded, its initialize method is invoked because of the @PostConstruct annotation. That method in turn calls PeerEurekaNodes#start() and PeerAwareInstanceRegistry#init.

    @PostConstruct
    @Override
    public void initialize() {
        logger.info("Initializing ...");
        peerEurekaNodes.start();
        try {
            registry.init(peerEurekaNodes);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        logger.info("Initialized");
    }

PeerEurekaNodes#start()

Let's look at PeerEurekaNodes#start() first. It starts a thread that periodically reads the list of the other nodes and updates the local view.

    public void start() {
        taskExecutor = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                        thread.setDaemon(true);
                        return thread;
                    }
                }
        );
        try {
            // Update the peer nodes
            // resolvePeerUrls mainly removes this node itself from the node list; not covered in detail here
            updatePeerEurekaNodes(resolvePeerUrls());
            Runnable peersUpdateTask = new Runnable() {
                @Override
                public void run() {
                    try {
                        updatePeerEurekaNodes(resolvePeerUrls());
                    } catch (Throwable e) {
                        logger.error("Cannot update the replica Nodes", e);
                    }
                }
            };
            // Runs every 10 minutes by default
            taskExecutor.scheduleWithFixedDelay(
                    peersUpdateTask,
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    TimeUnit.MILLISECONDS
            );
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        for (PeerEurekaNode node : peerEurekaNodes) {
            logger.info("Replica node URL:  {}", node.getServiceUrl());
        }
    }

updatePeerEurekaNodes adds and removes peer nodes. The createPeerEurekaNode method is in EurekaServerAutoConfiguration.

    protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
        if (newPeerUrls.isEmpty()) {
            logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
            return;
        }
        // Start from the old node URLs
        Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
        // old (1,2,3) - new (2,3,4): what is left (1) are the nodes to remove
        toShutdown.removeAll(newPeerUrls);
        Set<String> toAdd = new HashSet<>(newPeerUrls);
        // new (2,3,4) - old (1,2,3): what is left (4) are the nodes to add
        toAdd.removeAll(peerEurekaNodeUrls);
        // Nothing to add and nothing to remove means nothing changed
        if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
            return;
        }
        // Remove peers no long available
        List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);
        // Shut down and drop the nodes marked for removal
        if (!toShutdown.isEmpty()) {
            logger.info("Removing no longer available peer nodes {}", toShutdown);
            int i = 0;
            while (i < newNodeList.size()) {
                PeerEurekaNode eurekaNode = newNodeList.get(i);
                if (toShutdown.contains(eurekaNode.getServiceUrl())) {
                    newNodeList.remove(i);
                    eurekaNode.shutDown();
                } else {
                    i++;
                }
            }
        }
        // Add new peers
        if (!toAdd.isEmpty()) {
            logger.info("Adding new peer nodes {}", toAdd);
            for (String peerUrl : toAdd) {
                newNodeList.add(createPeerEurekaNode(peerUrl));
            }
        }
        // Reassign the fields
        this.peerEurekaNodes = newNodeList;
        this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
    }
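
The two set differences at the top of updatePeerEurekaNodes are the core of the method. Below is a small standalone sketch of just that step, using the (1,2,3) versus (2,3,4) example from the comments. PeerDiffSketch and the peer URLs are made up for illustration and are not part of Eureka.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper isolating the set arithmetic used to refresh the peer list.
class PeerDiffSketch {

    // Peers that are known now but missing from the latest list: old - new.
    static Set<String> toShutdown(Set<String> current, List<String> latest) {
        Set<String> result = new HashSet<>(current);
        result.removeAll(latest);
        return result;
    }

    // Peers that appear in the latest list but are not known yet: new - old.
    static Set<String> toAdd(Set<String> current, List<String> latest) {
        Set<String> result = new HashSet<>(latest);
        result.removeAll(current);
        return result;
    }

    public static void main(String[] args) {
        Set<String> current = Set.of("http://peer1/eureka/", "http://peer2/eureka/", "http://peer3/eureka/");
        List<String> latest = List.of("http://peer2/eureka/", "http://peer3/eureka/", "http://peer4/eureka/");
        System.out.println("shutdown: " + toShutdown(current, latest));
        System.out.println("add:      " + toAdd(current, latest));
    }
}
```

Both helpers copy their input before calling removeAll, mirroring the original code, which never mutates peerEurekaNodeUrls in place and only swaps in the new collections at the end.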

PeerAwareInstanceRegistry#init

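This method mainly sets up the refresh of the read-only response cache and starts the task that maintains the self-preservation threshold.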

    @Override
    public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
        this.numberOfReplicationsLastMin.start();
        this.peerEurekaNodes = peerEurekaNodes;
        // Set up the response cache (and the refresh of its read-only level)
        initializedResponseCache();
        // Start the self-preservation threshold update task
        scheduleRenewalThresholdUpdateTask();
        initRemoteRegionRegistry();
        try {
            Monitors.registerObject(this);
        } catch (Throwable e) {
            logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
        }
    }

initializedResponseCache creates the response cache, whose timer task keeps the read-only cache up to date.

    @Override
    public synchronized void initializedResponseCache() {
        if (responseCache == null) {
            responseCache = new ResponseCacheImpl(serverConfig, serverCodecs, this);
        }
    }

    // ResponseCacheImpl constructor omitted; it schedules the run method of this TimerTask every 30 seconds
    private TimerTask getCacheUpdateTask() {
        return new TimerTask() {
            @Override
            public void run() {
                logger.debug("Updating the client cache from response cache");
                // Iterate over readOnlyCacheMap
                for (Key key : readOnlyCacheMap.keySet()) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
                                key.getEntityType(), key.getName(), key.getVersion(), key.getType());
                    }
                    try {
                        CurrentRequestVersion.set(key.getVersion());
                        Value cacheValue = readWriteCacheMap.get(key);
                        Value currentCacheValue = readOnlyCacheMap.get(key);
                        // Replace the read-only entry when the two differ
                        if (cacheValue != currentCacheValue) {
                            readOnlyCacheMap.put(key, cacheValue);
                        }
                    } catch (Throwable th) {
                        logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
                    } finally {
                        CurrentRequestVersion.remove();
                    }
                }
            }
        };
    }
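
To see why a read-only level that is refreshed on a timer can serve slightly stale data, here is a simplified sketch of the two-level idea. It is an assumption-laden model, not ResponseCacheImpl: TwoLevelCacheSketch is a hypothetical class, both levels are plain maps of strings (the real read-write level is a loading cache), the read-through in get is my simplification, and values are compared with equals rather than by reference.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of a read-write cache fronted by a periodically refreshed read-only cache.
class TwoLevelCacheSketch {
    final Map<String, String> readWrite = new ConcurrentHashMap<>();
    final Map<String, String> readOnly = new ConcurrentHashMap<>();

    // Serve from the read-only level; fall back to the read-write level on a miss.
    String get(String key) {
        String value = readOnly.get(key);
        if (value == null) {
            value = readWrite.get(key);
            if (value != null) {
                readOnly.put(key, value);
            }
        }
        return value;
    }

    // What the timer task does on each tick: copy changed values into the read-only level.
    // Returns the number of entries replaced.
    int refreshReadOnly() {
        int replaced = 0;
        for (String key : readOnly.keySet()) {
            String fresh = readWrite.get(key);
            String cached = readOnly.get(key);
            if (fresh != null && !fresh.equals(cached)) {
                readOnly.put(key, fresh);
                replaced++;
            }
        }
        return replaced;
    }

    public static void main(String[] args) {
        TwoLevelCacheSketch cache = new TwoLevelCacheSketch();
        cache.readWrite.put("apps", "v1");
        System.out.println(cache.get("apps"));   // first read populates the read-only level
        cache.readWrite.put("apps", "v2");
        System.out.println(cache.get("apps"));   // still the old value until the next refresh
        cache.refreshReadOnly();
        System.out.println(cache.get("apps"));   // refreshed value
    }
}
```

In this model a client can see data that lags behind the registry by up to one refresh interval, which is the price of keeping reads off the read-write level.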

scheduleRenewalThresholdUpdateTask calls updateRenewalThreshold periodically, at the self-preservation threshold update interval, which defaults to 15 minutes.

    private void scheduleRenewalThresholdUpdateTask() {
        timer.schedule(new TimerTask() {
                           @Override
                           public void run() {
                               updateRenewalThreshold();
                           }
                       }, serverConfig.getRenewalThresholdUpdateIntervalMs(),
                serverConfig.getRenewalThresholdUpdateIntervalMs());
    }

    private void updateRenewalThreshold() {
        try {
            // Count the current application instances
            Applications apps = eurekaClient.getApplications();
            int count = 0;
            for (Application app : apps.getRegisteredApplications()) {
                for (InstanceInfo instance : app.getInstances()) {
                    if (this.isRegisterable(instance)) {
                        ++count;
                    }
                }
            }
            synchronized (lock) {
                // renewalPercentThreshold defaults to 0.85.
                // expectedNumberOfClientsSendingRenews is the number of clients expected to send renewals;
                // it is incremented by 1 each time a service registers.
                // If self-preservation mode is disabled, the threshold is recalculated on every run.
                if ((count) > (serverConfig.getRenewalPercentThreshold() * expectedNumberOfClientsSendingRenews)
                        || (!this.isSelfPreservationModeEnabled())) {
                    // Reset expectedNumberOfClientsSendingRenews to the current instance count
                    this.expectedNumberOfClientsSendingRenews = count;
                    // Update the expected minimum number of renewals per minute. When the heartbeats per minute
                    // (renewsLastMin) fall below numberOfRenewsPerMinThreshold and the self-preservation switch
                    // (eureka.enableSelfPreservation = true) is on, self-preservation kicks in and leases are
                    // no longer expired automatically.
                    updateRenewsPerMinThreshold();
                }
            }
            logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold);
        } catch (Throwable e) {
            logger.error("Cannot update renewal threshold", e);
        }
    }

    protected void updateRenewsPerMinThreshold() {
        // expectedClientRenewalIntervalSeconds: how often a client is expected to renew
        this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
                * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
                * serverConfig.getRenewalPercentThreshold());
    }
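
A worked example makes the formula in updateRenewsPerMinThreshold easier to follow. The sketch below only reproduces the arithmetic; RenewalThresholdSketch is a hypothetical class, and the 30-second renewal interval is an assumed value for expectedClientRenewalIntervalSeconds, used together with the 0.85 threshold mentioned in the comments.

```java
// Hypothetical helper reproducing the renewal-threshold arithmetic outside of Eureka.
class RenewalThresholdSketch {

    // clients * renewals per client per minute * percent threshold, truncated to an int.
    static int renewsPerMinThreshold(int expectedClients, int renewalIntervalSeconds, double percentThreshold) {
        return (int) (expectedClients
                * (60.0 / renewalIntervalSeconds)
                * percentThreshold);
    }

    public static void main(String[] args) {
        // 10 clients, each renewing every 30s (assumed), i.e. 2 renewals per minute each,
        // gives 20 expected renewals per minute; 85% of that is the threshold.
        int threshold = renewsPerMinThreshold(10, 30, 0.85);
        System.out.println("threshold = " + threshold);
    }
}
```

With these inputs, 10 × (60 / 30) × 0.85 = 17, so under the rule described in the comments the server would stop expiring leases once fewer than 17 heartbeats arrive in a minute.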

EurekaServerBootstrap

This registers the EurekaServerBootstrap bean.

    @Bean
    public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
            EurekaServerContext serverContext) {
        return new EurekaServerBootstrap(this.applicationInfoManager,
                this.eurekaClientConfig, this.eurekaServerConfig, registry,
                serverContext);
    }

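The relationships among the classes above are shown in the diagram below. Is that all? So far we have only seen timers that maintain the peer nodes and the self-preservation threshold. Where is the rest of the functionality? EurekaServerAutoConfiguration also imports one more class: EurekaServerInitializerConfiguration.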

EurekaServerInitializerConfiguration

EurekaServerInitializerConfiguration implements the SmartLifecycle interface, so its start method is called once the context has been loaded. This is where the Eureka Server startup process begins.

    @Override
    public void start() {
        new Thread(() -> {
            try {
                // Initialize and start the Eureka Server
                eurekaServerBootstrap.contextInitialized(
                        EurekaServerInitializerConfiguration.this.servletContext);
                log.info("Started Eureka Server");
                // Publish the registry-available event
                publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
                // Mark the server as running
                EurekaServerInitializerConfiguration.this.running = true;
                // Publish the server-started event
                publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
            }
            catch (Exception ex) {
                // Help!
                log.error("Could not initialize Eureka servlet context", ex);
            }
        }).start();
    }

EurekaServerBootstrap#contextInitialized

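Two things happen here: the environment information is initialized, and the server context is initialized. Initializing the context includes the initial registry sync and starting the thread that periodically evicts clients that have stopped sending heartbeats.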

    public void contextInitialized(ServletContext context) {
        try {
            // Initialize the environment information
            initEurekaEnvironment();
            // Initialize the server context
            initEurekaServerContext();
            context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
        }
        catch (Throwable e) {
            log.error("Cannot bootstrap eureka server :", e);
            throw new RuntimeException("Cannot bootstrap eureka server :", e);
        }
    }

    protected void initEurekaServerContext() throws Exception {
        // other code omitted
        // Initial registry sync from peer nodes
        int registryCount = this.registry.syncUp();
        // Start the thread that periodically evicts clients without heartbeats
        this.registry.openForTraffic(this.applicationInfoManager, registryCount);
        // other code omitted
    }

PeerAwareInstanceRegistryImpl#syncUp()

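    @Override
    public int syncUp() {
        // Other code omitted. The omitted logic: if fetching the registry fails, sleep for
        // serverConfig.getRegistrySyncRetryWaitMs() and then retry, up to
        // serverConfig.getRegistrySyncRetries() times. That retry count is the value set when
        // EurekaServerConfigBeanConfiguration was loaded, as mentioned above.
        // Copy the registrations held by other servers into the local registry
        register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
        // other code omitted
    }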

Copying the registration information into the local registry:

    public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        // Acquire the read lock
        read.lock();
        try {
            // Look up the instances of this application in the local registry
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            REGISTER.increment(isReplication);
            // If there is none, create a ConcurrentHashMap and put it into the registry
            if (gMap == null) {
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
            // The instance already exists: keep whichever copy has the newer dirty timestamp
            if (existingLease != null && (existingLease.getHolder() != null)) {
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    registrant = existingLease.getHolder();
                }
            } else {
                // Not present, so this is a new instance: update expectedNumberOfClientsSendingRenews
                // and numberOfRenewsPerMinThreshold
                synchronized (lock) {
                    if (this.expectedNumberOfClientsSendingRenews > 0) {
                        // Since the client wants to register it, increase the number of clients sending renews
                        this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                        updateRenewsPerMinThreshold();
                    }
                }
            }
            // Create the lease. leaseDuration is the expiry time. serviceUpTimestamp (service start time)
            // is carried over from the existing lease, so it always stays at its first value.
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
            if (existingLease != null) {
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            // Insert into or update gMap
            gMap.put(registrant.getId(), lease);
            recentRegisteredQueue.add(new Pair<Long, String>(
                    System.currentTimeMillis(),
                    registrant.getAppName() + "(" + registrant.getId() + ")"));
            // This is where the initial state transfer of overridden status happens
            if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                                + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                    logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }
            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                registrant.setOverriddenStatus(overriddenStatusFromMap);
            }
            // Set the status based on the overridden status rules
            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);
            // If the lease is registered with UP status, set lease service up timestamp
            if (InstanceStatus.UP.equals(registrant.getStatus())) {
                lease.serviceUp();
            }
            // Mark the action type as ADDED
            registrant.setActionType(ActionType.ADDED);
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            // Finally, update the last-updated timestamp
            registrant.setLastUpdatedTimestamp();
            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
        } finally {
            // Release the lock
            read.unlock();
        }
    }

PeerAwareInstanceRegistryImpl#openForTraffic

The thread that periodically evicts services is started here.

    @Override
    public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
        // rest omitted
        super.postInit();
    }

AbstractInstanceRegistry#postInit schedules the eviction task, with a default interval of 60s.

    protected void postInit() {
        renewsLastMin.start();
        if (evictionTaskRef.get() != null) {
            // Cancel the previous task
            evictionTaskRef.get().cancel();
        }
        evictionTaskRef.set(new EvictionTask());
        // Schedule the eviction task; the interval defaults to 60s
        evictionTimer.schedule(evictionTaskRef.get(),
                serverConfig.getEvictionIntervalTimerInMs(),
                serverConfig.getEvictionIntervalTimerInMs());
    }

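EvictionTask#run removes expired instances. It respects the protection mechanism: by default, a single run can evict at most 15% of the registered instances.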

    @Override
    public void run() {
        try {
            // Compute the compensation time, since the timer fires slightly late
            long compensationTimeMs = getCompensationTimeMs();
            logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
            evict(compensationTimeMs);
        } catch (Throwable e) {
            logger.error("Could not run the evict task", e);
        }
    }

    public void evict(long additionalLeaseMs) {
        // Do not evict if lease expiration is disabled
        if (!isLeaseExpirationEnabled()) {
            return;
        }
        // Collect the expired leases
        List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
        for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
            Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
            if (leaseMap != null) {
                for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                    Lease<InstanceInfo> lease = leaseEntry.getValue();
                    if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                        expiredLeases.add(lease);
                    }
                }
            }
        }
        // Compute the maximum number that may be evicted: 85% is kept by default, so at most 15% is evicted
        int registrySize = (int) getLocalRegistrySize();
        int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
        int evictionLimit = registrySize - registrySizeThreshold;
        int toEvict = Math.min(expiredLeases.size(), evictionLimit);
        if (toEvict > 0) {
            // Pick the leases to evict at random
            Random random = new Random(System.currentTimeMillis());
            for (int i = 0; i < toEvict; i++) {
                int next = i + random.nextInt(expiredLeases.size() - i);
                Collections.swap(expiredLeases, i, next);
                Lease<InstanceInfo> lease = expiredLeases.get(i);
                String appName = lease.getHolder().getAppName();
                String id = lease.getHolder().getId();
                EXPIRED.increment();
                logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
                // Remove it from registry.get(appName)
                internalCancel(appName, id, false);
            }
        }
    }
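
The eviction cap and the random selection can be reproduced with plain collections. The following is an illustrative sketch rather than Eureka code: EvictionLimitSketch is a hypothetical class, leases are represented by instance ID strings, and the Random is passed in so that the selection can be made repeatable.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical helper showing how the eviction cap and the partial shuffle interact.
class EvictionLimitSketch {

    // Number of instances that may be evicted in one run: size minus the share that must be kept.
    static int evictionLimit(int registrySize, double renewalPercentThreshold) {
        int mustKeep = (int) (registrySize * renewalPercentThreshold);
        return registrySize - mustKeep;
    }

    // Picks min(expired.size(), evictionLimit) entries at random using a partial Fisher-Yates shuffle.
    static List<String> pickToEvict(List<String> expired, int registrySize,
                                    double renewalPercentThreshold, Random random) {
        List<String> pool = new ArrayList<>(expired); // work on a copy
        int toEvict = Math.min(pool.size(), evictionLimit(registrySize, renewalPercentThreshold));
        for (int i = 0; i < toEvict; i++) {
            int next = i + random.nextInt(pool.size() - i);
            Collections.swap(pool, i, next); // position i now holds a randomly chosen entry
        }
        return new ArrayList<>(pool.subList(0, toEvict));
    }

    public static void main(String[] args) {
        List<String> expired = List.of("a", "b", "c", "d", "e");
        // 20 registered instances with a 0.85 threshold: 17 must stay, so at most 3 go.
        System.out.println(pickToEvict(expired, 20, 0.85, new Random()));
    }
}
```

Even though five leases have expired in this example, only three are evicted in the run. The rest stay until a later run, which limits the damage if the expirations were caused by a network problem rather than by real failures.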

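Summary

Besides loading the beans above, Eureka Server startup also sets up several periodic tasks: clearing expired delta data from recentlyChangedQueue every 30 seconds, refreshing the cluster nodes every 10 minutes, refreshing the read-only response cache every 30 seconds, updating the expected minimum number of renewals per minute every 15 minutes, and evicting expired services every 60s.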