乐趣区

关于android:Kotlin-协程和-Android-SQLite-API-中的线程模型

从 Room 2.1 版本之后,开发者们能够通过定义 suspend DAO 函数来应用 Kotlin 协程了。协程在解决异步操作时体现得异样优良,它能够让您用程序天然的代码解决诸如操作数据库一类的耗时操作,而不再须要专门在线程之间来回切换工作、处理结果或谬误了。Room 反对协程后,能够在数据库操作中应用由并发作用域、生命周期、嵌套所带来的一些便当。

在咱们为 Room 增加协程的反对期间,咱们遇到并解决了在协程模型和 Android SQL API 中没想到的一些问题。在本篇文章中,咱们会向您论述咱们遇到的这些问题,以及咱们的解决方案。

预料外的问题

先看一下上面这段代码,看似是平安的,但实际上这段代码存在着问题:

/**
 * 将指定的 [amount] 的金额,从 [accountA] 转移到 [accountB]
 */
suspend fun transferMoney(accountA: String, accountB: String, amount: Int) {
   // 应用了 IO dispatcher,所以该 DB 的操作在 IO 线程上进行
    withContext(Dispatchers.IO) {database.beginTransaction() // 在 IO-Thread-1 线程上开始执行事务
        try {
    // 协程能够在与调度器(这里就是 Dispatchers.IO)相关联的任何线程上绑定并继续执行。同时,因为事务也是在 IO-Thread-1 中开始的,因而咱们可能恰好能够胜利执行查问。moneyDao.decrease(accountA, amount) // 挂起函数

    // 如果协程又持续在 IO-Thread-2 上执行,那么下列操作数据库的代码可能会引起死锁,因为它须要等到 IO-Thread-1 的线程执行完结后才能够持续。moneyDao.increase(accountB, amount) // 挂起函数

            database.setTransactionSuccessful() // 永远不会执行这一行} finally {database.endTransaction() // 永远不会执行这一行
        }
    }
}

Android 的 SQLite 事务受制于单个线程

上述代码中的问题在于 Android 的 SQLite 事务是受制于单个线程的。当一个正在进行的事务中的某个查问在以后线程中被执行时,它会被视为是该事务的一部分并容许继续执行。但当这个查问在另外一个线程中被执行时,那它就不再属于这个事务的一部分了,这样的话就会导致这个查问被阻塞,直到事务在另外一个线程执行实现。这也是 beginTransaction 和 endTransaction 这两个 API 可能保障原子性的一个前提。当数据库的事务操作都是在一个线程上实现的,这样的 API 不会有任何问题,然而应用协程之后问题就来了,因为协程是不绑定在任何特定的线程上的。也就是说,问题的本源就是在协程挂起之后会继续执行所绑定的那个线程,而这样是不能保障和挂起之前所绑定的线程是同一个线程。

在协程中应用数据库事务操作可能会引起死锁

简略实现

为了解决 Android SQLite 的这个限度,咱们须要一个相似于 runInTransaction 这样能够承受挂起代码块的 API,这个 API 实现起来就像写一个单线程的调度器一样:

suspend fun <T> RoomDatabase.runInTransaction(block: suspend () -> T
): T = withContext(newSingleThreadContext("DB")) {beginTransaction()
    try {val result = block.invoke(this)
        setTransactionSuccessful()
        return@runBlocking result
    } finally {endTransaction()
    }
}

以上实现仅仅是个开始,然而当在挂起代码块中应用另一个调度器的话就会出问题了:

// 一个很简略的退税函数
suspend fun sendTaxRefund(federalAccount: String, taypayerList: List<Taxpayer>) {
    database.runInTransaction {
        val refundJobs = taypayerList.map { taxpayer ->
            coroutineScope {
             // 并行去计算退税金额
                async(Dispatchers.IO) {val amount = irsTool.calculateRefund(taxpayer)
                    moneyDao.decrease(federalAccount, amount)
                    moneyDao.increase(taxpayer.account, amount)
                }
            }
        }
        // 期待所有计算工作完结
        refundJobs.joinAll()}
}

因为接管的参数是一个挂起代码块,所以这部分代码就有可能应用一个不同的调度器来启动子协程,这样就会导致执行数据库操作的是另外的一个线程。因而,一个比拟好的实现是应该容许应用相似于 async、launch 或 withContext 这样的规范协程结构器。而在理论利用中,只有数据库操作才须要被调度到单事务线程。

介绍 withTransaction

为了解决下面的问题,咱们构建了 withTransaction API.withTransaction(kotlin.coroutines.SuspendFunction0)),它模拟了 withContext API,然而提供了专为平安执行 Room 事务而构建的协程上下文,您能够依照如下形式编写代码:

fun transferMoney(
    accountA: String,
    accountB: String,
    amount: Int
) = GlobalScope.launch(Dispatchers.Main) {
    roomDatabase.withTransaction {moneyDao.decrease(accountA, amount)
        moneyDao.increase(accountB, amount)
    }
    Toast.makeText(context, "Transfer Completed.", Toast.LENGTH_SHORT).show()}

在深入研究 Room withTransaction API 的实现前,让咱们先回顾一下已提到的一些协程的概念。CoroutineContext 蕴含了须要对协程工作进行调度的信息,它携带了以后的 CoroutineDispatcher 和 Job 对象,以及一些额定的数据,当然也能够对它进行扩大来使其蕴含更多信息。CoroutineContext 的一个重要特色是它们被同一协程作用域下的子协程所继承,比方 withContext 代码块的作用域。这一机制可能让子协程持续应用同一个调度器,或在父协程被勾销时,它们会被一起勾销。实质上,Room 提供的挂起事务 API 会创立一个专门的协程上下文来在同一个事务作用域下执行数据库操作。

withTransaction API 在上下文中创立了三个要害元素:

  • 单线程调度器,用于执行数据库操作;
  • 上下文元素,帮忙 DAO 函数判断其是否处在事务中;
  • ThreadContextElement,用来标记事务协程中所应用的调度线程。

事务调度器

CoroutineDispatcher 会决定协程该绑定到哪个线程中执行。比方,Dispatchers.IO 会应用一个共享线程池分流执行那些会产生阻塞的操作,而 Dispatchers.Main 会在 Android 主线程中执行协程。由 Room 创立的事务调度器可能从 Room 的 Executor) 获取繁多线程,并将事务分发给该线程,而不是分发给一个随便创立的新线程。这一点很重要,因为 executor 能够由用户来配置,并且可作为测试工具应用。在事务开始时,Room 会取得 executor 中某个线程的控制权,直到事务完结。在事务执行期间,即便调度器因子协程产生了变动,已执行的数据库操作仍会被调配到该事务线程上。

获取一个事务线程并不是一个阻塞操作,它也不应该是阻塞操作,因为如果没有可用线程的话,应该执行挂起操作,而后告诉调用方,防止影响其余协程的执行。它还会将一个 runnable 插入队列,而后期待其运行,这也是线程可运行的一个标记。suspendCancellableCoroutine 函数为咱们搭建了连贯基于回调的 API 和协程之间的桥梁。在这种状况下,一旦之前入队列的 runnable 执行了,就代表着一个线程可用,咱们会应用 runBlocking 启动一个事件循环来获取此线程的控制权。而后 runBlocking 所创立的调度器会将要执行的代码块分发给已取得的线程。另外,Job 被用来挂起和放弃线程的可用性,直到事务执行实现为止。要留神的是,一旦协程被勾销了或者是无奈获取到线程,就要有防范措施。获取事务线程的相干代码如下:

/**
 * 构建并返回一个 [ContinuationInterceptor] 用来将协程散发到获取到的线程中,并执行事务。[controlJob] 用来通过勾销工作来控制线程的开释。*/
private suspend fun Executor.acquireTransactionThread(controlJob: Job): ContinuationInterceptor = suspendCancellableCoroutine { continuation ->
    continuation.invokeOnCancellation {
        // 当咱们在期待获取到可用线程时,如果失败了或者工作勾销,咱们是不可能进行期待这一动作的,但咱们能够勾销 controlJob,这样一旦获取到控制权,很快就会被开释。controlJob.cancel()}
    try {
        execute {
            // runBlocking 创立一个 event loop 来执行协程中的工作代码
            runBlocking {
               // 获取到线程后,通过返回有 runBlocking 创立的拦截器来复原 suspendCancellableCoroutine,拦截器将会被用来拦挡和散发代码块到获取的线程中
                continuation.resume(coroutineContext[ContinuationInterceptor]!!)

               // 挂起 runBlocking 协程,直到 controlJob 实现。因为协程是空的,所以这将会阻止 runBlocking 立刻完结。controlJob.join()}
        }
    } catch (ex: RejectedExecutionException) {
       // 无奈获取线程,勾销协程
        continuation.cancel(
            IllegalStateException("Unable to acquire a thread to perform the transaction.", ex)
        )
    }
}

事务上下文元素

有了调度器后,咱们就能够创立事务中的元素来增加到上下文中,并放弃着对调度器的援用。如果在事务作用域内调用了 DAO 函数,就能够把 DAO 函数从新路由到相应的线程中。咱们创立的事务元素如下:

internal class TransactionElement(
    private val transactionThreadControlJob: Job,
    internal val transactionDispatcher: ContinuationInterceptor
) : CoroutineContext.Element {

   // Singleton key 用于检索此上下文中的 element
    companion object Key : CoroutineContext.Key<TransactionElement>

    override val key: CoroutineContext.Key<TransactionElement>
        get() = TransactionElement

    /**
     * 这个 element 用来统计事务数量(蕴含嵌套事务)。调用 [acquire] 来减少计数,调用 [release] 来缩小计数。如果在调用 [release] 时计数达到 0,则事务被勾销,事务线程会被开释
     */
    private val referenceCount = AtomicInteger(0)

    fun acquire() {referenceCount.incrementAndGet()
    }

    fun release() {val count = referenceCount.decrementAndGet()
        if (count < 0) {
            throw IllegalStateException("Transaction was never started or was already released.")
        } else if (count == 0) {
           // 勾销管制事务线程的 job 会导致它被 release
            transactionThreadControlJob.cancel()}
    }
}

TransactionElement 函数中的 acquire 和 release 是用来跟踪嵌套事务的。因为 beginTransaction 和 endTransaction 容许嵌套调用,咱们也想保留这个个性,然而咱们只须要在最外层事务实现时开释事务线程即可。这些性能的用法在稍后的 withTransaction 实现中会介绍。

事务线程标记

上文中提到的创立事务上下文中所需的最初一个要害元素是 ThreadContextElement。CoroutineContext 中的这个元素相似于 ThreadLocal,它可能跟踪线程中是否有正在进行的事务。这个 element 是由一个 ThreadLocal 反对,对于调度器所用的每个线程,它都会在 ThreadLocal 上设置一个值来执行协程代码块。线程一旦实现工作后,这个值会被重置。在咱们的例子中,这个值是没有意义的,在 Room 中也只须要确定这个值是否存在即可。如果协程上下文能够拜访平台中存在的 ThreadLocal<SQLiteSession>,则能够从协程所绑定的任何线程向其散发 begin/ends 命令,如果做不到,那在事务实现前只能阻塞线程。但咱们依然须要追踪每个阻塞的数据库办法是在哪个事务上运行,以及哪个线程负责平台事务。

Room 的 withTransaction API 中应用的 ThreadContextElement 会标识数据库中的阻塞函数。Room 中的阻塞函数,蕴含 DAO 生成的那些,在它们被事务协程调用后会被非凡解决,用来保障它们不会在其余的调度器上运行。如果您的 DAO 同时具备这两种类型的性能,则能够在 withTransaction 块中将阻塞函数与挂起函数混合和匹配。通过将 ThreadContextElement 增加到协程上下文中,并从 DAO 函数中拜访它,咱们能够验证阻塞函数是否处于正确的作用域中。如果不是, 咱们会抛出异样而不是造成死锁 。在之后,咱们打算将阻塞函数也从新路由到事务线程中。

private final ThreadLocal<Integer> mSuspendingTransactionId = new ThreadLocal<>();
public void assertNotSuspendingTransaction() {if (!inTransaction() && mSuspendingTransactionId.get() != null) {
        throw new IllegalStateException("Cannot access database on a different"
                + "coroutine context inherited from a suspending transaction.");
    }
}

这三个元素的组合形成了咱们的事务上下文:

private suspend fun RoomDatabase.createTransactionContext(): CoroutineContext {val controlJob = Job()
    val dispatcher = queryExecutor.acquireTransactionThread(controlJob)
    val transactionElement = TransactionElement(controlJob, dispatcher)
    val threadLocalElement =
        suspendingTransactionId.asContextElement(controlJob.hashCode())
    return dispatcher + transactionElement + threadLocalElement
}

事务 API 的实现

创立了事务上下文之后,咱们终于能够提供一个平安的 API 用于在协程中执行数据库事务。接下来要做的就是将这个上下文和通常的 begin/end 事务模式联合起来:

suspend fun <R> RoomDatabase.withTransaction(block: suspend () -> R
): R {
    // 如果能够的话就应用继承的事务上下文,这样容许嵌套挂起的事务
    val transactionContext =
        coroutineContext[TransactionElement]?.transactionDispatcher 
            ?: createTransactionContext()
    return withContext(transactionContext) {val transactionElement = coroutineContext[TransactionElement]!!
        transactionElement.acquire()
        try {beginTransaction()
            try {
                // 在一个新的 scope 中封装 suspend 代码块,来期待子协程
                val result = coroutineScope {block.invoke(this)
                }
                setTransactionSuccessful()
                return@withContext result
            } finally {endTransaction()
            }
        } finally {transactionElement.release()
        }
    }
}

Android 中 SQLite 的线程限度是正当的,这在 Kotlin 还没呈现时未然如此设计了。协程引入了新的编程范式,扭转了传统 Java 并发编程的一些思维模式。间接勾销 Android 线程对 SQLite 事务的限度是不可行的,因为咱们心愿提供一个向后兼容的解决方案,而上述这些办法的组合最终让咱们在应用协程和 Fluent API 的解决方案中施展了创造性。

退出移动版