关于netty:Netty源码解析-零拷贝机制与PoolArena

本文分享Netty中零拷贝机制与PoolArena的实现原理。

源码剖析基于Netty 4.1

Netty中的零拷贝

首先看一下Netty中实现零拷贝的机制

1.文件传输类DefaultFileRegion#transferTo,调用FileChannel#transferTo,间接将文件缓冲区的数据发送到指标Channel,缩小用户缓冲区的拷贝(通过linux的sendfile函数)。

应用read 和 write

     DMA拷贝                     拷贝,切换到用户态                   拷贝,切换到内核态
硬盘 --------> 内核缓冲区(内核态) ----------------> 用户缓冲区(用户态) ----------------> socket缓冲区 

应用sendfile

     DMA拷贝                    拷贝
硬盘 --------> 内核缓冲区(内核态) ----> socket缓冲区 

缩小用户态,内核态切换,以及数据拷贝

可参考: 操作系统和Web服务器那点事儿

2.Unpooled#wrappedBuffer办法,将byte数据,(jvm)ByteBuffer转换为ByteBuf
CompositeByteBuf#addComponents办法,合并ByteBuf
ByteBuf#slice办法,提取ByteBuf中局部数据片段
这些办法都是基于对象援用的操作,并没有内存拷贝

3.应用堆外内存(jvm)ByteBuffer对Socket读写。
如果应用JVM的堆内存进行Socket读写,JVM会将堆内存拷贝一份到间接内存中,而后才写入Socket中。应用堆外内存能够防止该拷贝操作。
留神,这里从内核缓冲区拷贝到用户缓冲区的操作并不能省略,毕竟咱们须要对数据进行操作,所以还是要拷贝到用户态的。
可参考: 知乎–Java NIO中,对于DirectBuffer,HeapBuffer的疑难

接口关系
ByteBufAllocator,内存分配器,负责为ByteBuf分配内存, 线程平安。
PooledByteBufAllocator,默认的ByteBufAllocator,事后从操作系统中申请一大块内存,在该内存上分配内存给ByteBuf,能够进步性能和减小内存碎片。
UnPooledByteBufAllocator,非池化内存分配器,每次都从操作系统中申请内存。

RecvByteBufAllocator,接管内存分配器,为Channel读入的IO数据调配一块大小正当的buffer空间。具体性能交由外部接口Handle定义。
它次要是针对Channel读入场景增加一些操作,如guess,incMessagesRead,lastBytesRead等等。

ByteBuf,代表一个内存块,提供程序拜访和随机拜访,是一个或多个Byte数组或NIO Buffers的形象视图。
ByteBuf次要能够分为堆外内存DirectByteBuf和堆内存HeapByteBuf。
Netty4中ByteBuf调整为抽象类,从而晋升吞吐量。

上面只关注PooledByteBufAllocator,它是Netty中默认的内存调配策略(unsafe反对),也是了解Netty内存机制的难点。

内存调配

后面文章《ChannelPipeline与Read,Write,Connect事件处理》中解析的read事件处理,
NioByteUnsafe#read

public final void read() {
    ...
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);

    ByteBuf byteBuf = null;
    boolean close = false;
   
    ...    
    byteBuf = allocHandle.allocate(allocator);  
    allocHandle.lastBytesRead(doReadBytes(byteBuf));
    ...
}

recvBufAllocHandle办法返回AdaptiveRecvByteBufAllocator.HandleImpl。(AdaptiveRecvByteBufAllocator,PooledByteBufAllocator都在DefaultChannelConfig中初始化)

AdaptiveRecvByteBufAllocator.HandleImpl#allocate -> AbstractByteBufAllocator#ioBuffer -> PooledByteBufAllocator#directBuffer -> PooledByteBufAllocator#newDirectBuffer

protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    // #1
    PoolThreadCache cache = (PoolThreadCache)this.threadCache.get();
    PoolArena<ByteBuffer> directArena = cache.directArena;
    Object buf;
    if (directArena != null) {
        // #2
        buf = directArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        // #3
        buf = PlatformDependent.hasUnsafe() ? UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) : new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
    }
    return toLeakAwareBuffer((ByteBuf)buf);
}

AbstractByteBufAllocator#ioBuffer办法会判断以后零碎是否反对unsafe。反对应用堆外内存,不反对应用堆内存。这里只关注堆外内存。
#1 从以后线程缓存中获取PoolArena
#2 在以后线程缓存上分配内存
#3 线程缓存不存在,只能应用非池化内存调配策略了

PooledByteBufAllocator#threadCache是一个PoolThreadLocalCache实例,PoolThreadLocalCache继承于FastThreadLocal,FastThreadLocal这里简略了解为对ThreadLocal的优化。
当PoolThreadLocalCache上某个线程的缓存数据不存在时,通过initialValue办法结构。

PoolThreadLocalCache#initialValue

protected synchronized PoolThreadCache initialValue() {
    // #1
    final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
    final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
    // #2
    Thread current = Thread.currentThread();
    if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
        return new PoolThreadCache(
                heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
                DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
    }
    return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
}

#1 从PooledByteBufAllocator的heapArenas,directArenas中获取使用率最小的PoolArena。
PooledByteBufAllocator结构时会默认为PooledByteBufAllocator#directArenas初始化8个PoolArena。
#2 结构PoolThreadCache。

PoolArena,即一个内存区域,能够进行内存申请和开释。
PoolThreadCache为每一个线程关联一个PoolArena(PoolThreadCache#directArena),该线程的内存都在该PoolArena上调配。
Netty反对高并发零碎,可能有很多线程进行同时内存调配。为了缓解线程竞争,通过创立多个PoolArena细化锁的粒度,从而进步并发执行的效率。

留神,一个PoolArena可能会分给不同的线程,能够看到PoolArena上会有一些同步操作。

内存级别

PoolArena中将不同大小的内存块划分为以下级别:
Tiny < 512
Small < 8192
Chunk < 16777216
Huge >= 16777216

PoolArena#tinySubpagePools,smallSubpagePools两个数组用于保护Tiny,Small级别的内存块。
tinySubpagePools,32个元素,每个数组之间差16个字节,大小别离为0,16,32,38,64, … ,496
smallSubpagePools,4个元素,每个数组之间大小翻倍,大小别离为512,1025,2048,4096
这两个数组都是PoolSubpage数组,PoolSubpage大小默认都是8192,Tiny,Small级别的内存都是在PoolSubpage上调配的。

而Chunk对应的治理类为PoolChunk,PoolArena中应用PoolChunkList治理一组PoolChunk。
PoolArena按内存使用率将PoolChunk别离保护到6个PoolChunkList中,
qInit->内存使用率为0~25,
q000->内存使用率为1~50,
q025->内存使用率为25~75,
q050->内存使用率为50~75,
q075->内存使用率为75~100,
q100->内存使用率为100。
留神:PoolSubpage须要从PoolChunk中配置,而Tiny,Small级別的内存则是从PoolSubpage中调配。

内存调配示意图如下

例如,要调配一个496大小的ByteBuf,则从tinySubpagePools[31]中找到一个PoolSubpage,再从这个PoolSubpage调配一块内存。
如果tinySubpagePools[31]没有可用的PoolSubpage,则要先调配一个PoolChunk,再从PoolChunk中调配一个PoolSubpage放到tinySubpagePools[31]中。

PoolArena

PoolArena#allocate

private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
    // #1
    final int normCapacity = normalizeCapacity(reqCapacity);
    
    if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
        int tableIdx;
        PoolSubpage<T>[] table;
        boolean tiny = isTiny(normCapacity);
        // #2
        if (tiny) { // < 512
            // #3
            if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            // #4
            tableIdx = tinyIdx(normCapacity);
            table = tinySubpagePools;
        } else {
            // #5
            if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            tableIdx = smallIdx(normCapacity);
            table = smallSubpagePools;
        }
        // #6
        final PoolSubpage<T> head = table[tableIdx];

        // #7       
        synchronized (head) {
            final PoolSubpage<T> s = head.next;
            // #8
            if (s != head) {
                assert s.doNotDestroy && s.elemSize == normCapacity;
                long handle = s.allocate();
                assert handle >= 0;
                // #9
                s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
                incTinySmallAllocation(tiny);
                return;
            }
        }
        // #10
        synchronized (this) {
            allocateNormal(buf, reqCapacity, normCapacity);
        }
        
        incTinySmallAllocation(tiny);
        return;
    }
    if (normCapacity <= chunkSize) {
        // #11
        if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
            // was able to allocate out of the cache so move on
            return;
        }
        synchronized (this) {
            allocateNormal(buf, reqCapacity, normCapacity);
            ++allocationsNormal;
        }
    } else {
        // #12
        // Huge allocations are never served via the cache so just call allocateHuge
        allocateHuge(buf, reqCapacity);
    }
}

#1 内存对齐,内存大小转换为16的倍数或2的指数次幂(除了Huge级别的内存)
#2 解决Tiny,Small级別的内存申请
#3 首先尝试在线程缓存上调配。
除了PoolArena,PoolThreadCache还为每个线程保护了Tiny,Small级别的内存(tinySubPageDirectCaches,smallSubPageDirectCaches)
#4 cache调配失败了,抉择tinySubpagePools中的PoolSubpage,并计算下标
#5 Small的操作与tiny相似
#6 定位到对应的PoolSubpage
#7 因为PoolArena能够交给多个线程操作,这里须要同步操作。
#8 留神,head是一个占位节点,并不存储数据,s!=head示意以后tinySubpagePools或smallSubpagePools能够申请到内存,曾经耗尽的PoolSubpage是会从链表中移除。
#9 申请内存,并初始化ByteBuf
#10 执行到这里示意以后tinySubpagePools或smallSubpagePools无奈申请到内存,这里须要申请一个Chunk,并在下面申请对应的内存。
再强调一遍,PoolSubpage须要从PoolChunk中配置,而Tiny,Small级别的内存则是从PoolSubpage中调配。
#11 Chunk级别的内存,通过allocateNormal调配
#12 Huge级别的内存,通过allocateHuge调配

int normalizeCapacity(int reqCapacity) {
    checkPositiveOrZero(reqCapacity, "reqCapacity");
    // #1
    if (reqCapacity >= chunkSize) {
        return directMemoryCacheAlignment == 0 ? reqCapacity : alignCapacity(reqCapacity);
    }

    // #2
    if (!isTiny(reqCapacity)) { // >= 512
        int normalizedCapacity = reqCapacity;
        normalizedCapacity --;
        normalizedCapacity |= normalizedCapacity >>>  1;
        normalizedCapacity |= normalizedCapacity >>>  2;
        normalizedCapacity |= normalizedCapacity >>>  4;
        normalizedCapacity |= normalizedCapacity >>>  8;
        normalizedCapacity |= normalizedCapacity >>> 16;
        normalizedCapacity ++;

        if (normalizedCapacity < 0) {
            normalizedCapacity >>>= 1;
        }
        assert directMemoryCacheAlignment == 0 || (normalizedCapacity & directMemoryCacheAlignmentMask) == 0;

        return normalizedCapacity;
    }
    // #3
    if (directMemoryCacheAlignment > 0) {
        return alignCapacity(reqCapacity);
    }

    // Quantum-spaced
    // #4
    if ((reqCapacity & 15) == 0) {
        return reqCapacity;
    }

    return (reqCapacity & ~15) + 16;
}

directMemoryCacheAlignment,对齐参数,默认为0.
#1 Huge级别的内存,返回原值。
#2 Small,Chunk级别的内存,找到大于reqCapacity的最小的2的指数次幂的值。
这里的做法是将reqCapacity中最高bit位为1以下的bit位通通变成1,最初再加1就失去了所需的值。
理论是将最高bit位的1复制给低位。
以1024为例子,

-- 1024,二进制为10000000000
normalizedCapacity |= normalizedCapacity >>>  1; -- 最高1位向下复制,11000000000
normalizedCapacity |= normalizedCapacity >>>  2; -- 最高2位向下复制,11110000000
normalizedCapacity |= normalizedCapacity >>>  4; -- 最高4位向下复制,11111111000
normalizedCapacity |= normalizedCapacity >>>  8; -- 最高8位向下复制,11111111111
normalizedCapacity |= normalizedCapacity >>> 16; -- 最高16位向下复制,11111111111
normalizedCapacity ++; -- 加1失去后果,100000000000,即2048

到这里,就不难理解后面normalizedCapacity --;的操作了,如果reqCapacity原本就是一个2的指数次幂的值,这样能够防止翻倍。

上面解决溢出场景,去掉符号位。
#3 应用对齐参数对齐,这里不深刻
#4 Tiny级别的内存,转换为大于reqCapacity的最小的16倍数。
去掉最低4个bit位,加上16即可。

PoolArena#allocate -> allocateNormal

private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
    // #1
    if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
        q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
        q075.allocate(buf, reqCapacity, normCapacity)) {
        return;
    }

    // #2
    PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
    long handle = c.allocate(normCapacity);
    assert handle > 0;
    c.initBuf(buf, handle, reqCapacity);
    qInit.add(c);
}

#1 顺次从q050,q025,q000,qInit,q075上申请内存
为什么要是这个程序呢?

PoolArena中的PoolChunkList之间也组成一个“双向”链表

qInit ---> q000 <---> q025 <---> q050 <---> q075 <---> q100

PoolChunkList中还保护了minUsage,maxUsage,即当一个PoolChunk,使用率大于maxUsage,它将被挪动到下一个PoolChunkList,使用率小于minUsage,则被挪动到前一个PoolChunkList。
留神:q000没有前置节点,它的minUsage为1,即下面的PoolChunk内存齐全开释后,将被销毁。
qInit的前置节点是它本人,但它的minUsage为Integer.MIN_VALUE,即便下面的PoolChunk内存齐全开释后,也不会被销毁,而是持续保留在内存。

不优先从q000调配,正是因为q000上的PoolChunk内存齐全开释后要被销毁,如果在下面调配,则会提早内存的回收进度。
而q075上因为内存利用率太高,导致内存调配的成功率大大降低,因而放到最初。
所以从q050是一个不错的抉择,这样大部分状况下,Chunk的利用率都会放弃在一个较高水平,进步整个利用的内存利用率;

PoolChunkList实际上是一个PoolChunk链表。在PoolChunkList上申请内存,PoolChunkList会遍历链表上PoolChunk节点,直到调配胜利或到链表开端。
PoolChunk调配后,如果内存使用率高于maxUsage,它将被挪动到下一个PoolChunkList。

结构一个PoolChunk

protected PoolChunk<ByteBuffer> newChunk(int pageSize, int maxOrder,
        int pageShifts, int chunkSize) {
    if (directMemoryCacheAlignment == 0) {
        return new PoolChunk<ByteBuffer>(this,
                allocateDirect(chunkSize), pageSize, maxOrder,
                pageShifts, chunkSize, 0);
    }
    final ByteBuffer memory = allocateDirect(chunkSize
            + directMemoryCacheAlignment);
    return new PoolChunk<ByteBuffer>(this, memory, pageSize,
            maxOrder, pageShifts, chunkSize,
            offsetCacheLine(memory));
}

allocateDirect办法向操作系统申请内存,取得一个(jvm)ByteBuffer,
PoolChunk#memory保护了该ByteBuffer,PoolChunk的内存实际上都是在该ByteBuffer上调配。
offsetCacheLine办法获取偏移量,理论内存地位要通过它计算。

内存开释

void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity, PoolThreadCache cache) {
    if (chunk.unpooled) {
        // #1
        int size = chunk.chunkSize();
        destroyChunk(chunk);
        activeBytesHuge.add(-size);
        deallocationsHuge.increment();
    } else {
        // #2
        SizeClass sizeClass = sizeClass(normCapacity);
        if (cache != null && cache.add(this, chunk, nioBuffer, handle, normCapacity, sizeClass)) {
            // cached so not free it.
            return;
        }

        freeChunk(chunk, handle, sizeClass, nioBuffer, false);
    }
}

#1 非池化内存,间接销毁内存
#2 池化内存,首先尝试加到线程缓存中,胜利则不须要其余操作。失败则调用freeChunk

void freeChunk(PoolChunk<T> chunk, long handle, SizeClass sizeClass, ByteBuffer nioBuffer, boolean finalizer) {
    final boolean destroyChunk;
    synchronized (this) {
        // #1
        ...
        destroyChunk = !chunk.parent.free(chunk, handle, nioBuffer);
    }
    if (destroyChunk) {
        // destroyChunk not need to be called while holding the synchronized lock.
        destroyChunk(chunk);
    }
}

#1 chunk.parent即PoolChunkList,PoolChunkList#free会调用PoolChunk开释内存,开释内存后,如果内存使用率低于minUsage,则挪动前一个PoolChunkList,如果前一个PoolChunkList不存在(q000),则返回false,由前面的步骤销毁该PoolChunk。
销毁PoolChunk,能够应用Cleaner或者NoCleaner,前面文章再分享该局部内容。

参考文档:
Netty内存池之PoolArena详解
深入浅出Netty内存治理 PoolArena

如果您感觉本文不错,欢送关注我的微信公众号,您的关注是我保持的能源!

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理