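First, a quick review of Hystrix's overall flow:
1. Construct a HystrixCommand or HystrixObservableCommand object that wraps the request, passing the parameters it needs for execution through the constructor.
2. Execute the command. Hystrix offers four ways to do this: execute(), queue(), observe() and toObservable().
3. Check whether the response can be served from the request cache. If caching is enabled and a cached response is available, return it directly. Hystrix supports request caching, but the user has to enable it explicitly.
4. Check whether the circuit breaker is open. If it is, go to step 8.
5. Check whether the thread pool, queue or semaphore is full. If it is, go to step 8.
6. Run HystrixObservableCommand.construct() or HystrixCommand.run(). If it fails or times out, go to step 8; otherwise go to step 9.
7. Record the circuit breaker's health metrics.
8. Run the fallback logic.
9. Return the response.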
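To make the nine steps concrete, here is a heavily simplified model of the same control flow in plain Java. It is a sketch under stated assumptions, not Hystrix code: FlowSketch and its parameters are hypothetical, a java.util.concurrent.Semaphore stands in for the thread pool/semaphore capacity check, a BooleanSupplier stands in for the circuit breaker, and metrics (step 7) and timeouts are left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

// Hypothetical, heavily simplified model of Hystrix's flow (not Hystrix's API).
// Metrics (step 7) and timeouts are deliberately left out.
final class FlowSketch {
    private final Map<String, String> requestCache = new ConcurrentHashMap<>();
    private final Semaphore permits;                    // stands in for thread pool / semaphore capacity
    private final BooleanSupplier circuitAllowsRequest; // stands in for circuitBreaker.allowRequest()

    FlowSketch(int maxConcurrent, BooleanSupplier circuitAllowsRequest) {
        this.permits = new Semaphore(maxConcurrent);
        this.circuitAllowsRequest = circuitAllowsRequest;
    }

    String execute(String cacheKey, Supplier<String> run, Supplier<String> fallback) {
        // step 3: answer from the request cache if possible
        if (cacheKey != null) {
            String cached = requestCache.get(cacheKey);
            if (cached != null) {
                return cached;
            }
        }
        // step 4: circuit open -> fallback (step 8)
        if (!circuitAllowsRequest.getAsBoolean()) {
            return fallback.get();
        }
        // step 5: no capacity left -> fallback (step 8)
        if (!permits.tryAcquire()) {
            return fallback.get();
        }
        try {
            // step 6: the user's logic
            String result = run.get();
            if (cacheKey != null && result != null) {
                requestCache.putIfAbsent(cacheKey, result);
            }
            return result; // step 9
        } catch (RuntimeException e) {
            return fallback.get(); // step 8
        } finally {
            permits.release();
        }
    }
}
```

The order of the checks is the point: cache first, then the circuit breaker, then capacity, and only then the user's code, with every rejection path ending in the fallback.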
I. Analyzing the execute method
Picking up from the previous article: the HystrixCommandAspect aspect creates a HystrixInvokable object and then executes it:
```java
Object result;
try {
    if (!metaHolder.isObservable()) {
        result = CommandExecutor.execute(invokable, executionType, metaHolder);
    } else {
        result = executeObservable(invokable, executionType, metaHolder);
    }
} catch (HystrixBadRequestException e) {
    throw e.getCause() != null ? e.getCause() : e;
} catch (HystrixRuntimeException e) {
    throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
}
```
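Here we analyze the execute flow. Hystrix supports three modes: synchronous, asynchronous and observable. We only follow the synchronous one, whose call chain is HystrixCommand.execute() -> queue() -> toObservable().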
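The layering of execute() -> queue() -> toObservable() can be pictured as "a lazy computation, wrapped in a Future, then blocked on". The sketch below models it with CompletableFuture instead of RxJava. MiniCommand and its methods are hypothetical names chosen to mirror the chain; the only part taken from Hystrix is the idea that the synchronous execute() blocks on the Future returned by queue().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Supplier;

// Hypothetical mini command: "toObservable" is modelled as a lazy Supplier of a CompletableFuture.
abstract class MiniCommand<R> {
    protected abstract R run();

    // lazy stage: nothing runs until someone calls get() on the Supplier
    Supplier<CompletableFuture<R>> toObservable() {
        return () -> CompletableFuture.supplyAsync(this::run);
    }

    // asynchronous mode: start the work and hand back a Future
    Future<R> queue() {
        return toObservable().get();
    }

    // synchronous mode: block on the Future returned by queue()
    R execute() {
        try {
            return queue().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        }
    }
}
```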
Abridged toObservable() from AbstractCommand:

```java
public Observable<R> toObservable() {
    final AbstractCommand<R> _cmd = this;
    // ... definitions of several actions ...
    final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            if (commandState.get().equals(AbstractCommand.CommandState.UNSUBSCRIBED)) {
                return Observable.never();
            }
            return applyHystrixSemantics(_cmd);
        }
    };
    // ...
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            // ... check whether request caching is enabled (step 3 of the overall flow) ...
            boolean requestCacheEnabled = AbstractCommand.this.isRequestCachingEnabled();
            String cacheKey = AbstractCommand.this.getCacheKey();
            if (requestCacheEnabled) {
                // look up the cache; on a hit, return the cached response directly
                HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                if (fromCache != null) {
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                }
            }
            Observable<R> hystrixObservable = Observable.defer(applyHystrixSemantics).map(wrapWithAllOnNextHooks);
            Observable<R> afterCache;
            if (requestCacheEnabled && cacheKey != null) {
                // ... further cache handling ...
            } else {
                afterCache = hystrixObservable;
            }
            return afterCache
                    .doOnTerminate(terminateCommandCleanup)
                    .doOnUnsubscribe(unsubscribeCommandCleanup)
                    .doOnCompleted(fireOnCompletedHook);
        }
    });
}
```
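The call() method passed to Observable.defer does two main things:
- It checks whether request caching is enabled; if so and a cached response exists, it returns that response directly.
- If caching is disabled or nothing is cached yet, it runs Observable.defer(applyHystrixSemantics) and returns the result.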
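The cache check in call() follows a simple pattern: with caching enabled and a cache key present, reuse an earlier result for the same key; otherwise run the deferred command. A minimal sketch, assuming a plain ConcurrentHashMap that lives for one request (in Hystrix the request cache is tied to the current HystrixRequestContext) and a hypothetical RequestCacheSketch class:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical per-request cache: within one request, the same cacheKey reuses the first result.
final class RequestCacheSketch {
    private final ConcurrentHashMap<String, Object> cache = new ConcurrentHashMap<>();

    @SuppressWarnings("unchecked")
    <R> R execute(boolean cachingEnabled, String cacheKey, Supplier<R> command) {
        if (cachingEnabled && cacheKey != null) {
            // computeIfAbsent runs the command at most once per key; later calls are cache hits
            return (R) cache.computeIfAbsent(cacheKey, k -> command.get());
        }
        // caching disabled or no key: run the deferred command directly
        return command.get();
    }
}
```

Using computeIfAbsent is a choice of this sketch; note that it cannot cache a null result.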
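applyHystrixSemantics() then checks whether the circuit breaker is closed or open, which corresponds to step 4 of the overall flow: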
```java
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    this.executionHook.onStart(_cmd);
    // is the circuit breaker letting requests through?
    if (this.circuitBreaker.allowRequest()) {
        /*
         * With semaphore isolation this returns a TryableSemaphoreActual; otherwise it returns
         * a TryableSemaphoreNoOp, whose tryAcquire() always returns true.
         */
        final TryableSemaphore executionSemaphore = getExecutionSemaphore();
        // ...
        // semaphore control
        if (executionSemaphore.tryAcquire()) {
            try {
                this.executionResult = this.executionResult.setInvocationStartTime(System.currentTimeMillis());
                // all checks passed: run executeCommandAndObserve
                return this.executeCommandAndObserve(_cmd)
                        .doOnError(markExceptionThrown)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
            } catch (RuntimeException e) {
                return Observable.error(e);
            }
        } else {
            return this.handleSemaphoreRejectionViaFallback();
        }
    } else {
        // circuit is open: short-circuit to the fallback
        return this.handleShortCircuitViaFallback();
    }
}
```
II. Analyzing the circuit breaker
Next, let's look at this.circuitBreaker.allowRequest(), implemented in HystrixCircuitBreakerImpl:
```java
static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
    private final HystrixCommandProperties properties;
    private final HystrixCommandMetrics metrics;

    // whether the circuit is open
    /* track whether this circuit is open/closed at any given point in time (default to false==closed) */
    private AtomicBoolean circuitOpen = new AtomicBoolean(false);

    /* when the circuit was marked open or was last allowed to try a 'singleTest' */
    private AtomicLong circuitOpenedOrLastTestedTime = new AtomicLong();

    protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup,
                                        HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
        this.properties = properties;
        this.metrics = metrics;
    }

    // In the half-open state, if this request succeeded, set circuitOpen back to false and reset the metrics
    public void markSuccess() {
        if (circuitOpen.get()) {
            if (circuitOpen.compareAndSet(true, false)) {
                //win the thread race to reset metrics
                //Unsubscribe from the current stream to reset the health counts stream. This only affects the health counts view,
                //and all other metric consumers are unaffected by the reset
                metrics.resetStream();
            }
        }
    }

    @Override
    public boolean allowRequest() {
        // is the circuit forced open?
        if (properties.circuitBreakerForceOpen().get()) {
            return false;
        }
        // is the circuit forced closed?
        if (properties.circuitBreakerForceClosed().get()) {
            // still evaluate isOpen() as usual, but always let the request through
            isOpen();
            return true;
        }
        return !isOpen() || allowSingleTest();
    }

    public boolean allowSingleTest() {
        long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
        // 1) if the circuit is open
        // 2) and it's been longer than 'sleepWindow' since we opened the circuit
        // i.e. the circuit is open and now > (time opened or last tested) + sleepWindow
        if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
            // We push the 'circuitOpenedTime' ahead by 'sleepWindow' since we have allowed one request to try.
            // If it succeeds the circuit will be closed, otherwise another singleTest will be allowed at the end of the 'sleepWindow'.
            // Store the current time in circuitOpenedOrLastTestedTime.
            // In the half-open state a successful trial request calls markSuccess(), which sets circuitOpen to false;
            // a failed one needs no extra handling.
            // Example: the circuit opened at 00:00:00 and sleepWindow is 10s. If a trial call at 00:00:15 fails,
            // the circuit stays open from 00:00:15 to 00:00:25.
            if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
                // if this returns true that means we set the time so we'll return true to allow the singleTest
                // if it returned false it means another thread raced us and allowed the singleTest before we did
                return true;
            }
        }
        return false;
    }

    @Override
    public boolean isOpen() {
        // circuitOpen holds the breaker state: true = open (tripped), false = closed
        if (circuitOpen.get()) {
            return true;
        }
        // fetch the collected health metrics
        HealthCounts health = metrics.getHealthCounts();
        // too few requests in the rolling window (10s by default): below circuitBreakerRequestVolumeThreshold (default 20)
        if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
            return false;
        }
        // error percentage (errors / total requests) below circuitBreakerErrorThresholdPercentage (default 50%)
        if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
            return false;
        } else {
            // otherwise the circuit goes from CLOSED to OPEN
            if (circuitOpen.compareAndSet(false, true)) {
                // and the current time is stored in circuitOpenedOrLastTestedTime for later sleep-window comparisons
                circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
                return true;
            } else {
                return true;
            }
        }
    }
}
```
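- allowRequest() first checks whether the circuit breaker is forced open or forced closed.
- isOpen() checks whether the circuit is already open, then whether it needs to trip. If it does, it sets the circuit state to open and records the current time as the open time. The circuit trips when, within the rolling window (10s by default):
  1) the total number of requests is at least circuitBreakerRequestVolumeThreshold (20 by default), and
  2) the error percentage is at least circuitBreakerErrorThresholdPercentage (50% by default).
- When the circuit is open, allowSingleTest() decides whether to let a single trial request through anyway. It does so when:
  1) circuitOpen.get() is true, i.e. the circuit was tripped normally rather than forced open (a forced-open breaker has already returned false), and
  2) the current time is later than the time the circuit opened (or was last tested) plus sleepWindow.
- If that trial request succeeds in the half-open state, markSuccess() is called, which closes the circuit breaker and resets the metrics.

Informally, the state in which allowSingleTest() returns true can be called the half-open state.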
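The breaker's decision logic can be reproduced in a small, testable class. This is a sketch, not Hystrix code: BreakerSketch is hypothetical, health counts are recorded directly with record() instead of coming from a rolling-window metrics stream, the counters are not thread-safe, and time comes from an injectable clock so the sleep-window behaviour can be checked deterministically.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Simplified model of HystrixCircuitBreakerImpl's decision logic (single-threaded counters).
final class BreakerSketch {
    private final int requestVolumeThreshold;   // role of circuitBreakerRequestVolumeThreshold
    private final int errorThresholdPercentage; // role of circuitBreakerErrorThresholdPercentage
    private final long sleepWindowMs;           // role of circuitBreakerSleepWindowInMilliseconds
    private final LongSupplier clock;

    private final AtomicBoolean circuitOpen = new AtomicBoolean(false);
    private final AtomicLong openedOrLastTested = new AtomicLong();
    private long totalRequests;
    private long errorRequests;

    BreakerSketch(int requestVolumeThreshold, int errorThresholdPercentage, long sleepWindowMs, LongSupplier clock) {
        this.requestVolumeThreshold = requestVolumeThreshold;
        this.errorThresholdPercentage = errorThresholdPercentage;
        this.sleepWindowMs = sleepWindowMs;
        this.clock = clock;
    }

    void record(boolean success) {
        totalRequests++;
        if (!success) {
            errorRequests++;
        }
    }

    boolean allowRequest() {
        return !isOpen() || allowSingleTest();
    }

    boolean isOpen() {
        if (circuitOpen.get()) {
            return true;
        }
        // not enough traffic in the window: never trip
        if (totalRequests == 0 || totalRequests < requestVolumeThreshold) {
            return false;
        }
        long errorPercentage = errorRequests * 100 / totalRequests;
        if (errorPercentage < errorThresholdPercentage) {
            return false;
        }
        // CLOSED -> OPEN, remembering when it happened
        if (circuitOpen.compareAndSet(false, true)) {
            openedOrLastTested.set(clock.getAsLong());
        }
        return true;
    }

    boolean allowSingleTest() {
        long last = openedOrLastTested.get();
        long now = clock.getAsLong();
        // open, sleep window elapsed, and we win the race to move the "last tested" time forward
        return circuitOpen.get()
                && now > last + sleepWindowMs
                && openedOrLastTested.compareAndSet(last, now);
    }

    // a successful trial request closes the circuit and resets the counts
    void markSuccess() {
        if (circuitOpen.compareAndSet(true, false)) {
            totalRequests = 0;
            errorRequests = 0;
        }
    }
}
```

With a 10s sleep window this replays the example from the code comments: the circuit opens at t=0, one trial is allowed at t=15s, and if that trial fails the next one is only allowed after t=25s.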
III. Analyzing semaphore isolation
This corresponds to step 5 of the overall flow.
```java
/* package */static class TryableSemaphoreActual implements TryableSemaphore {
    protected final HystrixProperty<Integer> numberOfPermits;
    private final AtomicInteger count = new AtomicInteger(0);

    public TryableSemaphoreActual(HystrixProperty<Integer> numberOfPermits) {
        this.numberOfPermits = numberOfPermits;
    }

    @Override
    public boolean tryAcquire() {
        int currentCount = count.incrementAndGet();
        if (currentCount > numberOfPermits.get()) {
            count.decrementAndGet();
            return false;
        } else {
            return true;
        }
    }
}

/* package */static class TryableSemaphoreNoOp implements TryableSemaphore {
    public static final TryableSemaphore DEFAULT = new TryableSemaphoreNoOp();

    @Override
    public boolean tryAcquire() {
        return true;
    }
}
```
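With semaphore isolation enabled, TryableSemaphoreActual increments its counter by 1. If currentCount > numberOfPermits.get(), it decrements the counter again and returns false, so the request is rejected and handled by the fallback (handleSemaphoreRejectionViaFallback).
Without semaphore isolation, TryableSemaphoreNoOp.tryAcquire() always returns true.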
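The counting trick (optimistically increment, undo if over the limit) is easy to exercise on its own. A minimal sketch, assuming a fixed permit count instead of a dynamic HystrixProperty; TryableSemaphoreSketch is a hypothetical name, and its release() plays the role of the singleSemaphoreRelease action attached in applyHystrixSemantics():

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of TryableSemaphoreActual's counting, plus the release performed when a command terminates.
final class TryableSemaphoreSketch {
    private final int permits;
    private final AtomicInteger count = new AtomicInteger(0);

    TryableSemaphoreSketch(int permits) {
        this.permits = permits;
    }

    boolean tryAcquire() {
        int current = count.incrementAndGet();
        if (current > permits) {
            count.decrementAndGet(); // undo the optimistic increment
            return false;            // caller should go to the fallback
        }
        return true;
    }

    void release() {
        count.decrementAndGet();
    }

    int used() {
        return count.get();
    }
}
```

Unlike java.util.concurrent.Semaphore, nothing ever blocks here: a caller either gets a permit immediately or is rejected, which is what lets Hystrix fall back right away.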
If the circuit is not open and the semaphore check passes (or no semaphore is used), execution continues to executeCommandAndObserve:
```java
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
    ...
    Observable<R> execution;
    // is the execution timeout enabled?
    if (properties.executionTimeoutEnabled().get()) {
        execution = executeCommandWithSpecifiedIsolation(_cmd)
                .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
    } else {
        execution = executeCommandWithSpecifiedIsolation(_cmd);
    }
    // markEmits, markOnCompleted, handleFallback and setRequestContext are anonymous inner classes defined earlier in this method
    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}
```
IV. Analyzing timeout isolation
```java
private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> {

    final AbstractCommand<R> originalCommand;

    public HystrixObservableTimeoutOperator(final AbstractCommand<R> originalCommand) {
        this.originalCommand = originalCommand;
    }

    @Override
    public Subscriber<? super R> call(final Subscriber<? super R> child) {
        final CompositeSubscription s = new CompositeSubscription();
        // if the child unsubscribes we unsubscribe our parent as well
        child.add(s);

        // on timeout, deliver a new HystrixTimeoutException to the child subscriber
        final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, new Runnable() {
            @Override
            public void run() {
                child.onError(new HystrixTimeoutException());
            }
        });

        // register a timer callback
        TimerListener listener = new TimerListener() {
            // invoked when the timer fires
            @Override
            public void tick() {
                // move the state from NOT_EXECUTED to TIMED_OUT
                if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                    // report timeout failure
                    originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);
                    // shut down the original request
                    s.unsubscribe();
                    timeoutRunnable.run();
                }
            }

            // the timer interval: the configured execution timeout
            @Override
            public int getIntervalTimeInMilliseconds() {
                return originalCommand.properties.executionTimeoutInMilliseconds().get();
            }
        };

        final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);

        // set externally so execute/queue can see this
        originalCommand.timeoutTimer.set(tl);

        /**
         * If this subscriber receives values it means the parent succeeded/completed
         */
        Subscriber<R> parent = new Subscriber<R>() {
            ...
        };

        // if s is unsubscribed we want to unsubscribe the parent
        s.add(parent);

        return parent;
    }
}

// HystrixTimer
public Reference<TimerListener> addTimerListener(final TimerListener listener) {
    startThreadIfNeeded();
    // add the listener
    Runnable r = new Runnable() {
        @Override
        public void run() {
            try {
                listener.tick();
            } catch (Exception e) {
                logger.error("Failed while ticking TimerListener", e);
            }
        }
    };
    // getIntervalTimeInMilliseconds() supplies both the initial delay and the period
    ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);
    return new TimerReference(listener, f);
}
```
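The main logic defines a timer, a TimerListener, whose interval is the timeout configured on @HystrixCommand. When the timeout elapses, tick():
- changes the state from NOT_EXECUTED to TIMED_OUT;
- publishes a TIMEOUT event;
- calls s.unsubscribe() to cancel the subscription to the original request;
- calls timeoutRunnable.run(), which passes a HystrixTimeoutException to the child subscriber's onError().

In short, Hystrix sets up a timer that fires after the configured timeout. When it fires, it updates the corresponding state, publishes the internal event, cancels the Observable subscription and signals an exception, which is how timeout isolation is achieved.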
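The essential mechanism is a race between the work and a timer, settled by a single compareAndSet on a status flag. A sketch of that idea using a plain ScheduledExecutorService and CompletableFuture; this is an assumption-laden stand-in, not HystrixTimer: TimeoutSketch and withTimeout are hypothetical names, the timer is one-shot rather than fixed-rate, and cancelling the Future with interruption approximates the effect of s.unsubscribe().

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

// Sketch: whoever wins the CAS on the status (the work or the timer) decides the outcome.
final class TimeoutSketch {
    enum TimedOutStatus { NOT_EXECUTED, COMPLETED, TIMED_OUT }

    static <R> CompletableFuture<R> withTimeout(Callable<R> work, long timeoutMs,
                                                ExecutorService pool, ScheduledExecutorService timer) {
        AtomicReference<TimedOutStatus> status = new AtomicReference<>(TimedOutStatus.NOT_EXECUTED);
        CompletableFuture<R> result = new CompletableFuture<>();

        Future<?> task = pool.submit(() -> {
            try {
                R value = work.call();
                // deliver the value only if the timer has not fired first
                if (status.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED)) {
                    result.complete(value);
                }
            } catch (Exception e) {
                if (status.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED)) {
                    result.completeExceptionally(e);
                }
            }
        });

        // one-shot timer playing the role of TimerListener.tick()
        timer.schedule(() -> {
            if (status.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                task.cancel(true); // stop the original work, roughly what s.unsubscribe() achieves
                result.completeExceptionally(new TimeoutException("timed out after " + timeoutMs + " ms"));
            }
        }, timeoutMs, TimeUnit.MILLISECONDS);

        return result;
    }
}
```

Because both sides go through the same CAS, a late result after a timeout (or a late timer tick after success) is silently dropped instead of being delivered twice.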
Now let's look at the executeCommandWithSpecifiedIsolation method:
```java
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
    if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
        // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                ...
                metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
                if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
                    ...
                    return Observable.error(new RuntimeException("timed out before executing run()"));
                }
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
                    ...
                    try {
                        executionHook.onThreadStart(_cmd);
                        executionHook.onRunStart(_cmd);
                        executionHook.onExecutionStart(_cmd);
                        // finally, execute the user's logic
                        return getUserExecutionObservable(_cmd);
                    } catch (Throwable ex) {
                        return Observable.error(ex);
                    }
                } else {
                    //command has already been unsubscribed, so return immediately
                    return Observable.error(new RuntimeException("unsubscribed before executing run()"));
                }
            }
        }).doOnTerminate(...).doOnUnsubscribe(...)
                // subscribeOn makes the deferred Observable above run on the command's thread pool
                .subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
                    @Override
                    public Boolean call() {
                        return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
                    }
                }));
    } else {
        ...
    }
}
```
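The two guards at the top of the THREAD branch (do not start run() once the command has timed out, and only start it if the thread state moves from NOT_USING_THREAD to STARTED) can be shown without RxJava. A minimal sketch, assuming CompletableFuture.supplyAsync on a caller-supplied Executor as a stand-in for subscribeOn; ThreadIsolationSketch, markTimedOut() and unsubscribe() are hypothetical names.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Sketch of THREAD isolation: run() executes on a pool thread, behind the same two guards.
final class ThreadIsolationSketch<R> {
    enum ThreadState { NOT_USING_THREAD, STARTED, UNSUBSCRIBED }

    private final AtomicReference<ThreadState> threadState = new AtomicReference<>(ThreadState.NOT_USING_THREAD);
    private final AtomicBoolean timedOut = new AtomicBoolean(false);

    // would be set by the timeout timer
    void markTimedOut() {
        timedOut.set(true);
    }

    // the caller gave up before a pool thread picked the work up
    void unsubscribe() {
        threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED);
    }

    CompletableFuture<R> execute(Supplier<R> run, Executor pool) {
        // the lambda runs on a pool thread, not on the caller's thread (the subscribeOn idea)
        return CompletableFuture.supplyAsync(() -> {
            if (timedOut.get()) {
                throw new IllegalStateException("timed out before executing run()");
            }
            if (!threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
                throw new IllegalStateException("unsubscribed before executing run()");
            }
            return run.get();
        }, pool);
    }
}
```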
V. Executing on the designated thread pool
In executeCommandWithSpecifiedIsolation, the subscribeOn call specifies the thread pool on which the Observable's work runs when it is subscribed:
```java
/* package */static class HystrixThreadPoolDefault implements HystrixThreadPool {
    private static final Logger logger = LoggerFactory.getLogger(HystrixThreadPoolDefault.class);

    private final HystrixThreadPoolProperties properties;
    private final BlockingQueue<Runnable> queue;
    private final ThreadPoolExecutor threadPool;
    private final HystrixThreadPoolMetrics metrics;
    private final int queueSize;
    ...

    @Override
    public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
        touchConfig();
        return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
    }

    // adjust the pool size dynamically
    // allow us to change things via fast-properties by setting it each time
    private void touchConfig() {
        final int dynamicCoreSize = properties.coreSize().get();
        final int configuredMaximumSize = properties.maximumSize().get();
        int dynamicMaximumSize = properties.actualMaximumSize();
        final boolean allowSizesToDiverge = properties.getAllowMaximumSizeToDivergeFromCoreSize().get();
        boolean maxTooLow = false;

        if (allowSizesToDiverge && configuredMaximumSize < dynamicCoreSize) {
            dynamicMaximumSize = dynamicCoreSize;
            maxTooLow = true;
        }

        // In JDK 6, setCorePoolSize and setMaximumPoolSize will execute a lock operation. Avoid them if the pool size is not changed.
        if (threadPool.getCorePoolSize() != dynamicCoreSize || (allowSizesToDiverge && threadPool.getMaximumPoolSize() != dynamicMaximumSize)) {
            ...
            threadPool.setCorePoolSize(dynamicCoreSize);
            threadPool.setMaximumPoolSize(dynamicMaximumSize);
        }

        threadPool.setKeepAliveTime(properties.keepAliveTimeMinutes().get(), TimeUnit.MINUTES);
    }
}

public class HystrixContextScheduler extends Scheduler {

    private final HystrixConcurrencyStrategy concurrencyStrategy;
    private final Scheduler actualScheduler;
    private final HystrixThreadPool threadPool;
    ...

    public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
        this.concurrencyStrategy = concurrencyStrategy;
        this.threadPool = threadPool;
        this.actualScheduler = new ThreadPoolScheduler(threadPool, shouldInterruptThread);
    }

    @Override
    public Worker createWorker() {
        // build the default Worker: actualScheduler is a ThreadPoolScheduler,
        // so actualScheduler.createWorker() returns a ThreadPoolWorker
        return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
    }

    // HystrixContextSchedulerWorker
    private class HystrixContextSchedulerWorker extends Worker {

        private final Worker worker;

        private HystrixContextSchedulerWorker(Worker actualWorker) {
            this.worker = actualWorker;
        }
        ...

        @Override
        public Subscription schedule(Action0 action) {
            if (threadPool != null) {
                if (!threadPool.isQueueSpaceAvailable()) {
                    throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
                }
            }
            // worker here is actually the ThreadPoolWorker
            return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));
        }
    }

    // ThreadPoolScheduler
    private static class ThreadPoolScheduler extends Scheduler {

        private final HystrixThreadPool threadPool;
        private final Func0<Boolean> shouldInterruptThread;

        public ThreadPoolScheduler(HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
            this.threadPool = threadPool;
            this.shouldInterruptThread = shouldInterruptThread;
        }

        @Override
        public Worker createWorker() {
            // the default worker is a ThreadPoolWorker
            return new ThreadPoolWorker(threadPool, shouldInterruptThread);
        }
    }

    // ThreadPoolWorker
    private static class ThreadPoolWorker extends Worker {

        private final HystrixThreadPool threadPool;
        private final CompositeSubscription subscription = new CompositeSubscription();
        private final Func0<Boolean> shouldInterruptThread;

        public ThreadPoolWorker(HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
            this.threadPool = threadPool;
            this.shouldInterruptThread = shouldInterruptThread;
        }
        ...

        @Override
        public Subscription schedule(final Action0 action) {
            if (subscription.isUnsubscribed()) {
                // don't schedule, we are unsubscribed
                return Subscriptions.unsubscribed();
            }

            // This is internal RxJava API but it is too useful.
            ScheduledAction sa = new ScheduledAction(action);

            subscription.add(sa);
            sa.addParent(subscription);

            // get the underlying thread pool
            ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
            // submit the wrapped HystrixCommand action to the pool, which returns a FutureTask
            FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
            sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));

            return sa;
        }
        ...
    }
}
```
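The roles of these inner classes:
- HystrixContextSchedulerWorker: exposes the schedule() method. It checks whether the thread pool's queue is full and, if so, throws "Rejected command because thread-pool queueSize is at rejection threshold." If the configured queue size is -1, the queue-space check simply returns true.
- ThreadPoolScheduler: implements createWorker(), which by default returns a ThreadPoolWorker.
- ThreadPoolWorker: holds the core logic for executing the command, namely submitting the wrapped action to the thread pool.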
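Two ideas from HystrixThreadPoolDefault and HystrixContextSchedulerWorker (reject before submitting when the queue has reached a threshold, and only touch the pool-size setters when the configured sizes changed) fit in one small class. This is a sketch under assumptions, not Hystrix code: PoolSketch and its methods are hypothetical, the field names echo Hystrix's maxQueueSize and queueSizeRejectionThreshold properties, and a maxQueueSize of -1 (or any value <= 0) selects a SynchronousQueue as Hystrix does.

```java
import java.util.concurrent.*;

// Sketch: a threshold-based queue check in front of a ThreadPoolExecutor, plus a touchConfig-style resize.
final class PoolSketch {
    private final ThreadPoolExecutor threadPool;
    private final int maxQueueSize;                // <= 0 means "no queue" (SynchronousQueue)
    private final int queueSizeRejectionThreshold; // reject before the queue is actually full

    PoolSketch(int coreSize, int maxQueueSize, int queueSizeRejectionThreshold) {
        BlockingQueue<Runnable> queue = maxQueueSize <= 0
                ? new SynchronousQueue<>()
                : new LinkedBlockingQueue<>(maxQueueSize);
        this.threadPool = new ThreadPoolExecutor(coreSize, coreSize, 1, TimeUnit.MINUTES, queue);
        this.maxQueueSize = maxQueueSize;
        this.queueSizeRejectionThreshold = queueSizeRejectionThreshold;
    }

    boolean isQueueSpaceAvailable() {
        if (maxQueueSize <= 0) {
            return true; // no queue to check: let the executor accept or reject
        }
        return threadPool.getQueue().size() < queueSizeRejectionThreshold;
    }

    Future<?> schedule(Runnable task) {
        if (!isQueueSpaceAvailable()) {
            throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
        }
        return threadPool.submit(task);
    }

    // only call the setters when something actually changed
    void resize(int coreSize, int maximumSize) {
        int max = Math.max(coreSize, maximumSize); // like "maxTooLow": max never below core
        if (threadPool.getCorePoolSize() == coreSize && threadPool.getMaximumPoolSize() == max) {
            return;
        }
        // JDK 9+ rejects core > max, so raise max first when growing past it, else lower core first
        if (coreSize > threadPool.getMaximumPoolSize()) {
            threadPool.setMaximumPoolSize(max);
            threadPool.setCorePoolSize(coreSize);
        } else {
            threadPool.setCorePoolSize(coreSize);
            threadPool.setMaximumPoolSize(max);
        }
    }

    ThreadPoolExecutor executor() {
        return threadPool;
    }
}
```

Ordering the two setters is a choice of this sketch, made because newer JDKs throw IllegalArgumentException when the core size would exceed the maximum size.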
Back in executeCommandWithSpecifiedIsolation, the last call is getUserExecutionObservable():

```java
private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
    Observable<R> userObservable;

    try {
        userObservable = getExecutionObservable();
    } catch (Throwable ex) {
        // the run() method is a user provided implementation so can throw instead of using Observable.onError
        // so we catch it here and turn it into Observable.error
        userObservable = Observable.error(ex);
    }

    return userObservable
            .lift(new ExecutionHookApplication(_cmd))
            .lift(new DeprecatedOnRunHookApplication(_cmd));
}
```
getExecutionObservable() as implemented in HystrixCommand:

```java
@Override
final protected Observable<R> getExecutionObservable() {
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            try {
                // here is run(): HystrixCommand.run() is the method from our own code
                return Observable.just(run());
            } catch (Throwable ex) {
                return Observable.error(ex);
            }
        }
    }).doOnSubscribe(new Action0() {
        @Override
        public void call() {
            // Save thread on which we get subscribed so that we can interrupt it later if needed
            executionThread.set(Thread.currentThread());
        }
    });
}
```
Finally, we can see the call to Observable.just(run()), where run() is the method we wrote in our own code. That completes the overall execution flow.
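getExecutionObservable() combines three small ideas: run() is only invoked when someone subscribes (Observable.defer), an exception thrown by run() becomes an error value instead of escaping (Observable.error), and the executing thread is remembered so it can be interrupted later. A sketch of the same ideas without RxJava, assuming a hypothetical Outcome type in place of an Observable and a hypothetical valueOrFallback() helper standing in for the fallback path:

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Sketch of getExecutionObservable's ideas: lazy run(), errors as values, remembered thread.
final class DeferredRunSketch<R> {
    /** Hypothetical result type: exactly one of value/error is meaningful. */
    static final class Outcome<T> {
        final T value;
        final Throwable error;

        private Outcome(T value, Throwable error) {
            this.value = value;
            this.error = error;
        }

        static <T> Outcome<T> success(T value) { return new Outcome<>(value, null); }
        static <T> Outcome<T> failure(Throwable error) { return new Outcome<>(null, error); }
        boolean isSuccess() { return error == null; }
    }

    private final AtomicReference<Thread> executionThread = new AtomicReference<>();

    Supplier<Outcome<R>> defer(Supplier<R> run) {
        return () -> {
            executionThread.set(Thread.currentThread()); // like doOnSubscribe(...)
            try {
                return Outcome.success(run.get());      // like Observable.just(run())
            } catch (Throwable ex) {
                return Outcome.failure(ex);             // like Observable.error(ex)
            }
        };
    }

    Thread executionThread() {
        return executionThread.get();
    }

    // caller side: use the fallback when the outcome is an error
    static <T> T valueOrFallback(Outcome<T> outcome, Supplier<T> fallback) {
        return outcome.isSuccess() ? outcome.value : fallback.get();
    }
}
```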