关于java:hystrix-Fallback是怎么调用的

调用Fallback有以下几种状况:

  • 熔断器开启
  • 信号量回绝
  • 线程池回绝
  • 执行办法失败

hystrix – @EnableCircuitBreaker那些事咱们晓得最终会调用HystrixCommand的execute办法,他这个办法就会调用queue办法。

public R execute() {
    try {
        return queue().get();
    } catch (Exception e) {
        throw Exceptions.sneakyThrow(decomposeException(e));
    }
}

queue办法如下,这里是拿到一个Future对象。重点还是toObservable办法。

 public Future<R> queue() {
    // 其余略
    final Future<R> delegate = toObservable().toBlocking().toFuture();
    // 其余略    
}

这里会定义Observable,他会判断是否要从缓存取值,如果没有,afterCache就取applyHystrixSemantics,所以前面就会调用applyHystrixSemantics办法。

public Observable<R> toObservable() {
    //其余略
    final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
                return Observable.never();
            }
            return applyHystrixSemantics(_cmd);
        }
    };
    //其余略
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
             //其余略
            final boolean requestCacheEnabled = isRequestCachingEnabled();
            final String cacheKey = getCacheKey();

            /* try from cache first */
            if (requestCacheEnabled) {
                HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                if (fromCache != null) {
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                }
            }

            Observable<R> hystrixObservable =
                    Observable.defer(applyHystrixSemantics)
                            .map(wrapWithAllOnNextHooks);

            Observable<R> afterCache;

            // put in cache
            if (requestCacheEnabled && cacheKey != null) {
                // wrap it for caching
                HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                if (fromCache != null) {
                    // another thread beat us so we'll use the cached value instead
                    toCache.unsubscribe();
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                } else {
                    // we just created an ObservableCommand so we cast and return it
                    afterCache = toCache.toObservable();
                }
            } else {
                afterCache = hystrixObservable;
            }

            return afterCache
                    .doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
                    .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
                    .doOnCompleted(fireOnCompletedHook);
        }
    });
}

熔断判断

这里是比拟重要的代码,首先判断是否开启熔断,如果开启了,就调用handleShortCircuitViaFallback办法,如果没有开启熔断,他还会去判断是否能申请到信号量,申请不到就调用handleSemaphoreRejectionViaFallback办法。如果都失常,就调用executeCommandAndObserve办法。

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    //其余略
    if (circuitBreaker.allowRequest()) {
        //其余略

        if (executionSemaphore.tryAcquire()) {
            try {
                /* used to track userThreadExecutionTime */
                executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                return executeCommandAndObserve(_cmd)
                        .doOnError(markExceptionThrown)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
            } catch (RuntimeException e) {
                return Observable.error(e);
            }
        } else {
            return handleSemaphoreRejectionViaFallback();
        }
    } else {
        // 
        return handleShortCircuitViaFallback();
    }
}

熔断开启

咱们先看看熔断开启后的调用,会通过getFallbackObservable办法获取fallbackExecutionChain,getFallbackObservable次要的作用就是调用getFallback办法。

private Observable<R> handleShortCircuitViaFallback() {
    // 其余略
    return getFallbackOrThrowException(this, HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT,
                "short-circuited", shortCircuitException);
    // 其余略
}

private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {
    // 其余略
    fallbackExecutionChain = getFallbackObservable();
    // 其余略            

    return fallbackExecutionChain
            .doOnEach(setRequestContext)
            .lift(new FallbackHookApplication(_cmd))
            .lift(new DeprecatedOnFallbackHookApplication(_cmd))
            .doOnNext(markFallbackEmit)
            .doOnCompleted(markFallbackCompleted)
            .onErrorResumeNext(handleFallbackError)
            .doOnTerminate(singleSemaphoreRelease)
            .doOnUnsubscribe(singleSemaphoreRelease);
    
    }
    // 其余略
}

@Override
final protected Observable<R> getFallbackObservable() {
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            try {
                return Observable.just(getFallback());
            } catch (Throwable ex) {
                return Observable.error(ex);
            }
        }
    });
}

getFallback是调用fallback办法的入口,MethodExecutionAction会通过发射,调用咱们配置的办法。

@Override
protected Object getFallback() {
    final CommandAction commandAction = getFallbackAction();
    if (commandAction != null) {
        try {
            return process(new Action() {
                @Override
                Object execute() {
                    MetaHolder metaHolder = commandAction.getMetaHolder();
                    Object[] args = createArgsForFallback(metaHolder, getExecutionException());
                    return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args);
                }
            });
        } catch (Throwable e) {
            LOGGER.error(FallbackErrorMessageBuilder.create()
                    .append(commandAction, e).build());
            throw new FallbackInvocationException(unwrapCause(e));
        }
    } else {
        return super.getFallback();
    }
}

信号量隔离

咱们看了下面熔断开启的代码,这边就比较简单,就是跟下面一样,调用getFallbackOrThrowException办法。

private Observable<R> handleSemaphoreRejectionViaFallback() {
    // 其余略
    return getFallbackOrThrowException(this, HystrixEventType.SEMAPHORE_REJECTED, FailureType.REJECTED_SEMAPHORE_EXECUTION,
            "could not acquire a semaphore for execution", semaphoreRejectionException);
}

线程池隔离

如果熔断没开,信号量又能够获取到,他就会调用executeCommandAndObserve办法。这个办法,handleFallback定义了几个异样,比方线程池异样解决,工夫超时解决,申请异样解决,以及其余异样解决。而后调用executeCommandWithSpecifiedIsolation办法。

private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    //其余略
    final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
        @Override
        public Observable<R> call(Throwable t) {
            Exception e = getExceptionFromThrowable(t);
            executionResult = executionResult.setExecutionException(e);
            if (e instanceof RejectedExecutionException) {
                return handleThreadPoolRejectionViaFallback(e);
            } else if (t instanceof HystrixTimeoutException) {
                return handleTimeoutViaFallback();
            } else if (t instanceof HystrixBadRequestException) {
                return handleBadRequestByEmittingError(e);
            } else {
                /*
                 * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
                 */
                if (e instanceof HystrixBadRequestException) {
                    eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                    return Observable.error(e);
                }

                return handleFailureViaFallback(e);
            }
        }
    };
 
    //其余略
    Observable<R> execution;
    if (properties.executionTimeoutEnabled().get()) {
        execution = executeCommandWithSpecifiedIsolation(_cmd)
                .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
    } else {
        execution = executeCommandWithSpecifiedIsolation(_cmd);
    }

    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}

这个办法次要是两件事件,一个是subscribeOn办法,这里是线程池隔离用的,另外一个就是失常状况下的调用,调用的是getUserExecutionObservable办法,这个办法在线程池前面讲。

private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
    if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
        // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                // 其余略
                return getUserExecutionObservable(_cmd);
                // 其余略    
            }
        })// 其余略    
        }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
            @Override
            public Boolean call() {
                return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
            }
        }));
    } 
    // 其余略
}

创立相干类

public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
    // 动静配置线程池信息
    touchConfig();
    // 创立一个HystrixContextScheduler对象,这里又会创立一个ThreadPoolScheduler
    return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}

这里创立一个HystrixContextScheduler对象,ThreadPoolScheduler对象用于创立ThreadPoolWorker,并赋值给HystrixContextSchedulerWorker

public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
    // 动静配置线程池信息
    touchConfig();
    // 创立一个HystrixContextScheduler对象,这里又会创立一个ThreadPoolScheduler
    return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}

public Worker createWorker() {
    return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
}

线程池判断的中央在HystrixContextSchedulerWorker的schedule办法,不够就抛RejectedExecutionException异样,异样的捕捉下面曾经讲了。

public Subscription schedule(Action0 action) {
    if (threadPool != null) {
        // 判断线程池线程的中央
        if (!threadPool.isQueueSpaceAvailable()) {
            throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
        }
    }
    return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));
}

失常调用

如果下面都失常,就会调用getUserExecutionObservable办法。这个办法最初会调用run办法。

private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
    //其余略
    userObservable = getExecutionObservable();
    //其余略
}

@Override
final protected Observable<R> getExecutionObservable() {
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            try {
                return Observable.just(run());
            } catch (Throwable ex) {
                return Observable.error(ex);
            }
        }
    })//其余略
}

在这里,同样通过反射,调用到咱们的办法。

protected Object run() throws Exception {
    LOGGER.debug("execute command: {}", getCommandKey().name());
    return process(new Action() {
        @Override
        Object execute() {
            return getCommandAction().execute(getExecutionType());
        }
    });
}

总结

下面讲了几个fallback调用的办法,以及失常的流程,流程图如下

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理