公众号「罕见猿诉」        原文链接 专家之路上的Flow高级秘籍

『君不见,黄河之水天上来,奔流到海不复回。』

学习与河流一样,一方面学无止境,又是逆水行舟,逆水行舟,因为其他人都在卷。前文一篇文章讲了Flow的根底,大多数状况下够用了,然而不能进行卷,因为你不卷,就会被他人卷。一旦波及到简单的利用场景,就须要用到一些高级的API。明天就来学习一下Flow的高级个性,当遇到问题时也能更从容的应答。

上下文切换

Flow是基于协程的,是用协程来实现并发,后面也提到过像[flow {...}](),在上游生产数据,以及中游做变幻时,都是能够间接调用suspend,耗时甚至是阻塞的函数的。而终端操作符如[collect]()则是suspend的,调用者(也就是消费者)须要负责确保collect是在协程中调用。咱们还晓得Flow是是冷流,消费者终端才会触发上游生产者生产,所以对于flow {...}来说,它的上游和中游运行的上下文来自于终端调用者的上下文,这个叫做『上下文保留』(context preservation),咱们能够用一个 来验证一下:

fun main() = runBlocking {    // Should be main by default    simple().collect { log("Got: $it") }    // Collect in a specified context    withContext(Dispatchers.Default) {        simple().collect { log("Now got: $it") }    }}private fun simple(): Flow<Int> = flow {    log("Started the simple flow")    for (i in 1..3) {        delay(100)        log("Producing $i")        emit(i)    }}

输入如下:

[main @coroutine#1] Started the simple flow[main @coroutine#1] Producing 1[main @coroutine#1] Got: 1[main @coroutine#1] Producing 2[main @coroutine#1] Got: 2[main @coroutine#1] Producing 3[main @coroutine#1] Got: 3[DefaultDispatcher-worker-1 @coroutine#1] Started the simple flow[DefaultDispatcher-worker-1 @coroutine#1] Producing 1[DefaultDispatcher-worker-1 @coroutine#1] Now got: 1[DefaultDispatcher-worker-1 @coroutine#1] Producing 2[DefaultDispatcher-worker-1 @coroutine#1] Now got: 2[DefaultDispatcher-worker-1 @coroutine#1] Producing 3[DefaultDispatcher-worker-1 @coroutine#1] Now got: 3

从这个 能够分明的看到,Flow的context是来自于终端调用者的。

用flowOn来指定上下文

有时候应用终端调用者的上下文可能不太不便,因为生产者与消费者的模式其实是解耦的,它们不应该互相受制于对方,对于要害的并发的上下文更是如此。比如说在GUI的利用中,显著应该在工作线程中生产数据,在UI线程中生产数据,从下面的例子来看,由终端调用者来决定上游上下文显著不可取。有同学举手了,欺侮我没学过协程是吧?我能够在Flow外部应用withContext来指定上下文啊,咱们来试试:

fun main() = runBlocking {    // Should be main by default    simple().collect { log("Got: $it") }}private fun simple(): Flow<Int> = flow {    withContext(Dispatchers.Default) {        log("Started the simple flow")        for (i in 1..3) {            delay(100)            log("Producing $i")            emit(i)        }    }}

这位同学能够间接进来了,因为你的代码crash 了:

[DefaultDispatcher-worker-1 @coroutine#1] Started the simple flow[DefaultDispatcher-worker-1 @coroutine#1] Producing 1Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:        Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@545486c7, BlockingEventLoop@13bfcf14],        but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@27015c5a, Dispatchers.Default].        Please refer to 'flow' documentation or use 'flowOn' instead

意思大略是说Flow外部不让间接用withContext来切上下文,毁坏了Flow的不变式,想切上下文要用flowOn。而且认真看,异样是由emit函数抛出来的。

其实Flow的设计者曾经思考到了这个问题,并且给出了优雅的形式,如果想切换Flow外部(也即上游和中游)的运行上下文,要用flowOn函数:

fun main() = runBlocking {    // Should be main by default    simple().collect { log("Got: $it") }}private fun simple(): Flow<Int> = flow {    log("Started the simple flow")    for (i in 1..3) {        delay(100)        log("Producing $i")        emit(i)        Thread.sleep(50)    }}.flowOn(Dispatchers.Default)//[DefaultDispatcher-worker-1 @coroutine#2] Started the simple flow//[DefaultDispatcher-worker-1 @coroutine#2] Producing 1//[main @coroutine#1] Got: 1//[DefaultDispatcher-worker-1 @coroutine#2] Producing 2//[main @coroutine#1] Got: 2//[DefaultDispatcher-worker-1 @coroutine#2] Producing 3//[main @coroutine#1] Got: 3

这回就谐和多了,后盾搞生产,UI只展现,完满!还须要特地留神的是函数flowOn只影响它的上游,不影响它的上游,更不会影响终端,终端永远都在其调用者的上下文中,来看一个 :

withContext(Dispatchers.Main) {    val singleValue = intFlow // will be executed on IO if context wasn't specified before        .map { ... } // Will be executed in IO        .flowOn(Dispatchers.IO)        .filter { ... } // Will be executed in Default        .flowOn(Dispatchers.Default)        .single() // Will be executed in the Main}

第一个flowOn切到IO,只影响到它后面的创立和map,第二次切换到Default,只影响filter。single是终端,是在Main,因为它的调用者是在Main外面。

留神,留神: Flow是一个数据流,放弃其数据流的特点是相当重要的,无论是失常数据,异样数据,还是出错都是一种数据,应该让其自上而下的流动,在中游变幻时或者终端时通过操作符来解决。所以,像硬性的上下文切换,或者异样的try/catch都是不容许的。这就是所谓的流的不变性(Flow invariant)。前面讲异样时还会提到这点。

任意上下文的Flow builders

从后面的学习咱们晓得了,下下文保留的个性,终端会决定上游生产者的上下文,当然也能够通过flowOn来扭转上下文。Flow builder其实就是一个生产者,异步的emit数据。但有些时候生产数据时的上下文,也就是调用emit时的上下文,是不确定的。比如说安卓 下面的各种回调(callback)有些是回调在调用者的线程里,有些则不是。flow {...}中的emit就不能在异步的回调外面调用,这时就要用callbackFlow {...}。callbackFlow专门实用于把现有的一些回调转为Flow,最典型的利用就是地位信息:

fun locationFlow(): Flow<Location> = callbackFlow {    val listener = object : LocationListener {        override fun onLocationUpdate(loc: Location) {            trySend(location)        }    }    locationManager.reqisterLocaitonUpdates(listener)        awaitClose {        locationManager.unregisterLocationUpdates(listener)    }}

如果这个Flow,用flow {}去创立会抛异样,因为emit没法在回调中应用。callbackFlow会在回调中发射数据,并在awaitClose代码块中反注册回调以清理资源。awaitClose会在这个流完结时(实现或者被勾销)被回调到,以有机会进行资源清理。

其实,无论是flow {}还是callbackFlow {}都是channelFlow {}的简单化,channelFlow非常复杂,也超级弱小,它能够自带buffer,自带并发,实用于创立一些非常复杂的Flow。在少数时候flow {}和callbackFlow {}就够咱们用了。

扩大浏览:

  • Android Kotlin Coroutines: what is the difference between flow, callbackFlow, channelFlow,... other flow constructors
  • Kotlin 协程四 —— Flow 和 Channel 的利用
  • [[译]轻松学习Kotlin的Flow、ChannelFlow和CallbackFlow](https://juejin.cn/post/7220593395420627004)
  • 轻松搞定Kotlin的Flow, ChannelFlow和CallbackFlow - 2

副作用函数

Flow是一个数据流,核心思想是把数据的生产和解决和最终生产离开,上游只负责生产数据,各种操作都应该由中游操作符来做,最终数据由终端生产掉。须要增强数据的封装性,和流的不变性,不毁坏管道,用各种转换器来对数据进行操作。那么,对于流何时开始,每个数据何时产生,流什么时候终止,这些事件对于调试来说是很有帮忙的。Flow的设计者给出了一系列副作用函数来做之些事件。副作用的意思就是这些函数不会对流自身产生影响。

  • onStart Flow开始生产之前会调用此函数。
  • onEach 在生产(emit)每个数据之前调用此函数,这个函数最罕用被用来打日志,以查看每个产生的数据。
  • onCompletion 当Flow终止时或者被勾销后会调用此函数。
  • onSubscritpion 有消费者了时调用此函数(也就是有人collect了此Flow时)。

异样,勾销和错误处理

这一大节重点来看看非正常代码逻辑的解决。先来看看异样解决(Exception handling)。

用catch函数来解决Flow过程中的异样

代码随时都可能抛出异样,所以异样解决是一个必须要思考的事件。当然能够在Flow的各个节点如上游生产,中游变幻和上游终端的代码块外面各种try/catch。一来是不够优雅,再者这会毁坏Flow的不变性或者说一致性,它就是管道,数据在外面流动,不应该加以过多的烦扰,想要对数据处理应该用操作符。也就是说要让异样(包含其余谬误也是如此)对Flow是通明的,意思是说Flow并不关怀是否有异样。所以提供了一个catch函数,它的作用是捕捉并解决上游操作中产生的异样:

simple()    .catch { e -> emit("Caught $e") } // emit on exception    .collect { value -> println(value) }

须要留神catch与flowOn一样,只影响上游产生的异样,管不了上游:

flow { emitData() }    .map { computeOne(it) }    .catch { ... } // catches exceptions in emitData and computeOne    .map { computeTwo(it) }    .collect { process(it) } // throws exceptions from process and computeTwo

勾销Flow

Flow没有显式的勾销函数。Flow是冷流,有消费者时才会去生产数据,消费者进行生产了,Flow天然也就被勾销了。终端操作都是suspend的,也就是要在协程中调用,因而勾销终端调用的协程,就会勾销Flow。

错误处理

其实没有特地的谬误处理函数,后面的异样算是一个,如果上游没有抛出异样,就不会有其余谬误了,因为谬误也是数据的一种类型,并且是由咱们本人依据场景来定义的。比如说从网络获取新闻列表,失常时的数据当然是一个个的新闻条目。出错了,比方无网络,或者服务器无响应,这时可能返回一个空的条目,外面有谬误的具体信息。但这都是由业务逻辑决定的,是业务逻辑层面的货色。对于Flow而言,都还是有数据的,都是一种数据,具体数据的解读,那是消费者终端的事件,Flow并不关怀。

惟一算得上错误处理的函数就是onEmpty,它会在Flow是空的时候,也就是不生产任何数据的时候被回调。能够在onEmpty外面生产emit数据,比方产生一个带有错误信息的数据,或者产生一个默认值。因为Flow为空,不产生emit任何数据时,管子是空的数据没有流动,Flow的整个链路,特地是终端collect是不会被执行的,这时可能会有问题,比方UI根本无法做出任何react,除非你设置过了默认UI状态,否则可能会不对。这个时候如果用onEmpty去产生一些默认值或者错误信息的话,就能激活整个Flow,终端能做出预期的响应。

重试机制

另一个十分有用的函数就是retry,它能够预设一个条件,当条件满足时就会触发从新collect。Flow是冷流,有消费者collect时才会触发生产者emit数据,因而从新collect就能让Flow从新emit数据流。

背压

Flow是异步数据流,响应式编程范式,上游生产数据,上游终端生产数据。有时候可能会遇到这样一种状况,就是上游数据生产的速度超过了上游终端的生产速度,这会造成数据流积压在管道中,终端无奈及时响应。这种状况称为『背压(Back pressure)』。想像一下一个水管,如果进水速度大于水龙头流出的速度,水就会积压在水管里,如果水管是比拟单薄的(如气球),那么它会收缩,最初爆掉。

通常状况下,当上游是较为可控的生产者时,不会产生背压,但如果是一些不是开发人员可控的,如硬件(触摸事件,地位信息,传感器,摄像头),其余零碎(零碎框架的回调,或者服务器的Push)等等,就会产生背压,这时必须进行相应的解决。所有的FRP式异步数据流API都必须解决『背压』,Flow也有相应的API来解决:

  • buffer 把生产者的emit的数据缓存,而后用Channel以并发的形式流向中游和上游,能够简略了解为并发地调用collect。失常状况下Flow是程序的(Sequentially),就是数据从上游到中游再到终端,按程序流动,先生产的数据先流到collect,这就是程序的数据流sequentially。用上buffer后,就是会是并发的流,先emit的数据不肯定先到collect,这就是concurrently。显著,能用buffer的前提是终端解决数据时没有对数据程序的依赖。
  • conflate 也会像buffer一样启动并发式emit数据,但未能及时被终端生产掉的数据会被抛弃,终端只解决最新数据。
  • collectLatest 当有新的数据流进去时,终端只解决最新的数据,此之的终端解决会被勾销掉(如果还没有解决完)。

转为热流

惯例的Flow都是冷的(cold flow),但有时热流(hot flow)也有它的利用场景,Flow API中也有创立热流的办法。

StateFlow

StateFlow是一个『状态持有』流,它仅蕴含一个以后元素value,能够用过update来更新此状态。它是一个热流,能够有多个终端colloctor,每次更新都会把以后的值emit给所有的终端。

能够用构造方法MutableStateFlow创立一个StateFlow,或者通过函数stateIn来把一个冷流转化为一个StateFlow。

StateFlow是比拟罕用的,在安卓开发中,简直所有的ViewModel都会用StateFlow来暂存UI状态数据。

SharedFlow

比StateFlow更为通用的便是通用的热流SharedFlow。能够通过构造方法MutableSharedFlow来创立SharedFlow,或者通过函数sharedIn把一个冷流转为SharedFlow。

SharedFlow能够有多个终端collector,所以能够实现一对多的告诉,如实现观察者模式,或者像设置/配置更新,或者播送等等就能够思考用SharedFlow来实现。

扩大浏览:

  • StateFlow and SharedFlow
  • SharedFlow vs StateFlow,一篇看懂抉择和应用技巧
  • Kotlin SharedFlow&StateFlow 热流到底有多热?
  • ShareFlow与StateFlow实战

欢送搜寻并关注 公众号「罕见猿诉」 获取更多的优质文章!

原创不易,「打赏」「点赞」「在看」「珍藏」「分享」 总要有一个吧!