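Fallback is invoked in the following situations:

  • The circuit breaker is open
  • The semaphore rejects the request
  • The thread pool rejects the request
  • The executed method fails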

As covered in the earlier post on hystrix and @EnableCircuitBreaker, the call eventually reaches the execute method of HystrixCommand, and that method in turn calls queue.

public R execute() {
    try {
        return queue().get();
    } catch (Exception e) {
        throw Exceptions.sneakyThrow(decomposeException(e));
    }
}
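The relationship between execute and queue can be illustrated without Hystrix at all. The sketch below is only an analogy: it uses the JDK's CompletableFuture in place of the RxJava-backed Future, and BlockingCommandSketch and its members are hypothetical names, not Hystrix classes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Supplier;

// Hypothetical stand-in for a command: execute() is simply queue().get().
final class BlockingCommandSketch<R> {
    private final Supplier<R> work;

    BlockingCommandSketch(Supplier<R> work) {
        this.work = work;
    }

    // Asynchronous entry point: hands back a Future right away.
    Future<R> queue() {
        return CompletableFuture.supplyAsync(work);
    }

    // Synchronous entry point: blocks on the Future returned by queue().
    R execute() {
        try {
            return queue().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            // Unwrap so the caller sees the original failure (an assumption made
            // for this sketch, loosely in the spirit of decomposeException).
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            throw new IllegalStateException(cause);
        }
    }
}
```

Calling new BlockingCommandSketch<>(() -> 42).execute() blocks until the asynchronous work finishes and then returns its value.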

The queue method is shown below. It obtains a Future object, but the part that matters is the toObservable method.

public Future<R> queue() {
    // rest omitted
    final Future<R> delegate = toObservable().toBlocking().toFuture();
    // rest omitted
}

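This is where the Observable is defined. It first checks whether the value should come from the request cache. If there is no cache hit, afterCache ends up wrapping applyHystrixSemantics, so the applyHystrixSemantics method is what gets called next.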

public Observable<R> toObservable() {
    // rest omitted
    final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
                return Observable.never();
            }
            return applyHystrixSemantics(_cmd);
        }
    };
    // rest omitted
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            // rest omitted
            final boolean requestCacheEnabled = isRequestCachingEnabled();
            final String cacheKey = getCacheKey();

            /* try from cache first */
            if (requestCacheEnabled) {
                HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                if (fromCache != null) {
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                }
            }

            Observable<R> hystrixObservable =
                    Observable.defer(applyHystrixSemantics)
                            .map(wrapWithAllOnNextHooks);

            Observable<R> afterCache;

            // put in cache
            if (requestCacheEnabled && cacheKey != null) {
                // wrap it for caching
                HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                if (fromCache != null) {
                    // another thread beat us so we'll use the cached value instead
                    toCache.unsubscribe();
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                } else {
                    // we just created an ObservableCommand so we cast and return it
                    afterCache = toCache.toObservable();
                }
            } else {
                afterCache = hystrixObservable;
            }

            return afterCache
                    .doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
                    .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
                    .doOnCompleted(fireOnCompletedHook);
        }
    });
}
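The "try the cache, then putIfAbsent, then fall back to the winner" pattern in toObservable is easier to see in isolation. The following is a rough sketch of that idea using only JDK classes; RequestCacheSketch is a hypothetical name, and a CompletableFuture stands in for the cached Observable.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

// Hypothetical request cache: the first caller for a key runs the work,
// later callers for the same key share its result.
final class RequestCacheSketch<R> {
    private final ConcurrentMap<String, CompletableFuture<R>> cache = new ConcurrentHashMap<>();

    R get(String cacheKey, Supplier<R> work) {
        if (cacheKey == null) {
            return work.get();               // no key: run without caching
        }
        CompletableFuture<R> fromCache = cache.get(cacheKey);
        if (fromCache != null) {
            return fromCache.join();         // try from cache first
        }
        CompletableFuture<R> toCache = new CompletableFuture<>();
        fromCache = cache.putIfAbsent(cacheKey, toCache);
        if (fromCache != null) {
            return fromCache.join();         // another thread beat us: use its value
        }
        try {
            toCache.complete(work.get());    // we own the entry, so we do the work
        } catch (RuntimeException e) {
            toCache.completeExceptionally(e);
        }
        return toCache.join();
    }
}
```

The second lookup with the same key never runs the supplier again, which is the effect the putIfAbsent branch is after.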

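Circuit breaker check

This is the most important piece of code. It first checks whether the circuit breaker is open. If it is, handleShortCircuitViaFallback is called. If the circuit is not open, it then checks whether a semaphore permit can be acquired, and if not, handleSemaphoreRejectionViaFallback is called. If both checks pass, executeCommandAndObserve is called.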

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    // rest omitted
    if (circuitBreaker.allowRequest()) {
        // rest omitted
        if (executionSemaphore.tryAcquire()) {
            try {
                /* used to track userThreadExecutionTime */
                executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                return executeCommandAndObserve(_cmd)
                        .doOnError(markExceptionThrown)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
            } catch (RuntimeException e) {
                return Observable.error(e);
            }
        } else {
            return handleSemaphoreRejectionViaFallback();
        }
    } else {
        // the circuit is open, so short-circuit to the fallback
        return handleShortCircuitViaFallback();
    }
}
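The order of the checks can be condensed into a small stand-alone model. This is a simplified sketch, not Hystrix code: SemanticsSketch is a hypothetical class, an AtomicBoolean stands in for the circuit breaker, java.util.concurrent.Semaphore stands in for the execution semaphore, and the returned string is tagged with the path that was taken.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical model of the decision order: circuit, then semaphore, then execution.
final class SemanticsSketch {
    private final AtomicBoolean circuitOpen = new AtomicBoolean(false);
    private final Semaphore executionSemaphore;

    SemanticsSketch(int maxConcurrent) {
        this.executionSemaphore = new Semaphore(maxConcurrent);
    }

    void openCircuit() {
        circuitOpen.set(true);
    }

    String apply(Supplier<String> run, Supplier<String> fallback) {
        if (circuitOpen.get()) {
            return "SHORT_CIRCUITED:" + fallback.get();      // circuit open
        }
        if (!executionSemaphore.tryAcquire()) {
            return "SEMAPHORE_REJECTED:" + fallback.get();   // no permit left
        }
        try {
            return "SUCCESS:" + run.get();                   // normal execution
        } catch (RuntimeException e) {
            return "FAILURE:" + fallback.get();              // execution failed
        } finally {
            executionSemaphore.release();                    // release exactly once
        }
    }
}
```

With a single permit, a nested apply call made while the outer call still holds the permit takes the semaphore rejection path, and after openCircuit() every call goes straight to the fallback.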

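Circuit open

Let us first look at the call path once the circuit is open. getFallbackOrThrowException obtains fallbackExecutionChain through getFallbackObservable, and the main job of getFallbackObservable is to call getFallback.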

private Observable<R> handleShortCircuitViaFallback() {
    // rest omitted
    return getFallbackOrThrowException(this, HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT,
            "short-circuited", shortCircuitException);
    // rest omitted
}

private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {
    // rest omitted
    fallbackExecutionChain = getFallbackObservable();
    // rest omitted
    return fallbackExecutionChain
            .doOnEach(setRequestContext)
            .lift(new FallbackHookApplication(_cmd))
            .lift(new DeprecatedOnFallbackHookApplication(_cmd))
            .doOnNext(markFallbackEmit)
            .doOnCompleted(markFallbackCompleted)
            .onErrorResumeNext(handleFallbackError)
            .doOnTerminate(singleSemaphoreRelease)
            .doOnUnsubscribe(singleSemaphoreRelease);
    // rest omitted
}

@Override
final protected Observable<R> getFallbackObservable() {
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            try {
                return Observable.just(getFallback());
            } catch (Throwable ex) {
                return Observable.error(ex);
            }
        }
    });
}

getFallback is the entry point for invoking the fallback method. MethodExecutionAction uses reflection to call the method we configured.

@Override
protected Object getFallback() {
    final CommandAction commandAction = getFallbackAction();
    if (commandAction != null) {
        try {
            return process(new Action() {
                @Override
                Object execute() {
                    MetaHolder metaHolder = commandAction.getMetaHolder();
                    Object[] args = createArgsForFallback(metaHolder, getExecutionException());
                    return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args);
                }
            });
        } catch (Throwable e) {
            LOGGER.error(FallbackErrorMessageBuilder.create()
                    .append(commandAction, e).build());
            throw new FallbackInvocationException(unwrapCause(e));
        }
    } else {
        return super.getFallback();
    }
}
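To make "invoked through reflection" concrete, here is a minimal sketch of looking up a method by name and calling it with java.lang.reflect. It is not how MethodExecutionAction is implemented internally; ReflectiveFallbackSketch, UserService and defaultUser are hypothetical names used only for illustration.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

final class ReflectiveFallbackSketch {

    // Hypothetical service; defaultUser plays the role of the configured fallback method.
    public static final class UserService {
        public String defaultUser(String id) {
            return "fallback-" + id;
        }
    }

    // Looks up the fallback by name and invokes it reflectively with the given arguments.
    static Object invokeFallback(Object target, String fallbackMethod, Object... args) {
        Class<?>[] parameterTypes = new Class<?>[args.length];
        for (int i = 0; i < args.length; i++) {
            parameterTypes[i] = args[i].getClass();
        }
        try {
            Method method = target.getClass().getMethod(fallbackMethod, parameterTypes);
            return method.invoke(target, args);
        } catch (InvocationTargetException e) {
            // The fallback method itself threw; surface its cause.
            throw new IllegalStateException("fallback itself failed", e.getCause());
        } catch (ReflectiveOperationException e) {
            // The method does not exist or is not accessible.
            throw new IllegalStateException("fallback not callable: " + fallbackMethod, e);
        }
    }
}
```

Calling invokeFallback(new ReflectiveFallbackSketch.UserService(), "defaultUser", "42") resolves defaultUser(String) at run time and returns its result.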

Semaphore isolation

Having gone through the circuit-open code above, this case is simple. Just like the code above, it calls getFallbackOrThrowException.

private Observable<R> handleSemaphoreRejectionViaFallback() {
    // rest omitted
    return getFallbackOrThrowException(this, HystrixEventType.SEMAPHORE_REJECTED, FailureType.REJECTED_SEMAPHORE_EXECUTION,
            "could not acquire a semaphore for execution", semaphoreRejectionException);
}

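Thread pool isolation

If the circuit is not open and a semaphore permit was acquired, executeCommandAndObserve is called. In this method, handleFallback defines the handling for several kinds of exception: thread pool rejection, timeout, bad request, and all other failures. The method then calls executeCommandWithSpecifiedIsolation.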

private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    // rest omitted
    final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
        @Override
        public Observable<R> call(Throwable t) {
            Exception e = getExceptionFromThrowable(t);
            executionResult = executionResult.setExecutionException(e);
            if (e instanceof RejectedExecutionException) {
                return handleThreadPoolRejectionViaFallback(e);
            } else if (t instanceof HystrixTimeoutException) {
                return handleTimeoutViaFallback();
            } else if (t instanceof HystrixBadRequestException) {
                return handleBadRequestByEmittingError(e);
            } else {
                /*
                 * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
                 */
                if (e instanceof HystrixBadRequestException) {
                    eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                    return Observable.error(e);
                }
                return handleFailureViaFallback(e);
            }
        }
    };
    // rest omitted
    Observable<R> execution;
    if (properties.executionTimeoutEnabled().get()) {
        execution = executeCommandWithSpecifiedIsolation(_cmd)
                .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
    } else {
        execution = executeCommandWithSpecifiedIsolation(_cmd);
    }
    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}
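The branching inside handleFallback boils down to classifying the exception and deciding whether a fallback applies. The sketch below mirrors that branching with JDK types only. FallbackDispatchSketch and Outcome are hypothetical names, and java.util.concurrent.TimeoutException and IllegalArgumentException are stand-ins chosen for this example in place of HystrixTimeoutException and HystrixBadRequestException.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;

final class FallbackDispatchSketch {

    enum Outcome { THREAD_POOL_REJECTED, TIMEOUT, BAD_REQUEST, FAILURE }

    // Classifies an execution error in the same order as the branches in handleFallback.
    static Outcome classify(Throwable t) {
        if (t instanceof RejectedExecutionException) {
            return Outcome.THREAD_POOL_REJECTED;
        }
        if (t instanceof TimeoutException) {          // stand-in for HystrixTimeoutException
            return Outcome.TIMEOUT;
        }
        if (t instanceof IllegalArgumentException) {  // stand-in for HystrixBadRequestException
            return Outcome.BAD_REQUEST;
        }
        return Outcome.FAILURE;
    }

    // In the listing above, the bad-request branches emit the error instead of going via a fallback.
    static boolean triggersFallback(Outcome outcome) {
        return outcome != Outcome.BAD_REQUEST;
    }
}
```

Of the four outcomes, only the bad-request one propagates the error directly, which matches the handleBadRequestByEmittingError and Observable.error(e) branches in the listing.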

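This method mainly does two things. One is the subscribeOn call, which is what provides thread pool isolation. The other is the call made in the normal case, getUserExecutionObservable, which is covered after the thread pool discussion.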

private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
    if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
        // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                // rest omitted
                return getUserExecutionObservable(_cmd);
                // rest omitted
            }
        })
        // rest omitted
        .subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
            @Override
            public Boolean call() {
                return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
            }
        }));
    }
    // rest omitted
}

Creating the related classes

public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
    // apply the dynamic thread pool configuration
    touchConfig();
    // create a HystrixContextScheduler, which in turn creates a ThreadPoolScheduler
    return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}

Here a HystrixContextScheduler object is created. The ThreadPoolScheduler object is used to create a ThreadPoolWorker, which is then handed to HystrixContextSchedulerWorker.

public Worker createWorker() {
    return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
}

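The thread pool check happens in the schedule method of HystrixContextSchedulerWorker. If there is no capacity left, it throws RejectedExecutionException. How that exception is caught was covered above.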

public Subscription schedule(Action0 action) {
    if (threadPool != null) {
        // this is where thread pool capacity is checked
        if (!threadPool.isQueueSpaceAvailable()) {
            throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
        }
    }
    return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));
}
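The idea of rejecting work before it is even queued can be shown with a tiny gate in front of a queue. This is a sketch under assumptions: QueueGateSketch is a hypothetical class, the comparison of queue size against a threshold is an assumption about what a check like isQueueSpaceAvailable amounts to, and nothing drains the queue here so that the behaviour stays deterministic.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical gate: refuses new work once the pending queue reaches a threshold.
final class QueueGateSketch {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final int rejectionThreshold;

    QueueGateSketch(int rejectionThreshold) {
        this.rejectionThreshold = rejectionThreshold;
    }

    // Assumed semantics: space is available while the queue is below the threshold.
    boolean isQueueSpaceAvailable() {
        return queue.size() < rejectionThreshold;
    }

    // Checks capacity first and only then enqueues the action.
    void schedule(Runnable action) {
        if (!isQueueSpaceAvailable()) {
            throw new RejectedExecutionException("queue size is at rejection threshold");
        }
        queue.add(action);
    }

    int pending() {
        return queue.size();
    }
}
```

With a threshold of 2, the third schedule call throws RejectedExecutionException, which is the exception type the fallback handling shown earlier looks for.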

Normal invocation

If all of the checks above pass, getUserExecutionObservable is called. This method eventually calls run.

private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
    // rest omitted
    userObservable = getExecutionObservable();
    // rest omitted
}

@Override
final protected Observable<R> getExecutionObservable() {
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            try {
                return Observable.just(run());
            } catch (Throwable ex) {
                return Observable.error(ex);
            }
        }
    }) // rest omitted
}
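Two things happen in getExecutionObservable: run() is not called until something subscribes, and anything it throws is turned into an error signal instead of propagating. A rough JDK-only analogy is sketched below. DeferredRunSketch and Result are hypothetical names, and a Supplier stands in for the deferred Observable.

```java
import java.util.concurrent.Callable;
import java.util.function.Supplier;

final class DeferredRunSketch {

    // Outcome of one execution: either a value or the error that run() threw.
    static final class Result<R> {
        final R value;
        final Throwable error;

        private Result(R value, Throwable error) {
            this.value = value;
            this.error = error;
        }

        boolean isError() {
            return error != null;
        }
    }

    // Nothing runs until get() is called on the returned Supplier,
    // and a thrown exception is captured rather than rethrown.
    static <R> Supplier<Result<R>> defer(Callable<R> run) {
        return () -> {
            try {
                return new Result<>(run.call(), null);
            } catch (Throwable ex) {
                return new Result<>(null, ex);
            }
        };
    }
}
```

Creating the deferred value has no side effects. The work happens on get(), much as the command body only runs once the Observable is subscribed to.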

Here too, our method is invoked through reflection.

protected Object run() throws Exception {
    LOGGER.debug("execute command: {}", getCommandKey().name());
    return process(new Action() {
        @Override
        Object execute() {
            return getCommandAction().execute(getExecutionType());
        }
    });
}

Summary

The sections above covered the paths that lead to a fallback call, as well as the normal flow. The flowchart is as follows: