前言

原本这一篇筹备写Jetpack对应的paging的,但在整顿材料的时候,发现Kotlin还有Flow未解说,这个也是一大重点,因而本篇将对Flow进行详解!

不便后续联合Flow与Paging,进行混合解说!

既然如此!那么Flow是什么呢?

1、意识Flow

1.1 Kotlin Flow 介绍

官网文档给予了一句话简略的介绍:

Flow — cold asynchronous stream with flow builder and comprehensive operator set (filter, map, etc);

Flow 从文档的介绍来看,它有点相似 RxJava 的 Observable。因为 Observable 也有 Cold 、Hot 之分。

官网的就是太官网了,对于不相熟的RxJava 的小伙伴,光凭这个概念还是有点云里雾里。

咱们还是通过一系列小案例来逐渐深刻Flow!

1.1 如何示意多个值?

认真想想:挂起函数能够异步的返回单个值,然而该如何异步返回多个计算好的值呢?

emmm....

异步返回多个值能从哪些方面动手呢?

  • 汇合?
  • 序列?
  • 挂起函数?
  • 当然还有要讲的Flow?

咱们来看看这几种计划怎么实现的?

1.1.1 汇合

    fun simpleList(): List<Int> = listOf<Int>(1, 2, 3)    @Test    fun `test multiple values`() {        simpleList().forEach { value -> println(value) }    }

这种形式的确是返回了多个值,但不是异步!

1.1.2 序列

    fun simpleSequence(): Sequence<Int> = sequence {        for (i in 1..3) {            //Thread.sleep(1000)  //间接阻塞以后线程,并非异步!            //delay(1000) //又没有suspend润饰,必定用不了这个挂起!就算这里用了,上面的test也用不了!            yield(i)        }    }    @Test    fun `test multiple values`() {        simpleSequence().forEach { value -> println(value) }    }

这种形式也是返回了多个值,但不是异步!

1.1.3 挂起函数

    suspend fun simpleList2(): List<Int> {        delay(1000)        return listOf<Int>(1, 2, 3)    }        @Test    fun `test multiple values2`() = runBlocking<Unit> {        simpleList2().forEach { value -> println(value) }    }

这种形式既返回了多个值,并且也是异步!满足异步返回多个值!

那么Flow形式该怎么呢?

1.1.4 Flow形式

    fun simpleFlow() = flow<Int> {        for (i in 1..3) {            delay(1000) //伪装在一些重要的事件            emit(i) //发射,产生一个元素        }    }    @Test    fun `test multiple values3`() = runBlocking<Unit> {        simpleFlow().collect { value -> println(value) }    }

这里咱们看到simpleFlow办法,外面应用了delay挂起函数,但这个办法并没有suspend 修饰符,因而该办法并不是挂起函数!能够在任意中央应用!(非协程模块,非挂起模块)

看完了下面的示例,当初总结下:

1.1.5 Flow与其余形式的区别

  • 名为flow的Flow类型构建起函数
  • flow{...}构建块中的代码能够挂起
  • 函数simpleFlow不再标有suspend修饰符
  • 流应用emit函数发射值
  • 流应用collect函数收集值

再来一张图,加深下印象

当初对Flow有了大抵的印象,那么它有啥作用呢?

1.2 Flow利用

在Android开发中,文件下载是Flow的一个十分经典的案例

如图所示

当文件下载时,对应的后盾下载进度,就能够通过Flow外面的emit发送数据,通过collect接管对应的数据。(前面将会联合Jetpack对应的paging进行解说)

1.3 冷流

    fun simpleFlow2() = flow<Int> {        println("Flow started")        for (i in 1..3) {            delay(1000)            emit(i)        }    }    @Test    fun `test flow is cold`() = runBlocking<Unit> {        val flow = simpleFlow2()        println("Calling collect...")        flow.collect { value -> println(value) }        println("Calling collect again...")        flow.collect { value -> println(value) }    }

运行后果

Calling collect...Flow started123Calling collect again...Flow started123

从这个运行后果能够看出:

Flow是一种相似于序列的冷流,flow构建器中的代码直到流被收集的时候才运行。

留神:下篇筹备讲的channelFlow是热流!

1.4 流的连续性

    @Test    fun `test flow continuation`() = runBlocking<Unit> {        (1..5).asFlow().filter {            it % 2 == 0        }.map {            "string $it"        }.collect {            println("Collect $it")        }    }

运行后果

Collect string 2Collect string 4

从这个运行成果可知:

  • 流的每次独自收集都是按程序执行的,除非应用非凡操作符
  • 从上游到上游每个适度操作符都会解决每个发射出的值,而后再交给末端操作符

1.5 流构建器

    @Test    fun `test flow builder`() = runBlocking<Unit> {        flowOf("one","two","three")                .onEach { delay(1000) }                .collect { value ->                    println(value)                }        (1..3).asFlow().collect { value ->            println(value)        }    }

运行成果

onetwothree123

从这个运行成果能够得悉:能够通过对应的flowOfasFlow构建对应的flow流

1.6 流的上下文

这里将通过几个小案例进行详解

1.6.1 案例一

    fun simpleFlow3() = flow<Int> {        println("Flow started ${Thread.currentThread().name}")        for (i in 1..3) {            delay(1000)            emit(i)        }    }        @Test    fun `test flow context`() = runBlocking<Unit> {        simpleFlow3()                .collect { value -> println("Collected $value ${Thread.currentThread().name}") }    }

运行成果

Flow started Test worker @coroutine#1Collected 1 Test worker @coroutine#1Collected 2 Test worker @coroutine#1Collected 3 Test worker @coroutine#1

从这里能够看出,从emit到collect 上下文贯通了所有,都为同一个上下文。

那如果说,想在emit时应用另一个上下文该怎么呢?

1.6.2 案例二

    fun simpleFlow4() = flow<Int> {        withContext(Dispatchers.Default) {            println("Flow started ${Thread.currentThread().name}")            for (i in 1..3) {                delay(1000)                emit(i)            }        }    }        @Test    fun `test flow context`() = runBlocking<Unit> {        simpleFlow4()                .collect { value -> println("Collected $value ${Thread.currentThread().name}") }    }

运行成果

Flow invariant is violated:        Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@3f3e9270, BlockingEventLoop@67bc91f8],...略

能够发现,这样写间接解体了,并不是Flow invariant is violated:,那么该如何应用呢?

1.6.3 案例三

    fun simpleFlow5() = flow<Int> {        println("Flow started ${Thread.currentThread().name}")        for (i in 1..3) {            delay(1000)            emit(i)        }    }.flowOn(Dispatchers.Default)        @Test    fun `test flow context`() = runBlocking<Unit> {        simpleFlow5()                .collect { value -> println("Collected $value ${Thread.currentThread().name}") }    }

运行成果

Flow started DefaultDispatcher-worker-1 @coroutine#2Collected 1 Test worker @coroutine#1Collected 2 Test worker @coroutine#1Collected 3 Test worker @coroutine#1

从这几个小案例能够总结出

  • 流的收集总是在调用协程的上下文中产生,流的该属性成为上下文保留。
  • flow{...}构建器中的代码必须遵循上下文保留属性,并且不容许从其余上下文中产生(emit)
  • flowOn操作符,该函数用于更改流发射的上下文

1.7 启动流

1.7.1 案例一

    //事件源    fun events() = (1..3)            .asFlow()            .onEach { delay(100) }            .flowOn(Dispatchers.Default)    @Test    fun `test flow launch`() = runBlocking<Unit> {        val job = events()                .onEach { event -> println("Event: $event ${Thread.currentThread().name}") }//                .collect {}                .launchIn(CoroutineScope(Dispatchers.IO)) //这里应用另一个上下文传入Flow//                .launchIn(this)//这里应用以后上下文传入Flow        delay(200)        job.cancelAndJoin()    }

这里咱们看到曾经勾销了collect ,而改为了launchIn,对应传入了新的上下文作为Flow的保留上下文。

正因为这里传入的非以后上下文所以须要调用job.join()或者 job.cancelAndJoin(),来期待对应Flow实现对应操作

运行成果

Event: 1 DefaultDispatcher-worker-1 @coroutine#2Event: 2 DefaultDispatcher-worker-1 @coroutine#2

因为这里挂起了两秒就勾销了,所以这里并没有打印所有的日志,同时上下文为:DefaultDispatcher

再来看看案例二

1.7.2 案例二

    //事件源    fun events() = (1..3)            .asFlow()            .onEach { delay(100) }            .flowOn(Dispatchers.Default)    @Test    fun `test flow launch`() = runBlocking<Unit> {        val job = events()                .onEach { event -> println("Event: $event ${Thread.currentThread().name}") }//                .collect {}//                .launchIn(CoroutineScope(Dispatchers.IO)) //这里应用另一个上下文传入Flow                .launchIn(this)//这里应用以后上下文传入Flow//        delay(200) //       job.cancelAndJoin()    }

运行成果

Event: 1 Test worker @coroutine#2Event: 2 Test worker @coroutine#2Event: 3 Test worker @coroutine#2

因为这里Flow传入的以后上下文,因而不须要额定通过其余形式期待执行实现。

1.8 流的勾销

1.8.1 被动勾销

    fun simpleFlow6() = flow<Int> {        for (i in 1..3) {            delay(1000)            emit(i)            println("Emitting $i")        }    }    @Test    fun `test cancel flow`() = runBlocking<Unit> {        withTimeoutOrNull(2500) {            simpleFlow6().collect { value -> println(value) }        }        println("Done")    }

运行成果

1Emitting 12Emitting 2Done

这里咱们看到,应用了withTimeoutOrNull设置超时的办法,让它在超时状况下勾销并进行执行。

不过这个勾销都是被动勾销,如果被动勾销该是怎么呢?

1.8.2 被动勾销

    @Test    fun `test cancel flow check`() = runBlocking<Unit> {        (1..5).asFlow().collect { value ->            println(value)            if (value == 3) cancel()            println("cancel check ${coroutineContext[Job]?.isActive}")        }    }

运行成果

1cancel check true2cancel check true3cancel check false4cancel check false5cancel check false

这里咱们看到,被动勾销时,对应状态曾经变了,然而还是全副执行了!

这是因为这里并没有退出勾销检测!

1.8.3 流的勾销检测

    @Test    fun `test cancel flow check`() = runBlocking<Unit> {        (1..5).asFlow().cancellable().collect { value ->            println(value)            if (value == 3) cancel()            println("cancel check ${coroutineContext[Job]?.isActive}")        }    }

这里咱们看到应用了cancellable办法!

再次运行看看成果

1cancel check true2cancel check true3cancel check false

OK,这里能够看到曾经胜利的勾销了!进入下一专题!

1.9 背压

讲这个之前,咱们先看比拟原始的案例

    fun simpleFlow8() = flow<Int> {        for (i in 1..3) {            delay(100)            emit(i)            println("Emitting $i ${Thread.currentThread().name}")        }    }    @Test    fun `test flow back pressure`() = runBlocking<Unit> {        val time = measureTimeMillis {            simpleFlow8()                .collect { value ->                    delay(300)   //解决这个元素耗费300ms                    println("Collected $value ${Thread.currentThread().name}")                }        }        println("Collected in $time ms")    }

来看看运行成果

Collected 1 Test worker @coroutine#1Emitting 1 Test worker @coroutine#1Collected 2 Test worker @coroutine#1Emitting 2 Test worker @coroutine#1Collected 3 Test worker @coroutine#1Emitting 3 Test worker @coroutine#1Collected in 1237 ms

这里咱们看到,这是十分规范的一个生产者—消费者模式,都是一一对应的。那么加上不同的关键字试试?

1.9.1 buffer(xx)

    @Test    fun `test flow back pressure`() = runBlocking<Unit> {        val time = measureTimeMillis {            simpleFlow8()                .buffer(50)                .collect { value ->                    delay(300)   //解决这个元素耗费300ms                    println("Collected $value ${Thread.currentThread().name}")                }        }        println("Collected in $time ms")    }

这里咱们看到退出了.buffer(50)

来看看运行成果

Emitting 1 Test worker @coroutine#2Emitting 2 Test worker @coroutine#2Emitting 3 Test worker @coroutine#2Collected 1 Test worker @coroutine#1Collected 2 Test worker @coroutine#1Collected 3 Test worker @coroutine#1Collected in 1108 ms

这里咱们看到生产的音讯先是全堆在一起,而后集中发送,总耗时也比规范的少了一点。

如图所示

咱们暂可了解为,buffer(50)将对应的传输通道变长了,使传输通道可能装更多的元素。

接下来,持续看下一个操作符!

1.9.2 conflate()

    @Test    fun `test flow back pressure`() = runBlocking<Unit> {        val time = measureTimeMillis {            simpleFlow8()                .conflate()                .collect { value ->                    delay(300)   //解决这个元素耗费300ms                    println("Collected $value ${Thread.currentThread().name}")                }        }        println("Collected in $time ms")    }

运行成果

Emitting 1 Test worker @coroutine#2Emitting 2 Test worker @coroutine#2Emitting 3 Test worker @coroutine#2Collected 1 Test worker @coroutine#1Collected 3 Test worker @coroutine#1Collected in 800 ms

这里咱们看到,消费者并非全副解决完对应生产者的元素。接着看下一个!

1.9.3 collectLatest{}

    @Test    fun `test flow back pressure`() = runBlocking<Unit> {        val time = measureTimeMillis {            simpleFlow8()                .collectLatest { value ->                    delay(300)   //解决这个元素耗费300ms                    println("Collected $value ${Thread.currentThread().name}")                }        }        println("Collected in $time ms")    }

这次,将collect换成了collectLatest ,看其名,感觉像是只解决最初一个元素,来看看运行成果

Emitting 1 Test worker @coroutine#2Emitting 2 Test worker @coroutine#2Emitting 3 Test worker @coroutine#2Collected 3 Test worker @coroutine#5Collected in 807 ms

果真如此,该操作符只会解决最初一个元素!最初再来个总结!

1.9.4 背压总结

  • buffer(),并发运行流中发射元素的代码;
  • conflate(),合并发射项,不对每个值进行解决;
  • collectLatest(),勾销并从新发射最初一个值

2、操作符

2.1 过渡流操作符

2.1.1 案例一

    suspend fun performRequest(request: Int): String {        delay(500)        return "response $request"    }      @Test    fun `test transform flow operator`() = runBlocking<Unit> {        (1..3).asFlow()            .map { request -> performRequest(request) }            .collect { value -> println(value) }    }

运行成果

response 1response 2response 3

这里咱们看到,将数据流转为Map类型,而后顺次发送每个元素!

那如果说,想要发送前后还额定想发送本人的自定义元素该怎么办呢?

    suspend fun performRequest(request: Int): String {        delay(500)        return "response $request"    }      @Test    fun `test transform flow operator`() = runBlocking<Unit> {        (1..3).asFlow()                .transform { request ->                    emit("Making request $request")                    emit(performRequest(request))                }.collect { value -> println(value) }    }

这里咱们看到应用了transform ,在对应闭包里应用了emit发送元素,来看看运行成果

Making request 1response 1Making request 2response 2Making request 3response 3

从这个运行成果可知:咱们能够通过这样的形式,来自定义发射元素!

2.1.2 案例二

    fun numbers() = flow<Int> {        try {            emit(1)            emit(2)            println("This line will not execute")            emit(3)        } finally {            println("Finally in numbers")        }    }    @Test    fun `test limit length operator`() = runBlocking<Unit> {        //take(2),示意 当计数元素被耗费时,原始流被勾销        numbers().take(2).collect { value -> println(value) }    }

来看看运行成果

12Finally in numbers

从这个运行成果可知:当消费者解决元素2时,就将对应的原始流给勾销掉了!

2.1.3 总结

过渡流操作符:

  • 能够应用操作符转换流,就像应用汇合与序列一样;
  • 过渡操作符利用于上游流,并返回上游流;
  • 这些操作符也是冷操作符,就像流一样。这类操作符自身不是挂起函数
  • 它运行速度很快,返回新的转换的定义

2.2 末端操作符

末端操作符是在流上用户**启动流收集的挂起函数。**collect是最根底的末端操作符,然而还有另外一些更方便使用的末端操作符:

  • 转化为各种汇合,比方toList与toSet;
  • 获取第一个(first)值与确保流发射单个(single)值的操作符
  • 应用reduce与fold将流规约到单个值

说了这么多,来看看怎么应用的!

    @Test    fun `test terminal operator`() = runBlocking<Unit> {        val sum = (1..5).asFlow()            .map {                println("it * it= ${it * it}")                it * it            }            //从第一个元素开始累加值,并将操作利用于以后累加器值和每个元素            .reduce { a, b ->                println("a=$a,b=$b,a+b=${a + b}")                a + b            }        println(sum)    }

运行成果

it * it= 1it * it= 4a=1,b=4,a+b=5it * it= 9a=5,b=9,a+b=14it * it= 16a=14,b=16,a+b=30it * it= 25a=30,b=25,a+b=5555

所有尽在正文中,尽管有点少!下一个!

2.3 组合多个流

就像Kotlin规范库重的Sequence.zip扩大函数一样,流领有一个zip操作符用于组合两个流中的相干值!

话不多说,间接开整!

2.3.1 案例一

    @Test    fun `test zip`() = runBlocking<Unit> {        val numbs = (1..3).asFlow()        val strs = flowOf("One", "Two", "Three")        numbs.zip(strs) { a, b -> "$a -> $b" }.collect { println(it) }    }

运行成果

1 -> One2 -> Two3 -> Three

很简略,通过zip将对应流亚索成一个,而后输入!狠简略,再来个略微简单点的!

2.3.2 案例二

   @Test    fun `test zip2`() = runBlocking<Unit> {        val numbs = (1..3).asFlow().onEach { delay(300) }        val strs = flowOf("One", "Two", "Three").onEach { delay(400) }        val startTime = System.currentTimeMillis()        numbs.zip(strs) { a, b -> "$a -> $b" }.collect {            println("$it at ${System.currentTimeMillis() - startTime} ms from start")        }    }

哈哈哈,略微简单,就是在对应流前面加了额定的挂起期待,就是模仿对应耗时操作

运行成果

1 -> One at 462 ms from start2 -> Two at 861 ms from start3 -> Three at 1269 ms from start

因为这里是挂起,并非阻塞,因而numbsstrs 在合并发射元素时,他们相互不烦扰,各做各的。

因而每次发射用的根底工夫是以strs 为准,而不是numbsstrs 这两者之和。 置信读者可能进一步意识挂起与阻塞的区别。

2.4 展平流

流示意异步接管的值序列,所以很容易遇到这样的状况:每个值都会触发对另一个值序列的申请,然而,因为流具备异步的性质,因而须要不同的展平模式,所以存在一系列的流展平操作符:

  • flatMapConcat 连贯模式;
  • flatMapMerge 合并模式;
  • flatMapLatest 最新展平模式

来看看怎么应用!

2.4.1 flatMapConcat 连贯模式

    fun requestFlow(i: Int) = flow<String> {        emit("$i: First")        delay(500)        emit("$i: Second")    }    @Test    fun `test flatMapConcat`() = runBlocking<Unit> {        //Flow<Flow<String>>        val startTime = System.currentTimeMillis()        (1..3).asFlow()            .onEach { delay(100) }            //.map { requestFlow(it) }            .flatMapConcat { requestFlow(it) }            .collect { println("$it at ${System.currentTimeMillis() - startTime} ms from start") }    }

运行成果

1: First at 147 ms from start1: Second at 653 ms from start2: First at 755 ms from start2: Second at 1256 ms from start3: First at 1357 ms from start3: Second at 1859 ms from start

这里咱们看到这里将两个流连贯了起来,相似于串联模式

2.4.2 flatMapMerge 合并模式

    fun requestFlow(i: Int) = flow<String> {        emit("$i: First")        delay(500)        emit("$i: Second")    }        @Test    fun `test flatMapMerge`() = runBlocking<Unit> {        //Flow<Flow<String>>        val startTime = System.currentTimeMillis()        (1..3).asFlow()            .onEach { delay(100) }            //.map { requestFlow(it) }            .flatMapMerge { requestFlow(it) }            .collect { println("$it at ${System.currentTimeMillis() - startTime} ms from start") }    }

运行成果

1: First at 166 ms from start2: First at 261 ms from start3: First at 366 ms from start1: Second at 668 ms from start2: Second at 762 ms from start3: Second at 871 ms from start

这种模式相似于下面的组合+背压外面的conflate模式,先是生产者将元素全副生成好,随后再告诉消费者生产!

2.4.3 flatMapLatest 最新展平模式

    @Test    fun `test flatMapLatest`() = runBlocking<Unit> {        //Flow<Flow<String>>        val startTime = System.currentTimeMillis()        (1..3).asFlow()            .onEach { delay(100) }            //.map { requestFlow(it) }            .flatMapLatest { requestFlow(it) }            .collect { println("$it at ${System.currentTimeMillis() - startTime} ms from start") }    }

运行成果

1: First at 164 ms from start2: First at 364 ms from start3: First at 469 ms from start3: Second at 971 ms from start

这种模式就相似于下面的组合+collectLatest模式。很简略!

3、异样解决

3.1 流的异样解决

当运算符中的发射器或代码抛出异样时,有几种解决异样的办法:

  • try/catch块
  • catch函数

3.1.1 案例一(try/catch块)

    fun simpleFlow() = flow<Int> {        for (i in 1..3) {            println("Emitting $i")            emit(i)        }    }    @Test    fun `test flow exception`() = runBlocking<Unit> {        try {            simpleFlow().collect { value ->                println(value)                //如果值为false,则抛出IllegalStateException和调用lazyMessage的后果。                check(value <= 1) { "Collected $value" }            }        } catch (e: Throwable) {            println("Caught $e")        }    }

运行成果

Emitting 11Emitting 22Caught java.lang.IllegalStateException: Collected 2

这个狠简略,下一个!

3.1.2 案例二(catch函数)

    @Test    fun `test flow exception2`() = runBlocking<Unit> {        flow {            emit(1)            throw ArithmeticException("Div 0")        }.catch { e: Throwable -> println("Caught $e") }            .flowOn(Dispatchers.IO)            .collect { println(it) }    }

当初咱们间接通过catch {}代码块,来解决对应的异样信息。

运行成果

1Caught java.lang.ArithmeticException: Div 0

咱们看到这样写,只确保了异样被捕捉了,然而消费者并不知道是否产生了异样,于是乎!

    @Test    fun `test flow exception2`() = runBlocking<Unit> {        flow {            emit(1)            throw ArithmeticException("Div 0")        }.catch { e: Throwable ->            println("Caught $e")            emit(10)        }.flowOn(Dispatchers.IO).collect { println(it) }    }

咱们看到,在catch里,额定发了一条音讯,当消费者受到对应音讯时,就晓得了生产者抛异样了!

来看看运行成果

Caught java.lang.ArithmeticException: Div 0110

3.2 流的实现

当流收集实现时(一般状况或异常情况),它可能须要执行一个动作。

  • 命令式finally块
  • onCompletion申明式解决

3.2.1 案例一(finally块)

    fun simpleFlow2() = (1..3).asFlow()     @Test    fun `test flow complete in finally`() = runBlocking<Unit> {        try {            simpleFlow2().collect { println(it) }        } finally {            println("Done")        }    }

很简略的代码块,这个就不贴运行成果了哈,不必想最终会打印Done!

3.2.2 案例二(onCompletion)

    @Test    fun `test flow complete in onCompletion`() = runBlocking<Unit> {        simpleFlow2()                .onCompletion { println("Done") }                .collect { println(it) }    }

这里在上一个案例根底上去掉了try...finally,并额定加了onCompletion 代码块,外面实现了对应的逻辑

来看看运行成果

123Done

很显著和下面一样!

结束语

好了,本篇到这就完结了!置信看到这的小伙伴对Flow异步流有了一个比拟清晰的认知!在下一篇中,将会解说Flow对应的通道-多路复用-并发平安

因为文章篇幅无限,文档资料内容较多,须要《2022最新Android面试真题+解析》、数据结构与算法面试题、Java 面试题、Android四大组件、Android 面试题、UI控件篇、网络通信篇、架构设计篇、性能优化篇、源码流程篇、 Kotlin方面、第三方框架、大厂面经,能够【点击这里收费获取】,心愿可能共同进步,独特学习,共勉!

本文转自 https://juejin.cn/post/7040001489904861191,如有侵权,请分割删除。