本系列是 我TM人傻了 系列第五期[捂脸],往期精彩回顾:

  • 降级到Spring 5.3.x之后,GC次数急剧减少,我TM人傻了
  • 这个大表走索引字段查问的 SQL 怎么就成全扫描了,我TM人傻了
  • 获取异样信息里再出异样就找不到日志了,我TM人傻了
  • spring-data-redis 连贯透露,我 TM 人傻了

本篇文章波及底层设计以及原理,以及问题定位和可能的问题点,十分深刻,篇幅较长,所以拆分成上中下三篇:

  • :问题简略形容以及 Spring Cloud Gateway 根本构造和流程以及底层原理
  • :Spring Cloud Sleuth 如何在 Spring Cloud Gateway 退出的链路追踪以及为何会呈现这个问题
  • :现有 Spring Cloud Sleuth 的非侵入设计带来的性能问题,其余可能的问题点,以及如何解决

Spring Cloud Sleuth 是如何减少链路信息

通过之前的源码剖析,咱们晓得,在最开始的 TraceWebFilter,咱们将 Mono 封装成了一个 MonoWebFilterTrace,它的外围源码是:

@Overridepublic void subscribe(CoreSubscriber<? super Void> subscriber) {    Context context = contextWithoutInitialSpan(subscriber.currentContext());    Span span = findOrCreateSpan(context);    //将 Span 放入执行上下文中,对于日志其实就是将链路信息放入 org.slf4j.MDC    //日志的 MDC 个别都是 ThreadLocal 的 Map,对于 Log4j2 的实现类就是 org.apache.logging.log4j.ThreadContext,其外围 contextMap 就是一个基于 ThreadLocal 实现的 Map    //简略了解就是将链路信息放入一个 ThreadLocal 的 Map 中,每个线程拜访本人的 Map 获取链路信息    try (CurrentTraceContext.Scope scope = this.currentTraceContext.maybeScope(span.context())) {        //将理论的 subscribe 用 Span 所在的 Context 包裹住,完结时敞开 Span        this.source.subscribe(new WebFilterTraceSubscriber(subscriber, context, span, this));    }    //在 scope.close() 之后,会将链路信息从  ThreadLocal 的 Map 中剔除}@Overridepublic Object scanUnsafe(Attr key) {    if (key == Attr.RUN_STYLE) {        //执行的形式必须是不能切换线程,也就是同步的        //因为,日志的链路信息是放在 ThreadLocal 对象中,切换线程,链路信息就没了        return Attr.RunStyle.SYNC;     }    return super.scanUnsafe(key);}

WebFilterTraceSubscriber 干了些什么呢?出现异常,以及 http 申请完结的时候,咱们可能想将响应信息,异样信息记录进入 Span 中,就是通过这个类封装实现的。

通过 MonoWebFilterTrace 的封装,因为 Spring-WebFlux 解决申请,其实就是封装成咱们下面得出的 Mono 之后进行 subscribe 解决的申请,所以这样,整个外部 Mono 的 publish 链路以及 subscribe 链路,就被 WebFilterTraceSubscriber 中的 scope 包裹起来了。只有咱们本人不在 GatewayFilter 中转换成某些强制异步的 Mono 或者 Flux 导致切换线程,链路信息是不会失落的。

咱们利用中失落链路信息的中央

通过查看日志咱们发现,启用 RequestBody 缓存的中央,都有链路缺失。这个 RequestBody 缓存咱们应用的是 Spring Cloud Gateway 中的 AdaptCachedBodyGlobalFilter,其外围源码是:

private static <T> Mono<T> cacheRequestBody(ServerWebExchange exchange, boolean cacheDecoratedRequest,        Function<ServerHttpRequest, Mono<T>> function) {    ServerHttpResponse response = exchange.getResponse();    NettyDataBufferFactory factory = (NettyDataBufferFactory) response.bufferFactory();    return         //读取 Body,因为 TCP 拆包,所以须要他们拼接到一起        DataBufferUtils.join(exchange.getRequest().getBody())            //如果没有 Body,则间接返回空 DataBuffer            .defaultIfEmpty(factory.wrap(new EmptyByteBuf(factory.getByteBufAllocator())))            //decorate办法中将 dataBuffer 放入 exchange 的 Attributes 列表,只是为了避免反复进入这个 `AdaptCachedBodyGlobalFilter` 的状况导致反复缓存申请 Body            //之后,应用新的 body 以及原始申请封装成新的申请,持续 GatewayFilters 链路            .map(dataBuffer -> decorate(exchange, dataBuffer, cacheDecoratedRequest))            .switchIfEmpty(Mono.just(exchange.getRequest())).flatMap(function);}

为何会应用这个 AdaptCachedBodyGlobalFilter 呢?获取申请 Body 是通过 exchange.getRequest().getBody() 获取的,其后果是一个 Flux<DataBuffer>.申请的 Body 是一次性的,如果你须要申请重试的话,在第一次调用失败的之后,第二次重试的时候,Body 就读取不到了,因为 Flux 曾经完结。所以,对于须要反复调用,例如重试,一对多路由转发的状况,须要将申请 Body 缓存起来,就是通过这个 GatewayFilter。然而通过这个 GatewayFilter 之后,链路信息就没了,能够通过以下这个简略我的项目进行复现(我的项目地址):

引入依赖:

<parent>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-parent</artifactId>    <version>2.4.6</version></parent><dependencies>    <dependency>        <groupId>org.springframework.cloud</groupId>        <artifactId>spring-cloud-starter-gateway</artifactId>    </dependency>    <dependency>        <groupId>org.springframework.cloud</groupId>        <artifactId>spring-cloud-starter-sleuth</artifactId>    </dependency>    <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-log4j2</artifactId>    </dependency>    <!--log4j2异步日志须要的依赖,所有我的项目都必须用log4j2和异步日志配置-->    <dependency>        <groupId>com.lmax</groupId>        <artifactId>disruptor</artifactId>        <version>${disruptor.version}</version>    </dependency></dependencies><dependencyManagement>    <dependencies>        <dependency>            <groupId>org.springframework.cloud</groupId>            <artifactId>spring-cloud-dependencies</artifactId>            <version>2020.0.3</version>            <type>pom</type>            <scope>import</scope>        </dependency>    </dependencies></dependencyManagement>

对所有门路开启 AdaptCachedBodyGlobalFilter:

@Configuration(proxyBeanMethods = false)public class ApiGatewayConfiguration {    @Autowired    private AdaptCachedBodyGlobalFilter adaptCachedBodyGlobalFilter;    @Autowired    private GatewayProperties gatewayProperties;    @PostConstruct    public void init() {        gatewayProperties.getRoutes().forEach(routeDefinition -> {            //对 spring cloud gateway 路由配置中的每个路由都启用 AdaptCachedBodyGlobalFilter             EnableBodyCachingEvent enableBodyCachingEvent = new EnableBodyCachingEvent(new Object(), routeDefinition.getId());            adaptCachedBodyGlobalFilter.onApplicationEvent(enableBodyCachingEvent);        });    }}

配置(咱们只有一个路由,将申请转发到 httpbin.org 这个 http 申请测试网站):

server:  port: 8181spring:  application:    name: apiGateway  cloud:    gateway:      httpclient:        connect-timeout: 500        response-timeout: 60000      routes:        - id: first_route          uri: http://httpbin.org          predicates:              - Path=/httpbin/**          filters:              - StripPrefix=1

增加两个全局 Filter,一个在 AdaptCachedBodyGlobalFilter 之前,一个在 AdaptCachedBodyGlobalFilter 之后。这两个 Filter 非常简单,只是打一行日志。

@Log4j2@Componentpublic class PreLogFilter implements GlobalFilter, Ordered {    public static final int ORDER = new AdaptCachedBodyGlobalFilter().getOrder() - 1;    @Override    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {        log.info("before AdaptCachedBodyGlobalFilter");        return chain.filter(exchange);    }    @Override    public int getOrder() {        return ORDER;    }}@Log4j2@Componentpublic class PostLogFilter implements GlobalFilter, Ordered {    public static final int ORDER = new AdaptCachedBodyGlobalFilter().getOrder() + 1;    @Override    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {        log.info("after AdaptCachedBodyGlobalFilter");        return chain.filter(exchange);    }    @Override    public int getOrder() {        return ORDER;    }}

最初指定 Log4j2 的输入格局中蕴含链路信息,就像系列文章结尾中指定的那样。

启动这个利用,之后拜访 http://127.0.0.1:8181/httpbin/anything,查看日志,发现 PostLogFilter 中的日志,没有链路信息了:

2021-09-08 06:32:35.457  INFO [service-apiGateway,51063d6f1fe264d0,51063d6f1fe264d0] [30600] [reactor-http-nio-2][?:]: before AdaptCachedBodyGlobalFilter2021-09-08 06:32:35.474  INFO [service-apiGateway,,] [30600] [reactor-http-nio-2][?:]: after AdaptCachedBodyGlobalFilter

为何链路信息会失落

咱们来看通过 AdaptCachedBodyGlobalFilter 之后,咱们后面拼的 Mono 链路会变成什么样:

return Mono.defer(() ->    new MonoWebFilterTrace(source,         RoutePredicateHandlerMapping.this.lookupRoute(exchange) //依据申请寻找路由                .flatMap((Function<Route, Mono<?>>) r -> {                    exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r); //将路由放入 Attributes 中,前面咱们还会用到                    return Mono.just(RoutePredicateHandlerMapping.this.webHandler); //返回 RoutePredicateHandlerMapping 的 FilteringWebHandler                }).switchIfEmpty( //如果为 Mono.empty(),也就是没找到路由                    Mono.empty()                     .then(Mono.fromRunnable(() -> { //返回 Mono.empty() 之后,记录日志                        if (logger.isTraceEnabled()) {                            logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");                    }                })))            .switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果没有返回不为 Mono.empty() 的 handlerMapping,则间接返回 404            .then(                Mono.defer(() -> {                    //省略在 AdaptCachedBodyGlobalFilter 后面的链路嵌套                    //读取 Body,因为 TCP 拆包,所以须要他们拼接到一起                    DataBufferUtils.join(exchange.getRequest().getBody())                        //如果没有 Body,则间接返回空 DataBuffer                        .defaultIfEmpty(factory.wrap(new EmptyByteBuf(factory.getByteBufAllocator())))                        //decorate办法中将 dataBuffer 放入 exchange 的 Attributes 列表,只是为了避免反复进入这个 `AdaptCachedBodyGlobalFilter` 的状况导致反复缓存申请 Body                        //之后,应用新的 body 以及原始申请封装成新的申请,持续 GatewayFilters 链路                        .map(dataBuffer -> decorate(exchange, dataBuffer, cacheDecoratedRequest))                        .switchIfEmpty(Mono.just(exchange.getRequest())).flatMap(function);                })                .then(Mono.empty()))            ), //调用对应的 Handler    TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> {        //MetricsWebFilter 相干的解决,在后面的代码中给出了,这里省略    }););

其中 DataBufferUtils.join(exchange.getRequest().getBody()) 其实是一个 FluxReceive,这里咱们能够了解为:提交一个尝试读取申请 Body 的工作,将之后的 GatewayFilter 的链路解决加到在读取完 Body 之后的回调当中,提交这个工作后,立即返回。这么看可能比较复杂,咱们用一个相似的例子类比下:

//首先咱们创立一个新的 SpanSpan span = tracer.newTrace();//申明一个相似于 TraceWebFilter 中封装的 MonoWebFilterTrace 的 MonoOperatorclass MonoWebFilterTrace<T> extends MonoOperator<T, T> {    protected MonoWebFilterTrace(Mono<? extends T> source) {        super(source);    }    @Override    public void subscribe(CoreSubscriber<? super T> actual) {        //将 subscribe 用 span 包裹        try (Tracer.SpanInScope spanInScope = tracer.withSpanInScope(span)) {            source.subscribe(actual);            //在将要敞开 spanInScope 的时候(即从 ThreadLocal 的 Map 中移除链路信息),打印日志            log.info("stopped");        }    }}Mono.defer(() -> new MonoWebFilterTrace(        Mono.fromRunnable(() -> {            log.info("first");        })        //模仿 FluxReceive        .then(Mono.delay(Duration.ofSeconds(1))        .doOnSuccess(longSignal -> log.info(longSignal))))).subscribe(aLong -> log.info(aLong));

Mono.delay 和 FluxReceive 体现相似,都是异步切换线程池执行。执行下面的代码,咱们能够从日志下面就能看进去:

2021-09-08 07:12:45.236  INFO [service-apiGateway,7b2f5c190e1406cb,7b2f5c190e1406cb] [31868] [reactor-http-nio-2][?:]: first2021-09-08 07:12:45.240  INFO [service-apiGateway,7b2f5c190e1406cb,7b2f5c190e1406cb] [31868] [reactor-http-nio-2][?:]: stopped2021-09-08 07:12:46.241  INFO [service-apiGateway,,] [31868] [parallel-1][?:]: doOnEach_onNext(0)2021-09-08 07:12:46.242  INFO [service-apiGateway,,] [31868] [parallel-1][?:]: onComplete()2021-09-08 07:12:46.242  INFO [service-apiGateway,,] [31868] [parallel-1][?:]: 0

在 Spring Cloud Gateway 中,Request Body 的 FluxReceive 应用的线程池和调用 GatewayFilter 的是同一个线程池,所以可能线程还是同一个,然而因为 Span 曾经完结,从 ThreadLocal 的 Map 中曾经移除了链路信息,所以日志中还是没有链路信息。

微信搜寻“我的编程喵”关注公众号,每日一刷,轻松晋升技术,斩获各种offer