关于android:揭秘kotlin协程中的CoroutineContext

39次阅读

共计 28793 个字符,预计需要花费 72 分钟才能阅读完成。

前言

从 kotlin1.1 开始,协程就被增加到 kotlin 中作为试验性功能,直到 kotlin1.3,协程在 kotlin 中的 api 曾经根本稳定下来了,当初 kotlin 曾经公布到了 1.4,为协程增加了更多的性能并进一步欠缺了它,所以咱们当初在 kotlin 代码中能够释怀的引入 kotlin 协程并应用它,其实协程并不是 kotlin 独有的性能,它是一个宽泛的概念,合作式多任务的实现,除了 kotlin 外,很多语言如 Go、Python 等都通过本人的形式实现了协程,本文浏览前心愿你曾经晓得如何应用 kotlin 协程,如果不相熟能够浏览一下官网文档:

kotlin coroutines guide

Coroutine 的简略了解

提到协程,很对人会把它和线程进行比拟,就像提到线程,很多人会把它和过程进行比拟,线程和过程别离是操作系统中的 CPU 调度单位和资源划分单位,它们在操作系统中有专门的数据结构代表,而协程在操作系统中没有专门的数据结构代表,所以协程并不是由操作系统创立和调度,它而是 由程序本人创立和调度,因为不须要操作系统调度,所以协程比线程更加的轻量,切换协程比切换线程的开销更小,即它的上下文切换比线程更快,因为操作系统切换线程时个别都会波及到用户态内核态的转换,这是一个开销绝对较大的操作。

协程的实现依赖于线程,它不能脱离线程而存在,因为线程才是 CPU 调度的根本单位,协程通过程序的调度能够执行在 一个或多个线程之中 ,所以协程须要运行于线程之中,因为协程是由程序本人调度的,所以程序就须要实现调度逻辑,不同语言的调度的实现不一样,在 kotlin 中,通过Dispatcher 来调度协程,而 Dispatcher 它通常是一个线程池的实现或者基于特定平台 (例如 Android) 主线程的实现,通过调度让协程运行于一个或多个线程之中,这些协程能够在同一线程的不同时刻被执行,也能够在不同线程上的不同时刻被执行。

协程能够说是编程语言的能力,是下层的能力,它并不需要操作系统和硬件的反对,是编程语言为了让开发者更容易写出 合作式工作 的代码,而封装的一种任务调度能力,所以协程通常是蕴含一段 特定逻辑的代码块 ,多个协程之间就组合成一段具备 特定逻辑的代码流程 ,这些编程语言为了让开发者更不便的应用协程,它通常会提供一些关键字,而这些关键字会通过编译器主动生成了一些反对型代码,例如 kotlin 中的suspend 关键字,对于 suspend 润饰的办法,编译器会办法生成一些额定的代码。

下面就是我对协程的简略了解,总的来说:协程须要线程的承载运行,协程须要程序本人实现调度,协程让你更容易写出合作式工作。

Coroutine 的简略应用

fun main(){val scope = CoroutineScope(CoroutineName("Coroutine-Name") + Dispatchers.IO)
    val job = scope.launch(start = CoroutineStart.DEFAULT){println("hello world")
    }
    // 过程保活 1s,只有过程存活的前提下,协程能力会启动和执行
    Thread.sleep(1000)
}

下面首先结构了一个 CoroutineScope,它是协程的作用域,用于管制协程的生命周期,结构 CoroutineScope 须要一个CoroutineContext,它是协程的上下文,用于提供协程启动和运行时须要的信息,这是咱们前面须要重点介绍的,最初通过 CoroutineScope 的 launch 办法启动协程并输入 hello world,其中启动协程时能够通过CoroutineStart 指定协程的启动模式,它是一个枚举值,默认是立刻启动,也通过指定 CoroutineStart.LAZY 变为提早启动,提早启动须要你被动调用返回的 Job 对象的 start 办法后协程才会启动,如果咱们想勾销掉这个协程的执行就能够调用 CoroutineScope 的 cancel 办法,或者调用 launch 办法返回的 Job 对象的 cancel 办法,其实 CoroutineScope 的 cancel 办法外部也是调用返回的 Job 对象的 cancel 办法来完结这个协程。

下面就是启动一个协程的简略步骤,须要用到 CoroutineScope、CoroutineContext、CoroutineStart。

通过自定义 CoroutineScope,能够在应用程序的某一个档次开启或者管制协程的生命周期,例如 Android,在 ViewModel 和 Lifecycle 类的生命周期里提供了 CoroutineScope,别离是 ViewModelScope 和 LifecycleScope.

CoroutineContext 的元素

结构 CoroutineScope 应用到的 CoroutineContext 是一个非凡的汇合,这个汇合它既有 Map 的特点,也有 Set 的特点,汇合的每一个元素都是 Element,每个 Element 都有一个 Key 与之对应,对于雷同 Key 的 Element 是不能够反复存在的,Element 之间能够通过 + 号组合起来,前面我会具体介绍 CoroutineContext 这个非凡汇合的构造,接下来我先简略解说一下组成 CoroutineContext 的各个 Element 的作用,CoroutineContext 次要由以下 4 个 Element 组成:

  • Job:协程的惟一标识,用来管制协程的生命周期(new、active、completing、completed、cancelling、cancelled);
  • CoroutineDispatcher:指定协程运行的线程(IO、Default、Main、Unconfined);
  • CoroutineName: 指定协程的名称,默认为 coroutine;
  • CoroutineExceptionHandler: 指定协程的异样处理器,用来解决未捕捉的异样.

它们之间的关系如下:

上面别离介绍一下 4 个 Element 各自的作用:

1、Job

public interface Job : CoroutineContext.Element {
   
    public companion object Key : CoroutineContext.Key<Job> {
        init {CoroutineExceptionHandler}
    }
  
    public val isActive: Boolean

    public val isCompleted: Boolean

    public val isCancelled: Boolean

    public fun start(): Boolean

    public fun cancel(cause: CancellationException? = null)
  
    public suspend fun join()

    public val children: Sequence<Job>

    //...
}

通过 CoroutineScope 的扩大办法 launch 启动一个协程后,它会返回一个 Job 对象,它是协程的惟一标识,这个 Job 对象蕴含了这个协程工作的一系列状态,如下:

当一个协程 创立 后它就处于新建 (New) 状态,当调用 Job 的 start/join 办法后协程就处于沉闷 (Active) 状态,这是运行状态,协程运行 出错 或者调用 Job 的 cancel 办法都会将以后协程置为勾销中 (Cancelling) 状态, 处于勾销中状态的协程会等所有子协程 都实现 后才进入勾销 (Cancelled)状态,当协程执行 实现 后或者调用 CompletableJob(CompletableJob 是 Job 的一个子接口)的 complete 办法都会让以后协程进入实现中 (Completing) 状态,处于实现中状态的协程会等所有子协程 都实现 后才进入实现 (Completed) 状态。

尽管协程有 New、Cancelling、Completing 状态,然而内部是无奈感知这三个状态的,Job 只提供了 isActive、isCancelled、isCompleted 属性来供内部判断协程是否处于 Active、Cancelled、Completed 状态,当协程处于 Active 状态时,isActive 为 true,isCancelled 和 isCompleted 为 false,当协程处于 Cancelled 状态时,isCancelled 和 isCompleted 为 true,isActive 为 false,当协程处于 Completed 状态时,isCompleted 为 true,isActive 和 isCancelled 为 false。

协程中有两种类型的 Job,如果咱们平时启动协程时没有特意地通过 CoroutineContext 指定一个 Job,那么应用 launch/async 办法启动协程时返回的 Job 它会产生 异样流传 ,咱们晓得协程有一个父子的概念,例如启动一个协程 1,在协程中持续启动协程 2、协程 3,那么协程 1 就是协程 2、协程 3 的父协程,协程 2、协程 3 就是协程 1 的子协程,每个协程都会有一个对应的 Job, 协程之间的父子关系是通过 Job 对象维持的,像一颗树一样:

所以异样流传就是这个 Job 因为除了 CancellationException 以外的异样而失败时,那么父 Job 就会感知到并抛出异样,在抛出异样之前,父 Job 会勾销所有子 Job 的运行,这也是结构化编程的一个特点,如果要克制这种异样流传的行为,那么能够用到另外一种类型的 Job – SupervisorJob,SupervisorJob 它不是一个类,它是一个构造方法:

public fun SupervisorJob(parent: Job? = null) : CompletableJob = SupervisorJobImpl(parent)

SupervisorJob 办法会返回 CompletableJob 的一个 supervisor 实现,CompletableJob 是 Job 的一个子接口,它比 Job 接口多了一个 complete 办法,这意味着它能够调用 complete 办法让协程工作进入实现状态,supervisor 实现的意思是这个 Job 它不会产生异样流传,每个 Job 能够独自被治理,当 SupervisorJob 因为除了 CancellationException 以外的异样而失败时,并不会影响到父 Job 和其余子 Job,上面是 SupervisorJob 的一个应用例子:

fun main(){
     val parentJob = GlobalScope.launch {
       //childJob 是一个 SupervisorJob
        val childJob = launch(SupervisorJob()){throw NullPointerException()
        }
        childJob.join()
        println("parent complete")
    }
    Thread.sleep(1000)
}

childJob 抛出异样并不会影响 parentJob 的运行,parentJob 会持续运行并输入 parent complete。

2、CoroutineDispatcher

public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
  
  public companion object Key : AbstractCoroutineContextKey<ContinuationInterceptor, CoroutineDispatcher>(
    ContinuationInterceptor,
    {it as? CoroutineDispatcher}
  )
  
  public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true

  public abstract fun dispatch(context: CoroutineContext, block: Runnable)
  
  //...
}

CoroutineDispatcher 能够指定协程的运行线程,CoroutineDispatcher 外面有一个 dispatch 办法,这个 dispatch 办法用于把协程工作分派到特定线程运行,kotlin 曾经内置了 CoroutineDispatcher 的 4 个实现,能够通过 Dispatchers 的 Default、IO、Main、Unconfined 字段别离返回应用,如下:

public actual object Dispatchers {
    @JvmStatic
    public actual val Default: CoroutineDispatcher = createDefaultDispatcher()
  
     @JvmStatic
    public val IO: CoroutineDispatcher = DefaultScheduler.IO

    @JvmStatic
    public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined
  
    @JvmStatic
    public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher}

2.1、Default、IO

Dispatchers.Default 和 Dispatchers.IO 外部都是线程池实现,它们的含意是把协程运行在共享的线程池中,咱们先看 Dispatchers.Default 的实现,看 createDefaultDispatcher 办法:

internal actual fun createDefaultDispatcher(): CoroutineDispatcher = if (useCoroutinesScheduler) DefaultScheduler else CommonPool

DefaultScheduler 和 CommonPool 都是 CoroutineDispatcher 的子类,不同的是 DefaultScheduler 外部依赖的是 kotlin 本人实现的线程池逻辑,而 CommonPool 外部依赖的是 java 类库中的 Executor,默认状况下 useCoroutinesScheduler 为 true,所以 createDefaultDispatcher 办法返回的是 DefaultScheduler 实例,咱们看一下这个 DefaultScheduler:

internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
    val IO: CoroutineDispatcher = LimitingDispatcher(
        this,//DefaultScheduler 实例被传进了 LimitingDispatcher 中
        systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)),
        "Dispatchers.IO",
        TASK_PROBABLY_BLOCKING
    )
  
      //...
}

DefaultScheduler 中的 IO 字段就是 Dispatchers.IO,它是 LimitingDispatcher 实例,所以 Dispatchers.IO 的实现是 LimitingDispatcher,同时咱们要留神到 DefaultScheduler 是用 object 字段润饰,这阐明它是一个单例,并且 DefaultScheduler 实例被传进了 LimitingDispatcher 的构造方法中,所以 LimitingDispatcher 就会持有 DefaultScheduler 实例,而 DefaultScheduler 它的次要实现都在它的父类 ExperimentalCoroutineDispatcher 中:

@InternalCoroutinesApi
public open class ExperimentalCoroutineDispatcher(
    private val corePoolSize: Int,
    private val maxPoolSize: Int,
    private val idleWorkerKeepAliveNs: Long,
    private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
  
    public constructor(
        corePoolSize: Int = CORE_POOL_SIZE,
        maxPoolSize: Int = MAX_POOL_SIZE,
        schedulerName: String = DEFAULT_SCHEDULER_NAME
    ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName)

    private var coroutineScheduler = createScheduler()
  
   // 返回 CoroutineScheduler 实例
    private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)

    override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
        try {
           //dispatch 办法委托给了 CoroutineScheduler 的 dispatch 办法
            coroutineScheduler.dispatch(block)
        } catch (e: RejectedExecutionException) {//...}
  
  internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean) {
        try {
           //dispatchWithContext 办法委托给了 CoroutineScheduler 的 dispatch 办法
            coroutineScheduler.dispatch(block, context, tailDispatch)
        } catch (e: RejectedExecutionException) {//...}
    }
  //...
}

咱们再看 Dispatchers.IO 对应的 LimitingDispatcher 实现:

private class LimitingDispatcher(
    private val dispatcher: ExperimentalCoroutineDispatcher,// 内部传进的 DefaultScheduler 实例
    private val parallelism: Int,
    private val name: String?,
    override val taskMode: Int
) : ExecutorCoroutineDispatcher(), TaskContext, Executor {private val queue = ConcurrentLinkedQueue<Runnable>()
    private val inFlightTasks = atomic(0)

    override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false)

    private fun dispatch(block: Runnable, tailDispatch: Boolean) {
        var taskToSchedule = block
        while (true) {val inFlight = inFlightTasks.incrementAndGet()

          if (inFlight <= parallelism) {
                //LimitingDispatcher 的 dispatch 办法委托给了 DefaultScheduler 的 dispatchWithContext 办法
                dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch)
                return
            }

            queue.add(taskToSchedule)

            if (inFlightTasks.decrementAndGet() >= parallelism) {return}

            taskToSchedule = queue.poll() ?: return}
    }
  
  //...
}

从下面剖析得悉,Dispatchers.Default 的实现是 DefaultScheduler,Dispatchers.IO 的实现是LimitingDispatcher,而 LimitingDispatcher 持有 DefaultScheduler 实例,把 dispatch 操作委托给 DefaultScheduler,DefaultScheduler 外部持有CoroutineScheduler 实例,把 dispatch 操作委托给 CoroutineScheduler,而 DefaultScheduler 又是一个单例,所以 Dispatchers.Default 和 Dispatchers.IO 它们 共用 同一个 CoroutineScheduler 实例,它们之间的关系如下:

CoroutineScheduler就是 kotlin 本人实现的共享线程池,是 Dispatchers.Default 和 Dispatchers.IO 外部的独特实现,Dispatchers.Default 和 Dispatchers.IO 共享 CoroutineScheduler 中的线程,DefaultScheduler 和 LimitingDispatcher 的次要作用是对 CoroutineScheduler 进行线程数、工作数等配置,CoroutineScheduler 应用 工作窃取算法 (Work Stealing) 从新实现了一套线程池的任务调度逻辑,它的性能、扩展性对协程的任务调度更敌对,具体的逻辑能够查看这个类的 dispatch 办法:

internal class CoroutineScheduler(
    @JvmField val corePoolSize: Int,
    @JvmField val maxPoolSize: Int,
    @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {override fun execute(command: Runnable) = dispatch(command)
  
      fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {val task = createTask(block, taskContext)
        //...
        if (task.mode == TASK_NON_BLOCKING) {//...} else {//...}
      }

          //...
}

所以这个线程池既能够运行两种类型的工作:CPU 密集型工作和 IO 密集型工作,用一个 mode 来区别,当你为协程指定 Dispatchers.Default 时,Dispatcher 会把协程的工作指定为 CPU 密集型工作,对应 mode 为TASK\_NON\_BLOCKING,当你为协程指定 Dispatchers.IO 时,Dispatcher 会把协程的工作指定为 IO 密集型工作,对应 mode 为TASK\_PROBABLY\_BLOCKING,所以这时 CoroutineScheduler 就能够依据 task mode 作出不同的线程创立、调度、唤醒策略,当启动协程时没有指定 Dispatcher,默认会应用 Dispatchers.Default。

当运行 CPU 密集型工作时,CoroutineScheduler 最多有 corePoolSize 个线程被创立,corePoolSize 它的取值为 max(2, CPU 外围数),即它会尽量的等于 CPU 外围数,当运行 IO 密集型工作时,它能够创立比 corePoolSize 更多的线程来运行 IO 型工作,但不能大于 maxPoolSize,maxPoolSize 会取一个很大的值,默认为 max(corePoolSize, min(CPU 外围数 * 128, 2^21 – 2)),即大于 corePoolSize,小于 2^21 – 2,而 2^21 – 2 是一个很大的数约为 2M,然而 CoroutineScheduler 是不可能创立这么多线程的,所以就须要内部限度提交的工作数,而 Dispatchers.IO 结构时就通过LimitingDispatcher 默认限度了最大线程并发数 parallelism 为max(64, CPU 外围数),即 Dispatchers.IO 最多只能提交 parallelism 个工作到 CoroutineScheduler 中执行,残余的工作被放进一个队列中期待。

CPU 密集型工作:CPU 密集型工作的特点是执行工作时 CPU 会处于繁忙状态,工作会耗费大量的 CPU 资源,例如计算简单的算术、视频解码等,如果此时线程数太多,超过了 CPU 外围数,那么这些超出来的线程是得不到 CPU 的执行的,只会节约内存资源,因为线程自身也有栈等空间,同时线程过多,频繁的线程切换带来的耗费也会影响线程池的性能,所以对于 CPU 密集型工作,线程池并发线程数等于 CPU 外围数能力让 CPU 的执行效率最大化;

IO 密集型工作:IO 密集型工作的特点是执行工作时 CPU 会处于闲置状态,工作不会耗费大量的 CPU 资源,例如网络申请、IO 操作等,线程执行 IO 密集型工作时大多数处于阻塞状态,处于阻塞状态的线程是不占用 CPU 的执行工夫,这时 CPU 就处于闲置状态,为了让 CPU 忙起来,执行 IO 密集型工作时理当让线程的创立数量更多一点,现实状况下线程数应该等于提交的工作数,对于这些多创立进去的线程,当它们闲置时,线程池个别会有一个超时回收策略,所以大部分状况下并不会占用大量的内存资源,但也会有极其状况,所以对于 IO 密集型工作,线程池并发线程数应尽可能地多能力进步 CPU 的吞吐量,这个尽可能地多的水平并不是无限大,而是依据业务状况设定,但必定要大于 CPU 外围数。

2.2、Unconfined

Dispatchers.Unconfined 的含意是不给协程指定运行的线程,在第一次被挂起 (suspend) 之前,由启动协程的线程执行它,但被挂起后, 会由复原协程的线程继续执行, 如果一个协程会被挂起屡次, 那么每次被复原后, 都有可能被不同线程继续执行,看上面的一个例子:

fun main(){GlobalScope.launch(Dispatchers.Unconfined){println(Thread.currentThread().name)

        // 挂起
        withContext(Dispatchers.IO){println(Thread.currentThread().name)
        }
        // 复原
        println(Thread.currentThread().name)

        // 挂起
        withContext(Dispatchers.Default){println(Thread.currentThread().name)
        }
        // 复原
        println(Thread.currentThread().name)
    }

    // 过程保活
    Thread.sleep(1000)
}

运行输入:main
DefaultDispatcher-worker-1
DefaultDispatcher-worker-1
DefaultDispatcher-worker-3
DefaultDispatcher-worker-3

协程启动时指定了 Dispatchers.Unconfined,所以第一次执行时是由启动协程的线程执行,下面在主线程中启动了协程,所以第一次输入主线程 main,withContext 办法是一个 suspend 办法,它能够挂起以后协程,并把指定的代码块运行到给定的上下文中,直到代码块运行实现并返回后果,第一个代码块通过 withContext 办法把它运行在 Dispatchers.IO 中,所以第二次输入了线程池中的某一个线程 DefaultDispatcher-worker-1,第一个代码块执行结束后,协程在 DefaultDispatcher-worker- 1 线程中复原,所以协程复原后执行在 DefaultDispatcher-worker- 1 线程中,所以第三次持续输入 DefaultDispatcher-worker-1,第二个代码块同理。

那么 Dispatchers.Unconfined 是怎么做到的呢,咱们看下 Unconfined 对应的 CoroutineDispatcher 实现 – kotlinx.coroutines.Unconfined:

internal object Unconfined : CoroutineDispatcher() {override fun isDispatchNeeded(context: CoroutineContext): Boolean = false

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        // It can only be called by the "yield" function. See also code of "yield" function.
        val yieldContext = context[YieldContext]
        if (yieldContext != null) {// report to "yield" that it is an unconfined dispatcher and don't call"block.run()"
            yieldContext.dispatcherWasUnconfined = true
            return
        }
        throw UnsupportedOperationException("Dispatchers.Unconfined.dispatch function can only be used by the yield function." +
            "If you wrap Unconfined dispatcher in your code, make sure you properly delegate" +
            "isDispatchNeeded and dispatch calls.")
    }
}

Unconfined 他重写了 CoroutineDispatcher 的 isDispatchNeeded 办法和 dispatch 办法,isDispatchNeeded 办法返回了 false,示意不须要 dispatch,而默认 CoroutineDispatcher 的 isDispatchNeeded 办法是返回 true 的,Dispatchers.Default 和 Dispatchers.IO 都没有重写这个办法,Unconfined 的 dispatch 办法没有任何任务调度的逻辑,只是写明了只有当调用 yield 办法时,Unconfined 的 dispatch 办法才会被调用,yield 办法是一个 suspend 办法,当在协程中调用这个办法时示意以后协程让出本人所在的线程给其余协程运行,所以失常状况下是不会调用 Unconfined 的 dispatch 办法的。

在 kotlin 中每个协程都有一个 Continuation 实例与之对应,当协程 复原 时会调用 Continuation 的 resumeWith 办法,它的实现在 DispatchedContinuation 中,如下:

internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,// 协程的的 CoroutineDispatcher 实例
    @JvmField val continuation: Continuation<T>// 代表协程的 Continuation 实例
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
  
  //...
  
  override fun resumeWith(result: Result<T>) {
        val context = continuation.context
        val state = result.toState()
        if (dispatcher.isDispatchNeeded(context)) {
            _state = state
            resumeMode = MODE_ATOMIC
            dispatcher.dispatch(context, this)
        } else {//Unconfined 走这里的逻辑
           // 调用 executeUnconfined 办法
            executeUnconfined(state, MODE_ATOMIC) {withCoroutineContext(this.context, countOrElement) {
                   // 调用 Continuation 的 resumeWith 办法
                    continuation.resumeWith(result)
                }
            }
        }
    }
}

咱们留神到 by 关键字,这是 kotlin 中的委托实现,DispatchedContinuation 通过 类委托增强 了 Continuation 的 resumeWith 办法,即在调用 Continuation 的 resumeWith 办法之前减少了一些本人的逻辑,咱们能够看到 DispatchedContinuation 的 resumeWith 办法中会依据 CoroutineDispatcher 的 isDispatchNeeded 办法返回值做出不同解决,当 isDispatchNeeded 办法返回 true 时,会调用协程的 CoroutineDispatcher 的 dispatch 办法,而当 isDispatchNeeded 办法返回 false 时,不会调用 CoroutineDispatcher 的 dispatch 办法而是调用 executeUnconfined 办法,下面讲到 Unconfined 的 isDispatchNeeded 办法返回了 false,咱们看 executeUnconfined 办法:

private inline fun DispatchedContinuation<*>.executeUnconfined(
    contState: Any?,
      mode: Int, 
      doYield: Boolean = false,
    block: () -> Unit): Boolean {assert { mode != MODE_UNINITIALIZED}
   // 从 ThreadLocal 中取出 EventLoop
    val eventLoop = ThreadLocalEventLoop.eventLoop
    if (doYield && eventLoop.isUnconfinedQueueEmpty) return false
   // 判断是否在执行 Unconfined 工作
    return if (eventLoop.isUnconfinedLoopActive) {
        _state = contState
        resumeMode = mode
          // 调用 EventLoop 的 dispatchUnconfined 办法把 Unconfined 工作放进 EventLoop 中
        eventLoop.dispatchUnconfined(this)
        true 
    } else {
          // 执行 Unconfined 工作
        runUnconfinedEventLoop(eventLoop, block = block)
        false
    }
}

internal inline fun DispatchedTask<*>.runUnconfinedEventLoop(
    eventLoop: EventLoop,
    block: () -> Unit) {eventLoop.incrementUseCount(unconfined = true)
    try {// 先执行 block 代码块,block()就是 executeUnconfined 办法传进的代码块, block()外面会调用 Continuation 的 resumeWith 办法
        block()
        while (true) {
           // 再调用 EventLoop 的 processUnconfinedEvent 办法执行 EventLoop 中的 Unconfined 工作,直到 EventLoop 中的所有 Unconfined 工作执行完才跳出循环
            if (!eventLoop.processUnconfinedEvent()) break
        }
    } catch (e: Throwable) {//...} finally {eventLoop.decrementUseCount(unconfined = true)
    }
}

能够看到对于 Unconfined 工作,是在以后线程马上执行或者通过以后线程的 EventLoop 来执行的,EventLoop 是寄存在 ThreadLocal 中的,所以 EventLoop 它是跟以后线程相关联的,而 EventLoop 也是 CoroutineDispatcher 的一个子类:

internal abstract class EventLoop : CoroutineDispatcher() {
  
  //...
  
  private var unconfinedQueue: ArrayQueue<DispatchedTask<*>>? = null
  
  public fun dispatchUnconfined(task: DispatchedTask<*>) {val queue = unconfinedQueue ?: ArrayQueue<DispatchedTask<*>>().also {unconfinedQueue = it}
        queue.addLast(task)
  }
  
  public fun processUnconfinedEvent(): Boolean {
        val queue = unconfinedQueue ?: return false
        val task = queue.removeFirstOrNull() ?: return false
        task.run()
        return true
  }
}

EventLoop 中有一个双端队列用于寄存 Unconfined 工作,Unconfined 工作是指指定了 Dispatchers.Unconfined 的协程工作,EventLoop 的 dispatchUnconfined 办法用于把 Unconfined 工作放进队列的尾部,processUnconfinedEvent 办法用于从队列的头部移出 Unconfined 工作执行,所以 executeUnconfined 办法外面的策略就是:在以后线程立刻执行 Unconfined 工作,如果以后线程曾经在执行 Unconfined 工作,就临时把它放进跟以后线程关联的 EventLoop 中,期待执行,同时 Unconfined 工作外面会调用 Continuation 的 resumeWith 办法复原协程运行,这也是为什么指定了 Dispatchers.Unconfined 后协程复原可能被复原协程的线程执行的起因。

2.3、Main

Dispatchers.Main 的含意是把协程运行在平台相干的只能操作 UI 对象的 Main 线程,所以它依据不同的平台有不同的实现,kotlin 它反对上面三种平台:

  • kotlin/js:kotlin/js 是 kotlin 对 JavaScript 的反对,提供了转换 kotlin 代码,kotlin 规范库的能力,npm 包治理能力,在 kotlin/js 上 Dispatchers.Main 等效于 Dispatchers.Default;
  • kotlin/native:kotlin/native 是一种将 kotlin 代码编译为无需虚拟机就可运行的原生二进制文件的技术, 它的次要目标是容许对不须要或不可能应用虚拟机的平台进行编译,例如嵌入式设施或 iOS,在 kotlin/native 上 Dispatchers.Main 等效于 Dispatchers.Default;
  • kotlin/JVM:kotlin/JVM 就是须要虚拟机能力编译的平台,例如 Android 就是属于 kotlin/JVM,对于 kotlin/JVM 咱们须要引入对应的 dispatcher,例如 Android 就须要引入 kotlinx-coroutines-android 库,它外面有 Android 对应的 Dispatchers.Main 实现,其实就是把工作通过 Handler 运行在 Android 的主线程.

咱们再看 Dispatchers.Main 的实现 – MainDispatcherLoader.dispatcher:

internal object MainDispatcherLoader {
    
    @JvmField
    val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()

    private fun loadMainDispatcher(): MainCoroutineDispatcher {//... 次要是通过反射加载实现了 MainCoroutineDispatcher 的类}
}

所以 Dispatchers.Main 的 CoroutineDispatcher 实现是 MainCoroutineDispatcher,MainCoroutineDispatcher 的具体实现就因平台的不同而不同了,如果你间接应用 Dispatchers.Main 而没有引入对应的库就会引发 IllegalStateException 异样。

3、CoroutineName

public data class CoroutineName(val name: String) : AbstractCoroutineContextElement(CoroutineName) {
   
    public companion object Key : CoroutineContext.Key<CoroutineName>

    override fun toString(): String = "CoroutineName($name)"
}

CoroutineName 就是协程的名字,它的构造很简略, 咱们平时开发个别是不会去指定一个 CoroutineName 的,因为 CoroutineName 只在 kotlin 的调试模式下才会被用的, 它在 debug 模式下被用于设置协程运行线程的名字:

internal data class CoroutineId(val id: Long) : ThreadContextElement<String>, AbstractCoroutineContextElement(CoroutineId) {override fun updateThreadContext(context: CoroutineContext): String {val coroutineName = context[CoroutineName]?.name ?: "coroutine"
        val currentThread = Thread.currentThread()
        val oldName = currentThread.name
        var lastIndex = oldName.lastIndexOf(DEBUG_THREAD_NAME_SEPARATOR)
        if (lastIndex < 0) lastIndex = oldName.length
        currentThread.name = buildString(lastIndex + coroutineName.length + 10) {append(oldName.substring(0, lastIndex))
            append(DEBUG_THREAD_NAME_SEPARATOR)
            append(coroutineName)
            append('#')
            append(id)
        }
        return oldName
    }
  
  //...
}

4、CoroutineExceptionHandler

public interface CoroutineExceptionHandler : CoroutineContext.Element {
   
    public companion object Key : CoroutineContext.Key<CoroutineExceptionHandler>

    public fun handleException(context: CoroutineContext, exception: Throwable)
}

CoroutineExceptionHandler 就是协程的异样处理器,用来解决协程运行中未捕捉的异样,每一个创立的协程默认都会有一个异样处理器,咱们能够在启动协程时通过 CoroutineContext 指定咱们自定义的异样处理器,咱们能够通过 CoroutineExceptionHandler 办法创立一个 CoroutineExceptionHandler,它会返回一个 CoroutineExceptionHandler 的默认实现,默认实现的 handleException 办法中调用了咱们传进的 handler 办法:

public inline fun CoroutineExceptionHandler(crossinline handler: (CoroutineContext, Throwable) -> Unit): CoroutineExceptionHandler =
    object : AbstractCoroutineContextElement(CoroutineExceptionHandler), CoroutineExceptionHandler {override fun handleException(context: CoroutineContext, exception: Throwable) =
            handler.invoke(context, exception)
    }

CoroutineExceptionHandler 只对 launch 办法启动的根协程无效,而对 async 启动的根协程有效,因为 async 启动的 根协程 默认会捕捉所有未捕捉异样并把它放在 Deferred 中,等到用户调用 Deferred 的 await 办法才抛出,如下:

fun main(){
    // 自定义 CoroutineExceptionHandler
    val handler = CoroutineExceptionHandler{ coroutineContext, throwable ->
        println("my coroutineExceptionHandler catch exception, msg = ${throwable.message}")
    }

    //handler 无效
    val job = GlobalScope.launch(handler){throw IndexOutOfBoundsException("exception thrown from launch")
    }
    job.start()

    //handler 有效
    val deferred = GlobalScope.async(handler){throw NullPointerException("exception thrown from async")
    }
    deferred.start()

    Thread.sleep(1000)
}

输入:my coroutineExceptionHandler catch exception, msg = exception thrown from launch

其中只有 launch 启动的根协程抛出的异样才被 CoroutineExceptionHandler 解决,而对于 async 启动的根协程抛出的异样 CoroutineExceptionHandler 有效,须要咱们调用 Deferred 的 await 办法时 try catch。

还有子协程抛出的未捕捉异样会委托父协程的 CoroutineExceptionHandler 解决,子协程设置的 CoroutineExceptionHandler 永远不会失效(SupervisorJob 除外),如下:

fun main(){
    // 根协程的 Handler
    val parentHandler = CoroutineExceptionHandler{coroutineContext, throwable ->
        println("parent coroutineExceptionHandler catch exception, msg = ${throwable.message}")
    }
    // 启动根协程
    val parentJob = GlobalScope.launch(parentHandler){
        // 子协程的 Handler
        val childHandler = CoroutineExceptionHandler{coroutineContext, throwable ->
            println("child coroutineExceptionHandler catch exception, msg = ${throwable.message}")
        }
        // 启动子协程
        val childJob = launch(childHandler){throw IndexOutOfBoundsException("exception thrown from child launch")
        }
        childJob.start()}
    parentJob.start()
    
    Thread.sleep(1000)
}

输入:parent coroutineExceptionHandler catch exception, msg = exception thrown from child launch

能够看到子协程设置 CoroutineExceptionHandler 没有输入,只有根协程的 CoroutineExceptionHandler 输入了,然而也有例外,如果子协程是SupervisorJob,那么它设置的 CoroutineExceptionHandler 是失效的,后面也说过 SupervisorJob 不会产生异样流传。

当父协程的子协程同时抛出多个异样时,CoroutineExceptionHandler 只会捕捉第一个协程抛出的异样,后续协程抛出的异样被保留在第一个异样的 suppressed 数组中,如下:

fun main(){
    val handler = CoroutineExceptionHandler{coroutineContext, throwable ->
        println("my coroutineExceptionHandler catch exception, msg = ${throwable.message}, suppressed = ${throwable.suppressed.contentToString()}")
    }
    val parentJob = GlobalScope.launch(handler){
        launch {
            try {delay(200)
            }finally {
                // 第二个抛出的异样
                throw IndexOutOfBoundsException("exception thrown from first child launch")
            }
        }.start()

        launch {delay(100)
            // 第一个抛出的异样
            throw NullPointerException("exception thrown from second child launch")
        }.start()}
    parentJob.start()

    Thread.sleep(1000)
}

输入:my coroutineExceptionHandler catch exception, msg = exception thrown from second child launch, suppressed = [java.lang.IndexOutOfBoundsException: exception thrown from first child launch]

能够看到 CoroutineExceptionHandler 只解决了第一个子协程抛出的异样,后续异样都放在了第一个抛出异样的 suppressed 数组中。

还有勾销协程时会抛出一个 CancellationException,它会被所有 CoroutineExceptionHandler 省略,但能够 try catch 它,同时当子协程抛出 CancellationException 时,并不会终止以后父协程的运行:

fun main(){
  val handler = CoroutineExceptionHandler{coroutineContext, throwable ->
        println("my coroutineExceptionHandler catch exception, msg = ${throwable.message}")
    }
    val parentJob = GlobalScope.launch(handler){
        val childJob = launch {
            try {delay(Long.MAX_VALUE)
            }catch (e: CancellationException){println("catch cancellationException thrown from child launch")
                println("rethrow cancellationException")
                throw CancellationException()}finally {println("child was canceled")
            }
        }
        // 勾销子协程
        childJob.cancelAndJoin()
        println("parent is still running")
    }
    parentJob.start()

    Thread.sleep(1000)
}

输入:catch cancellationException thrown from child launch
rethrow cancellationException
child was canceled
parent is still running

能够看到当抛出 CancellationException 时,咱们能够 try catch 住它,同时当咱们再次抛出它时,协程的 CoroutineExceptionHandler 并没有解决它,同时父协程不受影响,持续运行。

下面就是 CoroutineExceptionHandler 解决协程异样时的特点。

CoroutineContext 的构造

咱们再次看一下 CoroutineContext 的全家福:

下面解说了组成 CoroutineContext 的 Element,每一个 Element 都继承自 CoroutineContext,而每一个 Element 都能够通过 + 号来组合,也能够通过相似 map 的 [key] 来取值,这和 CoroutineContext 的运算符重载逻辑和它的构造实现 CombinedContext 无关,咱们先来看一下 CoroutineContext 类:

public interface CoroutineContext {// 操作符 [] 重载,能够通过 CoroutineContext[Key]这种模式来获取与 Key 关联的 Element
    public operator fun <E : Element> get(key: Key<E>): E?

    // 它是一个汇集函数,提供了从 left 到 right 遍历 CoroutineContext 中每一个 Element 的能力,并对每一个 Element 做 operation 操作
    public fun <R> fold(initial: R, operation: (R, Element) -> R): R

    // 操作符 + 重载,能够 CoroutineContext + CoroutineContext 这种模式把两个 CoroutineContext 合并成一个
    public operator fun plus(context: CoroutineContext): CoroutineContext
    
    // 返回一个新的 CoroutineContext,这个 CoroutineContext 删除了 Key 对应的 Element
    public fun minusKey(key: Key<*>): CoroutineContext
  
    //Key 定义,空实现,仅仅做一个标识
    public interface Key<E : Element>

   //Element 定义,每个 Element 都是一个 CoroutineContext
    public interface Element : CoroutineContext {
       
          // 每个 Element 都有一个 Key 实例
        public val key: Key<*>
                
          //...
    }
}

除了 plus 办法,CoroutineContext 中的其余三个办法都被 CombinedContext、Element、EmptyCoroutineContext 重写,CombinedContext 就是 CoroutineContext 汇合构造的实现,它外面是一个 递归定义,Element 就是 CombinedContext 中的元素,而 EmptyCoroutineContext 就示意一个空的 CoroutineContext,它外面是空实现。

1、CombinedContext

咱们先看 CombinedContext 类:

//CombinedContext 只蕴含 left 和 element 两个成员:left 可能为 CombinedContext 或 Element 实例,而 element 就是 Element 实例
internal class CombinedContext(
    private val left: CoroutineContext,
    private val element: Element
) : CoroutineContext, Serializable {
  
      //CombinedContext 的 get 操作的逻辑是://1、先看 element 是否是匹配,如果匹配,那么 element 就是须要找的元素,返回 element,否则阐明要找的元素在 left 中,持续从 left 开始找,依据 left 是 CombinedContext 还是 Element 转到 2 或 3
      //2、如果 left 又是一个 CombinedContext,那么反复 1
      //3、如果 left 是 Element,那么调用它的 get 办法返回
      override fun <E : Element> get(key: Key<E>): E? {
        var cur = this
        while (true) {
            //1
            cur.element[key]?.let {return it}
            val next = cur.left
            if (next is CombinedContext) {//2
                cur = next
            } else {//3
                return next[key]
            }
        }
    }

    //CombinedContext 的 fold 操作的逻辑是:先对 left 做 fold 操作,把 left 做完 fold 操作的的返回后果和 element 做 operation 操作
    public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
        operation(left.fold(initial, operation), element)

    //CombinedContext 的 minusKey 操作的逻辑是://1、先看 element 是否是匹配,如果匹配,那么 element 就是须要删除的元素,返回 left,否则阐明要删除的元素在 left 中,持续从 left 中删除对应的元素,依据 left 是否删除了要删除的元素转到 2 或 3 或 4
    //2、如果 left 中不存在要删除的元素,那么以后 CombinedContext 就不存在要删除的元素,间接返回以后 CombinedContext 实例就行
    //3、如果 left 中存在要删除的元素,删除了这个元素后,left 变为了空,那么间接返回以后 CombinedContext 的 element 就行
    //4、如果 left 中存在要删除的元素,删除了这个元素后,left 不为空,那么组合一个新的 CombinedContext 返回
    public override fun minusKey(key: Key<*>): CoroutineContext {
          //1
        element[key]?.let {return left}
        val newLeft = left.minusKey(key)
        return when {
            newLeft === left -> this//2
            newLeft === EmptyCoroutineContext -> element//3
            else -> CombinedContext(newLeft, element)//4
        }
    }
  
  //...
}

能够发现 CombinedContext 中的 get、fold、minusKey 操作都是递归模式的操作,递归的起点就是当这个 left 是一个 Element,咱们再看 Element 类:

public interface Element : CoroutineContext {

    public val key: Key<*>
        
    //Element 的 get 办法逻辑:如果 key 和本人的 key 匹配,那么本人就是要找的 Element,返回本人,否则返回 null
    public override operator fun <E : Element> get(key: Key<E>): E? =
        if (this.key == key) this as E else null

    //Element 的 fold 办法逻辑:对传入的 initial 和本人做 operation 操作
    public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
        operation(initial, this)

    //Element 的 minusKey 办法逻辑:如果 key 和本人的 key 匹配,那么本人就是要删除的 Element,返回 EmptyCoroutineContext(示意删除了本人),否则阐明本人不须要被删除,返回本人
    public override fun minusKey(key: Key<*>): CoroutineContext =
        if (this.key == key) EmptyCoroutineContext else this
}

当初咱们把 CombinedContext 和 Element 联合来看,那么 CombinedContext 的整体构造如下:

有点像是一个链表,left 就是指向下一个结点的指针,有了这个图咱们再从整体看当调用 CombinedContext 的 get、fold、minusKey 操作时的拜访程序:get、minusKey 操作大体逻辑都是先拜访以后 element,不满足,再拜访 left 的 element,程序都是从 right 到 left,而 fold 的操作大体逻辑是先拜访 left,直到递归到最初的 element,而后再从 left 到 right 的返回,从而拜访了所有的 element。

2、CoroutineContext 的 plus 操作

当初咱们来看 CoroutineContext 惟一没有被重写的办法 – plus办法:

public interface CoroutineContext {
   
    //...
  
    public operator fun plus(context: CoroutineContext): CoroutineContext =
        if (context === EmptyCoroutineContext) this else 
            context.fold(this) { acc, element ->
                val removed = acc.minusKey(element.key)
                if (removed === EmptyCoroutineContext) element else {val interceptor = removed[ContinuationInterceptor]
                    if (interceptor == null) CombinedContext(removed, element) else {val left = removed.minusKey(ContinuationInterceptor)
                        if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
                            CombinedContext(CombinedContext(left, element), interceptor)
                    }
                }
            }
}

这个办法看起来有点简单,为了不便咱们了解,我把它简化一下,我把对 ContinuationInterceptor 的解决去掉,如下:

public interface CoroutineContext {
   
    //...
  
    public operator fun plus(context: CoroutineContext): CoroutineContext =
      // 如果要相加的 CoroutineContext 为空,那么不做任何解决,间接返回
        if (context === EmptyCoroutineContext) this else 
          // 如果要相加的 CoroutineContext 不为空,那么对它进行 fold 操作
            context.fold(this) { acc, element -> // 咱们能够把 acc 了解成 + 号右边的 CoroutineContext,element 了解成 + 号左边的 CoroutineContext 的某一个 element
                // 首先从右边 CoroutineContext 中删除左边的这个 element
                val removed = acc.minusKey(element.key)
                // 如果 removed 为空,阐明右边 CoroutineContext 删除了和 element 雷同的元素后为空,那么返回左边的 element 即可
                if (removed === EmptyCoroutineContext) element else {
                      // 如果 removed 不为空,阐明右边 CoroutineContext 删除了和 element 雷同的元素后还有其余元素,那么结构一个新的 CombinedContext 返回
                      return CombinedContext(removed, element)
                }
            }
}

plus 办法大部分状况最终下返回一个 CombinedContext,即咱们把两个 CoroutineContext 相加后,返回一个 CombinedContext,在组合成 CombinedContext 时,+ 号左边的 CoroutineContext 中的元素会 笼罩+ 号右边的 CoroutineContext 中的含有雷同 key 的元素,如下:

(Dispatchers.Main, "name") + (Dispatchers.IO) = (Dispatchers.IO, "name")

这个笼罩操作就在 fold 办法的参数 operation 代码块 中实现,通过 minusKey 办法删除掉反复元素,后面讲过当调用 CombinedContext 的 fold 办法时,会从 left 到 right 到拜访所有的 element,即会从 left 到 right 的把每一个 element 传入 operation 办法中,作为 operation 办法的第二个参数,而 operation 办法第一个参数 acc 的初始值为 fold 办法传入的 initial 值,而后它会一直的更新,每次更新的值为上一次调用 operation 办法的返回值,所以当两个 CoroutineContext 相加时,puls 办法能够了解为上面的伪代码:

 val acc = 右边的 CoroutineContext
  for(var element in 左边的 CoroutineContext){acc = operation(acc, element)//operation 操作中会让 element 笼罩掉 acc 中与 element 雷同的元素
  }
  return acc// 所以 plus 办法最终返回的 CoroutineContext 是不存在 key 雷同的 element 的

所以 puls 办法最终返回的 CoroutineContext 是不存在 key 雷同的 element 的,+ 号左边的 CoroutineContext 中的元素会笼罩 + 号右边的 CoroutineContext 中的含有雷同 key 的元素,这像是 Set 的个性。

当初咱们再看回简化前的 plus 办法,它外面有个对 ContinuationInterceptor 的解决,目标是让 ContinuationInterceptor 在每次相加后都能变成 CoroutineContext 中的 最初 一个元素,ContinuationInterceptor 它也是继承自 Element,通常叫做协程上下文拦截器,它的次要作用是在协程执行前拦挡它,从而在协程执行前做出一些其余的操作,后面咱们讲到 CoroutineDispatcher 它自身也继承自 ContinuationInterceptor,ContinuationInterceptor 有一个 interceptContinuation 办法用于返回拦挡协程的行为,而这个行为就是后面咱们所讲到 Dispatchers.Unconfined 时的DispatchedContinuation,DispatchedContinuation 在复原协程前依据协程的 CoroutineDispatcher 类型做出不同的协程分派行为,通过把 ContinuationInterceptor 放在最初面,协程在查找上下文的 element 时,总能最快找到拦截器,防止了递归查找,从而让拦挡行为前置执行。

结语

本文次要介绍了 CoroutineContext 的元素组成和构造,了解 CoroutineContext 对于了解协程应用有很大的帮忙,因为协程的启动时就离不开 CoroutineContext,同时如果你当前想要更深刻的学习协程,例如协程的创立过程,Continuation 概念、suspend 关键字等,本篇文章也能给你一个抛砖引玉的成果。

以上就是本文的所有内容,心愿大家有所播种!

相干学习视频举荐:

【Kotlin 进阶教程】——Kotlin 协程从实战到原理_哔哩哔哩_bilibili

Android 进阶零碎学习——一节课搞定 Kotlin 核心技术_哔哩哔哩_bilibili

【Android 进阶学习训练营】全网最全面 Kotlin 外围 Lambda 与高阶全面解说_哔哩哔哩_bilibili

正文完
 0