乐趣区

关于hystrix:hystrix源码分析二

先温习下 Hystrix 的整体流程

  1. 结构一个 HystrixCommand 或 HystrixObservableCommand 对象,用于封装申请,并在构造方法配置申请被执行须要的参数;
  2. 执行命令,Hystrix 提供了 4 种执行命令的办法
  3. 判断是否应用缓存响应申请,若启用了缓存,且缓存可用,间接应用缓存响应申请。Hystrix 反对申请缓存,但须要用户自定义启动;
  4. 判断熔断器是否关上,如果关上,执行第 8 步;
  5. 判断线程池 / 队列 / 信号量是否已满,已满则执行第 8 步;
  6. 执行 HystrixObservableCommand.construct()或 HystrixCommand.run(),如果执行失败或者超时,执行第 8 步;否则,跳到第 9 步;
  7. 统计熔断器监控指标;
  8. 走 Fallback 备用逻辑
  9. 返回申请响应

一,execute 办法剖析

承接上篇,在 HystrixCommandAspect 这个切面里会创立 HystrixInvokable 对象,进而执行。

    Object result;
        try {if (!metaHolder.isObservable()) {result = CommandExecutor.execute(invokable, executionType, metaHolder);
            } else {result = executeObservable(invokable, executionType, metaHolder);
            }
        } catch (HystrixBadRequestException e) {throw e.getCause() != null ? e.getCause() : e;} catch (HystrixRuntimeException e) {throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
        }

这里就来剖析下 execute 的流程。Hystrix 是反对同步,异步,察看这个三个模式的,咱们只看同步,调用链路是:HystrixCommand.execute() -> queue() -> toObservable()

  public Observable<R> toObservable() {
    .... 一些 action 的定义 ....
 final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {public Observable<R> call() {if(this.commandState.get()).equals(AbstractCommand.CommandState.UNSUBSCRIBED)){return Observable.never() 
                }else{applyHystrixSemantics(AbstractCommand.this);
                }
            }
        };
        
        ...
        return Observable.defer(new Func0<Observable<R>>() {public Observable<R> call() {
                ... 判断是否开启缓存,对应上整体流程的 3 步...
                boolean requestCacheEnabled = AbstractCommand.this.isRequestCachingEnabled();
                String cacheKey = AbstractCommand.this.getCacheKey();
                if (requestCacheEnabled) {
                        // 拿去缓存,如果存在缓存的话,间接返回
                         HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                    if (fromCache != null) {
                        isResponseFromCache = true;
                        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                    }
                }

                Observable<R> hystrixObservable = Observable.defer(applyHystrixSemantics).map(wrapWithAllOnNextHooks);
                Observable afterCache;
                if (requestCacheEnabled && cacheKey != null) {... 缓存后续的一些判断.....} else {afterCache = hystrixObservable;}

                return     afterCache.doOnTerminate(terminateCommandCleanup)
                    .doOnUnsubscribe(unsubscribeCommandCleanup)
                    .doOnCompleted(fireOnCompletedHook);

            }
        });
}

call 外面的办法主要用途:

  • 判断一下是否开启了缓存,如果开启了就间接返回
  • 没有开启或者还没有缓存的时候就执行 Observable.defer(applyHystrixSemantics),执行后返回。

熔断器敞开或关上的判断,这对应结尾整体流程的第 4 步。

    private Observable<R> applyHystrixSemantics(AbstractCommand<R> _cmd) {this.executionHook.onStart(_cmd);
        // 判读是不是熔断了。if (this.circuitBreaker.allowRequest()) {
            /**
             * 如果应用的是信号量返回 TryableSemaphoreActual,不是返回        
             *TryableSemaphoreNoOp,TryableSemaphoreNoOp.tryAcquire()永远都是返回 true
              */
           final TryableSemaphore executionSemaphore = getExecutionSemaphore();。。。// 信号量的管制
            if (executionSemaphore.tryAccaquire()) {
                try {this.executionResult = this.executionResult.setInvocationStartTime(System.currentTimeMillis());
                       // 如果都胜利的话会执行 executeCommandAndObserve
                    return this.executeCommandAndObserve(_cmd)
                        .doOnError(markExceptionThrown)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
                } catch (RuntimeException var7) {return Observable.error(var7);
                }
            } else {return this.handleSemaphoreRejectionViaFallback();
            }
        } else {// 执行熔断后的逻辑
            return this.handleShortCircuitViaFallback();}
    }

二,熔断器降级剖析

接着剖析this.circuitBreaker.allowRequest()

static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
        private final HystrixCommandProperties properties;
        private final HystrixCommandMetrics metrics;

        // 熔断器是否开启
        /* track whether this circuit is open/closed at any given point in time (default to false==closed) */
        private AtomicBoolean circuitOpen = new AtomicBoolean(false);

        /* when the circuit was marked open or was last allowed to try a 'singleTest' */
        private AtomicLong circuitOpenedOrLastTestedTime = new AtomicLong();

        protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
            this.properties = properties;
            this.metrics = metrics;
        }

    
    // 当半开半闭状态下,如果这次申请胜利而了,则把熔断器设为 false, 且让统计指标 reset
        public void markSuccess() {if (circuitOpen.get()) {if (circuitOpen.compareAndSet(true, false)) {
                    //win the thread race to reset metrics
                    //Unsubscribe from the current stream to reset the health counts stream.  This only affects the health counts view,
                    //and all other metric consumers are unaffected by the reset
                    metrics.resetStream();}
            }
        }

        @Override
        public boolean allowRequest() {
            // 判断是否强制关上熔断器
            if (properties.circuitBreakerForceOpen().get()) {return false;}
            // 是否强制敞开熔断器
            if (properties.circuitBreakerForceClosed().get()) {isOpen();
                return true;
            }
            return !isOpen() || allowSingleTest();
        }

    
        public boolean allowSingleTest() {long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
            // 1) if the circuit is open
            // 2) and it's been longer than'sleepWindow' since we opened the circuit
            // 熔断器是开启的,且以后工夫比开启熔断器的工夫加上 sleepWindow 工夫还要长
            if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
                // We push the 'circuitOpenedTime' ahead by 'sleepWindow' since we have allowed one request to try.
                // If it succeeds the circuit will be closed, otherwise another singleTest will be allowed at the end of the 'sleepWindow'.
                // 设置以后工夫到 timeCircuitOpenedOrWasLastTested,// 如果半开半闭的状态下,如果这次申请胜利了则会调用 markSuccess,让熔断器状态设为 false,
                // 如果不胜利,就不须要了。// 案例:半开半合状态下,熔断开启工夫为 00:00:00,sleepWindow 为 10s,如果 00:00:15 秒的时候调用,如果调用失败,// 在 00:00:15 至 00:00:25 秒这个区间都是熔断的,if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
                    // if this returns true that means we set the time so we'll return true to allow the singleTest
                    // if it returned false it means another thread raced us and allowed the singleTest before we did
                    return true;
                }
            }
            return false;
        }

        @Override
        public boolean isOpen() {
            // 判断是否熔断了,circuitOpen 是熔断的状态,true 为熔断,false 为不熔断
            if (circuitOpen.get()) {return true;}

            // 获取统计到的指标信息
            HealthCounts health = metrics.getHealthCounts();
             // 一个工夫窗口 (默认 10s 钟) 总申请次数是否大于 circuitBreakerRequestVolumeThreshold 默认为 20s
            if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {return false;}
            // 错误率 (总谬误次数 / 总申请次数) 小于 circuitBreakerErrorThresholdPercentage(默认 50%)
            if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {return false;} else {
                // 反之,熔断状态将从 CLOSED 变为 OPEN,且 circuitOpened==> 以后工夫戳
                if (circuitOpen.compareAndSet(false, true)) {
                    // 并且把以后工夫设置到 circuitOpenedOrLastTestedTime,可待前面的工夫的比照
                    circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
                    return true;
                } else {return true;}
            }
        }

    }

  1. 判断是否强制开启熔断器和强制敞开熔断器
  2. 先判断熔断是否开启,而后判断是否须要熔断,如果须要熔断则共性熔断状态并重置熔断工夫为以后工夫。熔断的条件是:
    1)工夫窗口内 (默认 10s 钟) 总申请次数大于 20 次
    2) 工夫窗口内 (默认 10s 钟) 失败率大于 50%
  3. 熔断的状况下就执行 allowSingleTest,让开启熔断的都能往下执行。能够执行的条件是:
    1)circuitOpen.get() 为 true,确保是一般的熔断,而不是强制熔断
    2) 以后工夫比开启熔断器的工夫加上 sleepWindow 工夫还要长
  4. 在半开半必的状态下申请胜利了,再调用 markSuccess()办法,从而将熔断器敞开并从新统计各项指标

allowSingleTest 返回 true 的简略的能够叫为半开半闭状态

三,信号量隔离的剖析

这个对应整体流程里的第 5 步

  /* package */static class TryableSemaphoreActual implements TryableSemaphore {
        protected final HystrixProperty<Integer> numberOfPermits;
        private final AtomicInteger count = new AtomicInteger(0);

        public TryableSemaphoreActual(HystrixProperty<Integer> numberOfPermits) {this.numberOfPermits = numberOfPermits;}

        @Override
        public boolean tryAcquire() {int currentCount = count.incrementAndGet();
            if (currentCount > numberOfPermits.get()) {count.decrementAndGet();
                return false;
            } else {return true;}
        }
    }
        
        
    /* package */static class TryableSemaphoreNoOp implements TryableSemaphore {public static final TryableSemaphore DEFAULT = new TryableSemaphoreNoOp();

        @Override
        public boolean tryAcquire() {return true;}
    }

开启了信号量隔离,TryableSemaphoreActual 会把信号量减少 1,如果 currentCount > numberOfPermits.get()的时候就返回 false, 信号量降级。
没有开启信号量隔离,TryableSemaphoreNoOp.tryAcquire()永远都是返回 true。

如果没熔断,没应用信号量,则会往下执行 executeCommandAndObserve。

    private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
        ....
        Observable<R> execution;
        // 判断是否超时隔离
        if (properties.executionTimeoutEnabled().get()) {execution = executeCommandWithSpecifiedIsolation(_cmd)
                    .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
        } else {execution = executeCommandWithSpecifiedIsolation(_cmd);
        }

        //markEmits,markOnCompleted,handleFallback,setRequestContext 都是匿名外部类,都在这个办法里定义了,return execution.doOnNext(markEmits)
                .doOnCompleted(markOnCompleted)
                .onErrorResumeNext(handleFallback)
                .doOnEach(setRequestContext);
    }

四,超时隔离剖析

  private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> {

        final AbstractCommand<R> originalCommand;

        public HystrixObservableTimeoutOperator(final AbstractCommand<R> originalCommand) {this.originalCommand = originalCommand;}

        @Override
        public Subscriber<? super R> call(final Subscriber<? super R> child) {final CompositeSubscription s = new CompositeSubscription();
            // if the child unsubscribes we unsubscribe our parent as well
            child.add(s);
            // 超时的时候抛出 new HystrixTimeoutException()
            final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, new Runnable() {
                @Override
                public void run() {child.onError(new HystrixTimeoutException());
                }
            });

            // 设置定时调度
            TimerListener listener = new TimerListener() {

                // 定时触发的办法
                @Override
                public void tick() {
                    // 把状态从未执行设为 timeout
                    if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                        // report timeout failure
                        originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);
                        // shut down the original request
                        s.unsubscribe();
                        timeoutRunnable.run();}
                }
                // 获取定时的的工夫
                @Override
                public int getIntervalTimeInMilliseconds() {return originalCommand.properties.executionTimeoutInMilliseconds().get();}
            };

            final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);
            // set externally so execute/queue can see this
            originalCommand.timeoutTimer.set(tl);
            /**
             * If this subscriber receives values it means the parent succeeded/completed
             */
            Subscriber<R> parent = new Subscriber<R>() {...};

            // if s is unsubscribed we want to unsubscribe the parent
            s.add(parent);

            return parent;
        }

    }

    public Reference<TimerListener> addTimerListener(final TimerListener listener) {startThreadIfNeeded();
        // add the listener

        Runnable r = new Runnable() {

            @Override
            public void run() {
                try {listener.tick();
                } catch (Exception e) {logger.error("Failed while ticking TimerListener", e);
                }
            }
        };
//getIntervalTimeInMilliseconds 获取定时工夫
        ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);
        return new TimerReference(listener, f);
    }

次要逻辑,定义了一个定时器 TimerListener,外面定时的工夫就是咱们设置的 @HystrixCommand 的超时的工夫。如果超时了执行:

  1. 把状态从 NOT_EXECUTED 设置为 TIMED_OUT
  2. 发送 TIMEOUT 事件
  3. s.unsubscribe()勾销事件订阅
  4. timeoutRunnable.run(); 抛出 timeoutRunnable 异样

演绎一下就是设置了一个定时器,定时工夫是咱们设置的超时工夫,如果定时工夫到了,咱们就扭转相应的状态,发送相应的外部事件,勾销 Obserable 的订阅,抛出异样,而做到一个超时的隔离。

再看看 executeCommandWithSpecifiedIsolation 办法

private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {// mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
            return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                      ...
                    metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);

                    if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
                          ...
                        return Observable.error(new RuntimeException("timed out before executing run()"));
                    }
                    if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
                               ....
                        try {executionHook.onThreadStart(_cmd);
                            executionHook.onRunStart(_cmd);
                            executionHook.onExecutionStart(_cmd);
                            // 最初执行这个
                            return getUserExecutionObservable(_cmd);
                        } catch (Throwable ex) {return Observable.error(ex);
                        }
                    } else {
                        //command has already been unsubscribed, so return immediately
                        return Observable.error(new RuntimeException("unsubscribed before executing run()"));
                    }
                }
            }).doOnTerminate(...).doOnUnsubscribe(...)
              // 这个办法是用于指定一个线程池去执行咱们被观察者 observable 触发时的办法
              .subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
                @Override
                public Boolean call() {return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
                }
            }));
        } else {...}
    }

五,指定线程池执行办法

在 executeCommandWithSpecifiedIsolation 这个办法里的 subscribeOn 调用用于指定一个线程池去执行咱们被观察者 observable 触发时的办法

    /* package */static class HystrixThreadPoolDefault implements HystrixThreadPool {private static final Logger logger = LoggerFactory.getLogger(HystrixThreadPoolDefault.class);

        private final HystrixThreadPoolProperties properties;
        private final BlockingQueue<Runnable> queue;
        private final ThreadPoolExecutor threadPool;
        private final HystrixThreadPoolMetrics metrics;
        private final int queueSize;

        ...
 

        @Override
        public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {touchConfig();
            return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
        }

        // 动静调整线程池的大小
        // allow us to change things via fast-properties by setting it each time
        private void touchConfig() {final int dynamicCoreSize = properties.coreSize().get();
            final int configuredMaximumSize = properties.maximumSize().get();
            int dynamicMaximumSize = properties.actualMaximumSize();
            final boolean allowSizesToDiverge = properties.getAllowMaximumSizeToDivergeFromCoreSize().get();
            boolean maxTooLow = false;

            if (allowSizesToDiverge && configuredMaximumSize < dynamicCoreSize) {
                dynamicMaximumSize = dynamicCoreSize;
                maxTooLow = true;
            }

            // In JDK 6, setCorePoolSize and setMaximumPoolSize will execute a lock operation. Avoid them if the pool size is not changed.
            if (threadPool.getCorePoolSize() != dynamicCoreSize || (allowSizesToDiverge && threadPool.getMaximumPoolSize() != dynamicMaximumSize)) {
                  ...
                threadPool.setCorePoolSize(dynamicCoreSize);
                threadPool.setMaximumPoolSize(dynamicMaximumSize);
            }

            threadPool.setKeepAliveTime(properties.keepAliveTimeMinutes().get(), TimeUnit.MINUTES);
        }
}


public class HystrixContextScheduler extends Scheduler {

    private final HystrixConcurrencyStrategy concurrencyStrategy;
    private final Scheduler actualScheduler;
    private final HystrixThreadPool threadPool;。。。public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
        this.concurrencyStrategy = concurrencyStrategy;
        this.threadPool = threadPool;
        this.actualScheduler = new ThreadPoolScheduler(threadPool, shouldInterruptThread);
    }

    @Override
    public Worker createWorker() {
             // 构建一个默认的 Worker, 这里的 actualScheduler 就是 ThreadPoolScheduler
        //actualScheduler.createWorker()就是 ThreadPoolWorker
        return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
    }

    
    //HystrixContextSchedulerWorker 类
    private class HystrixContextSchedulerWorker extends Worker {

        private final Worker worker;

        private HystrixContextSchedulerWorker(Worker actualWorker) {this.worker = actualWorker;}

           ...

        @Override
        public Subscription schedule(Action0 action) {if (threadPool != null) {if (!threadPool.isQueueSpaceAvailable()) {throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
                }
            }
            // 这里的 worker 其实就是 ThreadPoolWorker
            return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));
        }

    }

    //ThreadPoolScheduler 类
    private static class ThreadPoolScheduler extends Scheduler {

        private final HystrixThreadPool threadPool;
        private final Func0<Boolean> shouldInterruptThread;

        public ThreadPoolScheduler(HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
            this.threadPool = threadPool;
            this.shouldInterruptThread = shouldInterruptThread;
        }

        @Override
        public Worker createWorker() {
            // 默认的 worker 为:ThreadPoolWorker
            return new ThreadPoolWorker(threadPool, shouldInterruptThread);
        }

    }

    
//ThreadPoolWorker 类
    private static class ThreadPoolWorker extends Worker {

        private final HystrixThreadPool threadPool;
        private final CompositeSubscription subscription = new CompositeSubscription();
        private final Func0<Boolean> shouldInterruptThread;

        public ThreadPoolWorker(HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
            this.threadPool = threadPool;
            this.shouldInterruptThread = shouldInterruptThread;
        }
        ...
        @Override
        public Subscription schedule(final Action0 action) {if (subscription.isUnsubscribed()) {
                // don't schedule, we are unsubscribed
                return Subscriptions.unsubscribed();}

            // This is internal RxJava API but it is too useful.
            ScheduledAction sa = new ScheduledAction(action);

            subscription.add(sa);
            sa.addParent(subscription);
            // 获取线程池
            ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
            // 将包装后的 HystrixCommand submit 到线程池,而后返回 FutureTask
            FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
            sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));

            return sa;
        }

       ...
    }


}

几个外部类的作用:

  • HystrixContextSchedulerWorker: 对外提供 schedule()办法,这里会判断线程池队列是否曾经满,如果满了这会抛出异样:Rejected command because thread-pool queueSize is at rejection threshold。如果配置的队列大小为 -1 则默认返回 true
  • ThreadPoolScheduler:执行 createWorker()办法,默认应用 ThreadPoolWorker()类
  • ThreadPoolWorker:执行 command 的外围逻辑
    private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
        Observable<R> userObservable;

        try {userObservable = getExecutionObservable();
        } catch (Throwable ex) {// the run() method is a user provided implementation so can throw instead of using Observable.onError
            // so we catch it here and turn it into Observable.error
            userObservable = Observable.error(ex);
        }

        return userObservable
                .lift(new ExecutionHookApplication(_cmd))
                .lift(new DeprecatedOnRunHookApplication(_cmd));
    }
   @Override
    final protected Observable<R> getExecutionObservable() {return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                try {// 能够看到 run()办法了。HystrixCommand.run()其实就是咱们本人写的代码里的办法
                    return Observable.just(run());
                } catch (Throwable ex) {return Observable.error(ex);
                }
            }
        }).doOnSubscribe(new Action0() {
            @Override
            public void call() {
                // Save thread on which we get subscribed so that we can interrupt it later if needed
                executionThread.set(Thread.currentThread());
            }
        });
    }

最初能够看到会调用 Observable.just(run()),这个就是咱们咱们本人写的代码里的办法,到这里就是咱们整体的执行过程了。

退出移动版