vivo 互联网客户端团队- Ruan Wen
本文是Kotlin协程解析系列文章的开篇,次要介绍Kotlin协程的创立、协程调度与协程挂起相干的内容
一、协程引入
Kotlin 中引入 Coroutine(协程) 的概念,能够帮忙编写异步代码。
在应用和剖析协程前,首先要理解一下:
协程是什么?
为什么须要协程?
协程最为人称道的就是能够用看起来同步的形式写出异步的代码,极大进步了代码的可读性。在理论开发中最常见的异步操作莫过于网络申请。通常咱们须要通过各种回调的形式去解决网络申请,很容易就陷入到天堂回调中。
WalletHttp.target(VCoinTradeSubmitResult.class).setTag(tag) .setFullUrl(Constants.VCOIN_TRADE_SUBMIT_URL).setParams(params) .callback(new HttpCallback<VCoinTradeSubmitResult>() { @Override public void onSuccess(VCoinTradeSubmitResult vCoinTradeSubmitResult) { super.onSuccess(vCoinTradeSubmitResult); if (mView == null) { return; } //...... } }).post();
上述示例是一个我的项目开发中常见的一个网络申请操作,通过接口回调的形式去获取网络申请后果。理论开发中也会常常遇到间断多个接口申请的状况,例如咱们我的项目中的集体核心页的逻辑就是先去异步获取。
本地缓存,获取失败的话就须要异步刷新一下账号token,而后网络申请相干集体核心的其余信息。这里简略举一个领取示例,进行领取时,可能要先去获取账号token,而后依赖该token再去做领取。
申请操作,依据领取返回数据再去查问领取后果,这种状况通过回调就可能演变为“天堂回调”。
//获取账号tokenWalletHttp.target(Account.class).setTag(tag) .setFullUrl(Constants.ACCOUNT_URL).setParams(params) .callback(new HttpCallback<Account>() { @Override public void onSuccess(Account account) { super.onSuccess(account); //依据账号token进行领取操作 WalletHttp.target(Pay.class).setFullUrl(Constants.PAY_URL).addToken(account.getToken()).callback(new HttpCallback<Pay>() { @Override public void onSuccess(Pay pay){ super.onSuccess(pay); //依据领取操作返回查问领取后果 WalletHttp.target(PayResult.class).setFullUrl(Constants.RESULT_URL).addResultCode(pay.getResultCode()).callback(new HttpCallback<PayResult>() { @Override public void onSuccess(PayResult result){ super.onSuccess(result); //...... } }).post(); } }).post(); } }).post();
对于这种场景,kotlin协程“同步形式写出异步代码”的这个个性就能够很好的解决上述问题。若上述场景用kotlin 协程代码实现呢,可能就为:
fun postItem(tag: String, params: Map<String, Any?>) = viewModelScope.launch { // 获取账号信息 val account = repository.queryAccount(tag, params) // 进行领取操作 val pay = repository.paySubmit(tag,account.token) //查问领取后果 val result = repository.queryPayResult(tag,pay.resultCode) //......}
能够看出,协程代码十分简洁,以程序的形式书写异步代码,代码可读性极强。
如果想要将原先的网络回调申请也改写成这种同步模式呢,只须要对原先申请回调用协程提供的suspendCancellableCoroutine等办法进行封装解决,即可让晚期的异步代码也享受上述“同步代码”的丝滑。
协程:
一种非抢占式或者合作式的计算机程序并发调度实现,程序能够被动挂起或者复原执行,其外围点是函数或一段程序可能被挂起,稍后再在挂起的地位复原,通过被动让出运行权来实现合作,程序本人解决挂起和复原来实现程序执行流程的合作调度。
协程实质上是轻量级线程。
协程的特点有:
- 协程能够让异步代码同步化,其本质是轻量级线程。
- 可在单个线程运行多个协程,其反对挂起,不会使运行协程的线程阻塞。
- 能够升高异步程序的设计复杂度。
Kotlin协程实现档次:
基础设施层:规范库的协程API,次要对协程提供了概念和语义上最根本的反对;
业务框架层:协程的下层框架反对,基于规范库实现的封装,也是咱们日常开发应用的协程扩大库。
二、协程启动
具体在应用协程前,首先要配置对Kotlin协程的依赖。
(1)我的项目根目录build.gradle
buildscript { ... ext.kotlin_coroutines = 'xxx' ...}
(2)Module下build.gradle
dependencies { ... //协程规范库 implementation "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_coroutines" //依赖协程外围库,蕴含协程公共API局部 implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlin_coroutines" //依赖android反对库,协程Android平台的具体实现形式 implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:$kotlin_coroutines" ...}
2.1 Thread 启动
在Java中,能够通过Thread开启并发操作:
new Thread(new Runnable() { @Override public void run() { //... do what you want }}).start();
在Kotlin中,应用线程更为便捷:
val myThread = thread { //.......}
这个Thread办法有个参数start默认为true,即发明进去的线程默认启动,你能够自定义启动机会:
val myThread = thread(start = false) { //......} myThread.start()
2.2 协程启动
动协程须要三局部:上下文、启动模式、协程体。
启动形式个别有三种,其中最简略的启动协程的形式为:
GlobalScope.launch { //......}
GlobalScope.launch()属于协程构建器Coroutine builders,Kotlin 中还有其余几种 Builders,负责创立协程:
runBlocking:T
应用runBlocking顶层函数创立,会创立一个新的协程同时阻塞以后线程,直到协程完结。实用于main函数和单元测试
launch
创立一个新的协程,不会阻塞以后线程,必须在协程作用域中才能够调用。它返回的是一个该协程工作的援用,即Job对象。这是最罕用的启动协程的形式。
async
创立一个新的协程,不会阻塞以后线程,必须在协程作用域中才能够调用,并返回Deffer对象。可通过调用Deffer.await()办法期待该子协程执行实现并获取后果。罕用于并发执行-同步期待和获取返回值的状况。
2.2.1 runBlocking
public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T
runBlocking是一个顶层函数,能够在任意中央独立应用。它能创立一个新的协程同时阻塞以后线程,直到其外部所有逻辑以及子协程所有逻辑全副执行实现。罕用于main函数和测试中。
//main函数中利用fun main() = runBlocking { launch { // 创立一个新协程,runBlocking会阻塞线程,但外部运行的协程是非阻塞的 delay(1000L) println("World!") } println("Hello,") delay(2000L) // 延时2s,保障JVM存活} //测试中利用class MyTest { @Test fun testMySuspendingFunction() = runBlocking { // ...... }}
2.2.2 launch
launch是最罕用的用于启动协程的形式,会在不阻塞以后线程的状况下启动一个协程,并返回对该协程工作的援用,即Job对象。
public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit): Job
协程须要运行在协程上下文环境中,在非协程环境中的launch有两种:GlobalScope 与 CoroutineScope 。
- GlobalScope.launch()
在利用范畴内启动一个新协程,不会阻塞调用线程,协程的生命周期与应用程序统一。
fun launchTest() { print("start") GlobalScope.launch { delay(1000)//1秒无阻塞提早 print("GlobalScope.launch") } print("end")} /** 打印后果startendGlobalScope.launch*/
这种启动的协程存在组件被销毁但协程还存在的状况,个别不举荐。其中GlobalScope自身就是一个作用域,launch属于其子作用域。
- CoroutineScope.launch()
启动一个新的协程而不阻塞以后线程,并返回对协程的援用作为一个Job。
fun launchTest2() { print("start") val job = CoroutineScope(Dispatchers.IO).launch { delay(1000) print("CoroutineScope.launch") } print("end")}
协程上下文管制协程生命周期和线程调度,使得协程和该组件生命周期绑定,组件销毁时,协程一并销毁,从而实现安全可靠地协程调用。这是在利用中最举荐的协程应用形式。
对于launch,依据业务需要须要创立一个或多个协程,则可能就须要在一个协程中启动子协程。
fun launchTest3() { print("start") GlobalScope.launch { delay(1000) print("CoroutineScope.launch") //在协程内创立子协程 launch { delay(1500) print("launch 子协程") } } print("end")} /**** 打印后果startendCoroutineScope.launchlaunch 子协程*/
2.2.3 async
async相似于launch,都是创立一个不会阻塞以后线程的新的协程。区别在于:async的返回是Deferred对象,可通过Deffer.await()期待协程执行实现并获取后果,而 launch 不行。罕用于并发执行-同步期待和获取返回值的状况。
public fun <T> CoroutineScope.async( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> T): Deferred<T>
留神:
- await() 不能在协程之外调用,因为它须要挂起直到计算实现,而且只有协程能够以非阻塞的形式挂起。所以把它放到协程中。
- 如果Deferred不执行await()则async外部抛出的异样不会被logCat或try Catch捕捉,然而仍然会导致作用域勾销和异样解体; 但当执行await时异样信息会从新抛出
- 如果将async函数中的启动模式设置为CoroutineStart.LAZY懒加载模式时则只有调用Deferred对象的await时(或者执行async.satrt())才会开始执行异步工作。
三、协程补充常识
在叙述协程启动内容,波及到了Job、Deferred、启动模式、作用域等概念,这里补充介绍一下上述概念。
3.1 Job
Job 是协程的句柄,赋予协程可勾销,赋予协程以生命周期,赋予协程以结构化并发的能力。
Job是launch构建协程返回的一个协程工作,实现时是没有返回值的。能够把Job看成协程对象自身,封装了协程中须要执行的代码逻辑,协程的操作方法都在Job身上。Job具备生命周期并且能够勾销,它也是上下文元素,继承自CoroutineContext。
在日常 Android 开发过程中,协程配合 Lifecycle 能够做到主动勾销。
Job生命周期
Job 的生命周期分为 6 种状态,分为
- New
- Active
- Completing
- Cancelling
- Cancelled
- Completed
通常外界会持有 Job 接口作为援用被协程调用者所持有。Job 接口提供 isActive、isCompleted、isCancelled 3 个变量使外界能够感知 Job 外部的状态。
val job = launch(start = CoroutineStart.LAZY) { println("Active")}println("New")job.join()println("Completed") /**打印后果**/NewActiveCompleted /*********** 1. 以 lazy 形式创立进去的协程 state 为 New* 2. 对应的 job 调用 join 函数后,协程进入 Active 状态,并开始执行协程对应的具体代码* 3. 当协程执行结束后,因为没有须要期待的子协程,协程间接进入 Completed 状态*/
对于Job,罕用的办法有:
//沉闷的,是否仍在执行public val isActive: Boolean //启动协程,如果启动了协程,则为true;如果协程曾经启动或实现,则为falsepublic fun start(): Boolean //勾销Job,可通过传入Exception阐明具体起因public fun cancel(cause: CancellationException? = null) //挂起协程直到此Job实现public suspend fun join() //勾销工作并期待工作实现,联合了[cancel]和[join]的调用public suspend fun Job.cancelAndJoin() //给Job设置一个实现告诉,当Job执行实现的时候会同步执行这个函数public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
Job父子层级
对于Job,还须要分外关注的是Job的父子层级关系。
- 一个Job能够蕴含多个子Job。
- 当父Job被勾销后,所有的子Job也会被主动勾销。
- 当子Job被勾销或者出现异常后父Job也会被勾销。
- 具备多个子 Job 的父Job 会期待所有子Job实现(或者勾销)后,本人才会执行实现。
3.2 Deferred
Deferred继承自Job,具备与Job雷同的状态机制。
它是async构建协程返回的一个协程工作,可通过调用await()办法期待协程执行实现并获取后果。其中Job没有后果值,Deffer有后果值。
public interface Deferred<out T> : Job
3.3 作用域
协程作用域(CoroutineScope):协程定义的作用范畴,实质是一个接口。
确保所有的协程都会被追踪,Kotlin 不容许在没有应用CoroutineScope的状况下启动新的协程。CoroutineScope可被看作是一个具备超能力的ExecutorService的轻量级版本。它能启动新的协程,同时这个协程还具备suspend和resume的劣势。
每个协程生成器launch、async等都是CoroutineScope的扩大,并继承了它的coroutineContext,主动流传其所有元素和勾销。
启动协程须要作用域,然而作用域又是在协程创立过程中产生的。
public interface CoroutineScope { /** * 此域的上下文。Context被作用域封装,用于在作用域上扩大的协程构建器的实现。 */ public val coroutineContext: CoroutineContext}
官网提供的罕用作用域:
- runBlocking:
顶层函数,可启动协程,但会阻塞以后线程
- GlobalScope
全局协程作用域。通过GlobalScope创立的协程不会有父协程,能够把它称为根协程。它启动的协程的生命周期只受整个应用程序的生命周期的限度,且不能取消,在运行时会耗费一些内存资源,这可能会导致内存泄露,不适用于业务开发。
- coroutineScope
创立一个独立的协程作用域,直到所有启动的协程都实现后才完结本身。
它是一个挂起函数,须要运行在协程内或挂起函数内。当这个作用域中的任何一个子协程失败时,这个作用域失败,所有其余的子协程都被勾销。
- supervisorScope
与coroutineScope相似,不同的是子协程的异样不会影响父协程,也不会影响其余子协程。(作用域自身的失败(在block或勾销中抛出异样)会导致作用域及其所有子协程失败,但不会勾销父协程。)
- MainScope
为UI组件创立主作用域。一个顶层函数,上下文是SupervisorJob() + Dispatchers.Main,阐明它是一个在主线程执行的协程作用域,通过cancel对协程进行勾销。
fun scopeTest() { GlobalScope.launch {//父协程 launch {//子协程 print("GlobalScope的子协程") } launch {//第二个子协程 print("GlobalScope的第二个子协程") } } val mainScope = MainScope() mainScope.launch {//启动协程 //todo }}
Jetpack 的Lifecycle相干组件提供了曾经绑定UV申明周期的作用域供咱们间接应用:
- lifecycleScope:
Lifecycle Ktx库提供的具备生命周期感知的协程作用域,与Lifecycle绑定生命周期,生命周期被销毁时,此作用域将被勾销。会与以后的UI组件绑定生命周期,界面销毁时该协程作用域将被勾销,不会造成协程透露,举荐应用。
- viewModelScope:
与lifecycleScope相似,与ViewModel绑定生命周期,当ViewModel被革除时,这个作用域将被勾销。举荐应用。
3.4 启动模式
前述进行协程创立启动时波及到了启动模式CoroutineStart,其是一个枚举类,为协程构建器定义启动选项。在协程构建的start参数中应用。
DEFAULT模式
DEFAULT 是饿汉式启动,launch 调用后,会立刻进入待调度状态,一旦调度器 OK 就能够开始执行。
suspend fun main() { log(1) val job = GlobalScope.launch{ log(2) } log(3) Thread.sleep(5000) //避免程序退出}fun log(o: Any?) { println("[${Thread.currentThread().name}]:$o")}
前述示例代码采纳默认的启动模式和默认的调度器,,运行后果取决于以后线程与后盾线程的调度程序。
/**可能的运行后果一****/[main]:1[main]:3[main]:2 /**可能的运行后果二****/[main]:1[main]:2[main]:3
LAZY模式
LAZY 是懒汉式启动,launch 后并不会有任何调度行为,协程体不会进入执行状态,直到咱们须要他的运行后果时进行执行,其launch 调用后会返回一个 Job 实例。
对于这种状况,能够:
- 调用Job.start,被动触发协程的调度执行
- 调用Job.join,隐式的触发协程的调度执行
suspend fun main() { log(1) val job = GlobalScope.launch(start = CoroutineStart.LAZY){ log(2) } log(3) job.join() log(4)}fun log(o: Any?) { println("[${Thread.currentThread().name}]:$o")}
对于join,肯定要期待协程执行结束,所以其运行后果肯定为:
[main]:1[main]:3[DefaultDispatcher-worker-1]:2[main]:4
如果把join()换为start(),则输入后果不肯定。
ATOMIC模式
ATOMIC 只有波及 cancel 的时候才有意义。调用cancel的机会不同,后果也有差别。
suspend fun main() { log(1) val job = GlobalScope.launch(start = CoroutineStart.ATOMIC){ log(2) } job.cancel() log(3) Thread.sleep(2000)}fun log(o: Any?) { println("[${Thread.currentThread().name}]:$o")}
前述代码示例创立协程后立刻cancel,因为是ATOMIC模式,因而协程肯定会被调度,则log 1、2、3肯定都会被打印输出。如果将模式改为DEFAULT模式,则log 2有可能打印输出,也可能不会。
其实cancel 调用肯定会将该 job 的状态置为 cancelling,只不过ATOMIC 模式的协程在启动时忽视了这一状态。
suspend fun main() { log(1) val job = GlobalScope.launch(start = CoroutineStart.ATOMIC) { log(2) delay(1000) log(3) } job.cancel() log(4) job.join() Thread.sleep(2000)}fun log(o: Any?) { println("[${Thread.currentThread().name}]:$o")} /**打印输出后果可能如下****/[main]:1[DefaultDispatcher-worker-1]:2[main]:4
前述代码中,2和3中加了一个delay,delay会使得协程体的执行被挂起,1000ms 之后再次调度前面的局部。对于 ATOMIC 模式,它肯定会被启动,实际上在遇到第一个挂终点之前,它的执行是不会进行的,而 delay 是一个 suspend 函数,这时咱们的协程迎来了本人的第一个挂终点,恰好 delay 是反对 cancel 的,因而前面的 3 将不会被打印。
UNDISPATCHED模式
协程在这种模式下会间接开始在以后线程下执行,直到第一个挂终点。
与ATOMIC的不同之处在于 UNDISPATCHED 不通过任何调度器即开始执行协程体。遇到挂终点之后的执行就取决于挂终点自身的逻辑以及上下文当中的调度器了。
suspend fun main() { log(1) val job = GlobalScope.launch(start = CoroutineStart.UNDISPATCHED) { log(2) delay(100) log(3) } log(4) job.join() log(5) Thread.sleep(2000)}fun log(o: Any?) { println("[${Thread.currentThread().name}]:$o")}
协程启动后会立刻在以后线程执行,因而 1、2 会间断在同一线程中执行,delay 是挂终点,因而 3 会等 100ms 后再次调度,这时候 4 执行,join 要求期待协程执行完,因而等 3 输入后再执行 5。
后果如下:
[main]:1[main]:2[main]:4[DefaultDispatcher-worker-1]:3[DefaultDispatcher-worker-1]:5
3.5 withContext
withContext {}不会创立新的协程。在指定协程上运行挂起代码块,放在该块内的任何代码都始终通过IO调度器执行,并挂起该协程直至代码块运行实现。
public suspend fun <T> withContext( context: CoroutineContext, block: suspend CoroutineScope.() -> T): T
withContext会应用新指定的上下文的dispatcher,将block的执行转移到指定的线程中。
它会返回后果, 能够和以后协程的父协程存在交互关系, 次要作用为了来回切换调度器。
coroutineScope{ launch(Dispatchers.Main) { // 在 UI 线程开始 val image = withContext(Dispatchers.IO) { // 切换到 IO 线程,并在执行实现后切回 UI 线程 getImage(imageId) // 将会运行在 IO 线程 } avatarIv.setImageBitmap(image) // 回到 UI 线程更新 UI }}
四、协程调度
4.1 协程上下文
在协程启动局部提到,启动协程须要三个局部,其中一个局部就是上下文,其接口类型是CoroutineContext,通常所见的上下文类型是CombinedContext或者EmptyCoroutineContext,一个示意上下文组合,另一个示意空。
协程上下文是Kotlin协程的根本结构单元,次要承载着资源获取,配置管理等工作,是执行环境的通用数据资源的对立管理者。除此之外,也包含携带参数,拦挡协程执行等,是实现正确的线程行为、生命周期、异样以及调试的要害。
协程应用以下几种元素集定义协程行为,他们均继承自CoroutineContext:
- 【Job】:协程的句柄,对协程的管制和治理生命周期。
- 【CoroutineName】:协程的名称,用于调试
- 【CoroutineDispatcher】:调度器,确定协程在指定的线程执行
- 【CoroutineExceptionHandler】:协程异样处理器,解决未捕捉的异样
这里回顾一下launch和async两个函数签名。
public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit): Job public fun <T> CoroutineScope.async( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> T): Deferred<T>
两个函数第一个参数都是CoroutineContext类型。
所有协程构建函数都是以CoroutineScope的扩大函数的模式被定义的,而CoroutineScope的接口惟一成员就是CoroutineContext类型。
public interface CoroutineScope { public val coroutineContext: CoroutineContext}
简而言之,协程上下文是协程必备组成部分,治理了协程的线程绑定、生命周期、异样解决和调试。
4.1.1 协程上下文构造
看一下CoroutineContext的接口办法:
public interface CoroutineContext { //操作符[]重载,能够通过CoroutineContext[Key]这种模式来获取与Key关联的Element public operator fun <E : Element> get(key: Key<E>): E? //提供遍历CoroutineContext中每一个Element的能力,并对每一个Element做operation操作 public fun <R> fold(initial: R, operation: (R, Element) -> R): R //操作符+重载,能够CoroutineContext + CoroutineContext这种模式把两个CoroutineContext合并成一个 public operator fun plus(context: CoroutineContext): CoroutineContext = ....... //返回一个新的CoroutineContext,这个CoroutineContext删除了Key对应的Element public fun minusKey(key: Key<*>): CoroutineContext //Key定义,空实现,仅仅做一个标识 public interface Key<E : Element> ///Element定义,每个Element都是一个CoroutineContext public interface Element : CoroutineContext { //每个Element都有一个Key实例 public val key: Key<*> ...... }}
Element:协程上下文的一个元素,自身就是一个单例上下文,外面有一个key,是这个元素的索引。
可知,Element自身也实现了CoroutineContext接口。
这里咱们再看一下官网解释:
/**
- Persistent context for the coroutine. It is an indexed set of [Element] instances.
- An indexed set is a mix between a set and a map.
- Every element in this set has a unique [Key].*/
从官网解释可知,CoroutineContext是一个Element的汇合,这种汇合被称为indexed set,介于set 和 map 之间的一种构造。set 意味着其中的元素有唯一性,map 意味着每个元素都对应一个键。
如果将协程上下文外部的一系列上下文称为子上下文,上下文为每个子上下文调配了一个Key,它是一个带有类型信息的接口。
这个接口通常被实现为companion object。
//Jobpublic interface Job : CoroutineContext.Element { /** * Key for [Job] instance in the coroutine context. */ public companion object Key : CoroutineContext.Key<Job>} //拦截器public interface ContinuationInterceptor : CoroutineContext.Element { /** * The key that defines *the* context interceptor. */ companion object Key : CoroutineContext.Key<ContinuationInterceptor>} //协程名public data class CoroutineName( val name: String) : AbstractCoroutineContextElement(CoroutineName) { /** * Key for [CoroutineName] instance in the coroutine context. */ public companion object Key : CoroutineContext.Key<CoroutineName>} //异样处理器public interface CoroutineExceptionHandler : CoroutineContext.Element { /** * Key for [CoroutineExceptionHandler] instance in the coroutine context. */ public companion object Key : CoroutineContext.Key<CoroutineExceptionHandler>}
源码中定义的子上下文,都会在外部申明一个动态的Key,类外部的动态变量意味着被所有类实例共享,即全局惟一的 Key 实例能够对应多个子上下文实例。
在一个相似 map 的构造中,每个键必须是惟一的,因为对雷同的键 put 两次值,新值会代替旧值。通过上述形式,通过键的唯一性保障了上下文中的所有子上下文实例都是惟一的。
咱们依照这个格局仿写一下而后反编译。
class MyElement :AbstractCoroutineContextElement(MyElement) { companion object Key : CoroutineContext.Key<MyElement>} //反编译的java文件public final class MyElement extends AbstractCoroutineContextElement { @NotNull public static final MyElement.Key Key = new MyElement.Key((DefaultConstructorMarker)null); public MyElement() { super((kotlin.coroutines.CoroutineContext.Key)Key); } public static final class Key implements kotlin.coroutines.CoroutineContext.Key { private Key() { } // $FF: synthetic method public Key(DefaultConstructorMarker $constructor_marker) { this(); } }}
比照kt和Java文件,能够看到Key就是一个动态变量,且其实现类未做解决,作用与HashMap中的Key相似。
Key是动态变量,全局惟一,为Element提供唯一性保障。
前述内容总结如下:
- 协程上下文是一个元素的汇合,单个元素自身也是一个上下文,其定义是递归的,本人蕴含若干个本人。
- 协程上下文这个汇合有点像 set 构造,其中的元素都是惟一的,不反复的。其通过给每一个元素配有一个动态的键实例,形成一组键值对的形式实现。这使其相似 map 构造。这种介于 set 和 map 之间的构造称为indexed set。
CoroutineContext.get()获取元素
对于CoroutineContext,咱们先看一下其是如何取元素的。
这里看一下Element、CombinedContext、EmptyCoroutineContext的外部实现,其中CombinedContext就是CoroutineContext汇合构造的实现,EmptyCoroutineContext就示意一个空的CoroutineContext,它外面是空实现。
@SinceKotlin("1.3")internal class CombinedContext( //左上下文 private val left: CoroutineContext, //右元素 private val element: Element) : CoroutineContext, Serializable { override fun <E : Element> get(key: Key<E>): E? { var cur = this while (true) { //如果输出 key 和右元素的 key 雷同,则返回右元素 cur.element[key]?.let { return it } // 若右元素不匹配,则向左持续查找 val next = cur.left if (next is CombinedContext) { cur = next } else { // 若左上下文不是混合上下文,则完结递归 return next[key] } } } ......} public interface Element : CoroutineContext { public val key: Key<*> public override operator fun <E : Element> get(key: Key<E>): E? = @Suppress("UNCHECKED_CAST") // 如果给定键和元素自身键雷同,则返回以后元素,否则返回空 if (this.key == key) this as E else null ......} public object EmptyCoroutineContext : CoroutineContext, Serializable { //返回空 public override fun <E : Element> get(key: Key<E>): E? = null}
通过Key检索Element,返回值只能是Element或null,链表节点中的元素值,其中CombinedContext利用while循环实现了相似递归的成果,其中较早被遍历到的元素天然具备较高的优先级。
//应用示例println(coroutineContext[CoroutineName])println(Dispatchers.Main[CoroutineName])
CoroutineContext.minusKey()删除元素
同理看一下Element、CombinedContext、EmptyCoroutineContext的外部实现。
internal class CombinedContext( //左上下文 private val left: CoroutineContext, //右元素 private val element: Element) : CoroutineContext, Serializable { public override fun minusKey(key: Key<*>): CoroutineContext { //如果element就是要删除的元素,返回left,否则阐明要删除的元素在left中,持续从left中删除对应的元素 element[key]?.let { return left } //在左上下文中去掉对应元素 val newLeft = left.minusKey(key) return when { //如果left中不存在要删除的元素,那么以后CombinedContext就不存在要删除的元素,间接返回以后CombinedContext实例 newLeft === left -> this //如果left中存在要删除的元素,删除了这个元素后,left变为了空,那么间接返回以后CombinedContext的element就行 newLeft === EmptyCoroutineContext -> element //如果left中存在要删除的元素,删除了这个元素后,left不为空,那么组合一个新的CombinedContext返回 else -> CombinedContext(newLeft, element) } } ......} public object EmptyCoroutineContext : CoroutineContext, Serializable { public override fun minusKey(key: Key<*>): CoroutineContext = this ......} public interface Element : CoroutineContext { //如果key和本人的key匹配,那么本人就是要删除的Element,返回EmptyCoroutineContext(示意删除了本人),否则阐明本人不须要被删除,返回本人 public override fun minusKey(key: Key<*>): CoroutineContext = if (this.key == key) EmptyCoroutineContext else this ......}
如果把CombinedContext和Element联合来看,那么CombinedContext的整体构造如下:
其构造相似链表,left就是指向下一个结点的指针,get、minusKey操作大体逻辑都是先拜访以后element,不满足,再拜访left的element,程序都是从right到left。
CoroutineContext.fold()元素遍历
internal class CombinedContext( //左上下文 private val left: CoroutineContext, //右元素 private val element: Element) : CoroutineContext, Serializable { //先对left做fold操作,把left做完fold操作的的返回后果和element做operation操作 public override fun <R> fold(initial: R, operation: (R, Element) -> R): R = operation(left.fold(initial, operation), element) ......} public object EmptyCoroutineContext : CoroutineContext, Serializable { public override fun <R> fold(initial: R, operation: (R, Element) -> R): R = initial ......} public interface Element : CoroutineContext { //对传入的initial和本人做operation操作 public override fun <R> fold(initial: R, operation: (R, Element) -> R): R = operation(initial, this) ......}
fold也是递归的模式操作,fold的操作大体逻辑是:先拜访left,直到递归到最初的element,而后再从left到right的返回,从而拜访了所有的element。
CoroutineContext.plus()增加元素
对于CoroutineContext的元素增加办法,间接看其plus()实现,也是惟一没有被重写的办法。
public operator fun plus(context: CoroutineContext): CoroutineContext =//如果要相加的CoroutineContext为空,那么不做任何解决,间接返回if (context === EmptyCoroutineContext) this else//如果要相加的CoroutineContext不为空,那么对它进行fold操作,能够把acc了解成+号右边的CoroutineContext,element了解成+号左边的CoroutineContext的某一个elementcontext.fold(this) { acc, element -> //首先从右边CoroutineContext中删除左边的这个element val removed = acc.minusKey(element.key) //如果removed为空,阐明右边CoroutineContext删除了和element雷同的元素后为空,那么返回左边的element即可 if (removed === EmptyCoroutineContext) element else { //如果removed不为空,阐明右边CoroutineContext删除了和element雷同的元素后还有其余元素,那么结构一个新的CombinedContext返回 val interceptor = removed[ContinuationInterceptor] if (interceptor == null) CombinedContext(removed, element) else { val left = removed.minusKey(ContinuationInterceptor) if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else CombinedContext(CombinedContext(left, element), interceptor) } } }
plus办法大部分状况下返回一个CombinedContext,即咱们把两个CoroutineContext相加后,返回一个CombinedContext,在组合成CombinedContext时,+号左边的CoroutineContext中的元素会笼罩+号右边的CoroutineContext中的含有雷同key的元素。
这个笼罩操作就在fold办法的参数operation代码块中实现,通过minusKey办法删除掉反复元素。
plus办法中能够看到外面有个对ContinuationInterceptor的解决,目标是让ContinuationInterceptor在每次相加后都能变成CoroutineContext中的最初一个元素。
ContinuationInterceptor继承自Element,称为协程上下文拦截器,作用是在协程执行前拦挡它,从而在协程执行前做出一些其余的操作。通过把ContinuationInterceptor放在最初面,协程在查找上下文的element时,总能最快找到拦截器,防止了递归查找,从而让拦挡行为前置执行。
4.1.2 CoroutineName
public data class CoroutineName( val name: String) : AbstractCoroutineContextElement(CoroutineName) {
CoroutineName是用户用来指定的协程名称的,用于不便调试和定位问题。
GlobalScope.launch(CoroutineName("GlobalScope")) { launch(CoroutineName("CoroutineA")) {//指定协程名称 val coroutineName = coroutineContext[CoroutineName]//获取协程名称 print(coroutineName) }} /** 打印后果CoroutineName(CoroutineA)*/
协程外部能够通过coroutineContext这个全局属性间接获取以后协程的上下文。
4.1.3 上下文组合
如果要传递多个上下文元素,CoroutineContext能够应用"+"运算符进行合并。因为CoroutineContext是由一组元素组成的,所以加号右侧的元素会笼罩加号左侧的元素,进而组成新创建的CoroutineContext。
GlobalScope.launch { //通过+号运算增加多个上下文元素 var context = CoroutineName("协程1") + Dispatchers.Main print("context == $context") context += Dispatchers.IO //增加反复Dispatchers元素,Dispatchers.IO 会替换 ispatchers.Main print("context == $context") val contextResult = context.minusKey(context[CoroutineName]!!.key)//移除CoroutineName元素 print("contextResult == $contextResult")} /**打印后果context == [CoroutineName(协程1), Dispatchers.Main]context == [CoroutineName(协程1), Dispatchers.IO]contextResult == Dispatchers.IO*/
如果有反复的元素(key统一)则左边的会代替右边的元素,相干原理参看协程上下文构造章节。
4.1.4 CoroutineScope 构建
CoroutineScope实际上是一个CoroutineContext的封装,当咱们须要启动一个协程时,会在CoroutineScope的实例上调用构建函数,如async和launch。
在构建函数中,一共呈现了3个CoroutineContext。
查看协程构建函数async和launch的源码,其第一行都是如下代码:
val newContext = newCoroutineContext(context)
进一步查看:
@ExperimentalCoroutinesApipublic actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext { val combined = coroutineContext + context //CoroutineContext拼接组合 val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null) debug + Dispatchers.Default else debug}
构建器外部进行了一个CoroutineContext拼接操作,plus左值是CoroutineScope外部的CoroutineContext,右值是作为构建函数参数的CoroutineContext。
抽象类AbstractCoroutineScope实现了CoroutineScope和Job接口。大部分CoroutineScope的实现都继承自AbstractCoroutineScope,意味着他们同时也是一个Job。
public abstract class AbstractCoroutine<in T>( parentContext: CoroutineContext, initParentJob: Boolean, active: Boolean) : JobSupport(active), Job, Continuation<T>, CoroutineScope { /** * The context of this coroutine that includes this coroutine as a [Job]. */ public final override val context: CoroutineContext = parentContext + this //重写了父类的coroutineContext属性 public override val coroutineContext: CoroutineContext get() = context}
从上述剖析可知:coroutine context = parent context + coroutine job
4.1.5 典型用例
全限定Context
launch( Dispatchers.Main + Job() + CoroutineName("HelloCoroutine") + CoroutineExceptionHandler { _, _ -> /* ... */ }) {/* ... */}
全限定Context,即全副显式指定具体值的Elements。不管你用哪一个CoroutineScope构建该协程,它都具备统一的体现,不会受到CoroutineScope任何影响。
CoroutineScope Context
基于Activity生命周期实现一个CoroutineScope
abstract class ScopedAppActivity:AppCompatActivity(),CoroutineScope{ protected lateinit var job: Job override val coroutineContext: CoroutineContext get() = job + Dispatchers.Main // 留神这里应用+拼接CoroutineContext override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) job = Job() } override fun onDestroy() { super.onDestroy() job.cancel() }}
Dispatcher:应用Dispatcher.Main,以在UI线程进行绘制
Job:在onCreate时构建,在onDestroy时销毁,所有基于该CoroutineContext创立的协程,都会在Activity销毁时勾销,从而防止Activity泄露的问题
长期指定参数
CoroutineContext的参数次要有两个起源:从scope中继承+参数指定。咱们能够用withContext便捷地指定某个参数启动子协程,例如咱们想要在协程外部执行一个无奈被勾销的子协程:
withContext(NonCancellable) { /* ... */}
读取协程上下文参数
通过顶级挂起只读属性coroutineContext获取协程上下文参数,它位于 kotlin-stdlib / kotlin.coroutines / coroutineContext
println("Running in ${coroutineContext[CoroutineName]}")
Nested Context内嵌上下文
内嵌上下文切换:在协程A外部构建协程B时,B会主动继承A的Dispatcher。
能够在调用async时退出Dispatcher参数,切换到工作线程
// 谬误的做法,在主线程中间接调用async,若耗时过长则阻塞UIGlobalScope.launch(Dispatchers.Main) { val deferred = async { /* ... */ } /* ... */} // 正确的做法,在工作线程执行协程工作GlobalScope.launch(Dispatchers.Main) { val deferred = async(Dispatchers.Default) { /* ... */ } /* ... */}
4.2 协程拦截器
@SinceKotlin("1.3")public interface ContinuationInterceptor : CoroutineContext.Element { companion object Key : CoroutineContext.Key<ContinuationInterceptor> public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> //...... }
- 无论在CoroutineContext前面 放了多少个拦截器,Key 为 ContinuationInterceptor 的拦截器只能有一个。
- Continuation 在调用其 Continuation#resumeWith() 办法,会执行其 suspend 润饰的函数的代码块,如果咱们提前拦挡到,能够做点其余事件,比如说切换线程,这是 ContinuationInterceptor 的次要作用。
协程的实质就是回调,这个回调就是被拦挡的Continuation。OkHttp用拦截器做缓存,打日志,模仿申请等,协程拦截器同理。
咱们通过Dispatchers 来指定协程产生的线程,Dispatchers 实现了 ContinuationInterceptor接口。
这里咱们自定义一个拦截器放到协程上下文,看一下会产生什么。
class MyContinuationInterceptor: ContinuationInterceptor{ override val key = ContinuationInterceptor override fun <T> interceptContinuation(continuation: Continuation<T>) = MyContinuation(continuation)} class MyContinuation<T>(val continuation: Continuation<T>): Continuation<T> { override val context = continuation.context override fun resumeWith(result: Result<T>) { log("<MyContinuation> $result" ) continuation.resumeWith(result) }}suspend fun main(args: Array<String>) { // start main coroutine GlobalScope.launch(MyContinuationInterceptor()) { log(1) val job = async { log(2) delay(1000) log(3) "Hello" } log(4) val result = job.await() log("5. $result") }.join() log(6)}fun log(o: Any?) { println("[${Thread.currentThread().name}]:$o")}
/******打印后果******/[main]:<MyContinuation> Success(kotlin.Unit) //11[main]:1[main]:<MyContinuation> Success(kotlin.Unit) //22[main]:2[main]:4[kotlinx.coroutines.DefaultExecutor]:<MyContinuation> Success(kotlin.Unit) //33[kotlinx.coroutines.DefaultExecutor]:3[kotlinx.coroutines.DefaultExecutor]:<MyContinuation> Success(Hello)[kotlinx.coroutines.DefaultExecutor]:5. Hello[kotlinx.coroutines.DefaultExecutor]:6
- 所有协程启动时,都有一次Continuation.resumeWith 的操作,协程有机会调度到其余线程的要害之处就在于此。
- delay是挂终点,1s之后须要持续调度执行该协程,因而就有了33处日志。
前述剖析CoroutineContext的plus办法波及到了ContinuationInterceptor,plus每次都会将ContinuationInterceptor增加到拼接链的尾部,这里再具体解释一下起因。
public operator fun plus(context: CoroutineContext): CoroutineContext =//如果要相加的CoroutineContext为空,那么不做任何解决,间接返回if (context === EmptyCoroutineContext) this else//如果要相加的CoroutineContext不为空,那么对它进行fold操作,能够把acc了解成+号右边的CoroutineContext,element了解成+号左边的CoroutineContext的某一个elementcontext.fold(this) { acc, element -> //首先从右边CoroutineContext中删除左边的这个element val removed = acc.minusKey(element.key) //如果removed为空,阐明右边CoroutineContext删除了和element雷同的元素后为空,那么返回左边的element即可 if (removed === EmptyCoroutineContext) element else { //如果removed不为空,阐明右边CoroutineContext删除了和element雷同的元素后还有其余元素,那么结构一个新的CombinedContext返回 val interceptor = removed[ContinuationInterceptor] if (interceptor == null) CombinedContext(removed, element) else { val left = removed.minusKey(ContinuationInterceptor) if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else CombinedContext(CombinedContext(left, element), interceptor) } } }
起因一:CombinedContext的构造决定。
其有两个元素,left是一个前驱汇合,element为一个纯正CoroutineContext,它的get办法每次都是从element开始进行查找对应Key的CoroutineContext对象;没有匹配到才会去left汇合中进行递归查找。为了放慢查找ContinuationInterceptor类型的实例,才将它退出到拼接链的尾部,对应的就是element。
起因二:ContinuationInterceptor应用很频繁
每次创立协程都会去尝试查找以后协程的CoroutineContext中是否存在ContinuationInterceptor。这里咱们用launch来验证一下
public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit): Job { val newContext = newCoroutineContext(context) val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true) coroutine.start(start, coroutine, block) return coroutine}
如果应用的launch应用的是默认参数,此时Coroutine就是StandaloneCoroutine,而后调用start办法启动协程。
start(block, receiver, this)} public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit = when (this) { DEFAULT -> block.startCoroutineCancellable(completion) ATOMIC -> block.startCoroutine(completion) UNDISPATCHED -> block.startCoroutineUndispatched(completion) LAZY -> Unit // will start lazily }
如果咱们应用默认参数,看一下默认参数对应执行的block.startCoroutineCancellable(completion)
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) { createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))}
- 首先通过createCoroutineUnintercepted来创立一个协程
- 而后再调用intercepted办法进行拦挡操作
- 最初调用resumeCancellable,即Continuation的resumeWith办法,启动协程,所以每次启动协程都会主动回调一次resumeWith办法
这里看一下intercepted
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> = (this as? ContinuationImpl)?.intercepted() ?: this
看其在ContinuationImpl的intercepted办法实现
public fun intercepted(): Continuation<Any?> = intercepted ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this) .also { intercepted = it }
- 首先获取到ContinuationInterceptor实例
- 而后调用它的interceptContinuation办法返回一个解决过的Continuation(屡次调用intercepted,对应的interceptContinuation只会调用一次)
至此可知,ContinuationInterceptor的拦挡是通过interceptContinuation办法进行
上面再看一个ContinuationInterceptor的典型示例
val interceptor = object : ContinuationInterceptor { override val key: CoroutineContext.Key<*> = ContinuationInterceptor override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> { println("intercept todo something. change run to thread") return object : Continuation<T> by continuation { override fun resumeWith(result: Result<T>) { println("create new thread") thread { continuation.resumeWith(result) } } } }} println(Thread.currentThread().name) lifecycleScope.launch(interceptor) { println("launch start. current thread: ${Thread.currentThread().name}") withContext(Dispatchers.Main) { println("new continuation todo something in the main thread. current thread: ${Thread.currentThread().name}") } launch { println("new continuation todo something. current thread: ${Thread.currentThread().name}") } println("launch end. current thread: ${Thread.currentThread().name}")}
/******打印后果******/main// 第一次launchintercept todo something. change run to threadcreate new threadlaunch start. current thread: Thread-2new continuation todo something in the main thread. current thread: maincreate new thread// 第二次launchintercept todo something. change run to threadcreate new threadlaunch end. current thread: Thread-7new continuation todo something. current thread: Thread-8
- 首先程序运行在main线程,启动协程时将自定义的interceptor退出到上下文中,协程启动时进行拦挡,将在main线程运行的程序切换到新的thread线程
- withContext没有拦挡胜利,具体起因在上面的调度器再具体解释,简略来说就是咱们自定义的interceptor被替换了。
- launch start与launch end所处的线程不一样,因为withContext完结之后,它外部还会进行一次线程复原,将本身所处的main线程切换到之前的线程。协程每一个挂起后复原都是通过回调resumeWith进行的,然而内部launch协程咱们进行了拦挡,在它返回的Continuation的resumeWith回调中总是会创立新的thread。
4.3 调度器
CoroutineDispatcher调度器指定指定执行协程的指标载体,它确定了相干的协程在哪个线程或哪些线程上执行。能够将协程限度在一个特定的线程执行,或将它分派到一个线程池,亦或是让它不受限地运行。
public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor { //将可运行块的执行分派到给定上下文中的另一个线程上 public abstract fun dispatch(context: CoroutineContext, block: Runnable) //返回一个continuation,它封装了提供的[continuation],拦挡了所有的复原 public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> //...... }
协程须要调度的地位就是挂终点的地位,只有当挂终点正在挂起的时候才会进行调度,实现调度须要应用协程的拦截器。
调度的实质就是解决挂终点复原之后的协程逻辑在哪里运行的问题。调度器也属于协程上下文一类,它继承自拦截器。
- 【val Default】: CoroutineDispatcher
- 【val Main】: MainCoroutineDispatcher
- 【val Unconfined】: CoroutineDispatcher
IO仅在 Jvm 上有定义,它基于 Default 调度器背地的线程池,并实现了独立的队列和限度,因而协程调度器从 Default 切换到 IO 并不会触发线程切换
对于调度器介绍到这里,还没有具体解释前述协程拦截器中的withContext为什么拦挡失败。这里针对这个具体看一下源码实现。
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
其返回类型为MainCoroutineDispatcher,继承自CoroutineDispatcher。
public abstract class MainCoroutineDispatcher : CoroutineDispatcher() public abstract class CoroutineDispatcher :AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor { public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true public abstract fun dispatch(context: CoroutineContext, block: Runnable) public open fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = dispatch(context, block) public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> = DispatchedContinuation(this, continuation) ......}
CoroutineDispatch实现了ContinuationInterceptor,依据前述解释的CoroutineContext构造,可知咱们自定义的拦截器没有失效是因为被替换了。
CoroutineDispatch中的isDispatchNeeded就是判断是否须要散发,而后dispatch就是执行散发。
ContinuationInterceptor重要的办法就是interceptContinuation,在CoroutineDispatcher中间接返回了DispatchedContinuation对象,它是一个Continuation类型,看一下其resumeWith实现。
override fun resumeWith(result: Result<T>) { val context = continuation.context val state = result.toState() //判断是否须要散发 if (dispatcher.isDispatchNeeded(context)) { _state = state resumeMode = MODE_ATOMIC dispatcher.dispatch(context, this) } else { executeUnconfined(state, MODE_ATOMIC) { withCoroutineContext(this.context, countOrElement) { //不须要散发,间接应用原先的continuation对象的resumewith continuation.resumeWith(result) } } }}
那么散发的判断逻辑是怎么实现的?这要依据具体的dispatcher来看。
如果咱们拿的是Dispatchers.Main,其dispatcher为HandlerContext。
internal class HandlerContext private constructor( private val handler: Handler, private val name: String?, private val invokeImmediately: Boolean) : HandlerDispatcher(), Delay { override fun isDispatchNeeded(context: CoroutineContext): Boolean { return !invokeImmediately || Looper.myLooper() != handler.looper } override fun dispatch(context: CoroutineContext, block: Runnable) { if (!handler.post(block)) { cancelOnRejection(context, block) } } ......
其中HandlerContext继承于HandlerDispatcher,而HandlerDispatcher继承于MainCoroutineDispatcher
Dispatcher的根本实现原理大抵为:
- 首先在协程进行启动的时候通过拦截器的形式进行拦挡,对应的办法是interceptContinuation
- 而后返回一个具备切换线程性能的Continuation
- 在每次进行resumeWith的时候,外部再通过isDispatchNeeded进行判断以后协程的运行是否须要切换线程。
- 如果须要则调用dispatch进行线程的切换,保障协程的正确运行。如果要自定义协程线程的切换,能够通过继承CoroutineDispatcher来实现。
这里再简略看一下WithContext,咱们都晓得其不仅能够承受CoroutineDispatcher来帮忙咱们切换线程,同时在执行结束之后还会帮忙咱们将之前切换掉的线程进复原,保障协程运行的连贯性。那这是怎么实现的呢?
withContext的线程复原原理是它外部生成了一个DispatchedCoroutine,保留切换线程时的CoroutineContext与切换之前的Continuation,最初在onCompletionInternal进行复原。咱们简略翻一翻其源码实现。
public suspend fun <T> withContext( context: CoroutineContext, block: suspend CoroutineScope.() -> T): T { contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) } return suspendCoroutineUninterceptedOrReturn sc@ { uCont -> // 创立新的CoroutineContext val oldContext = uCont.context val newContext = oldContext + context ...... //应用新的Dispatcher,笼罩外层 val coroutine = DispatchedCoroutine(newContext, uCont) block.startCoroutineCancellable(coroutine, coroutine) coroutine.getResult() }}
internal class DispatchedCoroutine<in T>( context: CoroutineContext, uCont: Continuation<T>) : ScopeCoroutine<T>(context, uCont) { //在complete时会会回调 override fun afterCompletion(state: Any?) { // Call afterResume from afterCompletion and not vice-versa, because stack-size is more // important for afterResume implementation afterResume(state) } override fun afterResume(state: Any?) { ////uCont就是父协程,context仍是老版context,因而能够切换回原来的线程上 if (tryResume()) return // completed before getResult invocation -- bail out // Resume in a cancellable way because we have to switch back to the original dispatcher uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont)) } ......}
- 对于withContext,传入的context会笼罩外层的拦截器并生成一个newContext,因而能够实现线程切换。
- DispatchedCoroutine作为complete传入协程体的创立函数中,因而协程体执行实现后会回调到afterCompletion中。
- DispatchedCoroutine中传入的uCont是父协程,它的拦截器仍是外层的拦截器,因而会切换回原来的线程中。
4.3.1 典型用例
例如:点击一个按钮,进行异步操作后再回调刷新UI
getUserBtn.setOnClickListener { getUser { user -> handler.post { userNameView.text = user.name } }}typealias Callback = (User) -> Unit fun getUser(callback: Callback){ ...}
因为 getUser 函数须要切到其余线程执行,因而回调通常也会在这个非 UI 的线程中调用,所以为了确保 UI 正确被刷新,咱们须要用 handler.post 切换到 UI 线程。
如果要用协程实现呢?
suspend fun getUserCoroutine() = suspendCoroutine<User> { continuation -> getUser { continuation.resume(it) }} getUserBtn.setOnClickListener { GlobalScope.launch(Dispatchers.Main) { userNameView.text = getUserCoroutine().name }}
suspendCoroutine 这个办法并不是帮咱们启动协程的,它运行在协程当中并且帮咱们获取到以后协程的 Continuation 实例,也就是拿到回调,不便前面咱们调用它的 resume 或者 resumeWithException 来返回后果或者抛出异样。
4.3.2 线程绑定
调度器的目标就是切线程,咱们只有提供线程,调度器就应该很不便的创立进去。
suspend fun main() { val myDispatcher= Executors.newSingleThreadExecutor{ r -> Thread(r, "MyThread") }.asCoroutineDispatcher() GlobalScope.launch(myDispatcher) { log(1) }.join() log(2)}
因为这个线程池是咱们本人创立的,因而咱们须要在适合的时候敞开它。
除了上述的办法,kotlin协程还给出了更简略的api,如下:
GlobalScope.launch(newSingleThreadContext("Dispather")) { //......}.join()
前述咱们是通过线程的形式,同理能够通过线程池转为调度器实现。
Executors.newScheduledThreadPool(10) .asCoroutineDispatcher().use { dispatcher -> GlobalScope.launch(dispatcher) { //...... }.join
五、协程挂起
在前述协程时,常常会呈现suspend关键字和挂起的说法,其含意和用法是什么?一起深刻看一下。
5.1 概述
suspend翻译过去就是中断、挂起,用在函数申明前,起到挂起协程的标识,实质作用是代码调用时为办法增加一个Continuation类型的参数,保障协程中Continuation的高低传递。
挂起函数只能在协程或另一个挂起函数中被调用,如果你在非协程中应用到了挂起函数,会报错。
阻塞:
函数A必须在函数B之前实现执行,线程被锁定以便函数A可能实现其执行
挂起:
函数A尽管曾经启动,但能够暂停,让函数B执行,而后只在稍后复原。线程没有被函数A锁定。
“挂起”是指协程从它以后线程脱离,切换到另一个线程运行。当线程运行到suspend函数时,会临时挂起这个函数及后续代码的执行。简而言之,挂起函数是一个能够启动、暂停和复原的函数。
协程运行的时候每遇到被suspend润饰的办法时,都可能会挂起以后协程,不是必会挂起,例如如下办法就不会被挂起。
private suspend fun a() { println("aa")}
这是因为这种办法不会返回COROUTINE_SUSPENDED类型,这在前面具体解释。
5.2 suspend实质
Kotlin 应用堆栈帧来治理要运行哪个函数以及所有局部变量。
协程在惯例函数根底上增加了suspend和resume两项操作用于解决长时间运行的工作。
【suspend】:挂起或暂停,用于挂起执行以后协程,并保留所有局部变量
【resume】:复原,用于让已挂起的协程从挂起处复原继续执行
挂起(暂停)协程时,会复制并保留以后的堆栈帧以供稍后应用,将信息保留到Continuation对象中。
复原协程时,会将堆栈帧从其保留地位复制回来,对应的Continuation通过调用resumeWith函数才会复原协程的执行,而后函数再次开始运行。同时返回Result类型的胜利或者异样的后果。
public interface Continuation<in T> { //对应这个Continuation的协程上下文 public val context: CoroutineContext //复原相应协程的执行,传递一个胜利或失败的后果作为最初一个挂终点的返回值。 public fun resumeWith(result: Result<T>)} //将[value]作为最初一个挂终点的返回值,复原相应协程的执行。@SinceKotlin("1.3")@InlineOnlypublic inline fun <T> Continuation<T>.resume(value: T): Unit = resumeWith(Result.success(value)) //复原相应协程的执行,以便在最初一个挂终点之后从新抛出[异样]。@SinceKotlin("1.3")@InlineOnlypublic inline fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit = resumeWith(Result.failure(exception))
- Continuation 类有一个 resumeWith 函数能够接管 Result 类型的参数。
- 在后果胜利获取时,调用resumeWith(Result.success(value))或者调用拓展函数resume(value);出现异常时,调用resumeWith(Result.failure(exception))或者调用拓展函数resumeWithException(exception)。这就是 Continuation 的复原调用。
@FormUrlEncoded@POST("/api/common/countryList")suspend fun fetchCountryList(@FieldMap params: Map<String, String?>): CountryResponse
前述挂起函数解析后反编译如下:
@FormUrlEncoded@POST("/api/common/countryList")@NullableObject fetchCountryList(@FieldMap @NotNull Map var1, @NotNull Continuation var2);
- 挂起函数反编译后,发现多了一个Continuation参数,有编译器传递,阐明调用挂起函数须要Continuation。
- 只有挂起函数或者协程中才有Continuation,所以挂起函数只能在协程或者其余挂起函数中执行。
5.2.1 Continuation
这里看一下该Continuation的传递起源。
这个函数只能在协程或者挂起函数中执行,阐明Continuation很有可能是从协程中传入来的,查看协程构建源码。
public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit): Job { val newContext = newCoroutineContext(context) val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true) coroutine.start(start, coroutine, block) return coroutine}
通过launch启动一个协程时,其通过coroutine的start办法启动协程:
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) { start(block, receiver, this)}
而后start办法外面调用了CoroutineStart的invoke,这个时候咱们发现了Continuation:
//CoroutineStart的invoke办法呈现了Continuationpublic operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =when (this) { DEFAULT -> block.startCoroutineCancellable(receiver, completion) ATOMIC -> block.startCoroutine(receiver, completion) UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion) LAZY -> Unit // will start lazily}@InternalCoroutinesApipublic fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) { createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))}
最终回调到Continuation的resumeWith()复原函数中。
public fun <T> Continuation<T>.resumeCancellableWith( result: Result<T>, onCancellation: ((cause: Throwable) -> Unit)? = null): Unit = when (this) { is DispatchedContinuation -> resumeCancellableWith(result, onCancellation) else -> resumeWith(result)}
咱们再深刻kotlin源码看一下其外部实现。
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted( receiver: R, completion: Continuation<T>): Continuation<Unit> { val probeCompletion = probeCoroutineCreated(completion) return if (this is BaseContinuationImpl) create(receiver, probeCompletion) else { createCoroutineFromSuspendFunction(probeCompletion) { (this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)//1 } }}
private inline fun <T> createCoroutineFromSuspendFunction( completion: Continuation<T>, crossinline block: (Continuation<T>) -> Any?): Continuation<Unit> { val context = completion.context // label == 0 when coroutine is not started yet (initially) or label == 1 when it was return if (context === EmptyCoroutineContext) object : RestrictedContinuationImpl(completion as Continuation<Any?>) { private var label = 0 override fun invokeSuspend(result: Result<Any?>): Any? = when (label) { 0 -> { label = 1 result.getOrThrow() // Rethrow exception if trying to start with exception (will be caught by BaseContinuationImpl.resumeWith block(this) // run the block, may return or suspend } 1 -> { label = 2 result.getOrThrow() // this is the result if the block had suspended } else -> error("This coroutine had already completed") } } else object : ContinuationImpl(completion as Continuation<Any?>, context) { private var label = 0 override fun invokeSuspend(result: Result<Any?>): Any? = when (label) { 0 -> { label = 1 result.getOrThrow() // Rethrow exception if trying to start with exception (will be caught by BaseContinuationImpl.resumeWith block(this) // run the block, may return or suspend } 1 -> { label = 2 result.getOrThrow() // this is the result if the block had suspended } else -> error("This coroutine had already completed") } }}
- createCoroutineUnintercepted(receiver, completion)办法在Kotlin源码中是通过suspend关键字润饰的扩大办法。
- suspend关键字润饰(suspend R.() -> T)对象理论被编译成为一个Function2<r, continuation, Any?>接口对象,而关键字suspend理论编译成了Continuation接口。
所以:
- 协程体自身就是Continuation,即必须在协程内调用suspend挂起函数。
- suspend关键字并不具备暂停、挂起代码块或者函数办法性能。
5.2.2 状态机CPS
协程理论挂起是如何实现的?
这里首先通过一个示例来演示一下状态机。
suspend fun main() { log(1) log(returnSuspended()) log(2) delay(1000) log(3) log(returnImmediately()) log(4)} suspend fun returnSuspended() = suspendCoroutineUninterceptedOrReturn<String>{ continuation -> thread { Thread.sleep(1000) continuation.resume("Return suspended.") } COROUTINE_SUSPENDED} suspend fun returnImmediately() = suspendCoroutineUninterceptedOrReturn<String>{ log(5) "Return immediately."}
这里咱们定义了两个挂起函数,一个会真正挂起,一个会间接返回后果,其运行后果为:
[main]:1[Thread-2]:Return suspended.[Thread-2]:2[kotlinx.coroutines.DefaultExecutor]:3[kotlinx.coroutines.DefaultExecutor]:5[kotlinx.coroutines.DefaultExecutor]:Return immediately.[kotlinx.coroutines.DefaultExecutor]:4
前述代码的理论实现状况大抵如下:
public class ContinuationImpl implements Continuation<Object> { private int label = 0; private final Continuation<Unit> completion; public ContinuationImpl(Continuation<Unit> completion) { this.completion = completion; } @Override public CoroutineContext getContext() { return EmptyCoroutineContext.INSTANCE; } @Override public void resumeWith(@NotNull Object o) { try { Object result = o; switch (label) { case 0: { LogKt.log(1); result = SuspendFunctionsKt.returnSuspended( this); label++; if (isSuspended(result)) return; } case 1: { LogKt.log(result); LogKt.log(2); result = DelayKt.delay(1000, this); label++; if (isSuspended(result)) return; } case 2: { LogKt.log(3); result = SuspendFunctionsKt.returnImmediately( this); label++; if (isSuspended(result)) return; } case 3:{ LogKt.log(result); LogKt.log(4); } } completion.resumeWith(Unit.INSTANCE); } catch (Exception e) { completion.resumeWith(e); } } private boolean isSuspended(Object result) { return result == IntrinsicsKt.getCOROUTINE_SUSPENDED(); }}
- 首先定义了一个ContinuationImpl,即一个Continuation的实现。
- 能够在 Kotlin 的规范库当中找到一个名叫 ContinuationImpl 的类,其 resumeWith 最终调用到了 invokeSuspend,而这个 invokeSuspend 实际上就是咱们的协程体,通常也就是一个 Lambda 表达式。
- 通过 launch启动协程,传入的那个 Lambda 表达式,实际上会被编译成一个 SuspendLambda 的子类,而它又是 ContinuationImpl 的子类。
public class RunSuspend implements Continuation<Unit> { private Object result; @Override public CoroutineContext getContext() { return EmptyCoroutineContext.INSTANCE; } @Override public void resumeWith(@NotNull Object result) { synchronized (this){ this.result = result; notifyAll(); // 协程曾经完结,告诉上面的 wait() 办法进行阻塞 } } public void await() throws Throwable { synchronized (this){ while (true){ Object result = this.result; if(result == null) wait(); // 调用了 Object.wait(),阻塞以后线程,在 notify 或者 notifyAll 调用时返回 else if(result instanceof Throwable){ throw (Throwable) result; } else return; } } }}
接着,定义了一个RunSuspend,用来接管后果。
public static void main(String... args) throws Throwable { RunSuspend runSuspend = new RunSuspend(); ContinuationImpl table = new ContinuationImpl(runSuspend); table.resumeWith(Unit.INSTANCE); runSuspend.await();}
作为 completion 传入的 RunSuspend 实例的 resumeWith 实际上是在 ContinuationImpl 的 resumeWtih 的最初才会被调用,因而它的 await() 一旦进入阻塞态,直到 ContinuationImpl 的整体状态流转结束才会进行阻塞,此时过程也就运行结束失常退出了。
这段代码的运行后果为:
/******打印后果******/[main]:1[Thread-2]:Return suspended.[Thread-2]:2[kotlinx.coroutines.DefaultExecutor]:3[kotlinx.coroutines.DefaultExecutor]:5[kotlinx.coroutines.DefaultExecutor]:Return immediately.[kotlinx.coroutines.DefaultExecutor]:4
- 协程体的执行就是一个状态机,每一次遇到挂起函数,都是一次状态转移,就像咱们后面例子中的 label 一直的自增来实现状态流转一样
- 状态机即代码中每一个挂终点和初始挂终点对应的Continuation都会转化为一种状态,协程复原只是跳转到下一种状态。
- 挂起函数将执行过程分为多个 Continuation 片段,并且利用状态机的形式保障各个片段是程序执行的,所以异步逻辑也能够用程序的代码来实现。
5.3 协程运行原理
前述相干示例更多是为了验证剖析协程的一些个性,这里从协程的创立、启动、复原、线程调度,协程切换等具体解析协程的实现。
5.3.1 协程创立与启动
首先创立一个协程并启动,最常见的莫过于CoroutineScope.launch{},其源码实现为:
public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit): Job { val newContext = newCoroutineContext(context) val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true) coroutine.start(start, coroutine, block) return coroutine}
咱们如果不指定start 参数,所以它会应用默认的 CoroutineStart.DEFAULT,最终 coroutine 会失去一个 StandaloneCoroutine。其实现自AbstractCoroutine,实现了Continuation。
前述剖析suspend实质时已知,其最终会调用到createCoroutineUnintercepted,次要是创立了一个新的可挂起计算,通过调用resume(Unit)启动协程,返回值为Continuation,Continuation提供了resumeWith复原协程的接口,用以实现协程复原,Continuation封装了协程的代码运行逻辑和复原接口。
将协程代码进行反编译,再看一下其字节码和java实现,例如
suspend fun test() { CoroutineScope(Dispatchers.IO).launch { delay(11) }}
查看其字节码实现时,可知其编译生成外部类。
协程的计算逻辑封装在invokeSuspend办法中,而SuspendLambda的继承关系为 ,
SuspendLambda -> ContinuationImpl -> BaseContinuationImpl -> Continuation
其中BaseContinuationImpl 局部要害源码如下:
internal abstract class BaseContinuationImpl(...) { // 实现 Continuation 的 resumeWith,并且是 final 的,不可被重写 public final override fun resumeWith(result: Result<Any?>) { ... val outcome = invokeSuspend(param) ... } // 由编译生成的协程相干类来实现 protected abstract fun invokeSuspend(result: Result<Any?>): Any?}
前述的协程示例代码反编译为:
public static final Object test(@NotNull Continuation $completion) { Job var10000 = BuildersKt.launch$default(CoroutineScopeKt.CoroutineScope((CoroutineContext)Dispatchers.getIO()), (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) { int label; @Nullable public final Object invokeSuspend(@NotNull Object $result) { //挂起标识 Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED(); switch(this.label) { case 0: ResultKt.throwOnFailure($result); //设置挂起后复原,进入的状态 this.label = 1; if (DelayKt.delay(11L, this) == var2) { return var2; } break; case 1: // 是否须要抛出异样 ResultKt.throwOnFailure($result); break; default: throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine"); } return Unit.INSTANCE; } @NotNull public final Continuation create(@Nullable Object value, @NotNull Continuation completion) { Intrinsics.checkNotNullParameter(completion, "completion"); Function2 var3 = new <anonymous constructor>(completion); return var3; } public final Object invoke(Object var1, Object var2) { return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE); } }), 3, (Object)null); return var10000 == IntrinsicsKt.getCOROUTINE_SUSPENDED() ? var10000 : Unit.INSTANCE;}
所以,协程的启动流程为:resume(Unit)->resumeWith()->invokeSuspend()。
协程的挂起通过suspend挂起函数实现,协程的复原通过Continuation.resumeWith实现。
5.3.2 协程线程调度
协程的线程调度是通过拦截器实现的,后面提到了协程启动调用到了startCoroutineCancellable,对于协程调度在前述的协程调度器局部已具体介绍了,这里再简略过一下。
@InternalCoroutinesApipublic fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) { createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))}
看一下其intercepted()的具体实现:
@SinceKotlin("1.3")public actual fun <T> Continuation<T>.intercepted(): Continuation<T> = (this as? ContinuationImpl)?.intercepted() ?: thisinternal abstract class ContinuationImpl( ......) : BaseContinuationImpl(completion) { constructor(completion: Continuation<Any?>?) : this(completion, completion?.context) public override val context: CoroutineContext get() = _context!! @Transient private var intercepted: Continuation<Any?>? = null public fun intercepted(): Continuation<Any?> = intercepted ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this) .also { intercepted = it } // context[ContinuationInterceptor] 就是协程的 CoroutineDispatcher ......}public abstract class CoroutineDispatcher :AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor { public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> = DispatchedContinuation(this, continuation) ......}
intercepted()最终会应用协程的CoroutineDispatcher的interceptContinuation办法包装原来的 Continuation,拦挡所有的协程运行操作。
DispatchedContinuation拦挡了协程的启动和复原,别离是resumeCancellableWith和重写的resumeWith(Result)。
internal class DispatchedContinuation<in T>( @JvmField val dispatcher: CoroutineDispatcher, @JvmField val continuation: Continuation<T>) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation { @Suppress("NOTHING_TO_INLINE") inline fun resumeCancellableWith( result: Result<T>, noinline onCancellation: ((cause: Throwable) -> Unit)? ) { val state = result.toState(onCancellation) //判断是否须要线程调度 if (dispatcher.isDispatchNeeded(context)) { _state = state resumeMode = MODE_CANCELLABLE //将协程运算散发到另一个线程 dispatcher.dispatch(context, this) } else { executeUnconfined(state, MODE_CANCELLABLE) { if (!resumeCancelled(state)) { //间接在以后线程执行协程运算 resumeUndispatchedWith(result) } } } } override fun resumeWith(result: Result<T>) { val context = continuation.context val state = result.toState() //判断是否须要线程调度 if (dispatcher.isDispatchNeeded(context)) { _state = state resumeMode = MODE_ATOMIC //将协程的运算散发到另一个线程 dispatcher.dispatch(context, this) } else { executeUnconfined(state, MODE_ATOMIC) { withCoroutineContext(this.context, countOrElement) { //间接在以后线程执行协程运算 continuation.resumeWith(result) } } } } } internal abstract class DispatchedTask<in T>( @JvmField public var resumeMode: Int) : SchedulerTask(){ public final override fun run() { //封装了 continuation.resume 逻辑 } ......}
5.3.3 协程挂起与复原
编译器会生成继承自SuspendLambda的子类,协程的真正运算逻辑都在invokeSuspend中。这里咱们先再次回到startCoroutineCancellable函数中。
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable( receiver: R, completion: Continuation<T>, onCancellation: ((cause: Throwable) -> Unit)? = null) =runSafely(completion) { createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)}
看一下其中的resumeCancellableWith办法。
public fun <T> Continuation<T>.resumeCancellableWith( result: Result<T>, onCancellation: ((cause: Throwable) -> Unit)? = null): Unit = when (this) { is DispatchedContinuation -> resumeCancellableWith(result, onCancellation) else -> resumeWith(result)}
这是Continuation的扩大办法,最初都会调用到Continuation的resumeWith,这里的Continuation就是前述所说的SuspendLambda,它继承了 BaseContinuationImpl
internal abstract class BaseContinuationImpl( public val completion: Continuation<Any?>?) : Continuation<Any?>, CoroutineStackFrame, Serializable { // This implementation is final. This fact is used to unroll resumeWith recursion. public final override fun resumeWith(result: Result<Any?>) { // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume var current = this var param = result while (true) { probeCoroutineResumed(current) with(current) { val completion = completion!! // fail fast when trying to resume continuation without completion val outcome: Result<Any?> = try { //执行invokeSuspend内的代码块 val outcome = invokeSuspend(param) //如果代码块内执行了挂起办法,协程挂起,resumeWith执行完结,再次调用resumeWith时协程挂终点之后的代码能力继续执行 if (outcome === COROUTINE_SUSPENDED) return Result.success(outcome) } catch (exception: Throwable) { Result.failure(exception) } releaseIntercepted() // this state machine instance is terminating if (completion is BaseContinuationImpl) { // 如果实现的completion也是BaseContinuationImpl,就会进入循环 current = completion param = outcome } else { // 执行completion resumeWith办法 completion.resumeWith(outcome) return } } } } protected abstract fun invokeSuspend(result: Result<Any?>): Any? .....}
上面看一下invokeSuspend的实现逻辑。
fun main(args: Array<String>) { val coroutineDispatcher = newSingleThreadContext("ctx") // 启动协程 1 GlobalScope.launch(coroutineDispatcher) { println("the first coroutine") async (Dispatchers.IO) { println("the second coroutine 11111") delay(100) println("the second coroutine 222222") }.await() println("the first coroutine end end end") } // 保障 main 线程存活,确保下面两个协程运行实现 Thread.sleep(500)}
前述示例编译成SuspendLambda子类的invokeSuspend办法为:
public final Object invokeSuspend(@NotNull Object $result) { //挂起函数返回标识SUSPEND_FLAG Object var5 = IntrinsicsKt.getCOROUTINE_SUSPENDED(); String var3; boolean var4; //label默认初始值为0 switch(this.label) { case 0: ResultKt.throwOnFailure($result); CoroutineScope $this$launch = (CoroutineScope)this.L$0; var3 = "the first coroutine"; var4 = false; System.out.println(var3); //新建并启动 async 协程 Deferred var10000 = BuildersKt.async$default($this$launch, (CoroutineContext)Dispatchers.getIO(), (CoroutineStart)null, (Function2)(new Function2((Continuation)null) { int label; @Nullable public final Object invokeSuspend(@NotNull Object $result) { //挂起标识 Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED(); String var2; boolean var3; switch(this.label) { case 0: ResultKt.throwOnFailure($result); var2 = "the second coroutine 11111"; var3 = false; System.out.println(var2); this.label = 1; //判断是否执行delay挂起函数 if (DelayKt.delay(100L, this) == var4) { //挂起,跳出该办法 return var4; } break; case 1: ResultKt.throwOnFailure($result); // 复原协程后再执行一次 resumeWith(),而后无异样的话执行最初的 println() break; default: throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine"); } var2 = "the second coroutine 222222"; var3 = false; System.out.println(var2); return Unit.INSTANCE; } @NotNull public final Continuation create(@Nullable Object value, @NotNull Continuation completion) { Intrinsics.checkNotNullParameter(completion, "completion"); Function2 var3 = new <anonymous constructor>(completion); return var3; } public final Object invoke(Object var1, Object var2) { return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE); } }), 2, (Object)null); //设置挂起后复原时,进入的状态 this.label = 1; //调用await()挂起函数 if (var10000.await(this) == var5) { return var5; } break; case 1: ResultKt.throwOnFailure($result); break; default: throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine"); } var3 = "the first coroutine end end end"; var4 = false; System.out.println(var3); return Unit.INSTANCE;}
如果async线程未执行实现,await()返回为IntrinsicsKt.getCOROUTINE_SUSPENDED(),就会 return,launch 协程的invokeSuspend办法执行实现,协程所在线程持续往下运行,此时 launch 协程处于挂起状态。
所以协程的挂起在代码层面来说就是跳出协程执行的办法体,或者说跳出协程以后状态机下的对应状态,而后期待下一个状态来长期在进行执行。
对于协程挂起有三点注意事项:
- 启动其余协程并不会挂起以后协程,所以launch和async启动线程时,除非新协程运行在以后线程,则以后协程只能在新协程运行实现后继续执行,否则以后协程都会马上持续运行。
- 协程挂起并不会阻塞线程,因为协程挂起时相当于执行完协程的办法,线程继续执行其余之后的逻辑。
- 挂起函数并肯定都会挂起协程,例如await()挂起函数如果返回值不等于IntrinsicsKt.getCOROUTINE_SUSPENDED(),则协程继续执行挂终点之后逻辑。
看完invokeSuspend,咱们再次回到startCoroutineCancellable函数中,其调用的createCoroutineUnintercepted 办法中创立的 SuspendLambda 实例是 BaseContinuationImpl 的子类对象,其 completion 参数为下:
launch: if (isLazy) LazyStandaloneCoroutine else StandaloneCoroutine
async: if (isLazy) LazyDeferredCoroutine else DeferredCoroutine
下面这几个类都是 AbstractCoroutine 的子类。而依据 completion 的类型会执行不同的逻辑:
BaseContinuationImpl: 执行协程逻辑
其它: 调用 resumeWith 办法,解决协程的状态,协程挂起后的复原即与它无关
前述的示例中async启动的协程,也会调用其invokeSuspend办法执行async协程,假如async 返回的后果曾经可用时,即非 COROUTINE_SUSPENDED 值,此时 completion 是 DeferredCoroutine 对象,因而就会调用 DeferredCoroutine.resumeWith 办法,而后返回,父协程的复原逻辑便是在这里。
public final override fun resumeWith(result: Result<T>) { val state = makeCompletingOnce(result.toState()) if (state === COMPLETING_WAITING_CHILDREN) return afterResume(state)}
在 makeCompletingOnce 办法中,会依据 state 去解决协程状态,这里最终会走到ResumeAwaitOnCompletion.invoke 来复原父协程,必要的话还会把 async 的后果给它。
private class ResumeAwaitOnCompletion<T>( private val continuation: CancellableContinuationImpl<T>) : JobNode() { override fun invoke(cause: Throwable?) { val state = job.state assert { state !is Incomplete } if (state is CompletedExceptionally) { // Resume with with the corresponding exception to preserve it continuation.resumeWithException(state.cause) } else { // resume 被挂起的协程 @Suppress("UNCHECKED_CAST") continuation.resume(state.unboxState() as T) } }}
这里的 continuation 就是 launch 协程体,也就是 SuspendLambda 对象,于是 invoke 办法会再一次调用到 BaseContinuationImpl.resumeWith 办法,接着调用 SuspendLambda.invokeSuspend, 而后依据 label 取值继续执行接下来的逻辑!
launch 协程复原的过程,从 async 协程的SuspendLambda的子类的completion.resumeWith(outcome) -> AbstractCoroutine.resumeWith(result) ..-> JobSupport.tryFinalizeSimpleState() -> JobSupport.completeStateFinalization() -> state.list?.notifyCompletion(cause) -> node.invoke,最初 handler 节点外面通过调用resume(result)复原协程。
await()挂起函数复原协程的原理:
- 将 launch 协程封装为 ResumeAwaitOnCompletion 作为 handler 节点增加到 aynsc 协程的 state.list
- 而后在 async 协程实现时会告诉 handler 节点调用 launch 协程的 resume(result) 办法将后果传给 launch 协程,并复原 launch 协程继续执行 await 挂终点之后的逻辑。
5.3.4 协程三层封装
通过前述的一系列剖析可知,协程有三层封装:
- 罕用的launch和async返回的Job、Deferred,外面封装了协程状态,提供了勾销协程接口,而它们的实例都是继承自AbstractCoroutine,它是协程的第一层包装。
- 第二层包装是编译器生成的SuspendLambda的子类,封装了协程的真正运算逻辑,继承自BaseContinuationImpl,其中completion属性就是协程的第一层包装。
- 第三层包装是后面剖析协程的线程调度时提到的DispatchedContinuation,封装了线程调度逻辑,蕴含了协程的第二层包装。
协程其实就是一段能够挂起和复原执行的运算逻辑,而协程的挂起通过挂起函数实现,挂起函数用状态机的形式用挂终点将协程的运算逻辑拆分成不同的片段,每次运行协程执行不同的逻辑片段。
所以协程有两个很大的益处:
- 简化异步编程,反对异步返回;
- 挂起不阻塞线程,提供线程利用率
六、总结
本文通过为什么应用协程,协程如何创立启动,协程的调度原理和协程的挂起原理几个方面对协程进行了初步分析,上面一起回顾一下全文重点内容,对全文内容进行一个总结
协程引入:
- 协程能够让异步代码同步化,升高程序波及的复杂度
- 协程实质是轻量级线程,单个线程能够运行多个协程,协程的运行不会导致线程阻塞
协程启动:
- 协程启动须要三局部:上下文、启动模式、协程体。创立协程的形式有runBlocking、launch和async,举荐应用CoroutineScope.launch的形式创立协程,应用async的形式创立并发执行,同步期待获取返回值的状况。
- Job是launch构建协程返回的一个协程工作,实现时没有返回值,可看成协程对象自身。其提供相干办法可用于察看协程执行状况。Deferred继承自Job,是async构建协程返回的一个协程工作,可通过调用await()办法期待执行实现获取后果。
- 启动协程须要作用域,作用域在协程创立过程中产生,常见的协程作用域有GlobalScope、coroutineScope等,协程配合Jetpack Lifecycle相干组件提供的lifecycleScope等作用域进行应用,异样丝滑好用。
- 协程的启动模式有DEFAULT、ATOMIC、UNDISPATCHED、LAZY四种,留神不同启动模式的区别。
- 如果要在父协程中进行子协程切换操作,能够应用withContext。
协程调度:
- 协程上下文是一个元素的汇合,其定义是递归的,本人蕴含若干个本人,其构造介于set 和 map 之间。
- 协程实现的实质是回调,这个回调即Continuation。协程拦截器的实现就是拦挡Continuation,可在此处进行缓存、日志打印等拦挡解决
- 调度器即确认相干协程在哪个线程上执行,调度的实质是解决挂起复原后协程逻辑在哪里运行的问题,其继承自拦截器。
- 调度器的是实现原理即在协程启动时通过拦截器进行拦挡,返回一个Continuation,再在协程复原进行resumeWith操作时,进行线程切换判断和线程切换。
协程挂起:
- 挂起函数是一个可启动、暂停和复原的函数,被suspend润饰的函数在协程运行时不是肯定会被挂起的。
- 挂起函数的挂起实现原理就是状态机的状态转移。协程体的执行就是一个状态机,每遇到一次挂起函数就是一次状态转移,而协程的复原不过是从一种状态跳转到下一种状态。挂起函数将整个执行过程划分为多个Continuation片段,利用状态机的形式保障各个片段时程序执行的,从而实现了用程序的代码实现异步逻辑。
参考资料:
- 【1】破解Kotlin协程
- 【2】Kotlin Jetpack 实战 | 09.图解协程原理
- 【3】一文看透 Kotlin 协程实质
- 【4】抽丝剥茧Kotlin - 协程
- 【5】Kotlin协程实现原理
- 【6】kotlin 协程-Android实战
- 【7】kotlin 协程 官网领导文档