关于android:揭秘kotlin协程中的CoroutineContext

前言

从kotlin1.1开始,协程就被增加到kotlin中作为试验性功能,直到kotlin1.3,协程在kotlin中的api曾经根本稳定下来了,当初kotlin曾经公布到了1.4,为协程增加了更多的性能并进一步欠缺了它,所以咱们当初在kotlin代码中能够释怀的引入kotlin协程并应用它,其实协程并不是kotlin独有的性能,它是一个宽泛的概念,合作式多任务的实现,除了kotlin外,很多语言如Go、Python等都通过本人的形式实现了协程,本文浏览前心愿你曾经晓得如何应用kotlin协程,如果不相熟能够浏览一下官网文档:

kotlin coroutines guide

Coroutine的简略了解

提到协程,很对人会把它和线程进行比拟,就像提到线程,很多人会把它和过程进行比拟,线程和过程别离是操作系统中的CPU调度单位和资源划分单位,它们在操作系统中有专门的数据结构代表,而协程在操作系统中没有专门的数据结构代表,所以协程并不是由操作系统创立和调度,它而是由程序本人创立和调度,因为不须要操作系统调度,所以协程比线程更加的轻量,切换协程比切换线程的开销更小,即它的上下文切换比线程更快,因为操作系统切换线程时个别都会波及到用户态内核态的转换,这是一个开销绝对较大的操作。

协程的实现依赖于线程,它不能脱离线程而存在,因为线程才是CPU调度的根本单位,协程通过程序的调度能够执行在一个或多个线程之中,所以协程须要运行于线程之中,因为协程是由程序本人调度的,所以程序就须要实现调度逻辑,不同语言的调度的实现不一样,在kotlin中,通过Dispatcher来调度协程,而Dispatcher它通常是一个线程池的实现或者基于特定平台(例如Android)主线程的实现,通过调度让协程运行于一个或多个线程之中,这些协程能够在同一线程的不同时刻被执行,也能够在不同线程上的不同时刻被执行。

协程能够说是编程语言的能力, 是下层的能力,它并不需要操作系统和硬件的反对, 是编程语言为了让开发者更容易写出合作式工作的代码,而封装的一种任务调度能力,所以协程通常是蕴含一段特定逻辑的代码块,多个协程之间就组合成一段具备特定逻辑的代码流程,这些编程语言为了让开发者更不便的应用协程,它通常会提供一些关键字, 而这些关键字会通过编译器主动生成了一些反对型代码,例如kotlin中的suspend关键字,对于suspend润饰的办法,编译器会办法生成一些额定的代码。

下面就是我对协程的简略了解,总的来说:协程须要线程的承载运行,协程须要程序本人实现调度,协程让你更容易写出合作式工作。

Coroutine的简略应用

fun main(){
    val scope = CoroutineScope(CoroutineName("Coroutine-Name") + Dispatchers.IO)
    val job = scope.launch(start = CoroutineStart.DEFAULT){
        println("hello world")
    }
    //过程保活1s,只有过程存活的前提下,协程能力会启动和执行
    Thread.sleep(1000)
}

下面首先结构了一个CoroutineScope,它是协程的作用域,用于管制协程的生命周期,结构CoroutineScope须要一个CoroutineContext,它是协程的上下文,用于提供协程启动和运行时须要的信息,这是咱们前面须要重点介绍的,最初通过CoroutineScope的launch办法启动协程并输入hello world,其中启动协程时能够通过CoroutineStart指定协程的启动模式,它是一个枚举值,默认是立刻启动,也通过指定CoroutineStart.LAZY变为提早启动,提早启动须要你被动调用返回的Job对象的start办法后协程才会启动,如果咱们想勾销掉这个协程的执行就能够调用CoroutineScope的cancel办法,或者调用launch办法返回的Job对象的cancel办法,其实CoroutineScope的cancel办法外部也是调用返回的Job对象的cancel办法来完结这个协程。

下面就是启动一个协程的简略步骤,须要用到CoroutineScope、CoroutineContext、CoroutineStart。

通过自定义CoroutineScope,能够在应用程序的某一个档次开启或者管制协程的生命周期,例如Android,在ViewModel和Lifecycle类的生命周期里提供了CoroutineScope,别离是ViewModelScope和LifecycleScope.

CoroutineContext的元素

结构CoroutineScope应用到的CoroutineContext是一个非凡的汇合,这个汇合它既有Map的特点,也有Set的特点,汇合的每一个元素都是Element,每个Element都有一个Key与之对应,对于雷同Key的Element是不能够反复存在的,Element之间能够通过 + 号组合起来,前面我会具体介绍CoroutineContext这个非凡汇合的构造,接下来我先简略解说一下组成CoroutineContext的各个Element的作用,CoroutineContext次要由以下4个Element组成:

  • Job:协程的惟一标识,用来管制协程的生命周期(new、active、completing、completed、cancelling、cancelled);
  • CoroutineDispatcher:指定协程运行的线程(IO、Default、Main、Unconfined);
  • CoroutineName: 指定协程的名称,默认为coroutine;
  • CoroutineExceptionHandler: 指定协程的异样处理器,用来解决未捕捉的异样.

它们之间的关系如下:

上面别离介绍一下4个Element各自的作用:

1、Job

public interface Job : CoroutineContext.Element {
   
    public companion object Key : CoroutineContext.Key<Job> {
        init {
            CoroutineExceptionHandler
        }
    }
  
    public val isActive: Boolean

    public val isCompleted: Boolean

    public val isCancelled: Boolean

    public fun start(): Boolean

    public fun cancel(cause: CancellationException? = null)
  
    public suspend fun join()

    public val children: Sequence<Job>

    //...
}

通过CoroutineScope的扩大办法launch启动一个协程后,它会返回一个Job对象,它是协程的惟一标识,这个Job对象蕴含了这个协程工作的一系列状态,如下:

当一个协程创立后它就处于新建(New)状态,当调用Job的start/join办法后协程就处于沉闷(Active)状态,这是运行状态,协程运行出错或者调用Job的cancel办法都会将以后协程置为勾销中(Cancelling)状态, 处于勾销中状态的协程会等所有子协程都实现后才进入勾销 (Cancelled)状态,当协程执行实现后或者调用CompletableJob(CompletableJob是Job的一个子接口)的complete办法都会让以后协程进入实现中(Completing)状态, 处于实现中状态的协程会等所有子协程都实现后才进入实现(Completed)状态。

尽管协程有New、Cancelling、Completing状态,然而内部是无奈感知这三个状态的,Job只提供了isActive、isCancelled、isCompleted属性来供内部判断协程是否处于Active、Cancelled、Completed状态,当协程处于Active状态时,isActive为true,isCancelled和isCompleted为false,当协程处于Cancelled状态时,isCancelled和isCompleted为true,isActive为false,当协程处于Completed状态时,isCompleted为true,isActive和isCancelled为false。

协程中有两种类型的Job,如果咱们平时启动协程时没有特意地通过CoroutineContext指定一个Job,那么应用launch/async办法启动协程时返回的Job它会产生异样流传,咱们晓得协程有一个父子的概念,例如启动一个协程1,在协程中持续启动协程2、协程3,那么协程1就是协程2、协程3的父协程,协程2、协程3就是协程1的子协程,每个协程都会有一个对应的Job,协程之间的父子关系是通过Job对象维持的,像一颗树一样:

所以异样流传就是这个Job因为除了CancellationException以外的异样而失败时,那么父Job就会感知到并抛出异样,在抛出异样之前,父Job会勾销所有子Job的运行,这也是结构化编程的一个特点,如果要克制这种异样流传的行为,那么能够用到另外一种类型的Job – SupervisorJob,SupervisorJob它不是一个类,它是一个构造方法:

public fun SupervisorJob(parent: Job? = null) : CompletableJob = SupervisorJobImpl(parent)

SupervisorJob办法会返回CompletableJob的一个supervisor实现,CompletableJob是Job的一个子接口,它比Job接口多了一个complete办法,这意味着它能够调用complete办法让协程工作进入实现状态,supervisor实现的意思是这个Job它不会产生异样流传,每个Job能够独自被治理,当SupervisorJob因为除了CancellationException以外的异样而失败时,并不会影响到父Job和其余子Job,上面是SupervisorJob的一个应用例子:

fun main(){
     val parentJob = GlobalScope.launch {
       //childJob是一个SupervisorJob
        val childJob = launch(SupervisorJob()){
            throw NullPointerException()
        }
        childJob.join()
        println("parent complete")
    }
    Thread.sleep(1000)
}

childJob抛出异样并不会影响parentJob的运行,parentJob会持续运行并输入parent complete。

2、CoroutineDispatcher

public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
  
  public companion object Key : AbstractCoroutineContextKey<ContinuationInterceptor, CoroutineDispatcher>(
    ContinuationInterceptor,
    { it as? CoroutineDispatcher }
  )
  
  public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true

  public abstract fun dispatch(context: CoroutineContext, block: Runnable)
  
  //...
}

CoroutineDispatcher能够指定协程的运行线程,CoroutineDispatcher外面有一个dispatch办法,这个dispatch办法用于把协程工作分派到特定线程运行,kotlin曾经内置了CoroutineDispatcher的4个实现,能够通过Dispatchers的Default、IO、Main、Unconfined字段别离返回应用,如下:

public actual object Dispatchers {
    @JvmStatic
    public actual val Default: CoroutineDispatcher = createDefaultDispatcher()
  
     @JvmStatic
    public val IO: CoroutineDispatcher = DefaultScheduler.IO

    @JvmStatic
    public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined
  
    @JvmStatic
    public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
}

2.1、Default、IO

Dispatchers.Default和Dispatchers.IO外部都是线程池实现,它们的含意是把协程运行在共享的线程池中,咱们先看Dispatchers.Default的实现,看createDefaultDispatcher办法:

internal actual fun createDefaultDispatcher(): CoroutineDispatcher = if (useCoroutinesScheduler) DefaultScheduler else CommonPool

DefaultScheduler和CommonPool都是CoroutineDispatcher的子类,不同的是DefaultScheduler外部依赖的是kotlin本人实现的线程池逻辑,而CommonPool外部依赖的是java类库中的Executor,默认状况下useCoroutinesScheduler为true,所以createDefaultDispatcher办法返回的是DefaultScheduler实例,咱们看一下这个DefaultScheduler:

internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
    val IO: CoroutineDispatcher = LimitingDispatcher(
        this,//DefaultScheduler实例被传进了LimitingDispatcher中
        systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)),
        "Dispatchers.IO",
        TASK_PROBABLY_BLOCKING
    )
  
      //...
}

DefaultScheduler中的IO字段就是Dispatchers.IO,它是LimitingDispatcher实例,所以Dispatchers.IO的实现是LimitingDispatcher,同时咱们要留神到DefaultScheduler是用object字段润饰,这阐明它是一个单例,并且DefaultScheduler实例被传进了LimitingDispatcher的构造方法中,所以LimitingDispatcher就会持有DefaultScheduler实例,而DefaultScheduler它的次要实现都在它的父类ExperimentalCoroutineDispatcher中:

@InternalCoroutinesApi
public open class ExperimentalCoroutineDispatcher(
    private val corePoolSize: Int,
    private val maxPoolSize: Int,
    private val idleWorkerKeepAliveNs: Long,
    private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
  
    public constructor(
        corePoolSize: Int = CORE_POOL_SIZE,
        maxPoolSize: Int = MAX_POOL_SIZE,
        schedulerName: String = DEFAULT_SCHEDULER_NAME
    ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName)

    private var coroutineScheduler = createScheduler()
  
   //返回CoroutineScheduler实例
    private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)

    override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
        try {
           //dispatch办法委托给了CoroutineScheduler的dispatch办法
            coroutineScheduler.dispatch(block)
        } catch (e: RejectedExecutionException) {
              //...
        }
  
  internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean) {
        try {
           //dispatchWithContext办法委托给了CoroutineScheduler的dispatch办法
            coroutineScheduler.dispatch(block, context, tailDispatch)
        } catch (e: RejectedExecutionException) {
          //...
        }
    }
  //...
}

咱们再看Dispatchers.IO对应的LimitingDispatcher实现:

private class LimitingDispatcher(
    private val dispatcher: ExperimentalCoroutineDispatcher,//内部传进的DefaultScheduler实例
    private val parallelism: Int,
    private val name: String?,
    override val taskMode: Int
) : ExecutorCoroutineDispatcher(), TaskContext, Executor {

    private val queue = ConcurrentLinkedQueue<Runnable>()
    private val inFlightTasks = atomic(0)

    override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false)

    private fun dispatch(block: Runnable, tailDispatch: Boolean) {
        var taskToSchedule = block
        while (true) {
            val inFlight = inFlightTasks.incrementAndGet()

          if (inFlight <= parallelism) {
                //LimitingDispatcher的dispatch办法委托给了DefaultScheduler的dispatchWithContext办法
                dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch)
                return
            }

            queue.add(taskToSchedule)

            if (inFlightTasks.decrementAndGet() >= parallelism) {
                return
            }

            taskToSchedule = queue.poll() ?: return
        }
    }
  
  //...
}

从下面剖析得悉,Dispatchers.Default的实现是DefaultScheduler,Dispatchers.IO的实现是LimitingDispatcher,而LimitingDispatcher持有DefaultScheduler实例,把dispatch操作委托给DefaultScheduler,DefaultScheduler外部持有CoroutineScheduler实例,把dispatch操作委托给CoroutineScheduler,而DefaultScheduler又是一个单例,所以Dispatchers.Default和Dispatchers.IO它们共用同一个CoroutineScheduler实例,它们之间的关系如下:

CoroutineScheduler就是kotlin本人实现的共享线程池,是Dispatchers.Default和Dispatchers.IO外部的独特实现,Dispatchers.Default和Dispatchers.IO共享CoroutineScheduler中的线程,DefaultScheduler和LimitingDispatcher的次要作用是对CoroutineScheduler进行线程数、工作数等配置,CoroutineScheduler应用工作窃取算法(Work Stealing)从新实现了一套线程池的任务调度逻辑,它的性能、扩展性对协程的任务调度更敌对,具体的逻辑能够查看这个类的dispatch办法:

internal class CoroutineScheduler(
    @JvmField val corePoolSize: Int,
    @JvmField val maxPoolSize: Int,
    @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {
  
      override fun execute(command: Runnable) = dispatch(command)
  
      fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
        val task = createTask(block, taskContext)
        //...
        if (task.mode == TASK_NON_BLOCKING) {
            //...
        } else {
            //...
        }
      }

          //...
}

所以这个线程池既能够运行两种类型的工作:CPU密集型工作和IO密集型工作,用一个mode来区别,当你为协程指定Dispatchers.Default时,Dispatcher会把协程的工作指定为CPU密集型工作,对应mode为TASK\_NON\_BLOCKING,当你为协程指定Dispatchers.IO时,Dispatcher会把协程的工作指定为IO密集型工作,对应mode为TASK\_PROBABLY\_BLOCKING,所以这时CoroutineScheduler就能够依据task mode作出不同的线程创立、调度、唤醒策略,当启动协程时没有指定Dispatcher,默认会应用Dispatchers.Default。

当运行CPU密集型工作时,CoroutineScheduler最多有corePoolSize个线程被创立,corePoolSize它的取值为max(2, CPU外围数),即它会尽量的等于CPU外围数,当运行IO密集型工作时,它能够创立比corePoolSize更多的线程来运行IO型工作,但不能大于maxPoolSize,maxPoolSize会取一个很大的值,默认为max(corePoolSize, min(CPU外围数 * 128, 2^21 – 2)),即大于corePoolSize,小于2^21 – 2,而2^21 – 2是一个很大的数约为2M,然而CoroutineScheduler是不可能创立这么多线程的,所以就须要内部限度提交的工作数,而Dispatchers.IO结构时就通过LimitingDispatcher默认限度了最大线程并发数parallelism为max(64, CPU外围数),即Dispatchers.IO最多只能提交parallelism个工作到CoroutineScheduler中执行,残余的工作被放进一个队列中期待。

CPU密集型工作:CPU密集型工作的特点是执行工作时CPU会处于繁忙状态,工作会耗费大量的CPU资源,例如计算简单的算术、视频解码等,如果此时线程数太多,超过了CPU外围数,那么这些超出来的线程是得不到CPU的执行的,只会节约内存资源,因为线程自身也有栈等空间,同时线程过多,频繁的线程切换带来的耗费也会影响线程池的性能,所以对于CPU密集型工作,线程池并发线程数等于CPU外围数能力让CPU的执行效率最大化;

IO密集型工作:IO密集型工作的特点是执行工作时CPU会处于闲置状态,工作不会耗费大量的CPU资源,例如网络申请、IO操作等,线程执行IO密集型工作时大多数处于阻塞状态,处于阻塞状态的线程是不占用CPU的执行工夫,这时CPU就处于闲置状态,为了让CPU忙起来,执行IO密集型工作时理当让线程的创立数量更多一点,现实状况下线程数应该等于提交的工作数,对于这些多创立进去的线程,当它们闲置时,线程池个别会有一个超时回收策略,所以大部分状况下并不会占用大量的内存资源,但也会有极其状况,所以对于IO密集型工作,线程池并发线程数应尽可能地多能力进步CPU的吞吐量,这个尽可能地多的水平并不是无限大,而是依据业务状况设定,但必定要大于CPU外围数。

2.2、Unconfined

Dispatchers.Unconfined的含意是不给协程指定运行的线程,在第一次被挂起(suspend)之前,由启动协程的线程执行它,但被挂起后, 会由复原协程的线程继续执行, 如果一个协程会被挂起屡次, 那么每次被复原后, 都有可能被不同线程继续执行,看上面的一个例子:

fun main(){
    GlobalScope.launch(Dispatchers.Unconfined){
        println(Thread.currentThread().name)

        //挂起
        withContext(Dispatchers.IO){
            println(Thread.currentThread().name)
        }
        //复原
        println(Thread.currentThread().name)

        //挂起
        withContext(Dispatchers.Default){
            println(Thread.currentThread().name)
        }
        //复原
        println(Thread.currentThread().name)
    }

    //过程保活
    Thread.sleep(1000)
}

运行输入:
main
DefaultDispatcher-worker-1
DefaultDispatcher-worker-1
DefaultDispatcher-worker-3
DefaultDispatcher-worker-3

协程启动时指定了Dispatchers.Unconfined,所以第一次执行时是由启动协程的线程执行,下面在主线程中启动了协程,所以第一次输入主线程main,withContext办法是一个suspend办法,它能够挂起以后协程,并把指定的代码块运行到给定的上下文中,直到代码块运行实现并返回后果,第一个代码块通过withContext办法把它运行在Dispatchers.IO中,所以第二次输入了线程池中的某一个线程DefaultDispatcher-worker-1,第一个代码块执行结束后,协程在DefaultDispatcher-worker-1线程中复原,所以协程复原后执行在DefaultDispatcher-worker-1线程中,所以第三次持续输入DefaultDispatcher-worker-1,第二个代码块同理。

那么Dispatchers.Unconfined是怎么做到的呢,咱们看下Unconfined对应的CoroutineDispatcher实现 – kotlinx.coroutines.Unconfined:

internal object Unconfined : CoroutineDispatcher() {
  
    override fun isDispatchNeeded(context: CoroutineContext): Boolean = false

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        // It can only be called by the "yield" function. See also code of "yield" function.
        val yieldContext = context[YieldContext]
        if (yieldContext != null) {
            // report to "yield" that it is an unconfined dispatcher and don't call "block.run()"
            yieldContext.dispatcherWasUnconfined = true
            return
        }
        throw UnsupportedOperationException("Dispatchers.Unconfined.dispatch function can only be used by the yield function. " +
            "If you wrap Unconfined dispatcher in your code, make sure you properly delegate " +
            "isDispatchNeeded and dispatch calls.")
    }
}

Unconfined他重写了CoroutineDispatcher的isDispatchNeeded办法和dispatch办法,isDispatchNeeded办法返回了false,示意不须要dispatch,而默认CoroutineDispatcher的isDispatchNeeded办法是返回true的,Dispatchers.Default和Dispatchers.IO都没有重写这个办法,Unconfined的dispatch办法没有任何任务调度的逻辑,只是写明了只有当调用yield办法时,Unconfined的dispatch办法才会被调用,yield办法是一个suspend办法,当在协程中调用这个办法时示意以后协程让出本人所在的线程给其余协程运行,所以失常状况下是不会调用Unconfined的dispatch办法的。

在kotlin中每个协程都有一个Continuation实例与之对应,当协程复原时会调用Continuation的resumeWith办法,它的实现在DispatchedContinuation中,如下:

internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,//协程的的CoroutineDispatcher实例
    @JvmField val continuation: Continuation<T>//代表协程的Continuation实例
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
  
  //...
  
  override fun resumeWith(result: Result<T>) {
        val context = continuation.context
        val state = result.toState()
        if (dispatcher.isDispatchNeeded(context)) {
            _state = state
            resumeMode = MODE_ATOMIC
            dispatcher.dispatch(context, this)
        } else {//Unconfined走这里的逻辑
           //调用executeUnconfined办法
            executeUnconfined(state, MODE_ATOMIC) {
                withCoroutineContext(this.context, countOrElement) {
                   //调用Continuation的resumeWith办法
                    continuation.resumeWith(result)
                }
            }
        }
    }
}

咱们留神到by关键字,这是kotlin中的委托实现,DispatchedContinuation通过类委托增强了Continuation的resumeWith办法,即在调用Continuation的resumeWith办法之前减少了一些本人的逻辑,咱们能够看到DispatchedContinuation的resumeWith办法中会依据CoroutineDispatcher的isDispatchNeeded办法返回值做出不同解决,当isDispatchNeeded办法返回true时,会调用协程的CoroutineDispatcher的dispatch办法,而当isDispatchNeeded办法返回false时,不会调用CoroutineDispatcher的dispatch办法而是调用executeUnconfined办法,下面讲到Unconfined的isDispatchNeeded办法返回了false,咱们看executeUnconfined办法:

private inline fun DispatchedContinuation<*>.executeUnconfined(
    contState: Any?,
      mode: Int, 
      doYield: Boolean = false,
    block: () -> Unit
): Boolean {
    assert { mode != MODE_UNINITIALIZED }
   //从ThreadLocal中取出EventLoop
    val eventLoop = ThreadLocalEventLoop.eventLoop
    if (doYield && eventLoop.isUnconfinedQueueEmpty) return false
   //判断是否在执行Unconfined工作
    return if (eventLoop.isUnconfinedLoopActive) {
        _state = contState
        resumeMode = mode
          //调用EventLoop的dispatchUnconfined办法把Unconfined工作放进EventLoop中
        eventLoop.dispatchUnconfined(this)
        true 
    } else {
          //执行Unconfined工作
        runUnconfinedEventLoop(eventLoop, block = block)
        false
    }
}

internal inline fun DispatchedTask<*>.runUnconfinedEventLoop(
    eventLoop: EventLoop,
    block: () -> Unit
) {
    eventLoop.incrementUseCount(unconfined = true)
    try {
          //先执行block代码块,block()就是executeUnconfined办法传进的代码块, block()外面会调用Continuation的resumeWith办法
        block()
        while (true) {
           //再调用EventLoop的processUnconfinedEvent办法执行EventLoop中的Unconfined工作,直到EventLoop中的所有Unconfined工作执行完才跳出循环
            if (!eventLoop.processUnconfinedEvent()) break
        }
    } catch (e: Throwable) {
       //...
    } finally {
        eventLoop.decrementUseCount(unconfined = true)
    }
}

能够看到对于Unconfined工作,是在以后线程马上执行或者通过以后线程的EventLoop来执行的,EventLoop是寄存在ThreadLocal中的,所以EventLoop它是跟以后线程相关联的,而EventLoop也是CoroutineDispatcher的一个子类:

internal abstract class EventLoop : CoroutineDispatcher() {
  
  //...
  
  private var unconfinedQueue: ArrayQueue<DispatchedTask<*>>? = null
  
  public fun dispatchUnconfined(task: DispatchedTask<*>) {
        val queue = unconfinedQueue ?: ArrayQueue<DispatchedTask<*>>().also { unconfinedQueue = it }
        queue.addLast(task)
  }
  
  public fun processUnconfinedEvent(): Boolean {
        val queue = unconfinedQueue ?: return false
        val task = queue.removeFirstOrNull() ?: return false
        task.run()
        return true
  }
}

EventLoop中有一个双端队列用于寄存Unconfined工作,Unconfined工作是指指定了Dispatchers.Unconfined的协程工作,EventLoop的dispatchUnconfined办法用于把Unconfined工作放进队列的尾部,processUnconfinedEvent办法用于从队列的头部移出Unconfined工作执行,所以executeUnconfined办法外面的策略就是:在以后线程立刻执行Unconfined工作,如果以后线程曾经在执行Unconfined工作,就临时把它放进跟以后线程关联的EventLoop中,期待执行,同时Unconfined工作外面会调用Continuation的resumeWith办法复原协程运行,这也是为什么指定了Dispatchers.Unconfined后协程复原可能被复原协程的线程执行的起因。

2.3、Main

Dispatchers.Main的含意是把协程运行在平台相干的只能操作UI对象的Main线程,所以它依据不同的平台有不同的实现,kotlin它反对上面三种平台:

  • kotlin/js:kotlin/js是kotlin对JavaScript的反对,提供了转换kotlin代码,kotlin规范库的能力,npm包治理能力,在kotlin/js上Dispatchers.Main等效于Dispatchers.Default;
  • kotlin/native:kotlin/native是一种将kotlin代码编译为无需虚拟机就可运行的原生二进制文件的技术, 它的次要目标是容许对不须要或不可能应用虚拟机的平台进行编译,例如嵌入式设施或iOS,在kotlin/native上Dispatchers.Main等效于Dispatchers.Default;
  • kotlin/JVM:kotlin/JVM就是须要虚拟机能力编译的平台,例如Android就是属于kotlin/JVM,对于kotlin/JVM咱们须要引入对应的dispatcher,例如Android就须要引入kotlinx-coroutines-android库,它外面有Android对应的Dispatchers.Main实现,其实就是把工作通过Handler运行在Android的主线程.

咱们再看Dispatchers.Main的实现 – MainDispatcherLoader.dispatcher:

internal object MainDispatcherLoader {
    
    @JvmField
    val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()

    private fun loadMainDispatcher(): MainCoroutineDispatcher {
        //...次要是通过反射加载实现了MainCoroutineDispatcher的类
    }
}

所以Dispatchers.Main的CoroutineDispatcher实现是MainCoroutineDispatcher,MainCoroutineDispatcher的具体实现就因平台的不同而不同了,如果你间接应用Dispatchers.Main而没有引入对应的库就会引发IllegalStateException异样。

3、CoroutineName

public data class CoroutineName(
    val name: String
) : AbstractCoroutineContextElement(CoroutineName) {
   
    public companion object Key : CoroutineContext.Key<CoroutineName>

    override fun toString(): String = "CoroutineName($name)"
}

CoroutineName就是协程的名字,它的构造很简略, 咱们平时开发个别是不会去指定一个CoroutineName的,因为CoroutineName只在kotlin的调试模式下才会被用的, 它在debug模式下被用于设置协程运行线程的名字:

internal data class CoroutineId(
    val id: Long
) : ThreadContextElement<String>, AbstractCoroutineContextElement(CoroutineId) {
   
    override fun updateThreadContext(context: CoroutineContext): String {
        val coroutineName = context[CoroutineName]?.name ?: "coroutine"
        val currentThread = Thread.currentThread()
        val oldName = currentThread.name
        var lastIndex = oldName.lastIndexOf(DEBUG_THREAD_NAME_SEPARATOR)
        if (lastIndex < 0) lastIndex = oldName.length
        currentThread.name = buildString(lastIndex + coroutineName.length + 10) {
            append(oldName.substring(0, lastIndex))
            append(DEBUG_THREAD_NAME_SEPARATOR)
            append(coroutineName)
            append('#')
            append(id)
        }
        return oldName
    }
  
  //...
}

4、CoroutineExceptionHandler

public interface CoroutineExceptionHandler : CoroutineContext.Element {
   
    public companion object Key : CoroutineContext.Key<CoroutineExceptionHandler>

    public fun handleException(context: CoroutineContext, exception: Throwable)
}

CoroutineExceptionHandler就是协程的异样处理器,用来解决协程运行中未捕捉的异样,每一个创立的协程默认都会有一个异样处理器,咱们能够在启动协程时通过CoroutineContext指定咱们自定义的异样处理器,咱们能够通过CoroutineExceptionHandler办法创立一个CoroutineExceptionHandler,它会返回一个CoroutineExceptionHandler的默认实现,默认实现的handleException办法中调用了咱们传进的handler办法:

public inline fun CoroutineExceptionHandler(crossinline handler: (CoroutineContext, Throwable) -> Unit): CoroutineExceptionHandler =
    object : AbstractCoroutineContextElement(CoroutineExceptionHandler), CoroutineExceptionHandler {
        override fun handleException(context: CoroutineContext, exception: Throwable) =
            handler.invoke(context, exception)
    }

CoroutineExceptionHandler只对launch办法启动的根协程无效,而对async启动的根协程有效,因为async启动的根协程默认会捕捉所有未捕捉异样并把它放在Deferred中,等到用户调用Deferred的await办法才抛出,如下:

fun main(){
    //自定义CoroutineExceptionHandler
    val handler = CoroutineExceptionHandler{ coroutineContext, throwable ->
        println("my coroutineExceptionHandler catch exception, msg = ${throwable.message}")
    }

    //handler无效
    val job = GlobalScope.launch(handler){
        throw IndexOutOfBoundsException("exception thrown from launch")
    }
    job.start()

    //handler有效
    val deferred = GlobalScope.async(handler){
        throw NullPointerException("exception thrown from async")
    }
    deferred.start()

    Thread.sleep(1000)
}

输入:

my coroutineExceptionHandler catch exception, msg = exception thrown from launch

其中只有launch启动的根协程抛出的异样才被CoroutineExceptionHandler解决,而对于async启动的根协程抛出的异样CoroutineExceptionHandler有效,须要咱们调用Deferred的await办法时try catch。

还有子协程抛出的未捕捉异样会委托父协程的CoroutineExceptionHandler解决,子协程设置的CoroutineExceptionHandler永远不会失效(SupervisorJob 除外),如下:

fun main(){
    //根协程的Handler
    val parentHandler = CoroutineExceptionHandler{coroutineContext, throwable ->
        println("parent coroutineExceptionHandler catch exception, msg = ${throwable.message}")
    }
    //启动根协程
    val parentJob = GlobalScope.launch(parentHandler){
        //子协程的Handler
        val childHandler = CoroutineExceptionHandler{coroutineContext, throwable ->
            println("child coroutineExceptionHandler catch exception, msg = ${throwable.message}")
        }
        //启动子协程
        val childJob = launch(childHandler){
            throw IndexOutOfBoundsException("exception thrown from child launch")
        }
        childJob.start()
    }
    parentJob.start()
    
    Thread.sleep(1000)
}

输入:
parent coroutineExceptionHandler catch exception, msg = exception thrown from child launch

能够看到子协程设置CoroutineExceptionHandler没有输入,只有根协程的CoroutineExceptionHandler输入了,然而也有例外,如果子协程是SupervisorJob,那么它设置的CoroutineExceptionHandler是失效的,后面也说过SupervisorJob不会产生异样流传。

当父协程的子协程同时抛出多个异样时,CoroutineExceptionHandler只会捕捉第一个协程抛出的异样,后续协程抛出的异样被保留在第一个异样的suppressed数组中,如下:

fun main(){
    val handler = CoroutineExceptionHandler{coroutineContext, throwable ->
        println("my coroutineExceptionHandler catch exception, msg = ${throwable.message}, suppressed = ${throwable.suppressed.contentToString()}")
    }
    val parentJob = GlobalScope.launch(handler){
        launch {
            try {
                delay(200)
            }finally {
                //第二个抛出的异样
                throw IndexOutOfBoundsException("exception thrown from first child launch")
            }
        }.start()

        launch {
            delay(100)
            //第一个抛出的异样
            throw NullPointerException("exception thrown from second child launch")
        }.start()
    }
    parentJob.start()

    Thread.sleep(1000)
}

输入:
my coroutineExceptionHandler catch exception, msg = exception thrown from second child launch, suppressed = [java.lang.IndexOutOfBoundsException: exception thrown from first child launch]

能够看到CoroutineExceptionHandler只解决了第一个子协程抛出的异样,后续异样都放在了第一个抛出异样的suppressed数组中。

还有勾销协程时会抛出一个CancellationException,它会被所有CoroutineExceptionHandler省略,但能够try catch它,同时当子协程抛出CancellationException时,并不会终止以后父协程的运行:

fun main(){
  val handler = CoroutineExceptionHandler{coroutineContext, throwable ->
        println("my coroutineExceptionHandler catch exception, msg = ${throwable.message}")
    }
    val parentJob = GlobalScope.launch(handler){
        val childJob = launch {
            try {
                delay(Long.MAX_VALUE)
            }catch (e: CancellationException){
                println("catch cancellationException thrown from child launch")
                println("rethrow cancellationException")
                throw CancellationException()
            }finally {
                println("child was canceled")
            }
        }
        //勾销子协程
        childJob.cancelAndJoin()
        println("parent is still running")
    }
    parentJob.start()

    Thread.sleep(1000)
}

输入:
catch cancellationException thrown from child launch
rethrow cancellationException
child was canceled
parent is still running

能够看到当抛出CancellationException时,咱们能够try catch住它,同时当咱们再次抛出它时,协程的CoroutineExceptionHandler并没有解决它,同时父协程不受影响,持续运行。

下面就是CoroutineExceptionHandler解决协程异样时的特点。

CoroutineContext的构造

咱们再次看一下CoroutineContext的全家福:

下面解说了组成CoroutineContext的Element,每一个Element都继承自CoroutineContext,而每一个Element都能够通过 + 号来组合,也能够通过相似map的 [key] 来取值,这和CoroutineContext的运算符重载逻辑和它的构造实现CombinedContext无关,咱们先来看一下CoroutineContext类:

public interface CoroutineContext {
   
    //操作符[]重载,能够通过CoroutineContext[Key]这种模式来获取与Key关联的Element
    public operator fun <E : Element> get(key: Key<E>): E?

    //它是一个汇集函数,提供了从left到right遍历CoroutineContext中每一个Element的能力,并对每一个Element做operation操作
    public fun <R> fold(initial: R, operation: (R, Element) -> R): R

    //操作符+重载,能够CoroutineContext + CoroutineContext这种模式把两个CoroutineContext合并成一个
    public operator fun plus(context: CoroutineContext): CoroutineContext
    
    //返回一个新的CoroutineContext,这个CoroutineContext删除了Key对应的Element
    public fun minusKey(key: Key<*>): CoroutineContext
  
    //Key定义,空实现,仅仅做一个标识
    public interface Key<E : Element>

   //Element定义,每个Element都是一个CoroutineContext
    public interface Element : CoroutineContext {
       
          //每个Element都有一个Key实例
        public val key: Key<*>
                
          //...
    }
}

除了plus办法,CoroutineContext中的其余三个办法都被CombinedContext、Element、EmptyCoroutineContext重写,CombinedContext就是CoroutineContext汇合构造的实现,它外面是一个递归定义,Element就是CombinedContext中的元素,而EmptyCoroutineContext就示意一个空的CoroutineContext,它外面是空实现。

1、CombinedContext

咱们先看CombinedContext类:

//CombinedContext只蕴含left和element两个成员:left可能为CombinedContext或Element实例,而element就是Element实例
internal class CombinedContext(
    private val left: CoroutineContext,
    private val element: Element
) : CoroutineContext, Serializable {
  
      //CombinedContext的get操作的逻辑是:
      //1、先看element是否是匹配,如果匹配,那么element就是须要找的元素,返回element,否则阐明要找的元素在left中,持续从left开始找,依据left是CombinedContext还是Element转到2或3
      //2、如果left又是一个CombinedContext,那么反复1
      //3、如果left是Element,那么调用它的get办法返回
      override fun <E : Element> get(key: Key<E>): E? {
        var cur = this
        while (true) {
            //1
            cur.element[key]?.let { return it }
            val next = cur.left
            if (next is CombinedContext) {//2
                cur = next
            } else {//3
                return next[key]
            }
        }
    }

    //CombinedContext的fold操作的逻辑是:先对left做fold操作,把left做完fold操作的的返回后果和element做operation操作
    public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
        operation(left.fold(initial, operation), element)

    //CombinedContext的minusKey操作的逻辑是:
    //1、先看element是否是匹配,如果匹配,那么element就是须要删除的元素,返回left,否则阐明要删除的元素在left中,持续从left中删除对应的元素,依据left是否删除了要删除的元素转到2或3或4
    //2、如果left中不存在要删除的元素,那么以后CombinedContext就不存在要删除的元素,间接返回以后CombinedContext实例就行
    //3、如果left中存在要删除的元素,删除了这个元素后,left变为了空,那么间接返回以后CombinedContext的element就行
    //4、如果left中存在要删除的元素,删除了这个元素后,left不为空,那么组合一个新的CombinedContext返回
    public override fun minusKey(key: Key<*>): CoroutineContext {
          //1
        element[key]?.let { return left }
        val newLeft = left.minusKey(key)
        return when {
            newLeft === left -> this//2
            newLeft === EmptyCoroutineContext -> element//3
            else -> CombinedContext(newLeft, element)//4
        }
    }
  
  //...
}

能够发现CombinedContext中的get、fold、minusKey操作都是递归模式的操作,递归的起点就是当这个left是一个Element,咱们再看Element类:

public interface Element : CoroutineContext {

    public val key: Key<*>
        
    //Element的get办法逻辑:如果key和本人的key匹配,那么本人就是要找的Element,返回本人,否则返回null
    public override operator fun <E : Element> get(key: Key<E>): E? =
        if (this.key == key) this as E else null

    //Element的fold办法逻辑:对传入的initial和本人做operation操作
    public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
        operation(initial, this)

    //Element的minusKey办法逻辑:如果key和本人的key匹配,那么本人就是要删除的Element,返回EmptyCoroutineContext(示意删除了本人),否则阐明本人不须要被删除,返回本人
    public override fun minusKey(key: Key<*>): CoroutineContext =
        if (this.key == key) EmptyCoroutineContext else this
}

当初咱们把CombinedContext和Element联合来看,那么CombinedContext的整体构造如下:

有点像是一个链表,left就是指向下一个结点的指针,有了这个图咱们再从整体看当调用CombinedContext的get、fold、minusKey操作时的拜访程序:get、minusKey操作大体逻辑都是先拜访以后element,不满足,再拜访left的element,程序都是从right到left,而fold的操作大体逻辑是先拜访left,直到递归到最初的element,而后再从left到right的返回,从而拜访了所有的element。

2、CoroutineContext的plus操作

当初咱们来看CoroutineContext惟一没有被重写的办法 – plus办法:

public interface CoroutineContext {
   
    //...
  
    public operator fun plus(context: CoroutineContext): CoroutineContext =
        if (context === EmptyCoroutineContext) this else 
            context.fold(this) { acc, element ->
                val removed = acc.minusKey(element.key)
                if (removed === EmptyCoroutineContext) element else {
                    val interceptor = removed[ContinuationInterceptor]
                    if (interceptor == null) CombinedContext(removed, element) else {
                        val left = removed.minusKey(ContinuationInterceptor)
                        if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
                            CombinedContext(CombinedContext(left, element), interceptor)
                    }
                }
            }
}

这个办法看起来有点简单,为了不便咱们了解,我把它简化一下,我把对ContinuationInterceptor的解决去掉,如下:

public interface CoroutineContext {
   
    //...
  
    public operator fun plus(context: CoroutineContext): CoroutineContext =
      //如果要相加的CoroutineContext为空,那么不做任何解决,间接返回
        if (context === EmptyCoroutineContext) this else 
          //如果要相加的CoroutineContext不为空,那么对它进行fold操作
            context.fold(this) { acc, element -> //咱们能够把acc了解成+号右边的CoroutineContext,element了解成+号左边的CoroutineContext的某一个element
                //首先从右边CoroutineContext中删除左边的这个element
                val removed = acc.minusKey(element.key)
                //如果removed为空,阐明右边CoroutineContext删除了和element雷同的元素后为空,那么返回左边的element即可
                if (removed === EmptyCoroutineContext) element else {
                      //如果removed不为空,阐明右边CoroutineContext删除了和element雷同的元素后还有其余元素,那么结构一个新的CombinedContext返回
                      return CombinedContext(removed, element)
                }
            }
}

plus办法大部分状况最终下返回一个CombinedContext,即咱们把两个CoroutineContext相加后,返回一个CombinedContext,在组合成CombinedContext时,+号左边的CoroutineContext中的元素会笼罩+号右边的CoroutineContext中的含有雷同key的元素,如下:

(Dispatchers.Main, "name") + (Dispatchers.IO) = (Dispatchers.IO, "name")

这个笼罩操作就在fold办法的参数operation代码块中实现,通过minusKey办法删除掉反复元素,后面讲过当调用CombinedContext的fold办法时,会从left到right到拜访所有的element,即会从left到right的把每一个element传入operation办法中,作为operation办法的第二个参数,而operation办法第一个参数acc的初始值为fold办法传入的initial值,而后它会一直的更新,每次更新的值为上一次调用operation办法的返回值,所以当两个CoroutineContext相加时,puls办法能够了解为上面的伪代码:

 val acc = 右边的CoroutineContext
  for(var element in 左边的CoroutineContext){
    acc = operation(acc, element)//operation操作中会让element笼罩掉acc中与element雷同的元素
  }
  return acc//所以plus办法最终返回的CoroutineContext是不存在key雷同的element的

所以puls办法最终返回的CoroutineContext是不存在key雷同的element的,+号左边的CoroutineContext中的元素会笼罩+号右边的CoroutineContext中的含有雷同key的元素,这像是Set的个性。

当初咱们再看回简化前的plus办法,它外面有个对ContinuationInterceptor的解决,目标是让ContinuationInterceptor在每次相加后都能变成CoroutineContext中的最初一个元素, ContinuationInterceptor它也是继承自Element,通常叫做协程上下文拦截器,它的次要作用是在协程执行前拦挡它,从而在协程执行前做出一些其余的操作,后面咱们讲到CoroutineDispatcher它自身也继承自ContinuationInterceptor,ContinuationInterceptor有一个interceptContinuation办法用于返回拦挡协程的行为,而这个行为就是后面咱们所讲到Dispatchers.Unconfined时的DispatchedContinuation,DispatchedContinuation在复原协程前依据协程的CoroutineDispatcher类型做出不同的协程分派行为,通过把ContinuationInterceptor放在最初面,协程在查找上下文的element时,总能最快找到拦截器,防止了递归查找,从而让拦挡行为前置执行。

结语

本文次要介绍了CoroutineContext的元素组成和构造,了解CoroutineContext对于了解协程应用有很大的帮忙,因为协程的启动时就离不开CoroutineContext,同时如果你当前想要更深刻的学习协程,例如协程的创立过程,Continuation概念、suspend关键字等,本篇文章也能给你一个抛砖引玉的成果。

以上就是本文的所有内容,心愿大家有所播种!

相干学习视频举荐:

【Kotlin进阶教程】——Kotlin协程从实战到原理_哔哩哔哩_bilibili

Android进阶零碎学习——一节课搞定Kotlin核心技术_哔哩哔哩_bilibili

【Android 进阶学习训练营】全网最全面Kotlin外围Lambda与高阶全面解说_哔哩哔哩_bilibili

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理