公众号「罕见猿诉」 原文链接 专家之路上的 Flow 高级秘籍
『君不见,黄河之水天上来,奔流到海不复回。』
学习与河流一样,一方面学无止境,又是逆水行舟,逆水行舟,因为其他人都在卷。前文一篇文章讲了 Flow 的根底,大多数状况下够用了,然而不能进行卷,因为你不卷,就会被他人卷。一旦波及到简单的利用场景,就须要用到一些高级的 API。明天就来学习一下 Flow 的高级个性,当遇到问题时也能更从容的应答。
上下文切换
Flow 是基于协程的,是用协程来实现并发,后面也提到过像 [flow {…}](),在上游生产数据,以及中游做变幻时,都是能够间接调用 suspend,耗时甚至是阻塞的函数的。而终端操作符如[collect]() 则是 suspend 的,调用者(也就是消费者)须要负责确保 collect 是在协程中调用。咱们还晓得 Flow 是是冷流,消费者终端才会触发上游生产者生产,所以对于 flow {…}来说,它的上游和中游运行的上下文来自于终端调用者的上下文,这个叫做『上下文保留』(context preservation),咱们能够用一个🌰 来验证一下:
fun main() = runBlocking {
// Should be main by default
simple().collect { log("Got: $it") }
// Collect in a specified context
withContext(Dispatchers.Default) {simple().collect {log("Now got: $it") }
}
}
private fun simple(): Flow<Int> = flow {log("Started the simple flow")
for (i in 1..3) {delay(100)
log("Producing $i")
emit(i)
}
}
输入如下:
[main @coroutine#1] Started the simple flow
[main @coroutine#1] Producing 1
[main @coroutine#1] Got: 1
[main @coroutine#1] Producing 2
[main @coroutine#1] Got: 2
[main @coroutine#1] Producing 3
[main @coroutine#1] Got: 3
[DefaultDispatcher-worker-1 @coroutine#1] Started the simple flow
[DefaultDispatcher-worker-1 @coroutine#1] Producing 1
[DefaultDispatcher-worker-1 @coroutine#1] Now got: 1
[DefaultDispatcher-worker-1 @coroutine#1] Producing 2
[DefaultDispatcher-worker-1 @coroutine#1] Now got: 2
[DefaultDispatcher-worker-1 @coroutine#1] Producing 3
[DefaultDispatcher-worker-1 @coroutine#1] Now got: 3
从这个🌰 能够分明的看到,Flow 的 context 是来自于终端调用者的。
用 flowOn 来指定上下文
有时候应用终端调用者的上下文可能不太不便,因为生产者与消费者的模式其实是解耦的,它们不应该互相受制于对方,对于要害的并发的上下文更是如此。比如说在 GUI 的利用中,显著应该在工作线程中生产数据,在 UI 线程中生产数据,从下面的例子来看,由终端调用者来决定上游上下文显著不可取。有同学举手了,欺侮我没学过协程是吧?我能够在 Flow 外部应用 withContext 来指定上下文啊,咱们来试试:
fun main() = runBlocking {
// Should be main by default
simple().collect { log("Got: $it") }
}
private fun simple(): Flow<Int> = flow {withContext(Dispatchers.Default) {log("Started the simple flow")
for (i in 1..3) {delay(100)
log("Producing $i")
emit(i)
}
}
}
这位同学能够间接进来了,因为你的代码 crash 了:
[DefaultDispatcher-worker-1 @coroutine#1] Started the simple flow
[DefaultDispatcher-worker-1 @coroutine#1] Producing 1
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@545486c7, BlockingEventLoop@13bfcf14],
but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@27015c5a, Dispatchers.Default].
Please refer to 'flow' documentation or use 'flowOn' instead
意思大略是说 Flow 外部不让间接用 withContext 来切上下文,毁坏了 Flow 的不变式,想切上下文要用 flowOn。而且认真看,异样是由 emit 函数抛出来的。
其实 Flow 的设计者曾经思考到了这个问题,并且给出了优雅的形式,如果想切换 Flow 外部(也即上游和中游)的运行上下文,要用 flowOn 函数:
fun main() = runBlocking {
// Should be main by default
simple().collect { log("Got: $it") }
}
private fun simple(): Flow<Int> = flow {log("Started the simple flow")
for (i in 1..3) {delay(100)
log("Producing $i")
emit(i)
Thread.sleep(50)
}
}.flowOn(Dispatchers.Default)
//[DefaultDispatcher-worker-1 @coroutine#2] Started the simple flow
//[DefaultDispatcher-worker-1 @coroutine#2] Producing 1
//[main @coroutine#1] Got: 1
//[DefaultDispatcher-worker-1 @coroutine#2] Producing 2
//[main @coroutine#1] Got: 2
//[DefaultDispatcher-worker-1 @coroutine#2] Producing 3
//[main @coroutine#1] Got: 3
这回就谐和多了,后盾搞生产,UI 只展现,完满!还须要特地留神的是 函数 flowOn 只影响它的上游,不影响它的上游,更不会影响终端 ,终端永远都在 其调用者的上下文 中,来看一个🌰:
withContext(Dispatchers.Main) {
val singleValue = intFlow // will be executed on IO if context wasn't specified before
.map {...} // Will be executed in IO
.flowOn(Dispatchers.IO)
.filter {...} // Will be executed in Default
.flowOn(Dispatchers.Default)
.single() // Will be executed in the Main}
第一个 flowOn 切到 IO,只影响到它后面的创立和 map,第二次切换到Default,只影响 filter。single 是终端,是在Main,因为它的调用者是在Main 外面。
留神,留神: Flow 是一个数据流,放弃其数据流的特点是相当重要的,无论是失常数据,异样数据,还是出错都是一种数据,应该让其自上而下的流动,在 中游变幻时或者终端时通过操作符来解决。所以,像硬性的上下文切换,或者异样的 try/catch 都是不容许的。这就是所谓的流的不变性(Flow invariant)。前面讲异样时还会提到这点。
任意上下文的 Flow builders
从后面的学习咱们晓得了,下下文保留的个性,终端会决定上游生产者的上下文,当然也能够通过 flowOn 来扭转上下文。Flow builder 其实就是一个生产者,异步的 emit 数据。但有些时候生产数据时的上下文,也就是调用 emit 时的上下文,是不确定的。比如说安卓 下面的各种回调(callback)有些是回调在调用者的线程里,有些则不是。flow {…}中的 emit 就不能在异步的回调外面调用,这时就要用 callbackFlow {…}。callbackFlow 专门实用于把现有的一些回调转为 Flow,最典型的利用就是地位信息:
fun locationFlow(): Flow<Location> = callbackFlow {
val listener = object : LocationListener {override fun onLocationUpdate(loc: Location) {trySend(location)
}
}
locationManager.reqisterLocaitonUpdates(listener)
awaitClose {locationManager.unregisterLocationUpdates(listener)
}
}
如果这个 Flow,用 flow {}去创立会抛异样,因为 emit 没法在回调中应用。callbackFlow 会在回调中发射数据,并在 awaitClose 代码块中反注册回调以清理资源。awaitClose 会在这个流完结时(实现或者被勾销)被回调到,以有机会进行资源清理。
其实,无论是 flow {}还是 callbackFlow {}都是 channelFlow {}的简单化,channelFlow 非常复杂,也超级弱小,它能够自带 buffer,自带并发,实用于创立一些非常复杂的 Flow。在少数时候 flow {}和 callbackFlow {}就够咱们用了。
扩大浏览:
- Android Kotlin Coroutines: what is the difference between flow, callbackFlow, channelFlow,… other flow constructors
- Kotlin 协程四 —— Flow 和 Channel 的利用
- [[译]轻松学习 Kotlin 的 Flow、ChannelFlow 和 CallbackFlow](https://juejin.cn/post/7220593395420627004)
- 轻松搞定 Kotlin 的 Flow, ChannelFlow 和 CallbackFlow – 2
副作用函数
Flow 是一个数据流,核心思想是把数据的生产和解决和最终生产离开,上游只负责生产数据,各种操作都应该由中游操作符来做,最终数据由终端生产掉。须要增强数据的封装性,和流的不变性,不毁坏管道,用各种转换器来对数据进行操作。那么,对于流何时开始,每个数据何时产生,流什么时候终止,这些事件对于调试来说是很有帮忙的。Flow 的设计者给出了一系列副作用函数来做之些事件。副作用的意思就是这些函数不会对流自身产生影响。
- onStart Flow 开始生产之前会调用此函数。
- onEach 在生产 (emit) 每个数据之前调用此函数,这个函数最罕用被用来打日志,以查看每个产生的数据。
- onCompletion 当 Flow 终止时或者被勾销后会调用此函数。
- onSubscritpion 有消费者了时调用此函数(也就是有人 collect 了此 Flow 时)。
异样,勾销和错误处理
这一大节重点来看看非正常代码逻辑的解决。先来看看异样解决(Exception handling)。
用 catch 函数来解决 Flow 过程中的异样
代码随时都可能抛出异样,所以异样解决是一个必须要思考的事件。当然能够在 Flow 的各个节点如上游生产,中游变幻和上游终端的代码块外面各种 try/catch。一来是不够优雅,再者这会毁坏 Flow 的不变性或者说一致性,它就是管道,数据在外面流动,不应该加以过多的烦扰,想要对数据处理应该用操作符。也就是说要让异样(包含其余谬误也是如此)对 Flow 是通明的,意思是说 Flow 并不关怀是否有异样。所以提供了一个 catch 函数,它的作用是捕捉并解决上游操作中产生的异样:
simple()
.catch {e -> emit("Caught $e") } // emit on exception
.collect {value -> println(value) }
须要留神 catch 与 flowOn 一样,只影响上游产生的异样,管不了上游:
flow {emitData() }
.map {computeOne(it) }
.catch {...} // catches exceptions in emitData and computeOne
.map {computeTwo(it) }
.collect {process(it) } // throws exceptions from process and computeTwo
勾销 Flow
Flow 没有显式的勾销函数。Flow 是冷流,有消费者时才会去生产数据,消费者进行生产了,Flow 天然也就被勾销了。终端操作都是 suspend 的,也就是要在协程中调用,因而 勾销终端调用的协程,就会勾销 Flow。
错误处理
其实没有特地的谬误处理函数,后面的异样算是一个,如果上游没有抛出异样,就不会有其余谬误了,因为谬误也是数据的一种类型,并且是由咱们本人依据场景来定义的。比如说从网络获取新闻列表,失常时的数据当然是一个个的新闻条目。出错了,比方无网络,或者服务器无响应,这时可能返回一个空的条目,外面有谬误的具体信息。但这都是由业务逻辑决定的,是业务逻辑层面的货色。对于 Flow 而言,都还是有数据的,都是一种数据,具体数据的解读,那是消费者终端的事件,Flow 并不关怀。
惟一算得上错误处理的函数就是 onEmpty,它会在 Flow 是空的时候,也就是不生产任何数据的时候被回调。能够在 onEmpty 外面生产 emit 数据,比方产生一个带有错误信息的数据,或者产生一个默认值。因为 Flow 为空,不产生 emit 任何数据时,管子是空的数据没有流动,Flow 的整个链路,特地是终端 collect 是不会被执行的,这时可能会有问题,比方 UI 根本无法做出任何 react,除非你设置过了默认 UI 状态,否则可能会不对。这个时候如果用 onEmpty 去产生一些默认值或者错误信息的话,就能激活整个 Flow,终端能做出预期的响应。
重试机制
另一个十分有用的函数就是 retry,它能够预设一个条件,当条件满足时就会触发从新 collect。Flow 是冷流,有消费者 collect 时才会触发生产者 emit 数据,因而从新 collect 就能让 Flow 从新 emit 数据流。
背压
Flow 是异步数据流,响应式编程范式,上游生产数据,上游终端生产数据。有时候可能会遇到这样一种状况,就是上游数据生产的速度超过了上游终端的生产速度,这会造成数据流积压在管道中,终端无奈及时响应。这种状况称为『背压(Back pressure)』。想像一下一个水管,如果进水速度大于水龙头流出的速度,水就会积压在水管里,如果水管是比拟单薄的(如气球),那么它会收缩,最初爆掉。
通常状况下,当上游是较为可控的生产者时,不会产生背压,但如果是一些不是开发人员可控的,如硬件(触摸事件,地位信息,传感器,摄像头),其余零碎(零碎框架的回调,或者服务器的 Push)等等,就会产生背压,这时必须进行相应的解决。所有的 FRP 式异步数据流 API 都必须解决『背压』,Flow 也有相应的 API 来解决:
- buffer 把生产者的 emit 的数据缓存,而后用 Channel 以并发的形式流向中游和上游,能够简略了解为并发地调用 collect。失常状况下 Flow 是程序的(Sequentially),就是数据从上游到中游再到终端,按程序流动,先生产的数据先流到 collect,这就是程序的数据流 sequentially。用上 buffer 后,就是会是并发的流,先 emit 的数据不肯定先到 collect,这就是 concurrently。显著,能用 buffer 的前提是终端解决数据时没有对数据程序的依赖。
- conflate 也会像 buffer 一样启动并发式 emit 数据,但未能及时被终端生产掉的数据会被抛弃,终端只解决最新数据。
- collectLatest 当有新的数据流进去时,终端只解决最新的数据,此之的终端解决会被勾销掉(如果还没有解决完)。
转为热流
惯例的 Flow 都是冷的 (cold flow),但有时热流(hot flow) 也有它的利用场景,Flow API 中也有创立热流的办法。
StateFlow
StateFlow 是一个『状态持有』流,它仅蕴含一个以后元素 value,能够用过 update 来更新此状态。它是一个热流,能够有多个终端 colloctor,每次更新都会把以后的值 emit 给所有的终端。
能够用构造方法 MutableStateFlow 创立一个 StateFlow,或者通过函数 stateIn 来把一个冷流转化为一个 StateFlow。
StateFlow 是比拟罕用的,在安卓开发中,简直所有的 ViewModel 都会用 StateFlow 来暂存 UI 状态数据。
SharedFlow
比 StateFlow 更为通用的便是通用的热流 SharedFlow。能够通过构造方法 MutableSharedFlow 来创立 SharedFlow,或者通过函数 sharedIn 把一个冷流转为 SharedFlow。
SharedFlow 能够有多个终端 collector,所以能够实现一对多的告诉,如实现观察者模式,或者像设置 / 配置更新,或者播送等等就能够思考用 SharedFlow 来实现。
扩大浏览:
- StateFlow and SharedFlow
- SharedFlow vs StateFlow,一篇看懂抉择和应用技巧
- Kotlin SharedFlow&StateFlow 热流到底有多热?
- ShareFlow 与 StateFlow 实战
欢送搜寻并关注 公众号「罕见猿诉」 获取更多的优质文章!
原创不易,「打赏」,「点赞」,「在看」,「珍藏」,「分享」 总要有一个吧!