1 场景

调用多个平级服务,依照服务优先级返回第一个无效数据。

具体case:一个页面可能有很多的弹窗,弹窗之间又有优先级。每次只须要返回第一个有数据的弹窗。然而又心愿所有弹窗之间的数据获取是异步的。这种场景应用 Reactor 怎么实现呢?

2 创立 service

2.1 创立根本接口和实体类

public interface TestServiceI {    Mono request();}

提供一个 request 办法,返回一个 Mono 对象。

@Data@ToString@AllArgsConstructor@NoArgsConstructorpublic class TestUser {    private String name;}

2.2 创立 service 实现

@Slf4jpublic class TestServiceImpl1 implements TestServiceI {    @Override    public Mono request() {        log.info("execute.test.service1");        return Mono.fromSupplier(() -> {                    try {                        System.out.println("service1.threadName=" + Thread.currentThread().getName());                        Thread.sleep(500);                    } catch (InterruptedException e) {                        throw new RuntimeException(e);                    }                    return "";                })                .map(name -> {                    return new TestUser(name);                });    }}

第一个 service 执行耗时 500ms。返回空对象;

创立第二个 service 执行耗时 1000ms。返回空对象;代码如上,改一下sleep工夫即可。

持续创立第三个 service 执行耗时 1000ms。返回 name3。代码如上,改一下 sleep 工夫,以及返回为 name3。

3 主体办法

public static void main(String[] args) {        long startTime = System.currentTimeMillis();        TestServiceI testServiceImpl4 = new TestServiceImpl4();        TestServiceI testServiceImpl5 = new TestServiceImpl5();        TestServiceI testServiceImpl6 = new TestServiceImpl6();        List<TestServiceI> serviceIList = new ArrayList<>();        serviceIList.add(testServiceImpl4);        serviceIList.add(testServiceImpl5);        serviceIList.add(testServiceImpl6);    // 执行 service 列表,这样有多少个 service 都能够        Flux<Mono<TestUser>> monoFlux = Flux.fromIterable(serviceIList)                .map(service -> {                    return service.request();                });    // flatMap(或者flatMapSequential) + map 实现异样持续下一个执行        Flux flux = monoFlux.flatMapSequential(mono -> {            return mono.map(user -> {                        TestUser testUser = JsonUtil.parseJson(JsonUtil.toJson(user), TestUser.class);                        if (Objects.nonNull(testUser) && StringUtils.isNotBlank(testUser.getName())) {                            return testUser;                        }            // null 在 reactor 中是异样数据。                        return null;                    })                    .onErrorContinue((err, i) -> {                        log.info("onErrorContinue={}", i);                    });        });        Mono mono = flux.elementAt(0, Mono.just(""));        Object block = mono.block();        System.out.println(block + "blockFirst 执行耗时ms:" + (System.currentTimeMillis() - startTime));    }

1、Flux.fromIterable 执行 service 列表,能够随便增删 service 服务。

2、flatMap(或者flatMapSequential) + map + onErrorContinue 实现异样持续下一个执行。具体参考:

Reactor 之 onErrorContinue 和 onErrorResume

3、Mono mono = flux.elementAt(0, Mono.just("")); 返回第一个失常数据。

执行输入:

20:54:26.512 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework20:54:26.553 [main] INFO com.geniu.reactor.TestServiceImpl1 - execute.test.service1service1.threadName=main20:54:27.237 [main] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=)20:54:27.237 [main] INFO com.geniu.reactor.TestServiceImpl2 - execute.test.service2service5.threadName=main20:54:28.246 [main] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=)20:54:28.246 [main] INFO com.geniu.reactor.TestServiceImpl3 - execute.test.service3service6.threadName=mainTestUser(name=name3)blockFirst 执行耗时ms:2895

1、service1 和 service2 因为返回空,所以持续下一个,最终返回 name3。

2、查看总耗时:2895ms。service1 耗时 500,service2 耗时1000,service3 耗时 1000。发现耗时基本上等于 service1 + service2 + service3 。这是怎么回事呢?查看返回执行的线程,都是 main。

总结:这样实现依照程序返回第一个失常数据。然而执行并没有异步。下一步:如何实现异步呢?

4 实现异步

4.1 subcribeOn 实现异步

批改 service 实现。减少 .subscribeOn(Schedulers.boundedElastic())

如下:

@Slf4jpublic class TestServiceImpl1 implements TestServiceI {    @Override    public Mono request() {        log.info("execute.test.service1");        return Mono.fromSupplier(() -> {                    try {                        System.out.println("service1.threadName=" + Thread.currentThread().getName());                        Thread.sleep(500);                    } catch (InterruptedException e) {                        throw new RuntimeException(e);                    }                    return "";                })                //减少subscribeOn                .subscribeOn(Schedulers.boundedElastic())                .map(name -> {                    return new TestUser(name);                });    }}

再次执行输入如下:

21:02:04.213 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework21:02:04.265 [main] INFO com.geniu.reactor.TestServiceImpl1 - execute.test.service1service4.threadName=boundedElastic-121:02:04.300 [main] INFO com.geniu.reactor.TestServiceImpl2 - execute.test.service221:02:04.302 [main] INFO com.geniu.reactor.TestServiceImpl3 - execute.test.service3service2.threadName=boundedElastic-2service3.threadName=boundedElastic-321:02:04.987 [boundedElastic-1] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=)21:02:05.307 [boundedElastic-2] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=)TestUser(name=name6)blockFirst 执行耗时ms:1242

1、发现具体实现 sleep 的线程都不是 main 线程,而是 boundedElastic

2、最终执行耗时 1242ms,只比执行工夫最长的 service2 和 service3 耗时 1000ms,多一些。证实是异步了。

4.2 CompletableFuture 实现异步

批改 service 实现,应用 CompletableFuture 执行耗时操作(这里是sleep,具体到我的项目中可能是内部接口调用,DB 操作等);而后应用 Mono.fromFuture 返回 Mono 对象。

@Slf4jpublic class TestServiceImpl1 implements TestServiceI{    @Override    public Mono request() {        log.info("execute.test.service1");        CompletableFuture<String> uCompletableFuture = CompletableFuture.supplyAsync(() -> {            try {                System.out.println("service1.threadName=" + Thread.currentThread().getName());                Thread.sleep(500);            } catch (InterruptedException e) {                throw new RuntimeException(e);            }            return "testname1";        });        return Mono.fromFuture(uCompletableFuture).map(name -> {            return new TestUser(name);        });    }}

执行返回如下:

21:09:59.465 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework21:09:59.510 [main] INFO com.geniu.reactor.TestServiceImpl2 - execute.test.service2service2.threadName=ForkJoinPool.commonPool-worker-121:09:59.526 [main] INFO com.geniu.reactor.TestServiceImpl3 - execute.test.service3service3.threadName=ForkJoinPool.commonPool-worker-221:09:59.526 [main] INFO com.geniu.reactor.TestServiceImpl1 - execute.test.service1 service1.threadName=ForkJoinPool.commonPool-worker-321:10:00.526 [ForkJoinPool.commonPool-worker-1] INFO com.geniu.reactor.TestReactorOrder - onErrorContinue=TestUser(name=)21:10:00.538 [ForkJoinPool.commonPool-worker-2] INFO com.geniu.reactor.TestReactorOrder - onErrorContinue=TestUser(name=)TestUser(name=testname1)blockFirst 执行耗时ms:1238

1、耗时操作都是应用 ForkJoinPool 线程池中的线程执行。

2、最终耗时和办法1根本差不多。

大家都去试试吧~

相干链接:

Reactor 之 onErrorContinue 和 onErrorResume

Reactor 之 flatMap vs map 详解