公众号「罕见猿诉」 原文链接 专家之路上的Flow高级秘籍
『君不见,黄河之水天上来,奔流到海不复回。』
学习与河流一样,一方面学无止境,又是逆水行舟,逆水行舟,因为其他人都在卷。前文一篇文章讲了Flow的根底,大多数状况下够用了,然而不能进行卷,因为你不卷,就会被他人卷。一旦波及到简单的利用场景,就须要用到一些高级的API。明天就来学习一下Flow的高级个性,当遇到问题时也能更从容的应答。
上下文切换
Flow是基于协程的,是用协程来实现并发,后面也提到过像[flow {...}](),在上游生产数据,以及中游做变幻时,都是能够间接调用suspend,耗时甚至是阻塞的函数的。而终端操作符如[collect]()则是suspend的,调用者(也就是消费者)须要负责确保collect是在协程中调用。咱们还晓得Flow是是冷流,消费者终端才会触发上游生产者生产,所以对于flow {...}来说,它的上游和中游运行的上下文来自于终端调用者的上下文,这个叫做『上下文保留』(context preservation),咱们能够用一个 来验证一下:
fun main() = runBlocking { // Should be main by default simple().collect { log("Got: $it") } // Collect in a specified context withContext(Dispatchers.Default) { simple().collect { log("Now got: $it") } }}private fun simple(): Flow<Int> = flow { log("Started the simple flow") for (i in 1..3) { delay(100) log("Producing $i") emit(i) }}
输入如下:
[main @coroutine#1] Started the simple flow[main @coroutine#1] Producing 1[main @coroutine#1] Got: 1[main @coroutine#1] Producing 2[main @coroutine#1] Got: 2[main @coroutine#1] Producing 3[main @coroutine#1] Got: 3[DefaultDispatcher-worker-1 @coroutine#1] Started the simple flow[DefaultDispatcher-worker-1 @coroutine#1] Producing 1[DefaultDispatcher-worker-1 @coroutine#1] Now got: 1[DefaultDispatcher-worker-1 @coroutine#1] Producing 2[DefaultDispatcher-worker-1 @coroutine#1] Now got: 2[DefaultDispatcher-worker-1 @coroutine#1] Producing 3[DefaultDispatcher-worker-1 @coroutine#1] Now got: 3
从这个 能够分明的看到,Flow的context是来自于终端调用者的。
用flowOn来指定上下文
有时候应用终端调用者的上下文可能不太不便,因为生产者与消费者的模式其实是解耦的,它们不应该互相受制于对方,对于要害的并发的上下文更是如此。比如说在GUI的利用中,显著应该在工作线程中生产数据,在UI线程中生产数据,从下面的例子来看,由终端调用者来决定上游上下文显著不可取。有同学举手了,欺侮我没学过协程是吧?我能够在Flow外部应用withContext来指定上下文啊,咱们来试试:
fun main() = runBlocking { // Should be main by default simple().collect { log("Got: $it") }}private fun simple(): Flow<Int> = flow { withContext(Dispatchers.Default) { log("Started the simple flow") for (i in 1..3) { delay(100) log("Producing $i") emit(i) } }}
这位同学能够间接进来了,因为你的代码crash 了:
[DefaultDispatcher-worker-1 @coroutine#1] Started the simple flow[DefaultDispatcher-worker-1 @coroutine#1] Producing 1Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated: Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@545486c7, BlockingEventLoop@13bfcf14], but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@27015c5a, Dispatchers.Default]. Please refer to 'flow' documentation or use 'flowOn' instead
意思大略是说Flow外部不让间接用withContext来切上下文,毁坏了Flow的不变式,想切上下文要用flowOn。而且认真看,异样是由emit函数抛出来的。
其实Flow的设计者曾经思考到了这个问题,并且给出了优雅的形式,如果想切换Flow外部(也即上游和中游)的运行上下文,要用flowOn函数:
fun main() = runBlocking { // Should be main by default simple().collect { log("Got: $it") }}private fun simple(): Flow<Int> = flow { log("Started the simple flow") for (i in 1..3) { delay(100) log("Producing $i") emit(i) Thread.sleep(50) }}.flowOn(Dispatchers.Default)//[DefaultDispatcher-worker-1 @coroutine#2] Started the simple flow//[DefaultDispatcher-worker-1 @coroutine#2] Producing 1//[main @coroutine#1] Got: 1//[DefaultDispatcher-worker-1 @coroutine#2] Producing 2//[main @coroutine#1] Got: 2//[DefaultDispatcher-worker-1 @coroutine#2] Producing 3//[main @coroutine#1] Got: 3
这回就谐和多了,后盾搞生产,UI只展现,完满!还须要特地留神的是函数flowOn只影响它的上游,不影响它的上游,更不会影响终端,终端永远都在其调用者的上下文中,来看一个 :
withContext(Dispatchers.Main) { val singleValue = intFlow // will be executed on IO if context wasn't specified before .map { ... } // Will be executed in IO .flowOn(Dispatchers.IO) .filter { ... } // Will be executed in Default .flowOn(Dispatchers.Default) .single() // Will be executed in the Main}
第一个flowOn切到IO,只影响到它后面的创立和map,第二次切换到Default,只影响filter。single是终端,是在Main,因为它的调用者是在Main外面。
留神,留神: Flow是一个数据流,放弃其数据流的特点是相当重要的,无论是失常数据,异样数据,还是出错都是一种数据,应该让其自上而下的流动,在中游变幻时或者终端时通过操作符来解决。所以,像硬性的上下文切换,或者异样的try/catch都是不容许的。这就是所谓的流的不变性(Flow invariant)。前面讲异样时还会提到这点。
任意上下文的Flow builders
从后面的学习咱们晓得了,下下文保留的个性,终端会决定上游生产者的上下文,当然也能够通过flowOn来扭转上下文。Flow builder其实就是一个生产者,异步的emit数据。但有些时候生产数据时的上下文,也就是调用emit时的上下文,是不确定的。比如说安卓 下面的各种回调(callback)有些是回调在调用者的线程里,有些则不是。flow {...}中的emit就不能在异步的回调外面调用,这时就要用callbackFlow {...}。callbackFlow专门实用于把现有的一些回调转为Flow,最典型的利用就是地位信息:
fun locationFlow(): Flow<Location> = callbackFlow { val listener = object : LocationListener { override fun onLocationUpdate(loc: Location) { trySend(location) } } locationManager.reqisterLocaitonUpdates(listener) awaitClose { locationManager.unregisterLocationUpdates(listener) }}
如果这个Flow,用flow {}去创立会抛异样,因为emit没法在回调中应用。callbackFlow会在回调中发射数据,并在awaitClose代码块中反注册回调以清理资源。awaitClose会在这个流完结时(实现或者被勾销)被回调到,以有机会进行资源清理。
其实,无论是flow {}还是callbackFlow {}都是channelFlow {}的简单化,channelFlow非常复杂,也超级弱小,它能够自带buffer,自带并发,实用于创立一些非常复杂的Flow。在少数时候flow {}和callbackFlow {}就够咱们用了。
扩大浏览:
- Android Kotlin Coroutines: what is the difference between flow, callbackFlow, channelFlow,... other flow constructors
- Kotlin 协程四 —— Flow 和 Channel 的利用
- [[译]轻松学习Kotlin的Flow、ChannelFlow和CallbackFlow](https://juejin.cn/post/7220593395420627004)
- 轻松搞定Kotlin的Flow, ChannelFlow和CallbackFlow - 2
副作用函数
Flow是一个数据流,核心思想是把数据的生产和解决和最终生产离开,上游只负责生产数据,各种操作都应该由中游操作符来做,最终数据由终端生产掉。须要增强数据的封装性,和流的不变性,不毁坏管道,用各种转换器来对数据进行操作。那么,对于流何时开始,每个数据何时产生,流什么时候终止,这些事件对于调试来说是很有帮忙的。Flow的设计者给出了一系列副作用函数来做之些事件。副作用的意思就是这些函数不会对流自身产生影响。
- onStart Flow开始生产之前会调用此函数。
- onEach 在生产(emit)每个数据之前调用此函数,这个函数最罕用被用来打日志,以查看每个产生的数据。
- onCompletion 当Flow终止时或者被勾销后会调用此函数。
- onSubscritpion 有消费者了时调用此函数(也就是有人collect了此Flow时)。
异样,勾销和错误处理
这一大节重点来看看非正常代码逻辑的解决。先来看看异样解决(Exception handling)。
用catch函数来解决Flow过程中的异样
代码随时都可能抛出异样,所以异样解决是一个必须要思考的事件。当然能够在Flow的各个节点如上游生产,中游变幻和上游终端的代码块外面各种try/catch。一来是不够优雅,再者这会毁坏Flow的不变性或者说一致性,它就是管道,数据在外面流动,不应该加以过多的烦扰,想要对数据处理应该用操作符。也就是说要让异样(包含其余谬误也是如此)对Flow是通明的,意思是说Flow并不关怀是否有异样。所以提供了一个catch函数,它的作用是捕捉并解决上游操作中产生的异样:
simple() .catch { e -> emit("Caught $e") } // emit on exception .collect { value -> println(value) }
须要留神catch与flowOn一样,只影响上游产生的异样,管不了上游:
flow { emitData() } .map { computeOne(it) } .catch { ... } // catches exceptions in emitData and computeOne .map { computeTwo(it) } .collect { process(it) } // throws exceptions from process and computeTwo
勾销Flow
Flow没有显式的勾销函数。Flow是冷流,有消费者时才会去生产数据,消费者进行生产了,Flow天然也就被勾销了。终端操作都是suspend的,也就是要在协程中调用,因而勾销终端调用的协程,就会勾销Flow。
错误处理
其实没有特地的谬误处理函数,后面的异样算是一个,如果上游没有抛出异样,就不会有其余谬误了,因为谬误也是数据的一种类型,并且是由咱们本人依据场景来定义的。比如说从网络获取新闻列表,失常时的数据当然是一个个的新闻条目。出错了,比方无网络,或者服务器无响应,这时可能返回一个空的条目,外面有谬误的具体信息。但这都是由业务逻辑决定的,是业务逻辑层面的货色。对于Flow而言,都还是有数据的,都是一种数据,具体数据的解读,那是消费者终端的事件,Flow并不关怀。
惟一算得上错误处理的函数就是onEmpty,它会在Flow是空的时候,也就是不生产任何数据的时候被回调。能够在onEmpty外面生产emit数据,比方产生一个带有错误信息的数据,或者产生一个默认值。因为Flow为空,不产生emit任何数据时,管子是空的数据没有流动,Flow的整个链路,特地是终端collect是不会被执行的,这时可能会有问题,比方UI根本无法做出任何react,除非你设置过了默认UI状态,否则可能会不对。这个时候如果用onEmpty去产生一些默认值或者错误信息的话,就能激活整个Flow,终端能做出预期的响应。
重试机制
另一个十分有用的函数就是retry,它能够预设一个条件,当条件满足时就会触发从新collect。Flow是冷流,有消费者collect时才会触发生产者emit数据,因而从新collect就能让Flow从新emit数据流。
背压
Flow是异步数据流,响应式编程范式,上游生产数据,上游终端生产数据。有时候可能会遇到这样一种状况,就是上游数据生产的速度超过了上游终端的生产速度,这会造成数据流积压在管道中,终端无奈及时响应。这种状况称为『背压(Back pressure)』。想像一下一个水管,如果进水速度大于水龙头流出的速度,水就会积压在水管里,如果水管是比拟单薄的(如气球),那么它会收缩,最初爆掉。
通常状况下,当上游是较为可控的生产者时,不会产生背压,但如果是一些不是开发人员可控的,如硬件(触摸事件,地位信息,传感器,摄像头),其余零碎(零碎框架的回调,或者服务器的Push)等等,就会产生背压,这时必须进行相应的解决。所有的FRP式异步数据流API都必须解决『背压』,Flow也有相应的API来解决:
- buffer 把生产者的emit的数据缓存,而后用Channel以并发的形式流向中游和上游,能够简略了解为并发地调用collect。失常状况下Flow是程序的(Sequentially),就是数据从上游到中游再到终端,按程序流动,先生产的数据先流到collect,这就是程序的数据流sequentially。用上buffer后,就是会是并发的流,先emit的数据不肯定先到collect,这就是concurrently。显著,能用buffer的前提是终端解决数据时没有对数据程序的依赖。
- conflate 也会像buffer一样启动并发式emit数据,但未能及时被终端生产掉的数据会被抛弃,终端只解决最新数据。
- collectLatest 当有新的数据流进去时,终端只解决最新的数据,此之的终端解决会被勾销掉(如果还没有解决完)。
转为热流
惯例的Flow都是冷的(cold flow),但有时热流(hot flow)也有它的利用场景,Flow API中也有创立热流的办法。
StateFlow
StateFlow是一个『状态持有』流,它仅蕴含一个以后元素value,能够用过update来更新此状态。它是一个热流,能够有多个终端colloctor,每次更新都会把以后的值emit给所有的终端。
能够用构造方法MutableStateFlow创立一个StateFlow,或者通过函数stateIn来把一个冷流转化为一个StateFlow。
StateFlow是比拟罕用的,在安卓开发中,简直所有的ViewModel都会用StateFlow来暂存UI状态数据。
SharedFlow
比StateFlow更为通用的便是通用的热流SharedFlow。能够通过构造方法MutableSharedFlow来创立SharedFlow,或者通过函数sharedIn把一个冷流转为SharedFlow。
SharedFlow能够有多个终端collector,所以能够实现一对多的告诉,如实现观察者模式,或者像设置/配置更新,或者播送等等就能够思考用SharedFlow来实现。
扩大浏览:
- StateFlow and SharedFlow
- SharedFlow vs StateFlow,一篇看懂抉择和应用技巧
- Kotlin SharedFlow&StateFlow 热流到底有多热?
- ShareFlow与StateFlow实战
欢送搜寻并关注 公众号「罕见猿诉」 获取更多的优质文章!
原创不易,「打赏」,「点赞」,「在看」,「珍藏」,「分享」 总要有一个吧!