这仿佛是 Reactor 的热门搜寻之一,至多当我在谷歌中输出 onErrorContinue 时,onErrorResume 会在它旁边弹出。让我把我的测试代码和我的一些解释粘贴在上面。

1 根底性能

这是一个简略的函数,将 5 个间断的数字别离乘以 2,而后相加,当 i==2 时抛出一个异样:

public static void main(String... args) {      Flux.range(1,5)              .doOnNext(i -> System.out.println("input=" + i))              .map(i -> i == 2 ? i / 0 : i)              .map(i -> i * 2)              .reduce((i,j) -> i+j)              .doOnNext(i -> System.out.println("sum=" + i))              .block();  }

显然,输入如下:

input=1input=2Exception in thread "main" java.lang.ArithmeticException: / by zero

2 只有 onErrorResume ()

public static void main(String... args) {      Flux.range(1,5)              .doOnNext(i -> System.out.println("input=" + i))              .map(i -> i == 2 ? i / 0 : i)              .map(i -> i * 2)              .onErrorResume(err -> {                  log.info("onErrorResume");                  return Flux.empty();              })              .reduce((i,j) -> i+j)              .doOnNext(i -> System.out.println("sum=" + i))              .block();  }

输入如下:

input=1input=217:40:47.828 [main] INFO com.example.demo.config.TestRunner - onErrorResumesum=2

正如官网文档形容(onErrorResume)的那样,onErrorResume 用返回内容替换 Flux,因而在 2 之后的数据不会被解决。惟一值得一提的是,onErrorResume() 不用马上在谬误之后捕捉异样。在 onErrorContinue() 的官网文档中(onErrorContinue),只有 onErrorContinue() 强调了了 upstream 关键词,但显然,它还有其余含意。

3 只有 onErrorContinue()

public static void main(String... args) {  Flux.range(1,5)          .doOnNext(i -> System.out.println("input=" + i))          .map(i -> i == 2 ? i / 0 : i)          .map(i -> i * 2)          .onErrorContinue((err, i) -> {log.info("onErrorContinue={}", i);})          .reduce((i,j) -> i+j)          .doOnNext(i -> System.out.println("sum=" + i))          .block();}

输入:

input=1input=217:43:10.656 [main] INFO com.example.demo.config.TestRunner - onErrorContinue=2input=3input=4input=5sum=26

显然,onErrorContinue 会抛弃谬误数据 2, 而后持续数字 3 直到 5。

4 onErrorResume() 而后 onErrorContinue()

public static void main(String... args) {    Flux.range(1,5)            .doOnNext(i -> System.out.println("input=" + i))            .map(i -> i == 2 ? i / 0 : i)            .map(i -> i * 2)            .onErrorResume(err -> {                log.info("onErrorResume");                return Flux.empty();            })            .onErrorContinue((err, i) -> {log.info("onErrorContinue={}", i);})            .reduce((i,j) -> i+j)            .doOnNext(i -> System.out.println("sum=" + i))            .block();}

输入和下面一样:

input=1input=217:47:05.789 [main] INFO com.example.demo.config.TestRunner - onErrorContinue=2input=3input=4input=5sum=26

这样的后果,你想到了吗?onErrorContinue() 会在 onErrorResume() 失去谬误之前解决这个谬误。当两个谬误处理函数在同一个函数中的时候很显著,然而当你的函数中只有 onErrorResume(),而一些调用者实际上有 onErrorContinue() 时,你的 onErrorResume() 没有被调用的起因可能就不那么显著了。

5 应用 onErrorResume() 模仿 onErrorContinue()

有些博客倡议咱们齐全不必 onErrorContinue(),且在所有场景中仅用 onErrorResume()。然而上述示例曾经展现了它们会产生不同的后果。那咱们怎么实现呢?

public static void main(String... args) {    Flux.range(1,5)            .doOnNext(i -> System.out.println("input=" + i))            .flatMap(i -> Mono.just(i)                    .map(j -> j == 2 ? j / 0 : j)                    .map(j -> j * 2)                    .onErrorResume(err -> {                        System.out.println("onErrorResume");                        return Mono.empty();                    })            )            .reduce((i,j) -> i+j)            .doOnNext(i -> System.out.println("sum=" + i))            .block();}

因而,实质上是将可能在 flatMap 或 concatMap 中抛出谬误的操作包装起来,并在其上应用 onErrorResume()。这样,它会产生雷同的后果:

input=1input=2onErrorResumeinput=3input=4input=5sum=26

6 应用 onErrorResume() 和上游的 onErrorContinue() 模仿 onErrorContinue()

有时候,onErrorContinue() 放在调用程序中,您无法控制它。但你依然须要 onErrorResume()。你该怎么办?

public static void main(String... args) {    Flux.range(1,5)            .doOnNext(i -> System.out.println("input=" + i))            .flatMap(i -> Mono.just(i)                    .map(j -> j == 2 ? j / 0 : j)                    .map(j -> j * 2)                    .onErrorResume(err -> {                        System.out.println("onErrorResume");                        return Mono.empty();                    })                    .onErrorStop()            )            .onErrorContinue((err, i) -> {log.info("onErrorContinue={}", i);})            .reduce((i,j) -> i+j)            .doOnNext(i -> System.out.println("sum=" + i))            .block();}

秘诀是在 onErrorResume() 代码块的开端增加 onErrorStop() ——这会阻塞 onErrorContinue(),这样它就不会在 onErrorResume() 之前占用谬误。尝试删除 onErrorStop(),你会看到 onErrorContinue() 在 onErrorResume 之前弹出。

Reactor onErrorContinue VS onErrorResume