后面文章说了,ChannelHandlerContext#write只是将数据缓存到ChannelOutboundBuffer,等到ChannelHandlerContext#flush时,再将ChannelOutboundBuffer缓存的数据写到Channel中。
本文分享Netty中ChannelOutboundBuffer的实现以及Flush过程。
源码剖析基于Netty 4.1

每个Channel的AbstractUnsafe#outboundBuffer 都保护了一个ChannelOutboundBuffer。
ChannelOutboundBuffer,出站数据缓冲区,负责缓存ChannelHandlerContext#write的数据。通过链表治理数据,链表节点为外部类Entry。

关键字段如下

Entry tailEntry;        // 链表最初一个节点,新增的节点增加其后。Entry unflushedEntry;    // 链表中第一个未刷新的节点Entry flushedEntry;        // 链表中第一个已刷新但数据未写入的节点int flushed;            // 已刷新但数据未写入的节点数

ChannelHandlerContext#flush操作前,须要先刷新一遍待处理的节点(次要是统计本次ChannelHandlerContext#flush操作能够写入多少个节点数据),从unflushedEntry开始。刷新实现后应用flushedEntry标记第一个待写入的节点,flushed为待写入节点数。

后面分享Netty读写过程的文章说过,AbstractUnsafe#write解决写操作时,会调用ChannelOutboundBuffer#addMessage将数据缓存起来

public void addMessage(Object msg, int size, ChannelPromise promise) {    // #1    Entry entry = Entry.newInstance(msg, size, total(msg), promise);    if (tailEntry == null) {        flushedEntry = null;    } else {        Entry tail = tailEntry;        tail.next = entry;    }    tailEntry = entry;    if (unflushedEntry == null) {        unflushedEntry = entry;    }    incrementPendingOutboundBytes(entry.pendingSize, false);}

#1 构建一个Entry,留神,这里应用了对象池RECYCLER,前面有文章具体解析。
次要是更新tailEntry和unflushedEntry
#2 如果以后缓存数量超过阀值WriteBufferWaterMark#high,更新unwritable标记为true,并触发pipeline.fireChannelWritabilityChanged()办法。
因为ChannelOutboundBuffer链表没有大小限度,一直累积数据可能导致 OOM,
为了防止这个问题,咱们能够在unwritable标记为true时,不再持续缓存数据。
Netty只会更新unwritable标记,并不阻止数据缓存,咱们能够依据须要实现该性能。示例如下

if (ctx.channel().isActive() && ctx.channel().isWritable()) {    ctx.writeAndFlush(responseMessage);} else {    ...}

addFlush办法负责刷新节点(ChannelHandlerContext#flush操作前调用该办法统计可写入节点数据数)

public void addFlush() {    // #1    Entry entry = unflushedEntry;    if (entry != null) {        // #2        if (flushedEntry == null) {            // there is no flushedEntry yet, so start with the entry            flushedEntry = entry;        }        do {            // #3            flushed ++;            if (!entry.promise.setUncancellable()) {                // Was cancelled so make sure we free up memory and notify about the freed bytes                int pending = entry.cancel();                decrementPendingOutboundBytes(pending, false, true);            }            entry = entry.next;            // #4        } while (entry != null);        // All flushed so reset unflushedEntry        // #5        unflushedEntry = null;    }}

#1 从unflushedEntry节点开始解决
#2 赋值flushedEntry为unflushedEntry。
ChannelHandlerContext#flush写入实现后会置空flushedEntry
#3 减少flushed
设置节点的ChannelPromise不可勾销
#4 从unflushedEntry开始,遍历前面节点
#5 置空unflushedEntry,示意以后所有节点都已刷新。

nioBuffers办法负责将以后缓存的ByteBuf转发为(jvm)ByteBuffer

public ByteBuffer[] nioBuffers(int maxCount, long maxBytes) {    assert maxCount > 0;    assert maxBytes > 0;    long nioBufferSize = 0;    int nioBufferCount = 0;    // #1    final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();    ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);    Entry entry = flushedEntry;    while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) {        if (!entry.cancelled) {            ByteBuf buf = (ByteBuf) entry.msg;            final int readerIndex = buf.readerIndex();            final int readableBytes = buf.writerIndex() - readerIndex;            if (readableBytes > 0) {                // #2                if (maxBytes - readableBytes < nioBufferSize && nioBufferCount != 0) {                    break;                }                nioBufferSize += readableBytes;                // #3                int count = entry.count;                if (count == -1) {                    //noinspection ConstantValueVariableUse                    entry.count = count = buf.nioBufferCount();                }                int neededSpace = min(maxCount, nioBufferCount + count);                if (neededSpace > nioBuffers.length) {                    nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);                    NIO_BUFFERS.set(threadLocalMap, nioBuffers);                }                // #4                if (count == 1) {                    ByteBuffer nioBuf = entry.buf;                    if (nioBuf == null) {                        // cache ByteBuffer as it may need to create a new ByteBuffer instance if its a                        // derived buffer                        entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);                    }                    nioBuffers[nioBufferCount++] = nioBuf;                } else {                    ...                }                if (nioBufferCount == maxCount) {                    break;                }            }        }        entry = entry.next;    }    this.nioBufferCount = nioBufferCount;    this.nioBufferSize = nioBufferSize;    return nioBuffers;}

#1 从线程缓存中获取nioBuffers变量,这样能够防止重复结构ByteBuffer数组的性能损耗
#2 maxBytes,即本次操作最大的字节数。
maxBytes - readableBytes < nioBufferSize,示意如果本次操作后将超出maxBytes,退出
#3
buf.nioBufferCount(),获取ByteBuffer数量,CompositeByteBuf可能有多个ByteBuffer组成。
neededSpace,即nioBuffers数组中ByteBuffer数量,nioBuffers长度不够时须要扩容。
#4
buf.internalNioBuffer(readerIndex, readableBytes),应用readerIndex, readableBytes结构一个ByteBuffer。
这里波及ByteBuf相干常识,前面有文章具体解析。

ChannelHandlerContext#flush实现后,须要移除对应的缓存节点。

public void removeBytes(long writtenBytes) {    for (;;) {        // #1        Object msg = current();        if (!(msg instanceof ByteBuf)) {            assert writtenBytes == 0;            break;        }        final ByteBuf buf = (ByteBuf) msg;        final int readerIndex = buf.readerIndex();        final int readableBytes = buf.writerIndex() - readerIndex;        // #2        if (readableBytes <= writtenBytes) {            if (writtenBytes != 0) {                progress(readableBytes);                writtenBytes -= readableBytes;            }            remove();        } else { // readableBytes > writtenBytes            // #3            if (writtenBytes != 0) {                buf.readerIndex(readerIndex + (int) writtenBytes);                progress(writtenBytes);            }            break;        }    }    clearNioBuffers();}

#1
current办法返回flushedEntry节点缓存数据。
后果null时,退出循环
#2 以后节点的数据曾经全副写入,
progress办法唤醒数据节点上ChannelProgressivePromise的监听者
writtenBytes减去对应字节数
remove()办法移除节点,开释ByteBuf,flushedEntry标记后移。
#3 以后节点的数据局部写入,它应该是本次ChannelHandlerContext#flush操作的最初一个节点
更新ByteBuf的readerIndex,下次从这里开始读取数据。
退出

移除数据节点

public boolean remove() {    Entry e = flushedEntry;    if (e == null) {        clearNioBuffers();        return false;    }    Object msg = e.msg;    ChannelPromise promise = e.promise;    int size = e.pendingSize;    // #1    removeEntry(e);    if (!e.cancelled) {        // only release message, notify and decrement if it was not canceled before.        // #2        ReferenceCountUtil.safeRelease(msg);        safeSuccess(promise);        decrementPendingOutboundBytes(size, false, true);    }    // recycle the entry    // #3    e.recycle();    return true;}

#1
flushed减1
当flushed为0时,flushedEntry赋值为null,否则flushedEntry指向后一个节点。
#2 开释ByteBuf
#3 以后节点返回对象池中,以便复用。

上面来看一下ChannelHandlerContext#flush操作过程。
ChannelHandlerContext#flush -> HeadContext#flush -> AbstractUnsafe#flush

public final void flush() {    assertEventLoop();    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;    if (outboundBuffer == null) {        return;    }    // #1    outboundBuffer.addFlush();    // #2    flush0();}

#1 刷新outboundBuffer中数据节点
#2 写入操作

flush -> NioSocketChannel#doWrite

protected void doWrite(ChannelOutboundBuffer in) throws Exception {    SocketChannel ch = javaChannel();    int writeSpinCount = config().getWriteSpinCount();    do {        // #1        if (in.isEmpty()) {            clearOpWrite();            return;        }        // #2        int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();        ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);        int nioBufferCnt = in.nioBufferCount();        switch (nioBufferCnt) {            case 0:                // #3                writeSpinCount -= doWrite0(in);                break;            case 1: {                // #4                ByteBuffer buffer = nioBuffers[0];                int attemptedBytes = buffer.remaining();                final int localWrittenBytes = ch.write(buffer);                if (localWrittenBytes <= 0) {                    // #5                    incompleteWrite(true);                    return;                }                adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);                // #6                in.removeBytes(localWrittenBytes);                --writeSpinCount;                break;            }            default: {                // #7                ...            }        }    } while (writeSpinCount > 0);    incompleteWrite(writeSpinCount < 0);}

#1 通过ChannelOutboundBuffer#flushed判断是否没有数据能够写,没有数据则革除关注事件OP_WRITE,间接返回。
#2 获取ChannelOutboundBuffer中ByteBuf保护的(jvm)ByteBuffer,并统计nioBufferSize,nioBufferCount。
#3 这时没有ByteBuffer,然而可能有其余类型的数据(如FileRegion类型),调用doWrite0持续解决,这里不再深刻
#4 只有一个ByteBuffer,调用SocketChannel#write将数据写入Channel。
#5 如果写入数据数量小于等于0,阐明数据没有被写出去(可能是因为套接字的缓冲区满了等起因),那么就须要关注该Channel上的OP_WRITE事件,不便下次EventLoop将Channel轮询进去的时候,能持续写数据。
#6 移除ChannelOutboundBuffer缓存数据节点。
#7 有多个ByteBuffer,调用SocketChannel#write(ByteBuffer[] srcs, int offset, int length),批量写入,与上一种状况解决相似

回顾之前文章《事件循环机制实现原理》中对NioEventLoop#processSelectedKey办法的解析

    ...    if ((readyOps & SelectionKey.OP_WRITE) != 0) {        ch.unsafe().forceFlush();    }

这里会调用forceFlush办法,再次写入数据。

FlushConsolidationHandler
ChannelHandlerContext#flush是很低廉的操作,可能触发零碎调用,但数据又不能缓存太久,应用FlushConsolidationHandler能够尽量达到写入提早与吞吐量之间的衡量。
FlushConsolidationHandler中保护了explicitFlushAfterFlushes变量,
在ChannelOutboundHandler#channelRead中调用flush,如果调用次数小于explicitFlushAfterFlushes, 会拦挡flush操作不执行。
在channelReadComplete后调用flush,则不会拦挡flush操作。

本文波及ByteBuf组件,它是Netty中的内存缓冲区,前面有文章解析。

如果您感觉本文不错,欢送关注我的微信公众号,您的关注是我保持的能源!