原文链接 包教包会的Kotlin Flow教程 公众号「罕见猿诉」
Kotlin中的Flow是专门用于解决异步数据流的API,是函数响应式编程范式(Functional Reactive Programming FRP)在Kotlin上的一个实现,并且深度交融了Kotlin的协程。是Kotlin中解决异步数据流问题的首先计划。明天就来认识一下Flow并学会如何应用它。
Hello, Flow!
老规矩,新学习一个新货色的时候,总是要从一个根底的『Hello, world』开始,疾速上手体验,有个第一印象。咱们就从一个简略的『Hello, Flow!』开始Flow之旅:
fun main() = runBlocking { val simple = flow { listOf("Hello", "world", "of", "flows!") .forEach { delay(100) emit(it) } } simple.collect { println(it) }}//Hello//world//of//flows!
这里创立了一个异步产生String的数据流Flow<String>,会不定时的产生一个String,而后收集此数据流产生的数据,把流出的String对象生产掉。
能够看出Flow实质上是一个生产者消费者模式,流出的数据是由生产者产生的,且最终被消费者生产掉。能够把Flow想像成为一个生产线中的传送带,产品(数据)在下面不停的流动,通过各个站点的加工,最终成型,由消费者生产掉。从这个小例子中能够看出Flow API的三要素:数据流的上游是创立Flow(生产者);中游是变幻操作(数据的解决和加工);上游是收集数据(消费者),咱们一一的具体来学习。
创立Flow
Flow是一个生产者,创立Flow也就是把数据放到传送带上。数据能够是根底数据或者汇合,也能够是其余形式生成的数据,如网络或者回调或者硬件。创立Flow的API称作flow builder函数。
用汇合创立Flow
这是创立Flow的最简略的形式,有两个,一个是flowOf用于从固定数量的元素创立,多用于示例,理论中基本上用不到:
val simple = flowOf("Hello", "world", "of", "flows!")simple.collect { println(it) }
或者,通过asFlow把现有的汇合转为Flow,这个还是比拟实用的:
listOf("Hello", "world", "of", "flows!").asFlow() .collect { println(it) }(1..5).asFlow().collect { println(it) }
通用flow builder
最为通用的flow builder就是flow {...}了,这是最为通用,也是最为罕用的结构器。在代码块中调用emit就能够了,这个代码块会运行在协程之中,所以在这个代码里能够调用suspend函数:
fun main() = runBlocking { val simple = flow { for (i in 1..3) { delay(100) println("Emitting: $i") emit(i) } } simple.collect { println(it) }}//Emitting: 1//1//Emitting: 2//2//Emitting: 3//3
这是一个代码块,只有调用了emit产生数据即可,又可调用suspend函数,因而十分的实用,比方能够执行网络申请,申请回来后emit等等。
终端操作符
数据从生产者流出,直到消费者把数据收集起来进行生产,而只有数据被生产了才有意义。因而,还须要终端操作(Terminal flow operators)。须要留神的是终端操作符是Flow的起点,并不算是Flow传送带外部,因而终端操作都是suspend函数,调用者须要负责创立协程以失常调用这些suspending terminal operators。
常见的终端操作有三个:
- collect 最为通用的,可执行一个代码块,参数就是Flow流出的数据
- 转换为汇合Collections,如toList和toSet等,能够不便把收集到的数据转换为汇合
- 取特定的值,如first()只取第一个,last只取最初一个, single只有一个数据(无数据和超过一个数据时都会抛异样。
- 降维(或者叫作聚合accumulate)操作,如折叠fold和化约reduce,折叠和化约能够对数据流进行降维,如求和,求积,求最大值最小值等等。
- count 其实也是降维的一种,返回数据流中的数据个数,它还能够联合过滤以计算某种过滤条件后的数据数量。
fun main() = runBlocking { val simple = flow { for (i in 1..3) { delay(100) println("Emitting: $i") emit(i) } } simple.collect { println(it) } println("toList: ${simple.toList()}") println("first: ${simple.first()}") println("sum by fold: ${simple.fold(0) { s, a -> s + a }}")}
输入:
Emitting: 11Emitting: 22Emitting: 33Emitting: 1Emitting: 2Emitting: 3toList: [1, 2, 3]Emitting: 1first: 1Emitting: 1Emitting: 2Emitting: 3sum by fold: 6
这些终端操作符都简略,比拟好了解,看一眼示例就晓得怎么用了。须要留神的就是first()和single(),first是只接管数据流中的第一个,而single则要求数据流只能有一个数据(没有或者超过一个都会抛异样)。比拟有意思就是last(),数据流是一个流,一个产品传送带,通常状况下都是指有限或者说不确定数据 数量时才叫数据流,那又何来最初一个数据呢?通常状况下last都是无意义的。只有当咱们晓得流的生产者只生产无限数量数据时,或者采纳了一些限制性的变幻操作符时,last能力派上用场。
再有就是留神fold和reduce的区别,这里它们的区别跟汇合上的操作是一样的,fold能够提供初始值,流为空时返回初始值;而reduce没初始值,流为空时会抛异样。
变幻操作符
数据在流动的过程中能够对数据进行转化操作,从一种数据类型变别另外一种,这就是变幻(Transformation),这是数据流最为灵便和弱小的一个方面。这跟汇合的变幻是相似的。
转换
最常见的变幻就是转换,也就是把从一种数据类型转换为另一种数据类型,用的最多当然是map,还有更为通用的transform。它们都能把数据流中的数据从一种类型转换为另一种类型,比方把Flow<String>转为Flow<Int>。区别在于,map是死板的转换,一个对象进去,另一个对象作为返回值进去;但transform更为灵便,它并不是把新类型作为返回值,它能够像上游生产者那样产生(emit)新数据,甚至能够产生(emit)多个新数据,它是十分弱小的,所有其余的变幻操作符,都是基于transform实现的。
fun main() = runBlocking { val simple = flow { for (i in 1..3) { delay(100) println("Emitting: $i") emit(i) } } simple.map { " Mapping to ${it * it}" } .collect { println(it) } simple.transform { req -> emit(" Making request $req") emit(performRequest(req)) }.collect { println(it) }}fun performRequest(req: Int) = "Response for $req"
输入是:
Emitting: 1 Mapping to 1Emitting: 2 Mapping to 4Emitting: 3 Mapping to 9Emitting: 1 Making request 1Response for 1Emitting: 2 Making request 2Response for 2Emitting: 3 Making request 3Response for 3
还有一个操作符withIndex它与汇合中的mapIndexed是相似的,它的作用是把元素变成IndexedValue,这样在前面就能够失去元素和元素的索引 了,在某些场景下还是比拟不便的。
限度
数据流外面的数据不肯定都是须要的,所以通常须要对数据元素进行过滤,这就是限制性操作符,最常见的就是filter,这里与汇合的限度操作也是相似的:
- filter 把数据转为布尔型,从而对数据流进行过滤。
- distinctUntilChanged 过滤数据流中反复的元素。
- drop 抛弃后面肯定数量的元素。
- take 只返回流中后面肯定数量的元素,当数量达到时流将被勾销,留神take与drop是相同的。
- debounce 仅保留流中肯定超时距离内的元素,比方超时工夫是1秒,那只返回达到1秒时最新的元素,这个元素后面的将被抛弃。这个在秒杀场景拦挡疯狂点击,或者一个服务中拦挡疯狂申请时十分有用。只取肯定工夫距离内的最新的元素,拦挡掉有效数据。
- sample 以肯定的工夫距离取元素,与debounce差不多,区别在于debounce会返回最初一个元素,而sample不肯定,要看距离最初一个元素是否落在一个工夫距离内。
@OptIn(FlowPreview::class)fun main() = runBlocking { val constraint = flow { emit(1) delay(90) emit(2) delay(90) emit(3) delay(1010) emit(4) delay(1010) emit(5) } constraint.filter { it % 2 == 0 } .collect { println("filter: $it") } constraint.drop(3) .collect { println("drop(3): $it") } constraint.take(3) .collect { println("take(3): $it") } constraint.debounce(1000) .collect { println("debounce(1000): $it") } constraint.sample(1000) .collect { println("sample(1000): $it") }}
认真看它们的输入,以了解它们的作用:
filter: 2filter: 4drop(3): 4drop(3): 5take(3): 1take(3): 2take(3): 3debounce(1000): 3debounce(1000): 4debounce(1000): 5sample(1000): 3sample(1000): 4
须要注意,debounce和sample是Preview的API,须要加上Preview注解。
中游的变幻操作符仍属于流的一部分,它们都仍运行在Flow的上下文中,因而,这些操作符内,与流的builder一样,都能够间接调用其余的supsend函数,甚至是其余的耗时的,阻塞的函数都能够调用。并不需要特地的为上游和中游创立上下文。
Flow的操作符特地多,咱们须要注意区别中游操作符和上游终端。看这些函数的返回类型就能够了,返回类型是具体数据的,肯定是上游终端操作符;而对于上游生产者和中游变幻操作符,其返回值肯定是一个Flow。
高级操作符
后面讲的操作符都是针对 某一个流自身的,但大多数场景一个流显著不够用啊,咱们须要操作多个流,这时就须要用到一些高级操作符了。
合并多路流
多路流不可能一个一个的解决,合并成为一路流更加的不便,有以下合并办法:
- 归并merge把数据类型雷同的多路流归并为一路,留神肯定是数据类型雷同的才能够归并,并且归并后的元素程序是未知的,也即不会保留原各路流的元素程序。归并流的数量没有限度。
- 粘合zip 当想要把两路流的元素对齐后粘合为一个元素时,就能够应用zip,当任何一个流完结或者被勾销时,zip也就完结了。只能两个两个的粘合。
- 组合combine把多路流中的每个流的最新元素粘合成新数据,造成一个新的流,其元素是把每个元素都用每路流的最新元素来转换生成。起码须要2路流,最多反对5路流。
用一个来感受一下它们的作用:
fun main() = runBlocking { val one = flowOf(1, 2, 3) .map(Int::toString) .onEach { delay(10) } val two = flowOf("a", "b", "c", "d") .onEach { delay(25) } merge(one, two) .collect { println("Merge: $it") } one.zip(two) { i, s -> "Zip: $i. $s" } .collect { println(it) } combine(one, two) { i, s -> "Combine $i with $s" } .collect { println(it) }}
这里是输入:
Merge: 1Merge: 2Merge: aMerge: 3Merge: bMerge: cMerge: dZip: 1. aZip: 2. bZip: 3. cCombine 2 with aCombine 3 with aCombine 3 with bCombine 3 with cCombine 3 with d
通过它们的输入能够看到它们的区别:merge就像把两个水管接到一样,简略没有多余加工,适宜数据类型一样的流(比方都是水);zip会对齐两路流,让能对齐的元素两两联合,对不齐时就完结了。
而combine要等到集齐每路流的最新元素,能力转换成新数据,two是较one慢的,看到two的元素『a』时,one最新的元素是『2』,之后one的『3』来了,这时two最新的元素还是『a』,之后one停在了『3』,后续two的元素都与『3』组合。有同学可能会有疑难,为啥one的『1』抛弃了,没找到组合呢?因为它来的太早了,one的『1』来了时,two还没有元素,它必定会等,但当two的第一个元素『a』来了时,这时one的最新元素已是『2』了,one是10发一个元素,two是隔25发一个元素,所以two的第1个元素到了时,one的第2个元素曾经来了,它是最新的,所以组合时会用它。combine要集齐每路流的最新元素能力合成。
总结起来就是,zip会按程序对齐元素;而combine要集齐每路流的最新元素,先要集齐,齐了时还要取每个流的最新元素。能够入手运行示例,批改delay的工夫,看输入有啥不一样的,以加深了解。
展平(Flatten)
一个Flow就是一个异步数据流,它相当于一个传送带或者管道,货物(具体的数据)在其下面或者外面流动。失常状况下Flow外部都是惯例数据(对象)在流动,但Flow自身也是一个对象,因而也能够嵌套,把流当成另一个流的数据,比方Flow<Flow<Int>>,这就是Flow of Flows of Int。Flow是数据流,最终消费者须要的是具体的数据,所以对于嵌套的Flow of Flows,通常都须要在传给终端操作符之前进行展平(flatten),失去一个faltterned Flow(即从Flow<Flow<Int>>转成Flow<Int>),就能够被终端生产了。操作符中以flat结尾的函数都是用于展平的,次要是两类,一类是展平flatten系,一类是先变幻再展平flatMap系。
间接展平
最直观的展平莫过于对于曾经是嵌套的Flow of Flows做展平解决,以能让终端操作符失常的生产Flow外面的数据,有两个API能够做展平:
- flattenConcat 把嵌套的Flow of Flows展平为一个Flow,内层的每个流都是按程序拼接在一起的,串行拼接。比方Flow of 4 Flows,内层有四个管道,那就就变成了『内层1』->『内层2』->『内层3』->『内层4』。
- flattenMerge 把Flow of Flows展平为一个Flow,内层的所有Flow是以并发的形式将元素混合流入新管道,是并发式混合,相当于四个管道同时往另一个管道倒水,原流中的程序会错乱掉。
@OptIn(ExperimentalCoroutinesApi::class)fun main() = runBlocking { val flow2D = flowOf("Hello", "world", "of", "flow!") .map { it.toCharArray().map { c -> " '$c' " }.asFlow() } .flowOn(Dispatchers.Default) flow2D.collect { println("Flow object before flatten: $it") } // Data in flow are Flow objects println("With flattenConcat:") flow2D.flattenConcat() .collect { print(it) } println("\nWith flattenMerge:") flow2D.flattenMerge() .collect { print(it) }}//Flow object before flatten: kotlinx.coroutines.flow.FlowKt__BuildersKt$asFlow$$inlined$unsafeFlow$3@1b0375b3//Flow object before flatten: kotlinx.coroutines.flow.FlowKt__BuildersKt$asFlow$$inlined$unsafeFlow$3@e580929//Flow object before flatten: kotlinx.coroutines.flow.FlowKt__BuildersKt$asFlow$$inlined$unsafeFlow$3@1cd072a9//Flow object before flatten: kotlinx.coroutines.flow.FlowKt__BuildersKt$asFlow$$inlined$unsafeFlow$3@7c75222b//With flattenConcat: //'H' 'e' 'l' 'l' 'o' 'w' 'o' 'r' 'l' 'd' 'o' 'f' 'f' 'l' 'o' 'w' '!' //With flattenMerge:// 'H' 'e' 'l' 'l' 'o' 'w' 'o' 'r' 'l' 'd' 'o' 'f' 'f' 'l' 'o' 'w' '!'
从输入中能够看出,如果不展平Flow外面是Flow对象,没法用。flattenConcat是把内层的流串行的接在一起。但flattenMerge的输入仿佛与文档形容不太统一,并没有并发式的混合。
先转换再展平
大多数时候并没有现成的嵌套好的Flow of Flows给你展平,更多的时候是咱们须要本人把元素转换为一个Flow,学生成Flow of Flows,而后再展平,且有定义好的API能够间接用:
- flatMapConcat 先把Flow中的数据做变幻,这个变幻必须从元素变成另一个Flow,这时就变成了嵌套式的Flow of Flows,而后再串行式展平为一个Flow。
- flatMapLatest 先把Flow中的最新数据做变幻,这个变幻必须从元素变成另一个Flow,这时会勾销掉之前转换生成的内层流,后果尽管也是嵌套,但内层流只有一个,就是原Flow中最新元素转换生成的那个流。而后再展平,这个其实也不须要真展平,因为内层流只有一个,它外面的数据就是最终展平后的数据。
- flatMapMerge 与flatMapConcat一样,只不过展平的时候嵌套的内层流是以并发的模式来拼接的。
来看个就能明确它们的作用了:
@OptIn(ExperimentalCoroutinesApi::class)fun main() = runBlocking { val source = (1..3).asFlow() .onEach { delay(100) } println("With flatMapConcat:") var start = System.currentTimeMillis() source.flatMapConcat(::requestFlow) .collect { println("$it at ${System.currentTimeMillis() - start}ms from the start") } println("With flatMapMerge:") start = System.currentTimeMillis() source.flatMapMerge(4, ::requestFlow) .collect { println("$it at ${System.currentTimeMillis() - start}ms from the start") } println("With flatMapLatest:") source.flatMapLatest(::requestFlow) .collect { println("$it at ${System.currentTimeMillis() - start}ms from the start") }}fun requestFlow(x: Int): Flow<String> = flow { emit(" >>[$x]: First: $x") delay(150) emit(" >>[$x]: Second: ${x * x}") delay(200) emit(" >>[$x]: Third: ${x * x * x}")}
输入比拟多:
With flatMapConcat: >>[1]: First: 1 at 140ms from the start >>[1]: Second: 1 at 306ms from the start >>[1]: Third: 1 at 508ms from the start >>[2]: First: 2 at 613ms from the start >>[2]: Second: 4 at 765ms from the start >>[2]: Third: 8 at 969ms from the start >>[3]: First: 3 at 1074ms from the start >>[3]: Second: 9 at 1230ms from the start >>[3]: Third: 27 at 1432ms from the startWith flatMapMerge: >>[1]: First: 1 at 130ms from the start >>[2]: First: 2 at 235ms from the start >>[1]: Second: 1 at 284ms from the start >>[3]: First: 3 at 341ms from the start >>[2]: Second: 4 at 386ms from the start >>[1]: Third: 1 at 486ms from the start >>[3]: Second: 9 at 492ms from the start >>[2]: Third: 8 at 591ms from the start >>[3]: Third: 27 at 695ms from the startWith flatMapLatest: >>[1]: First: 1 at 807ms from the start >>[2]: First: 2 at 915ms from the start >>[3]: First: 3 at 1021ms from the start >>[3]: Second: 9 at 1173ms from the start >>[3]: Third: 27 at 1378ms from the start
这个示例中原始Flow是一个Int值,把它转换成为一个字符串流Flow<String>。从输入中能够看到flatMapConcat的确是串行拼接,并且flatMapMerge是并发式的混合,不保障外部Flow的元素程序。认真看flatMapLatest的输入,每当原始Flow中有新的值生成时,之前转换生成的流会被勾销,它们并没有运行完(仅第一个元素流出了)。而原始流的最初一个元素『3』则残缺的从展平流中流出了。
展平的函数比拟多容易学杂,其实有一个非常简单的辨别办法:带有Map字样的函数就是先把元素转换成Flow之后再展平;带有Concat就是把嵌套内层流串行拼接;而带有Merge的则是把内层流并发式的混合。应用的时候,如果想保障程序就用带有Concat的函数;想要并发性,想高效一些,并且不在乎元素程序,那就用带有Merge的函数。
Flow是冷流
对于数据流来说有冷热之分,冷流(Cold stream)是指消费者开始接收数据时,才开始生产数据,换句话说就是生产者消费者整个链路搭建好了后,上游才开始生产数据;热流(Hot stream),与之相同,不论有没有人在生产,都在生产数据。有一个十分形象的比喻就是,冷流就好比CD,你啥时候都能够听,而且只有你播放就从头开始播放CD上所有的音乐;而热流就好比电台播送,不论你听不听,它总是按它的节奏在播送,明天不听,就错过明天的数据了,明天听跟今天听,听到的内容也是不一样的。
Kotlin的Flow是冷流,其实从下面的例子也能看进去,每个例子中都是只创立一个Flow对象,而后有屡次collect,但每次collect都能拿到Flow中残缺的数据,这就是典型的冷流。绝大多数场景,咱们须要的也都是冷流。
扩大浏览Hot and cold data sources。
与ReactiveX的区别
Flow是用于解决异步数据流的API,是函数响应式编程范式FRP的一个实现。但它并不是惟一的,更为风行的RxJava也是合乎FRP的异步数据流解决API,它呈现的要更早,社区更沉闷,资源更丰盛,风行水平更高,基本上是每个安卓我的项目必备的依赖库,同时也是面试必考题。
因为Kotlin是基于JVM的衍生语言,它与Java是互通的,能够混着用。所以RxJava能够间接在Kotlin中应用,无须要任何改变。但毕竟RxJava是原生的Java库,Kotlin中的大量语法糖还是很香的,由此便有了RxKotlin。RxKotlin并不是把ReactiveX标准从新实现一遍,它只是一个轻量的粘合库,通过扩大函数和Kotlin的语法糖等,让RxJava更加的Kotlin敌对,在Kotlin中应用RxJava时更加的顺滑。但外围仍是RxJava,如并发的实现仍是用线程。
那么Flow相较RxJava有啥区别呢?区别就在于Flow是纯的Kotlin的货色,它们背地的思维是一样的都是异步数据流,都是FRP,但Flow是原生的,它与Kotlin的个性紧密结合,比方它的并发是用协程通信用的是Channel。应用倡议就是,如果自身对RxJava很相熟,且是遗留代码,那就没有必要去再改成Flow;但如果是新开发的纯新性能,并且不与遗留代码交互,也没有与架构抵触,还是倡议间接上Flow。
什么时候用Flow
每一个工具都有它特定的利用场景,Flow虽好,但不可滥用,要以架构的角度来认清问题的实质,合乎才能够用。Flow是用于解决异步数据流的API,是FRP范式下的利器。因而,只当外围业务逻辑是由异步数据流驱动的场景时,用Flow才是适合的。当初绝大多数端(前端,客户端和桌面)GUI利用都是响应式的,用户输出了,或者服务器Push了数据,利用做出响应,所以都是合乎FRP范式的。那么重点就在于数据流了,如果数据连串成流,就能够用Flow。比方用户输入,点击事件/文字输出等,这并不只产生一次,所以是数据流(事件流)。外围的业务数据,比方新闻列表,商品列表,文章列表,评论列表等都是流,都能够用Flow。配置,设置和数据库的变动也都是流。
但,一个单篇的文章展现,一个商品展现这就不是流,只有一个文章,即应用流,它也只有一个数据,而且咱们晓得它只有一个数据。这种状况就没有必要用Flow,间接用一个supsend申请就好了。
在Android中应用Flow
安卓开发的官方语言曾经变成了Kotlin了,安卓利用也十分合乎FRP范式,那么对于波及异步数据流的场景天然要应用Flow。
扩大浏览:
- What is Flow in Kotlin and how to use it in Android Project?
- Kotlin flows on Android
- Learn Kotlin Flow by real examples for Android
书籍举荐
Flow自身的货色其实并不多,就是三板斧:创立,变幻和终端。但Flow背地的思维是很宏大的,想要用好Flow必须要学会函数响应式编程范式。也就是说只有学会以FRP范式来构建软件时,能力真正用好Flow。
《Functional Reactive Programming》
参考资料
- Asynchronous Flow
- Mastering Flow API in Kotlin