关于netty:Netty源码解析-内存池与PoolArena

咱们晓得,Netty应用间接内存实现Netty零拷贝以晋升性能,
但间接内存的创立和开释可能须要波及零碎调用,是比拟低廉的操作,如果每个申请都创立和开释一个间接内存,那性能必定是不能满足要求的。
这时就须要应用内存池。
即从零碎中申请一大块内存,再在下面调配每个申请所需的内存。

Netty中的内存池次要波及PoolArena,PoolChunk与PoolSubpage。
本文次要剖析PoolArena的作用与实现。
源码剖析基于Netty 4.1.52

接口关系
ByteBufAllocator,内存分配器,负责为ByteBuf分配内存, 线程平安。
PooledByteBufAllocator,池化内存分配器,默认的ByteBufAllocator,事后从操作系统中申请一大块内存,在该内存上分配内存给ByteBuf,能够进步性能和减小内存碎片。
UnPooledByteBufAllocator,非池化内存分配器,每次都从操作系统中申请内存。

RecvByteBufAllocator,接管内存分配器,为Channel读入的IO数据调配一块大小正当的buffer空间。具体性能交由外部接口Handle定义。
它次要是针对Channel读入场景增加一些操作,如guess,incMessagesRead,lastBytesRead等等。
ByteBuf,调配好的内存块,能够间接应用。

上面只关注PooledByteBufAllocator,它是Netty中默认的内存分配器,也是了解Netty内存机制的难点。

内存调配

后面文章《ChannelPipeline机制与读写过程》中剖析了数据读取过程,
NioByteUnsafe#read

public final void read() {
    ...
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);

    ByteBuf byteBuf = null;

    ...
    byteBuf = allocHandle.allocate(allocator);
    allocHandle.lastBytesRead(doReadBytes(byteBuf));
    ...
}

recvBufAllocHandle办法返回AdaptiveRecvByteBufAllocator.HandleImpl。(AdaptiveRecvByteBufAllocator,PooledByteBufAllocator都在DefaultChannelConfig中初始化)

AdaptiveRecvByteBufAllocator.HandleImpl#allocate -> AbstractByteBufAllocator#ioBuffer -> PooledByteBufAllocator#directBuffer -> PooledByteBufAllocator#newDirectBuffer

protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    // #1
    PoolThreadCache cache = threadCache.get();
    PoolArena<ByteBuffer> directArena = cache.directArena;

    final ByteBuf buf;
    if (directArena != null) {
        // #2
        buf = directArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        // #3
        buf = PlatformDependent.hasUnsafe() ? UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) : new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
    }
    return toLeakAwareBuffer(buf);
}

AbstractByteBufAllocator#ioBuffer办法会判断以后零碎是否反对unsafe。反对时应用间接内存,不反对则应用堆内存。这里只关注间接内存的实现。
#1 从以后线程缓存中获取对应内存池PoolArena
#2 在以后线程内存池上分配内存
#3 线程缓存不存在,只能应用非池化内存分配内存了

PooledByteBufAllocator#threadCache是一个PoolThreadLocalCache实例,PoolThreadLocalCache继承于FastThreadLocal,FastThreadLocal这里简略了解为对ThreadLocal的优化,它为每个线程保护了一个PoolThreadCache,PoolThreadCache上关联了内存池。
当PoolThreadLocalCache上某个线程的PoolThreadCache不存在时,通过initialValue办法结构。

PoolThreadLocalCache#initialValue

protected synchronized PoolThreadCache initialValue() {
    // #1
    final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
    final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
    // #2
    final Thread current = Thread.currentThread();
    if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
        final PoolThreadCache cache = new PoolThreadCache(
                heapArena, directArena, smallCacheSize, normalCacheSize,
                DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);

        ...
    }
    // No caching so just use 0 as sizes.
    return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0);
}

#1 从PooledByteBufAllocator的heapArenas,directArenas中获取使用率最小的PoolArena。
PooledByteBufAllocator结构时默认会为PooledByteBufAllocator#directArenas初始化8个PoolArena。
#2 结构PoolThreadCache。

PoolArena,能够了解为一个内存池,负责管理从操作系统中申请到的内存块。
PoolThreadCache为每一个线程关联一个PoolArena(PoolThreadCache#directArena),该线程的内存都在该PoolArena上调配。
Netty反对高并发零碎,可能有很多线程进行同时内存调配。为了缓解线程竞争,通过创立多个PoolArena细化锁的粒度,从而进步并发执行的效率。
留神,一个PoolArena能够会分给多个的线程,能够看到PoolArena上会有一些同步操作。

内存级别

后面剖析SizeClasses的文章说过,Netty将内存池中的内存块按大小划分为3个级别。
不同级别的内存块治理算法不同。默认划分规定如下:
small <= 28672(3.5K)
normal <= 16777216(2M)
huge > 16777216(2M)

smallSubpagePools是一个PoolSubpage数组,负责保护small级别的内存块信息。
PoolChunk负责保护normal级别的内存,PoolChunkList治理一组PoolChunk。
PoolArena按内存使用率将PoolChunk别离保护到6个PoolChunkList中,
PoolArena按内存使用率将PoolChunk别离保护到6个PoolChunkList中,
qInit->内存使用率为0~25,
q000->内存使用率为1~50,
q025->内存使用率为25~75,
q050->内存使用率为50~75,
q075->内存使用率为75~100,
q100->内存使用率为100。
留神:PoolChunk是Netty每次向操作系统申请的内存块。
PoolSubpage须要从PoolChunk中调配,而Tiny,Small级別的内存则是从PoolSubpage中调配。

上面来看一下调配过程

private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
    // #1
    final int sizeIdx = size2SizeIdx(reqCapacity);
    // #2
    if (sizeIdx <= smallMaxSizeIdx) {
        tcacheAllocateSmall(cache, buf, reqCapacity, sizeIdx);
    } else if (sizeIdx < nSizes) {
        // #3
        tcacheAllocateNormal(cache, buf, reqCapacity, sizeIdx);
    } else {
        // #4
        int normCapacity = directMemoryCacheAlignment > 0
                ? normalizeSize(reqCapacity) : reqCapacity;
        // Huge allocations are never served via the cache so just call allocateHuge
        allocateHuge(buf, normCapacity);
    }
}

#1 size2SizeIdx是父类SizeClasses提供的办法,它应用特定算法,将申请的内存大小调整为标准大小,划分到对应地位,返回对应索引,可参考《内存对齐类SizeClasses》
#2 调配small级别的内存块
#3 调配normal级别的内存块
#4 调配huge级别的内存块

private void tcacheAllocateSmall(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity,
                                 final int sizeIdx) {
    // #1
    if (cache.allocateSmall(this, buf, reqCapacity, sizeIdx)) {
        return;
    }

    // #2
    final PoolSubpage<T> head = smallSubpagePools[sizeIdx];
    final boolean needsNormalAllocation;
    synchronized (head) {
        // #3
        final PoolSubpage<T> s = head.next;
        needsNormalAllocation = s == head;
        if (!needsNormalAllocation) {
            assert s.doNotDestroy && s.elemSize == sizeIdx2size(sizeIdx);
            long handle = s.allocate();
            assert handle >= 0;
            s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity, cache);
        }
    }
    // #4
    if (needsNormalAllocation) {
        synchronized (this) {
            allocateNormal(buf, reqCapacity, sizeIdx, cache);
        }
    }

    incSmallAllocation();
}

#1 首先尝试在线程缓存上调配。
除了PoolArena,PoolThreadCache#smallSubPageHeapCaches还为每个线程保护了Small级别的内存缓存
#2 应用后面SizeClasses#size2SizeIdx办法计算的索引,获取对应PoolSubpage
#3 留神,head是一个占位节点,并不存储数据,s==head示意以后存在能够用的PoolSubpage,因为曾经耗尽的PoolSubpage是会从链表中移除。
接着从PoolSubpage中分配内存,前面有文章解析具体过程
留神,这里必要运行在同步机制中。
#4 没有可用的PoolSubpage,须要申请一个Normal级别的内存块,再在下面调配所需内存

normal级别的内存也是先尝试在线程缓存中调配,调配失败后再调用allocateNormal办法申请
PoolArena#allocate -> allocateNormal

private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolThreadCache threadCache) {
    if (q050.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
        q025.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
        q000.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
        qInit.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
        q075.allocate(buf, reqCapacity, sizeIdx, threadCache)) {
        return;
    }

    // Add a new chunk.
    PoolChunk<T> c = newChunk(pageSize, nPSizes, pageShifts, chunkSize);
    boolean success = c.allocate(buf, reqCapacity, sizeIdx, threadCache);
    assert success;
    qInit.add(c);
}

#1 顺次从q050,q025,q000,qInit,q075上申请内存
为什么要是这个程序呢?

PoolArena中的PoolChunkList之间也组成一个“双向”链表

qInit ---> q000 <---> q025 <---> q050 <---> q075 <---> q100

PoolChunkList中还保护了minUsage,maxUsage,即当一个PoolChunk使用率大于maxUsage,它将被挪动到下一个PoolChunkList,使用率小于minUsage,则被挪动到前一个PoolChunkList。
留神:q000没有前置节点,它的minUsage为1,即下面的PoolChunk内存齐全开释后,将被销毁。
qInit的前置节点是它本人,但它的minUsage为Integer.MIN_VALUE,即便下面的PoolChunk内存齐全开释后,也不会被销毁,而是持续保留在内存。

不优先从q000调配,正是因为q000上的PoolChunk内存齐全开释后要被销毁,如果在下面调配,则会提早内存的回收进度。
而q075上因为内存利用率太高,导致内存调配的成功率大大降低,因而放到最初。
所以从q050是一个不错的抉择,这样大部分状况下,Chunk的利用率都会放弃在一个较高水平,进步整个利用的内存利用率;

在PoolChunkList上申请内存,PoolChunkList会遍历链表上PoolChunk节点,直到调配胜利或达到链表开端。
PoolChunk调配后,如果内存使用率高于maxUsage,它将被挪动到下一个PoolChunkList。

newChunk办法负责结构一个PoolChunk,这里是内存池向操作系统申请内存。
DirectArena#newChunk

protected PoolChunk<ByteBuffer> newChunk(int pageSize, int maxPageIdx,
    int pageShifts, int chunkSize) {
    if (directMemoryCacheAlignment == 0) {
        return new PoolChunk<ByteBuffer>(this,
                allocateDirect(chunkSize), pageSize, pageShifts,
                chunkSize, maxPageIdx, 0);
    }
    final ByteBuffer memory = allocateDirect(chunkSize
            + directMemoryCacheAlignment);
    return new PoolChunk<ByteBuffer>(this, memory, pageSize,
            pageShifts, chunkSize, maxPageIdx,
            offsetCacheLine(memory));
}

allocateDirect办法向操作系统申请内存,取得一个(jvm)ByteBuffer,
PoolChunk#memory保护了该ByteBuffer,PoolChunk的内存实际上都是在该ByteBuffer上调配。

最初是huge级别的内存申请

private void allocateHuge(PooledByteBuf<T> buf, int reqCapacity) {
    PoolChunk<T> chunk = newUnpooledChunk(reqCapacity);
    activeBytesHuge.add(chunk.chunkSize());
    buf.initUnpooled(chunk, reqCapacity);
    allocationsHuge.increment();
}

比较简单,没有应用内存池,间接向操作系统申请内存。

内存开释

void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity, PoolThreadCache cache) {
    if (chunk.unpooled) {
        // #1
        int size = chunk.chunkSize();
        destroyChunk(chunk);
        activeBytesHuge.add(-size);
        deallocationsHuge.increment();
    } else {
        // #2
        SizeClass sizeClass = sizeClass(handle);
        if (cache != null && cache.add(this, chunk, nioBuffer, handle, normCapacity, sizeClass)) {
            // cached so not free it.
            return;
        }

        freeChunk(chunk, handle, normCapacity, sizeClass, nioBuffer, false);
    }
}

#1 非池化内存,间接销毁内存
#2 池化内存,首先尝试加到线程缓存中,胜利则不须要其余操作。失败则调用freeChunk

void freeChunk(PoolChunk<T> chunk, long handle, int normCapacity, SizeClass sizeClass, ByteBuffer nioBuffer,
               boolean finalizer) {
    final boolean destroyChunk;
    synchronized (this) {
        ...
        destroyChunk = !chunk.parent.free(chunk, handle, normCapacity, nioBuffer);
    }
    if (destroyChunk) {
        // destroyChunk not need to be called while holding the synchronized lock.
        destroyChunk(chunk);
    }
}

chunk.parent即PoolChunkList,PoolChunkList#free会调用PoolChunk开释内存,开释内存后,如果内存使用率低于minUsage,则挪动前一个PoolChunkList,如果前一个PoolChunkList不存在(q000),则返回false,由前面的步骤销毁该PoolChunk。
可回顾后面解析ByteBuf文章中对于内存销毁的内容。

如果您感觉本文不错,欢送关注我的微信公众号,您的关注是我保持的能源!

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理