First, a quick review of Hystrix's overall execution flow:

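  1. Construct a HystrixCommand or HystrixObservableCommand object that wraps the request, passing the parameters the request needs through its constructor;
  2. Execute the command; Hystrix offers 4 ways to do this (execute(), queue(), observe() and toObservable());
  3. Check whether the request can be answered from the cache: if request caching is enabled and a cached response is available, return it directly. Hystrix supports request caching, but the user has to enable it;
  4. Check whether the circuit breaker is open; if it is, go to step 8;
  5. Check whether the thread pool/queue/semaphore is full; if it is, go to step 8;
  6. Execute HystrixObservableCommand.construct() or HystrixCommand.run(); if execution fails or times out, go to step 8; otherwise go to step 9;
  7. Record the circuit breaker's health metrics;
  8. Run the fallback logic;
  9. Return the response.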
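The order of the checks in steps 3 to 9 can be condensed into a small model. The sketch below is a hypothetical, single-threaded Java class (ToyCommand is not part of Hystrix) that only shows the decision order: cache, circuit breaker, capacity, run(), fallback. It assumes a java.util.concurrent.Semaphore as a stand-in for the thread-pool/semaphore limit, uses two plain counters instead of Hystrix's metrics, and leaves timeouts out entirely.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Toy, single-threaded model of the decision order in steps 3-9.
// All names are hypothetical; this is not Hystrix's API, and timeouts are left out.
class ToyCommand {
    private final Map<String, String> requestCache = new HashMap<>();
    private final Semaphore permits;
    private boolean circuitOpen;
    int successes;  // stand-in for the health metrics of step 7
    int failures;

    ToyCommand(int maxConcurrentRequests) {
        this.permits = new Semaphore(maxConcurrentRequests);
    }

    void setCircuitOpen(boolean open) {
        this.circuitOpen = open;
    }

    String execute(String cacheKey, Supplier<String> run, Supplier<String> fallback) {
        String cached = requestCache.get(cacheKey);      // step 3: request cache
        if (cached != null) {
            return cached;
        }
        if (circuitOpen) {                                // step 4: circuit open -> step 8
            return fallback.get();
        }
        if (!permits.tryAcquire()) {                      // step 5: no capacity -> step 8
            return fallback.get();
        }
        try {
            String result = run.get();                    // step 6: user code
            successes++;                                  // step 7: metrics
            requestCache.put(cacheKey, result);
            return result;                                // step 9: response
        } catch (RuntimeException e) {
            failures++;                                   // step 7: metrics
            return fallback.get();                        // step 8: fallback
        } finally {
            permits.release();                            // only reached after a successful tryAcquire
        }
    }
}

class ToyCommandDemo {
    public static void main(String[] args) {
        ToyCommand cmd = new ToyCommand(10);
        System.out.println(cmd.execute("user-1", () -> "from run()", () -> "from fallback"));
        // Same key again: answered from the cache, so the failing run() is never called.
        System.out.println(cmd.execute("user-1", () -> { throw new IllegalStateException(); }, () -> "from fallback"));
    }
}
```

Because the cache is consulted first, a cached key is answered even while the circuit is open, which matches the ordering of steps 3 and 4.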

I. Analyzing the execute method

Picking up from the previous post: the HystrixCommandAspect aspect creates a HystrixInvokable object and then executes it.

    Object result;
    try {
        if (!metaHolder.isObservable()) {
            result = CommandExecutor.execute(invokable, executionType, metaHolder);
        } else {
            result = executeObservable(invokable, executionType, metaHolder);
        }
    } catch (HystrixBadRequestException e) {
        throw e.getCause() != null ? e.getCause() : e;
    } catch (HystrixRuntimeException e) {
        throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
    }

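Now let's trace the execute flow. Hystrix supports three execution modes: synchronous, asynchronous and observable (reactive). We only follow the synchronous one, whose call chain is HystrixCommand.execute() -> queue() -> toObservable().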

    public Observable<R> toObservable() {
        final AbstractCommand<R> _cmd = this;
        // .... definitions of several actions ....

        final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                if (commandState.get().equals(AbstractCommand.CommandState.UNSUBSCRIBED)) {
                    return Observable.never();
                }
                return applyHystrixSemantics(_cmd);
            }
        };
        ...
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                // ... check whether request caching is enabled (step 3 of the overall flow) ...
                boolean requestCacheEnabled = AbstractCommand.this.isRequestCachingEnabled();
                String cacheKey = AbstractCommand.this.getCacheKey();
                if (requestCacheEnabled) {
                    // look up the cache; on a hit, return the cached response directly
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                    if (fromCache != null) {
                        isResponseFromCache = true;
                        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                    }
                }

                Observable<R> hystrixObservable = Observable.defer(applyHystrixSemantics).map(wrapWithAllOnNextHooks);
                Observable<R> afterCache;
                if (requestCacheEnabled && cacheKey != null) {
                    // ... further cache handling ...
                } else {
                    afterCache = hystrixObservable;
                }
                return afterCache.doOnTerminate(terminateCommandCleanup)
                        .doOnUnsubscribe(unsubscribeCommandCleanup)
                        .doOnCompleted(fireOnCompletedHook);
            }
        });
    }

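The call() method passed to Observable.defer mainly does two things:

  • It checks whether request caching is enabled; if it is and the cache already holds a response, that response is returned directly.
  • If caching is disabled or nothing is cached yet, it builds Observable.defer(applyHystrixSemantics) and returns that.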
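The caching rule above (caching enabled and a cacheKey present, otherwise always execute) can be sketched with a ConcurrentHashMap. This is a simplified, hypothetical stand-in: Hystrix scopes its request cache to a HystrixRequestContext and stores a cached Observable wrapper, whereas ToyRequestCache below just stores the result value.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical request-scoped cache: the first caller for a key executes the command,
// later callers with the same key get the stored result without executing it again.
class ToyRequestCache<R> {
    private final ConcurrentMap<String, R> cache = new ConcurrentHashMap<>();

    R getOrRun(boolean requestCacheEnabled, String cacheKey, Supplier<R> command) {
        if (!requestCacheEnabled || cacheKey == null) {
            return command.get();   // caching off or no key: always execute
        }
        // computeIfAbsent applies the function at most once per key, even with concurrent callers
        return cache.computeIfAbsent(cacheKey, key -> command.get());
    }
}

class ToyRequestCacheDemo {
    public static void main(String[] args) {
        ToyRequestCache<Integer> cache = new ToyRequestCache<>();
        AtomicInteger executions = new AtomicInteger();
        cache.getOrRun(true, "user-1", executions::incrementAndGet);
        cache.getOrRun(true, "user-1", executions::incrementAndGet);
        System.out.println("executions = " + executions.get()); // executions = 1
    }
}
```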

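Inside applyHystrixSemantics comes the check on whether the circuit breaker is closed or open, which corresponds to step 4 of the overall flow.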

    private Observable<R> applyHystrixSemantics(AbstractCommand<R> _cmd) {
        this.executionHook.onStart(_cmd);
        // check whether the circuit is open (tripped)
        if (this.circuitBreaker.allowRequest()) {
            /*
             * With semaphore isolation this returns a TryableSemaphoreActual; otherwise it returns
             * TryableSemaphoreNoOp, whose tryAcquire() always returns true.
             */
            final TryableSemaphore executionSemaphore = getExecutionSemaphore();
            ...
            // semaphore control
            if (executionSemaphore.tryAcquire()) {
                try {
                    this.executionResult = this.executionResult.setInvocationStartTime(System.currentTimeMillis());
                    // every check passed: go on to executeCommandAndObserve
                    return this.executeCommandAndObserve(_cmd)
                            .doOnError(markExceptionThrown)
                            .doOnTerminate(singleSemaphoreRelease)
                            .doOnUnsubscribe(singleSemaphoreRelease);
                } catch (RuntimeException var7) {
                    return Observable.error(var7);
                }
            } else {
                // no permit available: reject and fall back
                return this.handleSemaphoreRejectionViaFallback();
            }
        } else {
            // circuit open: short-circuit straight to the fallback
            return this.handleShortCircuitViaFallback();
        }
    }
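In the code above, singleSemaphoreRelease is attached to both doOnTerminate and doOnUnsubscribe, so it may be invoked more than once for the same command; the release therefore has to be idempotent, or the permit count would drift upward. Its definition sits in the elided part of the excerpt. A minimal sketch of one way to guarantee a single release, assuming an AtomicBoolean guard and using java.util.concurrent.Semaphore in place of TryableSemaphore (the class name ReleaseOnce is hypothetical):

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical release-once callback: it can be wired to several completion hooks,
// but the permit goes back to the semaphore only the first time it runs.
class ReleaseOnce implements Runnable {
    private final Semaphore semaphore;
    private final AtomicBoolean released = new AtomicBoolean(false);

    ReleaseOnce(Semaphore semaphore) {
        this.semaphore = semaphore;
    }

    @Override
    public void run() {
        if (released.compareAndSet(false, true)) {
            semaphore.release();
        }
    }
}

class ReleaseOnceDemo {
    public static void main(String[] args) {
        Semaphore permits = new Semaphore(1);
        if (permits.tryAcquire()) {
            Runnable release = new ReleaseOnce(permits);
            release.run();   // e.g. the terminate hook
            release.run();   // e.g. the unsubscribe hook firing afterwards
        }
        System.out.println(permits.availablePermits()); // 1, not 2
    }
}
```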

II. Circuit breaking and fallback

Next, let's look at this.circuitBreaker.allowRequest().

    static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
        private final HystrixCommandProperties properties;
        private final HystrixCommandMetrics metrics;

        // whether the circuit is open
        /* track whether this circuit is open/closed at any given point in time (default to false==closed) */
        private AtomicBoolean circuitOpen = new AtomicBoolean(false);

        /* when the circuit was marked open or was last allowed to try a 'singleTest' */
        private AtomicLong circuitOpenedOrLastTestedTime = new AtomicLong();

        protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
            this.properties = properties;
            this.metrics = metrics;
        }

        // In the half-open state, if this request succeeded, set circuitOpen back to false and reset the metrics
        public void markSuccess() {
            if (circuitOpen.get()) {
                if (circuitOpen.compareAndSet(true, false)) {
                    //win the thread race to reset metrics
                    //Unsubscribe from the current stream to reset the health counts stream.  This only affects the health counts view,
                    //and all other metric consumers are unaffected by the reset
                    metrics.resetStream();
                }
            }
        }

        @Override
        public boolean allowRequest() {
            // is the circuit breaker forced open?
            if (properties.circuitBreakerForceOpen().get()) {
                return false;
            }
            // is the circuit breaker forced closed?
            if (properties.circuitBreakerForceClosed().get()) {
                // still call isOpen() so its calculations run as usual, but always let the request through
                isOpen();
                return true;
            }
            return !isOpen() || allowSingleTest();
        }

        public boolean allowSingleTest() {
            long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
            // 1) if the circuit is open
            // 2) and it's been longer than 'sleepWindow' since we opened the circuit
            // i.e. the circuit is open and the current time is past the open (or last test) time plus sleepWindow
            if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
                // We push the 'circuitOpenedTime' ahead by 'sleepWindow' since we have allowed one request to try.
                // If it succeeds the circuit will be closed, otherwise another singleTest will be allowed at the end of the 'sleepWindow'.
                // Store the current time in circuitOpenedOrLastTestedTime.
                // In the half-open state, a successful test request calls markSuccess(), which sets circuitOpen to false;
                // if it fails, nothing else needs to happen.
                // Example: the circuit opened at 00:00:00 and sleepWindow is 10s. A call at 00:00:15 is let through
                // as the single test; if it fails, requests are short-circuited from 00:00:15 until 00:00:25.
                if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
                    // if this returns true that means we set the time so we'll return true to allow the singleTest
                    // if it returned false it means another thread raced us and allowed the singleTest before we did
                    return true;
                }
            }
            return false;
        }

        @Override
        public boolean isOpen() {
            // circuitOpen holds the breaker state: true = open (tripped), false = closed
            if (circuitOpen.get()) {
                return true;
            }
            // fetch the collected health metrics
            HealthCounts health = metrics.getHealthCounts();
            // fewer requests in the rolling window (10s by default) than circuitBreakerRequestVolumeThreshold (20 by default): don't trip
            if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
                return false;
            }
            // error percentage (errors / total requests) below circuitBreakerErrorThresholdPercentage (50% by default): don't trip
            if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
                return false;
            } else {
                // otherwise the circuit goes from CLOSED to OPEN and the open time is recorded
                if (circuitOpen.compareAndSet(false, true)) {
                    // store the current time in circuitOpenedOrLastTestedTime for the later sleepWindow comparison
                    circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
                    return true;
                } else {
                    return true;
                }
            }
        }
    }
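  1. First check whether the circuit breaker is forced open or forced closed.
  2. Then check whether the circuit is already open and, if not, whether it should trip; if it should, flip the state to open and record the current time as the open time. The circuit trips when both of these hold:
    1) the total number of requests in the rolling window (10s by default) is at least 20 (circuitBreakerRequestVolumeThreshold);
    2) the error percentage in that window is at least 50% (circuitBreakerErrorThresholdPercentage).
  3. While the circuit is open, allowSingleTest decides whether to let a single trial request through. That requires:
    1) circuitOpen.get() is true, i.e. the circuit tripped normally rather than being forced open (a forced-open breaker has already returned false);
    2) the current time is later than the open (or last test) time plus sleepWindow.
  4. If the trial request succeeds in this half-open state, markSuccess() is called, which closes the circuit and resets the metrics.
The state in which allowSingleTest returns true can loosely be called the half-open state.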
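To make the trip / half-open / close cycle concrete, here is a minimal Java model of the logic above. It is a sketch under stated assumptions, not Hystrix code: ToyCircuitBreaker is a hypothetical name, the total and error counts are plain counters instead of Hystrix's rolling window, and the clock is injected so the sleep window can be exercised without waiting. The test scenario follows the example in the code comment: the circuit opens at 0 s, the sleep window is 10 s, and a trial request arrives at 15 s.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Simplified model of the HystrixCircuitBreakerImpl logic above. Assumptions: plain
// counters replace the rolling health window, and the clock is injectable for testing.
class ToyCircuitBreaker {
    private final int requestVolumeThreshold;    // like circuitBreakerRequestVolumeThreshold
    private final int errorThresholdPercentage;  // like circuitBreakerErrorThresholdPercentage
    private final long sleepWindowMillis;        // like circuitBreakerSleepWindowInMilliseconds
    private final LongSupplier clock;

    private final AtomicBoolean circuitOpen = new AtomicBoolean(false);
    private final AtomicLong circuitOpenedOrLastTestedTime = new AtomicLong();
    private final AtomicLong totalRequests = new AtomicLong();
    private final AtomicLong errorCount = new AtomicLong();

    ToyCircuitBreaker(int requestVolumeThreshold, int errorThresholdPercentage,
                      long sleepWindowMillis, LongSupplier clock) {
        this.requestVolumeThreshold = requestVolumeThreshold;
        this.errorThresholdPercentage = errorThresholdPercentage;
        this.sleepWindowMillis = sleepWindowMillis;
        this.clock = clock;
    }

    void recordResult(boolean success) {
        totalRequests.incrementAndGet();
        if (!success) {
            errorCount.incrementAndGet();
        }
    }

    boolean allowRequest() {
        return !isOpen() || allowSingleTest();
    }

    boolean isOpen() {
        if (circuitOpen.get()) {
            return true;
        }
        long total = totalRequests.get();
        if (total < requestVolumeThreshold) {
            return false;                                   // not enough traffic to judge
        }
        if (errorCount.get() * 100 / total < errorThresholdPercentage) {
            return false;                                   // error rate below the threshold
        }
        if (circuitOpen.compareAndSet(false, true)) {
            circuitOpenedOrLastTestedTime.set(clock.getAsLong());   // CLOSED -> OPEN
        }
        return true;
    }

    boolean allowSingleTest() {
        long last = circuitOpenedOrLastTestedTime.get();
        if (circuitOpen.get() && clock.getAsLong() > last + sleepWindowMillis) {
            // only the caller that wins this CAS gets the trial request
            return circuitOpenedOrLastTestedTime.compareAndSet(last, clock.getAsLong());
        }
        return false;
    }

    void markSuccess() {
        if (circuitOpen.compareAndSet(true, false)) {
            totalRequests.set(0);                           // stand-in for metrics.resetStream()
            errorCount.set(0);
        }
    }
}

class ToyCircuitBreakerDemo {
    public static void main(String[] args) {
        AtomicLong now = new AtomicLong(0);
        ToyCircuitBreaker breaker = new ToyCircuitBreaker(20, 50, 10_000, now::get);
        for (int i = 0; i < 20; i++) {
            breaker.recordResult(i % 2 == 0);               // 50% errors
        }
        System.out.println(breaker.allowRequest());         // false: tripped at t=0
        now.set(15_000);
        System.out.println(breaker.allowRequest());         // true: single test after the sleep window
    }
}
```

Because allowSingleTest() moves circuitOpenedOrLastTestedTime forward with a CAS, callers that observe the same expired window cannot all get through: at most one trial request is admitted per sleep window.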

III. Semaphore isolation

This corresponds to step 5 of the overall flow.

    /* package */static class TryableSemaphoreActual implements TryableSemaphore {
        protected final HystrixProperty<Integer> numberOfPermits;
        private final AtomicInteger count = new AtomicInteger(0);

        public TryableSemaphoreActual(HystrixProperty<Integer> numberOfPermits) {
            this.numberOfPermits = numberOfPermits;
        }

        @Override
        public boolean tryAcquire() {
            int currentCount = count.incrementAndGet();
            if (currentCount > numberOfPermits.get()) {
                count.decrementAndGet();
                return false;
            } else {
                return true;
            }
        }
    }

    /* package */static class TryableSemaphoreNoOp implements TryableSemaphore {
        public static final TryableSemaphore DEFAULT = new TryableSemaphoreNoOp();

        @Override
        public boolean tryAcquire() {
            return true;
        }
    }

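With semaphore isolation enabled, TryableSemaphoreActual increments its counter by 1; if currentCount > numberOfPermits.get(), it undoes the increment and returns false, so the command is rejected and handled by handleSemaphoreRejectionViaFallback().
Without semaphore isolation, TryableSemaphoreNoOp.tryAcquire() always returns true.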
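The counting scheme is easy to reproduce with the JDK alone. The sketch below mirrors the two classes above under hypothetical names. It also includes a release() method: Hystrix's TryableSemaphore declares one, but the excerpt omits it, and any permit scheme needs it so finished commands give their permit back. The limit is read through an IntSupplier, an assumed stand-in for HystrixProperty<Integer>, so it can change at runtime like a dynamic property.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

interface ToyTryableSemaphore {
    boolean tryAcquire();
    void release();
}

// Mirrors TryableSemaphoreActual: increment first, roll back if over the limit.
class ToySemaphoreActual implements ToyTryableSemaphore {
    private final IntSupplier numberOfPermits;   // read on every call, so the limit can change at runtime
    private final AtomicInteger count = new AtomicInteger(0);

    ToySemaphoreActual(IntSupplier numberOfPermits) {
        this.numberOfPermits = numberOfPermits;
    }

    @Override
    public boolean tryAcquire() {
        int currentCount = count.incrementAndGet();
        if (currentCount > numberOfPermits.getAsInt()) {
            count.decrementAndGet();             // over the limit: undo the increment
            return false;
        }
        return true;
    }

    @Override
    public void release() {
        count.decrementAndGet();
    }

    int permitsInUse() {
        return count.get();
    }
}

// Mirrors TryableSemaphoreNoOp: never rejects.
class ToySemaphoreNoOp implements ToyTryableSemaphore {
    @Override
    public boolean tryAcquire() {
        return true;
    }

    @Override
    public void release() {
    }
}

class ToySemaphoreDemo {
    public static void main(String[] args) {
        ToyTryableSemaphore semaphore = new ToySemaphoreActual(() -> 2);
        System.out.println(semaphore.tryAcquire()); // true
        System.out.println(semaphore.tryAcquire()); // true
        System.out.println(semaphore.tryAcquire()); // false: the third concurrent caller is rejected
    }
}
```

Incrementing first and rolling back avoids a check-then-act race: each incrementAndGet returns a distinct value, so two threads can never both believe they took the last permit.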

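If the circuit is not open and a semaphore permit was obtained (always the case when no semaphore is used), execution continues into executeCommandAndObserve.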

    private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
        final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
        ....
        Observable<R> execution;
        // is the execution timeout enabled?
        if (properties.executionTimeoutEnabled().get()) {
            execution = executeCommandWithSpecifiedIsolation(_cmd)
                    .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
        } else {
            execution = executeCommandWithSpecifiedIsolation(_cmd);
        }
        // markEmits, markOnCompleted, handleFallback and setRequestContext are anonymous inner classes
        // defined earlier in this method (in the elided part)
        return execution.doOnNext(markEmits)
                .doOnCompleted(markOnCompleted)
                .onErrorResumeNext(handleFallback)
                .doOnEach(setRequestContext);
    }
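Read as plain futures, this chain says: run the isolated execution, optionally bound it by a timeout, and turn any error (including the timeout) into the fallback result. The sketch below approximates that with java.util.concurrent.CompletableFuture instead of RxJava. ToyObservePipeline is a hypothetical name, the metrics hooks (markEmits, markOnCompleted) and request-context handling are left out, and orTimeout requires Java 9 or later.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical approximation of the chain above with CompletableFuture (orTimeout needs Java 9+):
// optionally bound the execution by a timeout, and resume with the fallback on any error.
class ToyObservePipeline {
    static <R> CompletableFuture<R> executeAndObserve(Supplier<R> run, Supplier<R> fallback,
                                                      boolean timeoutEnabled, long timeoutMillis) {
        CompletableFuture<R> execution = CompletableFuture.supplyAsync(run);
        if (timeoutEnabled) {
            // plays the role of lift(new HystrixObservableTimeoutOperator<R>(_cmd))
            execution = execution.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
        }
        // plays the role of onErrorResumeNext(handleFallback): failures and timeouts both end in the fallback
        return execution.exceptionally(error -> fallback.get());
    }
}

class ToyObservePipelineDemo {
    public static void main(String[] args) {
        String result = ToyObservePipeline.executeAndObserve(() -> {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "slow result";
        }, () -> "fallback", true, 50).join();
        System.out.println(result); // fallback
    }
}
```

One difference to keep in mind: Hystrix does not run the fallback for HystrixBadRequestException, while this sketch falls back on every exception.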

IV. Timeout isolation

    private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> {
        final AbstractCommand<R> originalCommand;

        public HystrixObservableTimeoutOperator(final AbstractCommand<R> originalCommand) {
            this.originalCommand = originalCommand;
        }

        @Override
        public Subscriber<? super R> call(final Subscriber<? super R> child) {
            final CompositeSubscription s = new CompositeSubscription();
            // if the child unsubscribes we unsubscribe our parent as well
            child.add(s);

            // on timeout, deliver a new HystrixTimeoutException to the child
            final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, new Runnable() {
                @Override
                public void run() {
                    child.onError(new HystrixTimeoutException());
                }
            });

            // the timer callback
            TimerListener listener = new TimerListener() {
                // called when the timer fires
                @Override
                public void tick() {
                    // move the state from NOT_EXECUTED to TIMED_OUT
                    if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                        // report timeout failure
                        originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);
                        // shut down the original request
                        s.unsubscribe();
                        timeoutRunnable.run();
                    }
                }

                // the timer interval
                @Override
                public int getIntervalTimeInMilliseconds() {
                    return originalCommand.properties.executionTimeoutInMilliseconds().get();
                }
            };

            final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);

            // set externally so execute/queue can see this
            originalCommand.timeoutTimer.set(tl);

            /**
             * If this subscriber receives values it means the parent succeeded/completed
             */
            Subscriber<R> parent = new Subscriber<R>() {
                ...
            };

            // if s is unsubscribed we want to unsubscribe the parent
            s.add(parent);

            return parent;
        }
    }

    public Reference<TimerListener> addTimerListener(final TimerListener listener) {
        startThreadIfNeeded();
        // add the listener
        Runnable r = new Runnable() {
            @Override
            public void run() {
                try {
                    listener.tick();
                } catch (Exception e) {
                    logger.error("Failed while ticking TimerListener", e);
                }
            }
        };
        // getIntervalTimeInMilliseconds() supplies both the initial delay and the period
        ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);
        return new TimerReference(listener, f);
    }

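The main logic defines a TimerListener whose interval is the timeout configured on @HystrixCommand. When the timer fires while the command is still in the NOT_EXECUTED state, it:

  1. changes the state from NOT_EXECUTED to TIMED_OUT;
  2. emits a TIMEOUT event;
  3. calls s.unsubscribe() to cancel the subscription to the original request;
  4. calls timeoutRunnable.run(), which delivers a HystrixTimeoutException to the child subscriber through onError.

In short, a timer is armed with the configured timeout; when it fires, Hystrix updates the command's state, emits the corresponding internal event, cancels the Observable subscription and raises a timeout error. That is how timeout isolation is achieved.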
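The essence of the timeout operator is a race between the timer and the command, settled by a compare-and-set on the command's status. The sketch below reproduces that race with the JDK only. ToyTimeoutRunner and ToyTimedOutStatus are hypothetical names, java.util.concurrent.TimeoutException stands in for HystrixTimeoutException, and a one-shot schedule replaces Hystrix's scheduleAtFixedRate (with the CAS in place, at most one tick can ever succeed anyway). Unlike Hystrix, the sketch does not unsubscribe or interrupt the timed-out work; it only stops waiting for it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

enum ToyTimedOutStatus { NOT_EXECUTED, COMPLETED, TIMED_OUT }

// Hypothetical sketch of the timeout race: the timer and the work both try to move the
// status away from NOT_EXECUTED, and only the CAS winner completes the result.
class ToyTimeoutRunner {
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    <R> CompletableFuture<R> run(Supplier<R> work, long timeoutMillis,
                                 AtomicReference<ToyTimedOutStatus> status) {
        CompletableFuture<R> result = new CompletableFuture<>();
        // like TimerListener.tick(): NOT_EXECUTED -> TIMED_OUT, then report the timeout
        timer.schedule(() -> {
            if (status.compareAndSet(ToyTimedOutStatus.NOT_EXECUTED, ToyTimedOutStatus.TIMED_OUT)) {
                result.completeExceptionally(new TimeoutException("timed out after " + timeoutMillis + " ms"));
            }
        }, timeoutMillis, TimeUnit.MILLISECONDS);
        // the actual work on another thread: NOT_EXECUTED -> COMPLETED, then report the outcome
        CompletableFuture.runAsync(() -> {
            try {
                R value = work.get();
                if (status.compareAndSet(ToyTimedOutStatus.NOT_EXECUTED, ToyTimedOutStatus.COMPLETED)) {
                    result.complete(value);
                }
            } catch (RuntimeException e) {
                if (status.compareAndSet(ToyTimedOutStatus.NOT_EXECUTED, ToyTimedOutStatus.COMPLETED)) {
                    result.completeExceptionally(e);
                }
            }
        });
        return result;
    }

    void shutdown() {
        timer.shutdownNow();
    }
}

class ToyTimeoutDemo {
    public static void main(String[] args) {
        ToyTimeoutRunner runner = new ToyTimeoutRunner();
        AtomicReference<ToyTimedOutStatus> status = new AtomicReference<>(ToyTimedOutStatus.NOT_EXECUTED);
        String value = runner.run(() -> "fast", 1_000, status).join();
        System.out.println(value + " " + status.get()); // fast COMPLETED
        runner.shutdown();
    }
}
```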

Next, look at the executeCommandWithSpecifiedIsolation method:

    private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
        if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
            // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
            return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    ...
                    metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);

                    if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
                        ...
                        return Observable.error(new RuntimeException("timed out before executing run()"));
                    }
                    if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
                        ....
                        try {
                            executionHook.onThreadStart(_cmd);
                            executionHook.onRunStart(_cmd);
                            executionHook.onExecutionStart(_cmd);
                            // finally, run the user's code
                            return getUserExecutionObservable(_cmd);
                        } catch (Throwable ex) {
                            return Observable.error(ex);
                        }
                    } else {
                        //command has already been unsubscribed, so return immediately
                        return Observable.error(new RuntimeException("unsubscribed before executing run()"));
                    }
                }
            }).doOnTerminate(...).doOnUnsubscribe(...)
                    // subscribeOn makes the deferred call() above (and thus run()) execute on the Hystrix thread pool
                    .subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
                        @Override
                        public Boolean call() {
                            return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
                        }
                    }));
        } else {
            ...
        }
    }
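The early exit for TIMED_OUT in the deferred call() matters because the timeout operator is lifted onto the result of executeCommandWithSpecifiedIsolation, outside subscribeOn: the timer is already running while the work waits for a pool thread, so a command can time out while it is still queued. The sketch below shows that guard with a JDK executor instead of RxJava's subscribeOn. ToyIsolatedExecution and its Status enum are hypothetical names; the point is only that the status is checked on the pool thread, right before user code, so a command that timed out in the queue never runs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical sketch: the check runs on the pool thread, immediately before user code,
// so a command that timed out while still queued never executes its run() logic.
class ToyIsolatedExecution {
    enum Status { NOT_EXECUTED, COMPLETED, TIMED_OUT }

    static <R> CompletableFuture<R> submit(ExecutorService pool, Supplier<R> run,
                                           AtomicReference<Status> status) {
        CompletableFuture<R> result = new CompletableFuture<>();
        pool.execute(() -> {
            if (status.get() == Status.TIMED_OUT) {
                result.completeExceptionally(new RuntimeException("timed out before executing run()"));
                return;
            }
            try {
                result.complete(run.get());
            } catch (RuntimeException e) {
                result.completeExceptionally(e);
            }
        });
        return result;
    }
}

class ToyIsolatedExecutionDemo {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicReference<ToyIsolatedExecution.Status> status =
                new AtomicReference<>(ToyIsolatedExecution.Status.TIMED_OUT);
        ToyIsolatedExecution.submit(pool, () -> "never computed", status)
                .whenComplete((value, error) -> System.out.println(error == null ? value : error.getMessage()));
        pool.shutdown();
    }
}
```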

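V. Running the method on a designated thread pool

In executeCommandWithSpecifiedIsolation, the subscribeOn call designates the thread pool on which the Observable's subscription work runs, i.e. where the code triggered on subscription (ultimately run()) executes.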

    /* package */static class HystrixThreadPoolDefault implements HystrixThreadPool {
        private static final Logger logger = LoggerFactory.getLogger(HystrixThreadPoolDefault.class);

        private final HystrixThreadPoolProperties properties;
        private final BlockingQueue<Runnable> queue;
        private final ThreadPoolExecutor threadPool;
        private final HystrixThreadPoolMetrics metrics;
        private final int queueSize;
        ...

        @Override
        public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
            touchConfig();
            return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
        }

        // dynamically resize the thread pool
        // allow us to change things via fast-properties by setting it each time
        private void touchConfig() {
            final int dynamicCoreSize = properties.coreSize().get();
            final int configuredMaximumSize = properties.maximumSize().get();
            int dynamicMaximumSize = properties.actualMaximumSize();
            final boolean allowSizesToDiverge = properties.getAllowMaximumSizeToDivergeFromCoreSize().get();
            boolean maxTooLow = false;

            if (allowSizesToDiverge && configuredMaximumSize < dynamicCoreSize) {
                dynamicMaximumSize = dynamicCoreSize;
                maxTooLow = true;
            }

            // In JDK 6, setCorePoolSize and setMaximumPoolSize will execute a lock operation. Avoid them if the pool size is not changed.
            if (threadPool.getCorePoolSize() != dynamicCoreSize || (allowSizesToDiverge && threadPool.getMaximumPoolSize() != dynamicMaximumSize)) {
                ...
                threadPool.setCorePoolSize(dynamicCoreSize);
                threadPool.setMaximumPoolSize(dynamicMaximumSize);
            }

            threadPool.setKeepAliveTime(properties.keepAliveTimeMinutes().get(), TimeUnit.MINUTES);
        }
    }

    public class HystrixContextScheduler extends Scheduler {
        private final HystrixConcurrencyStrategy concurrencyStrategy;
        private final Scheduler actualScheduler;
        private final HystrixThreadPool threadPool;
        ...

        public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
            this.concurrencyStrategy = concurrencyStrategy;
            this.threadPool = threadPool;
            this.actualScheduler = new ThreadPoolScheduler(threadPool, shouldInterruptThread);
        }

        @Override
        public Worker createWorker() {
            // build the default Worker: actualScheduler is a ThreadPoolScheduler,
            // so actualScheduler.createWorker() returns a ThreadPoolWorker
            return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
        }

        // HystrixContextSchedulerWorker class
        private class HystrixContextSchedulerWorker extends Worker {
            private final Worker worker;

            private HystrixContextSchedulerWorker(Worker actualWorker) {
                this.worker = actualWorker;
            }
            ...

            @Override
            public Subscription schedule(Action0 action) {
                if (threadPool != null) {
                    if (!threadPool.isQueueSpaceAvailable()) {
                        throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
                    }
                }
                // worker here is actually the ThreadPoolWorker
                return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));
            }
        }

        // ThreadPoolScheduler class
        private static class ThreadPoolScheduler extends Scheduler {
            private final HystrixThreadPool threadPool;
            private final Func0<Boolean> shouldInterruptThread;

            public ThreadPoolScheduler(HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
                this.threadPool = threadPool;
                this.shouldInterruptThread = shouldInterruptThread;
            }

            @Override
            public Worker createWorker() {
                // the default worker is ThreadPoolWorker
                return new ThreadPoolWorker(threadPool, shouldInterruptThread);
            }
        }

        // ThreadPoolWorker class
        private static class ThreadPoolWorker extends Worker {
            private final HystrixThreadPool threadPool;
            private final CompositeSubscription subscription = new CompositeSubscription();
            private final Func0<Boolean> shouldInterruptThread;

            public ThreadPoolWorker(HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
                this.threadPool = threadPool;
                this.shouldInterruptThread = shouldInterruptThread;
            }
            ...

            @Override
            public Subscription schedule(final Action0 action) {
                if (subscription.isUnsubscribed()) {
                    // don't schedule, we are unsubscribed
                    return Subscriptions.unsubscribed();
                }

                // This is internal RxJava API but it is too useful.
                ScheduledAction sa = new ScheduledAction(action);

                subscription.add(sa);
                sa.addParent(subscription);

                // get the thread pool
                ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
                // submit the wrapped HystrixCommand action to the pool; the returned Future is a FutureTask
                FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
                sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));

                return sa;
            }
            ...
        }
    }

What the inner classes do:

  • HystrixContextSchedulerWorker: exposes schedule(), which first checks whether the thread-pool queue is full; if it is, it throws "Rejected command because thread-pool queueSize is at rejection threshold." If the configured queue size is -1 (no queue), isQueueSpaceAvailable() always returns true and rejection is left to the ThreadPoolExecutor itself.
  • ThreadPoolScheduler: implements createWorker(), which returns a ThreadPoolWorker by default.
  • ThreadPoolWorker: holds the core logic for running the command: it wraps the action in a ScheduledAction and submits it to the ThreadPoolExecutor.

Once the action runs on a pool thread, the deferred call() in executeCommandWithSpecifiedIsolation invokes getUserExecutionObservable():

    private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
        Observable<R> userObservable;

        try {
            userObservable = getExecutionObservable();
        } catch (Throwable ex) {
            // the run() method is a user provided implementation so can throw instead of using Observable.onError
            // so we catch it here and turn it into Observable.error
            userObservable = Observable.error(ex);
        }

        return userObservable
                .lift(new ExecutionHookApplication(_cmd))
                .lift(new DeprecatedOnRunHookApplication(_cmd));
    }

In HystrixCommand, getExecutionObservable() is implemented as:

    @Override
    final protected Observable<R> getExecutionObservable() {
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                try {
                    // here is run(): HystrixCommand.run() is where our own code is executed
                    return Observable.just(run());
                } catch (Throwable ex) {
                    return Observable.error(ex);
                }
            }
        }).doOnSubscribe(new Action0() {
            @Override
            public void call() {
                // Save thread on which we get subscribed so that we can interrupt it later if needed
                executionThread.set(Thread.currentThread());
            }
        });
    }

Finally, Observable.just(run()) is called, and run() is the method from our own code. That completes the overall execution path.
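To tie section V together: before a command is handed to the pool, HystrixContextSchedulerWorker checks the queue against the rejection threshold, and touchConfig() keeps the executor sized to the current properties. The sketch below models both with a plain ThreadPoolExecutor. ToyCommandPool and its constructor parameters are hypothetical; the queue choice (SynchronousQueue when the queue size is 0 or less, otherwise a bounded LinkedBlockingQueue) mirrors the rule in Hystrix's default HystrixConcurrencyStrategy.getBlockingQueue(), and the rejection message is copied from the code above.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical model of HystrixThreadPoolDefault plus HystrixContextSchedulerWorker.schedule().
class ToyCommandPool {
    private final ThreadPoolExecutor threadPool;
    private final int maxQueueSize;                  // <= 0 means "no queue"
    private final int queueSizeRejectionThreshold;

    ToyCommandPool(int coreSize, int maxQueueSize, int queueSizeRejectionThreshold) {
        this.maxQueueSize = maxQueueSize;
        this.queueSizeRejectionThreshold = queueSizeRejectionThreshold;
        BlockingQueue<Runnable> queue = maxQueueSize <= 0
                ? new SynchronousQueue<>()
                : new LinkedBlockingQueue<>(maxQueueSize);
        this.threadPool = new ThreadPoolExecutor(coreSize, coreSize, 1, TimeUnit.MINUTES, queue);
    }

    // Same rule as isQueueSpaceAvailable(): without a real queue, let the executor itself reject.
    boolean isQueueSpaceAvailable() {
        if (maxQueueSize <= 0) {
            return true;
        }
        return threadPool.getQueue().size() < queueSizeRejectionThreshold;
    }

    // Like HystrixContextSchedulerWorker.schedule(): check the threshold, then submit.
    Future<?> schedule(Runnable action) {
        if (!isQueueSpaceAvailable()) {
            throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
        }
        return threadPool.submit(action);
    }

    // Like touchConfig(): keep core <= max and only touch the executor when a size changed.
    void resize(int coreSize, int maximumSize) {
        int max = Math.max(coreSize, maximumSize);
        if (threadPool.getCorePoolSize() == coreSize && threadPool.getMaximumPoolSize() == max) {
            return;
        }
        // order the two setters so that core never exceeds max in between
        if (max >= threadPool.getMaximumPoolSize()) {
            threadPool.setMaximumPoolSize(max);
            threadPool.setCorePoolSize(coreSize);
        } else {
            threadPool.setCorePoolSize(coreSize);
            threadPool.setMaximumPoolSize(max);
        }
    }

    int coreSize() { return threadPool.getCorePoolSize(); }

    int maxSize() { return threadPool.getMaximumPoolSize(); }

    void shutdown() { threadPool.shutdown(); }
}

class ToyCommandPoolDemo {
    public static void main(String[] args) {
        ToyCommandPool pool = new ToyCommandPool(2, 5, 5);
        pool.schedule(() -> System.out.println("running on " + Thread.currentThread().getName()));
        pool.resize(4, 8);
        System.out.println(pool.coreSize() + "/" + pool.maxSize()); // 4/8
        pool.shutdown();
    }
}
```

The threshold check exists because a BlockingQueue's capacity is fixed when it is constructed, while queueSizeRejectionThreshold is a dynamic property; comparing the current queue size against it lets the effective limit change without rebuilding the queue. The setter ordering in resize() is a deliberate choice for the sketch: on JDK 9 and later, setCorePoolSize throws IllegalArgumentException when the new core size exceeds the current maximum.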