关于hystrix:hystrix源码分析一

6次阅读

共计 8572 个字符,预计需要花费 22 分钟才能阅读完成。

一,类图

二,HystrixCommandAspect 切面解析及 HystrixCommand 对象创建

我们在使用 Hystrix 的时候一般会使用 @HystrixCommand 注解,设置好相关参数与 fallback 逻辑后就可以了。那 @HystrixCommand 是如何被解析的呢?解析完之后又做了哪些封装呢?我们一起来看看源码。

/**
 * Aspect that intercepts methods annotated with {@code @HystrixCommand} or
 * {@code @HystrixCollapser}, wraps the invocation in a {@code HystrixInvokable}
 * and runs it through the Hystrix executor instead of calling the method directly.
 */
@Aspect
public class HystrixCommandAspect {@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
    public void hystrixCommandAnnotationPointcut() {}

    // Pointcut matching methods annotated with @HystrixCollapser.
    @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
    public void hystrixCollapserAnnotationPointcut() {}

    /**
     * Around-advice applied to every method matched by either pointcut above.
     *
     * @param joinPoint the intercepted invocation
     * @return the result produced by executing the Hystrix invokable
     * @throws IllegalStateException if the method carries both annotations at once
     * @throws Throwable declared by the advice signature; the catch handling is
     *         elided in this excerpt
     */
    @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
    public Object methodsAnnotatedWithHystrixCommand(ProceedingJoinPoint joinPoint) throws Throwable {Method method = AopUtils.getMethodFromTarget(joinPoint);
        Validate.notNull(method, "failed to get method from joinPoint: %s", new Object[]{joinPoint});
        // The two annotations are mutually exclusive on a single method.
        if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser annotations at the same time");
        } else {HystrixCommandAspect.MetaHolderFactory metaHolderFactory = (HystrixCommandAspect.MetaHolderFactory)META_HOLDER_FACTORY_MAP
                .get(HystrixCommandAspect.HystrixPointcutType.of(method));
            
            // The factory is selected by pointcut type; it produces the MetaHolder for this join point.
            MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
            // Build the concrete HystrixInvokable (the command implementation) from the meta holder.
            HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
            // A collapser-annotated method uses the collapser's execution type; otherwise the command's.
            ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() 
                ?metaHolder.getCollapserExecutionType() 
                : metaHolder.getExecutionType();

            try {
                Object result;
                // Non-observable commands go through CommandExecutor; observable ones are
                // delegated to executeObservable (not shown in this excerpt).
                if (!metaHolder.isObservable()) {result = CommandExecutor.execute(invokable, executionType, metaHolder);
                } else {result = this.executeObservable(invokable, executionType, metaHolder);
                }

                return result;
            // NOTE(review): the exception handling is elided in this excerpt ("..." placeholders).
            } catch (...) {...} 
        }
    }
}

答案就是 HystrixCommandAspect 这个切面,它会解析 @HystrixCommand 注解。
下面我们重点看一下:

   HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);

是如何创建 HystrixCommand 对象的。

    /**
     * Builds the Hystrix executable that corresponds to the annotated method
     * described by {@code metaHolder}.
     *
     * <ul>
     *   <li>collapser annotation present: a {@code CommandCollapser}</li>
     *   <li>observable method: a {@code GenericObservableCommand}</li>
     *   <li>otherwise: a {@code GenericCommand}</li>
     * </ul>
     *
     * @param metaHolder metadata collected from the intercepted method
     * @return the executable, cast to {@code HystrixInvokable}
     */
    public HystrixInvokable create(MetaHolder metaHolder) {
        Object executable;
        // Check for the HystrixCollapser annotation first.
        if (metaHolder.isCollapserAnnotationPresent()) {
            executable = new CommandCollapser(metaHolder);
        } else if (metaHolder.isObservable()) {
            executable = new GenericObservableCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
        } else {
            // This is the branch taken for a plain, non-observable @HystrixCommand method.
            // The assignment must be real code: if it is folded into the comment above,
            // `executable` is never assigned on this path and the method does not compile.
            executable = new GenericCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
        }

        return (HystrixInvokable) executable;
    }
 

我们分析的是 @HystrixCommand 注解,所以走 else 分支。整体构造过程是 GenericCommand -> AbstractHystrixCommand -> HystrixCommand -> AbstractCommand。构建 GenericCommand 的过程中,我们主要还是看 AbstractCommand 的构造方法。

abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
    
    
    // 构造方法
    protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, 
                              HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, 
                              HystrixThreadPool threadPool,
                              HystrixCommandProperties.Setter commandPropertiesDefaults, 
                              HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
                              HystrixCommandMetrics metrics,
                              TryableSemaphore fallbackSemaphore, 
                              TryableSemaphore executionSemaphore,
                              HystrixPropertiesStrategy propertiesStrategy, 
                              HystrixCommandExecutionHook executionHook) {this.commandGroup = initGroupKey(group);
        this.commandKey = initCommandKey(key, getClass());
        this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
        this.threadPoolKey = initThreadPoolKey(threadPoolKey, 
                                               this.commandGroup,         
                                               this.properties.executionIsolationThreadPoolKeyOverride().get());
        this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
        // 初始化熔断器
        this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), 
                                                 circuitBreaker, this.commandGroup, 
                                                 this.commandKey, this.properties, this.metrics);        
        // 初始化线程池
        this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);

        //Strategies from plugins
        this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
        this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
        HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup, 
                                                                           this.metrics, this.circuitBreaker, 
                                                                           this.properties);
        this.executionHook = initExecutionHook(executionHook);

        this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy);
        this.currentRequestLog = initRequestLog(this.properties.requestLogEnabled().get(), this.concurrencyStrategy);

        /* fallback semaphore override if applicable */
        this.fallbackSemaphoreOverride = fallbackSemaphore;

        /* execution semaphore override if applicable */
        this.executionSemaphoreOverride = executionSemaphore;
    }
}

三,线程池与熔断器的初始化

先分析线程池的初始化,再分析熔断器的初始化。

线程池的初始化

      /**
       * Resolves the thread-pool properties for {@code threadPoolKey}, obtains the
       * executor through the thread-pool metrics instance, and registers a metrics
       * publisher for the pool.
       *
       * @param threadPoolKey      key identifying this thread pool
       * @param propertiesDefaults defaults passed to the properties factory
       */
      public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey,
                                      HystrixThreadPoolProperties.Setter propertiesDefaults) {
            this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
            final HystrixConcurrencyStrategy strategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
            this.queueSize = this.properties.maxQueueSize().get();

            // The executor is created by the concurrency strategy and handed to the
            // metrics instance; the pool and its queue are then read back from the metrics.
            this.metrics = HystrixThreadPoolMetrics.getInstance(
                    threadPoolKey,
                    strategy.getThreadPool(threadPoolKey, this.properties),
                    this.properties);
            this.threadPool = this.metrics.getThreadPool();
            this.queue = this.threadPool.getQueue();

            // Strategy: HystrixMetricsPublisherThreadPool.
            HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(
                    threadPoolKey, this.metrics, this.properties);
        }


    /**
     * Creates the {@link ThreadPoolExecutor} for the given thread-pool key.
     *
     * <p>The maximum pool size equals the core size unless the properties allow
     * the two to diverge. When they may diverge, the configured maximum is used,
     * but it is never allowed to fall below the core size.
     *
     * @param threadPoolKey        key passed to {@code getThreadFactory}
     * @param threadPoolProperties source of core size, maximum size, keep-alive and queue size
     * @return a new executor whose keep-alive is expressed in minutes
     */
    public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
        final ThreadFactory factory = getThreadFactory(threadPoolKey);

        final boolean maxMayExceedCore = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();
        final int coreSize = threadPoolProperties.coreSize().get();
        final int keepAliveMinutes = threadPoolProperties.keepAliveTimeMinutes().get();
        final int queueCapacity = threadPoolProperties.maxQueueSize().get();
        final BlockingQueue<Runnable> taskQueue = getBlockingQueue(queueCapacity);

        // Decide the maximum pool size; the configured maximum is only read when divergence is allowed.
        final int maximumSize;
        if (maxMayExceedCore) {
            final int configuredMaximum = threadPoolProperties.maximumSize().get();
            // A configured maximum smaller than the core size is replaced by the core size.
            maximumSize = Math.max(coreSize, configuredMaximum);
        } else {
            // Maximum size is pinned to the core size.
            maximumSize = coreSize;
        }

        return new ThreadPoolExecutor(coreSize, maximumSize, keepAliveMinutes, TimeUnit.MINUTES, taskQueue, factory);
    }

这就是创建线程池的核心代码了。需要注意的是,工厂会根据 threadPoolKey 和 groupKey 确定的 key 去取相应的线程池,没有则创建。也就是说,如果两个 HystrixCommand 的 threadPoolKey 相同,它们会共用同一个线程池;在没有设置 threadPoolKey 的情况下,如果 groupKey 相同,也会共用同一个线程池。

熔断器的初始化

/**
 * Chooses the circuit breaker a command will use.
 *
 * @param enabled         whether circuit breaking is enabled; when {@code false} a
 *                        {@code NoOpCircuitBreaker} is returned
 * @param fromConstructor breaker supplied by the caller, used as-is when non-null
 * @param groupKey        command group key, forwarded to the factory
 * @param commandKey      command key, forwarded to the factory
 * @param properties      command properties, forwarded to the factory
 * @param metrics         command metrics, forwarded to the factory
 * @return the supplied breaker, the factory's breaker for {@code commandKey}, or a no-op breaker
 */
private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor,
        HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey,
        HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
    if (!enabled) {
        return new NoOpCircuitBreaker();
    }
    if (fromConstructor != null) {
        return fromConstructor;
    }
    // Nothing was supplied: fall back to the default HystrixCircuitBreaker implementation.
    return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics);
}

  /**
   * Registry of circuit breakers, keyed by the command key's name.
   */
  public static class Factory {

        // One circuit breaker per command name.
        private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();

        /**
         * Returns the circuit breaker registered under {@code key.name()},
         * creating and registering one if none exists yet.
         *
         * @param key        command key whose name is the registry key
         * @param group      command group key, passed to the new breaker
         * @param properties command properties, passed to the new breaker
         * @param metrics    command metrics, passed to the new breaker
         * @return the breaker registered for this command name
         */
        public static HystrixCircuitBreaker getInstance(HystrixCommandKey key,
                                                        HystrixCommandGroupKey group,
                                                        HystrixCommandProperties properties,
                                                        HystrixCommandMetrics metrics) {
            HystrixCircuitBreaker breaker = circuitBreakersByCommand.get(key.name());
            if (breaker == null) {
                // Nothing cached yet: try to register a fresh instance.
                HystrixCircuitBreaker alreadyRegistered = circuitBreakersByCommand.putIfAbsent(
                        key.name(), new HystrixCircuitBreakerImpl(key, group, properties, metrics));
                // A null result means putIfAbsent stored the new instance, so read it back;
                // otherwise an entry was already present and that one is used.
                breaker = (alreadyRegistered != null)
                        ? alreadyRegistered
                        : circuitBreakersByCommand.get(key.name());
            }
            return breaker;
        }
  }

 /**
  * Excerpt of the default circuit breaker implementation: its state fields and
  * constructor only.
  */
 /* package */ static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {

        private final HystrixCommandProperties properties;
        private final HystrixCommandMetrics metrics;

        /** Whether the circuit is currently open; starts as {@code false}, i.e. closed. */
        private AtomicBoolean circuitOpen = new AtomicBoolean(false);

        /** When the circuit was marked open or was last allowed to try a 'singleTest'. */
        private AtomicLong circuitOpenedOrLastTestedTime = new AtomicLong();

        /**
         * Stores the properties and metrics for this breaker.
         *
         * @param key          command key (not used by this constructor)
         * @param commandGroup command group key (not used by this constructor)
         * @param properties   command properties kept by this breaker
         * @param metrics      command metrics kept by this breaker
         */
        protected HystrixCircuitBreakerImpl(HystrixCommandKey key,
                                            HystrixCommandGroupKey commandGroup,
                                            HystrixCommandProperties properties,
                                            HystrixCommandMetrics metrics) {
            this.properties = properties;
            this.metrics = metrics;
        }

 }

熔断器初始化的代码在整体结构上和线程池的初始化过程差不多,都是通过工厂类里面的 ConcurrentHashMap 来管理熔断器,且 key 是根据 HystrixCommandKey 的 name 来判断的。

正文完
 0