关于kotlin:Kotlin之Flow实战

14次阅读

共计 14999 个字符,预计需要花费 38 分钟才能阅读完成。

Flow 异步流

  • 意识

    • 个性
    • 构建器和上下文
    • 启动
    • 勾销与勾销检测
    • 缓冲
  • 操作符

    • 过渡操作符
    • 末端操作符
    • 组合
    • 展平
  • 异样

    • 异样解决
    • 实现

如何示意多个值?
挂起函数能够异步的返回单个值,然而如何异步返回多个计算好的值呢?

计划

  • 汇合
  • 序列
  • 挂起函数
  • Flow
    用汇合,返回多个值,但不是异步的。

    private fun createList() = listOf<Int>(1, 2, 3)
    
    @Test
    fun test_list() {createList().forEach {println(it) }
    }

    用序列,返回一个整数序列

    private fun createSequence(): Sequence<Int> {
      return sequence {for (i in 1..3) {Thread.sleep(1000) // 伪装在计算,此处是阻塞,不能做其余事件了
              // delay(1000) 这里不能用挂起函数
              yield(i)
          }
      }
    }
    
    @Test
    fun test_sequence() {createSequence().forEach {println(it) }
    }

    看下源码

    public fun <T> sequence(@BuilderInference block: suspend SequenceScope<T>.() -> Unit): Sequence<T> = Sequence {iterator(block) }

    传入的是一个 SequenceScope 的扩大函数。

    @RestrictsSuspension
    @SinceKotlin("1.3")
    public abstract class SequenceScope<in T> internal constructor()

    而 RestrictsSuspension 限度只能应用外面提供的已有的挂起函数,如 yield,yieldAll 等。
    createSequence 返回了多个值,然而也是同步的。

    // 返回多个值,异步
    private suspend fun createList2(): List<Int> {delay(5000)
      return listOf<Int>(1, 2, 3)
    }
    
    @Test
    fun test_list2() = runBlocking<Unit> {createList().forEach {println(it) }
    }

    能够应用 suspend 函数返回多个值,是异步,然而是是一次性返回了多个值,是否像流一样返回多个值并放弃异步呢?Flow 能够解决这个问题。

    private suspend fun createFlow(): Flow<Int> = flow {for (i in 1..3) {delay(1000)
          emit(i) // 发射,产生一个元素
      }
    }
    
    @Test
    fun test_flow() = runBlocking<Unit> {createFlow().collect {println(it) } // collect 是一个末端操作符,前面讲
    }

    每隔 1 秒钟产生一个元素,这里是挂起的。用例子来证实一下:

      private suspend fun createFlow(): Flow<Int> = flow {for (i in 1..3) {delay(1000)
              emit(i)
          }
      }
      @Test
      fun test_flow2() = runBlocking<Unit> {
          launch {for (i in 1..3) {println("I am running and not blocked $i")
                  delay(1500)
              }
          }
          createFlow().collect { println(it) }
      }

    输入

    I am running and not blocked 1
    1
    I am running and not blocked 2
    2
    I am running and not blocked 3
    3
    
    Process finished with exit code 0

    collect 收集后果的过程并没有阻塞另外的协程,打印完 1,而后在 delay 挂起时,去执行其余,并没有阻塞,两个工作来回切换执行。
    Flow 真正地做到了返回多个值,并且是异步的。

Flow 与其余形式的区别

  • 名为 flow 的 Flow 类型的构建器函数
  • flow{…}构建块中的代码能够挂起
  • 函数 createFlow()不再标有 suspend 修饰符, 下面代码中的 suspend 修饰符能够去掉
  • 流应用 emit 函数发射值
  • 流应用 collect 函数收集值

    Flow 利用

    在 android 中,文件下载是 Flow 的一个十分典型的利用。

冷流

Flow 是一种相似于序列的冷流,flow 构建器中的代码直到流被收集的时候才运行。

private fun createFlow2() = flow<Int> {println("Flow started.")
    for (i in 1..3) {delay(1000)
        emit(i)
    }
}
@Test
fun test_flow_cold() = runBlocking<Unit> {val flow = createFlow2()
    println("calling collect...")
    flow.collect {value -> println(value) }
    println("calling collect again...")
    flow.collect {value -> println(value) }
}
calling collect...
Flow started.
1
2
3
calling collect again...
Flow started.
1
2
3

Process finished with exit code 0

能够看到,当调用 collect 办法的时候,流才开始运行,并且能够屡次调用。

流的连续性

  • 流的每次独自收集都是按程序执行的,除非应用非凡操作符。
  • 从上游到上游,每个过渡操作符都会解决每个发射出的值,而后再交给末端操作符。

    @Test
    fun test_flow_continuation() = runBlocking<Unit> {(1..5).asFlow()
          .filter {it % 2 == 0}
          .map {"string $it"}
          .collect {println("collect $it") }
    }
    collect string 2
    collect string 4
    
    Process finished with exit code 0

    改例子通过了如下步骤:生成一个流,过滤出偶数,转成字符串,开始收集

    流的构建器

  • flowOf 构建器定义了一个发射固定值集的流。
  • 应用.asFlow()扩大函数,能够将各种汇合与序列转换为流。

    @Test
    fun test_flow_builder() = runBlocking<Unit> {
      // flowOf 构建器
      flowOf("one", "two", "three")
          .onEach {delay(1000) }
          .collect {value -> println(value) }
      // asFlow 扩大函数
      (1..3).asFlow().collect { value -> println(value) }
    }
    one
    two
    three
    1
    2
    3
    
    Process finished with exit code 0

    流的上下文

  • 流的收集总是在调用协程的上下文中产生,流的该属性成为上下文保留。
  • flow{…}构建器中的代码必须遵循上下文保留属性,并且不容许从其余上下文中发射。
  • flowOn 操作符,该函数用于更改流发射的上下文。

    private fun createFlow3() = flow<Int> {println("Flow started ${Thread.currentThread()}")
      for (i in 1..3) {delay(1000)
          emit(i)
      }
    }
    
    @Test
    fun test_flow_context() = runBlocking<Unit> {createFlow3()
          .collect {println("$it, thread: ${Thread.currentThread()}")
          }
    }
    Flow started Thread[main @coroutine#1,5,main]
    1, thread: Thread[main @coroutine#1,5,main]
    2, thread: Thread[main @coroutine#1,5,main]
    3, thread: Thread[main @coroutine#1,5,main]
    
    Process finished with exit code 0

    不做线程切换,收集和构建都在同一上下文,运行的线程是一样的。
    试着更改一下线程,如下

    private fun createFlow4() = flow {withContext(Dispatchers.IO) { // 用 io 线程
          println("Flow started ${Thread.currentThread().name}")
          for (i in 1..3) {delay(1000)
              emit(i)
          }
      }
    }
    
    @Test
    fun test_flow_on() = runBlocking {createFlow4().collect {println("collect $it, ${Thread.currentThread()}")
      }
    }
    Flow started DefaultDispatcher-worker-1 @coroutine#1
    
    java.lang.IllegalStateException: Flow invariant is violated:
          Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@4600ac86, BlockingEventLoop@1e1d1f06],
          but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@79c0c2ed, Dispatchers.IO].
          Please refer to 'flow' documentation or use 'flowOn' instead

    能够看到,构建流在 IO 线程中执行, 但在收集流的时候报错了, 不容许这样做, 倡议应用 flowOn.
    正确的做法如下:

    private fun createFlow5() = flow {println("Flow started ${Thread.currentThread().name}")
      for (i in 1..3) {delay(1000)
          emit(i)
      }
    }.flowOn(Dispatchers.IO)
    
    @Test
    fun test_flow_on2() = runBlocking {createFlow5().collect {println("collect $it, ${Thread.currentThread().name}")
      }
    }
    Flow started DefaultDispatcher-worker-1 @coroutine#2
    collect 1, main @coroutine#1
    collect 2, main @coroutine#1
    collect 3, main @coroutine#1
    
    Process finished with exit code 0

    流在 IO 线程中构建和发射,在 main 线程中收集。

    启动流

  • 应用 launchIn 替换 collect,能够在独自的协程中启动流的收集。

      private fun events() = (1..3).asFlow()
          .onEach {delay(1000)
              println("$it, ${Thread.currentThread().name}")
          }.flowOn(Dispatchers.Default)
    
    
      @Test
      fun testFlowLaunch() = runBlocking<Unit> {events()
              .onEach {e -> println("Event: $e ${Thread.currentThread().name}") }
              //.collect()
              .launchIn(CoroutineScope(Dispatchers.IO))
              .join()}
    1, DefaultDispatcher-worker-3 @coroutine#3
    Event: 1 DefaultDispatcher-worker-1 @coroutine#2
    2, DefaultDispatcher-worker-1 @coroutine#3
    Event: 2 DefaultDispatcher-worker-1 @coroutine#2
    3, DefaultDispatcher-worker-1 @coroutine#3
    Event: 3 DefaultDispatcher-worker-2 @coroutine#2
    
    Process finished with exit code 0

    onEach 是过渡操作符,并不会触发收集数据,collect 是末端操作符,能力触发收集数据。过渡操作符就像是过滤器,末端操作符就像是水龙头的阀门,不关上阀门水就流不进去,无论两头加了多少个过滤安装。
    如果想指定在哪个协程外面收集数据,就能够用末端操作符 launchIn(), 能够传递一个作用域进去,而作用域又能够指定调度器,launchIn(CoroutineScope(Dispatchers.IO)).

launchIn 返回的是一个 job 对象,能够进行 cancel 等操作。例如

@Test
fun testFlowLaunch2() = runBlocking<Unit> {val job = events()
        .onEach {e -> println("Event: $e ${Thread.currentThread().name}") }
        .launchIn(CoroutineScope(Dispatchers.IO))
    delay(2000)
    job.cancel()}
1, DefaultDispatcher-worker-1 @coroutine#3
Event: 1 DefaultDispatcher-worker-3 @coroutine#2

Process finished with exit code 0

如上,只收集了一个数字,job 就勾销了。
其实 runBlockint 自身就是一个主线程作用域,能够放到 launchIn 中,如下

@Test
fun testFlowLaunch3() = runBlocking<Unit> {val job = events()
        .onEach {e -> println("Event: $e ${Thread.currentThread().name}") }
        .launchIn(this)
}
1, DefaultDispatcher-worker-1 @coroutine#3
Event: 1 main @coroutine#2
2, DefaultDispatcher-worker-1 @coroutine#3
Event: 2 main @coroutine#2
3, DefaultDispatcher-worker-1 @coroutine#3
Event: 3 main @coroutine#2

Process finished with exit code 0

流的勾销

  • 流采纳与协程同样的合作勾销。流的收集能够是当流在一个可勾销的挂起函数(如 delay)中挂起的时候勾销。

      private fun createFlow6() = flow<Int> {for (i in 1..3) {delay(1000)
              println("emitting $i")
              emit(i)
          }
      }
    
      @Test
      fun testCancelFlow() = runBlocking<Unit> {withTimeoutOrNull(2500) {createFlow6().collect {println("collect: $it")
              }
          }
          println("Done.")
      }
    emitting 1
    collect: 1
    emitting 2
    collect: 2
    Done.
    
    Process finished with exit code 0

    设置 2.5 秒超时,流还没发射 3,就超时了,流被勾销。

    流的勾销检测

  • 不便起见,流构建器对每个发射值执行附加的 ensureActive 检测以进行勾销,这意味着从 flow{…}收回的忙碌循环是能够勾销的。
  • 处于性能起因,大多数其余流操作不会自行执行其余勾销检测,在协程处于忙碌循环的状况下,必须明确检测是否勾销。
  • 通过 cancellable 操作符来执行操作。

      private fun createFlow7() = flow<Int> {for (i in 1..5) {delay(1000)
              println("emitting $i")
              emit(i)
          }
      }
    
      @Test
      fun testCancelFlowCheck() = runBlocking<Unit> {createFlow7().collect {if (it == 3) cancel()
              println("collect: $it")
          }
          println("Done.")
      }
    emitting 1
    collect: 1
    emitting 2
    collect: 2
    emitting 3
    collect: 3
    
    kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled
    ...
    
    Process finished with exit code 255

    在收集流的时候,遇到 3 就执行勾销操作,抛出 JobCancellationException,3 还是会被收集到。

      @Test
      fun testCancelFlowCheck2() = runBlocking<Unit> {(1..5).asFlow().collect {if (it == 3) cancel()
              println("collect: $it")
          }
          println("Done.")
      }
    collect: 1
    collect: 2
    collect: 3
    collect: 4
    collect: 5
    Done.
    
    kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled

    应用 asFlow()创立流,在收集的时候,尽管遇到 3 进行了勾销,然而还是把所有的元素都打印了当前才抛出异样。如果要在执行的过程中真正的阻断流,须要加上 cancellable()操作,如下:

      @Test
      fun testCancelFlowCheck3() = runBlocking<Unit> {(1..5).asFlow().cancellable().collect {if (it == 3) cancel()
              println("collect: $it")
          }
          println("Done.")
      }
    collect: 1
    collect: 2
    collect: 3
    
    kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled

    背压

  • buffer(), 并发运行流中发射元素的代码。
  • conflate(), 合并发射项,不对每个值进行解决。
  • collectLatest(), 勾销并从新发射最初一个值。
  • 当必须更改 CoroutineDispatcher 时,flowOn 操作符应用了雷同的缓冲机制,然而 buffer 函数显示地申请缓冲而不扭转执行上下文。
    private fun createFlow8() = flow<Int> {for (i in 1..5) {delay(100) // 生产一个元素须要 0.1 秒
            println("emitting $i")
            emit(i)
        }
    }

    @Test
    fun testBackPressure() = runBlocking<Unit> {
        val time = measureTimeMillis {createFlow8()
                .buffer(50) // 并发运行流中发射元素
                .collect {delay(200) // 生产一个元素须要 0.2 秒
                    println("collect: $it")
                }
        }

        println("Done, total $time")
    }
emitting 1
emitting 2
collect: 1
emitting 3
emitting 4
collect: 2
emitting 5
collect: 3
collect: 4
collect: 5
Done, total 1188

应用 buffer 能够让发射元素并发执行,提高效率。
应用 flowOn()切换线程,也能够提高效率。

    private fun createFlow8() = flow<Int> {for (i in 1..5) {delay(100) // 生产一个元素须要 0.1 秒
            println("emitting $i, ${Thread.currentThread().name}")
            emit(i)
        }
    }

    @Test
    fun testBackPressure2() = runBlocking<Unit> {
        val time = measureTimeMillis {createFlow8()
                .flowOn(Dispatchers.Default)
                .collect {delay(200) // 生产一个元素须要 0.2 秒
                    println("collect: $it, ${Thread.currentThread().name}")
                }
        }

        println("Done, total $time")
    }
emitting 1, DefaultDispatcher-worker-1 @coroutine#2
emitting 2, DefaultDispatcher-worker-1 @coroutine#2
emitting 3, DefaultDispatcher-worker-1 @coroutine#2
collect: 1, main @coroutine#1
emitting 4, DefaultDispatcher-worker-1 @coroutine#2
collect: 2, main @coroutine#1
emitting 5, DefaultDispatcher-worker-1 @coroutine#2
collect: 3, main @coroutine#1
collect: 4, main @coroutine#1
collect: 5, main @coroutine#1
Done, total 1186

conflate()能够合并发射项,然而不会对每个值进行解决。

    @Test
    fun testBackPressure3() = runBlocking<Unit> {
        val time = measureTimeMillis {createFlow8()
                .conflate()
                .collect {delay(200) // 生产一个元素须要 0.2 秒
                    println("collect: $it, ${Thread.currentThread().name}")
                }
        }

        println("Done, total $time")
    }
emitting 1, main @coroutine#2
emitting 2, main @coroutine#2
emitting 3, main @coroutine#2
collect: 1, main @coroutine#1
emitting 4, main @coroutine#2
collect: 3, main @coroutine#1
emitting 5, main @coroutine#2
collect: 4, main @coroutine#1
collect: 5, main @coroutine#1
Done, total 1016

下面的例子中,应用 conflate(),collect 时跳过了 2.
应用 collectLatest()只会收集最初一个值,如下:

    @Test
    fun testBackPressure4() = runBlocking<Unit> {
        val time = measureTimeMillis {createFlow8()
                .collectLatest {delay(200) // 生产一个元素须要 0.2 秒
                    println("collect: $it, ${Thread.currentThread().name}")
                }
        }

        println("Done, total $time")
    }
emitting 1, main @coroutine#2
emitting 2, main @coroutine#2
emitting 3, main @coroutine#2
emitting 4, main @coroutine#2
emitting 5, main @coroutine#2
collect: 5, main @coroutine#7
Done, total 913

Process finished with exit code 0

操作符

过渡流操作符

  • 能够应用操作符转换流,就像应用汇合与序列一样。
  • 过渡操作符利用于上游流,并返回上游流。
  • 这些操作符也是冷操作符,就像流一样。这类操作符自身不是挂起函数。
  • 运行速度很快,返回新的转换流的定义。

      private fun createFlow9() = flow<Int> {for (i in 1..3) {delay(100) // 生产一个元素须要 0.1 秒
              println("emitting $i")
              emit(i)
          }
      }
    
      @Test
      fun testMap() = runBlocking<Unit> {createFlow9()
              .map {data -> performRequest(data) }
              .collect {println("collect: $it")
              }
      }
    emitting 1
    collect: --response 1--
    emitting 2
    collect: --response 2--
    emitting 3
    collect: --response 3--
    
    Process finished with exit code 0

    下面的例子,map 操作符,把 Int 流转成了 String 流。

      @Test
      fun testTransform() = runBlocking<Unit> {createFlow9()
              .transform { data ->
                  emit("making request $data")
                  emit(performRequest(data))
              }
              .collect {println("collect: $it")
              }
      }
    emitting 1
    collect: making request 1
    collect: --response 1--
    emitting 2
    collect: making request 2
    collect: --response 2--
    emitting 3
    collect: making request 3
    collect: --response 3--
    
    Process finished with exit code 0

    下面的例子,transform 操作符能够把流通过屡次转换,屡次发射。

    限长操作符

    take 操作符

      private fun numbers() = flow<Int> {
          try {emit(1)
              emit(2)
              println("This line will not execute")
              emit(3)
          } finally {println("Finally.")
          }
      }
    
      @Test
      fun testLimitOperator() = runBlocking {numbers().take(2).collect {println("collect $it")
          }
      }
    collect 1
    collect 2
    Finally.

    take 传入参数 2,则只取 2 个数据。

    末端流操作符

    末端操作符是在流上用于启动流收集的挂起函数。collect 是最根底的末端操作符,然而还有一些更不便的末端操作符:

  • 转化为各种汇合,例如 toList 和 toSet.
  • 获取第一个 (first) 值与确保流发射单个 (single) 值的操作符。
  • 应用 reduce 与 fold 将流规约到单个值。
    例如 reduce 操作符

      @Test
      fun testTerminateOperator() = runBlocking {val sum = (1..5).asFlow().map { it * it}.reduce {a, b -> a + b}
          println(sum)
      }
    55

    计算数字 1 - 5 的平方,而后求和,失去 55.

    组合多个流

    就像 Kotlin 规范库中的 Sequence.zip 扩大函数一样,流领有一个 zip 操作符用于组合两个流中的相干值。

      @Test
      fun testZip() = runBlocking {val numbers = (1..3).asFlow()
          val strings = flowOf("One", "Two", "Three")
          numbers.zip(strings) {a, b -> "$a -> $b"}.collect {println(it) }
      }

    这个例子把数字流和字符串流用 zip 操作符组合起来,成为一个字符流。

1 -> One
2 -> Two
3 -> Three

Process finished with exit code 0
    @Test
    fun testZip2() = runBlocking {val numbers = (1..3).asFlow().onEach { delay(300) }
        val strings = flowOf("One", "Two", "Three").onEach {delay(500) }
        val start = System.currentTimeMillis()
        numbers.zip(strings) {a, b -> "$a -> $b"}.collect {println("$it, ${System.currentTimeMillis() - start}") }
    }

如果两个流各自有 delay,合并操作会期待那个 delay 工夫较长的数据。

1 -> One, 563
2 -> Two, 1065
3 -> Three, 1569

Process finished with exit code 0

展平流

流示意异步接管的值序列,所以很容易遇到这样状况:每个值都会触发对另一个值序列的申请,然而,因为流具备异步的性质,因而须要不同的展平模式,为此,存在一系列的流展平操作符:

  • flatMapConcat 连贯模式
  • flatMapMerge 合并模式
  • flatMapLatest 最新展平模式

应用 flatMapConcat 连贯模式

    private fun requestFlow(i: Int) = flow<String> {emit("$i: First")
        delay(500)
        emit("$i: Second")
    }

    @Test
    fun testFlatMapConcat() = runBlocking {val startTime = System.currentTimeMillis()
        (1..3).asFlow()
            .onEach {delay(100) }
            //.map {requestFlow(it) } // 如果用 map,则产生一个 Flow<Flow<String>>
            .flatMapConcat {requestFlow(it) } // 应用 flatMapConcat,把 flow 展平成一维,达到成果
            .collect {println("$it at ${System.currentTimeMillis() - startTime} ms") }
    }
1: First at 144 ms
1: Second at 649 ms
2: First at 754 ms
2: Second at 1256 ms
3: First at 1361 ms
3: Second at 1861 ms

Process finished with exit code 0

应用 flatMapMerge

    @Test
    fun testFlatMapMergeConcat() = runBlocking {val startTime = System.currentTimeMillis()
        (1..3).asFlow()
            .onEach {delay(100) }
            .flatMapMerge {requestFlow(it) }
            .collect {println("$it at ${System.currentTimeMillis() - startTime} ms") }
    }
1: First at 202 ms
2: First at 301 ms
3: First at 407 ms
1: Second at 708 ms
2: Second at 805 ms
3: Second at 927 ms

Process finished with exit code 0

发射 1:First 后,delay 500ms,这期间发射 2:First,发射 3:First;别离把这些数据收集到,而后其余的数据累计发射结束并收集。
在来看 flatMapLatest 操作符

    private fun requestFlow(i: Int) = flow<String> {emit("$i: First")
        delay(500)
        emit("$i: Second")
    }
    
    @Test
    fun testFlatMapLatestConcat() = runBlocking {val startTime = System.currentTimeMillis()
        (1..3).asFlow()
            .onEach {delay(200) }
            .flatMapLatest {requestFlow(it) }
            .collect {println("$it at ${System.currentTimeMillis() - startTime} ms") }
    }
1: First at 313 ms
2: First at 581 ms
3: First at 786 ms
3: Second at 1291 ms

Process finished with exit code 0

跳过某些两头值,只收集最新的值。

流的异样解决

当运算符中的发射器或代码抛出异样时,有几种解决异样的办法:

  • try catch 块
  • catch 函数
    应用代码块捕捉上游异样

      private fun createFlow10() = flow<Int> {for (i in 1..3) {println("emitting  $i")
              emit(i)
          }
      }
    
      @Test
      fun testException() = runBlocking {
          try {createFlow10().collect {println("collect: $it")
                  check(it <= 1) {"wrong value"}
              }
          } catch (e: Exception) {println("handle the exception: $e")
          }
      }
    emitting  1
    collect: 1
    emitting  2
    collect: 2
    handle the exception: java.lang.IllegalStateException: wrong value 
    
    Process finished with exit code 0

    应用 catch 操作符捕捉上游异样

      @Test
      fun testException2() = runBlocking {
          flow {emit(1)
              1/0
              emit(2)
          }
              .catch {println("$it") }
              .flowOn(Dispatchers.IO)
              .collect {println("collect: $it") }
      }
    java.lang.ArithmeticException: / by zero
    collect: 1
    
    Process finished with exit code 0

    能够在捕捉异样后补充发射一个数据

      @Test
      fun testException2() = runBlocking {
          flow {emit(1)
              1/0
              emit(2)
          }
              .catch {println("$it")
                  emit(10)
              }
              .flowOn(Dispatchers.IO)
              .collect {println("collect: $it") }
      }
    java.lang.ArithmeticException: / by zero
    collect: 1
    collect: 10
    
    Process finished with exit code 0

    前面的 2 当然是收不到的。

    流的实现

    当流收集实现时(一般状况或者异常情况),它可能须要执行一个动作。

  • 命令式 finally 块
  • onCompletion 申明式解决
    finally 块

      private fun createFlow11() = flow<Int> {emit(1)
          1 / 0
          emit(2)
      }
    
      @Test
      fun testComplete() = runBlocking {
          try {createFlow11().collect {println("collect $it") }
          } catch (e: Exception) {println("exception: $e")
          } finally {println("Finally.")
          }
      }
    
    collect 1
    exception: java.lang.ArithmeticException: / by zero
    Finally.
    
    Process finished with exit code 0
      @Test
      fun testComplete2() = runBlocking {createFlow11()
              .onCompletion {println("exception: $it") }
              .collect {println("collect $it") }
      }
    collect 1
    exception: java.lang.ArithmeticException: / by zero
    
    java.lang.ArithmeticException: / by zero

    onCompletion 外面能接到异样,然而并不能捕捉异样,如果想捕捉还须要 catch 操作符。

正文完
 0