关于android:包教包会的Kotlin-Flow教程

76次阅读

共计 11835 个字符,预计需要花费 30 分钟才能阅读完成。

原文链接 包教包会的 Kotlin Flow 教程        公众号「罕见猿诉」

Kotlin 中的 Flow 是专门用于解决异步数据流的 API,是函数响应式编程范式 (Functional Reactive Programming FRP) 在 Kotlin 上的一个实现,并且深度交融了 Kotlin 的协程。是 Kotlin 中解决异步数据流问题的首先计划。明天就来认识一下 Flow 并学会如何应用它。

Hello, Flow!

老规矩,新学习一个新货色的时候,总是要从一个根底的『Hello, world』开始,疾速上手体验,有个第一印象。咱们就从一个简略的『Hello, Flow!』开始 Flow 之旅:

fun main() = runBlocking {
    val simple = flow {listOf("Hello", "world", "of", "flows!")
            .forEach {delay(100)
                emit(it)
            }
    }

    simple.collect {println(it)
    }
}
//Hello
//world
//of
//flows!

这里创立了一个异步产生 String 的数据流 Flow<String>,会不定时的产生一个 String,而后收集此数据流产生的数据,把流出的 String 对象生产掉。

能够看出 Flow 实质上是一个 生产者消费者模式 ,流出的数据是由生产者产生的,且最终被消费者生产掉。能够把 Flow 想像成为一个 生产线中的传送带 ,产品(数据)在下面不停的流动,通过各个站点的加工,最终成型,由消费者生产掉。从这个小例子中能够看出 Flow API 的三要素:数据流的 上游 是创立 Flow(生产者);中游 是变幻操作(数据的解决和加工);上游 是收集数据(消费者),咱们一一的具体来学习。

创立 Flow

Flow 是一个生产者,创立 Flow 也就是把数据放到传送带上。数据能够是根底数据或者汇合,也能够是其余形式生成的数据,如网络或者回调或者硬件。创立 Flow 的 API 称作 flow builder 函数。

用汇合创立 Flow

这是创立 Flow 的最简略的形式,有两个,一个是 flowOf 用于从固定数量的元素创立,多用于示例,理论中基本上用不到:

val simple = flowOf("Hello", "world", "of", "flows!")
simple.collect {println(it) }

或者,通过 asFlow 把现有的汇合转为 Flow,这个还是比拟实用的:

listOf("Hello", "world", "of", "flows!").asFlow()
    .collect {println(it) }
(1..5).asFlow().collect { println(it) }

通用 flow builder

最为通用的 flow builder 就是 flow {…}了,这是最为通用,也是最为罕用的结构器。在代码块中调用 emit 就能够了,这个代码块会运行在协程之中,所以在这个代码里能够调用 suspend 函数:

fun main() = runBlocking {
    val simple = flow {for (i in 1..3) {delay(100)
            println("Emitting: $i")
            emit(i)
        }
    }
    simple.collect {println(it) }
}
//Emitting: 1
//1
//Emitting: 2
//2
//Emitting: 3
//3

这是一个代码块,只有调用了 emit 产生数据即可,又可调用 suspend 函数,因而十分的实用,比方能够执行网络申请,申请回来后 emit 等等。

终端操作符

数据从生产者流出,直到消费者把数据收集起来进行生产,而只有数据被生产了才有意义。因而,还须要终端操作(Terminal flow operators)。须要留神的是终端操作符是 Flow 的起点,并不算是 Flow 传送带外部,因而终端操作都是 suspend 函数,调用者须要负责创立协程以失常调用这些 suspending terminal operators。

常见的终端操作有三个:

  • collect 最为通用的,可执行一个代码块,参数就是 Flow 流出的数据
  • 转换为汇合 Collections,如 toList 和 toSet 等,能够不便把收集到的数据转换为汇合
  • 取特定的值,如 first()只取第一个,last 只取最初一个, single 只有一个数据(无数据和超过一个数据时都会抛异样。
  • 降维(或者叫作聚合 accumulate)操作,如折叠 fold 和化约 reduce,折叠和化约能够对数据流进行降维,如求和,求积,求最大值最小值等等。
  • count 其实也是降维的一种,返回数据流中的数据个数,它还能够联合过滤以计算某种过滤条件后的数据数量。
fun main() = runBlocking {
    val simple = flow {for (i in 1..3) {delay(100)
            println("Emitting: $i")
            emit(i)
        }
    }
    simple.collect {println(it) }
    println("toList: ${simple.toList()}")
    println("first: ${simple.first()}")
    println("sum by fold: ${simple.fold(0) {s, a -> s + a}}")
}

输入:

Emitting: 1
1
Emitting: 2
2
Emitting: 3
3
Emitting: 1
Emitting: 2
Emitting: 3
toList: [1, 2, 3]
Emitting: 1
first: 1
Emitting: 1
Emitting: 2
Emitting: 3
sum by fold: 6

这些终端操作符都简略,比拟好了解,看一眼示例就晓得怎么用了。须要留神的就是 first()和 single(),first 是只接管数据流中的第一个,而 single 则要求数据流只能有一个数据(没有或者超过一个都会抛异样)。比拟有意思就是 last(),数据流是一个流,一个产品传送带,通常状况下都是指有限或者说不确定数据 数量时才叫数据流,那又何来最初一个数据呢?通常状况下 last 都是无意义的。只有当咱们晓得流的生产者只生产无限数量数据时,或者采纳了一些限制性的变幻操作符时,last 能力派上用场。

再有就是留神 fold 和 reduce 的区别,这里它们的区别跟汇合上的操作是一样的,fold 能够提供初始值,流为空时返回初始值;而 reduce 没初始值,流为空时会抛异样。

变幻操作符

数据在流动的过程中能够对数据进行转化操作,从一种数据类型变别另外一种,这就是变幻(Transformation),这是数据流最为灵便和弱小的一个方面。这跟汇合的变幻是相似的。

转换

最常见的变幻就是转换,也就是把从一种数据类型转换为另一种数据类型,用的最多当然是 map,还有更为通用的 transform。它们都能把数据流中的数据从一种类型转换为另一种类型,比方把 Flow<String> 转为 Flow<Int>。区别在于,map 是死板的转换,一个对象进去,另一个对象作为返回值进去;但 transform 更为灵便,它并不是把新类型作为返回值,它能够像上游生产者那样产生 (emit) 新数据,甚至能够产生 (emit) 多个新数据,它是十分弱小的,所有其余的变幻操作符,都是基于 transform 实现的。

fun main() = runBlocking {
    val simple = flow {for (i in 1..3) {delay(100)
            println("Emitting: $i")
            emit(i)
        }
    }

    simple.map {"Mapping to ${it * it}" }
        .collect {println(it) }

    simple.transform { req ->
        emit("Making request $req")
        emit(performRequest(req))
    }.collect {println(it)
    }
}

fun performRequest(req: Int) = "Response for $req"

输入是:

Emitting: 1
 Mapping to 1
Emitting: 2
 Mapping to 4
Emitting: 3
 Mapping to 9
Emitting: 1
 Making request 1
Response for 1
Emitting: 2
 Making request 2
Response for 2
Emitting: 3
 Making request 3
Response for 3

还有一个操作符 withIndex 它与汇合中的 mapIndexed 是相似的,它的作用是把元素变成 IndexedValue,这样在前面就能够失去元素和元素的索引 了,在某些场景下还是比拟不便的。

限度

数据流外面的数据不肯定都是须要的,所以通常须要对数据元素进行过滤,这就是限制性操作符,最常见的就是 filter,这里与汇合的限度操作也是相似的:

  • filter 把数据转为布尔型,从而对数据流进行过滤。
  • distinctUntilChanged 过滤数据流中反复的元素。
  • drop 抛弃后面肯定数量的元素。
  • take 只返回流中后面肯定数量的元素,当数量达到时流将被勾销,留神 take 与 drop 是相同的。
  • debounce 仅保留流中肯定超时距离内的元素,比方超时工夫是 1 秒,那只返回达到 1 秒时最新的元素,这个元素后面的将被抛弃。这个在秒杀场景拦挡疯狂点击,或者一个服务中拦挡疯狂申请时十分有用。只取肯定工夫距离内的最新的元素,拦挡掉有效数据。
  • sample 以肯定的工夫距离取元素,与 debounce 差不多,区别在于 debounce 会返回最初一个元素,而 sample 不肯定,要看距离最初一个元素是否落在一个工夫距离内。
@OptIn(FlowPreview::class)
fun main() = runBlocking {
    val constraint = flow {emit(1)
        delay(90)
        emit(2)
        delay(90)
        emit(3)
        delay(1010)
        emit(4)
        delay(1010)
        emit(5)
    }

    constraint.filter {it % 2 == 0}
        .collect {println("filter: $it") }
    constraint.drop(3)
        .collect {println("drop(3): $it") }
    constraint.take(3)
        .collect {println("take(3): $it") }

    constraint.debounce(1000)
        .collect {println("debounce(1000): $it") }
    constraint.sample(1000)
        .collect {println("sample(1000): $it") }
}

认真看它们的输入,以了解它们的作用:

filter: 2
filter: 4
drop(3): 4
drop(3): 5
take(3): 1
take(3): 2
take(3): 3
debounce(1000): 3
debounce(1000): 4
debounce(1000): 5
sample(1000): 3
sample(1000): 4

须要注意,debounce 和 sample 是 Preview 的 API,须要加上 Preview 注解。

中游的变幻操作符仍属于流的一部分,它们都仍运行在 Flow 的上下文中,因而,这些操作符内,与流的 builder 一样,都能够间接调用其余的 supsend 函数,甚至是其余的耗时的,阻塞的函数都能够调用。并不需要特地的为上游和中游创立上下文。

Flow 的操作符特地多,咱们须要注意区别中游操作符和上游终端。看这些函数的返回类型就能够了,返回类型是具体数据的,肯定是上游终端操作符;而对于上游生产者和中游变幻操作符,其返回值肯定是一个 Flow。

高级操作符

后面讲的操作符都是针对 某一个流自身的,但大多数场景一个流显著不够用啊,咱们须要操作多个流,这时就须要用到一些高级操作符了。

合并多路流

多路流不可能一个一个的解决,合并成为一路流更加的不便,有以下合并办法:

  • 归并 merge 把 数据类型雷同的多路流归并为一路,留神肯定是数据类型雷同的才能够归并,并且归并后的元素程序是未知的,也即不会保留原各路流的元素程序。归并流的数量没有限度。
  • 粘合 zip 当想要 把两路流的元素对齐后粘合为一个元素 时,就能够应用 zip,当任何一个流完结或者被勾销时,zip 也就完结了。只能两个两个的粘合。
  • 组合 combine 把多路流中的每个流的最新元素粘合成新数据,造成一个新的流,其元素是把 每个元素 都用 每路流的最新元素 来转换生成。起码须要 2 路流,最多反对 5 路流。

用一个🌰来感受一下它们的作用:

fun main() = runBlocking {val one = flowOf(1, 2, 3)
                .map(Int::toString)
                .onEach {delay(10) }
    val two = flowOf("a", "b", "c", "d")
                .onEach {delay(25) }
    merge(one, two)
        .collect {println("Merge: $it") }
    one.zip(two) {i, s -> "Zip: $i. $s"}
        .collect {println(it) }
    combine(one, two) {i, s -> "Combine $i with $s"}
        .collect {println(it) }
}

这里是输入:

Merge: 1
Merge: 2
Merge: a
Merge: 3
Merge: b
Merge: c
Merge: d
Zip: 1. a
Zip: 2. b
Zip: 3. c
Combine 2 with a
Combine 3 with a
Combine 3 with b
Combine 3 with c
Combine 3 with d

通过它们的输入能够看到它们的区别:merge 就像把两个水管接到一样,简略没有多余加工,适宜数据类型一样的流(比方都是水);zip 会对齐两路流,让能对齐的元素两两联合,对不齐时就完结了。

而 combine 要等到 集齐每路流的最新元素,能力转换成新数据,two 是较 one 慢的,看到 two 的元素『a』时,one 最新的元素是『2』,之后 one 的『3』来了,这时 two 最新的元素还是『a』,之后 one 停在了『3』,后续 two 的元素都与『3』组合。有同学可能会有疑难,为啥 one 的『1』抛弃了,没找到组合呢?因为它来的太早了,one 的『1』来了时,two 还没有元素,它必定会等,但当 two 的第一个元素『a』来了时,这时 one 的最新元素已是『2』了,one 是 10 发一个元素,two 是隔 25 发一个元素,所以 two 的第 1 个元素到了时,one 的第 2 个元素曾经来了,它是最新的,所以组合时会用它。combine 要集齐每路流的最新元素能力合成。

总结起来就是,zip 会按程序对齐元素 ;而 combine 要 集齐每路流的最新元素 ,先要 集齐 ,齐了时还要 取每个流的最新元素。能够入手运行示例,批改 delay 的工夫,看输入有啥不一样的,以加深了解。

展平(Flatten)

一个 Flow 就是一个异步数据流,它相当于一个传送带或者管道,货物(具体的数据)在其下面或者外面流动。失常状况下 Flow 外部都是惯例数据(对象)在流动,但 Flow 自身也是一个对象,因而也能够嵌套,把流当成另一个流的数据,比方 Flow<Flow<Int>>,这就是 Flow of Flows of Int。Flow 是数据流,最终消费者须要的是具体的数据,所以对于嵌套的 Flow of Flows,通常都须要在传给终端操作符之前进行展平 (flatten),失去一个 faltterned Flow(即从 Flow<Flow<Int>> 转成 Flow<Int>),就能够被终端生产了。操作符中以 flat 结尾的函数都是用于展平的,次要是两类,一类是 展平 flatten 系 ,一类是 先变幻再展平 flatMap 系

间接展平

最直观的展平莫过于对于曾经是嵌套的 Flow of Flows 做展平解决,以能让终端操作符失常的生产 Flow 外面的数据,有两个 API 能够做展平:

  • flattenConcat 把嵌套的 Flow of Flows 展平为一个 Flow,内层的每个流都是按程序拼接在一起的,串行拼接。比方 Flow of 4 Flows,内层有四个管道,那就就变成了『内层 1』->『内层 2』->『内层 3』->『内层 4』。
  • flattenMerge 把 Flow of Flows 展平为一个 Flow,内层的所有 Flow 是以并发的形式将元素混合流入新管道,是并发式混合,相当于四个管道同时往另一个管道倒水,原流中的程序会错乱掉。
@OptIn(ExperimentalCoroutinesApi::class)
fun main() = runBlocking {val flow2D = flowOf("Hello", "world", "of", "flow!")
        .map {it.toCharArray().map {c -> "'$c' "}.asFlow()}
        .flowOn(Dispatchers.Default)

    flow2D.collect {println("Flow object before flatten: $it") } // Data in flow are Flow objects

    println("With flattenConcat:")
    flow2D.flattenConcat()
        .collect {print(it) }

    println("\nWith flattenMerge:")
    flow2D.flattenMerge()
        .collect {print(it) }
}
//Flow object before flatten: kotlinx.coroutines.flow.FlowKt__BuildersKt$asFlow$$inlined$unsafeFlow$3@1b0375b3
//Flow object before flatten: kotlinx.coroutines.flow.FlowKt__BuildersKt$asFlow$$inlined$unsafeFlow$3@e580929
//Flow object before flatten: kotlinx.coroutines.flow.FlowKt__BuildersKt$asFlow$$inlined$unsafeFlow$3@1cd072a9
//Flow object before flatten: kotlinx.coroutines.flow.FlowKt__BuildersKt$asFlow$$inlined$unsafeFlow$3@7c75222b
//With flattenConcat:
 //'H'  'e'  'l'  'l'  'o'  'w'  'o'  'r'  'l'  'd'  'o'  'f'  'f'  'l'  'o'  'w'  '!' 
//With flattenMerge:
// 'H'  'e'  'l'  'l'  'o'  'w'  'o'  'r'  'l'  'd'  'o'  'f'  'f'  'l'  'o'  'w'  '!'

从输入中能够看出,如果不展平 Flow 外面是 Flow 对象,没法用。flattenConcat 是把内层的流串行的接在一起。但 flattenMerge 的输入仿佛与文档形容不太统一,并没有并发式的混合。

先转换再展平

大多数时候并没有现成的嵌套好的 Flow of Flows 给你展平,更多的时候是咱们须要本人把元素转换为一个 Flow,学生成 Flow of Flows,而后再展平,且有定义好的 API 能够间接用:

  • flatMapConcat 先把 Flow 中的数据做变幻,这个变幻必须从元素变成另一个 Flow,这时就变成了嵌套式的 Flow of Flows,而后再串行式展平为一个 Flow。
  • flatMapLatest 先把 Flow 中的最新数据做变幻,这个变幻必须从元素变成另一个 Flow,这时会勾销掉之前转换生成的内层流,后果尽管也是嵌套,但内层流只有一个,就是原 Flow 中最新元素转换生成的那个流。而后再展平,这个其实也不须要真展平,因为内层流只有一个,它外面的数据就是最终展平后的数据。
  • flatMapMerge 与 flatMapConcat 一样,只不过展平的时候嵌套的内层流是以并发的模式来拼接的。

来看个🌰就能明确它们的作用了:

@OptIn(ExperimentalCoroutinesApi::class)
fun main() = runBlocking {val source = (1..3).asFlow()
        .onEach {delay(100) }

    println("With flatMapConcat:")
    var start = System.currentTimeMillis()
    source.flatMapConcat(::requestFlow)
        .collect {println("$it at ${System.currentTimeMillis() - start}ms from the start") }

    println("With flatMapMerge:")
    start = System.currentTimeMillis()
    source.flatMapMerge(4, ::requestFlow)
        .collect {println("$it at ${System.currentTimeMillis() - start}ms from the start") }

    println("With flatMapLatest:")
    source.flatMapLatest(::requestFlow)
        .collect {println("$it at ${System.currentTimeMillis() - start}ms from the start") }
}

fun requestFlow(x: Int): Flow<String> = flow {emit(">>[$x]: First: $x")
    delay(150)
    emit(">>[$x]: Second: ${x * x}")
    delay(200)
    emit(">>[$x]: Third: ${x * x * x}")
}

输入比拟多:

With flatMapConcat:
 >>[1]: First: 1 at 140ms from the start
 >>[1]: Second: 1 at 306ms from the start
 >>[1]: Third: 1 at 508ms from the start
 >>[2]: First: 2 at 613ms from the start
 >>[2]: Second: 4 at 765ms from the start
 >>[2]: Third: 8 at 969ms from the start
 >>[3]: First: 3 at 1074ms from the start
 >>[3]: Second: 9 at 1230ms from the start
 >>[3]: Third: 27 at 1432ms from the start
With flatMapMerge:
 >>[1]: First: 1 at 130ms from the start
 >>[2]: First: 2 at 235ms from the start
 >>[1]: Second: 1 at 284ms from the start
 >>[3]: First: 3 at 341ms from the start
 >>[2]: Second: 4 at 386ms from the start
 >>[1]: Third: 1 at 486ms from the start
 >>[3]: Second: 9 at 492ms from the start
 >>[2]: Third: 8 at 591ms from the start
 >>[3]: Third: 27 at 695ms from the start
With flatMapLatest:
 >>[1]: First: 1 at 807ms from the start
 >>[2]: First: 2 at 915ms from the start
 >>[3]: First: 3 at 1021ms from the start
 >>[3]: Second: 9 at 1173ms from the start
 >>[3]: Third: 27 at 1378ms from the start

这个示例中原始 Flow 是一个 Int 值,把它转换成为一个字符串流 Flow<String>。从输入中能够看到 flatMapConcat 的确是串行拼接,并且 flatMapMerge 是并发式的混合,不保障外部 Flow 的元素程序。认真看 flatMapLatest 的输入,每当原始 Flow 中有新的值生成时,之前转换生成的流会被勾销,它们并没有运行完(仅第一个元素流出了)。而原始流的最初一个元素『3』则残缺的从展平流中流出了。

展平的函数比拟多容易学杂,其实有一个非常简单的辨别办法:带有 Map 字样 的函数就是先把元素 转换成 Flow之后再展平;带有 Concat 就是把嵌套内层流 串行拼接 ;而带有Merge 的则是把内层流 并发式的混合 。应用的时候,如果 想保障程序就用带有 Concat 的函数;想要并发性,想高效一些,并且不在乎元素程序,那就用带有 Merge 的函数。

Flow 是冷流

对于数据流来说有 冷热 之分,冷流 (Cold stream) 是指消费者开始接收数据时,才开始生产数据,换句话说就是生产者消费者整个链路搭建好了后,上游才开始生产数据;热流(Hot stream),与之相同,不论有没有人在生产,都在生产数据。有一个十分形象的比喻就是,冷流就好比 CD,你啥时候都能够听,而且只有你播放就从头开始播放 CD 上所有的音乐;而热流就好比电台播送,不论你听不听,它总是按它的节奏在播送,明天不听,就错过明天的数据了,明天听跟今天听,听到的内容也是不一样的。

Kotlin 的 Flow 是冷流,其实从下面的例子也能看进去,每个例子中都是只创立一个 Flow 对象,而后有屡次 collect,但 每次 collect 都能拿到 Flow 中残缺的数据 ,这就是 典型的冷流。绝大多数场景,咱们须要的也都是冷流。

扩大浏览 Hot and cold data sources。

与 ReactiveX 的区别

Flow 是用于解决异步数据流的 API,是函数响应式编程范式 FRP 的一个实现。但它并不是惟一的,更为风行的 RxJava 也是合乎 FRP 的异步数据流解决 API,它呈现的要更早,社区更沉闷,资源更丰盛,风行水平更高,基本上是每个安卓我的项目必备的依赖库,同时也是面试必考题。

因为 Kotlin 是基于 JVM 的衍生语言,它与 Java 是互通的,能够混着用。所以 RxJava 能够间接在 Kotlin 中应用,无须要任何改变。但毕竟 RxJava 是原生的 Java 库,Kotlin 中的大量语法糖还是很香的,由此便有了 RxKotlin。RxKotlin 并不是把 ReactiveX 标准从新实现一遍,它只是一个轻量的粘合库,通过扩大函数和 Kotlin 的语法糖等,让 RxJava 更加的 Kotlin 敌对,在 Kotlin 中应用 RxJava 时更加的顺滑。但外围仍是 RxJava,如并发的实现仍是用线程。

那么 Flow 相较 RxJava 有啥区别呢?区别就在于 Flow 是纯的 Kotlin 的货色,它们背地的思维是一样的都是异步数据流,都是 FRP,但 Flow 是原生的,它与 Kotlin 的个性紧密结合,比方它的并发是用协程通信用的是 Channel。应用倡议就是,如果自身对 RxJava 很相熟,且是遗留代码,那就没有必要去再改成 Flow;但如果是新开发的纯新性能,并且不与遗留代码交互,也没有与架构抵触,还是倡议间接上 Flow。

什么时候用 Flow

每一个工具都有它特定的利用场景,Flow 虽好,但不可滥用,要以架构的角度来认清问题的实质,合乎才能够用。Flow 是用于解决异步数据流的 API,是 FRP 范式下的利器。因而,只当外围业务逻辑是由异步数据流驱动的场景时,用 Flow 才是适合的。当初绝大多数端(前端,客户端和桌面)GUI 利用都是响应式的,用户输出了,或者服务器 Push 了数据,利用做出响应,所以都是合乎 FRP 范式的。那么重点就在于数据流了,如果数据连串成流,就能够用 Flow。比方用户输入,点击事件 / 文字输出等,这并不只产生一次,所以是数据流(事件流)。外围的业务数据,比方新闻列表,商品列表,文章列表,评论列表等都是流,都能够用 Flow。配置,设置和数据库的变动也都是流。

但,一个单篇的文章展现,一个商品展现这就不是流,只有一个文章,即应用流,它也只有一个数据,而且咱们晓得它只有一个数据。这种状况就没有必要用 Flow,间接用一个 supsend 申请就好了。

在 Android 中应用 Flow

安卓开发的官方语言曾经变成了 Kotlin 了,安卓利用也十分合乎 FRP 范式,那么对于波及异步数据流的场景天然要应用 Flow。

扩大浏览:

  • What is Flow in Kotlin and how to use it in Android Project?
  • Kotlin flows on Android
  • Learn Kotlin Flow by real examples for Android

书籍举荐

Flow 自身的货色其实并不多,就是三板斧:创立,变幻和终端。但 Flow 背地的思维是很宏大的,想要用好 Flow 必须要学会函数响应式编程范式。也就是说只有学会以 FRP 范式来构建软件时,能力真正用好 Flow。

《Functional Reactive Programming》

参考资料

  • Asynchronous Flow
  • Mastering Flow API in Kotlin

搜寻并关注公众号「罕见猿诉」

原创不易,打赏 点赞 在看 珍藏 分享 总要有一个吧

正文完
 0