本文已收录到  AndroidFamily,技术和职场问题,请关注公众号 [彭旭锐] 发问。

前言

大家好,我是小彭。

在上一篇文章里,咱们聊到了 Square 开源的 I/O 框架 Okio 的三个劣势:精简且全面的 API、基于共享的缓冲区设计以及超时机制。前两个劣势曾经剖析过了,明天咱们来剖析 Okio 的超时检测机制。

本文源码基于 Okio v3.2.0。


思维导图:


1. 意识 Okio 的超时机制

超时机制是一项通用的零碎设计,可能防止零碎长时间阻塞在某些工作上。例如网络申请在超时工夫内没有响应,客户端就会提前中断请求,并提醒用户某些性能不可用。

1.1 说一下 Okio 超时机制的劣势

先思考一个问题,相比于传统 IO 的超时有什么劣势呢?我认为次要体现在 2 个方面:

  • 劣势 1 - Okio 补救了局部 IO 操作不反对超时检测的缺点:

Java 原生 IO 操作是否反对超时,齐全取决于底层的零碎调用是否反对。例如,网络 Socket 反对通过 setSoTimeout API 设置单次 IO 操作的超时工夫,而文件 IO 操作就不反对,应用原生文件 IO 就无奈实现超时。

而 Okio 是对立在应用层实现超时检测,不论零碎调用是否反对超时,都能提供对立的超时检测机制。

  • 劣势 2 - Okio 不仅反对单次 IO 操作的超时检测,还反对蕴含屡次 IO 操作的复合工作超时检测:

Java 原生 IO 操作只能实现对单次 IO 操作的超时检测,无奈实现对蕴含屡次 IO 操作的复合工作超时检测。例如,OkHttp 反对配置单次 connect、read 或 write 操作的超时检测,还反对对一次残缺 Call 申请的超时检测,有时候单个操作没有超时,但串联起来的残缺 call 却超时了。

而 Okio 超时机制和 IO 操作没有强耦合,不仅反对对 IO 操作的超时检测,还反对非 IO 操作的超时检测,所以这种复合工作的超时检测也是能够实现的。

1.2 Timeout 类的作用

Timeout 类是 Okio 超时机制的外围类,Okio 对 Source 输出流和 Sink 输入流都提供了超时机制,咱们在结构 InputStreamSource 和 OutputStreamSink 这些流的实现类时,都须要携带 Timeout 对象:

Source.kt

interface Source : Closeable {    // 返回超时管制对象    fun timeout(): Timeout    ...}

Sink.kt

actual interface Sink : Closeable, Flushable {    // 返回超时管制对象    actual fun timeout(): Timeout    ...}

Timeout 类提供了两种配置超时工夫的形式(如果两种形式同时存在的话,Timeout 会优先采纳更早的截止工夫):

  • 1、timeoutNanos 工作解决工夫: 设置解决单次工作的超时工夫,

最终触发超时的截止工夫是工作的 startTime + timeoutNanos

  • 2、deadlineNanoTime 截止工夫: 间接设置将来的某个工夫点,多个工作整体的超时工夫点。

Timeout.kt

// hasDeadline 这个属性显得没必要private var hasDeadline = false // 是否设置了截止工夫点private var deadlineNanoTime = 0L // 截止工夫点(单位纳秒)private var timeoutNanos = 0L // 解决单次工作的超时工夫(单位纳秒)

创立 Source 和 Sink 对象时,都须要携带 Timeout 对象:

JvmOkio.kt

// ----------------------------------------------------------------------------// 输出流// ----------------------------------------------------------------------------fun InputStream.source(): Source = InputStreamSource(this, Timeout() /*Timeout 对象*/)// 文件输出流fun File.source(): Source = InputStreamSource(inputStream(), Timeout.NONE)// Socket 输出流fun Socket.source(): Source {    val timeout = SocketAsyncTimeout(this)    val source = InputStreamSource(getInputStream(), timeout /*携带 Timeout 对象*/)    // 包装为异步超时    return timeout.source(source)}// ----------------------------------------------------------------------------// 输入流// ----------------------------------------------------------------------------fun OutputStream.sink(): Sink = OutputStreamSink(this, Timeout() /*Timeout 对象*/)// 文件输入流fun File.sink(append: Boolean = false): Sink = FileOutputStream(this, append).sink()// Socket 输入流fun Socket.sink(): Sink {    val timeout = SocketAsyncTimeout(this)    val sink = OutputStreamSink(getOutputStream(), timeout /*携带 Timeout 对象*/)    // 包装为异步超时    return timeout.sink(sink)}

在 Timeout 类的根底上,Okio 提供了 2 种超时机制:

  • Timeout 是同步超时
  • AsyncTimeout 是异步超时

Okio 框架


2. Timeout 同步超时

Timeout 同步超时依赖于 Timeout#throwIfReached() 办法。

同步超时在每次执行工作之前,都须要先调用 Timeout#throwIfReached() 查看以后工夫是否达到超时截止工夫。如果超时则会间接抛出超时异样,不会再执行工作。

JvmOkio.kt

private class InputStreamSource(    // 输出流    private val input: InputStream,    // 超时管制    private val timeout: Timeout) : Source {    override fun read(sink: Buffer, byteCount: Long): Long {        // 1、参数校验        if (byteCount == 0L) return 0        require(byteCount >= 0) { "byteCount < 0: $byteCount" }        // 2、查看超时工夫        timeout.throwIfReached()        // 3、执行输出工作(已简化)        val bytesRead = input.read(...)        return bytesRead.toLong()    }    ...}private class OutputStreamSink(    // 输入流    private val out: OutputStream,    // 超时管制    private val timeout: Timeout) : Sink {    override fun write(source: Buffer, byteCount: Long) {        // 1、参数校验        checkOffsetAndCount(source.size, 0, byteCount)        // 2、查看超时工夫        timeout.throwIfReached()        // 3、执行输出工作(已简化)        out.write(...)        ...    }    ...}

看一眼 Timeout#throwIfReached 的源码。 能够看到,同步超时只思考 “deadlineNanoTime 截止工夫”,如果只设置 “timeoutNanos 工作解决工夫” 是有效的,我感觉这个设计容易让开发者出错。

Timeout.kt

@Throws(IOException::class)open fun throwIfReached() {    if (Thread.interrupted()) {        // 传递中断状态        Thread.currentThread().interrupt() // Retain interrupted status.        throw InterruptedIOException("interrupted")    }    if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) {        // 抛出超时异样        throw InterruptedIOException("deadline reached")    }}

有必要解释所谓 “同步” 的意思:

同步超时就是指工作的 “执行” 和 “超时查看” 是同步的。当工作超时时,Okio 同步超时不会间接中断工作执行,而是须要检被动查超时工夫(Timeout#throwIfReached)来判断是否产生超时,再决定是否中断工作执行。

这其实与 Java 的中断机制是十分类似的:

当 Java 线程的中断标记地位位时,并不是真的会间接中断线程执行,而是被动须要查看中断标记位(Thread.interrupted)来判断是否产生中断,再决定是否中断线程工作。所以说 Java 的线程中断机制是一种 “同步中断”。

能够看出,同步超时存在 “滞后性”:

因为同步超时须要被动查看,所以即便在工作执行过程中产生超时,也必须等到查看时才会发现超时,无奈及时触发超时异样。因而,就须要异步超时机制。

同步超时示意图


3. AsyncTimeout 异步超时

  • 异步超时监控进入: 异步超时在每次执行工作之前,都须要先调用 AsyncTimeout#enter() 办法将 AsyncTimeout 挂载到超时队列中,并依据超时截止工夫的先后顺序排序,队列头部的节点就是会最先超时的工作;
  • 异步超时监控退出: 在每次工作执行完结之后,都须要再调用 AsyncTimeout#exit() 办法将 AsyncTimeout 从超时队列中移除。

留神: enter() 办法和 eixt() 办法必须成对存在。

AsyncTimeout.kt

open class AsyncTimeout : Timeout() {    // 是否在期待队列中    private var inQueue = false    // 后续指针    private var next: AsyncTimeout? = null    // 超时截止工夫    private var timeoutAt = 0L    // 异步超时监控进入    fun enter() {        check(!inQueue) { "Unbalanced enter/exit" }        val timeoutNanos = timeoutNanos()        val hasDeadline = hasDeadline()        if (timeoutNanos == 0L && !hasDeadline) {            return        }        inQueue = true        scheduleTimeout(this, timeoutNanos, hasDeadline)    }    // 异步超时监控退出    // 返回值:是否产生超时(如果节点不存在,阐明被 WatchDog 线程移除,即产生超时)    fun exit(): Boolean {        if (!inQueue) return false        inQueue = false        return cancelScheduledTimeout(this)    }    // 在 WatchDog 线程调用    protected open fun timedOut() {}    companion object {        // 超时队列头节点(哨兵节点)        private var head: AsyncTimeout? = null        // 散发超时监控工作        private fun scheduleTimeout(node: AsyncTimeout, timeoutNanos: Long, hasDeadline: Boolean) {            synchronized(AsyncTimeout::class.java) {                // 首次增加监控时,须要启动 Watchdog 线程                if (head == null) {                    // 哨兵节点                    head = AsyncTimeout()                    Watchdog().start()                }                // now:以后工夫                val now = System.nanoTime()                // timeoutAt 超时截止工夫:计算 now + timeoutNanos 和 deadlineNanoTime 的较小值                if (timeoutNanos != 0L && hasDeadline) {                    node.timeoutAt = now + minOf(timeoutNanos, node.deadlineNanoTime() - now)                } else if (timeoutNanos != 0L) {                    node.timeoutAt = now + timeoutNanos                } else if (hasDeadline) {                    node.timeoutAt = node.deadlineNanoTime()                } else {                    throw AssertionError()                }                // remainingNanos 超时剩余时间:以后工夫间隔超时产生的工夫                val remainingNanos = node.remainingNanos(now)                var prev = head!!                // 线性遍历超时队列,依照超时截止工夫将 node 节点插入超时队列                while (true) {                    if (prev.next == null || remainingNanos < prev.next!!.remainingNanos(now)) {                        node.next = prev.next                        prev.next = node                        // 如果插入到队列头部,须要唤醒 WatchDog 线程                        if (prev === head) {                            (AsyncTimeout::class.java as Object).notify()                        }                        break                    }                    prev = prev.next!!                }            }        }        // 勾销超时监控工作        // 返回值:是否超时        private fun cancelScheduledTimeout(node: AsyncTimeout): Boolean {            synchronized(AsyncTimeout::class.java) {                // 线性遍历超时队列,将 node 节点移除                var prev = head                while (prev != null) {                    if (prev.next === node) {                        prev.next = node.next                        node.next = null                        return false                    }                    prev = prev.next                }                // 如果节点不存在,阐明被 WatchDog 线程移除,即产生超时                return true            }        }    }}

同时,在首次增加异步超时监控时,AsyncTimeout 外部会开启一个 WatchDog 守护线程,依照 “检测 - 期待” 模型察看超时队列的头节点:

  • 如果产生超时,则将头节点移除,并回调 AsyncTimeout#timeOut() 办法。这是一个空办法,须要由子类实现来被动勾销工作;
  • 如果未产生超时,则 WatchDog 线程会计算间隔超时产生的工夫距离,调用 Object#wait(工夫距离) 进入限时期待。

须要留神的是: AsyncTimeout#timeOut() 回调中不能执行耗时操作,否则会影响后续检测的及时性。

有意思的是:咱们会发现 Okio 的超时检测机制和 Android ANR 的超时检测机制十分相似,所以咱们能够说 ANR 也是一种异步超时机制。

AsyncTimeout.kt

private class Watchdog internal constructor() : Thread("Okio Watchdog") {    init {        // 守护线程        isDaemon = true    }    override fun run() {        // 死循环        while (true) {            try {                var timedOut: AsyncTimeout? = null                synchronized(AsyncTimeout::class.java) {                    // 取头节点(Maybe wait)                    timedOut = awaitTimeout()                    // 超时队列为空,退出线程                    if (timedOut === head) {                        head = null                        return                    }                }                // 超时产生,触发 AsyncTimeout#timedOut 回调                timedOut?.timedOut()            } catch (ignored: InterruptedException) {            }        }    }}companion object {    // 超时队列为空时,再期待一轮的工夫    private val IDLE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(60)    private val IDLE_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(IDLE_TIMEOUT_MILLIS)    @Throws(InterruptedException::class)    internal fun awaitTimeout(): AsyncTimeout? {        // Get the next eligible node.        val node = head!!.next        // 如果超时队列为空        if (node == null) {            // 须要再期待 60s 后再判断(例如在首次增加监控时)            val startNanos = System.nanoTime()            (AsyncTimeout::class.java as Object).wait(IDLE_TIMEOUT_MILLIS)            return if (head!!.next == null && System.nanoTime() - startNanos >= IDLE_TIMEOUT_NANOS) {                // 退出 WatchDog 线程                head            } else {                // WatchDog 线程从新取一次                null            }        }        // 计算以后工夫间隔超时产生的工夫        var waitNanos = node.remainingNanos(System.nanoTime())        // 未超时,进入限时期待        if (waitNanos > 0) {            // Waiting is made complicated by the fact that we work in nanoseconds,            // but the API wants (millis, nanos) in two arguments.            val waitMillis = waitNanos / 1000000L            waitNanos -= waitMillis * 1000000L            (AsyncTimeout::class.java as Object).wait(waitMillis, waitNanos.toInt())            return null        }        // 超时,将头节点移除        head!!.next = node.next        node.next = null        return node    }}

异步超时示意图

间接看代码不好了解,咱们来举个例子:


4. 举例:OkHttp Call 的异步超时监控

在 OkHttp 中,反对配置一次残缺的 Call 申请上的操作工夫 callTimeout。一次 Call 申请蕴含多个 IO 操作的复合工作,应用传统 IO 是不可能监控超时的,所以须要应用 AsyncTimeout 异步超时。

在 OkHttp 的 RealCall 申请类中,就应用了 AsyncTimeout 异步超时:

  • 1、开始工作: 在 execute() 办法中,调用 AsyncTimeout#enter() 进入异步超时监控,再执行申请;
  • 2、结束任务: 在 callDone() 办法中,调用 AsyncTimeout#exit() 退出异步超时监控。剖析源码发现:callDone() 不仅在申请失常时会调用,在勾销申请时也会回调,保障了 enter() 和 exit() 成对存在;
  • 3、超时回调:AsyncTimeout#timeOut 超时回调中,调用了 Call#cancel() 提前勾销申请。Call#cancel() 会调用到 Socket#close(),让阻塞中的 IO 操作抛出 SocketException 异样,以达到提前中断的目标,最终也会走到 callDone() 执行 exit() 退出异步监控。

Call 超时监控示意图

RealCall

class RealCall(    val client: OkHttpClient,    /** The application's original request unadulterated by redirects or auth headers. */    val originalRequest: Request,    val forWebSocket: Boolean) : Call {    // 3、AsyncTimeout 超时监控    private val timeout = object : AsyncTimeout() {        override fun timedOut() {            // 勾销申请            cancel()        }    }.apply {        timeout(client.callTimeoutMillis.toLong(), MILLISECONDS)    }    // 勾销申请    override fun cancel() {        if (canceled) return // Already canceled.        canceled = true        exchange?.cancel()        // 最终会调用 Socket#close()        connectionToCancel?.cancel()        eventListener.canceled(this)    }    // 1、申请开始(由业务层调用)    override fun execute(): Response {        // 1.1 异步超时监控进入        timeout.enter()        // 1.2 执行申请        client.dispatcher.executed(this)        return getResponseWithInterceptorChain()    }    // 2、申请完结(由 OkHttp 引擎层调用,蕴含失常和异常情况)    // 除了 IO 操作在抛出异样后会走到 callDone(),在勾销申请时也会走到 callDone()    internal fun <E : IOException?> messageDone(        exchange: Exchange,        requestDone: Boolean, // 申请失常完结        responseDone: Boolean, // 响应失常完结        e: E    ): E {        ...        if (callDone) {            return callDone(e)        }        return e    }    private fun <E : IOException?> callDone(e: E): E {        ...        // 查看是否超时        val result = timeoutExit(e)        if (e != null) {            // 申请异样(蕴含超时异样)            eventListener.callFailed(this, result!!)        } else {            // 申请失常完结            eventListener.callEnd(this)        }        return result    }    private fun <E : IOException?> timeoutExit(cause: E): E {        if (timeoutEarlyExit) return cause        // 2.1 异步超时监控退出        if (!timeout.exit()) return cause        // 2.2 包装超时异样        val e = InterruptedIOException("timeout")        if (cause != null) e.initCause(cause)        return e as E    }}

调用 Socket#close() 会让阻塞中的 IO 操作抛出 SocketException 异样:

Socket.java

// Any thread currently blocked in an I/O operation upon this socket will throw a {@link SocketException}.public synchronized void close() throws IOException {    synchronized(closeLock) {        if (isClosed())            return;        if (created)            impl.close();        closed = true;    }}

Exchange 中会捕捉 Socket#close() 抛出的 SocketException 异样:

Exchange.kt

private inner class RequestBodySink(    delegate: Sink,    /** The exact number of bytes to be written, or -1L if that is unknown. */    private val contentLength: Long) : ForwardingSink(delegate) {    @Throws(IOException::class)    override fun write(source: Buffer, byteCount: Long) {        ...        try {            super.write(source, byteCount)            this.bytesReceived += byteCount        } catch (e: IOException) {            // Socket#close() 会抛出异样,被这里拦挡            throw complete(e)        }    }    private fun <E : IOException?> complete(e: E): E {        if (completed) return e        completed = true        return bodyComplete(bytesReceived, responseDone = false, requestDone = true, e = e)    }}fun <E : IOException?> bodyComplete(    bytesRead: Long,    responseDone: Boolean,    requestDone: Boolean,    e: E): E {    ...    // 回调到下面的 RealCall#messageDone    return call.messageDone(this, requestDone, responseDone, e)}

5. OkHttp 超时检测总结

先说一下 Okhttp 定义的 2 种颗粒度的超时:

  • 第 1 种是在单次 connect、read 或 write 操作上的超时;
  • 第 2 种是在一次残缺的 call 申请上的超时,有时候单个操作没有超时,但连接起来的残缺 call 却超时。

其实 Socket 反对通过 setSoTimeout API 设置单次操作的超时工夫,但这个 API 无奈满足需要,比如说 Call 超时是蕴含多个 IO 操作的复合工作,而且不论是 HTTP/1 并行申请还是 HTTP/2 多路复用,都会存在一个 Socket 连贯上同时承载多个申请的状况,无奈辨别是哪个申请超时。

因而,OkHttp 采纳了两种超时监测:

  • 对于 connect 操作,OkHttp 持续应用 Socket 级别的超时,没有问题;
  • 对于 call、read 和 write 的超时,OkHttp 应用一个 Okio 的异步超时机制来监测超时。

参考资料

  • Github · Okio
  • Okio 官网