
Graceful restart of Spring Cloud projects, part 1: the problems and the gateway/ribbon flow

Problems

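The current project uses a springcloud-gateway + eureka + springboot architecture. Requests first pass through the gateway; the gateway looks up the addresses of the business service instances in the registry and then forwards the request to the business service's API. With this architecture, restarting the project runs into several problems: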

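  1. When a business service instance shuts down, it aborts the requests that are still in flight.
  2. After a business service instance has stopped, the gateway still forwards requests to it, so those requests fail.
  3. After a business service instance has restarted, the gateway does not start forwarding requests to it right away. If the project has only two instances and the second one is restarted as soon as the first has finished starting, the service becomes unavailable.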

To solve these problems, we first need to understand how gateway, eureka, ribbon, and Tomcat work, so that it is clear why the problems above occur.

Main flow

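Let's start at the gateway entry point. The springcloud-gateway official site illustrates it with a diagram:

[Figure: request flow diagram from the springcloud-gateway official site]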

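A key class is RoutePredicateHandlerMapping. It extends AbstractHandlerMapping and is a webflux handlermapping, playing the same role as a webmvc handlermapping: it maps a request to the handler that will process it. RoutePredicateHandlerMapping iterates over all routes (Route), takes the first one whose predicate matches, and stores that route as an attribute of the current request context.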

public class RoutePredicateHandlerMapping extends AbstractHandlerMapping {


    @Override
    protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
        // don't handle requests on the management port if set
        if (managmentPort != null && exchange.getRequest().getURI().getPort() == managmentPort.intValue()) {
            return Mono.empty();
        }
        exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());
        return lookupRoute(exchange)
                // .log("route-predicate-handler-mapping", Level.FINER) //name this
                .flatMap((Function<Route, Mono<?>>) r -> {
                    exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to" + r);
                    }

                    exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
                    return Mono.just(webHandler);
                }).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
                    exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
                    if (logger.isTraceEnabled()) {
                        logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
                    }
                })));
    }

    protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
        return this.routeLocator
                .getRoutes()
                //individually filter routes so that filterWhen error delaying is not a problem
                .concatMap(route -> Mono
                        .just(route)
                        .filterWhen(r -> {
                            // add the current route we are testing
                            exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
                            return r.getPredicate().apply(exchange);
                        })
                        //instead of immediately stopping main flux due to error, log and swallow it
                        .doOnError(e -> logger.error("Error applying predicate for route:"+route.getId(), e))
                        .onErrorResume(e -> Mono.empty())
                )
                // .defaultIfEmpty() put a static Route not found
                // or .switchIfEmpty()
                // .switchIfEmpty(Mono.<Route>empty().log("noroute"))
                .next()
                //TODO: error handling
                .map(route -> {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Route matched:" + route.getId());
                    }
                    validateRoute(route, exchange);
                    return route;
                });

        /* TODO: trace logging
            if (logger.isTraceEnabled()) {logger.trace("RouteDefinition did not match:" + routeDefinition.getId());
            }*/
    }

}
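
The "first matching route wins" behaviour of lookupRoute can be illustrated without Reactor. The following is only a minimal sketch: SimpleRoute and FirstMatchLookup are hypothetical stand-ins (not gateway classes), and a predicate on the request path replaces the real predicate on the exchange. A predicate that throws is treated as "no match", which is roughly what doOnError plus onErrorResume(e -> Mono.empty()) achieve above.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical stand-in for a gateway Route: an id plus a predicate on the request path.
final class SimpleRoute {
    final String id;
    final Predicate<String> predicate;

    SimpleRoute(String id, Predicate<String> predicate) {
        this.id = id;
        this.predicate = predicate;
    }
}

final class FirstMatchLookup {
    // Tests routes in order and returns the first one whose predicate accepts the path.
    static Optional<SimpleRoute> lookupRoute(List<SimpleRoute> routes, String path) {
        for (SimpleRoute route : routes) {
            try {
                if (route.predicate.test(path)) {
                    return Optional.of(route);
                }
            } catch (RuntimeException e) {
                // A failing predicate is swallowed so that the remaining routes are still tested.
            }
        }
        return Optional.empty();
    }
}
```

Because the first match wins, the order in which the route locator returns routes matters.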

routeLocator.getRoutes() shows that the route list comes from routeLocator, so let's look at how the routes are produced. One implementation of RouteLocator is RouteDefinitionRouteLocator:

public class RouteDefinitionRouteLocator implements RouteLocator, BeanFactoryAware, ApplicationEventPublisherAware {

    @Override
    public Flux<Route> getRoutes() {
        return this.routeDefinitionLocator.getRouteDefinitions()
                .map(this::convertToRoute)
                //TODO: error handling
                .map(route -> {
                    if (logger.isDebugEnabled()) {
                        logger.debug("RouteDefinition matched:" + route.getId());
                    }
                    return route;
                });
        /* TODO: trace logging
            if (logger.isTraceEnabled()) {
                logger.trace("RouteDefinition did not match:" + routeDefinition.getId());
            }*/
    }

}

RouteDefinitionRouteLocator gets its route list from a RouteDefinitionLocator. When the project is configured for discovery-based routing (spring.cloud.gateway.discovery.locator.enabled: true), the RouteDefinitionLocator implementation that supplies the routes is DiscoveryClientRouteDefinitionLocator.

public class DiscoveryClientRouteDefinitionLocator implements RouteDefinitionLocator {
    @Override
    public Flux<RouteDefinition> getRouteDefinitions() {
        SpelExpressionParser parser = new SpelExpressionParser();
        Expression includeExpr = parser.parseExpression(properties.getIncludeExpression());
        Expression urlExpr = parser.parseExpression(properties.getUrlExpression());

        Predicate<ServiceInstance> includePredicate;
        if (properties.getIncludeExpression() == null || "true".equalsIgnoreCase(properties.getIncludeExpression())) {
            includePredicate = instance -> true;
        } else {
            includePredicate = instance -> {
                Boolean include = includeExpr.getValue(evalCtxt, instance, Boolean.class);
                if (include == null) {
                    return false;
                }
                return include;
            };
        }

        return Flux.fromIterable(discoveryClient.getServices())
                .map(discoveryClient::getInstances)
                .filter(instances -> !instances.isEmpty())
                .map(instances -> instances.get(0))
                .filter(includePredicate)
                .map(instance -> {
                    String serviceId = instance.getServiceId();
                    RouteDefinition routeDefinition = new RouteDefinition();
                    routeDefinition.setId(this.routeIdPrefix + serviceId);
                    String uri = urlExpr.getValue(evalCtxt, instance, String.class);
                    routeDefinition.setUri(URI.create(uri));
                    final ServiceInstance instanceForEval = new DelegatingServiceInstance(instance, properties);
                    for (PredicateDefinition original : this.properties.getPredicates()) {
                        PredicateDefinition predicate = new PredicateDefinition();
                        predicate.setName(original.getName());
                        for (Map.Entry<String, String> entry : original.getArgs().entrySet()) {
                            String value = getValueFromExpr(evalCtxt, parser, instanceForEval, entry);
                            predicate.addArg(entry.getKey(), value);
                        }
                        routeDefinition.getPredicates().add(predicate);
                    }

                    for (FilterDefinition original : this.properties.getFilters()) {
                        FilterDefinition filter = new FilterDefinition();
                        filter.setName(original.getName());
                        for (Map.Entry<String, String> entry : original.getArgs().entrySet()) {
                            String value = getValueFromExpr(evalCtxt, parser, instanceForEval, entry);
                            filter.addArg(entry.getKey(), value);
                        }
                        routeDefinition.getFilters().add(filter);
                    }
                    return routeDefinition;
                });
    }
}

DiscoveryClientRouteDefinitionLocator gets the service list from Eureka (discoveryClient.getServices). For performance, springcloud-gateway wraps RouteDefinitionRouteLocator in a cache, CachingRouteLocator:

public class CachingRouteLocator implements RouteLocator, ApplicationListener<RefreshRoutesEvent> {

   private final RouteLocator delegate;
   private final Flux<Route> routes;
   private final Map<String, List> cache = new HashMap<>();

   public CachingRouteLocator(RouteLocator delegate) {
      this.delegate = delegate;
      routes = CacheFlux.lookup(cache, "routes", Route.class)
            .onCacheMissResume(() -> this.delegate.getRoutes().sort(AnnotationAwareOrderComparator.INSTANCE));
   }

   @Override
   public Flux<Route> getRoutes() {
      return this.routes;
   }

   /**
    * Clears the routes cache
    * @return routes flux
    */
   public Flux<Route> refresh() {
      this.cache.clear();
      return this.routes;
   }

   @Override
   public void onApplicationEvent(RefreshRoutesEvent event) {
      refresh();
   }

   @Deprecated
   /* for testing */ void handleRefresh() {
      refresh();
   }
}
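
The effect of this cache can be sketched in plain Java. CachingRouteList below is a hypothetical class, not part of the gateway, and it caches a list of route ids rather than a Flux; the point is only that readers keep seeing the old routes until refresh() is called.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch of a route cache that is only rebuilt after refresh().
final class CachingRouteList {
    private final Supplier<List<String>> delegate;
    private volatile List<String> cached; // null means the cache is empty

    CachingRouteList(Supplier<List<String>> delegate) {
        this.delegate = delegate;
    }

    // Returns the cached routes, loading them from the delegate on a cache miss.
    List<String> getRoutes() {
        List<String> local = cached;
        if (local == null) {
            local = Collections.unmodifiableList(new ArrayList<>(delegate.get()));
            cached = local;
        }
        return local;
    }

    // Clears the cache; the next getRoutes() call reloads from the delegate.
    void refresh() {
        cached = null;
    }
}
```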

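CachingRouteLocator refreshes its cache when it receives a RefreshRoutesEvent (GatewayWebfluxEndpoint exposes an HTTP API that publishes RefreshRoutesEvent through the ApplicationEventPublisher). RouteRefreshListener, in turn, publishes a RefreshRoutesEvent when it receives the service-registration event InstanceRegisteredEvent, so the cache is refreshed when a service registers. As the code below shows, it does the same for a HeartbeatEvent whose value has changed.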

public class RouteRefreshListener implements ApplicationListener<ApplicationEvent> {

   private HeartbeatMonitor monitor = new HeartbeatMonitor();
   private final ApplicationEventPublisher publisher;

   public RouteRefreshListener(ApplicationEventPublisher publisher) {
      Assert.notNull(publisher, "publisher may not be null");
      this.publisher = publisher;
   }

   @Override
   public void onApplicationEvent(ApplicationEvent event) {
      if (event instanceof ContextRefreshedEvent
            || event instanceof RefreshScopeRefreshedEvent
            || event instanceof InstanceRegisteredEvent) {
         reset();
      }
      else if (event instanceof ParentHeartbeatEvent) {
         ParentHeartbeatEvent e = (ParentHeartbeatEvent) event;
         resetIfNeeded(e.getValue());
      }
      else if (event instanceof HeartbeatEvent) {
         HeartbeatEvent e = (HeartbeatEvent) event;
         resetIfNeeded(e.getValue());
      }
   }

   private void resetIfNeeded(Object value) {
      if (this.monitor.update(value)) {
         reset();
      }
   }

   private void reset() {
      this.publisher.publishEvent(new RefreshRoutesEvent(this));
   }

}
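
resetIfNeeded only triggers a refresh when monitor.update(value) returns true. The HeartbeatMonitor source is not shown here, so the following is a sketch under the assumption that it reports "changed" only when the heartbeat value differs from the last one seen; ChangeDetector is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: remembers the last heartbeat value and reports changes.
final class ChangeDetector {
    private final AtomicReference<Object> latest = new AtomicReference<>();

    // Returns true only when value is non-null and differs from the last value seen.
    boolean update(Object value) {
        Object last = latest.get();
        if (value != null && !value.equals(last)) {
            return latest.compareAndSet(last, value);
        }
        return false;
    }
}
```

Under that assumption, repeated heartbeats carrying the same value would not refresh the route cache again.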

Next comes FilteringWebHandler. It takes the route's filter list, combines it with the global filters, sorts the result into a filter chain, and starts executing the chain.

public class FilteringWebHandler implements WebHandler {

    @Override
    public Mono<Void> handle(ServerWebExchange exchange) {
        Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
        List<GatewayFilter> gatewayFilters = route.getFilters();

        List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
        combined.addAll(gatewayFilters);
        //TODO: needed or cached?
        AnnotationAwareOrderComparator.sort(combined);
        if (logger.isDebugEnabled()) {
            logger.debug("Sorted gatewayFilterFactories:"+ combined);
        }
        return new DefaultGatewayFilterChain(combined).filter(exchange);
    }
}
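
The combine-then-sort step decides the order in which the filters discussed next will run. A minimal sketch with hypothetical classes (OrderedFilter, FilterChainSketch); the order values in the usage note are illustrative, except that NettyRoutingFilter is shown later to use Ordered.LOWEST_PRECEDENCE, assumed here to equal Integer.MAX_VALUE:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for a filter that carries an order value.
final class OrderedFilter {
    final String name;
    final int order;

    OrderedFilter(String name, int order) {
        this.name = name;
        this.order = order;
    }
}

final class FilterChainSketch {
    // Merges global and route filters, then sorts by ascending order (lower runs first).
    static List<String> executionOrder(List<OrderedFilter> globalFilters, List<OrderedFilter> routeFilters) {
        List<OrderedFilter> combined = new ArrayList<>(globalFilters);
        combined.addAll(routeFilters);
        combined.sort(Comparator.comparingInt((OrderedFilter f) -> f.order));
        List<String> names = new ArrayList<>();
        for (OrderedFilter filter : combined) {
            names.add(filter.name);
        }
        return names;
    }
}
```

With global filters NettyRouting (Integer.MAX_VALUE), LoadBalancerClient (10100), and RouteToRequestUrl (10000), plus one route filter of order 1, the route filter would run first and NettyRouting last.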

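Next is RouteToRequestUrlFilter, which builds the full load-balanced address. For example, if the route's target service is lb://MY-SERVICE and the request path is /hello/world, the resulting address is lb://MY-SERVICE/hello/world. The filter stores this address in the current request context and passes control to the next filter.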

public class RouteToRequestUrlFilter implements GlobalFilter, Ordered {
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
        if (route == null) {
            return chain.filter(exchange);
        }
        log.trace("RouteToRequestUrlFilter start");
        URI uri = exchange.getRequest().getURI();
        boolean encoded = containsEncodedParts(uri);
        URI routeUri = route.getUri();

        if (hasAnotherScheme(routeUri)) {
            // this is a special url, save scheme to special attribute
            // replace routeUri with schemeSpecificPart
            exchange.getAttributes().put(GATEWAY_SCHEME_PREFIX_ATTR, routeUri.getScheme());
            routeUri = URI.create(routeUri.getSchemeSpecificPart());
        }

        if("lb".equalsIgnoreCase(routeUri.getScheme()) && routeUri.getHost() == null) {
            //Load balanced URIs should always have a host. If the host is null it is most
            //likely because the host name was invalid (for example included an underscore)
            throw new IllegalStateException("Invalid host:" + routeUri.toString());
        }

        URI mergedUrl = UriComponentsBuilder.fromUri(uri)
                // .uri(routeUri)
                .scheme(routeUri.getScheme())
                .host(routeUri.getHost())
                .port(routeUri.getPort())
                .build(encoded)
                .toUri();

        exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, mergedUrl);
        return chain.filter(exchange);
    }

}
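
The merge performed by this filter can be reproduced with java.net.URI alone. This is a sketch, not the filter's implementation: RequestUrlMerge is a hypothetical helper that takes scheme, host, and port from the route URI and path and query from the incoming request. It also shows the case the source comment warns about, where a host name containing an underscore makes getHost() return null.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper that mimics the URI merge done by RouteToRequestUrlFilter.
final class RequestUrlMerge {
    static URI merge(URI requestUri, URI routeUri) {
        if ("lb".equalsIgnoreCase(routeUri.getScheme()) && routeUri.getHost() == null) {
            // e.g. lb://my_service: the underscore is not valid in a host name
            throw new IllegalStateException("Invalid host: " + routeUri);
        }
        try {
            // scheme, host, and port come from the route; path and query from the request
            return new URI(routeUri.getScheme(), null, routeUri.getHost(), routeUri.getPort(),
                    requestUri.getPath(), requestUri.getQuery(), null);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }
}
```

For a request to http://localhost:8080/hello/world and the route URI lb://MY-SERVICE, merge returns lb://MY-SERVICE/hello/world.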

Next is LoadBalancerClientFilter. It first reads the scheme: if it is not lb, the request is passed straight to the next filter; if it is lb, the filter chooses a service instance and builds the final forwarding address.

public class LoadBalancerClientFilter implements GlobalFilter, Ordered {

    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        URI url = (URI)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
        String schemePrefix = (String)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_SCHEME_PREFIX_ATTR);
        if (url != null && ("lb".equals(url.getScheme()) || "lb".equals(schemePrefix))) {
            ServerWebExchangeUtils.addOriginalRequestUrl(exchange, url);
            log.trace("LoadBalancerClientFilter url before:" + url);
            ServiceInstance instance = this.choose(exchange);
            if (instance == null) {
                String msg = "Unable to find instance for" + url.getHost();
                if (this.properties.isUse404()) {
                    throw new LoadBalancerClientFilter.FourOFourNotFoundException(msg);
                } else {
                    throw new NotFoundException(msg);
                }
            } else {
                URI uri = exchange.getRequest().getURI();
                String overrideScheme = instance.isSecure() ? "https" : "http";
                if (schemePrefix != null) {
                    overrideScheme = url.getScheme();
                }

                URI requestUrl = this.loadBalancer.reconstructURI(new LoadBalancerClientFilter.DelegatingServiceInstance(instance, overrideScheme), uri);
                log.trace("LoadBalancerClientFilter url chosen:" + requestUrl);
                exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR, requestUrl);
                return chain.filter(exchange);
            }
        } else {
            return chain.filter(exchange);
        }
    }

    protected ServiceInstance choose(ServerWebExchange exchange) {
        return this.loadBalancer.choose(((URI)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR)).getHost());
    }
}
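
What reconstructURI amounts to can be sketched as replacing the logical service name with the host and port of the chosen instance. ReconstructUri is a hypothetical helper, and the instance address in the usage note is made up for illustration; the real method takes a ServiceInstance rather than separate host and port arguments.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper: swaps the service name for a concrete instance address.
final class ReconstructUri {
    static URI reconstruct(URI original, String host, int port, boolean secure) {
        String scheme = secure ? "https" : "http";
        try {
            // keep path and query, replace scheme/host/port with the instance's values
            return new URI(scheme, null, host, port, original.getPath(), original.getQuery(), null);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }
}
```

For lb://MY-SERVICE/hello/world and a non-secure instance at 10.0.0.5:8081, the result is http://10.0.0.5:8081/hello/world, which is the kind of address NettyRoutingFilter then calls.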

Finally, the actual forwarding to the real address happens in NettyRoutingFilter.

public class NettyRoutingFilter implements GlobalFilter, Ordered {

    private final HttpClient httpClient;
    private final ObjectProvider<List<HttpHeadersFilter>> headersFilters;
    private final HttpClientProperties properties;

    public NettyRoutingFilter(HttpClient httpClient,
                              ObjectProvider<List<HttpHeadersFilter>> headersFilters,
                              HttpClientProperties properties) {
        this.httpClient = httpClient;
        this.headersFilters = headersFilters;
        this.properties = properties;
    }

    @Override
    public int getOrder() {
        return Ordered.LOWEST_PRECEDENCE;
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);

        String scheme = requestUrl.getScheme();
        if (isAlreadyRouted(exchange) || (!"http".equals(scheme) && !"https".equals(scheme))) {
            return chain.filter(exchange);
        }
        setAlreadyRouted(exchange);

        ServerHttpRequest request = exchange.getRequest();

        final HttpMethod method = HttpMethod.valueOf(request.getMethodValue());
        final String url = requestUrl.toString();

        HttpHeaders filtered = filterRequest(this.headersFilters.getIfAvailable(),
                exchange);

        final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
        filtered.forEach(httpHeaders::set);

        String transferEncoding = request.getHeaders().getFirst(HttpHeaders.TRANSFER_ENCODING);
        boolean chunkedTransfer = "chunked".equalsIgnoreCase(transferEncoding);

        boolean preserveHost = exchange.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false);
        // This is where the request is actually sent to the chosen instance
        Flux<HttpClientResponse> responseFlux = this.httpClient
                .chunkedTransfer(chunkedTransfer)
                .request(method)
                .uri(url)
                .send((req, nettyOutbound) -> {
                    req.headers(httpHeaders);

                    if (preserveHost) {
                        String host = request.getHeaders().getFirst(HttpHeaders.HOST);
                        req.header(HttpHeaders.HOST, host);
                    }
                    return nettyOutbound
                            .options(NettyPipeline.SendOptions::flushOnEach)
                            .send(request.getBody().map(dataBuffer ->
                                    ((NettyDataBuffer) dataBuffer).getNativeBuffer()));
                }).responseConnection((res, connection) -> {
                    ServerHttpResponse response = exchange.getResponse();
                    // put headers and status so filters can modify the response
                    HttpHeaders headers = new HttpHeaders();

                    res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue()));

                    String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);
                    if (StringUtils.hasLength(contentTypeValue)) {
                        exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR, contentTypeValue);
                    }

                    HttpStatus status = HttpStatus.resolve(res.status().code());
                    if (status != null) {
                        response.setStatusCode(status);
                    } else if (response instanceof AbstractServerHttpResponse) {
                        // https://jira.spring.io/browse/SPR-16748
                        ((AbstractServerHttpResponse) response).setStatusCodeValue(res.status().code());
                    } else {
                        throw new IllegalStateException("Unable to set status code on response:" + res.status().code() + "," + response.getClass());
                    }

                    // make sure headers filters run after setting status so it is available in response
                    HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(this.headersFilters.getIfAvailable(), headers, exchange, Type.RESPONSE);

                    if(!filteredResponseHeaders.containsKey(HttpHeaders.TRANSFER_ENCODING) &&
                            filteredResponseHeaders.containsKey(HttpHeaders.CONTENT_LENGTH)) {
                        //It is not valid to have both the transfer-encoding header and the content-length header
                        //remove the transfer-encoding header in the response if the content-length header is presen
                        response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING);
                    }

                    exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES, filteredResponseHeaders.keySet());

                    response.getHeaders().putAll(filteredResponseHeaders);

                    // Defer committing the response until all route filters have run
                    // Put client response as ServerWebExchange attribute and write response later NettyWriteResponseFilter
                    exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
                    exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);

                    return Mono.just(res);
                });

        if (properties.getResponseTimeout() != null) {
            responseFlux = responseFlux.timeout(properties.getResponseTimeout(),
                    Mono.error(new TimeoutException("Response took longer than timeout:" +
                            properties.getResponseTimeout())))
                    .onErrorMap(TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, null, th));
        }

        return responseFlux.then(chain.filter(exchange));
    }

}

Load balancing

The crux of the whole forwarding flow is choosing a service instance. Before an instance can be chosen, the load balancer has to be obtained.

// RibbonLoadBalancerClient.java
protected ILoadBalancer getLoadBalancer(String serviceId) {
    return this.clientFactory.getLoadBalancer(serviceId);
}
// SpringClientFactory.java
public class SpringClientFactory extends NamedContextFactory<RibbonClientSpecification> {

    public ILoadBalancer getLoadBalancer(String name) {
        return (ILoadBalancer)this.getInstance(name, ILoadBalancer.class);
    }

    public <C> C getInstance(String name, Class<C> type) {
        C instance = super.getInstance(name, type);
        if (instance != null) {
            return instance;
        } else {
            IClientConfig config = (IClientConfig)this.getInstance(name, IClientConfig.class);
            return instantiateWithConfig(this.getContext(name), type, config);
        }
    }

    static <C> C instantiateWithConfig(AnnotationConfigApplicationContext context, Class<C> clazz, IClientConfig config) {
        Object result = null;

        try {
            Constructor<C> constructor = clazz.getConstructor(IClientConfig.class);
            result = constructor.newInstance(config);
        } catch (Throwable var5) { }

        if (result == null) {
            result = BeanUtils.instantiate(clazz);
            if (result instanceof IClientConfigAware) {
                ((IClientConfigAware)result).initWithNiwsConfig(config);
            }

            if (context != null) {
                context.getAutowireCapableBeanFactory().autowireBean(result);
            }
        }

        return result;
    }
}
// NamedContextFactory.java
public <T> T getInstance(String name, Class<T> type) {
    AnnotationConfigApplicationContext context = this.getContext(name);
    return BeanFactoryUtils.beanNamesForTypeIncludingAncestors(context, type).length > 0 ? context.getBean(type) : null;
}

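The source shows that the load balancer is first looked up in the spring-context by serviceId; if none is found, one is instantiated. The default load balancer is ZoneAwareLoadBalancer.
Once the load balancer has been obtained, the next step is to get the server list and choose one node from it. The core of the selection is in BaseLoadBalancer: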

public Server chooseServer(Object key) {
    if (counter == null) {
        counter = createCounter();
    }
    counter.increment();
    if (rule == null) {
        return null;
    } else {
        try {
            return rule.choose(key);
        } catch (Exception e) {
            logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
            return null;
        }
    }
}

Every ILoadBalancer holds an IRule object, and ILoadBalancer.chooseServer ends up calling IRule.choose; the default rule is ZoneAvoidanceRule.
Before the rule can choose a server, it first has to get all the servers.

public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule {
   
    /**
     * Method that provides an instance of {@link AbstractServerPredicate} to be used by this class.
     * 
     */
    public abstract AbstractServerPredicate getPredicate();
        
    /**
     * Get a server by calling {@link AbstractServerPredicate#chooseRandomlyAfterFiltering(java.util.List, Object)}.
     * The performance for this method is O(n) where n is number of servers to be filtered.
     */
    @Override
    public Server choose(Object key) {
        ILoadBalancer lb = getLoadBalancer();
        Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
        if (server.isPresent()) {
            return server.get();
        } else {
            return null;
        }
    }
}

public class BaseLoadBalancer extends AbstractLoadBalancer implements PrimeConnections.PrimeConnectionListener, IClientConfigAware {

    protected volatile List<Server> allServerList = Collections.synchronizedList(new ArrayList<Server>());

    @Override
    public List<Server> getAllServers() {
        return Collections.unmodifiableList(allServerList);
    }
}

public abstract class AbstractServerPredicate implements Predicate<PredicateKey> {

    private int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextIndex.get();
            int next = (current + 1) % modulo;
            if (nextIndex.compareAndSet(current, next) && current < modulo)
                return current;
        }
    }

    public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
        List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
        if (eligible.size() == 0) {
            return Optional.absent();
        }
        return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
    }
}
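
The round-robin selection above can be tried in isolation. RoundRobinChooser is a hypothetical class that copies the incrementAndGetModulo loop and uses plain strings instead of Server objects; it is a sketch of the technique, not Ribbon's class.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of lock-free round-robin selection over the eligible servers.
final class RoundRobinChooser {
    private final AtomicInteger nextIndex = new AtomicInteger(0);

    // Returns the next server in rotation, or null when the list is empty.
    String choose(List<String> eligible) {
        if (eligible.isEmpty()) {
            return null;
        }
        return eligible.get(incrementAndGetModulo(eligible.size()));
    }

    // Advances the shared index with compare-and-set and returns the previous value.
    // The "current < modulo" check retries when the list shrank since the last call.
    private int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextIndex.get();
            int next = (current + 1) % modulo;
            if (nextIndex.compareAndSet(current, next) && current < modulo) {
                return current;
            }
        }
    }
}
```

The rule only rotates over whatever list it is handed, so a stopped instance keeps receiving its share of requests for as long as it remains in that list.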

As shown, the LoadBalancer holds the server list, and the IRule picks one node from it according to its own rule. As for how the list held by the LoadBalancer is populated, BaseLoadBalancer has the method that sets allServerList:

public void setServersList(List lsrv) {
    Lock writeLock = allServerLock.writeLock();
    logger.debug("LoadBalancer [{}]: clearing server list (SET op)", name);

    ArrayList<Server> newServers = new ArrayList<Server>();
    writeLock.lock();
    try {
        ArrayList<Server> allServers = new ArrayList<Server>();
        for (Object server : lsrv) {
            if (server == null) {
                continue;
            }

            if (server instanceof String) {
                server = new Server((String) server);
            }

            if (server instanceof Server) {
                logger.debug("LoadBalancer [{}]:  addServer [{}]", name, ((Server) server).getId());
                allServers.add((Server) server);
            } else {
                throw new IllegalArgumentException(
                        "Type String or Server expected, instead found:"
                                + server.getClass());
            }

        }
        boolean listChanged = false;
        if (!allServerList.equals(allServers)) {
            listChanged = true;
            if (changeListeners != null && changeListeners.size() > 0) {
                List<Server> oldList = ImmutableList.copyOf(allServerList);
                List<Server> newList = ImmutableList.copyOf(allServers);
                for (ServerListChangeListener l : changeListeners) {
                    try {
                        l.serverListChanged(oldList, newList);
                    } catch (Exception e) {
                        logger.error("LoadBalancer [{}]: Error invoking server list change listener", name, e);
                    }
                }
            }
        }
        if (isEnablePrimingConnections()) {
            for (Server server : allServers) {
                if (!allServerList.contains(server)) {
                    server.setReadyToServe(false);
                    newServers.add((Server) server);
                }
            }
            if (primeConnections != null) {
                primeConnections.primeConnectionsAsync(newServers, this);
            }
        }
        // This will reset readyToServe flag to true on all servers
        // regardless whether
        // previous priming connections are success or not
        allServerList = allServers;
        if (canSkipPing()) {
            for (Server s : allServerList) {
                s.setAlive(true);
            }
            upServerList = allServerList;
        } else if (listChanged) {
            forceQuickPing();
        }
    } finally {
        writeLock.unlock();
    }
}

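The entry point for fetching the server list in ZoneAwareLoadBalancer is in DynamicServerListLoadBalancer (a subclass of BaseLoadBalancer and the parent class of the default load balancer ZoneAwareLoadBalancer). Constructing a ZoneAwareLoadBalancer runs this parent constructor, which starts the ServerListUpdater.UpdateAction scheduled task.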

public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {

    protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
        @Override
        public void doUpdate() {
            updateListOfServers();
        }
    };

    public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
                                         ServerList<T> serverList, ServerListFilter<T> filter,
                                         ServerListUpdater serverListUpdater) {
        super(clientConfig, rule, ping);
        this.serverListImpl = serverList;
        this.filter = filter;
        this.serverListUpdater = serverListUpdater;
        if (filter instanceof AbstractServerListFilter) {
            ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
        }
        restOfInit(clientConfig);
    }

    void restOfInit(IClientConfig clientConfig) {
        boolean primeConnection = this.isEnablePrimingConnections();
        // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
        this.setEnablePrimingConnections(false);
        enableAndInitLearnNewServersFeature();

        updateListOfServers();
        if (primeConnection && this.getPrimeConnections() != null) {
            this.getPrimeConnections()
                    .primeConnections(getReachableServers());
        }
        this.setEnablePrimingConnections(primeConnection);
        LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
    }

    public void enableAndInitLearnNewServersFeature() {
        LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
        serverListUpdater.start(updateAction);
    }

    @VisibleForTesting
    public void updateListOfServers() {
        List<T> servers = new ArrayList<T>();
        if (serverListImpl != null) {
            servers = serverListImpl.getUpdatedListOfServers();
            LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), servers);

            if (filter != null) {
                servers = filter.getFilteredListOfServers(servers);
                LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
            }
        }
        updateAllServerList(servers);
    }
}

serverListUpdater defaults to PollingServerListUpdater; its code starts a scheduled task that calls updateAction periodically.

private static long LISTOFSERVERS_CACHE_UPDATE_DELAY = 1000; // msecs;
private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000; // msecs;
@Override
public synchronized void start(final UpdateAction updateAction) {
    if (isActive.compareAndSet(false, true)) {
        final Runnable wrapperRunnable = new Runnable() {
            @Override
            public void run() {
                if (!isActive.get()) {
                    if (scheduledFuture != null) {
                        scheduledFuture.cancel(true);
                    }
                    return;
                }
                try {
                    updateAction.doUpdate();
                    lastUpdated = System.currentTimeMillis();
                } catch (Exception e) {
                    logger.warn("Failed one update cycle", e);
                }
            }
        };

        scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                wrapperRunnable,
                initialDelayMs,
                refreshIntervalMs,
                TimeUnit.MILLISECONDS
        );
    } else {
        logger.info("Already active, no-op");
    }
}
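
Polling explains problems 2 and 3 from the beginning of the article: between two polls the load balancer works from a snapshot. The sketch below uses a hypothetical PolledServerList with a registry modelled as a Supplier; the start method mirrors the scheduleWithFixedDelay call above, while doUpdate can be called directly to observe the staleness.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical sketch of a server list that is only refreshed by polling.
final class PolledServerList {
    private final Supplier<List<String>> registry;
    private volatile List<String> servers = Collections.emptyList();

    PolledServerList(Supplier<List<String>> registry) {
        this.registry = registry;
    }

    // One update cycle: replace the local snapshot with the registry's current view.
    void doUpdate() {
        servers = Collections.unmodifiableList(new ArrayList<>(registry.get()));
    }

    // What the load-balancing rule sees; unchanged between two update cycles.
    List<String> getAllServers() {
        return servers;
    }

    // Schedules doUpdate with a fixed delay, swallowing failures of a single cycle.
    ScheduledFuture<?> start(ScheduledExecutorService executor, long initialDelayMs, long intervalMs) {
        return executor.scheduleWithFixedDelay(() -> {
            try {
                doUpdate();
            } catch (RuntimeException e) {
                // a failed cycle is skipped; the next one runs after the delay
            }
        }, initialDelayMs, intervalMs, TimeUnit.MILLISECONDS);
    }
}
```

If an instance disappears from the registry right after a poll, getAllServers keeps returning it until the next doUpdate; likewise a newly started instance is not visible until then.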

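By default the scheduled task runs every 30 seconds; the refresh interval can be adjusted with the ribbon.ServerListRefreshInterval parameter.
serverListImpl.getUpdatedListOfServers() in updateListOfServers fetches the server list; here the ServerList serverListImpl defaults to DomainExtractingServerList: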

public class DomainExtractingServerList implements ServerList<DiscoveryEnabledServer> {

    public DomainExtractingServerList(ServerList<DiscoveryEnabledServer> list,
            IClientConfig clientConfig, boolean approximateZoneFromHostname) {
        this.list = list;
        this.ribbon = RibbonProperties.from(clientConfig);
        this.approximateZoneFromHostname = approximateZoneFromHostname;
    }
    
    @Override
    public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
        List<DiscoveryEnabledServer> servers = setZones(this.list
                .getUpdatedListOfServers());
        return servers;
    }
}

DomainExtractingServerList wraps another ServerList, DiscoveryEnabledNIWSServerList; the server list is ultimately fetched in DiscoveryEnabledNIWSServerList.

public class DiscoveryEnabledNIWSServerList extends AbstractServerList<DiscoveryEnabledServer>{

    private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
        List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();

        if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
            logger.warn("EurekaClient has not been initialized yet, returning an empty list");
            return new ArrayList<DiscoveryEnabledServer>();
        }

        EurekaClient eurekaClient = eurekaClientProvider.get();
        if (vipAddresses!=null){
            for (String vipAddress : vipAddresses.split(",")) {
                // if targetRegion is null, it will be interpreted as the same region of client
                List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
                for (InstanceInfo ii : listOfInstanceInfo) {
                    // only instances whose status is UP are added to the list
                    if (ii.getStatus().equals(InstanceStatus.UP)) {
                        if(shouldUseOverridePort){
                            if(logger.isDebugEnabled()){
                                logger.debug("Overriding port on client name:" + clientName + "to" + overridePort);
                            }

                            // copy is necessary since the InstanceInfo builder just uses the original reference,
                            // and we don't want to corrupt the global eureka copy of the object which may be
                            // used by other clients in our system
                            InstanceInfo copy = new InstanceInfo(ii);

                            if(isSecure){
                                ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
                            }else{
                                ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
                            }
                        }

                        DiscoveryEnabledServer des = createServer(ii, isSecure, shouldUseIpAddr);
                        serverList.add(des);
                    }
                }
                if (serverList.size()>0 && prioritizeVipAddressBasedServers){
                    break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
                }
            }
        }
        return serverList;
    }
}
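
Note the status check in the loop above: only instances whose Eureka status is UP make it into the server list. A minimal sketch of that filter, with hypothetical classes (InstanceSketch, UpOnlyFilter) and status names written as plain strings:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an Eureka InstanceInfo: an address plus a status name.
final class InstanceSketch {
    final String hostPort;
    final String status;

    InstanceSketch(String hostPort, String status) {
        this.hostPort = hostPort;
        this.status = status;
    }
}

final class UpOnlyFilter {
    // Keeps only the instances whose status is UP, preserving their order.
    static List<String> upServers(List<InstanceSketch> instances) {
        List<String> result = new ArrayList<>();
        for (InstanceSketch instance : instances) {
            if ("UP".equals(instance.status)) {
                result.add(instance.hostPort);
            }
        }
        return result;
    }
}
```

An instance reported as OUT_OF_SERVICE or STARTING is therefore skipped, but only from the next refresh of the locally cached list onward.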

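In the end the node information comes from Eureka; that is, Ribbon gets the server list through Eureka, caches it locally, and refreshes it periodically (every 30 seconds by default). Note that each service has its own LoadBalancer, so allServerList holds the nodes of a single service only.
springcloud-gateway's GatewayControllerEndpoint provides an API for operating on routes and filters.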

Flow chart

The suggested flow is as follows:

[Figure: flow chart of the suggested flow]
