作者:fredalxin
地址:https://fredal.xin/how-eureka...
本文基于 spring cloud dalston,同时文章较长,请抉择难受姿态进行浏览。
Eureka 与 Ribbon 是什么?和服务发现什么关系?
Eureka 与 Ribbon 都是 Netflix 提供的微服务组件,别离用于服务注册与发现、负载平衡。同时,这两者均属于 spring cloud netflix 体系,和 spring cloud 无缝集成,也正因为此被大家所熟知。
Eureka 自身是服务注册发现组件,实现了残缺的 Service Registry 和 Service Discovery。
Ribbon 则是一款负载平衡组件,那它和服务发现又有什么关系呢?负载平衡在整个微服务的调用模型中是紧挨着服务发现的,而 Ribbon 这个框架它其实是起到了开发者服务消费行为与底层服务发现组件 Eureka 之间桥梁的作用。
从严格概念上说 Ribbon 并不是做服务发现的,然而因为 Netflix 组件的松耦合,Ribbon 须要对 Eureka 的缓存服务列表进行相似"服务发现"的行为,从而构建本人的负载平衡列表并及时更新,也就是说 Ribbon 中的"服务发现"的宾语变成了 Eureka(或其余服务发现组件)。
Eureka 的服务注册与发现
咱们会先对 Eureka 的服务发现进行形容,重点是 Eureka-client 是如何进行服务的注册与发现的,同时不会过多停留于 Eureka 的架构、Eureka-server 的实现、Zone/Region 等领域。
Eureka-client 的服务发现都是由 DiscoveryClient 类实现的,它次要包含的性能有:
- 向 Eureka-server 注册服务实例
- 更新在 Eureka-server 的租期
- 勾销在 Eureka-server 的租约(服务下线)
- 发现服务实例并定期更新
服务注册
DiscoveryClient 所有的定时工作都是在 initScheduledTasks()办法里,咱们能够看到以下要害代码:
private void initScheduledTasks() { ... if (clientConfig.shouldRegisterWithEureka()) { ... // InstanceInfo replicator instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize ... instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); }}
咱们能够看到在 if 判断分支里创立了一个 instanceInfoReplicator 实例,它会通过 start 执行一个定时工作:
public void run() { try { discoveryClient.refreshInstanceInfo(); Long dirtyTimestamp = instanceInfo.isDirtyWithTime(); if (dirtyTimestamp != null) { discoveryClient.register(); instanceInfo.unsetIsDirty(dirtyTimestamp); } } catch (Throwable t) { logger.warn("There was a problem with the instance info replicator", t); } finally { Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); }}
咱们能够在 InstanceInfoReplicator 类的 run()办法中找到这一段,同时能够一眼发现其注册关键点在于discoveryClient.register()
这段,咱们点进去看看:
boolean register() throws Throwable { logger.info(PREFIX + appPathIdentifier + ": registering service..."); EurekaHttpResponse<Void> httpResponse; try { httpResponse = eurekaTransport.registrationClient.register(instanceInfo); } catch (Exception e) { logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e); throw e; } if (logger.isInfoEnabled()) { logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode()); } return httpResponse.getStatusCode() == 204;}
这边能够发现是通过 HTTP REST (jersey 客户端)申请的形式将 instanceInfo 实例信息注册到 Eureka-server 上。咱们简略看一下 InstanceInfo 对象,属性基本上都能见名知义:
@JsonCreatorpublic InstanceInfo( @JsonProperty("instanceId") String instanceId, @JsonProperty("app") String appName, @JsonProperty("appGroupName") String appGroupName, @JsonProperty("ipAddr") String ipAddr, @JsonProperty("sid") String sid, @JsonProperty("port") PortWrapper port, @JsonProperty("securePort") PortWrapper securePort, @JsonProperty("homePageUrl") String homePageUrl, @JsonProperty("statusPageUrl") String statusPageUrl, @JsonProperty("healthCheckUrl") String healthCheckUrl, @JsonProperty("secureHealthCheckUrl") String secureHealthCheckUrl, @JsonProperty("vipAddress") String vipAddress, @JsonProperty("secureVipAddress") String secureVipAddress, @JsonProperty("countryId") int countryId, @JsonProperty("dataCenterInfo") DataCenterInfo dataCenterInfo, @JsonProperty("hostName") String hostName, @JsonProperty("status") InstanceStatus status, @JsonProperty("overriddenstatus") InstanceStatus overriddenstatus, @JsonProperty("leaseInfo") LeaseInfo leaseInfo, @JsonProperty("isCoordinatingDiscoveryServer") Boolean isCoordinatingDiscoveryServer, @JsonProperty("metadata") HashMap<String, String> metadata, @JsonProperty("lastUpdatedTimestamp") Long lastUpdatedTimestamp, @JsonProperty("lastDirtyTimestamp") Long lastDirtyTimestamp, @JsonProperty("actionType") ActionType actionType, @JsonProperty("asgName") String asgName) { this.instanceId = instanceId; this.sid = sid; this.appName = StringCache.intern(appName); this.appGroupName = StringCache.intern(appGroupName); this.ipAddr = ipAddr; this.port = port == null ? 0 : port.getPort(); this.isUnsecurePortEnabled = port != null && port.isEnabled(); this.securePort = securePort == null ? 0 : securePort.getPort(); this.isSecurePortEnabled = securePort != null && securePort.isEnabled(); this.homePageUrl = homePageUrl; this.statusPageUrl = statusPageUrl; this.healthCheckUrl = healthCheckUrl; this.secureHealthCheckUrl = secureHealthCheckUrl; this.vipAddress = StringCache.intern(vipAddress); this.secureVipAddress = StringCache.intern(secureVipAddress); this.countryId = countryId; this.dataCenterInfo = dataCenterInfo; this.hostName = hostName; this.status = status; this.overriddenstatus = overriddenstatus; this.leaseInfo = leaseInfo; this.isCoordinatingDiscoveryServer = isCoordinatingDiscoveryServer; this.lastUpdatedTimestamp = lastUpdatedTimestamp; this.lastDirtyTimestamp = lastDirtyTimestamp; this.actionType = actionType; this.asgName = StringCache.intern(asgName); // --------------------------------------------------------------- // for compatibility if (metadata == null) { this.metadata = Collections.emptyMap(); } else if (metadata.size() == 1) { this.metadata = removeMetadataMapLegacyValues(metadata); } else { this.metadata = metadata; } if (sid == null) { this.sid = SID_DEFAULT; }}
总结一下整个过程如下:
服务续期
服务续期说起来可能比拟艰涩,其实就是在 client 端定时发动调用,让 Eureka-server 晓得本人还活着,在 eureka 代码中的正文解释为心跳(heart-beat)。
这里有两个比拟重要的配置须要留神:
- instance.leaseRenewalIntervalInSeconds
示意客户端的更新频率,默认 30s,也就是每 30s 就会向 Eureka-server 发动 renew 更新操作。 - instance.leaseExpirationDurationInSeconds
这是服务端视角的生效工夫,默认是 90s,也就是 Eureka-server 在 90s 内没有接管到来自 client 的 renew 操作就会将其剔除。
咱们间接从代码角度看一下,同样呢相干定时工作在 initScheduledTasks()办法中:
private void initScheduledTasks() { ... if (clientConfig.shouldRegisterWithEureka()) { ... // Heartbeat timer scheduler.schedule( new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ), renewalIntervalInSecs, TimeUnit.SECONDS); ... }}
能够看到这里创立了一个 HeartbeatThread()线程执行操作:
private class HeartbeatThread implements Runnable { public void run() { if (renew()) { lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis(); } }}
咱们间接看 renew()办法:
boolean renew() { EurekaHttpResponse<InstanceInfo> httpResponse; try { httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null); logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode()); if (httpResponse.getStatusCode() == 404) { REREGISTER_COUNTER.increment(); logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName()); return register(); } return httpResponse.getStatusCode() == 200; } catch (Throwable e) { logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e); return false; }}
这里比较简单,能够发现和服务注册是相似的,同样应用 HTTP REST 发动一个 hearbeat 申请,底层应用 jersey 客户端。
总结一下整个过程如下:
服务登记
服务登记逻辑比较简单,自身并不在定时工作中触发,而是通过对办法标记@PreDestroy,从而调用 shutdown 办法触发,最终会调用 unRegister()办法进行登记,同样的这也是一个 HTTP REST 申请,能够简略看下代码:
@PreDestroy@Overridepublic synchronized void shutdown() { if (isShutdown.compareAndSet(false, true)) { logger.info("Shutting down DiscoveryClient ..."); if (statusChangeListener != null && applicationInfoManager != null) { applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId()); } cancelScheduledTasks(); // If APPINFO was registered if (applicationInfoManager != null && clientConfig.shouldRegisterWithEureka()) { applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN); unregister(); } if (eurekaTransport != null) { eurekaTransport.shutdown(); } heartbeatStalenessMonitor.shutdown(); registryStalenessMonitor.shutdown(); logger.info("Completed shut down of DiscoveryClient"); }}/** * unregister w/ the eureka service. */void unregister() { // It can be null if shouldRegisterWithEureka == false if(eurekaTransport != null && eurekaTransport.registrationClient != null) { try { logger.info("Unregistering ..."); EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId()); logger.info(PREFIX + appPathIdentifier + " - deregister status: " + httpResponse.getStatusCode()); } catch (Exception e) { logger.error(PREFIX + appPathIdentifier + " - de-registration failed" + e.getMessage(), e); } }}
服务发现及更新
咱们来看作为服务消费者的要害逻辑,即发现服务以及更新服务。
首先 consumer 会在启动时从 Eureka-server 获取所有的服务列表,并在本地缓存。同时呢,因为本地有一份缓存,所以须要定期更新,更新频率能够配置。
启动时候在 consumer 在 discoveryClient 中会调用 fetchRegistry() 办法:
private boolean fetchRegistry(boolean forceFullRegistryFetch) { ... if (clientConfig.shouldDisableDelta() || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) || forceFullRegistryFetch || (applications == null) || (applications.getRegisteredApplications().size() == 0) || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta { ... getAndStoreFullRegistry(); } else { getAndUpdateDelta(applications); } ...}
这里能够看到 fetchRegistry 里有 2 个判断分支,对应首次更新以及后续更新。首次更新会调用 getAndStoreFullRegistry()办法,咱们看一下:
private void getAndStoreFullRegistry() throws Throwable { long currentUpdateGeneration = fetchRegistryGeneration.get(); logger.info("Getting all instance registry info from the eureka server"); Applications apps = null; EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { apps = httpResponse.getEntity(); } logger.info("The response status is {}", httpResponse.getStatusCode()); if (apps == null) { logger.error("The application is null for some reason. Not storing this information"); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { localRegionApps.set(this.filterAndShuffle(apps)); logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode()); } else { logger.warn("Not updating applications as another thread is updating it already"); } }
能够看到和之前相似,如果在没有非凡指定的状况下,咱们会发动一个 HTTP REST 申请拉取所有利用的信息并进行缓存,缓存对象为 Applications,有趣味的能够进一步查看。
接下来,在咱们相熟的 initScheduledTasks()办法中,咱们还会启动一个更新利用信息缓存的 task:
private void initScheduledTasks() { if (clientConfig.shouldFetchRegistry()) { // registry cache refresh timer int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); scheduler.schedule( new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread() ), registryFetchIntervalSeconds, TimeUnit.SECONDS); } ...}
在 CacheRefreshThread()这个 task 的 run 办法中,依然会调用到咱们之前的 fetchRegistry()办法,同时在判断时会走到另一个分支中,即调用到 getAndUpdateDelta()办法:
private void getAndUpdateDelta(Applications applications) throws Throwable { long currentUpdateGeneration = fetchRegistryGeneration.get(); Applications delta = null; EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { delta = httpResponse.getEntity(); } if (delta == null) { logger.warn("The server does not allow the delta revision to be applied because it is not safe. " + "Hence got the full registry."); getAndStoreFullRegistry(); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode()); String reconcileHashCode = ""; if (fetchRegistryUpdateLock.tryLock()) { try { updateDelta(delta); reconcileHashCode = getReconcileHashCode(applications); } finally { fetchRegistryUpdateLock.unlock(); } } else { logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta"); } // There is a diff in number of instances for some reason if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) { reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall } } else { logger.warn("Not updating application delta as another thread is updating it already"); logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode()); }}
能够看到,这边是应用 HTTP REST 发动一个 getDelta 申请,同时在 updateDelta()办法中会更新本地的 Applications 缓存对象。
总结一下,整个服务发现与更新的过程如下:
Ribbon 的"服务发现"
接下来咱们来看看 Ribbon 是怎么基于 Eureka 进行"服务发现"的,咱们之前说过这里的"服务发现"并不是严格意义上的服务发现,而是 Ribbon 如何基于 Eureka 构建本人的负载平衡列表并及时更新,同时咱们也不关注 Ribbon 其余负载平衡的具体逻辑(包含 IRule 路由,IPing 判断可用性)。
咱们能够先做一些猜测,首先 Ribbon 必定是基于 Eureka 的服务发现的。咱们上边形容了 Eureka 会拉取所有服务信息到本地缓存 Applications 中,那么 Ribbon 必定是基于这个 Applications 缓存来构建负载平衡列表的了,同时呢,负载平衡列表同样须要一个定时更新的机制来保障一致性。
服务调用
首先咱们从开发者的最后应用上看,在开发者在 RestTemplate 上开启@LoadBalanced 注解就可开启 Ribbon 的逻辑了,显然这是用了相似拦挡的办法。在 LoadBalancerAutoConfiguration 类中,咱们能够看到相干代码:
...@Bean public SmartInitializingSingleton loadBalancedRestTemplateInitializer( final List<RestTemplateCustomizer> customizers) { return new SmartInitializingSingleton() { @Override public void afterSingletonsInstantiated() { for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) { for (RestTemplateCustomizer customizer : customizers) { customizer.customize(restTemplate); } } } };}@Configuration@ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")static class LoadBalancerInterceptorConfig { @Bean public LoadBalancerInterceptor ribbonInterceptor( LoadBalancerClient loadBalancerClient, LoadBalancerRequestFactory requestFactory) { return new LoadBalancerInterceptor(loadBalancerClient, requestFactory); } @Bean @ConditionalOnMissingBean public RestTemplateCustomizer restTemplateCustomizer( final LoadBalancerInterceptor loadBalancerInterceptor) { return new RestTemplateCustomizer() { @Override public void customize(RestTemplate restTemplate) { List<ClientHttpRequestInterceptor> list = new ArrayList<>( restTemplate.getInterceptors()); list.add(loadBalancerInterceptor); restTemplate.setInterceptors(list); } }; }}...
能够看到,在初始化的过程中通过调用 customize()办法来给 RestTemplate 减少了拦截器 LoadBalancerInterceptor。而 LoadBalancerInterceptor 则是在拦挡办法中应用了 loadBalancer(RibbonLoadBalancerClient 类) 实现申请调用:
@Overridepublic ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException { final URI originalUri = request.getURI(); String serviceName = originalUri.getHost(); Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri); return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));}
服务发现
到当初为止呢,咱们的申请调用曾经被 RibbonLoadBalancerClient 所封装,而其"服务发现"也是产生在 RibbonLoadBalancerClient 中的。
咱们点到其 execute()办法中:
@Overridepublic <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException { ILoadBalancer loadBalancer = getLoadBalancer(serviceId); Server server = getServer(loadBalancer); if (server == null) { throw new IllegalStateException("No instances available for " + serviceId); } RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server, serviceId), serverIntrospector(serviceId).getMetadata(server)); return execute(serviceId, ribbonServer, request);}
这里依据 serviceId 构建了一个 ILoadBalancer,同时从 loadBalancer 中获取到了最终的实例 server 信息。ILoadBalancer 是定义了负载平衡的一个接口,它的要害办法 chooseServer()即是从负载平衡列表依据路由规定中选取一个 server。当然咱们次要关怀的点在于,负载平衡列表是怎么构建进去的。
通过源码跟踪咱们发现,在通过 getLoadBalancer()办法构建好 ILoadBalancer 对象后,对象中就曾经蕴含了服务列表。所以咱们来看看 ILoadBalancer 对象是怎么创立的:
protected ILoadBalancer getLoadBalancer(String serviceId) { return this.clientFactory.getLoadBalancer(serviceId);}
那么这里其实是 springcloud 封装的 clientFactory,它会在 applicationContext 容器中寻找对应的 bean 。
通过源码追踪,咱们能够在主动配置类 RibbonClientConfiguration 中找到对应代码:
@Bean@ConditionalOnMissingBeanpublic ILoadBalancer ribbonLoadBalancer(IClientConfig config, ServerList<Server> serverList, ServerListFilter<Server> serverListFilter, IRule rule, IPing ping, ServerListUpdater serverListUpdater) { if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) { return this.propertiesFactory.get(ILoadBalancer.class, config, name); } return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList, serverListFilter, serverListUpdater);}
咱们看到这里最终构建了 ILoadBalancer,其实现类是 ZoneAwareLoadBalancer,咱们察看其超类的初始化:
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping, ServerList<T> serverList, ServerListFilter<T> filter, ServerListUpdater serverListUpdater) { super(clientConfig, rule, ping); this.serverListImpl = serverList; this.filter = filter; this.serverListUpdater = serverListUpdater; if (filter instanceof AbstractServerListFilter) { ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats()); } restOfInit(clientConfig);}
这边最终执行了 restOfInit()办法,进一步跟踪:
void restOfInit(IClientConfig clientConfig) { boolean primeConnection = this.isEnablePrimingConnections(); // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList() this.setEnablePrimingConnections(false); enableAndInitLearnNewServersFeature(); updateListOfServers(); if (primeConnection && this.getPrimeConnections() != null) { this.getPrimeConnections() .primeConnections(getReachableServers()); } this.setEnablePrimingConnections(primeConnection); LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());}
updateListOfServers()办法是获取所有的 ServerList 的,最终由 serverListImpl.getUpdatedListOfServers()获取所有的服务列表,在此 serverListImpl 即实现类为 DiscoveryEnabledNIWSServerList。
其中 DiscoveryEnabledNIWSServerList 有 getInitialListOfServers()和 getUpdatedListOfServers()办法,具体代码如下
@Overridepublic List<DiscoveryEnabledServer> getInitialListOfServers(){ return obtainServersViaDiscovery();}@Overridepublic List<DiscoveryEnabledServer> getUpdatedListOfServers(){ return obtainServersViaDiscovery();}
此时咱们查看 obtainServersViaDiscovery()办法,曾经根本靠近于事物本质了,它创立了一个 EurekaClient 对象,在此就是 Eureka 的 DiscoveryClient 实现类,调用了其 getInstancesByVipAddress()办法,它最终从 DiscoveryClient 的 Applications 缓存中依据 serviceId 选取了对应的服务信息:
private List<DiscoveryEnabledServer> obtainServersViaDiscovery() { List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>(); if (eurekaClientProvider == null || eurekaClientProvider.get() == null) { logger.warn("EurekaClient has not been initialized yet, returning an empty list"); return new ArrayList<DiscoveryEnabledServer>(); } EurekaClient eurekaClient = eurekaClientProvider.get(); if (vipAddresses!=null){ for (String vipAddress : vipAddresses.split(",")) { // if targetRegion is null, it will be interpreted as the same region of client List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion); for (InstanceInfo ii : listOfInstanceInfo) { if (ii.getStatus().equals(InstanceStatus.UP)) { if(shouldUseOverridePort){ if(logger.isDebugEnabled()){ logger.debug("Overriding port on client name: " + clientName + " to " + overridePort); } // copy is necessary since the InstanceInfo builder just uses the original reference, // and we don't want to corrupt the global eureka copy of the object which may be // used by other clients in our system InstanceInfo copy = new InstanceInfo(ii); if(isSecure){ ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build(); }else{ ii = new InstanceInfo.Builder(copy).setPort(overridePort).build(); } } DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr); des.setZone(DiscoveryClient.getZone(ii)); serverList.add(des); } } if (serverList.size()>0 && prioritizeVipAddressBasedServers){ break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers } } } return serverList;}
服务更新
咱们曾经晓得首次启动时,Ribbon 是怎么联合 Eureka 实现负载平衡列表的构建了,那么与 Eureka 相似,咱们还须要及时对服务列表进行更新以保障一致性。
在 RibbonClientConfiguration 主动配置类中构建 ILoadBalancer 时咱们能够看到其结构器中有 ServerListUpdater 对象,而此对象也是在以后类中构建的:
@Bean@ConditionalOnMissingBeanpublic ServerListUpdater ribbonServerListUpdater(IClientConfig config) { return new PollingServerListUpdater(config);}
咱们察看此对象中的 start()办法看是如何实现更新的:
@Overridepublic synchronized void start(final UpdateAction updateAction) { if (isActive.compareAndSet(false, true)) { final Runnable wrapperRunnable = new Runnable() { @Override public void run() { if (!isActive.get()) { if (scheduledFuture != null) { scheduledFuture.cancel(true); } return; } try { updateAction.doUpdate(); lastUpdated = System.currentTimeMillis(); } catch (Exception e) { logger.warn("Failed one update cycle", e); } } }; scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay( wrapperRunnable, initialDelayMs, refreshIntervalMs, TimeUnit.MILLISECONDS ); } else { logger.info("Already active, no-op"); }}
这里有 2 个配置,即 initialDelayMs 首次检测默认 1s,refreshIntervalMs 检测距离默认 30s(和 Eureka 统一),创立了一个定时工作,执行 updateAction.doUpdate()办法。
咱们回到之前的 restOfInit()办法,查看其中的 enableAndInitLearnNewServersFeature()办法,能够看到是在此处触发了 ServerListUpdater 的 start 办法,同时传入了 updateAction 对象:
public void enableAndInitLearnNewServersFeature() { LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName()); serverListUpdater.start(updateAction);}
其实 updateAction 一开始就曾经创立好了,它依然是调用 之前的 updateListOfServers 办法来进行后续的更新:
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() { @Override public void doUpdate() { updateListOfServers(); }};
总结一下 Ribbon 三局部服务发现的整体流程如下:
参考资料:
- https://zhuanlan.zhihu.com/p/...
- https://blog.csdn.net/forezp/...
近期热文举荐:
1.600+ 道 Java面试题及答案整顿(2021最新版)
2.终于靠开源我的项目弄到 IntelliJ IDEA 激活码了,真香!
3.阿里 Mock 工具正式开源,干掉市面上所有 Mock 工具!
4.Spring Cloud 2020.0.0 正式公布,全新颠覆性版本!
5.《Java开发手册(嵩山版)》最新公布,速速下载!
感觉不错,别忘了顺手点赞+转发哦!