1. Class diagram

2. How HystrixCommandAspect parses the annotation and creates the HystrixCommand object

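When using Hystrix, we usually annotate a method with @HystrixCommand, set the relevant parameters and the fallback logic, and that is all. But how is @HystrixCommand parsed, and what wrapping happens once it has been parsed? Let's look at the source code.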

@Aspect
public class HystrixCommandAspect {

    @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
    public void hystrixCommandAnnotationPointcut() {
    }

    @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
    public void hystrixCollapserAnnotationPointcut() {
    }

    // Around advice applied to every method matched by either pointcut
    @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
    public Object methodsAnnotatedWithHystrixCommand(ProceedingJoinPoint joinPoint) throws Throwable {
        Method method = AopUtils.getMethodFromTarget(joinPoint);
        Validate.notNull(method, "failed to get method from joinPoint: %s", new Object[]{joinPoint});
        if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
            throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser annotations at the same time");
        } else {
            HystrixCommandAspect.MetaHolderFactory metaHolderFactory = (HystrixCommandAspect.MetaHolderFactory) META_HOLDER_FACTORY_MAP
                    .get(HystrixCommandAspect.HystrixPointcutType.of(method));

            MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
            // Build the HystrixCommand implementation
            HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
            ExecutionType executionType = metaHolder.isCollapserAnnotationPresent()
                    ? metaHolder.getCollapserExecutionType()
                    : metaHolder.getExecutionType();
            try {
                Object result;
                if (!metaHolder.isObservable()) {
                    result = CommandExecutor.execute(invokable, executionType, metaHolder);
                } else {
                    result = this.executeObservable(invokable, executionType, metaHolder);
                }
                return result;
            } catch (...) {
                ...
            }
        }
    }
}

The answer is the HystrixCommandAspect aspect: it intercepts the annotated method and parses the @HystrixCommand annotation.
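The interception idea can be tried out without AspectJ or Hystrix on the classpath. What follows is only a minimal sketch under stated assumptions: @GuardedCommand, QuoteService, RemoteQuoteService and GuardedProxy are hypothetical names invented for illustration, and a JDK dynamic proxy stands in for the real aspect. It reads the annotation from the target method, invokes the method, and calls the named fallback when the invocation throws. It has no thread pool, timeout or circuit breaker.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical stand-in for the aspect: wraps a target in a JDK dynamic proxy.
final class GuardedProxy {
    private GuardedProxy() {
    }

    @SuppressWarnings("unchecked")
    static <T> T wrap(T target, Class<T> iface) {
        return (T) Proxy.newProxyInstance(
                iface.getClassLoader(),
                new Class<?>[]{iface},
                (proxy, method, args) -> {
                    // Resolve the method on the target class, where the annotation lives.
                    Method impl = target.getClass().getMethod(method.getName(), method.getParameterTypes());
                    GuardedCommand meta = impl.getAnnotation(GuardedCommand.class);
                    try {
                        return impl.invoke(target, args);
                    } catch (InvocationTargetException e) {
                        if (meta == null) {
                            throw e.getCause(); // not annotated: propagate the original failure
                        }
                        // Annotated: look up the fallback by name, with the same parameter types.
                        Method fallback = target.getClass().getMethod(meta.fallbackMethod(), method.getParameterTypes());
                        return fallback.invoke(target, args);
                    }
                });
    }

    public static void main(String[] args) {
        QuoteService service = wrap(new RemoteQuoteService(), QuoteService.class);
        System.out.println(service.quote("ACME")); // prints "ACME:42"
        System.out.println(service.quote(""));     // prints "unavailable"
    }
}

// Hypothetical stand-in for @HystrixCommand; it only carries a fallback method name.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface GuardedCommand {
    String fallbackMethod();
}

interface QuoteService {
    String quote(String symbol);
}

class RemoteQuoteService implements QuoteService {
    @Override
    @GuardedCommand(fallbackMethod = "quoteFallback")
    public String quote(String symbol) {
        if (symbol.isEmpty()) {
            throw new IllegalStateException("remote call failed"); // simulated failure
        }
        return symbol + ":42";
    }

    public String quoteFallback(String symbol) {
        return "unavailable";
    }
}
```

The real aspect does more than this: as the listing above shows, it first packs everything it learned from the join point into a MetaHolder and then hands that to a factory, rather than invoking the method directly.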
Let's take a closer look at this line:

   HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);

and see how it creates the HystrixCommand object.

public HystrixInvokable create(MetaHolder metaHolder) {
    Object executable;
    // Is the method annotated with @HystrixCollapser?
    if (metaHolder.isCollapserAnnotationPresent()) {
        executable = new CommandCollapser(metaHolder);
    } else if (metaHolder.isObservable()) {
        executable = new GenericObservableCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
    } else {
        // This is the branch taken in our case.
        executable = new GenericCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
    }
    return (HystrixInvokable) executable;
}
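The order of the checks matters: the collapser test comes first, so it wins even when the method is also observable. A minimal sketch of that precedence follows. Meta is a hypothetical, reduced stand-in for MetaHolder that carries only the two flags the factory inspects, and select returns the class name as a string instead of building a command.

```java
final class InvokableDispatchSketch {
    private InvokableDispatchSketch() {
    }

    /** Hypothetical stand-in for MetaHolder: only the two flags the factory looks at. */
    static final class Meta {
        final boolean collapserAnnotationPresent;
        final boolean observable;

        Meta(boolean collapserAnnotationPresent, boolean observable) {
            this.collapserAnnotationPresent = collapserAnnotationPresent;
            this.observable = observable;
        }
    }

    /** Returns the name of the class that the factory method above would instantiate. */
    static String select(Meta meta) {
        if (meta.collapserAnnotationPresent) {
            return "CommandCollapser";         // checked first, so it wins over observable
        } else if (meta.observable) {
            return "GenericObservableCommand";
        } else {
            return "GenericCommand";           // neither collapser nor observable
        }
    }

    public static void main(String[] args) {
        System.out.println(select(new Meta(false, false))); // prints "GenericCommand"
        System.out.println(select(new Meta(false, true)));  // prints "GenericObservableCommand"
        System.out.println(select(new Meta(true, true)));   // prints "CommandCollapser"
    }
}
```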

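We are analyzing a plain @HystrixCommand method (no collapser, not observable), so execution takes the final else branch. Construction runs up the constructor chain GenericCommand -> AbstractHystrixCommand -> HystrixCommand -> AbstractCommand, so to understand how a GenericCommand is built we mainly need to read the AbstractCommand constructor.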
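The reason the base class is the one to read is ordinary Java semantics: each constructor calls super(...) first, so the top of the chain finishes before any subclass constructor body runs. Below is a small sketch with hypothetical classes (BaseCommand, MiddleCommand, LeafCommand) that records this order. The fallback to the simple class name when no key is passed is an assumption made for illustration, loosely modeled on the initCommandKey(key, getClass()) call in the constructor.

```java
import java.util.ArrayList;
import java.util.List;

final class ConstructorChainSketch {
    private ConstructorChainSketch() {
    }

    /** Records the order in which constructor bodies complete. */
    static final List<String> TRACE = new ArrayList<>();

    // Plays the role of AbstractCommand: all shared state is initialized here.
    static abstract class BaseCommand {
        final String commandKey;

        BaseCommand(String commandKey) {
            // Assumption for illustration: default the key to the runtime class name.
            this.commandKey = (commandKey != null) ? commandKey : getClass().getSimpleName();
            TRACE.add("BaseCommand");
        }
    }

    static abstract class MiddleCommand extends BaseCommand {
        MiddleCommand(String commandKey) {
            super(commandKey);
            TRACE.add("MiddleCommand");
        }
    }

    static final class LeafCommand extends MiddleCommand {
        LeafCommand(String commandKey) {
            super(commandKey);
            TRACE.add("LeafCommand");
        }
    }

    public static void main(String[] args) {
        LeafCommand command = new LeafCommand(null);
        System.out.println(TRACE);              // prints "[BaseCommand, MiddleCommand, LeafCommand]"
        System.out.println(command.commandKey); // prints "LeafCommand"
    }
}
```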

abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {

    // Constructor
    protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key,
                              HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker,
                              HystrixThreadPool threadPool,
                              HystrixCommandProperties.Setter commandPropertiesDefaults,
                              HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
                              HystrixCommandMetrics metrics,
                              TryableSemaphore fallbackSemaphore,
                              TryableSemaphore executionSemaphore,
                              HystrixPropertiesStrategy propertiesStrategy,
                              HystrixCommandExecutionHook executionHook) {
        this.commandGroup = initGroupKey(group);
        this.commandKey = initCommandKey(key, getClass());
        this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
        this.threadPoolKey = initThreadPoolKey(threadPoolKey,
                                               this.commandGroup,
                                               this.properties.executionIsolationThreadPoolKeyOverride().get());
        this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
        // Initialize the circuit breaker
        this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(),
                                                 circuitBreaker, this.commandGroup,
                                                 this.commandKey, this.properties, this.metrics);
        // Initialize the thread pool
        this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);
        // Strategies from plugins
        this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
        this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
        HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup,
                                                                           this.metrics, this.circuitBreaker,
                                                                           this.properties);
        this.executionHook = initExecutionHook(executionHook);
        this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy);
        this.currentRequestLog = initRequestLog(this.properties.requestLogEnabled().get(), this.concurrencyStrategy);
        /* fallback semaphore override if applicable */
        this.fallbackSemaphoreOverride = fallbackSemaphore;
        /* execution semaphore override if applicable */
        this.executionSemaphoreOverride = executionSemaphore;
    }
}

3. Initializing the thread pool and the circuit breaker

We'll look at thread pool initialization first, then at circuit breaker initialization.

Thread pool initialization

public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
    this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
    HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
    this.queueSize = properties.maxQueueSize().get();
    this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
            concurrencyStrategy.getThreadPool(threadPoolKey, properties),
            properties);
    this.threadPool = this.metrics.getThreadPool();
    this.queue = this.threadPool.getQueue();
    /* strategy: HystrixMetricsPublisherThreadPool */
    HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
}

// Called above through concurrencyStrategy.getThreadPool(...)
public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
    final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);
    final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();
    final int dynamicCoreSize = threadPoolProperties.coreSize().get();
    final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();
    final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
    final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);
    // Is the maximum size allowed to differ from the core size?
    if (allowMaximumSizeToDivergeFromCoreSize) {
        final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
        if (dynamicCoreSize > dynamicMaximumSize) {
            // Misconfigured (core > maximum): use the core size for both
            return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
        } else {
            return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
        }
    } else {
        // The maximum size equals the core size
        return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
    }
}

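This is the core of the thread pool creation code. One thing to note: the pool is looked up in the factory by key, and that key is the threadPoolKey when one is set and the groupKey otherwise. The pool is created only if none exists for the key yet. As a result, two HystrixCommands with the same threadPoolKey share one thread pool, and when no threadPoolKey is set, commands with the same groupKey share one thread pool.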
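The sharing rule and the sizing branches can be sketched with the JDK alone. This is an illustration, not Hystrix code: ThreadPoolRegistrySketch and its methods are hypothetical names, keys are plain strings, the pool size of 10 is arbitrary, and computeIfAbsent stands in for whatever lookup the real factory performs.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class ThreadPoolRegistrySketch {
    private ThreadPoolRegistrySketch() {
    }

    private static final ConcurrentHashMap<String, ThreadPoolExecutor> POOLS = new ConcurrentHashMap<>();

    /** An explicit thread-pool key wins; otherwise the group key names the pool. */
    static String resolvePoolKey(String threadPoolKey, String groupKey) {
        return (threadPoolKey != null) ? threadPoolKey : groupKey;
    }

    /** Same outcome as the sizing branches in getThreadPool: the maximum never ends up below the core size. */
    static int effectiveMaximumSize(boolean allowDiverge, int coreSize, int maximumSize) {
        if (allowDiverge && maximumSize >= coreSize) {
            return maximumSize;
        }
        return coreSize;
    }

    /** Returns the pool registered under the resolved key, creating it on first use. */
    static ThreadPoolExecutor poolFor(String threadPoolKey, String groupKey) {
        String key = resolvePoolKey(threadPoolKey, groupKey);
        return POOLS.computeIfAbsent(key, k ->
                new ThreadPoolExecutor(10, 10, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>()));
    }

    /** Shuts down and forgets every pool created so far. */
    static void shutdownAll() {
        POOLS.values().forEach(ThreadPoolExecutor::shutdown);
        POOLS.clear();
    }

    public static void main(String[] args) {
        // Same threadPoolKey, different groups: one shared pool.
        System.out.println(poolFor("orders", "groupA") == poolFor("orders", "groupB")); // prints "true"
        // No threadPoolKey, same group: one shared pool.
        System.out.println(poolFor(null, "groupA") == poolFor(null, "groupA"));         // prints "true"
        // The group's default pool is distinct from the explicitly keyed pool.
        System.out.println(poolFor(null, "groupA") == poolFor("orders", "groupA"));     // prints "false"
        shutdownAll();
    }
}
```

A practical consequence of the sharing rule: commands that should be isolated from each other need distinct thread-pool keys, or distinct group keys when no thread-pool key is set.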

Circuit breaker initialization

private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor,
                                                        HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey,
                                                        HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
    if (enabled) {
        if (fromConstructor == null) {
            // get the default implementation of HystrixCircuitBreaker
            return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics);
        } else {
            return fromConstructor;
        }
    } else {
        return new NoOpCircuitBreaker();
    }
}

public static class Factory {
    private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand =
            new ConcurrentHashMap<String, HystrixCircuitBreaker>();

    public static HystrixCircuitBreaker getInstance(HystrixCommandKey key,
                                                    HystrixCommandGroupKey group,
                                                    HystrixCommandProperties properties,
                                                    HystrixCommandMetrics metrics) {
        HystrixCircuitBreaker previouslyCached = circuitBreakersByCommand.get(key.name());
        if (previouslyCached != null) {
            return previouslyCached;
        }
        HystrixCircuitBreaker cbForCommand = circuitBreakersByCommand.putIfAbsent(key.name(),
                new HystrixCircuitBreakerImpl(key, group, properties, metrics));
        if (cbForCommand == null) {
            // this means the putIfAbsent step just created a new one so let's retrieve and return it
            return circuitBreakersByCommand.get(key.name());
        } else {
            return cbForCommand;
        }
    }
}

/* package */ static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
    private final HystrixCommandProperties properties;
    private final HystrixCommandMetrics metrics;
    /* track whether this circuit is open/closed at any given point in time (default to false==closed) */
    private AtomicBoolean circuitOpen = new AtomicBoolean(false);
    /* when the circuit was marked open or was last allowed to try a 'singleTest' */
    private AtomicLong circuitOpenedOrLastTestedTime = new AtomicLong();

    protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup,
                                        HystrixCommandProperties properties,
                                        HystrixCommandMetrics metrics) {
        this.properties = properties;
        this.metrics = metrics;
    }
}

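The circuit breaker initialization is structured much like the thread pool initialization: a ConcurrentHashMap inside the factory class manages the instances. Here the map key is the name of the HystrixCommandKey, so commands with the same command key share one circuit breaker.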
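The get-then-putIfAbsent caching pattern from Factory.getInstance can be reproduced in a few lines. The sketch below uses hypothetical types (Breaker, StatefulBreaker, NoOpBreaker) and a plain string for the command key. The trip() method exists only to demonstrate that state is shared between callers and is not part of the listing above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

final class BreakerRegistrySketch {
    private BreakerRegistrySketch() {
    }

    interface Breaker {
        boolean isOpen();
    }

    /** Hypothetical breaker that holds one flag, like circuitOpen in the listing above. */
    static final class StatefulBreaker implements Breaker {
        private final AtomicBoolean circuitOpen = new AtomicBoolean(false); // false == closed

        @Override
        public boolean isOpen() {
            return circuitOpen.get();
        }

        void trip() {
            circuitOpen.set(true);
        }
    }

    /** Returned when circuit breaking is disabled; in this sketch it never opens. */
    static final class NoOpBreaker implements Breaker {
        @Override
        public boolean isOpen() {
            return false;
        }
    }

    private static final ConcurrentHashMap<String, Breaker> BY_COMMAND = new ConcurrentHashMap<>();

    static Breaker init(boolean enabled, String commandKey) {
        if (!enabled) {
            return new NoOpBreaker(); // disabled breakers are never cached
        }
        Breaker cached = BY_COMMAND.get(commandKey);
        if (cached != null) {
            return cached;
        }
        Breaker created = new StatefulBreaker();
        // putIfAbsent returns null when our instance was stored, or the instance
        // another thread stored first; either way every caller sees one winner.
        Breaker raced = BY_COMMAND.putIfAbsent(commandKey, created);
        return (raced == null) ? created : raced;
    }

    public static void main(String[] args) {
        Breaker first = init(true, "getUser");
        Breaker second = init(true, "getUser");
        System.out.println(first == second);                  // prints "true"
        ((StatefulBreaker) first).trip();
        System.out.println(second.isOpen());                  // prints "true"
        System.out.println(init(true, "getOrder") == first);  // prints "false"
        System.out.println(init(false, "getUser").isOpen());  // prints "false"
    }
}
```

Because the breaker is shared per command key, failures recorded by one caller affect every other caller that uses the same key.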