一,类图
二,HystrixCommandAspect切面解析及HystrixCommand对象创建
咱们在使用Hystrix的时候一般会使用@HystrixCommand注解,设置好相关参数与fallback逻辑后就可以了。那@HystrixCommand是如何解析的呢?解析完了又做了哪些封装呢?咱们一起来看看源码。
@Aspectpublic class HystrixCommandAspect { @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)") public void hystrixCommandAnnotationPointcut() { } @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)") public void hystrixCollapserAnnotationPointcut() { } //aop监控的办法 @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()") public Object methodsAnnotatedWithHystrixCommand(ProceedingJoinPoint joinPoint) throws Throwable { Method method = AopUtils.getMethodFromTarget(joinPoint); Validate.notNull(method, "failed to get method from joinPoint: %s", new Object[]{joinPoint}); if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) { throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser annotations at the same time"); } else { HystrixCommandAspect.MetaHolderFactory metaHolderFactory = (HystrixCommandAspect.MetaHolderFactory)META_HOLDER_FACTORY_MAP .get(HystrixCommandAspect.HystrixPointcutType.of(method)); MetaHolder metaHolder = metaHolderFactory.create(joinPoint); //构建hystrixCommand的实现类 HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder); ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType(); try { Object result; if (!metaHolder.isObservable()) { result = CommandExecutor.execute(invokable, executionType, metaHolder); } else { result = this.executeObservable(invokable, executionType, metaHolder); } return result; } catch (...) { ... } } }}
答案就是HystrixCommandAspect这个切面,它会解析@HystrixCommand注解。
重点咱们再看下:
HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
是如何创建HystrixCommand对象的。
public HystrixInvokable create(MetaHolder metaHolder) { Object executable; //判断是不是HystrixCollapser注解 if (metaHolder.isCollapserAnnotationPresent()) { executable = new CommandCollapser(metaHolder); } else if (metaHolder.isObservable()) { executable = new GenericObservableCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder)); } else { //会执行这个。 executable = new GenericCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder)); } return (HystrixInvokable)executable; }
咱们分析的是HystrixCommand注解,所以走else分支里的逻辑。整体构造过程是 GenericCommand -> AbstractHystrixCommand -> HystrixCommand -> AbstractCommand。构建GenericCommand的过程,咱们主要还是看AbstractCommand的构造方法。
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> { //构造方法 protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool, HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults, HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore, HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) { this.commandGroup = initGroupKey(group); this.commandKey = initCommandKey(key, getClass()); this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults); this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get()); this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties); //初始化熔断器 this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics); //初始化线程池 this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults); //Strategies from plugins this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier(); this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy(); HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup, this.metrics, this.circuitBreaker, this.properties); this.executionHook = initExecutionHook(executionHook); this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy); this.currentRequestLog = initRequestLog(this.properties.requestLogEnabled().get(), this.concurrencyStrategy); /* fallback semaphore override if applicable */ 
this.fallbackSemaphoreOverride = fallbackSemaphore; /* execution semaphore override if applicable */ this.executionSemaphoreOverride = executionSemaphore; }}
三,线程池与熔断器的初始化
先分析线程池的初始化,再分析熔断器的初始化。
线程池的初始化
/**
 * Builds the default thread-pool wrapper for the given key.
 *
 * <p>The executor is obtained from the thread-pool metrics instance, which is created from
 * the executor returned by the concurrency strategy's {@code getThreadPool}.
 *
 * @param threadPoolKey key identifying the thread pool
 * @param propertiesDefaults default property values passed to the properties factory
 */
public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
    this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
    HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
    this.queueSize = properties.maxQueueSize().get();

    this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
            concurrencyStrategy.getThreadPool(threadPoolKey, properties),
            properties);
    this.threadPool = this.metrics.getThreadPool();
    this.queue = this.threadPool.getQueue();

    /* strategy: HystrixMetricsPublisherThreadPool */
    HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
}

/**
 * Creates the {@link ThreadPoolExecutor} backing a Hystrix thread pool.
 *
 * <p>NOTE(review): presumably a method of {@code HystrixConcurrencyStrategy}, since the
 * constructor above calls {@code concurrencyStrategy.getThreadPool(...)} — confirm.
 *
 * @param threadPoolKey key used to obtain the thread factory
 * @param threadPoolProperties source of core size, maximum size, keep-alive and queue size
 * @return an executor whose maximum size equals the core size unless divergence is allowed
 *         and the configured maximum is larger than the core size
 */
public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
    final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);

    final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();
    final int dynamicCoreSize = threadPoolProperties.coreSize().get();
    final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();
    final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
    final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);

    // By default the maximum number of threads equals the core size.
    int effectiveMaximumSize = dynamicCoreSize;
    if (allowMaximumSizeToDivergeFromCoreSize) {
        // A separate maximum is allowed, but it is never taken below the core size:
        // ThreadPoolExecutor rejects maximumPoolSize < corePoolSize.
        final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
        effectiveMaximumSize = Math.max(dynamicCoreSize, dynamicMaximumSize);
    }
    return new ThreadPoolExecutor(dynamicCoreSize, effectiveMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
}
这就是创建线程池的核心代码了。需要注意的是,线程池是以threadPoolKey(没有设置时则使用groupKey)作为key去工厂中获取的,没有则创建。也就是说,如果两个HystrixCommand的threadPoolKey相同,它们会共用同一个线程池;在没有设置threadPoolKey的情况下,如果groupKey相同,也会共用同一个线程池。
熔断器的初始化
private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor, HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey, HystrixCommandProperties properties, HystrixCommandMetrics metrics) { if (enabled) { if (fromConstructor == null) { // get the default implementation of HystrixCircuitBreaker return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics); } else { return fromConstructor; } } else { return new NoOpCircuitBreaker(); }} public static class Factory { private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>(); public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) { HystrixCircuitBreaker previouslyCached = circuitBreakersByCommand.get(key.name()); if (previouslyCached != null) { return previouslyCached; } HystrixCircuitBreaker cbForCommand = circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreakerImpl(key, group, properties, metrics)); if (cbForCommand == null) { // this means the putIfAbsent step just created a new one so let's retrieve and return it return circuitBreakersByCommand.get(key.name()); } else { return cbForCommand; } } } /* package */static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker { private final HystrixCommandProperties properties; private final HystrixCommandMetrics metrics; /* track whether this circuit is open/closed at any given point in time (default to false==closed) */ private AtomicBoolean circuitOpen = new AtomicBoolean(false); /* when the circuit was marked open or was last allowed to try a 'singleTest' */ private AtomicLong circuitOpenedOrLastTestedTime = new AtomicLong(); protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixCommandProperties properties, 
HystrixCommandMetrics metrics) { this.properties = properties; this.metrics = metrics; } }
熔断器初始化的代码在整体结构上和初始化线程池的过程差不多,都是通过工厂类里面的ConcurrentHashMap来管理熔断器,且key是根据HystrixCommandKey(即key.name())来判断的。