Kotlin协程中的Flow次要用于解决简单的异步数据,以一种”流“的形式,从上到下顺次解决,和RxJava的解决形式类型,然而比后者更加弱小。
Flow基本概念
Flow中基本上有三个概念,即 发送方,解决中间层,接管方,能够类比水利发电站中的上游,发电站,上游的概念, 数据从上游开始发送”流淌“至两头站被”解决“了一下,又流淌到了上游。
示例代码如下
flow { // 发送方、上游 emit(1) // 挂起函数,发送数据 emit(2) emit(3) emit(4) emit(5)}.filter { it > 2 } // 中转站,解决数据.map { it * 2 }.take(2).collect{ // 接管方,上游 println(it)}输入内容:68
通过下面代码咱们能够看到,基于一种链式调用api的形式,流式的进行解决数据还是很棒的,接下来具体看一下下面的组成:
- flow{},是个高阶函数,次要用于创立一个新的Flow。在其Lambda函数外部应用了emit()挂起函数进行发送数据。
- filter{}、map{}、take{},属于两头解决层,也是两头数据处理的操作符,Flow最大的劣势,就是它的操作符跟汇合操作符高度一致。只有会用List、Sequence,那么就能够疾速上手 Flow 的操作符。
- collect{},上游接管方,也成为终止操作符,它的作用其实只有一个:终止Flow数据流,并且接管这些数据。
其余创立Flow的形式还是flowOf()函数,示例代码如下
fun main() = runBlocking{aassssssssaaaaaaaas flowOf(1,2,3,4,5).filter { it > 2 } .map { it * 2 } .take(2) .collect{ println("flowof: $it") }}
咱们在看一下list汇合的操作示例
listOf(1,2,3,4,5).filter { it > 2 } .map { it * 2 } .take(2) .forEach{ println("listof: $it") }
通过以上比照发现,两者的基本操作简直统一,Kotlin也提供了两者互相转换的API,Flow.toList()、List.asFlow()这两个扩大函数,让数据在 List、Flow 之间来回转换,示例代码如下:
//flow 转list flowOf(1,2,3) .toList() .filter { it > 1 } .map { it * 2 } .take(2) .forEach{ println(it) } // list 转 flow listOf(1,2,3).asFlow() .filter { it > 2 } .map { it * 2 } .take(2) .collect{ println(it) }
Flow生命周期
尽管从下面操作看和汇合类型,然而Flow还是有些非凡操作符的,毕竟它是协程的一部分,和Channel不同,Flow是有生命周期的,只是以操作符的模式回调而已,比方onStart、onCompletion这两个两头操作符。
flowOf(1,2,3,4,5,6) .filter { println("filter: $it") it > 3 } .map { println("map: $it") it * 2 } .take(2) .onStart { println("onStart") } .collect{ println("collect: $it") }输入内容:onStartfilter: 1filter: 2filter: 3filter: 4map: 4collect: 8filter: 5map: 5collect: 10
咱们能够看到onStart,它的作用是注册一个监听事件:当 flow 启动当前,它就会被回调。
和filter、map、take这些两头操作符不同,他们的程序会影响数据的处理结果,这也很好了解;onStart和地位没有关系,它实质上是一个回调,不是一个数据处理的两头站。同样的还有数据处理实现的回调onCompletion。
flowOf(1,2,3,4,5,6) .filter { println("filter: $it") it > 3 } .map { println("map: $it") it * 2 } .take(2) .onStart { println("onStart") } .onCompletion { println("onCompletion") } .collect{ println("collect: $it") }
Flow中onCompletion{} 在面对以下三种状况时都会进行回调:
- 1,Flow 失常执行结束
- 2,Flow 当中出现异常
- 3,Flow 被勾销。
解决异样
在数据流的处理过程中,很难保障不呈现问题,那么出现异常之后再该怎么解决呢?
- 对于产生在上游、两头操作这两个阶段的异样,咱们能够间接应用 catch 这个操作符来进行捕捉和进一步解决。
- 对于产生在上游,应用try-catch,把collect{}当中可能呈现问题的代码包裹起来进行捕捉解决。
上游或者两头异样应用catch
fun main() = runBlocking{ val flow = flow { emit(1) emit(2) throw IllegalStateException() emit(3) } flow.map { it * 2 } .catch { println("catch: $it") } .collect{ println("collect: $it") }}输入:collect: 2collect: 4catch: java.lang.IllegalStateException
catch 这个操作符的作用是和它的地位强相干的,catch 的作用域,仅限于catch的上游。换句话说,产生在 catch 上游的异样,才会被捕捉,产生在 catch 上游的异样,则不会被捕捉。
val flow = flow { emit(1) emit(2) throw IllegalStateException() emit(3) } flow.map { it * 2 } .catch { println("catch: $it") } .filter { it / 0 > 1 } // catch之后产生异样 .collect{ println("collect: $it") }输入内容:Exception in thread "main" java.lang.ArithmeticException: / by zero
上游应用try-catch
flowOf(1,2,3) .onCompletion { println("onCompletion $it") } .collect{ try { println("collect: $it") throw IllegalStateException(); }catch (e: Exception){ println("catch $e") } }输入:collect: 1catch java.lang.IllegalStateExceptioncollect: 2catch java.lang.IllegalStateExceptioncollect: 3catch java.lang.IllegalStateExceptiononCompletion null
切换执行线程
Flow适宜解决简单的异步工作,大多数状况下耗时工作放在子线程或线程池中解决,对于UI工作放在主线程中进行。
在Flow中能够应用flowOn操作符实现上述场景中的线程切换。
flowOf(1,2,3,4,5) .filter { logX("filter: $it") it > 2 } .flowOn(Dispatchers.IO) // 切换线程 .collect{ logX("collect: $it") }输入内容:================================filter: 1Thread:DefaultDispatcher-worker-1================================================================filter: 2Thread:DefaultDispatcher-worker-1================================================================filter: 3Thread:DefaultDispatcher-worker-1================================================================filter: 4Thread:DefaultDispatcher-worker-1================================================================filter: 5Thread:DefaultDispatcher-worker-1================================================================collect: 3Thread:main================================================================collect: 4Thread:main================================================================collect: 5Thread:main================================
flowOn 操作符也是和它的地位强相干的。作用域限于它的上游。在下面的代码中,flowOn 的上游,就是 flowOf{}、filter{} 当中的代码,所以,它们的代码全都运行在 DefaultDispatcher 这个线程池当中。只有collect{}当中的代码是运行在 main 线程当中的。
终止操作符
Flow 外面,最常见的终止操作符就是collect。除此之外,还有一些从汇合中借鉴过去的操作符,也是Flow的终止操作符。比方 first()、single()、fold{}、reduce{},实质上来说说当咱们尝试将 Flow 转换成汇合的时候,曾经不属于Flow的API,也不属于协程的领域了,它自身也就意味着 Flow 数据流的终止。
"冷的数据流"从何而来
在下面文章《Kotlin协程Channel浅析》中,咱们意识到Channel是”热数据流“,随时筹备好,随用随取,就像海底捞里的服务员。
当初咱们看下Flow和Channel的区别
val flow = flow { (1..4).forEach{ println("Flow发送前:$it") emit(it) println("Flow发送后: $it") } } val channel: ReceiveChannel<Int> = produce { (1..4).forEach{ println("Channel发送前: $it") send(it) println("Channel发送后: $it") } } 输入内容:Channel发送前: 1
Flow中的逻辑并未执行,因而咱们能够这样类比,Channel之所以被认为是“热”的起因,是因为不论有没有接管方,发送方都会工作。那么对应的,Flow被认为是“冷”的起因,就是因为只有调用终止操作符之后,Flow才会开始工作。
除此之外,Flow一次解决一条数据,是个”懒家伙“。
val flow = flow { (3..6).forEach { println("Flow发送前:$it") emit(it) println("Flow发送后: $it") } }.filter { println("filter: $it") it > 3 }.map { println("map: $it") it * 2 }.collect { println("后果collect: $it") }输入内容:Flow发送前:3filter: 3Flow发送后: 3Flow发送前:4filter: 4map: 4后果collect: 8Flow发送后: 4Flow发送前:5filter: 5map: 5后果collect: 10Flow发送后: 5Flow发送前:6filter: 6map: 6后果collect: 12Flow发送后: 6
相比于满面春风,热情服务的Channel,Flow更像个冷酷的家伙,你不找他,他不搭理你。
- Channel,响应速度快,但数据可能是旧的,占用资源
- Flow,响应速度慢,但数据是最新的,节俭资源
Flow也能够是”热“的,你晓得吗?