关于java:手撕Gateway源码今日撕工作流程负载均衡源码

32次阅读

共计 9218 个字符,预计需要花费 24 分钟才能阅读完成。

Spring Cloud Gateway 源码分析

通过后面的学习,咱们晓得 SpringCloud Gateway 是一个微服务网关,次要实现不同性能服务路由,对于 SpringCloud Gateway 的实战应用咱们就告一段落,咱们接下来深刻学习 SpringCloud Gateway 源码。

2.1 Gateway 工作流程源码分析

2.1.1 Gateway 工作流程剖析

后面咱们曾经学习过 Gateway 的工作流程,如上工作流程图,咱们回顾一下工作流程:

1: 所有都将由 ReactorHttpHandlerAdapter.apply()办法拦挡解决,此时会封装申请对象和响应对象,并传递到 HttpWebHandlerAdapter.handle()办法。2:HttpWebHandlerAdapter.handle(),将 request 和 response 封装成上下文对象 ServerWebExchange,办法通过 getDelegate()获取全局异样处理器 ExceptionHandlingWebHandler 执行全局异样解决

3:ExceptionHandlingWebHandler 执行实现后,调用 DispatcherHandler.handle(), 循环所有 handlerMappings 查找解决以后申请的 Handler

4: 找到 Handler 后调用 DispatcherHandler.invokeHandler()执行找到的 Handler,此时会调用 FilteringWebHandler.handle()

5:DefaultGatewayFilterChain.filter()是要害流程,所有过滤器都会在这里执行,比方服务查找、负载平衡、近程调用等,都在这一块。

下面工作流程咱们都是基于说的层面,接下来咱们一层一层剖析 Gateway 源码,深刻学习 Gateway。

2.1.2 Gateway 工作流程源码

咱们首先来看一下 Gateway 拦挡解决所有申请的办法 handle():

/****
* 解决所有申请
****/
@Override
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {if (this.forwardedHeaderTransformer != null) {request = this.forwardedHeaderTransformer.apply(request);
    }
    // 创立网关上下文对象
    ServerWebExchange exchange = createExchange(request, response);

    LogFormatUtils.traceDebug(logger, traceOn ->
            exchange.getLogPrefix() + formatRequest(exchange.getRequest()) +
                    (traceOn ? ", headers=" + formatHeaders(exchange.getRequest().getHeaders()) : ""));

    //getDelegate()获取以后的 Handler
    return getDelegate().handle(exchange)
            .doOnSuccess(aVoid -> logResponse(exchange))
            .onErrorResume(ex -> handleUnresolvedError(exchange, ex))
            .then(Mono.defer(response::setComplete));
}

下面 getDelegate()办法源码如下:

/**
* Return the wrapped delegate.
* 返回 WebHandler:解决 web 申请的对象
*/
public WebHandler getDelegate() {return this.delegate;}

咱们进行 Debug 测试如下:

以后返回的 WebHandler 是 ExceptionHandlingWebHandler,而ExceptionHandlingWebHandler 的 delegate 是 FilteringWebHandler,而FilteringWebHandler 的 delegate 是 delegateDispatcherHandler,所有的 delegate 的 handle() 办法都会顺次执行,咱们能够把断点放到 DispatcherHandler.handler() 办法上:

handler()办法会调用所有 handlerMappings 的 getHandler(exchange) 办法,而 getHandler(exchange) 办法会调用 getHandlerInternal(exchange) 办法:

getHandlerInternal(exchange)该办法由各个 HandlerMapping 自行实现,咱们能够察看下断言解决的 RoutePredicateHandlerMappinggetHandlerInternal(exchange)办法会调用 lookupRoute 办法,该办法用于返回对应的路由信息:

这里的路由匹配其实就是咱们我的项目中对应路由配置的一个一个服务的信息,这些服务信息能够帮咱们找到咱们要调用的实在服务:

每个 Route 对象如下:

Route 的 DEBUG 数据如下:

找到对应 Route 后会返回指定的 FilterWebHandler,如下代码:

FilterWebHandler 次要蕴含了所有的过滤器,过滤器依照肯定程序排序,次要是 order 值,越小越靠前排,过滤器中次要将申请交给指定实在服务解决了,debug 测试如下:

这里有 RouteToRequestUrlFilterForwardRoutingFilter以及 LoadBalancerClientFilter 等多个过滤器。

2.1.3 申请解决

在下面 FilterWebHandler 中有 2 个过滤器,别离为 RouteToRequestUrlFilterForwardRoutingFilter

RouteToRequestUrlFilter:用于依据 匹配 的 Route, 计算申请地址失去 lb://hailtaxi-order/order/list

ForwardRoutingFilter: 转发路由网关过滤器。其依据 forward:// 前缀 (Scheme) 过滤解决,将申请转发到以后网关实例本地接口。

2.1.3.1 RouteToRequestUrlFilter 实在服务查找

RouteToRequestUrlFilter 源码如下:

/***
 * 解决 uri 过滤器
 * @param exchange
 * @param chain
 * @return
 */
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    // 获取以后的 route
    Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
    if (route == null) {return chain.filter(exchange);
    }
    log.trace("RouteToRequestUrlFilter start");
    // 失去 uri  = http://localhost:8001/order/list?token=123
    URI uri = exchange.getRequest().getURI();
    boolean encoded = containsEncodedParts(uri);
    URI routeUri = route.getUri();

    if (hasAnotherScheme(routeUri)) {
        // this is a special url, save scheme to special attribute
        // replace routeUri with schemeSpecificPart
        exchange.getAttributes().put(GATEWAY_SCHEME_PREFIX_ATTR,
                routeUri.getScheme());
        routeUri = URI.create(routeUri.getSchemeSpecificPart());
    }

    if ("lb".equalsIgnoreCase(routeUri.getScheme()) && routeUri.getHost() == null) {
        // Load balanced URIs should always have a host. If the host is null it is
        // most
        // likely because the host name was invalid (for example included an
        // underscore)
        throw new IllegalStateException("Invalid host:" + routeUri.toString());
    }

    // 将 uri 换成 lb://hailtaxi-order/order/list?token=123 
    URI mergedUrl = UriComponentsBuilder.fromUri(uri)
            // .uri(routeUri)
            .scheme(routeUri.getScheme()).host(routeUri.getHost())
            .port(routeUri.getPort()).build(encoded).toUri();
    exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, mergedUrl);
    return chain.filter(exchange);
}

debug 调试后果如下:

从下面调试后果咱们能够看到所抉择的 Route 以及 uri 和 routeUri 和 mergedUrl,该过滤器其实就是将用户申请的地址换成服务地址,换成服务地址能够用来做负载平衡。

2.1.3.2 NettyRoutingFilter 近程调用

SpringCloud 在实现对后端服务近程调用是基于 Netty 发送 Http 申请实现,外围代码在 NettyRoutingFilter.filter() 中,其中外围代码为 send()办法,代码如下:

Flux<HttpClientResponse> responseFlux = httpClientWithTimeoutFrom(route)
        // 头信息处理
        .headers(headers -> {headers.add(httpHeaders);
            // Will either be set below, or later by Netty
            headers.remove(HttpHeaders.HOST);
            if (preserveHost) {String host = request.getHeaders().getFirst(HttpHeaders.HOST);
                headers.add(HttpHeaders.HOST, host);
            }
            // 执行发送,基于 HTTP 协定
        }).request(method).uri(url).send((req, nettyOutbound) -> {if (log.isTraceEnabled()) {
                nettyOutbound
                        .withConnection(connection -> log.trace("outbound route:"
                                + connection.channel().id().asShortText()
                                + ", inbound:" + exchange.getLogPrefix()));
            }
            return nettyOutbound.send(request.getBody()
                    .map(dataBuffer -> ((NettyDataBuffer) dataBuffer)
                            .getNativeBuffer()));
        }).
        // 响应后果
        responseConnection((res, connection) -> {

            // Defer committing the response until all route filters have run
            // Put client response as ServerWebExchange attribute and write
            // response later NettyWriteResponseFilter
            exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
            exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);
            // 获取响应后果
            ServerHttpResponse response = exchange.getResponse();
            // put headers and status so filters can modify the response
            HttpHeaders headers = new HttpHeaders();

            res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue()));

            String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);
            if (StringUtils.hasLength(contentTypeValue)) {exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR,
                        contentTypeValue);
            }

            setResponseStatus(res, response);

            // make sure headers filters run after setting status so it is
            // available in response
            HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(getHeadersFilters(), headers, exchange, Type.RESPONSE);

            if (!filteredResponseHeaders
                    .containsKey(HttpHeaders.TRANSFER_ENCODING)
                    && filteredResponseHeaders
                            .containsKey(HttpHeaders.CONTENT_LENGTH)) {
                // It is not valid to have both the transfer-encoding header and
                // the content-length header.
                // Remove the transfer-encoding header in the response if the
                // content-length header is present.
                response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING);
            }

            exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES,
                    filteredResponseHeaders.keySet());

            response.getHeaders().putAll(filteredResponseHeaders);

            return Mono.just(res);
        });

Duration responseTimeout = getResponseTimeout(route);

下面 send 办法最终会调用 ChannelOperations>send() 办法,而该办法其实是基于了 Netty 实现数据发送,外围代码如下:

2.1.3.3 Netty 个性

Netty 是一款基于 NIO(Nonblocking I/O,非阻塞 IO)开发的网络通信框架,他的并发性能失去了很大进步,比照于 BIO(Blocking I/O,阻塞 IO),暗藏其背地的复杂性而提供一个易于应用的 API 的客户端 / 服务器框架。Netty 是一个宽泛应用的 Java 网络编程框架。

传输极快

Netty 的传输快其实也是依赖了 NIO 的一个个性——零拷贝。咱们晓得,Java 的内存有堆内存、栈内存和字符串常量池等等,其中堆内存是占用内存空间最大的一块,也是 Java 对象寄存的中央,个别咱们的数据如果须要从 IO 读取到堆内存,两头须要通过 Socket 缓冲区,也就是说一个数据会被拷贝两次能力达到他的的起点,如果数据量大,就会造成不必要的资源节约。
Netty 针对这种状况,应用了 NIO 中的另一大个性——零拷贝,当他须要接收数据的时候,他会在堆内存之外开拓一块内存,数据就间接从 IO 读到了那块内存中去,在 netty 外面通过 ByteBuf 能够间接对这些数据进行间接操作,从而放慢了传输速度。

良好的封装

Netty 无论是性能还是封装性都远远超过传统 Socket 编程。

Channel: 示意一个连贯,能够了解为每一个申请,就是一个 Channel。

ChannelHandler: 外围解决业务就在这里,用于解决业务申请。

ChannelHandlerContext: 用于传输业务数据。

ChannelPipeline: 用于保留处理过程须要用到的 ChannelHandler 和 ChannelHandlerContext。

ByteBuf 是一个存储字节的容器,最大特点就是使用方便,它既有本人的读索引和写索引,不便你对整段字节缓存进行读写,也反对 get/set,不便你对其中每一个字节进行读写,他的数据结构如下图所示:

2.2 Gateway 负载平衡源码分析

后面源码分析次要分析了 Gateway 的工作流程,咱们接下来分析 Gateway 的负载平衡流程。在最初的过滤器汇合中有 LoadBalancerClientFilter 过滤器,该过滤器是用于实现负载平衡。

2.2.1 地址转换

LoadBalancerClientFilter过滤器首先会将用户申请地址转换成实在服务地址,也就是 IP: 端口号,源码如下:

/***
 * 负载平衡过滤
 * @param exchange
 * @param chain
 * @return
 */
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    // 负载平衡的 URL = lb://hailtaxi-order/order/list?token=123
    URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
    String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
    if (url == null
            || (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {return chain.filter(exchange);
    }
    // preserve the original url
    addOriginalRequestUrl(exchange, url);

    if (log.isTraceEnabled()) {log.trace("LoadBalancerClientFilter url before:" + url);
    }
    
    // 服务抉择
    final ServiceInstance instance = choose(exchange);

    if (instance == null) {throw NotFoundException.create(properties.isUse404(),
                "Unable to find instance for" + url.getHost());
    }
    // 用户提交的 URI = http://localhost:8001/order/list?token=123
    URI uri = exchange.getRequest().getURI();

    // if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,
    // if the loadbalancer doesn't provide one.
    String overrideScheme = instance.isSecure() ? "https" : "http";
    if (schemePrefix != null) {overrideScheme = url.getScheme();
    }
    // 实在服务的 URL =http://192.168.211.1:18182/order/list?token=123
    URI requestUrl = loadBalancer.reconstructURI(new DelegatingServiceInstance(instance, overrideScheme), uri);

    if (log.isTraceEnabled()) {log.trace("LoadBalancerClientFilter url chosen:" + requestUrl);
    }

    exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
    return chain.filter(exchange);
}

2.2.2 负载平衡服务抉择

下面代码的要害是 choose(exchange) 的调用,该办法调用其实就是抉择指定服务,这里波及到负载平衡服务轮询调用算法等,咱们能够跟踪进去查看办法执行流程。

Gateway 本身曾经集成 Ribbon,所以看到的对象是 RibbonLoadBalancerClient,咱们跟踪进去接着查看:

下面办法会顺次调用到 getInstance()办法,该办法会返回所有可用实例,有可能有多个实例,如果有多个实例就波及到负载平衡算法,办法调用如下图:

此时调用 getServer()办法,再调用BaseLoadBalancer.chooseServer(),这里是依据指定算法获取对应实例,代码如下:

BaseLoadBalancer是属于 Ribbon 的算法,咱们能够通过如下依赖包理解,并且该算法默认用的是RoundRobinRule,也就是随机算法,如下代码:

本文由传智教育博学谷 – 狂野架构师教研团队公布,转载请注明出处!

如果本文对您有帮忙,欢送关注和点赞;如果您有任何倡议也可留言评论或私信,您的反对是我保持创作的能源

正文完
 0