乐趣区

关于java:Java-项目中使用-Resilience4j-框架实现异步超时处理

到目前为止,在本系列中,咱们曾经理解了 Resilience4j 及其 [Retry](
https://icodewalker.com/blog/…) 和 [RateLimiter](
https://icodewalker.com/blog/…) 模块。在本文中,咱们将通过 TimeLimiter 持续摸索 Resilience4j。咱们将理解它解决了什么问题,何时以及如何应用它,并查看一些示例。

代码示例

本文附有 [GitHub 上](
https://github.com/thombergs/…)的工作代码示例。

什么是 Resilience4j?

请参阅上一篇文章中的形容,疾速理解 [Resilience4j 的个别工作原理](
https://icodewalker.com/blog/…)。

什么是限时?

对咱们违心期待操作实现的工夫设置限度称为工夫限度。如果操作没有在咱们指定的工夫内实现,咱们心愿通过超时谬误收到告诉。

有时,这也称为“设定最初期限”。

咱们这样做的一个次要起因是确保咱们不会让用户或客户无限期地期待。不提供任何反馈的迟缓服务可能会让用户感到丧气。

咱们对操作设置工夫限度的另一个起因是确保咱们不会无限期地占用服务器资源。咱们在应用 Spring 的 @Transactional 注解时指定的 timeout 值就是一个例子——在这种状况下,咱们不想长时间占用数据库资源。

什么时候应用 Resilience4j TimeLimiter?

Resilience4j 的 TimeLimiter 可用于设置应用 CompleteableFutures 实现的异步操作的工夫限度(超时)。

Java 8 中引入的 CompletableFuture 类使异步、非阻塞编程变得更容易。能够在不同的线程上执行慢速办法,开释以后线程来解决其余工作。咱们能够提供一个当 slowMethod() 返回时执行的回调:

int slowMethod() {
  // time-consuming computation or remote operation
return 42;
}

CompletableFuture.supplyAsync(this::slowMethod)
.thenAccept(System.out::println);

这里的 slowMethod() 能够是一些计算或近程操作。通常,咱们心愿在进行这样的异步调用时设置工夫限度。咱们不想无限期地期待 slowMethod() 返回。例如,如果 slowMethod() 破费的工夫超过一秒,咱们可能想要返回先前计算的、缓存的值,甚至可能会出错。

在 Java 8 的 CompletableFuture 中,没有简略的办法来设置异步操作的工夫限度。CompletableFuture 实现了 Future 接口,Future 有一个重载的 get() 办法来指定咱们能够期待多长时间:

CompletableFuture<Integer> completableFuture = CompletableFuture
  .supplyAsync(this::slowMethod);
Integer result = completableFuture.get(3000, TimeUnit.MILLISECONDS);
System.out.println(result);

然而这里有一个问题—— get() 办法是一个阻塞调用。所以它首先违反了应用 CompletableFuture 的目标,即开释以后线程。

这是 Resilience4j 的 TimeLimiter 解决的问题——它让咱们在异步操作上设置工夫限度,同时保留在 Java 8 中应用 CompletableFuture 时非阻塞的益处。

CompletableFuture 的这种限度已在 Java 9 中失去解决。咱们能够在 Java 9 及更高版本中应用 CompletableFuture 上的 orTimeout()completeOnTimeout() 等办法间接设置工夫限度。然而,凭借 Resilience4J 的 指标 和 事件,与一般的 Java 9 解决方案相比,它依然提供了附加值。

Resilience4j TimeLimiter 概念

TimeLimiter反对 FutureCompletableFuture。然而将它与 Future 一起应用相当于 Future.get(long timeout, TimeUnit unit)。因而,咱们将在本文的其余部分关注 CompletableFuture

与其余 Resilience4j 模块一样,TimeLimiter 的工作形式是应用所需的性能装璜咱们的代码 – 如果在这种状况下操作未在指定的 timeoutDuration 内实现,则返回 TimeoutException

咱们为 TimeLimiter 提供 timeoutDurationScheduledExecutorService 和异步操作自身,示意为 CompletionStageSupplier。它返回一个 CompletionStage 的装璜 Supplier

在外部,它应用调度器来调度一个超时工作——通过抛出一个 TimeoutException 来实现 CompletableFuture 的工作。如果操作先实现,TimeLimiter 勾销外部超时工作。

除了 timeoutDuration 之外,还有另一个与 TimeLimiter 关联的配置 cancelRunningFuture。此配置仅实用于 Future 而不适用于 CompletableFuture。当超时产生时,它会在抛出 TimeoutException 之前勾销正在运行的 Future

应用 Resilience4j TimeLimiter 模块

TimeLimiterRegistryTimeLimiterConfigTimeLimiter 是 resilience4j-timelimiter 的次要形象。

TimeLimiterRegistry 是用于创立和治理 TimeLimiter 对象的工厂。

TimeLimiterConfig 封装了 timeoutDurationcancelRunningFuture 配置。每个 TimeLimiter 对象都与一个 TimeLimiterConfig 相关联。

TimeLimiter 提供辅助办法来为 FutureCompletableFuture Suppliers 创立或执行装璜器。

让咱们看看如何应用 TimeLimiter 模块中可用的各种性能。咱们将应用与本系列前几篇文章雷同的示例。假如咱们正在为一家航空公司建设一个网站,以容许其客户搜寻和预订航班。咱们的服务与 FlightSearchService 类封装的近程服务对话。

第一步是创立一个 TimeLimiterConfig

TimeLimiterConfig config = TimeLimiterConfig.ofDefaults();

这将创立一个 TimeLimiterConfig,其默认值为 timeoutDuration (1000ms) 和 cancelRunningFuture (true)。

假如咱们想将超时值设置为 2s 而不是默认值:

TimeLimiterConfig config = TimeLimiterConfig.custom()
  .timeoutDuration(Duration.ofSeconds(2))
  .build();

而后咱们创立一个 TimeLimiter

TimeLimiterRegistry registry = TimeLimiterRegistry.of(config);

TimeLimiter limiter = registry.timeLimiter("flightSearch");

咱们想要异步调用
FlightSearchService.searchFlights(),它返回一个 List<Flight>。让咱们将其示意为 Supplier<CompletionStage<List<Flight>>>

Supplier<List<Flight>> flightSupplier = () -> service.searchFlights(request);
Supplier<CompletionStage<List<Flight>>> origCompletionStageSupplier =
() -> CompletableFuture.supplyAsync(flightSupplier);

而后咱们能够应用 TimeLimiter 装璜 Supplier

ScheduledExecutorService scheduler =
  Executors.newSingleThreadScheduledExecutor();
Supplier<CompletionStage<List<Flight>>> decoratedCompletionStageSupplier =  
  limiter.decorateCompletionStage(scheduler, origCompletionStageSupplier);

最初,让咱们调用装璜的异步操作:

decoratedCompletionStageSupplier.get().whenComplete((result, ex) -> {if (ex != null) {System.out.println(ex.getMessage());
  }
  if (result != null) {System.out.println(result);
  }
});

以下是胜利航行搜寻的示例输入,其耗时少于咱们指定的 2 秒 timeoutDuration

Searching for flights; current time = 19:25:09 783; current thread = ForkJoinPool.commonPool-worker-3

Flight search successful

[Flight{flightNumber='XY 765', flightDate='08/30/2020', from='NYC', to='LAX'}, Flight{flightNumber='XY 746', flightDate='08/30/2020', from='NYC', to='LAX'}] on thread ForkJoinPool.commonPool-worker-3

这是超时的航班搜寻的示例输入:

Exception java.util.concurrent.TimeoutException: TimeLimiter 'flightSearch' recorded a timeout exception on thread pool-1-thread-1 at 19:38:16 963

Searching for flights; current time = 19:38:18 448; current thread = ForkJoinPool.commonPool-worker-3

Flight search successful at 19:38:18 461

下面的工夫戳和线程名称表明,即便异步操作稍后在另一个线程上实现,调用线程也会收到 TimeoutException。

如果咱们想创立一个装璜器并在代码库的不同地位重用它,咱们将应用decorateCompletionStage()。如果咱们想创立它并立刻执行 Supplier<CompletionStage>,咱们能够应用 executeCompletionStage() 实例办法代替:

CompletionStage<List<Flight>> decoratedCompletionStage =  
  limiter.executeCompletionStage(scheduler, origCompletionStageSupplier);

TimeLimiter 事件

TimeLimiter 有一个 EventPublisher,它生成 TimeLimiterOnSuccessEventTimeLimiterOnErrorEventTimeLimiterOnTimeoutEvent 类型的事件。咱们能够监听这些事件并记录它们,例如:

TimeLimiter limiter = registry.timeLimiter("flightSearch");

limiter.getEventPublisher().onSuccess(e -> System.out.println(e.toString()));

limiter.getEventPublisher().onError(e -> System.out.println(e.toString()));

limiter.getEventPublisher().onTimeout(e -> System.out.println(e.toString()));

示例输入显示了记录的内容:

2020-08-07T11:31:48.181944: TimeLimiter 'flightSearch' recorded a successful call.

... other lines omitted ...

2020-08-07T11:31:48.582263: TimeLimiter 'flightSearch' recorded a timeout exception.

TimeLimiter 指标

TimeLimiter 跟踪胜利、失败和超时的调用次数。

首先,咱们像平常一样创立 TimeLimiterConfigTimeLimiterRegistryTimeLimiter。而后,咱们创立一个 MeterRegistry 并将 TimeLimiterRegistry 绑定到它:

MeterRegistry meterRegistry = new SimpleMeterRegistry();
TaggedTimeLimiterMetrics.ofTimeLimiterRegistry(registry)
  .bindTo(meterRegistry);

运行几次限时操作后,咱们显示捕捉的指标:

Consumer<Meter> meterConsumer = meter -> {String desc = meter.getId().getDescription();
  String metricName = meter.getId().getName();
  String metricKind = meter.getId().getTag("kind");
  Double metricValue =
    StreamSupport.stream(meter.measure().spliterator(), false)
    .filter(m -> m.getStatistic().name().equals("COUNT"))
    .findFirst()
    .map(Measurement::getValue)
    .orElse(0.0);
  System.out.println(desc + "-" +
                     metricName +
                     "(" + metricKind + ")" +
                     ":" + metricValue);
};
meterRegistry.forEachMeter(meterConsumer);

这是一些示例输入:

The number of timed out calls - resilience4j.timelimiter.calls(timeout): 6.0

The number of successful calls - resilience4j.timelimiter.calls(successful): 4.0

The number of failed calls - resilience4j.timelimiter.calls(failed): 0.0

在理论利用中,咱们会定期将数据导出到监控零碎并在仪表板上进行剖析。

施行工夫限度时的陷阱和良好实际

通常,咱们解决两种操作 – 查问(或读取)和命令(或写入)。对查问进行工夫限度是平安的,因为咱们晓得它们不会扭转零碎的状态。咱们看到的 searchFlights() 操作是查问操作的一个例子。

命令通常会扭转零碎的状态。bookFlights() 操作将是命令的一个示例。在对命令进行工夫限度时,咱们必须记住,当咱们超时时,该命令很可能仍在运行。例如,bookFlights() 调用上的 TimeoutException 并不一定意味着命令失败。

在这种状况下,咱们须要治理用户体验——兴许在超时时,咱们能够告诉用户操作破费的工夫比咱们预期的要长。而后咱们能够查问上游以查看操作的状态并稍后告诉用户。

论断

在本文中,咱们学习了如何应用 Resilience4j 的 TimeLimiter 模块为异步、非阻塞操作设置工夫限度。咱们通过一些理论示例理解了何时应用它以及如何配置它。

您能够应用 [GitHub 上](
https://github.com/thombergs/…)的代码演示一个残缺的应用程序来阐明这些想法。


本文译自:
https://reflectoring.io/time-…

退出移动版