共计 14338 个字符,预计需要花费 36 分钟才能阅读完成。
前言
原本这一篇筹备写 Jetpack 对应的 paging 的,但在整顿材料的时候,发现 Kotlin 还有 Flow 未解说,这个也是一大重点,因而本篇将对 Flow 进行详解!
不便后续联合 Flow 与 Paging,进行混合解说!
既然如此!那么 Flow 是什么呢?
1、意识 Flow
1.1 Kotlin Flow 介绍
官网文档给予了一句话简略的介绍:
Flow — cold asynchronous stream with flow builder and comprehensive operator set (filter, map, etc);
Flow 从文档的介绍来看,它有点相似 RxJava 的 Observable。因为 Observable 也有 Cold、Hot 之分。
官网的就是太官网了,对于不相熟的 RxJava 的小伙伴,光凭这个概念还是有点云里雾里。
咱们还是通过一系列小案例来逐渐深刻 Flow!
1.1 如何示意多个值?
认真想想:挂起函数能够异步的返回单个值,然而该如何异步返回多个计算好的值呢?
emmm….
异步返回多个值能从哪些方面动手呢?
- 汇合?
- 序列?
- 挂起函数?
- 当然还有要讲的 Flow?
咱们来看看这几种计划怎么实现的?
1.1.1 汇合
fun simpleList(): List<Int> = listOf<Int>(1, 2, 3)
@Test
fun `test multiple values`() {simpleList().forEach {value -> println(value) }
}
这种形式的确是返回了多个值,但不是异步!
1.1.2 序列
fun simpleSequence(): Sequence<Int> = sequence {for (i in 1..3) {//Thread.sleep(1000) // 间接阻塞以后线程,并非异步!//delay(1000) // 又没有 suspend 润饰,必定用不了这个挂起!就算这里用了,上面的 test 也用不了!yield(i)
}
}
@Test
fun `test multiple values`() {simpleSequence().forEach {value -> println(value) }
}
这种形式也是返回了多个值,但不是异步!
1.1.3 挂起函数
suspend fun simpleList2(): List<Int> {delay(1000)
return listOf<Int>(1, 2, 3)
}
@Test
fun `test multiple values2`() = runBlocking<Unit> {simpleList2().forEach {value -> println(value) }
}
这种形式既返回了多个值,并且也是异步!满足异步返回多个值!
那么 Flow 形式该怎么呢?
1.1.4 Flow 形式
fun simpleFlow() = flow<Int> {for (i in 1..3) {delay(1000) // 伪装在一些重要的事件
emit(i) // 发射,产生一个元素
}
}
@Test
fun `test multiple values3`() = runBlocking<Unit> {simpleFlow().collect {value -> println(value) }
}
这里咱们看到 simpleFlow
办法,外面应用了 delay
挂起函数,但这个办法并没有suspend
修饰符,因而该办法并不是挂起函数!能够在任意中央应用!(非协程模块,非挂起模块)
看完了下面的示例,当初总结下:
1.1.5 Flow 与其余形式的区别
- 名为 flow 的 Flow 类型构建起函数
flow{...}
构建块中的代码能够挂起- 函数 simpleFlow 不再标有 suspend 修饰符
- 流应用 emit 函数发射值
- 流应用 collect 函数收集值
再来一张图,加深下印象
当初对 Flow 有了大抵的印象,那么它有啥作用呢?
1.2 Flow 利用
在 Android 开发中,文件下载是 Flow 的一个十分经典的案例
如图所示
当文件下载时,对应的后盾下载进度,就能够通过 Flow 外面的 emit 发送数据,通过 collect 接管对应的数据。(前面将会联合 Jetpack 对应的 paging 进行解说)
1.3 冷流
fun simpleFlow2() = flow<Int> {println("Flow started")
for (i in 1..3) {delay(1000)
emit(i)
}
}
@Test
fun `test flow is cold`() = runBlocking<Unit> {val flow = simpleFlow2()
println("Calling collect...")
flow.collect {value -> println(value) }
println("Calling collect again...")
flow.collect {value -> println(value) }
}
运行后果
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3
从这个运行后果能够看出:
Flow 是一种相似于 序列的冷流,flow 构建器中的代码直到流被收集的时候才运行。
留神:下篇筹备讲的 channelFlow 是热流!
1.4 流的连续性
@Test
fun `test flow continuation`() = runBlocking<Unit> {(1..5).asFlow().filter {it % 2 == 0}.map {"string $it"}.collect {println("Collect $it")
}
}
运行后果
Collect string 2
Collect string 4
从这个运行成果可知:
- 流的每次独自收集都是按程序执行的,除非应用非凡操作符
- 从上游到上游每个适度操作符都会解决每个发射出的值,而后再交给末端操作符
1.5 流构建器
@Test
fun `test flow builder`() = runBlocking<Unit> {flowOf("one","two","three")
.onEach {delay(1000) }
.collect { value ->
println(value)
}
(1..3).asFlow().collect { value ->
println(value)
}
}
运行成果
one
two
three
1
2
3
从这个运行成果能够得悉:能够通过对应的 flowOf
与asFlow
构建对应的 flow 流
1.6 流的上下文
这里将通过几个小案例进行详解
1.6.1 案例一
fun simpleFlow3() = flow<Int> {println("Flow started ${Thread.currentThread().name}")
for (i in 1..3) {delay(1000)
emit(i)
}
}
@Test
fun `test flow context`() = runBlocking<Unit> {simpleFlow3()
.collect {value -> println("Collected $value ${Thread.currentThread().name}") }
}
运行成果
Flow started Test worker @coroutine#1
Collected 1 Test worker @coroutine#1
Collected 2 Test worker @coroutine#1
Collected 3 Test worker @coroutine#1
从这里能够看出,从 emit 到 collect
上下文贯通了所有,都为同一个上下文。
那如果说,想在 emit
时应用另一个上下文该怎么呢?
1.6.2 案例二
fun simpleFlow4() = flow<Int> {withContext(Dispatchers.Default) {println("Flow started ${Thread.currentThread().name}")
for (i in 1..3) {delay(1000)
emit(i)
}
}
}
@Test
fun `test flow context`() = runBlocking<Unit> {simpleFlow4()
.collect {value -> println("Collected $value ${Thread.currentThread().name}") }
}
运行成果
Flow invariant is violated:
Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@3f3e9270, BlockingEventLoop@67bc91f8],
... 略
能够发现,这样写间接解体了,并不是Flow invariant is violated:
,那么该如何应用呢?
1.6.3 案例三
fun simpleFlow5() = flow<Int> {println("Flow started ${Thread.currentThread().name}")
for (i in 1..3) {delay(1000)
emit(i)
}
}.flowOn(Dispatchers.Default)
@Test
fun `test flow context`() = runBlocking<Unit> {simpleFlow5()
.collect {value -> println("Collected $value ${Thread.currentThread().name}") }
}
运行成果
Flow started DefaultDispatcher-worker-1 @coroutine#2
Collected 1 Test worker @coroutine#1
Collected 2 Test worker @coroutine#1
Collected 3 Test worker @coroutine#1
从这几个小案例能够总结出
- 流的收集总是在调用协程的上下文中产生,流的该属性成为 上下文保留。
flow{...}
构建器中的代码必须遵循上下文保留属性,并且不容许从其余上下文中产生(emit)flowOn 操作符
,该函数用于 更改流发射的上下文
1.7 启动流
1.7.1 案例一
// 事件源
fun events() = (1..3)
.asFlow()
.onEach {delay(100) }
.flowOn(Dispatchers.Default)
@Test
fun `test flow launch`() = runBlocking<Unit> {val job = events()
.onEach {event -> println("Event: $event ${Thread.currentThread().name}") }
// .collect {}
.launchIn(CoroutineScope(Dispatchers.IO)) // 这里应用另一个上下文传入 Flow
// .launchIn(this)// 这里应用以后上下文传入 Flow
delay(200)
job.cancelAndJoin()}
这里咱们看到曾经勾销了collect
,而改为了launchIn
,对应传入了新的上下文作为 Flow 的保留上下文。
正因为这里传入的非以后上下文所以须要调用 job.join()
或者 job.cancelAndJoin()
,来期待对应 Flow 实现对应操作
运行成果
Event: 1 DefaultDispatcher-worker-1 @coroutine#2
Event: 2 DefaultDispatcher-worker-1 @coroutine#2
因为这里挂起了两秒就勾销了,所以这里并没有打印所有的日志,同时上下文为:DefaultDispatcher
再来看看案例二
1.7.2 案例二
// 事件源
fun events() = (1..3)
.asFlow()
.onEach {delay(100) }
.flowOn(Dispatchers.Default)
@Test
fun `test flow launch`() = runBlocking<Unit> {val job = events()
.onEach {event -> println("Event: $event ${Thread.currentThread().name}") }
// .collect {}
// .launchIn(CoroutineScope(Dispatchers.IO)) // 这里应用另一个上下文传入 Flow
.launchIn(this)// 这里应用以后上下文传入 Flow
// delay(200)
// job.cancelAndJoin()}
运行成果
Event: 1 Test worker @coroutine#2
Event: 2 Test worker @coroutine#2
Event: 3 Test worker @coroutine#2
因为这里 Flow 传入的以后上下文,因而不须要额定通过其余形式期待执行实现。
1.8 流的勾销
1.8.1 被动勾销
fun simpleFlow6() = flow<Int> {for (i in 1..3) {delay(1000)
emit(i)
println("Emitting $i")
}
}
@Test
fun `test cancel flow`() = runBlocking<Unit> {withTimeoutOrNull(2500) {simpleFlow6().collect {value -> println(value) }
}
println("Done")
}
运行成果
1
Emitting 1
2
Emitting 2
Done
这里咱们看到,应用了 withTimeoutOrNull
设置超时的办法,让它在超时状况下勾销并进行执行。
不过这个勾销都是被动勾销,如果被动勾销该是怎么呢?
1.8.2 被动勾销
@Test
fun `test cancel flow check`() = runBlocking<Unit> {(1..5).asFlow().collect { value ->
println(value)
if (value == 3) cancel()
println("cancel check ${coroutineContext[Job]?.isActive}")
}
}
运行成果
1
cancel check true
2
cancel check true
3
cancel check false
4
cancel check false
5
cancel check false
这里咱们看到,被动勾销时,对应状态曾经变了,然而还是全副执行了!
这是因为这里并没有退出勾销检测!
1.8.3 流的勾销检测
@Test
fun `test cancel flow check`() = runBlocking<Unit> {(1..5).asFlow().cancellable().collect { value ->
println(value)
if (value == 3) cancel()
println("cancel check ${coroutineContext[Job]?.isActive}")
}
}
这里咱们看到应用了 cancellable
办法!
再次运行看看成果
1
cancel check true
2
cancel check true
3
cancel check false
OK,这里能够看到曾经胜利的勾销了!进入下一专题!
1.9 背压
讲这个之前,咱们先看比拟原始的案例
fun simpleFlow8() = flow<Int> {for (i in 1..3) {delay(100)
emit(i)
println("Emitting $i ${Thread.currentThread().name}")
}
}
@Test
fun `test flow back pressure`() = runBlocking<Unit> {
val time = measureTimeMillis {simpleFlow8()
.collect { value ->
delay(300) // 解决这个元素耗费 300ms
println("Collected $value ${Thread.currentThread().name}")
}
}
println("Collected in $time ms")
}
来看看运行成果
Collected 1 Test worker @coroutine#1
Emitting 1 Test worker @coroutine#1
Collected 2 Test worker @coroutine#1
Emitting 2 Test worker @coroutine#1
Collected 3 Test worker @coroutine#1
Emitting 3 Test worker @coroutine#1
Collected in 1237 ms
这里咱们看到,这是十分规范的一个生产者—消费者模式,都是一一对应的。那么加上不同的关键字试试?
1.9.1 buffer(xx)
@Test
fun `test flow back pressure`() = runBlocking<Unit> {
val time = measureTimeMillis {simpleFlow8()
.buffer(50)
.collect { value ->
delay(300) // 解决这个元素耗费 300ms
println("Collected $value ${Thread.currentThread().name}")
}
}
println("Collected in $time ms")
}
这里咱们看到退出了.buffer(50)
!
来看看运行成果
Emitting 1 Test worker @coroutine#2
Emitting 2 Test worker @coroutine#2
Emitting 3 Test worker @coroutine#2
Collected 1 Test worker @coroutine#1
Collected 2 Test worker @coroutine#1
Collected 3 Test worker @coroutine#1
Collected in 1108 ms
这里咱们看到生产的音讯先是全堆在一起,而后集中发送,总耗时也比规范的少了一点。
如图所示
咱们暂可了解为,buffer(50)
将对应的传输通道变长了,使传输通道可能装更多的元素。
接下来,持续看下一个操作符!
1.9.2 conflate()
@Test
fun `test flow back pressure`() = runBlocking<Unit> {
val time = measureTimeMillis {simpleFlow8()
.conflate()
.collect { value ->
delay(300) // 解决这个元素耗费 300ms
println("Collected $value ${Thread.currentThread().name}")
}
}
println("Collected in $time ms")
}
运行成果
Emitting 1 Test worker @coroutine#2
Emitting 2 Test worker @coroutine#2
Emitting 3 Test worker @coroutine#2
Collected 1 Test worker @coroutine#1
Collected 3 Test worker @coroutine#1
Collected in 800 ms
这里咱们看到,消费者并非全副解决完对应生产者的元素。接着看下一个!
1.9.3 collectLatest{}
@Test
fun `test flow back pressure`() = runBlocking<Unit> {
val time = measureTimeMillis {simpleFlow8()
.collectLatest { value ->
delay(300) // 解决这个元素耗费 300ms
println("Collected $value ${Thread.currentThread().name}")
}
}
println("Collected in $time ms")
}
这次,将 collect
换成了 collectLatest
,看其名,感觉像是只解决最初一个元素, 来看看运行成果
Emitting 1 Test worker @coroutine#2
Emitting 2 Test worker @coroutine#2
Emitting 3 Test worker @coroutine#2
Collected 3 Test worker @coroutine#5
Collected in 807 ms
果真如此,该操作符只会解决最初一个元素!最初再来个总结!
1.9.4 背压总结
- buffer(),并发运行流中发射元素的代码;
- conflate(),合并发射项,不对每个值进行解决;
- collectLatest(),勾销并从新发射最初一个值
2、操作符
2.1 过渡流操作符
2.1.1 案例一
suspend fun performRequest(request: Int): String {delay(500)
return "response $request"
}
@Test
fun `test transform flow operator`() = runBlocking<Unit> {(1..3).asFlow()
.map {request -> performRequest(request) }
.collect {value -> println(value) }
}
运行成果
response 1
response 2
response 3
这里咱们看到,将数据流转为 Map 类型,而后顺次发送每个元素!
那如果说,想要发送前后还额定想发送本人的自定义元素该怎么办呢?
suspend fun performRequest(request: Int): String {delay(500)
return "response $request"
}
@Test
fun `test transform flow operator`() = runBlocking<Unit> {(1..3).asFlow()
.transform { request ->
emit("Making request $request")
emit(performRequest(request))
}.collect {value -> println(value) }
}
这里咱们看到应用了 transform
,在对应闭包里应用了emit
发送元素,来看看运行成果
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3
从这个运行成果可知:咱们能够通过这样的形式,来自定义发射元素!
2.1.2 案例二
fun numbers() = flow<Int> {
try {emit(1)
emit(2)
println("This line will not execute")
emit(3)
} finally {println("Finally in numbers")
}
}
@Test
fun `test limit length operator`() = runBlocking<Unit> {//take(2), 示意 当计数元素被耗费时,原始流被勾销
numbers().take(2).collect {value -> println(value) }
}
来看看运行成果
1
2
Finally in numbers
从这个运行成果可知:当消费者解决元素 2 时,就将对应的原始流给勾销掉了!
2.1.3 总结
过渡流操作符:
- 能够应用操作符转换流,就像应用汇合与序列一样;
- 过渡操作符利用于上游流,并返回上游流;
- 这些操作符也是冷操作符,就像流一样。这类操作符自身不是挂起函数
- 它运行速度很快,返回新的转换的定义
2.2 末端操作符
末端操作符是在流上用户 ** 启动流收集的挂起函数。**collect 是最根底的末端操作符,然而还有另外一些更方便使用的末端操作符:
- 转化为各种汇合,比方 toList 与 toSet;
- 获取第一个(first)值与确保流发射单个(single)值的操作符
- 应用 reduce 与 fold 将流规约到单个值
说了这么多,来看看怎么应用的!
@Test
fun `test terminal operator`() = runBlocking<Unit> {val sum = (1..5).asFlow()
.map {println("it * it= ${it * it}")
it * it
}
// 从第一个元素开始累加值,并将操作利用于以后累加器值和每个元素
.reduce { a, b ->
println("a=$a,b=$b,a+b=${a + b}")
a + b
}
println(sum)
}
运行成果
it * it= 1
it * it= 4
a=1,b=4,a+b=5
it * it= 9
a=5,b=9,a+b=14
it * it= 16
a=14,b=16,a+b=30
it * it= 25
a=30,b=25,a+b=55
55
所有尽在正文中,尽管有点少!下一个!
2.3 组合多个流
就像 Kotlin 规范库重的 Sequence.zip 扩大函数一样,流领有一个 zip 操作符用于组合两个流中的相干值!
话不多说,间接开整!
2.3.1 案例一
@Test
fun `test zip`() = runBlocking<Unit> {val numbs = (1..3).asFlow()
val strs = flowOf("One", "Two", "Three")
numbs.zip(strs) {a, b -> "$a -> $b"}.collect {println(it) }
}
运行成果
1 -> One
2 -> Two
3 -> Three
很简略,通过 zip
将对应流亚索成一个,而后输入!狠简略,再来个略微简单点的!
2.3.2 案例二
@Test
fun `test zip2`() = runBlocking<Unit> {val numbs = (1..3).asFlow().onEach { delay(300) }
val strs = flowOf("One", "Two", "Three").onEach {delay(400) }
val startTime = System.currentTimeMillis()
numbs.zip(strs) {a, b -> "$a -> $b"}.collect {println("$it at ${System.currentTimeMillis() - startTime} ms from start")
}
}
哈哈哈,略微简单,就是在对应流前面加了额定的挂起期待,就是模仿对应耗时操作
运行成果
1 -> One at 462 ms from start
2 -> Two at 861 ms from start
3 -> Three at 1269 ms from start
因为这里是挂起,并非阻塞,因而numbs
与strs
在合并发射元素时,他们相互不烦扰,各做各的。
因而每次发射用的根底工夫是以strs
为准,而不是numbs
与strs
这两者之和。 置信读者可能进一步意识挂起与阻塞的区别。
2.4 展平流
流示意异步接管的值序列,所以很容易遇到这样的状况:每个值都会触发对另一个值序列的申请,然而,因为流具备异步的性质,因而须要不同的展平模式,所以存在一系列的流展平操作符:
- flatMapConcat 连贯模式;
- flatMapMerge 合并模式;
- flatMapLatest 最新展平模式
来看看怎么应用!
2.4.1 flatMapConcat 连贯模式
fun requestFlow(i: Int) = flow<String> {emit("$i: First")
delay(500)
emit("$i: Second")
}
@Test
fun `test flatMapConcat`() = runBlocking<Unit> {
//Flow<Flow<String>>
val startTime = System.currentTimeMillis()
(1..3).asFlow()
.onEach {delay(100) }
//.map {requestFlow(it) }
.flatMapConcat {requestFlow(it) }
.collect {println("$it at ${System.currentTimeMillis() - startTime} ms from start") }
}
运行成果
1: First at 147 ms from start
1: Second at 653 ms from start
2: First at 755 ms from start
2: Second at 1256 ms from start
3: First at 1357 ms from start
3: Second at 1859 ms from start
这里咱们看到这里将两个流连贯了起来,相似于串联模式
2.4.2 flatMapMerge 合并模式
fun requestFlow(i: Int) = flow<String> {emit("$i: First")
delay(500)
emit("$i: Second")
}
@Test
fun `test flatMapMerge`() = runBlocking<Unit> {
//Flow<Flow<String>>
val startTime = System.currentTimeMillis()
(1..3).asFlow()
.onEach {delay(100) }
//.map {requestFlow(it) }
.flatMapMerge {requestFlow(it) }
.collect {println("$it at ${System.currentTimeMillis() - startTime} ms from start") }
}
运行成果
1: First at 166 ms from start
2: First at 261 ms from start
3: First at 366 ms from start
1: Second at 668 ms from start
2: Second at 762 ms from start
3: Second at 871 ms from start
这种模式相似于下面的组合 + 背压外面的 conflate 模式,先是生产者将元素全副生成好,随后再告诉消费者生产!
2.4.3 flatMapLatest 最新展平模式
@Test
fun `test flatMapLatest`() = runBlocking<Unit> {
//Flow<Flow<String>>
val startTime = System.currentTimeMillis()
(1..3).asFlow()
.onEach {delay(100) }
//.map {requestFlow(it) }
.flatMapLatest {requestFlow(it) }
.collect {println("$it at ${System.currentTimeMillis() - startTime} ms from start") }
}
运行成果
1: First at 164 ms from start
2: First at 364 ms from start
3: First at 469 ms from start
3: Second at 971 ms from start
这种模式就相似于下面的组合 +collectLatest 模式。很简略!
3、异样解决
3.1 流的异样解决
当运算符中的发射器或代码抛出异样时,有几种解决异样的办法:
- try/catch 块
- catch 函数
3.1.1 案例一(try/catch 块)
fun simpleFlow() = flow<Int> {for (i in 1..3) {println("Emitting $i")
emit(i)
}
}
@Test
fun `test flow exception`() = runBlocking<Unit> {
try {simpleFlow().collect { value ->
println(value)
// 如果值为 false,则抛出 IllegalStateException 和调用 lazyMessage 的后果。check(value <= 1) {"Collected $value"}
}
} catch (e: Throwable) {println("Caught $e")
}
}
运行成果
Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2
这个狠简略,下一个!
3.1.2 案例二(catch 函数)
@Test
fun `test flow exception2`() = runBlocking<Unit> {
flow {emit(1)
throw ArithmeticException("Div 0")
}.catch {e: Throwable -> println("Caught $e") }
.flowOn(Dispatchers.IO)
.collect {println(it) }
}
当初咱们间接通过 catch {}
代码块,来解决对应的异样信息。
运行成果
1
Caught java.lang.ArithmeticException: Div 0
咱们看到这样写,只确保了异样被捕捉了,然而消费者并不知道是否产生了异样,于是乎!
@Test
fun `test flow exception2`() = runBlocking<Unit> {
flow {emit(1)
throw ArithmeticException("Div 0")
}.catch { e: Throwable ->
println("Caught $e")
emit(10)
}.flowOn(Dispatchers.IO).collect {println(it) }
}
咱们看到,在 catch 里,额定发了一条音讯,当消费者受到对应音讯时,就晓得了生产者抛异样了!
来看看运行成果
Caught java.lang.ArithmeticException: Div 0
1
10
3.2 流的实现
当流收集实现时(一般状况或异常情况),它可能须要执行一个动作。
- 命令式 finally 块
- onCompletion 申明式解决
3.2.1 案例一(finally 块)
fun simpleFlow2() = (1..3).asFlow()
@Test
fun `test flow complete in finally`() = runBlocking<Unit> {
try {simpleFlow2().collect {println(it) }
} finally {println("Done")
}
}
很简略的代码块,这个就不贴运行成果了哈,不必想最终会打印 Done!
3.2.2 案例二(onCompletion)
@Test
fun `test flow complete in onCompletion`() = runBlocking<Unit> {simpleFlow2()
.onCompletion {println("Done") }
.collect {println(it) }
}
这里在上一个案例根底上去掉了 try…finally,并额定加了onCompletion
代码块,外面实现了对应的逻辑
来看看运行成果
1
2
3
Done
很显著和下面一样!
结束语
好了,本篇到这就完结了!置信看到这的小伙伴对 Flow 异步流有了一个比拟清晰的认知!在下一篇中,将会解说 Flow 对应的 通道 - 多路复用 - 并发平安
因为文章篇幅无限,文档资料内容较多,须要《2022 最新 Android 面试真题 + 解析》、数据结构与算法面试题、Java 面试题、Android 四大组件、Android 面试题、UI 控件篇、网络通信篇、架构设计篇、性能优化篇、源码流程篇、Kotlin 方面、第三方框架、大厂面经,能够【点击这里收费获取】,心愿可能共同进步,独特学习,共勉!
本文转自 https://juejin.cn/post/7040001489904861191,如有侵权,请分割删除。