Flow 异步流
-
意识
- 个性
- 构建器和上下文
- 启动
- 勾销与勾销检测
- 缓冲
-
操作符
- 过渡操作符
- 末端操作符
- 组合
- 展平
-
异样
- 异样解决
- 实现
如何示意多个值?
挂起函数能够异步的返回单个值,然而如何异步返回多个计算好的值呢?
计划
- 汇合
- 序列
- 挂起函数
-
Flow
用汇合,返回多个值,但不是异步的。private fun createList() = listOf<Int>(1, 2, 3) @Test fun test_list() {createList().forEach {println(it) } }
用序列,返回一个整数序列
private fun createSequence(): Sequence<Int> { return sequence {for (i in 1..3) {Thread.sleep(1000) // 伪装在计算,此处是阻塞,不能做其余事件了 // delay(1000) 这里不能用挂起函数 yield(i) } } } @Test fun test_sequence() {createSequence().forEach {println(it) } }
看下源码
public fun <T> sequence(@BuilderInference block: suspend SequenceScope<T>.() -> Unit): Sequence<T> = Sequence {iterator(block) }
传入的是一个 SequenceScope 的扩大函数。
@RestrictsSuspension @SinceKotlin("1.3") public abstract class SequenceScope<in T> internal constructor()
而 RestrictsSuspension 限度只能应用外面提供的已有的挂起函数,如 yield,yieldAll 等。
createSequence 返回了多个值,然而也是同步的。// 返回多个值,异步 private suspend fun createList2(): List<Int> {delay(5000) return listOf<Int>(1, 2, 3) } @Test fun test_list2() = runBlocking<Unit> {createList().forEach {println(it) } }
能够应用 suspend 函数返回多个值,是异步,然而是是一次性返回了多个值,是否像流一样返回多个值并放弃异步呢?Flow 能够解决这个问题。
private suspend fun createFlow(): Flow<Int> = flow {for (i in 1..3) {delay(1000) emit(i) // 发射,产生一个元素 } } @Test fun test_flow() = runBlocking<Unit> {createFlow().collect {println(it) } // collect 是一个末端操作符,前面讲 }
每隔 1 秒钟产生一个元素,这里是挂起的。用例子来证实一下:
private suspend fun createFlow(): Flow<Int> = flow {for (i in 1..3) {delay(1000) emit(i) } } @Test fun test_flow2() = runBlocking<Unit> { launch {for (i in 1..3) {println("I am running and not blocked $i") delay(1500) } } createFlow().collect { println(it) } }
输入
I am running and not blocked 1 1 I am running and not blocked 2 2 I am running and not blocked 3 3 Process finished with exit code 0
collect 收集后果的过程并没有阻塞另外的协程,打印完 1,而后在 delay 挂起时,去执行其余,并没有阻塞,两个工作来回切换执行。
Flow 真正地做到了返回多个值,并且是异步的。
Flow 与其余形式的区别
- 名为 flow 的 Flow 类型的构建器函数
- flow{…}构建块中的代码能够挂起
- 函数 createFlow()不再标有 suspend 修饰符, 下面代码中的 suspend 修饰符能够去掉
- 流应用 emit 函数发射值
-
流应用 collect 函数收集值
Flow 利用
在 android 中,文件下载是 Flow 的一个十分典型的利用。
冷流
Flow 是一种相似于序列的冷流,flow 构建器中的代码直到流被收集的时候才运行。
private fun createFlow2() = flow<Int> {println("Flow started.")
for (i in 1..3) {delay(1000)
emit(i)
}
}
@Test
fun test_flow_cold() = runBlocking<Unit> {val flow = createFlow2()
println("calling collect...")
flow.collect {value -> println(value) }
println("calling collect again...")
flow.collect {value -> println(value) }
}
calling collect...
Flow started.
1
2
3
calling collect again...
Flow started.
1
2
3
Process finished with exit code 0
能够看到,当调用 collect 办法的时候,流才开始运行,并且能够屡次调用。
流的连续性
- 流的每次独自收集都是按程序执行的,除非应用非凡操作符。
-
从上游到上游,每个过渡操作符都会解决每个发射出的值,而后再交给末端操作符。
@Test fun test_flow_continuation() = runBlocking<Unit> {(1..5).asFlow() .filter {it % 2 == 0} .map {"string $it"} .collect {println("collect $it") } }
collect string 2 collect string 4 Process finished with exit code 0
改例子通过了如下步骤:生成一个流,过滤出偶数,转成字符串,开始收集
流的构建器
- flowOf 构建器定义了一个发射固定值集的流。
-
应用.asFlow()扩大函数,能够将各种汇合与序列转换为流。
@Test fun test_flow_builder() = runBlocking<Unit> { // flowOf 构建器 flowOf("one", "two", "three") .onEach {delay(1000) } .collect {value -> println(value) } // asFlow 扩大函数 (1..3).asFlow().collect { value -> println(value) } }
one two three 1 2 3 Process finished with exit code 0
流的上下文
- 流的收集总是在调用协程的上下文中产生,流的该属性成为上下文保留。
- flow{…}构建器中的代码必须遵循上下文保留属性,并且不容许从其余上下文中发射。
-
flowOn 操作符,该函数用于更改流发射的上下文。
private fun createFlow3() = flow<Int> {println("Flow started ${Thread.currentThread()}") for (i in 1..3) {delay(1000) emit(i) } } @Test fun test_flow_context() = runBlocking<Unit> {createFlow3() .collect {println("$it, thread: ${Thread.currentThread()}") } }
Flow started Thread[main @coroutine#1,5,main] 1, thread: Thread[main @coroutine#1,5,main] 2, thread: Thread[main @coroutine#1,5,main] 3, thread: Thread[main @coroutine#1,5,main] Process finished with exit code 0
不做线程切换,收集和构建都在同一上下文,运行的线程是一样的。
试着更改一下线程,如下private fun createFlow4() = flow {withContext(Dispatchers.IO) { // 用 io 线程 println("Flow started ${Thread.currentThread().name}") for (i in 1..3) {delay(1000) emit(i) } } } @Test fun test_flow_on() = runBlocking {createFlow4().collect {println("collect $it, ${Thread.currentThread()}") } }
Flow started DefaultDispatcher-worker-1 @coroutine#1 java.lang.IllegalStateException: Flow invariant is violated: Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@4600ac86, BlockingEventLoop@1e1d1f06], but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@79c0c2ed, Dispatchers.IO]. Please refer to 'flow' documentation or use 'flowOn' instead
能够看到,构建流在 IO 线程中执行, 但在收集流的时候报错了, 不容许这样做, 倡议应用 flowOn.
正确的做法如下:private fun createFlow5() = flow {println("Flow started ${Thread.currentThread().name}") for (i in 1..3) {delay(1000) emit(i) } }.flowOn(Dispatchers.IO) @Test fun test_flow_on2() = runBlocking {createFlow5().collect {println("collect $it, ${Thread.currentThread().name}") } }
Flow started DefaultDispatcher-worker-1 @coroutine#2 collect 1, main @coroutine#1 collect 2, main @coroutine#1 collect 3, main @coroutine#1 Process finished with exit code 0
流在 IO 线程中构建和发射,在 main 线程中收集。
启动流
-
应用 launchIn 替换 collect,能够在独自的协程中启动流的收集。
private fun events() = (1..3).asFlow() .onEach {delay(1000) println("$it, ${Thread.currentThread().name}") }.flowOn(Dispatchers.Default) @Test fun testFlowLaunch() = runBlocking<Unit> {events() .onEach {e -> println("Event: $e ${Thread.currentThread().name}") } //.collect() .launchIn(CoroutineScope(Dispatchers.IO)) .join()}
1, DefaultDispatcher-worker-3 @coroutine#3 Event: 1 DefaultDispatcher-worker-1 @coroutine#2 2, DefaultDispatcher-worker-1 @coroutine#3 Event: 2 DefaultDispatcher-worker-1 @coroutine#2 3, DefaultDispatcher-worker-1 @coroutine#3 Event: 3 DefaultDispatcher-worker-2 @coroutine#2 Process finished with exit code 0
onEach 是过渡操作符,并不会触发收集数据,collect 是末端操作符,能力触发收集数据。过渡操作符就像是过滤器,末端操作符就像是水龙头的阀门,不关上阀门水就流不进去,无论两头加了多少个过滤安装。
如果想指定在哪个协程外面收集数据,就能够用末端操作符 launchIn(), 能够传递一个作用域进去,而作用域又能够指定调度器,launchIn(CoroutineScope(Dispatchers.IO)).
launchIn 返回的是一个 job 对象,能够进行 cancel 等操作。例如
@Test
fun testFlowLaunch2() = runBlocking<Unit> {val job = events()
.onEach {e -> println("Event: $e ${Thread.currentThread().name}") }
.launchIn(CoroutineScope(Dispatchers.IO))
delay(2000)
job.cancel()}
1, DefaultDispatcher-worker-1 @coroutine#3
Event: 1 DefaultDispatcher-worker-3 @coroutine#2
Process finished with exit code 0
如上,只收集了一个数字,job 就勾销了。
其实 runBlockint 自身就是一个主线程作用域,能够放到 launchIn 中,如下
@Test
fun testFlowLaunch3() = runBlocking<Unit> {val job = events()
.onEach {e -> println("Event: $e ${Thread.currentThread().name}") }
.launchIn(this)
}
1, DefaultDispatcher-worker-1 @coroutine#3
Event: 1 main @coroutine#2
2, DefaultDispatcher-worker-1 @coroutine#3
Event: 2 main @coroutine#2
3, DefaultDispatcher-worker-1 @coroutine#3
Event: 3 main @coroutine#2
Process finished with exit code 0
流的勾销
-
流采纳与协程同样的合作勾销。流的收集能够是当流在一个可勾销的挂起函数(如 delay)中挂起的时候勾销。
private fun createFlow6() = flow<Int> {for (i in 1..3) {delay(1000) println("emitting $i") emit(i) } } @Test fun testCancelFlow() = runBlocking<Unit> {withTimeoutOrNull(2500) {createFlow6().collect {println("collect: $it") } } println("Done.") }
emitting 1 collect: 1 emitting 2 collect: 2 Done. Process finished with exit code 0
设置 2.5 秒超时,流还没发射 3,就超时了,流被勾销。
流的勾销检测
- 不便起见,流构建器对每个发射值执行附加的 ensureActive 检测以进行勾销,这意味着从 flow{…}收回的忙碌循环是能够勾销的。
- 处于性能起因,大多数其余流操作不会自行执行其余勾销检测,在协程处于忙碌循环的状况下,必须明确检测是否勾销。
-
通过 cancellable 操作符来执行操作。
private fun createFlow7() = flow<Int> {for (i in 1..5) {delay(1000) println("emitting $i") emit(i) } } @Test fun testCancelFlowCheck() = runBlocking<Unit> {createFlow7().collect {if (it == 3) cancel() println("collect: $it") } println("Done.") }
emitting 1 collect: 1 emitting 2 collect: 2 emitting 3 collect: 3 kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled ... Process finished with exit code 255
在收集流的时候,遇到 3 就执行勾销操作,抛出 JobCancellationException,3 还是会被收集到。
@Test fun testCancelFlowCheck2() = runBlocking<Unit> {(1..5).asFlow().collect {if (it == 3) cancel() println("collect: $it") } println("Done.") }
collect: 1 collect: 2 collect: 3 collect: 4 collect: 5 Done. kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled
应用 asFlow()创立流,在收集的时候,尽管遇到 3 进行了勾销,然而还是把所有的元素都打印了当前才抛出异样。如果要在执行的过程中真正的阻断流,须要加上 cancellable()操作,如下:
@Test fun testCancelFlowCheck3() = runBlocking<Unit> {(1..5).asFlow().cancellable().collect {if (it == 3) cancel() println("collect: $it") } println("Done.") }
collect: 1 collect: 2 collect: 3 kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled
背压
- buffer(), 并发运行流中发射元素的代码。
- conflate(), 合并发射项,不对每个值进行解决。
- collectLatest(), 勾销并从新发射最初一个值。
- 当必须更改 CoroutineDispatcher 时,flowOn 操作符应用了雷同的缓冲机制,然而 buffer 函数显示地申请缓冲而不扭转执行上下文。
private fun createFlow8() = flow<Int> {for (i in 1..5) {delay(100) // 生产一个元素须要 0.1 秒
println("emitting $i")
emit(i)
}
}
@Test
fun testBackPressure() = runBlocking<Unit> {
val time = measureTimeMillis {createFlow8()
.buffer(50) // 并发运行流中发射元素
.collect {delay(200) // 生产一个元素须要 0.2 秒
println("collect: $it")
}
}
println("Done, total $time")
}
emitting 1
emitting 2
collect: 1
emitting 3
emitting 4
collect: 2
emitting 5
collect: 3
collect: 4
collect: 5
Done, total 1188
应用 buffer 能够让发射元素并发执行,提高效率。
应用 flowOn()切换线程,也能够提高效率。
private fun createFlow8() = flow<Int> {for (i in 1..5) {delay(100) // 生产一个元素须要 0.1 秒
println("emitting $i, ${Thread.currentThread().name}")
emit(i)
}
}
@Test
fun testBackPressure2() = runBlocking<Unit> {
val time = measureTimeMillis {createFlow8()
.flowOn(Dispatchers.Default)
.collect {delay(200) // 生产一个元素须要 0.2 秒
println("collect: $it, ${Thread.currentThread().name}")
}
}
println("Done, total $time")
}
emitting 1, DefaultDispatcher-worker-1 @coroutine#2
emitting 2, DefaultDispatcher-worker-1 @coroutine#2
emitting 3, DefaultDispatcher-worker-1 @coroutine#2
collect: 1, main @coroutine#1
emitting 4, DefaultDispatcher-worker-1 @coroutine#2
collect: 2, main @coroutine#1
emitting 5, DefaultDispatcher-worker-1 @coroutine#2
collect: 3, main @coroutine#1
collect: 4, main @coroutine#1
collect: 5, main @coroutine#1
Done, total 1186
conflate()能够合并发射项,然而不会对每个值进行解决。
@Test
fun testBackPressure3() = runBlocking<Unit> {
val time = measureTimeMillis {createFlow8()
.conflate()
.collect {delay(200) // 生产一个元素须要 0.2 秒
println("collect: $it, ${Thread.currentThread().name}")
}
}
println("Done, total $time")
}
emitting 1, main @coroutine#2
emitting 2, main @coroutine#2
emitting 3, main @coroutine#2
collect: 1, main @coroutine#1
emitting 4, main @coroutine#2
collect: 3, main @coroutine#1
emitting 5, main @coroutine#2
collect: 4, main @coroutine#1
collect: 5, main @coroutine#1
Done, total 1016
下面的例子中,应用 conflate(),collect 时跳过了 2.
应用 collectLatest()只会收集最初一个值,如下:
@Test
fun testBackPressure4() = runBlocking<Unit> {
val time = measureTimeMillis {createFlow8()
.collectLatest {delay(200) // 生产一个元素须要 0.2 秒
println("collect: $it, ${Thread.currentThread().name}")
}
}
println("Done, total $time")
}
emitting 1, main @coroutine#2
emitting 2, main @coroutine#2
emitting 3, main @coroutine#2
emitting 4, main @coroutine#2
emitting 5, main @coroutine#2
collect: 5, main @coroutine#7
Done, total 913
Process finished with exit code 0
操作符
过渡流操作符
- 能够应用操作符转换流,就像应用汇合与序列一样。
- 过渡操作符利用于上游流,并返回上游流。
- 这些操作符也是冷操作符,就像流一样。这类操作符自身不是挂起函数。
-
运行速度很快,返回新的转换流的定义。
private fun createFlow9() = flow<Int> {for (i in 1..3) {delay(100) // 生产一个元素须要 0.1 秒 println("emitting $i") emit(i) } } @Test fun testMap() = runBlocking<Unit> {createFlow9() .map {data -> performRequest(data) } .collect {println("collect: $it") } }
emitting 1 collect: --response 1-- emitting 2 collect: --response 2-- emitting 3 collect: --response 3-- Process finished with exit code 0
下面的例子,map 操作符,把 Int 流转成了 String 流。
@Test fun testTransform() = runBlocking<Unit> {createFlow9() .transform { data -> emit("making request $data") emit(performRequest(data)) } .collect {println("collect: $it") } }
emitting 1 collect: making request 1 collect: --response 1-- emitting 2 collect: making request 2 collect: --response 2-- emitting 3 collect: making request 3 collect: --response 3-- Process finished with exit code 0
下面的例子,transform 操作符能够把流通过屡次转换,屡次发射。
限长操作符
take 操作符
private fun numbers() = flow<Int> { try {emit(1) emit(2) println("This line will not execute") emit(3) } finally {println("Finally.") } } @Test fun testLimitOperator() = runBlocking {numbers().take(2).collect {println("collect $it") } }
collect 1 collect 2 Finally.
take 传入参数 2,则只取 2 个数据。
末端流操作符
末端操作符是在流上用于启动流收集的挂起函数。collect 是最根底的末端操作符,然而还有一些更不便的末端操作符:
- 转化为各种汇合,例如 toList 和 toSet.
- 获取第一个 (first) 值与确保流发射单个 (single) 值的操作符。
-
应用 reduce 与 fold 将流规约到单个值。
例如 reduce 操作符@Test fun testTerminateOperator() = runBlocking {val sum = (1..5).asFlow().map { it * it}.reduce {a, b -> a + b} println(sum) }
55
计算数字 1 - 5 的平方,而后求和,失去 55.
组合多个流
就像 Kotlin 规范库中的 Sequence.zip 扩大函数一样,流领有一个 zip 操作符用于组合两个流中的相干值。
@Test fun testZip() = runBlocking {val numbers = (1..3).asFlow() val strings = flowOf("One", "Two", "Three") numbers.zip(strings) {a, b -> "$a -> $b"}.collect {println(it) } }
这个例子把数字流和字符串流用 zip 操作符组合起来,成为一个字符流。
1 -> One
2 -> Two
3 -> Three
Process finished with exit code 0
@Test
fun testZip2() = runBlocking {val numbers = (1..3).asFlow().onEach { delay(300) }
val strings = flowOf("One", "Two", "Three").onEach {delay(500) }
val start = System.currentTimeMillis()
numbers.zip(strings) {a, b -> "$a -> $b"}.collect {println("$it, ${System.currentTimeMillis() - start}") }
}
如果两个流各自有 delay,合并操作会期待那个 delay 工夫较长的数据。
1 -> One, 563
2 -> Two, 1065
3 -> Three, 1569
Process finished with exit code 0
展平流
流示意异步接管的值序列,所以很容易遇到这样状况:每个值都会触发对另一个值序列的申请,然而,因为流具备异步的性质,因而须要不同的展平模式,为此,存在一系列的流展平操作符:
- flatMapConcat 连贯模式
- flatMapMerge 合并模式
- flatMapLatest 最新展平模式
应用 flatMapConcat 连贯模式
private fun requestFlow(i: Int) = flow<String> {emit("$i: First")
delay(500)
emit("$i: Second")
}
@Test
fun testFlatMapConcat() = runBlocking {val startTime = System.currentTimeMillis()
(1..3).asFlow()
.onEach {delay(100) }
//.map {requestFlow(it) } // 如果用 map,则产生一个 Flow<Flow<String>>
.flatMapConcat {requestFlow(it) } // 应用 flatMapConcat,把 flow 展平成一维,达到成果
.collect {println("$it at ${System.currentTimeMillis() - startTime} ms") }
}
1: First at 144 ms
1: Second at 649 ms
2: First at 754 ms
2: Second at 1256 ms
3: First at 1361 ms
3: Second at 1861 ms
Process finished with exit code 0
应用 flatMapMerge
@Test
fun testFlatMapMergeConcat() = runBlocking {val startTime = System.currentTimeMillis()
(1..3).asFlow()
.onEach {delay(100) }
.flatMapMerge {requestFlow(it) }
.collect {println("$it at ${System.currentTimeMillis() - startTime} ms") }
}
1: First at 202 ms
2: First at 301 ms
3: First at 407 ms
1: Second at 708 ms
2: Second at 805 ms
3: Second at 927 ms
Process finished with exit code 0
发射 1:First 后,delay 500ms,这期间发射 2:First,发射 3:First;别离把这些数据收集到,而后其余的数据累计发射结束并收集。
在来看 flatMapLatest 操作符
private fun requestFlow(i: Int) = flow<String> {emit("$i: First")
delay(500)
emit("$i: Second")
}
@Test
fun testFlatMapLatestConcat() = runBlocking {val startTime = System.currentTimeMillis()
(1..3).asFlow()
.onEach {delay(200) }
.flatMapLatest {requestFlow(it) }
.collect {println("$it at ${System.currentTimeMillis() - startTime} ms") }
}
1: First at 313 ms
2: First at 581 ms
3: First at 786 ms
3: Second at 1291 ms
Process finished with exit code 0
跳过某些两头值,只收集最新的值。
流的异样解决
当运算符中的发射器或代码抛出异样时,有几种解决异样的办法:
- try catch 块
-
catch 函数
应用代码块捕捉上游异样private fun createFlow10() = flow<Int> {for (i in 1..3) {println("emitting $i") emit(i) } } @Test fun testException() = runBlocking { try {createFlow10().collect {println("collect: $it") check(it <= 1) {"wrong value"} } } catch (e: Exception) {println("handle the exception: $e") } }
emitting 1 collect: 1 emitting 2 collect: 2 handle the exception: java.lang.IllegalStateException: wrong value Process finished with exit code 0
应用 catch 操作符捕捉上游异样
@Test fun testException2() = runBlocking { flow {emit(1) 1/0 emit(2) } .catch {println("$it") } .flowOn(Dispatchers.IO) .collect {println("collect: $it") } }
java.lang.ArithmeticException: / by zero collect: 1 Process finished with exit code 0
能够在捕捉异样后补充发射一个数据
@Test fun testException2() = runBlocking { flow {emit(1) 1/0 emit(2) } .catch {println("$it") emit(10) } .flowOn(Dispatchers.IO) .collect {println("collect: $it") } }
java.lang.ArithmeticException: / by zero collect: 1 collect: 10 Process finished with exit code 0
前面的 2 当然是收不到的。
流的实现
当流收集实现时(一般状况或者异常情况),它可能须要执行一个动作。
- 命令式 finally 块
-
onCompletion 申明式解决
finally 块private fun createFlow11() = flow<Int> {emit(1) 1 / 0 emit(2) } @Test fun testComplete() = runBlocking { try {createFlow11().collect {println("collect $it") } } catch (e: Exception) {println("exception: $e") } finally {println("Finally.") } }
collect 1 exception: java.lang.ArithmeticException: / by zero Finally. Process finished with exit code 0
@Test fun testComplete2() = runBlocking {createFlow11() .onCompletion {println("exception: $it") } .collect {println("collect $it") } }
collect 1 exception: java.lang.ArithmeticException: / by zero java.lang.ArithmeticException: / by zero
onCompletion 外面能接到异样,然而并不能捕捉异样,如果想捕捉还须要 catch 操作符。