本篇文章次要剖析Okio读写流程以及超时检测机制。首先会介绍Okio中几个重要的类,而后提供一段用Okio api 实现读写文件代码,依据这段代码进行整体读写流程剖析,以及剖析Okio为什么比间接应用Java io 高效,最初介绍了在读写时Okio如何进行超时检测。
1.OKio介绍
Okio作为Okhttp底层io库,它补充了java.io和java.nio的有余,使拜访、存储和解决数据更加容易。
Okio中几个重要的类介绍
- <font color='red'>
ByteString
</font> 是不可变的字节序列。对于字符数据,最根本的就是String。而ByteString就像是String的兄弟个别,它使得将二进制数据作为一个变量值变得容易。这个类很聪慧:它晓得如何将本人编码和解码为十六进制、base64和utf-8。 - <font color='red'>
Segment
</font> Segment在Okio中作为数据缓冲的载体,一个Segment的数据缓冲大小为8192,即8k。每一个Segment都有前驱和后继结点,也就是说Sement是一个双向链表链表,精确的来说是一个双向循环链表。读取数据从Segment头结点读取写数据从Segment尾结点写。Okio中引入池的概念也就是源码中SegmentPool的实现。SegmentPool负责Segment创立和销毁,SegmentPool最大能够缓存8个Segment。 - <font color='red'>
Buffer
</font> 是一个可变的字节序列。像Arraylist一样。得益于它的底层由Segment实现因而你不须要事后设置缓冲区的大小,
当你将数据从一个缓冲区挪动到另一个缓冲区时,它会重新分配Segment的持有关系,而不是跨Segment复制数据。其中Buffer实现了BufferedSource和BufferedSink,同时具读写性能。 <font color='red'>
Sources
</font> 相似于java中的InputStream,Source作为Okio中读取数据的顶层接口只提供了简略的apilong read(Buffer sink, long byteCount) throws IOException;Timeout timeout();void close() throws IOException;
更多读取api由它的子接口BufferedSource提供,实现类为RealBufferdSource,底层InputStream->Buffer,而后基于Buffer的读取。
<font color='red'>
Sink
</font> 相似于java中的OutPutStream,Sink作为Okio中写入数据的顶层接口也只提供了简略的apivoid write(Buffer source, long byteCount) throws IOException;void flush() throws IOException;Timeout timeout();void close() throws IOException;
更多写入api由它的子接口BufferedSink提供,
实现类为RealBufferedSink,底层将数据写入到Buffer,再由Buffer写入到OutPutStream中。
这里省略了GzipSource,GzipSink,HashingSink,HashingSource...等其余实现Source和Sink的类,只关注主流程。
依据后面介绍和UML图得悉,数据的读写在RealBufferedSource和RealBufferedSink中实现
2.Okio读写流程
作为一个简略切入点,这里提供一段Okio实现的输出流写入到指定文件的代码。
/*** * 将字节输出流写入到指定文件中 * @return true 写入胜利,false 写入失败 */ fun copy(inputStream: InputStream, dest: File): Boolean { val source = Okio.buffer(Okio.source(inputStream)) val sink = Okio.buffer(Okio.sink(dest)) val buffer = Buffer() return try { var length = source.read(buffer, 8192L) while (-1L != length) { sink.write(buffer, length) sink.flush() length = source.read(buffer, 8192L) } true } catch (e: Exception) { e.printStackTrace() false } finally { source.close() sink.close() } }
Okio.source(inputStream)实现了对InputStream的包装,将InputStream包装在Source对象中并返回。
private static Source source(final InputStream in, final Timeout timeout) { ... return new Source() { @Override public long read(Buffer sink, long byteCount) throws IOException { ... if (byteCount == 0) return 0; try { //超时查看 timeout.throwIfReached(); //从SegmentPool中获取Segment Segment tail = sink.writableSegment(1); //依据Segment中可用大小计算最大能够往Segment中写多少字节 int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit); //从inputStream中将数据写到Segment中 int bytesRead = in.read(tail.data, tail.limit, maxToCopy); //如果读完则返回 if (bytesRead == -1) return -1; //追加曾经写入的数据量,用于下次将数据从limit地位开始写入,也就是limit之前都是写入的数据 tail.limit += bytesRead; //修改buffer中存储的字节数量 sink.size += bytesRead; return bytesRead; } catch (AssertionError e) { if (isAndroidGetsocknameError(e)) throw new IOException(e); throw e; } } @Override public void close() throws IOException { in.close(); } @Override public Timeout timeout() { return timeout; } @Override public String toString() { return "source(" + in + ")"; } }; } //Buffer#writableSegment Segment writableSegment(int minimumCapacity) { ... //1 if (head == null) { head = SegmentPool.take(); return head.next = head.prev = head; } //2 Segment tail = head.prev; if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) { tail = tail.push(SegmentPool.take()); } return tail; }
- 如果链表头为空,则从SegmentPool中获取新节点并指向head结点返回。
- 因为是双向循环列表所以head.prev始终获取的是尾结点(当链表长度为1时指向本人),如果tail结点存储数据的容量已满或者tail.owner为false(即该sement不能追加写入)则从SementPool中获取新节点插入到该结点尾部,并返回新节点。
Okio.buffer(Okio.source(src))等价于Okio.buffer(source),将source包装在RealBufferedSource(source)内并返回。
val source = Okio.buffer(Okio.source(src))source.read(buffer,8192L)
执行read((buffer,8192L)),理论是调用的RealBufferedSource.read(buffer,byteCount)。而RealBufferedSource.read(buffer,byteCount)外部又会调用被包装的Source的read(buffer,length)。即咱们下面剖析过的读数据代码。
//RealBufferedSource#read(buffer,8192L)public long read(Buffer sink, long byteCount) throws IOException { if (buffer.size == 0) { //调用被包装的Source。 //即从InputStream中读取Segment.SIZE个字节写入到buffer中 long read = source.read(buffer,Segment.SIZE); //如果未读到数据则返回 if (read == -1) return -1; } //读到数据,此时数据保留在buffer中,接下来将buffer写入到sink中。行将一个缓冲区写到另一个缓冲区。 long toRead = Math.min(byteCount, buffer.size); return buffer.read(sink, toRead); }//Buffer#read(sink,toRead)public long read(Buffer sink, long byteCount) { if (size == 0) return -1L; if (byteCount > size) byteCount = size; //Okio高效的中央就是buffer#write()的实现,前面会详细分析。这里先了解为将buffer中的数据写入到内部传递进来的sink中 sink.write(this, byteCount); return byteCount; }
Okio.sink(dest)将File转换为OutPutStream,而后包装在Sink对象中。
public static Sink sink(OutputStream out) { return sink(out, new Timeout()); } private static Sink sink(final OutputStream out, final Timeout timeout) { ... return new Sink() { @Override public void write(Buffer source, long byteCount) throws IOException { checkOffsetAndCount(source.size, 0, byteCount); while (byteCount > 0) { //超时检测 timeout.throwIfReached(); //获取链表头结点 Segment head = source.head; //计算一次能够读多少字节 int toCopy = (int) Math.min(byteCount, head.limit - head.pos); //从head中读取数据写入到OutPutStream中 out.write(head.data, head.pos, toCopy); //修改head读到哪个地位,下次持续从pos地位开始读 head.pos += toCopy; //递加直到byteCount=0退出循环即示意本次写完 byteCount -= toCopy; //修改buffer中存储的字节大小 source.size -= toCopy; //如果该Segment曾经读完 if (head.pos == head.limit) { //从链表中删除head并将head的下个结点赋值给head source.head = head.pop(); //回收head结点 SegmentPool.recycle(head); } } } @Override public void flush() throws IOException { out.flush(); } @Override public void close() throws IOException { out.close(); } @Override public Timeout timeout() { return timeout; } @Override public String toString() { return "sink(" + out + ")"; } }; }
Okio.buffer(Okio.sink(dest))等价于Okio.buffer(Sink),将Sink包装在RealBufferedSink(Sink)内并返回。
val sink = Okio.buffer(Okio.sink(dest))sink.write(buffer, length)sink.flush()
执行write(buffer, length)和flush()其实调用的是RealBufferedSink的write(buffer,length)和flush()。RealBufferedSink的write(buffer,length)和flush()最终会调用被包装的Sink的write(buffer,length)和flush()。
//RealBufferedSink#write(Buffer source, long byteCount)public void write(Buffer source, long byteCount) throws IOException { //将source中的数据写入到buffer中,Okio高效的中央就是buffer#write的实现 buffer.write(source, byteCount); emitCompleteSegments();}public BufferedSink emitCompleteSegments() throws IOException { long byteCount = buffer.completeSegmentByteCount(); //查看缓冲区是否被写满,写满则将数据写入到OutPutStream中。未写满则等到下次写满或调用flush或close时将数据写入到OutPutStream中,起到一个缓冲作用。 if (byteCount > 0) { //调用被包装的Sink#write(buffer, byteCount) //将buffer中的数据写入到OutPutStream中 sink.write(buffer, byteCount); } return this;}public long completeSegmentByteCount() { long result = size; if (result == 0) return 0; Segment tail = head.prev; if (tail.limit < Segment.SIZE && tail.owner) { result -= tail.limit - tail.pos; } return result; }
至此Okio的输出到输入根本流程已剖析完。依据源码剖析可知Okio就是对Java io的一个封装和优化,底层还是应用的InputStream和OutputStream。既然和Java io底层应用一样形式读和写,那么它劣势体现在哪里呢?有人可能会说他体现在api的简洁上,构造清晰,链式编程,调用不便。说的对,这算是它的劣势,而这劣势并不能压服我摈弃Java io而应用它,其实你也能够基于java io封装一套链式编程。它和间接应用Java io的最大劣势并不api的简洁上,而是io流拷贝的效率上以及对内存的复用上,下节中会具体介绍。
3.Okio为什么比间接应用Java io更有劣势
上节中咱们提到Okio和间接应用Java io的最大劣势并不api的简洁上,而是io流拷贝的效率上以及对内存的复用上。在说这两个劣势之前咱们先看看间接应用Java io和Okio从输出到输入都通过哪些步骤。
- Java io
InputStream --> BufferedInputStream --> 长期byte数组 --> BufferedOutPutStream --> OutPutStream
由此可见Java io从输出到输入流程中呈现了长期byte数组。意味着从BufferedInputStream->长期byte数组拷贝一次数据,从长期byte数组->BufferedOutPutStream再拷贝一次数据。
- Okio
InputStream --> inBuffer --> 长期buffer --> outBuffer --> OutPutStream
看起来两头局部和Java io步骤一样。实则不然,Java io咱们方才说过经验两次拷贝,而Okio两头局部inBuffer ->长期buffer->outBuffer 其实不齐全是数据的拷贝。在剖析buffer->buffer时咱们会详细描述为什么不齐全是数据拷贝。buffer->buffer 定义在 Buffer#write(Buffer source, long byteCount) 办法中。依据wirte办法正文看出Okio在实现 buffer->buffer 有两个指标。
此处引入该博主对正文的翻译
- 不要节约CPU
不要节约CPU即不要到处复制数据,从将整个Segments从一个缓冲区重新分配到另一个缓冲区。
- 不要节约内存
Segment作为一个不可变量,缓冲区中除了头节点和尾节点的片段以外,相邻的片段,至多应该保障50%以上的数据负载量(指的是Segment中的data数据, Okio认为data数据量在50%以上才算是被无效利用的)。因为头结点中须要读取耗费字节数据,而尾节点中须要写入产生字节数据,因而头结点和尾节点是不能放弃不变性的。
在缓冲区之间挪动片段
在将一个缓冲区写入另一个缓冲区时,咱们更喜爱重新分配整个段,将字节复制到最紧凑的模式。假如咱们有一个缓冲区,其中的片段负载为[91%,61%],如果咱们要在这下面附加一个负载量为[72%]的繁多片段,这样将产生的后果为[91%,61%,72%]。这期间不会进行任何的字节复制操作。(即空间换工夫,就义内存,提供速度)
再假如,咱们有一个缓冲区负载量为:[100%,2%],并且咱们心愿将其附加到一个负载量为[99%,3%]的缓冲区中。这个操作将产生以下局部:[100%、2%、99%、3%],也就是说,咱们不会花工夫去复制字节来进步内存的应用效率,如变成[100%,100%,4%]这样。(即这种状况下Okio不会采取工夫换空间的策略,因为太节约CPU)
在合并缓冲区时,当相邻缓冲区的合并级别不超过100%时,咱们将压缩相邻缓冲区。例如,当咱们在[100%,40%]根底上附加[30%,80%]时,后果将会是[100%,70%,80%]。(也就是两头相邻的负载为40%和30%的两个Segment将会被合并为一个负载为70%的Segment)
宰割片段
有时咱们只想将source buffer中的一部分写入到sink buffer当中,例如,给定一个sink为 [51%,91%],当初咱们想要将一个source为[92%,82%]的前30%写入到这个sink buffer当中。为了简化,咱们首先将source buffer转换为等效缓冲区[30%,62%,82%](即拆分Segment),而后挪动source的头结点Segment即可,最终生成sink[51%,91%,30%]和source[62%,82%]。
依据下面正文的定义,咱们可知在进行buffer数据转移时,依据不同策略执行不同操作以达到CPU和内存之间的均衡,那么来看下buffer转移的代码实现。
//Buffer#writepublic void write(Buffer source, long byteCount) { while (byteCount > 0) { //如果复制的数据量比原缓冲区已有数据量小 if (byteCount < (source.head.limit - source.head.pos)) { //获取指标缓冲区尾结点 Segment tail = head != null ? head.prev : null; //如果指标缓冲区尾结点不为空,并且是数据拥有者即能够追加数据并且指标缓冲区能够存下该数据 if (tail != null && tail.owner && (byteCount + tail.limit - (tail.shared ? 0 : tail.pos) <= Segment.SIZE)) { //如将[10%]追加到[20%],间接拷贝。最终后果[30%] //将原缓冲区拷贝到指标缓冲区 source.head.writeTo(tail, (int) byteCount); source.size -= byteCount; size += byteCount; return; } else { //如果指标缓冲区尾结点为空即指标缓冲区为空缓冲区 或者不为空然而的空间有余,或者不是持有者,这时就须要把原缓冲区的头结点宰割为两个Segment, //而后将原缓冲区的头指针更新为宰割后的第一个Segment, 如[92%, 82%]变成[30%, 62%, 82%]这样 source.head = source.head.split((int) byteCount); } } // 从原缓冲区的链表中移除头结点, 并退出到指标缓冲区的尾结点 Segment segmentToMove = source.head; long movedByteCount = segmentToMove.limit - segmentToMove.pos; source.head = segmentToMove.pop(); //如果指标缓冲区为空,则创立链表并将原缓冲区的链表头结点赋值给指标缓冲区结点的头结点 if (head == null) { head = segmentToMove; head.next = head.prev = head; } else { //指标缓冲区不为空,则向指标缓冲区链表追加原缓冲区结点的头结点。并尝试合并,如[60%,20%]追加[10%]。那么指标缓冲区结点为[60%,20%,10%]。而后合并后为[60%,30%]。 //合并胜利回收多余结点以节俭空间 Segment tail = head.prev; tail = tail.push(segmentToMove); tail.compact(); } source.size -= movedByteCount; size += movedByteCount; byteCount -= movedByteCount; } }
依据下面源码剖析以及正文来答复为什么Okio比Java io高效。
Java中读写数据个别为了高效咱们引入BufferedInputStream和BufferedOutPutStream。这里以BufferedInputStream读写磁盘文件为例剖析。在BufferedInputStream中当一次读取的字节数大于缓冲区大小会摒弃缓冲区,间接从磁盘中读取。
如果一次读取的字节数小于缓冲区大小,则先从磁盘中读取缓冲区大小个字节(BufferedInputStream中默认定义为8k)。而后每次从缓冲区读取设置的读取数量。直到缓冲区读完。而后再从磁盘中读取...直到整个磁盘数据读完
而Okio读取时不论你读取的字节长度是否大于缓冲区大小。间接读取8k数据到缓冲区,而后依据你设置的读取大小和以后缓冲区已有数据大小做比拟取最小值来进行数据转移。
举个例子
比方数据16K,读取一次到长期变量:
读取大小设置为4k
- Okio经验0次拷贝,inBuffer->长期buffer,只是宰割inBuffer数据,将宰割后的数据赋值给长期buffer,只是指针的批改
- 而Java io读取一次到长期变量经验1次拷贝,即buffer->长期byte数组。
读取大小设置为8k
- Okio经验0次拷贝,inBuffer->长期buffer,只是指针的批改
- 而Java io读取一次到长期变量经验0次拷贝,因为大于等于缓冲区大小则间接从磁盘读取即InputStream->长期byte数组。
读取大小设置为16k
- Okio经验0次拷贝,inBuffer->长期buffer,只是指针的
批改。然而经验两次read即经验两次指针批改。 - 而Java io读取一次到长期变量经验0次拷贝,因为大于等于缓冲区大小则间接从磁盘读取即InputStream->长期byte数组。经验一次read,然而节约内存。
从下面举例说明中能够看出Okio在CPU和内存做了很好的衡量,超过8k就只读8k,缩小一次性加载到内存的数据。
没超过8k,数据的复制也只是批改链表指针。
小结:
Okio比间接应用Java io高效得益于它底层对缓冲区的实现构造,将数据的缓冲区定义为链表构造是为了更好从缓冲区到缓冲区数据的挪动,即不节约CPU(不到处复制数据),在内存方面它引入SegmentPool来复用Segment。毕竟间接开拓一个8k的byte[]还是很节约的。以及对缓冲区链表结点的数据进行压缩解决缩小不必要的内存开销。
4.Okio的超时检测
超时机制分为同步检测和异步检测机制,先从简略的开始。
上面以读取数据检测超时为例进行阐明。
4.1 同步检测
通过后面剖析,调用read()时其实调用了RealBufferedSource#read()。而RealBufferedSource#read()又会调用被包装的Source,即Okio#source()创立的Source的read()。
//Okio#source()返回的Sourceprivate static Source source(final InputStream in, final Timeout timeout) { ... return new Source() { public long read(Buffer sink, long byteCount) throws IOException { ... try { //超时检测 timeout.throwIfReached(); ... int bytesRead = in.read(tail.data, tail.limit, maxToCopy); ... return bytesRead; } catch (AssertionError e) { if (isAndroidGetsocknameError(e)) throw new IOException(e); throw e; } } ... }; } //Timeoutpublic class Timeout { private boolean hasDeadline; private long deadlineNanoTime; private long timeoutNanos; public Timeout() { } public Timeout deadlineNanoTime(long deadlineNanoTime) { this.hasDeadline = true; this.deadlineNanoTime = deadlineNanoTime; return this; } public void throwIfReached() throws IOException { if (Thread.interrupted()) { throw new InterruptedIOException("thread interrupted"); } if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) { throw new InterruptedIOException("deadline reached"); } }}
依据代码剖析在每次调用read()都会调用timeout#throwIfReached(),联合Timeout类中定义,当调用deadlineNanoTime()设置截止工夫,hasDeadline,deadlineNanoTime会被赋值。即throwIfReached()的调用才会起到查看超时作用,也就是同步检测超时机制就是依据工夫的流逝来判断是否超时。
4.2 异步检测
异步检测Okio用在对Socket输出流的读取和输入流的写入检测,这里仅以输出流检测为例进行阐明。先对异步检测整体设计形容,让咱们对整体上有一个
宏观上的意识;不至于在剖析源码时抓不住重点,最初再对代码实现进行详细分析。
异步检测整体设计如下:
- 构造上应用单链表作为检测超时的构造,将超时工夫封装到结点中,依照超时工夫的升序插入到链表中。也就是马上要过期的结点为头结点的下个结点。而头结点在这里起到了看门狗的作用。所以头节点放弃不变。
- 当开始从socket#inputStream中读取时,启动一个监督线程(Watchdog)。一直的获取头结点的下个结点即被监督的结点并判断是否为空,为空则期待60S,60S后如果还为空则退出监视器;不为空则取出该结点存储的超时工夫判断是否超时。如果没超时则期待该结点存储的超时工夫,工夫到后或者被链表插入操作唤醒,则会走一遍流程;如果超时了则删除该结点,并敞开socket。
- 整个过程如果没有产生超时,则在读取完后删除被监督的结点。直到监督线程wait()期待设置的工夫后发现没有须要监督的结点了,而后退出整个监督线程。或者还在wait()中时,又read()了一次,即链表中增加了新的被监督结点。这时wait被唤醒,唤醒后开始监督新的结点。
上面对代码进行剖析,从以Socket创立Source开始
//Okio.source(socket)public static Source source(Socket socket) throws IOException { //第一步 创立AsyncTimeout并包装socket,包装是为了在超时是调用timedOut()来敞开socket。AsyncTimeout是Timeout的子类 AsyncTimeout timeout = timeout(socket); //第二步 创立source并包装socket.getInputStream() Source source = source(socket.getInputStream(), timeout); //第三步 创立source并包装第二步的source,也就是当内部调用source.read时,其实调用的是第二步的read, 而第三步的包装是为了在第二步read时多加一层监督 return timeout.source(source); } //创立AsyncTimeout并包装socket,AsyncTimeout是Timeout的子类private static AsyncTimeout timeout(final Socket socket) { return new AsyncTimeout() { @Override protected IOException newTimeoutException(@Nullable IOException cause) { ... } @Override protected void timedOut() { try { socket.close(); } catch (Exception e) { ... } } }; }//第二步 创立source并包装socket.getInputStream()private static Source source(final InputStream in, final Timeout timeout) { ... return new Source() { @Override public long read(Buffer sink, long byteCount) throws IOException { ... try { ... int bytesRead = in.read(tail.data, tail.limit, maxToCopy); ... return bytesRead; } catch (AssertionError e) { if (isAndroidGetsocknameError(e)) throw new IOException(e); throw e; } } ... };}//第三步 创立source并包装第二步的source,也就是当内部调用source.read时,//其实调用的是第二步的read,//而第三步的包装是为了在read时多加一层超时检测public final Source source(final Source source) { return new Source() { @Override public long read(Buffer sink, long byteCount) throws IOException { boolean throwOnTimeout = false; //在enter办法外部启动超时检测 enter(); try { long result = source.read(sink, byteCount); throwOnTimeout = true; return result; } catch (IOException e) { throw exit(e); } finally { //read执行结束删除被监督的结点 exit(throwOnTimeout); } } @Override public void close() throws IOException { boolean throwOnTimeout = false; try { source.close(); throwOnTimeout = true; } catch (IOException e) { throw exit(e); } finally { exit(throwOnTimeout); } } @Override public Timeout timeout() { return AsyncTimeout.this; } ... };} public final void enter() { if (inQueue) throw new IllegalStateException("Unbalanced enter/exit"); long timeoutNanos = timeoutNanos(); boolean hasDeadline = hasDeadline(); if (timeoutNanos == 0 && !hasDeadline) { return; } inQueue = true; scheduleTimeout(this, timeoutNanos, hasDeadline);} private static synchronized void scheduleTimeout( AsyncTimeout node, long timeoutNanos, boolean hasDeadline) { if (head == null) { head = new AsyncTimeout(); //启动监视器 new Watchdog().start(); } long now = System.nanoTime(); ... node.timeoutAt = now + timeoutNanos; //依照超时工夫升序插入到链表中,头结点后的结点就是行将超时的节点 //还有多长时间就超时了 long remainingNanos = node.remainingNanos(now); for (AsyncTimeout prev = head; true; prev = prev.next) { if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) { node.next = prev.next; prev.next = node; if (prev == head) { //如果是插入到头节点后,那么唤醒监视器 AsyncTimeout.class.notify(); } break; } } } private static final class Watchdog extends Thread { Watchdog() { super("Okio Watchdog"); setDaemon(true); } public void run() { while (true) { try { AsyncTimeout timedOut; synchronized (AsyncTimeout.class) { timedOut = awaitTimeout(); if (timedOut == null) continue; if (timedOut == head) { head = null; return; } } //示意读取超时敞开socket timedOut.timedOut(); } catch (InterruptedException ignored) { } } } }static @Nullable AsyncTimeout awaitTimeout() throws InterruptedException { //依据头结点(看门狗)获取被监督的结点 AsyncTimeout node = head.next; //如果不存在被监督的结点则期待IDLE_TIMEOUT_MILLIS秒(60s)。 //期待后还是不存在并且工夫曾经超过了60s,则返回头节点(看门狗)而后退出监视器。 //期待后不为空即存在被监督的结点,则返回null,持续下次循环,循环后node!=null。 if (node == null) { long startNanos = System.nanoTime(); AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS); return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS ? head // The idle timeout elapsed. : null; // The situation has changed. } //还有多长时间过期 long waitNanos = node.remainingNanos(System.nanoTime()); //还没有超时,期待 if (waitNanos > 0) { long waitMillis = waitNanos / 1000000L; waitNanos -= (waitMillis * 1000000L); AsyncTimeout.class.wait(waitMillis, (int) waitNanos); //如果是期待后,则阐明超时了,下次循环waitNanos<=0。走上面代码 //如果是被notifyAll后,阐明有新结点的插入,则须要从新判断 return null; } //被监督的结点已超时,删除被被监督的结点,并返回用于内部调用该结点的timedOut()来敞开数据流。 head.next = node.next; node.next = null; return node; }