本系列代码地址:https://github.com/JoJoTec/sp…
在开始编写咱们本人的日志 Filter 之前,还有一个问题我想在这里和大家分享,即在 Spring Cloud Gateway 中可能产生链路信息失落的问题。
次要抵触 – Project Reactor 与 Java Logger MDC 之间的设计抵触
Poject Reactor 是基于异步响应式设计的编程模式的实现,它的次要实现思路是先编写执行链路,最初 sub 执行整个链路。然而链路的每一部分, 到底是哪个线程执行的,是不确定的 。
Java 的日志框架设计,其上下文 MDC(Mapped Diagnostic Context)信息,是基于线程设计的,其实能够简略了解为一个 ThreadLocal 的 Map。日志的链路信息,是保留在这个 MDC 中的。
这样其实能够看出 Project Reactor 与日志框架的 MDC 默认是不兼容的,只有产生异步线程切换,这个 MDC 就变了。Spring Cloud Sleuth 为此加了很多粘合代码,然而智者千虑必有一失,Project Reactor 利用场景和库也在一直倒退和壮大,Spring Cloud Sleuth 也可能会漏掉一些场景导致链路信息失落。
一种 Spring Cloud Gateway 常见的链路信息失落的场景
咱们编写一个简略的测试项目(我的项目地址):
引入依赖:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.6</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<!--log4j2 异步日志须要的依赖,所有我的项目都必须用 log4j2 和异步日志配置 -->
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>${disruptor.version}</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>2020.0.3</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
对所有门路开启 AdaptCachedBodyGlobalFilter
:
@Configuration(proxyBeanMethods = false)
public class ApiGatewayConfiguration {
@Autowired
private AdaptCachedBodyGlobalFilter adaptCachedBodyGlobalFilter;
@Autowired
private GatewayProperties gatewayProperties;
@PostConstruct
public void init() {gatewayProperties.getRoutes().forEach(routeDefinition -> {
// 对 spring cloud gateway 路由配置中的每个路由都启用 AdaptCachedBodyGlobalFilter
EnableBodyCachingEvent enableBodyCachingEvent = new EnableBodyCachingEvent(new Object(), routeDefinition.getId());
adaptCachedBodyGlobalFilter.onApplicationEvent(enableBodyCachingEvent);
});
}
}
配置 (咱们只有一个路由,将申请转发到 httpbin.org 这个 http 申请测试网站):
server:
port: 8181
spring:
application:
name: apiGateway
cloud:
gateway:
httpclient:
connect-timeout: 500
response-timeout: 60000
routes:
- id: first_route
uri: http://httpbin.org
predicates:
- Path=/httpbin/**
filters:
- StripPrefix=1
增加两个全局 Filter,一个在 AdaptCachedBodyGlobalFilter
之前,一个在 AdaptCachedBodyGlobalFilter
之后。这两个 Filter 非常简单,只是打一行日志。
@Log4j2
@Component
public class PreLogFilter implements GlobalFilter, Ordered {public static final int ORDER = new AdaptCachedBodyGlobalFilter().getOrder() - 1;
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {log.info("before AdaptCachedBodyGlobalFilter");
return chain.filter(exchange);
}
@Override
public int getOrder() {return ORDER;}
}
@Log4j2
@Component
public class PostLogFilter implements GlobalFilter, Ordered {public static final int ORDER = new AdaptCachedBodyGlobalFilter().getOrder() + 1;
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {log.info("after AdaptCachedBodyGlobalFilter");
return chain.filter(exchange);
}
@Override
public int getOrder() {return ORDER;}
}
最初指定 Log4j2 的输入格局中蕴含链路信息,就像系列文章结尾中指定的那样。
启动这个利用,之后拜访 http://127.0.0.1:8181/httpbin/anything
,查看日志,发现 PostLogFilter 中的日志,没有链路信息了:
2021-09-08 06:32:35.457 INFO [service-apiGateway,51063d6f1fe264d0,51063d6f1fe264d0] [30600] [reactor-http-nio-2][?:]: before AdaptCachedBodyGlobalFilter
2021-09-08 06:32:35.474 INFO [service-apiGateway,,] [30600] [reactor-http-nio-2][?:]: after AdaptCachedBodyGlobalFilter
Spring Cloud Sleuth 是如何减少链路信息
通过系列之前的源码剖析,咱们晓得,在最开始的 TraceWebFilter,咱们将 Mono 封装成了一个 MonoWebFilterTrace,它的外围源码是:
@Override
public void subscribe(CoreSubscriber<? super Void> subscriber) {Context context = contextWithoutInitialSpan(subscriber.currentContext());
Span span = findOrCreateSpan(context);
// 将 Span 放入执行上下文中,对于日志其实就是将链路信息放入 org.slf4j.MDC
// 日志的 MDC 个别都是 ThreadLocal 的 Map,对于 Log4j2 的实现类就是 org.apache.logging.log4j.ThreadContext,其外围 contextMap 就是一个基于 ThreadLocal 实现的 Map
// 简略了解就是将链路信息放入一个 ThreadLocal 的 Map 中,每个线程拜访本人的 Map 获取链路信息
try (CurrentTraceContext.Scope scope = this.currentTraceContext.maybeScope(span.context())) {
// 将理论的 subscribe 用 Span 所在的 Context 包裹住,完结时敞开 Span
this.source.subscribe(new WebFilterTraceSubscriber(subscriber, context, span, this));
}
// 在 scope.close() 之后,会将链路信息从 ThreadLocal 的 Map 中剔除}
@Override
public Object scanUnsafe(Attr key) {if (key == Attr.RUN_STYLE) {
// 执行的形式必须是不能切换线程,也就是同步的
// 因为,日志的链路信息是放在 ThreadLocal 对象中,切换线程,链路信息就没了
return Attr.RunStyle.SYNC;
}
return super.scanUnsafe(key);
}
WebFilterTraceSubscriber 干了些什么呢?出现异常,以及 http 申请完结的时候,咱们可能想将响应信息,异样信息记录进入 Span 中,就是通过这个类封装实现的。
通过 MonoWebFilterTrace 的封装,因为 Spring-WebFlux 解决申请,其实就是封装成咱们下面得出的 Mono 之后进行 subscribe 解决的申请,所以这样,整个外部 Mono 的 publish 链路以及 subscribe 链路,就被 WebFilterTraceSubscriber 中的 scope 包裹起来了。只有咱们本人不在 GatewayFilter 中转换成某些强制异步的 Mono 或者 Flux 导致切换线程,链路信息是不会失落的。
为何下面的测试项目中链路信息会失落
咱们来看通过 AdaptCachedBodyGlobalFilter 之后,咱们后面拼的 Mono 链路会变成什么样:
return Mono.defer(() ->
new MonoWebFilterTrace(source,
RoutePredicateHandlerMapping.this.lookupRoute(exchange) // 依据申请寻找路由
.flatMap((Function<Route, Mono<?>>) r -> {exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r); // 将路由放入 Attributes 中,前面咱们还会用到
return Mono.just(RoutePredicateHandlerMapping.this.webHandler); // 返回 RoutePredicateHandlerMapping 的 FilteringWebHandler
}).switchIfEmpty(// 如果为 Mono.empty(),也就是没找到路由
Mono.empty()
.then(Mono.fromRunnable(() -> {// 返回 Mono.empty() 之后,记录日志
if (logger.isTraceEnabled()) {logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
}
})))
.switchIfEmpty(DispatcherHandler.this.createNotFoundError()) // 如果没有返回不为 Mono.empty() 的 handlerMapping,则间接返回 404
.then(Mono.defer(() -> {
// 省略在 AdaptCachedBodyGlobalFilter 后面的链路嵌套
// 读取 Body,因为 TCP 拆包,所以须要他们拼接到一起
DataBufferUtils.join(exchange.getRequest().getBody())
// 如果没有 Body,则间接返回空 DataBuffer
.defaultIfEmpty(factory.wrap(new EmptyByteBuf(factory.getByteBufAllocator())))
//decorate 办法中将 dataBuffer 放入 exchange 的 Attributes 列表,只是为了避免反复进入这个 `AdaptCachedBodyGlobalFilter` 的状况导致反复缓存申请 Body
// 之后,应用新的 body 以及原始申请封装成新的申请,持续 GatewayFilters 链路
.map(dataBuffer -> decorate(exchange, dataBuffer, cacheDecoratedRequest))
.switchIfEmpty(Mono.just(exchange.getRequest())).flatMap(function);
})
.then(Mono.empty()))
), // 调用对应的 Handler
TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> {//MetricsWebFilter 相干的解决,在后面的代码中给出了,这里省略});
);
其中 DataBufferUtils.join(exchange.getRequest().getBody())
其实是一个 FluxReceive,这里咱们能够了解为: 提交一个尝试读取申请 Body 的工作,将之后的 GatewayFilter 的链路解决加到在读取完 Body 之后的回调当中,提交这个工作后,立即返回 。这么看可能比较复杂,咱们用一个相似的例子类比下:
// 首先咱们创立一个新的 Span
Span span = tracer.newTrace();
// 申明一个相似于 TraceWebFilter 中封装的 MonoWebFilterTrace 的 MonoOperator
class MonoWebFilterTrace<T> extends MonoOperator<T, T> {protected MonoWebFilterTrace(Mono<? extends T> source) {super(source);
}
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
// 将 subscribe 用 span 包裹
try (Tracer.SpanInScope spanInScope = tracer.withSpanInScope(span)) {source.subscribe(actual);
// 在将要敞开 spanInScope 的时候(即从 ThreadLocal 的 Map 中移除链路信息),打印日志
log.info("stopped");
}
}
}
Mono.defer(() -> new MonoWebFilterTrace(Mono.fromRunnable(() -> {log.info("first");
})
// 模仿 FluxReceive
.then(Mono.delay(Duration.ofSeconds(1))
.doOnSuccess(longSignal -> log.info(longSignal))))
).subscribe(aLong -> log.info(aLong));
Mono.delay
和 FluxReceive 体现相似,都是异步切换线程池执行。执行下面的代码,咱们能够从日志下面就能看进去:
2021-09-08 07:12:45.236 INFO [service-apiGateway,7b2f5c190e1406cb,7b2f5c190e1406cb] [31868] [reactor-http-nio-2][?:]: first
2021-09-08 07:12:45.240 INFO [service-apiGateway,7b2f5c190e1406cb,7b2f5c190e1406cb] [31868] [reactor-http-nio-2][?:]: stopped
2021-09-08 07:12:46.241 INFO [service-apiGateway,,] [31868] [parallel-1][?:]: doOnEach_onNext(0)
2021-09-08 07:12:46.242 INFO [service-apiGateway,,] [31868] [parallel-1][?:]: onComplete()
2021-09-08 07:12:46.242 INFO [service-apiGateway,,] [31868] [parallel-1][?:]: 0
在 Spring Cloud Gateway 中,Request Body 的 FluxReceive 应用的线程池和调用 GatewayFilter 的是同一个线程池,所以可能线程还是同一个,然而因为 Span 曾经完结,从 ThreadLocal 的 Map 中曾经移除了链路信息,所以日志中还是没有链路信息。
微信搜寻“我的编程喵”关注公众号,每日一刷,轻松晋升技术,斩获各种 offer: