前言

协程是一个并发计划。也是一种思维。

传统意义上的协程是单线程的,面对io密集型工作他的内存耗费更少,进而效率高。然而面对计算密集型的工作不如多线程并行运算效率高。

不同的语言对于协程都有不同的实现,甚至同一种语言对于不同平台的操作系统都有对应的实现。

咱们kotlin语言的协程是 coroutines for jvm的实现形式。底层原理也是利用java 线程。

基础知识

生态架构

相干依赖库

dependencies {   // Kotlin   implementation "org.jetbrains.kotlin:kotlin-stdlib:1.4.32"   // 协程外围库   implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.3"   // 协程Android反对库   implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.4.3"   // 协程Java8反对库   implementation "org.jetbrains.kotlinx:kotlinx-coroutines-jdk8:1.4.3"    // lifecycle对于协程的扩大封装   implementation "androidx.lifecycle:lifecycle-viewmodel-ktx:2.2.0"   implementation "androidx.lifecycle:lifecycle-runtime-ktx:2.2.0"   implementation "androidx.lifecycle:lifecycle-livedata-ktx:2.2.0"}

为什么一些人总感觉协程艰涩难懂?

1.网络上没有具体的对于协程的概念定义,每种语言、每个系统对其实现都不一样。堪称是七嘴八舌,什么内核态用户态巴拉巴拉,很容易给咱们带偏

2.kotlin的各种语法糖对咱们造成的烦扰。如:

  • 高阶函数
  • 源码实现类找不到

所以扎实的kotlin语法基本功是学习协程的前提。

切实看不懂得中央就反编译为java,以java最终翻译为准。

协程是什么?有什么用?

kotlin中的协程干的事就是把异步回调代码拍扁了,捋直了,让异步回调代码同步化。除此之外,没有任何特别之处。

创立一个协程,就是编译器背地偷偷生成一系列代码,比如说状态机

通过挂起复原让状态机状态流转实现把层层嵌套的回调代码变成像同步代码那样直观、简洁。

它不是什么线程框架,也不是什么浅近的内核态,用户态。它其实对于咱们安卓来说,就是一个对于回调函数的语法糖。。。

本文将会围绕挂起复原彻底分析协程的实现原理

Kotlin函数基础知识温习

再Kotlin中函数是一等公民,有本人的类型

函数类型

fun foo(){}//类型为 () -> Unitfun foo(p: Int){}//类型为 (Int) -> Stringclass Foo{    fun bar(p0: String,p1: Long):Any{}    }//那么 bar 的类型为:Foo.(String,Long) -> Any//Foo就是bar的 receiver。也能够写成 (Foo,String,Long) ->Any

函数援用

fun foo(){} //援用是 ::foofun foo(p0: Int): String//援用也是 ::foo

咋都一样?没方法,就这样规定的。应用的时候 只能靠编译器推断

val f: () -> Unit = ::foo //编译器会推断出是fun foo(){} val g: (Int) -> String = ::foo //推断为fun foo(p0: Int): String

带Receiver的写法

class Foo{    fun bar(p0: String,p1: Long):Any{}}
val h: (Foo,String,Long) -> Any = Foo:bar

绑定receiver的函数援用:

val foo: Foo = Foo()val m: (String,Long) -> Any = foo:bar

额定知识点

val x: (Foo,String,Long) -> Any = Foo:barval y: Function3<Foo,String,Long,Any> = xFoo.(String,Long) -> Any = (Foo,String,Long) ->Any = Function3<Foo,String,Long,Any>

函数作为参数传递

fun yy(p: (Foo,String,Long)->Any){    p(Foo(),"Hello",3L)//间接p()就能调用    //p.invoke(Foo(),"Hello",3L) 也能够用invoke模式}

Lambda

就是匿名函数,它跟一般函数比是没有名字的,听起来如同是废话

//一般函数fun func(){   println("hello");}//去掉函数名 func,就成了匿名函数fun(){   println("hello");    }//能够赋值给一个变量val func = fun(){   println("hello");    }//匿名函数的类型val func :()->Unit = fun(){   println("hello");    }//Lambda表达式val func={   print("Hello");}//Lambda类型val func :()->String = {print("Hello");"Hello" //如果是Lambda中,最初一行被当作返回值,能省掉return。一般函数则不行}//带参数Lambdaval f1: (Int)->Unit = {p:Int ->print(p);}//可进一步简化为val f1 = {p:Int ->print(p);    }//当只有一个参数的时候,还能够写成val f1: (Int)->Unit = {   print(it);}

对于函数的集体经验总结

函数跟匿名函数看起来没啥区别,然而反编译为java后还是能看出点差别

如果只是用一般的函数,那么他跟一般java 函数没啥区别。

比方 fun a() 就是对应java办法public void a(){}

然而如果通过函数援用(:: a)来用这个函数,那么他并不是间接调用fun a()而是从新生成一个Function0

挂起函数

suspend 润饰。

挂起函数中能调用任何函数。

非挂起函数只能调用非挂起函数。

换句话说,suspend函数只能在suspend函数中调用。


简略的挂起函数展现:

//com.example.studycoroutine.chapter.CoroutineRun.ktsuspend fun suspendFun(): Int {    return 1;}

挂起函数非凡在哪?

public static final Object suspendFun(Continuation completion) {    return Boxing.boxInt(1);}

这下了解suspend为啥只能在suspend外面调用了吧?

想要让一本正经的suspend函数干活必须要先满足它!!!就是给它外面塞入一颗球。

而后他想调用其余的suspend函数,只需将球持续塞到其它的suspend办法外面。

一般函数里没这玩意啊,所以压根没法调用suspend函数。。。

读到这里,想必各位会有一些疑难:

  • question1.这不是鸡生蛋生鸡的问题么?第一颗球是哪来的?
  • question2.为啥编译后返回值也变了?
  • question3.suspendFun 如果在协程体内被调用,那么他的球(completion)是谁?

规范库给咱们提供的最原始工具

public fun <T> (suspend () -> T).startCoroutine(completion: Continuation<T>) {   createCoroutineUnintercepted(completion).intercepted().resume(Unit)}public fun <T> (suspend () -> T).createCoroutine(completion: Continuation<T>): Continuation<Unit> =   SafeContinuation(createCoroutineUnintercepted(completion).intercepted(), COROUTINE_SUSPENDED)

以一个最简略的形式启动一个协程。

Demo-K1

fun main() {   val b = suspend {       val a = hello2()       a  }   b.createCoroutine(MyCompletionContinuation()).resume(Unit)}suspend fun hello2() = suspendCoroutine<Int> {   thread{       Thread.sleep(1000)       it.resume(10086)  }}class MyContinuation() : Continuation<Int> {   override val context: CoroutineContext = CoroutineName("Co-01")   override fun resumeWith(result: Result<Int>) {       log("MyContinuation resumeWith 后果 = ${result.getOrNull()}")  }}

两个创立协程函数区别

startCoroutine 没有返回值 ,而createCoroutine返回一个Continuation,不难看出是SafeContinuation

如同看起来次要的区别就是startCoroutine间接调用resume(Unit),所以不必包装成SafeContinuation,而createCoroutine则返回一个SafeContinuation,因为不晓得将会在何时何处调用resume,必须保障resume只调用一次,所以包装为safeContinuation

SafeContinuationd的作用是为了确保只有产生异步调用时才挂起

剖析createCoroutineUnintercepted

//kotlin.coroutines.intrinsics.CoroutinesIntrinsicsH.kt@SinceKotlin("1.3")public expect fun <T> (suspend () -> T).createCoroutineUnintercepted(completion: Continuation<T>): Continuation<Unit>

先说论断

其实能够简略的了解为kotlin层面的原语,就是返回一个协程体。

开始剖析

援用代码Demo-K1首先b 是一个匿名函数,他必定要被编译为一个FunctionX,同时它还被suspend润饰 所以它必定跟一般匿名函数编译后不一样。

编译后的源码为

public static final void main() {     Function1 var0 = (Function1)(new Function1((Continuation)null) {        int label;        @Nullable        public final Object invokeSuspend(@NotNull Object $result) {           Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();           Object var10000;           switch(this.label) {           case 0:              ResultKt.throwOnFailure($result);              this.label = 1;              var10000 = TestSampleKt.hello2(this);              if (var10000 == var3) {                 return var3;              }              break;           case 1:              ResultKt.throwOnFailure($result);              var10000 = $result;              break;           default:              throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");          }           int a = ((Number)var10000).intValue();           return Boxing.boxInt(a);        }        @NotNull        public final Continuation create(@NotNull Continuation completion) {           Intrinsics.checkParameterIsNotNull(completion, "completion");           Function1 var2 = new <anonymous constructor>(completion);           return var2;        }        public final Object invoke(Object var1) {           return((<undefinedtype>)this.create((Continuation)var1)).invokeSuspend(Unit.INSTANCE);        }    });     boolean var1 = false;     Continuation var7 = ContinuationKt.createCoroutine(var0, (Continuation)(newMyContinuation()));     Unit var8 = Unit.INSTANCE;     boolean var2 = false;     Companion var3 = Result.Companion;     boolean var5 = false;     Object var6 = Result.constructor-impl(var8);     var7.resumeWith(var6);  }

咱们能够看到先是 Function1 var0 = new Function1创立了一个对象,此时跟协程没关系,这步只是编译器层面的匿名函数语法优化

如果间接

fun main() {   suspend {       val a = hello2()       a  }.createCoroutine(MyContinuation()).resume(Unit)}

也是一样会创立Function1 var0 = new Function1

解答question1

持续调用createCoroutine

再持续createCoroutineUnintercepted ,找到在JVM平台的实现

//kotlin.coroutines.intrinsics.IntrinsicsJVM.class@SinceKotlin("1.3")public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(   completion: Continuation<T>): Continuation<Unit> {//probeCompletion还是咱们传入completion对象,在咱们的Demo就是myCoroutine   val probeCompletion = probeCoroutineCreated(completion)//probeCoroutineCreated办法点进去看了,如同是debug用的.我的了解是这样的   //This就是这个suspend lambda。在Demo中就是myCoroutineFun   return if (this is BaseContinuationImpl)       create(probeCompletion)   else//else分支在咱们demo中不会走到     //当 [createCoroutineUnintercepted] 遇到不继承 BaseContinuationImpl 的挂起 lambda 时,将应用此函数。       createCoroutineFromSuspendFunction(probeCompletion) {          (this as Function1<Continuation<T>, Any?>).invoke(it)      }}
@NotNullpublic final Continuation create(@NotNull Continuation completion) {Intrinsics.checkNotNullParameter(completion, "completion");Function1 var2 = new <anonymous constructor>(completion);return var2;}

completion传入,并创立一个新的Function1,作为Continuation返回,这就是创立进去的协程体对象,协程的工作外围就是它外部的状态机,invokeSuspend函数

调用 create

@NotNullpublic final Continuation create(@NotNull Continuation completion) {    Intrinsics.checkNotNullParameter(completion, "completion");    Function1 var2 = new <anonymous constructor>(completion);    return var2;}

completion传入,并创立一个新的Function1,作为Continuation返回,这就是创立进去的协程体对象,协程的工作外围就是它外部的状态机,invokeSuspend函数

补充---相干类继承关系

解答question2&3

已知协程启动会调用协程体的resume,该调用最终会来到BaseContinuationImpl::resumeWith

internal abstract class BaseContinuationImpl{   fun resumeWith(result: Result<Any?>) {          // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume       var current = this       var param = result       while (true) {           with(current) {               val completion = completion!! // fail fast when trying to resume continuation without completion               val outcome: Result<Any?> =                   try {                       val outcome = invokeSuspend(param)//调用状态机                       if (outcome === COROUTINE_SUSPENDED) return                       Result.success(outcome)                  } catch (exception: Throwable) {                       Result.failure(exception)                  }               releaseIntercepted() // this state machine instance is terminating               if (completion is BaseContinuationImpl) {                   // unrolling recursion via loop                   current = completion                   param = outcome              } else {                   //最终走到这里,这个completion就是被塞的第一颗球。                   completion.resumeWith(outcome)                   return              }          }      }  }}

状态机代码截取

public final Object invokeSuspend(@NotNull Object $result) {   Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();   Object var10000;   switch(this.label) {   case 0://第一次进来 label = 0       ResultKt.throwOnFailure($result);      // label改成1了,意味着下一次被复原的时候会走case 1,这就是所谓的【状态流转】      this.label = 1;       //整体眼光向我看齐,我发表个事:this is 协程体对象。      var10000 = TestSampleKt.hello2(this);      if (var10000 == var3) {         return var3;      }      break;   case 1:      ResultKt.throwOnFailure($result);      var10000 = $result;      break;   default:      throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");   }   int a = ((Number)var10000).intValue();   return Boxing.boxInt(a);}

question3答案进去了,传进去的是create创立的那个continuation

最初再来聊聊question2,从下面的代码曾经很分明的通知咱们为啥挂起函数反编译后的返回值变为object了。

以hello2为例子,hello2能返回代表挂起的白板,也能返回result。如果返回白板,状态机return,协程挂起。如果返回result,那么hello2执行结束,是一个没有挂起的挂起函数,通常编译器也会揭示 suspend 修饰词无意义。所以这就是设计须要,没有啥因为所以。

最初,除了间接返回后果的状况,挂起函数肯定会以resume结尾,要么返回result,要么返回异样。代表这个挂起函数返回了。

调用resume意义在于从新回调BaseContinuationImpl的resumeWith,进而唤醒状态机,继续执行协程体的代码。

换句话说,咱们自定义的suspend函数,肯定要利用suspendCoroutine 取得续体,即状态机对象,否则无奈实现真正的挂起与resume。

suspendCoroutine

咱们能够不必suspendCoroutine,用更间接的suspendCoroutineUninterceptedOrReturn也能实现,不过这种形式要手动返回白板。不过肯定要小心,要在正当的状况下返回或者不返回,不然会产生很多意想不到的后果

suspend fun mySuspendOne() = suspendCoroutineUninterceptedOrReturn<String> { continuation ->    thread {        TimeUnit.SECONDS.sleep(1)        continuation.resume("hello world")    }    //因为咱们这个函数没有返回正确后果,所以必须返回一个挂起标识,否则BaseContinuationImpl会认为实现了工作。     // 并且咱们的线程又在运行没有勾销,这将很多意想不到的后果    kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED}

而suspendCoroutine则没有这个隐患

suspend fun mySafeSuspendOne() = suspendCoroutine<String> { continuation ->    thread {        TimeUnit.SECONDS.sleep(1)        continuation.resume("hello world")    }    //suspendCoroutine函数很聪慧的帮咱们判断返回后果如果不是想要的对象,主动返                    kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED}
public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T =    suspendCoroutineUninterceptedOrReturn { c: Continuation<T> ->        //封装一个代理Continuation对象        val safe = SafeContinuation(c.intercepted())        block(safe)        //依据block返回后果判断要不要返回COROUTINE_SUSPENDED        safe.getOrThrow()    }

SafeContinuation的神秘

//调用单参数的这个构造方法internal actual constructor(delegate: Continuation<T>) : this(delegate, UNDECIDED)@Volatileprivate var result: Any? = initialResult //UNDECIDED赋值给 result //java原子属性更新器那一套货色private companion object {       @Suppress("UNCHECKED_CAST")       @JvmStatic       private val RESULT = AtomicReferenceFieldUpdater.newUpdater<SafeContinuation<*>, Any?>(           SafeContinuation::class.java, Any::class.java as Class<Any?>, "result"      )  }internal actual fun getOrThrow(): Any? {   var result = this.result // atomic read   if (result === UNDECIDED) { //如果UNDECIDED,那么就把result设置为COROUTINE_SUSPENDED       if (RESULT.compareAndSet(this, UNDECIDED, COROUTINE_SUSPENDED)) returnCOROUTINE_SUSPENDED       result = this.result // reread volatile var  }   return when {       result === RESUMED -> COROUTINE_SUSPENDED // already called continuation, indicate COROUTINE_SUSPENDED upstream       result is Result.Failure -> throw result.exception       else -> result // either COROUTINE_SUSPENDED or data <-这里返回白板  }}public actual override fun resumeWith(result: Result<T>) {       while (true) { // lock-free loop           val cur = this.result // atomic read。不了解这里的官网正文为啥叫做原子读。我感觉 Volatile只能保障可见性。           when {             //这里如果是UNDECIDED 就把 后果附上去。               cur === UNDECIDED -> if (RESULT.compareAndSet(this, UNDECIDED, result.value)) return             //如果是挂起状态,就通过resumeWith回调状态机               cur === COROUTINE_SUSPENDED -> if (RESULT.compareAndSet(this, COROUTINE_SUSPENDED, RESUMED)){                   delegate.resumeWith(result)                   return              }               else -> throw IllegalStateException("Already resumed")          }      }  }
val safe = SafeContinuation(c.intercepted())block(safe)safe.getOrThrow()

先回顾一下什么叫真正的挂起,就是getOrThrow返回了“白板”,那么什么时候getOrThrow能返回白板?答案就是result被初始化后值没被批改过。那么也就是说resumeWith没有被执行过,即:block(safe)这句代码,block这个被传进来的函数,执行过程中没有调用safe的resumeWith。原理就是这么简略,cas代码保障要害逻辑的原子性与并发平安

持续以Demo-K1为例子,这里假如hello2运行在一条新的子线程,否则依然是没有挂起。

{   thread{       Thread.sleep(1000)       it.resume(10086)  }}

总结

最初,能够说开启一个协程,就是利用编译器生成一个状态机对象,帮咱们把回调代码拍扁,成为同步代码。