乐趣区

关于android:有小伙伴说看不懂-LiveDataFlowChannel跟我走

你的反对对我意义重大!

🔥 Hi,我是小彭。本文已收录到 GitHub · Android-NoteBook 中。这里有 Android 进阶成长路线笔记 & 博客,有气味相投的敌人,欢送跟着我一起成长。(联系方式 & 入群形式在 GitHub)

背景

  • Kotlin Flow 是基于 Kotlin 协程根底能力搭建的一套数据流框架,从性能复杂性上看是介于 LiveData 和 RxJava 之间的解决方案。Kotlin Flow 领有比 LiveData 更丰盛的能力,但裁剪了 RxJava 大量简单的操作符,做得更加精简。并且在 Kotlin 协程的加持下,Kotlin Flow 目前是 Google 主推的数据流框架。

1. 为什么要应用 Flow?

LiveData、Kotlin Flow 和 RxJava 三者都属于 可察看的数据容器类,观察者模式是它们雷同的根本设计模式,那么绝对于其余两者,Kotlin Flow 的劣势是什么呢?

LiveData 是 androidx 包下的组件,是 Android 生态中一个的简略的生命周期感知型容器。简略即是它的劣势,也是它的局限,当然这些局限性不应该算 LiveData 的毛病,因为 LiveData 的设计初衷就是一个简略的数据容器。对于简略的数据流场景,应用 LiveData 齐全没有问题。

  • LiveData 只能在主线程更新数据: 只能在主线程 setValue,即便 postValue 外部也是切换到主线程执行;
  • LiveData 数据重放问题: 注册新的订阅者,会从新收到 LiveData 存储的数据,这在有些状况下不合乎预期(能够应用自定义的 LiveData 子类 SingleLiveData 或 UnPeekLiveData 解决,此处不开展);
  • LiveData 不防抖: 反复 setValue 雷同的值,订阅者会收到屡次 onChanged() 回调(能够应用 distinctUntilChanged() 解决,此处不开展);
  • LiveData 不反对背压: 在数据生产速度 > 数据生产速度时,LiveData 无奈失常解决。比方在子线程大量 postValue 数据但主线程生产跟不上时,两头就会有一部分数据被疏忽。

RxJava 是第三方组织 ReactiveX 开发的组件,Rx 是一个包含 Java、Go 等语言在内的多语言数据流框架。功能强大是它的劣势,反对大量丰盛的操作符,也反对线程切换和背压。然而 Rx 的学习门槛过高,对开发反而是一种新的累赘,也会带来误用的危险。

Kotlin 是 kotlinx 包下的组件,不是单纯 Android 生态下的产物。那么,Flow 的劣势在哪里呢?

  • Flow 反对协程: Flow 基于协程根底能力,可能以结构化并发的形式生产和生产数据,可能实现线程切换(依附协程的 Dispatcher);
  • Flow 反对背压: Flow 的子类 SharedFlow 反对配置缓存容量,能够应答数据生产速度 > 数据生产速度的状况;
  • Flow 反对数据重放配置: Flow 的子类 SharedFlow 反对配置重放 replay,可能自定义对新订阅者重放数据的配置;
  • Flow 绝对 RxJava 的学习门槛更低: Flow 的性能更精简,学习性价比绝对更高。不过 Flow 是基于协程,在协程会有一些学习老本,但这个应该拆分来看。

当然 Kotlin Flow 也存在一些局限:

  • Flow 不是生命周期感知型组件: Flow 不是 Android 生态下的产物,天然 Flow 是不会关怀组件生命周期。那么咱们如何确保订阅者在监听 Flow 数据流时,不会在谬误的状态更新 View 呢?这个问题在下文 第 6 节 再说。

2. 冷数据流与热数据流

Kotlin Flow 蕴含三个实体:数据生产方 –(可选的)中介者 – 数据应用方。数据生产方负责向数据流发射(emit)数据,而数据应用方从数据流中生产数据。依据生产方产生数据的机会,能够将 Kotlin Flow 分为冷流和热流两种:

  • 一般 Flow(冷流): 冷流是不共享的,也没有缓存机制。冷流只有在订阅者 collect 数据时,才按需执行发射数据流的代码。冷流和订阅者是一对一的关系,多个订阅者间的数据流是互相独立的,一旦订阅者进行监听或者生产代码完结,数据流就主动敞开。
  • SharedFlow / StateFlow(热流): 热流是共享的,有缓存机制的。无论是否有订阅者 collect 数据,都能够生产数据并且缓存起来。热流和订阅者是一对多的关系,多个订阅者能够共享同一个数据流。当一个订阅者进行监听时,数据流不会主动敞开(除非应用 WhileSubscribed 策略,这个在下文再说)。

3. 一般 Flow(冷流)

一般 Flow 是冷流,数据是不共享的,也没有缓存机制。数据源会提早到消费者开始监听时才生产数据(如终端操作 collect{}),并且每次订阅都会创立一个全新的数据流。 一旦消费者进行监听或者生产者代码完结,Flow 会主动敞开。

val coldFlow: Flow<Int> = flow {
    // 生产者代码
    while(true) {
        // 执行计算
        emit(result)
        delay(100)
    }
    // 生产者代码完结,流将被敞开
}.collect{data ->}

冷流 Flow 次要的操作如下:

  • 创立数据流 flow{}: Flow 结构器会创立一个新的数据流。flow{} 是 suspend 函数,须要在协程中执行;
  • 发送数据 emit(): emit() 将一个新的值发送到数据流中;
  • 终端操作 collect{}: 触发数据流生产,能够获取数据流中所有的收回值。Flow 是冷流,数据流会提早到终端操作 collect 才执行,并且每次在 Flow 上反复调用 collect,都会反复执行 flow{} 去触发发送数据动作(源码地位:AbstractFlow)。collect 是 suspend 函数,须要在协程中执行。
  • 异样捕捉 catch{}: catch{} 会捕捉数据流中产生的异样;
  • 协程上下文切换 flowOn(): 更改上流数据操作的协程上下文 CoroutineContext,对上流操作没有影响。如果有多个 flowOn 运算符,每个 flowOn 只会更改以后地位的上游数据流;
  • 状态回调 onStart: 在数据开始发送之前触发,在数据生产线程回调;
  • 状态回调 onCompletion: 在数据发送完结之后触发,在数据生产线程回调;
  • 状态回调 onEmpty: 在数据流为空时触发(在数据发送完结但事实上没有发送任何数据时),在数据生产线程回调。

一般 Flow 的外围代码在 AbstractFlow 中,能够看到每次调用终端操作 collect,collector 代码块都会执行一次,也就是从新执行一次数据生产代码:

AbstractFlow.kt

public abstract class AbstractFlow<T> : Flow<T> {

    @InternalCoroutinesApi
    public final override suspend fun collect(collector: FlowCollector<T>) {// 1. 对 flow{} 的包装
        val safeCollector = SafeCollector(collector, coroutineContext)
        try {// 2. 执行 flow{} 代码块
            collectSafely(safeCollector)
        } finally {
            // 3. 开释协程相干的参数
            safeCollector.releaseIntercepted()}
    }

    public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}

private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {override suspend fun collectSafely(collector: FlowCollector<T>) {collector.block()
    }
}

4. SharedFlow —— 高配版 LiveData

下文要讲的 StateFlow 其实是 SharedFlow 的一个子类,所以咱们先讲 SharedFlow。SharedFlow 和 StateFlow 都属于热流,无论是否有订阅者(collect),都能够生产数据并且缓存。 它们都有一个可变的版本 MutableSharedFlow 和 MutableStateFlow,这与 LiveData 和 MutableLiveData 相似,对外裸露接口时,应该应用不可变的版本。

4.1 SharedFlow 与 MutableSharedFlow 接口

间接对着接口讲不明确,这里先放出这两个接口不便查看:

public interface SharedFlow<out T> : Flow<T> {
    // 缓存的重放数据的快照
    public val replayCache: List<T>
}

public interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
    
    // 发射数据(留神这是个挂起函数)override suspend fun emit(value: T)

    // 尝试发射数据(如果缓存溢出策略是 SUSPEND,则溢出时不会挂起而是返回 false)public fun tryEmit(value: T): Boolean

    // 沉闷订阅者数量
    public val subscriptionCount: StateFlow<Int>

    // 重置重放缓存,新订阅者只会收到注册后新发射的数据
    public fun resetReplayCache()}

4.2 结构一个 SharedFlow

我会把 SharedFlow 了解为一个高配版的 LiveData,这点首先在构造函数就能够体现进去。SharedFlow 的构造函数容许咱们配置三个参数:

SharedFlow.kt

public fun <T> MutableSharedFlow(
    // 重放数据个数
    replay: Int = 0,
    // 额定缓存容量
    extraBufferCapacity: Int = 0,
    // 缓存溢出策略
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {
    val bufferCapacity0 = replay + extraBufferCapacity
    val bufferCapacity = if (bufferCapacity0 < 0) Int.MAX_VALUE else bufferCapacity0 // coerce to MAX_VALUE on overflow
    return SharedFlowImpl(replay, bufferCapacity, onBufferOverflow)
}

public enum class BufferOverflow {
    // 挂起
    SUSPEND,
    // 抛弃最早的一个
    DROP_OLDEST,
    // 抛弃最近的一个
    DROP_LATEST
}
参数 形容
reply 重放数据个数,当新订阅者时注册时会重放缓存的 replay 个数据
extraBufferCapacity 额定缓存容量,在 replay 之外的额定容量,SharedFlow 的缓存容量 capacity = replay + extraBufferCapacity(切实想不出额定容量有什么用,晓得能够通知我)
onBufferOverflow 缓存溢出策略,即缓存容量 capacity 满时的解决策略(SUSPEND、DROP_OLDEST、DROP_LAST)

SharedFlow 默认容量 capacity 为 0,重放 replay 为 0,缓存溢出策略是 SUSPEND,发射数据时已注册的订阅者会收到数据,但数据会立即抛弃,而新的订阅者不会收到历史发射过的数据。

为什么咱们能够把 SharedFlow 了解为“高配版”LiveData,拿 SharedFlow 和 LiveData 做个简略的比照就晓得了:

  • 容量问题: LiveData 容量固定为 1 个,而 SharedFlow 容量反对配置 0 个到 多个;
  • 背压问题: LiveData 无奈应答背压问题,而 SharedFlow 有缓存空间能应答背压问题;
  • 重放问题: LiveData 固定重放 1 个数据,而 SharedFlow 反对配置重放 0 个到多个;
  • 线程问题: LiveData 只能在主线程订阅,而 SharedFlow 反对在任意线程(通过协程的 Dispatcher)订阅。

当然 SharedFlow 也并不是完胜,LiveData 可能解决生命周期平安问题,而 SharedFlow 不行(因为 Flow 自身就不是纯 Android 生态下的组件),不合理的应用会存在不必要的操作和资源节约,以及在谬误的状态更新 View 的危险。不过别放心,这个问题能够通过 第 6 节 的 Lifecycle API 来解决。

4.3 一般 Flow 转换为 SharedFlow

后面提到过,冷流是不共享的,也没有缓存机制。应用 Flow.shareIn 或 Flow.stateIn 能够把冷流转换为热流,一来能够将数据共享给多个订阅者,二来能够减少缓冲机制。

Share.kt

public fun <T> Flow<T>.shareIn(
    // 协程作用域范畴
    scope: CoroutineScope,
    // 启动策略
    started: SharingStarted,
    // 控制数据重放的个数
    replay: Int = 0
): SharedFlow<T> {val config = configureSharing(replay)
  val shared = MutableSharedFlow<T>(
      replay = replay,
      extraBufferCapacity = config.extraBufferCapacity,
      onBufferOverflow = config.onBufferOverflow
  )
  @Suppress("UNCHECKED_CAST")
  scope.launchSharing(config.context, config.upstream, shared, started, NO_VALUE as T)
  return shared.asSharedFlow()}
public companion object {
    // 热启动式:立刻开始,并在 scope 指定的作用域完结时终止
    public val Eagerly: SharingStarted = StartedEagerly()
    // 懒启动式:在注册首个订阅者时开始,并在 scope 指定的作用域完结时终止
    public val Lazily: SharingStarted = StartedLazily()
 
    public fun WhileSubscribed(
        stopTimeoutMillis: Long = 0,
        replayExpirationMillis: Long = Long.MAX_VALUE
    ): SharingStarted =
        StartedWhileSubscribed(stopTimeoutMillis, replayExpirationMillis)
}

sharedIn 的参数 scope 和 replay 不须要过多解释,次要介绍下 started: SharingStarted 启动策略,分为三种:

  • Eagerly(热启动式): 立刻启动数据流,并放弃数据流(直到 scope 指定的作用域完结);
  • Lazily(懒启动式): 在首个订阅者注册时启动,并放弃数据流(直到 scope 指定的作用域完结);
  • WhileSubscribed(): 在首个订阅者注册时启动,并放弃数据流直到在最初一个订阅者登记时完结(或直到 scope 指定的作用域完结)。通过 WhildSubscribed() 策略可能在没有订阅者的时候及时进行数据流,防止引起不必要的资源节约,例如始终从数据库、传感器中读取数据。

    whileSubscribed() 还提供了两个配置参数:

    • stopTimeoutMillis 超时工夫(毫秒): 最初一个订阅者登记订阅后,保留数据流的超时工夫,默认值 0 示意立即进行。这个参数可能帮忙防抖,防止订阅者长期短时间登记就马上敞开数据流。例如心愿期待 5 秒后没有订阅者则进行数据流,能够应用 whileSubscribed(5000)。
    • replayExpirationMillis 重放过期工夫(毫秒): 进行数据流后,保留重放数据的超时工夫,默认值 Long.MAX_VALUE 示意永恒保留(replayExpirationMillis 产生在进行数据流后,阐明 replayExpirationMillis 工夫是在 stopTimeoutMillis 之后产生的)。例如心愿心愿期待 5 秒后进行数据流,再期待 5 秒后的数据视为无用的古老数据,能够应用 whileSubscribed(5000, 5000)。

5. StateFlow —— LiveData 的替代品

StateFlow 是 SharedFlow 的子接口,能够了解为一个非凡的 SharedFlow。不过它们的继承关系只是接口上有继承关系,外部的实现类 SharedFlowImplStateFlowImpl 其实是离开的,这里要留个印象就好。

5.1 StateFlow 与 MutableStateFlow 接口

这里先放出这两个接口不便查看:

public interface StateFlow<out T> : SharedFlow<T> {
    // 以后值
    public val value: T
}

public interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {
    // 以后值
    public override var value: T

    // 比拟并设置(通过 equals 比照,如果值产生实在变动返回 true)public fun compareAndSet(expect: T, update: T): Boolean
}

5.2 结构一个 StateFlow

StateFlow 的构造函数就简略多了,有且仅有一个必选的参数,代表初始值:

public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ?: NULL)

5.3 非凡的 SharedFlow

StateFlow 是 SharedFlow 的一种非凡配置,MutableStateFlow(initialValue) 这样一行代码实质上和上面应用 SharedFlow 的形式是完全相同的:

val shared = MutableSharedFlow(
    replay = 1,
    onBufferOverflow = BufferOverflow.DROP_OLDEST
)
shared.tryEmit(initialValue) // emit the initial value
val state = shared.distinctUntilChanged() // get StateFlow-like behavior
  • 有初始值: StateFlow 初始化时必须传入初始值;
  • 容量为 1: StateFlow 只会保留一个值;
  • 重放为 1: StateFlow 会向新订阅者重放最新的值;
  • 不反对 resetReplayCache() 重置重放缓存: StateFlow 的 resetReplayCache() 办法抛出 UnsupportedOperationException
  • 缓存溢出策略为 DROP_OLDEST: 意味着每次发射的新数据会笼罩旧数据;

总的来说,StateFlow 要求传入初始值,并且仅反对保留一个最新的数据,会向新订阅者会重放一次最新值,也不容许重置重放缓存。说 StateFlow 是 LiveData 的替代品一点不为过。除此之外,StateFlow 还额定反对一些个性:

  • 数据防抖: 意味着仅在更新值并且发生变化才会回调,如果更新值没有变动不会回调 collect,其实就是在发射数据时加了一层拦挡:

StateFlow.kt

public override var value: T
    get() = NULL.unbox(_state.value)
    set(value) {updateState(null, value ?: NULL) }

override fun compareAndSet(expect: T, update: T): Boolean =
    updateState(expect ?: NULL, update ?: NULL)

private fun updateState(expectedState: Any?, newState: Any): Boolean {
    var curSequence = 0
    var curSlots: Array<StateFlowSlot?>? = this.slots // benign race, we will not use it
    synchronized(this) {
        val oldState = _state.value
        if (expectedState != null && oldState != expectedState) return false // CAS support
        if (oldState == newState) return true // 如果新值 equals 旧值则拦挡, 但 CAS 返回 true
        _state.value = newState
        ...
        return true
    }
}
  • CAS 操作: 原子性的比拟与设置操作,只有在旧值与 expect 雷同时返回 ture。

5.4 一般 Flow 转换为 StateFlow

跟 SharedFlow 一样,一般 Flow 也能够转换为 StateFlow:

Share.kt

public fun <T> Flow<T>.stateIn(
    // 共享开始时所在的协程作用域范畴
    scope: CoroutineScope,
    // 共享开始策略
    started: SharingStarted,
    // 初始值
    initialValue: T
): StateFlow<T> {val config = configureSharing(1)
    val state = MutableStateFlow(initialValue)
    scope.launchSharing(config.context, config.upstream, state, started, initialValue)
    return state.asStateFlow()}

6. 平安地察看 Flow 数据流

后面也提到了,Flow 不具备 LiveData 的生命周期感知能力,所以订阅者在监听 Flow 数据流时,会存在生命周期平安的问题。Google 举荐的做法是应用 Lifecycle#repeatOnLifecycle API:

// 从 2.4.0 开始反对 Lifecycle#repeatOnLifecycle API
implementation "androidx.lifecycle:lifecycle-runtime-ktx:2.4.1"
  • LifecycleOwner#addRepeatingJob: 在生命周期达到指定状态时,主动创立并启动协程执行代码块,在生命周期低于该状态时,主动勾销协程。因为 addRepeatingJob 不是挂起函数,所以不遵循结构化并发的规定。目前曾经废除,被上面的 repeatOnLifecycle() 代替了(废除 addRepeatingJob 的考量见 设计 repeatOnLifecycle API 背地的故事);
  • Lifecycle#repeatOnLifecycle: repeatOnLifecycle 的作用雷同,区别在于它是一个 suspend 函数,须要在协程中执行;
  • Flow#flowWithLifecycle: Flow#flowWithLifecycle 的作用雷同,外部基于 repeatOnLifecycle API。
class LocationActivity : AppCompatActivity() {override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)

        lifecycleOwner.addRepeatingJob(Lifecycle.State.STARTED) {locationProvider.locationFlow().collect {// update UI}
        }
    }
}

class LocationActivity : AppCompatActivity() {override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)

        // repeatOnLifecycle 是 suspends 函数,所以须要在协程中执行
        // 当 lifecycleScope 的生命周期高于 STARTED 状态时,启动一个新的协程并执行代码块
        // 当 lifecycleScope 的生命周期低于 STARTED 状态时,勾销该协程
        lifecycleScope.launch {repeatOnLifecycle(Lifecycle.State.STARTED) {
                // 以后生命周期肯定高于 STARTED 状态,能够平安地从数据流中取数据,并更新 View
                locationProvider.locationFlow().collect {// update UI}
            }
        // 结构化并发:生命周期处于 DESTROYED 状态时,切换回调用 repeatOnLifecycle 的协程继续执行
        }
    }
}

class LocationActivity : AppCompatActivity() {override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)

        locationProvider.locationFlow()
            .flowWithLifecycle(this, Lifecycle.State.STARTED)
            .onEach {// update UI}
            .launchIn(lifecycleScope) 
    }
}

如果不应用 Lifecycle#repeatOnLifecycle API,具体会呈现什么问题呢?

  • Activity.lifecycleScope.launch: 立刻启动协程,并在 Activity 销毁时勾销协程;
  • Fragment.lifecycleScope.launch: 立刻启动协程,并在 Fragment 销毁时勾销协程;
  • Fragment.viewLifecycleOwner.lifecycleScope.launch: 立刻启动协程,并在 Fragment 中视图销毁时勾销协程。

能够看到,这些协程 API 只有在最初组件 / 视图销毁时才会勾销协程,当视图进入后盾时协程并不会被勾销,Flow 会继续生产数据,并且会触发更新视图。

  • LifecycleContinueScope.launchWhenX: 在生命周期达到指定状态时立刻启动协程执行代码块,在生命周期低于该状态时挂起(而不是勾销)协程,在生命周期从新高于指定状态时,主动复原该协程。

能够看到,这些协程 API 在视图来到某个状态时会挂起协程,可能防止更新视图。然而 Flow 会继续生产数据,也会产生一些不必要的操作和资源耗费(CPU 和内存)。 尽管能够在视图进入后盾时手动勾销协程,但很显著增写了模板代码,没有 repeatOnLifecycle API 来得简洁。

class LocationActivity : AppCompatActivity() {

    // 协程控制器
    private var locationUpdatesJob: Job? = null

    override fun onStart() {super.onStart()
        locationUpdatesJob = lifecycleScope.launch {locationProvider.locationFlow().collect {// update UI} 
        }
    }

    override fun onStop() {
       // 在视图进入后盾时勾销协程
        locationUpdatesJob?.cancel()
        super.onStop()}
}

回过头来看,repeatOnLifecycle 是怎么实现生命周期感知的呢?其实很简略,是通过 Lifecycle#addObserver 来监听生命周期变动:

RepeatOnLifecycle.kt

suspendCancellableCoroutine<Unit> { cont ->
    // Lifecycle observers that executes `block` when the lifecycle reaches certain state, and
    // cancels when it falls below that state.
    val startWorkEvent = Lifecycle.Event.upTo(state)
    val cancelWorkEvent = Lifecycle.Event.downFrom(state)
    val mutex = Mutex()
    observer = LifecycleEventObserver { _, event ->
        if (event == startWorkEvent) {
            // Launch the repeating work preserving the calling context
            launchedJob = this@coroutineScope.launch {
                // Mutex makes invocations run serially,
                // coroutineScope ensures all child coroutines finish
                mutex.withLock {
                    coroutineScope {block()
                    }
                }
            }
            return@LifecycleEventObserver
        }
        if (event == cancelWorkEvent) {launchedJob?.cancel()
            launchedJob = null
        }
        if (event == Lifecycle.Event.ON_DESTROY) {cont.resume(Unit)
        }
    }
    this@repeatOnLifecycle.addObserver(observer as LifecycleEventObserver)
}

7. Channel 通道

在协程的根底能力上应用数据流,除了上文提到到 Flow API,还有一个 Channel API。Channel 是 Kotlin 中实现跨协程数据传输的数据结构,相似于 Java 中的 BlockQueue 阻塞队列。不同之处在于 BlockQueue 会阻塞线程,而 Channel 是挂起线程。Google 的倡议 是优先应用 Flow 而不是 Channel,次要起因是 Flow 会更主动地敞开数据流,而一旦 Channel 没有失常敞开,则容易造成资源透露。此外,Flow 相较于 Channel 提供了更明确的束缚和操作符,更灵便。

Channel 次要的操作如下:

  • 创立 Channel: 通过 Channel<Int>(Channel.UNLIMITED) 创立一个 Channel 对象,或者间接应用 produce<Int>{} 创立一个生产者协程;
  • 敞开 Channel: Channel#close();
  • 发送数据: Channel#send() 往 Channel 中发送一个数据,在 Channel 容量有余时 send() 操作会挂起,Channel 默认容量 capacity 是 1;
  • 接收数据: 通过 Channel#receive() 从 Channel 中取出一个数据,或者间接通过 actor<Int> 创立一个消费者协程,在 Channel 中数据有余时 receive() 操作会挂起。
  • 播送通道 BroadcastChannel(废除,应用 SharedFlow): 一般 Channel 中一个数据只会被一个生产端接管,而 BroadcastChannel 容许多个生产端接管。
public fun <E> Channel(

    // 缓冲区容量,当超出容量时会触发 onBufferOverflow 回绝策略
    capacity: Int = RENDEZVOUS,  

    // 缓冲区溢出策略,默认为挂起,还有 DROP_OLDEST 和 DROP_LATEST
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,

    // 解决元素未能胜利送达解决的状况,如订阅者被勾销或者抛异样
    onUndeliveredElement: ((E) -> Unit)? = null

): Channel<E>

8. 浅尝一下

到这里,LiveData、Flow 和 Channel 咱们都讲了一遍了,理论场景中怎么应用呢,浅尝一下。

  • 事件(Event): 事件是一次无效的,新订阅者不应该收到旧的事件,因而事件数据适宜用 SharedFlow(replay=0);
  • 状态(State): 状态是能够复原的,新订阅者容许收到旧的状态数据,因而状态数据适宜用 StateFlow。

示例代码如下,不相熟 MVI 模式的同学能够移步:Android UI 架构演进:从 MVC 到 MVP、MVVM、MVI

BaseViewModel.kt

interface UiState

interface UiEvent

interface UiEffect

abstract class BaseViewModel<State : UiState, Event : UiEvent, Effect : UiEffect> : ViewModel() {

    // 初始状态
    private val initialState: State by lazy {createInitialState() }

    // 页面须要的状态,对应于 MVI 模式的 ViewState
    private val _uiState = MutableStateFlow<State>(initialState)
    // 对外接口应用不可变版本
    val uiState = _uiState.asStateFlow()

    // 页面状态变更的“副作用”,相似一次性事件,不须要重放的状态变更(例如 Toast)private val _effect = MutableSharedFlow<Effect>()
    // 对外接口应用不可变版本
    val effect = _effect.asSharedFlow()

    // 页面的事件操作,对应于 MVI 模式的 Intent 
    private val _event = MutableSharedFlow<Event>()

    init {
        viewModelScope.launch {
            _event.collect {handleEvent(it)
            }
        }
    }

    // 初始状态
    protected abstract fun createInitialState(): State

    // 事件处理
    protected abstract fun handleEvent(event: Event)

    /**
     * 事件入口
     */
    fun sendEvent(event: Event) {
        viewModelScope.launch {_event.emit(event)
        }
    }

    /**
     * 状态变更
     */
    protected fun setState(newState: State) {_uiState.value = newState}

    /**
     * 副作用
     */
    protected fun setEffect(effect: Effect) {_effect.send(effect)
    }
}

参考资料

  • 协程 Flow 最佳实际 | 基于 Android 开发者峰会利用 —— Android 官网文档
  • 设计 repeatOnLifecycle API 背地的故事 —— Android 官网文档
  • 应用更为平安的形式收集 Android UI 数据流 —— Android 官网文档
  • Flow 操作符 shareIn 和 stateIn 应用须知 —— Android 官网文档
  • 从 LiveData 迁徙到 Kotlin 数据流 —— Android 官网文档
  • 用 Kotlin Flow 解决开发中的痛点 —— 都梁人 著
  • 抽丝剥茧 Kotlin – 协程中绕不过的 Flow —— 九心 著
  • Kotlin flow 实际总结! —— 入魔的冬瓜 著
  • Android—kotlin-Channel 超具体解说 —— hqk 著

你的点赞对我意义重大!微信搜寻公众号 [彭旭锐],心愿大家能够一起探讨技术,找到气味相投的敌人,咱们下次见!

退出移动版