关于java:Spring-Cloud-Eureka-与-Ribbon-是怎么做服务发现的

6次阅读

共计 20197 个字符,预计需要花费 51 分钟才能阅读完成。

作者:fredalxin

地址:https://fredal.xin/how-eureka…

本文基于 spring cloud dalston,同时文章较长,请抉择难受姿态进行浏览。

Eureka 与 Ribbon 是什么?和服务发现什么关系?

Eureka 与 Ribbon 都是 Netflix 提供的微服务组件,别离用于服务注册与发现、负载平衡。同时,这两者均属于 spring cloud netflix 体系,和 spring cloud 无缝集成,也正因为此被大家所熟知。

Eureka 自身是服务注册发现组件,实现了残缺的 Service Registry 和 Service Discovery。

Ribbon 则是一款负载平衡组件,那它和服务发现又有什么关系呢?负载平衡在整个微服务的调用模型中是紧挨着服务发现的,而 Ribbon 这个框架它其实是起到了开发者服务消费行为与底层服务发现组件 Eureka 之间桥梁的作用。

从严格概念上说 Ribbon 并不是做服务发现的,然而因为 Netflix 组件的松耦合,Ribbon 须要对 Eureka 的缓存服务列表进行相似 ” 服务发现 ” 的行为,从而构建本人的负载平衡列表并及时更新,也就是说 Ribbon 中的 ” 服务发现 ” 的宾语变成了 Eureka(或其余服务发现组件)。

Eureka 的服务注册与发现

咱们会先对 Eureka 的服务发现进行形容,重点是 Eureka-client 是如何进行服务的注册与发现的,同时不会过多停留于 Eureka 的架构、Eureka-server 的实现、Zone/Region 等领域。

Eureka-client 的服务发现都是由 DiscoveryClient 类实现的,它次要包含的性能有:

  • 向 Eureka-server 注册服务实例
  • 更新在 Eureka-server 的租期
  • 勾销在 Eureka-server 的租约(服务下线)
  • 发现服务实例并定期更新

服务注册

DiscoveryClient 所有的定时工作都是在 initScheduledTasks()办法里,咱们能够看到以下要害代码:

private void initScheduledTasks() {
    ...
    if (clientConfig.shouldRegisterWithEureka()) {
        ...
         // InstanceInfo replicator
        instanceInfoReplicator = new InstanceInfoReplicator(
                this,
                instanceInfo,
                clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                2); // burstSize
        ...
        instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    }
}

咱们能够看到在 if 判断分支里创立了一个 instanceInfoReplicator 实例,它会通过 start 执行一个定时工作:

public void run() {
    try {discoveryClient.refreshInstanceInfo();

        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {discoveryClient.register();
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {logger.warn("There was a problem with the instance info replicator", t);
    } finally {Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}

咱们能够在 InstanceInfoReplicator 类的 run()办法中找到这一段,同时能够一眼发现其注册关键点在于 discoveryClient.register() 这段,咱们点进去看看:

boolean register() throws Throwable {logger.info(PREFIX + appPathIdentifier + ": registering service...");
    EurekaHttpResponse<Void> httpResponse;
    try {httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
    }
    return httpResponse.getStatusCode() == 204;}

这边能够发现是通过 HTTP REST (jersey 客户端)申请的形式将 instanceInfo 实例信息注册到 Eureka-server 上。咱们简略看一下 InstanceInfo 对象,属性基本上都能见名知义:

@JsonCreator
public InstanceInfo(@JsonProperty("instanceId") String instanceId,
    @JsonProperty("app") String appName,
    @JsonProperty("appGroupName") String appGroupName,
    @JsonProperty("ipAddr") String ipAddr,
    @JsonProperty("sid") String sid,
    @JsonProperty("port") PortWrapper port,
    @JsonProperty("securePort") PortWrapper securePort,
    @JsonProperty("homePageUrl") String homePageUrl,
    @JsonProperty("statusPageUrl") String statusPageUrl,
    @JsonProperty("healthCheckUrl") String healthCheckUrl,
    @JsonProperty("secureHealthCheckUrl") String secureHealthCheckUrl,
    @JsonProperty("vipAddress") String vipAddress,
    @JsonProperty("secureVipAddress") String secureVipAddress,
    @JsonProperty("countryId") int countryId,
    @JsonProperty("dataCenterInfo") DataCenterInfo dataCenterInfo,
    @JsonProperty("hostName") String hostName,
    @JsonProperty("status") InstanceStatus status,
    @JsonProperty("overriddenstatus") InstanceStatus overriddenstatus,
    @JsonProperty("leaseInfo") LeaseInfo leaseInfo,
    @JsonProperty("isCoordinatingDiscoveryServer") Boolean isCoordinatingDiscoveryServer,
    @JsonProperty("metadata") HashMap<String, String> metadata,
    @JsonProperty("lastUpdatedTimestamp") Long lastUpdatedTimestamp,
    @JsonProperty("lastDirtyTimestamp") Long lastDirtyTimestamp,
    @JsonProperty("actionType") ActionType actionType,
    @JsonProperty("asgName") String asgName) {
    this.instanceId = instanceId;
    this.sid = sid;
    this.appName = StringCache.intern(appName);
    this.appGroupName = StringCache.intern(appGroupName);
    this.ipAddr = ipAddr;
    this.port = port == null ? 0 : port.getPort();
    this.isUnsecurePortEnabled = port != null && port.isEnabled();
    this.securePort = securePort == null ? 0 : securePort.getPort();
    this.isSecurePortEnabled = securePort != null && securePort.isEnabled();
    this.homePageUrl = homePageUrl;
    this.statusPageUrl = statusPageUrl;
    this.healthCheckUrl = healthCheckUrl;
    this.secureHealthCheckUrl = secureHealthCheckUrl;
    this.vipAddress = StringCache.intern(vipAddress);
    this.secureVipAddress = StringCache.intern(secureVipAddress);
    this.countryId = countryId;
    this.dataCenterInfo = dataCenterInfo;
    this.hostName = hostName;
    this.status = status;
    this.overriddenstatus = overriddenstatus;
    this.leaseInfo = leaseInfo;
    this.isCoordinatingDiscoveryServer = isCoordinatingDiscoveryServer;
    this.lastUpdatedTimestamp = lastUpdatedTimestamp;
    this.lastDirtyTimestamp = lastDirtyTimestamp;
    this.actionType = actionType;
    this.asgName = StringCache.intern(asgName);

    // ---------------------------------------------------------------
    // for compatibility

    if (metadata == null) {this.metadata = Collections.emptyMap();
    } else if (metadata.size() == 1) {this.metadata = removeMetadataMapLegacyValues(metadata);
    } else {this.metadata = metadata;}

    if (sid == null) {this.sid = SID_DEFAULT;}
}

总结一下整个过程如下:

服务续期

服务续期说起来可能比拟艰涩,其实就是在 client 端定时发动调用,让 Eureka-server 晓得本人还活着,在 eureka 代码中的正文解释为心跳(heart-beat)。

这里有两个比拟重要的配置须要留神:

  • instance.leaseRenewalIntervalInSeconds
    示意客户端的更新频率,默认 30s,也就是每 30s 就会向 Eureka-server 发动 renew 更新操作。
  • instance.leaseExpirationDurationInSeconds
    这是服务端视角的生效工夫,默认是 90s,也就是 Eureka-server 在 90s 内没有接管到来自 client 的 renew 操作就会将其剔除。

咱们间接从代码角度看一下,同样呢相干定时工作在 initScheduledTasks()办法中:

private void initScheduledTasks() {
    ...
    if (clientConfig.shouldRegisterWithEureka()) {
        ...
         // Heartbeat timer
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()),
                    renewalIntervalInSecs, TimeUnit.SECONDS);
        ...
    }
}

能够看到这里创立了一个 HeartbeatThread()线程执行操作:

private class HeartbeatThread implements Runnable {public void run() {if (renew()) {lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }
}

咱们间接看 renew()办法:

boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
        if (httpResponse.getStatusCode() == 404) {REREGISTER_COUNTER.increment();
            logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
            return register();}
        return httpResponse.getStatusCode() == 200;} catch (Throwable e) {logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
        return false;
    }
}

这里比较简单,能够发现和服务注册是相似的,同样应用 HTTP REST 发动一个 hearbeat 申请,底层应用 jersey 客户端。

总结一下整个过程如下:

服务登记

服务登记逻辑比较简单,自身并不在定时工作中触发,而是通过对办法标记 @PreDestroy,从而调用 shutdown 办法触发,最终会调用 unRegister()办法进行登记,同样的这也是一个 HTTP REST 申请,能够简略看下代码:

@PreDestroy
@Override
public synchronized void shutdown() {if (isShutdown.compareAndSet(false, true)) {logger.info("Shutting down DiscoveryClient ...");

        if (statusChangeListener != null && applicationInfoManager != null) {applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
        }

        cancelScheduledTasks();

        // If APPINFO was registered
        if (applicationInfoManager != null && clientConfig.shouldRegisterWithEureka()) {applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
            unregister();}

        if (eurekaTransport != null) {eurekaTransport.shutdown();
        }

        heartbeatStalenessMonitor.shutdown();
        registryStalenessMonitor.shutdown();

        logger.info("Completed shut down of DiscoveryClient");
    }
}

/**
     * unregister w/ the eureka service.
     */
void unregister() {
    // It can be null if shouldRegisterWithEureka == false
    if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
        try {logger.info("Unregistering ...");
            EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
            logger.info(PREFIX + appPathIdentifier + "- deregister  status:" + httpResponse.getStatusCode());
        } catch (Exception e) {logger.error(PREFIX + appPathIdentifier + "- de-registration failed" + e.getMessage(), e);
        }
    }
}

服务发现及更新

咱们来看作为服务消费者的要害逻辑,即发现服务以及更新服务。

首先 consumer 会在启动时从 Eureka-server 获取所有的服务列表,并在本地缓存。同时呢,因为本地有一份缓存,所以须要定期更新,更新频率能够配置。

启动时候在 consumer 在 discoveryClient 中会调用 fetchRegistry() 办法:

private boolean fetchRegistry(boolean forceFullRegistryFetch) {
            ...
    if (clientConfig.shouldDisableDelta()
            || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
            || forceFullRegistryFetch
            || (applications == null)
            || (applications.getRegisteredApplications().size() == 0)
            || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
    {
        ...
        getAndStoreFullRegistry();} else {getAndUpdateDelta(applications);
    }
    ...
}

这里能够看到 fetchRegistry 里有 2 个判断分支,对应首次更新以及后续更新。首次更新会调用 getAndStoreFullRegistry()办法,咱们看一下:

 private void getAndStoreFullRegistry() throws Throwable {long currentUpdateGeneration = fetchRegistryGeneration.get();

     logger.info("Getting all instance registry info from the eureka server");

     Applications apps = null;
     EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
         ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
         : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
     if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {apps = httpResponse.getEntity();
     }
     logger.info("The response status is {}", httpResponse.getStatusCode());

     if (apps == null) {logger.error("The application is null for some reason. Not storing this information");
     } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {localRegionApps.set(this.filterAndShuffle(apps));
         logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
     } else {logger.warn("Not updating applications as another thread is updating it already");
     }
 }

能够看到和之前相似,如果在没有非凡指定的状况下,咱们会发动一个 HTTP REST 申请拉取所有利用的信息并进行缓存,缓存对象为 Applications,有趣味的能够进一步查看。

接下来,在咱们相熟的 initScheduledTasks()办法中,咱们还会启动一个更新利用信息缓存的 task:

private void initScheduledTasks() {if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        scheduler.schedule(
            new TimedSupervisorTask(
                "cacheRefresh",
                scheduler,
                cacheRefreshExecutor,
                registryFetchIntervalSeconds,
                TimeUnit.SECONDS,
                expBackOffBound,
                new CacheRefreshThread()),
            registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }
    ...
}

在 CacheRefreshThread()这个 task 的 run 办法中,依然会调用到咱们之前的 fetchRegistry()办法,同时在判断时会走到另一个分支中,即调用到 getAndUpdateDelta()办法:

private void getAndUpdateDelta(Applications applications) throws Throwable {long currentUpdateGeneration = fetchRegistryGeneration.get();

    Applications delta = null;
    EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {delta = httpResponse.getEntity();
    }

    if (delta == null) {
        logger.warn("The server does not allow the delta revision to be applied because it is not safe."
                    + "Hence got the full registry.");
        getAndStoreFullRegistry();} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
        String reconcileHashCode = "";
        if (fetchRegistryUpdateLock.tryLock()) {
            try {updateDelta(delta);
                reconcileHashCode = getReconcileHashCode(applications);
            } finally {fetchRegistryUpdateLock.unlock();
            }
        } else {logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
        }
        // There is a diff in number of instances for some reason
        if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
        }
    } else {logger.warn("Not updating application delta as another thread is updating it already");
        logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
    }
}

能够看到,这边是应用 HTTP REST 发动一个 getDelta 申请,同时在 updateDelta()办法中会更新本地的 Applications 缓存对象。

总结一下,整个服务发现与更新的过程如下:

Ribbon 的 ” 服务发现 ”

接下来咱们来看看 Ribbon 是怎么基于 Eureka 进行 ” 服务发现 ” 的,咱们之前说过这里的 ” 服务发现 ” 并不是严格意义上的服务发现,而是 Ribbon 如何基于 Eureka 构建本人的负载平衡列表并及时更新,同时咱们也不关注 Ribbon 其余负载平衡的具体逻辑(包含 IRule 路由,IPing 判断可用性)。

咱们能够先做一些猜测,首先 Ribbon 必定是基于 Eureka 的服务发现的。咱们上边形容了 Eureka 会拉取所有服务信息到本地缓存 Applications 中,那么 Ribbon 必定是基于这个 Applications 缓存来构建负载平衡列表的了,同时呢,负载平衡列表同样须要一个定时更新的机制来保障一致性。

服务调用

首先咱们从开发者的最后应用上看,在开发者在 RestTemplate 上开启 @LoadBalanced 注解就可开启 Ribbon 的逻辑了,显然这是用了相似拦挡的办法。在 LoadBalancerAutoConfiguration 类中,咱们能够看到相干代码:

...
@Bean
    public SmartInitializingSingleton loadBalancedRestTemplateInitializer(final List<RestTemplateCustomizer> customizers) {return new SmartInitializingSingleton() {
        @Override
        public void afterSingletonsInstantiated() {for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {for (RestTemplateCustomizer customizer : customizers) {customizer.customize(restTemplate);
                }
            }
        }
    };
}


@Configuration
@ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
static class LoadBalancerInterceptorConfig {
    @Bean
    public LoadBalancerInterceptor ribbonInterceptor(
        LoadBalancerClient loadBalancerClient,
        LoadBalancerRequestFactory requestFactory) {return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
    }

    @Bean
    @ConditionalOnMissingBean
    public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {return new RestTemplateCustomizer() {
            @Override
            public void customize(RestTemplate restTemplate) {
                List<ClientHttpRequestInterceptor> list = new ArrayList<>(restTemplate.getInterceptors());
                list.add(loadBalancerInterceptor);
                restTemplate.setInterceptors(list);
            }
        };
    }
}
...

能够看到,在初始化的过程中通过调用 customize()办法来给 RestTemplate 减少了拦截器 LoadBalancerInterceptor。而 LoadBalancerInterceptor 则是在拦挡办法中应用了 loadBalancer(RibbonLoadBalancerClient 类) 实现申请调用:

@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
                                    final ClientHttpRequestExecution execution) throws IOException {final URI originalUri = request.getURI();
    String serviceName = originalUri.getHost();
    Assert.state(serviceName != null, "Request URI does not contain a valid hostname:" + originalUri);
    return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
}

服务发现

到当初为止呢,咱们的申请调用曾经被 RibbonLoadBalancerClient 所封装,而其 ” 服务发现 ” 也是产生在 RibbonLoadBalancerClient 中的。

咱们点到其 execute()办法中:

@Override
public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
    Server server = getServer(loadBalancer);
    if (server == null) {throw new IllegalStateException("No instances available for" + serviceId);
    }
    RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
                                                                             serviceId), serverIntrospector(serviceId).getMetadata(server));

    return execute(serviceId, ribbonServer, request);
}

这里依据 serviceId 构建了一个 ILoadBalancer,同时从 loadBalancer 中获取到了最终的实例 server 信息。ILoadBalancer 是定义了负载平衡的一个接口,它的要害办法 chooseServer()即是从负载平衡列表依据路由规定中选取一个 server。当然咱们次要关怀的点在于,负载平衡列表是怎么构建进去的。

通过源码跟踪咱们发现,在通过 getLoadBalancer()办法构建好 ILoadBalancer 对象后,对象中就曾经蕴含了服务列表。所以咱们来看看 ILoadBalancer 对象是怎么创立的:

protected ILoadBalancer getLoadBalancer(String serviceId) {return this.clientFactory.getLoadBalancer(serviceId);
}

那么这里其实是 springcloud 封装的 clientFactory,它会在 applicationContext 容器中寻找对应的 bean。

通过源码追踪,咱们能够在主动配置类 RibbonClientConfiguration 中找到对应代码:

@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
                                        ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
                                        IRule rule, IPing ping, ServerListUpdater serverListUpdater) {if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {return this.propertiesFactory.get(ILoadBalancer.class, config, name);
    }
    return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                                       serverListFilter, serverListUpdater);
}

咱们看到这里最终构建了 ILoadBalancer,其实现类是 ZoneAwareLoadBalancer,咱们察看其超类的初始化:

public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
                                     ServerList<T> serverList, ServerListFilter<T> filter, ServerListUpdater serverListUpdater) {super(clientConfig, rule, ping);
    this.serverListImpl = serverList;
    this.filter = filter;
    this.serverListUpdater = serverListUpdater;
    if (filter instanceof AbstractServerListFilter) {((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
    }
    restOfInit(clientConfig);
}

这边最终执行了 restOfInit()办法,进一步跟踪:

void restOfInit(IClientConfig clientConfig) {boolean primeConnection = this.isEnablePrimingConnections();
    // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
    this.setEnablePrimingConnections(false);
    enableAndInitLearnNewServersFeature();

    updateListOfServers();
    if (primeConnection && this.getPrimeConnections() != null) {this.getPrimeConnections()
            .primeConnections(getReachableServers());
    }
    this.setEnablePrimingConnections(primeConnection);
    LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}

updateListOfServers()办法是获取所有的 ServerList 的,最终由 serverListImpl.getUpdatedListOfServers()获取所有的服务列表,在此 serverListImpl 即实现类为 DiscoveryEnabledNIWSServerList。

其中 DiscoveryEnabledNIWSServerList 有 getInitialListOfServers()和 getUpdatedListOfServers()办法,具体代码如下

@Override
public List<DiscoveryEnabledServer> getInitialListOfServers(){return obtainServersViaDiscovery();
}

@Override
public List<DiscoveryEnabledServer> getUpdatedListOfServers(){return obtainServersViaDiscovery();
}

此时咱们查看 obtainServersViaDiscovery()办法,曾经根本靠近于事物本质了,它创立了一个 EurekaClient 对象,在此就是 Eureka 的 DiscoveryClient 实现类,调用了其 getInstancesByVipAddress()办法,它最终从 DiscoveryClient 的 Applications 缓存中依据 serviceId 选取了对应的服务信息:

private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();

    if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {logger.warn("EurekaClient has not been initialized yet, returning an empty list");
        return new ArrayList<DiscoveryEnabledServer>();}

    EurekaClient eurekaClient = eurekaClientProvider.get();
    if (vipAddresses!=null){for (String vipAddress : vipAddresses.split(",")) {
            // if targetRegion is null, it will be interpreted as the same region of client
            List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
            for (InstanceInfo ii : listOfInstanceInfo) {if (ii.getStatus().equals(InstanceStatus.UP)) {if(shouldUseOverridePort){if(logger.isDebugEnabled()){logger.debug("Overriding port on client name:" + clientName + "to" + overridePort);
                        }

                        // copy is necessary since the InstanceInfo builder just uses the original reference,
                        // and we don't want to corrupt the global eureka copy of the object which may be
                        // used by other clients in our system
                        InstanceInfo copy = new InstanceInfo(ii);

                        if(isSecure){ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();}else{ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();}
                    }

                    DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr);
                    des.setZone(DiscoveryClient.getZone(ii));
                    serverList.add(des);
                }
            }
            if (serverList.size()>0 && prioritizeVipAddressBasedServers){break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers}
        }
    }
    return serverList;
}

服务更新

咱们曾经晓得首次启动时,Ribbon 是怎么联合 Eureka 实现负载平衡列表的构建了,那么与 Eureka 相似,咱们还须要及时对服务列表进行更新以保障一致性。

在 RibbonClientConfiguration 主动配置类中构建 ILoadBalancer 时咱们能够看到其结构器中有 ServerListUpdater 对象,而此对象也是在以后类中构建的:

@Bean
@ConditionalOnMissingBean
public ServerListUpdater ribbonServerListUpdater(IClientConfig config) {return new PollingServerListUpdater(config);
}

咱们察看此对象中的 start()办法看是如何实现更新的:

@Override
public synchronized void start(final UpdateAction updateAction) {if (isActive.compareAndSet(false, true)) {final Runnable wrapperRunnable = new Runnable() {
            @Override
            public void run() {if (!isActive.get()) {if (scheduledFuture != null) {scheduledFuture.cancel(true);
                    }
                    return;
                }
                try {updateAction.doUpdate();
                    lastUpdated = System.currentTimeMillis();} catch (Exception e) {logger.warn("Failed one update cycle", e);
                }
            }
        };

        scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
            wrapperRunnable,
            initialDelayMs,
            refreshIntervalMs,
            TimeUnit.MILLISECONDS
        );
    } else {logger.info("Already active, no-op");
    }
}

这里有 2 个配置,即 initialDelayMs 首次检测默认 1s,refreshIntervalMs 检测距离默认 30s(和 Eureka 统一),创立了一个定时工作,执行 updateAction.doUpdate()办法。

咱们回到之前的 restOfInit()办法,查看其中的 enableAndInitLearnNewServersFeature()办法,能够看到是在此处触发了 ServerListUpdater 的 start 办法,同时传入了 updateAction 对象:

public void enableAndInitLearnNewServersFeature() {LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
    serverListUpdater.start(updateAction);
}

其实 updateAction 一开始就曾经创立好了,它依然是调用 之前的 updateListOfServers 办法来进行后续的更新:

protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
    @Override
    public void doUpdate() {updateListOfServers();
    }
};

总结一下 Ribbon 三局部服务发现的整体流程如下:

参考资料:

  • https://zhuanlan.zhihu.com/p/…
  • https://blog.csdn.net/forezp/…

近期热文举荐:

1.600+ 道 Java 面试题及答案整顿(2021 最新版)

2. 终于靠开源我的项目弄到 IntelliJ IDEA 激活码了,真香!

3. 阿里 Mock 工具正式开源,干掉市面上所有 Mock 工具!

4.Spring Cloud 2020.0.0 正式公布,全新颠覆性版本!

5.《Java 开发手册(嵩山版)》最新公布,速速下载!

感觉不错,别忘了顺手点赞 + 转发哦!

正文完
 0