关于coroutine:Android中的Coroutine协程原理详解

3次阅读

共计 11557 个字符,预计需要花费 29 分钟才能阅读完成。

前言

协程是一个并发计划。也是一种思维。

传统意义上的协程是单线程的,面对 io 密集型工作他的内存耗费更少,进而效率高。然而面对计算密集型的工作不如多线程并行运算效率高。

不同的语言对于协程都有不同的实现,甚至同一种语言对于不同平台的操作系统都有对应的实现。

咱们 kotlin 语言的协程是 coroutines for jvm 的实现形式。底层原理也是利用 java 线程。

基础知识

生态架构

相干依赖库

dependencies {
   // Kotlin
   implementation "org.jetbrains.kotlin:kotlin-stdlib:1.4.32"

   // 协程外围库
   implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.3"
   // 协程 Android 反对库
   implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.4.3"
   // 协程 Java8 反对库
   implementation "org.jetbrains.kotlinx:kotlinx-coroutines-jdk8:1.4.3"
 
   // lifecycle 对于协程的扩大封装
   implementation "androidx.lifecycle:lifecycle-viewmodel-ktx:2.2.0"
   implementation "androidx.lifecycle:lifecycle-runtime-ktx:2.2.0"
   implementation "androidx.lifecycle:lifecycle-livedata-ktx:2.2.0"
}
​

为什么一些人总感觉协程艰涩难懂?

1. 网络上没有具体的对于协程的概念定义,每种语言、每个系统对其实现都不一样。堪称是七嘴八舌,什么内核态用户态巴拉巴拉,很容易给咱们带偏

2.kotlin 的各种语法糖对咱们造成的烦扰。如:

  • 高阶函数
  • 源码实现类找不到

所以扎实的 kotlin 语法基本功是学习协程的前提。

切实看不懂得中央就反编译为 java,以 java 最终翻译为准。

协程是什么?有什么用?

kotlin 中的协程干的事就是把异步回调代码拍扁了,捋直了,让异步回调代码同步化。除此之外,没有任何特别之处。

创立一个协程,就是编译器背地偷偷生成一系列代码,比如说 状态机

通过 挂起 复原 让状态机 状态流转 实现把层层嵌套的回调代码变成像同步代码那样直观、简洁。

它不是什么线程框架,也不是什么浅近的内核态,用户态。它其实对于咱们安卓来说,就是一个对于回调函数的语法糖。。。

本文将会围绕 挂起 复原 彻底分析协程的实现原理

Kotlin 函数基础知识温习

再 Kotlin 中函数是一等公民,有本人的类型

函数类型

fun foo(){}
// 类型为 () -> Unit
fun foo(p: Int){}
// 类型为 (Int) -> String

class Foo{fun bar(p0: String,p1: Long):Any{}}
// 那么 bar 的类型为:Foo.(String,Long) -> Any
//Foo 就是 bar 的 receiver。也能够写成 (Foo,String,Long) ->Any

函数援用

fun foo(){} 
// 援用是 ::foo
fun foo(p0: Int): String
// 援用也是 ::foo

咋都一样?没方法,就这样规定的。应用的时候 只能靠编译器推断

val f: () -> Unit = ::foo // 编译器会推断出是 fun foo(){} 
val g: (Int) -> String = ::foo // 推断为 fun foo(p0: Int): String

带 Receiver 的写法

class Foo{fun bar(p0: String,p1: Long):Any{}}
val h: (Foo,String,Long) -> Any = Foo:bar

绑定 receiver 的函数援用:

val foo: Foo = Foo()
val m: (String,Long) -> Any = foo:bar

额定知识点

val x: (Foo,String,Long) -> Any = Foo:bar
val y: Function3<Foo,String,Long,Any> = x

Foo.(String,Long) -> Any = (Foo,String,Long) ->Any = Function3<Foo,String,Long,Any>

函数作为参数传递

fun yy(p: (Foo,String,Long)->Any){p(Foo(),"Hello",3L)// 间接 p()就能调用
    //p.invoke(Foo(),"Hello",3L) 也能够用 invoke 模式
}

Lambda

就是匿名函数,它跟一般函数比是没有名字的,听起来如同是废话

// 一般函数
fun func(){println("hello");
}
​
// 去掉函数名 func, 就成了匿名函数
fun(){println("hello");    
}
​
// 能够赋值给一个变量
val func = fun(){println("hello");    
}
​
// 匿名函数的类型
val func :()->Unit = fun(){println("hello");    
}
​
//Lambda 表达式
val func={print("Hello");
}
​
//Lambda 类型
val func :()->String = {print("Hello");
"Hello" // 如果是 Lambda 中,最初一行被当作返回值,能省掉 return。一般函数则不行
}
​
// 带参数 Lambda
val f1: (Int)->Unit = {p:Int ->
print(p);
}
// 可进一步简化为
val f1 = {p:Int ->
print(p);    
}
// 当只有一个参数的时候,还能够写成
val f1: (Int)->Unit = {print(it);
}
​

对于函数的集体经验总结

函数跟匿名函数看起来没啥区别,然而反编译为 java 后还是能看出点差别

如果只是用一般的函数,那么他跟一般 java 函数没啥区别。

比方 fun a() 就是对应 java 办法public void a(){}

然而如果通过函数援用 (:: a) 来用这个函数,那么他并不是间接调用 fun a() 而是从新生成一个Function0

挂起函数

suspend 润饰。

挂起函数中能调用任何函数。

非挂起函数只能调用非挂起函数。

换句话说,suspend 函数只能在 suspend 函数中调用。


简略的挂起函数展现:

//com.example.studycoroutine.chapter.CoroutineRun.kt
suspend fun suspendFun(): Int {return 1;}

挂起函数非凡在哪?

public static final Object suspendFun(Continuation completion) {return Boxing.boxInt(1);
}

这下了解 suspend 为啥只能在 suspend 外面调用了吧?

想要让一本正经的 suspend 函数干活必须要先满足它!!!就是给它外面塞入一颗球。

而后他想调用其余的 suspend 函数,只需将球持续塞到其它的 suspend 办法外面。

一般函数里没这玩意啊,所以压根没法调用 suspend 函数。。。

读到这里,想必各位会有一些疑难:

  • question1. 这不是鸡生蛋生鸡的问题么?第一颗球是哪来的?
  • question2. 为啥编译后返回值也变了?
  • question3.suspendFun 如果在协程体内被调用,那么他的球 (completion) 是谁?

规范库给咱们提供的最原始工具

public fun <T> (suspend () -> T).startCoroutine(completion: Continuation<T>) {createCoroutineUnintercepted(completion).intercepted().resume(Unit)
}
​
public fun <T> (suspend () -> T).createCoroutine(completion: Continuation<T>): Continuation<Unit> =
   SafeContinuation(createCoroutineUnintercepted(completion).intercepted(), COROUTINE_SUSPENDED)

以一个最简略的形式启动一个协程。

Demo-K1

fun main() {
   val b = suspend {val a = hello2()
       a
  }
   b.createCoroutine(MyCompletionContinuation()).resume(Unit)
}
​
suspend fun hello2() = suspendCoroutine<Int> {
   thread{Thread.sleep(1000)
       it.resume(10086)
  }
}
​
class MyContinuation() : Continuation<Int> {override val context: CoroutineContext = CoroutineName("Co-01")
   override fun resumeWith(result: Result<Int>) {log("MyContinuation resumeWith 后果 = ${result.getOrNull()}")
  }
}

两个创立协程函数区别

startCoroutine 没有返回值 , 而 createCoroutine 返回一个 Continuation, 不难看出是SafeContinuation

如同看起来次要的区别就是 startCoroutine 间接调用 resume(Unit),所以不必包装成 SafeContinuation,而 createCoroutine 则返回一个 SafeContinuation,因为不晓得将会在何时何处调用 resume,必须保障 resume 只调用一次,所以包装为 safeContinuation

SafeContinuationd 的作用是为了确保只有产生异步调用时才挂起

剖析 createCoroutineUnintercepted

//kotlin.coroutines.intrinsics.CoroutinesIntrinsicsH.kt
@SinceKotlin("1.3")
public expect fun <T> (suspend () -> T).createCoroutineUnintercepted(completion: Continuation<T>): Continuation<Unit>

先说论断

其实能够简略的了解为 kotlin 层面的原语,就是返回一个协程体。

开始剖析

援用代码 Demo-K1 首先 b 是一个匿名函数,他必定要被编译为一个 FunctionX,同时它还被suspend 润饰 所以它必定跟一般匿名函数编译后不一样。

编译后的源码为

public static final void main() {Function1 var0 = (Function1)(new Function1((Continuation)null) {
        int label;
​
        @Nullable
        public final Object invokeSuspend(@NotNull Object $result) {Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
           Object var10000;
           switch(this.label) {
           case 0:
              ResultKt.throwOnFailure($result);
              this.label = 1;
              var10000 = TestSampleKt.hello2(this);
              if (var10000 == var3) {return var3;}
              break;
           case 1:
              ResultKt.throwOnFailure($result);
              var10000 = $result;
              break;
           default:
              throw new IllegalStateException("call to'resume'before'invoke'with coroutine");
          }
​
           int a = ((Number)var10000).intValue();
           return Boxing.boxInt(a);
        }
​
        @NotNull
        public final Continuation create(@NotNull Continuation completion) {Intrinsics.checkParameterIsNotNull(completion, "completion");
           Function1 var2 = new <anonymous constructor>(completion);
           return var2;
        }
​
        public final Object invoke(Object var1) {return((<undefinedtype>)this.create((Continuation)var1)).invokeSuspend(Unit.INSTANCE);
        }
    });
     boolean var1 = false;
     Continuation var7 = ContinuationKt.createCoroutine(var0, (Continuation)(newMyContinuation()));
     Unit var8 = Unit.INSTANCE;
     boolean var2 = false;
     Companion var3 = Result.Companion;
     boolean var5 = false;
     Object var6 = Result.constructor-impl(var8);
     var7.resumeWith(var6);
  }

咱们能够看到先是 Function1 var0 = new Function1创立了一个对象,此时跟协程没关系,这步只是编译器层面的匿名函数语法优化

如果间接

fun main() {
   suspend {val a = hello2()
       a
  }.createCoroutine(MyContinuation()).resume(Unit)
}

也是一样会创立Function1 var0 = new Function1

解答 question1

持续调用createCoroutine

再持续createCoroutineUnintercepted,找到在 JVM 平台的实现

//kotlin.coroutines.intrinsics.IntrinsicsJVM.class
@SinceKotlin("1.3")
public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(completion: Continuation<T>): Continuation<Unit> {
//probeCompletion 还是咱们传入 completion 对象,在咱们的 Demo 就是 myCoroutine
   val probeCompletion = probeCoroutineCreated(completion)//probeCoroutineCreated 办法点进去看了,如同是 debug 用的. 我的了解是这样的
   //This 就是这个 suspend lambda。在 Demo 中就是 myCoroutineFun
   return if (this is BaseContinuationImpl)
       create(probeCompletion)
   else
//else 分支在咱们 demo 中不会走到
     // 当 [createCoroutineUnintercepted] 遇到不继承 BaseContinuationImpl 的挂起 lambda 时,将应用此函数。createCoroutineFromSuspendFunction(probeCompletion) {(this as Function1<Continuation<T>, Any?>).invoke(it)
      }
}
@NotNull
public final Continuation create(@NotNull Continuation completion) {Intrinsics.checkNotNullParameter(completion, "completion");
Function1 var2 = new <anonymous constructor>(completion);
return var2;
}

completion 传入,并创立一个新的 Function1,作为Continuation 返回,这就是创立进去的协程体对象,协程的工作外围就是它外部的状态机,invokeSuspend函数

调用 create

@NotNull
public final Continuation create(@NotNull Continuation completion) {Intrinsics.checkNotNullParameter(completion, "completion");
    Function1 var2 = new <anonymous constructor>(completion);
    return var2;
}

completion 传入,并创立一个新的 Function1,作为Continuation 返回,这就是创立进去的协程体对象,协程的工作外围就是它外部的状态机,invokeSuspend函数

补充 — 相干类继承关系

解答 question2&3

已知协程启动会调用协程体的 resume, 该调用最终会来到BaseContinuationImpl::resumeWith

internal abstract class BaseContinuationImpl{fun resumeWith(result: Result<Any?>) {// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
       var current = this
       var param = result
       while (true) {with(current) {
               val completion = completion!! // fail fast when trying to resume continuation without completion
               val outcome: Result<Any?> =
                   try {val outcome = invokeSuspend(param)// 调用状态机
                       if (outcome === COROUTINE_SUSPENDED) return
                       Result.success(outcome)
                  } catch (exception: Throwable) {Result.failure(exception)
                  }
               releaseIntercepted() // this state machine instance is terminating
               if (completion is BaseContinuationImpl) {
                   // unrolling recursion via loop
                   current = completion
                   param = outcome
              } else {
                   // 最终走到这里,这个 completion 就是被塞的第一颗球。completion.resumeWith(outcome)
                   return
              }
          }
      }
  }
}

状态机代码截取

public final Object invokeSuspend(@NotNull Object $result) {Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
   Object var10000;
   switch(this.label) {
   case 0:// 第一次进来 label = 0 
      ResultKt.throwOnFailure($result);
      // label 改成 1 了,意味着下一次被复原的时候会走 case 1,这就是所谓的【状态流转】this.label = 1; 
      // 整体眼光向我看齐,我发表个事:this is 协程体对象。var10000 = TestSampleKt.hello2(this);
      if (var10000 == var3) {return var3;}
      break;
   case 1:
      ResultKt.throwOnFailure($result);
      var10000 = $result;
      break;
   default:
      throw new IllegalStateException("call to'resume'before'invoke'with coroutine");
   }

   int a = ((Number)var10000).intValue();
   return Boxing.boxInt(a);
}

question3 答案 进去了,传进去的是 create 创立的那个 continuation

最初再来聊聊 question2,从下面的代码曾经很分明的通知咱们为啥挂起函数反编译后的返回值变为 object 了。

以 hello2 为例子,hello2 能返回代表挂起的白板,也能返回 result。如果返回白板,状态机 return,协程挂起。如果返回 result,那么 hello2 执行结束,是一个没有挂起的挂起函数,通常编译器也会揭示 suspend 修饰词无意义。所以这就是设计须要,没有啥因为所以。

最初,除了间接返回后果的状况,挂起函数肯定会以 resume 结尾,要么返回 result,要么返回异样。代表这个挂起函数返回了。

调用 resume 意义在于从新回调 BaseContinuationImpl 的 resumeWith,进而唤醒状态机,继续执行协程体的代码。

换句话说,咱们自定义的 suspend 函数,肯定要利用 suspendCoroutine 取得续体,即状态机对象,否则无奈实现真正的挂起与 resume。

suspendCoroutine

咱们能够不必 suspendCoroutine,用更间接的 suspendCoroutineUninterceptedOrReturn 也能实现,不过这种形式要手动返回白板。不过肯定要小心,要在正当的状况下返回或者不返回,不然会产生很多意想不到的后果

suspend fun mySuspendOne() = suspendCoroutineUninterceptedOrReturn<String> { continuation ->
    thread {TimeUnit.SECONDS.sleep(1)
        continuation.resume("hello world")
    }
    // 因为咱们这个函数没有返回正确后果,所以必须返回一个挂起标识, 否则 BaseContinuationImpl 会认为实现了工作。// 并且咱们的线程又在运行没有勾销,这将很多意想不到的后果
    kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
}

而 suspendCoroutine 则没有这个隐患

suspend fun mySafeSuspendOne() = suspendCoroutine<String> { continuation ->
    thread {TimeUnit.SECONDS.sleep(1)
        continuation.resume("hello world")
    }
    //suspendCoroutine 函数很聪慧的帮咱们判断返回后果如果不是想要的对象,主动返                
    kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
}
public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T =
    suspendCoroutineUninterceptedOrReturn { c: Continuation<T> ->
        // 封装一个代理 Continuation 对象
        val safe = SafeContinuation(c.intercepted())
        block(safe)
        // 依据 block 返回后果判断要不要返回 COROUTINE_SUSPENDED
        safe.getOrThrow()}

SafeContinuation 的神秘

// 调用单参数的这个构造方法
internal actual constructor(delegate: Continuation<T>) : this(delegate, UNDECIDED)
@Volatile
private var result: Any? = initialResult //UNDECIDED 赋值给 result 
//java 原子属性更新器那一套货色
private companion object {@Suppress("UNCHECKED_CAST")
       @JvmStatic
       private val RESULT = AtomicReferenceFieldUpdater.newUpdater<SafeContinuation<*>, Any?>(SafeContinuation::class.java, Any::class.java as Class<Any?>, "result")
  }
​
internal actual fun getOrThrow(): Any? {
   var result = this.result // atomic read
   if (result === UNDECIDED) { // 如果 UNDECIDED,那么就把 result 设置为 COROUTINE_SUSPENDED
       if (RESULT.compareAndSet(this, UNDECIDED, COROUTINE_SUSPENDED)) returnCOROUTINE_SUSPENDED
       result = this.result // reread volatile var
  }
   return when {
       result === RESUMED -> COROUTINE_SUSPENDED // already called continuation, indicate COROUTINE_SUSPENDED upstream
       result is Result.Failure -> throw result.exception
       else -> result // either COROUTINE_SUSPENDED or data <- 这里返回白板
  }
}
​
public actual override fun resumeWith(result: Result<T>) {while (true) { // lock-free loop
           val cur = this.result // atomic read。不了解这里的官网正文为啥叫做原子读。我感觉 Volatile 只能保障可见性。when {
             // 这里如果是 UNDECIDED 就把 后果附上去。cur === UNDECIDED -> if (RESULT.compareAndSet(this, UNDECIDED, result.value)) return
             // 如果是挂起状态,就通过 resumeWith 回调状态机
               cur === COROUTINE_SUSPENDED -> if (RESULT.compareAndSet(this, COROUTINE_SUSPENDED, RESUMED)){delegate.resumeWith(result)
                   return
              }
               else -> throw IllegalStateException("Already resumed")
          }
      }
  }
val safe = SafeContinuation(c.intercepted())
block(safe)
safe.getOrThrow()

先回顾一下什么叫真正的挂起,就是 getOrThrow 返回了“白板”,那么什么时候 getOrThrow 能返回白板?答案就是 result 被初始化后值没被批改过。那么也就是说 resumeWith 没有被执行过,即:block(safe)这句代码,block 这个被传进来的函数,执行过程中没有调用 safe 的 resumeWith。原理就是这么简略,cas 代码保障要害逻辑的原子性与并发平安

持续以 Demo-K1 为例子,这里假如 hello2 运行在一条新的子线程,否则依然是没有挂起。

{
   thread{Thread.sleep(1000)
       it.resume(10086)
  }
}

总结

最初,能够说开启一个协程,就是利用编译器生成一个状态机对象,帮咱们把回调代码拍扁,成为同步代码。

正文完
 0