共计 4978 个字符,预计需要花费 13 分钟才能阅读完成。
案例概述
在本文中,我们讨论一下 Resilience4j 库。该库通过管理远程通信的容错性来帮助实现弹性系统。这个库受到 Hystrix 的启发,但提供了更方便的 API 和许多其他特性,如速率限制器 (阻塞太频繁的请求)、Bulkhead(避免太多并发请求) 等。
Maven 设置
首先,我们需要将目标模块添加到我们的 pom.xml 中(例如,我们添加了 Circuit Breaker):
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-circuitbreaker</artifactId>
<version>0.12.1</version>
</dependency>
在这里,我们使用的是断路器模块。所有模块及其最新版本均可在 Maven Central 上找到。在接下来的部分中,我们将介绍库中最常用的模块。
断路器
请注意,对于此模块,我们需要上面显示的设置 resilience4j-circuitbreaker 依赖项。
断路器模式可以帮助我们在远程服务中断时防止一连串的故障。
在多次失败的尝试之后,我们可以认为服务不可用 / 重载,并急切地拒绝所有后续的请求。通过这种方式,我们可以为可能失败的调用节省系统资源。
让我们看看我们如何通过 Resilience4j 实现这一目标。
首先,我们需要定义要使用的设置。最简单的方法是使用默认设置:
CircuitBreakerRegistry circuitBreakerRegistry
= CircuitBreakerRegistry.ofDefaults();
也可以使用自定义参数:
CircuitBreakerConfig config = CircuitBreakerConfig.custom()
.failureRateThreshold(20)
.ringBufferSizeInClosedState(5)
.build();
在这里,我们将速率阈值设置为 20%,并将尝试呼叫设置为最少 5 次。
然后,我们创建一个 CircuitBreaker 对象并通过它调用远程服务:
interface RemoteService {
int process(int i);
}
CircuitBreakerRegistry registry = CircuitBreakerRegistry.of(config);
CircuitBreaker circuitBreaker = registry.circuitBreaker(“my”);
Function<Integer, Integer> decorated = CircuitBreaker
.decorateFunction(circuitBreaker, service::process);
最后,让我们通过 JUnit 测试看看它是如何工作的。
我们将尝试调用服务 10 次。我们应该能够验证该呼叫至少尝试了 5 次,然后在 20% 的呼叫失败后停止:
when(service.process(any(Integer.class))).thenThrow(new RuntimeException());
for (int i = 0; i < 10; i++) {
try {
decorated.apply(i);
} catch (Exception ignore) {}
}
verify(service, times(5)).process(any(Integer.class));
断路器的状态和设置
断路器可以处于以下三种状态之一:
CLOSED – 一切正常,不涉及短路
OPEN – 远程服务器已关闭,所有请求都被短路
HALF_OPEN – 从进入开放状态到现在已经经过了一段时间,断路器允许请求检查远程服务是否重新上线
我们可以配置以下设置:
断路器打开并开始短路呼叫的故障率阈值
等待时间,它定义了断路器在切换到半开状态之前应该保持打开状态的时间
断路器半开或半闭时环形缓冲器的尺寸
处理断路器事件的定制电路断路器事件监听器
一个自定义谓词,用于评估异常是否应算作故障,从而提高故障率
速率限制器
与上一节类似,此功能需要 resilience4j-ratelimiter 依赖项。
顾名思义,此功能允许限制对某些服务的访问。它的 API 与 CircuitBreaker 非常相似 - 有 Registry,Config 和 Limiter 类。
以下是它的示例:
RateLimiterConfig config = RateLimiterConfig.custom().limitForPeriod(2).build();
RateLimiterRegistry registry = RateLimiterRegistry.of(config);
RateLimiter rateLimiter = registry.rateLimiter(“my”);
Function<Integer, Integer> decorated
= RateLimiter.decorateFunction(rateLimiter, service::process);
现在,如果需要的话,所有对已修饰的服务块的调用都要符合速率限制器配置。
我们可以配置如下参数:
限制刷新的时间段
刷新周期的权限限制
默认等待权限持续时间
Bulkhead
在这里,我们首先需要 relasticience4j-bulkhead 依赖项。
可以限制对特定服务的并发调用数。
让我们看一个使用 Bulkhead API 配置最多一个并发调用的示例:
BulkheadConfig config = BulkheadConfig.custom().maxConcurrentCalls(1).build();
BulkheadRegistry registry = BulkheadRegistry.of(config);
Bulkhead bulkhead = registry.bulkhead(“my”);
Function<Integer, Integer> decorated
= Bulkhead.decorateFunction(bulkhead, service::process);
要测试此配置,我们将调用模拟服务的方法。
然后,我们确保 Bulkhead 不允许任何其他调用:
CountDownLatch latch = new CountDownLatch(1);
when(service.process(anyInt())).thenAnswer(invocation -> {
latch.countDown();
Thread.currentThread().join();
return null;
});
ForkJoinTask<?> task = ForkJoinPool.commonPool().submit(() -> {
try {
decorated.apply(1);
} finally {
bulkhead.onComplete();
}
});
latch.await();
assertThat(bulkhead.isCallPermitted()).isFalse();
我们可以配置以下设置:
Bulkhead 允许的最大并行执行量
尝试进入饱和舱壁时线程等待的最长时间
重试
对于此功能,我们需要将 resilience4j-retry 库添加到项目中。
我们可以使用 Retry API 自动重试失败的呼叫:
RetryConfig config = RetryConfig.custom().maxAttempts(2).build();
RetryRegistry registry = RetryRegistry.of(config);
Retry retry = registry.retry(“my”);
Function<Integer, Void> decorated
= Retry.decorateFunction(retry, (Integer s) -> {
service.process(s);
return null;
});
现在让我们模拟在远程服务调用期间抛出异常的情况,并确保库自动重试失败的调用:
when(service.process(anyInt())).thenThrow(new RuntimeException());
try {
decorated.apply(1);
fail(“Expected an exception to be thrown if all retries failed”);
} catch (Exception e) {
verify(service, times(2)).process(any(Integer.class));
}
我们还可以配置以下内容:
最大尝试次数
重试前的等待时间
自定义函数,用于修改失败后的等待间隔
自定义谓词,用于评估异常是否应导致重试调用
缓存
Cache 模块需要 resilience4j-cache 依赖项。
初始化看起来与其他模块略有不同:
javax.cache.Cache cache = …; // Use appropriate cache here
Cache<Integer, Integer> cacheContext = Cache.of(cache);
Function<Integer, Integer> decorated
= Cache.decorateSupplier(cacheContext, () -> service.process(1));
这里的缓存是通过使用 JSR-107 Cache 实现完成的,Resilience4j 提供了一种应用它的方法。
请注意,没有用于装饰功能的 API(如 Cache.decorateFunction(Function)),API 仅支持 Supplier 和 Callable 类型。
TimeLimiter
对于此模块,我们必须添加 resilience4j-timelimiter 依赖项。可以使用 TimeLimiter 限制调用远程服务所花费的时间。为了演示,让我们设置一个配置超时为 1 毫秒的 TimeLimiter:
long ttl = 1;
TimeLimiterConfig config
= TimeLimiterConfig.custom().timeoutDuration(Duration.ofMillis(ttl)).build();
TimeLimiter timeLimiter = TimeLimiter.of(config);
接下来,让我们验证 Resilience4j 是否使用预期的超时调用 Future.get():
Future futureMock = mock(Future.class);
Callable restrictedCall
= TimeLimiter.decorateFutureSupplier(timeLimiter, () -> futureMock);
restrictedCall.call();
verify(futureMock).get(ttl, TimeUnit.MILLISECONDS);
我们也可以将它与 CircuitBreaker 结合使用:
Callable chainedCallable
= CircuitBreaker.decorateCallable(circuitBreaker, restrictedCall);
附加模块
Resilience4j 还提供了许多附加模块,可以简化与流行框架和库的集成。
一些比较知名的集成是:
Spring Boot – resilience4j-spring-boot
Ratpack – resilience4j-ratpack
Retrofit – resilience4j-retrofit
Vertx – resilience4j-vertx
Dropwizard – resilience4j-metrics
Prometheus – resilience4j-prometheus
案例结论
在本文中,我们了解了 Resilience4j 库的各个方面,并学习了如何使用它来解决服务器间通信中的各种容错问题。