关于kotlin:Kotlin协程解析系列上协程调度与挂起

58次阅读

共计 55970 个字符,预计需要花费 140 分钟才能阅读完成。

vivo 互联网客户端团队 - Ruan Wen

本文是 Kotlin 协程解析系列文章的开篇,次要介绍 Kotlin 协程的创立、协程调度与协程挂起相干的内容

一、协程引入

Kotlin 中引入 Coroutine(协程) 的概念,能够帮忙编写异步代码。

在应用和剖析协程前,首先要理解一下:

协程是什么?

为什么须要协程?

协程最为人称道的就是能够用看起来同步的形式写出异步的代码,极大进步了代码的可读性。在理论开发中最常见的异步操作莫过于网络申请。通常咱们须要通过各种回调的形式去解决网络申请,很容易就陷入到天堂回调中。

WalletHttp.target(VCoinTradeSubmitResult.class).setTag(tag)
                .setFullUrl(Constants.VCOIN_TRADE_SUBMIT_URL).setParams(params)
                .callback(new HttpCallback<VCoinTradeSubmitResult>() {    
                    @Override
                    public void onSuccess(VCoinTradeSubmitResult vCoinTradeSubmitResult) {super.onSuccess(vCoinTradeSubmitResult);
                        if (mView == null) {return;}
                        //......
                    }
                }).post();

上述示例是一个我的项目开发中常见的一个网络申请操作,通过接口回调的形式去获取网络申请后果。理论开发中也会常常遇到间断多个接口申请的状况,例如咱们我的项目中的集体核心页的逻辑就是先去异步获取。

本地缓存,获取失败的话就须要异步刷新一下账号 token,而后网络申请相干集体核心的其余信息。这里简略举一个领取示例,进行领取时,可能要先去获取账号 token,而后依赖该 token 再去做领取。

申请操作,依据领取返回数据再去查问领取后果,这种状况通过回调就可能演变为“天堂回调”。

// 获取账号 token
WalletHttp.target(Account.class).setTag(tag)
        .setFullUrl(Constants.ACCOUNT_URL).setParams(params)
        .callback(new HttpCallback<Account>() {
            @Override
            public void onSuccess(Account account) {super.onSuccess(account);
                // 依据账号 token 进行领取操作
                WalletHttp.target(Pay.class).setFullUrl(Constants.PAY_URL).addToken(account.getToken()).callback(new HttpCallback<Pay>() {
                    @Override
                    public void onSuccess(Pay pay){super.onSuccess(pay);
                        // 依据领取操作返回查问领取后果
                        WalletHttp.target(PayResult.class).setFullUrl(Constants.RESULT_URL).addResultCode(pay.getResultCode()).callback(new HttpCallback<PayResult>() {
                            @Override
                            public void onSuccess(PayResult result){super.onSuccess(result);
                                //......
                            }
                        }).post();}
                }).post();}
        }).post();

对于这种场景,kotlin 协程“同步形式写出异步代码”的这个个性就能够很好的解决上述问题。若上述场景用 kotlin 协程代码实现呢,可能就为:

fun postItem(tag: String, params: Map<String, Any?>) = viewModelScope.launch {
    // 获取账号信息
    val account = repository.queryAccount(tag, params)
    // 进行领取操作
    val pay = repository.paySubmit(tag,account.token)
    // 查问领取后果
    val result = repository.queryPayResult(tag,pay.resultCode)
    //......
}

能够看出,协程代码十分简洁,以程序的形式书写异步代码,代码可读性极强。

如果想要将原先的网络回调申请也改写成这种同步模式呢,只须要对原先申请回调用协程提供的 suspendCancellableCoroutine 等办法进行封装解决,即可让晚期的异步代码也享受上述“同步代码”的丝滑。

协程:

一种非抢占式或者合作式的计算机程序并发调度实现,程序能够被动挂起或者复原执行,其外围点是函数或一段程序可能被挂起,稍后再在挂起的地位复原,通过被动让出运行权来实现合作,程序本人解决挂起和复原来实现程序执行流程的合作调度。

协程实质上是轻量级线程。

协程的 特点 有:

  • 协程能够让异步代码同步化,其本质是轻量级线程。
  • 可在单个线程运行多个协程,其反对挂起,不会使运行协程的线程阻塞。
  • 能够升高异步程序的设计复杂度。

Kotlin 协程实现档次:

基础设施层:规范库的协程 API,次要对协程提供了概念和语义上最根本的反对;

业务框架层:协程的下层框架反对,基于规范库实现的封装,也是咱们日常开发应用的协程扩大库。

二、协程启动

具体在应用协程前,首先要配置对 Kotlin 协程的依赖。

(1)我的项目根目录 build.gradle

buildscript {
    ...
    ext.kotlin_coroutines = 'xxx'
    ...
}

(2)Module 下 build.gradle

dependencies {
    ...
    // 协程规范库
    implementation "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_coroutines"
    // 依赖协程外围库,蕴含协程公共 API 局部
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlin_coroutines"
    // 依赖 android 反对库,协程 Android 平台的具体实现形式
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:$kotlin_coroutines"
   ...
}

2.1 Thread 启动

在 Java 中,能够通过 Thread 开启并发操作:

new Thread(new Runnable() {
    @Override
    public void run() {//... do what you want}
}).start();

在 Kotlin 中,应用线程更为便捷:

val myThread = thread {//.......}

这个 Thread 办法有个参数 start 默认为 true,即发明进去的线程默认启动,你能够自定义启动机会:

val myThread = thread(start = false) {//......}
 
myThread.start()

2.2 协程启动

动协程须要三局部:上下文、启动模式、协程体

启动形式个别有三种,其中最简略的启动协程的形式为:

GlobalScope.launch {//......}

GlobalScope.launch()属于协程构建器 Coroutine builders,Kotlin 中还有其余几种 Builders,负责创立协程:

runBlocking:T

应用 runBlocking 顶层函数创立,会创立一个新的协程同时阻塞以后线程,直到协程完结。实用于 main 函数和单元测试

launch

创立一个新的协程,不会阻塞以后线程,必须在协程作用域中才能够调用。它返回的是一个该协程工作的援用,即 Job 对象。这是最罕用的启动协程的形式。

async

创立一个新的协程,不会阻塞以后线程,必须在协程作用域中才能够调用,并返回 Deffer 对象。可通过调用 Deffer.await()办法期待该子协程执行实现并获取后果。罕用于并发执行 - 同步期待和获取返回值的状况。

2.2.1 runBlocking

public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T

runBlocking 是一个顶层函数,能够在任意中央独立应用。它能创立一个新的协程同时阻塞以后线程,直到其外部所有逻辑以及子协程所有逻辑全副执行实现。罕用于 main 函数和测试中。

//main 函数中利用
fun main() = runBlocking {
    launch { // 创立一个新协程,runBlocking 会阻塞线程,但外部运行的协程是非阻塞的
        delay(1000L)
        println("World!")
    }
    println("Hello,")
    delay(2000L)      // 延时 2s,保障 JVM 存活
}
 
// 测试中利用
class MyTest {
    @Test
    fun testMySuspendingFunction() = runBlocking {// ......}
}

2.2.2 launch

launch 是最罕用的用于启动协程的形式,会在不阻塞以后线程的状况下启动一个协程,并返回对该协程工作的援用,即 Job 对象。

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit): Job

协程须要运行在协程上下文环境中,在非协程环境中的 launch 有两种:GlobalScope 与 CoroutineScope

  • GlobalScope.launch()

在利用范畴内启动一个新协程,不会阻塞调用线程,协程的生命周期与应用程序统一。

fun launchTest() {print("start")
    GlobalScope.launch {delay(1000)// 1 秒无阻塞提早
        print("GlobalScope.launch")
    }
    print("end")
}
 
/** 打印后果
start
end
GlobalScope.launch
*/

这种启动的协程存在组件被销毁但协程还存在的状况,个别不举荐。其中 GlobalScope 自身就是一个作用域,launch 属于其子作用域。

  • CoroutineScope.launch()

启动一个新的协程而不阻塞以后线程,并返回对协程的援用作为一个 Job。

fun launchTest2() {print("start")
    val job = CoroutineScope(Dispatchers.IO).launch {delay(1000)
        print("CoroutineScope.launch")
    }
    print("end")
}

协程上下文管制协程生命周期和线程调度,使得协程和该组件生命周期绑定,组件销毁时,协程一并销毁,从而实现安全可靠地协程调用。这是在利用中最举荐的协程应用形式。

对于 launch,依据业务需要须要创立一个或多个协程,则可能就须要在一个协程中启动子协程。

fun launchTest3() {print("start")
    GlobalScope.launch {delay(1000)
        print("CoroutineScope.launch")
        // 在协程内创立子协程
        launch {delay(1500)
            print("launch 子协程")
        }
    }
    print("end")
}
 
/**** 打印后果
start
end
CoroutineScope.launch
launch 子协程
*/

2.2.3 async

async 相似于 launch,都是创立一个不会阻塞以后线程的新的协程。区别在于:async 的返回是 Deferred 对象,可通过 Deffer.await()期待协程执行实现并获取后果,而 launch 不行。罕用于并发执行 - 同步期待和获取返回值的状况。

public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T): Deferred<T>

留神:

  1. await() 不能在协程之外调用,因为它须要挂起直到计算实现,而且只有协程能够以非阻塞的形式挂起。所以把它放到协程中。
  2. 如果 Deferred 不执行 await()则 async 外部抛出的异样不会被 logCat 或 try Catch 捕捉,然而仍然会导致作用域勾销和异样解体; 但当执行 await 时异样信息会从新抛出
  3. 如果将 async 函数中的启动模式设置为 CoroutineStart.LAZY 懒加载模式时则只有调用 Deferred 对象的 await 时 (或者执行 async.satrt()) 才会开始执行异步工作。

三、协程补充常识

在叙述协程启动内容,波及到了 Job、Deferred、启动模式、作用域等概念,这里补充介绍一下上述概念。

3.1 Job

Job 是协程的句柄,赋予协程可勾销,赋予协程以生命周期,赋予协程以结构化并发的能力。

Job 是 launch 构建协程返回的一个协程工作,实现时是没有返回值的。能够把 Job 看成协程对象自身,封装了协程中须要执行的代码逻辑,协程的操作方法都在 Job 身上。Job 具备生命周期并且能够勾销,它也是上下文元素,继承自 CoroutineContext。

在日常 Android 开发过程中,协程配合 Lifecycle 能够做到主动勾销。

Job 生命周期

Job 的生命周期分为 6 种状态,分为

  1. New
  2. Active
  3. Completing
  4. Cancelling
  5. Cancelled
  6. Completed

通常外界会持有 Job 接口作为援用被协程调用者所持有。Job 接口提供 isActive、isCompleted、isCancelled 3 个变量使外界能够感知 Job 外部的状态。

val job = launch(start = CoroutineStart.LAZY) {println("Active")
}
println("New")
job.join()
println("Completed")
 
/** 打印后果 **/
New
Active
Completed
 
/**********
* 1. 以 lazy 形式创立进去的协程 state 为 New
* 2. 对应的 job 调用 join 函数后,协程进入 Active 状态,并开始执行协程对应的具体代码
* 3. 当协程执行结束后,因为没有须要期待的子协程,协程间接进入 Completed 状态
*/

对于 Job,罕用的办法有:

// 沉闷的,是否仍在执行
public val isActive: Boolean
 
// 启动协程,如果启动了协程,则为 true; 如果协程曾经启动或实现,则为 false
public fun start(): Boolean
 
// 勾销 Job,可通过传入 Exception 阐明具体起因
public fun cancel(cause: CancellationException? = null)
 
// 挂起协程直到此 Job 实现
public suspend fun join()
 
// 勾销工作并期待工作实现,联合了 [cancel] 和[join]的调用
public suspend fun Job.cancelAndJoin()
 
// 给 Job 设置一个实现告诉,当 Job 执行实现的时候会同步执行这个函数
public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle

Job 父子层级

对于 Job,还须要分外关注的是 Job 的父子层级关系。

  • 一个 Job 能够蕴含多个子 Job。
  • 当父 Job 被勾销后,所有的子 Job 也会被主动勾销。
  • 当子 Job 被勾销或者出现异常后父 Job 也会被勾销。
  • 具备多个子 Job 的父 Job 会期待所有子 Job 实现 (或者勾销) 后,本人才会执行实现。

3.2 Deferred

Deferred 继承自 Job,具备与 Job 雷同的状态机制。

它是 async 构建协程返回的一个协程工作,可通过调用 await()办法期待协程执行实现并获取后果。其中 Job 没有后果值,Deffer 有后果值。

public interface Deferred<out T> : Job

3.3 作用域

协程作用域(CoroutineScope):协程定义的作用范畴,实质是一个接口。

确保所有的协程都会被追踪,Kotlin 不容许在没有应用 CoroutineScope 的状况下启动新的协程。CoroutineScope 可被看作是一个具备超能力的 ExecutorService 的轻量级版本。它能启动新的协程,同时这个协程还具备 suspend 和 resume 的劣势。

每个协程生成器 launch、async 等都是 CoroutineScope 的扩大,并继承了它的 coroutineContext,主动流传其所有元素和勾销。

启动协程须要作用域,然而作用域又是在协程创立过程中产生的。

public interface CoroutineScope {
    /**
     * 此域的上下文。Context 被作用域封装,用于在作用域上扩大的协程构建器的实现。*/
    public val coroutineContext: CoroutineContext
}

官网提供的罕用作用域:

  • runBlocking:

顶层函数,可启动协程,但会阻塞以后线程

  • GlobalScope

全局协程作用域。通过 GlobalScope 创立的协程不会有父协程,能够把它称为根协程。它启动的协程的生命周期只受整个应用程序的生命周期的限度,且不能取消,在运行时会耗费一些内存资源,这可能会导致内存泄露,不适用于业务开发。

  • coroutineScope

创立一个独立的协程作用域,直到所有启动的协程都实现后才完结本身。

它是一个挂起函数,须要运行在协程内或挂起函数内。当这个作用域中的任何一个子协程失败时,这个作用域失败,所有其余的子协程都被勾销。

  • supervisorScope

与 coroutineScope 相似,不同的是子协程的异样不会影响父协程,也不会影响其余子协程。(作用域自身的失败 (在 block 或勾销中抛出异样) 会导致作用域及其所有子协程失败,但不会勾销父协程。)

  • MainScope

为 UI 组件创立主作用域。一个顶层函数,上下文是 SupervisorJob() + Dispatchers.Main,阐明它是一个在主线程执行的协程作用域,通过 cancel 对协程进行勾销。

fun scopeTest() {
    GlobalScope.launch {// 父协程
        launch {// 子协程
            print("GlobalScope 的子协程")
        }
        launch {// 第二个子协程
            print("GlobalScope 的第二个子协程")
        }
    }
 
    val mainScope = MainScope()
    mainScope.launch {// 启动协程
        //todo
    }
}

Jetpack 的 Lifecycle 相干组件提供了曾经绑定 UV 申明周期的作用域供咱们间接应用:

  • lifecycleScope:

Lifecycle Ktx 库提供的具备生命周期感知的协程作用域,与 Lifecycle 绑定生命周期,生命周期被销毁时,此作用域将被勾销。会与以后的 UI 组件绑定生命周期,界面销毁时该协程作用域将被勾销,不会造成协程透露,举荐应用。

  • viewModelScope:

与 lifecycleScope 相似,与 ViewModel 绑定生命周期,当 ViewModel 被革除时,这个作用域将被勾销。举荐应用。

3.4 启动模式

前述进行协程创立启动时波及到了启动模式 CoroutineStart,其是一个枚举类,为协程构建器定义启动选项。在协程构建的 start 参数中应用。

DEFAULT 模式

DEFAULT 是饿汉式启动,launch 调用后,会立刻进入待调度状态,一旦调度器 OK 就能够开始执行。

suspend fun main() {log(1)
    val job = GlobalScope.launch{log(2)
    }
    log(3)
    Thread.sleep(5000)  // 避免程序退出
}
fun log(o: Any?) {println("[${Thread.currentThread().name}]:$o")
}

前述示例代码采纳默认的启动模式和默认的调度器,, 运行后果取决于以后线程与后盾线程的调度程序。

/** 可能的运行后果一 ****/
[main]:1
[main]:3
[main]:2
 
/** 可能的运行后果二 ****/
[main]:1
[main]:2
[main]:3

LAZY 模式

LAZY 是懒汉式启动,launch 后并不会有任何调度行为,协程体不会进入执行状态,直到咱们须要他的运行后果时进行执行,其 launch 调用后会返回一个 Job 实例。

对于这种状况,能够:

  1. 调用 Job.start,被动触发协程的调度执行
  2. 调用 Job.join,隐式的触发协程的调度执行
suspend fun main() {log(1)
    val job = GlobalScope.launch(start = CoroutineStart.LAZY){log(2)
    }
    log(3)
    job.join()
    log(4)
}
fun log(o: Any?) {println("[${Thread.currentThread().name}]:$o")
}

对于 join,肯定要期待协程执行结束,所以其运行后果肯定为:

[main]:1
[main]:3
[DefaultDispatcher-worker-1]:2
[main]:4

如果把 join()换为 start(),则输入后果不肯定。

ATOMIC 模式

ATOMIC 只有波及 cancel 的时候才有意义。调用 cancel 的机会不同,后果也有差别。

suspend fun main() {log(1)
    val job = GlobalScope.launch(start = CoroutineStart.ATOMIC){log(2)
    }
    job.cancel()
    log(3)
    Thread.sleep(2000)
}
fun log(o: Any?) {println("[${Thread.currentThread().name}]:$o")
}

前述代码示例创立协程后立刻 cancel,因为是 ATOMIC 模式,因而协程肯定会被调度,则 log 1、2、3 肯定都会被打印输出。如果将模式改为 DEFAULT 模式,则 log 2 有可能打印输出,也可能不会。

其实 cancel 调用肯定会将该 job 的状态置为 cancelling,只不过 ATOMIC 模式的协程在启动时忽视了这一状态。

suspend fun main() {log(1)
    val job = GlobalScope.launch(start = CoroutineStart.ATOMIC) {log(2)
        delay(1000)
        log(3)
    }
    job.cancel()
    log(4)
    job.join()
    Thread.sleep(2000)
}
fun log(o: Any?) {println("[${Thread.currentThread().name}]:$o")
}
 
/** 打印输出后果可能如下 ****/
[main]:1
[DefaultDispatcher-worker-1]:2
[main]:4

前述代码中,2 和 3 中加了一个 delay,delay 会使得协程体的执行被挂起,1000ms 之后再次调度前面的局部。对于 ATOMIC 模式,它肯定会被启动,实际上在遇到第一个挂终点之前,它的执行是不会进行的,而 delay 是一个 suspend 函数,这时咱们的协程迎来了本人的第一个挂终点,恰好 delay 是反对 cancel 的,因而前面的 3 将不会被打印。

UNDISPATCHED 模式

协程在这种模式下会间接开始在以后线程下执行,直到第一个挂终点。

与 ATOMIC 的不同之处在于 UNDISPATCHED 不通过任何调度器即开始执行协程体。遇到挂终点之后的执行就取决于挂终点自身的逻辑以及上下文当中的调度器了。

suspend fun main() {log(1)
    val job = GlobalScope.launch(start = CoroutineStart.UNDISPATCHED) {log(2)
        delay(100)
        log(3)
    }
    log(4)
    job.join()
    log(5)
    Thread.sleep(2000)
}
fun log(o: Any?) {println("[${Thread.currentThread().name}]:$o")
}

协程启动后会立刻在以后线程执行,因而 1、2 会间断在同一线程中执行,delay 是挂终点,因而 3 会等 100ms 后再次调度,这时候 4 执行,join 要求期待协程执行完,因而等 3 输入后再执行 5。

后果如下:

[main]:1
[main]:2
[main]:4
[DefaultDispatcher-worker-1]:3
[DefaultDispatcher-worker-1]:5

3.5 withContext

withContext {}不会创立新的协程。在指定协程上运行挂起代码块,放在该块内的任何代码都始终通过 IO 调度器执行,并挂起该协程直至代码块运行实现。

public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T): T

withContext 会应用新指定的上下文的 dispatcher,将 block 的执行转移到指定的线程中。

它会返回后果,能够和以后协程的父协程存在交互关系, 次要作用为了来回切换调度器。

coroutineScope{launch(Dispatchers.Main) {      // 在 UI 线程开始
        val image = withContext(Dispatchers.IO) {  // 切换到 IO 线程,并在执行实现后切回 UI 线程
            getImage(imageId)                      // 将会运行在 IO 线程
        }
        avatarIv.setImageBitmap(image)             // 回到 UI 线程更新 UI
    }
}

四、协程调度

4.1 协程上下文

在协程启动局部提到,启动协程须要三个局部,其中一个局部就是上下文,其接口类型是 CoroutineContext,通常所见的上下文类型是 CombinedContext 或者 EmptyCoroutineContext,一个示意上下文组合,另一个示意空。

协程上下文是 Kotlin 协程的根本结构单元,次要承载着资源获取,配置管理等工作,是执行环境的通用数据资源的对立管理者。除此之外,也包含携带参数,拦挡协程执行等,是实现正确的线程行为、生命周期、异样以及调试的要害。

协程应用以下几种元素集定义协程行为,他们均继承自 CoroutineContext:

  1. 【Job】:协程的句柄,对协程的管制和治理生命周期。
  2. 【CoroutineName】:协程的名称,用于调试
  3. 【CoroutineDispatcher】:调度器,确定协程在指定的线程执行
  4. 【CoroutineExceptionHandler】:协程异样处理器,解决未捕捉的异样

这里回顾一下 launch 和 async 两个函数签名。

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit): Job
 
public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T): Deferred<T>

两个函数第一个参数都是 CoroutineContext 类型。

所有协程构建函数都是以 CoroutineScope 的扩大函数的模式被定义的,而 CoroutineScope 的接口惟一成员就是 CoroutineContext 类型。

public interface CoroutineScope {public val coroutineContext: CoroutineContext}

简而言之,协程上下文是协程必备组成部分,治理了协程的线程绑定、生命周期、异样解决和调试。

4.1.1 协程上下文构造

看一下 CoroutineContext 的接口办法:

public interface CoroutineContext {// 操作符 [] 重载,能够通过 CoroutineContext[Key]这种模式来获取与 Key 关联的 Element
    public operator fun <E : Element> get(key: Key<E>): E?
 
    // 提供遍历 CoroutineContext 中每一个 Element 的能力,并对每一个 Element 做 operation 操作
    public fun <R> fold(initial: R, operation: (R, Element) -> R): R
 
    // 操作符 + 重载,能够 CoroutineContext + CoroutineContext 这种模式把两个 CoroutineContext 合并成一个
    public operator fun plus(context: CoroutineContext): CoroutineContext = .......
     
    // 返回一个新的 CoroutineContext,这个 CoroutineContext 删除了 Key 对应的 Element
    public fun minusKey(key: Key<*>): CoroutineContext
     
    //Key 定义,空实现,仅仅做一个标识
    public interface Key<E : Element>
 
    ///Element 定义,每个 Element 都是一个 CoroutineContext
    public interface Element : CoroutineContext {
         
        // 每个 Element 都有一个 Key 实例
        public val key: Key<*>
        ......
    }
}

Element:协程上下文的一个元素,自身就是一个单例上下文,外面有一个 key,是这个元素的索引。

可知,Element 自身也实现了 CoroutineContext 接口。

这里咱们再看一下官网解释:

/**

  • Persistent context for the coroutine. It is an indexed set of [Element] instances.
  • An indexed set is a mix between a set and a map.
  • Every element in this set has a unique [Key].*/

从官网解释可知,CoroutineContext 是一个 Element 的汇合,这种汇合被称为 indexed set,介于 set 和 map 之间的一种构造。set 意味着其中的元素有唯一性,map 意味着每个元素都对应一个键。

如果将协程上下文外部的一系列上下文称为 子上下文,上下文为每个子上下文调配了一个 Key,它是一个带有类型信息的接口。

这个接口通常被实现为 companion object。

//Job
public interface Job : CoroutineContext.Element {
    /**
     * Key for [Job] instance in the coroutine context.
     */
    public companion object Key : CoroutineContext.Key<Job>
}
 
// 拦截器
public interface ContinuationInterceptor : CoroutineContext.Element {
    /**
     * The key that defines *the* context interceptor.
     */
    companion object Key : CoroutineContext.Key<ContinuationInterceptor>
}
 
// 协程名
public data class CoroutineName(val name: String) : AbstractCoroutineContextElement(CoroutineName) {
    /**
     * Key for [CoroutineName] instance in the coroutine context.
     */
    public companion object Key : CoroutineContext.Key<CoroutineName>
}
 
// 异样处理器
public interface CoroutineExceptionHandler : CoroutineContext.Element {
    /**
     * Key for [CoroutineExceptionHandler] instance in the coroutine context.
     */
    public companion object Key : CoroutineContext.Key<CoroutineExceptionHandler>
}

源码中定义的子上下文,都会在外部申明一个动态的 Key,类外部的动态变量意味着被所有类实例共享,即全局惟一的 Key 实例能够对应多个子上下文实例。

在一个相似 map 的构造中,每个键必须是惟一的,因为对雷同的键 put 两次值,新值会代替旧值。通过上述形式,通过键的唯一性保障了上下文中的所有子上下文实例都是惟一的。

咱们依照这个格局仿写一下而后反编译。

class MyElement :AbstractCoroutineContextElement(MyElement) {companion object Key : CoroutineContext.Key<MyElement>}
 
// 反编译的 java 文件
public final class MyElement extends AbstractCoroutineContextElement {
    @NotNull
   public static final MyElement.Key Key = new MyElement.Key((DefaultConstructorMarker)null);
 
   public MyElement() {super((kotlin.coroutines.CoroutineContext.Key)Key);
   }
     
    public static final class Key implements kotlin.coroutines.CoroutineContext.Key {private Key() { }
 
      // $FF: synthetic method
      public Key(DefaultConstructorMarker $constructor_marker) {this();
      }
   }
}

比照 kt 和 Java 文件,能够看到 Key 就是一个动态变量,且其实现类未做解决,作用与 HashMap 中的 Key 相似。

Key 是动态变量,全局惟一,为 Element 提供唯一性保障。

前述内容总结如下:

  1. 协程上下文是一个元素的汇合,单个元素自身也是一个上下文,其定义是递归的,本人蕴含若干个本人。
  2. 协程上下文这个汇合有点像 set 构造,其中的元素都是惟一的,不反复的。其通过给每一个元素配有一个动态的键实例,形成一组键值对的形式实现。这使其相似 map 构造。这种介于 set 和 map 之间的构造称为 indexed set。

CoroutineContext.get()获取元素

对于 CoroutineContext,咱们先看一下其是如何取元素的。

这里看一下 Element、CombinedContext、EmptyCoroutineContext 的外部实现,其中 CombinedContext 就是 CoroutineContext 汇合构造的实现,EmptyCoroutineContext 就示意一个空的 CoroutineContext,它外面是空实现。

@SinceKotlin("1.3")
internal class CombinedContext(
    // 左上下文
    private val left: CoroutineContext,
    // 右元素
    private val element: Element
) : CoroutineContext, Serializable {override fun <E : Element> get(key: Key<E>): E? {
        var cur = this
        while (true) {
            // 如果输出 key 和右元素的 key 雷同,则返回右元素
            cur.element[key]?.let {return it}
            // 若右元素不匹配,则向左持续查找
            val next = cur.left
            if (next is CombinedContext) {cur = next} else { // 若左上下文不是混合上下文,则完结递归
                return next[key]
            }
        }
    }
    ......
}
 
public interface Element : CoroutineContext {
    public val key: Key<*>
 
    public override operator fun <E : Element> get(key: Key<E>): E? =
    @Suppress("UNCHECKED_CAST")
    // 如果给定键和元素自身键雷同,则返回以后元素,否则返回空
    if (this.key == key) this as E else null
    ......
}
 
public object EmptyCoroutineContext : CoroutineContext, Serializable {
    // 返回空
    public override fun <E : Element> get(key: Key<E>): E? = null
}

通过 Key 检索 Element,返回值只能是 Element 或 null,链表节点中的元素值,其中 CombinedContext 利用 while 循环实现了相似递归的成果,其中较早被遍历到的元素天然具备较高的优先级。

// 应用示例
println(coroutineContext[CoroutineName])
println(Dispatchers.Main[CoroutineName])

CoroutineContext.minusKey()删除元素

同理看一下 Element、CombinedContext、EmptyCoroutineContext 的外部实现。

internal class CombinedContext(
    // 左上下文
    private val left: CoroutineContext,
    // 右元素
    private val element: Element
) : CoroutineContext, Serializable {public override fun minusKey(key: Key<*>): CoroutineContext {
        // 如果 element 就是要删除的元素,返回 left,否则阐明要删除的元素在 left 中,持续从 left 中删除对应的元素
        element[key]?.let {return left}
        // 在左上下文中去掉对应元素
        val newLeft = left.minusKey(key)
        return when {
            // 如果 left 中不存在要删除的元素,那么以后 CombinedContext 就不存在要删除的元素,间接返回以后 CombinedContext 实例
            newLeft === left -> this
            // 如果 left 中存在要删除的元素,删除了这个元素后,left 变为了空,那么间接返回以后 CombinedContext 的 element 就行
            newLeft === EmptyCoroutineContext -> element
            // 如果 left 中存在要删除的元素,删除了这个元素后,left 不为空,那么组合一个新的 CombinedContext 返回
            else -> CombinedContext(newLeft, element)
        }
    }
    ......
}
 
public object EmptyCoroutineContext : CoroutineContext, Serializable {public override fun minusKey(key: Key<*>): CoroutineContext = this
    ......
}
 
public interface Element : CoroutineContext {// 如果 key 和本人的 key 匹配,那么本人就是要删除的 Element,返回 EmptyCoroutineContext(示意删除了本人),否则阐明本人不须要被删除,返回本人
    public override fun minusKey(key: Key<*>): CoroutineContext =
    if (this.key == key) EmptyCoroutineContext else this
    ......
}

如果把 CombinedContext 和 Element 联合来看,那么 CombinedContext 的整体构造如下:

其构造相似链表,left 就是指向下一个结点的指针,get、minusKey 操作大体逻辑都是先拜访以后 element,不满足,再拜访 left 的 element,程序都是从 right 到 left。

CoroutineContext.fold()元素遍历

internal class CombinedContext(
    // 左上下文
    private val left: CoroutineContext,
    // 右元素
    private val element: Element
) : CoroutineContext, Serializable {
 
    // 先对 left 做 fold 操作,把 left 做完 fold 操作的的返回后果和 element 做 operation 操作
    public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
    operation(left.fold(initial, operation), element)
    ......
}
 
public object EmptyCoroutineContext : CoroutineContext, Serializable {public override fun <R> fold(initial: R, operation: (R, Element) -> R): R = initial
    ......
}
 
public interface Element : CoroutineContext {
    // 对传入的 initial 和本人做 operation 操作
    public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
    operation(initial, this)
    ......
}

fold 也是递归的模式操作,fold 的操作大体逻辑是:先拜访 left,直到递归到最初的 element,而后再从 left 到 right 的返回,从而拜访了所有的 element。

CoroutineContext.plus()增加元素

对于 CoroutineContext 的元素增加办法,间接看其 plus()实现,也是惟一没有被重写的办法。

public operator fun plus(context: CoroutineContext): CoroutineContext =
// 如果要相加的 CoroutineContext 为空,那么不做任何解决,间接返回
if (context === EmptyCoroutineContext) this else
// 如果要相加的 CoroutineContext 不为空,那么对它进行 fold 操作, 能够把 acc 了解成 + 号右边的 CoroutineContext,element 了解成 + 号左边的 CoroutineContext 的某一个 element
context.fold(this) { acc, element ->
                    // 首先从右边 CoroutineContext 中删除左边的这个 element
                    val removed = acc.minusKey(element.key)
                    // 如果 removed 为空,阐明右边 CoroutineContext 删除了和 element 雷同的元素后为空,那么返回左边的 element 即可
                    if (removed === EmptyCoroutineContext) element else {
                        // 如果 removed 不为空,阐明右边 CoroutineContext 删除了和 element 雷同的元素后还有其余元素,那么结构一个新的 CombinedContext 返回
                        val interceptor = removed[ContinuationInterceptor]
                        if (interceptor == null) CombinedContext(removed, element) else {val left = removed.minusKey(ContinuationInterceptor)
                            if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
                            CombinedContext(CombinedContext(left, element), interceptor)
                        }
                    }
                   }

plus 办法大部分状况下返回一个 CombinedContext,即咱们把两个 CoroutineContext 相加后,返回一个 CombinedContext,在组合成 CombinedContext 时,+ 号左边的 CoroutineContext 中的元素会笼罩 + 号右边的 CoroutineContext 中的含有雷同 key 的元素。

这个笼罩操作就在 fold 办法的参数 operation代码块 中实现,通过 minusKey 办法删除掉反复元素。

plus 办法中能够看到外面有个对 ContinuationInterceptor 的解决,目标是让 ContinuationInterceptor 在每次相加后都能变成 CoroutineContext 中的最初一个元素。

ContinuationInterceptor 继承自 Element,称为协程上下文拦截器,作用是在协程执行前拦挡它,从而在协程执行前做出一些其余的操作。通过把 ContinuationInterceptor 放在最初面,协程在查找上下文的 element 时,总能最快找到拦截器,防止了递归查找,从而让拦挡行为前置执行。

4.1.2 CoroutineName

public data class CoroutineName(val name: String) : AbstractCoroutineContextElement(CoroutineName) {

CoroutineName 是用户用来指定的协程名称的,用于不便调试和定位问题。

GlobalScope.launch(CoroutineName("GlobalScope")) {launch(CoroutineName("CoroutineA")) {// 指定协程名称
        val coroutineName = coroutineContext[CoroutineName]// 获取协程名称
        print(coroutineName)
    }
}
 
/** 打印后果
CoroutineName(CoroutineA)
*/

协程外部能够通过 coroutineContext 这个全局属性间接获取以后协程的上下文。

4.1.3 上下文组合

如果要传递多个上下文元素,CoroutineContext 能够应用 ”+” 运算符进行合并。因为 CoroutineContext 是由一组元素组成的,所以加号右侧的元素会笼罩加号左侧的元素,进而组成新创建的 CoroutineContext。

GlobalScope.launch {
    // 通过 + 号运算增加多个上下文元素
    var context = CoroutineName("协程 1") + Dispatchers.Main
    print("context == $context")
 
    context += Dispatchers.IO // 增加反复 Dispatchers 元素,Dispatchers.IO 会替换 ispatchers.Main
    print("context == $context")
 
    val contextResult = context.minusKey(context[CoroutineName]!!.key)// 移除 CoroutineName 元素
    print("contextResult == $contextResult")
}
 
/** 打印后果
context == [CoroutineName(协程 1), Dispatchers.Main]
context == [CoroutineName(协程 1), Dispatchers.IO]
contextResult == Dispatchers.IO
*/

如果有反复的元素(key 统一)则左边的会代替右边的元素,相干原理参看协程上下文构造章节。

4.1.4 CoroutineScope 构建

CoroutineScope 实际上是一个 CoroutineContext 的封装,当咱们须要启动一个协程时,会在 CoroutineScope 的实例上调用构建函数,如 async 和 launch。

在构建函数中,一共呈现了 3 个 CoroutineContext。

查看协程构建函数 async 和 launch 的源码,其第一行都是如下代码:

val newContext = newCoroutineContext(context)

进一步查看:

@ExperimentalCoroutinesApi
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
    val combined = coroutineContext + context   //CoroutineContext 拼接组合
    val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
    return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
        debug + Dispatchers.Default else debug
}

构建器外部进行了一个 CoroutineContext 拼接操作,plus 左值是 CoroutineScope 外部的 CoroutineContext,右值是作为构建函数参数的 CoroutineContext。

抽象类 AbstractCoroutineScope 实现了 CoroutineScope 和 Job 接口。大部分 CoroutineScope 的实现都继承自 AbstractCoroutineScope,意味着他们同时也是一个 Job。

public abstract class AbstractCoroutine<in T>(
    parentContext: CoroutineContext,
    initParentJob: Boolean,
    active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
    /**
     * The context of this coroutine that includes this coroutine as a [Job].
     */
    public final override val context: CoroutineContext = parentContext + this
    // 重写了父类的 coroutineContext 属性
    public override val coroutineContext: CoroutineContext get() = context}

从上述剖析可知:coroutine context = parent context + coroutine job

4.1.5 典型用例

全限定 Context

launch(Dispatchers.Main + Job() + CoroutineName("HelloCoroutine") + CoroutineExceptionHandler {_, _ -> /* ... */}) {/* ... */}

全限定 Context,即全副显式指定具体值的 Elements。不管你用哪一个 CoroutineScope 构建该协程,它都具备统一的体现,不会受到 CoroutineScope 任何影响。

CoroutineScope Context

基于 Activity 生命周期实现一个 CoroutineScope

abstract class ScopedAppActivity:
AppCompatActivity(),
CoroutineScope
{
    protected lateinit var job: Job
    override val coroutineContext: CoroutineContext
    get() = job + Dispatchers.Main // 留神这里应用 + 拼接 CoroutineContext
 
    override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)
        job = Job()}
 
    override fun onDestroy() {super.onDestroy()
        job.cancel()}
}

Dispatcher:应用 Dispatcher.Main,以在 UI 线程进行绘制

Job:在 onCreate 时构建,在 onDestroy 时销毁,所有基于该 CoroutineContext 创立的协程,都会在 Activity 销毁时勾销,从而防止 Activity 泄露的问题

长期指定参数

CoroutineContext 的参数次要有两个起源:从 scope 中继承 + 参数指定。咱们能够用 withContext 便捷地指定某个参数启动子协程,例如咱们想要在协程外部执行一个无奈被勾销的子协程:

withContext(NonCancellable) {/* ... */}

读取协程上下文参数

通过顶级挂起只读属性 coroutineContext 获取协程上下文参数,它位于 kotlin-stdlib / kotlin.coroutines / coroutineContext

println("Running in ${coroutineContext[CoroutineName]}")

Nested Context 内嵌上下文

内嵌上下文切换:在协程 A 外部构建协程 B 时,B 会主动继承 A 的 Dispatcher。

能够在调用 async 时退出 Dispatcher 参数,切换到工作线程

// 谬误的做法,在主线程中间接调用 async,若耗时过长则阻塞 UI
GlobalScope.launch(Dispatchers.Main) {
    val deferred = async {/* ... */}
    /* ... */
}
 
// 正确的做法,在工作线程执行协程工作
GlobalScope.launch(Dispatchers.Main) {val deferred = async(Dispatchers.Default) {/* ... */}
    /* ... */
}

4.2 协程拦截器

@SinceKotlin("1.3")
public interface ContinuationInterceptor : CoroutineContext.Element {
    companion object Key : CoroutineContext.Key<ContinuationInterceptor>
     
    public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
    //......
     
}
  1. 无论在 CoroutineContext 前面 放了多少个拦截器,Key 为 ContinuationInterceptor 的拦截器只能有一个。
  2. Continuation 在调用其 Continuation#resumeWith() 办法,会执行其 suspend 润饰的函数的代码块,如果咱们提前拦挡到,能够做点其余事件,比如说切换线程,这是 ContinuationInterceptor 的次要作用。

协程的实质就是回调,这个回调就是被拦挡的 Continuation。OkHttp 用拦截器做缓存,打日志,模仿申请等,协程拦截器同理。

咱们通过 Dispatchers 来指定协程产生的线程,Dispatchers 实现了 ContinuationInterceptor 接口。

这里咱们自定义一个拦截器放到协程上下文,看一下会产生什么。

class MyContinuationInterceptor: ContinuationInterceptor{
    override val key = ContinuationInterceptor
    override fun <T> interceptContinuation(continuation: Continuation<T>) = MyContinuation(continuation)
}
 
class MyContinuation<T>(val continuation: Continuation<T>): Continuation<T> {
    override val context = continuation.context
    override fun resumeWith(result: Result<T>) {log("<MyContinuation> $result")
        continuation.resumeWith(result)
    }
}
suspend fun main(args: Array<String>) { // start main coroutine
    GlobalScope.launch(MyContinuationInterceptor()) {log(1)
        val job = async {log(2)
            delay(1000)
            log(3)
            "Hello"
        }
        log(4)
        val result = job.await()
        log("5. $result")
    }.join()
    log(6)
}
fun log(o: Any?) {println("[${Thread.currentThread().name}]:$o")
}
/****** 打印后果 ******/
[main]:<MyContinuation> Success(kotlin.Unit)      //11
[main]:1
[main]:<MyContinuation> Success(kotlin.Unit)      //22
[main]:2
[main]:4
[kotlinx.coroutines.DefaultExecutor]:<MyContinuation> Success(kotlin.Unit)        //33
[kotlinx.coroutines.DefaultExecutor]:3
[kotlinx.coroutines.DefaultExecutor]:<MyContinuation> Success(Hello)
[kotlinx.coroutines.DefaultExecutor]:5. Hello
[kotlinx.coroutines.DefaultExecutor]:6
  • 所有协程启动时,都有一次 Continuation.resumeWith 的操作,协程有机会调度到其余线程的要害之处就在于此。
  • delay 是挂终点,1s 之后须要持续调度执行该协程,因而就有了 33 处日志。

前述剖析 CoroutineContext 的 plus 办法波及到了 ContinuationInterceptor,plus 每次都会将 ContinuationInterceptor 增加到拼接链的尾部,这里再具体解释一下起因。

public operator fun plus(context: CoroutineContext): CoroutineContext =
// 如果要相加的 CoroutineContext 为空,那么不做任何解决,间接返回
if (context === EmptyCoroutineContext) this else
// 如果要相加的 CoroutineContext 不为空,那么对它进行 fold 操作, 能够把 acc 了解成 + 号右边的 CoroutineContext,element 了解成 + 号左边的 CoroutineContext 的某一个 element
context.fold(this) { acc, element ->
                    // 首先从右边 CoroutineContext 中删除左边的这个 element
                    val removed = acc.minusKey(element.key)
                    // 如果 removed 为空,阐明右边 CoroutineContext 删除了和 element 雷同的元素后为空,那么返回左边的 element 即可
                    if (removed === EmptyCoroutineContext) element else {
                        // 如果 removed 不为空,阐明右边 CoroutineContext 删除了和 element 雷同的元素后还有其余元素,那么结构一个新的 CombinedContext 返回
                        val interceptor = removed[ContinuationInterceptor]
                        if (interceptor == null) CombinedContext(removed, element) else {val left = removed.minusKey(ContinuationInterceptor)
                            if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
                            CombinedContext(CombinedContext(left, element), interceptor)
                        }
                    }
                   }

起因一:CombinedContext 的构造决定。

其有两个元素,left 是一个前驱汇合,element 为一个纯正 CoroutineContext,它的 get 办法每次都是从 element 开始进行查找对应 Key 的 CoroutineContext 对象;没有匹配到才会去 left 汇合中进行递归查找。为了放慢查找 ContinuationInterceptor 类型的实例,才将它退出到拼接链的尾部,对应的就是 element。

起因二:ContinuationInterceptor 应用很频繁

每次创立协程都会去尝试查找以后协程的 CoroutineContext 中是否存在 ContinuationInterceptor。这里咱们用 launch 来验证一下

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.()
 -> 
Unit
): Job {val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

如果应用的 launch 应用的是默认参数,此时 Coroutine 就是 StandaloneCoroutine,而后调用 start 办法启动协程。

    start(block, receiver, this)
}
 
public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit =
        when (this) {DEFAULT -> block.startCoroutineCancellable(completion)
            ATOMIC -> block.startCoroutine(completion)
            UNDISPATCHED -> block.startCoroutineUndispatched(completion)
            LAZY -> Unit // will start lazily
        }

如果咱们应用默认参数,看一下默认参数对应执行的 block.startCoroutineCancellable(completion)

public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
}
  1. 首先通过 createCoroutineUnintercepted 来创立一个协程
  2. 而后再调用 intercepted 办法进行拦挡操作
  3. 最初调用 resumeCancellable,即 Continuation 的 resumeWith 办法,启动协程,所以每次启动协程都会主动回调一次 resumeWith 办法

这里看一下 intercepted

public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
    (this as? ContinuationImpl)?.intercepted() ?: this

看其在 ContinuationImpl 的 intercepted 办法实现

public fun intercepted(): Continuation<Any?> =
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also {intercepted = it}
  1. 首先获取到 ContinuationInterceptor 实例
  2. 而后调用它的 interceptContinuation 办法返回一个解决过的 Continuation(屡次调用 intercepted,对应的 interceptContinuation 只会调用一次)

至此可知,ContinuationInterceptor 的拦挡是通过 interceptContinuation 办法进行

上面再看一个 ContinuationInterceptor 的典型示例

val interceptor = object : ContinuationInterceptor {
  
    override val key: CoroutineContext.Key<*> = ContinuationInterceptor
  
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {println("intercept todo something. change run to thread")
        return object : Continuation<T> by continuation {override fun resumeWith(result: Result<T>) {println("create new thread")
                thread {continuation.resumeWith(result)
                }
            }
        }
    }
}
  
println(Thread.currentThread().name)
  
lifecycleScope.launch(interceptor) {println("launch start. current thread: ${Thread.currentThread().name}")
     
    withContext(Dispatchers.Main) {println("new continuation todo something in the main thread. current thread: ${Thread.currentThread().name}")
    }
     
    launch {println("new continuation todo something. current thread: ${Thread.currentThread().name}")
    }
     
    println("launch end. current thread: ${Thread.currentThread().name}")
}
/****** 打印后果 ******/
main
// 第一次 launch
intercept todo something. change run to thread
create new thread
launch start. current thread: Thread-2
new continuation todo something in the main thread. current thread: main
create new thread
// 第二次 launch
intercept todo something. change run to thread
create new thread
launch end. current thread: Thread-7
new continuation todo something. current thread: Thread-8
  1. 首先程序运行在 main 线程,启动协程时将自定义的 interceptor 退出到上下文中,协程启动时进行拦挡,将在 main 线程运行的程序切换到新的 thread 线程
  2. withContext 没有拦挡胜利,具体起因在上面的调度器再具体解释,简略来说就是咱们自定义的 interceptor 被替换了。
  3. launch start 与 launch end 所处的线程不一样,因为 withContext 完结之后,它外部还会进行一次线程复原,将本身所处的 main 线程切换到之前的线程。协程每一个挂起后复原都是通过回调 resumeWith 进行的,然而内部 launch 协程咱们进行了拦挡,在它返回的 Continuation 的 resumeWith 回调中总是会创立新的 thread。

4.3 调度器

CoroutineDispatcher 调度器指定指定执行协程的指标载体,它确定了相干的协程在哪个线程或哪些线程上执行。能够将协程限度在一个特定的线程执行,或将它分派到一个线程池,亦或是让它不受限地运行。

public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
         
        // 将可运行块的执行分派到给定上下文中的另一个线程上
        public abstract fun dispatch(context: CoroutineContext, block: Runnable)
         
        // 返回一个 continuation,它封装了提供的[continuation],拦挡了所有的复原
        public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
        //......
    }

协程须要调度的地位就是挂终点的地位,只有当挂终点正在挂起的时候才会进行调度,实现调度须要应用协程的拦截器。

调度的实质就是解决挂终点复原之后的协程逻辑在哪里运行的问题。调度器也属于协程上下文一类,它继承自 拦截器。

  • 【val Default】: CoroutineDispatcher
  • 【val Main】: MainCoroutineDispatcher
  • 【val Unconfined】: CoroutineDispatcher

IO 仅在 Jvm 上有定义,它基于 Default 调度器背地的线程池,并实现了独立的队列和限度,因而协程调度器从 Default 切换到 IO 并不会触发线程切换

对于调度器介绍到这里,还没有具体解释前述协程拦截器中的 withContext 为什么拦挡失败。这里针对这个具体看一下源码实现。

public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher

其返回类型为 MainCoroutineDispatcher,继承自 CoroutineDispatcher。

public abstract class MainCoroutineDispatcher : CoroutineDispatcher()
 
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
     
    public abstract fun dispatch(context: CoroutineContext, block: Runnable)
 
    public open fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = dispatch(context, block)
     
    public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        DispatchedContinuation(this, continuation)
    ......
}

CoroutineDispatch 实现了 ContinuationInterceptor,依据前述解释的 CoroutineContext 构造,可知咱们自定义的拦截器没有失效是因为被替换了。

CoroutineDispatch 中的 isDispatchNeeded 就是判断是否须要散发,而后 dispatch 就是执行散发。

ContinuationInterceptor 重要的办法就是 interceptContinuation,在 CoroutineDispatcher 中间接返回了 DispatchedContinuation 对象,它是一个 Continuation 类型,看一下其 resumeWith 实现。

override fun resumeWith(result: Result<T>) {
    val context = continuation.context
    val state = result.toState()
    // 判断是否须要散发
    if (dispatcher.isDispatchNeeded(context)) {
        _state = state
        resumeMode = MODE_ATOMIC
        dispatcher.dispatch(context, this)
    } else {executeUnconfined(state, MODE_ATOMIC) {withCoroutineContext(this.context, countOrElement) {
                // 不须要散发,间接应用原先的 continuation 对象的 resumewith
                continuation.resumeWith(result)
            }
        }
    }
}

那么散发的判断逻辑是怎么实现的?这要依据具体的 dispatcher 来看。

如果咱们拿的是 Dispatchers.Main,其 dispatcher 为 HandlerContext。

internal class HandlerContext private constructor(
    private val handler: Handler,
    private val name: String?,
    private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {override fun isDispatchNeeded(context: CoroutineContext): Boolean {return !invokeImmediately || Looper.myLooper() != handler.looper
    }
 
    override fun dispatch(context: CoroutineContext, block: Runnable) {if (!handler.post(block)) {cancelOnRejection(context, block)
        }
    }
    ......

其中 HandlerContext 继承于 HandlerDispatcher,而 HandlerDispatcher 继承于 MainCoroutineDispatcher

Dispatcher 的 根本实现原理 大抵为:

  1. 首先在协程进行启动的时候通过拦截器的形式进行拦挡,对应的办法是 interceptContinuation
  2. 而后返回一个具备切换线程性能的 Continuation
  3. 在每次进行 resumeWith 的时候,外部再通过 isDispatchNeeded 进行判断以后协程的运行是否须要切换线程。
  4. 如果须要则调用 dispatch 进行线程的切换,保障协程的正确运行。如果要自定义协程线程的切换,能够通过继承 CoroutineDispatcher 来实现。

这里再简略看一下 WithContext,咱们都晓得其不仅能够承受 CoroutineDispatcher 来帮忙咱们切换线程,同时在执行结束之后还会帮忙咱们将之前切换掉的线程进复原,保障协程运行的连贯性。那这是怎么实现的呢?

withContext 的线程复原原理是它外部生成了一个 DispatchedCoroutine,保留切换线程时的 CoroutineContext 与切换之前的 Continuation,最初在 onCompletionInternal 进行复原。咱们简略翻一翻其源码实现。

public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T): T {
    contract {callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }
    return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
        // 创立新的 CoroutineContext
        val oldContext = uCont.context
        val newContext = oldContext + context
        ......
        // 应用新的 Dispatcher,笼罩外层
        val coroutine = DispatchedCoroutine(newContext, uCont)
        block.startCoroutineCancellable(coroutine, coroutine)
        coroutine.getResult()}
}
internal class DispatchedCoroutine<in T>(
    context: CoroutineContext,
    uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {
    // 在 complete 时会会回调
    override fun afterCompletion(state: Any?) {
        // Call afterResume from afterCompletion and not vice-versa, because stack-size is more
        // important for afterResume implementation
        afterResume(state)
    }
 
    override fun afterResume(state: Any?) {
        ////uCont 就是父协程,context 仍是老版 context, 因而能够切换回原来的线程上
        if (tryResume()) return // completed before getResult invocation -- bail out
        // Resume in a cancellable way because we have to switch back to the original dispatcher
        uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
    }
    ......
}
  1. 对于 withContext,传入的 context 会笼罩外层的拦截器并生成一个 newContext,因而能够实现线程切换。
  2. DispatchedCoroutine 作为 complete 传入协程体的创立函数中,因而协程体执行实现后会回调到 afterCompletion 中。
  3. DispatchedCoroutine 中传入的 uCont 是父协程,它的拦截器仍是外层的拦截器,因而会切换回原来的线程中。

4.3.1 典型用例

例如:点击一个按钮,进行异步操作后再回调刷新 UI

getUserBtn.setOnClickListener {
    getUser { user ->
        handler.post {userNameView.text = user.name}
    }
}
typealias Callback = (User) -> Unit
 
fun getUser(callback: Callback){...}

因为 getUser 函数须要切到其余线程执行,因而回调通常也会在这个非 UI 的线程中调用,所以为了确保 UI 正确被刷新,咱们须要用 handler.post 切换到 UI 线程。

如果要用协程实现呢?

suspend fun getUserCoroutine() = suspendCoroutine<User> {
    continuation ->
    getUser {continuation.resume(it)
    }
}
 
getUserBtn.setOnClickListener {GlobalScope.launch(Dispatchers.Main) {userNameView.text = getUserCoroutine().name
    }
}

suspendCoroutine 这个办法并不是帮咱们启动协程的,它运行在协程当中并且帮咱们获取到以后协程的 Continuation 实例,也就是拿到回调,不便前面咱们调用它的 resume 或者 resumeWithException 来返回后果或者抛出异样。

4.3.2 线程绑定

调度器的目标就是切线程,咱们只有提供线程,调度器就应该很不便的创立进去。

suspend fun main() {val myDispatcher= Executors.newSingleThreadExecutor{ r -> Thread(r, "MyThread") }.asCoroutineDispatcher()
    GlobalScope.launch(myDispatcher) {log(1)
    }.join()
    log(2)
}

因为这个线程池是咱们本人创立的,因而咱们须要在适合的时候敞开它。

除了上述的办法,kotlin 协程还给出了更简略的 api,如下:

GlobalScope.launch(newSingleThreadContext("Dispather")) {//......}.join()

前述咱们是通过线程的形式,同理能够通过线程池转为调度器实现。

Executors.newScheduledThreadPool(10)
    .asCoroutineDispatcher().use { dispatcher ->
        GlobalScope.launch(dispatcher) {//......}.join

五、协程挂起

在前述协程时,常常会呈现 suspend 关键字和挂起的说法,其含意和用法是什么?一起深刻看一下。

5.1 概述

suspend 翻译过去就是中断、挂起,用在函数申明前,起到挂起协程的标识,实质作用是代码调用时为办法增加一个 Continuation 类型的参数,保障协程中 Continuation 的高低传递。

挂起函数只能在协程或另一个挂起函数中被调用,如果你在非协程中应用到了挂起函数,会报错。

阻塞:

函数 A 必须在函数 B 之前实现执行,线程被锁定以便函数 A 可能实现其执行

挂起:

函数 A 尽管曾经启动,但能够暂停,让函数 B 执行,而后只在稍后复原。线程没有被函数 A 锁定。

“挂起”是指协程从它以后线程脱离,切换到另一个线程运行。当线程运行到 suspend 函数时,会临时挂起这个函数及后续代码的执行。简而言之,挂起函数是一个能够启动、暂停和复原的函数。

协程运行的时候每遇到被 suspend 润饰的办法时,都可能会挂起以后协程,不是必会挂起,例如如下办法就不会被挂起。

private suspend fun a() {println("aa")
}

这是因为这种办法不会返回 COROUTINE_SUSPENDED 类型,这在前面具体解释。

5.2 suspend 实质

Kotlin 应用堆栈帧来治理要运行哪个函数以及所有局部变量。

协程在惯例函数根底上增加了 suspend 和 resume 两项操作用于解决长时间运行的工作。

【suspend】:挂起或暂停,用于挂起执行以后协程,并保留所有局部变量

【resume】:复原,用于让已挂起的协程从挂起处复原继续执行

挂起 (暂停) 协程时,会复制并保留以后的堆栈帧以供稍后应用,将信息保留到 Continuation 对象中。

复原 协程时,会将堆栈帧从其保留地位复制回来,对应的 Continuation 通过调用 resumeWith 函数才会复原协程的执行,而后函数再次开始运行。同时返回 Result 类型的胜利或者异样的后果。

public interface Continuation<in T> {
    // 对应这个 Continuation 的协程上下文
    public val context: CoroutineContext
    // 复原相应协程的执行,传递一个胜利或失败的后果作为最初一个挂终点的返回值。public fun resumeWith(result: Result<T>)
}
 
// 将 [value] 作为最初一个挂终点的返回值,复原相应协程的执行。@SinceKotlin("1.3")
@InlineOnly
public inline fun <T> Continuation<T>.resume(value: T): Unit =
    resumeWith(Result.success(value))
 
// 复原相应协程的执行,以便在最初一个挂终点之后从新抛出[异样]。@SinceKotlin("1.3")
@InlineOnly
public inline fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit =
    resumeWith(Result.failure(exception))
  1. Continuation 类有一个 resumeWith 函数能够接管 Result 类型的参数。
  2. 在后果胜利获取时,调用 resumeWith(Result.success(value))或者调用拓展函数 resume(value);出现异常时,调用 resumeWith(Result.failure(exception))或者调用拓展函数 resumeWithException(exception)。这就是 Continuation 的复原调用。
@FormUrlEncoded
@POST("/api/common/countryList")
suspend fun fetchCountryList(@FieldMap params: Map<String, String?>): CountryResponse

前述挂起函数解析后反编译如下:

@FormUrlEncoded
@POST("/api/common/countryList")
@Nullable
Object fetchCountryList(@FieldMap @NotNull Map var1, @NotNull Continuation var2);
  1. 挂起函数反编译后,发现多了一个 Continuation 参数,有编译器传递,阐明调用挂起函数须要 Continuation。
  2. 只有挂起函数或者协程中才有 Continuation,所以挂起函数只能在协程或者其余挂起函数中执行。

5.2.1 Continuation

这里看一下该 Continuation 的传递起源。

这个函数只能在协程或者挂起函数中执行,阐明 Continuation 很有可能是从协程中传入来的,查看协程构建源码。

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit): Job {val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

通过 launch 启动一个协程时,其通过 coroutine 的 start 办法启动协程:

public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {start(block, receiver, this)
}

而后 start 办法外面调用了 CoroutineStart 的 invoke,这个时候咱们发现了 Continuation:

//CoroutineStart 的 invoke 办法呈现了 Continuation
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
when (this) {DEFAULT -> block.startCoroutineCancellable(receiver, completion)
    ATOMIC -> block.startCoroutine(receiver, completion)
    UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
    LAZY -> Unit // will start lazily
}
@InternalCoroutinesApi
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
}

最终回调到 Continuation 的 resumeWith()复原函数中。

public fun <T> Continuation<T>.resumeCancellableWith(
    result: Result<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
    else -> resumeWith(result)
}

咱们再深刻 kotlin 源码看一下其外部实现。

public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
    receiver: R,
    completion: Continuation<T>
): Continuation<Unit> {val probeCompletion = probeCoroutineCreated(completion)
    return if (this is BaseContinuationImpl)
        create(receiver, probeCompletion)
    else {createCoroutineFromSuspendFunction(probeCompletion) {(this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)//1
        }
    }
}
private inline fun <T> createCoroutineFromSuspendFunction(
    completion: Continuation<T>,
    crossinline block: (Continuation<T>) -> Any?
): Continuation<Unit> {
    val context = completion.context
    // label == 0 when coroutine is not started yet (initially) or label == 1 when it was
    return if (context === EmptyCoroutineContext)
        object : RestrictedContinuationImpl(completion as Continuation<Any?>) {
            private var label = 0
 
            override fun invokeSuspend(result: Result<Any?>): Any? =
                when (label) {
                    0 -> {
                        label = 1
                        result.getOrThrow() // Rethrow exception if trying to start with exception (will be caught by BaseContinuationImpl.resumeWith
                        block(this) // run the block, may return or suspend
                    }
                    1 -> {
                        label = 2
                        result.getOrThrow() // this is the result if the block had suspended}
                    else -> error("This coroutine had already completed")
                }
        }
    else
        object : ContinuationImpl(completion as Continuation<Any?>, context) {
            private var label = 0
 
            override fun invokeSuspend(result: Result<Any?>): Any? =
                when (label) {
                    0 -> {
                        label = 1
                        result.getOrThrow() // Rethrow exception if trying to start with exception (will be caught by BaseContinuationImpl.resumeWith
                        block(this) // run the block, may return or suspend
                    }
                    1 -> {
                        label = 2
                        result.getOrThrow() // this is the result if the block had suspended}
                    else -> error("This coroutine had already completed")
                }
        }
}
  1. createCoroutineUnintercepted(receiver, completion)办法在 Kotlin 源码中是通过 suspend 关键字润饰的扩大办法。
  2. suspend 关键字润饰 (suspend R.() -> T) 对象理论被编译成为一个 Function2<r, continuation, Any?> 接口对象,而关键字 suspend 理论编译成了 Continuation 接口。

所以:

  1. 协程体自身就是 Continuation,即必须在协程内调用 suspend 挂起函数。
  2. suspend 关键字并不具备暂停、挂起代码块或者函数办法性能。

5.2.2 状态机 CPS

协程理论挂起是如何实现的?

这里首先通过一个示例来演示一下状态机。

suspend fun main() {log(1)
    log(returnSuspended())
    log(2)
    delay(1000)
    log(3)
    log(returnImmediately())
    log(4)
}
 
suspend fun returnSuspended() = suspendCoroutineUninterceptedOrReturn<String>{
        continuation ->
    thread {Thread.sleep(1000)
        continuation.resume("Return suspended.")
    }
    COROUTINE_SUSPENDED
}
 
suspend fun returnImmediately() = suspendCoroutineUninterceptedOrReturn<String>{log(5)
    "Return immediately."
}

这里咱们定义了两个挂起函数,一个会真正挂起,一个会间接返回后果,其运行后果为:

[main]:1
[Thread-2]:Return suspended.
[Thread-2]:2
[kotlinx.coroutines.DefaultExecutor]:3
[kotlinx.coroutines.DefaultExecutor]:5
[kotlinx.coroutines.DefaultExecutor]:Return immediately.
[kotlinx.coroutines.DefaultExecutor]:4

前述代码的理论实现状况大抵如下:

public class ContinuationImpl implements Continuation<Object> {
 
    private int label = 0;
    private final Continuation<Unit> completion;
 
    public ContinuationImpl(Continuation<Unit> completion) {this.completion = completion;}
 
    @Override
    public CoroutineContext getContext() {return EmptyCoroutineContext.INSTANCE;}
 
    @Override
    public void resumeWith(@NotNull Object o) {
        try {
            Object result = o;
            switch (label) {
                case 0: {LogKt.log(1);
                    result = SuspendFunctionsKt.returnSuspended(this);
                    label++;
                    if (isSuspended(result)) return;
                }
                case 1: {LogKt.log(result);
                    LogKt.log(2);
                    result = DelayKt.delay(1000, this);
                    label++;
                    if (isSuspended(result)) return;
                }
                case 2: {LogKt.log(3);
                    result = SuspendFunctionsKt.returnImmediately(this);
                    label++;
                    if (isSuspended(result)) return;
                }
                case 3:{LogKt.log(result);
                    LogKt.log(4);
                }
            }
            completion.resumeWith(Unit.INSTANCE);
        } catch (Exception e) {completion.resumeWith(e);
        }
    }
 
    private boolean isSuspended(Object result) {return result == IntrinsicsKt.getCOROUTINE_SUSPENDED();
    }
}
  1. 首先定义了一个 ContinuationImpl,即一个 Continuation 的实现。
  2. 能够在 Kotlin 的规范库当中找到一个名叫 ContinuationImpl 的类,其 resumeWith 最终调用到了 invokeSuspend,而这个 invokeSuspend 实际上就是咱们的协程体,通常也就是一个 Lambda 表达式。
  3. 通过 launch 启动协程,传入的那个 Lambda 表达式,实际上会被编译成一个 SuspendLambda 的子类,而它又是 ContinuationImpl 的子类。
public class RunSuspend implements Continuation<Unit> {
 
    private Object result;
 
    @Override
    public CoroutineContext getContext() {return EmptyCoroutineContext.INSTANCE;}
 
    @Override
    public void resumeWith(@NotNull Object result) {synchronized (this){
            this.result = result;
            notifyAll(); // 协程曾经完结,告诉上面的 wait() 办法进行阻塞
        }
    }
 
    public void await() throws Throwable {synchronized (this){while (true){
                Object result = this.result;
                if(result == null) wait(); // 调用了 Object.wait(),阻塞以后线程,在 notify 或者 notifyAll 调用时返回
                else if(result instanceof Throwable){throw (Throwable) result;
                } else return;
            }
        }
    }
}

接着,定义了一个 RunSuspend,用来接管后果。

public static void main(String... args) throws Throwable {RunSuspend runSuspend = new RunSuspend();
    ContinuationImpl table = new ContinuationImpl(runSuspend);
    table.resumeWith(Unit.INSTANCE);
    runSuspend.await();}

作为 completion 传入的 RunSuspend 实例的 resumeWith 实际上是在 ContinuationImpl 的 resumeWtih 的最初才会被调用,因而它的 await() 一旦进入阻塞态,直到 ContinuationImpl 的整体状态流转结束才会进行阻塞,此时过程也就运行结束失常退出了。

这段代码的运行后果为:

/****** 打印后果 ******/
[main]:1
[Thread-2]:Return suspended.
[Thread-2]:2
[kotlinx.coroutines.DefaultExecutor]:3
[kotlinx.coroutines.DefaultExecutor]:5
[kotlinx.coroutines.DefaultExecutor]:Return immediately.
[kotlinx.coroutines.DefaultExecutor]:4
  1. 协程体的执行就是一个状态机,每一次遇到挂起函数,都是一次状态转移,就像咱们后面例子中的 label 一直的自增来实现状态流转一样
  2. 状态机即代码中每一个挂终点和初始挂终点对应的 Continuation 都会转化为一种状态,协程复原只是跳转到下一种状态。
  3. 挂起函数将执行过程分为多个 Continuation 片段,并且利用状态机的形式保障各个片段是程序执行的,所以异步逻辑也能够用程序的代码来实现。

5.3 协程运行原理

前述相干示例更多是为了验证剖析协程的一些个性,这里从协程的创立、启动、复原、线程调度,协程切换等具体解析协程的实现。

5.3.1 协程创立与启动

首先创立一个协程并启动,最常见的莫过于 CoroutineScope.launch{},其源码实现为:

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit): Job {val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

咱们如果不指定 start 参数,所以它会应用默认的 CoroutineStart.DEFAULT,最终 coroutine 会失去一个 StandaloneCoroutine。其实现自 AbstractCoroutine,实现了 Continuation。

前述剖析 suspend 实质时已知,其最终会调用到 createCoroutineUnintercepted,次要是创立了一个新的可挂起计算,通过调用 resume(Unit)启动协程,返回值为 Continuation,Continuation 提供了 resumeWith 复原协程的接口,用以实现协程复原,Continuation 封装了协程的代码运行逻辑和复原接口。

将协程代码进行反编译,再看一下其字节码和 java 实现,例如

suspend fun test() {CoroutineScope(Dispatchers.IO).launch {delay(11)
    }
}

查看其字节码实现时,可知其编译生成外部类。

协程的计算逻辑封装在 invokeSuspend 办法中,而 SuspendLambda 的继承关系为,

SuspendLambda -> ContinuationImpl -> BaseContinuationImpl -> Continuation

其中 BaseContinuationImpl 局部要害源码如下:

internal abstract class BaseContinuationImpl(...) {
    // 实现 Continuation 的 resumeWith,并且是 final 的,不可被重写
    public final override fun resumeWith(result: Result<Any?>) {
        ...
        val outcome = invokeSuspend(param)
        ...
    }
    // 由编译生成的协程相干类来实现
    protected abstract fun invokeSuspend(result: Result<Any?>): Any?
}

前述的协程示例代码反编译为:

public static final Object test(@NotNull Continuation $completion) {Job var10000 = BuildersKt.launch$default(CoroutineScopeKt.CoroutineScope((CoroutineContext)Dispatchers.getIO()), (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
        int label;
 
        @Nullable
        public final Object invokeSuspend(@NotNull Object $result) {
            // 挂起标识
            Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            switch(this.label) {
                case 0:
                    ResultKt.throwOnFailure($result);
                    // 设置挂起后复原,进入的状态
                    this.label = 1;
                    if (DelayKt.delay(11L, this) == var2) {return var2;}
                    break;
                case 1:
                    // 是否须要抛出异样
                    ResultKt.throwOnFailure($result);
                    break;
                default:
                    throw new IllegalStateException("call to'resume'before'invoke'with coroutine");
            }
 
            return Unit.INSTANCE;
        }
 
        @NotNull
        public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {Intrinsics.checkNotNullParameter(completion, "completion");
            Function2 var3 = new <anonymous constructor>(completion);
            return var3;
        }
 
        public final Object invoke(Object var1, Object var2) {return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
        }
    }), 3, (Object)null);
    return var10000 == IntrinsicsKt.getCOROUTINE_SUSPENDED() ? var10000 : Unit.INSTANCE;}

所以,协程的启动流程 为:resume(Unit)->resumeWith()->invokeSuspend()。

协程的挂起通过 suspend 挂起函数实现,协程的复原通过 Continuation.resumeWith 实现。

5.3.2 协程线程调度

协程的线程调度是通过拦截器实现的,后面提到了协程启动调用到了 startCoroutineCancellable,对于协程调度在前述的协程调度器局部已具体介绍了,这里再简略过一下。

@InternalCoroutinesApi
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
}

看一下其 intercepted()的具体实现:

@SinceKotlin("1.3")
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
    (this as? ContinuationImpl)?.intercepted() ?: this
internal abstract class ContinuationImpl(......) : BaseContinuationImpl(completion) {constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)
 
    public override val context: CoroutineContext
        get() = _context!!
 
    @Transient
    private var intercepted: Continuation<Any?>? = null
 
    public fun intercepted(): Continuation<Any?> =
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also {intercepted = it}
    // context[ContinuationInterceptor] 就是协程的 CoroutineDispatcher
    ......
}
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        DispatchedContinuation(this, continuation)
    ......
}

intercepted()最终会应用协程的 CoroutineDispatcher 的 interceptContinuation 办法包装原来的 Continuation,拦挡所有的协程运行操作。

DispatchedContinuation 拦挡了协程的启动和复原,别离是 resumeCancellableWith 和重写的 resumeWith(Result)。

internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {@Suppress("NOTHING_TO_INLINE")
    inline fun resumeCancellableWith(
        result: Result<T>,
        noinline onCancellation: ((cause: Throwable) -> Unit)?
    ) {val state = result.toState(onCancellation)
        // 判断是否须要线程调度
        if (dispatcher.isDispatchNeeded(context)) {
            _state = state
            resumeMode = MODE_CANCELLABLE
            // 将协程运算散发到另一个线程
            dispatcher.dispatch(context, this)
        } else {executeUnconfined(state, MODE_CANCELLABLE) {if (!resumeCancelled(state)) {
                    // 间接在以后线程执行协程运算
                    resumeUndispatchedWith(result)
                }
            }
        }
    }
     
    override fun resumeWith(result: Result<T>) {
        val context = continuation.context
        val state = result.toState()
        // 判断是否须要线程调度
        if (dispatcher.isDispatchNeeded(context)) {
            _state = state
            resumeMode = MODE_ATOMIC
            // 将协程的运算散发到另一个线程
            dispatcher.dispatch(context, this)
        } else {executeUnconfined(state, MODE_ATOMIC) {withCoroutineContext(this.context, countOrElement) {
                    // 间接在以后线程执行协程运算
                    continuation.resumeWith(result)
                }
            }
        }
    }
     
}
 
internal abstract class DispatchedTask<in T>(@JvmField public var resumeMode: Int) : SchedulerTask(){public final override fun run() {// 封装了 continuation.resume 逻辑}
    ......
}

5.3.3 协程挂起与复原

编译器会生成继承自 SuspendLambda 的子类,协程的真正运算逻辑都在 invokeSuspend 中。这里咱们先再次回到 startCoroutineCancellable 函数中。

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
    receiver: R, completion: Continuation<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) =
runSafely(completion) {createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
}

看一下其中的 resumeCancellableWith 办法。

public fun <T> Continuation<T>.resumeCancellableWith(
    result: Result<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
    else -> resumeWith(result)
}

这是 Continuation 的扩大办法,最初都会调用到 Continuation 的 resumeWith,这里的 Continuation 就是前述所说的 SuspendLambda,它继承了 BaseContinuationImpl

internal abstract class BaseContinuationImpl(public val completion: Continuation<Any?>?) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    // This implementation is final. This fact is used to unroll resumeWith recursion.
    public final override fun resumeWith(result: Result<Any?>) {// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
        var current = this
        var param = result
        while (true) {probeCoroutineResumed(current)
            with(current) {
                val completion = completion!! // fail fast when trying to resume continuation without completion
                val outcome: Result<Any?> =
                try {
                    // 执行 invokeSuspend 内的代码块
                    val outcome = invokeSuspend(param)
                    // 如果代码块内执行了挂起办法,协程挂起,resumeWith 执行完结,再次调用 resumeWith 时协程挂终点之后的代码能力继续执行
                    if (outcome === COROUTINE_SUSPENDED) return
                    Result.success(outcome)
                } catch (exception: Throwable) {Result.failure(exception)
                }
                releaseIntercepted() // this state machine instance is terminating
                if (completion is BaseContinuationImpl) {
                    // 如果实现的 completion 也是 BaseContinuationImpl,就会进入循环
                    current = completion
                    param = outcome
                } else {
                    // 执行 completion resumeWith 办法
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }
 
    protected abstract fun invokeSuspend(result: Result<Any?>): Any?
     
    .....
}

上面看一下 invokeSuspend 的实现逻辑。

fun main(args: Array<String>) {val coroutineDispatcher = newSingleThreadContext("ctx")
    // 启动协程 1
    GlobalScope.launch(coroutineDispatcher) {println("the first coroutine")
        async (Dispatchers.IO) {println("the second coroutine 11111")
            delay(100)
            println("the second coroutine 222222")
        }.await()
        println("the first coroutine end end end")
    }
    // 保障 main 线程存活,确保下面两个协程运行实现
    Thread.sleep(500)
}

前述示例编译成 SuspendLambda 子类的 invokeSuspend 办法为:

public final Object invokeSuspend(@NotNull Object $result) {
    // 挂起函数返回标识 SUSPEND_FLAG
    Object var5 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
    String var3;
    boolean var4;
    //label 默认初始值为 0
    switch(this.label) {
        case 0:
            ResultKt.throwOnFailure($result);
            CoroutineScope $this$launch = (CoroutineScope)this.L$0;
            var3 = "the first coroutine";
            var4 = false;
            System.out.println(var3);
            // 新建并启动 async 协程
            Deferred var10000 = BuildersKt.async$default($this$launch, (CoroutineContext)Dispatchers.getIO(), (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
                int label;
 
                @Nullable
                public final Object invokeSuspend(@NotNull Object $result) {
                    // 挂起标识
                    Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
                    String var2;
                    boolean var3;
                    switch(this.label) {
                        case 0:
                            ResultKt.throwOnFailure($result);
                            var2 = "the second coroutine 11111";
                            var3 = false;
                            System.out.println(var2);
                            this.label = 1;
                            // 判断是否执行 delay 挂起函数
                            if (DelayKt.delay(100L, this) == var4) {
                                // 挂起,跳出该办法
                                return var4;
                            }
                            break;
                        case 1:
                            ResultKt.throwOnFailure($result);
                            // 复原协程后再执行一次 resumeWith(),而后无异样的话执行最初的 println()
                            break;
                        default:
                            throw new IllegalStateException("call to'resume'before'invoke'with coroutine");
                    }
 
                    var2 = "the second coroutine 222222";
                    var3 = false;
                    System.out.println(var2);
                    return Unit.INSTANCE;
                }
 
                @NotNull
                public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {Intrinsics.checkNotNullParameter(completion, "completion");
                    Function2 var3 = new <anonymous constructor>(completion);
                    return var3;
                }
 
                public final Object invoke(Object var1, Object var2) {return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
                }
            }), 2, (Object)null);
            // 设置挂起后复原时,进入的状态
            this.label = 1;
            // 调用 await()挂起函数
            if (var10000.await(this) == var5) {return var5;}
            break;
        case 1:
            ResultKt.throwOnFailure($result);
            break;
        default:
            throw new IllegalStateException("call to'resume'before'invoke'with coroutine");
    }
 
    var3 = "the first coroutine end end end";
    var4 = false;
    System.out.println(var3);
    return Unit.INSTANCE;
}

如果 async 线程未执行实现,await()返回为 IntrinsicsKt.getCOROUTINE_SUSPENDED(),就会 return,launch 协程的 invokeSuspend 办法执行实现,协程所在线程持续往下运行,此时 launch 协程处于挂起状态。

所以协程的挂起在代码层面来说就是跳出协程执行的办法体,或者说跳出协程以后状态机下的对应状态,而后期待下一个状态来长期在进行执行。

对于协程挂起有三点注意事项:

  1. 启动其余协程并不会挂起以后协程,所以 launch 和 async 启动线程时,除非新协程运行在以后线程,则以后协程只能在新协程运行实现后继续执行,否则以后协程都会马上持续运行。
  2. 协程挂起并不会阻塞线程,因为协程挂起时相当于执行完协程的办法,线程继续执行其余之后的逻辑。
  3. 挂起函数并肯定都会挂起协程,例如 await()挂起函数如果返回值不等于 IntrinsicsKt.getCOROUTINE_SUSPENDED(),则协程继续执行挂终点之后逻辑。

看完 invokeSuspend,咱们再次回到 startCoroutineCancellable 函数中,其调用的 createCoroutineUnintercepted 办法中创立的 SuspendLambda 实例是 BaseContinuationImpl 的子类对象,其 completion 参数为下:

launch: if (isLazy) LazyStandaloneCoroutine else StandaloneCoroutine

async: if (isLazy) LazyDeferredCoroutine else DeferredCoroutine

下面这几个类都是 AbstractCoroutine 的子类。而依据 completion 的类型会执行不同的逻辑:

BaseContinuationImpl: 执行协程逻辑

其它: 调用 resumeWith 办法,解决协程的状态,协程挂起后的复原即与它无关

前述的示例中 async 启动的协程,也会调用其 invokeSuspend 办法执行 async 协程,假如 async 返回的后果曾经可用时,即非 COROUTINE_SUSPENDED 值,此时 completion 是 DeferredCoroutine 对象,因而就会调用 DeferredCoroutine.resumeWith 办法,而后返回,父协程的复原逻辑便是在这里。

public final override fun resumeWith(result: Result<T>) {val state = makeCompletingOnce(result.toState())
    if (state === COMPLETING_WAITING_CHILDREN) return
    afterResume(state)
}

在 makeCompletingOnce 办法中,会依据 state 去解决协程状态,这里最终会走到 ResumeAwaitOnCompletion.invoke 来复原父协程,必要的话还会把 async 的后果给它。

private class ResumeAwaitOnCompletion<T>(private val continuation: CancellableContinuationImpl<T>) : JobNode() {override fun invoke(cause: Throwable?) {
        val state = job.state
        assert {state !is Incomplete}
        if (state is CompletedExceptionally) {
            // Resume with with the corresponding exception to preserve it
            continuation.resumeWithException(state.cause)
        } else {
           // resume 被挂起的协程
            @Suppress("UNCHECKED_CAST")
            continuation.resume(state.unboxState() as T)
        }
    }
}

这里的 continuation 就是 launch 协程体,也就是 SuspendLambda 对象,于是 invoke 办法会再一次调用到 BaseContinuationImpl.resumeWith 办法,接着调用 SuspendLambda.invokeSuspend, 而后依据 label 取值继续执行接下来的逻辑!

launch 协程复原的过程,从 async 协程的 SuspendLambda 的子类的 completion.resumeWith(outcome) -> AbstractCoroutine.resumeWith(result) ..-> JobSupport.tryFinalizeSimpleState() -> JobSupport.completeStateFinalization() -> state.list?.notifyCompletion(cause) -> node.invoke,最初 handler 节点外面通过调用 resume(result)复原协程。

await()挂起函数复原协程的原理:

  1. 将 launch 协程封装为 ResumeAwaitOnCompletion 作为 handler 节点增加到 aynsc 协程的 state.list
  2. 而后在 async 协程实现时会告诉 handler 节点调用 launch 协程的 resume(result) 办法将后果传给 launch 协程,并复原 launch 协程继续执行 await 挂终点之后的逻辑。

5.3.4 协程三层封装

通过前述的一系列剖析可知,协程有三层封装:

  1. 罕用的 launch 和 async 返回的 Job、Deferred,外面封装了协程状态,提供了勾销协程接口,而它们的实例都是继承自 AbstractCoroutine,它是协程的第一层包装。
  2. 第二层包装是编译器生成的 SuspendLambda 的子类,封装了协程的真正运算逻辑,继承自 BaseContinuationImpl,其中 completion 属性就是协程的第一层包装。
  3. 第三层包装是后面剖析协程的线程调度时提到的 DispatchedContinuation,封装了线程调度逻辑,蕴含了协程的第二层包装。

协程其实就是一段能够挂起和复原执行的运算逻辑,而协程的挂起通过挂起函数实现,挂起函数用状态机的形式用挂终点将协程的运算逻辑拆分成不同的片段,每次运行协程执行不同的逻辑片段。

所以协程有两个很大的益处:

  1. 简化异步编程,反对异步返回;
  2. 挂起不阻塞线程,提供线程利用率

六、总结

本文通过为什么应用协程,协程如何创立启动,协程的调度原理和协程的挂起原理几个方面对协程进行了初步分析,上面一起回顾一下全文重点内容,对全文内容进行一个总结

协程引入:

  1. 协程能够让异步代码同步化,升高程序波及的复杂度
  2. 协程实质是轻量级线程,单个线程能够运行多个协程,协程的运行不会导致线程阻塞

协程启动:

  1. 协程启动须要三局部:上下文、启动模式、协程体。创立协程的形式有 runBlocking、launch 和 async,举荐应用 CoroutineScope.launch 的形式创立协程,应用 async 的形式创立并发执行,同步期待获取返回值的状况。
  2. Job 是 launch 构建协程返回的一个协程工作,实现时没有返回值,可看成协程对象自身。其提供相干办法可用于察看协程执行状况。Deferred 继承自 Job,是 async 构建协程返回的一个协程工作,可通过调用 await()办法期待执行实现获取后果。
  3. 启动协程须要作用域,作用域在协程创立过程中产生,常见的协程作用域有 GlobalScope、coroutineScope 等,协程配合 Jetpack Lifecycle 相干组件提供的 lifecycleScope 等作用域进行应用,异样丝滑好用。
  4. 协程的启动模式有 DEFAULT、ATOMIC、UNDISPATCHED、LAZY 四种,留神不同启动模式的区别。
  5. 如果要在父协程中进行子协程切换操作,能够应用 withContext。

协程调度:

  1. 协程上下文是一个元素的汇合,其定义是递归的,本人蕴含若干个本人,其构造介于 set 和 map 之间。
  2. 协程实现的实质是回调,这个回调即 Continuation。协程拦截器的实现就是拦挡 Continuation,可在此处进行缓存、日志打印等拦挡解决
  3. 调度器即确认相干协程在哪个线程上执行,调度的实质是解决挂起复原后协程逻辑在哪里运行的问题,其继承自拦截器。
  4. 调度器的是实现原理即在协程启动时通过拦截器进行拦挡,返回一个 Continuation,再在协程复原进行 resumeWith 操作时,进行线程切换判断和线程切换。

协程挂起:

  1. 挂起函数是一个可启动、暂停和复原的函数,被 suspend 润饰的函数在协程运行时不是肯定会被挂起的。
  2. 挂起函数的挂起实现原理就是状态机的状态转移。协程体的执行就是一个状态机,每遇到一次挂起函数就是一次状态转移,而协程的复原不过是从一种状态跳转到下一种状态。挂起函数将整个执行过程划分为多个 Continuation 片段,利用状态机的形式保障各个片段时程序执行的,从而实现了用程序的代码实现异步逻辑。

参考资料:

  • 【1】破解 Kotlin 协程
  • 【2】Kotlin Jetpack 实战 | 09. 图解协程原理
  • 【3】一文看透 Kotlin 协程实质
  • 【4】抽丝剥茧 Kotlin – 协程
  • 【5】Kotlin 协程实现原理
  • 【6】kotlin 协程 -Android 实战
  • 【7】kotlin 协程 官网领导文档

正文完
 0