关于spring-cloud:Spring-Cloud-Gateway-没有链路信息我-TM-人傻了下

36次阅读

共计 5469 个字符,预计需要花费 14 分钟才能阅读完成。

本系列是 我 TM 人傻了 系列第五期[捂脸],往期精彩回顾:

  • 降级到 Spring 5.3.x 之后,GC 次数急剧减少,我 TM 人傻了
  • 这个大表走索引字段查问的 SQL 怎么就成全扫描了,我 TM 人傻了
  • 获取异样信息里再出异样就找不到日志了,我 TM 人傻了
  • spring-data-redis 连贯透露,我 TM 人傻了

本篇文章波及底层设计以及原理,以及问题定位和可能的问题点,十分深刻,篇幅较长,所以拆分成上中下三篇:

  • :问题简略形容以及 Spring Cloud Gateway 根本构造和流程以及底层原理
  • :Spring Cloud Sleuth 如何在 Spring Cloud Gateway 退出的链路追踪以及为何会呈现这个问题
  • :现有 Spring Cloud Sleuth 的非侵入设计带来的性能问题,其余可能的问题点,以及如何解决

Spring Cloud Gateway 其余的可能失落链路信息的点

通过后面的剖析,咱们能够看出,不止这里,还有其余中央会导致 Spring Cloud Sleuth 的链路追踪信息隐没,这里举几个大家常见的例子:

1. 在 GatewayFilter 中指定了异步执行某些工作,因为线程切换了,并且这时候可能 Span 曾经完结了,所以没有链路信息,例如

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {return chain.filter(exchange).publishOn(Schedulers.parallel()).doOnSuccess(o -> {
            // 这里就没有链路信息了
            log.info("success");
    });
}

2. 将 GatewayFilter 中持续链路的 chain.filter(exchange) 放到了异步工作中执行,下面的 AdaptCachedBodyGlobalFilter 就属于这种状况,这样会导致之后的 GatewayFilter 都没有链路信息,例如:

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {return Mono.delay(Duration.ofSeconds(1)).then(chain.filter(exchange));
}

Java 并发编程模型与 Project Reactor 编程模型的抵触思考

Java 中的很多框架,都用到了 ThreadLocal,或者通过 Thread 来标识唯一性。例如:

  • 日志框架中的 MDC,个别都是 ThreadLocal 实现。
  • 所有的锁、基于 AQS 的数据结构,都是通过 Thread 的属性来惟一标识谁获取到了锁的。
  • 分布式锁等数据结构,也是通过 Thread 的属性来惟一标识谁获取到了锁的,例如 Redisson 中分布式 Redis 锁的实现。

然而放到 Project Reactor 编程模型,这就显得心心相印了,因为 Project Reactor 异步响应式编程就是不固定线程,没法保障提交工作和回调能在同一个线程,所以 ThreadLocal 的语义在这里很难成立。Project Reactor 尽管提供了对标 ThreadLocal 的 Context,然而支流框架还没有兼容这个 Context,所以给 Spring Cloud Sleuth 粘合这些链路追踪带来了很大艰难,因为 MDC 是一个 ThreadLocal 的 Map 实现,而不是基于 Context 的 Map。这就须要 Spring Cloud Sleuth 在订阅一开始,就须要将链路信息放入 MDC,同时还须要保障运行时不切换线程。

运行不切换线程,这样其实限度了 Project Reactor 的灵便调度,是有一些性能损失的。咱们其实想尽量就算退出了链路追踪信息,也不必强制运行不切换线程 。然而 Spring Cloud Sleuth 是 非侵入式设计 ,很难实现这一点。然而对于咱们本人业务的应用,咱们能够 定制一些编程标准,来保障大家写的代码不失落链路信息

改良咱们的编程标准

首先,咱们 自定义 Mono 和 Flux 的工厂

公共 Subscriber 封装,将 reactor Subscriber 的所有要害接口,都查看以后上下文是否有链路信息,即 Span,如果没有就包裹上,如果有则间接执行即可。

public class TracedCoreSubscriber<T> implements Subscriber<T>{
    private final Subscriber<T> delegate;
    private final Tracer tracer;
    private final CurrentTraceContext currentTraceContext;
    private final Span span;

    TracedCoreSubscriber(Subscriber<T> delegate, Tracer tracer, CurrentTraceContext currentTraceContext, Span span) {
        this.delegate = delegate;
        this.tracer = tracer;
        this.currentTraceContext = currentTraceContext;
        this.span = span;
    }

    @Override
    public void onSubscribe(Subscription s) {executeWithinScope(() -> {delegate.onSubscribe(s);
        });
    }

    @Override
    public void onError(Throwable t) {executeWithinScope(() -> {delegate.onError(t);
        });
    }

    @Override
    public void onComplete() {executeWithinScope(() -> {delegate.onComplete();
        });
    }

    @Override
    public void onNext(T o) {executeWithinScope(() -> {delegate.onNext(o);
        });
    }

    private void executeWithinScope(Runnable runnable) {
        // 如果以后没有链路信息,强制包裹
        if (tracer.currentSpan() == null) {try (CurrentTraceContext.Scope scope = this.currentTraceContext.maybeScope(this.span.context())) {runnable.run();
            }
        } else {
            // 如果以后已有链路信息,则间接执行
            runnable.run();}
    }
}

之后别离定义所有 Flux 的代理 TracedFlux,和所有 Mono 的代理 TracedMono,其实就是在 subscribe 的时候,用 TracedCoreSubscriber 包装传入的 CoreSubscriber:

public class TracedFlux<T> extends Flux<T> {
    private final Flux<T> delegate;
    private final Tracer tracer;
    private final CurrentTraceContext currentTraceContext;
    private final Span span;

    TracedFlux(Flux<T> delegate, Tracer tracer, CurrentTraceContext currentTraceContext, Span span) {
        this.delegate = delegate;
        this.tracer = tracer;
        this.currentTraceContext = currentTraceContext;
        this.span = span;
    }

    @Override
    public void subscribe(CoreSubscriber<? super T> actual) {delegate.subscribe(new TracedCoreSubscriber(actual, tracer, currentTraceContext, span));
    }
}

public class TracedMono<T> extends Mono<T> {
    private final Mono<T> delegate;
    private final Tracer tracer;
    private final CurrentTraceContext currentTraceContext;
    private final Span span;

    TracedMono(Mono<T> delegate, Tracer tracer, CurrentTraceContext currentTraceContext, Span span) {
        this.delegate = delegate;
        this.tracer = tracer;
        this.currentTraceContext = currentTraceContext;
        this.span = span;
    }

    @Override
    public void subscribe(CoreSubscriber<? super T> actual) {delegate.subscribe(new TracedCoreSubscriber(actual, tracer, currentTraceContext, span));
    }
}

定义工厂类,应用申请 ServerWebExchange 和原始 Flux 创立 TracedFlux,以及应用申请 ServerWebExchange 和原始 Mono 创立 TracedMono,并且 Span 是通过 Attributes 获取的,依据前文的源码剖析咱们晓得,这个 Attribute 是通过 TraceWebFilter 放入 Attributes 的。因为咱们只在 GatewayFilter 中应用,肯定在 TraceWebFilter 之后 所以这个 Attribute 肯定存在。

@Component
public class TracedPublisherFactory {protected static final String TRACE_REQUEST_ATTR = Span.class.getName();

    @Autowired
    private Tracer tracer;
    @Autowired
    private CurrentTraceContext currentTraceContext;

    public <T> Flux<T> getTracedFlux(Flux<T> publisher, ServerWebExchange exchange) {return new TracedFlux<>(publisher, tracer, currentTraceContext, (Span) exchange.getAttributes().get(TRACE_REQUEST_ATTR));
    }

    public <T> Mono<T> getTracedMono(Mono<T> publisher, ServerWebExchange exchange) {return new TracedMono<>(publisher, tracer, currentTraceContext, (Span) exchange.getAttributes().get(TRACE_REQUEST_ATTR));
    }
}

而后,咱们规定:1. 所有的 GatewayFilter,须要继承咱们自定义的抽象类,这个抽象类仅仅是把 filter 的后果用 TracedPublisherFactory 的 getTracedMono 给封装了一层 TracedMono,以 GlobalFilter 为例子:

public abstract class AbstractTracedFilter implements GlobalFilter {
    @Autowired
    protected TracedPublisherFactory tracedPublisherFactory;

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {return tracedPublisherFactory.getTracedMono(traced(exchange, chain), exchange);
    }

    protected abstract Mono<Void> traced(ServerWebExchange exchange, GatewayFilterChain chain);
}

2. GatewayFilter 中新生成的 Flux 或者 Mono,对立应用 TracedPublisherFactory 再封装一层

3. 对于 AdaptCachedBodyGlobalFilter 读取 Request Body 导致的链路失落,我向社区提了一个 Pull Request: fix #2004 Span is not terminated properly in Spring Cloud Gateway,大家能够参考。也能够在这个 Filter 之前本人将 Request Body 应用 TracedPublisherFactory 进行封装解决。

微信搜寻“我的编程喵”关注公众号,每日一刷,轻松晋升技术,斩获各种 offer

正文完
 0