关于springcloud:SpringCloud源码解析-Eureka原理探究

37次阅读

共计 12168 个字符,预计需要花费 31 分钟才能阅读完成。

本文通过浏览 Eureka 源码,分享 Eureka 的实现原理。
本文次要梳理 Eureka 整体设计及实现,并不一一列举 Eureka 源码细节。

源码剖析基于 Spring Cloud Hoxton,Eureka 版本为 1.9

Eureka 分为 Eureka Client,Eureka Server,多个 Eureka Server 节点组成一个 Eureka 集群,服务通过 Eureka Client 注册到 Eureka Server。

CAP 实践 指出,一个分布式系统不可能同时满足 C(一致性)、A(可用性)和 P(分区容错性)。
因为分布式系统中必须保障分区容错性,因而咱们只能在 A 和 C 之间进行衡量。
Zookeeper 保障的是 CP, 而 Eureka 则是保障 AP。
为什么呢?
在注册核心这种场景中,可用性比一致性更重要。
作为注册核心,其实数据是不常常变更的,只有服务公布,机器高低线,服务扩缩容时才变更。
因而 Eureka 抉择 AP,即便出问题了,也返回旧数据,保障服务能(最大水平)失常调用,避免出现因为注册核心的问题导致服务不可用这种得失相当的状况。
所以,Eureka 各个节点都是平等的(去中心化的架构,无 master/slave 辨别),挂掉的节点不会影响失常节点的工作,残余的节点仍然能够提供注册和查问服务。

Eureka Client

Eureka 1.9 只有引入 spring-cloud-starter-netflix-eureka-client 依赖,即便不应用 @EnableDiscoveryClient 或 @EnableEurekaClient 注解,服务也会注册到 Eureka 集群。

client 次要逻辑在 com.netflix.discovery.DiscoveryClient 实现,EurekaClientAutoConfiguration 中构建了其子类 CloudEurekaClient。

定时工作

DiscoveryClient#initScheduledTasks 办法设置定时工作,次要有 CacheRefreshThread,HeartbeatThread,以及 InstanceInfoReplicator。

同步

服务注册信息缓存在 DiscoveryClient#localRegionApps 变量中,CacheRefreshThread 负责定时从 Eureka Server 读取最新的服务注册信息,更新到本地缓存。
CacheRefreshThread -> DiscoveryClient#refreshRegistry -> DiscoveryClient#fetchRegistry
当存在多个 Eureka Server 节点时,Client 会与 eureka.client.serviceUrl.defaultZone 配置的第一个 Server 节点同步数据,当第一个 Server 节点同步失败,才会同步第二个节点,以此类推。

从 DiscoveryClient#fetchRegistry 能够看到,同步数据有两个办法
(1) 全量同步
由 DiscoveryClient#getAndStoreFullRegistry 办法实现,通过 Http Get 调用 Server 接口 apps/
获取 Server 节点中所有服务注册信息替换 DiscoveryClient#localRegionApps

留神:Client 申请 Server 端的服务,都是通过 EurekaHttpClient 接口发动,该接口实现类 EurekaHttpClientDecorator 通过 RequestExecutor 接口将申请委托给其余 EurekaHttpClient 实现类,并提供 execute 办法给子类实现扩大解决(该扩大解决能够针对每一个 EurekaHttpClient 办法,相似 AOP)。子类 RetryableEurekaHttpClient#execute 中,会获取 eureka.client.service-url.defaultZone 中配置的地址,通过 TransportClientFactory#newClient,结构一个 RestTemplateTransportClientFactory,再真正发动申请。

(2)增量同步
由 DiscoveryClient#getAndUpdateDelta 办法实现,通过 Http Get 调用 Server 接口 apps/delta,获取最新 ADDED、MODIFIED,DELETED 操作,更新本地缓存。
如果获取最新操作失败,则会发动全量同步。

配置:
eureka.client.fetch-registry,是否定时同步信息,默认 true
eureka.client.registry-fetch-interval-seconds,距离多少秒同步一次服务注册信息,默认 30

心跳

HeartbeatThread -> DiscoveryClient#renew -> EurekaHttpClient#sendHeartBeat
通过 Http Put 调用 Server 接口apps/{appName}/{instanceId}
appName 是服务的 spring.application.name,instanceId 是服务 IP 加服务端口。

留神:如果 Server 返回 NOT_FOUND 状态,则从新注册。

配置:
eureka.client.register-with-eureka,以后利用是否注册到 Eureka 集群,默认 true
eureka.instance.lease-renewal-interval-in-seconds,距离多少秒发送一次心跳,默认 30

注册

DiscoveryClient# 构造函数 -> DiscoveryClient#register
通过 Http Post 调用 Server 接口 apps/{appName},发送以后利用的注册信息到 Server。
配置:
eureka.client.register-with-eureka,以后利用是否注册到 Eureka 集群,默认 true
eureka.client.should-enforce-registration-at-init,是否在初始化时注册,默认 false

InstanceInfoReplicator

InstanceInfoReplicator 工作会去监测利用本身的 IP 信息以及配置信息是否产生扭转,如果产生扭转,则会从新发动注册。
配置:
eureka.client.initial-instance-info-replication-interval-seconds,距离多少秒查看一次本身信息,默认 40

下线

EurekaClientAutoConfiguration 配置了 CloudEurekaClient 的销毁办法

@Bean(destroyMethod = "shutdown")

DiscoveryClient#shutdown 办法实现下线的解决工作,包含勾销定时工作,调用 unregister 办法(通过 Http Delete 调用 Server 接口apps/{appName}/{id}),勾销监控工作等

Eureka Server

@EnableEurekaServer 引入 EurekaServerMarkerConfiguration,EurekaServerMarkerConfiguration 构建 EurekaServerMarkerConfiguration.Marker。
EurekaServerAutoConfiguration 会在 Spring 上下文中存在 EurekaServerMarkerConfiguration.Marker 时失效,结构 Server 端组件类。

Eureka Server 也要应用 DiscoveryClient,拉取其余 Server 节点的服务注册信息或者将本身注册到 Eureka 集群中。

启动同步

Server 启动时,须要从相邻 Server 节点获取服务注册信息,同步到本身内存。

Server 的服务注册信息寄存在 AbstractInstanceRegistry#registry 变量中,类型为 ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>。
外层 Map Key 为 appName,外层 Map Key 为 instanceId,Lease 代表 Client 与 Server 之间维持的一个契约。InstanceInfo 保留具体的服务注册信息,如 instanceId,appName,ipAddr,port 等。

EurekaServerBootstrap 是 Server 端的启动疏导类,EurekaServerInitializerConfiguration 实现了 Lifecycle 接口,start 办法调用 eurekaServerBootstrap.contextInitialized 实现 Server 端初始化。
eurekaServerBootstrap.contextInitialized -> EurekaServerBootstrap#initEurekaServerContext -> PeerAwareInstanceRegistryImpl#syncUp -> AbstractInstanceRegistry#register
PeerAwareInstanceRegistryImpl#syncUp 调用 DiscoveryClient#getApplications 办法,获取相邻 server 节点的所有服务注册信息,再调用 AbstractInstanceRegistry#register 办法,注册到 AbstractInstanceRegistry#registry 变量中。

AbstractInstanceRegistry#register

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {read.lock();
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        REGISTER.increment(isReplication);
        ...
        // #1
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());    
        if (existingLease != null && (existingLease.getHolder() != null)) {Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
            ...
            // #2
            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {registrant = existingLease.getHolder();
            }
        } else {synchronized (lock) {if (this.expectedNumberOfClientsSendingRenews > 0) {
                    this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                    // #3
                    updateRenewsPerMinThreshold();}
            }
            logger.debug("No previous lease information found; it is new registration");
        }
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        // #4
        gMap.put(registrant.getId(), lease);    
        ...
        registrant.setActionType(ActionType.ADDED);
        // #5
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));    
        registrant.setLastUpdatedTimestamp();
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());    
        logger.info("Registered instance {}/{} with status {} (replication={})",
                registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
    } finally {read.unlock();
    }
}

#1 通过 appName,instanceId 查问已有的 Lease
#2 如果该服务已存在 Lease,并且 LastDirtyTimestamp 的值更大,应用已存在的 Lease。
#3 更新 numberOfRenewsPerMinThreshold,该值用于自我保护模式。
#4 构建一个新的 Lease,增加到 AbstractInstanceRegistry#registry 缓存中。
#5 增加 recentlyChangedQueue,apps/delta接口从中获取最新变更操作。

提供服务

Server 通过 ApplicationsResource/ApplicationResource/InstanceResource 对外提供 Http 服务。

AbstractInstanceRegistry 负责实现 cancle,register,renew,statusUpdate,deleteStatusOverride 等操作的业务逻辑。
PeerAwareInstanceRegistryImpl 通过 replicateToPeers 办法将操作同步到其余节点,以保障集群节点数据同步。
PeerAwareInstanceRegistryImpl#replicateToPeers 办法最初一个参数 isReplication,决定是否须要进行同步。
如果 Server 节点接管到其余 Server 节点发送的同步操作,是不须要再持续向其余 Server 同步的,否则会引起循环更新。
该参数通过 Http Requst 的 Header 参数 x -netflix-discovery-replication 决定(只有 Client 发送的申请该参数才为 true)。

数据统一

PeerAwareInstanceRegistryImpl#replicateToPeers 办法通过 PeerEurekaNodes#getPeerEurekaNodes 获取其余 server 节点地址,
PeerEurekaNodes#peerEurekaNodes 变量保护了所有的 Server 节点信息。

PeerEurekaNodes 通过 peersUpdateTask 工作定时从 DNS 或配置文件获取最新的 Server 节点地址列表,并更新 PeerEurekaNodes#peerEurekaNodes。
配置:
eureka.server.peer-eureka-nodes-update-interval-ms,距离多少分钟拉取一次 Server 节点地址列表,默认 10

PeerEurekaNode 治理具体一个 Server 节点,并负责向该 Server 节点同步 register,cancel,heartbeat 等操作。
PeerEurekaNode 通过定时工作的形式同步这些操作。它保护了两个 TaskDispatcher,批处理调度器 batchingDispatcher 和非批处理调度器 nonBatchingDispatcher。
PeerEurekaNode# 构造方法调用 TaskDispatchers#createBatchingTaskDispatcher 结构 TaskDispatcher

public static <ID, T> TaskDispatcher<ID, T> createBatchingTaskDispatcher(String id,
                                                                         int maxBufferSize,
                                                                         int workloadSize,
                                                                         int workerCount,
                                                                         long maxBatchingDelay,
                                                                         long congestionRetryDelayMs,
                                                                         long networkFailureRetryMs,
                                                                         TaskProcessor<T> taskProcessor) {
    final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(id, maxBufferSize, workloadSize, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs);
    final TaskExecutors<ID, T> taskExecutor = TaskExecutors.batchExecutors(id, workerCount, taskProcessor, acceptorExecutor);
    return new TaskDispatcher<ID, T>() {public void process(ID id, T task, long expiryTime) {acceptorExecutor.process(id, task, expiryTime);
        }

        public void shutdown() {acceptorExecutor.shutdown();
            taskExecutor.shutdown();}
    };
}

TaskDispatcher 负责工作散发,过期工作会被摈弃,如果两个工作有雷同 id,则前一个工作则会被删除。
AcceptorExecutor 负责整合工作,将工作放入批次中。
TaskExecutors 将整合好的工作(批次)分给 TaskProcessor 解决,理论解决工作的是 ReplicationTaskProcessor。
ReplicationTaskProcessor 能够反复执行失败的工作,ReplicationTaskProcessor#process(List<ReplicationTask> tasks)解决批次工作,将 tasks 合并到一个申请,发送到上游 Server 接口 peerreplication/batch/
工作类为 ReplicationTask,它提供了 handleFailure 办法,当上游 Server 接口返回 statusCode 不在 [200,300) 区间,则调用该办法。

从 TaskExecutors#BatchWorkerRunnable 的 run 办法能够看到,
调用上游 Server 接口时,如果上游返回 503 状态或产生 IO 异样,会通过 taskDispatcher.reprocess 从新执行工作,以保障最终一致性。
如果产生其余异样,只打印日志,不反复执行工作。

配置:
eureka.server.max-elements-in-peer-replication-pool,期待执行工作最大数量,默认为 10000

须要留神一下 PeerEurekaNode#heartbeat 办法,心跳工作实现了 handleFailure 办法

public void handleFailure(int statusCode, Object responseEntity) throws Throwable {super.handleFailure(statusCode, responseEntity);
    if (statusCode == 404) {logger.warn("{}: missing entry.", getTaskName());
        if (info != null) {logger.warn("{}: cannot find instance id {} and hence replicating the instance with status {}",
                    getTaskName(), info.getId(), info.getStatus());
            register(info);
        }
    } 
    ...
}

如果上游 server 节点没有找到服务注册信息,就返回 404 状态,这时须要从新注册该服务。这点很重要,它能够保障不同 Server 节点保持数据统一。

假如有一个 client,注册到 Eureka 集群 server1,server2,server3。上面来剖析两个场景
场景 1. client 启动时,server1 接管带 client 的注册信息,但同步给 server2 前宕机了,怎么办?
这时,client 定时发动心跳,但它与 server1 心跳操作失败,只能向 server2 发动心跳,server2 返回 404(NOT_FOUND 状态),client 从新注册。

场景 2. server3 与其余机器 server1,server2 之间呈现了网络分区,这时 client 注册到 eureka 集群。而后网络复原了,server3 怎么同步数据呢?
当 server1 向 server3 同步心跳时,server3 返回 404,于是 server1 从新向 server3 注册 client 信息,数据最终保持一致。

被动生效

AbstractInstanceRegistry#deltaRetentionTimer 工作会定时移除 recentlyChangedQueue 中过期的增量操作信息
配置:
eureka.server.delta-retention-timer-interval-in-ms,距离多少秒清理一次过期的增量操作信息,默认 30
eureka.server.retention-time-in-m-s-in-delta-queue,增量操作保留多少分钟,默认 3

AbstractInstanceRegistry#evictionTimer 工作会定时剔除 AbstractInstanceRegistry#registry 中曾经过期的(太久没收到心跳)服务注册信息。
计算服务生效工夫时还要加上弥补工夫,即计算本次工作执行的工夫和上次工作执行的时间差,若超过 eviction-interval-timer-in-ms 配置值则加上超出时间差作为弥补工夫。
每次剔除服务的数量都有一个下限,为注册服务数量 *renewal-percent-threshold,Eureka 会随机剔除过期的服务。
配置:
eureka.server.eviction-interval-timer-in-ms,距离多少秒清理一次过期的服务,默认 60
eureka.instance.lease-expiration-duration-in-seconds,距离多少秒没收到心跳则断定服务过期,默认 90
eureka.server.renewal-percent-threshold,自我爱护阀值因子,默认 0.85

自我爱护机制

PeerAwareInstanceRegistryImpl#scheduleRenewalThresholdUpdateTask,定时更新 numberOfRenewsPerMinThreshold,该值用于断定是否进入自我保护模式,在自我保护模式下,AbstractInstanceRegistry#evictionTimer 工作间接返回,不剔除过期服务。

numberOfRenewsPerMinThreshold 计算在 PeerAwareInstanceRegistryImpl#updateRenewsPerMinThreshold

protected void updateRenewsPerMinThreshold() {this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
            * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
            * serverConfig.getRenewalPercentThreshold());
}

expectedNumberOfClientsSendingRenews -> 已注册服务总数
60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds() -> expected-client-renewal-interval-seconds 配置了 Client 距离多少秒发一次心跳,这里计算一个 Client 每分钟发送心跳数量。
RenewalPercentThreshold 自我爱护阀值因子。
能够看到,numberOfRenewsPerMinThreshold 示意一分钟内 Server 接管心跳最低次数,理论数量少于该值则进入自我保护模式。
此时 Eureka 认为客户端与注册核心呈现了网络故障(比方网络故障或频繁的启动敞开客户端),不再剔除任何服务,它要期待网络故障复原后,再退出自我保护模式。这样能够最大水平保障服务间失常调用。

PeerAwareInstanceRegistryImpl#isLeaseExpirationEnabled 办法断定以后是否处于自我保护模式。该办法比拟 renewsLastMin 中的值是否大于 numberOfRenewsPerMinThreshold,AbstractInstanceRegistry#renewsLastMin 统计一分钟内心跳次数。
配置:
eureka.server.enable-self-preservation,是否启用自我爱护机制,默认为 true
eureka.server.expected-client-renewal-interval-seconds,Client 距离多少秒发送一次心跳
eureka.server.renewal-percent-threshold,自我爱护阀值因子,默认 0.85

状态更新

InstanceInfo 保护了状态变量 status 和笼罩状态变量 overriddenStatus。
status 是 Eureka Client 自身公布的状态。
overriddenstatus 是手动或通过工具强制执行的状态。
Server 端提供服务 apps/{appName}/{instanceId}/status,能够变更服务实例 status 以及 overriddenStatus,从而被动变更服务状态。
留神,并不会批改 Client 端的服务状态,而是批改 Server 段服务注册信息中保留的服务状态。
而 Server 解决 Client 注册或心跳时,会应用 overriddenstatus 笼罩 status。
Eureka Client 在获取到注册信息时,会调用 DiscoveryClient#shuffleInstances 办法,过滤掉非 InstanceStatus.UP 状态的服务实例,从而防止调动该实例,以达到服务实例的暂停服务,而无需敞开服务实例。

InstanceInfo 还保护了 lastDirtyTimestamp 变量,代表服务注册信息最初更新工夫。
从 InstanceResource 能够看到,更新状态 statusUpdate 或者删除状态 deleteStatusUpdate 时都能够提供 lastDirtyTimestamp,
而解决心跳的 renewLease 办法,必须有 lastDirtyTimestamp 参数,validateDirtyTimestamp 办法负责测验 lastDirtyTimestamp 参数

  1. 当 lastDirtyTimestamp 参数等于以后注册信息中的 lastDirtyTimestamp,返回解决胜利。
  2. 当 lastDirtyTimestamp 参数大于以后注册信息中的 lastDirtyTimestamp,返回 NOT_FOUND 状态,示意 Client 的信息曾经过期,须要从新注册。
  3. 当 lastDirtyTimestamp 参数小于以后注册信息中的 lastDirtyTimestamp,返回 CONFLICT(409)状态,示意数据抵触,并返回以后节点中该服务的注册信息。

这时如果心跳是 Client 发动的,Client 会疏忽 409 的返回状态(DiscoveryClient#renew),但如果是其余 Server 节点同步过去的,发送心跳的 Server 节点会应用返回的服务注册信息更新本节点的注册信息(PeerEurekaNode#heartbeat)。

配置:
eureka.client.filter-only-up-instances,获取实例时是否只保留 UP 状态的实例,默认为 true
eureka.server.sync-when-timestamp-differs,当工夫戳不统一时,是否进行同步数据,默认为 true

文本对于 Eureka 的分享就到这里,咱们能够 Eureka 设计和实现都比较简单,然而十分实用。
我在深刻浏览 Eureka 源码前犹豫了一段时间(毕竟 Eureka 2.0 开源流产),不过通过一段时间深刻学习,播种不少,心愿这篇文章也能够给对 Eureka 感兴趣的同学提供一个深刻学习思路。

如果您感觉本文不错,欢送关注我的微信公众号,您的关注是我保持的能源!

正文完
 0