A look at Sentinel's SentinelGatewayFilter

This article takes a look at Sentinel's SentinelGatewayFilter.

SentinelGatewayFilter

Sentinel-1.6.2/sentinel-adapter/sentinel-spring-cloud-gateway-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/gateway/sc/SentinelGatewayFilter.java

public class SentinelGatewayFilter implements GatewayFilter, GlobalFilter {

    private final GatewayParamParser<ServerWebExchange> paramParser = new GatewayParamParser<>(new ServerWebExchangeItemParser());

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);

        Mono<Void> asyncResult = chain.filter(exchange);
        if (route != null) {
            String routeId = route.getId();
            Object[] params = paramParser.parseParameterFor(routeId, exchange,
                r -> r.getResourceMode() == SentinelGatewayConstants.RESOURCE_MODE_ROUTE_ID);
            String origin = Optional.ofNullable(GatewayCallbackManager.getRequestOriginParser())
                .map(f -> f.apply(exchange))
                .orElse("");
            asyncResult = asyncResult.transform(
                new SentinelReactorTransformer<>(new EntryConfig(routeId, EntryType.IN,
                    1, params, new ContextConfig(contextName(routeId), origin)))
            );
        }

        Set<String> matchingApis = pickMatchingApiDefinitions(exchange);
        for (String apiName : matchingApis) {
            Object[] params = paramParser.parseParameterFor(apiName, exchange,
                r -> r.getResourceMode() == SentinelGatewayConstants.RESOURCE_MODE_CUSTOM_API_NAME);
            asyncResult = asyncResult.transform(
                new SentinelReactorTransformer<>(new EntryConfig(apiName, EntryType.IN, 1, params))
            );
        }

        return asyncResult;
    }

    private String contextName(String route) {
        return SentinelGatewayConstants.GATEWAY_CONTEXT_ROUTE_PREFIX + route;
    }

    Set<String> pickMatchingApiDefinitions(ServerWebExchange exchange) {
        return GatewayApiMatcherManager.getApiMatcherMap().values()
            .stream()
            .filter(m -> m.test(exchange))
            .map(WebExchangeApiMatcher::getApiName)
            .collect(Collectors.toSet());
    }
}
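  • SentinelGatewayFilter implements both the GatewayFilter and GlobalFilter interfaces. Its filter method reads the Route from the exchange attributes and, when a route is present, wraps asyncResult (the result of chain.filter) through transform with a SentinelReactorTransformer keyed by the route ID; it then applies one more SentinelReactorTransformer for every custom API definition that matches the exchange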
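
The API-matching step in pickMatchingApiDefinitions is a plain stream pipeline: filter the registered matchers, then keep their names. Below is a minimal sketch of that shape using only the JDK. PathApiMatcher, ApiMatchingSketch and the sample API names are hypothetical stand-ins, and a request path string takes the place of the ServerWebExchange that the real WebExchangeApiMatcher tests.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical stand-in for WebExchangeApiMatcher: an API name plus a predicate.
// The real matcher tests a ServerWebExchange; a plain request path is used here.
final class PathApiMatcher implements Predicate<String> {
    private final String apiName;
    private final Predicate<String> pathPredicate;

    PathApiMatcher(String apiName, Predicate<String> pathPredicate) {
        this.apiName = apiName;
        this.pathPredicate = pathPredicate;
    }

    String getApiName() {
        return apiName;
    }

    @Override
    public boolean test(String path) {
        return pathPredicate.test(path);
    }
}

final class ApiMatchingSketch {

    // Same shape as pickMatchingApiDefinitions: filter the matchers, keep the names.
    static Set<String> pickMatchingApis(Map<String, PathApiMatcher> matchers, String path) {
        return matchers.values().stream()
            .filter(m -> m.test(path))
            .map(PathApiMatcher::getApiName)
            .collect(Collectors.toSet());
    }

    // Sample registrations (made up for this sketch).
    static Map<String, PathApiMatcher> demoMatchers() {
        Map<String, PathApiMatcher> matchers = new LinkedHashMap<>();
        matchers.put("user_api", new PathApiMatcher("user_api", p -> p.startsWith("/users/")));
        matchers.put("admin_api", new PathApiMatcher("admin_api", p -> p.startsWith("/admin/")));
        matchers.put("all_api", new PathApiMatcher("all_api", p -> true));
        return matchers;
    }

    public static void main(String[] args) {
        // Every matching API name would get its own transformer in the real filter.
        System.out.println(pickMatchingApis(demoMatchers(), "/users/42"));
    }
}
```

Because the result is a Set, a request that matches several API definitions yields one resource name per definition, and the filter adds one transformer for each of them.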

SentinelReactorTransformer

Sentinel-1.6.2/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/SentinelReactorTransformer.java

public class SentinelReactorTransformer<T> implements Function<Publisher<T>, Publisher<T>> {

    private final EntryConfig entryConfig;

    public SentinelReactorTransformer(String resourceName) {
        this(new EntryConfig(resourceName));
    }

    public SentinelReactorTransformer(EntryConfig entryConfig) {
        AssertUtil.notNull(entryConfig, "entryConfig cannot be null");
        this.entryConfig = entryConfig;
    }

    @Override
    public Publisher<T> apply(Publisher<T> publisher) {
        if (publisher instanceof Mono) {
            return new MonoSentinelOperator<>((Mono<T>) publisher, entryConfig);
        }
        if (publisher instanceof Flux) {
            return new FluxSentinelOperator<>((Flux<T>) publisher, entryConfig);
        }

        throw new IllegalStateException("Publisher type is not supported:" + publisher.getClass().getCanonicalName());
    }
}
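  • SentinelReactorTransformer uses the entryConfig to create a MonoSentinelOperator when the publisher is a Mono, or a FluxSentinelOperator when it is a Flux; any other Publisher type results in an IllegalStateException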

MonoSentinelOperator

Sentinel-1.6.2/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/MonoSentinelOperator.java

public class MonoSentinelOperator<T> extends MonoOperator<T, T> {

    private final EntryConfig entryConfig;

    public MonoSentinelOperator(Mono<? extends T> source, EntryConfig entryConfig) {
        super(source);
        AssertUtil.notNull(entryConfig, "entryConfig cannot be null");
        this.entryConfig = entryConfig;
    }

    @Override
    public void subscribe(CoreSubscriber<? super T> actual) {
        source.subscribe(new SentinelReactorSubscriber<>(entryConfig, actual, true));
    }
}
  • On subscribe, MonoSentinelOperator subscribes to the source with a SentinelReactorSubscriber that wraps the actual subscriber, passing unary = true

FluxSentinelOperator

Sentinel-1.6.2/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/FluxSentinelOperator.java

public class FluxSentinelOperator<T> extends FluxOperator<T, T> {

    private final EntryConfig entryConfig;

    public FluxSentinelOperator(Flux<? extends T> source, EntryConfig entryConfig) {
        super(source);
        AssertUtil.notNull(entryConfig, "entryConfig cannot be null");
        this.entryConfig = entryConfig;
    }

    @Override
    public void subscribe(CoreSubscriber<? super T> actual) {
        source.subscribe(new SentinelReactorSubscriber<>(entryConfig, actual, false));
    }
}
  • On subscribe, FluxSentinelOperator likewise subscribes to the source with a SentinelReactorSubscriber that wraps the actual subscriber, but passes unary = false
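
Both operators follow the same decorator pattern: the operator does no work of its own, and its subscribe method only wraps the downstream subscriber before handing it to the source. The sketch below illustrates that pattern with the JDK's java.util.concurrent.Flow interfaces instead of Reactor, so it runs without dependencies. RecordingOperator, RecordingSubscriber, OperatorSketch and the just helper are hypothetical names made up for this illustration; they are not part of Sentinel or Reactor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical operator: subscribe() only decorates the downstream subscriber,
// the same move MonoSentinelOperator and FluxSentinelOperator make.
final class RecordingOperator<T> implements Flow.Publisher<T> {
    private final Flow.Publisher<T> source;
    private final List<String> events;

    RecordingOperator(Flow.Publisher<T> source, List<String> events) {
        this.source = source;
        this.events = events;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> actual) {
        source.subscribe(new RecordingSubscriber<T>(actual, events));
    }
}

// Hypothetical wrapper: records each signal, then forwards it to the real subscriber.
final class RecordingSubscriber<T> implements Flow.Subscriber<T> {
    private final Flow.Subscriber<? super T> actual;
    private final List<String> events;

    RecordingSubscriber(Flow.Subscriber<? super T> actual, List<String> events) {
        this.actual = actual;
        this.events = events;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) { events.add("onSubscribe"); actual.onSubscribe(s); }

    @Override
    public void onNext(T item) { events.add("onNext"); actual.onNext(item); }

    @Override
    public void onError(Throwable t) { events.add("onError"); actual.onError(t); }

    @Override
    public void onComplete() { events.add("onComplete"); actual.onComplete(); }
}

final class OperatorSketch {

    // Synchronous single-value source, standing in for the upstream Mono.
    static Flow.Publisher<String> just(String value) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                if (!done && n > 0) {
                    done = true;
                    subscriber.onNext(value);
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() { done = true; }
        });
    }

    static List<String> run() {
        List<String> events = new ArrayList<>();
        new RecordingOperator<>(just("hello"), events).subscribe(new Flow.Subscriber<String>() {
            @Override
            public void onSubscribe(Flow.Subscription s) { s.request(1); }

            @Override
            public void onNext(String item) { events.add("downstream got " + item); }

            @Override
            public void onError(Throwable t) { }

            @Override
            public void onComplete() { }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The wrapper sees every signal before the downstream subscriber does, which is what lets SentinelReactorSubscriber acquire an entry on subscription and release it on termination.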

SentinelReactorSubscriber

Sentinel-1.6.2/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/SentinelReactorSubscriber.java

public class SentinelReactorSubscriber<T> extends InheritableBaseSubscriber<T> {

    private final EntryConfig entryConfig;

    private final CoreSubscriber<? super T> actual;
    private final boolean unary;

    private volatile AsyncEntry currentEntry;
    private final AtomicBoolean entryExited = new AtomicBoolean(false);

    public SentinelReactorSubscriber(EntryConfig entryConfig,
                                     CoreSubscriber<? super T> actual,
                                     boolean unary) {
        checkEntryConfig(entryConfig);
        this.entryConfig = entryConfig;
        this.actual = actual;
        this.unary = unary;
    }

    private void checkEntryConfig(EntryConfig config) {
        AssertUtil.notNull(config, "entryConfig cannot be null");
    }

    @Override
    public Context currentContext() {
        if (currentEntry == null || entryExited.get()) {
            return actual.currentContext();
        }
        com.alibaba.csp.sentinel.context.Context sentinelContext = currentEntry.getAsyncContext();
        if (sentinelContext == null) {
            return actual.currentContext();
        }
        return actual.currentContext()
            .put(SentinelReactorConstants.SENTINEL_CONTEXT_KEY, currentEntry.getAsyncContext());
    }

    private void doWithContextOrCurrent(Supplier<Optional<com.alibaba.csp.sentinel.context.Context>> contextSupplier,
                                        Runnable f) {
        Optional<com.alibaba.csp.sentinel.context.Context> contextOpt = contextSupplier.get();
        if (!contextOpt.isPresent()) {
            // Provided context is absent, use current context.
            f.run();
        } else {
            // Run on provided context.
            ContextUtil.runOnContext(contextOpt.get(), f);
        }
    }

    private void entryWhenSubscribed() {
        ContextConfig sentinelContextConfig = entryConfig.getContextConfig();
        if (sentinelContextConfig != null) {
            // If current we're already in a context, the context config won't work.
            ContextUtil.enter(sentinelContextConfig.getContextName(), sentinelContextConfig.getOrigin());
        }
        try {
            AsyncEntry entry = SphU.asyncEntry(entryConfig.getResourceName(), entryConfig.getEntryType(),
                entryConfig.getAcquireCount(), entryConfig.getArgs());
            this.currentEntry = entry;
            actual.onSubscribe(this);
        } catch (BlockException ex) {
            // Mark as completed (exited) explicitly.
            entryExited.set(true);
            // Signal cancel and propagate the {@code BlockException}.
            cancel();
            actual.onSubscribe(this);
            actual.onError(ex);
        } finally {
            if (sentinelContextConfig != null) {
                ContextUtil.exit();
            }
        }
    }

    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        doWithContextOrCurrent(() -> currentContext().getOrEmpty(SentinelReactorConstants.SENTINEL_CONTEXT_KEY),
            this::entryWhenSubscribed);
    }

    @Override
    protected void hookOnNext(T value) {
        if (isDisposed()) {
            tryCompleteEntry();
            return;
        }
        doWithContextOrCurrent(() -> Optional.ofNullable(currentEntry).map(AsyncEntry::getAsyncContext),
            () -> actual.onNext(value));

        if (unary) {
            // For some cases of unary operator (Mono), we have to do this during onNext hook.
            // e.g. this kind of order: onSubscribe() -> onNext() -> cancel() -> onComplete()
            // the onComplete hook will not be executed so we'll need to complete the entry in advance.
            tryCompleteEntry();
        }
    }

    @Override
    protected void hookOnComplete() {
        tryCompleteEntry();
        actual.onComplete();
    }

    @Override
    protected boolean shouldCallErrorDropHook() {
        // When flow control triggered or stream terminated, the incoming
        // deprecated exceptions should be dropped implicitly, so we'll not call the `onErrorDropped` hook.
        return !entryExited.get();
    }

    @Override
    protected void hookOnError(Throwable t) {
        if (currentEntry != null && currentEntry.getAsyncContext() != null) {
            // Normal requests with non-BlockException will go through here.
            Tracer.traceContext(t, 1, currentEntry.getAsyncContext());
        }
        tryCompleteEntry();
        actual.onError(t);
    }

    @Override
    protected void hookOnCancel() {
    }

    private boolean tryCompleteEntry() {
        if (currentEntry != null && entryExited.compareAndSet(false, true)) {
            currentEntry.exit(1, entryConfig.getArgs());
            return true;
        }
        return false;
    }
}
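  • SentinelReactorSubscriber extends InheritableBaseSubscriber (a copy of reactor.core.publisher.BaseSubscriber that lets subclasses override the onSubscribe, onNext, onError and onComplete methods)
  • hookOnSubscribe calls entryWhenSubscribed. When sentinelContextConfig is not null it first calls ContextUtil.enter; it then creates an AsyncEntry with SphU.asyncEntry, and in the finally block it calls ContextUtil.exit(), again only when sentinelContextConfig is not null. If SphU.asyncEntry throws a BlockException, the subscriber marks the entry as exited, cancels the upstream, and propagates the exception through actual.onError
  • hookOnNext, hookOnComplete and hookOnError all call tryCompleteEntry, which tries to exit the AsyncEntry; hookOnNext does so only when the subscriber is already disposed or when unary is true. The compareAndSet on entryExited ensures the entry is exited at most once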
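
The exit-at-most-once behaviour of tryCompleteEntry, and the effect of the unary flag, can be reproduced with a few lines of plain Java. This is a minimal sketch under stated assumptions: CountingEntry is a hypothetical stand-in for AsyncEntry that only counts exit calls, and EntryLifecycleSketch mirrors just the exit logic of the subscriber (no Reactor, no Sentinel context handling, no blocking path).

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for AsyncEntry: it only counts how often exit() is called.
final class CountingEntry {
    final AtomicInteger exits = new AtomicInteger();

    void exit() {
        exits.incrementAndGet();
    }
}

// Mirrors only the exit logic of SentinelReactorSubscriber.
final class EntryLifecycleSketch {
    private final boolean unary;
    private volatile CountingEntry currentEntry;
    private final AtomicBoolean entryExited = new AtomicBoolean(false);

    EntryLifecycleSketch(boolean unary) {
        this.unary = unary;
    }

    void onSubscribe(CountingEntry entry) {
        this.currentEntry = entry;
    }

    void onNext() {
        // A unary (Mono-like) stream releases the entry as soon as its value arrives.
        if (unary) {
            tryCompleteEntry();
        }
    }

    void onComplete() {
        tryCompleteEntry();
    }

    void onError() {
        tryCompleteEntry();
    }

    // compareAndSet flips the flag for exactly one caller, so exit() runs at most once.
    boolean tryCompleteEntry() {
        if (currentEntry != null && entryExited.compareAndSet(false, true)) {
            currentEntry.exit();
            return true;
        }
        return false;
    }

    // Number of exits observed right after onNext, before any terminal signal.
    static int exitsAfterNext(boolean unary) {
        CountingEntry entry = new CountingEntry();
        EntryLifecycleSketch s = new EntryLifecycleSketch(unary);
        s.onSubscribe(entry);
        s.onNext();
        return entry.exits.get();
    }

    // Number of exits after a full onNext -> onComplete -> onError sequence.
    static int exitsAfterAllSignals(boolean unary) {
        CountingEntry entry = new CountingEntry();
        EntryLifecycleSketch s = new EntryLifecycleSketch(unary);
        s.onSubscribe(entry);
        s.onNext();
        s.onComplete();
        s.onError();
        return entry.exits.get();
    }

    public static void main(String[] args) {
        System.out.println("unary, after onNext: " + exitsAfterNext(true));
        System.out.println("non-unary, after onNext: " + exitsAfterNext(false));
        System.out.println("unary, after all signals: " + exitsAfterAllSignals(true));
    }
}
```

In the unary case the entry is already released after onNext, so later terminal signals find the flag set and do nothing; in the non-unary case the entry stays open until a terminal signal arrives.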

Summary

  • SentinelGatewayFilter implements both the GatewayFilter and GlobalFilter interfaces. Its filter method reads the route information and then applies transform to asyncResult, using a SentinelReactorTransformer for the route ID and for each matching custom API
  • SentinelReactorTransformer uses the entryConfig to create a MonoSentinelOperator or a FluxSentinelOperator; on subscribe, both wrap the actual subscriber in a SentinelReactorSubscriber
  • SentinelReactorSubscriber calls entryWhenSubscribed in hookOnSubscribe to create the AsyncEntry, and calls tryCompleteEntry in hookOnNext, hookOnComplete and hookOnError to try to exit the AsyncEntry

doc

  • SentinelGatewayFilter
