共计 8872 个字符,预计需要花费 23 分钟才能阅读完成。
调用 Fallback 有以下几种状况:
- 熔断器开启
- 信号量回绝
- 线程池回绝
- 执行办法失败
hystrix – @EnableCircuitBreaker 那些事咱们晓得最终会调用 HystrixCommand 的 execute 办法,他这个办法就会调用 queue 办法。
public R execute() {
try {return queue().get();} catch (Exception e) {throw Exceptions.sneakyThrow(decomposeException(e));
}
}
queue 办法如下,这里是拿到一个 Future 对象。重点还是 toObservable 办法。
public Future<R> queue() {
// 其余略
final Future<R> delegate = toObservable().toBlocking().toFuture();
// 其余略
}
这里会定义 Observable,他会判断是否要从缓存取值,如果没有,afterCache 就取 applyHystrixSemantics,所以前面就会调用 applyHystrixSemantics 办法。
public Observable<R> toObservable() {
// 其余略
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
@Override
public Observable<R> call() {if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {return Observable.never();
}
return applyHystrixSemantics(_cmd);
}
};
// 其余略
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
// 其余略
final boolean requestCacheEnabled = isRequestCachingEnabled();
final String cacheKey = getCacheKey();
/* try from cache first */
if (requestCacheEnabled) {HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
if (fromCache != null) {
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
}
}
Observable<R> hystrixObservable =
Observable.defer(applyHystrixSemantics)
.map(wrapWithAllOnNextHooks);
Observable<R> afterCache;
// put in cache
if (requestCacheEnabled && cacheKey != null) {
// wrap it for caching
HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
if (fromCache != null) {
// another thread beat us so we'll use the cached value instead
toCache.unsubscribe();
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
} else {
// we just created an ObservableCommand so we cast and return it
afterCache = toCache.toObservable();}
} else {afterCache = hystrixObservable;}
return afterCache
.doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
.doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
.doOnCompleted(fireOnCompletedHook);
}
});
}
熔断判断
这里是比拟重要的代码,首先判断是否开启熔断,如果开启了,就调用 handleShortCircuitViaFallback 办法,如果没有开启熔断,他还会去判断是否能申请到信号量,申请不到就调用 handleSemaphoreRejectionViaFallback 办法。如果都失常,就调用 executeCommandAndObserve 办法。
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
// 其余略
if (circuitBreaker.allowRequest()) {
// 其余略
if (executionSemaphore.tryAcquire()) {
try {
/* used to track userThreadExecutionTime */
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {return Observable.error(e);
}
} else {return handleSemaphoreRejectionViaFallback();
}
} else {
//
return handleShortCircuitViaFallback();}
}
熔断开启
咱们先看看熔断开启后的调用,会通过 getFallbackObservable 办法获取 fallbackExecutionChain,getFallbackObservable 次要的作用就是调用 getFallback 办法。
private Observable<R> handleShortCircuitViaFallback() {
// 其余略
return getFallbackOrThrowException(this, HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT,
"short-circuited", shortCircuitException);
// 其余略
}
private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {
// 其余略
fallbackExecutionChain = getFallbackObservable();
// 其余略
return fallbackExecutionChain
.doOnEach(setRequestContext)
.lift(new FallbackHookApplication(_cmd))
.lift(new DeprecatedOnFallbackHookApplication(_cmd))
.doOnNext(markFallbackEmit)
.doOnCompleted(markFallbackCompleted)
.onErrorResumeNext(handleFallbackError)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
}
// 其余略
}
@Override
final protected Observable<R> getFallbackObservable() {return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
try {return Observable.just(getFallback());
} catch (Throwable ex) {return Observable.error(ex);
}
}
});
}
getFallback 是调用 fallback 办法的入口,MethodExecutionAction 会通过发射,调用咱们配置的办法。
@Override
protected Object getFallback() {final CommandAction commandAction = getFallbackAction();
if (commandAction != null) {
try {return process(new Action() {
@Override
Object execute() {MetaHolder metaHolder = commandAction.getMetaHolder();
Object[] args = createArgsForFallback(metaHolder, getExecutionException());
return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args);
}
});
} catch (Throwable e) {LOGGER.error(FallbackErrorMessageBuilder.create()
.append(commandAction, e).build());
throw new FallbackInvocationException(unwrapCause(e));
}
} else {return super.getFallback();
}
}
信号量隔离
咱们看了下面熔断开启的代码,这边就比较简单,就是跟下面一样,调用 getFallbackOrThrowException 办法。
private Observable<R> handleSemaphoreRejectionViaFallback() {
// 其余略
return getFallbackOrThrowException(this, HystrixEventType.SEMAPHORE_REJECTED, FailureType.REJECTED_SEMAPHORE_EXECUTION,
"could not acquire a semaphore for execution", semaphoreRejectionException);
}
线程池隔离
如果熔断没开,信号量又能够获取到,他就会调用 executeCommandAndObserve 办法。这个办法,handleFallback 定义了几个异样,比方线程池异样解决,工夫超时解决,申请异样解决,以及其余异样解决。而后调用 executeCommandWithSpecifiedIsolation 办法。
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
// 其余略
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
@Override
public Observable<R> call(Throwable t) {Exception e = getExceptionFromThrowable(t);
executionResult = executionResult.setExecutionException(e);
if (e instanceof RejectedExecutionException) {return handleThreadPoolRejectionViaFallback(e);
} else if (t instanceof HystrixTimeoutException) {return handleTimeoutViaFallback();
} else if (t instanceof HystrixBadRequestException) {return handleBadRequestByEmittingError(e);
} else {
/*
* Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
*/
if (e instanceof HystrixBadRequestException) {eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
return Observable.error(e);
}
return handleFailureViaFallback(e);
}
}
};
// 其余略
Observable<R> execution;
if (properties.executionTimeoutEnabled().get()) {execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {execution = executeCommandWithSpecifiedIsolation(_cmd);
}
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}
这个办法次要是两件事件,一个是 subscribeOn 办法,这里是线程池隔离用的,另外一个就是失常状况下的调用,调用的是 getUserExecutionObservable 办法,这个办法在线程池前面讲。
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {// mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
// 其余略
return getUserExecutionObservable(_cmd);
// 其余略
}
})// 其余略
}).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
@Override
public Boolean call() {return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
}
}));
}
// 其余略
}
创立相干类
public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
// 动静配置线程池信息
touchConfig();
// 创立一个 HystrixContextScheduler 对象,这里又会创立一个 ThreadPoolScheduler
return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}
这里创立一个 HystrixContextScheduler 对象,ThreadPoolScheduler 对象用于创立 ThreadPoolWorker,并赋值给 HystrixContextSchedulerWorker
public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
// 动静配置线程池信息
touchConfig();
// 创立一个 HystrixContextScheduler 对象,这里又会创立一个 ThreadPoolScheduler
return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}
public Worker createWorker() {return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
}
线程池判断的中央在 HystrixContextSchedulerWorker 的 schedule 办法,不够就抛 RejectedExecutionException 异样,异样的捕捉下面曾经讲了。
public Subscription schedule(Action0 action) {if (threadPool != null) {
// 判断线程池线程的中央
if (!threadPool.isQueueSpaceAvailable()) {throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
}
}
return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));
}
失常调用
如果下面都失常,就会调用 getUserExecutionObservable 办法。这个办法最初会调用 run 办法。
private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
// 其余略
userObservable = getExecutionObservable();
// 其余略
}
@Override
final protected Observable<R> getExecutionObservable() {return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
try {return Observable.just(run());
} catch (Throwable ex) {return Observable.error(ex);
}
}
})// 其余略
}
在这里,同样通过反射,调用到咱们的办法。
protected Object run() throws Exception {LOGGER.debug("execute command: {}", getCommandKey().name());
return process(new Action() {
@Override
Object execute() {return getCommandAction().execute(getExecutionType());
}
});
}
总结
下面讲了几个 fallback 调用的办法,以及失常的流程,流程图如下