本系列是 我 TM 人傻了 系列第五期[捂脸],往期精彩回顾:
- 降级到 Spring 5.3.x 之后,GC 次数急剧减少,我 TM 人傻了
- 这个大表走索引字段查问的 SQL 怎么就成全扫描了,我 TM 人傻了
- 获取异样信息里再出异样就找不到日志了,我 TM 人傻了
- spring-data-redis 连贯透露,我 TM 人傻了
本篇文章波及底层设计以及原理,以及问题定位和可能的问题点,十分深刻,篇幅较长,所以拆分成上中下三篇:
- 上:问题简略形容以及 Spring Cloud Gateway 根本构造和流程以及底层原理
- 中:Spring Cloud Sleuth 如何在 Spring Cloud Gateway 退出的链路追踪以及为何会呈现这个问题
- 下:现有 Spring Cloud Sleuth 的非侵入设计带来的性能问题,其余可能的问题点,以及如何解决
Spring Cloud Gateway 其余的可能失落链路信息的点
通过后面的剖析,咱们能够看出,不止这里,还有其余中央会导致 Spring Cloud Sleuth 的链路追踪信息隐没,这里举几个大家常见的例子:
1. 在 GatewayFilter 中指定了异步执行某些工作,因为线程切换了,并且这时候可能 Span 曾经完结了,所以没有链路信息,例如:
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {return chain.filter(exchange).publishOn(Schedulers.parallel()).doOnSuccess(o -> {
// 这里就没有链路信息了
log.info("success");
});
}
2. 将 GatewayFilter 中持续链路的 chain.filter(exchange)
放到了异步工作中执行,下面的 AdaptCachedBodyGlobalFilter 就属于这种状况,这样会导致之后的 GatewayFilter 都没有链路信息,例如:
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {return Mono.delay(Duration.ofSeconds(1)).then(chain.filter(exchange));
}
Java 并发编程模型与 Project Reactor 编程模型的抵触思考
Java 中的很多框架,都用到了 ThreadLocal,或者通过 Thread 来标识唯一性。例如:
- 日志框架中的 MDC,个别都是 ThreadLocal 实现。
- 所有的锁、基于 AQS 的数据结构,都是通过 Thread 的属性来惟一标识谁获取到了锁的。
- 分布式锁等数据结构,也是通过 Thread 的属性来惟一标识谁获取到了锁的,例如 Redisson 中分布式 Redis 锁的实现。
然而放到 Project Reactor 编程模型,这就显得心心相印了,因为 Project Reactor 异步响应式编程就是不固定线程,没法保障提交工作和回调能在同一个线程,所以 ThreadLocal 的语义在这里很难成立。Project Reactor 尽管提供了对标 ThreadLocal 的 Context,然而支流框架还没有兼容这个 Context,所以给 Spring Cloud Sleuth 粘合这些链路追踪带来了很大艰难,因为 MDC 是一个 ThreadLocal 的 Map 实现,而不是基于 Context 的 Map。这就须要 Spring Cloud Sleuth 在订阅一开始,就须要将链路信息放入 MDC,同时还须要保障运行时不切换线程。
运行不切换线程,这样其实限度了 Project Reactor 的灵便调度,是有一些性能损失的。咱们其实想尽量就算退出了链路追踪信息,也不必强制运行不切换线程 。然而 Spring Cloud Sleuth 是 非侵入式设计 ,很难实现这一点。然而对于咱们本人业务的应用,咱们能够 定制一些编程标准,来保障大家写的代码不失落链路信息。
改良咱们的编程标准
首先,咱们 自定义 Mono 和 Flux 的工厂
公共 Subscriber 封装,将 reactor Subscriber 的所有要害接口,都查看以后上下文是否有链路信息,即 Span,如果没有就包裹上,如果有则间接执行即可。
public class TracedCoreSubscriber<T> implements Subscriber<T>{
private final Subscriber<T> delegate;
private final Tracer tracer;
private final CurrentTraceContext currentTraceContext;
private final Span span;
TracedCoreSubscriber(Subscriber<T> delegate, Tracer tracer, CurrentTraceContext currentTraceContext, Span span) {
this.delegate = delegate;
this.tracer = tracer;
this.currentTraceContext = currentTraceContext;
this.span = span;
}
@Override
public void onSubscribe(Subscription s) {executeWithinScope(() -> {delegate.onSubscribe(s);
});
}
@Override
public void onError(Throwable t) {executeWithinScope(() -> {delegate.onError(t);
});
}
@Override
public void onComplete() {executeWithinScope(() -> {delegate.onComplete();
});
}
@Override
public void onNext(T o) {executeWithinScope(() -> {delegate.onNext(o);
});
}
private void executeWithinScope(Runnable runnable) {
// 如果以后没有链路信息,强制包裹
if (tracer.currentSpan() == null) {try (CurrentTraceContext.Scope scope = this.currentTraceContext.maybeScope(this.span.context())) {runnable.run();
}
} else {
// 如果以后已有链路信息,则间接执行
runnable.run();}
}
}
之后别离定义所有 Flux 的代理 TracedFlux,和所有 Mono 的代理 TracedMono,其实就是在 subscribe 的时候,用 TracedCoreSubscriber 包装传入的 CoreSubscriber:
public class TracedFlux<T> extends Flux<T> {
private final Flux<T> delegate;
private final Tracer tracer;
private final CurrentTraceContext currentTraceContext;
private final Span span;
TracedFlux(Flux<T> delegate, Tracer tracer, CurrentTraceContext currentTraceContext, Span span) {
this.delegate = delegate;
this.tracer = tracer;
this.currentTraceContext = currentTraceContext;
this.span = span;
}
@Override
public void subscribe(CoreSubscriber<? super T> actual) {delegate.subscribe(new TracedCoreSubscriber(actual, tracer, currentTraceContext, span));
}
}
public class TracedMono<T> extends Mono<T> {
private final Mono<T> delegate;
private final Tracer tracer;
private final CurrentTraceContext currentTraceContext;
private final Span span;
TracedMono(Mono<T> delegate, Tracer tracer, CurrentTraceContext currentTraceContext, Span span) {
this.delegate = delegate;
this.tracer = tracer;
this.currentTraceContext = currentTraceContext;
this.span = span;
}
@Override
public void subscribe(CoreSubscriber<? super T> actual) {delegate.subscribe(new TracedCoreSubscriber(actual, tracer, currentTraceContext, span));
}
}
定义工厂类,应用申请 ServerWebExchange 和原始 Flux 创立 TracedFlux,以及应用申请 ServerWebExchange 和原始 Mono 创立 TracedMono,并且 Span 是通过 Attributes 获取的,依据前文的源码剖析咱们晓得,这个 Attribute 是通过 TraceWebFilter 放入 Attributes 的。因为咱们只在 GatewayFilter 中应用,肯定在 TraceWebFilter 之后 所以这个 Attribute 肯定存在。
@Component
public class TracedPublisherFactory {protected static final String TRACE_REQUEST_ATTR = Span.class.getName();
@Autowired
private Tracer tracer;
@Autowired
private CurrentTraceContext currentTraceContext;
public <T> Flux<T> getTracedFlux(Flux<T> publisher, ServerWebExchange exchange) {return new TracedFlux<>(publisher, tracer, currentTraceContext, (Span) exchange.getAttributes().get(TRACE_REQUEST_ATTR));
}
public <T> Mono<T> getTracedMono(Mono<T> publisher, ServerWebExchange exchange) {return new TracedMono<>(publisher, tracer, currentTraceContext, (Span) exchange.getAttributes().get(TRACE_REQUEST_ATTR));
}
}
而后,咱们规定:1. 所有的 GatewayFilter,须要继承咱们自定义的抽象类,这个抽象类仅仅是把 filter 的后果用 TracedPublisherFactory 的 getTracedMono 给封装了一层 TracedMono,以 GlobalFilter 为例子:
public abstract class AbstractTracedFilter implements GlobalFilter {
@Autowired
protected TracedPublisherFactory tracedPublisherFactory;
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {return tracedPublisherFactory.getTracedMono(traced(exchange, chain), exchange);
}
protected abstract Mono<Void> traced(ServerWebExchange exchange, GatewayFilterChain chain);
}
2. GatewayFilter 中新生成的 Flux 或者 Mono,对立应用 TracedPublisherFactory 再封装一层。
3. 对于 AdaptCachedBodyGlobalFilter 读取 Request Body 导致的链路失落,我向社区提了一个 Pull Request: fix #2004 Span is not terminated properly in Spring Cloud Gateway,大家能够参考。也能够在这个 Filter 之前本人将 Request Body 应用 TracedPublisherFactory 进行封装解决。
微信搜寻“我的编程喵”关注公众号,每日一刷,轻松晋升技术,斩获各种 offer: