共计 15961 个字符,预计需要花费 40 分钟才能阅读完成。
本篇文章次要剖析 Okio 读写流程以及超时检测机制。首先会介绍 Okio 中几个重要的类,而后提供一段用 Okio api 实现读写文件代码,依据这段代码进行整体读写流程剖析,以及剖析 Okio 为什么比间接应用 Java io 高效,最初介绍了在读写时 Okio 如何进行超时检测。
1.OKio 介绍
Okio 作为 Okhttp 底层 io 库,它补充了 java.io 和 java.nio 的有余,使拜访、存储和解决数据更加容易。
Okio 中几个重要的类介绍
- <font color=’red’>
ByteString
</font> 是不可变的字节序列。对于字符数据,最根本的就是 String。而 ByteString 就像是 String 的兄弟个别,它使得将二进制数据作为一个变量值变得容易。这个类很聪慧:它晓得如何将本人编码和解码为十六进制、base64 和 utf-8。 - <font color=’red’>
Segment
</font> Segment 在 Okio 中作为数据缓冲的载体,一个 Segment 的数据缓冲大小为 8192, 即 8k。每一个 Segment 都有前驱和后继结点,也就是说 Sement 是一个双向链表链表, 精确的来说是一个双向循环链表。读取数据从 Segment 头结点读取写数据从 Segment 尾结点写。Okio 中引入池的概念也就是源码中 SegmentPool 的实现。SegmentPool 负责 Segment 创立和销毁,SegmentPool 最大能够缓存 8 个 Segment。 - <font color=’red’>
Buffer
</font> 是一个可变的字节序列。像 Arraylist 一样。得益于它的底层由 Segment 实现因而你不须要事后设置缓冲区的大小,
当你将数据从一个缓冲区挪动到另一个缓冲区时,它会重新分配 Segment 的持有关系,而不是跨 Segment 复制数据。其中 Buffer 实现了 BufferedSource 和 BufferedSink,同时具读写性能。 -
<font color=’red’>
Sources
</font> 相似于 java 中的 InputStream,Source 作为 Okio 中读取数据的顶层接口只提供了简略的 apilong read(Buffer sink, long byteCount) throws IOException; Timeout timeout(); void close() throws IOException;
更多读取 api 由它的子接口 BufferedSource 提供,实现类为 RealBufferdSource,底层 InputStream->Buffer, 而后基于 Buffer 的读取。
-
<font color=’red’>
Sink
</font> 相似于 java 中的 OutPutStream,Sink 作为 Okio 中写入数据的顶层接口也只提供了简略的 apivoid write(Buffer source, long byteCount) throws IOException; void flush() throws IOException; Timeout timeout(); void close() throws IOException;
更多写入 api 由它的子接口 BufferedSink 提供,
实现类为 RealBufferedSink,底层将数据写入到 Buffer,再由 Buffer 写入到 OutPutStream 中。
这里省略了 GzipSource,GzipSink,HashingSink,HashingSource… 等其余实现 Source 和 Sink 的类,只关注主流程。
依据后面介绍和 UML 图得悉, 数据的读写在 RealBufferedSource 和 RealBufferedSink 中实现
2.Okio 读写流程
作为一个简略切入点,这里提供一段 Okio 实现的输出流写入到指定文件的代码。
/***
* 将字节输出流写入到指定文件中
* @return true 写入胜利,false 写入失败
*/
fun copy(inputStream: InputStream, dest: File): Boolean {val source = Okio.buffer(Okio.source(inputStream))
val sink = Okio.buffer(Okio.sink(dest))
val buffer = Buffer()
return try {var length = source.read(buffer, 8192L)
while (-1L != length) {sink.write(buffer, length)
sink.flush()
length = source.read(buffer, 8192L)
}
true
} catch (e: Exception) {e.printStackTrace()
false
} finally {source.close()
sink.close()}
}
Okio.source(inputStream)实现了对 InputStream 的包装,将 InputStream 包装在 Source 对象中并返回。
private static Source source(final InputStream in, final Timeout timeout) {
...
return new Source() {@Override public long read(Buffer sink, long byteCount) throws IOException {
...
if (byteCount == 0) return 0;
try {
// 超时查看
timeout.throwIfReached();
// 从 SegmentPool 中获取 Segment
Segment tail = sink.writableSegment(1);
// 依据 Segment 中可用大小计算最大能够往 Segment 中写多少字节
int maxToCopy = (int)
Math.min(byteCount, Segment.SIZE - tail.limit);
// 从 inputStream 中将数据写到 Segment 中
int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
// 如果读完则返回
if (bytesRead == -1) return -1;
// 追加曾经写入的数据量, 用于下次将数据从 limit 地位开始写入,也就是 limit 之前都是写入的数据
tail.limit += bytesRead;
// 修改 buffer 中存储的字节数量
sink.size += bytesRead;
return bytesRead;
} catch (AssertionError e) {if (isAndroidGetsocknameError(e)) throw new IOException(e);
throw e;
}
}
@Override public void close() throws IOException {in.close();
}
@Override public Timeout timeout() {return timeout;}
@Override public String toString() {return "source(" + in + ")";
}
};
}
//Buffer#writableSegment
Segment writableSegment(int minimumCapacity) {
...
//1
if (head == null) {head = SegmentPool.take();
return head.next = head.prev = head;
}
//2
Segment tail = head.prev;
if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) {tail = tail.push(SegmentPool.take());
}
return tail;
}
- 如果链表头为空, 则从 SegmentPool 中获取新节点并指向 head 结点返回。
- 因为是双向循环列表所以 head.prev 始终获取的是尾结点 (当链表长度为 1 时指向本人), 如果 tail 结点存储数据的容量已满或者 tail.owner 为 false(即该 sement 不能追加写入) 则从 SementPool 中获取新节点插入到该结点尾部, 并返回新节点。
Okio.buffer(Okio.source(src))等价于 Okio.buffer(source), 将 source 包装在 RealBufferedSource(source)内并返回。
val source = Okio.buffer(Okio.source(src))
source.read(buffer,8192L)
执行 read((buffer,8192L)),理论是调用的 RealBufferedSource.read(buffer,byteCount)。而 RealBufferedSource.read(buffer,byteCount)外部又会调用被包装的 Source 的 read(buffer,length)。即咱们下面剖析过的读数据代码。
//RealBufferedSource#read(buffer,8192L)
public long read(Buffer sink, long byteCount) throws IOException {if (buffer.size == 0) {
// 调用被包装的 Source。// 即从 InputStream 中读取 Segment.SIZE 个字节写入到 buffer 中
long read =
source.read(buffer,Segment.SIZE);
// 如果未读到数据则返回
if (read == -1) return -1;
}
// 读到数据, 此时数据保留在 buffer 中,接下来将 buffer 写入到 sink 中。行将一个缓冲区写到另一个缓冲区。long toRead = Math.min(byteCount, buffer.size);
return buffer.read(sink, toRead);
}
//Buffer#read(sink,toRead)
public long read(Buffer sink, long byteCount) {if (size == 0) return -1L;
if (byteCount > size) byteCount = size;
//Okio 高效的中央就是 buffer#write()的实现, 前面会详细分析。这里先了解为将 buffer 中的数据写入到内部传递进来的 sink 中
sink.write(this, byteCount);
return byteCount;
}
Okio.sink(dest)将 File 转换为 OutPutStream, 而后包装在 Sink 对象中。
public static Sink sink(OutputStream out) {return sink(out, new Timeout());
}
private static Sink sink(final OutputStream out, final Timeout timeout) {
...
return new Sink() {@Override public void write(Buffer source, long byteCount) throws IOException {checkOffsetAndCount(source.size, 0, byteCount);
while (byteCount > 0) {
// 超时检测
timeout.throwIfReached();
// 获取链表头结点
Segment head = source.head;
// 计算一次能够读多少字节
int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
// 从 head 中读取数据写入到 OutPutStream 中
out.write(head.data, head.pos, toCopy);
// 修改 head 读到哪个地位, 下次持续从 pos 地位开始读
head.pos += toCopy;
// 递加直到 byteCount= 0 退出循环即示意本次写完
byteCount -= toCopy;
// 修改 buffer 中存储的字节大小
source.size -= toCopy;
// 如果该 Segment 曾经读完
if (head.pos == head.limit) {
// 从链表中删除 head 并将 head 的下个结点赋值给 head
source.head = head.pop();
// 回收 head 结点
SegmentPool.recycle(head);
}
}
}
@Override public void flush() throws IOException {out.flush();
}
@Override public void close() throws IOException {out.close();
}
@Override public Timeout timeout() {return timeout;}
@Override public String toString() {return "sink(" + out + ")";
}
};
}
Okio.buffer(Okio.sink(dest))等价于 Okio.buffer(Sink),将 Sink 包装在 RealBufferedSink(Sink)内并返回。
val sink = Okio.buffer(Okio.sink(dest))
sink.write(buffer, length)
sink.flush()
执行 write(buffer, length)和 flush()其实调用的是 RealBufferedSink 的 write(buffer,length)和 flush()。RealBufferedSink 的 write(buffer,length)和 flush()最终会调用被包装的 Sink 的 write(buffer,length)和 flush()。
//RealBufferedSink#write(Buffer source, long byteCount)
public void write(Buffer source, long byteCount)
throws IOException {
// 将 source 中的数据写入到 buffer 中,Okio 高效的中央就是 buffer#write 的实现
buffer.write(source, byteCount);
emitCompleteSegments();}
public BufferedSink emitCompleteSegments() throws IOException {long byteCount = buffer.completeSegmentByteCount();
// 查看缓冲区是否被写满, 写满则将数据写入到 OutPutStream 中。未写满则等到下次写满或调用 flush 或 close 时将数据写入到 OutPutStream 中, 起到一个缓冲作用。if (byteCount > 0) {// 调用被包装的 Sink#write(buffer, byteCount)
// 将 buffer 中的数据写入到 OutPutStream 中
sink.write(buffer, byteCount);
}
return this;
}
public long completeSegmentByteCount() {
long result = size;
if (result == 0) return 0;
Segment tail = head.prev;
if (tail.limit < Segment.SIZE && tail.owner) {result -= tail.limit - tail.pos;}
return result;
}
至此 Okio 的输出到输入根本流程已剖析完。依据源码剖析可知 Okio 就是对 Java io 的一个封装和优化, 底层还是应用的 InputStream 和 OutputStream。既然和 Java io 底层应用一样形式读和写,那么它劣势体现在哪里呢?有人可能会说他体现在 api 的简洁上,构造清晰,链式编程,调用不便。说的对,这算是它的劣势,而这劣势并不能压服我摈弃 Java io 而应用它,其实你也能够基于 java io 封装一套链式编程。它和间接应用 Java io 的最大劣势并不 api 的简洁上,而是 io 流拷贝的效率上以及对内存的复用上,下节中会具体介绍。
3.Okio 为什么比间接应用 Java io 更有劣势
上节中咱们提到 Okio 和间接应用 Java io 的最大劣势并不 api 的简洁上,而是 io 流拷贝的效率上以及对内存的复用上。在说这两个劣势之前咱们先看看间接应用 Java io 和 Okio 从输出到输入都通过哪些步骤。
- Java io
InputStream –> BufferedInputStream –> 长期 byte 数组 –> BufferedOutPutStream –> OutPutStream
由此可见 Java io 从输出到输入流程中呈现了长期 byte 数组。意味着从 BufferedInputStream-> 长期 byte 数组拷贝一次数据,从长期 byte 数组 ->BufferedOutPutStream 再拷贝一次数据。
- Okio
InputStream –> inBuffer –> 长期 buffer –> outBuffer –> OutPutStream
看起来两头局部和 Java io 步骤一样。实则不然,Java io 咱们方才说过经验两次拷贝,而 Okio 两头局部 inBuffer -> 长期 buffer->outBuffer 其实不齐全是数据的拷贝。在剖析buffer->buffer 时咱们会详细描述为什么不齐全是数据拷贝。buffer->buffer 定义在 Buffer#write(Buffer source, long byteCount) 办法中。依据 wirte 办法正文看出 Okio 在实现 buffer->buffer 有两个指标。
此处引入该博主对正文的翻译
- 不要节约 CPU
不要节约 CPU 即不要到处复制数据,从将整个 Segments 从一个缓冲区重新分配到另一个缓冲区。
- 不要节约内存
Segment 作为一个不可变量,缓冲区中除了头节点和尾节点的片段以外,相邻的片段,至多应该保障 50% 以上的数据负载量(指的是 Segment 中的 data 数据, Okio 认为 data 数据量在 50% 以上才算是被无效利用的)。因为头结点中须要读取耗费字节数据,而尾节点中须要写入产生字节数据,因而头结点和尾节点是不能放弃不变性的。
在缓冲区之间挪动片段
在将一个缓冲区写入另一个缓冲区时,咱们更喜爱重新分配整个段,将字节复制到最紧凑的模式。假如咱们有一个缓冲区,其中的片段负载为 [91%,61%],如果咱们要在这下面附加一个负载量为[72%] 的繁多片段,这样将产生的后果为[91%,61%,72%]。这期间不会进行任何的字节复制操作。(即空间换工夫,就义内存,提供速度)
再假如,咱们有一个缓冲区负载量为:[100%,2%],并且咱们心愿将其附加到一个负载量为 [99%,3%] 的缓冲区中。这个操作将产生以下局部:[100%、2%、99%、3%],也就是说,咱们不会花工夫去复制字节来进步内存的应用效率,如变成 [100%,100%,4%] 这样。(即这种状况下 Okio 不会采取工夫换空间的策略,因为太节约 CPU)
在合并缓冲区时,当相邻缓冲区的合并级别不超过 100% 时,咱们将压缩相邻缓冲区。例如,当咱们在 [100%,40%] 根底上附加 [30%,80%] 时,后果将会是[100%,70%,80%]。(也就是两头相邻的负载为 40% 和 30% 的两个 Segment 将会被合并为一个负载为 70% 的 Segment)
宰割片段
有时咱们只想将 source buffer 中的一部分写入到 sink buffer 当中,例如,给定一个 sink 为 [51%,91%],当初咱们想要将一个 source 为 [92%,82%] 的前 30% 写入到这个 sink buffer 当中。为了简化,咱们首先将 source buffer 转换为等效缓冲区 [30%,62%,82%](即拆分 Segment),而后挪动 source 的头结点 Segment 即可,最终生成 sink[51%,91%,30%] 和 source[62%,82%]。
依据下面正文的定义,咱们可知在进行 buffer 数据转移时,依据不同策略执行不同操作以达到 CPU 和内存之间的均衡,那么来看下 buffer 转移的代码实现。
//Buffer#write
public void write(Buffer source, long byteCount) {while (byteCount > 0) {
// 如果复制的数据量比原缓冲区已有数据量小
if (byteCount < (source.head.limit - source.head.pos)) {
// 获取指标缓冲区尾结点
Segment tail = head != null ? head.prev : null;
// 如果指标缓冲区尾结点不为空,并且是数据拥有者即能够追加数据并且指标缓冲区能够存下该数据
if (tail != null && tail.owner
&& (byteCount + tail.limit - (tail.shared ? 0 : tail.pos) <= Segment.SIZE)) {// 如将 [10%] 追加到[20%], 间接拷贝。最终后果[30%]
// 将原缓冲区拷贝到指标缓冲区
source.head.writeTo(tail, (int) byteCount);
source.size -= byteCount;
size += byteCount;
return;
} else {
// 如果指标缓冲区尾结点为空即指标缓冲区为空缓冲区 或者不为空然而的空间有余,或者不是持有者,这时就须要把原缓冲区的头结点宰割为两个 Segment,// 而后将原缓冲区的头指针更新为宰割后的第一个 Segment, 如 [92%, 82%] 变成 [30%, 62%, 82%] 这样
source.head = source.head.split((int) byteCount);
}
}
// 从原缓冲区的链表中移除头结点, 并退出到指标缓冲区的尾结点
Segment segmentToMove = source.head;
long movedByteCount = segmentToMove.limit - segmentToMove.pos;
source.head = segmentToMove.pop();
// 如果指标缓冲区为空,则创立链表并将原缓冲区的链表头结点赋值给指标缓冲区结点的头结点
if (head == null) {
head = segmentToMove;
head.next = head.prev = head;
} else {// 指标缓冲区不为空, 则向指标缓冲区链表追加原缓冲区结点的头结点。并尝试合并,如 [60%,20%] 追加[10%]。那么指标缓冲区结点为[60%,20%,10%]。而后合并后为[60%,30%]。// 合并胜利回收多余结点以节俭空间
Segment tail = head.prev;
tail = tail.push(segmentToMove);
tail.compact();}
source.size -= movedByteCount;
size += movedByteCount;
byteCount -= movedByteCount;
}
}
依据下面源码剖析以及正文来答复为什么 Okio 比 Java io 高效。
Java 中读写数据个别为了高效咱们引入 BufferedInputStream 和 BufferedOutPutStream。这里以 BufferedInputStream 读写磁盘文件为例剖析。在 BufferedInputStream 中当一次读取的字节数大于缓冲区大小会摒弃缓冲区, 间接从磁盘中读取。
如果一次读取的字节数小于缓冲区大小,则先从磁盘中读取缓冲区大小个字节(BufferedInputStream 中默认定义为 8k)。而后每次从缓冲区读取设置的读取数量。直到缓冲区读完。而后再从磁盘中读取 … 直到整个磁盘数据读完
而 Okio 读取时不论你读取的字节长度是否大于缓冲区大小。间接读取 8k 数据到缓冲区,而后依据你设置的读取大小和以后缓冲区已有数据大小做比拟取最小值来进行数据转移。
举个例子
比方数据 16K, 读取一次到长期变量:
读取大小设置为 4k
- Okio 经验 0 次拷贝,inBuffer-> 长期 buffer,只是宰割 inBuffer 数据, 将宰割后的数据赋值给长期 buffer, 只是指针的批改
- 而 Java io 读取一次到长期变量经验 1 次拷贝,即 buffer-> 长期 byte 数组。
读取大小设置为 8k
- Okio 经验 0 次拷贝,inBuffer-> 长期 buffer,只是指针的批改
- 而 Java io 读取一次到长期变量经验 0 次拷贝,因为大于等于缓冲区大小则间接从磁盘读取即 InputStream-> 长期 byte 数组。
读取大小设置为 16k
- Okio 经验 0 次拷贝,inBuffer-> 长期 buffer,只是指针的
批改。然而经验两次 read 即经验两次指针批改。 - 而 Java io 读取一次到长期变量经验 0 次拷贝,因为大于等于缓冲区大小则间接从磁盘读取即 InputStream-> 长期 byte 数组。经验一次 read,然而节约内存。
从下面举例说明中能够看出 Okio 在 CPU 和内存做了很好的衡量, 超过 8k 就只读 8k, 缩小一次性加载到内存的数据。
没超过 8k,数据的复制也只是批改链表指针。
小结:
Okio 比间接应用 Java io 高效得益于它底层对缓冲区的实现构造,将数据的缓冲区定义为链表构造是为了更好从缓冲区到缓冲区数据的挪动,即不节约 CPU(不到处复制数据),在内存方面它引入 SegmentPool 来复用 Segment。毕竟间接开拓一个 8k 的 byte[]还是很节约的。以及对缓冲区链表结点的数据进行压缩解决缩小不必要的内存开销。
4.Okio 的超时检测
超时机制分为同步检测和异步检测机制,先从简略的开始。
上面以读取数据检测超时为例进行阐明。
4.1 同步检测
通过后面剖析,调用 read()时其实调用了 RealBufferedSource#read()。而 RealBufferedSource#read()又会调用被包装的 Source,即 Okio#source()创立的 Source 的 read()。
//Okio#source()返回的 Source
private static Source source(final InputStream in, final Timeout timeout) {
...
return new Source() {public long read(Buffer sink, long byteCount) throws IOException {
...
try {
// 超时检测
timeout.throwIfReached();
...
int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
...
return bytesRead;
} catch (AssertionError e) {if (isAndroidGetsocknameError(e)) throw new IOException(e);
throw e;
}
}
...
};
}
//Timeout
public class Timeout {
private boolean hasDeadline;
private long deadlineNanoTime;
private long timeoutNanos;
public Timeout() {}
public Timeout deadlineNanoTime(long deadlineNanoTime) {
this.hasDeadline = true;
this.deadlineNanoTime = deadlineNanoTime;
return this;
}
public void throwIfReached() throws IOException {if (Thread.interrupted()) {throw new InterruptedIOException("thread interrupted");
}
if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) {throw new InterruptedIOException("deadline reached");
}
}
}
依据代码剖析在每次调用 read()都会调用 timeout#throwIfReached(), 联合 Timeout 类中定义,当调用 deadlineNanoTime()设置截止工夫,hasDeadline,deadlineNanoTime 会被赋值。即 throwIfReached()的调用才会起到查看超时作用,也就是同步检测超时机制就是依据工夫的流逝来判断是否超时。
4.2 异步检测
异步检测 Okio 用在对 Socket 输出流的读取和输入流的写入检测,这里仅以输出流检测为例进行阐明。先对异步检测整体设计形容,让咱们对整体上有一个
宏观上的意识;不至于在剖析源码时抓不住重点,最初再对代码实现进行详细分析。
异步检测整体设计如下:
- 构造上应用单链表作为检测超时的构造,将超时工夫封装到结点中,依照超时工夫的升序插入到链表中。也就是马上要过期的结点为头结点的下个结点。而头结点在这里起到了看门狗的作用。所以头节点放弃不变。
- 当开始从 socket#inputStream 中读取时,启动一个监督线程(Watchdog)。一直的获取头结点的下个结点即被监督的结点并判断是否为空, 为空则期待 60S,60S 后如果还为空则退出监视器;不为空则取出该结点存储的超时工夫判断是否超时。如果没超时则期待该结点存储的超时工夫,工夫到后或者被链表插入操作唤醒,则会走一遍流程;如果超时了则删除该结点,并敞开 socket。
- 整个过程如果没有产生超时,则在读取完后删除被监督的结点。直到监督线程 wait()期待设置的工夫后发现没有须要监督的结点了,而后退出整个监督线程。或者还在 wait()中时,又 read()了一次,即链表中增加了新的被监督结点。这时 wait 被唤醒,唤醒后开始监督新的结点。
上面对代码进行剖析,从以 Socket 创立 Source 开始
//Okio.source(socket)
public static Source source(Socket socket) throws IOException {// 第一步 创立 AsyncTimeout 并包装 socket,包装是为了在超时是调用 timedOut()来敞开 socket。AsyncTimeout 是 Timeout 的子类
AsyncTimeout timeout = timeout(socket);
// 第二步 创立 source 并包装 socket.getInputStream()
Source source = source(socket.getInputStream(), timeout);
// 第三步 创立 source 并包装第二步的 source, 也就是当内部调用 source.read 时,其实调用的是第二步的 read,
而第三步的包装是为了在第二步 read 时多加一层监督
return timeout.source(source);
}
// 创立 AsyncTimeout 并包装 socket,AsyncTimeout 是 Timeout 的子类
private static AsyncTimeout timeout(final Socket socket) {return new AsyncTimeout() {@Override protected IOException newTimeoutException(@Nullable IOException cause) {...}
@Override protected void timedOut() {
try {socket.close();
} catch (Exception e) {...}
}
};
}
// 第二步 创立 source 并包装 socket.getInputStream()
private static Source source(final InputStream in, final Timeout timeout) {
...
return new Source() {@Override public long read(Buffer sink, long byteCount) throws IOException {
...
try {
...
int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
...
return bytesRead;
} catch (AssertionError e) {if (isAndroidGetsocknameError(e)) throw new IOException(e);
throw e;
}
}
...
};
}
// 第三步 创立 source 并包装第二步的 source, 也就是当内部调用 source.read 时,// 其实调用的是第二步的 read,
// 而第三步的包装是为了在 read 时多加一层超时检测
public final Source source(final Source source) {return new Source() {@Override public long read(Buffer sink, long byteCount) throws IOException {
boolean throwOnTimeout = false;
// 在 enter 办法外部启动超时检测
enter();
try {long result = source.read(sink, byteCount);
throwOnTimeout = true;
return result;
} catch (IOException e) {throw exit(e);
} finally {
//read 执行结束删除被监督的结点
exit(throwOnTimeout);
}
}
@Override public void close() throws IOException {
boolean throwOnTimeout = false;
try {source.close();
throwOnTimeout = true;
} catch (IOException e) {throw exit(e);
} finally {exit(throwOnTimeout);
}
}
@Override public Timeout timeout() {return AsyncTimeout.this;}
...
};
}
public final void enter() {if (inQueue) throw new IllegalStateException("Unbalanced enter/exit");
long timeoutNanos = timeoutNanos();
boolean hasDeadline = hasDeadline();
if (timeoutNanos == 0 && !hasDeadline) {return;}
inQueue = true;
scheduleTimeout(this, timeoutNanos, hasDeadline);
}
private static synchronized void scheduleTimeout(AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {if (head == null) {head = new AsyncTimeout();
// 启动监视器
new Watchdog().start();
}
long now = System.nanoTime();
...
node.timeoutAt = now + timeoutNanos;
// 依照超时工夫升序插入到链表中, 头结点后的结点就是行将超时的节点
// 还有多长时间就超时了
long remainingNanos = node.remainingNanos(now);
for (AsyncTimeout prev = head; true; prev = prev.next) {if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {
node.next = prev.next;
prev.next = node;
if (prev == head) {
// 如果是插入到头节点后, 那么唤醒监视器
AsyncTimeout.class.notify();}
break;
}
}
}
private static final class Watchdog extends Thread {Watchdog() {super("Okio Watchdog");
setDaemon(true);
}
public void run() {while (true) {
try {
AsyncTimeout timedOut;
synchronized (AsyncTimeout.class) {timedOut = awaitTimeout();
if (timedOut == null) continue;
if (timedOut == head) {
head = null;
return;
}
}
// 示意读取超时敞开 socket
timedOut.timedOut();} catch (InterruptedException ignored) {}}
}
}
static @Nullable AsyncTimeout awaitTimeout() throws InterruptedException {// 依据头结点 (看门狗) 获取被监督的结点
AsyncTimeout node = head.next;
// 如果不存在被监督的结点则期待 IDLE_TIMEOUT_MILLIS 秒(60s)。// 期待后还是不存在并且工夫曾经超过了 60s, 则返回头节点 (看门狗) 而后退出监视器。// 期待后不为空即存在被监督的结点, 则返回 null,持续下次循环, 循环后 node!=null。if (node == null) {long startNanos = System.nanoTime();
AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS);
return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS
? head // The idle timeout elapsed.
: null; // The situation has changed.
}
// 还有多长时间过期
long waitNanos = node.remainingNanos(System.nanoTime());
// 还没有超时,期待
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
AsyncTimeout.class.wait(waitMillis, (int) waitNanos);
// 如果是期待后,则阐明超时了,下次循环 waitNanos<=0。走上面代码
// 如果是被 notifyAll 后,阐明有新结点的插入, 则须要从新判断
return null;
}
// 被监督的结点已超时,删除被被监督的结点,并返回用于内部调用该结点的 timedOut()来敞开数据流。head.next = node.next;
node.next = null;
return node;
}