共计 20094 个字符,预计需要花费 51 分钟才能阅读完成。
Eureka 架构
Eureka 是 Netflix 开发的服务发现框架,本身是一个基于 REST 的服务。在 spring cloud 中担任比较中心的位置,eureka 包含两个组件:
- Eureka Server:注册中心服务端,用于维护和管理注册服务列表
- Eureka Client:注册中心客户端,向注册中心注册服务和获取服务
上图是来自官网的架构图,其中 us-east-1c 和 us-east-1d,us-east-1e 是代表不同的区也就是不同的机房,其中每一个 eureka
server 都是一个集群。application service 作为服务提供方向 eureka 中注册服务,eureka server 接受到注册事件会在集群和
分区中进行数据同步,application client 作为消费端可以从 eureka 中获取到服务注册信息,进行服务调用。这是作为注册中心
一个最简单的流程,其中有隐藏了很多细节问题,接下来让我们扒一扒其中的一些细节。
扒一扒 Eureka
我们就从常用的注册中心 zookeeper 中展开对 eureka 的分析,zk 作为注册中心的时候通过 znode 来存储服务 provider 信息,在服务进行注册时实际上就是创建一个 znode 临时节点,利用临时节点的特性实现服务的下线,然后利用 zk 天然的强一致性协议
实现集群中注册信息同步,通过 leader 选举实现高可用性。客户端在进行获取服务注册信息时,通过 zk 的目录结构进行查找效率
非常高。一切都这么完美。那 eureka 又是怎么处理的呢?
Register(服务注册)
那 eureka 又是怎么处理的呢?首先看下最核心的服务注册,eureka 是如何进行服务注册呢?带着问题看下 eureka 的流程。
熟悉的 spring boot 的同学都大概了解它的自动装配的原理,每个自定义 starter 都会在 spring.factories 中配置 EnableAutoConfiguration 自动配置类。eureka 也是如下:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaClientConfigServerAutoConfiguration,\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,\
org.springframework.cloud.netflix.ribbon.eureka.RibbonEurekaAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration
我们重点关注下 EurekaClientAutoConfiguration,在客户端启动的时候这个自动化配置会根据条件判断是否初始化创建所需要的 bean,如:EurekaClientConfigBean,EurekaAutoServiceRegistration 等,
对于注册我们只要关注 EurekaAutoServiceRegistration 就可以了,如下:
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
public EurekaAutoServiceRegistration eurekaAutoServiceRegistration(
ApplicationContext context, EurekaServiceRegistry registry,
EurekaRegistration registration) {return new EurekaAutoServiceRegistration(context, registry, registration);
}
可是我们并没有设置 spring.cloud.service-registry.auto-registration.enabled 呀,看名字就知道是配置是否自动注册服务的,我猜这种配置作为客户端都是会注册的,
所以为了方便起见会默认为 true 的,当然不能全靠蒙的, 于是我就搜索下果然默认为 true:
{
"name": "spring.cloud.service-registry.auto-registration.enabled",
"type": "java.lang.Boolean",
"description": "Whether service auto-registration is enabled. Defaults to true.",
"sourceType": "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationProperties",
"defaultValue": true
}
可是创建这个 bean 有啥用呢?仔细看看这个类就能看出来一些问题,如下:
public class EurekaAutoServiceRegistration implements AutoServiceRegistration,
SmartLifecycle, Ordered, SmartApplicationListener { }
实现了 SmartLifecycle 接口,所以熟悉 spring 的同学就会很熟悉这个类的,spring 在 bean 初始化完成后会调实现 Lifecycle 接口的 start 方法,在销毁的时候会调用 stop 方法,那我们就看看 EurekaAutoServiceRegistration 的 start 的方法:
// only initialize if nonSecurePort is greater than 0 and it isn't already running
// because of containerPortInitializer below
if (!this.running.get() && this.registration.getNonSecurePort() > 0) {this.serviceRegistry.register(this.registration);
this.context.publishEvent(new InstanceRegisteredEvent<>(this,
this.registration.getInstanceConfig()));
this.running.set(true);
}
根据配置的端口判断是否要进行注册,看代码看到一个很关键的代码 register, 一路走下去会调用 discoveryClient 的 registerHealthCheck -> onDemandUpdate -> InstanceInfoReplicator 的 run 方法,这段代码比较简单最后调用了 discoveryClient 的 register 方法,
到这里最关键的注册方法我们终于找到了,接下来就是使用 jersey 类似 spring mvc 的 web 框架,使用 post 请求进行注册如下:
public EurekaHttpResponse<Void> register(InstanceInfo info) {String urlPath = "apps/" + info.getAppName();
ClientResponse response = null;
try {
// 使用 jerseyClient 发送 post 请求进行注册
// 完整的 url:http://127.0.0.1:8001/eureka/apps/${application.name}
Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
addExtraHeaders(resourceBuilder);
response = resourceBuilder
.header("Accept-Encoding", "gzip")
.type(MediaType.APPLICATION_JSON_TYPE)
.accept(MediaType.APPLICATION_JSON)
.post(ClientResponse.class, info);
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();}
}
eureka 服务端接受请求的都在 core 下的 resource 目录下,类似与 spring 中的 controller,客户端服务注册的请求映射到 ApplicationResource#addInstance 进行服务注册,接下来我们看下服务端的大概的注册流程。核心方法是 AbstractInstanceRegistry#register,主要流程如下:
Eureka 注册表
在上面我们说过在 zk 中服务信息保存到 znode 中,而 eureka 只是一个简单 servlet web 应用,它是如何保存服务信息呢?首先想到的肯定是 map,如果考虑线程安全问题那就是 ConcurrentHashMap,是这样吗?在上面服务注册的最后我们看到一段重要的代码:
// 首先判断注册表中是否存在该服务
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
if (gMap == null) {
// 如果为空就创建一个以 appname 作为 key 的空的 map
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {gMap = gNewMap;}
}
可以看出 eureka 中保存服务注册信息的确实就是一个 ConcurrentHashMap,数据结构如下:
ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
Fetch Registry(抓取注册信息)
至此我们已经很清楚的了解到服务是如何发布到注册中心,并且了解到了注册中心的核心数据结构注册列表,但是服务消费者是如何进行获取服务注册信息呢?并且如何感知有新的服务进行了注册呢?在上面服务注册中我们知道 spring boot 自动化配置中,在服务启动的时候会自动创建一些 bean,所以我们再次回到 EurekaClientAutoConfiguration 中,我们会看到一个重要的内部类:RefreshableEurekaClientConfiguration,具体代码如下:
@Configuration
@ConditionalOnRefreshScope
protected static class RefreshableEurekaClientConfiguration {
@Autowired
private ApplicationContext context;
@Autowired
private AbstractDiscoveryClientOptionalArgs<?> optionalArgs;
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
@org.springframework.cloud.context.config.annotation.RefreshScope
@Lazy
public EurekaClient eurekaClient(ApplicationInfoManager manager,
EurekaClientConfig config, EurekaInstanceConfig instance,
@Autowired(required = false) HealthCheckHandler healthCheckHandler) {
ApplicationInfoManager appManager;
if (AopUtils.isAopProxy(manager)) {appManager = ProxyUtils.getTargetObject(manager);
}
else {appManager = manager;}
CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager, config, this.optionalArgs, this.context);
cloudEurekaClient.registerHealthCheck(healthCheckHandler);
return cloudEurekaClient;
}
}
上面的代码在服务启动的时候会创建 @Bean 标记的 EurekaClient,在内部我们看到会创建 CloudEurekaClient,而服务的信息获取的核心是调用父类的 DiscoveryClient 构造方法,没错又是这个类,在服务注册中也看到了,其实 eureka 的核心也是这个类,下面就是跟服务获取的主要代码:
//1,初始化应用集合在本地的缓存
localRegionApps.set(new Applications());
//2,shouldFetchRegistry = true,fetchRegistry 进行服务获取
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {fetchRegistryFromBackup();
}
//3,启东线程进行服务定时更新
initScheduledTasks();
首先看下 fetchRegister 方法,看看是如何进行服务获取的:
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
try {
//... 省略...
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1))
{
//... 省略...
//1,服务全量获取服务信息
getAndStoreFullRegistry();} else {
//2,增量获取服务信息
getAndUpdateDelta(applications);
}
} catch (Throwable e) {logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
return false;
}
}
当客户端第一次启动的时候会调用 fetchRegister 进行全量获取服务信息也就是走到代码中的 1 的逻辑中,里面的逻辑也比较简单有兴趣的同学可以自己看下,那什么时候会执行 2 的步骤呢?之前的代码中我们提到 initScheduledTasks 会进行服务异步更新,不妨看看 initScheduledTasks 的实现:
private void initScheduledTasks() {if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
//1. 间隔多长时间更新一次信息
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
//2. 缓存刷新线程
new CacheRefreshThread()),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
}
从上面代码中可以看出 eureka 使用定时任务每隔 registryFetchIntervalSeconds 秒定时执行 CacheRefreshThread 线程,而线程内部会调用 fetchRegistry 方法进行增量获取。registryFetchIntervalSeconds 是不是感觉很熟悉,没错我们经常会在 yml 或 properties 文件中配置这个值,如果不进行配置 eureka 默认 30s 会更新一次:
{
"name": "eureka.client.registry-fetch-interval-seconds",
"type": "java.lang.Integer",
"description": "Indicates how often(in seconds) to fetch the registry information from the eureka server.",
"sourceType": "org.springframework.cloud.netflix.eureka.EurekaClientConfigBean",
"defaultValue": 30
}
简单的说就是客户端启动的时候会进行一次服务全量获取,然后通过定时任务每隔一段时间对服务进行增量更新,大概的流程就是如此,当然内部细节也是很多的,比如服务端是如何响应的,有兴趣的可以自己研究下。
Renew(服务续约)
什么是服务续约?所谓的服务续约就是类似集群中的心跳机制,每隔一段时间发送一次心跳,表示节点是活着的,在 eureka 集群中也是这样的,Eureka Client 注册到注册中心后,会通过 Renew 机制跟 Eureka Server 保持续约,告诉注册中心服务“我还活着”,以免 Eureka Server 的剔除任务将其剔除。在之前的服务注册信息获取初始化 Discovery client 中会创建 HeartbeatThread 任务定时发送心跳进行续约:
// Heartbeat timer
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
// 每隔一段时间发送一次心跳
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
// 心跳线程
new HeartbeatThread()),
renewalIntervalInSecs, TimeUnit.SECONDS);
{
"name": "eureka.instance.lease-renewal-interval-in-seconds",
"type": "java.lang.Integer",
"description": "Indicates how often (in seconds) ...."sourceType":"org.springframework.cloud.netflix.eureka.EurekaInstanceConfigBean","defaultValue": 30
}
默认 30s 发送一次心跳进行续约,其本质就是向注册中心发送一次请求,可以看 InstanceResource#renewLease(), 底层是调用 AbstractInstanceRegistry#renew,如下:
public boolean renew(String appName, String id, boolean isReplication) {
// 增加 续约次数 到 监控
RENEW.increment(isReplication);
// 获得 租约
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToRenew = null;
if (gMap != null) {leaseToRenew = gMap.get(id);
}
// 续约不存在
if (leaseToRenew == null) {RENEW_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
return false;
} else {InstanceInfo instanceInfo = leaseToRenew.getHolder();
if (instanceInfo != null) {// touchASGCache(instanceInfo.getASGName());
InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(instanceInfo, leaseToRenew, isReplication);
//... 省略...
}
// 新增续约每分钟次数
renewsLastMin.increment();
// 设置续约最后更新时间
leaseToRenew.renew();
return true;
}
}
简单的梳理下服务续约的流程时序图如下:
Cancel(服务注销)
服务注册和服务续约我们都说过了,那么如果服务挂了或因为网络问题注册中心收不到心跳信息,那么注册中心是怎么对服务进行剔除呢?eureka client 挂了会调用 DiscoveryClient#shutdown 向服务端发送服务注销请求,具体的如下:
public synchronized void shutdown() {if (isShutdown.compareAndSet(false, true)) {
if (applicationInfoManager != null
&& clientConfig.shouldRegisterWithEureka()
&& clientConfig.shouldUnregisterOnShutdown()) {
// 将服务状态更新为 down
applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
// 发送注销请求
unregister();}
}
}
服务端收到注销请求(InstanceResource#cancelLease)后,会对该客户端进行注销,源码如下:
/// 服务下线
if (super.cancel(appName, id, isReplication)) {
// 集群节点同步服务下线
replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
synchronized (lock) {// 更新阀值(最新的阀值已经修改为 expectedNumberOfClientsSendingRenews 和 numberOfRenewsPerMinThreshold)
if (this.expectedNumberOfClientsSendingRenews > 0) {
this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;
updateRenewsPerMinThreshold();}
}
return true;
}
if (gMap != null) {
//1. 将实例从服务列表中清除。leaseToCancel = gMap.remove(id);
}
synchronized (recentCanceledQueue) {
//2. 将这个变化事件添加到 recentlyChangedQueue 队列中
recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
}
//... 省略...
//3. 清空 Guava 缓存。invalidateCache(appName, vip, svip);
logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
return true;
}
Evict(服务剔除)
我们上面都详细的说过服务的注册,续约,注销等问题,在注销中我们只是介绍了 eureka 是如何注销的,但是并没有看到服务是什么时候剔除的,eureka server 端又是如何剔除下线或过期的服务呢?接下来我们将详细的介绍服务端是如何进行剔除的。
在这之前我们不得不提到一个非常重要的类:EurekaServerInitializerConfiguration, 这个就是服务剔除的入口,在服务注册的时候我们也提到了 spring 在 bean 创建完成后会调用实现了 SmartLifecycle 的 start() 方法,而 EurekaServerInitializerConfiguration 也同样实现了 `SmartLifecycle,
因此我看就有 start()进入我们的服务剔除的探索,发现它是一个异步线程进行处理的:
public void start() {new Thread(new Runnable() {
@Override
public void run() {
try {
// 核心方法
eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
log.info("Started Eureka Server");
publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
EurekaServerInitializerConfiguration.this.running = true;
publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
}
catch (Exception ex) {
// Help!
log.error("Could not initialize Eureka servlet context", ex);
}
}
}).start();}
EurekaServerBootstrap#contextInitialized 中完成了初始化环境设置和 EurekaServerContext,其核心就是在初始化 EurekaServerContext:
// Copy registry from neighboring eureka node
int registryCount = this.registry.syncUp();
this.registry.openForTraffic(this.applicationInfoManager, registryCount);
protected void postInit() {renewsLastMin.start();
if (evictionTaskRef.get() != null) {evictionTaskRef.get().cancel();}
//EvictionTask 线程进行服务剔除
evictionTaskRef.set(new EvictionTask());
evictionTimer.schedule(evictionTaskRef.get(),
serverConfig.getEvictionIntervalTimerInMs(),
serverConfig.getEvictionIntervalTimerInMs());
}
public void evict(long additionalLeaseMs) {
//1. 判断是否过期
if (!isLeaseExpirationEnabled()) {logger.debug("DS: lease expiration is currently disabled.");
return;
}
//2. 筛选过期列表
List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
if (leaseMap != null) {for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {Lease<InstanceInfo> lease = leaseEntry.getValue();
if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {expiredLeases.add(lease);
}
}
}
}
//3. 进行公平剔除
int registrySize = (int) getLocalRegistrySize();
int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
int evictionLimit = registrySize - registrySizeThreshold;
int toEvict = Math.min(expiredLeases.size(), evictionLimit);
if (toEvict > 0) {logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
Random random = new Random(System.currentTimeMillis());
for (int i = 0; i < toEvict; i++) {// Pick a random item (Knuth shuffle algorithm)
int next = i + random.nextInt(expiredLeases.size() - i);
Collections.swap(expiredLeases, i, next);
Lease<InstanceInfo> lease = expiredLeases.get(i);
String appName = lease.getHolder().getAppName();
String id = lease.getHolder().getId();
EXPIRED.increment();
logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
//4. 清除 guava 缓存
internalCancel(appName, id, false);
}
}
}
如何判断实例过期是否可用?必须满足两个条件:
public boolean isLeaseExpirationEnabled() {if (!isSelfPreservationModeEnabled()) {
// The self preservation mode is disabled, hence allowing the instances to expire.
return true;
}
return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;}
1. 自我保护机制必须开启
2. 上一分钟的续约数是否小于设定的最小续约数阀值:
剔除操作时,首先计算过期的实例,并添加到过期实例列表里 (expiredLeases)。如何判断呢?
// 过期时间大于 0 并且当前时间大于 最近续约时间 + duration + additionalLeaseMs
public boolean isExpired(long additionalLeaseMs) {return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
}
如何剔除选择?
剔除第一步计算剔除最大数: 实例总数 – 实例总数 * 自我保护阀值因子。
剔除第二步公平剔除 :洗牌算法
比如我们一共有 50 个实例续约,并且 20 个实例过期了,并且自我保护阀值因子为 0.8:
// 第一轮执行开始
int registrySize = 50;
int registrySizeThreshold = (int) (50 * 0.8) = 40;
int evictionLimit = 50 - 40 = 10;
int toEvict = Math.min(20, 10) = 10;
// 第一轮执行结束,剩余 40 个租约,其中有 10 个租约过期。// 第二轮执行开始
int registrySize = 40;
int registrySizeThreshold = (int) (40 * 0.8) = 32;
int evictionLimit = 40 - 32 = 8;
int toEvict = Math.min(10, 8) = 8;
// 第二轮执行结束,剩余 32 个租约,其中有 8 个租约过期。// 第三轮执行开始
int registrySize = 32;
int registrySizeThreshold = (int) (32 * 0.8) = 25;
int evictionLimit = 32 - 25 = 7;
int toEvict = Math.min(2, 7) = 2;
// 第三轮执行结束,剩余 30 个租约,其中有 2 个租约过期。结束
为什么不全部剔除过期的实例?
由于 JVM GC,或是本地时间差异原因,可能自我保护机制的阀值 expectedNumberOfRenewsPerMin、numberOfRenewsPerMinThreshold 不够正确,在过期这个相对“危险”的操作,重新计算自我保护的阀值。随机清理过期的租约。由于租约是按照应用顺序添加到数组,通过随机的方式,尽量避免单个应用被全部过期。
节点同步复制原理
至此我们看过服务注册,续约,注销,剔除等功能,但是说的都是单节点的说明,注册中心又是如何同步这些信息呢?最后我们一起看下 eureka 是如何实现同步复制的。
eureka 节点同步分为两种:
- 服务启动的时候进行节点同步
- 服务注册,注销等处理的时候进行同步
服务启动同步
eureka server 在启动的时候初始化 EurekaServerContext 的时候,会调用 syncUp 进行节点同步, 如果之前有同步失败,则 sleep 最大同步重试时间默认为 0s 再次重试同步:
for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {if (i > 0) {
try {
//sleep 最大重试时间
Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
} catch (InterruptedException e) {logger.warn("Interrupted during registry transfer..");
break;
}
}
// 同步
Applications apps = eurekaClient.getApplications();
for (Application app : apps.getRegisteredApplications()) {for (InstanceInfo instance : app.getInstances()) {
try {if (isRegisterable(instance)) {register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
count++;
}
} catch (Throwable t) {logger.error("During DS init copy", t);
}
}
}
}
服务注册, 注销等时候进行同步
接下来我们就以服务注册为例看下 eureka 服务端是如何进行服务同步的。在说服务端信息同步以前不得不说下 eureka 的任务相关的东西,因为在注册同步中涉及复杂的任务关系和复杂的队列转化。
- 任务分发器:TaskDispatcher
- 任务接收器:AcceptorExecutor,AcceptorRunner
- 任务执行器:TaskExecutors
- 任务处理器:TaskProcess
以上就是简单的 eureka 任务执行流程,那么任务执行的流程入口在哪呢?eureka 是一个标准的 Servlet 应用,在 eureka 中有两个非常重要的类:EurekaBootStrap 和 EurekaServerBootStrap 分别代表客户端与服务端的启动类,都实现了 ServletContextListener,在 tomcat 启动的时候会调用 contextInitialized 进行初始化,既然是任务肯定是初始化的时候就启动了,然后有服务注册的时候直接向队列中丢任务就可以了,那么我们就来看看 EurekaBootStrap 的具体实现:
以上就是大概初始化流程我们主要看 AcceptorRunner 的实现,至此我们根据时序图就直接到 AcceptorRunner:
class AcceptorRunner implements Runnable {
@Override
public void run() {
long scheduleTime = 0;
while (!isShutdown.get()) {
try {//1. 处理完输入队列 ( 接收队列 + 重新执行队列)
drainInputQueues();
int totalItems = processingOrder.size();
long now = System.currentTimeMillis();
//2. 计算调度时间
if (scheduleTime < now) {scheduleTime = now + trafficShaper.transmissionDelay();
}
if (scheduleTime <= now) {
//3. 调度批量任务
assignBatchWork();
//4. 调度单任务
assignSingleItemWork();}
} catch (InterruptedException ex) {//......}
}
}
至此初始化中对 reprocessQueue,acceptorQueue 和 pendingTasks 进行转化,当然 eureka 刚刚启动的时候是没有任何任务的,然后我们再回到 PeerEurekaNodes#start 方法中,队列初始化完成后,就执行任务:
try {
// 对队列进行转化
updatePeerEurekaNodes(resolvePeerUrls());
// 创建待执行的任务
Runnable peersUpdateTask = new Runnable() {
@Override
public void run() {
try {updatePeerEurekaNodes(resolvePeerUrls());
} catch (Throwable e) {logger.error("Cannot update the replica Nodes", e);
}
}
};
// 开始执行任务
taskExecutor.scheduleWithFixedDelay(
peersUpdateTask,
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
TimeUnit.MILLISECONDS
);
} catch (Exception e) {throw new IllegalStateException(e);
}
执行任务的核心就是 askExecutors#batchExecutors 的处理,会根据添加的任务创建 TaskExecutors,然后执行线程 run 方法如下:
@Override
public void run() {
try {while (!isShutdown.get()) {List<TaskHolder<ID, T>> holders = getWork();
metrics.registerExpiryTimes(holders);
List<T> tasks = getTasksOf(holders);
// 处理任务
ProcessingResult result = processor.process(tasks);
switch (result) {
case Success:
break;
case Congestion:
case TransientError:
taskDispatcher.reprocess(holders, result);
break;
case PermanentError:
logger.warn("Discarding {} tasks of {} due to permanent error", holders.size(), workerName);
}
metrics.registerTaskResult(result, tasks.size());
}
} catch (InterruptedException e) {
}
内部首先对 task 进行转化,转化成 ReplicationList,然后批量提交分发到不同的节点上进行同步:
@Override
public ProcessingResult process(List<ReplicationTask> tasks) {
//task 转化
ReplicationList list = createReplicationListOf(tasks);
// 批量提交分发
EurekaHttpResponse<ReplicationListResponse> response = replicationClient.submitBatchUpdates(list);
}
至此我们完整的看完了 eureka 的任务处理流程还是比较复杂的,肯定会有同学有疑问不是说以服务注册为例子看下服务同步的吗, 并没有看到注册同步呀!其实了解了任务的执行流程接下来的服务注册及其他的处理就很简单了
只要向任务系统中丢任务了就可以了,不妨我们看下注册的例子。在之前我们已经详细的说过服务注册流程了,就直接看下 PeerAwareInstanceRegistryImpl#register 的源代码:
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {leaseDuration = info.getLeaseInfo().getDurationInSecs();}
// 服务注册
super.register(info, leaseDuration, isReplication);
// 节点同步
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
内部对所有节点根据不同的类型进行相对应的处理
switch (action) {
case Cancel:
node.cancel(appName, id);
break;
case Heartbeat:
InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
case Register:
// 根据不同的 action 进行对应的处理,这里处理注册的事件
node.register(info);
break;
case StatusUpdate:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.statusUpdate(appName, id, newStatus, infoFromRegistry);
break;
case DeleteStatusOverride:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.deleteStatusOverride(appName, id, infoFromRegistry);
break;
}
public void register(final InstanceInfo info) throws Exception {long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
batchingDispatcher.process(taskId("register", info),
new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {public EurekaHttpResponse<Void> execute() {return replicationClient.register(info);
}
},
expiryTime
);
}
到这里我们就能看到其原理了,注册的处理就是向队列中添加 InstanceReplicationTask 任务,当线程开始执行这个任务的时间就会调用 replicationClient.register(),向其他的集群节点进行相互注册达到服务同步的效果,至此 eureka 的集群同步就到处为止了。
最后
最近稍微闲一点就花点时间看看 eureka,以上只是本人的学习总结,有很多细节也不是很了解,有些不一定准确仅供学习参考。最后想说下就是看源码的建议,看下大概的流程,大概的原理就可以了没必要了解每一个细节问题,不然真的很累。当遇到问题时可以相对应地详细的看下某个功能点的源码,这样效率更高!就像我们日常做项目一样,没有人一来公司就把公司的项目每一行代码都看一遍,当需要做一个需求的时候才会去看下相对应功能的代码。