Eureka: Eureka Server Startup


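As the server side, Eureka Server provides the following functions:

  1. Replication: the nodes of a Eureka Server cluster synchronize data with each other, which gives eventual consistency.
  2. Eviction: instances that have probably failed are removed from the registry.
  3. Handling client requests: service registration, service discovery, lease renewal, and deregistration.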

EurekaServerAutoConfiguration

In the earlier post "Eureka: Simple Example" we saw that the server side needs the @EnableEurekaServer annotation. EnableEurekaServer carries an @Import(EurekaServerMarkerConfiguration.class).

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {}

The main thing EurekaServerMarkerConfiguration does is register a Marker bean.

@Configuration(proxyBeanMethods = false)
public class EurekaServerMarkerConfiguration {
    @Bean
    public Marker eurekaServerMarkerBean() {
        return new Marker();
    }

    class Marker {
    }
}

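In short, all @EnableEurekaServer does is register the Marker bean, and the Marker class is empty. So what is the Marker for?
Spring Boot auto-configuration reads the spring.factories file of Eureka Server, which lists the EurekaServerAutoConfiguration class. That class carries a @ConditionalOnBean annotation, and the bean it requires is exactly the Marker class above. So adding @EnableEurekaServer creates the Marker bean, which in turn allows EurekaServerAutoConfiguration to load.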

@Configuration(proxyBeanMethods = false)
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
        InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
    // rest omitted
}

EurekaServerConfig

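This loads EurekaServerConfig; all properties with the eureka.server prefix are bound to this class. When the server is itself configured to register with Eureka, the number of retries for reading the registry from a peer (registrySyncRetries) is set to 5 here.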

@Configuration(proxyBeanMethods = false)
protected static class EurekaServerConfigBeanConfiguration {

    @Bean
    @ConditionalOnMissingBean
    public EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) {
        EurekaServerConfigBean server = new EurekaServerConfigBean();
        if (clientConfig.shouldRegisterWithEureka()) {
            // Set a sensible default if we are supposed to replicate
            server.setRegistrySyncRetries(5);
        }
        return server;
    }

}

PeerAwareInstanceRegistry

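This is mainly the registry used in a cluster. Its constructor chain (AbstractInstanceRegistry, shown further down) schedules a timer task that cleans up expired delta information.

@Bean
public PeerAwareInstanceRegistry peerAwareInstanceRegistry(ServerCodecs serverCodecs) {
    this.eurekaClient.getApplications(); // force initialization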
    return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
            serverCodecs, this.eurekaClient,
            this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(),
            this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
}

protected AbstractInstanceRegistry(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs) {
    this.serverConfig = serverConfig;
    this.clientConfig = clientConfig;
    this.serverCodecs = serverCodecs;
    this.recentCanceledQueue = new CircularQueue<Pair<Long, String>>(1000);
    this.recentRegisteredQueue = new CircularQueue<Pair<Long, String>>(1000);

    this.renewsLastMin = new MeasuredRate(1000 * 60 * 1);
    // Clean up expired delta information; runs every 30 seconds by default
    this.deltaRetentionTimer.schedule(getDeltaRetentionTask(),
            serverConfig.getDeltaRetentionTimerIntervalInMs(),
            serverConfig.getDeltaRetentionTimerIntervalInMs());
}

private TimerTask getDeltaRetentionTask() {
    return new TimerTask() {

        @Override
        public void run() {
            Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator();
            while (it.hasNext()) {
                // Remove the item if it was last updated before the delta retention window
                if (it.next().getLastUpdateTime() <
                        System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
                    it.remove();
                } else {
                    break;
                }
            }
        }

    };
}
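The pruning loop above relies on the queue being in insertion order: once it meets an item that is still inside the retention window, everything after it is newer and the loop can stop. The following is a minimal sketch of that idea, not the Eureka implementation. The class DeltaRetentionSketch, the Item type, and the 180-second retention value are assumptions made for illustration.

```java
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;

final class DeltaRetentionSketch {
    /** Hypothetical stand-in for RecentlyChangedItem: it only carries a timestamp. */
    static final class Item {
        final String id;
        final long lastUpdateTime;

        Item(String id, long lastUpdateTime) {
            this.id = id;
            this.lastUpdateTime = lastUpdateTime;
        }
    }

    /** Removes items older than retentionMs and stops at the first item that is still fresh. */
    static int prune(ConcurrentLinkedQueue<Item> queue, long now, long retentionMs) {
        int removed = 0;
        Iterator<Item> it = queue.iterator();
        while (it.hasNext()) {
            if (it.next().lastUpdateTime < now - retentionMs) {
                it.remove();
                removed++;
            } else {
                break; // insertion order: everything after this item is newer
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        long now = 1_000_000L;
        ConcurrentLinkedQueue<Item> queue = new ConcurrentLinkedQueue<>();
        queue.add(new Item("a", now - 200_000));
        queue.add(new Item("b", now - 190_000));
        queue.add(new Item("c", now - 10_000));
        int removed = prune(queue, now, 180_000); // assumed retention window
        System.out.println("removed=" + removed + ", remaining=" + queue.size());
    }
}
```

With these values the two oldest items fall outside the window and are removed, and the third one ends the loop.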

PeerEurekaNodes

PeerEurekaNodes mainly holds information about the other nodes in the cluster and is used to synchronize data between cluster nodes.

@Bean
@ConditionalOnMissingBean
public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
        ServerCodecs serverCodecs,
        ReplicationClientAdditionalFilters replicationClientAdditionalFilters) {
    return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,
            this.eurekaClientConfig, serverCodecs, this.applicationInfoManager,
            replicationClientAdditionalFilters);
}

EurekaServerContext

EurekaServerContext mainly holds the context information of the Eureka Server.

@Bean
@ConditionalOnMissingBean
public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
        PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
    return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
            registry, peerEurekaNodes, this.applicationInfoManager);
}

When the EurekaServerContext bean is created, its initialize method is called because of the @PostConstruct annotation. That method then calls PeerEurekaNodes#start() and PeerAwareInstanceRegistry#init.

@PostConstruct
@Override
public void initialize() {
    logger.info("Initializing ...");
    peerEurekaNodes.start();
    try {
        registry.init(peerEurekaNodes);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
    logger.info("Initialized");
}

PeerEurekaNodes#start()

Let's look at PeerEurekaNodes#start() first. It mainly starts a scheduled thread that reads the information of the other nodes and updates it.

public void start() {
    taskExecutor = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                    thread.setDaemon(true);
                    return thread;
                }
            }
    );
    try {
        // Update the peer nodes
        // resolvePeerUrls mainly removes this node's own URL from the peer list; not covered in detail here
        updatePeerEurekaNodes(resolvePeerUrls());
        Runnable peersUpdateTask = new Runnable() {
            @Override
            public void run() {
                try {
                    updatePeerEurekaNodes(resolvePeerUrls());
                } catch (Throwable e) {
                    logger.error("Cannot update the replica Nodes", e);
                }

            }
        };
        // Runs every 10 minutes by default
        taskExecutor.scheduleWithFixedDelay(
                peersUpdateTask,
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                TimeUnit.MILLISECONDS
        );
    } catch (Exception e) {
        throw new IllegalStateException(e);
    }
    for (PeerEurekaNode node : peerEurekaNodes) {
        logger.info("Replica node URL:  {}", node.getServiceUrl());
    }
}

This method adds and removes peer nodes. The createPeerEurekaNode method is in EurekaServerAutoConfiguration.

protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
    if (newPeerUrls.isEmpty()) {
        logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
        return;
    }
    // Start from the old peer URLs
    Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
    // old (1,2,3) minus new (2,3,4): what is left (1) are the nodes to remove
    toShutdown.removeAll(newPeerUrls);
    Set<String> toAdd = new HashSet<>(newPeerUrls);
    // new (2,3,4) minus old (1,2,3): what is left (4) are the nodes to add
    toAdd.removeAll(peerEurekaNodeUrls);
    // Nothing to add and nothing to remove means nothing has changed
    if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
        return;
    }

    // Remove peers no long available
    List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);
    // Remove the nodes that are no longer in the peer list
    if (!toShutdown.isEmpty()) {
        logger.info("Removing no longer available peer nodes {}", toShutdown);
        int i = 0;
        while (i < newNodeList.size()) {
            PeerEurekaNode eurekaNode = newNodeList.get(i);
            if (toShutdown.contains(eurekaNode.getServiceUrl())) {
                newNodeList.remove(i);
                eurekaNode.shutDown();
            } else {
                i++;
            }
        }
    }

    // Add new peers
    if (!toAdd.isEmpty()) {
        logger.info("Adding new peer nodes {}", toAdd);
        for (String peerUrl : toAdd) {
            newNodeList.add(createPeerEurekaNode(peerUrl));
        }
    }
    // Reassign the fields
    this.peerEurekaNodes = newNodeList;
    this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
}
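The core of updatePeerEurekaNodes is two set differences. The sketch below isolates just that step using the (1,2,3) versus (2,3,4) example from the comments; the class name PeerDiffSketch and the peer URLs are made up for illustration, and TreeSet is used only so the printed order is stable.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

final class PeerDiffSketch {
    /** URLs present in the old set but missing from the new list: peers to shut down. */
    static Set<String> toShutdown(Set<String> oldUrls, List<String> newUrls) {
        Set<String> result = new TreeSet<>(oldUrls);
        result.removeAll(newUrls);
        return result;
    }

    /** URLs present in the new list but missing from the old set: peers to create. */
    static Set<String> toAdd(Set<String> oldUrls, List<String> newUrls) {
        Set<String> result = new TreeSet<>(newUrls);
        result.removeAll(oldUrls);
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical peer URLs
        Set<String> oldUrls = new HashSet<>(Arrays.asList(
                "http://peer1/eureka/", "http://peer2/eureka/", "http://peer3/eureka/"));
        List<String> newUrls = Arrays.asList(
                "http://peer2/eureka/", "http://peer3/eureka/", "http://peer4/eureka/");

        System.out.println("toShutdown=" + toShutdown(oldUrls, newUrls));
        System.out.println("toAdd=" + toAdd(oldUrls, newUrls));
    }
}
```

Here peer1 ends up in the shutdown set and peer4 in the add set; when both sets are empty the peer list is unchanged and nothing needs to be rebuilt.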

PeerAwareInstanceRegistry#init

This mainly sets up the refresh of the read-only response cache and starts the self-preservation task.

@Override
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
    this.numberOfReplicationsLastMin.start();
    this.peerEurekaNodes = peerEurekaNodes;
    // Set up the response cache (refreshes the read-only cache)
    initializedResponseCache();
    // Start the self-preservation threshold update task
    scheduleRenewalThresholdUpdateTask();
    initRemoteRegionRegistry();

    try {
        Monitors.registerObject(this);
    } catch (Throwable e) {
        logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
    }
}

initializedResponseCache: refreshing the read-only response cache

@Override
public synchronized void initializedResponseCache() {
    if (responseCache == null) {
        responseCache = new ResponseCacheImpl(serverConfig, serverCodecs, this);
    }
}

// ResponseCacheImpl constructor omitted; it schedules the TimerTask below, whose run method is called every 30 seconds
private TimerTask getCacheUpdateTask() {
    return new TimerTask() {
        @Override
        public void run() {
            logger.debug("Updating the client cache from response cache");
            // Iterate over readOnlyCacheMap
            for (Key key : readOnlyCacheMap.keySet()) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
                            key.getEntityType(), key.getName(), key.getVersion(), key.getType());
                }
                try {
                    CurrentRequestVersion.set(key.getVersion());
                    Value cacheValue = readWriteCacheMap.get(key);
                    Value currentCacheValue = readOnlyCacheMap.get(key);
                    // Replace the entry when the two caches disagree
                    if (cacheValue != currentCacheValue) {
                        readOnlyCacheMap.put(key, cacheValue);
                    }
                } catch (Throwable th) {
                    logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
                } finally {
                    CurrentRequestVersion.remove();
                }
            }
        }
    };
}
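The effect of this task is that clients may see a value that is up to one refresh interval old. The sketch below models the two caches with plain maps to show that staleness window. It is a simplification under stated assumptions: the class ReadOnlyCacheSyncSketch is hypothetical, the read path in get (read-only first, then fall back to the read-write cache) is an assumption about how reads are served rather than something shown in the article, and values are compared with equals instead of the reference comparison used above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ReadOnlyCacheSyncSketch {
    final Map<String, String> readWriteCache = new ConcurrentHashMap<>();
    final Map<String, String> readOnlyCache = new ConcurrentHashMap<>();

    /** Assumed read path: serve from the read-only cache, fill it from the read-write cache on a miss. */
    String get(String key) {
        String value = readOnlyCache.get(key);
        if (value == null) {
            value = readWriteCache.get(key);
            if (value != null) {
                readOnlyCache.put(key, value);
            }
        }
        return value;
    }

    /** What the periodic task does: refresh keys already present in the read-only cache. */
    int sync() {
        int updated = 0;
        for (String key : readOnlyCache.keySet()) {
            String fresh = readWriteCache.get(key);
            String current = readOnlyCache.get(key);
            if (fresh != null && !fresh.equals(current)) {
                readOnlyCache.put(key, fresh);
                updated++;
            }
        }
        return updated;
    }

    public static void main(String[] args) {
        ReadOnlyCacheSyncSketch cache = new ReadOnlyCacheSyncSketch();
        cache.readWriteCache.put("ALL_APPS", "v1");
        System.out.println(cache.get("ALL_APPS")); // first read fills the read-only cache
        cache.readWriteCache.put("ALL_APPS", "v2");
        System.out.println(cache.get("ALL_APPS")); // still the old value until the next sync
        cache.sync();
        System.out.println(cache.get("ALL_APPS")); // refreshed value
    }
}
```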

scheduleRenewalThresholdUpdateTask periodically calls updateRenewalThreshold at the configured self-preservation update interval, every 15 minutes by default.

private void scheduleRenewalThresholdUpdateTask() {
    timer.schedule(new TimerTask() {
                       @Override
                       public void run() {
                           updateRenewalThreshold();
                       }
                   }, serverConfig.getRenewalThresholdUpdateIntervalMs(),
            serverConfig.getRenewalThresholdUpdateIntervalMs());
}

private void updateRenewalThreshold() {
    try {
        // Count the current application instances
        Applications apps = eurekaClient.getApplications();
        int count = 0;
        for (Application app : apps.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                if (this.isRegisterable(instance)) {
                    ++count;
                }
            }
        }
        synchronized (lock) {
            // renewalPercentThreshold defaults to 0.85
            // expectedNumberOfClientsSendingRenews is the expected number of clients sending renewals;
            // it is incremented by 1 each time an instance registers
            // isSelfPreservationModeEnabled: if self-preservation is off, the threshold is recalculated every time
            if ((count) > (serverConfig.getRenewalPercentThreshold() * expectedNumberOfClientsSendingRenews)
                    || (!this.isSelfPreservationModeEnabled())) {
                // Reset expectedNumberOfClientsSendingRenews to the current instance count
                this.expectedNumberOfClientsSendingRenews = count;
                // Update the expected minimum number of renewals per minute. When the heartbeats in the
                // last minute (renewsLastMin) fall below numberOfRenewsPerMinThreshold and the
                // self-preservation switch is on (eureka.enableSelfPreservation = true),
                // self-preservation is triggered and leases are no longer expired automatically
                updateRenewsPerMinThreshold();
            }
        }
        logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold);
    } catch (Throwable e) {
        logger.error("Cannot update renewal threshold", e);
    }
}

protected void updateRenewsPerMinThreshold() {
    // expectedClientRenewalIntervalSeconds: how often a client is expected to renew, in seconds
    this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
            * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
            * serverConfig.getRenewalPercentThreshold());
}
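A worked example makes the formula in updateRenewsPerMinThreshold easier to follow. The sketch below reproduces the same arithmetic in a standalone class. The class RenewalThresholdSketch and its method names are hypothetical, the 30-second renewal interval is an assumed value, and selfPreservationTriggered follows the description in the comments above (heartbeats in the last minute below the threshold) rather than the actual Eureka check.

```java
final class RenewalThresholdSketch {
    /** Same arithmetic as updateRenewsPerMinThreshold, with the inputs passed in explicitly. */
    static int renewsPerMinThreshold(int expectedClients, int renewalIntervalSeconds,
                                     double renewalPercentThreshold) {
        return (int) (expectedClients
                * (60.0 / renewalIntervalSeconds)
                * renewalPercentThreshold);
    }

    /** Per the description above: protection kicks in when heartbeats drop below the threshold. */
    static boolean selfPreservationTriggered(long renewsLastMin, int threshold) {
        return renewsLastMin < threshold;
    }

    public static void main(String[] args) {
        // 10 clients, each renewing every 30 s (assumed), gives 20 renewals per minute;
        // 85% of that is the threshold.
        int threshold = renewsPerMinThreshold(10, 30, 0.85);
        System.out.println("threshold=" + threshold);
        System.out.println("20 renews/min -> " + selfPreservationTriggered(20, threshold));
        System.out.println("12 renews/min -> " + selfPreservationTriggered(12, threshold));
    }
}
```

With 10 expected clients the threshold comes out at 17 renewals per minute, so a drop to 12 heartbeats in a minute would put the server into self-preservation, while 20 would not.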

EurekaServerBootstrap

Registering the EurekaServerBootstrap bean:

@Bean
public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
        EurekaServerContext serverContext) {
    return new EurekaServerBootstrap(this.applicationInfoManager,
            this.eurekaClientConfig, this.eurekaServerConfig, registry,
            serverContext);
}

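The relationships between the classes above are shown in the following diagram.

[Figure: relationship diagram of the classes above]

Is that all? So far there is only a timer that handles the peer nodes, plus self-preservation. Where is the rest of the functionality? EurekaServerAutoConfiguration also imports one more class: EurekaServerInitializerConfiguration.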

EurekaServerInitializerConfiguration

EurekaServerInitializerConfiguration implements the SmartLifecycle interface, so its start method is called when the context is loaded. This is where the Eureka Server startup process begins.

@Override
public void start() {
    new Thread(() -> {
        try {
            // Initialize and start Eureka Server
            eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
            log.info("Started Eureka Server");
            // Publish the registry-available event
            publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
            // Mark the server as running
            EurekaServerInitializerConfiguration.this.running = true;
            // Publish the server-started event
            publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
        }
        catch (Exception ex) {
            // Help!
            log.error("Could not initialize Eureka servlet context", ex);
        }
    }).start();
}

EurekaServerBootstrap#contextInitialized

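Two things happen here: the environment information is initialized, and the server context is initialized. Initializing the context includes loading the initial registry information and starting the task that periodically evicts clients without heartbeats.

public void contextInitialized(ServletContext context) {
    try {
        // Initialize the environment information
        initEurekaEnvironment();
        // Initialize the server context
        initEurekaServerContext();

        context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
    }
    catch (Throwable e) {
        log.error("Cannot bootstrap eureka server :", e);
        throw new RuntimeException("Cannot bootstrap eureka server :", e);
    }
}

protected void initEurekaServerContext() throws Exception {
    // rest omitted
    // Load the initial registry information
    int registryCount = this.registry.syncUp();
    // Start the task that periodically evicts clients without heartbeats
    this.registry.openForTraffic(this.applicationInfoManager, registryCount);
    // rest omitted
}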

PeerAwareInstanceRegistryImpl#syncUp()

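@Override
public int syncUp() {
    // Rest omitted. The omitted logic: if fetching fails, sleep for
    // serverConfig.getRegistrySyncRetryWaitMs() and then retry,
    // at most serverConfig.getRegistrySyncRetries() times; this value was mentioned above
    // when EurekaServerConfigBeanConfiguration was loaded
    // Copy the registration information of the other servers into the local registry
    register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
    // rest omitted
}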
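The omitted part of syncUp is described as a bounded retry with a sleep between attempts. A minimal sketch of that pattern follows; it is not the Eureka code. The class SyncRetrySketch and the fetch supplier are hypothetical stand-ins for "read the registry from a peer and return how many instances were copied".

```java
import java.util.function.IntSupplier;

final class SyncRetrySketch {
    /**
     * Calls fetch up to maxRetries times, sleeping waitMs between attempts,
     * and stops as soon as an attempt returns a non-zero count.
     */
    static int syncWithRetries(IntSupplier fetch, int maxRetries, long waitMs) {
        int count = 0;
        for (int i = 0; i < maxRetries && count == 0; i++) {
            if (i > 0) {
                try {
                    Thread.sleep(waitMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt and give up
                    break;
                }
            }
            count = fetch.getAsInt();
        }
        return count;
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        // Hypothetical peer that only answers on the third attempt with 3 instances
        int copied = syncWithRetries(() -> ++attempts[0] < 3 ? 0 : 3, 5, 10L);
        System.out.println("copied=" + copied + " after " + attempts[0] + " attempts");
    }
}
```

The returned count is what the caller passes on to openForTraffic, so a sync that never succeeds simply reports zero instances.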

Copying registration information into the local registry

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    // Acquire the read lock
    read.lock();
    try {
        // Look up the instances of this application in the local registry
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        REGISTER.increment(isReplication);
        // If there is none, create a ConcurrentHashMap and put it into the registry
        if (gMap == null) {
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
        // A lease already exists: keep whichever copy has the newer timestamp
        if (existingLease != null && (existingLease.getHolder() != null)) {
            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                registrant = existingLease.getHolder();
            }
        } else {
            // No lease yet, so this is a new instance: update expectedNumberOfClientsSendingRenews
            // and numberOfRenewsPerMinThreshold
            synchronized (lock) {
                if (this.expectedNumberOfClientsSendingRenews > 0) {
                    // Since the client wants to register it, increase the number of clients sending renews
                    this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                    updateRenewsPerMinThreshold();
                }
            }
        }
        // Create the lease; leaseDuration is the expiry duration. The service-up timestamp is carried
        // over from the existing lease, so it always stays at the value from the first registration
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        // Add or update the entry in gMap
        gMap.put(registrant.getId(), lease);
        recentRegisteredQueue.add(new Pair<Long, String>(System.currentTimeMillis(),
                registrant.getAppName() + "(" + registrant.getId() + ")"));
        // This is where the initial state transfer of overridden status happens
        if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
            logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the"
                            + "overrides", registrant.getOverriddenStatus(), registrant.getId());
            if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
            }
        }
        InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
        if (overriddenStatusFromMap != null) {
            logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
            registrant.setOverriddenStatus(overriddenStatusFromMap);
        }

        // Set the status based on the overridden status rules
        InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
        registrant.setStatusWithoutDirty(overriddenInstanceStatus);

        // If the lease is registered with UP status, set lease service up timestamp
        if (InstanceStatus.UP.equals(registrant.getStatus())) {
            lease.serviceUp();
        }
        // The action type is ADDED
        registrant.setActionType(ActionType.ADDED);
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));
        // Update the last-updated timestamp
        registrant.setLastUpdatedTimestamp();
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
        logger.info("Registered instance {}/{} with status {} (replication={})",
                registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
    } finally {
        // Release the lock
        read.unlock();
    }
}
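Two ideas in register are worth isolating: the putIfAbsent idiom that creates the per-application map safely under concurrency, and the rule that the copy with the newer dirty timestamp wins. The sketch below shows only those two steps with a stripped-down lease type. The class RegistryMapSketch and its Lease type are hypothetical and leave out everything else register does (overridden status, queues, cache invalidation).

```java
import java.util.concurrent.ConcurrentHashMap;

final class RegistryMapSketch {
    /** Hypothetical, minimal lease: instance id plus the last dirty timestamp. */
    static final class Lease {
        final String instanceId;
        final long lastDirtyTimestamp;

        Lease(String instanceId, long lastDirtyTimestamp) {
            this.instanceId = instanceId;
            this.lastDirtyTimestamp = lastDirtyTimestamp;
        }
    }

    final ConcurrentHashMap<String, ConcurrentHashMap<String, Lease>> registry =
            new ConcurrentHashMap<>();

    /** Stores the incoming lease unless the existing one is newer; returns the lease that was kept. */
    Lease register(String appName, Lease incoming) {
        ConcurrentHashMap<String, Lease> leases = registry.get(appName);
        if (leases == null) {
            ConcurrentHashMap<String, Lease> fresh = new ConcurrentHashMap<>();
            // putIfAbsent returns the map another thread may have stored first, or null if ours won
            leases = registry.putIfAbsent(appName, fresh);
            if (leases == null) {
                leases = fresh;
            }
        }
        Lease existing = leases.get(incoming.instanceId);
        Lease winner = incoming;
        if (existing != null && existing.lastDirtyTimestamp > incoming.lastDirtyTimestamp) {
            winner = existing; // the local copy is newer, keep it
        }
        leases.put(winner.instanceId, winner);
        return winner;
    }

    public static void main(String[] args) {
        RegistryMapSketch sketch = new RegistryMapSketch();
        sketch.register("ORDER-SERVICE", new Lease("i-1", 200L));
        Lease kept = sketch.register("ORDER-SERVICE", new Lease("i-1", 100L));
        System.out.println("kept timestamp=" + kept.lastDirtyTimestamp);
    }
}
```

In the example the second registration carries an older timestamp, so the lease from the first call is the one that stays in the map.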

PeerAwareInstanceRegistryImpl#openForTraffic

The periodic eviction task is started here.

@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // rest omitted
    super.postInit();
}

AbstractInstanceRegistry#postInit schedules the eviction task; the interval defaults to 60s.

protected void postInit() {
    renewsLastMin.start();
    if (evictionTaskRef.get() != null) {
        // Cancel the previous task
        evictionTaskRef.get().cancel();
    }
    evictionTaskRef.set(new EvictionTask());
    // Schedule the task; the interval defaults to 60s
    evictionTimer.schedule(evictionTaskRef.get(),
            serverConfig.getEvictionIntervalTimerInMs(),
            serverConfig.getEvictionIntervalTimerInMs());
}

EvictionTask#run removes expired instances. It is subject to the protection mechanism: by default at most 15% of the instances can be evicted in one run.

@Override
public void run() {
    try {
        // Compute the compensation time, since the timer may fire slightly late
        long compensationTimeMs = getCompensationTimeMs();
        logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
        evict(compensationTimeMs);
    } catch (Throwable e) {
        logger.error("Could not run the evict task", e);
    }
}

public void evict(long additionalLeaseMs) {
    // If lease expiration is disabled, evict nothing
    if (!isLeaseExpirationEnabled()) {
        return;
    }

    // Collect the expired leases
    List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
    for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
        Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
        if (leaseMap != null) {
            for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                Lease<InstanceInfo> lease = leaseEntry.getValue();
                if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                    expiredLeases.add(lease);
                }
            }
        }
    }

    // Compute the maximum number that may be evicted: by default 85% is kept, so at most 15% is evicted
    int registrySize = (int) getLocalRegistrySize();
    int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
    int evictionLimit = registrySize - registrySizeThreshold;

    int toEvict = Math.min(expiredLeases.size(), evictionLimit);
    if (toEvict > 0) {
        // Pick the leases to evict at random
        Random random = new Random(System.currentTimeMillis());
        for (int i = 0; i < toEvict; i++) {
            int next = i + random.nextInt(expiredLeases.size() - i);
            Collections.swap(expiredLeases, i, next);
            Lease<InstanceInfo> lease = expiredLeases.get(i);
            String appName = lease.getHolder().getAppName();
            String id = lease.getHolder().getId();
            EXPIRED.increment();
            logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
            // Remove it from registry.get(appName)
            internalCancel(appName, id, false);
        }
    }
}
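The eviction cap and the random selection can be tried out in isolation. The sketch below repeats the same arithmetic for the limit and the same swap-based selection, which picks toEvict distinct entries by shuffling only the front of the list. The class EvictionLimitSketch and its method names are hypothetical, and it works on a copy of the input list instead of cancelling leases.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

final class EvictionLimitSketch {
    /** Registry size minus the share that must be kept (85% with a threshold of 0.85). */
    static int evictionLimit(int registrySize, double renewalPercentThreshold) {
        int keep = (int) (registrySize * renewalPercentThreshold);
        return registrySize - keep;
    }

    /** Picks min(expired.size(), limit) distinct entries at random using a partial shuffle. */
    static <T> List<T> pickToEvict(List<T> expired, int limit, Random random) {
        List<T> copy = new ArrayList<>(expired);
        int toEvict = Math.min(copy.size(), limit);
        for (int i = 0; i < toEvict; i++) {
            // Swap a random not-yet-chosen element into position i
            int next = i + random.nextInt(copy.size() - i);
            Collections.swap(copy, i, next);
        }
        return new ArrayList<>(copy.subList(0, toEvict));
    }

    public static void main(String[] args) {
        int limit = evictionLimit(100, 0.85);
        List<String> expired = new ArrayList<>();
        for (int i = 0; i < 40; i++) {
            expired.add("instance-" + i);
        }
        List<String> evicted = pickToEvict(expired, limit, new Random());
        System.out.println("limit=" + limit + ", evicted=" + evicted.size());
    }
}
```

With 100 registered instances and 40 expired leases, only 15 are evicted in this run; the remaining expired leases wait for a later run of the task.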

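Summary

Besides loading the beans above, Eureka Server startup also sets up several periodic tasks: clearing expired delta data from recentlyChangedQueue every 30 seconds, updating the cluster peer nodes every 10 minutes, refreshing the read-only response cache every 30 seconds, updating the expected minimum number of renewals per minute every 15 minutes, and evicting expired instances every 60s.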
