乐趣区

问题分析HystrixTimeout后请求没有立即中断

1. 问题

背景

SpringCloud 框架,没有特殊的实现。即,请求到达 Zuul 网关后,由 Ribbon 负载均衡到目标组件节点,由 Hystrix 转发请求。

关键配置

    hystrix.command.default.execution.isolation.strategy=THREAD
    hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds=10000

现象

一次调用中发现,请求过程超过 10s 后,后台已经打印 HystrixTimeoutException 且进入了自定义的 FallbackProvider,前端仍然没有收到响应,直到请求链路处理完成,前端才返回 FallbackProvider 中返回的异常响应。

2. 分析

尝试一

看文档 -Hystrix 官方文档,THREAD 隔离模式下是请求超时是会取消调用线程从而立即返回的,SEMAPHORE 模式下会等待响应回来再判断是否超时。而上述配置的是所有的 Route 都默认是 THREAD- 线程隔离模式,遂认为配置没问题;

尝试二

跟踪源码,RxJava 实现的,响应式编程不熟悉,初期调试时没头苍蝇一样到处打断点,看得不明所以 (现在也是)。网上的资料大多是翻译上述文档,只知道是 HystrixCommand.execute() 发送请求,AbstractCommand.handleTimeoutViaFallback()触发 FallbackProvider,中间超时如何处理没有说清;

尝试三

开启 DEBUG 级别日志,用日志跟踪一个请求的全部过程。发现打印的配置是executionIsolationStrategy=SEMAPHORE!!!查阅 SpringCloud 相关资料,发现用 Hystrix+Ribbon 的时候,发送请求用的是 HystrixCommand 的 AbstractRibbonCommand 实现,而后者的部分配置会覆盖掉 HystrixCommandProperties 中的配置,其中就有隔离模式这项配置,用的是 ZuulProperties 中的默认值 SEMAPHORE:

AbstractRibbonCommand

    protected AbstractRibbonCommand(Setter setter, LBC client,
                                 RibbonCommandContext context,
                                 ZuulFallbackProvider fallbackProvider, IClientConfig config) {
        // 将 setter 传到 HystrixCommand 的构造方法中                                 
        super(setter);
        this.client = client;
        this.context = context;
        this.zuulFallbackProvider = fallbackProvider;
        this.config = config;
    }

    // 创建 Setter
    protected static HystrixCommandProperties.Setter createSetter(IClientConfig config, String commandKey, ZuulProperties zuulProperties) {int hystrixTimeout = getHystrixTimeout(config, commandKey);
        return HystrixCommandProperties.Setter().withExecutionIsolationStrategy(
                //executionIsolationStrategy 用的是 ZuulProperties 中的值
                zuulProperties.getRibbonIsolationStrategy()).withExecutionTimeoutInMilliseconds(hystrixTimeout);
    }

ZuulProperties

    // 默认是 SEMAPHORE
    private ExecutionIsolationStrategy ribbonIsolationStrategy = SEMAPHORE;

HystrixCommandProperties

    // 最终 setter 作为参数 builder 传入
    protected HystrixCommandProperties(HystrixCommandKey key, HystrixCommandProperties.Setter builder, String propertyPrefix) {
        this.key = key;
        // 省略其它配置
        this.executionIsolationStrategy = getProperty(propertyPrefix, key, "execution.isolation.strategy", builder.getExecutionIsolationStrategy(), default_executionIsolationStrategy);

3. 解决方式

a. 指定 commandKey 的方式

hystrix.command.aService.execution.isolation.strategy=THREAD

b. 修改 Zuul 配置的方式,注意 useSeparateThreadPools 默认为 false,此时所有组件共用一个 commandKey=RibbinCommand 的线程池

zuul.ribbonIsolationStrategy=THREAD 指定 ribbon 的隔离模式
zuul.threadPool.useSeparateThreadPools=true 每个 commandKey 一个线程池

4. Hystrix 源码解析

在对 RxJava 有大概了解的准备下,梳理 Hystrix 关键请求流程如下:
a. 发送请求,进入 Zuul 过滤器 RibbonRoutingFilter,通过工厂类创建 AbstractRibbonCommand,调用其 execute 方法

    protected ClientHttpResponse forward(RibbonCommandContext context) throws Exception {Map<String, Object> info = this.helper.debug(context.getMethod(),
                context.getUri(), context.getHeaders(), context.getParams(),
                context.getRequestEntity());
        // 创建 AbstractRibbonCommand
        RibbonCommand command = this.ribbonCommandFactory.create(context);
        try {
            // 调用 execute 方法
            ClientHttpResponse response = command.execute();
            this.helper.appendDebug(info, response.getRawStatusCode(), response.getHeaders());
            return response;
        }
        catch (HystrixRuntimeException ex) {return handleException(info, ex);
        }

    }

b. 进入 HystrixCommand.execute(),实际是调用 Future.get()来立即获取异步方法 HystrixCommand.queue()的结果

    public R execute() {
        try {
        //queue 方法返回的是 Future
            return queue().get();
        } catch (Exception e) {throw Exceptions.sneakyThrow(decomposeException(e));
        }
    }

c. 通过 AbstractCommand.toObservable()创建一个待订阅的被观察对象(即 Observable),创建过程:

  • 没有缓存时进入 applyHystrixSemantics 方法
                Observable<R> hystrixObservable =
                        Observable.defer(applyHystrixSemantics)
                                .map(wrapWithAllOnNextHooks);

                Observable<R> afterCache;

                // put in cache
                if (requestCacheEnabled && cacheKey != null) {
                    // wrap it for caching
                    HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                    if (fromCache != null) {
                        // another thread beat us so we'll use the cached value instead
                        toCache.unsubscribe();
                        isResponseFromCache = true;
                        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                    } else {
                        // we just created an ObservableCommand so we cast and return it
                        afterCache = toCache.toObservable();}
                } else {
                    // 没有缓存使用 applyHystrixSemantics
                    afterCache = hystrixObservable;
                }
  • 获取到信号量后进入 executeCommandAndObserve,在 THREAD 模式下 executionSemaphore 的实现是 TryableSemaphoreNoOp,tryAcquire()默认返回 true
            // 获取信号量,THREAD 模式下
            if (executionSemaphore.tryAcquire()) {
                try {
                    /* used to track userThreadExecutionTime */
                    executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                    return executeCommandAndObserve(_cmd)
                            .doOnError(markExceptionThrown)
                            .doOnTerminate(singleSemaphoreRelease)
                            .doOnUnsubscribe(singleSemaphoreRelease);
                } catch (RuntimeException e) {return Observable.error(e);
                }
            } else {return handleSemaphoreRejectionViaFallback();
            }
  • 对 Observable 进行一些装饰,触发事件、记录状态、异常处理,之后进入 executeCommandWithSpecifiedIsolation
        final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
            @Override
            public Observable<R> call(Throwable t) {circuitBreaker.markNonSuccess();
                Exception e = getExceptionFromThrowable(t);
                executionResult = executionResult.setExecutionException(e);
                if (e instanceof RejectedExecutionException) {return handleThreadPoolRejectionViaFallback(e);
                } else if (t instanceof HystrixTimeoutException) {
                    // 超时异常在此处理
                    return handleTimeoutViaFallback();} else if (t instanceof HystrixBadRequestException) {return handleBadRequestByEmittingError(e);
                } else {
                    /*
                     * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
                     */
                    if (e instanceof HystrixBadRequestException) {eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                        return Observable.error(e);
                    }

                    return handleFailureViaFallback(e);
                }
            }
        };
  • executeCommandWithSpecifiedIsolation 中对不同的隔离模式进行了不同的处理,主要区别是 THREAD 模式下对 Observable 调用了 subscribeOn 方法,切换到 threadPool.getScheduler 中的线程执行
        if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {// mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
            return Observable.defer(new Func0<Observable<R>>() {...}).doOnTerminate(new Action0() {...}).doOnUnsubscribe(new Action0() {...}).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
                @Override
                public Boolean call() {
                    // 原始 HystrixCommand 的状态为 TIMED_OUT 时
                    return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;}
            }));
        }
  • threadPool 是 AbstractCommand 初始化时创建的 HystrixThreadPool,默认实现是 HystrixThreadPoolDefault,getScheduler 方法动态更新配置并返回一个 HystrixContextScheduler,参数 shouldInterruptThread 表示超时后是否打断请求执行线程
        @Override
        public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
            // 动态更新配置
            touchConfig();
            return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
        }
  • (不知道哪里)调用 HystrixContextScheduler.creataeWorker 返回 HystrixContextSchedulerWorker,继续调用 HystrixContextSchedulerWorker.schedule
    public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
        this.concurrencyStrategy = concurrencyStrategy;
        this.threadPool = threadPool;
        // actualScheduler 是 ThreadPoolScheduler
        this.actualScheduler = new ThreadPoolScheduler(threadPool, shouldInterruptThread);
    }
    
    @Override
    public Worker createWorker() {
        // 创建 HystrixContextSchedulerWorker,参数是 ThreadPoolWorker
        return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
    }
  • 由实现类 ThreadPoolWorker 执行 schedule 方法,包装 action,从 AbstractCommand 的 threadPool 中获取执行器执行之,将上述参数 shouldInterruptThread 作为参数构建一个 FutureCompleterWithConfigurableInterrupt,作为订阅消息加入到执行任务中,超时后会将
        @Override
        public Subscription schedule(final Action0 action) {if (subscription.isUnsubscribed()) {
                // don't schedule, we are unsubscribed
                return Subscriptions.unsubscribed();}

            // This is internal RxJava API but it is too useful.
            // 包装 action
            ScheduledAction sa = new ScheduledAction(action);

            // 这里不懂这个操作啥意思
            subscription.add(sa);
            sa.addParent(subscription);

            // 获取执行器
            ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
            // 执行 action
            FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
            // 加入中断线程的 subscription
            sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));

            return sa;
        }
  • FutureCompleterWithConfigurableInterrupt 取消订阅时移除任务,中断请求
        @Override
        public void unsubscribe() {
            // 移除上述 action
            executor.remove(f);
            if (shouldInterruptThread.call()) {// 结果为 true 取消 future
                f.cancel(true);
            } else {f.cancel(false);
            }
        }
  • 后面的流程就不写了,与本问题无关
退出移动版