乐趣区

关于netty:Netty源码解析-零拷贝机制与PoolArena

本文分享 Netty 中零拷贝机制与 PoolArena 的实现原理。

源码剖析基于 Netty 4.1

Netty 中的零拷贝

首先看一下 Netty 中实现零拷贝的机制

1. 文件传输类 DefaultFileRegion#transferTo,调用 FileChannel#transferTo,间接将文件缓冲区的数据发送到指标 Channel,缩小用户缓冲区的拷贝(通过 linux 的 sendfile 函数)。

应用 read 和 write

     DMA 拷贝                     拷贝,切换到用户态                   拷贝,切换到内核态
硬盘 --------> 内核缓冲区(内核态) ----------------> 用户缓冲区(用户态) ----------------> socket 缓冲区 

应用 sendfile

     DMA 拷贝                    拷贝
硬盘 --------> 内核缓冲区(内核态) ----> socket 缓冲区 

缩小用户态,内核态切换,以及数据拷贝

可参考: 操作系统和 Web 服务器那点事儿

2.Unpooled#wrappedBuffer 办法,将 byte 数据,(jvm)ByteBuffer 转换为 ByteBuf
CompositeByteBuf#addComponents 办法,合并 ByteBuf
ByteBuf#slice 办法,提取 ByteBuf 中局部数据片段
这些办法都是基于对象援用的操作,并没有内存拷贝

3. 应用堆外内存 (jvm)ByteBuffer 对 Socket 读写。
如果应用 JVM 的堆内存进行 Socket 读写,JVM 会将堆内存拷贝一份到间接内存中,而后才写入 Socket 中。应用堆外内存能够防止该拷贝操作。
留神,这里从内核缓冲区拷贝到用户缓冲区的操作并不能省略,毕竟咱们须要对数据进行操作,所以还是要拷贝到用户态的。
可参考: 知乎 –Java NIO 中,对于 DirectBuffer,HeapBuffer 的疑难

接口关系
ByteBufAllocator,内存分配器,负责为 ByteBuf 分配内存,线程平安。
PooledByteBufAllocator,默认的 ByteBufAllocator,事后从操作系统中申请一大块内存,在该内存上分配内存给 ByteBuf,能够进步性能和减小内存碎片。
UnPooledByteBufAllocator,非池化内存分配器,每次都从操作系统中申请内存。

RecvByteBufAllocator,接管内存分配器,为 Channel 读入的 IO 数据调配一块大小正当的 buffer 空间。具体性能交由外部接口 Handle 定义。
它次要是针对 Channel 读入场景增加一些操作,如 guess,incMessagesRead,lastBytesRead 等等。

ByteBuf,代表一个内存块,提供程序拜访和随机拜访,是一个或多个 Byte 数组或 NIO Buffers 的形象视图。
ByteBuf 次要能够分为堆外内存 DirectByteBuf 和堆内存 HeapByteBuf。
Netty4 中 ByteBuf 调整为抽象类,从而晋升吞吐量。

上面只关注 PooledByteBufAllocator,它是 Netty 中默认的内存调配策略(unsafe 反对),也是了解 Netty 内存机制的难点。

内存调配

后面文章《ChannelPipeline 与 Read,Write,Connect 事件处理》中解析的 read 事件处理,
NioByteUnsafe#read

public final void read() {
    ...
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);

    ByteBuf byteBuf = null;
    boolean close = false;
   
    ...    
    byteBuf = allocHandle.allocate(allocator);  
    allocHandle.lastBytesRead(doReadBytes(byteBuf));
    ...
}

recvBufAllocHandle 办法返回 AdaptiveRecvByteBufAllocator.HandleImpl。(AdaptiveRecvByteBufAllocator,PooledByteBufAllocator 都在 DefaultChannelConfig 中初始化)

AdaptiveRecvByteBufAllocator.HandleImpl#allocate -> AbstractByteBufAllocator#ioBuffer -> PooledByteBufAllocator#directBuffer -> PooledByteBufAllocator#newDirectBuffer

protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    // #1
    PoolThreadCache cache = (PoolThreadCache)this.threadCache.get();
    PoolArena<ByteBuffer> directArena = cache.directArena;
    Object buf;
    if (directArena != null) {
        // #2
        buf = directArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        // #3
        buf = PlatformDependent.hasUnsafe() ? UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) : new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
    }
    return toLeakAwareBuffer((ByteBuf)buf);
}

AbstractByteBufAllocator#ioBuffer 办法会判断以后零碎是否反对 unsafe。反对应用堆外内存,不反对应用堆内存。这里只关注堆外内存。
#1 从以后线程缓存中获取 PoolArena
#2 在以后线程缓存上分配内存
#3 线程缓存不存在,只能应用非池化内存调配策略了

PooledByteBufAllocator#threadCache 是一个 PoolThreadLocalCache 实例,PoolThreadLocalCache 继承于 FastThreadLocal,FastThreadLocal 这里简略了解为对 ThreadLocal 的优化。
当 PoolThreadLocalCache 上某个线程的缓存数据不存在时,通过 initialValue 办法结构。

PoolThreadLocalCache#initialValue

protected synchronized PoolThreadCache initialValue() {
    // #1
    final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
    final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
    // #2
    Thread current = Thread.currentThread();
    if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
        return new PoolThreadCache(
                heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
                DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
    }
    return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
}

#1 从 PooledByteBufAllocator 的 heapArenas,directArenas 中获取使用率最小的 PoolArena。
PooledByteBufAllocator 结构时会默认为 PooledByteBufAllocator#directArenas 初始化 8 个 PoolArena。
#2 结构 PoolThreadCache。

PoolArena,即一个内存区域,能够进行内存申请和开释。
PoolThreadCache 为每一个线程关联一个 PoolArena(PoolThreadCache#directArena),该线程的内存都在该 PoolArena 上调配。
Netty 反对高并发零碎,可能有很多线程进行同时内存调配。为了缓解线程竞争,通过创立多个 PoolArena 细化锁的粒度,从而进步并发执行的效率。

留神,一个 PoolArena 可能会分给不同的线程,能够看到 PoolArena 上会有一些同步操作。

内存级别

PoolArena 中将不同大小的内存块划分为以下级别:
Tiny < 512
Small < 8192
Chunk < 16777216
Huge >= 16777216

PoolArena#tinySubpagePools,smallSubpagePools 两个数组用于保护 Tiny,Small 级别的内存块。
tinySubpagePools,32 个元素,每个数组之间差 16 个字节,大小别离为 0,16,32,38,64, … ,496
smallSubpagePools,4 个元素,每个数组之间大小翻倍,大小别离为 512,1025,2048,4096
这两个数组都是 PoolSubpage 数组,PoolSubpage 大小默认都是 8192,Tiny,Small 级别的内存都是在 PoolSubpage 上调配的。

而 Chunk 对应的治理类为 PoolChunk,PoolArena 中应用 PoolChunkList 治理一组 PoolChunk。
PoolArena 按内存使用率将 PoolChunk 别离保护到 6 个 PoolChunkList 中,
qInit-> 内存使用率为 0~25,
q000-> 内存使用率为 1~50,
q025-> 内存使用率为 25~75,
q050-> 内存使用率为 50~75,
q075-> 内存使用率为 75~100,
q100-> 内存使用率为 100。
留神:PoolSubpage 须要从 PoolChunk 中配置,而 Tiny,Small 级別的内存则是从 PoolSubpage 中调配。

内存调配示意图如下

例如,要调配一个 496 大小的 ByteBuf,则从 tinySubpagePools[31]中找到一个 PoolSubpage,再从这个 PoolSubpage 调配一块内存。
如果 tinySubpagePools[31]没有可用的 PoolSubpage,则要先调配一个 PoolChunk,再从 PoolChunk 中调配一个 PoolSubpage 放到 tinySubpagePools[31]中。

PoolArena

PoolArena#allocate

private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
    // #1
    final int normCapacity = normalizeCapacity(reqCapacity);
    
    if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
        int tableIdx;
        PoolSubpage<T>[] table;
        boolean tiny = isTiny(normCapacity);
        // #2
        if (tiny) { // < 512
            // #3
            if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            // #4
            tableIdx = tinyIdx(normCapacity);
            table = tinySubpagePools;
        } else {
            // #5
            if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            tableIdx = smallIdx(normCapacity);
            table = smallSubpagePools;
        }
        // #6
        final PoolSubpage<T> head = table[tableIdx];

        // #7       
        synchronized (head) {
            final PoolSubpage<T> s = head.next;
            // #8
            if (s != head) {
                assert s.doNotDestroy && s.elemSize == normCapacity;
                long handle = s.allocate();
                assert handle >= 0;
                // #9
                s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
                incTinySmallAllocation(tiny);
                return;
            }
        }
        // #10
        synchronized (this) {allocateNormal(buf, reqCapacity, normCapacity);
        }
        
        incTinySmallAllocation(tiny);
        return;
    }
    if (normCapacity <= chunkSize) {
        // #11
        if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
            // was able to allocate out of the cache so move on
            return;
        }
        synchronized (this) {allocateNormal(buf, reqCapacity, normCapacity);
            ++allocationsNormal;
        }
    } else {
        // #12
        // Huge allocations are never served via the cache so just call allocateHuge
        allocateHuge(buf, reqCapacity);
    }
}

#1 内存对齐,内存大小转换为 16 的倍数或 2 的指数次幂 (除了 Huge 级别的内存)
#2 解决 Tiny,Small 级別的内存申请
#3 首先尝试在线程缓存上调配。
除了 PoolArena,PoolThreadCache 还为每个线程保护了 Tiny,Small 级别的内存 (tinySubPageDirectCaches,smallSubPageDirectCaches)
#4 cache 调配失败了,抉择 tinySubpagePools 中的 PoolSubpage,并计算下标
#5 Small 的操作与 tiny 相似
#6 定位到对应的 PoolSubpage
#7 因为 PoolArena 能够交给多个线程操作,这里须要同步操作。
#8 留神,head 是一个占位节点,并不存储数据,s!=head 示意以后 tinySubpagePools 或 smallSubpagePools 能够申请到内存,曾经耗尽的 PoolSubpage 是会从链表中移除。
#9 申请内存,并初始化 ByteBuf
#10 执行到这里示意以后 tinySubpagePools 或 smallSubpagePools 无奈申请到内存,这里须要申请一个 Chunk,并在下面申请对应的内存。
再强调一遍,PoolSubpage 须要从 PoolChunk 中配置,而 Tiny,Small 级别的内存则是从 PoolSubpage 中调配。
#11 Chunk 级别的内存,通过 allocateNormal 调配
#12 Huge 级别的内存,通过 allocateHuge 调配

int normalizeCapacity(int reqCapacity) {checkPositiveOrZero(reqCapacity, "reqCapacity");
    // #1
    if (reqCapacity >= chunkSize) {return directMemoryCacheAlignment == 0 ? reqCapacity : alignCapacity(reqCapacity);
    }

    // #2
    if (!isTiny(reqCapacity)) { // >= 512
        int normalizedCapacity = reqCapacity;
        normalizedCapacity --;
        normalizedCapacity |= normalizedCapacity >>>  1;
        normalizedCapacity |= normalizedCapacity >>>  2;
        normalizedCapacity |= normalizedCapacity >>>  4;
        normalizedCapacity |= normalizedCapacity >>>  8;
        normalizedCapacity |= normalizedCapacity >>> 16;
        normalizedCapacity ++;

        if (normalizedCapacity < 0) {normalizedCapacity >>>= 1;}
        assert directMemoryCacheAlignment == 0 || (normalizedCapacity & directMemoryCacheAlignmentMask) == 0;

        return normalizedCapacity;
    }
    // #3
    if (directMemoryCacheAlignment > 0) {return alignCapacity(reqCapacity);
    }

    // Quantum-spaced
    // #4
    if ((reqCapacity & 15) == 0) {return reqCapacity;}

    return (reqCapacity & ~15) + 16;
}

directMemoryCacheAlignment,对齐参数,默认为 0.
#1 Huge 级别的内存,返回原值。
#2 Small,Chunk 级别的内存,找到大于 reqCapacity 的最小的 2 的指数次幂的值。
这里的做法是将 reqCapacity 中最高 bit 位为 1 以下的 bit 位通通变成 1,最初再加 1 就失去了所需的值。
理论是将最高 bit 位的 1 复制给低位。
以 1024 为例子,

-- 1024,二进制为 10000000000
normalizedCapacity |= normalizedCapacity >>>  1; -- 最高 1 位向下复制,11000000000
normalizedCapacity |= normalizedCapacity >>>  2; -- 最高 2 位向下复制,11110000000
normalizedCapacity |= normalizedCapacity >>>  4; -- 最高 4 位向下复制,11111111000
normalizedCapacity |= normalizedCapacity >>>  8; -- 最高 8 位向下复制,11111111111
normalizedCapacity |= normalizedCapacity >>> 16; -- 最高 16 位向下复制,11111111111
normalizedCapacity ++; -- 加 1 失去后果,100000000000,即 2048

到这里,就不难理解后面 normalizedCapacity --; 的操作了,如果 reqCapacity 原本就是一个 2 的指数次幂的值,这样能够防止翻倍。

上面解决溢出场景,去掉符号位。
#3 应用对齐参数对齐,这里不深刻
#4 Tiny 级别的内存,转换为大于 reqCapacity 的最小的 16 倍数。
去掉最低 4 个 bit 位,加上 16 即可。

PoolArena#allocate -> allocateNormal

private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
    // #1
    if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
        q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
        q075.allocate(buf, reqCapacity, normCapacity)) {return;}

    // #2
    PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
    long handle = c.allocate(normCapacity);
    assert handle > 0;
    c.initBuf(buf, handle, reqCapacity);
    qInit.add(c);
}

#1 顺次从 q050,q025,q000,qInit,q075 上申请内存
为什么要是这个程序呢?

PoolArena 中的 PoolChunkList 之间也组成一个“双向”链表

qInit ---> q000 <---> q025 <---> q050 <---> q075 <---> q100

PoolChunkList 中还保护了 minUsage,maxUsage,即当一个 PoolChunk,使用率大于 maxUsage,它将被挪动到下一个 PoolChunkList,使用率小于 minUsage,则被挪动到前一个 PoolChunkList。
留神:q000 没有前置节点,它的 minUsage 为 1,即下面的 PoolChunk 内存齐全开释后,将被销毁。
qInit 的前置节点是它本人,但它的 minUsage 为 Integer.MIN_VALUE,即便下面的 PoolChunk 内存齐全开释后,也不会被销毁,而是持续保留在内存。

不优先从 q000 调配,正是因为 q000 上的 PoolChunk 内存齐全开释后要被销毁,如果在下面调配,则会提早内存的回收进度。
而 q075 上因为内存利用率太高,导致内存调配的成功率大大降低,因而放到最初。
所以从 q050 是一个不错的抉择,这样大部分状况下,Chunk 的利用率都会放弃在一个较高水平,进步整个利用的内存利用率;

PoolChunkList 实际上是一个 PoolChunk 链表。在 PoolChunkList 上申请内存,PoolChunkList 会遍历链表上 PoolChunk 节点,直到调配胜利或到链表开端。
PoolChunk 调配后,如果内存使用率高于 maxUsage,它将被挪动到下一个 PoolChunkList。

结构一个 PoolChunk

protected PoolChunk<ByteBuffer> newChunk(int pageSize, int maxOrder,
        int pageShifts, int chunkSize) {if (directMemoryCacheAlignment == 0) {
        return new PoolChunk<ByteBuffer>(this,
                allocateDirect(chunkSize), pageSize, maxOrder,
                pageShifts, chunkSize, 0);
    }
    final ByteBuffer memory = allocateDirect(chunkSize
            + directMemoryCacheAlignment);
    return new PoolChunk<ByteBuffer>(this, memory, pageSize,
            maxOrder, pageShifts, chunkSize,
            offsetCacheLine(memory));
}

allocateDirect 办法向操作系统申请内存,取得一个(jvm)ByteBuffer,
PoolChunk#memory 保护了该 ByteBuffer,PoolChunk 的内存实际上都是在该 ByteBuffer 上调配。
offsetCacheLine 办法获取偏移量,理论内存地位要通过它计算。

内存开释

void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity, PoolThreadCache cache) {if (chunk.unpooled) {
        // #1
        int size = chunk.chunkSize();
        destroyChunk(chunk);
        activeBytesHuge.add(-size);
        deallocationsHuge.increment();} else {
        // #2
        SizeClass sizeClass = sizeClass(normCapacity);
        if (cache != null && cache.add(this, chunk, nioBuffer, handle, normCapacity, sizeClass)) {
            // cached so not free it.
            return;
        }

        freeChunk(chunk, handle, sizeClass, nioBuffer, false);
    }
}

#1 非池化内存,间接销毁内存
#2 池化内存,首先尝试加到线程缓存中,胜利则不须要其余操作。失败则调用 freeChunk

void freeChunk(PoolChunk<T> chunk, long handle, SizeClass sizeClass, ByteBuffer nioBuffer, boolean finalizer) {
    final boolean destroyChunk;
    synchronized (this) {
        // #1
        ...
        destroyChunk = !chunk.parent.free(chunk, handle, nioBuffer);
    }
    if (destroyChunk) {
        // destroyChunk not need to be called while holding the synchronized lock.
        destroyChunk(chunk);
    }
}

#1 chunk.parent 即 PoolChunkList,PoolChunkList#free 会调用 PoolChunk 开释内存,开释内存后,如果内存使用率低于 minUsage,则挪动前一个 PoolChunkList,如果前一个 PoolChunkList 不存在 (q000),则返回 false,由前面的步骤销毁该 PoolChunk。
销毁 PoolChunk,能够应用 Cleaner 或者 NoCleaner,前面文章再分享该局部内容。

参考文档:
Netty 内存池之 PoolArena 详解
深入浅出 Netty 内存治理 PoolArena

如果您感觉本文不错,欢送关注我的微信公众号,您的关注是我保持的能源!

退出移动版