共计 5068 个字符,预计需要花费 13 分钟才能阅读完成。
本系列代码地址:https://github.com/JoJoTec/sp…
咱们在这一节咱们将持续解说防止链路信息失落做的设计,次要针对获取到现有 Span 之后,如何保障每个 GlobalFilter 都能放弃链路信息。首先,咱们 自定义 Reactor 的外围 Publisher 即 Mono 和 Flux 的工厂,将链路信息封装进去,保障由这个工厂生成的 Mono 和 Flux,都是只有是这个工厂生成的 Mono 和 Flux 之间无论怎么拼接都会放弃链路信息的:
自定义 Mono 和 Flux 的工厂
公共 Subscriber 封装,将 reactor Subscriber 的所有要害接口,都查看以后上下文是否有链路信息,即 Span,如果没有就包裹上,如果有则间接执行即可。
public class TracedCoreSubscriber<T> implements Subscriber<T>{
private final Subscriber<T> delegate;
private final Tracer tracer;
private final CurrentTraceContext currentTraceContext;
private final Span span;
TracedCoreSubscriber(Subscriber<T> delegate, Tracer tracer, CurrentTraceContext currentTraceContext, Span span) {
this.delegate = delegate;
this.tracer = tracer;
this.currentTraceContext = currentTraceContext;
this.span = span;
}
@Override
public void onSubscribe(Subscription s) {executeWithinScope(() -> {delegate.onSubscribe(s);
});
}
@Override
public void onError(Throwable t) {executeWithinScope(() -> {delegate.onError(t);
});
}
@Override
public void onComplete() {executeWithinScope(() -> {delegate.onComplete();
});
}
@Override
public void onNext(T o) {executeWithinScope(() -> {delegate.onNext(o);
});
}
private void executeWithinScope(Runnable runnable) {
// 如果以后没有链路信息,强制包裹
if (tracer.currentSpan() == null) {try (CurrentTraceContext.Scope scope = this.currentTraceContext.maybeScope(this.span.context())) {runnable.run();
}
} else {
// 如果以后已有链路信息,则间接执行
runnable.run();}
}
}
之后别离定义所有 Flux 的代理 TracedFlux,和所有 Mono 的代理 TracedMono,其实就是在 subscribe 的时候,用 TracedCoreSubscriber 包装传入的 CoreSubscriber:
public class TracedFlux<T> extends Flux<T> {
private final Flux<T> delegate;
private final Tracer tracer;
private final CurrentTraceContext currentTraceContext;
private final Span span;
TracedFlux(Flux<T> delegate, Tracer tracer, CurrentTraceContext currentTraceContext, Span span) {
this.delegate = delegate;
this.tracer = tracer;
this.currentTraceContext = currentTraceContext;
this.span = span;
}
@Override
public void subscribe(CoreSubscriber<? super T> actual) {delegate.subscribe(new TracedCoreSubscriber(actual, tracer, currentTraceContext, span));
}
}
public class TracedMono<T> extends Mono<T> {
private final Mono<T> delegate;
private final Tracer tracer;
private final CurrentTraceContext currentTraceContext;
private final Span span;
TracedMono(Mono<T> delegate, Tracer tracer, CurrentTraceContext currentTraceContext, Span span) {
this.delegate = delegate;
this.tracer = tracer;
this.currentTraceContext = currentTraceContext;
this.span = span;
}
@Override
public void subscribe(CoreSubscriber<? super T> actual) {delegate.subscribe(new TracedCoreSubscriber(actual, tracer, currentTraceContext, span));
}
}
定义工厂类,应用申请 ServerWebExchange 和原始 Flux 创立 TracedFlux,以及应用申请 ServerWebExchange 和原始 Mono 创立 TracedMono,并且 Span 是通过 Attributes 获取的,依据前文的源码剖析咱们晓得,这个 Attribute 是通过 TraceWebFilter 放入 Attributes 的。因为咱们只在 GatewayFilter 中应用,肯定在 TraceWebFilter 之后 所以这个 Attribute 肯定存在。
@Component
public class TracedPublisherFactory {protected static final String TRACE_REQUEST_ATTR = Span.class.getName();
@Autowired
private Tracer tracer;
@Autowired
private CurrentTraceContext currentTraceContext;
public <T> Flux<T> getTracedFlux(Flux<T> publisher, ServerWebExchange exchange) {return new TracedFlux<>(publisher, tracer, currentTraceContext, (Span) exchange.getAttributes().get(TRACE_REQUEST_ATTR));
}
public <T> Mono<T> getTracedMono(Mono<T> publisher, ServerWebExchange exchange) {return new TracedMono<>(publisher, tracer, currentTraceContext, (Span) exchange.getAttributes().get(TRACE_REQUEST_ATTR));
}
}
公共形象 GlobalFilter – CommonTraceFilter
咱们编写所有咱们前面要实现的 GlobalFilter 的抽象类,这个抽象类的次要性能是:
- 保障继承这个抽象类的 GlobalFilter 自身以及拼接的链路中,是有链路信息的,其实就是保障它 filter 返回的 Mono<Void> 是由咱们下面实现的 Factory 生成的即可。
- 不同 GlobalFilter 之间须要排序,有程序的执行,这个通过实现 Ordered 接口即可
package com.github.jojotech.spring.cloud.apigateway.filter;
import com.github.jojotech.spring.cloud.apigateway.common.TraceWebFilterUtil;
import com.github.jojotech.spring.cloud.apigateway.common.TracedPublisherFactory;
import reactor.core.publisher.Mono;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.cloud.sleuth.CurrentTraceContext;
import org.springframework.cloud.sleuth.Span;
import org.springframework.cloud.sleuth.Tracer;
import org.springframework.core.Ordered;
import org.springframework.web.server.ServerWebExchange;
/**
* 所有 filter 的子类
* 次要保障 span 的完整性,在某些状况下,span 会半途进行,导致日志中没有 traceId 和 spanId
* 参考:https://github.com/spring-cloud/spring-cloud-sleuth/issues/2004
*/
public abstract class AbstractTracedFilter implements GlobalFilter, Ordered {
@Autowired
protected Tracer tracer;
@Autowired
protected TracedPublisherFactory tracedPublisherFactory;
@Autowired
protected CurrentTraceContext currentTraceContext;
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
Mono<Void> traced;
if (tracer.currentSpan() == null) {
try (CurrentTraceContext.Scope scope = this.currentTraceContext
.maybeScope(((Span) exchange.getAttributes().get(TraceWebFilterUtil.TRACE_REQUEST_ATTR))
.context())) {traced = traced(exchange, chain);
}
}
else {
// 如果以后已有链路信息,则间接执行
traced = traced(exchange, chain);
}
return tracedPublisherFactory.getTracedMono(traced, exchange);
}
protected abstract Mono<Void> traced(ServerWebExchange exchange, GatewayFilterChain chain);
}
这样,咱们就能够基于这个抽象类去实现须要定制的 GlobalFilter 了
微信搜寻“我的编程喵”关注公众号,每日一刷,轻松晋升技术,斩获各种 offer: