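Fallback is invoked in the following situations:
- The circuit breaker is open
- Semaphore rejection
- Thread pool rejection
- The execution method fails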
As we saw in the earlier article on @EnableCircuitBreaker, execution ultimately reaches HystrixCommand's execute method, which in turn calls the queue method.
    public R execute() {
        try {
            return queue().get();
        } catch (Exception e) {
            throw Exceptions.sneakyThrow(decomposeException(e));
        }
    }
The queue method is shown below. It obtains a Future object; the key part is still the toObservable method.
    public Future<R> queue() {
        // rest omitted
        final Future<R> delegate = toObservable().toBlocking().toFuture();
        // rest omitted
    }
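The relationship between execute and queue can be illustrated without Hystrix or RxJava. The following is only a minimal sketch using the JDK's CompletableFuture; the class BlockingExecuteSketch and its two static methods are hypothetical stand-ins, not Hystrix code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Supplier;

public class BlockingExecuteSketch {

    // Hypothetical stand-in for queue(): start the work asynchronously and return a Future.
    public static <R> Future<R> queue(Supplier<R> work) {
        return CompletableFuture.supplyAsync(work);
    }

    // Hypothetical stand-in for execute(): block on the Future and unwrap the failure cause.
    public static <R> R execute(Supplier<R> work) {
        try {
            return queue(work).get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            throw new RuntimeException(cause);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(execute(() -> "hello"));
    }
}
```

The synchronous call is just the asynchronous call plus a blocking get, which is why the interesting logic lives further down the chain.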
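toObservable defines the Observable. It first checks whether the value can be served from the request cache; if not, afterCache is built from applyHystrixSemantics, so the applyHystrixSemantics method is called next.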
    public Observable<R> toObservable() {
        // rest omitted
        final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
                    return Observable.never();
                }
                return applyHystrixSemantics(_cmd);
            }
        };
        // rest omitted
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                // rest omitted
                final boolean requestCacheEnabled = isRequestCachingEnabled();
                final String cacheKey = getCacheKey();

                /* try from cache first */
                if (requestCacheEnabled) {
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                    if (fromCache != null) {
                        isResponseFromCache = true;
                        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                    }
                }

                Observable<R> hystrixObservable =
                        Observable.defer(applyHystrixSemantics)
                                .map(wrapWithAllOnNextHooks);

                Observable<R> afterCache;

                // put in cache
                if (requestCacheEnabled && cacheKey != null) {
                    // wrap it for caching
                    HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                    if (fromCache != null) {
                        // another thread beat us so we'll use the cached value instead
                        toCache.unsubscribe();
                        isResponseFromCache = true;
                        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                    } else {
                        // we just created an ObservableCommand so we cast and return it
                        afterCache = toCache.toObservable();
                    }
                } else {
                    afterCache = hystrixObservable;
                }

                return afterCache
                        .doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
                        .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
                        .doOnCompleted(fireOnCompletedHook);
            }
        });
    }
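The "check the cache, then putIfAbsent, then defer to whoever won the race" pattern in the code above can be shown in isolation. This is a rough sketch built on ConcurrentHashMap and CompletableFuture rather than Hystrix's request cache; RequestCacheSketch, its get method, and the executions counter are all hypothetical names introduced for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class RequestCacheSketch {

    private final ConcurrentHashMap<String, CompletableFuture<String>> cache = new ConcurrentHashMap<>();

    // Counts how many times the underlying work actually ran (for demonstration only).
    public final AtomicInteger executions = new AtomicInteger();

    public CompletableFuture<String> get(String cacheKey, Supplier<String> work) {
        if (cacheKey == null) {
            // Caching disabled for this call: always execute.
            executions.incrementAndGet();
            return CompletableFuture.completedFuture(work.get());
        }
        // Try from cache first.
        CompletableFuture<String> cached = cache.get(cacheKey);
        if (cached != null) {
            return cached;
        }
        // Not cached yet: publish a placeholder atomically.
        CompletableFuture<String> toCache = new CompletableFuture<>();
        CompletableFuture<String> winner = cache.putIfAbsent(cacheKey, toCache);
        if (winner != null) {
            // Another thread beat us, so use its entry instead of executing again.
            return winner;
        }
        executions.incrementAndGet();
        try {
            toCache.complete(work.get());
        } catch (RuntimeException e) {
            toCache.completeExceptionally(e);
        }
        return toCache;
    }
}
```

Only the caller whose putIfAbsent returns null runs the work; every other caller with the same key shares that result.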
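Circuit-breaker check
This is the most important piece of code. It first checks whether the circuit breaker is open; if it is, handleShortCircuitViaFallback is called. If it is not open, the code then tries to acquire a semaphore permit and calls handleSemaphoreRejectionViaFallback when none is available. If both checks pass, executeCommandAndObserve is called.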
    private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
        // rest omitted
        if (circuitBreaker.allowRequest()) {
            // rest omitted
            if (executionSemaphore.tryAcquire()) {
                try {
                    /* used to track userThreadExecutionTime */
                    executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                    return executeCommandAndObserve(_cmd)
                            .doOnError(markExceptionThrown)
                            .doOnTerminate(singleSemaphoreRelease)
                            .doOnUnsubscribe(singleSemaphoreRelease);
                } catch (RuntimeException e) {
                    return Observable.error(e);
                }
            } else {
                return handleSemaphoreRejectionViaFallback();
            }
        } else {
            return handleShortCircuitViaFallback();
        }
    }
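The order of the checks is easier to see once the RxJava plumbing is removed. The sketch below is a simplified, synchronous model of that order, assuming a boolean supplier in place of the circuit breaker and a java.util.concurrent.Semaphore in place of Hystrix's execution semaphore. SemanticsSketch and the reason strings passed to the fallback are hypothetical.

```java
import java.util.concurrent.Semaphore;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.function.Supplier;

public class SemanticsSketch {

    private final BooleanSupplier allowRequest;   // stands in for the circuit breaker check
    private final Semaphore executionSemaphore;   // stands in for the execution semaphore

    public SemanticsSketch(BooleanSupplier allowRequest, int maxConcurrent) {
        this.allowRequest = allowRequest;
        this.executionSemaphore = new Semaphore(maxConcurrent);
    }

    // The fallback receives a reason string so the caller can tell which branch fired.
    public String run(Supplier<String> command, Function<String, String> fallback) {
        if (!allowRequest.getAsBoolean()) {
            return fallback.apply("short-circuited");
        }
        if (!executionSemaphore.tryAcquire()) {
            // tryAcquire never blocks: no permit means immediate rejection.
            return fallback.apply("semaphore-rejected");
        }
        try {
            return command.get();
        } catch (RuntimeException e) {
            return fallback.apply("failure");
        } finally {
            // Release exactly once, whether the command succeeded or failed.
            executionSemaphore.release();
        }
    }
}
```

A usage example: `new SemanticsSketch(() -> false, 10).run(() -> "ok", reason -> "fallback:" + reason)` takes the short-circuit branch without ever touching the semaphore or the command.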
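Circuit breaker open
Let's first look at the call path when the circuit breaker is open. getFallbackOrThrowException obtains fallbackExecutionChain through getFallbackObservable, whose main job is to call the getFallback method.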
    private Observable<R> handleShortCircuitViaFallback() {
        // rest omitted
        return getFallbackOrThrowException(this, HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT,
                "short-circuited", shortCircuitException);
        // rest omitted
    }

    private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {
        // rest omitted
        fallbackExecutionChain = getFallbackObservable();
        // rest omitted
        return fallbackExecutionChain
                .doOnEach(setRequestContext)
                .lift(new FallbackHookApplication(_cmd))
                .lift(new DeprecatedOnFallbackHookApplication(_cmd))
                .doOnNext(markFallbackEmit)
                .doOnCompleted(markFallbackCompleted)
                .onErrorResumeNext(handleFallbackError)
                .doOnTerminate(singleSemaphoreRelease)
                .doOnUnsubscribe(singleSemaphoreRelease);
        // rest omitted
    }

    @Override
    final protected Observable<R> getFallbackObservable() {
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                try {
                    return Observable.just(getFallback());
                } catch (Throwable ex) {
                    return Observable.error(ex);
                }
            }
        });
    }
getFallback is the entry point for invoking the fallback method; MethodExecutionAction calls the method we configured via reflection.
    @Override
    protected Object getFallback() {
        final CommandAction commandAction = getFallbackAction();
        if (commandAction != null) {
            try {
                return process(new Action() {
                    @Override
                    Object execute() {
                        MetaHolder metaHolder = commandAction.getMetaHolder();
                        Object[] args = createArgsForFallback(metaHolder, getExecutionException());
                        return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args);
                    }
                });
            } catch (Throwable e) {
                LOGGER.error(FallbackErrorMessageBuilder.create()
                        .append(commandAction, e).build());
                throw new FallbackInvocationException(unwrapCause(e));
            }
        } else {
            return super.getFallback();
        }
    }
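Calling a configured fallback method by name comes down to ordinary Java reflection. The following sketch shows only that mechanism, using java.lang.reflect.Method; it is not how MethodExecutionAction is implemented, and ReflectiveFallbackSketch, GreetingService, and invokeFallback are hypothetical names.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

public class ReflectiveFallbackSketch {

    // Hypothetical service whose fallback method is looked up by name.
    public static class GreetingService {
        public String greetFallback(String name) {
            return "fallback for " + name;
        }
    }

    // Look up a public method by name and argument types, then invoke it on the target.
    // Assumes the declared parameter types match the runtime classes of the arguments.
    public static Object invokeFallback(Object target, String methodName, Object... args) {
        Class<?>[] parameterTypes = new Class<?>[args.length];
        for (int i = 0; i < args.length; i++) {
            parameterTypes[i] = args[i].getClass();
        }
        try {
            Method method = target.getClass().getMethod(methodName, parameterTypes);
            return method.invoke(target, args);
        } catch (InvocationTargetException e) {
            // The fallback itself threw: surface the underlying cause.
            throw new IllegalStateException("fallback failed", e.getCause());
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("fallback method not callable: " + methodName, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(invokeFallback(new GreetingService(), "greetFallback", "bob"));
    }
}
```

Unwrapping InvocationTargetException mirrors the unwrapCause step in the code above: callers care about what the fallback threw, not about the reflective wrapper.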
Semaphore isolation
Having read the circuit-breaker code above, this case is simple: just as before, it calls getFallbackOrThrowException.
    private Observable<R> handleSemaphoreRejectionViaFallback() {
        // rest omitted
        return getFallbackOrThrowException(this, HystrixEventType.SEMAPHORE_REJECTED, FailureType.REJECTED_SEMAPHORE_EXECUTION,
                "could not acquire a semaphore for execution", semaphoreRejectionException);
    }
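Thread pool isolation
If the circuit breaker is closed and a semaphore permit is acquired, executeCommandAndObserve is called. In this method, handleFallback deals with several kinds of exceptions: thread pool rejection, timeout, bad request, and all other failures. The method then calls executeCommandWithSpecifiedIsolation.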
    private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
        // rest omitted
        final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
            @Override
            public Observable<R> call(Throwable t) {
                Exception e = getExceptionFromThrowable(t);
                executionResult = executionResult.setExecutionException(e);
                if (e instanceof RejectedExecutionException) {
                    return handleThreadPoolRejectionViaFallback(e);
                } else if (t instanceof HystrixTimeoutException) {
                    return handleTimeoutViaFallback();
                } else if (t instanceof HystrixBadRequestException) {
                    return handleBadRequestByEmittingError(e);
                } else {
                    /*
                     * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
                     */
                    if (e instanceof HystrixBadRequestException) {
                        eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                        return Observable.error(e);
                    }
                    return handleFailureViaFallback(e);
                }
            }
        };
        // rest omitted
        Observable<R> execution;
        if (properties.executionTimeoutEnabled().get()) {
            execution = executeCommandWithSpecifiedIsolation(_cmd)
                    .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
        } else {
            execution = executeCommandWithSpecifiedIsolation(_cmd);
        }
        return execution.doOnNext(markEmits)
                .doOnCompleted(markOnCompleted)
                .onErrorResumeNext(handleFallback)
                .doOnEach(setRequestContext);
    }
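The branching in handleFallback amounts to classifying a Throwable and deciding whether the fallback applies. Here is a small sketch of that idea using only JDK exception types. As assumptions for illustration, TimeoutException stands in for HystrixTimeoutException and IllegalArgumentException stands in for HystrixBadRequestException; FailureClassifierSketch and its Outcome enum are hypothetical.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;

public class FailureClassifierSketch {

    public enum Outcome { THREAD_POOL_REJECTED, TIMEOUT, BAD_REQUEST, FAILURE }

    // Order matters: the most specific conditions are checked first,
    // and anything unrecognized is treated as a generic failure.
    public static Outcome classify(Throwable t) {
        if (t instanceof RejectedExecutionException) {
            return Outcome.THREAD_POOL_REJECTED;
        } else if (t instanceof TimeoutException) {          // stand-in for HystrixTimeoutException
            return Outcome.TIMEOUT;
        } else if (t instanceof IllegalArgumentException) {  // stand-in for HystrixBadRequestException
            return Outcome.BAD_REQUEST;
        }
        return Outcome.FAILURE;
    }

    // In the code above, a bad request emits the error directly; every other branch goes via fallback.
    public static boolean usesFallback(Outcome outcome) {
        return outcome != Outcome.BAD_REQUEST;
    }

    public static void main(String[] args) {
        System.out.println(classify(new RejectedExecutionException("pool full")));
    }
}
```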
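This method mainly does two things. One is the subscribeOn call, which is what implements thread pool isolation. The other is the call made in the normal case, getUserExecutionObservable, which is covered after the thread pool discussion.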
    private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
        if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
            // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
            return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    // rest omitted
                    return getUserExecutionObservable(_cmd);
                    // rest omitted
                }
            })
            // rest omitted
            .subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
                @Override
                public Boolean call() {
                    return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
                }
            }));
        }
        // rest omitted
    }
Creating the related classes
    public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
        // dynamically refresh the thread pool configuration
        touchConfig();
        // create a HystrixContextScheduler, which in turn creates a ThreadPoolScheduler
        return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
    }
This creates a HystrixContextScheduler object. Its ThreadPoolScheduler is used to create a ThreadPoolWorker, which is handed to the HystrixContextSchedulerWorker.
    public Worker createWorker() {
        return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
    }
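The thread pool check happens in the schedule method of HystrixContextSchedulerWorker. If no queue space is available, it throws RejectedExecutionException; how that exception is handled was covered above.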
    public Subscription schedule(Action0 action) {
        if (threadPool != null) {
            // this is where thread pool capacity is checked
            if (!threadPool.isQueueSpaceAvailable()) {
                throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
            }
        }
        return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));
    }
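The "check queue space before scheduling" idea can be reproduced with a plain ThreadPoolExecutor. The sketch below assumes that queue space is judged by comparing the current queue size against a rejection threshold; the threshold comparison, the class QueueRejectionSketch, and its constructor parameters are illustrative assumptions rather than Hystrix's actual implementation.

```java
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class QueueRejectionSketch {

    private final ThreadPoolExecutor pool;
    private final int queueSizeRejectionThreshold;

    public QueueRejectionSketch(int threads, int maxQueueSize, int queueSizeRejectionThreshold) {
        this.pool = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(maxQueueSize));
        this.queueSizeRejectionThreshold = queueSizeRejectionThreshold;
    }

    // Assumed rule: space is available while fewer tasks are waiting than the threshold allows.
    public boolean isQueueSpaceAvailable() {
        return pool.getQueue().size() < queueSizeRejectionThreshold;
    }

    // Reject up front instead of letting the task wait in a long queue.
    public Future<?> schedule(Runnable action) {
        if (!isQueueSpaceAvailable()) {
            throw new RejectedExecutionException(
                    "Rejected command because thread-pool queueSize is at rejection threshold.");
        }
        return pool.submit(action);
    }

    public void shutdown() {
        pool.shutdownNow();
    }
}
```

Rejecting before submission lets the threshold be lower than the queue's physical capacity, so the limit can be tightened without rebuilding the pool.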
Normal invocation
If all of the checks above pass, getUserExecutionObservable is called, which eventually calls the run method.
    private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
        // rest omitted
        userObservable = getExecutionObservable();
        // rest omitted
    }

    @Override
    final protected Observable<R> getExecutionObservable() {
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                try {
                    return Observable.just(run());
                } catch (Throwable ex) {
                    return Observable.error(ex);
                }
            }
        })
        // rest omitted
    }
Here, too, our own method is invoked via reflection.
    protected Object run() throws Exception {
        LOGGER.debug("execute command: {}", getCommandKey().name());
        return process(new Action() {
            @Override
            Object execute() {
                return getCommandAction().execute(getExecutionType());
            }
        });
    }
Summary
We have covered the paths that lead to a fallback call as well as the normal flow. The flow chart is as follows.