关于kotlin:Kotlin协程Flow浅析

4次阅读

共计 5093 个字符,预计需要花费 13 分钟才能阅读完成。

Kotlin 协程中的 Flow 次要用于解决简单的异步数据,以一种”流“的形式,从上到下顺次解决,和 RxJava 的解决形式类型,然而比后者更加弱小。

Flow 基本概念

Flow 中基本上有三个概念,即 发送方,解决中间层,接管方,能够类比水利发电站中的上游,发电站,上游的概念,数据从上游开始发送”流淌“至两头站被”解决“了一下,又流淌到了上游。

示例代码如下

flow {         // 发送方、上游
    emit(1)    // 挂起函数,发送数据
    emit(2)
    emit(3)
    emit(4)
    emit(5)
}
.filter {it > 2}  // 中转站,解决数据
.map {it * 2}
.take(2)
.collect{           // 接管方,上游
    println(it)
}
输入内容:6
8

通过下面代码咱们能够看到,基于一种链式调用 api 的形式,流式的进行解决数据还是很棒的,接下来具体看一下下面的组成:

  • flow{}, 是个高阶函数,次要用于创立一个新的 Flow。在其 Lambda 函数外部应用了 emit() 挂起函数进行发送数据。
  • filter{}、map{}、take{}, 属于两头解决层,也是两头数据处理的操作符,Flow 最大的劣势,就是它的操作符跟汇合操作符高度一致。只有会用 List、Sequence,那么就能够疾速上手 Flow 的操作符。
  • collect{}, 上游接管方,也成为终止操作符,它的作用其实只有一个:终止 Flow 数据流,并且接管这些数据。

其余创立 Flow 的形式还是 flowOf() 函数,示例代码如下

fun main() = runBlocking{aassssssssaaaaaaaas
    flowOf(1,2,3,4,5).filter {it > 2}
        .map {it * 2}
        .take(2)
        .collect{println("flowof: $it")
    }
}

咱们在看一下 list 汇合的操作示例

listOf(1,2,3,4,5).filter {it > 2}
        .map {it * 2}
        .take(2)
        .forEach{println("listof: $it")
        }

通过以上比照发现,两者的基本操作简直统一,Kotlin 也提供了两者互相转换的 API,Flow.toList()、List.asFlow() 这两个扩大函数,让数据在 List、Flow 之间来回转换,示例代码如下:

//flow 转 list
    flowOf(1,2,3)
        .toList()
        .filter {it > 1}
        .map {it * 2}
        .take(2)
        .forEach{println(it)
        }
    // list 转 flow
    listOf(1,2,3).asFlow()
        .filter {it > 2}
        .map {it * 2}
        .take(2)
        .collect{println(it)
        }

Flow 生命周期

尽管从下面操作看和汇合类型,然而 Flow 还是有些非凡操作符的,毕竟它是协程的一部分,和 Channel 不同,Flow 是有生命周期的,只是以操作符的模式回调而已,比方 onStart、onCompletion 这两个两头操作符。

flowOf(1,2,3,4,5,6)
        .filter {println("filter: $it")
            it > 3
        }
        .map {println("map: $it")
            it * 2
        }
        .take(2)
        .onStart {println("onStart") }
        .collect{println("collect: $it")
        }
输入内容:onStart
filter: 1
filter: 2
filter: 3
filter: 4
map: 4
collect: 8
filter: 5
map: 5
collect: 10

咱们能够看到 onStart,它的作用是注册一个监听事件:当 flow 启动当前,它就会被回调。

和 filter、map、take 这些两头操作符不同,他们的程序会影响数据的处理结果,这也很好了解;onStart 和地位没有关系,它实质上是一个回调,不是一个数据处理的两头站。同样的还有数据处理实现的回调 onCompletion。

flowOf(1,2,3,4,5,6)
        .filter {println("filter: $it")
            it > 3
        }
        .map {println("map: $it")
            it * 2
        }
        .take(2)
        .onStart {println("onStart") }
        .onCompletion {println("onCompletion") }
        .collect{println("collect: $it")
        }

Flow 中 onCompletion{} 在面对以下三种状况时都会进行回调:

  • 1,Flow 失常执行结束
  • 2,Flow 当中出现异常
  • 3,Flow 被勾销。

解决异样

在数据流的处理过程中,很难保障不呈现问题,那么出现异常之后再该怎么解决呢?

  • 对于产生在上游、两头操作这两个阶段的异样,咱们能够间接应用 catch 这个操作符来进行捕捉和进一步解决。
  • 对于产生在上游,应用 try-catch,把 collect{} 当中可能呈现问题的代码包裹起来进行捕捉解决。

上游或者两头异样应用 catch

fun main() = runBlocking{
    val flow = flow {emit(1)
        emit(2)
        throw IllegalStateException()
        emit(3)
    }

    flow.map {it * 2}
        .catch {println("catch: $it") }
        .collect{println("collect: $it")
        }
}
输入:collect: 2
collect: 4
catch: java.lang.IllegalStateException

catch 这个操作符的作用是和它的地位强相干的,catch 的作用域,仅限于 catch 的上游。换句话说,产生在 catch 上游的异样,才会被捕捉,产生在 catch 上游的异样,则不会被捕捉。

val flow = flow {emit(1)
        emit(2)
        throw IllegalStateException()
        emit(3)
    }

    flow.map {it * 2}
        .catch {println("catch: $it") }
        .filter {it / 0 > 1} // catch 之后产生异样
        .collect{println("collect: $it")
    }
输入内容:
Exception in thread "main" java.lang.ArithmeticException: / by zero

上游应用 try-catch

flowOf(1,2,3)
        .onCompletion {println("onCompletion $it") }
        .collect{
            try {println("collect: $it")
                throw IllegalStateException();}catch (e: Exception){println("catch $e")
            }
        }
输入:collect: 1
catch java.lang.IllegalStateException
collect: 2
catch java.lang.IllegalStateException
collect: 3
catch java.lang.IllegalStateException
onCompletion null

切换执行线程

Flow 适宜解决简单的异步工作,大多数状况下耗时工作放在子线程或线程池中解决,对于 UI 工作放在主线程中进行。

在 Flow 中能够应用 flowOn 操作符实现上述场景中的线程切换。

flowOf(1,2,3,4,5)
        .filter {logX("filter: $it")
            it > 2 }
        .flowOn(Dispatchers.IO) // 切换线程
        .collect{logX("collect: $it")
        }
输入内容:================================
filter: 1
Thread:DefaultDispatcher-worker-1
================================
================================
filter: 2
Thread:DefaultDispatcher-worker-1
================================
================================
filter: 3
Thread:DefaultDispatcher-worker-1
================================
================================
filter: 4
Thread:DefaultDispatcher-worker-1
================================
================================
filter: 5
Thread:DefaultDispatcher-worker-1
================================
================================
collect: 3
Thread:main
================================
================================
collect: 4
Thread:main
================================
================================
collect: 5
Thread:main
================================

flowOn 操作符也是和它的地位强相干的。作用域限于它的上游。在下面的代码中,flowOn 的上游,就是 flowOf{}、filter{} 当中的代码,所以,它们的代码全都运行在 DefaultDispatcher 这个线程池当中。只有 collect{} 当中的代码是运行在 main 线程当中的。

终止操作符

Flow 外面,最常见的终止操作符就是 collect。除此之外,还有一些从汇合中借鉴过去的操作符,也是 Flow 的终止操作符。比方 first()、single()、fold{}、reduce{},实质上来说说当咱们尝试将 Flow 转换成汇合的时候,曾经不属于 Flow 的 API,也不属于协程的领域了,它自身也就意味着 Flow 数据流的终止。

“ 冷的数据流 ” 从何而来

在下面文章《Kotlin 协程 Channel 浅析》中,咱们意识到 Channel 是”热数据流“,随时筹备好,随用随取,就像海底捞里的服务员。

当初咱们看下 Flow 和 Channel 的区别

val flow = flow {(1..4).forEach{println("Flow 发送前:$it")
            emit(it)
            println("Flow 发送后:$it")
        }
    }

    val channel: ReceiveChannel<Int> = produce {(1..4).forEach{println("Channel 发送前:$it")
            send(it)
            println("Channel 发送后:$it")
        }
    }
    
输入内容:Channel 发送前:1

Flow 中的逻辑并未执行,因而咱们能够这样类比,Channel 之所以被认为是“热”的起因,是因为不论有没有接管方,发送方都会工作。那么对应的,Flow 被认为是“冷”的起因,就是因为只有调用终止操作符之后,Flow 才会开始工作。

除此之外,Flow 一次解决一条数据,是个”懒家伙“。

    val flow = flow {(3..6).forEach {println("Flow 发送前:$it")
            emit(it)
            println("Flow 发送后:$it")
        }
    }.filter {println("filter: $it")
        it > 3
    }.map {println("map: $it")
        it * 2
    }.collect {println("后果 collect: $it")
    }
输入内容:Flow 发送前:3
filter: 3
Flow 发送后:3
Flow 发送前:4
filter: 4
map: 4
后果 collect: 8
Flow 发送后:4
Flow 发送前:5
filter: 5
map: 5
后果 collect: 10
Flow 发送后:5
Flow 发送前:6
filter: 6
map: 6
后果 collect: 12
Flow 发送后:6

相比于满面春风,热情服务的 Channel,Flow 更像个冷酷的家伙,你不找他,他不搭理你。

  • Channel, 响应速度快,但数据可能是旧的,占用资源
  • Flow, 响应速度慢,但数据是最新的,节俭资源

Flow 也能够是”热“的,你晓得吗?

正文完
 0