关于hystrix:hystrix源码分析二

先温习下Hystrix的整体流程 结构一个 HystrixCommand或HystrixObservableCommand对象,用于封装申请,并在构造方法配置申请被执行须要的参数;执行命令,Hystrix提供了4种执行命令的办法判断是否应用缓存响应申请,若启用了缓存,且缓存可用,间接应用缓存响应申请。Hystrix反对申请缓存,但须要用户自定义启动;判断熔断器是否关上,如果关上,执行第8步;判断线程池/队列/信号量是否已满,已满则执行第8步;执行HystrixObservableCommand.construct()或HystrixCommand.run(),如果执行失败或者超时,执行第8步;否则,跳到第9步;统计熔断器监控指标;走Fallback备用逻辑返回申请响应一,execute办法剖析承接上篇,在HystrixCommandAspect这个切面里会创立HystrixInvokable对象,进而执行。 Object result; try { if (!metaHolder.isObservable()) { result = CommandExecutor.execute(invokable, executionType, metaHolder); } else { result = executeObservable(invokable, executionType, metaHolder); } } catch (HystrixBadRequestException e) { throw e.getCause() != null ? e.getCause() : e; } catch (HystrixRuntimeException e) { throw hystrixRuntimeExceptionToThrowable(metaHolder, e); }这里就来剖析下execute的流程。Hystrix是反对同步,异步,察看这个三个模式的,咱们只看同步,调用链路是:HystrixCommand.execute() -> queue() -> toObservable() public Observable<R> toObservable() { .... 一些action的定义 .... final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() { public Observable<R> call() { if(this.commandState.get()).equals(AbstractCommand.CommandState.UNSUBSCRIBED)){ return Observable.never() }else{ applyHystrixSemantics(AbstractCommand.this); } } }; ... return Observable.defer(new Func0<Observable<R>>() { public Observable<R> call() { ...判断是否开启缓存,对应上整体流程的3步... boolean requestCacheEnabled = AbstractCommand.this.isRequestCachingEnabled(); String cacheKey = AbstractCommand.this.getCacheKey(); if (requestCacheEnabled) { //拿去缓存,如果存在缓存的话,间接返回 HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey); if (fromCache != null) { isResponseFromCache = true; return handleRequestCacheHitAndEmitValues(fromCache, _cmd); } } Observable<R> hystrixObservable = Observable.defer(applyHystrixSemantics).map(wrapWithAllOnNextHooks); Observable afterCache; if (requestCacheEnabled && cacheKey != null) { ... 缓存后续的一些判断..... } else { afterCache = hystrixObservable; } return afterCache.doOnTerminate(terminateCommandCleanup) .doOnUnsubscribe(unsubscribeCommandCleanup) .doOnCompleted(fireOnCompletedHook); } });}call外面的办法主要用途: ...

February 12, 2022 · 8 min · jiezi

关于hystrix:hystrix源码分析一

一,类图 二,HystrixCommandAspect切面解析及HystrixCommand对象创立咱们在应用Hystrix的时候个别会应用@HystrixCommand注解,再设置好相干参数与fallback逻辑后就能够了,那@HystrixCommand是如何解析的呢?解析完了又做了哪些封装呢?咱们一起来看看源码。 @Aspectpublic class HystrixCommandAspect { @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)") public void hystrixCommandAnnotationPointcut() { } @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)") public void hystrixCollapserAnnotationPointcut() { } //aop监控的办法 @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()") public Object methodsAnnotatedWithHystrixCommand(ProceedingJoinPoint joinPoint) throws Throwable { Method method = AopUtils.getMethodFromTarget(joinPoint); Validate.notNull(method, "failed to get method from joinPoint: %s", new Object[]{joinPoint}); if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) { throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser annotations at the same time"); } else { HystrixCommandAspect.MetaHolderFactory metaHolderFactory = (HystrixCommandAspect.MetaHolderFactory)META_HOLDER_FACTORY_MAP .get(HystrixCommandAspect.HystrixPointcutType.of(method)); MetaHolder metaHolder = metaHolderFactory.create(joinPoint); //构建hystrixCommand的实现类 HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder); ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType(); try { Object result; if (!metaHolder.isObservable()) { result = CommandExecutor.execute(invokable, executionType, metaHolder); } else { result = this.executeObservable(invokable, executionType, metaHolder); } return result; } catch (...) { ... } } }}答案就是HystrixCommandAspect这个切面,它会解析@HystrixCommand注解。重点咱们再看下: ...

February 12, 2022 · 3 min · jiezi

关于hystrix:hystrix使用示例

hystrix三种降级策略,别离是: 熔断触发降级超时触发降级资源隔离触发降级,又分了线程池与信号量两种上面联合示例别离介绍下。 一,熔断触发降级1,当某个服务失败率达到肯定限度时将开启熔断器,这个服务后续再被调用时会被间接拒绝执行fallback逻辑(被调用方服务呈现了问题,调用方进行熔断)2,熔断器关上的两个条件 申请数达到设定的阀值申请谬误占比达到设定的阀值3,示例 /** * HystrixProperty的参数可参考 hystrixCommandProperties * 熔断触发降级 * @return * 10s内当发动了超过5次申请,且失败率超过50%,熔断主动开启, * 从熔断开启到后续5s之内的申请,都不会进入到办法里,并 * 且间接触发fallback这个回调办法返回。 */ @GetMapping("/circuitBreaker/{num}") @HystrixCommand(commandProperties = { //开启熔断器性能 @HystrixProperty (name = "circuitBreaker.enabled" ,value = "true"), //设置最小申请数 @HystrixProperty(name = "circuitBreaker.requestVolumeThreshold",value ="5"), //熔断工夫5秒 @HystrixProperty(name = "circuitBreaker.sleepWindowInMilliseconds" , value ="5000"), //谬误流程比例 @HystrixProperty(name="circuitBreaker.errorThresholdPercentage",value = "50") } ,fallbackMethod = "fallback") public String circuitBreaker(@PathVariable("num")int num){ if(num%2==0){ return "失常拜访"; } throw new RuntimeException(""); } //入参加申请办法入参需统一 public String fallback(int num){ return "熔断触发降级"; }关上熔断器开关 ...

February 12, 2022 · 2 min · jiezi

关于hystrix:hystrix学习

一,hystrix整体流程 结构一个 HystrixCommand或HystrixObservableCommand对象,用于封装申请,并在构造方法配置申请被执行须要的参数;执行命令,Hystrix提供了4种执行命令的办法判断是否应用缓存响应申请,若启用了缓存,且缓存可用,间接应用缓存响应申请。Hystrix反对申请缓存,但须要用户自定义启动;判断熔断器是否关上,如果关上,执行第8步;判断线程池/队列/信号量是否已满,已满则执行第8步;执行HystrixObservableCommand.construct()或HystrixCommand.run(),如果执行失败或者超时,执行第8步;否则,跳到第9步;统计熔断器监控指标;走Fallback备用逻辑返回申请响应留神:第5步线程池/队列/信号量已满时,还会执行第7步逻辑,更新熔断器统计信息,而第6步无论胜利与否,都会更新熔断器统计信息。 hystrxi设计理念 应用命令模式将所有对外部服务(或依赖关系)的调用包装在HystrixCommand或HystrixObservableCommand对象中,并将该对象放在独自的线程中执行;每个依赖都保护着一个线程池(或信号量),线程池被耗尽则拒绝请求(而不是让申请排队)。记录申请胜利,失败,超时和线程回绝。服务谬误百分比超过了阈值,熔断器开关主动关上,一段时间内进行对该服务的所有申请。申请失败,被回绝,超时或熔断时执行降级逻辑。近实时地监控指标和配置的批改。二,hystrix容错一,资源隔离 资源隔离次要指对线程的隔离。Hystrix提供了两种线程隔离形式:线程池和信号量,默认为线程池。应用线程池时,发送申请的线程和执行依赖服务的线程不是同一个;而应用信号量时,发送申请的线程和执行依赖服务的线程是同一个,都是发动申请的线程。 线程切换反对异步反对超时反对熔断限流开销信号量否否否是是小线程池是是是是是大线程池和信号量都反对熔断和限流。相比线程池,信号量不须要线程切换,如果是tomcat服务,那信号量应用的就是tomcat的线程而线程池则是tomcat线程创立的线程池,因而防止了不必要的开销。然而信号量不反对异步,也不反对超时,也就是说当所申请的服务不可用时,信号量会管制超过限度的申请立刻返回,然而曾经持有信号量的线程只能期待服务响应或从超时中返回,即可能呈现长时间期待。线程池模式下,当超过指定工夫未响应的服务,Hystrix会通过响应中断的形式告诉线程立刻完结并返回。 基于下面两者的特点,咱们能够晓得它们别离实用的场景: 信号量 申请并发大且耗时短(本服务的内存操作等);不会拜访依赖内部的服务,因为信号量是不反对超时的,它解决不了timeout的问题线程池 申请并发大且耗时长;因为线程池是反对超时的,所以拜访的服务是否依赖内部都行;如果利用恰好适宜异步执行那线程池会是一个不错的抉择。当然线程池也是须要保护的所以线程上下文切换也会有开销二,熔断 熔断器里6个重要的参数: circuitBreaker.enabled是否启用熔断器,默认是TRUE。circuitBreaker.forceOpen熔断器强制关上,始终保持关上状态,不关注熔断开关的理论状态。默认值FLASE。circuitBreaker.forceClosed熔断器强制敞开,始终保持敞开状态,不关注熔断开关的理论状态。默认值FLASE。circuitBreaker.errorThresholdPercentage错误率,默认值50%,例如一段时间(10s)内有100个申请,其中有54个超时或者异样,那么这段时间内的错误率是54%,大于了默认值50%,这种状况下会触发熔断器关上。circuitBreaker.requestVolumeThreshold默认值20。含意是一段时间内至多有20个申请才进行errorThresholdPercentage计算。比方一段时间了有19个申请,且这些申请全副失败了,错误率是100%,但熔断器不会关上,总申请数不满足20。circuitBreaker.sleepWindowInMilliseconds半开状态试探睡眠工夫,默认值5000ms。如:当熔断器开启5000ms之后,会尝试放过来一部分流量进行试探,确定依赖服务是否复原。断路开启,也就是由 close 转换到 open 状态(close -> open)。那么之后在 SleepWindowInMilliseconds 工夫内(默认值5000ms),所有通过该断路器的申请全副都会被断路,不调用后端服务,间接走 fallback 降级机制。 而在该参数工夫过后,断路器会变为 half-open 半开闭状态,尝试让一条申请通过断路器,看能不能失常调用。如果调用胜利了,那么就主动复原,断路器转为 close 状态。 三,降级 降级,通常指务高峰期,为了保障外围服务失常运行,须要停掉一些不太重要的业务,或者某些服务不可用时,执行备用逻辑从故障服务中疾速失败或疾速返回,以保障主体业务不受影响。Hystrix提供的降级次要是为了容错,保障以后服务不受依赖服务故障的影响,从而进步服务的健壮性。要反对回退或降级解决,能够重写HystrixCommand的getFallBack办法或HystrixObservableCommand的resumeWithFallback办法。 Hystrix在以下几种状况下会走降级逻辑: 执行construct()或run()抛出异样熔断器关上导致命令短路命令的线程池和队列或信号量的容量超额,命令被回绝命令执行超时降级回退形式有多种,这里列举两种: 疾速失败,疾速失败是最一般的命令执行办法,命令没有重写降级逻辑。 如果命令执行产生任何类型的故障,它将间接抛出异样。无声失败,指在降级办法中通过返回null,空Map,空List或其余相似的响应来实现。咱们能够依据本人的业务须要开发降级计划,然而须要留神降落级逻辑是否会出异样的可能。 参考的文章:深刻 Hystrix 断路器执行原理深刻 Hystrix 线程池隔离与接口限流Hystrix原理与实战

February 11, 2022 · 1 min · jiezi

关于hystrix:分布式RPC框架Dubbo实现服务治理集成Kryo实现高速序列化集成Hystrix实现熔断器

Dubbo+Kryo实现高速序列化Dubbo RPC是Dubbo体系中最外围的一种高性能,高吞吐量的近程调用形式,是一种多路复用的TCP长连贯调用: 长连贯: 防止每次调用新建TCP连贯,进步调用的响应速度多路复用: 单个TCP连贯可交替传输多个申请和响应的音讯,升高了连贯的等待时间,从而缩小了同样并发数的状况下网络连接数,进步了零碎的云吞吐量Dubbo RPC次要用于两个Dubbo之间的近程调用,适宜高并发,小数据的互联网场景.序列化对于近程调用的响应速度,吞吐量,网络带宽耗费等同样也起着至关重要的作用,是晋升分布式系统性能的最关键因素之一Dubbo中反对的序列化形式: dubbo序列化: 阿里的高效java序列化实现hessian2序列化: hessian是一种高效跨语言的二进制序列化形式.这里不是原生的hessian2序列化,而是阿里批改过的hessian lite,是Dubbo RPC默认启动的序列化形式json序列化: 目前有两种实现- 采纳阿里的fastjson库采纳dubbo中实现的简略json库json这种文本序列化性能不如dubbo序列化,hessian2序列化这两种二进制序列化java序列化: 次要采纳JDK自带的Java序列化实现,性能差序列化形式: 针对Java语言的序列化形式:Kryo,FST跨语言的序列化形式:Protostuff,ProtoBuf,Thrift,Avro,MsgPack 序列化:1.序列化(serialization)在计算机科学的材料解决中,是指将数据结构或物件状态转换成可取用格局(例如存成档案,存于缓冲,或经由网络中传送),以留待后续在雷同或另一台计算机环境中,能复原原先状态的过程。按照序列化格局从新获取字节的后果时,能够利用它来产生与原始物件雷同语义的正本。2.简略的来讲就是将某种数据结构或者对象转换成一种数据格式,数据格式能够通过网络传送或者存入数据库中,同时能够依据数据格式还原出原来的数据结构(反序列化)。在 Java 中,对象只有在 JVM 运行时才会存在,如果想要把对象存储到本地或者发送到近程的服务器,则必须通过序列化将对象转换成相应的字节而后进行存储或者传送,之后再将字节组装成对象。3.在以下场景中都会遇到序列化: 3.1将对象状态保留到文件或者数据库中 3.2通过 socket 在网络中传送对象 3.3通过RMI(近程办法调用)传输对象在面向生产的环境中,应用Dubbo+Kryo实现序列化: 引入Kryo依赖kryo-serializers <dependency> <groupId>de.javakaffee</groupId> <artifactId>kryo-serializers</artifactId> <version>0.42</version></dependency> 配置文件中减少配置 dubbo.protocol. serialization=kryo注册被序列化类 要让Kryo施展高性能,须要将须要被序列化的实体类注册到Dubbo零碎中,实现如下回调接口: public class SerializationOptimizerImpl implements SerializationOptimizerImpl{public Collection<class> getSerializableClasses(){ List<Class> classes=new LinkedList<class>(); classes.add(provider.class); classes.add(consumer.class); return classes;}}配置文件中减少配置 dubbo.protocol.optimizer=com.oxford.SerializationOptimizerImpl注册这些类后,序列化的性能大大晋升,特地是针对小数量的嵌套对象1.为什么须要手动注册,不在配置文件中注册?因为要注册的类往往数量较多,导致配置文件简短在没有好的IDE反对下,配置文件的编写和重构都比Java类简单得多这些注册的类个别是不须要在我的项目编译打包后还须要动静批改的2.为什么不必@annotation标注而后零碎发现并注册?因为annotation只能用来标注你能够批改的类,很多序列化的类是无奈批改的(第三方库,JDK零碎和其它我的项目的类)3.除了annotation,能够用其它形式来主动注册被序列化的类,如扫描门路,主动发现实现Serializable接口(甚至包含Externalizable)的类并注册,类门路上找到Serializable类可能十分多,能够用package前缀来肯定水平限定扫描范畴在主动注册机制中,要保障服务提供端和生产端以同样的程序(或者ID)来注册类,防止错位.因为可被发现而后注册的类的数量可能都是不一样的==留神:==(无参构造函数和Serializable接口)如果被序列化的类,不蕴含无参构造函数,则会导致Kryo序列化性能升高.因为底层将会应用Java的序列化来通明取代Kryo序列化.尽可能为每一个被序列化的类增加无参构造函数(Java类如果不自定义构造函数,默认就有无参构造函数)Kryo和FST都不须要被序列化类实现Serializable接口,但还是须要每个序列化类都去实现Serializable接口,放弃和Java序列化以及dubbo序列化兼容性 Dubbo+Hystrix实现服务熔断熔断器: 在微服务架构中,依据业务拆分成一个个的服务,服务服务之间通过RPC互相调用为了保障高可用,单个服务采纳集群部署,因为网络或者本身的起因,服务不能保障100%可用如果单个服务呈现问题,调用这个服务就会呈现呈现线程阻塞,此时若大量的申请涌入,servlet容器的线程就会被耗费结束,导致服务瘫痪,服务与服务之间的依赖性会导致故障流传,进而导致整个微服务瘫痪,这就是"服务雪崩效应"为了解决服务雪崩效应,提出熔断器的模型熔断器模型: 底层的服务呈现故障,会导致连锁故障当对特定服务调用的不可用达到一个阈值(Hystrix默认5秒20次),熔断器就会被关上熔断器关上后,为了防止连锁故障,通过fallback办法间接返回一个固定值 Dubbo Provider中应用熔断器在Provider(服务提供者)中减少依赖spring-cloud-starter-netflix-hystrix在主类中标注@EnableHystrix注解在接口实现类的服务调用办法上标注@HystrixCommand注解,调用Hystrix代理 能够在@HystrixCommand中的@HystrixProperty中配置阈值Dubbo Consumer中应用熔断器在Consumer(服务消费者)中减少依赖spring-cloud-starter-netflix-hystrix在主类上标注@EnableHystrix注解在调用类controller中的调用办法上标注 @HystrixCommand(fallback="熔断返回页面的办法名")Dubbo+Hystrix熔断器仪表盘在Provider和Consumer中都须要配置Hystrix仪表盘,配置形式统一 Dubbo+Hystrix配置熔断器仪表盘减少Hystrix仪表盘依赖spring-cloud-starter-netflix-hystrix-dashboard在主类上标注@EnableHystrixDashboard注解开启Hystrix仪表盘性能创立hystrix.stream(监控门路)的Servlet配置 @Configurationpublic class HystrixDashBoardConfiguration{ @Bean public ServletRegistrationBean getServlet(){ HystrixMetricsStreamServlet streamServlet=new HystrixMetricsStreamServlet(); ServletRegistrationBean registrationBean=new ServletRegistrationBean(streamServlet); registrationBean.setLoadOnStartup(1); registrationBean.addUrlMappings("/hystrix.stream"); registrationBea.setName("HystrixMetricsStreamServlet"); return registrationBean; }}Hystrix阐明触发fallback办法参数形容FAILURE执行抛出异样TIMEOUT执行开始,但没有在指定的工夫内实现SHORT_CIRCUITED断路器关上,不尝试执行THREAD_POOL_REJECTED线程池回绝,不尝试执行SEMAPHORE_REJECTED信号量回绝,不尝试执行fallback办法抛出异样参数形容FALLBACK_FAILUREFallback执行抛出出错FALLBACK_REJECTEDFallback信号量回绝,不尝试执行FallBack_MISSING没有Fallback实例Hystrix罕用配置信息超时工夫(默认1000ms)hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds: 在Consumer中配置,Provider的所有办法的超时工夫都是该值,优先级低于上面的指定配置hystrix.command.HystrixCommandKey.execution.isolation.thread.timeoutInMilliseconds: 在Consumer中配置,Provider的指定办法(HystrixCommandKey办法名)的超时工夫都是该值 ...

May 19, 2021 · 1 min · jiezi

关于hystrix:spring-cloud-hystrix-简易配置

个别和feign一块解决 <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-openfeign</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-hystrix</artifactId> </dependency>启用hystrix# 启用hystrixfeign.hystrix.enabled=trueService a 调用 service bpackage com.itheima.hystrix.servicea.agent;import feign.hystrix.FallbackFactory;import org.springframework.cloud.openfeign.FeignClient;import org.springframework.stereotype.Component;import org.springframework.web.bind.annotation.GetMapping;@FeignClient(name = "feign-hystrix-service-b",fallbackFactory = ServiceBAgentHystrix.class)public interface ServiceBAgent { @GetMapping("/service-b/service") String service();}@Componentclass ServiceBAgentHystrix implements FallbackFactory<ServiceBAgent>{ @Override public ServiceBAgent create(Throwable cause) { return new ServiceBAgent() { @Override public String service() { return "service-b熔断..."; } }; }}

December 29, 2020 · 1 min · jiezi

关于hystrix:Hystrix-如何解决-ThreadLocal-信息丢失

本文分享 ThreadLocal 遇到 Hystrix 时上下文信息传递的计划。 一、背景笔者在业务开发中波及到应用 ThreadLocal 来寄存上下文链路中一些要害信息,其中一些业务实现对外部接口依赖,对这些依赖接口应用了Hystrix作熔断爱护,但在应用Hystrix作熔断爱护的办法中发现了获取 ThreadLocal 信息与预期不统一问题,本文旨在探讨如何解决这一问题。 二、ThreadLocal在Java编程语言里ThreadLocal是用来不便开发人员在同一线程上下文中不同类、不同办法中共享信息的,ThreadLocal变量不受其余线程的影响,不同线程间互相隔离,也就是线程平安的。在理论的业务链路中从入口到具体的业务实现有时候须要共享某些通用信息,比方用户惟一标识、链路追踪惟一标识等,这些信息就能够应用ThreadLocal来存储实现,上面就是一个简略的同一链路中共享traceId的示例代码。 public class ThreadLocalUtil { private static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>(); public static void setTraceId(String traceId) { TRACE_ID.set(traceId); } public static String getTraceId() { return TRACE_ID.get(); } public static void clearTraceId() { TRACE_ID.remove(); }}三、Hystrix在分布式环境中,每个零碎所依赖的内部服务不可避免的会呈现失败或超时的状况,Hystrix 通过减少对依赖服务的延时容错及失败容错逻辑,也就是所谓的「熔断」,以帮忙开发人员去灵便管制所依赖的分布式服务。 Hystrix通过隔离服务间的拜访点,阻断服务间的级联故障,并提供降级选项,这一切都是为了提供零碎整体的健壮性,在大规模分布式服务中,零碎的健壮性尤其重要。Hystrix具体的介绍能够看:Hystrix介绍 四、ThreadLocal遇上Hystrix当业务链路中的具体实现有依赖内部服务,且作了相干熔断爱护,那么本文的两个配角就这么遇上了。 依据Hystrix的相干文档介绍咱们理解到,Hystrix提供两种线程隔离模式:信号量和线程池。 信号量模式下执行业务逻辑时处于同一线程上下文,而线程池模式则应用Hystrix提供的线程池去执行相干业务逻辑。在日常业务开发中更多须要熔断的是波及到内部网络IO调用的(如RPC调用),Hystrix存在的一个目标就是想缩小内部依赖的调用对服务容器线程的耗费,信号量模式显然不太适宜,因而咱们在绝大部分场景下应用的都是线程池模式,而Hystrix默认状况下启用的也是线程池模式。 本文想要解决的也正是在这种默认模式下才会有的问题: 1、InheritableThreadLocal有人可能会想到是不是能够用InheritableThreadLocal去解决? InheritableThreadLocal能够将以后线程中的线程变量信息共享到以后线程所创立的「子线程」中,但这边疏忽了一个很重要的信息,Hystrix中的线程模式底层应用的是本人保护的一个线程池,也就是其中的线程会呈现复用的状况,那么就会呈现每个线程所共享的信息都是之前首次获取到的「父线程」的共享信息,这显然不是咱们所期待的,所以InheritableThreadLocal被排除。 那么想要在Hystrix中解决这个问题怎么办? 优良的Hystrix曾经帮大家提供了相干解决方案,而且是插件化,按需定制。Hystrix的插件具体介绍请看这:Hystrix插件介绍,本文给大家介绍两种计划。 如何让ThreadLocal变量信息在HystrixCommand执行时能在Hystrix线程中正确的传递? 2、Concurrency Strategy应用 HystrixConcurrencyStrategy插件能够来包装Hystrix线程所执行的办法,具体间接看示例代码: public class MyHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy { @Override public <T> Callable<T> wrapCallable(Callable<T> callable) { String traceId = ThreadLocalUtil.getTraceId(); return () -> { ThreadLocalUtil.setTraceId(traceId); try { return callable.call(); } finally { ThreadLocalUtil.clearTraceId(); } }; }} // 业务代码中某处适合的中央注册下以后的策略插件HystrixPlugins.getInstance().registerConcurrencyStrategy(new MyHystrixConcurrencyStrategy());应用这种形式非常简单,只有开发人员将本人关注的ThreadLocal值进行「复制」即可,那是不是应用这种形式就行了? ...

November 10, 2020 · 3 min · jiezi

关于hystrix:第五阶段1030

ribbonribbon 提供了负载平衡和重试性能, 它底层是应用 RestTemplate 进行 Rest api 调用 RestTemplateRestTemplate是SpringBoot提供的一个Rest近程调用工具它的罕用办法:getForObject() 执行get申请postForObject() 执行post申请 ribbon负载平衡和重试 Ribbon负载平衡 RestTemplate 设置 @LoadBalanced@LoadBalanced 负载平衡注解,会对 RestTemplate 实例进行封装,创立动静代理对象,并切入(AOP)负载平衡代码,把申请散发到集群中的服务器 ribbon重试1.增加spring-retry依赖2.ribbon: MaxAutoRetriesNextServer: 2 MaxAutoRetries: 1 OkToRetryOnAllOperations: trueOkToRetryOnAllOperations=true默认只对GET申请重试, 当设置为true时, 对POST等所有类型申请都重试MaxAutoRetriesNextServer更换实例的次数MaxAutoRetries以后实例重试次数,尝试失败会更换下一个实例 Hystrix断路器微服务宕机时,ribbon无奈转发申请 增加hystrix依赖批改application.ymlstring: application: name: hystrix主程序增加 @EnableCircuitBreaker启用hystrix断路器启动断路器,断路器提供两个外围性能: 降级,超时、出错、不可达到时,对服务降级,返回错误信息或者是缓存数据熔断,当服务压力过大,谬误比例过多时,熔断所有申请,所有申请间接降级能够应用 @SpringCloudApplication 注解代替三个注解在contller中增加降级办法例如getItems()中增加getItemsFB()增加 @HystrixCommand 注解,指定降级办法名 hystrix超时设置hystrix期待超时后, 会执行降级代码, 疾速向客户端返回降级后果, 默认超时工夫是1000毫秒 为了测试 hystrix 降级,咱们把 hystrix 期待超时设置得十分小(500毫秒) 此设置个别应大于 ribbon 的重试超时时长,例如 10 秒hystrix: command: default: execution: isolation: thread: timeoutInMilliseconds: 500hystrix dashboard断路器仪表盘hystrix 对申请的降级和熔断,能够产生监控信息,hystrix dashboard能够实时的进行监控actuator 是 spring boot 提供的服务监控工具,提供了各种监控信息的监控端点 ...

October 30, 2020 · 1 min · jiezi

关于hystrix:Hystrix

Hystrix断路器零碎容错限流启动断路器,断路器提供两个外围性能: 降级,超时、出错、不可达到时,对服务降级,返回错误信息或者是缓存数据熔断,当服务压力过大,谬误比例过多时,熔断所有申请,所有申请间接降级降级调用近程服务失败(异样,服务不存在,超时),能够执行以后服务的一段代码,向客户端响应 返回谬误提醒返回缓存数据实现降级 1.增加 hystrx 依赖 <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-hystrix</artifactId></dependency>2.增加注解 @EnableCircuitBreaker3.近程调用代码上,增加 @HystrixCommand(fallbackMethod="降级办法名")4.实现降级办法 //降级办法的参数和返回值,须要和原始办法统一,办法名任意 public JsonResult<List<Item>> getItemsFB(String orderId) { return JsonResult.err("获取订单商品列表失败"); } public JsonResult decreaseNumberFB(List<Item> items) { return JsonResult.err("更新商品库存失败"); } public JsonResult<User> getUserFB(Integer userId) { return JsonResult.err("获取用户信息失败");} public JsonResult addScoreFB(Integer userId, Integer score) { return JsonResult.err("减少用户积分失败"); } public JsonResult<Order> getOrderFB(String orderId) { return JsonResult.err("获取订单失败"); } public JsonResult addOrderFB() { return JsonResult.err("增加订单失败"); }熔断非凡状况,在特定条件下触发 10秒内20次申请(必须首先满足)50%失败,执行降级代码熔断时,不会向后盾服务调用,而是间接执行以后服务的降级代码返回后果 断路器关上几秒后,会进入“半开状态”,会尝试向后盾服务发送调用,如果失败持续放弃关上状态;如果胜利断路器主动敞开,恢复正常 Hystrix 超时启用 Hystrix 后,它有默认的超时工夫:1秒 ...

September 25, 2020 · 1 min · jiezi

全栈之路微服务课程12Hystrix之初遇见

简介Hystrix是Netflix开源的一个延迟和容错库,用于隔离访问远程系统、服务或者第三方库,防止级联失败,从而提升系统的可用性与容错性。 断路器刨析实时监测应用,如果发现在一定时间内失败次数/失败率达到一定阈值,就“跳闸”,断路器打开——此时,请求直接返回,而不去调用原本调用的逻辑。跳闸一段时间后(例如15秒),断路器会进入半开状态,这是一个瞬间态,此时允许一次请求调用该调的逻辑,如果成功,则断路器关闭,应用正常调用;如果调用依然不成功,断路器继续回到打开状态,过段时间再进入半开状态尝试——通过”跳闸“,应用可以保护自己,而且避免浪费资源;而通过半开的设计,可实现应用的“自我修复“。 解决方案熔断模式 这种模式主要是参考电路熔断,如果一条线路电压过高,保险丝会熔断,防止火灾。放到我们的系统中,如果某个目标服务调用慢或者有大量超时,此时,熔断该服务的调用,对于后续调用请求,不在继续调用目标服务,直接返回,快速释放资源。如果目标服务情况好转则恢复调用。隔离模式这种模式就像对系统请求按类型划分成一个个小岛的一样,当某个小岛被火烧光了,不会影响到其他的小岛。例如可以对不同类型的请求使用线程池来资源隔离,每种类型的请求互不影响,如果一种类型的请求线程资源耗尽,则对后续的该类型请求直接返回,不再调用后续资源。这种模式使用场景非常多,例如将一个服务拆开,对于重要的服务使用单独服务器来部署,再或者公司最近推广的多中心。 限流模式上述的熔断模式和隔离模式都属于出错后的容错处理机制,而限流模式则可以称为预防模式。限流模式主要是提前对各个类型的请求设置最高的QPS阈值,若高于设置的阈值则对该请求直接返回,不再调用后续资源。这种模式不能解决服务依赖的问题,只能解决系统整体资源分配问题,因为没有被限流的请求依然有可能造成雪崩效应。 降级降级与熔断紧密相关,熔断后业务如何表现,约定一个快速失败的 Fallback,即为服务降级代码实现加依赖<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-hystrix</artifactId></dependency>启动类加注解@EnableCircuitBreaker 控制器加实现 测试http://localhost:8010/movies/usersByFeign/1 { "id": 1, "username": "默认用户", "name": "默认用户", "age": 0, "balance": 1}监控持续不断地访问http://localhost:8010/movies/users/1 多次(至少20次),再访问http://localhost:8010/actuator/health,返现断路器已被开启。 { "status": "UP", "details": { "diskSpace": { "status": "UP", "details": { "total": 107374178304, "free": 15773237248, "threshold": 10485760 } }, "db": { "status": "UP", "details": { "database": "MySQL", "hello": 1 } }, "refreshScope": { "status": "UP" }, "discoveryComposite": { "status": "UP", "details": { "discoveryClient": { "status": "UP", "details": { "services": [ "shop-discovery-eureka-ha", "shop-consumer-movie" ] } }, "eureka": { "description": "Remote status from Eureka server", "status": "UP", "details": { "applications": { "SHOP-DISCOVERY-EUREKA-HA": 1, "SHOP-CONSUMER-MOVIE": 1 } } } } }, "hystrix": { "status": "CIRCUIT_OPEN", "details": { "openCircuitBreakers": [ "MovieController::findByIdByFeign" ] } } }}

September 20, 2019 · 1 min · jiezi

全栈之路微服务课程13Feign之Hystrix

前言默认Feign是不启用Hystrix的,需要添加如下配置启用Hystrix,这样所有的Feign Client都会受到Hystrix保护! 新增配置feign: hystrix: enabled: true提供Fallback@FeignClient(name = "microservice-provider-user", fallback = UserFeignClientFallback.class)public interface UserFeignClient { @GetMapping("/users/{id}") User findById(@PathVariable("id") Long id);}@Componentclass UserFeignClientFallback implements UserFeignClient { @Override public User findById(Long id) { return new User(id, "默认用户", "默认用户", 0, new BigDecimal(1)); }}获取原因@FeignClient(name = "shop-provider-user", fallbackFactory = UserFeignClientFallbackFactory.class)public interface UserFeignClient { @GetMapping("/users/{id}") User findById(@PathVariable("id") Long id);}@Component@Slf4jclass UserFeignClientFallbackFactory implements FallbackFactory<UserFeignClient> { @Override public UserFeignClient create(Throwable throwable) { return new UserFeignClient() { @Override public User findById(Long id) { log.error("进入回退逻辑", throwable); return new User(id, "默认用户", "默认用户", 0, new BigDecimal(1)); } }; }}

September 20, 2019 · 1 min · jiezi

跟我学SpringCloud-第五篇熔断监控Hystrix-Dashboard和Turbine

SpringCloud系列教程 | 第五篇:熔断监控Hystrix Dashboard和TurbineSpringboot: 2.1.6.RELEASESpringCloud: Greenwich.SR1 如无特殊说明,本系列教程全采用以上版本 Hystrix-dashboard是一款针对Hystrix进行实时监控的工具,通过Hystrix Dashboard我们可以在直观地看到各Hystrix Command的请求响应时间, 请求成功率等数据。但是只使用Hystrix Dashboard的话, 你只能看到单个应用内的服务信息, 这明显不够。我们需要一个工具能让我们汇总系统内多个服务的数据并显示到Hystrix Dashboard上, 这个工具就是Turbine。 1. Hystrix Dashboard创建一个新的项目hystrix-dashboard,延用上一篇文章提到的eureka和producer两个项目。 1. hystrix-dashboard pom.xml 依赖包管理<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.6.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.springcloud</groupId> <artifactId>hystrix-dashboard</artifactId> <version>0.0.1-SNAPSHOT</version> <name>hystrix-dashboard</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> <spring-cloud.version>Greenwich.SR1</spring-cloud.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-hystrix</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-hystrix-dashboard</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-openfeign</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring-cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build></project>前面介绍过的包我这里不再多说,讲几个前面没有见过的包: ...

September 10, 2019 · 2 min · jiezi

跟我学SpringCloud-第四篇熔断器Hystrix

Springboot: 2.1.6.RELEASESpringCloud: Greenwich.SR1 如无特殊说明,本系列教程全采用以上版本 1. 熔断器服务雪崩在正常的微服务架构体系下,一个业务很少有只需要调用一个服务就可以返回数据的情况,这种比较常见的是出现在demo中,一般都是存在调用链的,比如A->B->C->D,如果D在某一个瞬间出现问题,比如网络波动,io偏高,导致卡顿,随着时间的流逝,后续的流量继续请求,会造成D的压力上升,有可能引起宕机。 你以为这就是结束么,图样图森破,这才是噩梦的开始,在同一个调用链上的ABC三个服务都会随着D的宕机而引发宕机,这还不是结束,一个服务不可能只有一个接口,当它开始卡顿宕机时,会影响到其他调用链的正常调用,最终导致所有的服务瘫痪。 如下图所示: 熔断器相信大家都知道家用电闸,原来老式的电闸是使用保险丝的(现在很多都是空气开关了),当家里用电量过大的时候,保险丝经常烧断,这么做是保护家里的用电器,防止过载。 熔断器的作用和这个很像,它可以实现快速失败,如果在一段时间内服务调用失败或者异常,会强制要求当前调用失败,不在走远程调用,走服务降级操作(返回固定数据或者其他一些降级操作)。从而防止应用程序不断地尝试执行可能会失败的操作,使得应用程序继续执行而不用等待修正错误,或者浪费CPU时间去等到长时间的超时产生。熔断器也可以自动诊断错误是否已经修正,如果已经修正,应用程序会再次尝试调用操作。 熔断器模式就像是那些容易导致错误的操作的一种代理。这种代理能够记录最近调用发生错误的次数,然后决定使用允许操作继续,或者立即返回错误。 Hystrix会有一个熔断时间窗口,具体转换逻辑如下: 熔断器就是保护服务高可用的最后一道防线。 2. Hystrix1. 断路器机制断路器很好理解, 当Hystrix Command请求后端服务失败数量超过一定比例(默认50%), 断路器会切换到开路状态(Open)。这时所有请求会直接失败而不会发送到后端服务。断路器保持在开路状态一段时间后(默认5秒), 自动切换到半开路状态(HALF-OPEN)。这时会判断下一次请求的返回情况, 如果请求成功, 断路器切回闭路状态(CLOSED), 否则重新切换到开路状态(OPEN)。Hystrix的断路器就像我们家庭电路中的保险丝, 一旦后端服务不可用, 断路器会直接切断请求链, 避免发送大量无效请求影响系统吞吐量, 并且断路器有自我检测并恢复的能力。 2. FallbackFallback相当于是降级操作。对于查询操作, 我们可以实现一个fallback方法, 当请求后端服务出现异常的时候, 可以使用fallback方法返回的值。fallback方法的返回值一般是设置的默认值或者来自缓存。 3. 资源隔离在Hystrix中, 主要通过线程池来实现资源隔离。通常在使用的时候我们会根据调用的远程服务划分出多个线程池。例如调用产品服务的Command放入A线程池, 调用账户服务的Command放入B线程池。这样做的主要优点是运行环境被隔离开了。这样就算调用服务的代码存在bug或者由于其他原因导致自己所在线程池被耗尽时, 不会对系统的其他服务造成影响。但是带来的代价就是维护多个线程池会对系统带来额外的性能开销。如果是对性能有严格要求而且确信自己调用服务的客户端代码不会出问题的话, 可以使用Hystrix的信号模式(Semaphores)来隔离资源。 3. Feign Hystrix上一篇我们使用了producer和consumers,熔断器是只作用在服务调用端,因此上一篇使用到的consumers我们可以直接拿来使用。因为,Feign中已经依赖了Hystrix所以在maven配置上不用做任何改动。 1. 配置文件application.yml新增server: port: 8081spring: application: name: spring-cloud-consumerseureka: client: service-url: defaultZone: http://localhost:8761/eureka/feign: hystrix: enabled: true其中新增了feign.hystrix.enabled = true 2. 创建fallback类,继承与HelloRemote实现回调的方法package com.springcloud.consumers.fallback;import com.springcloud.consumers.remote.HelloRemote;import org.springframework.stereotype.Component;import org.springframework.web.bind.annotation.RequestParam;/** * Created with IntelliJ IDEA. * * @User: weishiyao * @Date: 2019/7/2 * @Time: 23:14 * @email: inwsy@hotmail.com * Description: */@Componentpublic class HelloRemoteFallBack implements HelloRemote { @Override public String hello(@RequestParam(value = "name") String name) { return "hello " + name + ", i am fallback massage"; }}3. 添加fallback属性在HelloRemote类添加指定fallback类,在服务熔断的时候返回fallback类中的内容。 ...

September 10, 2019 · 1 min · jiezi

微服务架构之容错Hystrix

文章来源:http://www.liangsonghua.me作者介绍:京东资深工程师-梁松华,长期关注稳定性保障、敏捷开发、JAVA高级、微服务架构 一、容错的必要性假设单体应用可用率为99.99%,即使拆分后每个微服务的可用率还是保持在99.99%,总体的可用率还是下降的。因为凡是依赖都可能会失败,凡是资源都是有限制的,另外网络并不可靠。有可能一个很不起眼的微服务模块高延迟最后导致整体服务不可用 二、容错的基本模块1、主动超时,一般设置成2秒或者5秒超时时间2、服务降级,一般会降级成直接跳转到静态CDN托底页或者提示活动太火爆,以免开天窗3、限流,一般使用令牌机制限制最大并发数4、隔离,对不同依赖进行隔离,容器CPU绑核就是一种隔离措施5、弹性熔断,错误数达到一定阀值后,开始拒绝请求,健康检查发现恢复后再次接受请求三、Hystrix主要概念Hystrix流程 想要使用Hystrix,只需要继承HystrixCommand或者HystrixObservableCommand并重写业务逻辑方法即可,区别在于HystrixCommand.run()返回一个结果或者异常,HystrixObservableCommand.construct()返回一个Observable对象 编者按:关于反应式编程可参考文章Flux反应式编程结合多线程实现任务编排 Hystrix真正执行命令逻辑是通过execute()、queue()、observe()、toObservable()的其中一种,区别在于execute是同步阻塞的,queue通过myObservable.toList().toBlocking().toFuture()实现异步非阻塞,observe是事件注册前执行,toObservable是事件注册后执行,后两者是基于发布和订阅响应式的调用 每个熔断器默认维护10个bucket,每秒一个bucket,每个bucket记录成功,失败,超时,拒绝的状态,默认错误超过50%且10秒内超过20个请求才进行中断拦截。当断路器打开时,维护一个窗口,每过一个窗口时间,会放过一个请求以探测后端服务健康状态,如果已经恢复则断路器会恢复到关闭状态 当断路器打开、线程池提交任务被拒绝、信号量获得被拒绝、执行异常、执行超时任一情况发生都会触发降级fallBack,Hystrix提供两种fallBack方式,HystrixCommand.getFallback()和HystrixObservableCommand.resumeWithFallback() 四、线程和信号量隔离 1、线程隔离,针对不同的服务依赖创建线程池2、信号量隔离,本质是一个共享锁。当信号量中有可用的许可时,线程能获取该许可(seaphore.acquire()),否则线程必须等待,直到有可用的许可为止。线程用完必须释放(seaphore.release())否则其他线程永久等待 五、Hystrix主要配置项 六、使用1、请求上下文,下面将要提到的请求缓存、请求合并都依赖请求上下文,我们可以在拦截器中进行管理 public class HystrixRequestContextServletFilter implements Filter { public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { chain.doFilter(request, response); } finally { context.shutdown(); } }}2、请求缓存,减少相同参数请求后端服务的开销,需要重写getCacheKey方法返回缓存key public class CommandUsingRequestCache extends HystrixCommand<Boolean> { private final int value; protected CommandUsingRequestCache(int value) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.value = value; } @Override protected Boolean run() { return value == 0 || value % 2 == 0; } @Override protected String getCacheKey() { return String.valueOf(value); }}3、请求合并。请求合并在Nginx静态资源加载中也很常见,Nginx使用的是nginx-http-concat扩展模块。但是在Hystric中请求合并会导致延迟增加,所以要求两者启动执行间隔时长足够小,减少等待合并的时间,超过10ms间隔不会自动合并 ...

July 12, 2019 · 2 min · jiezi

微服务熔断限流Hystrix之流聚合

简介上一篇介绍了 Hystrix Dashboard 监控单体应用的例子,在生产环境中,监控的应用往往是一个集群,我们需要将每个实例的监控信息聚合起来分析,这就用到了 Turbine 工具。Turbine有一个重要的功能就是汇聚监控信息,并将汇聚到的监控信息提供给Hystrix Dashboard来集中展示和监控。 流程 实验工程说明工程名端口作用eureka-server8761注册中心service-hi8762服务提供者service-consumer8763服务消费者service-turbine8765Turbine服务核心代码eureka-server 、service-hi、service-consumer 工程代码与上一节 微服务熔断限流Hystrix之Dashboard 相同,下面是 service-turbine 工程的核心代码。 pom.xml<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-turbine</artifactId></dependency><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-hystrix</artifactId></dependency><dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-hystrix-dashboard</artifactId></dependency>application.ymlserver: port: 8765spring: application: name: service-turbineeureka: client: service-url: defaultZone: http://localhost:8761/eureka/turbine: app-config: service-consumer cluster-name-expression: new String("default") combine-host-port: true参数说明: turbine.app-config:指定要监控的应用名turbine.cluster-name-expression:指定集群的名字turbine.combine-host-port:表示同一主机上的服务通过host和port的组合来进行区分,默认情况下是使用host来区分,这样会使本地调试有问题启动类@SpringBootApplication@EnableEurekaClient@EnableHystrixDashboard@EnableTurbinepublic class ServiceTurbineApplication { public static void main(String[] args) { SpringApplication.run( ServiceTurbineApplication.class, args ); }}模拟多实例启动多个 service-consumer 工程,来模拟多实例,可以通过命令java -jar service-consumer.jar --server.port=XXXX 来实现。 为了方便,在编辑器中实现启动工程。但 idea 不支持单个应用的多次启动, 需要开启并行启动: ...

May 9, 2019 · 1 min · jiezi

微服务熔断限流Hystrix之Dashboard

简介Hystrix Dashboard是一款针对Hystrix进行实时监控的工具,通过Hystrix Dashboard可以直观地看到各Hystrix Command的请求响应时间,请求成功率等数据。 快速上手工程说明工程名端口作用eureka-server8761注册中心service-hi8762服务提供者service-consumer8763服务消费者核心代码eureka-server 工程pom.xml<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId></dependency><dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId></dependency>application.ymlserver: port: 8761eureka: instance: hostname: localhost client: register-with-eureka: false fetch-registry: false service-url: defaultZone: http://${eureka.instance.hostname}:/${server.port}/eureka/spring: application: name: eureka-server启动类@SpringBootApplication@EnableEurekaServerpublic class EurekaServerApplication { public static void main(String[] args) { SpringApplication.run( EurekaServerApplication.class, args ); }}service-hi 工程pom.xml<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId></dependency><dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency>application.ymlserver: port: 8762spring: application: name: service-hieureka: client: service-url: defaultZone: http://localhost:8761/eureka/HelloController@RestControllerpublic class HelloController { @GetMapping("/hi") public String hi() { return "hello ~"; } @GetMapping("/hey") public String hey() { return "hey ~"; } @GetMapping("/oh") public String oh() { return "ah ~"; } @GetMapping("/ah") public String ah() { //模拟接口1/3的概率超时 Random rand = new Random(); int randomNum = rand.nextInt(3) + 1; if (3 == randomNum) { try { Thread.sleep( 3000 ); } catch (InterruptedException e) { e.printStackTrace(); } } return "来了老弟~"; }}启动类@SpringBootApplication@EnableEurekaClientpublic class ServiceHiApplication { public static void main(String[] args) { SpringApplication.run( ServiceHiApplication.class, args ); }}service-consumer 工程pom.xml<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId></dependency><dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-ribbon</artifactId></dependency><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-hystrix</artifactId></dependency><dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-hystrix-dashboard</artifactId></dependency>application.ymlserver: port: 8763 tomcat: uri-encoding: UTF-8 max-threads: 1000 max-connections: 20000spring: application: name: service-consumereureka: client: service-url: defaultZone: http://localhost:8761/eureka/management: endpoints: web: exposure: include: "*" cors: allowed-origins: "*" allowed-methods: "*"HelloService@Servicepublic class HelloService { @Autowired private RestTemplate restTemplate; /** * 简单用法 */ @HystrixCommand public String hiService() { return restTemplate.getForObject("http://SERVICE-HI/hi" , String.class); } /** * 定制超时 */ @HystrixCommand(commandProperties = { @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "30000") }) public String heyService() { return restTemplate.getForObject("http://SERVICE-HI/hey" , String.class); } /** * 定制降级方法 */ @HystrixCommand(fallbackMethod = "getFallback") public String ahService() { return restTemplate.getForObject("http://SERVICE-HI/ah" , String.class); } /** * 定制线程池隔离策略 */ @HystrixCommand(fallbackMethod = "getFallback", threadPoolKey = "studentServiceThreadPool", threadPoolProperties = { @HystrixProperty(name="coreSize", value="30"), @HystrixProperty(name="maxQueueSize", value="50") } ) public String ohService() { return restTemplate.getForObject("http://SERVICE-HI/oh" , String.class); } public String getFallback() { return "Oh , sorry , error !"; }}HelloController@RestControllerpublic class HelloController { @Autowired private HelloService helloService; @GetMapping("/hi") public String hi() { return helloService.hiService(); } @GetMapping("/hey") public String hey() { return helloService.heyService(); } @GetMapping("/oh") public String oh() { return helloService.ohService(); } @GetMapping("/ah") public String ah() { return helloService.ahService(); }}启动类@SpringBootApplication@EnableEurekaClient@EnableHystrixDashboard@EnableHystrix@EnableCircuitBreakerpublic class ServiceConsumerApplication { public static void main(String[] args) { SpringApplication.run( ServiceConsumerApplication.class, args ); } @LoadBalanced @Bean public RestTemplate restTemplate() { return new RestTemplate(); }}Hystrix Dashboard 的使用JSON格式监控信息先访问http://localhost:8762/hi 再打开http://localhost:8763/actuator/hystrix.stream,可以看到一些具体的数据: ...

May 6, 2019 · 2 min · jiezi

微服务容错限流Hystrix入门

为什么需要容错限流复杂分布式系统通常有很多依赖,如果一个应用不能对来自依赖 故障进行隔离,那么应用本身就处在被拖垮的风险中。在一个高流量的网站中,某个单一后端一旦发生延迟,将会在数秒内导致 所有应用资源被耗尽(一个臭鸡蛋影响一篮筐)。如秒杀、抢购、双十一等场景,在某一时间点会有爆发式的网络流量涌入,如果没有好的网络流量限制,任由流量压到后台服务实例,很有可能造成资源耗尽,服务无法响应,甚至严重的导致应用崩溃。Hystrix是什么Hystrix 能使你的系统在出现依赖服务失效的时候,通过隔离系统所依赖的服务,防止服务级联失败,同时提供失败回退机制,更优雅地应对失效,并使你的系统能更快地从异常中恢复。 Hystrix能做什么在通过第三方客户端访问(通常是通过网络)依赖服务出现高延迟或者失败时,为系统提供保护和控制在分布式系统中防止级联失败快速失败(Fail fast)同时能快速恢复提供失败回退(Fallback)和优雅的服务降级机制提供近似实时的监控、报警和运维控制手段Hystrix设计原则防止单个依赖耗尽容器(例如 Tomcat)内所有用户线程降低系统负载,对无法及时处理的请求快速失败(fail fast)而不是排队提供失败回退,以在必要时让失效对用户透明化使用隔离机制(例如『舱壁』/『泳道』模式,熔断器模式等)降低依赖服务对整个系统的影响针对系统服务的度量、监控和报警,提供优化以满足近似实时性的要求在 Hystrix 绝大部分需要动态调整配置并快速部署到所有应用方面,提供优化以满足快速恢复的要求能保护应用不受依赖服务的整个执行过程中失败的影响,而不仅仅是网络请求Hystrix设计思想来源舱壁隔离模式货船为了进行防止漏水和火灾的扩散,会将货仓分隔为多个,当发生灾害时,将所在货仓进行隔离就可以降低整艘船的风险。 断路器模式熔断器就像家里的保险丝,当电流过载了就会跳闸,不过Hystrix的熔断机制相对复杂一些。 熔断器开关由关闭到打开的状态转换是通过当前服务健康状况和设定阈值比较决定的. 当熔断器开关关闭时,请求被允许通过熔断器。如果当前健康状况高于设定阈值,开关继续保持关闭。如果当前健康状况低于设定阈值,开关则切换为打开状态。当熔断器开关打开时,请求被禁止通过。当熔断器开关处于打开状态,经过一段时间后,熔断器会自动进入半开状态,这时熔断器只允许一个请求通过。当该请求调用成功时,熔断器恢复到关闭状态。若该请求失败,熔断器继续保持打开状态, 接下来的请求被禁止通过。Hystrix工作流程官网原图 中文版 流程说明每次调用创建一个新的HystrixCommand,把依赖调用封装在run()方法中.执行execute()/queue做同步或异步调用.当前调用是否已被缓存,是则直接返回结果,否则进入步骤 4判断熔断器(circuit-breaker)是否打开,如果打开跳到步骤 8,进行降级策略,如果关闭进入步骤 5判断线程池/队列/信号量是否跑满,如果跑满进入降级步骤8,否则继续后续步骤 6调用HystrixCommand的run方法.运行依赖逻辑 6.1. 调用是否出现异常,否:继续,是进入步骤8,6.2. 调用是否超时,否:返回调用结果,是进入步骤8搜集5、6步骤所有的运行状态(成功, 失败, 拒绝,超时)上报给熔断器,用于统计从而判断熔断器状态getFallback()降级逻辑.四种触发getFallback调用情况(图中步骤8的箭头来源):返回执行成功结果 两种资源隔离模式线程池隔离模式使用一个线程池来存储当前的请求,线程池对请求作处理,设置任务返回处理超时时间,堆积的请求堆积入线程池队列。这种方式需要为每个依赖的服务申请线程池,有一定的资源消耗,好处是可以应对突发流量(流量洪峰来临时,处理不完可将数据存储到线程池队里慢慢处理)。 信号量隔离模式使用一个原子计数器(或信号量)来记录当前有多少个线程在运行,请求来先判断计数器的数值,若超过设置的最大线程个数则丢弃改类型的新请求,若不超过则执行计数操作请求来计数器+1,请求返回计数器-1。这种方式是严格的控制线程且立即返回模式,无法应对突发流量(流量洪峰来临时,处理的线程超过数量,其他的请求会直接返回,不继续去请求依赖的服务)。 线程池隔离模式 VS 信号量隔离模式 Hystrix主要配置项 快速上手pom.xml<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId></dependency><dependency> <groupId>com.netflix.hystrix</groupId> <artifactId>hystrix-core</artifactId> <version>1.5.12</version></dependency><dependency> <groupId>com.netflix.hystrix</groupId> <artifactId>hystrix-metrics-event-stream</artifactId> <version>1.5.12</version></dependency><dependency> <groupId>com.netflix.hystrix</groupId> <artifactId>hystrix-javanica</artifactId> <version>1.5.12</version></dependency>HystrixConfig@Configurationpublic class HystrixConfig { /** * 声明一个HystrixCommandAspect代理类,现拦截HystrixCommand的功能 */ @Bean public HystrixCommandAspect hystrixCommandAspect() { return new HystrixCommandAspect(); }}HelloService@Servicepublic class HelloService { @HystrixCommand(fallbackMethod = "helloError", commandProperties = { @HystrixProperty(name = "execution.isolation.strategy", value = "THREAD"), @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "1000"), @HystrixProperty(name = "circuitBreaker.enabled", value = "true"), @HystrixProperty(name = "circuitBreaker.requestVolumeThreshold", value = "2")}, threadPoolProperties = { @HystrixProperty(name = "coreSize", value = "5"), @HystrixProperty(name = "maximumSize", value = "5"), @HystrixProperty(name = "maxQueueSize", value = "10") }) public String sayHello(String name) { try { Thread.sleep( 15000 ); return "Hello " + name + " !"; } catch (InterruptedException e) { e.printStackTrace(); } return null; } public String helloError(String name) { return "服务器繁忙,请稍后访问~"; }}启动类@SpringBootApplication@RestControllerpublic class HystrixSimpleApplication { @Autowired private HelloService helloService; public static void main(String[] args) { SpringApplication.run( HystrixSimpleApplication.class, args ); } @GetMapping("/hi") public String hi(String name) { return helloService.sayHello( name ); }}测试访问 http://localhost:80809/hi?name=zhangsan ...

April 26, 2019 · 1 min · jiezi

翻译:Hystrix - How To Use

转载请注明出处: 翻译:Hystrix - How To UseHello World!下面的代码展示了HystrixCommand版的Hello World:public class CommandHelloWorld extends HystrixCommand<String> { private final String name; public CommandHelloWorld(String name) { super(HystrixCommandGroupKey.Factory.asKey(“ExampleGroup”)); this.name = name; } @Override protected String run() { // a real example would do work like a network call here return “Hello " + name + “!”; }}查看源码HystrixObservableCommand的同等实现如下:public class CommandHelloWorld extends HystrixObservableCommand<String> { private final String name; public CommandHelloWorld(String name) { super(HystrixCommandGroupKey.Factory.asKey(“ExampleGroup”)); this.name = name; } @Override protected Observable<String> construct() { return Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> observer) { try { if (!observer.isUnsubscribed()) { // a real example would do work like a network call here observer.onNext(“Hello”); observer.onNext(name + “!”); observer.onCompleted(); } } catch (Exception e) { observer.onError(e); } } } ).subscribeOn(Schedulers.io()); }}Synchronous Execution可以通过调用HystrixCommand.execute()方法实现同步执行, 示例如下:String s = new CommandHelloWorld(“World”).execute();测试如下: @Test public void testSynchronous() { assertEquals(“Hello World!”, new CommandHelloWorld(“World”).execute()); assertEquals(“Hello Bob!”, new CommandHelloWorld(“Bob”).execute()); }HystrixObservableCommand不提供同步执行方法, 但是如果确定其只会产生一个值, 那么也可以用如下方式实现:HystrixObservableCommand.observe().observe().toBlocking().toFuture().get()HystrixObservableCommand.toObservable().observe().toBlocking().toFuture().get()如果实际上产生了多个值, 上述的代码将会抛出java.lang.IllegalArgumentException: Sequence contains too many elements.Asynchronous Execution可以通过调用HystrixCommand.queue()方法实现异步执行, 示例如下:Future<String> fs = new CommandHelloWorld(“World”).queue();此时可以通过Future.get()方法获取command执行结果:String s = fs.get();测试代码如下: @Test public void testAsynchronous1() throws Exception { assertEquals(“Hello World!”, new CommandHelloWorld(“World”).queue().get()); assertEquals(“Hello Bob!”, new CommandHelloWorld(“Bob”).queue().get()); } @Test public void testAsynchronous2() throws Exception { Future<String> fWorld = new CommandHelloWorld(“World”).queue(); Future<String> fBob = new CommandHelloWorld(“Bob”).queue(); assertEquals(“Hello World!”, fWorld.get()); assertEquals(“Hello Bob!”, fBob.get()); }下面的两种实现是等价的:String s1 = new CommandHelloWorld(“World”).execute();String s2 = new CommandHelloWorld(“World”).queue().get();HystrixObservableCommand不提供queue方法, 但是如果确定其只会产生一个值, 那么也可以用如下方式实现:HystrixObservableCommand.observe().observe().toBlocking().toFuture()HystrixObservableCommand.toObservable().observe().toBlocking().toFuture()如果实际上产生了多个值, 上述的代码将会抛出java.lang.IllegalArgumentException: Sequence contains too many elements.Reactive Execution你也可以将HystrixCommand当做一个可观察对象(Observable)来观察(Observe)其产生的结果, 可以使用以下任意一个方法实现:observe(): 一旦调用该方法, 请求将立即开始执行, 其利用ReplaySubject特性可以保证不会丢失任何command产生的结果, 即使结果在你订阅之前产生的也不会丢失.toObservable(): 调用该方法后不会立即执行请求, 而是当有订阅者订阅时才会执行.Observable<String> ho = new CommandHelloWorld(“World”).observe();// or Observable<String> co = new CommandHelloWorld(“World”).toObservable();然后你可以通过订阅到这个Observable来取得command产生的结果:ho.subscribe(new Action1<String>() { @Override public void call(String s) { // value emitted here }});测试如下:@Testpublic void testObservable() throws Exception { Observable<String> fWorld = new CommandHelloWorld(“World”).observe(); Observable<String> fBob = new CommandHelloWorld(“Bob”).observe(); // blocking assertEquals(“Hello World!”, fWorld.toBlockingObservable().single()); assertEquals(“Hello Bob!”, fBob.toBlockingObservable().single()); // non-blocking // - this is a verbose anonymous inner-class approach and doesn’t do assertions fWorld.subscribe(new Observer<String>() { @Override public void onCompleted() { // nothing needed here } @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onNext(String v) { System.out.println(“onNext: " + v); } }); // non-blocking // - also verbose anonymous inner-class // - ignore errors and onCompleted signal fBob.subscribe(new Action1<String>() { @Override public void call(String v) { System.out.println(“onNext: " + v); } });}使用Java 8的Lambda表达式可以使代码更简洁: fWorld.subscribe((v) -> { System.out.println(“onNext: " + v); }) // - or while also including error handling fWorld.subscribe((v) -> { System.out.println(“onNext: " + v); }, (exception) -> { exception.printStackTrace(); })关于Observable的信息可以在这里查阅Reactive Commands相比将HystrixCommand使用上述方法转换成一个Observable, 你也可以选择创建一个HystrixObservableCommand对象. HystrixObservableCommand包装的Observable允许产生多个结果(译者注: Subscriber.onNext可以调用多次), 而HystrixCommand即使转换成了Observable也只能产生一个结果.使用HystrixObservableCommnad时, 你需要重载construct方法来实现你的业务逻辑, 而不是重载run方法, contruct方法将会返回你需要包装的Observable.使用下面任意一个方法可以从HystrixObservableCommand中获取Observable对象:observe(): 一旦调用该方法, 请求将立即开始执行, 其利用ReplaySubject特性可以保证不会丢失任何command产生的结果, 即使结果在你订阅之前产生的也不会丢失.toObservable(): 调用该方法后不会立即执行请求, 而是当有订阅者订阅时才会执行.Fallback大多数情况下, 我们都希望command在执行失败时能够有一个候选方法来处理, 如: 返回一个默认值或执行其他失败处理逻辑, 除了以下几个情况:执行写操作的command: 当command的目标是执行写操作而不是读操作, 那么通常需要将写操作失败的错误交给调用者处理.批处理系统/离线计算: 如果command的目标是做一些离线计算、生成报表、填充缓存等, 那么同样应该将失败交给调用者处理.无论command是否实现了getFallback()方法, command执行失败时, Hystrix的状态和断路器(circuit-breaker)的状态/指标都会进行更新.HystrixCommand可以通过实现getFallback()方法来实现降级处理, run()方法异常、执行超时、线程池或信号量已满拒绝提供服务、断路器短路时, 都会调用getFallback():public class CommandHelloFailure extends HystrixCommand<String> { private final String name; public CommandHelloFailure(String name) { super(HystrixCommandGroupKey.Factory.asKey(“ExampleGroup”)); this.name = name; } @Override protected String run() { throw new RuntimeException(“this command always fails”); } @Override protected String getFallback() { return “Hello Failure " + name + “!”; }}查看源码这个命令的run()方法总是会执行失败, 但是调用者总是能收到getFallback()方法返回的值, 而不是收到一个异常: @Test public void testSynchronous() { assertEquals(“Hello Failure World!”, new CommandHelloFailure(“World”).execute()); assertEquals(“Hello Failure Bob!”, new CommandHelloFailure(“Bob”).execute()); }HystrixObservableCommand可以通过重载resumeWithFallback方法实现原Observable执行失败时返回回另一个Observable, 需要注意的是, 原Observable有可能在发出多个结果之后才出现错误, 因此在fallback实现的逻辑中不应该假设订阅者只会收到失败逻辑中发出的结果.Hystrix内部使用了RxJava的onErrorResumeNext操作符来实现Observable之间的无缝转移.Error Propagation除HystrixBadRequestException异常外, run方法中抛出的所有异常都会被认为是执行失败且会触发getFallback()方法和断路器的逻辑.你可以在HystrixBadRequestException中包装想要抛出的异常, 然后通过getCause()方法获取. HystrixBadRequestException使用在不应该被错误指标(failure metrics)统计和不应该触发getFallback()方法的场景, 例如报告参数不合法或者非系统异常等.对于HystrixObservableCommand, 不可恢复的错误都会在通过onError方法通知, 并通过获取用户实现的resumeWithFallback()方法返回的Observable来完成回退机制.执行异常类型Failure TypeException classException.causeFAILUREHystrixRuntimeExceptionunderlying exception(user-controlled)TIMEOUTHystrixRuntimeExceptionj.u.c.TimeoutExceptionSHORT_CIRCUITEDHystrixRuntimeExceptionj.l.RuntimeExceptionTHREAD_POOL_REJECTEDHystrixRuntimeExceptionj.u.c.RejectedExecutionExceptionSEMAPHORE_REJECTEDHystrixRuntimeExceptionj.l.RuntimeExceptionBAD_REQUESTHystrixBadRequestExceptionunderlying exception(user-controller)Command Name默认的command name是从类名中派生的:getClass().getSimpleName()可以通过HystrixCommand或HystrixObservableCommand的构造器来指定command name: public CommandHelloWorld(String name) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(“ExampleGroup”)) .andCommandKey(HystrixCommandKey.Factory.asKey(“HelloWorld”))); this.name = name; }可以通过如下方式来重用Setter: private static final Setter cachedSetter = Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(“ExampleGroup”)) .andCommandKey(HystrixCommandKey.Factory.asKey(“HelloWorld”)); public CommandHelloWorld(String name) { super(cachedSetter); this.name = name; }HystrixCommandKey是一个接口, 因此可以将其实现为一个枚举或者常规的类, 但是它已经内置了一个Factory类来构建帮助构建内部实例, 使用方式如下:HystrixCommandKey.Factory.asKey(“Hello World”);Command GroupHystrix使用command group来为分组, 分组信息主要用于报告、警报、仪表盘上显示, 或者是标识团队/库的拥有者.默认情况下, 除非已经用这个名字定义了一个信号量, 否则 Hystrix将使用这个名称来定义command的线程池.HystrixCommandGroupKey是一个接口, 因此可以将其实现为一个枚举或者常规的类, 但是它已经内置了一个Factory类来构建帮助构建内部实例, 使用方式如下:HystrixCommandGroupKey.Factory.asKey(“Example Group”)Command Thread-poolthread-pool key主要用于在监控、指标发布、缓存等类似场景中标识一个HystrixThreadPool, 一个HystrixCommand于其构造函数中传入的HystrixThreadPoolKey指定的HystrixThreadPool相关联, 如果未指定的话, 则使用HystrixCommandGroupKey来获取/创建HystrixThreadPool.可以通过HystrixCommand或HystrixObservableCommand的构造器来指定其值: public CommandHelloWorld(String name) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(“ExampleGroup”)) .andCommandKey(HystrixCommandKey.Factory.asKey(“HelloWorld”)) .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(“HelloWorldPool”))); this.name = name; }HystrixCommandThreadPoolKey是一个接口, 因此可以将其实现为一个枚举或者常规的类, 但是它已经内置了一个Factory类来构建帮助构建内部实例, 使用方式如下:HystrixThreadPoolKey.Factory.asKey(“Hello World Pool”)使用HystrixThreadPoolKey而不是使用不同的HystrixCommandGroupKey的原因是: 可能会有多条command在逻辑功能上属于同一个组(group), 但是其中的某些command需要和其他command隔离开, 例如:两条用于访问视频元数据的command两条command的group name都是VideoMetadatacommand A与资源#1互斥command B与资源#2互斥如果command A由于延迟等原因导致其所在的线程池资源耗尽, 不应该影响command B对#2的执行, 因为他们访问的是不同的后端资源.因此, 从逻辑上来说, 我们希望这两条command应该被分到同一个分组, 但是我们同样系统将这两条命令的执行隔离开来, 因此我们使用HystrixThreadPoolKey将其分配到不同的线程池.Request Cache可以通过实现HystrixCommand或HystrixObservableCommand的getCacheKey()方法开启用对请求的缓存功能:public class CommandUsingRequestCache extends HystrixCommand<Boolean> { private final int value; protected CommandUsingRequestCache(int value) { super(HystrixCommandGroupKey.Factory.asKey(“ExampleGroup”)); this.value = value; } @Override protected Boolean run() { return value == 0 || value % 2 == 0; } @Override protected String getCacheKey() { return String.valueOf(value); }}由于该功能依赖于请求的上下文信息, 因此我们必须初始化一个HystrixRequestContext, 使用方式如下: @Test public void testWithoutCacheHits() { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { assertTrue(new CommandUsingRequestCache(2).execute()); assertFalse(new CommandUsingRequestCache(1).execute()); assertTrue(new CommandUsingRequestCache(0).execute()); assertTrue(new CommandUsingRequestCache(58672).execute()); } finally { context.shutdown(); } }通常情况下, 上下文信息(HystrixRequestContext)应该在持有用户请求的ServletFilter或者其他拥有生命周期管理功能的类来初始化和关闭.下面的例子展示了command如何从缓存中获取数据, 以及如何查询一个数据是否是从缓存中获取到的: @Test public void testWithCacheHits() { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { CommandUsingRequestCache command2a = new CommandUsingRequestCache(2); CommandUsingRequestCache command2b = new CommandUsingRequestCache(2); assertTrue(command2a.execute()); // this is the first time we’ve executed this command with // the value of “2” so it should not be from cache assertFalse(command2a.isResponseFromCache()); assertTrue(command2b.execute()); // this is the second time we’ve executed this command with // the same value so it should return from cache assertTrue(command2b.isResponseFromCache()); } finally { context.shutdown(); } // start a new request context context = HystrixRequestContext.initializeContext(); try { CommandUsingRequestCache command3b = new CommandUsingRequestCache(2); assertTrue(command3b.execute()); // this is a new request context so this // should not come from cache assertFalse(command3b.isResponseFromCache()); } finally { context.shutdown(); } }Request Collapsing请求合并可以用于将多条请求绑定到一起, 由同一个HystrixCommand实例执行.collapser可以通过batch size和batch创建以来的耗时来自动将请求合并执行.Hystrix支持两个请求合并方式: 请求级的合并和全局级的合并. 默认是请求范围的合并, 可以在构造collapser时指定值.请求级(request-scoped)的collapser只会合并每一个HystrixRequestContext中的请求, 而全局级(globally-scoped)的collapser则可以跨HystrixRequestContext合并请求. 因此, 如果你下游的依赖者无法再一个command中处理多个HystrixRequestContext的话, 那么你应该使用请求级的合并.在Netflix, 我们只会使用请求级的合并, 因为我们当前所有的系统都是基于一个command对应一个HystrixRequestContext的设想下构建的. 因此, 当一个command使用不同的参数在一个请求中并发执行时, 合并是有效的.下面的代码展示了如何实现请求级的HystrixCollapser:public class CommandCollapserGetValueForKey extends HystrixCollapser<List<String>, String, Integer> { private final Integer key; public CommandCollapserGetValueForKey(Integer key) { this.key = key; } @Override public Integer getRequestArgument() { return key; } @Override protected HystrixCommand<List<String>> createCommand(final Collection<CollapsedRequest<String, Integer>> requests) { return new BatchCommand(requests); } @Override protected void mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, Integer>> requests) { int count = 0; for (CollapsedRequest<String, Integer> request : requests) { request.setResponse(batchResponse.get(count++)); } } private static final class BatchCommand extends HystrixCommand<List<String>> { private final Collection<CollapsedRequest<String, Integer>> requests; private BatchCommand(Collection<CollapsedRequest<String, Integer>> requests) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(“ExampleGroup”)) .andCommandKey(HystrixCommandKey.Factory.asKey(“GetValueForKey”))); this.requests = requests; } @Override protected List<String> run() { ArrayList<String> response = new ArrayList<String>(); for (CollapsedRequest<String, Integer> request : requests) { // artificial response for each argument received in the batch response.add(“ValueForKey: " + request.getArgument()); } return response; } }}下面的代码展示了如果使用collapser自动合并4个CommandCollapserGetValueForKey到一个HystrixCommand中执行:@Testpublic void testCollapser() throws Exception { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { Future<String> f1 = new CommandCollapserGetValueForKey(1).queue(); Future<String> f2 = new CommandCollapserGetValueForKey(2).queue(); Future<String> f3 = new CommandCollapserGetValueForKey(3).queue(); Future<String> f4 = new CommandCollapserGetValueForKey(4).queue(); assertEquals(“ValueForKey: 1”, f1.get()); assertEquals(“ValueForKey: 2”, f2.get()); assertEquals(“ValueForKey: 3”, f3.get()); assertEquals(“ValueForKey: 4”, f4.get()); // assert that the batch command ‘GetValueForKey’ was in fact // executed and that it executed only once assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); HystrixCommand<?> command = HystrixRequestLog.getCurrentRequest().getExecutedCommands().toArray(new HystrixCommand<?>[1])[0]; // assert the command is the one we’re expecting assertEquals(“GetValueForKey”, command.getCommandKey().name()); // confirm that it was a COLLAPSED command execution assertTrue(command.getExecutionEvents().contains(HystrixEventType.COLLAPSED)); // and that it was successful assertTrue(command.getExecutionEvents().contains(HystrixEventType.SUCCESS)); } finally { context.shutdown(); }}Request Context Setup使用请求级的特性时(如: 请求缓存、请求合并、请求日志)你必须管理HystrixRequestContext的生命周期(或者实现HystrixConcurrencyStategy).这意味着你必须在请求之前执行如下代码:HystrixRequestContext context = HystrixRequestContext.initializeContext();并在请求结束后执行如下代码:context.shutdown();在标准的Java web应用中, 你可以使用Setvlet Filter实现的如下的过滤器来管理:public class HystrixRequestContextServletFilter implements Filter { public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { chain.doFilter(request, response); } finally { context.shutdown(); } }}可以在web.xml中加入如下代码实现对所有的请求都使用该过滤器: <filter> <display-name>HystrixRequestContextServletFilter</display-name> <filter-name>HystrixRequestContextServletFilter</filter-name> <filter-class>com.netflix.hystrix.contrib.requestservlet.HystrixRequestContextServletFilter</filter-class> </filter> <filter-mapping> <filter-name>HystrixRequestContextServletFilter</filter-name> <url-pattern>/*</url-pattern> </filter-mapping>Common Patterns以下是HystrixCommand和HystrixObservableCommand的一般用法和使用模式.Fail Fast最基本的使用是执行一条只做一件事情且没有实现回退方法的command, 这样的command在发生任何错误时都会抛出异常:public class CommandThatFailsFast extends HystrixCommand<String> { private final boolean throwException; public CommandThatFailsFast(boolean throwException) { super(HystrixCommandGroupKey.Factory.asKey(“ExampleGroup”)); this.throwException = throwException; } @Override protected String run() { if (throwException) { throw new RuntimeException(“failure from CommandThatFailsFast”); } else { return “success”; } }下面的代码演示了上述行为:@Testpublic void testSuccess() { assertEquals(“success”, new CommandThatFailsFast(false).execute());}@Testpublic void testFailure() { try { new CommandThatFailsFast(true).execute(); fail(“we should have thrown an exception”); } catch (HystrixRuntimeException e) { assertEquals(“failure from CommandThatFailsFast”, e.getCause().getMessage()); e.printStackTrace(); }}HystrixObservableCommand需要重载resumeWithFallback()方法来实现同样的行为: @Override protected Observable<String> resumeWithFallback() { if (throwException) { return Observable.error(new Throwable(“failure from CommandThatFailsFast”)); } else { return Observable.just(“success”); } }Fail Silent静默失败等同于返回一个空的响应或者移除功能. 可以是返回null、空Map、空List, 或者其他类似的响应.可以通过实现HystrixCommand.getFallback()方法实现该功能:public class CommandThatFailsSilently extends HystrixCommand<String> { private final boolean throwException; public CommandThatFailsSilently(boolean throwException) { super(HystrixCommandGroupKey.Factory.asKey(“ExampleGroup”)); this.throwException = throwException; } @Override protected String run() { if (throwException) { throw new RuntimeException(“failure from CommandThatFailsFast”); } else { return “success”; } } @Override protected String getFallback() { return null; }}@Testpublic void testSuccess() { assertEquals(“success”, new CommandThatFailsSilently(false).execute());}@Testpublic void testFailure() { try { assertEquals(null, new CommandThatFailsSilently(true).execute()); } catch (HystrixRuntimeException e) { fail(“we should not get an exception as we fail silently with a fallback”); }}或者返回一个空List的实现如下: @Override protected List<String> getFallback() { return Collections.emptyList(); }HystrixObservableCommand可以通过重载resumeWithFallback()方法实现同样的行为: @Override protected Observable<String> resumeWithFallback() { return Observable.empty(); }Fallback: StaticFallback可以返回代码里设定的默认值, 这种方式可以通过默认行为来有效避免于静默失败带来影响.例如, 如果一个应返回true/false的用户认证的command执行失败了, 那么其默认行为可以如下: @Override protected Boolean getFallback() { return true; }对于HystrixObservableCommand可以通过重载resumeWithFallback()方法实现同样的行为: @Override protected Observable<Boolean> resumeWithFallback() { return Observable.just( true ); }Fallback: Stubbed当command返回的是一个包含多个字段的复合对象, 且该对象的一部分字段值可以通过其他请求状态获得, 另一部分状态可以通过设置默认值获得时, 你通常需要使用存根(stubbed)模式.你可能可以从存根值(stubbed values)中得到适当的值的情况如下:cookies请求参数和请求头当前失败请求的前一个服务请求的响应在fallback代码块内可以静态地获取请求范围内的存根(stubbed)值, 但是通常我们更推荐在构建command实例时注入这些值, 就像下面实例的代码中的countryCodeFromGeoLookup一样:public class CommandWithStubbedFallback extends HystrixCommand<UserAccount> { private final int customerId; private final String countryCodeFromGeoLookup; /** * @param customerId * The customerID to retrieve UserAccount for * @param countryCodeFromGeoLookup * The default country code from the HTTP request geo code lookup used for fallback. / protected CommandWithStubbedFallback(int customerId, String countryCodeFromGeoLookup) { super(HystrixCommandGroupKey.Factory.asKey(“ExampleGroup”)); this.customerId = customerId; this.countryCodeFromGeoLookup = countryCodeFromGeoLookup; } @Override protected UserAccount run() { // fetch UserAccount from remote service // return UserAccountClient.getAccount(customerId); throw new RuntimeException(“forcing failure for example”); } @Override protected UserAccount getFallback() { /* * Return stubbed fallback with some static defaults, placeholders, * and an injected value ‘countryCodeFromGeoLookup’ that we’ll use * instead of what we would have retrieved from the remote service. / return new UserAccount(customerId, “Unknown Name”, countryCodeFromGeoLookup, true, true, false); } public static class UserAccount { private final int customerId; private final String name; private final String countryCode; private final boolean isFeatureXPermitted; private final boolean isFeatureYPermitted; private final boolean isFeatureZPermitted; UserAccount(int customerId, String name, String countryCode, boolean isFeatureXPermitted, boolean isFeatureYPermitted, boolean isFeatureZPermitted) { this.customerId = customerId; this.name = name; this.countryCode = countryCode; this.isFeatureXPermitted = isFeatureXPermitted; this.isFeatureYPermitted = isFeatureYPermitted; this.isFeatureZPermitted = isFeatureZPermitted; } }}下面的代码演示了上述行为: @Test public void test() { CommandWithStubbedFallback command = new CommandWithStubbedFallback(1234, “ca”); UserAccount account = command.execute(); assertTrue(command.isFailedExecution()); assertTrue(command.isResponseFromFallback()); assertEquals(1234, account.customerId); assertEquals(“ca”, account.countryCode); assertEquals(true, account.isFeatureXPermitted); assertEquals(true, account.isFeatureYPermitted); assertEquals(false, account.isFeatureZPermitted); }对于HystrixObservableCommand可以通过重载resumeWithFallback()方法实现同样的行为:@Overrideprotected Observable<Boolean> resumeWithFallback() { return Observable.just( new UserAccount(customerId, “Unknown Name”, countryCodeFromGeoLookup, true, true, false) );}如果你想要从Observable中发出多个值, 那么当失败发生时, 原本的Observable可能已经发出的一部分值, 此时你或许更希望能够只从fallback逻辑中发出另一部分未被发出的值, 下面的例子就展示了如何实现这一个目的: 它通过追踪原Observable发出的最后一个值来实现fallback逻辑中的Observable应该从什么地方继续发出存根值(stubbed value) :@Overrideprotected Observable<Integer> construct() { return Observable.just(1, 2, 3) .concatWith(Observable.<Integer> error(new RuntimeException(“forced error”))) .doOnNext(new Action1<Integer>() { @Override public void call(Integer t1) { lastSeen = t1; } }) .subscribeOn(Schedulers.computation());}@Overrideprotected Observable<Integer> resumeWithFallback() { if (lastSeen < 4) { return Observable.range(lastSeen + 1, 4 - lastSeen); } else { return Observable.empty(); }}Fallback: Cache via Network有时后端的服务异常也会引起command执行失败, 此时我们也可以从缓存中(如: memcached)取得相关的数据.由于在fallback的逻辑代码中访问网络可能会再次失败, 因此必须构建新的HystrixCommand或HystrixObservableCommand来执行:很重要的一点是执行fallback逻辑的command需要在一个不同的线程池中执行, 否则如果原command的延迟变高且其所在线程池已经满了的话, 执行fallback逻辑的command将无法在同一个线程池中执行.下面的代码展示了CommandWithFallbackViaNetwork如何在getFallback()方法中执行FallbackViaNetwork.注意, FallbackViaNetwork同样也具有回退机制, 这里通过返回null来实现fail silent.FallbackViaNetwork默认会从HystrixCommandGroupKey中继承线程池的配置RemoteServiceX, 因此需要在其构造器中注入HystrixThreadPoolKey.Factory.asKey(“RemoteServiceXFallback”)来使其在不同的线程池中执行.这样, CommandWithFallbackViaNetwork会在名为RemoteServiceX的线程池中执行, 而FallbackViaNetwork会在名为RemoteServiceXFallback的线程池中执行.public class CommandWithFallbackViaNetwork extends HystrixCommand<String> { private final int id; protected CommandWithFallbackViaNetwork(int id) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(“RemoteServiceX”)) .andCommandKey(HystrixCommandKey.Factory.asKey(“GetValueCommand”))); this.id = id; } @Override protected String run() { // RemoteServiceXClient.getValue(id); throw new RuntimeException(“force failure for example”); } @Override protected String getFallback() { return new FallbackViaNetwork(id).execute(); } private static class FallbackViaNetwork extends HystrixCommand<String> { private final int id; public FallbackViaNetwork(int id) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(“RemoteServiceX”)) .andCommandKey(HystrixCommandKey.Factory.asKey(“GetValueFallbackCommand”)) // use a different threadpool for the fallback command // so saturating the RemoteServiceX pool won’t prevent // fallbacks from executing .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(“RemoteServiceXFallback”))); this.id = id; } @Override protected String run() { MemCacheClient.getValue(id); } @Override protected String getFallback() { // the fallback also failed // so this fallback-of-a-fallback will // fail silently and return null return null; } }}Primary + Secondary with Fallback有些系统可能具有是以双系统模式搭建的 — 主从模式或主备模式.有时从系统或备用系统会被认为是失败状态的一种, 仅在执行fallback逻辑是才使用它;这种场景和Cache via Network一节中描述的场景是一样的.然而, 如果切换到从系统是一个很正常时, 例如发布新代码时(这是有状态的系统发布代码的一种方式), 此时每当切换到从系统使用时, 主系统都是处于不可用状态,断路器将会打开且发出警报.这并不是我们期望发生的事, 这种狼来了式的警报可能会导致真正发生问题的时候我们却把它当成正常的误报而忽略了.因此, 我们可以通过在其前面放置一个门面HystrixCommand(见下文), 将主/从系统的切换视为正常的、健康的状态.主从HystrixCommand都是需要访问网络且实现了特定的业务逻辑, 因此其实现上应该是线程隔离的. 它们可能具有显著的性能差距(通常从系统是一个静态缓存), 因此将两个command隔离的另一个好处是可以针对性地调优.你不需要将这两个command都公开发布, 只需要将它们隐藏在另一个由信号量隔离的HystrixCommand中(称之为门面HystrixCommand), 在这个command中去实现主系统还是从系统的调用选择. 只有当主从系统都失败了, 才会去执行这个门面command的fallback逻辑.门面HystrixCommand可以使用信号量隔离的, 因为其业务逻辑仅仅是调用另外两个线程隔离的HystrixCommand, 它不涉及任何的网络访问、重试等容易出错的事, 因此没必要将这部分代码放到其他线程去执行.public class CommandFacadeWithPrimarySecondary extends HystrixCommand<String> { private final static DynamicBooleanProperty usePrimary = DynamicPropertyFactory.getInstance().getBooleanProperty(“primarySecondary.usePrimary”, true); private final int id; public CommandFacadeWithPrimarySecondary(int id) { super(Setter .withGroupKey(HystrixCommandGroupKey.Factory.asKey(“SystemX”)) .andCommandKey(HystrixCommandKey.Factory.asKey(“PrimarySecondaryCommand”)) .andCommandPropertiesDefaults( // we want to default to semaphore-isolation since this wraps // 2 others commands that are already thread isolated HystrixCommandProperties.Setter() .withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE))); this.id = id; } @Override protected String run() { if (usePrimary.get()) { return new PrimaryCommand(id).execute(); } else { return new SecondaryCommand(id).execute(); } } @Override protected String getFallback() { return “static-fallback-” + id; } @Override protected String getCacheKey() { return String.valueOf(id); } private static class PrimaryCommand extends HystrixCommand<String> { private final int id; private PrimaryCommand(int id) { super(Setter .withGroupKey(HystrixCommandGroupKey.Factory.asKey(“SystemX”)) .andCommandKey(HystrixCommandKey.Factory.asKey(“PrimaryCommand”)) .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(“PrimaryCommand”)) .andCommandPropertiesDefaults( // we default to a 600ms timeout for primary HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(600))); this.id = id; } @Override protected String run() { // perform expensive ‘primary’ service call return “responseFromPrimary-” + id; } } private static class SecondaryCommand extends HystrixCommand<String> { private final int id; private SecondaryCommand(int id) { super(Setter .withGroupKey(HystrixCommandGroupKey.Factory.asKey(“SystemX”)) .andCommandKey(HystrixCommandKey.Factory.asKey(“SecondaryCommand”)) .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(“SecondaryCommand”)) .andCommandPropertiesDefaults( // we default to a 100ms timeout for secondary HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(100))); this.id = id; } @Override protected String run() { // perform fast ‘secondary’ service call return “responseFromSecondary-” + id; } } public static class UnitTest { @Test public void testPrimary() { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { ConfigurationManager.getConfigInstance().setProperty(“primarySecondary.usePrimary”, true); assertEquals(“responseFromPrimary-20”, new CommandFacadeWithPrimarySecondary(20).execute()); } finally { context.shutdown(); ConfigurationManager.getConfigInstance().clear(); } } @Test public void testSecondary() { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { ConfigurationManager.getConfigInstance().setProperty(“primarySecondary.usePrimary”, false); assertEquals(“responseFromSecondary-20”, new CommandFacadeWithPrimarySecondary(20).execute()); } finally { context.shutdown(); ConfigurationManager.getConfigInstance().clear(); } } }}Client Doesn’t Perform Network Access当你使用HystrixCommand实现的业务逻辑不涉及到网络访问、对延迟敏感且无法接受多线程带来的开销时, 你需要设置executionIsolationStrategy)属性的值为ExecutionIsolationStrategy.SEMAPHORE, 此时Hystrix会使用信号量隔离代替线程隔离.下面的代码展示了如何为command设置该属性(也可以在运行时动态改变这个属性的值):public class CommandUsingSemaphoreIsolation extends HystrixCommand<String> { private final int id; public CommandUsingSemaphoreIsolation(int id) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(“ExampleGroup”)) // since we’re doing an in-memory cache lookup we choose SEMAPHORE isolation .andCommandPropertiesDefaults(HystrixCommandProperties.Setter() .withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE))); this.id = id; } @Override protected String run() { // a real implementation would retrieve data from in memory data structure return “ValueFromHashMap_” + id; }}Get-Set-Get with Request Cache InvalidationGet-Set-Get是指: Get请求的结果被缓存下来后, 另一个command对同一个资源发出了Set请求, 此时由Get请求缓存的结果应该失效, 避免随后的Get请求获取到过时的缓存结果, 此时可以通过调用HystrixRequestCache.clear())方法来使缓存失效.public class CommandUsingRequestCacheInvalidation { / represents a remote data store / private static volatile String prefixStoredOnRemoteDataStore = “ValueBeforeSet_”; public static class GetterCommand extends HystrixCommand<String> { private static final HystrixCommandKey GETTER_KEY = HystrixCommandKey.Factory.asKey(“GetterCommand”); private final int id; public GetterCommand(int id) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(“GetSetGet”)) .andCommandKey(GETTER_KEY)); this.id = id; } @Override protected String run() { return prefixStoredOnRemoteDataStore + id; } @Override protected String getCacheKey() { return String.valueOf(id); } /* * Allow the cache to be flushed for this object. * * @param id * argument that would normally be passed to the command */ public static void flushCache(int id) { HystrixRequestCache.getInstance(GETTER_KEY, HystrixConcurrencyStrategyDefault.getInstance()).clear(String.valueOf(id)); } } public static class SetterCommand extends HystrixCommand<Void> { private final int id; private final String prefix; public SetterCommand(int id, String prefix) { super(HystrixCommandGroupKey.Factory.asKey(“GetSetGet”)); this.id = id; this.prefix = prefix; } @Override protected Void run() { // persist the value against the datastore prefixStoredOnRemoteDataStore = prefix; // flush the cache GetterCommand.flushCache(id); // no return value return null; } }} @Test public void getGetSetGet() { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { assertEquals(“ValueBeforeSet_1”, new GetterCommand(1).execute()); GetterCommand commandAgainstCache = new GetterCommand(1); assertEquals(“ValueBeforeSet_1”, commandAgainstCache.execute()); // confirm it executed against cache the second time assertTrue(commandAgainstCache.isResponseFromCache()); // set the new value new SetterCommand(1, “ValueAfterSet_”).execute(); // fetch it again GetterCommand commandAfterSet = new GetterCommand(1); // the getter should return with the new prefix, not the value from cache assertFalse(commandAfterSet.isResponseFromCache()); assertEquals(“ValueAfterSet_1”, commandAfterSet.execute()); } finally { context.shutdown(); } } }Migrating a Library to Hystrix如果你要迁移一个已有的客户端库到Hystrix, 你应该将所有的服务方法(service methods)替换成HystrixCommand.服务方法(service methods)转而调用HystrixCommand且不在包含任何额外的业务逻辑.因此, 在迁移之前, 一个服务库可能是这样的:迁移完成之后, 服务库的用户要能直接访问到HystrixCommand, 或者通过服务门面(service facade)的代理间接访问到HystrixCommand. ...

February 25, 2019 · 11 min · jiezi