关于java:hystrix-Fallback是怎么调用的

26次阅读

共计 8872 个字符,预计需要花费 23 分钟才能阅读完成。

调用 Fallback 有以下几种状况:

  • 熔断器开启
  • 信号量回绝
  • 线程池回绝
  • 执行办法失败

hystrix – @EnableCircuitBreaker 那些事咱们晓得最终会调用 HystrixCommand 的 execute 办法,他这个办法就会调用 queue 办法。

public R execute() {
    try {return queue().get();} catch (Exception e) {throw Exceptions.sneakyThrow(decomposeException(e));
    }
}

queue 办法如下,这里是拿到一个 Future 对象。重点还是 toObservable 办法。

 public Future<R> queue() {
    // 其余略
    final Future<R> delegate = toObservable().toBlocking().toFuture();
    // 其余略    
}

这里会定义 Observable,他会判断是否要从缓存取值,如果没有,afterCache 就取 applyHystrixSemantics,所以前面就会调用 applyHystrixSemantics 办法。

public Observable<R> toObservable() {
    // 其余略
    final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {return Observable.never();
            }
            return applyHystrixSemantics(_cmd);
        }
    };
    // 其余略
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
             // 其余略
            final boolean requestCacheEnabled = isRequestCachingEnabled();
            final String cacheKey = getCacheKey();

            /* try from cache first */
            if (requestCacheEnabled) {HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                if (fromCache != null) {
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                }
            }

            Observable<R> hystrixObservable =
                    Observable.defer(applyHystrixSemantics)
                            .map(wrapWithAllOnNextHooks);

            Observable<R> afterCache;

            // put in cache
            if (requestCacheEnabled && cacheKey != null) {
                // wrap it for caching
                HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                if (fromCache != null) {
                    // another thread beat us so we'll use the cached value instead
                    toCache.unsubscribe();
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                } else {
                    // we just created an ObservableCommand so we cast and return it
                    afterCache = toCache.toObservable();}
            } else {afterCache = hystrixObservable;}

            return afterCache
                    .doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
                    .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
                    .doOnCompleted(fireOnCompletedHook);
        }
    });
}

熔断判断

这里是比拟重要的代码,首先判断是否开启熔断,如果开启了,就调用 handleShortCircuitViaFallback 办法,如果没有开启熔断,他还会去判断是否能申请到信号量,申请不到就调用 handleSemaphoreRejectionViaFallback 办法。如果都失常,就调用 executeCommandAndObserve 办法。

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    // 其余略
    if (circuitBreaker.allowRequest()) {
        // 其余略

        if (executionSemaphore.tryAcquire()) {
            try {
                /* used to track userThreadExecutionTime */
                executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                return executeCommandAndObserve(_cmd)
                        .doOnError(markExceptionThrown)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
            } catch (RuntimeException e) {return Observable.error(e);
            }
        } else {return handleSemaphoreRejectionViaFallback();
        }
    } else {
        // 
        return handleShortCircuitViaFallback();}
}

熔断开启

咱们先看看熔断开启后的调用,会通过 getFallbackObservable 办法获取 fallbackExecutionChain,getFallbackObservable 次要的作用就是调用 getFallback 办法。

private Observable<R> handleShortCircuitViaFallback() {
    // 其余略
    return getFallbackOrThrowException(this, HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT,
                "short-circuited", shortCircuitException);
    // 其余略
}

private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {
    // 其余略
    fallbackExecutionChain = getFallbackObservable();
    // 其余略            

    return fallbackExecutionChain
            .doOnEach(setRequestContext)
            .lift(new FallbackHookApplication(_cmd))
            .lift(new DeprecatedOnFallbackHookApplication(_cmd))
            .doOnNext(markFallbackEmit)
            .doOnCompleted(markFallbackCompleted)
            .onErrorResumeNext(handleFallbackError)
            .doOnTerminate(singleSemaphoreRelease)
            .doOnUnsubscribe(singleSemaphoreRelease);
    
    }
    // 其余略
}

@Override
final protected Observable<R> getFallbackObservable() {return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            try {return Observable.just(getFallback());
            } catch (Throwable ex) {return Observable.error(ex);
            }
        }
    });
}

getFallback 是调用 fallback 办法的入口,MethodExecutionAction 会通过发射,调用咱们配置的办法。

@Override
protected Object getFallback() {final CommandAction commandAction = getFallbackAction();
    if (commandAction != null) {
        try {return process(new Action() {
                @Override
                Object execute() {MetaHolder metaHolder = commandAction.getMetaHolder();
                    Object[] args = createArgsForFallback(metaHolder, getExecutionException());
                    return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args);
                }
            });
        } catch (Throwable e) {LOGGER.error(FallbackErrorMessageBuilder.create()
                    .append(commandAction, e).build());
            throw new FallbackInvocationException(unwrapCause(e));
        }
    } else {return super.getFallback();
    }
}

信号量隔离

咱们看了下面熔断开启的代码,这边就比较简单,就是跟下面一样,调用 getFallbackOrThrowException 办法。

private Observable<R> handleSemaphoreRejectionViaFallback() {
    // 其余略
    return getFallbackOrThrowException(this, HystrixEventType.SEMAPHORE_REJECTED, FailureType.REJECTED_SEMAPHORE_EXECUTION,
            "could not acquire a semaphore for execution", semaphoreRejectionException);
}

线程池隔离

如果熔断没开,信号量又能够获取到,他就会调用 executeCommandAndObserve 办法。这个办法,handleFallback 定义了几个异样,比方线程池异样解决,工夫超时解决,申请异样解决,以及其余异样解决。而后调用 executeCommandWithSpecifiedIsolation 办法。

private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    // 其余略
    final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
        @Override
        public Observable<R> call(Throwable t) {Exception e = getExceptionFromThrowable(t);
            executionResult = executionResult.setExecutionException(e);
            if (e instanceof RejectedExecutionException) {return handleThreadPoolRejectionViaFallback(e);
            } else if (t instanceof HystrixTimeoutException) {return handleTimeoutViaFallback();
            } else if (t instanceof HystrixBadRequestException) {return handleBadRequestByEmittingError(e);
            } else {
                /*
                 * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
                 */
                if (e instanceof HystrixBadRequestException) {eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                    return Observable.error(e);
                }

                return handleFailureViaFallback(e);
            }
        }
    };
 
    // 其余略
    Observable<R> execution;
    if (properties.executionTimeoutEnabled().get()) {execution = executeCommandWithSpecifiedIsolation(_cmd)
                .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
    } else {execution = executeCommandWithSpecifiedIsolation(_cmd);
    }

    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}

这个办法次要是两件事件,一个是 subscribeOn 办法,这里是线程池隔离用的,另外一个就是失常状况下的调用,调用的是 getUserExecutionObservable 办法,这个办法在线程池前面讲。

private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {// mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                // 其余略
                return getUserExecutionObservable(_cmd);
                // 其余略    
            }
        })// 其余略    
        }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
            @Override
            public Boolean call() {return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
            }
        }));
    } 
    // 其余略
}

创立相干类

public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
    // 动静配置线程池信息
    touchConfig();
    // 创立一个 HystrixContextScheduler 对象,这里又会创立一个 ThreadPoolScheduler
    return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}

这里创立一个 HystrixContextScheduler 对象,ThreadPoolScheduler 对象用于创立 ThreadPoolWorker,并赋值给 HystrixContextSchedulerWorker

public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
    // 动静配置线程池信息
    touchConfig();
    // 创立一个 HystrixContextScheduler 对象,这里又会创立一个 ThreadPoolScheduler
    return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}

public Worker createWorker() {return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
}

线程池判断的中央在 HystrixContextSchedulerWorker 的 schedule 办法,不够就抛 RejectedExecutionException 异样,异样的捕捉下面曾经讲了。

public Subscription schedule(Action0 action) {if (threadPool != null) {
        // 判断线程池线程的中央
        if (!threadPool.isQueueSpaceAvailable()) {throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
        }
    }
    return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));
}

失常调用

如果下面都失常,就会调用 getUserExecutionObservable 办法。这个办法最初会调用 run 办法。

private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
    // 其余略
    userObservable = getExecutionObservable();
    // 其余略
}

@Override
final protected Observable<R> getExecutionObservable() {return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            try {return Observable.just(run());
            } catch (Throwable ex) {return Observable.error(ex);
            }
        }
    })// 其余略
}

在这里,同样通过反射,调用到咱们的办法。

protected Object run() throws Exception {LOGGER.debug("execute command: {}", getCommandKey().name());
    return process(new Action() {
        @Override
        Object execute() {return getCommandAction().execute(getExecutionType());
        }
    });
}

总结

下面讲了几个 fallback 调用的办法,以及失常的流程,流程图如下

正文完
 0