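1. Problem
Background
A plain Spring Cloud setup with no custom extensions: a request reaches the Zuul gateway, Ribbon load-balances it to a node of the target service, and the call is executed through Hystrix.
Key configuration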
hystrix.command.default.execution.isolation.strategy=THREAD
hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds=10000
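Symptom
In one call, after the request had been running for more than 10 s, the backend had already logged a HystrixTimeoutException and entered our custom FallbackProvider, yet the frontend still received no response. Only when the whole request chain finished did the frontend get the error response returned by the FallbackProvider.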
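2. Analysis
Attempt 1
Read the documentation (the official Hystrix docs). In THREAD isolation mode, a timed-out command cancels the thread running the call and returns immediately; in SEMAPHORE mode the call runs on the calling thread, so the caller has to wait for the response to come back before the timeout takes effect. Since the configuration above makes THREAD isolation the default for every route, I concluded the configuration was fine.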
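A minimal JDK-only model of that difference, assuming nothing from Hystrix itself (IsolationDemo, threadIsolated and semaphoreIsolated are hypothetical names): the THREAD variant waits on a Future with a timeout and cancels the task, while the SEMAPHORE variant runs the work on the caller's thread and can only apply the timeout once the call has returned. The second behavior matches the symptom described above.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical, JDK-only model of the two isolation strategies (not Hystrix code).
class IsolationDemo {
    // THREAD-style: the work runs on a pool thread; the caller waits at most timeoutMs,
    // then cancels the task (interrupting it) and returns the fallback right away.
    static String threadIsolated(Callable<String> work, long timeoutMs, ExecutorService pool) throws Exception {
        Future<String> f = pool.submit(work);
        try {
            return f.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            f.cancel(true);
            return "fallback";
        }
    }

    // SEMAPHORE-style: the work runs on the calling thread, so the timeout
    // can only be applied after the call has already returned.
    static String semaphoreIsolated(Callable<String> work, long timeoutMs) throws Exception {
        long start = System.nanoTime();
        String result = work.call();
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        return elapsedMs > timeoutMs ? "fallback" : result;
    }

    public static void main(String[] args) throws Exception {
        Callable<String> slowCall = () -> {
            Thread.sleep(500); // simulated slow downstream service
            return "ok";
        };
        ExecutorService pool = Executors.newSingleThreadExecutor();
        long t0 = System.nanoTime();
        String r1 = threadIsolated(slowCall, 100, pool);
        System.out.println("THREAD:    " + r1 + " after " + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t0) + " ms");
        t0 = System.nanoTime();
        String r2 = semaphoreIsolated(slowCall, 100);
        System.out.println("SEMAPHORE: " + r2 + " after " + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t0) + " ms");
        pool.shutdownNow();
    }
}
```

With a 500 ms call and a 100 ms timeout, both variants return the fallback, but only the THREAD variant returns after roughly 100 ms.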
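Attempt 2
Trace the source code. Hystrix is built on RxJava, and since I was not familiar with reactive programming, my early debugging was aimless: breakpoints everywhere and little understanding of what I was seeing (still partly true). Most material online just translates the docs above. From it I only learned that HystrixCommand.execute() sends the request and AbstractCommand.handleTimeoutViaFallback() triggers the FallbackProvider; none of it explained how the timeout is handled in between.
Attempt 3
Enable DEBUG logging and follow one request end to end in the logs. The logged configuration showed executionIsolationStrategy=SEMAPHORE! Reading the Spring Cloud material explained why: with Hystrix + Ribbon, requests are sent by AbstractRibbonCommand, a subclass of HystrixCommand, and some of its settings override those in HystrixCommandProperties. The isolation strategy is one of them, and it takes the default value SEMAPHORE from ZuulProperties: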
AbstractRibbonCommand
protected AbstractRibbonCommand(Setter setter, LBC client,
        RibbonCommandContext context,
        ZuulFallbackProvider fallbackProvider, IClientConfig config) {
    // Pass the setter on to the HystrixCommand constructor
    super(setter);
    this.client = client;
    this.context = context;
    this.zuulFallbackProvider = fallbackProvider;
    this.config = config;
}
// Build the HystrixCommandProperties.Setter
protected static HystrixCommandProperties.Setter createSetter(IClientConfig config, String commandKey, ZuulProperties zuulProperties) {
    int hystrixTimeout = getHystrixTimeout(config, commandKey);
    return HystrixCommandProperties.Setter()
            // executionIsolationStrategy is taken from ZuulProperties
            .withExecutionIsolationStrategy(zuulProperties.getRibbonIsolationStrategy())
            .withExecutionTimeoutInMilliseconds(hystrixTimeout);
}
ZuulProperties
// Defaults to SEMAPHORE
private ExecutionIsolationStrategy ribbonIsolationStrategy = SEMAPHORE;
HystrixCommandProperties
// The setter above is eventually passed in as the builder argument
protected HystrixCommandProperties(HystrixCommandKey key, HystrixCommandProperties.Setter builder, String propertyPrefix) {
    this.key = key;
    // other properties omitted
    this.executionIsolationStrategy = getProperty(propertyPrefix, key, "execution.isolation.strategy", builder.getExecutionIsolationStrategy(), default_executionIsolationStrategy);
    // ...
}
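The getProperty call receives both the builder value and the global default. The sketch below is a simplified, hypothetical model (PropertyChainDemo.resolve is not Hystrix API) of the lookup order this implies, as I read Hystrix 1.5's chained properties: a command-specific key first, then the Setter value, then hystrix.command.default.*, then the hard-coded default. Under that order, ZuulProperties' SEMAPHORE shadows hystrix.command.default.execution.isolation.strategy=THREAD, which is what the DEBUG log showed.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of how one command property is resolved.
class PropertyChainDemo {
    static String resolve(Map<String, String> config, String commandKey, String property,
                          String builderValue, String hardCodedDefault) {
        // 1. Command-specific key, e.g. hystrix.command.aService.execution.isolation.strategy
        String specific = config.get("hystrix.command." + commandKey + "." + property);
        if (specific != null) {
            return specific;
        }
        // 2. Value set through the Setter (for Zuul: ZuulProperties' ribbonIsolationStrategy)
        if (builderValue != null) {
            return builderValue;
        }
        // 3. Global key, e.g. hystrix.command.default.execution.isolation.strategy
        String global = config.get("hystrix.command.default." + property);
        if (global != null) {
            return global;
        }
        // 4. Hard-coded default
        return hardCodedDefault;
    }

    public static void main(String[] args) {
        String prop = "execution.isolation.strategy";
        Map<String, String> config = new HashMap<>();
        config.put("hystrix.command.default." + prop, "THREAD");
        // The global THREAD setting is shadowed by the Setter's SEMAPHORE
        System.out.println(resolve(config, "aService", prop, "SEMAPHORE", "THREAD")); // SEMAPHORE
        config.put("hystrix.command.aService." + prop, "THREAD");
        // The command-specific key wins over the Setter value
        System.out.println(resolve(config, "aService", prop, "SEMAPHORE", "THREAD")); // THREAD
    }
}
```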
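3. Solutions
a. Set the isolation strategy for the specific commandKey (here a route whose commandKey is aService); a command-specific setting takes precedence over the value from the Setter: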
hystrix.command.aService.execution.isolation.strategy=THREAD
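b. Change the Zuul configuration. Note that useSeparateThreadPools defaults to false, in which case all services share a single thread pool keyed RibbonCommand.
# isolation strategy Zuul uses for Ribbon commands
zuul.ribbonIsolationStrategy=THREAD
# one thread pool per commandKey
zuul.threadPool.useSeparateThreadPools=true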
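A hypothetical sketch of how the thread pool is chosen under the two settings (PoolSelectionDemo is illustrative; only the shared pool name RibbonCommand comes from the text above, and the pool size of 10 matches Hystrix's default coreSize):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical model of thread pool selection for Ribbon commands.
class PoolSelectionDemo {
    private final Map<String, ExecutorService> pools = new ConcurrentHashMap<>();
    private final boolean useSeparateThreadPools;

    PoolSelectionDemo(boolean useSeparateThreadPools) {
        this.useSeparateThreadPools = useSeparateThreadPools;
    }

    // Name of the pool a given commandKey runs in
    String poolKeyFor(String commandKey) {
        return useSeparateThreadPools ? commandKey : "RibbonCommand";
    }

    // Lazily create one pool per pool name (10 threads, like Hystrix's default coreSize)
    ExecutorService poolFor(String commandKey) {
        return pools.computeIfAbsent(poolKeyFor(commandKey), k -> Executors.newFixedThreadPool(10));
    }

    int poolCount() {
        return pools.size();
    }

    void shutdown() {
        pools.values().forEach(ExecutorService::shutdownNow);
    }

    public static void main(String[] args) {
        for (boolean separate : new boolean[] {false, true}) {
            PoolSelectionDemo demo = new PoolSelectionDemo(separate);
            demo.poolFor("aService");
            demo.poolFor("bService");
            System.out.println("useSeparateThreadPools=" + separate + " -> " + demo.poolCount() + " pool(s)");
            demo.shutdown();
        }
    }
}
```

The main method reports 1 pool for the shared setting and 2 for the separate one.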
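4. Hystrix source code walkthrough
With a rough understanding of RxJava in hand, the key request flow in Hystrix goes as follows:
a. The request enters the Zuul filter RibbonRoutingFilter, which creates an AbstractRibbonCommand through a factory and calls its execute method.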
protected ClientHttpResponse forward(RibbonCommandContext context) throws Exception {
    Map<String, Object> info = this.helper.debug(context.getMethod(),
            context.getUri(), context.getHeaders(), context.getParams(),
            context.getRequestEntity());
    // Create the AbstractRibbonCommand
    RibbonCommand command = this.ribbonCommandFactory.create(context);
    try {
        // Call execute
        ClientHttpResponse response = command.execute();
        this.helper.appendDebug(info, response.getRawStatusCode(), response.getHeaders());
        return response;
    }
    catch (HystrixRuntimeException ex) {
        return handleException(info, ex);
    }
}
b. HystrixCommand.execute() simply calls Future.get() on the result of the asynchronous HystrixCommand.queue(), blocking until that result is available.
public R execute() {
    try {
        // queue() returns a Future
        return queue().get();
    } catch (Exception e) {
        throw Exceptions.sneakyThrow(decomposeException(e));
    }
}
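To illustrate why execute() returns only when the Future from queue() completes, here is a hypothetical MiniCommand (not Hystrix's API) built on CompletableFuture: its queue() completes with run()'s result or, once the timeout elapses, with the fallback. This assumes Java 9+ for completeOnTimeout.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class ExecuteDemo {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newCachedThreadPool();
        MiniCommand<String> slow = new MiniCommand<String>(pool, 100) {
            @Override
            protected String run() throws Exception {
                Thread.sleep(500); // slower than the 100 ms timeout
                return "ok";
            }

            @Override
            protected String getFallback() {
                return "fallback";
            }
        };
        // execute() blocks until queue()'s Future completes; here that is the timeout fallback
        System.out.println(slow.execute()); // fallback
        pool.shutdownNow();
    }
}

// Hypothetical mini command (not Hystrix's API) showing execute() == queue().get().
abstract class MiniCommand<R> {
    private final ExecutorService pool;
    private final long timeoutMs;

    MiniCommand(ExecutorService pool, long timeoutMs) {
        this.pool = pool;
        this.timeoutMs = timeoutMs;
    }

    protected abstract R run() throws Exception;

    protected abstract R getFallback();

    // Asynchronous entry point: completes with run()'s result, or with the fallback
    // once timeoutMs elapses. getFallback() is evaluated eagerly here for simplicity.
    Future<R> queue() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return run();
            } catch (Exception e) {
                throw new CompletionException(e);
            }
        }, pool).completeOnTimeout(getFallback(), timeoutMs, TimeUnit.MILLISECONDS);
    }

    // Synchronous entry point: just blocks on the Future returned by queue()
    R execute() {
        try {
            return queue().get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }
}
```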
c. AbstractCommand.toObservable() creates the Observable that will later be subscribed to. It is built as follows:
- Without a request cache, execution goes through applyHystrixSemantics:
Observable<R> hystrixObservable =
        Observable.defer(applyHystrixSemantics)
                .map(wrapWithAllOnNextHooks);
Observable<R> afterCache;
// put in cache
if (requestCacheEnabled && cacheKey != null) {
    // wrap it for caching
    HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
    if (fromCache != null) {
        // another thread beat us so we'll use the cached value instead
        toCache.unsubscribe();
        isResponseFromCache = true;
        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
    } else {
        // we just created an ObservableCommand so we cast and return it
        afterCache = toCache.toObservable();
    }
} else {
    // No request cache: use the applyHystrixSemantics Observable directly
    afterCache = hystrixObservable;
}
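The caching branch relies on putIfAbsent so that concurrent callers with the same cacheKey share one execution. A hypothetical JDK-only model of that pattern (RequestCacheDemo is illustrative; Hystrix stores HystrixCachedObservable instances, this sketch stores CompletableFutures in a single map):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical model of the "putIfAbsent, the loser uses the cached value" pattern.
class RequestCacheDemo {
    private final ConcurrentMap<String, CompletableFuture<String>> cache = new ConcurrentHashMap<>();

    CompletableFuture<String> get(String cacheKey, Supplier<String> work) {
        // Prepare our own, not yet started, result holder (compare HystrixCachedObservable.from)
        CompletableFuture<String> mine = new CompletableFuture<>();
        CompletableFuture<String> existing = cache.putIfAbsent(cacheKey, mine);
        if (existing != null) {
            // Another caller beat us: discard ours and share the cached one
            return existing;
        }
        // We won the race: run the work once and publish the result to every sharer
        try {
            mine.complete(work.get());
        } catch (RuntimeException e) {
            mine.completeExceptionally(e);
        }
        return mine;
    }

    public static void main(String[] args) {
        RequestCacheDemo demo = new RequestCacheDemo();
        AtomicInteger calls = new AtomicInteger();
        Supplier<String> work = () -> "result-" + calls.incrementAndGet();
        System.out.println(demo.get("k", work).join()); // result-1
        System.out.println(demo.get("k", work).join()); // result-1, served from the cache
        System.out.println(calls.get());                // 1
    }
}
```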
- Once the semaphore is acquired, execution continues in executeCommandAndObserve. In THREAD mode executionSemaphore is a TryableSemaphoreNoOp, whose tryAcquire() always returns true.
// Acquire the semaphore (always succeeds in THREAD mode)
if (executionSemaphore.tryAcquire()) {
    try {
        /* used to track userThreadExecutionTime */
        executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
        return executeCommandAndObserve(_cmd)
                .doOnError(markExceptionThrown)
                .doOnTerminate(singleSemaphoreRelease)
                .doOnUnsubscribe(singleSemaphoreRelease);
    } catch (RuntimeException e) {
        return Observable.error(e);
    }
} else {
    return handleSemaphoreRejectionViaFallback();
}
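A hypothetical sketch of the two semaphore flavors (TryableSemaphore, CountingSemaphore and NoOpSemaphore here are illustrative, not Hystrix's classes): a counting semaphore rejects once its permits are used up, while the no-op version used in THREAD mode always succeeds, so THREAD commands never take the semaphore-rejection path.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class SemaphoreDemo {
    // Same shape as the snippet above: run the work if a permit is available,
    // otherwise take the rejection path; always release what was acquired.
    static String runIsolated(TryableSemaphore semaphore, Supplier<String> work) {
        if (semaphore.tryAcquire()) {
            try {
                return work.get();
            } finally {
                semaphore.release();
            }
        }
        return "rejected-fallback";
    }

    public static void main(String[] args) {
        TryableSemaphore onePermit = new CountingSemaphore(1);
        // The nested call finds no permit left while the outer call still holds it
        System.out.println(runIsolated(onePermit, () -> runIsolated(onePermit, () -> "inner"))); // rejected-fallback
        TryableSemaphore noOp = new NoOpSemaphore();
        // THREAD mode: the no-op semaphore never rejects
        System.out.println(runIsolated(noOp, () -> runIsolated(noOp, () -> "inner"))); // inner
    }
}

// Hypothetical interface mirroring the idea of a "tryable" semaphore
interface TryableSemaphore {
    boolean tryAcquire();

    void release();
}

// Counting implementation standing in for SEMAPHORE isolation
class CountingSemaphore implements TryableSemaphore {
    private final int permits;
    private final AtomicInteger inUse = new AtomicInteger();

    CountingSemaphore(int permits) {
        this.permits = permits;
    }

    @Override
    public boolean tryAcquire() {
        if (inUse.incrementAndGet() > permits) {
            inUse.decrementAndGet(); // undo: no permit available
            return false;
        }
        return true;
    }

    @Override
    public void release() {
        inUse.decrementAndGet();
    }
}

// THREAD isolation: concurrency is bounded by the thread pool instead, so this always succeeds
class NoOpSemaphore implements TryableSemaphore {
    @Override
    public boolean tryAcquire() {
        return true;
    }

    @Override
    public void release() {
    }
}
```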
- executeCommandAndObserve decorates the Observable (firing events, recording state, handling errors) and then calls executeCommandWithSpecifiedIsolation. Its error handler is where a timeout is routed to the fallback:
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
    @Override
    public Observable<R> call(Throwable t) {
        circuitBreaker.markNonSuccess();
        Exception e = getExceptionFromThrowable(t);
        executionResult = executionResult.setExecutionException(e);
        if (e instanceof RejectedExecutionException) {
            return handleThreadPoolRejectionViaFallback(e);
        } else if (t instanceof HystrixTimeoutException) {
            // Timeouts are handled here
            return handleTimeoutViaFallback();
        } else if (t instanceof HystrixBadRequestException) {
            return handleBadRequestByEmittingError(e);
        } else {
            /*
             * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
             */
            if (e instanceof HystrixBadRequestException) {
                eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                return Observable.error(e);
            }
            return handleFailureViaFallback(e);
        }
    }
};
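The branch order in handleFallback decides which path an error takes. A hypothetical model with stand-in exception types (CommandTimeoutException and BadRequestException are not Hystrix classes; RejectedExecutionException is the real JDK type):

```java
import java.util.concurrent.RejectedExecutionException;

// Hypothetical model of the handleFallback branch order (not Hystrix code).
class FallbackDispatchDemo {
    // Stand-ins for HystrixTimeoutException and HystrixBadRequestException
    static class CommandTimeoutException extends RuntimeException { }

    static class BadRequestException extends RuntimeException { }

    static String dispatch(Throwable t) {
        if (t instanceof RejectedExecutionException) {
            return "thread-pool rejection -> fallback";
        } else if (t instanceof CommandTimeoutException) {
            return "timeout -> fallback"; // the path that ends in the custom FallbackProvider
        } else if (t instanceof BadRequestException) {
            return "bad request -> error, no fallback";
        }
        return "other failure -> fallback";
    }

    public static void main(String[] args) {
        System.out.println(dispatch(new CommandTimeoutException()));      // timeout -> fallback
        System.out.println(dispatch(new RejectedExecutionException()));  // thread-pool rejection -> fallback
        System.out.println(dispatch(new BadRequestException()));         // bad request -> error, no fallback
        System.out.println(dispatch(new IllegalStateException("boom"))); // other failure -> fallback
    }
}
```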
- executeCommandWithSpecifiedIsolation handles each isolation strategy differently. The main difference is that in THREAD mode it calls subscribeOn on the Observable, so the command runs on a thread provided by threadPool.getScheduler(...):
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
    // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
    return Observable.defer(new Func0<Observable<R>>() {...})
            .doOnTerminate(new Action0() {...})
            .doOnUnsubscribe(new Action0() {...})
            .subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
                @Override
                public Boolean call() {
                    // true when interruptOnTimeout is enabled and the original HystrixCommand is TIMED_OUT
                    return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
                }
            }));
}
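The Func0<Boolean> passed to getScheduler is evaluated lazily, when the task is cancelled, not when it is created. A hypothetical sketch of that idea using BooleanSupplier (the enum is modeled on the TimedOutStatus states as I understand them; interruptOnTimeout stands for executionIsolationThreadInterruptOnTimeout, which defaults to true in Hystrix):

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;

class InterruptPredicateDemo {
    // Modeled on the states of Hystrix's TimedOutStatus
    enum TimedOutStatus { NOT_EXECUTED, COMPLETED, TIMED_OUT }

    // Created once when the command is set up, but evaluated only when the task is cancelled,
    // so it reflects the command's status at that moment.
    static BooleanSupplier shouldInterruptThread(boolean interruptOnTimeout, AtomicReference<TimedOutStatus> status) {
        return () -> interruptOnTimeout && status.get() == TimedOutStatus.TIMED_OUT;
    }

    public static void main(String[] args) {
        AtomicReference<TimedOutStatus> status = new AtomicReference<>(TimedOutStatus.NOT_EXECUTED);
        BooleanSupplier shouldInterrupt = shouldInterruptThread(true, status);
        System.out.println(shouldInterrupt.getAsBoolean()); // false: not timed out yet
        status.set(TimedOutStatus.TIMED_OUT);
        System.out.println(shouldInterrupt.getAsBoolean()); // true: same supplier, new state
    }
}
```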
- threadPool is the HystrixThreadPool created when the AbstractCommand is initialized; its default implementation is HystrixThreadPoolDefault. getScheduler applies any dynamic configuration changes and returns a HystrixContextScheduler. The shouldInterruptThread argument decides whether the thread executing the request is interrupted after a timeout.
@Override
public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
    // Apply dynamic configuration updates
    touchConfig();
    return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}
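touchConfig() lets thread pool settings change at runtime. A simplified, hypothetical model (DynamicPoolDemo is illustrative and assumes core size equals max size; as far as I can tell the real method also considers maximumSize and keepAliveTime): each time the executor is handed out, the latest configured size is applied to the ThreadPoolExecutor, ordering the two setters so the core size never exceeds the maximum.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

// Hypothetical model of "re-apply dynamic config whenever the executor is handed out".
class DynamicPoolDemo {
    private final ThreadPoolExecutor executor;
    private final IntSupplier configuredSize; // stands in for a dynamic coreSize property

    DynamicPoolDemo(IntSupplier configuredSize) {
        this.configuredSize = configuredSize;
        int size = configuredSize.getAsInt();
        this.executor = new ThreadPoolExecutor(size, size, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>());
    }

    // Apply the latest configured size; in this sketch core size always equals max size.
    void touchConfig() {
        int size = configuredSize.getAsInt();
        if (size == executor.getCorePoolSize()) {
            return; // nothing changed
        }
        if (size > executor.getMaximumPoolSize()) {
            // Growing: raise the maximum first so the core size never exceeds it
            executor.setMaximumPoolSize(size);
            executor.setCorePoolSize(size);
        } else {
            // Shrinking: lower the core size first for the same reason
            executor.setCorePoolSize(size);
            executor.setMaximumPoolSize(size);
        }
    }

    ThreadPoolExecutor getExecutor() {
        touchConfig();
        return executor;
    }

    public static void main(String[] args) {
        AtomicInteger config = new AtomicInteger(10);
        DynamicPoolDemo pool = new DynamicPoolDemo(config::get);
        System.out.println(pool.getExecutor().getCorePoolSize());    // 10
        config.set(20);
        System.out.println(pool.getExecutor().getCorePoolSize());    // 20
        config.set(5);
        System.out.println(pool.getExecutor().getMaximumPoolSize()); // 5
        pool.getExecutor().shutdown();
    }
}
```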
- When the Observable is subscribed, RxJava's subscribeOn operator calls HystrixContextScheduler.createWorker(), which returns a HystrixContextSchedulerWorker, and then calls HystrixContextSchedulerWorker.schedule on it.
public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
    this.concurrencyStrategy = concurrencyStrategy;
    this.threadPool = threadPool;
    // actualScheduler is a ThreadPoolScheduler
    this.actualScheduler = new ThreadPoolScheduler(threadPool, shouldInterruptThread);
}
@Override
public Worker createWorker() {
    // Wrap the ThreadPoolWorker in a HystrixContextSchedulerWorker
    return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
}
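A hypothetical model of this call chain (Scheduler and Worker here are tiny illustrative interfaces, not RxJava's classes): a subscribeOn-style helper asks the scheduler for a worker and schedules the subscription on it, and the wrapping scheduler's worker delegates to the actual pool-backed worker, in the same way HystrixContextSchedulerWorker wraps the ThreadPoolWorker.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical mini scheduler API (not RxJava) modeling the chain
// subscribeOn -> scheduler.createWorker() -> worker.schedule(action).
class SchedulerChainDemo {
    interface Worker { void schedule(Runnable action); }

    interface Scheduler { Worker createWorker(); }

    // "Actual" scheduler that runs actions on an executor (in the role of ThreadPoolScheduler)
    static Scheduler poolScheduler(ExecutorService pool) {
        return () -> action -> pool.execute(action);
    }

    // Wrapping scheduler (in the role of HystrixContextScheduler): its worker delegates
    // to the actual scheduler's worker and decorates every action it schedules.
    static Scheduler wrapping(Scheduler actual, List<String> log) {
        return () -> {
            Worker inner = actual.createWorker();
            return action -> inner.schedule(() -> {
                log.add("running on " + Thread.currentThread().getName());
                action.run();
            });
        };
    }

    // What a subscribeOn-style operator does when the Observable is subscribed
    static void subscribeOn(Scheduler scheduler, Runnable subscribeAction) {
        Worker worker = scheduler.createWorker();
        worker.schedule(subscribeAction);
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);
        subscribeOn(wrapping(poolScheduler(pool), log), done::countDown);
        done.await(1, TimeUnit.SECONDS);
        System.out.println(log);
        pool.shutdown();
    }
}
```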
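- The schedule call ends up in ThreadPoolWorker.schedule. It wraps the action, submits it to the executor of the AbstractCommand's threadPool, and attaches a FutureCompleterWithConfigurableInterrupt, built from the resulting FutureTask and the shouldInterruptThread argument above, as a subscription on the scheduled action. When the command times out and the action is unsubscribed, this subscription cancels the task.
@Override
public Subscription schedule(final Action0 action) {
    if (subscription.isUnsubscribed()) {
        // don't schedule, we are unsubscribed
        return Subscriptions.unsubscribed();
    }
    // This is internal RxJava API but it is too useful.
    // Wrap the action
    ScheduledAction sa = new ScheduledAction(action);
    // Register the action with the worker's subscription so that unsubscribing the worker also
    // unsubscribes the action; addParent lets the action remove itself from the parent once it is done
    subscription.add(sa);
    sa.addParent(subscription);
    // Get the executor
    ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
    // Submit the action
    FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
    // Attach the subscription that cancels (and optionally interrupts) the task
    sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));
    return sa;
}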
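A hypothetical JDK-only sketch of what schedule() sets up (ScheduleDemo and its Subscription interface are illustrative): the action is submitted to a ThreadPoolExecutor, and the returned handle can later remove it from the queue and cancel it. Unsubscribing while the task is still queued means it never runs at all.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

class ScheduleDemo {
    // Minimal stand-in for an RxJava Subscription
    interface Subscription { void unsubscribe(); }

    // Submit the action and return a handle that withdraws/cancels it later,
    // in the spirit of ThreadPoolWorker.schedule + FutureCompleterWithConfigurableInterrupt.
    static Subscription schedule(ThreadPoolExecutor executor, Runnable action, BooleanSupplier shouldInterruptThread) {
        // On a ThreadPoolExecutor, submit() returns a FutureTask
        FutureTask<?> f = (FutureTask<?>) executor.submit(action);
        return () -> {
            executor.remove(f);                             // drop it from the queue if it has not started
            f.cancel(shouldInterruptThread.getAsBoolean()); // interrupt only when asked to
        };
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch gate = new CountDownLatch(1);
        executor.submit(() -> { gate.await(); return null; }); // keep the only thread busy
        AtomicBoolean ran = new AtomicBoolean();
        Subscription s = schedule(executor, () -> ran.set(true), () -> false);
        s.unsubscribe(); // the action is still queued, so it is removed and never runs
        gate.countDown();
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.SECONDS);
        System.out.println(ran.get()); // false
    }
}
```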
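- When FutureCompleterWithConfigurableInterrupt is unsubscribed, it removes the task from the executor and cancels it, interrupting the request thread if shouldInterruptThread returns true.
@Override
public void unsubscribe() {
    // Remove the action above from the executor's queue
    executor.remove(f);
    if (shouldInterruptThread.call()) {
        // true: cancel the future and interrupt the running thread
        f.cancel(true);
    } else {
        f.cancel(false);
    }
}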
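The whole difference between the two cancel calls is whether the running thread is interrupted. A hypothetical demo (CancelDemo is illustrative) that cancels a sleeping task both ways: with cancel(true) the task observes an InterruptedException, with cancel(false) it keeps running until it finishes on its own. Interruption only helps if the request code actually responds to interrupts.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CancelDemo {
    // Starts a task that sleeps for sleepMs, cancels it while it is running, and reports
    // whether the task body observed an interrupt.
    static boolean taskWasInterrupted(boolean mayInterrupt, long sleepMs) throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        boolean[] interrupted = new boolean[1];
        Future<?> f = pool.submit(() -> {
            started.countDown();
            try {
                Thread.sleep(sleepMs); // stands in for a blocking downstream call
            } catch (InterruptedException e) {
                interrupted[0] = true;
            } finally {
                finished.countDown();
            }
        });
        started.await();     // make sure the task is actually running
        f.cancel(mayInterrupt);
        finished.await();    // wait for the task body itself to end
        pool.shutdown();
        return interrupted[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("cancel(true)  interrupted the task: " + taskWasInterrupted(true, 2000)); // true
        System.out.println("cancel(false) interrupted the task: " + taskWasInterrupted(false, 200)); // false
    }
}
```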
- The rest of the flow is not relevant to this problem, so it is not covered here.