关于kotlin:Kotlin-coroutine-原理

31次阅读

共计 15446 个字符,预计需要花费 39 分钟才能阅读完成。

Coroutine

lifecycleScope.launch {Log.d("testCoroutineScope","testCoroutineScope start $this")
            delay(2000)
            Log.d("testCoroutineScope","testCoroutineScope middle1")
            delay(2000)
            Log.d("testCoroutineScope","testCoroutineScope middle2")
            delay(2000)
            Log.d("testCoroutineScope","testCoroutineScope end")
        }

上边的代码展现了启动协程的办法,通常在协程体中会调用到 suspend 函数。咱们都理解 kotlin 中协程的反对除了利用到 kotlin 的一些语法个性,同时针对协程还进行了编译器的批改,使得咱们在应用协程时更加直观不便。然而这也带来了另一个问题,咱们更难了解协程的具体工作细节。上面咱们从最让人费解的协程体开始动手。

一、摸索协程体到底是什么?

这里认为通过 launch、async 启动的 block 块就是协程体。

协程体在通过编译器编译后会生成一个新的对象,具体对象的实现是什么样的呢?看看上面反编译后的代码与原代码的比拟:

// 原来的 kotlin 代码
private fun testCoroutineScope() {val block: suspend CoroutineScope.() -> Unit = {Log.d("testCoroutineScope", "testCoroutineScope start")
            delay(2000)
            Log.d("testCoroutineScope", "testCoroutineScope end")
        }

        lifecycleScope.launch(block = block)
    }

为了不便察看,我把协程体独自定义一个 block 变量。kotlin 的协程体只是简略的调用 delay 挂起办法并在办法调用前后增加了 log。

// 反编译后失去的代码
private final void testCoroutineScope() {Function2 block = (Function2)(new Function2((Continuation)null) {
         int label;

         @Nullable
         public final Object invokeSuspend(@NotNull Object $result) {Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            switch(this.label) {
            case 0:
               ResultKt.throwOnFailure($result);
               Log.d("testCoroutineScope", "testCoroutineScope start");
               this.label = 1;
               if (DelayKt.delay(2000L, this) == var2) {return var2;}
               break;
            case 1:
               ResultKt.throwOnFailure($result);
               break;
            default:
               throw new IllegalStateException("call to'resume'before'invoke'with coroutine");
            }

            Log.d("testCoroutineScope", "testCoroutineScope end");
            return Unit.INSTANCE;
         }

         @NotNull
         public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {Intrinsics.checkNotNullParameter(completion, "completion");
            Function2 var3 = new <anonymous constructor>(completion);
            return var3;
         }

         public final Object invoke(Object var1, Object var2) {return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
         }
      });
      BuildersKt.launch$default((CoroutineScope)LifecycleOwnerKt.getLifecycleScope(this), (CoroutineContext)null, (CoroutineStart)null, block, 3, (Object)null);
   }

通过反编译后的协程体是一个实现 Function2 接口类型的对象,Function2 反对两个形参的 invoke 办法。除了 invoke 办法还有 create 和 invokeSuspend 两个办法,所以协程体更精确的形容应该是实现了 Function2 接口的对象。反编译的代码能够帮咱们理解大体的思路,然而一些细节还是有问题的。咱们在网上搜寻对于协程体对象的介绍,介绍中说协程体是 SuspendLambda 类的子类。那么它真的是 SuspendLambda 的子类吗?咱们能够批改代码简略验证下:

private fun testCoroutineScope() {val block: suspend CoroutineScope.() -> Unit = {Log.d("testCoroutineScope", "testCoroutineScope start")
            delay(2000)
            Log.d("testCoroutineScope", "testCoroutineScope end")
        }
        // 查看 block 对象的办法
        block.javaClass.let { clazz ->
            clazz.methods.forEach { method ->
                Log.d("testCoroutineScope", "method is ${method}")
            }
        }

        lifecycleScope.launch(block = block)
    }

咱们通过打印 block 对象的办法根本能够验证问题,上面局部是 log 输入的内容:

D: method is public kotlin.coroutines.Continuation kotlin.coroutines.jvm.internal.BaseContinuationImpl.create(kotlin.coroutines.Continuation)
D: method is public final kotlin.coroutines.Continuation com.dragon.testcoroutine.CoroutineActivity$testCoroutineScope$block$1.create(java.lang.Object,kotlin.coroutines.Continuation)
D: method is public boolean java.lang.Object.equals(java.lang.Object)
D: method is public int kotlin.coroutines.jvm.internal.SuspendLambda.getArity()
D: method is public kotlin.coroutines.jvm.internal.CoroutineStackFrame kotlin.coroutines.jvm.internal.BaseContinuationImpl.getCallerFrame()
D: method is public final java.lang.Class java.lang.Object.getClass()
D: method is public final kotlin.coroutines.Continuation kotlin.coroutines.jvm.internal.BaseContinuationImpl.getCompletion()
D: method is public kotlin.coroutines.CoroutineContext kotlin.coroutines.jvm.internal.ContinuationImpl.getContext()
D: method is public java.lang.StackTraceElement kotlin.coroutines.jvm.internal.BaseContinuationImpl.getStackTraceElement()
D: method is public int java.lang.Object.hashCode()
D: method is public final kotlin.coroutines.Continuation kotlin.coroutines.jvm.internal.ContinuationImpl.intercepted()
D: method is public java.lang.Object com.dragon.testcoroutine.CoroutineActivity$testCoroutineScope$block$1.invoke(java.lang.Object,java.lang.Object)
D: method is public final java.lang.Object com.dragon.testcoroutine.CoroutineActivity$testCoroutineScope$block$1.invoke(kotlinx.coroutines.CoroutineScope,kotlin.coroutines.Continuation)
D: method is public final java.lang.Object com.dragon.testcoroutine.CoroutineActivity$testCoroutineScope$block$1.invokeSuspend(java.lang.Object)
D: method is public final native void java.lang.Object.notify()
D: method is public final native void java.lang.Object.notifyAll()
D: method is public final void kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(java.lang.Object)
D: method is public java.lang.String kotlin.coroutines.jvm.internal.SuspendLambda.toString()
D: method is public final void java.lang.Object.wait() throws java.lang.InterruptedException
D: method is public final void java.lang.Object.wait(long) throws java.lang.InterruptedException
D: method is public final native void java.lang.Object.wait(long,int) throws java.lang.InterruptedException

咱们从 log 中能够找到 SuspendLambda 的 getArity() 办法, 所以 block 对象必定是继承了 SuspendLambda。在 log 中咱们也能够找到反编译代码中发现的 create 办法、invoke 办法和 invokeSuspend 办法。

二、协程体是如何启动的

这里以 launch 办法的启动过程为准来跟踪启动体的启动过程。先看下启动办法的实现:

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit): Job {val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

咱们这里只关怀协程体 block 的启动,所以对于协程的某些细节临时这里不深究。从代码中咱们看到协程启动的办法调用 coroutine.start(),咱们依照执行规范的协程 StandaloneCoroutine 来跟踪查看它的代码。

public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {start(block, receiver, this)
    }

这个 start 办法中调用了一个 start 办法,看着有奇怪。其实办法外部调用的是 CoroutineStart 类型的 invoke 办法

public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
        when (this) {DEFAULT -> block.startCoroutineCancellable(receiver, completion)
            ATOMIC -> block.startCoroutine(receiver, completion)
            UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
            LAZY -> Unit // will start lazily
        }

咱们依照默认的启动类型走应该调用 DEFAULT -> block.startCoroutineCancellable(receiver, completion)。block 是协程体对象,这个办法就是启动协程体的。completion 就是后面创立的 StandaloneCoroutine 对象,receiver 也是 StandaloneCoroutine。startCoroutineCancellable 办法做了什么呢?

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
    receiver: R, completion: Continuation<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) =
    runSafely(completion) {createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
    }

从代码的语意上看就是创立协程体并应用拦截器进行启动。然而协程体对象不是在调用 launch 办法的时候传进来了吗?为什么这里还要创立呢?其实协程体是依照状态机形式来工作的,协程体中每个可挂起函数的调用都对应状态机的一个状态,这个状态被记录在协程体对象中。为了防止一个协程体被屡次启动后互相烦扰状态,所以这里会每次都创立一个新的协程体对象再启动。

createCoroutineUnintercepted 扩大办法的具体实现如下:

public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
    receiver: R,
    completion: Continuation<T>
): Continuation<Unit> {val probeCompletion = probeCoroutineCreated(completion)
    return if (this is BaseContinuationImpl)
        create(receiver, probeCompletion)
    else {createCoroutineFromSuspendFunction(probeCompletion) {(this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
        }
    }
}

协程体继承关系:协程体 -》SuspendLambda-》ContinuationImpl-》BaseContinuationImpl-》Continuation

因为协程体继承了 BaseContinuationImpl,所以这里调用 create(receiver, probeCompletion) 办法创立协程。咱们在后面反编译协程体的代码中也看到了 create 办法的实现:

         public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {Intrinsics.checkNotNullParameter(completion, "completion");
            Function2 var3 = new <anonymous constructor>(completion);
            return var3;
         }

当然反编译的代码不太精确,理论就是创立协程体对象并返回。

createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)

创立完新的协程体后调用协程体的 intercepted() 办法,这里波及到协程中线程切换的内容,这里先不深究它,简略能够认为他把协程体包装成在某个线程池中运行的协程。resumeCancellableWith() 办法便是真正的启动了协程体。

三、协程体如何挂起,如何复原

后面曾经介绍过协程体就是 SuspendLambda 对象,它的外部依照状态机的形式运行。协程体的每次唤醒都只能运行以后状态的代码并更改到下个状态。

private final void testCoroutineScope() {Function2 block = (Function2)(new Function2((Continuation)null) {
         int label;

         @Nullable
         public final Object invokeSuspend(@NotNull Object $result) {Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            switch(this.label) {
            case 0:
               ResultKt.throwOnFailure($result);
               Log.d("testCoroutineScope", "testCoroutineScope start");
               this.label = 1;
               if (DelayKt.delay(2000L, this) == var2) {return var2;}
               break;
            case 1:
               ResultKt.throwOnFailure($result);
               break;
            default:
               throw new IllegalStateException("call to'resume'before'invoke'with coroutine");
            }

            Log.d("testCoroutineScope", "testCoroutineScope end");
            return Unit.INSTANCE;
         }

         @NotNull
         public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {Intrinsics.checkNotNullParameter(completion, "completion");
            Function2 var3 = new <anonymous constructor>(completion);
            return var3;
         }

         public final Object invoke(Object var1, Object var2) {return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
         }
      });
      BuildersKt.launch$default((CoroutineScope)LifecycleOwnerKt.getLifecycleScope(this), (CoroutineContext)null, (CoroutineStart)null, block, 3, (Object)null);
   }

反编译的代码中能够看到 label 变量,这个变量在保护着状态机的状态。invokeSuspend 办法调用一次,label 的状态变被推进一步。说到这里大家就可能了解了挂起和唤醒的实现原理了,首先 invokeSuspend 被调用一次只执行了一个状态的代码,前面状态的代码都没有执行,这时进入挂起状态。当异步耗时操作执行完结后再次调用 invokeSuspend 办法,既是协程体被唤醒。

反对唤醒挂起的对象都继承了 BaseContinuationImpl,在 BaseContinuationImpl 中咱们能够找到调用 invokeSuspend 办法的中央。

// This implementation is final. This fact is used to unroll resumeWith recursion.
    public final override fun resumeWith(result: Result<Any?>) {// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
        var current = this
        var param = result
        while (true) {
            // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
            // can precisely track what part of suspended callstack was already resumed
            probeCoroutineResumed(current)
            with(current) {
                val completion = completion!! // fail fast when trying to resume continuation without completion
                val outcome: Result<Any?> =
                    try {val outcome = invokeSuspend(param)
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {Result.failure(exception)
                    }
                releaseIntercepted() // this state machine instance is terminating
                if (completion is BaseContinuationImpl) {
                    // unrolling recursion via loop
                    current = completion
                    param = outcome
                } else {
                    // top-level completion reached -- invoke and return
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }

从代码咱们理解到 invokeSuspend 的返回值为 COROUTINE_SUSPENDED 时,resumeWith 才算真正完结并通过调用 completion.resumeWith(outcome) 来唤醒父协程的执行。invokeSuspend 返回值不是 COROUTINE_SUSPENDED 时,以后的协程体没有执行完并期待下次通过本协程体的 resumeWith 唤醒再次执行。

四、suspend 函数是什么

launch、async 办法启动的协程体就是一个 suspend 函数,他的定义如下:

val block: suspend CoroutineScope.() -> Unit = {Log.d("testCoroutineScope", "testCoroutineScope start")
            delay(2000)
            Log.d("testCoroutineScope", "testCoroutineScope end")
        }

它通过编译后就是 SuspendLambda 对象。

那么一般的 suspend 是怎么样的呢?

首先咱们先看下这段测试代码:

private fun testCoroutineScope2() {val block2: suspend CoroutineScope.() -> Unit = {testSuspendMethod()
        }
        lifecycleScope.launch(block = block2)
    }

    private suspend fun testSuspendMethod() {Log.d("testSuspendMethod", "testSuspendMethod start")
        delay(3000)
        Log.d("testSuspendMethod", "testSuspendMethod end")
    }

咱们在协程中调用了挂起函数,并在挂起函数中调用了 delay 办法。这段代码反编译后的后果入下:

private final void testCoroutineScope2() {Function2 block2 = (Function2)(new Function2((Continuation)null) {
         int label;

         @Nullable
         public final Object invokeSuspend(@NotNull Object $result) {Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            switch(this.label) {
            case 0:
               ResultKt.throwOnFailure($result);
               CoroutineActivity var10000 = CoroutineActivity.this;
               this.label = 1;
               if (var10000.testSuspendMethod(this) == var2) {return var2;}
               break;
            case 1:
               ResultKt.throwOnFailure($result);
               break;
            default:
               throw new IllegalStateException("call to'resume'before'invoke'with coroutine");
            }

            return Unit.INSTANCE;
         }

         @NotNull
         public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {Intrinsics.checkNotNullParameter(completion, "completion");
            Function2 var3 = new <anonymous constructor>(completion);
            return var3;
         }

         public final Object invoke(Object var1, Object var2) {return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
         }
      });
      BuildersKt.launch$default((CoroutineScope)LifecycleOwnerKt.getLifecycleScope(this), (CoroutineContext)null, (CoroutineStart)null, block2, 3, (Object)null);
   }

协程体的内容咱们之前曾经理解过了,然而调用挂起函数的中央有些不同。在调用挂起函数 testSuspendMethod 的时候多了一个 this 参数,理论申明 testSuspendMethod 的时候没有这个参数。咱们再看下 testSuspendMethod 的反编译后果:

private final Object testSuspendMethod(Continuation var1) {
      Object $continuation;
      label20: {if (var1 instanceof <undefinedtype>) {$continuation = (<undefinedtype>)var1;
            if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0) {((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE;
               break label20;
            }
         }

         $continuation = new ContinuationImpl(var1) {
            // $FF: synthetic field
            Object result;
            int label;

            @Nullable
            public final Object invokeSuspend(@NotNull Object $result) {
               this.result = $result;
               this.label |= Integer.MIN_VALUE;
               return CoroutineActivity.this.testSuspendMethod(this);
            }
         };
      }

      Object $result = ((<undefinedtype>)$continuation).result;
      Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
      switch(((<undefinedtype>)$continuation).label) {
      case 0:
         ResultKt.throwOnFailure($result);
         Log.d("testSuspendMethod", "testSuspendMethod start");
         ((<undefinedtype>)$continuation).label = 1;
         if (DelayKt.delay(3000L, (Continuation)$continuation) == var4) {return var4;}
         break;
      case 1:
         ResultKt.throwOnFailure($result);
         break;
      default:
         throw new IllegalStateException("call to'resume'before'invoke'with coroutine");
      }

      Log.d("testSuspendMethod", "testSuspendMethod end");
      return Unit.INSTANCE;
   }

testSuspendMethod 多出的参数是 continuation 类型的,在调用的中央会把 SuspendLambda 对象传递过去。testSuspendMethod 也通过状态机的形式来治理挂起和唤醒,然而与协程体有所不同。协程体是 SuspendLambda 对象,协程体的状态由 SuspendLambda 对象保留。testSuspendMethod 办法的状态通过 ContinuationImpl 对象来保留。

五、suspendCoroutineCancelable 到底是什么

咱们对上面这段代码进行反编译

// 原始代码
private fun testCoroutineScope3() {val block: suspend CoroutineScope.() -> Unit = {Log.d("testCoroutineScope3", "testCoroutineScope3 start")

            val value = suspendCancellableCoroutine<Int> {Log.d("testCoroutineScope3", "suspendCancellableCoroutine start")
                Handler(Looper.getMainLooper()).postDelayed({it.resume(202) }, 2000)
                Log.d("testCoroutineScope3", "suspendCancellableCoroutine end")
            }

            Log.d("testCoroutineScope3", "testCoroutineScope3 end value:$value")
        }
        lifecycleScope.launch(block = block)
    }
// 编译后的代码, 这里只把 suspendCancellableCoroutine 调用的中央截取进去
switch(this.label) {
            case 0:
               ResultKt.throwOnFailure($result);
               Log.d("testCoroutineScope3", "testCoroutineScope3 start");
               $i$f$suspendCancellableCoroutine = false;
               this.L$0 = this;
               this.label = 1;
               int var6 = false;
               CancellableContinuationImpl cancellable$iv = new CancellableContinuationImpl(IntrinsicsKt.intercepted(this), 1);
               cancellable$iv.initCancellability();
               CancellableContinuation it = (CancellableContinuation)cancellable$iv;
               int var9 = false;
               Log.d("testCoroutineScope3", "suspendCancellableCoroutine start");
               (new Handler(Looper.getMainLooper())).postDelayed((Runnable)(new CoroutineActivity$testCoroutineScope3$block$1$value$1$1(it)), 2000L);
               Log.d("testCoroutineScope3", "suspendCancellableCoroutine end");
               var10000 = cancellable$iv.getResult();
               if (var10000 == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {DebugProbesKt.probeCoroutineSuspended(this);
               }

               if (var10000 == var10) {return var10;}
               break;

从代码能够看出 suspendCancellableCoroutine 的实质就是创立了一个 CancellableContinuationImpl 对象,这个对象保护的状态机完结后会唤醒以后的状态机。CancellableContinuationImpl 的运行没有完结时,getResult 返回 COROUTINE_SUSPENDED 并使以后状态机挂起。

简略了解 suspendCancellableCoroutine 就是为用户提供了自定义的协程挂起形式,用户通过他能够不便的把线程的异步调用转换成协程的挂起调用。有相似性能的还有 suspendCoroutine 办法。

六、总结

  • 协程实现的外围机制就是状态机,协程中所有的挂终点都被分解成一个单个状态,状态机的每次执行都只能执行以后状态对应的代码并更改状态机到下一个状态,而后状态机期待再次启动,也就是所谓的协程挂起。当异步工作执行实现后会再次启动状态机,这时状态机便执行下一个状态的代码,依照这样的法则以此类推。
  • 协程能够嵌套调用,那么就会生成状态机嵌套的构造。状态 机创立时能够指定状态机执行实现后要执行的父状态机,这样保障了树状构造组织的状态机可能有序执行。
  • 协程中定义的状态机对象都实现了 Continuation 接口,在协程的创立时,父协程也是通过 Continuation 接口把本人传递给子协程的。在调用 Continuation 接口的 resumeWith 办法便启动协程执行,也就是启动了状态机。BaseContinuationImpl 就是状态机的模板类,管制着状态机的执行流程,在状态机完结时还会调用到父协程的状态机对象。

我的公众号曾经开明,公众号会同步公布。

请关注我的公众号

正文完
 0