An earlier article showed that ChannelHandlerContext#write only caches the data in the ChannelOutboundBuffer; it is not until ChannelHandlerContext#flush that the cached data is written to the Channel.
This article looks at the implementation of ChannelOutboundBuffer in Netty and the flush process.
The source code analysis is based on Netty 4.1.
Each Channel's AbstractUnsafe#outboundBuffer holds a ChannelOutboundBuffer.
ChannelOutboundBuffer is the outbound data buffer; it caches the data passed to ChannelHandlerContext#write. The data is organized as a linked list whose nodes are instances of the inner class Entry.
Its key fields are:
```java
Entry tailEntry;      // last node of the list; new nodes are appended after it
Entry unflushedEntry; // first unflushed node in the list
Entry flushedEntry;   // first node that has been flushed but whose data has not been written yet
int flushed;          // number of flushed-but-not-yet-written nodes
```
Before a ChannelHandlerContext#flush, the pending nodes must first be "flushed" (mainly to count how many nodes this flush call can write), starting from unflushedEntry. Once that is done, flushedEntry marks the first node waiting to be written, and flushed is the number of nodes to write.
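To make the pointer movement concrete, here is a minimal, self-contained sketch (not Netty source; it only mirrors ChannelOutboundBuffer's field names) of how the three pointers and the flushed counter evolve across write, addFlush and remove:

```java
// Minimal sketch, not Netty source: models only the three pointers and the
// flushed counter, ignoring promises, byte accounting and cancellation.
final class OutboundListSketch {
    static final class Entry {
        Object msg;
        Entry next;
        Entry(Object msg) { this.msg = msg; }
    }

    Entry flushedEntry;   // first flushed-but-not-yet-written node
    Entry unflushedEntry; // first unflushed node
    Entry tailEntry;      // last node
    int flushed;          // flushed-but-not-yet-written node count

    // ChannelHandlerContext#write: append to the tail, the unflushed region grows.
    void addMessage(Object msg) {
        Entry entry = new Entry(msg);
        if (tailEntry != null) {
            tailEntry.next = entry;
        }
        tailEntry = entry;
        if (unflushedEntry == null) {
            unflushedEntry = entry;
        }
    }

    // ChannelHandlerContext#flush, step 1: the whole unflushed region becomes flushed.
    void addFlush() {
        Entry entry = unflushedEntry;
        if (entry == null) {
            return;
        }
        if (flushedEntry == null) {
            flushedEntry = entry;
        }
        for (; entry != null; entry = entry.next) {
            flushed++;
        }
        unflushedEntry = null;
    }

    // ChannelHandlerContext#flush, step 2: pop one fully written node from the head.
    Object remove() {
        Entry e = flushedEntry;
        if (e == null) {
            return null;
        }
        if (--flushed == 0) {
            flushedEntry = null;
            if (e == tailEntry) {
                tailEntry = null;
                unflushedEntry = null;
            }
        } else {
            flushedEntry = e.next;
        }
        return e.msg;
    }
}
```

In other words: everything before flushedEntry has already been written, the range from flushedEntry up to unflushedEntry awaits writing, and the range from unflushedEntry to tailEntry awaits the next addFlush.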
As described in the earlier article on Netty's read/write process, when AbstractUnsafe#write handles a write operation, it calls ChannelOutboundBuffer#addMessage to cache the data:
```java
public void addMessage(Object msg, int size, ChannelPromise promise) {
    // #1
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    if (tailEntry == null) {
        flushedEntry = null;
    } else {
        Entry tail = tailEntry;
        tail.next = entry;
    }
    tailEntry = entry;
    if (unflushedEntry == null) {
        unflushedEntry = entry;
    }
    // #2
    incrementPendingOutboundBytes(entry.pendingSize, false);
}
```
#1
Build an Entry. Note that the object pool RECYCLER is used here; a later article covers it in detail.
The rest of the method mainly updates tailEntry and unflushedEntry.
#2
If the total of pending bytes now exceeds the threshold WriteBufferWaterMark#high, incrementPendingOutboundBytes sets the unwritable flag to true and fires pipeline.fireChannelWritabilityChanged().
Because the ChannelOutboundBuffer linked list has no size limit, letting data pile up indefinitely can cause an OOM.
To avoid that, we can stop caching data once the unwritable flag is true.
Netty only updates the flag; it does not stop caching data itself, so we have to implement that behavior if we need it. For example:
```java
if (ctx.channel().isActive() && ctx.channel().isWritable()) {
    ctx.writeAndFlush(responseMessage);
} else {
    ...
}
```
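As a concrete, hedged sketch of that idea (BacklogWriter and its queue are illustrative, not Netty APIs): park messages in a backlog of our own while the channel is unwritable, and drain it from channelWritabilityChanged, which fires once the pending bytes fall back below WriteBufferWaterMark#low.

```java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class BacklogWriter extends ChannelInboundHandlerAdapter {

    private final Queue<Object> backlog = new ConcurrentLinkedQueue<>();

    /** Write through when writable, otherwise hold the message back. */
    public void send(ChannelHandlerContext ctx, Object msg) {
        if (ctx.channel().isActive() && ctx.channel().isWritable()) {
            ctx.writeAndFlush(msg);
        } else {
            backlog.offer(msg); // or reject / apply backpressure here
        }
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) {
        // Drain the backlog once pending bytes drop below WriteBufferWaterMark#low.
        while (ctx.channel().isWritable()) {
            Object msg = backlog.poll();
            if (msg == null) {
                break;
            }
            ctx.writeAndFlush(msg);
        }
        ctx.fireChannelWritabilityChanged();
    }
}
```

The thresholds themselves can be tuned through ChannelOption.WRITE_BUFFER_WATER_MARK.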
The addFlush method "flushes" the nodes (it is called before a ChannelHandlerContext#flush to count the nodes whose data can be written):
```java
public void addFlush() {
    // #1
    Entry entry = unflushedEntry;
    if (entry != null) {
        // #2
        if (flushedEntry == null) {
            // there is no flushedEntry yet, so start with the entry
            flushedEntry = entry;
        }
        do {
            // #3
            flushed ++;
            if (!entry.promise.setUncancellable()) {
                // Was cancelled so make sure we free up memory and notify about the freed bytes
                int pending = entry.cancel();
                decrementPendingOutboundBytes(pending, false, true);
            }
            entry = entry.next;
            // #4
        } while (entry != null);

        // All flushed so reset unflushedEntry
        // #5
        unflushedEntry = null;
    }
}
```
#1
Start from the unflushedEntry node.
#2
Assign unflushedEntry to flushedEntry. flushedEntry is set back to null once the ChannelHandlerContext#flush write completes.
#3
Increment flushed and mark the node's ChannelPromise as uncancellable.
#4
Walk the following nodes, starting from unflushedEntry.
#5
Clear unflushedEntry to indicate that all current nodes have been flushed.
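A small hedged illustration of #3 (channel and msg are assumed to exist in scope): cancelling a write promise only works while its entry is still in the unflushed region; once addFlush has marked it uncancellable, the write will proceed.

```java
ChannelPromise p = channel.newPromise();
channel.write(msg, p);
p.cancel(false);  // may still succeed: the entry sits in the unflushed region
channel.flush();  // addFlush marks the remaining promises uncancellable first
```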
The nioBuffers method converts the currently cached ByteBufs into (JVM) ByteBuffers:
```java
public ByteBuffer[] nioBuffers(int maxCount, long maxBytes) {
    assert maxCount > 0;
    assert maxBytes > 0;
    long nioBufferSize = 0;
    int nioBufferCount = 0;
    // #1
    final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
    ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);
    Entry entry = flushedEntry;
    while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) {
        if (!entry.cancelled) {
            ByteBuf buf = (ByteBuf) entry.msg;
            final int readerIndex = buf.readerIndex();
            final int readableBytes = buf.writerIndex() - readerIndex;
            if (readableBytes > 0) {
                // #2
                if (maxBytes - readableBytes < nioBufferSize && nioBufferCount != 0) {
                    break;
                }
                nioBufferSize += readableBytes;
                // #3
                int count = entry.count;
                if (count == -1) {
                    //noinspection ConstantValueVariableUse
                    entry.count = count = buf.nioBufferCount();
                }
                int neededSpace = min(maxCount, nioBufferCount + count);
                if (neededSpace > nioBuffers.length) {
                    nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
                    NIO_BUFFERS.set(threadLocalMap, nioBuffers);
                }
                // #4
                if (count == 1) {
                    ByteBuffer nioBuf = entry.buf;
                    if (nioBuf == null) {
                        // cache ByteBuffer as it may need to create a new ByteBuffer instance if its a
                        // derived buffer
                        entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
                    }
                    nioBuffers[nioBufferCount++] = nioBuf;
                } else {
                    ...
                }
                if (nioBufferCount == maxCount) {
                    break;
                }
            }
        }
        entry = entry.next;
    }
    this.nioBufferCount = nioBufferCount;
    this.nioBufferSize = nioBufferSize;
    return nioBuffers;
}
```
#1
Fetch the nioBuffers array from a thread-local cache, which avoids the cost of repeatedly allocating ByteBuffer arrays.
#2
maxBytes is the maximum number of bytes for this operation. maxBytes - readableBytes < nioBufferSize means this entry would push us past maxBytes, so we break out.
#3
buf.nioBufferCount() returns the number of underlying ByteBuffers; a CompositeByteBuf may be composed of several.
neededSpace is the number of ByteBuffers the nioBuffers array must hold; the array is expanded when it is too small.
#4
buf.internalNioBuffer(readerIndex, readableBytes) builds a ByteBuffer from readerIndex and readableBytes.
This touches on ByteBuf internals, which a later article covers in detail.
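As a quick illustration of #3: a plain ByteBuf maps to a single JVM ByteBuffer, while a CompositeByteBuf reports one per component. A small hedged demo:

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;

public class NioBufferCountDemo {
    public static void main(String[] args) {
        ByteBuf plain = Unpooled.copiedBuffer("hello", CharsetUtil.UTF_8);
        System.out.println(plain.nioBufferCount());     // 1

        CompositeByteBuf composite = Unpooled.compositeBuffer();
        // true = advance the writer index past the added components
        composite.addComponents(true,
                Unpooled.copiedBuffer("foo", CharsetUtil.UTF_8),
                Unpooled.copiedBuffer("bar", CharsetUtil.UTF_8));
        System.out.println(composite.nioBufferCount()); // 2, one per component

        plain.release();
        composite.release();
    }
}
```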
After ChannelHandlerContext#flush has written the data, the corresponding cached nodes must be removed.
```java
public void removeBytes(long writtenBytes) {
    for (;;) {
        // #1
        Object msg = current();
        if (!(msg instanceof ByteBuf)) {
            assert writtenBytes == 0;
            break;
        }
        final ByteBuf buf = (ByteBuf) msg;
        final int readerIndex = buf.readerIndex();
        final int readableBytes = buf.writerIndex() - readerIndex;
        // #2
        if (readableBytes <= writtenBytes) {
            if (writtenBytes != 0) {
                progress(readableBytes);
                writtenBytes -= readableBytes;
            }
            remove();
        } else { // readableBytes > writtenBytes
            // #3
            if (writtenBytes != 0) {
                buf.readerIndex(readerIndex + (int) writtenBytes);
                progress(writtenBytes);
            }
            break;
        }
    }
    clearNioBuffers();
}
```
#1
The current method returns the data cached on the flushedEntry node; when the result is null, the loop exits.
#2
The current node's data has been written completely:
the progress method notifies the listeners of the node's ChannelProgressivePromise,
writtenBytes is reduced by the corresponding byte count,
and remove() removes the node, releases the ByteBuf, and moves the flushedEntry marker forward.
#3
The current node's data was only partially written; it should be the last node of this ChannelHandlerContext#flush operation.
Update the ByteBuf's readerIndex so that the next write starts reading from there, then exit the loop.
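A worked example with made-up numbers may help:

```java
// Illustrative numbers, not Netty source. Two flushed ByteBufs are pending:
// A with 100 readable bytes, B with 50. The kernel accepted 120 bytes.
//
// removeBytes(120):
//   round 1: current() == A, readableBytes(100) <= writtenBytes(120)
//            -> progress(100); writtenBytes becomes 20; remove() pops A
//   round 2: current() == B, readableBytes(50) > writtenBytes(20)
//            -> B.readerIndex += 20; progress(20); break
//
// B stays at the head of the flushed region, and the next doWrite round
// continues from B's new readerIndex.
```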
Removing a node:
```java
public boolean remove() {
    Entry e = flushedEntry;
    if (e == null) {
        clearNioBuffers();
        return false;
    }
    Object msg = e.msg;
    ChannelPromise promise = e.promise;
    int size = e.pendingSize;

    // #1
    removeEntry(e);

    if (!e.cancelled) {
        // only release message, notify and decrement if it was not canceled before.
        // #2
        ReferenceCountUtil.safeRelease(msg);
        safeSuccess(promise);
        decrementPendingOutboundBytes(size, false, true);
    }

    // recycle the entry
    // #3
    e.recycle();

    return true;
}
```
#1
Decrement flushed. When flushed reaches 0, flushedEntry is set to null; otherwise flushedEntry moves to the next node.
#2
Release the ByteBuf.
#3
Return the Entry to the object pool for reuse.
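One practical consequence of #2 worth noting: because remove() both releases the ByteBuf and completes the promise, a listener on the write future runs only after the data has actually left the ChannelOutboundBuffer. A hedged example (msg is assumed to be in scope):

```java
channel.writeAndFlush(msg).addListener(future -> {
    if (future.isSuccess()) {
        // remove() has released the ByteBuf and marked the promise successful
    } else {
        // failed or cancelled; the buffer was freed without being written
    }
});
```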
Now let's walk through the ChannelHandlerContext#flush flow:
ChannelHandlerContext#flush -> HeadContext#flush -> AbstractUnsafe#flush
```java
public final void flush() {
    assertEventLoop();

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        return;
    }

    // #1
    outboundBuffer.addFlush();
    // #2
    flush0();
}
```
#1
Flush the data nodes in the outboundBuffer.
#2
Perform the actual write.
flush0 -> NioSocketChannel#doWrite
```java
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    SocketChannel ch = javaChannel();
    int writeSpinCount = config().getWriteSpinCount();
    do {
        // #1
        if (in.isEmpty()) {
            clearOpWrite();
            return;
        }

        // #2
        int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
        ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
        int nioBufferCnt = in.nioBufferCount();

        switch (nioBufferCnt) {
            case 0:
                // #3
                writeSpinCount -= doWrite0(in);
                break;
            case 1: {
                // #4
                ByteBuffer buffer = nioBuffers[0];
                int attemptedBytes = buffer.remaining();
                final int localWrittenBytes = ch.write(buffer);
                if (localWrittenBytes <= 0) {
                    // #5
                    incompleteWrite(true);
                    return;
                }
                adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
                // #6
                in.removeBytes(localWrittenBytes);
                --writeSpinCount;
                break;
            }
            default: {
                // #7
                ...
            }
        }
    } while (writeSpinCount > 0);

    incompleteWrite(writeSpinCount < 0);
}
```
#1
Use ChannelOutboundBuffer#flushed to check whether there is any data left to write; if not, stop watching for the OP_WRITE event and return.
#2
Get the (JVM) ByteBuffers backing the ByteBufs in the ChannelOutboundBuffer, and record nioBufferSize and nioBufferCount.
#3
No ByteBuffer here, but there may be data of other types (such as FileRegion); doWrite0 handles those, which we won't dig into.
#4
Exactly one ByteBuffer: call SocketChannel#write to write the data to the Channel.
#5
If the number of bytes written is less than or equal to 0, the data could not be written out (for example because the socket's send buffer is full), so register interest in OP_WRITE on this Channel so that writing can continue the next time the EventLoop selects it.
#6
Remove the written data nodes from the ChannelOutboundBuffer.
#7
Multiple ByteBuffers: call SocketChannel#write(ByteBuffer[] srcs, int offset, int length) for a gathering write; the handling is similar to the previous case.
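As a side note, the writeSpinCount that bounds this loop is configurable. A hedged one-liner (bootstrap is assumed to be a ServerBootstrap in scope):

```java
// writeSpinCount comes from ChannelOption.WRITE_SPIN_COUNT (default 16).
// Raising it lets a single flush attempt more socket writes before the
// event loop moves on to other work, trading fairness for throughput.
bootstrap.childOption(ChannelOption.WRITE_SPIN_COUNT, 32);
```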
Recall the analysis of the NioEventLoop#processSelectedKey method in the earlier article on the event loop mechanism:
```java
...
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
    ch.unsafe().forceFlush();
}
```
Here the forceFlush method is called to attempt the write again.
FlushConsolidationHandler
ChannelHandlerContext#flush is an expensive operation since it may trigger a system call, yet data should not sit in the buffer for too long either; FlushConsolidationHandler helps strike a balance between write latency and throughput.
FlushConsolidationHandler maintains an explicitFlushAfterFlushes variable.
When flush is called during ChannelInboundHandler#channelRead, the flush is intercepted and not executed as long as the number of calls is below explicitFlushAfterFlushes.
A flush issued after channelReadComplete is not intercepted.
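A hedged usage sketch (MyBusinessHandler is a placeholder): 256 is the handler's default for explicitFlushAfterFlushes, and the boolean enables consolidating flushes that happen while no read is in progress, e.g. flushes issued from another thread:

```java
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.flush.FlushConsolidationHandler;

public class FlushInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) {
        ch.pipeline()
          .addLast(new FlushConsolidationHandler(256, true))
          .addLast(new MyBusinessHandler()); // placeholder business handler
    }
}
```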
This article touched on the ByteBuf component, Netty's memory buffer; a later article will analyze it.
If you found this article useful, feel free to follow my WeChat official account; your support keeps me motivated!