共计 7366 个字符,预计需要花费 19 分钟才能阅读完成。
本文介绍了咱们在开发 2019 Android 开发者峰会 (ADS) 利用时总结整顿的 Flow 最佳实际 (利用源码已开源),咱们将和大家独特探讨利用中的每个层级将如何解决数据流。
ADS 利用的架构恪守 Android 官网的 举荐架构指南,咱们在其中引入了 Domain 层 (用以囊括各种 UseCases 类) 来帮忙拆散焦点,进而放弃代码的精简、复用性、可测试性。
2019 ADS 利用的架构
更多对于利用架构指南的分层设计 (Data 层、Domain 层、UI 层),请参考 示例利用 | Plaid 2.0 重构。
如同许多 Android 利用一样,ADS 利用从网络或缓存懒加载数据。咱们发现,这种场景非常适合 Flow。挂起函数 (suspend functions) 更适宜于一次性操作。为了应用协程,咱们将重构分为两次 commit 提交: 第一次 迁徙了一次性操作,第二次 将其迁徙至数据流。
在本文中,您将看到咱们把利用从 “ 在所有层级应用 LiveData”,重构为 “ 只在 View 和 ViewModel 间应用 LiveData 进行通信,并在利用的底层和 UserCase 层架构中应用协程 ”。
优先应用 Flow 来裸露数据流 (而不是 Channel)
您有两种办法在协程中解决数据流: 一种是 Flow API,另一种是 Channel API。Channels 是一种同步原语,而 Flows 是为数据流模型所设计的: 它是订阅数据流的工厂。不过咱们能够应用 Channels 来反对 Flows,这一点咱们稍后再说。
相较于 Channel,Flow 更灵便,并提供了更明确的束缚和更多操作符。
因为末端操作符 (terminal operator) 会触发数据流的执行,同时会依据生产者一侧流操作来决定是胜利实现操作还是抛出异样,因而 Flows 会主动地敞开数据流,您根本不会在生产者一侧透露资源;而一旦 Channel 没有正确敞开,生产者可能不会清理大型资源,因而 Channels 更容易造成资源透露。
利用数据层负责提供数据,通常是从数据库中读取,或从网络获取数据,例如,示例 是一个数据源接口,它提供了一个用户事件数据流:
interface UserEventDataSource {fun getObservableUserEvent(userId: String): Flow<UserEventResult>
}
如何将 Flow 利用在您的 Android 利用架构中
1. UseCase 层和 Repository 层
介于 View/ViewModel 和数据源之间的层 (在咱们的例子中是 UseCase 和 Repository) 通常须要合并来自多个查问的数据,或在 ViewModel 层应用之前转化数据。就像 Kotlin sequences 一样,Flow 反对大量操作符来转换数据。目前曾经有 大量的可用的操作符,同时您也能够创立您本人的转换器 (比方,应用 transform 操作符)。不过 Flow 在许多的操作符中裸露了 suspend lambda 表达式,因而在大多数状况下没有必要通过自定义转换来实现简单工作,能够间接在 Flow 中调用挂起函数。
在 ADS 利用中,咱们想将 UserEventResult 和 Repository 层中的会话数据进行绑定。咱们利用 map 操作符来将一个 suspend lambda 表达式利用在从数据源接管到的每一个 Flow 的值上:
/* Copyright 2019 Google LLC.
SPDX-License-Identifier: Apache-2.0 */
class DefaultSessionAndUserEventRepository(
private val userEventDataSource: UserEventDataSource,
private val sessionRepository: SessionRepository
) : SessionAndUserEventRepository {
override fun getObservableUserEvent(
userId: String?,
eventId: SessionId
): Flow<Result<LoadUserSessionUseCaseResult>> {
// 解决 userId
// 监听用户事件,并将其与 Session 数据进行合并
return userEventDataSource.getObservableUserEvent(userId, eventId).map { userEventResult ->
val event = sessionRepository.getSession(eventId)
// 将 Session 和用户数据进行合并,并传递后果
val userSession = UserSession(
event,
userEventResult.userEvent ?: createDefaultUserEvent(event)
)
Result.Success(LoadUserSessionUseCaseResult(userSession))
}
}
}
2. ViewModel
在利用 LiveData 执行 UI ↔ ViewModel 通信时,ViewModel 层应该利用末端操作符来生产来自数据层的数据流 (比方: collect、first 或者是 toList)。
/* Copyright 2019 Google LLC.
SPDX-License-Identifier: Apache-2.0 */
// 实在代码的简化版
class SessionDetailViewModel(
private val loadUserSessionUseCase: LoadUserSessionUseCase,
...
): ViewModel() {private fun listenForUserSessionChanges(sessionId: SessionId) {
viewModelScope.launch {loadUserSessionUseCase(sessionId).collect {loadResult ->}
}
}
}
残缺代码能够参考 这里.
如果您须要将 Flow 转化为 LiveData,则能够应用 AndroidX lifecycle library 提供的 Flow.asLiveData() 扩大函数 (extension function)。这个扩大函数十分便于应用,因为它共享了 Flow 的底层订阅,同时依据观察者的生命周期治理订阅。此外,LiveData 能够为后续增加的观察者提供最新的数据,其订阅在配置产生变更的时候仍旧可能失效。上面利用一段简略的代码来演示如何应用这个扩大函数:
class SimplifiedSessionDetailViewModel(
private val loadUserSessionUseCase: LoadUserSessionUseCase,
...
): ViewModel() {val sessions = loadUserSessionUseCase(sessionId).asLiveData()}
特地阐明: 这段代码不是 ADS 利用的,它只是用来演示如何应用 Flow.asLiveData()。
具体实现时,该在何时应用 BroadcastChannel 或者 Flow
回到数据源的实现,要怎么去实现之前裸露的 getObservableUserEvent 函数?咱们思考了两种实现: flow 结构器,或 BroadcastChannel 接口,这两种实现利用于不同的场景。
1. 什么时候应用 Flow?
Flow 是一种 “ 冷流 ”(Cold Stream)。” 冷流 ” 是一种数据源,该类数据源的生产者会在每个监听者开始生产事件的时候执行,从而在每个订阅上创立新的数据流。一旦消费者进行监听或者生产者的阻塞完结,数据流将会被主动敞开。
Flow 非常适合须要开始 / 进行数据的产生来匹配观察者的场景。
您能够利用 flow 结构器来发送无限个 / 有限个元素。
val oneElementFlow: Flow<Int> = flow {
// 生产者代码开始执行,流被关上
emit(1)
// 生产者代码完结,流将被敞开
}
val unlimitedElementFlow: Flow<Int> = flow {
// 生产者代码开始执行,流被关上
while(true) {
// 执行计算
emit(result)
delay(100)
}
// 生产者代码完结,流将被敞开
}
Flow 通过协程勾销性能提供主动清理性能,因而偏向于执行一些重型工作。请留神,这里提到的勾销是有条件的,一个永不挂起的 Flow 是永不会被勾销的: 在咱们的例子中,因为 delay 是一个挂起函数,用于查看勾销状态,当订阅者进行监听时,Flow 将会进行并清理资源。
2. 什么时候应用 BroadcastChannel
Channel 是一个用于协程间通信的并发原语。BroadcastChannel 基于 Channel,并退出了多播性能。
可能在这样一些场景里,您可能会思考在数据源层中应用 BroadcastChannel:
如果生产者和消费者的生命周期不同或者彼此齐全独立运行时,请应用 BroadcastChannel。
如果您心愿生产者有独立的生命周期,同时向任何存在的监听者发送以后数据的时候,BroadcastChannel API 非常适合这种场景。在这种状况下,当新的监听者开始生产事件时,生产者不须要每次都被执行。
您仍然能够向调用者提供 Flow,它们不须要晓得具体的实现。您能够应用 BroadcastChannel.asFlow() 这个扩大函数来将一个 BroadcastChannel 作为一个 Flow 应用。
不过,敞开这个非凡的 Flow 不会勾销订阅。当应用 BroadcastChannel 的时候,您必须本人治理生命周期。BroadcastChannel 无奈感知到以后是否还存在监听者,除非敞开或勾销 BroadcastChannel,否则将会始终持有资源。请确保在不须要 BroadcastChannel 的时候将其敞开。同时请留神敞开后的 BroadcastChannel 无奈再次被应用,如果须要,您须要从新创立实例。
接下来,咱们将分享如何应用 BroadcastChannel API 的示例。
3. 特地阐明
局部 Flow 和 Channel API 仍处于试验阶段,很可能会产生变动。在一些状况下,您可能会正在应用 Channel,不过在将来可能会建议您应用 Flow。具体来讲,StateFlow 和 Flow 的 share operator 计划可能在将来会缩小 Channel 的应用。
将数据流中基于回调的 API 转化为协程
蕴含 Room 在内的很多库曾经反对将协程用于数据流操作。对于那些还不反对的库,您能够将任何基于回调的 API 转换为协程。
1. Flow 的实现
如果您想将一个基于回调的流 API 转换为应用 Flow,您能够应用 channelFlow 函数 (当然也能够应用 callbackFlow,它们都基于雷同的实现)。channelFlow 将会创立一个 Flow 的实例,该实例中的元素将传递给一个 Channel。这样能够容许咱们在不同的上下文或并发中提供元素。
以下示例中,咱们想要把从回调中拿到的元素发送到 Flow 中:
- 利用 channelFlow 结构器创立一个能够把回调注册到第三方库的流;
- 将从回调接管到的所有数据传递给 Flow;
- 当订阅者进行监听,咱们利用挂 起函数 “awaitClose” 来解除 API 的订阅。
/* Copyright 2019 Google LLC.
SPDX-License-Identifier: Apache-2.0 */
override fun getObservableUserEvent(userId: String, eventId: SessionId): Flow<UserEventResult> {// 1) 利用 channelFlow 创立一个 Flow
return channelFlow<UserEventResult> {val eventDocument = firestore.collection(USERS_COLLECTION)
.document(userId)
.collection(EVENTS_COLLECTION)
.document(eventId)
// 1) 将回调注册到 API 上
val subscription = eventDocument.addSnapshotListener { snapshot, _ ->
val userEvent = if (snapshot.exists()) {parseUserEvent(snapshot)
} else {null}
// 2) 将数据发送到 Flow
channel.offer(UserEventResult(userEvent))
}
// 3) 请不要敞开数据流,在消费者敞开或者 API 调用 onCompleted/onError 函数之前,请保障数据流
// 始终处于关上状态。// 当数据流敞开后,请勾销第三方库的订阅。awaitClose {subscription.remove() }
}
}
具体代码能够参考 这里。
2. BroadcastChannel 实现
对于应用 Firestore 跟踪用户身份认证的数据流,咱们应用了 BroadcastChannel API,因为咱们心愿注册一个有独立生命周期的 Authentication 监听者,同时也心愿能向所有正在监听的对象播送以后的后果。
转化回调 API 为 BroadcastChannel 相比转化为 Flow 要略简单一点。您能够创立一个类,并设置将实例化后的 BroadcastChannel 作为变量保留。在初始化期间,注册回调,像以前一样将元素发送到 BroadcastChannel:
/* Copyright 2019 Google LLC.
SPDX-License-Identifier: Apache-2.0 */
class FirebaseAuthStateUserDataSource(...) : AuthStateUserDataSource {private val channel = ConflatedBroadcastChannel<Result<AuthenticatedUserInfo>>()
private val listener: ((FirebaseAuth) -> Unit) = { auth ->
// 数据处理逻辑
// 将以后的用户 (数据) 发送给消费者
if (!channel.isClosedForSend) {channel.offer(Success(FirebaseUserInfo(auth.currentUser)))
} else {unregisterListener()
}
}
@Synchronized
override fun getBasicUserInfo(): Flow<Result<AuthenticatedUserInfo>> {if (!isListening) {firebase.addAuthStateListener(listener)
isListening = true
}
return channel.asFlow()}
}
具体代码能够参考 这里
测试小倡议
为了 测试 Flow 转换 (就像咱们在 UseCase 和 Repository 层中所做的那样),您能够利用 flow 结构器返回一个假数据,例如:
/* Copyright 2019 Google LLC.
SPDX-License-Identifier: Apache-2.0 */
object FakeUserEventDataSource : UserEventDataSource {override fun getObservableUserEvents(userId: String) = flow {emit(UserEventsResult(userEvents))
}
}
class DefaultSessionAndUserEventRepositoryTest {
@Test
fun observableUserEvents_areMappedCorrectly() = runBlockingTest {
// 筹备一个 repo
val userEvents = repository
.getObservableUserEvents("user", true).first()
// 对接管到的用户事件进行断言
}
}
为了胜利实现测试,一个比拟好的做法是应用 take 操作符来从 Flow 中获取一些数据,应用 toList 作为末端操作符来从数组中获取后果。示例如下:
class AnotherStreamDataSourceImplTest {
@Test
fun `Test happy path`() = runBlockingTest {
// 筹备好 subject
val result = subject.flow.take(1).toList()
// 断言后果和预期的统一
}
}
take 操作符非常适合在获取到数据后敞开 Flow。在测试结束后不敞开 Flow 或 BroadcastChannel 将会导致内存透露以及测试后果不统一。
留神: 如果在数据源的实现是通过 BroadcastChannel 实现的,那么下面的代码还不够。您须要本人治理数据源的生命周期,并确保 BroadcastChannel 在测试开始之前曾经启动,同时须要在测试完结后将其敞开,否则将会导致内存透露。你能够 在这里获取更多信息。
协程测试的最佳实际在这里仍然实用。如果您在测试代码中创立新的协程,则可能想要在测试线程中执行它来确保测试取得执行。
您也能够通过视频回顾 2019 Android 开发者峰会演讲 —— 在 Android 上测试协程 获取更多相干信息。
总结
- 因为 Flow 所提供的更加明确的束缚和各种操作符,咱们更倡议向消费者裸露 Flow 而不是 Channel;
- 应用 Flow 时,生产者会在每次有新的监听者时被执行,同时数据流的生命周期将会被主动解决;
- 应用 BroadcastChannel 时,您能够共享生产者,但须要本人治理它的生命周期;
- 请思考将基于回调的 API 转化为协程,以便在您的利用中更好、更习用地集成 API;
- 应用 take 和 toList 操作符能够简化 Flow 的相干代码测试。
2019 ADS 利用在 GitHub 开源,请点击 这里 在 GitHub 上查看更具体的代码实现。