Flow异步流

  • 意识

    • 个性
    • 构建器和上下文
    • 启动
    • 勾销与勾销检测
    • 缓冲
  • 操作符

    • 过渡操作符
    • 末端操作符
    • 组合
    • 展平
  • 异样

    • 异样解决
    • 实现

如何示意多个值?
挂起函数能够异步的返回单个值,然而如何异步返回多个计算好的值呢?

计划

  • 汇合
  • 序列
  • 挂起函数
  • Flow
    用汇合,返回多个值,但不是异步的。

    private fun createList() = listOf<Int>(1, 2, 3)@Testfun test_list() {  createList().forEach { println(it) }}

    用序列,返回一个整数序列

    private fun createSequence(): Sequence<Int> {  return sequence {      for (i in 1..3) {          Thread.sleep(1000) // 伪装在计算,此处是阻塞,不能做其余事件了          // delay(1000) 这里不能用挂起函数          yield(i)      }  }}@Testfun test_sequence() {  createSequence().forEach { println(it) }}

    看下源码

    public fun <T> sequence(@BuilderInference block: suspend SequenceScope<T>.() -> Unit): Sequence<T> = Sequence { iterator(block) }

    传入的是一个SequenceScope的扩大函数。

    @RestrictsSuspension@SinceKotlin("1.3")public abstract class SequenceScope<in T> internal constructor()

    而RestrictsSuspension限度只能应用外面提供的已有的挂起函数,如yield,yieldAll等。
    createSequence返回了多个值,然而也是同步的。

    // 返回多个值,异步private suspend fun createList2(): List<Int> {  delay(5000)  return listOf<Int>(1, 2, 3)}@Testfun test_list2() = runBlocking<Unit> {  createList().forEach { println(it) }}

    能够应用suspend函数返回多个值,是异步,然而是是一次性返回了多个值,是否像流一样返回多个值并放弃异步呢?Flow能够解决这个问题。

    private suspend fun createFlow(): Flow<Int> = flow {  for (i in 1..3) {      delay(1000)      emit(i) // 发射,产生一个元素  }}@Testfun test_flow() = runBlocking<Unit> {  createFlow().collect { println(it) } // collect是一个末端操作符,前面讲}

    每隔1秒钟产生一个元素,这里是挂起的。用例子来证实一下:

      private suspend fun createFlow(): Flow<Int> = flow {      for (i in 1..3) {          delay(1000)          emit(i)      }  }  @Test  fun test_flow2() = runBlocking<Unit> {      launch {          for (i in 1..3) {              println("I am running and not blocked $i")              delay(1500)          }      }      createFlow().collect { println(it) }  }

    输入

    I am running and not blocked 11I am running and not blocked 22I am running and not blocked 33Process finished with exit code 0

    collect收集后果的过程并没有阻塞另外的协程,打印完1,而后在delay挂起时,去执行其余,并没有阻塞,两个工作来回切换执行。
    Flow真正地做到了返回多个值,并且是异步的。

Flow与其余形式的区别

  • 名为flow的Flow类型的构建器函数
  • flow{...}构建块中的代码能够挂起
  • 函数createFlow()不再标有suspend修饰符,下面代码中的suspend修饰符能够去掉
  • 流应用emit函数发射值
  • 流应用collect函数收集值

    Flow利用

    在android中,文件下载是Flow的一个十分典型的利用。

冷流

Flow是一种相似于序列的冷流,flow构建器中的代码直到流被收集的时候才运行。

private fun createFlow2() = flow<Int> {    println("Flow started.")    for (i in 1..3) {        delay(1000)        emit(i)    }}@Testfun test_flow_cold() = runBlocking<Unit> {    val flow = createFlow2()    println("calling collect...")    flow.collect { value -> println(value) }    println("calling collect again...")    flow.collect { value -> println(value) }}
calling collect...Flow started.123calling collect again...Flow started.123Process finished with exit code 0

能够看到,当调用collect办法的时候,流才开始运行,并且能够屡次调用。

流的连续性

  • 流的每次独自收集都是按程序执行的,除非应用非凡操作符。
  • 从上游到上游,每个过渡操作符都会解决每个发射出的值,而后再交给末端操作符。

    @Testfun test_flow_continuation() = runBlocking<Unit> {  (1..5).asFlow()      .filter { it % 2 == 0 }      .map { "string $it" }      .collect { println("collect $it") }}
    collect string 2collect string 4Process finished with exit code 0

    改例子通过了如下步骤:生成一个流,过滤出偶数,转成字符串,开始收集

    流的构建器

  • flowOf构建器定义了一个发射固定值集的流。
  • 应用.asFlow()扩大函数,能够将各种汇合与序列转换为流。

    @Testfun test_flow_builder() = runBlocking<Unit> {  // flowOf构建器  flowOf("one", "two", "three")      .onEach { delay(1000) }      .collect { value -> println(value) }  // asFlow扩大函数  (1..3).asFlow().collect { value -> println(value) }}
    onetwothree123Process finished with exit code 0

    流的上下文

  • 流的收集总是在调用协程的上下文中产生,流的该属性成为上下文保留。
  • flow{...}构建器中的代码必须遵循上下文保留属性,并且不容许从其余上下文中发射。
  • flowOn操作符,该函数用于更改流发射的上下文。

    private fun createFlow3() = flow<Int> {  println("Flow started ${Thread.currentThread()}")  for (i in 1..3) {      delay(1000)      emit(i)  }}@Testfun test_flow_context() = runBlocking<Unit> {  createFlow3()      .collect {          println("$it, thread: ${Thread.currentThread()}")      }}
    Flow started Thread[main @coroutine#1,5,main]1, thread: Thread[main @coroutine#1,5,main]2, thread: Thread[main @coroutine#1,5,main]3, thread: Thread[main @coroutine#1,5,main]Process finished with exit code 0

    不做线程切换,收集和构建都在同一上下文,运行的线程是一样的。
    试着更改一下线程,如下

    private fun createFlow4() = flow {  withContext(Dispatchers.IO) { // 用io线程      println("Flow started ${Thread.currentThread().name}")      for (i in 1..3) {          delay(1000)          emit(i)      }  }}@Testfun test_flow_on() = runBlocking {  createFlow4().collect {      println("collect $it, ${Thread.currentThread()}")  }}
    Flow started DefaultDispatcher-worker-1 @coroutine#1java.lang.IllegalStateException: Flow invariant is violated:      Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@4600ac86, BlockingEventLoop@1e1d1f06],      but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@79c0c2ed, Dispatchers.IO].      Please refer to 'flow' documentation or use 'flowOn' instead

    能够看到,构建流在IO线程中执行,但在收集流的时候报错了,不容许这样做,倡议应用flowOn.
    正确的做法如下:

    private fun createFlow5() = flow {  println("Flow started ${Thread.currentThread().name}")  for (i in 1..3) {      delay(1000)      emit(i)  }}.flowOn(Dispatchers.IO)@Testfun test_flow_on2() = runBlocking {  createFlow5().collect {      println("collect $it, ${Thread.currentThread().name}")  }}
    Flow started DefaultDispatcher-worker-1 @coroutine#2collect 1, main @coroutine#1collect 2, main @coroutine#1collect 3, main @coroutine#1Process finished with exit code 0

    流在IO线程中构建和发射,在main线程中收集。

    启动流

  • 应用launchIn替换collect,能够在独自的协程中启动流的收集。

      private fun events() = (1..3).asFlow()      .onEach {          delay(1000)          println("$it, ${Thread.currentThread().name}")      }.flowOn(Dispatchers.Default)  @Test  fun testFlowLaunch() = runBlocking<Unit> {      events()          .onEach { e -> println("Event: $e ${Thread.currentThread().name}") }          //.collect()          .launchIn(CoroutineScope(Dispatchers.IO))          .join()  }
    1, DefaultDispatcher-worker-3 @coroutine#3Event: 1 DefaultDispatcher-worker-1 @coroutine#22, DefaultDispatcher-worker-1 @coroutine#3Event: 2 DefaultDispatcher-worker-1 @coroutine#23, DefaultDispatcher-worker-1 @coroutine#3Event: 3 DefaultDispatcher-worker-2 @coroutine#2Process finished with exit code 0

    onEach是过渡操作符,并不会触发收集数据,collect是末端操作符,能力触发收集数据。过渡操作符就像是过滤器,末端操作符就像是水龙头的阀门,不关上阀门水就流不进去,无论两头加了多少个过滤安装。
    如果想指定在哪个协程外面收集数据,就能够用末端操作符launchIn(),能够传递一个作用域进去,而作用域又能够指定调度器,launchIn(CoroutineScope(Dispatchers.IO)).

launchIn返回的是一个job对象,能够进行cancel等操作。例如

@Testfun testFlowLaunch2() = runBlocking<Unit> {    val job = events()        .onEach { e -> println("Event: $e ${Thread.currentThread().name}") }        .launchIn(CoroutineScope(Dispatchers.IO))    delay(2000)    job.cancel()}
1, DefaultDispatcher-worker-1 @coroutine#3Event: 1 DefaultDispatcher-worker-3 @coroutine#2Process finished with exit code 0

如上,只收集了一个数字,job就勾销了。
其实runBlockint自身就是一个主线程作用域,能够放到launchIn中,如下

@Testfun testFlowLaunch3() = runBlocking<Unit> {    val job = events()        .onEach { e -> println("Event: $e ${Thread.currentThread().name}") }        .launchIn(this)}
1, DefaultDispatcher-worker-1 @coroutine#3Event: 1 main @coroutine#22, DefaultDispatcher-worker-1 @coroutine#3Event: 2 main @coroutine#23, DefaultDispatcher-worker-1 @coroutine#3Event: 3 main @coroutine#2Process finished with exit code 0

流的勾销

  • 流采纳与协程同样的合作勾销。流的收集能够是当流在一个可勾销的挂起函数(如delay)中挂起的时候勾销。

      private fun createFlow6() = flow<Int> {      for (i in 1..3) {          delay(1000)          println("emitting $i")          emit(i)      }  }  @Test  fun testCancelFlow() = runBlocking<Unit> {      withTimeoutOrNull(2500) {          createFlow6().collect {              println("collect: $it")          }      }      println("Done.")  }
    emitting 1collect: 1emitting 2collect: 2Done.Process finished with exit code 0

    设置2.5秒超时,流还没发射3,就超时了,流被勾销。

    流的勾销检测

  • 不便起见,流构建器对每个发射值执行附加的ensureActive检测以进行勾销,这意味着从flow{...}收回的忙碌循环是能够勾销的。
  • 处于性能起因,大多数其余流操作不会自行执行其余勾销检测,在协程处于忙碌循环的状况下,必须明确检测是否勾销。
  • 通过cancellable操作符来执行操作。

      private fun createFlow7() = flow<Int> {      for (i in 1..5) {          delay(1000)          println("emitting $i")          emit(i)      }  }  @Test  fun testCancelFlowCheck() = runBlocking<Unit> {      createFlow7().collect {          if (it == 3) cancel()          println("collect: $it")      }      println("Done.")  }
    emitting 1collect: 1emitting 2collect: 2emitting 3collect: 3kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled...Process finished with exit code 255

    在收集流的时候,遇到3就执行勾销操作,抛出JobCancellationException,3还是会被收集到。

      @Test  fun testCancelFlowCheck2() = runBlocking<Unit> {      (1..5).asFlow().collect {          if (it == 3) cancel()          println("collect: $it")      }      println("Done.")  }
    collect: 1collect: 2collect: 3collect: 4collect: 5Done.kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled

    应用asFlow()创立流,在收集的时候,尽管遇到3进行了勾销,然而还是把所有的元素都打印了当前才抛出异样。如果要在执行的过程中真正的阻断流,须要加上cancellable()操作,如下:

      @Test  fun testCancelFlowCheck3() = runBlocking<Unit> {      (1..5).asFlow().cancellable().collect {          if (it == 3) cancel()          println("collect: $it")      }      println("Done.")  }
    collect: 1collect: 2collect: 3kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled

    背压

  • buffer(),并发运行流中发射元素的代码。
  • conflate(),合并发射项,不对每个值进行解决。
  • collectLatest(),勾销并从新发射最初一个值。
  • 当必须更改CoroutineDispatcher时,flowOn操作符应用了雷同的缓冲机制,然而buffer函数显示地申请缓冲而不扭转执行上下文。
    private fun createFlow8() = flow<Int> {        for (i in 1..5) {            delay(100) // 生产一个元素须要0.1秒            println("emitting $i")            emit(i)        }    }    @Test    fun testBackPressure() = runBlocking<Unit> {        val time = measureTimeMillis {            createFlow8()                .buffer(50) // 并发运行流中发射元素                .collect {                    delay(200) // 生产一个元素须要0.2秒                    println("collect: $it")                }        }        println("Done, total $time")    }
emitting 1emitting 2collect: 1emitting 3emitting 4collect: 2emitting 5collect: 3collect: 4collect: 5Done, total 1188

应用buffer能够让发射元素并发执行,提高效率。
应用flowOn()切换线程,也能够提高效率。

    private fun createFlow8() = flow<Int> {        for (i in 1..5) {            delay(100) // 生产一个元素须要0.1秒            println("emitting $i, ${Thread.currentThread().name}")            emit(i)        }    }    @Test    fun testBackPressure2() = runBlocking<Unit> {        val time = measureTimeMillis {            createFlow8()                .flowOn(Dispatchers.Default)                .collect {                    delay(200) // 生产一个元素须要0.2秒                    println("collect: $it, ${Thread.currentThread().name}")                }        }        println("Done, total $time")    }
emitting 1, DefaultDispatcher-worker-1 @coroutine#2emitting 2, DefaultDispatcher-worker-1 @coroutine#2emitting 3, DefaultDispatcher-worker-1 @coroutine#2collect: 1, main @coroutine#1emitting 4, DefaultDispatcher-worker-1 @coroutine#2collect: 2, main @coroutine#1emitting 5, DefaultDispatcher-worker-1 @coroutine#2collect: 3, main @coroutine#1collect: 4, main @coroutine#1collect: 5, main @coroutine#1Done, total 1186

conflate()能够合并发射项,然而不会对每个值进行解决。

    @Test    fun testBackPressure3() = runBlocking<Unit> {        val time = measureTimeMillis {            createFlow8()                .conflate()                .collect {                    delay(200) // 生产一个元素须要0.2秒                    println("collect: $it, ${Thread.currentThread().name}")                }        }        println("Done, total $time")    }
emitting 1, main @coroutine#2emitting 2, main @coroutine#2emitting 3, main @coroutine#2collect: 1, main @coroutine#1emitting 4, main @coroutine#2collect: 3, main @coroutine#1emitting 5, main @coroutine#2collect: 4, main @coroutine#1collect: 5, main @coroutine#1Done, total 1016

下面的例子中,应用conflate(),collect时跳过了2.
应用collectLatest()只会收集最初一个值,如下:

    @Test    fun testBackPressure4() = runBlocking<Unit> {        val time = measureTimeMillis {            createFlow8()                .collectLatest {                    delay(200) // 生产一个元素须要0.2秒                    println("collect: $it, ${Thread.currentThread().name}")                }        }        println("Done, total $time")    }
emitting 1, main @coroutine#2emitting 2, main @coroutine#2emitting 3, main @coroutine#2emitting 4, main @coroutine#2emitting 5, main @coroutine#2collect: 5, main @coroutine#7Done, total 913Process finished with exit code 0

操作符

过渡流操作符

  • 能够应用操作符转换流,就像应用汇合与序列一样。
  • 过渡操作符利用于上游流,并返回上游流。
  • 这些操作符也是冷操作符,就像流一样。这类操作符自身不是挂起函数。
  • 运行速度很快,返回新的转换流的定义。

      private fun createFlow9() = flow<Int> {      for (i in 1..3) {          delay(100) // 生产一个元素须要0.1秒          println("emitting $i")          emit(i)      }  }  @Test  fun testMap() = runBlocking<Unit> {      createFlow9()          .map { data -> performRequest(data) }          .collect {              println("collect: $it")          }  }
    emitting 1collect: --response 1--emitting 2collect: --response 2--emitting 3collect: --response 3--Process finished with exit code 0

    下面的例子,map操作符,把Int流转成了String流。

      @Test  fun testTransform() = runBlocking<Unit> {      createFlow9()          .transform { data ->              emit("making request $data")              emit(performRequest(data))          }          .collect {              println("collect: $it")          }  }
    emitting 1collect: making request 1collect: --response 1--emitting 2collect: making request 2collect: --response 2--emitting 3collect: making request 3collect: --response 3--Process finished with exit code 0

    下面的例子,transform操作符能够把流通过屡次转换,屡次发射。

    限长操作符

    take操作符

      private fun numbers() = flow<Int> {      try {          emit(1)          emit(2)          println("This line will not execute")          emit(3)      } finally {          println("Finally.")      }  }  @Test  fun testLimitOperator() = runBlocking {      numbers().take(2).collect {          println("collect $it")      }  }
    collect 1collect 2Finally.

    take传入参数2,则只取2个数据。

    末端流操作符

    末端操作符是在流上用于启动流收集的挂起函数。collect是最根底的末端操作符,然而还有一些更不便的末端操作符:

  • 转化为各种汇合,例如toList和toSet.
  • 获取第一个(first)值与确保流发射单个(single)值的操作符。
  • 应用reduce与fold将流规约到单个值。
    例如reduce操作符

      @Test  fun testTerminateOperator() = runBlocking {      val sum = (1..5).asFlow().map { it * it }.reduce { a, b -> a + b }      println(sum)  }
    55

    计算数字1-5的平方,而后求和,失去55.

    组合多个流

    就像Kotlin规范库中的Sequence.zip扩大函数一样,流领有一个zip操作符用于组合两个流中的相干值。

      @Test  fun testZip() = runBlocking {      val numbers = (1..3).asFlow()      val strings = flowOf("One", "Two", "Three")      numbers.zip(strings) { a, b -> "$a -> $b" }.collect { println(it) }  }

    这个例子把数字流和字符串流用zip操作符组合起来,成为一个字符流。

1 -> One2 -> Two3 -> ThreeProcess finished with exit code 0
    @Test    fun testZip2() = runBlocking {        val numbers = (1..3).asFlow().onEach { delay(300) }        val strings = flowOf("One", "Two", "Three").onEach { delay(500) }        val start = System.currentTimeMillis()        numbers.zip(strings) { a, b -> "$a -> $b" }.collect { println("$it, ${System.currentTimeMillis() - start}") }    }

如果两个流各自有delay,合并操作会期待那个delay工夫较长的数据。

1 -> One, 5632 -> Two, 10653 -> Three, 1569Process finished with exit code 0

展平流

流示意异步接管的值序列,所以很容易遇到这样状况:每个值都会触发对另一个值序列的申请,然而,因为流具备异步的性质,因而须要不同的展平模式,为此,存在一系列的流展平操作符:

  • flatMapConcat连贯模式
  • flatMapMerge合并模式
  • flatMapLatest最新展平模式

应用flatMapConcat连贯模式

    private fun requestFlow(i: Int) = flow<String> {        emit("$i: First")        delay(500)        emit("$i: Second")    }    @Test    fun testFlatMapConcat() = runBlocking {        val startTime = System.currentTimeMillis()        (1..3).asFlow()            .onEach { delay(100) }            //.map { requestFlow(it) } // 如果用map,则产生一个Flow<Flow<String>>            .flatMapConcat { requestFlow(it) } // 应用flatMapConcat,把flow展平成一维,达到成果            .collect { println("$it at ${System.currentTimeMillis() - startTime} ms") }    }
1: First at 144 ms1: Second at 649 ms2: First at 754 ms2: Second at 1256 ms3: First at 1361 ms3: Second at 1861 msProcess finished with exit code 0

应用flatMapMerge

    @Test    fun testFlatMapMergeConcat() = runBlocking {        val startTime = System.currentTimeMillis()        (1..3).asFlow()            .onEach { delay(100) }            .flatMapMerge { requestFlow(it) }            .collect { println("$it at ${System.currentTimeMillis() - startTime} ms") }    }
1: First at 202 ms2: First at 301 ms3: First at 407 ms1: Second at 708 ms2: Second at 805 ms3: Second at 927 msProcess finished with exit code 0

发射1:First后,delay 500ms,这期间发射2:First,发射3:First;别离把这些数据收集到,而后其余的数据累计发射结束并收集。
在来看flatMapLatest操作符

    private fun requestFlow(i: Int) = flow<String> {        emit("$i: First")        delay(500)        emit("$i: Second")    }        @Test    fun testFlatMapLatestConcat() = runBlocking {        val startTime = System.currentTimeMillis()        (1..3).asFlow()            .onEach { delay(200) }            .flatMapLatest { requestFlow(it) }            .collect { println("$it at ${System.currentTimeMillis() - startTime} ms") }    }
1: First at 313 ms2: First at 581 ms3: First at 786 ms3: Second at 1291 msProcess finished with exit code 0

跳过某些两头值,只收集最新的值。

流的异样解决

当运算符中的发射器或代码抛出异样时,有几种解决异样的办法:

  • try catch块
  • catch函数
    应用代码块捕捉上游异样

      private fun createFlow10() = flow<Int> {      for (i in 1..3) {          println("emitting  $i")          emit(i)      }  }  @Test  fun testException() = runBlocking {      try {          createFlow10().collect {              println("collect: $it")              check(it <= 1) { "wrong value " }          }      } catch (e: Exception) {          println("handle the exception: $e")      }  }
    emitting  1collect: 1emitting  2collect: 2handle the exception: java.lang.IllegalStateException: wrong value Process finished with exit code 0

    应用catch操作符捕捉上游异样

      @Test  fun testException2() = runBlocking {      flow {          emit(1)          1/0          emit(2)      }          .catch { println("$it") }          .flowOn(Dispatchers.IO)          .collect { println("collect: $it") }  }
    java.lang.ArithmeticException: / by zerocollect: 1Process finished with exit code 0

    能够在捕捉异样后补充发射一个数据

      @Test  fun testException2() = runBlocking {      flow {          emit(1)          1/0          emit(2)      }          .catch {              println("$it")              emit(10)          }          .flowOn(Dispatchers.IO)          .collect { println("collect: $it") }  }
    java.lang.ArithmeticException: / by zerocollect: 1collect: 10Process finished with exit code 0

    前面的2当然是收不到的。

    流的实现

    当流收集实现时(一般状况或者异常情况),它可能须要执行一个动作。

  • 命令式finally块
  • onCompletion申明式解决
    finally 块

      private fun createFlow11() = flow<Int> {      emit(1)      1 / 0      emit(2)  }  @Test  fun testComplete() = runBlocking {      try {          createFlow11().collect { println("collect $it") }      } catch (e: Exception) {          println("exception: $e")      } finally {          println("Finally.")      }  }
    collect 1exception: java.lang.ArithmeticException: / by zeroFinally.Process finished with exit code 0
      @Test  fun testComplete2() = runBlocking {      createFlow11()          .onCompletion { println("exception: $it") }          .collect { println("collect $it") }  }
    collect 1exception: java.lang.ArithmeticException: / by zerojava.lang.ArithmeticException: / by zero

    onCompletion外面能接到异样,然而并不能捕捉异样,如果想捕捉还须要catch操作符。