如果您是库作者,您兴许心愿用户在应用 Kotlin 协程与 Flow 时能够更加轻松地调用您基于 Java 或回调的 API。另外,如果您是 API 的使用者,则可能违心将第三方 API 界面适配协程,以使它们对 Kotlin 更敌对。
本文将会介绍如何应用协程和 Flow 简化 API,以及如何应用 suspendCancellableCoroutine 和 callbackFlow API 创立您本人的适配器。针对那些富裕好奇心的读者,本文还会对这些 API 进行分析,以帮您理解它们底层的工作原理。
如果您更喜爱观看视频,能够 点击这里。
查看现有协程适配器
在您为现有 API 编写本人的封装之前,请查看是否曾经存在针对您的用例的适配器或者 扩大办法。上面是一些蕴含常见类型协程适配器的库。
Future 类型
对于 future 类型,Java 8 集成了 CompletableFuture,而 Guava 集成了 ListenableFuture。这里提到的并不是全副,您能够在线搜寻以确定是否存在实用于您的 future 类型的适配器。
// 期待 CompletionStage 的执行实现而不阻塞线程suspend fun <T> CompletionStage<T>.await(): T // 期待 ListenableFuture 的执行实现而不阻塞线程suspend fun <T> ListenableFuture<T>.await(): T
应用这些函数,您能够解脱回调并挂起协程直到 future 的后果被返回。
Reactive Stream
对于响应式流的库,有针对 RxJava、Java 9 API 与 响应式流库 的集成:
// 将给定的响应式 Publisher 转换为 Flowfun <T : Any> Publisher<T>.asFlow(): Flow<T>
这些函数将响应式流转换为了 Flow。
Android 专用 API
对于 Jetpack 库或 Android 平台 API,您能够参阅 Jetpack KTX 库 列表。现有超过 20 个库领有 KTX 版本,形成了您所相熟的 Java API。其中包含 SharedPreferences、ViewModels、SQLite 以及 Play Core。
回调
回调是实现异步通信时十分常见的做法。事实上,咱们在 后盾线程工作运行指南 中将回调作为 Java 编程语言的默认解决方案。然而,回调也有许多毛病: 这一设计会导致令人费解的回调嵌套。同时,因为没有简略的传播方式,错误处理也更加简单。在 Kotlin 中,您能够简略地应用协程调用回调,但前提是您必须创立您本人的适配器。
创立您本人的适配器
如果没有找到适宜您用例的适配器,更间接的做法是本人编写适配器。对于一次性异步调用,能够应用 suspendCancellableCoroutine API;而对于流数据,能够应用 callbackFlow API。
作为练习,上面的示例将会应用来自 Google Play Services 的 Fused Location Provider API 来获取地位数据。此 API 界面非常简略,然而它应用回调来执行异步操作。当逻辑变得复杂时,这些回调容易使代码变得不可读,而咱们能够应用协程来解脱它们。
如果您心愿摸索其它解决方案,能够通过下面函数所链接的源代码为您带来启发。
一次性异步调用
Fused Location Provider API 提供了 getLastLocation) 办法来取得 最初已知地位。对于协程来说,现实的 API 是一个间接返回确切后果的挂起函数。
留神: 这一 API 返回值为 Task,并且曾经有了对应的 适配器。出于学习的目标,咱们用它作为范例。
咱们能够通过为 FusedLocationProviderClient
创立扩大函数来取得更好的 API:
suspend fun FusedLocationProviderClient.awaitLastLocation(): Location
因为这是一个一次性异步操作,咱们应用 suspendCancellableCoroutine
函数: 一个用于从协程库创立挂起函数的底层构建块。
suspendCancellableCoroutine
会执行作为参数传入的代码块,而后在期待持续信号期间挂起协程的执行。当协程 Continuation 对象中的 resume
或 resumeWithException
办法被调用时,协程会被复原执行。无关 Continuation 的更多信息,请参阅: Kotlin Vocabulary | 揭秘协程中的 suspend 修饰符。
咱们应用能够增加到 getLastLocation 办法中的回调来在适合的机会复原协程。参见上面的实现:
// FusedLocationProviderClient 的扩大函数,返回最初已知地位suspend fun FusedLocationProviderClient.awaitLastLocation(): Location = // 创立新的可取消协程 suspendCancellableCoroutine<Location> { continuation -> // 增加复原协程执行的监听器 lastLocation.addOnSuccessListener { location -> // 复原协程并返回地位 continuation.resume(location) }.addOnFailureListener { e -> // 通过抛出异样来复原协程 continuation.resumeWithException(e) } // suspendCancellableCoroutine 块的结尾。这里会挂起协程 //直到某个回调调用了 continuation 参数 }
留神: 只管协程库中同样蕴含了不可勾销版本的协程构建器 (即 suspendCoroutine
),但最好始终抉择应用 suspendCancellableCoroutine
解决协程作用域的勾销及从底层 API 流传勾销事件。
suspendCancellableCoroutine 原理
在外部,suspendCancellableCoroutine 应用 suspendCoroutineUninterceptedOrReturn 在挂起函数的协程中取得 Continuation。这一 Continuation
对象会被一个 CancellableContinuation 对象拦挡,后者会从此时开始管制协程的生命周期 (其 实现 具备 Job 的性能,然而有一些限度)。
接下来,传递给 suspendCancellableCoroutine
的 lambda 表达式会被执行。如果该 lambda 返回了后果,则协程将立刻复原;否则协程将会在 CancellableContinuation 被 lambda 手动复原前放弃挂起状态。
您能够通过我在上面代码片段 (原版实现) 中的正文来理解产生了什么:
public suspend inline fun <T> suspendCancellableCoroutine( crossinline block: (CancellableContinuation<T>) -> Unit): T = // 获取运行此挂起函数的协程的 Continuation 对象 suspendCoroutineUninterceptedOrReturn { uCont -> // 接管协程。Continuation 曾经被拦挡, // 接下来将会遵循 CancellableContinuationImpl 的生命周期 val cancellable = CancellableContinuationImpl(uCont.intercepted(), ...) /* ... */ // 应用可勾销 Continuation 调用代码块 block(cancellable) // 挂起协程并且期待 Continuation 在 “block” 中被复原,或者在 “block” 完结执行时返回后果 cancellable.getResult() }
想理解更多无关挂起函数的工作原理,请参阅这篇: Kotlin Vocabulary | 揭秘协程中的 suspend 修饰符。
流数据
如果咱们转而心愿用户的设施在实在的环境中挪动时,周期性地接管地位更新 (应用 requestLocationUpdates) 函数),咱们就须要应用 Flow 来创立数据流。现实的 API 看起来应该像上面这样:
fun FusedLocationProviderClient.locationFlow(): Flow<Location>
为了将基于回调的 API 转换为 Flow,能够应用 callbackFlow 流构建器来创立新的 flow。callbackFlow
的 lambda 表达式的外部处于一个协程的上下文中,这意味着它能够调用挂起函数。不同于 flow 流构建器,channelFlow 能够在不同的 CoroutineContext 或协程之外应用 offer 办法发送数据。
通常状况下,应用 callbackFlow 构建流适配器遵循以下三个步骤:
- 创立应用 offer 向 flow 增加元素的回调;
- 注册回调;
- 期待消费者勾销协程,并登记回调。
将上述步骤利用于以后用例,咱们失去以下实现:
// 发送地位更新给消费者fun FusedLocationProviderClient.locationFlow() = callbackFlow<Location> { // 创立了新的 Flow。这段代码会在协程中执行。 // 1. 创立回调并向 flow 中增加元素 val callback = object : LocationCallback() { override fun onLocationResult(result: LocationResult?) { result ?: return // 疏忽为空的后果 for (location in result.locations) { try { offer(location) // 将地位发送到 flow } catch (t: Throwable) { // 地位无奈发送到 flow } } } } // 2. 注册回调并通过调用 requestLocationUpdates 获取地位更新。 requestLocationUpdates( createLocationRequest(), callback, Looper.getMainLooper() ).addOnFailureListener { e -> close(e) // 在出错时敞开 flow } // 3. 期待消费者勾销协程并登记回调。这一过程会挂起协程,直到 Flow 被敞开。 awaitClose { // 在这里清理代码 removeLocationUpdates(callback) }}
callbackFlow 外部原理
在外部,callbackFlow 应用了一个 channel。channel 在概念上很靠近阻塞 队列) —— 它在配置时须要指定容量 (capacity): 即能够缓冲的元素个数。在 callbackFlow 中创立的 channel 默认容量是 64 个元素。如果将新元素增加到已满的 channel,因为 offer 不会将元素增加到 channel 中,并且会立刻返回 false,所以 send 会暂停生产者,直到频道 channel 中有新元素的可用空间为止。
awaitClose 外部原理
乏味的是,awaitClose
外部应用的是 suspendCancellableCoroutine
。您能够通过我在以下代码片段中的正文 (查看 原始实现) 一窥到底:
public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {}) { ... try { // 应用可勾销 continuation 挂起协程 suspendCancellableCoroutine<Unit> { cont -> // 仅在 Flow 或 Channel 敞开时胜利复原协程,否则放弃挂起 invokeOnClose { cont.resume(Unit) } } } finally { // 总是会执行调用者的清理代码 block() }}
复用 Flow
除非额定应用两头操作符 (如: conflate
),否则 Flow 是冷且惰性的。这意味着每次调用 flow 的终端操作符时,都会执行构建块。对于咱们的用例来说,因为增加一个新的地位监听器开销很小,所以这一个性不会有什么大问题。然而对于另外的一些实现可就不肯定了。
您能够应用 shareIn
两头操作符在多个收集器间复用同一个 flow,并使冷流成为热流。
val FusedLocationProviderClient.locationFlow() = callbackFlow<Location> { ...}.shareIn( // 让 flow 追随 applicationScope applicationScope, // 向新的收集器发送上一次发送的元素 replay = 1, // 在有沉闷的订阅者时,放弃生产者的沉闷状态 started = SharingStarted.WhileSubscribed())
您能够通过文章《协程中的勾销和异样 | 驻留工作详解》来理解更多无关在利用中应用 applicationScope
的最佳实际。
您该当思考通过创立协程适配器使您的 API 或现存 API 简洁、易读且合乎 Kotlin 的应用习惯。首先查看是否曾经存在可用的适配器,如果没有,您能够应用 suspendCancellableCoroutine
针对一次性调用;或应用 callbackFlow
针对流数据,来创立您本人的适配器。
您能够通过 codelab: 创立 Kotlin 扩大库,来上手本文所介绍的话题。