Reactor 整合 Resilence4j

1 引入 pom 包

<dependency>    <groupId>io.github.resilience4j</groupId>    <artifactId>resilience4j-all</artifactId></dependency><dependency>    <groupId>io.github.resilience4j</groupId>    <artifactId>resilience4j-spring-boot2</artifactId></dependency>

2 配置阐明

2.1 限流 ratelimiter

两个限流配置:backendA 1s 中最多容许 10 次申请;

backendB 每 500ms 最多容许 6 次申请。

resilience4j.ratelimiter:  instances:    backendA:      limitForPeriod: 10      limitRefreshPeriod: 1s      timeoutDuration: 10ms      registerHealthIndicator: true      eventConsumerBufferSize: 100    backendB:      limitForPeriod: 6      limitRefreshPeriod: 500ms      timeoutDuration: 3s
配置属性默认值形容
timeoutDuration5【s】一个线程期待许可的默认等待时间
limitRefreshPeriod500【ns】限度刷新的周期。在每个周期之后,速率限制器将其权限计数设置回 limitForPeriod 值
limitForPeriod50一个 limitRefreshPeriod (周期)容许拜访的数量(许可数量)

2.2 重试 retry

留神指定须要重试的异样,不是所有的异样重试都无效。比方 DB 相干校验异样,如惟一束缚等,重试也不会胜利的。

重试配置:

resilience4j.retry:  instances:    backendA:      maxAttempts: 3      waitDuration: 10s      enableExponentialBackoff: true      exponentialBackoffMultiplier: 2      retryExceptions:        - org.springframework.web.client.HttpServerErrorException        - java.io.IOException    backendB:      maxAttempts: 3      waitDuration: 10s      retryExceptions:        - org.springframework.web.client.HttpServerErrorException        - java.io.IOException
配置属性默认值形容
maxAttempts3最大重试次数(包含第一次)
waitDuration500【ms】两次重试之间的期待距离
intervalFunctionnumOfAttempts -> waitDuration批改失败后期待距离的函数。默认状况下,等待时间是个常量。
retryOnResultPredicateresult->false配置一个判断后果是否应该重试的 predicate 函数。如果后果应该重试,Predicate 必须返回 true,否则它必须返回 false。
retryExceptionPredicatethrowable -> true和 retryOnResultPredicate 相似,如果要重试,Predicate 必须返回true,否则返回 false。
retryExceptions须要重试的异样类型列表
ignoreExceptions不须要重试的异样类型列表
failAfterMaxAttemptsfalse当重试达到配置的 maxAttempts 并且后果仍未通过 retryOnResultPredicate 时启用或禁用抛出 MaxRetriesExceededException 的布尔值
intervalBiFunction(numOfAttempts, Either<throwable, result>) -> waitDuration依据 maxAttempts 和后果或异样批改失败后期待间隔时间的函数。与 intervalFunction 一起应用时会抛出 IllegalStateException。

2.3 超时 TimeLimiter

超时配置:

resilience4j.timelimiter:  instances:    backendA:      timeoutDuration: 2s      cancelRunningFuture: true    backendB:      timeoutDuration: 1s      cancelRunningFuture: false

超时配置比较简单,次要是配置 timeoutDuration 也就是超时的工夫。

cancelRunningFuture 的意思是:是否应该在运行的 Future 调用 cancel 去掉调用。

2.4 断路器 circuitbreaker

断路器有几种状态:敞开、关上、半开。留神:关上,意味着不能拜访,会迅速失败。

CircuitBreaker 应用滑动窗口来存储和汇总调用后果。您能够在基于计数的滑动窗口和基于工夫的滑动窗口之间进行抉择。基于计数的滑动窗口聚合最初 N 次调用的后果。基于工夫的滑动窗口聚合了最初 N 秒的调用后果。

断路器配置:

resilience4j.circuitbreaker:  instances:    backendA:      // 衰弱指标参数,非断路器属性      registerHealthIndicator: true      slidingWindowSize: 100
配置属性默认值形容
slidingWindowSize100记录断路器敞开状态下(能够拜访的状况下)的调用的滑动窗口大小
failureRateThreshold50(百分比)当失败比例超过 failureRateThreshold 的时候,断路器会关上,并开始短路呼叫
slowCallDurationThreshold60000【ms】申请被定义为慢申请的阈值
slowCallRateThreshold100(百分比)慢申请百分比大于等于该值时,关上断路器开关
permittedNumberOfCalls10半开状态下容许通过的申请数
maxWaitDurationInHalfOpenState0配置最大期待持续时间,该持续时间管制断路器在切换到关上之前能够放弃在半开状态的最长工夫。

值 0 示意断路器将在 HalfOpen 状态下有限期待,直到所有容许的调用都已实现。

2.5 壁仓 bulkhead

resilience4j 提供了两种实现壁仓的办法:

  • SemaphoreBulkhead 应用 Semaphore 实现
  • FixedThreadPoolBulkhead 应用有界队列和固定线程池实现
resilience4j.bulkhead:  instances:    backendA:      maxConcurrentCalls: 10    backendB:      maxWaitDuration: 10ms      maxConcurrentCalls: 20resilience4j.thread-pool-bulkhead:  instances:    backendC:      maxThreadPoolSize: 1      coreThreadPoolSize: 1      queueCapacity: 1

2.5.1 SemaphoreBulkhead

配置属性默认值形容
maxConcurrentCalls25容许的并发执行的数量
maxWaitDuration0尝试进入饱和隔板时线程应被阻止的最长工夫

2.5.2 FixedThreadPoolBulkhead

配置属性默认值形容
maxThreadPoolSizeRuntime.getRuntime().availableProcessors()线程池最大线程个数
coreThreadPoolSizeRuntime.getRuntime().availableProcessors()-1线程池外围线程个数
queueCapacity100线程池队列容量
keepAliveDuration20【ms】线程数超过外围线程数之后,空余线程在终止之前期待的最长工夫

3 应用

3.1 配置

在 application.yml 文件中增加以下 resilience4j 配置:

resilience4j.circuitbreaker:  instances:    backendA:      registerHealthIndicator: true      slidingWindowSize: 100resilience4j.retry:  instances:    backendA:      maxAttempts: 3      waitDuration: 10s      enableExponentialBackoff: true      exponentialBackoffMultiplier: 2      retryExceptions:        - org.springframework.web.client.HttpServerErrorException        - java.io.IOException    backendB:      maxAttempts: 3      waitDuration: 10s      retryExceptions:        - org.springframework.web.client.HttpServerErrorException        - java.io.IOExceptionresilience4j.bulkhead:  instances:    backendA:      maxConcurrentCalls: 10    backendB:      maxWaitDuration: 10ms      maxConcurrentCalls: 20resilience4j.thread-pool-bulkhead:  instances:    backendC:      maxThreadPoolSize: 1      coreThreadPoolSize: 1      queueCapacity: 1resilience4j.ratelimiter:  instances:    backendA:      limitForPeriod: 10      limitRefreshPeriod: 1s      timeoutDuration: 10ms      registerHealthIndicator: true      eventConsumerBufferSize: 100    backendB:      limitForPeriod: 6      limitRefreshPeriod: 500ms      timeoutDuration: 3sresilience4j.timelimiter:  instances:    backendA:      timeoutDuration: 2s      cancelRunningFuture: true    backendB:      timeoutDuration: 1s      cancelRunningFuture: false

3.2 应用注解实现

间接在须要限流的办法上减少注解@RateLimiter 实现限流;减少注解@Retry 实现重试;减少注解 @CircuitBreaker 熔断;减少注解 @Bulkhead 实现壁仓。name 属性中别离填写限流器、重试、熔断、壁仓组件的名字。

@Bulkhead(name = "backendA")@CircuitBreaker(name = "backendA")@Retry(name = "backendA")@RateLimiter(name = "backendA")public Mono<List<User>> list() {  long startTime = System.currentTimeMillis();  return Mono.fromSupplier(() -> {        return userRepository.findAll();      }).doOnError(e -> {        // 打印异样日志&减少监控(自行处理)        logger.error("list.user.error, e", e);      })      .doFinally(e -> {        // 耗时 & 整体健康        logger.info("list.user.time={}, ", System.currentTimeMillis() - startTime);      });}  @Bulkhead(name = "backendA")@CircuitBreaker(name = "backendA")//最多反对10个并发量@Retry(name = "backendA")//应用 backendA 重试器,如果抛出 IOException 会重试三次。@RateLimiter(name = "backendA")// 限流 10 Qpspublic Mono<Boolean> save(User user) {  long startTime = System.currentTimeMillis();  return Mono.fromSupplier(() -> {        return userRepository.save(user) != null;      })      .doOnError(e -> {        // 打印异样日志&减少监控(自行处理)        logger.error("save.user.error, user={}, e", user, e);      })      .doFinally(e -> {        // 耗时 & 整体健康        logger.info("save.user.time={}, user={}", user, System.currentTimeMillis() - startTime);      });}  

留神:以上所有组件,都反对自定义。

上述代码都能够在下述仓库中找到:https://github.com/prepared48...

本文参加了思否技术征文,欢送正在浏览的你也退出。