Problem

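The current project uses a Spring Cloud Gateway + Eureka + Spring Boot architecture. A request first passes through the gateway, which looks up the addresses of the business service's servers in the registry and then forwards the request to the business service's endpoint. With this architecture, restarting a project causes several problems: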

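  1. When a business service instance shuts down, it terminates the requests it is still processing.
  2. A business service instance has already stopped, but the gateway keeps forwarding requests to it, so those requests fail.
  3. A business service instance has already restarted, but the gateway does not start forwarding requests to it right away. If the service has only two instances and the second one is restarted as soon as the first one has finished starting, the service becomes unavailable.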

To solve these problems, we first need to understand how Gateway, Eureka, Ribbon and Tomcat work, and why these problems occur.

Main flow

Let's start from the gateway's entry point. Below is a diagram from the official Spring Cloud Gateway documentation:

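A key class is RoutePredicateHandlerMapping. It extends AbstractHandlerMapping and is WebFlux's handler mapping, playing the same role as a HandlerMapping in Web MVC: it maps a request to the handler that will process it. RoutePredicateHandlerMapping iterates over all routes, takes the first one whose predicate matches the request, and stores it in the current request's attributes (GATEWAY_ROUTE_ATTR).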

public class RoutePredicateHandlerMapping extends AbstractHandlerMapping {

    @Override
    protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
        // don't handle requests on the management port if set
        if (managmentPort != null && exchange.getRequest().getURI().getPort() == managmentPort.intValue()) {
            return Mono.empty();
        }
        exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());

        return lookupRoute(exchange)
                // .log("route-predicate-handler-mapping", Level.FINER) //name this
                .flatMap((Function<Route, Mono<?>>) r -> {
                    exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);
                    }

                    exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
                    return Mono.just(webHandler);
                }).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
                    exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
                    if (logger.isTraceEnabled()) {
                        logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
                    }
                })));
    }

    protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
        return this.routeLocator
                .getRoutes()
                //individually filter routes so that filterWhen error delaying is not a problem
                .concatMap(route -> Mono
                        .just(route)
                        .filterWhen(r -> {
                            // add the current route we are testing
                            exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
                            return r.getPredicate().apply(exchange);
                        })
                        //instead of immediately stopping main flux due to error, log and swallow it
                        .doOnError(e -> logger.error("Error applying predicate for route: "+route.getId(), e))
                        .onErrorResume(e -> Mono.empty())
                )
                // .defaultIfEmpty() put a static Route not found
                // or .switchIfEmpty()
                // .switchIfEmpty(Mono.<Route>empty().log("noroute"))
                .next()
                //TODO: error handling
                .map(route -> {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Route matched: " + route.getId());
                    }
                    validateRoute(route, exchange);
                    return route;
                });

        /* TODO: trace logging
            if (logger.isTraceEnabled()) {
                logger.trace("RouteDefinition did not match: " + routeDefinition.getId());
            }*/
    }
}
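To make the matching semantics concrete, here is a minimal plain-Java model of lookupRoute() without Reactor: routes are tested in order, the first match wins (the .next() call), and a predicate that throws is logged and treated as a non-match. SimpleRoute and RouteLookupSketch are hypothetical names, and the request is reduced to its path for illustration.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical stand-in for a gateway Route: an id plus a predicate on the request path.
final class SimpleRoute {
    private final String id;
    private final Predicate<String> predicate;

    SimpleRoute(String id, Predicate<String> predicate) {
        this.id = id;
        this.predicate = predicate;
    }

    String id() { return id; }

    Predicate<String> predicate() { return predicate; }
}

final class RouteLookupSketch {
    // Like lookupRoute(): test routes in order and stop at the first match.
    // A predicate that throws counts as "no match" (cf. onErrorResume(e -> Mono.empty())).
    static Optional<SimpleRoute> lookupRoute(List<SimpleRoute> routes, String path) {
        for (SimpleRoute route : routes) {
            try {
                if (route.predicate().test(path)) {
                    return Optional.of(route);
                }
            } catch (RuntimeException e) {
                System.err.println("Error applying predicate for route: " + route.id());
            }
        }
        return Optional.empty(); // no match: the handler mapping would return Mono.empty()
    }
}
```

Because matching stops at the first hit, route order matters: a catch-all route placed before a specific one would shadow it.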

routeLocator.getRoutes() shows that the route list comes from routeLocator, so let's see how the routes are generated. One implementation of RouteLocator is RouteDefinitionRouteLocator:

public class RouteDefinitionRouteLocator implements RouteLocator, BeanFactoryAware, ApplicationEventPublisherAware {

    @Override
    public Flux<Route> getRoutes() {
        return this.routeDefinitionLocator.getRouteDefinitions()
                .map(this::convertToRoute)
                //TODO: error handling
                .map(route -> {
                    if (logger.isDebugEnabled()) {
                        logger.debug("RouteDefinition matched: " + route.getId());
                    }
                    return route;
                });

        /* TODO: trace logging
            if (logger.isTraceEnabled()) {
                logger.trace("RouteDefinition did not match: " + routeDefinition.getId());
            }*/
    }
}

RouteDefinitionRouteLocator gets its route list from a RouteDefinitionLocator. When the project is configured for discovery-based routing (spring.cloud.gateway.discovery.locator.enabled: true), the routes come by default from DiscoveryClientRouteDefinitionLocator:

public class DiscoveryClientRouteDefinitionLocator implements RouteDefinitionLocator {

    @Override
    public Flux<RouteDefinition> getRouteDefinitions() {

        SpelExpressionParser parser = new SpelExpressionParser();
        Expression includeExpr = parser.parseExpression(properties.getIncludeExpression());
        Expression urlExpr = parser.parseExpression(properties.getUrlExpression());

        Predicate<ServiceInstance> includePredicate;
        if (properties.getIncludeExpression() == null || "true".equalsIgnoreCase(properties.getIncludeExpression())) {
            includePredicate = instance -> true;
        } else {
            includePredicate = instance -> {
                Boolean include = includeExpr.getValue(evalCtxt, instance, Boolean.class);
                if (include == null) {
                    return false;
                }
                return include;
            };
        }

        return Flux.fromIterable(discoveryClient.getServices())
                .map(discoveryClient::getInstances)
                .filter(instances -> !instances.isEmpty())
                .map(instances -> instances.get(0))
                .filter(includePredicate)
                .map(instance -> {
                    String serviceId = instance.getServiceId();
                    RouteDefinition routeDefinition = new RouteDefinition();
                    routeDefinition.setId(this.routeIdPrefix + serviceId);
                    String uri = urlExpr.getValue(evalCtxt, instance, String.class);
                    routeDefinition.setUri(URI.create(uri));

                    final ServiceInstance instanceForEval = new DelegatingServiceInstance(instance, properties);

                    for (PredicateDefinition original : this.properties.getPredicates()) {
                        PredicateDefinition predicate = new PredicateDefinition();
                        predicate.setName(original.getName());
                        for (Map.Entry<String, String> entry : original.getArgs().entrySet()) {
                            String value = getValueFromExpr(evalCtxt, parser, instanceForEval, entry);
                            predicate.addArg(entry.getKey(), value);
                        }
                        routeDefinition.getPredicates().add(predicate);
                    }

                    for (FilterDefinition original : this.properties.getFilters()) {
                        FilterDefinition filter = new FilterDefinition();
                        filter.setName(original.getName());
                        for (Map.Entry<String, String> entry : original.getArgs().entrySet()) {
                            String value = getValueFromExpr(evalCtxt, parser, instanceForEval, entry);
                            filter.addArg(entry.getKey(), value);
                        }
                        routeDefinition.getFilters().add(filter);
                    }

                    return routeDefinition;
                });
    }
}
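The essential mapping in getRouteDefinitions() can be sketched as follows, assuming the default URL expression is 'lb://'+serviceId (check DiscoveryLocatorProperties in your version). One consequence worth noting: the route URI names the service, not an instance, so individual instances starting or stopping do not change the route itself; picking an instance happens later, in the load balancer. RouteDef, DiscoveryRoutesSketch and the prefix value used with it are hypothetical; predicates, filters and the SpEL include expression are left out.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class DiscoveryRoutesSketch {
    // Hypothetical, simplified RouteDefinition: only id and uri are modeled.
    static final class RouteDef {
        final String id;
        final URI uri;

        RouteDef(String id, URI uri) {
            this.id = id;
            this.uri = uri;
        }
    }

    // services: serviceId -> instance addresses, as a discovery client might report them.
    static List<RouteDef> routeDefinitions(Map<String, List<String>> services, String routeIdPrefix) {
        List<RouteDef> defs = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : services.entrySet()) {
            if (e.getValue().isEmpty()) {
                continue; // like .filter(instances -> !instances.isEmpty())
            }
            String serviceId = e.getKey();
            // Assumed default url expression "'lb://'+serviceId": one route per service,
            // pointing at the service name rather than at any single instance.
            defs.add(new RouteDef(routeIdPrefix + serviceId, URI.create("lb://" + serviceId)));
        }
        return defs;
    }
}
```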

DiscoveryClientRouteDefinitionLocatorEureka中获取服务列表(discoveryClient.getServices),为了性能考虑,springcloud-gateway又在RouteDefinitionRouteLocator上套上了缓存CachingRouteLocator

public class CachingRouteLocator implements RouteLocator, ApplicationListener<RefreshRoutesEvent> {

    private final RouteLocator delegate;
    private final Flux<Route> routes;
    private final Map<String, List> cache = new HashMap<>();

    public CachingRouteLocator(RouteLocator delegate) {
        this.delegate = delegate;
        routes = CacheFlux.lookup(cache, "routes", Route.class)
                .onCacheMissResume(() -> this.delegate.getRoutes().sort(AnnotationAwareOrderComparator.INSTANCE));
    }

    @Override
    public Flux<Route> getRoutes() {
        return this.routes;
    }

    /**
     * Clears the routes cache
     * @return routes flux
     */
    public Flux<Route> refresh() {
        this.cache.clear();
        return this.routes;
    }

    @Override
    public void onApplicationEvent(RefreshRoutesEvent event) {
        refresh();
    }

    @Deprecated
    /* for testing */ void handleRefresh() {
        refresh();
    }
}
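The caching behavior boils down to: load from the delegate on a cache miss, serve the cached list otherwise, and clear it on refresh(). A minimal synchronous sketch of that idea; CachingLocatorSketch is a hypothetical name and a Supplier stands in for the delegate RouteLocator.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical synchronous analogue of CachingRouteLocator.
class CachingLocatorSketch<R> {
    private final Supplier<List<R>> delegate;
    private final AtomicReference<List<R>> cache = new AtomicReference<>();

    CachingLocatorSketch(Supplier<List<R>> delegate) {
        this.delegate = delegate;
    }

    // Load from the delegate only when nothing is cached (like onCacheMissResume).
    // Under contention updateAndGet may call the loader more than once; fine for a sketch.
    List<R> getRoutes() {
        return cache.updateAndGet(cached -> cached != null ? cached : List.copyOf(delegate.get()));
    }

    // What a RefreshRoutesEvent triggers: drop the cache so the next call reloads.
    void refresh() {
        cache.set(null);
    }
}
```

Between refreshes, every request is matched against the same cached list, so the freshness of the routes depends entirely on how often refresh() is triggered.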

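CachingRouteLocator clears its cache when it receives a RefreshRoutesEvent (GatewayWebfluxEndpoint exposes an HTTP API that publishes a RefreshRoutesEvent through ApplicationEventPublisher). RouteRefreshListener publishes a RefreshRoutesEvent on InstanceRegisteredEvent, ContextRefreshedEvent and RefreshScopeRefreshedEvent, and on HeartbeatEvent or ParentHeartbeatEvent whenever the heartbeat value has changed. Note that InstanceRegisteredEvent is, per its Spring Cloud Commons documentation, published after the local instance (here, the gateway itself) registers with the discovery service; it does not fire for every other service that registers.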

public class RouteRefreshListener implements ApplicationListener<ApplicationEvent> {

    private HeartbeatMonitor monitor = new HeartbeatMonitor();
    private final ApplicationEventPublisher publisher;

    public RouteRefreshListener(ApplicationEventPublisher publisher) {
        Assert.notNull(publisher, "publisher may not be null");
        this.publisher = publisher;
    }

    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        if (event instanceof ContextRefreshedEvent
                || event instanceof RefreshScopeRefreshedEvent
                || event instanceof InstanceRegisteredEvent) {
            reset();
        }
        else if (event instanceof ParentHeartbeatEvent) {
            ParentHeartbeatEvent e = (ParentHeartbeatEvent) event;
            resetIfNeeded(e.getValue());
        }
        else if (event instanceof HeartbeatEvent) {
            HeartbeatEvent e = (HeartbeatEvent) event;
            resetIfNeeded(e.getValue());
        }
    }

    private void resetIfNeeded(Object value) {
        if (this.monitor.update(value)) {
            reset();
        }
    }

    private void reset() {
        this.publisher.publishEvent(new RefreshRoutesEvent(this));
    }
}
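resetIfNeeded() means a heartbeat only triggers a route refresh when its value differs from the previous one. The sketch below models that, assuming HeartbeatMonitor.update returns true exactly when a non-null value differs from the last one seen; HeartbeatRefreshSketch and the counter that stands in for publishing RefreshRoutesEvent are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of RouteRefreshListener.resetIfNeeded() + HeartbeatMonitor.
class HeartbeatRefreshSketch {
    private final AtomicReference<Object> latest = new AtomicReference<>();
    final AtomicInteger refreshesPublished = new AtomicInteger();

    // Assumed HeartbeatMonitor semantics: true only for a new, non-null, different value.
    boolean update(Object value) {
        Object previous = latest.get();
        return value != null && !value.equals(previous) && latest.compareAndSet(previous, value);
    }

    void onHeartbeat(Object value) {
        if (update(value)) {
            refreshesPublished.incrementAndGet(); // stands in for publishing RefreshRoutesEvent
        }
    }
}
```

Repeated heartbeats with the same value are therefore cheap: they do not clear the route cache.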

Next comes FilteringWebHandler. It combines the global filters with the route's own filters, sorts them by order, builds a filter chain from them, and executes that chain.

public class FilteringWebHandler implements WebHandler {

    @Override
    public Mono<Void> handle(ServerWebExchange exchange) {
        Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
        List<GatewayFilter> gatewayFilters = route.getFilters();

        List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
        combined.addAll(gatewayFilters);
        //TODO: needed or cached?
        AnnotationAwareOrderComparator.sort(combined);

        if (logger.isDebugEnabled()) {
            logger.debug("Sorted gatewayFilterFactories: "+ combined);
        }

        return new DefaultGatewayFilterChain(combined).filter(exchange);
    }
}
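The chain mechanics (merge global and route filters, sort by order, let each filter hand over to the next through an index-based chain) can be modeled synchronously. A minimal sketch; FilterChainSketch, Filter, Chain and named() are hypothetical, and a trace list stands in for the exchange.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class FilterChainSketch {
    interface Filter {
        int order();

        void filter(List<String> trace, Chain chain);
    }

    // Like DefaultGatewayFilterChain: each step knows its position in the sorted list.
    static final class Chain {
        private final List<Filter> filters;
        private final int index;

        Chain(List<Filter> filters, int index) {
            this.filters = filters;
            this.index = index;
        }

        void proceed(List<String> trace) {
            if (index < filters.size()) {
                filters.get(index).filter(trace, new Chain(filters, index + 1));
            }
        }
    }

    static Filter named(String name, int orderValue) {
        return new Filter() {
            public int order() { return orderValue; }

            public void filter(List<String> trace, Chain chain) {
                trace.add(name);      // this filter's own work
                chain.proceed(trace); // hand over to the next filter
            }
        };
    }

    static List<String> run(List<Filter> globalFilters, List<Filter> routeFilters) {
        List<Filter> combined = new ArrayList<>(globalFilters);
        combined.addAll(routeFilters);
        combined.sort(Comparator.comparingInt(Filter::order)); // lower order runs first
        List<String> trace = new ArrayList<>();
        new Chain(combined, 0).proceed(trace);
        return trace;
    }
}
```

With a route filter at order 1, RouteToRequestUrlFilter at 10000 and LoadBalancerClientFilter at 10100 (these three values are assumed for illustration) and NettyRoutingFilter at Ordered.LOWEST_PRECEDENCE (from the source below), run(...) visits them in exactly that order: the URL is built before an instance is chosen, and forwarding happens last.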

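Next comes RouteToRequestUrlFilter, which builds the full load-balanced URL. For example, if the route's target is lb://MY-SERVICE and the request path is /hello/world, the resulting URL is lb://MY-SERVICE/hello/world. The URL is stored in the current request's attributes (GATEWAY_REQUEST_URL_ATTR), and processing continues with the next filter.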

public class RouteToRequestUrlFilter implements GlobalFilter, Ordered {

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
        if (route == null) {
            return chain.filter(exchange);
        }
        log.trace("RouteToRequestUrlFilter start");
        URI uri = exchange.getRequest().getURI();
        boolean encoded = containsEncodedParts(uri);
        URI routeUri = route.getUri();

        if (hasAnotherScheme(routeUri)) {
            // this is a special url, save scheme to special attribute
            // replace routeUri with schemeSpecificPart
            exchange.getAttributes().put(GATEWAY_SCHEME_PREFIX_ATTR, routeUri.getScheme());
            routeUri = URI.create(routeUri.getSchemeSpecificPart());
        }

        if("lb".equalsIgnoreCase(routeUri.getScheme()) && routeUri.getHost() == null) {
            //Load balanced URIs should always have a host. If the host is null it is most
            //likely because the host name was invalid (for example included an underscore)
            throw new IllegalStateException("Invalid host: " + routeUri.toString());
        }

        URI mergedUrl = UriComponentsBuilder.fromUri(uri)
                // .uri(routeUri)
                .scheme(routeUri.getScheme())
                .host(routeUri.getHost())
                .port(routeUri.getPort())
                .build(encoded)
                .toUri();
        exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, mergedUrl);
        return chain.filter(exchange);
    }
}
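The URL merge can be reproduced with java.net.URI alone: path and query come from the incoming request, scheme, host and port from the route URI. This simplified sketch ignores the encoded-part handling of UriComponentsBuilder; RequestUrlSketch is a hypothetical name. It also shows why the filter checks for a null host: java.net.URI does not accept '_' in a hostname, so for lb://my_service getHost() returns null.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Simplified model of RouteToRequestUrlFilter's merge step.
class RequestUrlSketch {
    static URI merge(URI requestUri, URI routeUri) {
        if ("lb".equalsIgnoreCase(routeUri.getScheme()) && routeUri.getHost() == null) {
            // e.g. "lb://my_service": '_' is not legal in a hostname, so the host is null
            throw new IllegalStateException("Invalid host: " + routeUri);
        }
        try {
            // Keep the request's path and query; take scheme, host and port from the route.
            return new URI(routeUri.getScheme(), null, routeUri.getHost(), routeUri.getPort(),
                    requestUri.getPath(), requestUri.getQuery(), null);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }
}
```

For example, merging http://gateway:8080/hello/world?x=1 with lb://MY-SERVICE yields lb://MY-SERVICE/hello/world?x=1.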

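Then comes LoadBalancerClientFilter. It first checks the scheme: if it is not lb, the request is passed straight to the next filter; if it is lb, the filter chooses a service instance and builds the final target URL from it.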

public class LoadBalancerClientFilter implements GlobalFilter, Ordered {

    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        URI url = (URI)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
        String schemePrefix = (String)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_SCHEME_PREFIX_ATTR);
        if (url != null && ("lb".equals(url.getScheme()) || "lb".equals(schemePrefix))) {
            ServerWebExchangeUtils.addOriginalRequestUrl(exchange, url);
            log.trace("LoadBalancerClientFilter url before: " + url);
            ServiceInstance instance = this.choose(exchange);
            if (instance == null) {
                String msg = "Unable to find instance for " + url.getHost();
                if (this.properties.isUse404()) {
                    throw new LoadBalancerClientFilter.FourOFourNotFoundException(msg);
                } else {
                    throw new NotFoundException(msg);
                }
            } else {
                URI uri = exchange.getRequest().getURI();
                String overrideScheme = instance.isSecure() ? "https" : "http";
                if (schemePrefix != null) {
                    overrideScheme = url.getScheme();
                }

                URI requestUrl = this.loadBalancer.reconstructURI(new LoadBalancerClientFilter.DelegatingServiceInstance(instance, overrideScheme), uri);
                log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
                exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR, requestUrl);
                return chain.filter(exchange);
            }
        } else {
            return chain.filter(exchange);
        }
    }

    protected ServiceInstance choose(ServerWebExchange exchange) {
        return this.loadBalancer.choose(((URI)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR)).getHost());
    }
}
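The core of the lb branch is: choose an instance for the service id, fail if there is none, otherwise keep the request's path and query and swap in the instance's scheme, host and port (the job of reconstructURI). A minimal sketch, assuming an instance is just a host, a port and a secure flag; LbFilterSketch, Instance and the chooser function are hypothetical, and the 404-versus-NotFoundException choice controlled by isUse404() is reduced to a single exception. The chooser only sees whatever list the load balancer currently holds, which is one way problem 2 arises: an instance that has already stopped can still be chosen until that list is refreshed.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.function.Function;

class LbFilterSketch {
    // Hypothetical chosen instance: host, port and whether it speaks HTTPS.
    static final class Instance {
        final String host;
        final int port;
        final boolean secure;

        Instance(String host, int port, boolean secure) {
            this.host = host;
            this.port = port;
            this.secure = secure;
        }
    }

    // chooser stands in for loadBalancer.choose(serviceId); it may return null.
    static URI resolve(URI lbUrl, URI originalRequest, Function<String, Instance> chooser) {
        if (!"lb".equals(lbUrl.getScheme())) {
            return lbUrl; // not load-balanced: passed through unchanged
        }
        Instance instance = chooser.apply(lbUrl.getHost());
        if (instance == null) {
            throw new IllegalStateException("Unable to find instance for " + lbUrl.getHost());
        }
        String scheme = instance.secure ? "https" : "http";
        try {
            // Like reconstructURI: keep the original path/query, use the instance's address.
            return new URI(scheme, null, instance.host, instance.port,
                    originalRequest.getPath(), originalRequest.getQuery(), null);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }
}
```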

Finally, the request is actually forwarded to that URL in NettyRoutingFilter.

public class NettyRoutingFilter implements GlobalFilter, Ordered {

    private final HttpClient httpClient;
    private final ObjectProvider<List<HttpHeadersFilter>> headersFilters;
    private final HttpClientProperties properties;

    public NettyRoutingFilter(HttpClient httpClient,
                              ObjectProvider<List<HttpHeadersFilter>> headersFilters,
                              HttpClientProperties properties) {
        this.httpClient = httpClient;
        this.headersFilters = headersFilters;
        this.properties = properties;
    }

    @Override
    public int getOrder() {
        return Ordered.LOWEST_PRECEDENCE;
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);

        String scheme = requestUrl.getScheme();
        if (isAlreadyRouted(exchange) || (!"http".equals(scheme) && !"https".equals(scheme))) {
            return chain.filter(exchange);
        }
        setAlreadyRouted(exchange);

        ServerHttpRequest request = exchange.getRequest();

        final HttpMethod method = HttpMethod.valueOf(request.getMethodValue());
        final String url = requestUrl.toString();

        HttpHeaders filtered = filterRequest(this.headersFilters.getIfAvailable(),
                exchange);

        final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
        filtered.forEach(httpHeaders::set);

        String transferEncoding = request.getHeaders().getFirst(HttpHeaders.TRANSFER_ENCODING);
        boolean chunkedTransfer = "chunked".equalsIgnoreCase(transferEncoding);

        boolean preserveHost = exchange.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false);

        // here: the request is actually sent to the chosen instance
        Flux<HttpClientResponse> responseFlux = this.httpClient
                .chunkedTransfer(chunkedTransfer)
                .request(method)
                .uri(url)
                .send((req, nettyOutbound) -> {
                    req.headers(httpHeaders);

                    if (preserveHost) {
                        String host = request.getHeaders().getFirst(HttpHeaders.HOST);
                        req.header(HttpHeaders.HOST, host);
                    }
                    return nettyOutbound
                            .options(NettyPipeline.SendOptions::flushOnEach)
                            .send(request.getBody().map(dataBuffer ->
                                    ((NettyDataBuffer) dataBuffer).getNativeBuffer()));
                }).responseConnection((res, connection) -> {
                    ServerHttpResponse response = exchange.getResponse();
                    // put headers and status so filters can modify the response
                    HttpHeaders headers = new HttpHeaders();

                    res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue()));

                    String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);
                    if (StringUtils.hasLength(contentTypeValue)) {
                        exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR, contentTypeValue);
                    }

                    HttpStatus status = HttpStatus.resolve(res.status().code());
                    if (status != null) {
                        response.setStatusCode(status);
                    } else if (response instanceof AbstractServerHttpResponse) {
                        // https://jira.spring.io/browse/SPR-16748
                        ((AbstractServerHttpResponse) response).setStatusCodeValue(res.status().code());
                    } else {
                        throw new IllegalStateException("Unable to set status code on response: " + res.status().code() + ", " + response.getClass());
                    }

                    // make sure headers filters run after setting status so it is available in response
                    HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(
                            this.headersFilters.getIfAvailable(), headers, exchange, Type.RESPONSE);

                    if(!filteredResponseHeaders.containsKey(HttpHeaders.TRANSFER_ENCODING) &&
                            filteredResponseHeaders.containsKey(HttpHeaders.CONTENT_LENGTH)) {
                        //It is not valid to have both the transfer-encoding header and the content-length header
                        //remove the transfer-encoding header in the response if the content-length header is present
                        response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING);
                    }

                    exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES, filteredResponseHeaders.keySet());

                    response.getHeaders().putAll(filteredResponseHeaders);

                    // Defer committing the response until all route filters have run
                    // Put client response as ServerWebExchange attribute and write response later NettyWriteResponseFilter
                    exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
                    exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);

                    return Mono.just(res);
                });

        if (properties.getResponseTimeout() != null) {
            responseFlux = responseFlux.timeout(properties.getResponseTimeout(),
                    Mono.error(new TimeoutException("Response took longer than timeout: " +
                            properties.getResponseTimeout()))).onErrorMap(TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, null, th));
        }

        return responseFlux.then(chain.filter(exchange));
    }
}
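The responseTimeout branch at the end maps a slow upstream to 504 Gateway Timeout. The same idea in plain Java, using CompletableFuture.orTimeout (Java 9+) instead of Reactor's timeout operator; ResponseTimeoutSketch is hypothetical, and in the real filter the timeout value comes from HttpClientProperties.getResponseTimeout().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical model: an upstream status that does not arrive in time becomes a 504.
class ResponseTimeoutSketch {
    static int statusFor(CompletableFuture<Integer> upstreamStatus, long timeoutMs) {
        return upstreamStatus
                .orTimeout(timeoutMs, TimeUnit.MILLISECONDS)
                .exceptionally(ex -> {
                    Throwable cause = ex instanceof CompletionException ? ex.getCause() : ex;
                    if (cause instanceof TimeoutException) {
                        return 504; // like onErrorMap(TimeoutException -> GATEWAY_TIMEOUT)
                    }
                    throw new CompletionException(cause); // other errors propagate unchanged
                })
                .join();
    }
}
```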

Load balancing

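The key step in the whole forwarding flow is choosing a service instance, and before an instance can be chosen, the load balancer has to be obtained.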

// RibbonLoadBalancerClient.java
protected ILoadBalancer getLoadBalancer(String serviceId) {
    return this.clientFactory.getLoadBalancer(serviceId);
}

// SpringClientFactory.java
public class SpringClientFactory extends NamedContextFactory<RibbonClientSpecification> {

    public ILoadBalancer getLoadBalancer(String name) {
        return (ILoadBalancer)this.getInstance(name, ILoadBalancer.class);
    }

    public <C> C getInstance(String name, Class<C> type) {
        C instance = super.getInstance(name, type);
        if (instance != null) {
            return instance;
        } else {
            IClientConfig config = (IClientConfig)this.getInstance(name, IClientConfig.class);
            return instantiateWithConfig(this.getContext(name), type, config);
        }
    }

    static <C> C instantiateWithConfig(AnnotationConfigApplicationContext context, Class<C> clazz, IClientConfig config) {
        Object result = null;

        try {
            Constructor<C> constructor = clazz.getConstructor(IClientConfig.class);
            result = constructor.newInstance(config);
        } catch (Throwable var5) {
        }

        if (result == null) {
            result = BeanUtils.instantiate(clazz);
            if (result instanceof IClientConfigAware) {
                ((IClientConfigAware)result).initWithNiwsConfig(config);
            }

            if (context != null) {
                context.getAutowireCapableBeanFactory().autowireBean(result);
            }
        }

        return result;
    }
}

// NamedContextFactory.java
public <T> T getInstance(String name, Class<T> type) {
    AnnotationConfigApplicationContext context = this.getContext(name);
    return BeanFactoryUtils.beanNamesForTypeIncludingAncestors(context, type).length > 0 ? context.getBean(type) : null;
}

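The source shows that the load balancer is first looked up by serviceId in that service's Spring context; if none is found, one is instantiated. The default load balancer is ZoneAwareLoadBalancer.
Once the load balancer is obtained, it takes the server list and chooses one node from it. The core of server selection is in BaseLoadBalancer: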

public Server chooseServer(Object key) {
    if (counter == null) {
        counter = createCounter();
    }
    counter.increment();
    if (rule == null) {
        return null;
    } else {
        try {
            return rule.choose(key);
        } catch (Exception e) {
            logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
            return null;
        }
    }
}
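Putting the two pieces above together: SpringClientFactory keeps one load balancer per service name and creates it on first use, and chooseServer() delegates to the rule, turning a missing or failing rule into null. A minimal model with computeIfAbsent standing in for the per-name child contexts; PerServiceLbSketch, LoadBalancer and Rule are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

class PerServiceLbSketch {
    // Hypothetical stand-in for IRule.choose(key).
    interface Rule {
        String choose(List<String> servers);
    }

    static final class LoadBalancer {
        private final List<String> allServers;
        private final Rule rule;

        LoadBalancer(List<String> allServers, Rule rule) {
            this.allServers = List.copyOf(allServers);
            this.rule = rule;
        }

        // Like BaseLoadBalancer.chooseServer: no rule, or a rule that throws, yields null.
        String chooseServer() {
            if (rule == null) {
                return null;
            }
            try {
                return rule.choose(allServers);
            } catch (RuntimeException e) {
                System.err.println("Error choosing server: " + e);
                return null;
            }
        }
    }

    private final Map<String, LoadBalancer> balancers = new ConcurrentHashMap<>();
    private final Function<String, LoadBalancer> factory;

    PerServiceLbSketch(Function<String, LoadBalancer> factory) {
        this.factory = factory;
    }

    // One load balancer per serviceId, created on first use and reused afterwards.
    LoadBalancer getLoadBalancer(String serviceId) {
        return balancers.computeIfAbsent(serviceId, factory);
    }
}
```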

Every ILoadBalancer holds an IRule, and ILoadBalancer.chooseServer ultimately calls IRule.choose; the default rule is ZoneAvoidanceRule.
Before the rule chooses a server, it first fetches all servers:

public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule {

    /**
     * Method that provides an instance of {@link AbstractServerPredicate} to be used by this class.
     *
     */
    public abstract AbstractServerPredicate getPredicate();

    /**
     * Get a server by calling {@link AbstractServerPredicate#chooseRandomlyAfterFiltering(java.util.List, Object)}.
     * The performance for this method is O(n) where n is number of servers to be filtered.
     */
    @Override
    public Server choose(Object key) {
        ILoadBalancer lb = getLoadBalancer();
        Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
        if (server.isPresent()) {
            return server.get();
        } else {
            return null;
        }
    }
}

public class BaseLoadBalancer extends AbstractLoadBalancer implements PrimeConnections.PrimeConnectionListener, IClientConfigAware {

    protected volatile List<Server> allServerList = Collections.synchronizedList(new ArrayList<Server>());

    @Override
    public List<Server> getAllServers() {
        return Collections.unmodifiableList(allServerList);
    }
}

public abstract class AbstractServerPredicate implements Predicate<PredicateKey> {

    private int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextIndex.get();
            int next = (current + 1) % modulo;
            if (nextIndex.compareAndSet(current, next) && current < modulo)
                return current;
        }
    }

    public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
        List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
        if (eligible.size() == 0) {
            return Optional.absent();
        }
        return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
    }
}
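The selection logic reads as "filter, then round-robin with a lock-free counter". The sketch below keeps incrementAndGetModulo() as in the source and replaces getEligibleServers() with a plain predicate (an assumption: ZoneAvoidanceRule's real predicates are not modeled). RoundRobinSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

class RoundRobinSketch {
    private final AtomicInteger nextIndex = new AtomicInteger();

    // Same CAS loop as AbstractServerPredicate: an index that is out of range for the
    // current list size (e.g. after the list shrank) is skipped rather than returned.
    private int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextIndex.get();
            int next = (current + 1) % modulo;
            if (nextIndex.compareAndSet(current, next) && current < modulo) {
                return current;
            }
        }
    }

    Optional<String> chooseRoundRobinAfterFiltering(List<String> servers, Predicate<String> eligible) {
        List<String> candidates = new ArrayList<>();
        for (String server : servers) {
            if (eligible.test(server)) {
                candidates.add(server);
            }
        }
        if (candidates.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(candidates.get(incrementAndGetModulo(candidates.size())));
    }
}
```

With servers a, b, c and b filtered out, three calls return a, c, a. If the counter has reached 2 and the list then shrinks to [p, q], the next call skips the stale index and returns q instead of failing.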

So the LoadBalancer stores the server list, and the IRule picks one node from it according to its own rules. As for how that list gets into the LoadBalancer, BaseLoadBalancer has a method that sets allServerList:

public void setServersList(List lsrv) {
    Lock writeLock = allServerLock.writeLock();
    logger.debug("LoadBalancer [{}]: clearing server list (SET op)", name);

    ArrayList<Server> newServers = new ArrayList<Server>();
    writeLock.lock();
    try {
        ArrayList<Server> allServers = new ArrayList<Server>();
        for (Object server : lsrv) {
            if (server == null) {
                continue;
            }

            if (server instanceof String) {
                server = new Server((String) server);
            }

            if (server instanceof Server) {
                logger.debug("LoadBalancer [{}]:  addServer [{}]", name, ((Server) server).getId());
                allServers.add((Server) server);
            } else {
                throw new IllegalArgumentException(
                        "Type String or Server expected, instead found:"
                                + server.getClass());
            }
        }

        boolean listChanged = false;
        if (!allServerList.equals(allServers)) {
            listChanged = true;
            if (changeListeners != null && changeListeners.size() > 0) {
                List<Server> oldList = ImmutableList.copyOf(allServerList);
                List<Server> newList = ImmutableList.copyOf(allServers);
                for (ServerListChangeListener l: changeListeners) {
                    try {
                        l.serverListChanged(oldList, newList);
                    } catch (Exception e) {
                        logger.error("LoadBalancer [{}]: Error invoking server list change listener", name, e);
                    }
                }
            }
        }

        if (isEnablePrimingConnections()) {
            for (Server server : allServers) {
                if (!allServerList.contains(server)) {
                    server.setReadyToServe(false);
                    newServers.add((Server) server);
                }
            }
            if (primeConnections != null) {
                primeConnections.primeConnectionsAsync(newServers, this);
            }
        }

        // This will reset readyToServe flag to true on all servers
        // regardless whether
        // previous priming connections are success or not
        allServerList = allServers;
        if (canSkipPing()) {
            for (Server s : allServerList) {
                s.setAlive(true);
            }
            upServerList = allServerList;
        } else if (listChanged) {
            forceQuickPing();
        }
    } finally {
        writeLock.unlock();
    }
}
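Stripped of priming and pinging, setServersList() builds a new list under the write lock, skips nulls, notifies change listeners only if the content differs, and then swaps the reference. A minimal model of that pattern; ServerListHolderSketch is hypothetical and uses plain strings for servers.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;

class ServerListHolderSketch {
    private final ReentrantReadWriteLock allServerLock = new ReentrantReadWriteLock();
    private volatile List<String> allServerList = Collections.emptyList();
    private final List<BiConsumer<List<String>, List<String>>> changeListeners = new CopyOnWriteArrayList<>();

    void addListener(BiConsumer<List<String>, List<String>> listener) {
        changeListeners.add(listener);
    }

    // The published list is unmodifiable and only ever replaced as a whole.
    List<String> getAllServers() {
        return allServerList;
    }

    void setServersList(List<String> servers) {
        Lock writeLock = allServerLock.writeLock();
        writeLock.lock(); // writers are serialized
        try {
            List<String> newList = new ArrayList<>();
            for (String server : servers) {
                if (server != null) { // null entries are skipped, as in the original
                    newList.add(server);
                }
            }
            if (!allServerList.equals(newList)) {
                List<String> oldList = allServerList;
                List<String> published = Collections.unmodifiableList(new ArrayList<>(newList));
                for (BiConsumer<List<String>, List<String>> listener : changeListeners) {
                    try {
                        listener.accept(oldList, published);
                    } catch (RuntimeException e) {
                        // the original logs the error and keeps notifying the other listeners
                        System.err.println("Error invoking server list change listener: " + e);
                    }
                }
            }
            allServerList = Collections.unmodifiableList(newList);
        } finally {
            writeLock.unlock();
        }
    }
}
```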

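The server list of ZoneAwareLoadBalancer is fetched by code in DynamicServerListLoadBalancer, which is a subclass of BaseLoadBalancer and the parent class of the default load balancer ZoneAwareLoadBalancer. When ZoneAwareLoadBalancer is constructed, the DynamicServerListLoadBalancer constructor calls restOfInit, which starts the scheduled ServerListUpdater.UpdateAction task through enableAndInitLearnNewServersFeature() and loads the list once with updateListOfServers():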

public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {

    protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
        @Override
        public void doUpdate() {
            updateListOfServers();
        }
    };

    public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
                                         ServerList<T> serverList, ServerListFilter<T> filter,
                                         ServerListUpdater serverListUpdater) {
        super(clientConfig, rule, ping);
        this.serverListImpl = serverList;
        this.filter = filter;
        this.serverListUpdater = serverListUpdater;
        if (filter instanceof AbstractServerListFilter) {
            ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
        }
        restOfInit(clientConfig);
    }

    void restOfInit(IClientConfig clientConfig) {
        boolean primeConnection = this.isEnablePrimingConnections();
        // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
        this.setEnablePrimingConnections(false);
        enableAndInitLearnNewServersFeature();

        updateListOfServers();
        if (primeConnection && this.getPrimeConnections() != null) {
            this.getPrimeConnections()
                    .primeConnections(getReachableServers());
        }
        this.setEnablePrimingConnections(primeConnection);
        LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
    }

    public void enableAndInitLearnNewServersFeature() {
        LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
        serverListUpdater.start(updateAction);
    }

    @VisibleForTesting
    public void updateListOfServers() {
        List<T> servers = new ArrayList<T>();
        if (serverListImpl != null) {
            servers = serverListImpl.getUpdatedListOfServers();
            LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), servers);

            if (filter != null) {
                servers = filter.getFilteredListOfServers(servers);
                LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
            }
        }
        updateAllServerList(servers);
    }
}

serverListUpdater is PollingServerListUpdater by default; its code shows that it starts a scheduled task that periodically calls updateAction:

private static long LISTOFSERVERS_CACHE_UPDATE_DELAY = 1000; // msecs;
private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000; // msecs;

@Override
public synchronized void start(final UpdateAction updateAction) {
    if (isActive.compareAndSet(false, true)) {
        final Runnable wrapperRunnable = new Runnable() {
            @Override
            public void run() {
                if (!isActive.get()) {
                    if (scheduledFuture != null) {
                        scheduledFuture.cancel(true);
                    }
                    return;
                }
                try {
                    updateAction.doUpdate();
                    lastUpdated = System.currentTimeMillis();
                } catch (Exception e) {
                    logger.warn("Failed one update cycle", e);
                }
            }
        };

        scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                wrapperRunnable,
                initialDelayMs,
                refreshIntervalMs,
                TimeUnit.MILLISECONDS
        );
    } else {
        logger.info("Already active, no-op");
    }
}
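The updater is a fixed-delay ScheduledExecutorService task guarded by an isActive flag. Two details matter here: with scheduleWithFixedDelay the interval is counted from the end of one update to the start of the next, and the try/catch is essential, because an exception escaping a periodic task suppresses all of its subsequent executions. A minimal sketch modeled on the code above; PollingUpdaterSketch and its stop() method are hypothetical.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class PollingUpdaterSketch {
    private final AtomicBoolean isActive = new AtomicBoolean(false);
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private volatile ScheduledFuture<?> scheduledFuture;
    volatile long lastUpdated;

    synchronized void start(Runnable updateAction, long initialDelayMs, long refreshIntervalMs) {
        if (!isActive.compareAndSet(false, true)) {
            return; // "Already active, no-op"
        }
        Runnable wrapper = () -> {
            try {
                updateAction.run();
                lastUpdated = System.currentTimeMillis();
            } catch (Exception e) {
                // Swallowing keeps the schedule alive; an escaping exception would cancel it.
                System.err.println("Failed one update cycle: " + e);
            }
        };
        scheduledFuture = executor.scheduleWithFixedDelay(
                wrapper, initialDelayMs, refreshIntervalMs, TimeUnit.MILLISECONDS);
    }

    synchronized void stop() {
        if (isActive.compareAndSet(true, false) && scheduledFuture != null) {
            scheduledFuture.cancel(true);
        }
        executor.shutdownNow();
    }
}
```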

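By default the task runs every 30 seconds, after an initial delay of 1 second; the refresh interval can be changed with the ribbon.ServerListRefreshInterval property (in milliseconds).
In updateListOfServers, serverListImpl.getUpdatedListOfServers() fetches the server list; by default serverListImpl (a ServerList) is a DomainExtractingServerList: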

public class DomainExtractingServerList implements ServerList<DiscoveryEnabledServer> {

    public DomainExtractingServerList(ServerList<DiscoveryEnabledServer> list,
            IClientConfig clientConfig, boolean approximateZoneFromHostname) {
        this.list = list;
        this.ribbon = RibbonProperties.from(clientConfig);
        this.approximateZoneFromHostname = approximateZoneFromHostname;
    }

    @Override
    public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
        List<DiscoveryEnabledServer> servers = setZones(this.list
                .getUpdatedListOfServers());
        return servers;
    }
}

DomainExtractingServerList wraps another ServerList, DiscoveryEnabledNIWSServerList, which is where the server list is ultimately fetched:

public class DiscoveryEnabledNIWSServerList extends AbstractServerList<DiscoveryEnabledServer> {

    private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
        List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();

        if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
            logger.warn("EurekaClient has not been initialized yet, returning an empty list");
            return new ArrayList<DiscoveryEnabledServer>();
        }

        EurekaClient eurekaClient = eurekaClientProvider.get();
        if (vipAddresses!=null){
            for (String vipAddress : vipAddresses.split(",")) {
                // if targetRegion is null, it will be interpreted as the same region of client
                List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
                for (InstanceInfo ii : listOfInstanceInfo) {
                    if (ii.getStatus().equals(InstanceStatus.UP)) {

                        if(shouldUseOverridePort){
                            if(logger.isDebugEnabled()){
                                logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
                            }

                            // copy is necessary since the InstanceInfo builder just uses the original reference,
                            // and we don't want to corrupt the global eureka copy of the object which may be
                            // used by other clients in our system
                            InstanceInfo copy = new InstanceInfo(ii);

                            if(isSecure){
                                ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
                            }else{
                                ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
                            }
                        }

                        DiscoveryEnabledServer des = createServer(ii, isSecure, shouldUseIpAddr);
                        serverList.add(des);
                    }
                }
                if (serverList.size()>0 && prioritizeVipAddressBasedServers){
                    break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
                }
            }
        }
        return serverList;
    }
}
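The status check in obtainServersViaDiscovery() keeps only UP instances. A minimal model; UpInstancesSketch, Status and Instance are hypothetical stand-ins for Eureka's InstanceInfo and InstanceStatus. The filter works only on the snapshot the Eureka client currently holds, so an instance that has stopped but is still UP in that snapshot keeps being returned until a later refresh, which matches problem 2.

```java
import java.util.ArrayList;
import java.util.List;

class UpInstancesSketch {
    // Mirrors the values of Eureka's InstanceInfo.InstanceStatus.
    enum Status { UP, DOWN, STARTING, OUT_OF_SERVICE, UNKNOWN }

    // Hypothetical stand-in for InstanceInfo: an address plus its registry status.
    static final class Instance {
        final String hostPort;
        final Status status;

        Instance(String hostPort, Status status) {
            this.hostPort = hostPort;
            this.status = status;
        }
    }

    // Like obtainServersViaDiscovery(): only UP instances become Ribbon servers.
    static List<String> serversFrom(List<Instance> registrySnapshot) {
        List<String> servers = new ArrayList<>();
        for (Instance ii : registrySnapshot) {
            if (ii.status == Status.UP) {
                servers.add(ii.hostPort);
            }
        }
        return servers;
    }
}
```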

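In the end, the node information comes from Eureka: Ribbon obtains its server list through the Eureka client, keeping only instances whose status is UP, caches the list locally, and refreshes it periodically (every 30 seconds by default). Note that every service has its own LoadBalancer, so allServerList holds all the nodes of a single service.
Spring Cloud Gateway's GatewayControllerEndpoint provides APIs for operating on routes and filters.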

Flow diagram

The suggested flow is as follows: