Flow异步流
意识
- 个性
- 构建器和上下文
- 启动
- 勾销与勾销检测
- 缓冲
操作符
- 过渡操作符
- 末端操作符
- 组合
- 展平
异样
- 异样解决
- 实现
如何示意多个值?
挂起函数能够异步的返回单个值,然而如何异步返回多个计算好的值呢?
计划
- 汇合
- 序列
- 挂起函数
Flow
用汇合,返回多个值,但不是异步的。private fun createList() = listOf<Int>(1, 2, 3)@Testfun test_list() { createList().forEach { println(it) }}
用序列,返回一个整数序列
private fun createSequence(): Sequence<Int> { return sequence { for (i in 1..3) { Thread.sleep(1000) // 伪装在计算,此处是阻塞,不能做其余事件了 // delay(1000) 这里不能用挂起函数 yield(i) } }}@Testfun test_sequence() { createSequence().forEach { println(it) }}
看下源码
public fun <T> sequence(@BuilderInference block: suspend SequenceScope<T>.() -> Unit): Sequence<T> = Sequence { iterator(block) }
传入的是一个SequenceScope的扩大函数。
@RestrictsSuspension@SinceKotlin("1.3")public abstract class SequenceScope<in T> internal constructor()
而RestrictsSuspension限度只能应用外面提供的已有的挂起函数,如yield,yieldAll等。
createSequence返回了多个值,然而也是同步的。// 返回多个值,异步private suspend fun createList2(): List<Int> { delay(5000) return listOf<Int>(1, 2, 3)}@Testfun test_list2() = runBlocking<Unit> { createList().forEach { println(it) }}
能够应用suspend函数返回多个值,是异步,然而是是一次性返回了多个值,是否像流一样返回多个值并放弃异步呢?Flow能够解决这个问题。
private suspend fun createFlow(): Flow<Int> = flow { for (i in 1..3) { delay(1000) emit(i) // 发射,产生一个元素 }}@Testfun test_flow() = runBlocking<Unit> { createFlow().collect { println(it) } // collect是一个末端操作符,前面讲}
每隔1秒钟产生一个元素,这里是挂起的。用例子来证实一下:
private suspend fun createFlow(): Flow<Int> = flow { for (i in 1..3) { delay(1000) emit(i) } } @Test fun test_flow2() = runBlocking<Unit> { launch { for (i in 1..3) { println("I am running and not blocked $i") delay(1500) } } createFlow().collect { println(it) } }
输入
I am running and not blocked 11I am running and not blocked 22I am running and not blocked 33Process finished with exit code 0
collect收集后果的过程并没有阻塞另外的协程,打印完1,而后在delay挂起时,去执行其余,并没有阻塞,两个工作来回切换执行。
Flow真正地做到了返回多个值,并且是异步的。
Flow与其余形式的区别
- 名为flow的Flow类型的构建器函数
- flow{...}构建块中的代码能够挂起
- 函数createFlow()不再标有suspend修饰符,下面代码中的suspend修饰符能够去掉
- 流应用emit函数发射值
流应用collect函数收集值
Flow利用
在android中,文件下载是Flow的一个十分典型的利用。
冷流
Flow是一种相似于序列的冷流,flow构建器中的代码直到流被收集的时候才运行。
private fun createFlow2() = flow<Int> { println("Flow started.") for (i in 1..3) { delay(1000) emit(i) }}@Testfun test_flow_cold() = runBlocking<Unit> { val flow = createFlow2() println("calling collect...") flow.collect { value -> println(value) } println("calling collect again...") flow.collect { value -> println(value) }}
calling collect...Flow started.123calling collect again...Flow started.123Process finished with exit code 0
能够看到,当调用collect办法的时候,流才开始运行,并且能够屡次调用。
流的连续性
- 流的每次独自收集都是按程序执行的,除非应用非凡操作符。
从上游到上游,每个过渡操作符都会解决每个发射出的值,而后再交给末端操作符。
@Testfun test_flow_continuation() = runBlocking<Unit> { (1..5).asFlow() .filter { it % 2 == 0 } .map { "string $it" } .collect { println("collect $it") }}
collect string 2collect string 4Process finished with exit code 0
改例子通过了如下步骤:生成一个流,过滤出偶数,转成字符串,开始收集
流的构建器
- flowOf构建器定义了一个发射固定值集的流。
应用.asFlow()扩大函数,能够将各种汇合与序列转换为流。
@Testfun test_flow_builder() = runBlocking<Unit> { // flowOf构建器 flowOf("one", "two", "three") .onEach { delay(1000) } .collect { value -> println(value) } // asFlow扩大函数 (1..3).asFlow().collect { value -> println(value) }}
onetwothree123Process finished with exit code 0
流的上下文
- 流的收集总是在调用协程的上下文中产生,流的该属性成为上下文保留。
- flow{...}构建器中的代码必须遵循上下文保留属性,并且不容许从其余上下文中发射。
flowOn操作符,该函数用于更改流发射的上下文。
private fun createFlow3() = flow<Int> { println("Flow started ${Thread.currentThread()}") for (i in 1..3) { delay(1000) emit(i) }}@Testfun test_flow_context() = runBlocking<Unit> { createFlow3() .collect { println("$it, thread: ${Thread.currentThread()}") }}
Flow started Thread[main @coroutine#1,5,main]1, thread: Thread[main @coroutine#1,5,main]2, thread: Thread[main @coroutine#1,5,main]3, thread: Thread[main @coroutine#1,5,main]Process finished with exit code 0
不做线程切换,收集和构建都在同一上下文,运行的线程是一样的。
试着更改一下线程,如下private fun createFlow4() = flow { withContext(Dispatchers.IO) { // 用io线程 println("Flow started ${Thread.currentThread().name}") for (i in 1..3) { delay(1000) emit(i) } }}@Testfun test_flow_on() = runBlocking { createFlow4().collect { println("collect $it, ${Thread.currentThread()}") }}
Flow started DefaultDispatcher-worker-1 @coroutine#1java.lang.IllegalStateException: Flow invariant is violated: Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@4600ac86, BlockingEventLoop@1e1d1f06], but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@79c0c2ed, Dispatchers.IO]. Please refer to 'flow' documentation or use 'flowOn' instead
能够看到,构建流在IO线程中执行,但在收集流的时候报错了,不容许这样做,倡议应用flowOn.
正确的做法如下:private fun createFlow5() = flow { println("Flow started ${Thread.currentThread().name}") for (i in 1..3) { delay(1000) emit(i) }}.flowOn(Dispatchers.IO)@Testfun test_flow_on2() = runBlocking { createFlow5().collect { println("collect $it, ${Thread.currentThread().name}") }}
Flow started DefaultDispatcher-worker-1 @coroutine#2collect 1, main @coroutine#1collect 2, main @coroutine#1collect 3, main @coroutine#1Process finished with exit code 0
流在IO线程中构建和发射,在main线程中收集。
启动流
应用launchIn替换collect,能够在独自的协程中启动流的收集。
private fun events() = (1..3).asFlow() .onEach { delay(1000) println("$it, ${Thread.currentThread().name}") }.flowOn(Dispatchers.Default) @Test fun testFlowLaunch() = runBlocking<Unit> { events() .onEach { e -> println("Event: $e ${Thread.currentThread().name}") } //.collect() .launchIn(CoroutineScope(Dispatchers.IO)) .join() }
1, DefaultDispatcher-worker-3 @coroutine#3Event: 1 DefaultDispatcher-worker-1 @coroutine#22, DefaultDispatcher-worker-1 @coroutine#3Event: 2 DefaultDispatcher-worker-1 @coroutine#23, DefaultDispatcher-worker-1 @coroutine#3Event: 3 DefaultDispatcher-worker-2 @coroutine#2Process finished with exit code 0
onEach是过渡操作符,并不会触发收集数据,collect是末端操作符,能力触发收集数据。过渡操作符就像是过滤器,末端操作符就像是水龙头的阀门,不关上阀门水就流不进去,无论两头加了多少个过滤安装。
如果想指定在哪个协程外面收集数据,就能够用末端操作符launchIn(),能够传递一个作用域进去,而作用域又能够指定调度器,launchIn(CoroutineScope(Dispatchers.IO)).
launchIn返回的是一个job对象,能够进行cancel等操作。例如
@Testfun testFlowLaunch2() = runBlocking<Unit> { val job = events() .onEach { e -> println("Event: $e ${Thread.currentThread().name}") } .launchIn(CoroutineScope(Dispatchers.IO)) delay(2000) job.cancel()}
1, DefaultDispatcher-worker-1 @coroutine#3Event: 1 DefaultDispatcher-worker-3 @coroutine#2Process finished with exit code 0
如上,只收集了一个数字,job就勾销了。
其实runBlockint自身就是一个主线程作用域,能够放到launchIn中,如下
@Testfun testFlowLaunch3() = runBlocking<Unit> { val job = events() .onEach { e -> println("Event: $e ${Thread.currentThread().name}") } .launchIn(this)}
1, DefaultDispatcher-worker-1 @coroutine#3Event: 1 main @coroutine#22, DefaultDispatcher-worker-1 @coroutine#3Event: 2 main @coroutine#23, DefaultDispatcher-worker-1 @coroutine#3Event: 3 main @coroutine#2Process finished with exit code 0
流的勾销
流采纳与协程同样的合作勾销。流的收集能够是当流在一个可勾销的挂起函数(如delay)中挂起的时候勾销。
private fun createFlow6() = flow<Int> { for (i in 1..3) { delay(1000) println("emitting $i") emit(i) } } @Test fun testCancelFlow() = runBlocking<Unit> { withTimeoutOrNull(2500) { createFlow6().collect { println("collect: $it") } } println("Done.") }
emitting 1collect: 1emitting 2collect: 2Done.Process finished with exit code 0
设置2.5秒超时,流还没发射3,就超时了,流被勾销。
流的勾销检测
- 不便起见,流构建器对每个发射值执行附加的ensureActive检测以进行勾销,这意味着从flow{...}收回的忙碌循环是能够勾销的。
- 处于性能起因,大多数其余流操作不会自行执行其余勾销检测,在协程处于忙碌循环的状况下,必须明确检测是否勾销。
通过cancellable操作符来执行操作。
private fun createFlow7() = flow<Int> { for (i in 1..5) { delay(1000) println("emitting $i") emit(i) } } @Test fun testCancelFlowCheck() = runBlocking<Unit> { createFlow7().collect { if (it == 3) cancel() println("collect: $it") } println("Done.") }
emitting 1collect: 1emitting 2collect: 2emitting 3collect: 3kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled...Process finished with exit code 255
在收集流的时候,遇到3就执行勾销操作,抛出JobCancellationException,3还是会被收集到。
@Test fun testCancelFlowCheck2() = runBlocking<Unit> { (1..5).asFlow().collect { if (it == 3) cancel() println("collect: $it") } println("Done.") }
collect: 1collect: 2collect: 3collect: 4collect: 5Done.kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled
应用asFlow()创立流,在收集的时候,尽管遇到3进行了勾销,然而还是把所有的元素都打印了当前才抛出异样。如果要在执行的过程中真正的阻断流,须要加上cancellable()操作,如下:
@Test fun testCancelFlowCheck3() = runBlocking<Unit> { (1..5).asFlow().cancellable().collect { if (it == 3) cancel() println("collect: $it") } println("Done.") }
collect: 1collect: 2collect: 3kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled
背压
- buffer(),并发运行流中发射元素的代码。
- conflate(),合并发射项,不对每个值进行解决。
- collectLatest(),勾销并从新发射最初一个值。
- 当必须更改CoroutineDispatcher时,flowOn操作符应用了雷同的缓冲机制,然而buffer函数显示地申请缓冲而不扭转执行上下文。
private fun createFlow8() = flow<Int> { for (i in 1..5) { delay(100) // 生产一个元素须要0.1秒 println("emitting $i") emit(i) } } @Test fun testBackPressure() = runBlocking<Unit> { val time = measureTimeMillis { createFlow8() .buffer(50) // 并发运行流中发射元素 .collect { delay(200) // 生产一个元素须要0.2秒 println("collect: $it") } } println("Done, total $time") }
emitting 1emitting 2collect: 1emitting 3emitting 4collect: 2emitting 5collect: 3collect: 4collect: 5Done, total 1188
应用buffer能够让发射元素并发执行,提高效率。
应用flowOn()切换线程,也能够提高效率。
private fun createFlow8() = flow<Int> { for (i in 1..5) { delay(100) // 生产一个元素须要0.1秒 println("emitting $i, ${Thread.currentThread().name}") emit(i) } } @Test fun testBackPressure2() = runBlocking<Unit> { val time = measureTimeMillis { createFlow8() .flowOn(Dispatchers.Default) .collect { delay(200) // 生产一个元素须要0.2秒 println("collect: $it, ${Thread.currentThread().name}") } } println("Done, total $time") }
emitting 1, DefaultDispatcher-worker-1 @coroutine#2emitting 2, DefaultDispatcher-worker-1 @coroutine#2emitting 3, DefaultDispatcher-worker-1 @coroutine#2collect: 1, main @coroutine#1emitting 4, DefaultDispatcher-worker-1 @coroutine#2collect: 2, main @coroutine#1emitting 5, DefaultDispatcher-worker-1 @coroutine#2collect: 3, main @coroutine#1collect: 4, main @coroutine#1collect: 5, main @coroutine#1Done, total 1186
conflate()能够合并发射项,然而不会对每个值进行解决。
@Test fun testBackPressure3() = runBlocking<Unit> { val time = measureTimeMillis { createFlow8() .conflate() .collect { delay(200) // 生产一个元素须要0.2秒 println("collect: $it, ${Thread.currentThread().name}") } } println("Done, total $time") }
emitting 1, main @coroutine#2emitting 2, main @coroutine#2emitting 3, main @coroutine#2collect: 1, main @coroutine#1emitting 4, main @coroutine#2collect: 3, main @coroutine#1emitting 5, main @coroutine#2collect: 4, main @coroutine#1collect: 5, main @coroutine#1Done, total 1016
下面的例子中,应用conflate(),collect时跳过了2.
应用collectLatest()只会收集最初一个值,如下:
@Test fun testBackPressure4() = runBlocking<Unit> { val time = measureTimeMillis { createFlow8() .collectLatest { delay(200) // 生产一个元素须要0.2秒 println("collect: $it, ${Thread.currentThread().name}") } } println("Done, total $time") }
emitting 1, main @coroutine#2emitting 2, main @coroutine#2emitting 3, main @coroutine#2emitting 4, main @coroutine#2emitting 5, main @coroutine#2collect: 5, main @coroutine#7Done, total 913Process finished with exit code 0
操作符
过渡流操作符
- 能够应用操作符转换流,就像应用汇合与序列一样。
- 过渡操作符利用于上游流,并返回上游流。
- 这些操作符也是冷操作符,就像流一样。这类操作符自身不是挂起函数。
运行速度很快,返回新的转换流的定义。
private fun createFlow9() = flow<Int> { for (i in 1..3) { delay(100) // 生产一个元素须要0.1秒 println("emitting $i") emit(i) } } @Test fun testMap() = runBlocking<Unit> { createFlow9() .map { data -> performRequest(data) } .collect { println("collect: $it") } }
emitting 1collect: --response 1--emitting 2collect: --response 2--emitting 3collect: --response 3--Process finished with exit code 0
下面的例子,map操作符,把Int流转成了String流。
@Test fun testTransform() = runBlocking<Unit> { createFlow9() .transform { data -> emit("making request $data") emit(performRequest(data)) } .collect { println("collect: $it") } }
emitting 1collect: making request 1collect: --response 1--emitting 2collect: making request 2collect: --response 2--emitting 3collect: making request 3collect: --response 3--Process finished with exit code 0
下面的例子,transform操作符能够把流通过屡次转换,屡次发射。
限长操作符
take操作符
private fun numbers() = flow<Int> { try { emit(1) emit(2) println("This line will not execute") emit(3) } finally { println("Finally.") } } @Test fun testLimitOperator() = runBlocking { numbers().take(2).collect { println("collect $it") } }
collect 1collect 2Finally.
take传入参数2,则只取2个数据。
末端流操作符
末端操作符是在流上用于启动流收集的挂起函数。collect是最根底的末端操作符,然而还有一些更不便的末端操作符:
- 转化为各种汇合,例如toList和toSet.
- 获取第一个(first)值与确保流发射单个(single)值的操作符。
应用reduce与fold将流规约到单个值。
例如reduce操作符@Test fun testTerminateOperator() = runBlocking { val sum = (1..5).asFlow().map { it * it }.reduce { a, b -> a + b } println(sum) }
55
计算数字1-5的平方,而后求和,失去55.
组合多个流
就像Kotlin规范库中的Sequence.zip扩大函数一样,流领有一个zip操作符用于组合两个流中的相干值。
@Test fun testZip() = runBlocking { val numbers = (1..3).asFlow() val strings = flowOf("One", "Two", "Three") numbers.zip(strings) { a, b -> "$a -> $b" }.collect { println(it) } }
这个例子把数字流和字符串流用zip操作符组合起来,成为一个字符流。
1 -> One2 -> Two3 -> ThreeProcess finished with exit code 0
@Test fun testZip2() = runBlocking { val numbers = (1..3).asFlow().onEach { delay(300) } val strings = flowOf("One", "Two", "Three").onEach { delay(500) } val start = System.currentTimeMillis() numbers.zip(strings) { a, b -> "$a -> $b" }.collect { println("$it, ${System.currentTimeMillis() - start}") } }
如果两个流各自有delay,合并操作会期待那个delay工夫较长的数据。
1 -> One, 5632 -> Two, 10653 -> Three, 1569Process finished with exit code 0
展平流
流示意异步接管的值序列,所以很容易遇到这样状况:每个值都会触发对另一个值序列的申请,然而,因为流具备异步的性质,因而须要不同的展平模式,为此,存在一系列的流展平操作符:
- flatMapConcat连贯模式
- flatMapMerge合并模式
- flatMapLatest最新展平模式
应用flatMapConcat连贯模式
private fun requestFlow(i: Int) = flow<String> { emit("$i: First") delay(500) emit("$i: Second") } @Test fun testFlatMapConcat() = runBlocking { val startTime = System.currentTimeMillis() (1..3).asFlow() .onEach { delay(100) } //.map { requestFlow(it) } // 如果用map,则产生一个Flow<Flow<String>> .flatMapConcat { requestFlow(it) } // 应用flatMapConcat,把flow展平成一维,达到成果 .collect { println("$it at ${System.currentTimeMillis() - startTime} ms") } }
1: First at 144 ms1: Second at 649 ms2: First at 754 ms2: Second at 1256 ms3: First at 1361 ms3: Second at 1861 msProcess finished with exit code 0
应用flatMapMerge
@Test fun testFlatMapMergeConcat() = runBlocking { val startTime = System.currentTimeMillis() (1..3).asFlow() .onEach { delay(100) } .flatMapMerge { requestFlow(it) } .collect { println("$it at ${System.currentTimeMillis() - startTime} ms") } }
1: First at 202 ms2: First at 301 ms3: First at 407 ms1: Second at 708 ms2: Second at 805 ms3: Second at 927 msProcess finished with exit code 0
发射1:First后,delay 500ms,这期间发射2:First,发射3:First;别离把这些数据收集到,而后其余的数据累计发射结束并收集。
在来看flatMapLatest操作符
private fun requestFlow(i: Int) = flow<String> { emit("$i: First") delay(500) emit("$i: Second") } @Test fun testFlatMapLatestConcat() = runBlocking { val startTime = System.currentTimeMillis() (1..3).asFlow() .onEach { delay(200) } .flatMapLatest { requestFlow(it) } .collect { println("$it at ${System.currentTimeMillis() - startTime} ms") } }
1: First at 313 ms2: First at 581 ms3: First at 786 ms3: Second at 1291 msProcess finished with exit code 0
跳过某些两头值,只收集最新的值。
流的异样解决
当运算符中的发射器或代码抛出异样时,有几种解决异样的办法:
- try catch块
catch函数
应用代码块捕捉上游异样private fun createFlow10() = flow<Int> { for (i in 1..3) { println("emitting $i") emit(i) } } @Test fun testException() = runBlocking { try { createFlow10().collect { println("collect: $it") check(it <= 1) { "wrong value " } } } catch (e: Exception) { println("handle the exception: $e") } }
emitting 1collect: 1emitting 2collect: 2handle the exception: java.lang.IllegalStateException: wrong value Process finished with exit code 0
应用catch操作符捕捉上游异样
@Test fun testException2() = runBlocking { flow { emit(1) 1/0 emit(2) } .catch { println("$it") } .flowOn(Dispatchers.IO) .collect { println("collect: $it") } }
java.lang.ArithmeticException: / by zerocollect: 1Process finished with exit code 0
能够在捕捉异样后补充发射一个数据
@Test fun testException2() = runBlocking { flow { emit(1) 1/0 emit(2) } .catch { println("$it") emit(10) } .flowOn(Dispatchers.IO) .collect { println("collect: $it") } }
java.lang.ArithmeticException: / by zerocollect: 1collect: 10Process finished with exit code 0
前面的2当然是收不到的。
流的实现
当流收集实现时(一般状况或者异常情况),它可能须要执行一个动作。
- 命令式finally块
onCompletion申明式解决
finally 块private fun createFlow11() = flow<Int> { emit(1) 1 / 0 emit(2) } @Test fun testComplete() = runBlocking { try { createFlow11().collect { println("collect $it") } } catch (e: Exception) { println("exception: $e") } finally { println("Finally.") } }
collect 1exception: java.lang.ArithmeticException: / by zeroFinally.Process finished with exit code 0
@Test fun testComplete2() = runBlocking { createFlow11() .onCompletion { println("exception: $it") } .collect { println("collect $it") } }
collect 1exception: java.lang.ArithmeticException: / by zerojava.lang.ArithmeticException: / by zero
onCompletion外面能接到异样,然而并不能捕捉异样,如果想捕捉还须要catch操作符。