Spring Cloud Gateway源码分析
通过后面的学习,咱们晓得SpringCloud Gateway是一个微服务网关,次要实现不同性能服务路由,对于SpringCloud Gateway的实战应用咱们就告一段落,咱们接下来深刻学习SpringCloud Gateway源码。
2.1 Gateway工作流程源码分析
2.1.1 Gateway工作流程剖析
后面咱们曾经学习过Gateway的工作流程,如上工作流程图,咱们回顾一下工作流程:
1:所有都将由ReactorHttpHandlerAdapter.apply()办法拦挡解决,此时会封装申请对象和响应对象,并传递到HttpWebHandlerAdapter.handle()办法。
2:HttpWebHandlerAdapter.handle(),将request和response封装成上下文对象ServerWebExchange,办法通过getDelegate()获取全局异样处理器ExceptionHandlingWebHandler执行全局异样解决
3:ExceptionHandlingWebHandler执行实现后,调用DispatcherHandler.handle(),循环所有handlerMappings查找解决以后申请的Handler
4:找到Handler后调用DispatcherHandler.invokeHandler()执行找到的Handler,此时会调用FilteringWebHandler.handle()
5:DefaultGatewayFilterChain.filter()是要害流程,所有过滤器都会在这里执行,比方服务查找、负载平衡、近程调用等,都在这一块。
下面工作流程咱们都是基于说的层面,接下来咱们一层一层剖析Gateway源码,深刻学习Gateway。
2.1.2 Gateway工作流程源码
咱们首先来看一下Gateway拦挡解决所有申请的办法handle():
/****
*解决所有申请
****/
@Override
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
if (this.forwardedHeaderTransformer != null) {
request = this.forwardedHeaderTransformer.apply(request);
}
//创立网关上下文对象
ServerWebExchange exchange = createExchange(request, response);
LogFormatUtils.traceDebug(logger, traceOn ->
exchange.getLogPrefix() + formatRequest(exchange.getRequest()) +
(traceOn ? ", headers=" + formatHeaders(exchange.getRequest().getHeaders()) : ""));
//getDelegate()获取以后的Handler
return getDelegate().handle(exchange)
.doOnSuccess(aVoid -> logResponse(exchange))
.onErrorResume(ex -> handleUnresolvedError(exchange, ex))
.then(Mono.defer(response::setComplete));
}
下面getDelegate()办法源码如下:
/**
* Return the wrapped delegate.
* 返回WebHandler:解决web申请的对象
*/
public WebHandler getDelegate() {
return this.delegate;
}
咱们进行Debug测试如下:
以后返回的WebHandler是ExceptionHandlingWebHandler
,而ExceptionHandlingWebHandler
的delegate是FilteringWebHandler
,而FilteringWebHandler
的delegate是delegate
是DispatcherHandler
,所有的delegate的handle()
办法都会顺次执行,咱们能够把断点放到DispatcherHandler.handler()
办法上:
handler()办法会调用所有handlerMappings的getHandler(exchange)
办法,而getHandler(exchange)
办法会调用getHandlerInternal(exchange)
办法:
getHandlerInternal(exchange)
该办法由各个HandlerMapping
自行实现,咱们能够察看下断言解决的RoutePredicateHandlerMapping
的getHandlerInternal(exchange)
办法会调用lookupRoute办法,该办法用于返回对应的路由信息:
这里的路由匹配其实就是咱们我的项目中对应路由配置的一个一个服务的信息,这些服务信息能够帮咱们找到咱们要调用的实在服务:
每个Route对象如下:
Route的DEBUG数据如下:
找到对应Route后会返回指定的FilterWebHandler,如下代码:
FilterWebHandler次要蕴含了所有的过滤器,过滤器依照肯定程序排序,次要是order值,越小越靠前排,过滤器中次要将申请交给指定实在服务解决了,debug测试如下:
这里有RouteToRequestUrlFilter
和ForwardRoutingFilter
以及LoadBalancerClientFilter
等多个过滤器。
2.1.3 申请解决
在下面FilterWebHandler中有2个过滤器,别离为RouteToRequestUrlFilter
和ForwardRoutingFilter
。
RouteToRequestUrlFilter
:用于依据匹配的 Route,计算申请地址失去 lb://hailtaxi-order/order/list
ForwardRoutingFilter
:转发路由网关过滤器。其依据 forward:// 前缀( Scheme )过滤解决,将申请转发到以后网关实例本地接口。
2.1.3.1 RouteToRequestUrlFilter实在服务查找
RouteToRequestUrlFilter源码如下:
/***
* 解决uri过滤器
* @param exchange
* @param chain
* @return
*/
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
//获取以后的route
Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
if (route == null) {
return chain.filter(exchange);
}
log.trace("RouteToRequestUrlFilter start");
//失去uri = http://localhost:8001/order/list?token=123
URI uri = exchange.getRequest().getURI();
boolean encoded = containsEncodedParts(uri);
URI routeUri = route.getUri();
if (hasAnotherScheme(routeUri)) {
// this is a special url, save scheme to special attribute
// replace routeUri with schemeSpecificPart
exchange.getAttributes().put(GATEWAY_SCHEME_PREFIX_ATTR,
routeUri.getScheme());
routeUri = URI.create(routeUri.getSchemeSpecificPart());
}
if ("lb".equalsIgnoreCase(routeUri.getScheme()) && routeUri.getHost() == null) {
// Load balanced URIs should always have a host. If the host is null it is
// most
// likely because the host name was invalid (for example included an
// underscore)
throw new IllegalStateException("Invalid host: " + routeUri.toString());
}
//将uri换成 lb://hailtaxi-order/order/list?token=123
URI mergedUrl = UriComponentsBuilder.fromUri(uri)
// .uri(routeUri)
.scheme(routeUri.getScheme()).host(routeUri.getHost())
.port(routeUri.getPort()).build(encoded).toUri();
exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, mergedUrl);
return chain.filter(exchange);
}
debug调试后果如下:
从下面调试后果咱们能够看到所抉择的Route以及uri和routeUri和mergedUrl,该过滤器其实就是将用户申请的地址换成服务地址,换成服务地址能够用来做负载平衡。
2.1.3.2 NettyRoutingFilter近程调用
SpringCloud在实现对后端服务近程调用是基于Netty发送Http申请实现,外围代码在NettyRoutingFilter.filter()
中,其中外围代码为send()办法,代码如下:
Flux<HttpClientResponse> responseFlux = httpClientWithTimeoutFrom(route)
// 头信息处理
.headers(headers -> {
headers.add(httpHeaders);
// Will either be set below, or later by Netty
headers.remove(HttpHeaders.HOST);
if (preserveHost) {
String host = request.getHeaders().getFirst(HttpHeaders.HOST);
headers.add(HttpHeaders.HOST, host);
}
// 执行发送,基于HTTP协定
}).request(method).uri(url).send((req, nettyOutbound) -> {
if (log.isTraceEnabled()) {
nettyOutbound
.withConnection(connection -> log.trace("outbound route: "
+ connection.channel().id().asShortText()
+ ", inbound: " + exchange.getLogPrefix()));
}
return nettyOutbound.send(request.getBody()
.map(dataBuffer -> ((NettyDataBuffer) dataBuffer)
.getNativeBuffer()));
}).
// 响应后果
responseConnection((res, connection) -> {
// Defer committing the response until all route filters have run
// Put client response as ServerWebExchange attribute and write
// response later NettyWriteResponseFilter
exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);
// 获取响应后果
ServerHttpResponse response = exchange.getResponse();
// put headers and status so filters can modify the response
HttpHeaders headers = new HttpHeaders();
res.responseHeaders().forEach(
entry -> headers.add(entry.getKey(), entry.getValue()));
String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);
if (StringUtils.hasLength(contentTypeValue)) {
exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR,
contentTypeValue);
}
setResponseStatus(res, response);
// make sure headers filters run after setting status so it is
// available in response
HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(
getHeadersFilters(), headers, exchange, Type.RESPONSE);
if (!filteredResponseHeaders
.containsKey(HttpHeaders.TRANSFER_ENCODING)
&& filteredResponseHeaders
.containsKey(HttpHeaders.CONTENT_LENGTH)) {
// It is not valid to have both the transfer-encoding header and
// the content-length header.
// Remove the transfer-encoding header in the response if the
// content-length header is present.
response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING);
}
exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES,
filteredResponseHeaders.keySet());
response.getHeaders().putAll(filteredResponseHeaders);
return Mono.just(res);
});
Duration responseTimeout = getResponseTimeout(route);
下面send办法最终会调用ChannelOperations>send()
办法,而该办法其实是基于了Netty实现数据发送,外围代码如下:
2.1.3.3 Netty个性
Netty是一款基于NIO(Nonblocking I/O,非阻塞IO)开发的网络通信框架,他的并发性能失去了很大进步,比照于BIO(Blocking I/O,阻塞IO),暗藏其背地的复杂性而提供一个易于应用的 API 的客户端/服务器框架。Netty 是一个宽泛应用的 Java 网络编程框架。
传输极快
Netty的传输快其实也是依赖了NIO的一个个性——零拷贝。咱们晓得,Java的内存有堆内存、栈内存和字符串常量池等等,其中堆内存是占用内存空间最大的一块,也是Java对象寄存的中央,个别咱们的数据如果须要从IO读取到堆内存,两头须要通过Socket缓冲区,也就是说一个数据会被拷贝两次能力达到他的的起点,如果数据量大,就会造成不必要的资源节约。
Netty针对这种状况,应用了NIO中的另一大个性——零拷贝,当他须要接收数据的时候,他会在堆内存之外开拓一块内存,数据就间接从IO读到了那块内存中去,在netty外面通过ByteBuf能够间接对这些数据进行间接操作,从而放慢了传输速度。
良好的封装
Netty无论是性能还是封装性都远远超过传统Socket编程。
Channel:示意一个连贯,能够了解为每一个申请,就是一个Channel。
ChannelHandler:外围解决业务就在这里,用于解决业务申请。
ChannelHandlerContext:用于传输业务数据。
ChannelPipeline:用于保留处理过程须要用到的ChannelHandler和ChannelHandlerContext。
ByteBuf是一个存储字节的容器,最大特点就是使用方便,它既有本人的读索引和写索引,不便你对整段字节缓存进行读写,也反对get/set,不便你对其中每一个字节进行读写,他的数据结构如下图所示:
2.2 Gateway负载平衡源码分析
后面源码分析次要分析了Gateway的工作流程,咱们接下来分析Gateway的负载平衡流程。在最初的过滤器汇合中有LoadBalancerClientFilter
过滤器,该过滤器是用于实现负载平衡。
2.2.1 地址转换
LoadBalancerClientFilter
过滤器首先会将用户申请地址转换成实在服务地址,也就是IP:端口号,源码如下:
/***
* 负载平衡过滤
* @param exchange
* @param chain
* @return
*/
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
//负载平衡的URL = lb://hailtaxi-order/order/list?token=123
URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
if (url == null
|| (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {
return chain.filter(exchange);
}
// preserve the original url
addOriginalRequestUrl(exchange, url);
if (log.isTraceEnabled()) {
log.trace("LoadBalancerClientFilter url before: " + url);
}
//服务抉择
final ServiceInstance instance = choose(exchange);
if (instance == null) {
throw NotFoundException.create(properties.isUse404(),
"Unable to find instance for " + url.getHost());
}
//用户提交的URI = http://localhost:8001/order/list?token=123
URI uri = exchange.getRequest().getURI();
// if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,
// if the loadbalancer doesn't provide one.
String overrideScheme = instance.isSecure() ? "https" : "http";
if (schemePrefix != null) {
overrideScheme = url.getScheme();
}
//实在服务的URL =http://192.168.211.1:18182/order/list?token=123
URI requestUrl = loadBalancer.reconstructURI(
new DelegatingServiceInstance(instance, overrideScheme), uri);
if (log.isTraceEnabled()) {
log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
}
exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
return chain.filter(exchange);
}
2.2.2 负载平衡服务抉择
下面代码的要害是choose(exchange)
的调用,该办法调用其实就是抉择指定服务,这里波及到负载平衡服务轮询调用算法等,咱们能够跟踪进去查看办法执行流程。
Gateway本身曾经集成Ribbon,所以看到的对象是RibbonLoadBalancerClient,咱们跟踪进去接着查看:
下面办法会顺次调用到getInstance()办法,该办法会返回所有可用实例,有可能有多个实例,如果有多个实例就波及到负载平衡算法,办法调用如下图:
此时调用getServer()办法,再调用BaseLoadBalancer.chooseServer()
,这里是依据指定算法获取对应实例,代码如下:
BaseLoadBalancer
是属于Ribbon的算法,咱们能够通过如下依赖包理解,并且该算法默认用的是RoundRobinRule
,也就是随机算法,如下代码:
本文由传智教育博学谷 – 狂野架构师教研团队公布,转载请注明出处!
如果本文对您有帮忙,欢送关注和点赞;如果您有任何倡议也可留言评论或私信,您的反对是我保持创作的能源
发表回复