共计 5567 个字符,预计需要花费 14 分钟才能阅读完成。
Reactor 整合 Resilence4j
1 引入 pom 包
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-all</artifactId>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-spring-boot2</artifactId>
</dependency>
2 配置阐明
2.1 限流 ratelimiter
两个限流配置:backendA 1s 中最多容许 10 次申请;
backendB 每 500ms 最多容许 6 次申请。
resilience4j.ratelimiter:
instances:
backendA:
limitForPeriod: 10
limitRefreshPeriod: 1s
timeoutDuration: 10ms
registerHealthIndicator: true
eventConsumerBufferSize: 100
backendB:
limitForPeriod: 6
limitRefreshPeriod: 500ms
timeoutDuration: 3s
配置属性 | 默认值 | 形容 |
---|---|---|
timeoutDuration | 5【s】 | 一个线程期待许可的默认等待时间 |
limitRefreshPeriod | 500【ns】 | 限度刷新的周期。在每个周期之后,速率限制器将其权限计数设置回 limitForPeriod 值 |
limitForPeriod | 50 | 一个 limitRefreshPeriod(周期)容许拜访的数量(许可数量) |
2.2 重试 retry
留神指定须要重试的异样,不是所有的异样重试都无效。比方 DB 相干校验异样,如惟一束缚等,重试也不会胜利的。
重试配置:
resilience4j.retry:
instances:
backendA:
maxAttempts: 3
waitDuration: 10s
enableExponentialBackoff: true
exponentialBackoffMultiplier: 2
retryExceptions:
- org.springframework.web.client.HttpServerErrorException
- java.io.IOException
backendB:
maxAttempts: 3
waitDuration: 10s
retryExceptions:
- org.springframework.web.client.HttpServerErrorException
- java.io.IOException
配置属性 | 默认值 | 形容 |
---|---|---|
maxAttempts | 3 | 最大重试次数(包含第一次) |
waitDuration | 500【ms】 | 两次重试之间的期待距离 |
intervalFunction | numOfAttempts -> waitDuration | 批改失败后期待距离的函数。默认状况下,等待时间是个常量。 |
retryOnResultPredicate | result->false | 配置一个判断后果是否应该重试的 predicate 函数。如果后果应该重试,Predicate 必须返回 true,否则它必须返回 false。 |
retryExceptionPredicate | throwable -> true | 和 retryOnResultPredicate 相似,如果要重试,Predicate 必须返回 true,否则返回 false。 |
retryExceptions | 空 | 须要重试的异样类型列表 |
ignoreExceptions | 空 | 不须要重试的异样类型列表 |
failAfterMaxAttempts | false | 当重试达到配置的 maxAttempts 并且后果仍未通过 retryOnResultPredicate 时启用或禁用抛出 MaxRetriesExceededException 的布尔值 |
intervalBiFunction | (numOfAttempts, Either<throwable, result>) -> waitDuration | 依据 maxAttempts 和后果或异样批改失败后期待间隔时间的函数。与 intervalFunction 一起应用时会抛出 IllegalStateException。 |
2.3 超时 TimeLimiter
超时配置:
resilience4j.timelimiter:
instances:
backendA:
timeoutDuration: 2s
cancelRunningFuture: true
backendB:
timeoutDuration: 1s
cancelRunningFuture: false
超时配置比较简单,次要是配置 timeoutDuration 也就是超时的工夫。
cancelRunningFuture 的意思是:是否应该在运行的 Future 调用 cancel 去掉调用。
2.4 断路器 circuitbreaker
断路器有几种状态:敞开、关上、半开。留神:关上,意味着不能拜访,会迅速失败。
CircuitBreaker 应用 滑动窗口 来存储和汇总调用后果。您能够在基于计数的滑动窗口和基于工夫的滑动窗口之间进行抉择。基于计数的滑动窗口聚合最初 N 次调用的后果。基于工夫的滑动窗口聚合了最初 N 秒的调用后果。
断路器配置:
resilience4j.circuitbreaker:
instances:
backendA:
// 衰弱指标参数,非断路器属性
registerHealthIndicator: true
slidingWindowSize: 100
配置属性 | 默认值 | 形容 |
---|---|---|
slidingWindowSize | 100 | 记录断路器敞开状态下(能够拜访的状况下)的调用的滑动窗口大小 |
failureRateThreshold | 50(百分比) | 当失败比例超过 failureRateThreshold 的时候,断路器会关上,并开始短路呼叫 |
slowCallDurationThreshold | 60000【ms】 | 申请被定义为慢申请的阈值 |
slowCallRateThreshold | 100(百分比) | 慢申请百分比大于等于该值时,关上断路器开关 |
permittedNumberOfCalls | 10 | 半开 状态下容许通过的申请数 |
maxWaitDurationInHalfOpenState | 0 | 配置最大期待持续时间,该持续时间管制断路器在切换到关上之前能够放弃在半开状态的最长工夫。 |
值 0 示意断路器将在 HalfOpen 状态下有限期待,直到所有容许的调用都已实现。
2.5 壁仓 bulkhead
resilience4j 提供了两种实现壁仓的办法:
SemaphoreBulkhead
应用 Semaphore 实现FixedThreadPoolBulkhead
应用有界队列和固定线程池实现
resilience4j.bulkhead:
instances:
backendA:
maxConcurrentCalls: 10
backendB:
maxWaitDuration: 10ms
maxConcurrentCalls: 20
resilience4j.thread-pool-bulkhead:
instances:
backendC:
maxThreadPoolSize: 1
coreThreadPoolSize: 1
queueCapacity: 1
2.5.1 SemaphoreBulkhead
配置属性 | 默认值 | 形容 |
---|---|---|
maxConcurrentCalls | 25 | 容许的并发执行的数量 |
maxWaitDuration | 0 | 尝试进入饱和隔板时线程应被阻止的最长工夫 |
2.5.2 FixedThreadPoolBulkhead
配置属性 | 默认值 | 形容 |
---|---|---|
maxThreadPoolSize | Runtime.getRuntime().availableProcessors() | 线程池最大线程个数 |
coreThreadPoolSize | Runtime.getRuntime().availableProcessors()-1 | 线程池外围线程个数 |
queueCapacity | 100 | 线程池队列容量 |
keepAliveDuration | 20【ms】 | 线程数超过外围线程数之后,空余线程在终止之前期待的最长工夫 |
3 应用
3.1 配置
在 application.yml 文件中增加以下 resilience4j 配置:
resilience4j.circuitbreaker:
instances:
backendA:
registerHealthIndicator: true
slidingWindowSize: 100
resilience4j.retry:
instances:
backendA:
maxAttempts: 3
waitDuration: 10s
enableExponentialBackoff: true
exponentialBackoffMultiplier: 2
retryExceptions:
- org.springframework.web.client.HttpServerErrorException
- java.io.IOException
backendB:
maxAttempts: 3
waitDuration: 10s
retryExceptions:
- org.springframework.web.client.HttpServerErrorException
- java.io.IOException
resilience4j.bulkhead:
instances:
backendA:
maxConcurrentCalls: 10
backendB:
maxWaitDuration: 10ms
maxConcurrentCalls: 20
resilience4j.thread-pool-bulkhead:
instances:
backendC:
maxThreadPoolSize: 1
coreThreadPoolSize: 1
queueCapacity: 1
resilience4j.ratelimiter:
instances:
backendA:
limitForPeriod: 10
limitRefreshPeriod: 1s
timeoutDuration: 10ms
registerHealthIndicator: true
eventConsumerBufferSize: 100
backendB:
limitForPeriod: 6
limitRefreshPeriod: 500ms
timeoutDuration: 3s
resilience4j.timelimiter:
instances:
backendA:
timeoutDuration: 2s
cancelRunningFuture: true
backendB:
timeoutDuration: 1s
cancelRunningFuture: false
3.2 应用注解实现
间接在须要限流的办法上减少注解@RateLimiter
实现限流;减少注解@Retry
实现重试;减少注解 @CircuitBreaker
熔断;减少注解 @Bulkhead
实现壁仓。name 属性中别离填写限流器、重试、熔断、壁仓组件的名字。
@Bulkhead(name = "backendA")
@CircuitBreaker(name = "backendA")
@Retry(name = "backendA")
@RateLimiter(name = "backendA")
public Mono<List<User>> list() {long startTime = System.currentTimeMillis();
return Mono.fromSupplier(() -> {return userRepository.findAll();
}).doOnError(e -> {
// 打印异样日志 & 减少监控(自行处理)logger.error("list.user.error, e", e);
})
.doFinally(e -> {
// 耗时 & 整体健康
logger.info("list.user.time={},", System.currentTimeMillis() - startTime);
});
}
@Bulkhead(name = "backendA")
@CircuitBreaker(name = "backendA")// 最多反对 10 个并发量
@Retry(name = "backendA")// 应用 backendA 重试器,如果抛出 IOException 会重试三次。@RateLimiter(name = "backendA")// 限流 10 Qps
public Mono<Boolean> save(User user) {long startTime = System.currentTimeMillis();
return Mono.fromSupplier(() -> {return userRepository.save(user) != null;
})
.doOnError(e -> {
// 打印异样日志 & 减少监控(自行处理)logger.error("save.user.error, user={}, e", user, e);
})
.doFinally(e -> {
// 耗时 & 整体健康
logger.info("save.user.time={}, user={}", user, System.currentTimeMillis() - startTime);
});
}
留神:以上所有组件,都反对自定义。
上述代码都能够在下述仓库中找到:https://github.com/prepared48…
本文参加了思否技术征文,欢送正在浏览的你也退出。