乐趣区

关于reactor:GrowingIO-Reactor速成指南

简介

Reactor 响应式编程(Reactive Programming):

Reactor is a fully non-blocking reactive programming foundation for the JVM, with efficient demand management (in the form of managing“backpressure”). It integrates directly with the Java 8 functional APIs, notably CompletableFuture, Stream, and Duration. It offers composable asynchronous sequence APIs — Flux (for [N] elements) and Mono (for [0|1] elements) — and extensively implements the Reactive Streams specification.

翻译一下就是:Reactor 是 JVM 的一个齐全无阻塞的响应式编程根底,具备高效的需要治理 (以治理“背压”的模式)。它间接与 Java 8 性能 api 集成,尤其是 CompletableFuture、Stream 和 Duration。它提供了可组合的异步序列 api——Flux(用于[N] 元素)和 Mono(用于 [0|1] 元素)——并且宽泛地实现了反馈流标准。

背景

咱们服务端的我的项目大多采纳了 Spring WebFlux,reactor 是 Spring WebFlux 的首选反馈库,WebFlux 须要 Reactor 作为外围依赖项。Reactor 存在肯定的学习老本,在开发中咱们遇到了些 bug,相当一部分是因为咱们不够理解 reactor,踩了很多坑。所以在本文档中咱们次要针对的是一些学习过程容易让新人感到迷茫的知识点(map、flatMap、异步、并发),冀望能让新人更好上手 Spring WebFlux。

Mono

Mono<T> 是非凡的 Publisher<T>,它通过 onNext 信号最多收回一个我的项目,而后以一个 onComplete 信号(胜利 Mono,有或没有值)终止,或者只收回一个 onError 信号。某些 Operator(尤其是那些将 Mono 与其余 Publisher 联合在一起的 Operator)能够把 Mono 切换到 Flux。例如,Mono.concatWith(Publisher)返回 Flux,而 Mono.then(Mono)返回另一个 Mono。

Flux

Flux<T> 是规范的 Publisher<T>,示意它是能够发送 0 到 N 个元素的异步序列,可选的终止操作有 onComplete 或 onError。与 Reactive Streams 标准一样,这三种信号转换为对上游的 onNext,onComplete 和 onError 办法的调用。Flux 是通用的响应式类型。请留神,所有事件,甚至是终止事件,都是可选的,意思是:可能没有 onNext 事件,但只有 onComplete 事件,这就示意此 Flux 是一个空的无限序列。删除 onComplete 则能够失去一个有限的空序列(除了勾销测试外,它没有什么用途)。同样,有限序列不肯定为空。例如,Flux.interval(Duration)有限生产 Flux<Long>。

map、flatMap 以及 flatMapSequential 区别:

办法签名

//Map 的办法签名
<V> Flux<V> map(Function<? super T, ? extends V> mapper)

//FlatMap 的办法签名
<R> Flux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper)

map:通过对每个元素利用同步函数来转换此 Flux 收回的元素,并且是一对一的转换流元素。

flatMap:将此 Flux 收回的元素异步转换为 Publisher,而后通过合并将这些外部发布者扁平化为单个 Flux。Flux 可能蕴含 N 个元素,所以 flatMap 是一对多的转换。将各个 publisher 合并的过程不会放弃 源 Flux 公布 的程序,可能呈现交织。

flatMapSequential:将此 Flux 收回的元素异步转换为 Publisher,而后通过合并将这些外部发布者扁平化为单个 Flux。于 flatMap 不同的是 flatMapSequential 在合并 publisher 时会 按源元素的程序合并它们。

map 是一个同步运算符,它只是一种将一个值转换为另一个值的办法。

flatMap 能够是同步的,也能够是异步的,这取决于 flatMap 中调用的办法是否应用。

示例 1:

void demo() {
    //1. Flux.interval 按时生产 Long 的有限流
    final Flux<Long> flux = Flux.interval(Duration.of(100, ChronoUnit.MILLIS))
            .map(log()) //2. 调用办法 log,订阅后 log()会在以后线程执行
            .flatMap(logOfFlux()); //3. 通过 flatMap 调用 log(), 会在以后线程执行
    // 实现 flux 的定义,调用 subscribe 后才会真正开始执行
    flux.subscribe();}
​
​
Function<Long, Long> log() {
    return aLong -> {log.info("num is {}", aLong);
        return aLong;
    };
}
​
Function<Long, Flux<Long>> logOfFlux() {
    return aLong -> {log.info("num is {}", aLong);
        return Flux.just(aLong);
    };
}

在咱们的示例代码中,该 flatMap 操作是同步的,因为咱们应用 Flux.just()办法收回元素。上面咱们会介绍如何在 flatMap 中实现异步操作。

上边代码中讲到了 Publisher 调用 subscribe 后才会真正开始执行,然而 subscribe 中的代码并不一定会执行。

当 Mono 是空序列时:

void monoOfEmpty() {Mono.empty()
        .map(m -> func())
        .subscribe(message -> {responseObserver.onNext(message);
          responseObserver.onCompleted(););
}

会有同学喜爱在 subscribe 中解决响应(例如 rpc),但这个场景中 responseObserver.onCompleted() 不会被执行。

正确的做法应该是:

void monoOfEmpty() {Mono.empty()
        .map(m -> func())
        .doOnSuccess(message -> responseObserver.onCompleted())
        .subscribe(responseObserver::onNext);
}

异步与多线程

Reactor 被视为与并发无关的。也就是说,取得 Flux 或 Mono 并不一定意味着它在专用线程中运行。取而代之的是,大多数 Operator 会持续在执行前一个 Operator 的线程中工作。除非指定,否则最顶层的 Operator(源)自身运行在进行 subscribe()调用的线程上。

先分享一个案例:
在 Growing 查问服务 olap 中因为应用谬误的操作符带来性能问题

景象:在查看某些看板时,响应工夫会超时(一分钟)。

通过剖析发现这些看板中的单图查问时间段蕴含“明天”,因为须要保障实时数据,olap 在解决蕴含“明天”的查问时不会生成缓存;通过日志发现 olap 在查问 clickhouse 时始终都是一个线程在提交。

对应代码:

//queries sql List
Flux.fromArray(queries.values())
    // 须要应用 flatMapSequential 保障 Flux 订阅时返回的程序
    .flatMapSequential(query -> {
        // 执行 sql
        return executor.getFromCacheOrExecute(...);
    });
})
​
Mono<DataSet> getFromCacheOrExecute() {return existsCache(hashing)
        .flatMap(exists-> {if(exists) {// 查问缓存}else{
             // 查问 clickhouse
             return writeCache(hashing, doExecute(...))
             }
        });
}
​
// 查问缓存
public Mono<Boolean> existsKey(String key) {return stringRedisTemplate.hasKey(key);
}
​
// 提交查问至 clickhouse
Mono<DataSet> doExecute(...) {return Mono.fromCallable(() -> execute(...));
}

代码中没有应用 subscriptOn 或者 publishOn 操作符,stringRedisTemplate.hasKey(key) 会使后续的操作符都在 lettuce-1 线程中执行 并且整个流的执行都是串行的,咱们须要将查问过程改为异步查问,多个 sql 能够并发查问。上面就介绍实现异步并发的一些办法,及 GrowingIO 利用于这个场景的抉择。

Reactor 提供了两种在流中切换执行上下文(或 Scheduler)的形式:

咱们能够通过这两种形式达到异步执行的目标

publishOn : 此运算符影响线程上下文,它上面的链中的其余运算符将在其中执行,直到新呈现的 publishOn。

void demo() {final Flux<Long> flux = Flux.interval(Duration.of(100, ChronoUnit.MILLIS))
            .map(log())//1
            .publishOn(Schedulers.parallel())//2
            .map(log())//3
            .publishOn(Schedulers.elastic())//4
            .flatMap(logOfMono());//5
    flux.subscribe();}
  1. 会在调用 flux.subscribe()的线程上执行
    援用
  2. 指定后续的运算符执行的上下文
    援用
  3. 会在 parallel 线程上执行
    援用
  4. 指定后续的运算符执行的上下文
    援用
  5. 会在 elastic 线程上执行

subscribeOn:整个流在指定的 Scheduler 的 Scheduler.Worker 上运行,直至呈现 publishOn,publishOn 后续的操作符由 publishOn 决定执行上下文。

void demo() {final Flux<Long> flux = Flux.interval(Duration.of(100, ChronoUnit.MILLIS))
            .map(log())//1
            .publishOn(Schedulers.parallel())
            .map(log())//2
            .subscribeOn(Schedulers.elastic())
            .flatMap(logOfMono());
    flux.subscribe();}
  1. 会在 elastic 线程执行
    援用
  2. 会在 parallel 线程上执行,publishOn 会“笼罩”subscribeOn 的行为

这样咱们就能够异步的去执行办法了。这里有一个误区,刚接触 reactor 的小伙伴会误认为 Flux 中多个元素通过 log()或者 logOfMono() 会别离打印在不同线程上

Flux 顺次收回了 1,2,3 三个元素
无论是 subscribeOn、还是 publishOn,通过 flatMap 都会在 parallel-1 线程上执行,也就是说 Flux 中的所有元素只是从主线程收回,在另一个线程中执行。

在调用一些阻塞办法时(rpc、redis、io),咱们冀望每个元素通过 flatMap 中时能够运行在不同线程上(串行 -> 并发),应该怎么做?
将 Flux 转为 ParallelFlux,应用 runOn 来指明须要的线程池

要取得 ParallelFlux,能够在 Flux 上应用 parallel()运算符。

为了通知 ParallelFlux 在哪里运行每个元素,必须应用 runOn(Scheduler)。如果并行处理后,您想复原到“失常”状态 Flux 并以程序形式利用运算符链的其余部分,能够应用 sequential()

void demo() {final Flux<Long> flux = Flux.fromIterable(Lists.newArrayList(3L, 1L, 2L))
            .parallel().runOn(Schedulers.elastic())
            .flatMap(logOfMono())
            .sequential();
    flux.subscribe();}
  1. fluxMap 中调用的办法须要在外部通过 publishOn 表明执行上下文。
void demo() {Flux.fromIterable(Lists.newArrayList(3L, 1L, 2L))
        .flatMap(this::blockFunction)
        .subscribe();}
​
// 通过 publishOn 来表明 blockFunction() 的上下文
Mono<Long> blockFunctionAsync(Long i) {return Mono.just(i).publishOn(Schedulers.elastic()).flatMap(this::blockFunction);
}
​
Mono<Long> blockFunction(Long i) {Thread.sleep(i * 1000);
    return Mono.just(i);
}
  1. 应用 Mono.fromFuture() 创立流
Mono.fromFuture(CompletableFuture.supplyAsync(() -> blockFunction()));

Mono.fromFuture() 创立一个 Mono,应用提供的 CompletableFuture 产生它的值。

CompletableFuture.supplyAsync() 返回一个 CompletableFuture,它将在 ForkJoinPool.commonPool() 上运行工作,异步实现。

ForkJoinPool 一个全局线程池,次要利用于计算密集型的场景。

  1. 应用 Mono.create()、Flux.create() 办法创立流
// 自定义线程池
final Executor executor = new ThreadPoolExecutor(...);

// 创立 Mono、Flux 时 指明上下文
Mono.create(sink -> executor.execute(() -> {sink.success(blockFunction());
}));

目前咱们举荐应用 这种办法来创立 Mono、Flux 来实现异步

这样做的益处:

Reactor 对老手来说有肯定了解老本,在调用一个返回值为 Publisher 的类型的办法时,不点进去无奈晓得这是同步办法还是异步办法,防止对于调用者造成的心智累赘。

更不便得应用自定义线程池

咱们针对 olap 查问业务场景 采取适合的办法:

办法一:ParallelFlux 不能保障 sql 返回程序,不满足该业务场景。

办法二:Reactor 对老手来说有肯定了解老本,在调用一个返回值为 Publisher 的类型的办法时,不点进去无奈晓得这是同步办法还是异步办法,对于调用者存在心智累赘。

办法三:无奈指定自定义线程池

咱们采纳了第 4 种形式来实现异步。

reactor 提供的线程池

  1. 以后线程执行(Schedulers.immediate()): 在解决时,将间接执行提交的 Runnable,从而在以后线程上无效地运行它们(能够视为“空对象”或无操作调度程序)。
  2. 单个可重用线程(Schedulers.single())。请留神,此办法对所有调用方都应用雷同的线程,直到调度程序被开释为止。如果须要每次调用一个专用线程,请对每个调用应用 Schedulers.newSingle()。
  3. 无限度的弹性线程池(Schedulers.elastic())。它依据须要创立新的工作池并重用闲暇的工作池,随着 Flux 中元素的增多,会无限度创立线程去执行,存在安全隐患,不举荐应用。
  4. 有界弹性线程池 (Schedulers.boundedElastic())。依据须要创立新的工作池并重用闲暇的工作池。闲置工夫过长(默认值为 60s)的工作池也将被抛弃。与 elastic() 有所不同,它对能够创立的线程数进行限度(默认为 CPU 外围数 x 10)。达到下限后,最多可再提交 10 万个工作,并在有线程可用时从新调度(当工作被设置提早执行时,提早计时是在线程可用时开始)。这是 I / O 阻塞工作的更好抉择。Schedulers.boundedElastic()是一种为阻塞解决提供本人的线程的简便办法,这样它就不会占用其余资源。毛病是这个线程池是全局的,假如有两个阻塞办法 getFiles、getData,冀望别离应用不同线程池去方便管理,就不适宜用 boundedElastic 了。
  5. 为并行工作而调整的固定工作线程池(Schedulers.parallel())。它创立的工作线程数量与 CPU 内核数量一样多。

处理错误

在响应式流中,谬误(error)是终止(terminal)事件。当有谬误产生时,它会导致流序列进行,并且谬误信号会沿着操作链条向下传递,直至遇到定义的 Subscriber 及其 onError 办法。

在 try-catch 代码块中解决异样的几种办法。常见的包含如下几种:

  1. 捕捉并返回一个动态的缺省值。
  2. 捕捉并执行一个异样解决办法。
  3. 捕捉并动静计算一个候补值来顶替。
  4. 捕捉,并再包装为某一个 业务相干的异样,而后再抛出业务异样。
  5. 捕捉,记录谬误日志,而后持续抛出。
  6. 应用 finally 来清理资源

以上所有这些在 Reactor 都有等效的 操作符解决形式。

与第 (1) 条(捕捉并返回一个动态的缺省值)对应的是 onErrorReturn:

Flux.just(10)
    .flatMap(this::function)
    .onErrorReturn("Error");// 返回一个动态的缺省值

依据谬误类型返回对应值

Flux.just(10)
    .flatMap(this::function)
    .onErrorReturn(e -> e.getMessage().equals("boom-1"), "Error-1");

与第 (2、3、4) 条(捕捉并执行一个异样解决办法)对应的是 onErrorResume

Flux.just(10)
    .flatMap(m -> function(k).onErrorResume(e -> handleErr(k)));
​
Flux.just(10)
    .flatMap(m -> function(k).onErrorResume(e -> errFunc(k)));
​
Flux.just(10)
    .flatMap(m -> function(k)
        .onErrorResume(e -> Flux.error(new Exception(k)))
    );

对应第 (5) 条(捕捉,记录谬误日志,并持续抛出)

Flux.just(10)
    .flatMap(k -> function(k)) 
    .doOnError(e -> 
        log.error(e.getLocalizedMessage(), e);
    });

对应(6)doFinally 在序列终止(无论是 onComplete、onError 还是勾销)的时候被执行,并且可能判断是什么类型的终止事件

Flux.just(10)
    .doFinally(type -> {if (type == SignalType.CANCEL){log.info("a log"); 
        }
    })

我须要哪个运算符

官网文档

A.1. 创立一个新序列
收回一个 T , 当曾经有值(不须要再去 IO):just

基于一个 Optional<T>:Mono.justOrEmpty(Optional<T>)

基于一个可能为 null 的 T:Mono.justOrEmpty(T)

收回一个 T,且还是由 just 返回

然而须要“懒”创立的:应用 Mono.fromSupplier 或用 defer 包装 just

// 会起到提早执行的作用,在订阅时才去执行 func()
Mono.defer(()-> Mono.just(func()));
收回许多 T,这些元素我能够明确列举进去:Flux.just(T…)

基于迭代数据结构:

一个数组:Flux.fromArray

一个汇合或 iterable:Flux.fromIterable

一个 Integer 的 range:Flux.range

一个 Stream 提供给每一个订阅:Flux.fromStream(Supplier<Stream>)

基于一个参数值给出的源:

一个 Supplier<T>:Mono.fromSupplier

一个工作:Mono.fromCallable,Mono.fromRunnable

一个 CompletableFuture<T>:Mono#fromFuture

间接实现:empty

立刻生成谬误:error

然而“懒”创立的形式生成 Throwable.error(Supplier<Throwable>)

什么都不做:never

依赖一个可回收的资源:using

可编程地生成事件(能够应用状态):

同步且一一的:Flux.generate

异步(也可同步)的,每次尽可能多收回元素:Flux.create(Mono#create 也是异步的,只不过只能发一个)

A.2. 对现有序列进行转化
我想转化一个序列:

1 对 1 地转化(比方字符串转化为它的长度):map

类型转化:cast

为了取得每个元素的序号:Flux.index

1 对 n 地转化(如字符串转化为一串字符):flatMap + 应用一个工厂办法

1 对 n 地转化可自定义转化办法和 / 或状态:handle

对每一个元素执行一个异步操作(如对 url 执行 http 申请):flatMap+ 一个异步的返回类型为 Publisher 的办法

疏忽一些数据:在 flatMap lambda 中依据条件返回一个 Mono.empty()

保留原来的序列程序:Flux.flatMapSequential(对每个元素的异步工作会立刻执行,但会将后果依照原序列程序排序)

当 Mono 元素的异步工作会返回多个元素的序列时:Mono.flatMapMany

我想增加一些数据元素到一个现有的序列:

在结尾增加:Flux#startWith(T…)

在最初增加:Flux#concatWith(T…)

我想将 Flux 转化为汇合(一下都是针对 Flux 的)

转化为 List:collectList,collectSortedList

转化为 Map:collectMap,collectMultiMap

转化为自定义汇合:collect

计数:count

reduce 算法(将上个元素的 reduce 后果与以后元素值作为输出执行 reduce 办法,如 sum)reduce

将每次 reduce 的后果立刻收回:scan

转化为一个 boolean 值:

对所有元素判断都为 true:all

对至多一个元素判断为 true:any

判断序列是否有元素(不为空):hasElements

判断序列中是否有匹配的元素:hasElement

我想合并 publishers…

按序连贯:Flux.concat 或者 concatWith(other)

即便有谬误,也会等所有的 publishers 连贯实现:Flux.concatDelayError

按订阅程序连贯(这里的合并依然能够了解成序列的连贯):Flux.mergeSequential

按元素收回的程序合并(无论哪个序列的,元素先到先合并):Flux.merge 或者 Flux.mergeWith(other)

元素类型会发生变化:Flux.zip / Flux.zipWith

将元素组合:

2 个 Mono 组成 1 个 Tuple2:Mono.zipWith

n 个 Monos 的元素都收回来后组成一个 Tuple:Mono#zip

在终止信号呈现时“采取行动”:

在 Mono 终止时转换为一个 Mono<Void>:Mono.and

当 n 个 Mono 都终止时返回 Mono<Void>:Mono.when

返回一个寄存组合数据的类型,对于被合并的多个序列:

每个序列都收回一个元素时:Flux.zip

任何一个序列收回元素时:Flux.combineLatest

只取各个序列的第一个元素:Flux#first,Mono#first,mono.or (otherMono).or(thirdMono),`flux.or(otherFlux).or(thirdFlux)

由一个序列触发(相似于 flatMap,不过“喜新厌旧”):switchMap

由每个新序列开始时触发(也是“喜新厌旧”格调):switchOnNext

我想反复一个序列:repeat

以肯定的距离反复:Flux.interval(duration).flatMap(tick -> myExistingPublisher)

我有一个空序列,然而…

我想要一个缺省值来代替:defaultIfEmpty

我想要一个缺省的序列来代替:switchIfEmpty

我有一个序列,然而我对序列的元素值不感兴趣:ignoreElements

…并且我心愿用 Mono 来示意序列曾经完结:then

…并且我想在序列完结后期待另一个工作实现:thenEmpty

…并且我想在序列完结之后返回一个 Mono:Mono#then(mono)

…并且我想在序列完结之后返回一个值:Mono#thenReturn(T)

…并且我想在序列完结之后返回一个 Flux:thenMany

我有一个 Mono 但我想提早实现…

…当有 1 个或 N 个其余 publishers 都收回(或完结)时才实现:Mono.delayUntilOther

…应用一个函数式来定义如何获取“其余 publisher”:Mono.delayUntil(Function)

我想基于一个递归的生成序列的规定扩大每一个元素,而后合并为一个序列收回:

…广度优先:expand(Function)

…深度优先:expandDeep(Function)

A.3.“窥视”(只读)序列
在不对序列造成扭转的状况下,我想:

失去告诉或执行一些操作:

收回元素:doOnNext

序列实现:Flux#doOnComplete,Mono#doOnSuccess

因谬误终止:doOnError

勾销:doOnCancel

订阅时:doOnSubscribe

申请时:doOnRequest

实现或谬误终止:doOnTerminate(Mono 的办法可能蕴含有后果)

然而在终止信号向上游传递 之后:doAfterTerminate

所有类型的信号(Signal):Flux.doOnEach

所有完结的状况(实现 complete、谬误 error、勾销 cancel):doFinally

记录日志:log

我想晓得所有的事件:

每一个事件都体现为一个 single 对象:

执行 callback:doOnEach

每个元素转化为 single 对象:materialize

…在转化回元素:dematerialize

转化为一行日志:log

A.4. 过滤序列
我想过滤一个序列

基于给定的判断条件:filter

…异步地进行判断:filterWhen

仅限于指定类型的对象:ofType

疏忽所有元素:ignoreElements

去重:

对于整个序列:Flux#distinct

去掉间断反复的元素:Flux#distinctUntilChanged

我只想要一部分序列:

只有 N 个元素:

从序列的第一个元素开始算:Flux.take(long)

…取一段时间内收回的元素:Flux.take(Duration)

…只取第一个元素放到 Mono 中返回:Flux.next()

…应用 request(N) 而不是勾销:Flux.limitRequest(long)

从序列的最初一个元素倒数:Flux.takeLast

直到满足某个条件(蕴含):Flux.takeUntil(基于判断条件),Flux#takeUntilOther(基于对 publisher 的比拟)

直到满足某个条件(不蕴含):Flux.takeWhile

最多只取 1 个元素:

给定序号:Flux.elementAt

最初一个:.takeLast(1)

…如果为序列空则收回谬误信号:Flux.last()

…如果序列为空则返回默认值:Flux.last(T)

跳过一些元素:

从序列的第一个元素开始跳过:Flux.skip(long)

…跳过一段时间内收回的元素:Flux.skip(Duration)

跳过最初的 n 个元素:Flux.skipLast

直到满足某个条件(蕴含):Flux.skipUntil(基于判断条件),Flux.skipUntilOther(基于对 publisher 的比拟)

直到满足某个条件(不蕴含):Flux.skipWhile

采样:

给定采样周期:Flux.sample(Duration)

取采样周期里的第一个元素而不是最初一个:sampleFirst

基于另一个 publisher:Flux.sample(Publisher)

基于 publisher“超时”:Flux.sampleTimeout(每一个元素会触发一个 publisher,如果这个 publisher 不被下一个元素触发的 publisher 笼罩就收回这个元素)

我只想要一个元素(如果多于一个就返回谬误)…

如果序列为空,收回谬误信号:Flux.single()

如果序列为空,收回一个缺省值:Flux.single(T)

如果序列为空就返回一个空序列:Flux.singleOrEmpty

A.5. 错误处理
我想创立一个谬误序列:error

…替换一个实现的 Flux:.concat(Flux.error(e))

…替换一个实现的 Mono:.then(Mono.error(e))

…如果元素超时未收回:timeout

…“懒”创立:error(Supplier<Throwable>)

我想要相似 try/catch 的表达方式:

抛出异样:error

捕捉异样:

而后返回缺省值:onErrorReturn

而后返回一个 Flux 或 Mono:onErrorResume

包装异样后再抛出:.onErrorMap(t -> new RuntimeException(t))

finally 代码块:doFinally

Java 7 之后的 try-with-resources 写法:using 工厂办法

我想从谬误中复原…

返回一个缺省的:

的值:onErrorReturn

Publisher:Flux.onErrorResume 和 Mono.onErrorResume

重试:retry

…由一个用于随同 Flux 触发:retryWhen

我想解决背压谬误(向上游收回“MAX”的 request,如果上游的 request 比拟少,则利用策略)…

抛出 IllegalStateException:Flux#onBackpressureError

抛弃策略:Flux.onBackpressureDrop

…然而不抛弃最初一个元素:Flux.onBackpressureLatest

缓存策略(无限或有限):Flux.onBackpressureBuffer

…当无限的缓存空间用满则利用给定策略:Flux.onBackpressureBuffer 带有策略 BufferOverflowStrategy

A.6. 基于工夫的操作
我想将元素转换为带有工夫信息的 Tuple2<Long, T>

从订阅时开始:elapsed

记录时间戳:timestamp

如果元素间提早过长则停止序列:timeout

以固定的周期收回元素:Flux.interval

在肯定的提早后收回 0:static Mono.delay

我想引入提早:

对每一个元素:Mono.delayElement,Flux.delayElements

提早订阅:delaySubscription

A.7. 拆分 Flux

我想将一个 Flux<T> 拆分为一个 Flux<Flux<T>>:

以个数为界:window(int)

…会呈现重叠或抛弃的状况:window(int, int)

以工夫为界:window(Duration)

…会呈现重叠或抛弃的状况:window(Duration, Duration)

以个数或工夫为界:windowTimeout(int, Duration)

基于对元素的判断条件:windowUntil

…触发判断条件的元素会分到下一波(cutBefore 变量):.windowUntil(predicate, true)

…满足条件的元素在一波,直到不满足条件的元素收回开始下一波:windowWhile(不满足条件的元素会被抛弃)

通过另一个 Publisher 的每一个 onNext 信号来拆分序列:window(Publisher),windowWhen

我想将一个 Flux<T> 的元素拆分到汇合…

拆分为一个一个的 List:

以个数为界:buffer(int)

…会呈现重叠或抛弃的状况:buffer(int, int)

以工夫为界:buffer(Duration)

…会呈现重叠或抛弃的状况:buffer(Duration, Duration)

以个数或工夫为界:bufferTimeout(int, Duration)

基于对元素的判断条件:bufferUntil(Predicate)

…触发判断条件的元素会分到下一个 buffer:.bufferUntil(predicate, true)

…满足条件的元素在一个 buffer,直到不满足条件的元素收回开始下一 buffer:bufferWhile(Predicate)

通过另一个 Publisher 的每一个 onNext 信号来拆分序列:buffer(Publisher),bufferWhen

拆分到指定类型的 “collection”:buffer(int, Supplier<C>)

我想将 Flux<T> 中具备独特特色的元素分组到子 Flux:groupBy(Function<T,K>) TIP:留神返回值是 Flux<GroupedFlux<K, T>>,每一个 GroupedFlux 具备雷同的 key 值 K,能够通过 key() 办法获取。

A.8. 回到同步的世界
我有一个 Flux<T>,我想:

在拿到第一个元素前阻塞:Flux.blockFirst

…并给出超时时限:Flux.blockFirst(Duration)

在拿到最初一个元素前阻塞(如果序列为空则返回 null):Flux#blockLast

…并给出超时时限:Flux.blockLast(Duration)

同步地转换为 Iterable<T>:Flux.toIterable

同步地转换为 Java 8 Stream<T>:Flux.toStream

我有一个 Mono<T>,我想:

在拿到元素前阻塞:Mono.block

…并给出超时时限:Mono#block(Duration)

转换为 CompletableFuture<T>:Mono.toFuture

退出移动版