关于分布式:从源码分析Hystrix工作机制

3次阅读

共计 20381 个字符,预计需要花费 51 分钟才能阅读完成。

一、Hystrix 解决了什么问题?

在简单的分布式应用中有着许多的依赖,各个依赖都有不免在某个时刻失败,如果利用不隔离各个依赖,升高内部的危险,那容易拖垮整个利用。

举个电商场景中常见的例子,比方订单服务调用了库存服务、商品服务、积分服务、领取服务,零碎均失常状况下,订单模块失常运行。

然而当积分服务产生异样时且会阻塞 30s 时,订单服务就有有局部申请失败,且工作线程阻塞在调用积分服务上。

流量顶峰时,问题会更加重大,订单服务的所有申请都会阻塞在调用积分服务上,工作线程全副挂起,导致机器资源耗尽,订单服务也不可用,造成级联影响,整个集群宕机,这种称为雪崩效应。

所以须要一种机制,使得单个服务呈现故障时,整个集群可用性不受到影响。Hystrix 就是实现这种机制的框架,上面咱们剖析一下 Hystrix 整体的工作机制。

二、整体机制

【入口】Hystrix 的执行入口是 HystrixCommand 或 HystrixObservableCommand 对象,通常在 Spring 利用中会通过注解和 AOP 来实现对象的结构,以升高对业务代码的侵入性;

【缓存】HystrixCommand 对象理论开始执行后,首先是否开启缓存,若开启缓存且命中,则间接返回;

【熔断】若熔断器关上,则执行短路,间接走降级逻辑;若熔断器敞开,持续下一步,进入隔离逻辑。熔断器的状态次要基于窗口期内执行失败率,若失败率过高,则熔断器主动关上;

【隔离】用户可配置走线程池隔离或信号量隔离,判断线程池工作已满(或信号量),则进入降级逻辑;否则持续下一步,理论由线程池工作线程执行业务调用;

【执行】理论开始执行业务调用,若执行失败或异样,则进入降级逻辑;若执行胜利,则失常返回;

【超时】通过定时器延时工作检测业务调用执行是否超时,若超时则勾销业务执行的线程,进入降级逻辑;若未超时,则失常返回。线程池、信号量两种策略均隔离形式反对超时配置(信号量策略存在缺点);

【降级】进入降级逻辑后,当业务实现了 HystrixCommand.getFallback() 办法,则返回降级解决的数据;当未实现时,则返回异样;

【统计】业务调用执行后果胜利、失败、超时等均会进入统计模块,通过衰弱统计后果来决定熔断器关上或敞开。

都说源码里没有机密,上面咱们来剖析下外围性能源码,看看 Hystrix 如何实现整体的工作机制。

三、熔断

家用电路中都有保险丝,保险丝的作用场景是,当电路产生故障或异样时,随同着电流一直升高,并且升高的电流有可能损坏电路中的某些重要器件或贵重器件,也有可能烧毁电路甚至造成火灾。

若电路中正确地安置了保险丝,那么保险丝就会在电流异样升高到肯定的高度和肯定的时候,本身熔断切断电流,从而起到爱护电路平安运行的作用。Hystrix 提供的熔断器就有相似性能,利用调用某个服务提供者,当肯定工夫内申请总数超过配置的阈值,且窗口期内错误率过高,那 Hystrix 就会对调用申请熔断,后续的申请间接短路,间接进入降级逻辑,执行本地的降级策略。

Hystrix 具备自我调节的能力,熔断器关上在肯定工夫后,会尝试通过一个申请,并依据执行后果调整熔断器状态,让熔断器在 closed,open,half-open 三种状态之间主动切换。

【HystrixCircuitBreaker】boolean attemptExecution():每次 HystrixCommand 执行,都要调用这个办法,判断是否能够继续执行,若熔断器状态为关上且超过休眠窗口,更新熔断器状态为 half-open;通过 CAS 原子变更熔断器状态来保障只放过一条业务申请理论调用提供方,并依据执行后果调整状态。

public boolean attemptExecution() {
    // 判断配置是否强制关上熔断器
    if (properties.circuitBreakerForceOpen().get()) {return false;}
    // 判断配置是否强制敞开熔断器
    if (properties.circuitBreakerForceClosed().get()) {return true;}
    // 判断熔断器开关是否敞开
    if (circuitOpened.get() == -1) {return true;} else {
        // 判断申请是否在休眠窗口后
        if (isAfterSleepWindow()) {
            // 更新开关为半开,并容许本次申请通过
            if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {return true;} else {return false;}
        } else {
            // 拒绝请求
            return false;
        }
    }
}

【HystrixCircuitBreaker】void markSuccess():HystrixCommand 执行胜利后调用,当熔断器状态为 half-open,更新熔断器状态为 closed。此种状况为熔断器本来为 open,放过单条申请理论调用服务提供者,并且后续执行胜利,Hystrix 主动调节熔断器为 closed。

public void markSuccess() {
    // 更新熔断器开关为敞开
    if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
        // 重置订阅衰弱统计
        metrics.resetStream();
        Subscription previousSubscription = activeSubscription.get();
        if (previousSubscription != null) {previousSubscription.unsubscribe();
        }
        Subscription newSubscription = subscribeToStream();
        activeSubscription.set(newSubscription);
        // 更新熔断器开关为敞开
        circuitOpened.set(-1L);
    }
}

【HystrixCircuitBreaker】void markNonSuccess():HystrixCommand 执行胜利后调用,若熔断器状态为 half-open,更新熔断器状态为 open。此种状况为熔断器本来为 open,放过单条申请理论调用服务提供者,并且后续执行失败,Hystrix 持续放弃熔断器关上,并把此次申请作为休眠窗口期开始工夫。

public void markNonSuccess() {
      // 更新熔断器开关,从半开变为关上
      if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
          // 记录失败工夫,作为休眠窗口开始工夫
          circuitOpened.set(System.currentTimeMillis());
      }
  }

【HystrixCircuitBreaker】void subscribeToStream():熔断器订阅衰弱统计后果,若以后申请数据大于肯定值且错误率大于阈值,自动更新熔断器状态为 opened,后续申请短路,不再理论调用服务提供者,间接进入降级逻辑。

 private Subscription subscribeToStream() {
    // 订阅监控统计信息
    return metrics.getHealthCountsStream()
            .observe()
            .subscribe(new Subscriber<HealthCounts>() {
                @Override
                public void onCompleted() {}
                @Override
                public void onError(Throwable e) {}
                @Override
                public void onNext(HealthCounts hc) {
                    // 判断总申请数量是否超过配置阈值,若未超过,则不扭转熔断器状态
                    if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) { } else {
                        // 判断申请错误率是否超过配置错误率阈值,若未超过,则不扭转熔断器状态;若超过,则错误率过高,更新熔断器状态未关上,回绝后续申请
                        if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) { } else {if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {circuitOpened.set(System.currentTimeMillis());
                            }
                        }
                    }
                }
            });
}

四、资源隔离

在货船中,为了避免漏水和火灾的扩散,个别会将货仓进行宰割,防止了一个货仓出事导致整艘船沉没的喜剧。同样的,在 Hystrix 中,也采纳了这样的舱壁模式,将零碎中的服务提供者隔离起来,一个服务提供者提早升高或者失败,并不会导致整个零碎的失败,同时也可能管制调用这些服务的并发度。如下图,订单服务调用上游积分、库存等服务应用不同的线程池,当积分服务故障时,只会把对应线程池打满,而不会影响到其余服务的调用。Hystrix 隔离模式反对线程池和信号量两种形式。

4.1 信号量模式

信号量模式管制单个服务提供者执行并发度,比方单个 CommondKey 下正在申请数为 N,若 N 小于 maxConcurrentRequests,则继续执行;若大于等于 maxConcurrentRequests,则间接回绝,进入降级逻辑。信号量模式应用申请线程自身执行,没有线程上下文切换,开销较小,但超时机制生效。

【AbstractCommand】Observable<R>applyHystrixSemantics(finalAbstractCommand<R> _cmd):尝试获取信号量,若能获取到,则持续调用服务提供者;若不能获取到,则进入降级策略。

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {executionHook.onStart(_cmd);
    // 判断熔断器是否通过
    if (circuitBreaker.attemptExecution()) {
        // 获取信号量
        final TryableSemaphore executionSemaphore = getExecutionSemaphore();
        final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
        final Action0 singleSemaphoreRelease = new Action0() {
            @Override
            public void call() {if (semaphoreHasBeenReleased.compareAndSet(false, true)) {executionSemaphore.release();
                }
            }
        };
        final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
            @Override
            public void call(Throwable t) {eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
            }
        };
        // 尝试获取信号量
        if (executionSemaphore.tryAcquire()) {
            try {
                // 记录业务执行开始工夫
                executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                // 继续执行业务
                return executeCommandAndObserve(_cmd)
                        .doOnError(markExceptionThrown)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
            } catch (RuntimeException e) {return Observable.error(e);
            }
        } else {
            // 信号量回绝,进入降级逻辑
            return handleSemaphoreRejectionViaFallback();}
    } else {
        // 熔断器回绝,间接短路,进入降级逻辑
        return handleShortCircuitViaFallback();}
}

【AbstractCommand】TryableSemaphore getExecutionSemaphore():获取信号量实例,若以后隔离模式为信号量,则依据 commandKey 获取信号量,不存在时初始化并缓存;若以后隔离模式为线程池,则应用默认信号量 TryableSemaphoreNoOp.DEFAULT,全副申请可通过。

protected TryableSemaphore getExecutionSemaphore() {
    // 判断隔离模式是否为信号量
    if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.SEMAPHORE) {if (executionSemaphoreOverride == null) {
            // 获取信号量
            TryableSemaphore _s = executionSemaphorePerCircuit.get(commandKey.name());
            if (_s == null) {
                // 初始化信号量并缓存
                executionSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphoreActual(properties.executionIsolationSemaphoreMaxConcurrentRequests()));
                // 返回信号量
                return executionSemaphorePerCircuit.get(commandKey.name());
            } else {return _s;}
        } else {return executionSemaphoreOverride;}
    } else {
        // 返回默认信号量,任何申请均可通过
        return TryableSemaphoreNoOp.DEFAULT;
    }
}

4.2 线程池模式

线程池模式管制单个服务提供者执行并发度,代码上都会先走获取信号量,只是应用默认信号量,全副申请可通过,而后理论调用线程池逻辑。线程池模式下,比方单个 CommondKey 下正在申请数为 N,若 N 小于 maximumPoolSize,会先从 Hystrix 治理的线程池外面取得一个线程,而后将参数传递给工作线程去执行真正调用,如果并发申请数多于线程池线程个数,就有工作须要进入队列排队,但排队队列也有下限,如果排队队列也满,则进去降级逻辑。线程池模式能够反对异步调用,反对超时调用,存在线程切换,开销大。

【AbstractCommand】Observable<R>executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd):从线程池中获取线程,并执行,过程中记录线程状态。

private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
      // 判断是否为线程池隔离模式
      if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {return Observable.defer(new Func0<Observable<R>>() {
              @Override
              public Observable<R> call() {executionResult = executionResult.setExecutionOccurred();
                  if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {return Observable.error(new IllegalStateException("execution attempted while in state :" + commandState.get().name()));
                  }
                  // 统计信息
                  metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
                  // 判断是否超时,若超时,间接抛出异样
                  if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {return Observable.error(new RuntimeException("timed out before executing run()"));
                  }
                  // 更新线程状态为已开始
                  if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {HystrixCounters.incrementGlobalConcurrentThreads();
                      threadPool.markThreadExecution();
                      endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                      executionResult = executionResult.setExecutedInThread();
                      // 执行 hook,若异样,则间接抛出异样
                      try {executionHook.onThreadStart(_cmd);
                          executionHook.onRunStart(_cmd);
                          executionHook.onExecutionStart(_cmd);
                          return getUserExecutionObservable(_cmd);
                      } catch (Throwable ex) {return Observable.error(ex);
                      }
                  } else {
                      // 空返回
                      return Observable.empty();}
              }
          }).doOnTerminate(new Action0() {
              @Override
              public void call() {// 完结逻辑,省略}
          }).doOnUnsubscribe(new Action0() {
              @Override
              public void call() {// 勾销订阅逻辑,省略}
              // 从线程池中获取业务执行线程
          }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
              @Override
              public Boolean call() {
                  // 判断是否超时
                  return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;}
          }));
      } else {
          // 信号量模式
          // 省略
      }
  }

【HystrixThreadPool】Subscription schedule(final Action0 action):HystrixContextScheduler 是 Hystrix 对 rx 中 Scheduler 调度器的重写,次要为了实现在 Observable 未被订阅时,不执行命令,以及反对在命令执行过程中可能打断运行。在 rx 中,Scheduler 将生成对应的 Worker 给 Observable 用于执行命令,由 Worker 具体负责相干执行线程的调度,ThreadPoolWorker 是 Hystrix 自行实现的 Worker,执行调度的外围办法。

public Subscription schedule(final Action0 action) {
    // 若无订阅,则不执行间接返回
    if (subscription.isUnsubscribed()) {return Subscriptions.unsubscribed();
    }
    ScheduledAction sa = new ScheduledAction(action);
    subscription.add(sa);
    sa.addParent(subscription);
    // 获取线程池
    ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
    // 提交执行工作
    FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
    sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));
    return sa;
}

五、超时检测

Hystrix 超时机制升高了第三方依赖项提早过高对调用方的影响,使申请疾速失败。次要通过提早工作机制实现,包含注册延时工作过程和执行延时工作过程。

当隔离策略为线程池时,主线程订阅执行后果,线程池中工作线程调用提供者服务端,同时会有定时器线程在肯定工夫后检测工作是否实现,若未实现则示意工作超时,抛出超时异样,并且后续工作线程的执行后果也会跳过不再公布;若已实现则示意工作在超时工夫内实现执行实现,定时器检测工作完结。

当隔离策略为信号量时,主线程订阅执行后果并理论调用提供者服务端(没有工作线程),当超出指定工夫,主线程依然会执行完业务调用,而后抛出超时异样。信号量模式下超时配置有肯定缺点,不能取消在执行的调用,并不能限度主线程返回工夫。

【AbstractCommand】Observable<R>executeCommandAndObserve(finalAbstractCommand<R> \_cmd):超时检测入口,执行 lift(new HystrixObservableTimeoutOperator<R>(\_cmd)) 关联超时检测工作。

private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    // 省略
    Observable<R> execution;
    // 判断是否开启超时检测
​    if (properties.executionTimeoutEnabled().get()) {execution = executeCommandWithSpecifiedIsolation(_cmd)
                // 减少超时检测操作
                .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
    } else {
        // 失常执行
        execution = executeCommandWithSpecifiedIsolation(_cmd);
    }
    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}

【HystrixObservableTimeoutOperator】Subscriber<? super R> call(final Subscriber<? super R> child):创立检测工作,并关联提早工作;若检测工作执行时仍未执行实现,则抛出超时异样;若已执行实现或异样,则革除检测工作。

public Subscriber<? super R> call(final Subscriber<? super R> child) {final CompositeSubscription s = new CompositeSubscription();
        child.add(s);
        final HystrixRequestContext hystrixRequestContext = HystrixRequestContext.getContextForCurrentThread();
        // 实列化监听器
        TimerListener listener = new TimerListener() {
            @Override
            public void tick() {
                // 若工作未执行实现,则更新为超时
                if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                    // 上报超时失败
                    originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);
                    // 勾销订阅
                    s.unsubscribe();
                    final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {
 
                        @Override
                        public void run() {child.onError(new HystrixTimeoutException());
                        }
                    });
                    // 抛出超时异样
                    timeoutRunnable.run();}
            }
            // 超时工夫配置
            @Override
            public int getIntervalTimeInMilliseconds() {return originalCommand.properties.executionTimeoutInMilliseconds().get();}
        };
        // 注册监听器,关联检测工作
        final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);
        originalCommand.timeoutTimer.set(tl);
        Subscriber<R> parent = new Subscriber<R>() {
            @Override
            public void onCompleted() {if (isNotTimedOut()) {
                    // 未超时状况下,工作执行实现,革除超时检测工作
                    tl.clear();
                    child.onCompleted();}
            }
            @Override
            public void onError(Throwable e) {if (isNotTimedOut()) {
                    // 未超时状况下,工作执行异样,革除超时检测工作
                    tl.clear();
                    child.onError(e);
                }
            }
            @Override
            public void onNext(R v) {
                    // 未超时状况下,公布执行后果;超时时则间接跳过公布执行后果
                if (isNotTimedOut()) {child.onNext(v);
                }
            }
            // 判断是否超时
            private boolean isNotTimedOut() {return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||
                        originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
            }
        };
        s.add(parent);
        return parent;
    }
}

【HystrixTimer】Reference<TimerListener>addTimerListener(finalTimerListener listener):addTimerListener 通过 java 的定时工作服务 scheduleAtFixedRate 在提早超时工夫后执行。

public Reference<TimerListener> addTimerListener(final TimerListener listener) {// 初始化 xianstartThreadIfNeeded();// 结构检测工作 Runnable r = new Runnable() {

public Reference<TimerListener> addTimerListener(final TimerListener listener) {
    // 初始化 xian
    startThreadIfNeeded();
    // 结构检测工作
    Runnable r = new Runnable() {
 
        @Override
        public void run() {
            try {listener.tick();
            } catch (Exception e) {logger.error("Failed while ticking TimerListener", e);
            }
        }
    };
    // 提早执行检测工作
    ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);
    return new TimerReference(listener, f);
}

六、降级

Hystrix 降级逻辑作为兜底的策略,当呈现业务执行异样、线程池或信号量已满、执行超时等状况时,会进入降级逻辑。降级逻辑中应从内存或动态逻辑获取通用返回,尽量不依赖依赖网络调用,如果未实现降级办法或降级办法中也出现异常,则业务线程中会引发异样。

【AbstractCommand】Observable<R> getFallbackOrThrowException(finalAbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException):首先判断是否为不可复原异样,若是则不走降级逻辑,间接异样返回;其次判断是否能获取到降级信号量,而后走降级逻辑;当降级逻辑中也产生异样或者没有降级办法实现时,则异样返回。

private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {final HystrixRequestContext requestContext = HystrixRequestContext.getContextForCurrentThread();
    long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
    executionResult = executionResult.addEvent((int) latency, eventType);
    // 判断是否为不可复原异样,如栈溢出、OOM 等
    if (isUnrecoverable(originalException)) {logger.error("Unrecoverable Error for HystrixCommand so will throw HystrixRuntimeException and not apply fallback.", originalException);
        Exception e = wrapWithOnErrorHook(failureType, originalException);
        // 间接返回异样
        return Observable.error(new HystrixRuntimeException(failureType, this.getClass(), getLogMessagePrefix() + "" + message +" and encountered unrecoverable error.", e, null));
    } else {
        // 判断为是否可复原谬误
        if (isRecoverableError(originalException)) {logger.warn("Recovered from java.lang.Error by serving Hystrix fallback", originalException);
        }
        // 判断降级配置是否关上
        if (properties.fallbackEnabled().get()) {
          /**
            * 省略
            */
            final Func1<Throwable, Observable<R>> handleFallbackError = new Func1<Throwable, Observable<R>>() {
                @Override
                public Observable<R> call(Throwable t) {Exception e = wrapWithOnErrorHook(failureType, originalException);
                    Exception fe = getExceptionFromThrowable(t);
 
                    long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                    Exception toEmit;
                    // 是否是不反对操作异样,当业务中没有覆写 getFallBack 办法时,会抛出此异样
                    if (fe instanceof UnsupportedOperationException) {logger.debug("No fallback for HystrixCommand.", fe);
                        eventNotifier.markEvent(HystrixEventType.FALLBACK_MISSING, commandKey);
                        executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_MISSING);
                        toEmit = new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + "" + message +" and no fallback available.", e, fe);
                    } else {
                        // 执行降级逻辑时产生异样
                        logger.debug("HystrixCommand execution" + failureType.name() + "and fallback failed.", fe);
                        eventNotifier.markEvent(HystrixEventType.FALLBACK_FAILURE, commandKey);
                        executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_FAILURE);
                        toEmit = new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + "" + message +" and fallback failed.", e, fe);
                    }
                    // 判断异样是否包装
                    if (shouldNotBeWrapped(originalException)) {
                        // 抛出异样
                        return Observable.error(e);
                    }
                    // 抛出异样
                    return Observable.error(toEmit);
                }
            };
            // 获取降级信号量
            final TryableSemaphore fallbackSemaphore = getFallbackSemaphore();
            final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
            final Action0 singleSemaphoreRelease = new Action0() {
                @Override
                public void call() {if (semaphoreHasBeenReleased.compareAndSet(false, true)) {fallbackSemaphore.release();
                    }
                }
            };
            Observable<R> fallbackExecutionChain;
            // 尝试获取降级信号量
            if (fallbackSemaphore.tryAcquire()) {
                try {
                    // 判断是否定义了 fallback 办法
                    if (isFallbackUserDefined()) {executionHook.onFallbackStart(this);
                        // 执行降级逻辑
                        fallbackExecutionChain = getFallbackObservable();} else {
                        // 执行降级逻辑
                        fallbackExecutionChain = getFallbackObservable();}
                } catch (Throwable ex) {fallbackExecutionChain = Observable.error(ex);
                }
                return fallbackExecutionChain
                        .doOnEach(setRequestContext)
                        .lift(new FallbackHookApplication(_cmd))
                        .lift(new DeprecatedOnFallbackHookApplication(_cmd))
                        .doOnNext(markFallbackEmit)
                        .doOnCompleted(markFallbackCompleted)
                        .onErrorResumeNext(handleFallbackError)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
            } else {
                // 解决降级信号量回绝异样
               return handleFallbackRejectionByEmittingError();}
        } else {
            // 解决降级配置敞开时异样
            return handleFallbackDisabledByEmittingError(originalException, failureType, message);
        }
    }
}

【HystrixCommand】R getFallback():HystrixCommand 默认抛出操作不反对异样,须要子类覆写 getFalBack 办法实现降级逻辑。

protected R getFallback() {throw new UnsupportedOperationException("No fallback available.");
}

七、衰弱统计

Hystrix 基于通过滑动窗口的数据统计断定服务失败占比选择性熔断,可能实现疾速失败并走降级逻辑。步骤如下:

  • AbstractCommand 执行实现后调⽤ handleCommandEnd ⽅法将执行后果 HystrixCommandCompletion 事件公布到事件流中;
  • 事件流通过 Observable.window() ⽅法将事件按工夫分组,并通过 flatMap() ⽅法将事件按类型(胜利、失败等)聚合成桶,造成桶流;
  • 再将各个桶使⽤ Observable.window() 按窗口内桶数量聚合成滑动窗⼝数据;
  • 将滑动窗口数据聚合成数据对象(如衰弱数据流、累计数据等);
  • 熔断器 CircuitBreaker 初始化时订阅衰弱数据流,依据衰弱状况批改熔断器的开关。

【AbstractCommand】void handleCommandEnd(boolean commandExecutionStarted):在业务执行结束后,会调用 handleCommandEnd 办法,在此办法中,上报执行后果 executionResult,这也是衰弱统计的入口。

private void handleCommandEnd(boolean commandExecutionStarted) {Reference<TimerListener> tl = timeoutTimer.get();
    if (tl != null) {tl.clear();
    }
​
    long userThreadLatency = System.currentTimeMillis() - commandStartTimestamp;
    executionResult = executionResult.markUserThreadCompletion((int) userThreadLatency);
    // 执行后果上报衰弱统计
    if (executionResultAtTimeOfCancellation == null) {metrics.markCommandDone(executionResult, commandKey, threadPoolKey, commandExecutionStarted);
    } else {metrics.markCommandDone(executionResultAtTimeOfCancellation, commandKey, threadPoolKey, commandExecutionStarted);
    }
​
    if (endCurrentThreadExecutingCommand != null) {endCurrentThreadExecutingCommand.call();
    }
}

【BucketedRollingCounterStream】BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs,final Func2<Bucket, Event, Bucket> appendRawEventToBucket,final Func2<Output, Bucket, Output> re-duceBucket)

衰弱统计类 HealthCountsStream 的滑动窗口实现次要是在父类 BucketedRollingCounterStream,首先父类 BucketedCounterStream 将事件流解决成桶流,BucketedRollingCounterStream 解决成滑动窗口,而后由 HealthCountsStream 传入的 reduceBucket 函数解决成衰弱统计信息.

protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs,
                                       final Func2<Bucket, Event, Bucket> appendRawEventToBucket,
                                       final Func2<Output, Bucket, Output> reduceBucket) {
    // 调用父类,数据处理成桶流
    super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket);
    // 依据传入的 reduceBucket 函数,解决滑动窗口内数据
    Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = new Func1<Observable<Bucket>, Observable<Output>>() {
        @Override
        public Observable<Output> call(Observable<Bucket> window) {return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
        }
    };
    // 对父类桶流数据进行操作
    this.sourceStream = bucketedStream
    // 窗口内桶数量为 numBuckets,每次挪动 1 个桶
            .window(numBuckets, 1)
            // 滑动窗口内数据处理
            .flatMap(reduceWindowToSummary)
            .doOnSubscribe(new Action0() {
                @Override
                public void call() {isSourceCurrentlySubscribed.set(true);
                }
            })
            .doOnUnsubscribe(new Action0() {
                @Override
                public void call() {isSourceCurrentlySubscribed.set(false);
                }
            })
            .share()
            .onBackpressureDrop();}

【HealthCounts】HealthCounts plus(long[] eventTypeCounts):对桶内数据按事件类型累计,生成统计数据 HealthCounts;

public HealthCounts plus(long[] eventTypeCounts) {
    long updatedTotalCount = totalCount;
    long updatedErrorCount = errorCount;
​
    long successCount = eventTypeCounts[HystrixEventType.SUCCESS.ordinal()];
    long failureCount = eventTypeCounts[HystrixEventType.FAILURE.ordinal()];
    long timeoutCount = eventTypeCounts[HystrixEventType.TIMEOUT.ordinal()];
    long threadPoolRejectedCount = eventTypeCounts[HystrixEventType.THREAD_POOL_REJECTED.ordinal()];
    long semaphoreRejectedCount = eventTypeCounts[HystrixEventType.SEMAPHORE_REJECTED.ordinal()];
    // 总数
    updatedTotalCount += (successCount + failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
    // 失败数
    updatedErrorCount += (failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
    return new HealthCounts(updatedTotalCount, updatedErrorCount);
}

八、总结

在分布式环境中,不可避免地会有许多服务的依赖项中有的失败。Hystrix 作为一个库,可通过增加熔断、隔离、降级等逻辑来帮忙用户管制分布式服务之间的交互,以进步零碎的整体弹性。次要性能如下:

  • 爱护零碎,管制来自拜访第三方依赖项(通常是通过网络)的提早和失败
  • 阻止简单分布式系统中的级联故障
  • 疾速失败并疾速复原
  • 平滑降级
  • 近乎实时的监控,警报和管制

Hystrix 应用过程中,有一些要留神的点:

  • 覆写的 getFallback() 办法,尽量不要有网络依赖。如果有网络依赖,倡议采纳屡次降级,即在 getFallback() 内实例化 HystrixCommand,并执行 Command。getFallback() 尽量保障高性能返回,疾速降级。
  • HystrixCommand 倡议采纳的是线程隔离策略。
  • hystrix.threadpool.default.allowMaximumSizeToDivergeFromCoreSize 设置为 true 时,hystrix.threadpool.default.maximumSize 才会失效。最大线程数须要依据业务本身状况和性能测试后果来考量,尽量初始时设置小一些,反对动静调整大小,因为它是缩小负载并避免资源在提早产生时被阻塞的次要工具。
  • 信号隔离策略下,执行业务逻辑时,应用的是应用服务的父级线程(如 Tomcat 容器线程)。所以,肯定要设置好并发量,有网络开销的调用,不倡议应用该策略,容易导致容器线程排队梗塞,从而影响整个应用服务。
  • 另外 Hystrix 高度依赖 RxJava 这个响应式函数编程框架,简略理解 RxJava 的应用形式,有利于了解源码逻辑。

参考文档

Hystrix Github 仓库:https://github.com/Netflix/Hystrix

正文完
 0