本文通过浏览Eureka源码,分享Eureka的实现原理。
本文次要梳理Eureka整体设计及实现,并不一一列举Eureka源码细节。

源码剖析基于Spring Cloud Hoxton,Eureka版本为1.9

Eureka分为Eureka Client,Eureka Server,多个Eureka Server节点组成一个Eureka集群,服务通过Eureka Client注册到Eureka Server。

CAP实践指出,一个分布式系统不可能同时满足C(一致性)、A(可用性)和P(分区容错性)。
因为分布式系统中必须保障分区容错性,因而咱们只能在A和C之间进行衡量。
Zookeeper保障的是CP, 而Eureka则是保障AP。
为什么呢?
在注册核心这种场景中,可用性比一致性更重要。
作为注册核心,其实数据是不常常变更的,只有服务公布,机器高低线,服务扩缩容时才变更。
因而Eureka抉择AP,即便出问题了,也返回旧数据,保障服务能(最大水平)失常调用, 避免出现因为注册核心的问题导致服务不可用这种得失相当的状况。
所以,Eureka各个节点都是平等的(去中心化的架构,无master/slave辨别),挂掉的节点不会影响失常节点的工作,残余的节点仍然能够提供注册和查问服务。

Eureka Client

Eureka 1.9只有引入spring-cloud-starter-netflix-eureka-client依赖,即便不应用@EnableDiscoveryClient或@EnableEurekaClient注解,服务也会注册到Eureka集群。

client次要逻辑在com.netflix.discovery.DiscoveryClient实现,EurekaClientAutoConfiguration中构建了其子类CloudEurekaClient。

定时工作

DiscoveryClient#initScheduledTasks办法设置定时工作,次要有CacheRefreshThread,HeartbeatThread,以及InstanceInfoReplicator。

同步

服务注册信息缓存在DiscoveryClient#localRegionApps变量中,CacheRefreshThread负责定时从Eureka Server读取最新的服务注册信息,更新到本地缓存。
CacheRefreshThread -> DiscoveryClient#refreshRegistry -> DiscoveryClient#fetchRegistry
当存在多个Eureka Server节点时,Client会与eureka.client.serviceUrl.defaultZone配置的第一个Server节点同步数据,当第一个Server节点同步失败,才会同步第二个节点,以此类推。

从DiscoveryClient#fetchRegistry能够看到,同步数据有两个办法
(1)全量同步
由DiscoveryClient#getAndStoreFullRegistry办法实现,通过Http Get调用Server接口apps/
获取Server节点中所有服务注册信息替换DiscoveryClient#localRegionApps

留神:Client申请Server端的服务,都是通过EurekaHttpClient接口发动,该接口实现类EurekaHttpClientDecorator通过RequestExecutor接口将申请委托给其余EurekaHttpClient实现类,并提供execute办法给子类实现扩大解决(该扩大解决能够针对每一个EurekaHttpClient办法,相似AOP)。子类RetryableEurekaHttpClient#execute中,会获取eureka.client.service-url.defaultZone中配置的地址,通过TransportClientFactory#newClient,结构一个RestTemplateTransportClientFactory,再真正发动申请。

(2)增量同步
由DiscoveryClient#getAndUpdateDelta办法实现,通过Http Get调用Server接口apps/delta,获取最新ADDED、MODIFIED,DELETED操作,更新本地缓存。
如果获取最新操作失败,则会发动全量同步。

配置:
eureka.client.fetch-registry,是否定时同步信息,默认true
eureka.client.registry-fetch-interval-seconds,距离多少秒同步一次服务注册信息,默认30

心跳

HeartbeatThread -> DiscoveryClient#renew -> EurekaHttpClient#sendHeartBeat
通过Http Put调用Server接口apps/{appName}/{instanceId}
appName是服务的spring.application.name,instanceId是服务IP加服务端口。

留神:如果Server返回NOT_FOUND状态,则从新注册。

配置:
eureka.client.register-with-eureka,以后利用是否注册到Eureka集群,默认true
eureka.instance.lease-renewal-interval-in-seconds,距离多少秒发送一次心跳,默认30

注册

DiscoveryClient#构造函数 -> DiscoveryClient#register
通过Http Post调用Server接口apps/{appName},发送以后利用的注册信息到Server。
配置:
eureka.client.register-with-eureka,以后利用是否注册到Eureka集群,默认true
eureka.client.should-enforce-registration-at-init,是否在初始化时注册,默认false

InstanceInfoReplicator

InstanceInfoReplicator工作会去监测利用本身的IP信息以及配置信息是否产生扭转,如果产生扭转,则会从新发动注册。
配置:
eureka.client.initial-instance-info-replication-interval-seconds,距离多少秒查看一次本身信息,默认40

下线

EurekaClientAutoConfiguration配置了CloudEurekaClient的销毁办法

@Bean(destroyMethod = "shutdown")

DiscoveryClient#shutdown办法实现下线的解决工作,包含勾销定时工作,调用unregister办法(通过Http Delete调用Server接口apps/{appName}/{id}),勾销监控工作等

Eureka Server

@EnableEurekaServer引入EurekaServerMarkerConfiguration,EurekaServerMarkerConfiguration构建EurekaServerMarkerConfiguration.Marker。
EurekaServerAutoConfiguration会在Spring上下文中存在EurekaServerMarkerConfiguration.Marker时失效,结构Server端组件类。

Eureka Server也要应用DiscoveryClient,拉取其余Server节点的服务注册信息或者将本身注册到Eureka集群中。

启动同步

Server启动时,须要从相邻Server节点获取服务注册信息,同步到本身内存。

Server的服务注册信息寄存在AbstractInstanceRegistry#registry变量中,类型为ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>。
外层Map Key为appName,外层Map Key为instanceId,Lease代表Client与Server之间维持的一个契约。InstanceInfo保留具体的服务注册信息,如instanceId,appName,ipAddr,port等。

EurekaServerBootstrap是Server端的启动疏导类,EurekaServerInitializerConfiguration实现了Lifecycle接口,start办法调用eurekaServerBootstrap.contextInitialized实现Server端初始化。
eurekaServerBootstrap.contextInitialized -> EurekaServerBootstrap#initEurekaServerContext -> PeerAwareInstanceRegistryImpl#syncUp -> AbstractInstanceRegistry#register
PeerAwareInstanceRegistryImpl#syncUp调用DiscoveryClient#getApplications办法,获取相邻server节点的所有服务注册信息,再调用AbstractInstanceRegistry#register办法,注册到AbstractInstanceRegistry#registry变量中。

AbstractInstanceRegistry#register

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {    try {        read.lock();        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());        REGISTER.increment(isReplication);        ...        // #1        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());            if (existingLease != null && (existingLease.getHolder() != null)) {            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();            ...            // #2            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {                    registrant = existingLease.getHolder();            }        } else {            synchronized (lock) {                if (this.expectedNumberOfClientsSendingRenews > 0) {                    this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;                    // #3                    updateRenewsPerMinThreshold();                    }            }            logger.debug("No previous lease information found; it is new registration");        }        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);        if (existingLease != null) {            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());        }        // #4        gMap.put(registrant.getId(), lease);            ...        registrant.setActionType(ActionType.ADDED);        // #5        recentlyChangedQueue.add(new RecentlyChangedItem(lease));            registrant.setLastUpdatedTimestamp();        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());            logger.info("Registered instance {}/{} with status {} (replication={})",                registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);    } finally {        read.unlock();    }}

#1 通过appName,instanceId查问已有的Lease
#2 如果该服务已存在Lease,并且LastDirtyTimestamp的值更大,应用已存在的Lease。
#3 更新numberOfRenewsPerMinThreshold,该值用于自我保护模式。
#4 构建一个新的Lease,增加到AbstractInstanceRegistry#registry缓存中。
#5 增加recentlyChangedQueue,apps/delta接口从中获取最新变更操作。

提供服务

Server通过ApplicationsResource/ApplicationResource/InstanceResource对外提供Http服务。

AbstractInstanceRegistry负责实现cancle,register,renew,statusUpdate,deleteStatusOverride等操作的业务逻辑。
PeerAwareInstanceRegistryImpl通过replicateToPeers办法将操作同步到其余节点,以保障集群节点数据同步。
PeerAwareInstanceRegistryImpl#replicateToPeers办法最初一个参数isReplication,决定是否须要进行同步。
如果Server节点接管到其余Server节点发送的同步操作,是不须要再持续向其余Server同步的,否则会引起循环更新。
该参数通过Http Requst的Header参数x-netflix-discovery-replication决定(只有Client发送的申请该参数才为true)。

数据统一

PeerAwareInstanceRegistryImpl#replicateToPeers办法通过PeerEurekaNodes#getPeerEurekaNodes获取其余server节点地址,
PeerEurekaNodes#peerEurekaNodes变量保护了所有的Server节点信息。

PeerEurekaNodes通过peersUpdateTask工作定时从DNS或配置文件获取最新的Server节点地址列表,并更新PeerEurekaNodes#peerEurekaNodes。
配置:
eureka.server.peer-eureka-nodes-update-interval-ms,距离多少分钟拉取一次Server节点地址列表,默认10

PeerEurekaNode治理具体一个Server节点,并负责向该Server节点同步register,cancel,heartbeat等操作。
PeerEurekaNode通过定时工作的形式同步这些操作。它保护了两个TaskDispatcher,批处理调度器batchingDispatcher和非批处理调度器nonBatchingDispatcher。
PeerEurekaNode#构造方法调用TaskDispatchers#createBatchingTaskDispatcher结构TaskDispatcher

public static <ID, T> TaskDispatcher<ID, T> createBatchingTaskDispatcher(String id,                                                                         int maxBufferSize,                                                                         int workloadSize,                                                                         int workerCount,                                                                         long maxBatchingDelay,                                                                         long congestionRetryDelayMs,                                                                         long networkFailureRetryMs,                                                                         TaskProcessor<T> taskProcessor) {    final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(            id, maxBufferSize, workloadSize, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs    );    final TaskExecutors<ID, T> taskExecutor = TaskExecutors.batchExecutors(id, workerCount, taskProcessor, acceptorExecutor);    return new TaskDispatcher<ID, T>() {        public void process(ID id, T task, long expiryTime) {            acceptorExecutor.process(id, task, expiryTime);        }        public void shutdown() {            acceptorExecutor.shutdown();            taskExecutor.shutdown();        }    };}

TaskDispatcher负责工作散发,过期工作会被摈弃,如果两个工作有雷同id,则前一个工作则会被删除。
AcceptorExecutor负责整合工作,将工作放入批次中。
TaskExecutors将整合好的工作(批次)分给TaskProcessor解决,理论解决工作的是ReplicationTaskProcessor。
ReplicationTaskProcessor能够反复执行失败的工作,ReplicationTaskProcessor#process(List<ReplicationTask> tasks)解决批次工作,将tasks合并到一个申请,发送到上游Server接口peerreplication/batch/
工作类为ReplicationTask,它提供了handleFailure办法,当上游Server接口返回statusCode不在[200,300)区间,则调用该办法。

从TaskExecutors#BatchWorkerRunnable的run办法能够看到,
调用上游Server接口时,如果上游返回503状态或产生IO异样,会通过taskDispatcher.reprocess从新执行工作,以保障最终一致性。
如果产生其余异样,只打印日志,不反复执行工作。

配置:
eureka.server.max-elements-in-peer-replication-pool,期待执行工作最大数量,默认为10000

须要留神一下PeerEurekaNode#heartbeat办法,心跳工作实现了handleFailure办法

public void handleFailure(int statusCode, Object responseEntity) throws Throwable {    super.handleFailure(statusCode, responseEntity);    if (statusCode == 404) {        logger.warn("{}: missing entry.", getTaskName());        if (info != null) {            logger.warn("{}: cannot find instance id {} and hence replicating the instance with status {}",                    getTaskName(), info.getId(), info.getStatus());            register(info);        }    }     ...}

如果上游server节点没有找到服务注册信息,就返回404状态,这时须要从新注册该服务。这点很重要,它能够保障不同Server节点保持数据统一。

假如有一个client,注册到Eureka集群server1,server2,server3。上面来剖析两个场景
场景1. client启动时,server1接管带client的注册信息,但同步给server2前宕机了,怎么办?
这时,client定时发动心跳,但它与server1心跳操作失败,只能向server2发动心跳,server2返回404(NOT_FOUND状态),client从新注册。

场景2. server3与其余机器server1,server2之间呈现了网络分区,这时client注册到eureka集群。而后网络复原了,server3怎么同步数据呢?
当server1向server3同步心跳时,server3返回404,于是server1从新向server3注册client信息,数据最终保持一致。

被动生效

AbstractInstanceRegistry#deltaRetentionTimer工作会定时移除recentlyChangedQueue中过期的增量操作信息
配置:
eureka.server.delta-retention-timer-interval-in-ms,距离多少秒清理一次过期的增量操作信息,默认30
eureka.server.retention-time-in-m-s-in-delta-queue,增量操作保留多少分钟,默认3

AbstractInstanceRegistry#evictionTimer工作会定时剔除AbstractInstanceRegistry#registry中曾经过期的(太久没收到心跳)服务注册信息。
计算服务生效工夫时还要加上弥补工夫,即计算本次工作执行的工夫和上次工作执行的时间差,若超过eviction-interval-timer-in-ms配置值则加上超出时间差作为弥补工夫。
每次剔除服务的数量都有一个下限,为注册服务数量*renewal-percent-threshold,Eureka会随机剔除过期的服务。
配置:
eureka.server.eviction-interval-timer-in-ms,距离多少秒清理一次过期的服务,默认60
eureka.instance.lease-expiration-duration-in-seconds,距离多少秒没收到心跳则断定服务过期,默认90
eureka.server.renewal-percent-threshold,自我爱护阀值因子,默认0.85

自我爱护机制

PeerAwareInstanceRegistryImpl#scheduleRenewalThresholdUpdateTask,定时更新numberOfRenewsPerMinThreshold,该值用于断定是否进入自我保护模式,在自我保护模式下,AbstractInstanceRegistry#evictionTimer工作间接返回,不剔除过期服务。

numberOfRenewsPerMinThreshold计算在PeerAwareInstanceRegistryImpl#updateRenewsPerMinThreshold

protected void updateRenewsPerMinThreshold() {    this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews            * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())            * serverConfig.getRenewalPercentThreshold());}

expectedNumberOfClientsSendingRenews -> 已注册服务总数
60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds() -> expected-client-renewal-interval-seconds配置了Client距离多少秒发一次心跳,这里计算一个Client每分钟发送心跳数量。
RenewalPercentThreshold 自我爱护阀值因子。
能够看到,numberOfRenewsPerMinThreshold示意一分钟内Server接管心跳最低次数,理论数量少于该值则进入自我保护模式。
此时Eureka认为客户端与注册核心呈现了网络故障(比方网络故障或频繁的启动敞开客户端),不再剔除任何服务,它要期待网络故障复原后,再退出自我保护模式。这样能够最大水平保障服务间失常调用。

PeerAwareInstanceRegistryImpl#isLeaseExpirationEnabled办法断定以后是否处于自我保护模式。该办法比拟renewsLastMin中的值是否大于numberOfRenewsPerMinThreshold,AbstractInstanceRegistry#renewsLastMin统计一分钟内心跳次数。
配置:
eureka.server.enable-self-preservation,是否启用自我爱护机制,默认为true
eureka.server.expected-client-renewal-interval-seconds,Client距离多少秒发送一次心跳
eureka.server.renewal-percent-threshold,自我爱护阀值因子,默认0.85

状态更新

InstanceInfo保护了状态变量status和笼罩状态变量overriddenStatus。
status是Eureka Client自身公布的状态。
overriddenstatus是手动或通过工具强制执行的状态。
Server端提供服务apps/{appName}/{instanceId}/status,能够变更服务实例status以及overriddenStatus,从而被动变更服务状态。
留神,并不会批改Client端的服务状态,而是批改Server段服务注册信息中保留的服务状态。
而Server解决Client注册或心跳时,会应用overriddenstatus笼罩status。
Eureka Client在获取到注册信息时,会调用DiscoveryClient#shuffleInstances办法,过滤掉非InstanceStatus.UP状态的服务实例,从而防止调动该实例,以达到服务实例的暂停服务,而无需敞开服务实例。

InstanceInfo还保护了lastDirtyTimestamp变量,代表服务注册信息最初更新工夫。
从InstanceResource能够看到,更新状态statusUpdate或者删除状态deleteStatusUpdate时都能够提供lastDirtyTimestamp,
而解决心跳的renewLease办法,必须有lastDirtyTimestamp参数,validateDirtyTimestamp办法负责测验lastDirtyTimestamp参数

  1. 当lastDirtyTimestamp参数等于以后注册信息中的lastDirtyTimestamp,返回解决胜利。
  2. 当lastDirtyTimestamp参数大于以后注册信息中的lastDirtyTimestamp,返回NOT_FOUND状态,示意Client的信息曾经过期,须要从新注册。
  3. 当lastDirtyTimestamp参数小于以后注册信息中的lastDirtyTimestamp,返回CONFLICT(409)状态,示意数据抵触,并返回以后节点中该服务的注册信息。

这时如果心跳是Client发动的,Client会疏忽409的返回状态(DiscoveryClient#renew),但如果是其余Server节点同步过去的,发送心跳的Server节点会应用返回的服务注册信息更新本节点的注册信息(PeerEurekaNode#heartbeat)。

配置:
eureka.client.filter-only-up-instances,获取实例时是否只保留UP状态的实例,默认为true
eureka.server.sync-when-timestamp-differs,当工夫戳不统一时,是否进行同步数据,默认为true

文本对于Eureka的分享就到这里,咱们能够Eureka设计和实现都比较简单,然而十分实用。
我在深刻浏览Eureka源码前犹豫了一段时间(毕竟Eureka 2.0 开源流产),不过通过一段时间深刻学习,播种不少,心愿这篇文章也能够给对Eureka感兴趣的同学提供一个深刻学习思路。

如果您感觉本文不错,欢送关注我的微信公众号,您的关注是我保持的能源!