关于netty:Netty源码解析-内存池与PoolArena

3次阅读

共计 8426 个字符,预计需要花费 22 分钟才能阅读完成。

咱们晓得,Netty 应用间接内存实现 Netty 零拷贝以晋升性能,
但间接内存的创立和开释可能须要波及零碎调用,是比拟低廉的操作,如果每个申请都创立和开释一个间接内存,那性能必定是不能满足要求的。
这时就须要应用内存池。
即从零碎中申请一大块内存,再在下面调配每个申请所需的内存。

Netty 中的内存池次要波及 PoolArena,PoolChunk 与 PoolSubpage。
本文次要剖析 PoolArena 的作用与实现。
源码剖析基于 Netty 4.1.52

接口关系
ByteBufAllocator,内存分配器,负责为 ByteBuf 分配内存,线程平安。
PooledByteBufAllocator,池化内存分配器,默认的 ByteBufAllocator,事后从操作系统中申请一大块内存,在该内存上分配内存给 ByteBuf,能够进步性能和减小内存碎片。
UnPooledByteBufAllocator,非池化内存分配器,每次都从操作系统中申请内存。

RecvByteBufAllocator,接管内存分配器,为 Channel 读入的 IO 数据调配一块大小正当的 buffer 空间。具体性能交由外部接口 Handle 定义。
它次要是针对 Channel 读入场景增加一些操作,如 guess,incMessagesRead,lastBytesRead 等等。
ByteBuf,调配好的内存块,能够间接应用。

上面只关注 PooledByteBufAllocator,它是 Netty 中默认的内存分配器,也是了解 Netty 内存机制的难点。

内存调配

后面文章《ChannelPipeline 机制与读写过程》中剖析了数据读取过程,
NioByteUnsafe#read

public final void read() {
    ...
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);

    ByteBuf byteBuf = null;

    ...
    byteBuf = allocHandle.allocate(allocator);
    allocHandle.lastBytesRead(doReadBytes(byteBuf));
    ...
}

recvBufAllocHandle 办法返回 AdaptiveRecvByteBufAllocator.HandleImpl。(AdaptiveRecvByteBufAllocator,PooledByteBufAllocator 都在 DefaultChannelConfig 中初始化)

AdaptiveRecvByteBufAllocator.HandleImpl#allocate -> AbstractByteBufAllocator#ioBuffer -> PooledByteBufAllocator#directBuffer -> PooledByteBufAllocator#newDirectBuffer

protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    // #1
    PoolThreadCache cache = threadCache.get();
    PoolArena<ByteBuffer> directArena = cache.directArena;

    final ByteBuf buf;
    if (directArena != null) {
        // #2
        buf = directArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        // #3
        buf = PlatformDependent.hasUnsafe() ? UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) : new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
    }
    return toLeakAwareBuffer(buf);
}

AbstractByteBufAllocator#ioBuffer 办法会判断以后零碎是否反对 unsafe。反对时应用间接内存,不反对则应用堆内存。这里只关注间接内存的实现。
#1 从以后线程缓存中获取对应内存池 PoolArena
#2 在以后线程内存池上分配内存
#3 线程缓存不存在,只能应用非池化内存分配内存了

PooledByteBufAllocator#threadCache 是一个 PoolThreadLocalCache 实例,PoolThreadLocalCache 继承于 FastThreadLocal,FastThreadLocal 这里简略了解为对 ThreadLocal 的优化,它为每个线程保护了一个 PoolThreadCache,PoolThreadCache 上关联了内存池。
当 PoolThreadLocalCache 上某个线程的 PoolThreadCache 不存在时,通过 initialValue 办法结构。

PoolThreadLocalCache#initialValue

protected synchronized PoolThreadCache initialValue() {
    // #1
    final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
    final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
    // #2
    final Thread current = Thread.currentThread();
    if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
        final PoolThreadCache cache = new PoolThreadCache(
                heapArena, directArena, smallCacheSize, normalCacheSize,
                DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);

        ...
    }
    // No caching so just use 0 as sizes.
    return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0);
}

#1 从 PooledByteBufAllocator 的 heapArenas,directArenas 中获取使用率最小的 PoolArena。
PooledByteBufAllocator 结构时默认会为 PooledByteBufAllocator#directArenas 初始化 8 个 PoolArena。
#2 结构 PoolThreadCache。

PoolArena,能够了解为一个内存池,负责管理从操作系统中申请到的内存块。
PoolThreadCache 为每一个线程关联一个 PoolArena(PoolThreadCache#directArena),该线程的内存都在该 PoolArena 上调配。
Netty 反对高并发零碎,可能有很多线程进行同时内存调配。为了缓解线程竞争,通过创立多个 PoolArena 细化锁的粒度,从而进步并发执行的效率。
留神,一个 PoolArena 能够会分给多个的线程,能够看到 PoolArena 上会有一些同步操作。

内存级别

后面剖析 SizeClasses 的文章说过,Netty 将内存池中的内存块按大小划分为 3 个级别。
不同级别的内存块治理算法不同。默认划分规定如下:
small <= 28672(3.5K)
normal <= 16777216(2M)
huge > 16777216(2M)

smallSubpagePools 是一个 PoolSubpage 数组,负责保护 small 级别的内存块信息。
PoolChunk 负责保护 normal 级别的内存,PoolChunkList 治理一组 PoolChunk。
PoolArena 按内存使用率将 PoolChunk 别离保护到 6 个 PoolChunkList 中,
PoolArena 按内存使用率将 PoolChunk 别离保护到 6 个 PoolChunkList 中,
qInit-> 内存使用率为 0~25,
q000-> 内存使用率为 1~50,
q025-> 内存使用率为 25~75,
q050-> 内存使用率为 50~75,
q075-> 内存使用率为 75~100,
q100-> 内存使用率为 100。
留神:PoolChunk 是 Netty 每次向操作系统申请的内存块。
PoolSubpage 须要从 PoolChunk 中调配,而 Tiny,Small 级別的内存则是从 PoolSubpage 中调配。

上面来看一下调配过程

private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
    // #1
    final int sizeIdx = size2SizeIdx(reqCapacity);
    // #2
    if (sizeIdx <= smallMaxSizeIdx) {tcacheAllocateSmall(cache, buf, reqCapacity, sizeIdx);
    } else if (sizeIdx < nSizes) {
        // #3
        tcacheAllocateNormal(cache, buf, reqCapacity, sizeIdx);
    } else {
        // #4
        int normCapacity = directMemoryCacheAlignment > 0
                ? normalizeSize(reqCapacity) : reqCapacity;
        // Huge allocations are never served via the cache so just call allocateHuge
        allocateHuge(buf, normCapacity);
    }
}

#1 size2SizeIdx 是父类 SizeClasses 提供的办法,它应用特定算法,将申请的内存大小调整为标准大小,划分到对应地位,返回对应索引,可参考《内存对齐类 SizeClasses》
#2 调配 small 级别的内存块
#3 调配 normal 级别的内存块
#4 调配 huge 级别的内存块

private void tcacheAllocateSmall(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity,
                                 final int sizeIdx) {
    // #1
    if (cache.allocateSmall(this, buf, reqCapacity, sizeIdx)) {return;}

    // #2
    final PoolSubpage<T> head = smallSubpagePools[sizeIdx];
    final boolean needsNormalAllocation;
    synchronized (head) {
        // #3
        final PoolSubpage<T> s = head.next;
        needsNormalAllocation = s == head;
        if (!needsNormalAllocation) {assert s.doNotDestroy && s.elemSize == sizeIdx2size(sizeIdx);
            long handle = s.allocate();
            assert handle >= 0;
            s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity, cache);
        }
    }
    // #4
    if (needsNormalAllocation) {synchronized (this) {allocateNormal(buf, reqCapacity, sizeIdx, cache);
        }
    }

    incSmallAllocation();}

#1 首先尝试在线程缓存上调配。
除了 PoolArena,PoolThreadCache#smallSubPageHeapCaches 还为每个线程保护了 Small 级别的内存缓存
#2 应用后面 SizeClasses#size2SizeIdx 办法计算的索引,获取对应 PoolSubpage
#3 留神,head 是一个占位节点,并不存储数据,s==head 示意以后存在能够用的 PoolSubpage,因为曾经耗尽的 PoolSubpage 是会从链表中移除。
接着从 PoolSubpage 中分配内存,前面有文章解析具体过程
留神,这里必要运行在同步机制中。
#4 没有可用的 PoolSubpage,须要申请一个 Normal 级别的内存块,再在下面调配所需内存

normal 级别的内存也是先尝试在线程缓存中调配,调配失败后再调用 allocateNormal 办法申请
PoolArena#allocate -> allocateNormal

private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolThreadCache threadCache) {if (q050.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
        q025.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
        q000.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
        qInit.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
        q075.allocate(buf, reqCapacity, sizeIdx, threadCache)) {return;}

    // Add a new chunk.
    PoolChunk<T> c = newChunk(pageSize, nPSizes, pageShifts, chunkSize);
    boolean success = c.allocate(buf, reqCapacity, sizeIdx, threadCache);
    assert success;
    qInit.add(c);
}

#1 顺次从 q050,q025,q000,qInit,q075 上申请内存
为什么要是这个程序呢?

PoolArena 中的 PoolChunkList 之间也组成一个“双向”链表

qInit ---> q000 <---> q025 <---> q050 <---> q075 <---> q100

PoolChunkList 中还保护了 minUsage,maxUsage,即当一个 PoolChunk 使用率大于 maxUsage,它将被挪动到下一个 PoolChunkList,使用率小于 minUsage,则被挪动到前一个 PoolChunkList。
留神:q000 没有前置节点,它的 minUsage 为 1,即下面的 PoolChunk 内存齐全开释后,将被销毁。
qInit 的前置节点是它本人,但它的 minUsage 为 Integer.MIN_VALUE,即便下面的 PoolChunk 内存齐全开释后,也不会被销毁,而是持续保留在内存。

不优先从 q000 调配,正是因为 q000 上的 PoolChunk 内存齐全开释后要被销毁,如果在下面调配,则会提早内存的回收进度。
而 q075 上因为内存利用率太高,导致内存调配的成功率大大降低,因而放到最初。
所以从 q050 是一个不错的抉择,这样大部分状况下,Chunk 的利用率都会放弃在一个较高水平,进步整个利用的内存利用率;

在 PoolChunkList 上申请内存,PoolChunkList 会遍历链表上 PoolChunk 节点,直到调配胜利或达到链表开端。
PoolChunk 调配后,如果内存使用率高于 maxUsage,它将被挪动到下一个 PoolChunkList。

newChunk 办法负责结构一个 PoolChunk,这里是内存池向操作系统申请内存。
DirectArena#newChunk

protected PoolChunk<ByteBuffer> newChunk(int pageSize, int maxPageIdx,
    int pageShifts, int chunkSize) {if (directMemoryCacheAlignment == 0) {
        return new PoolChunk<ByteBuffer>(this,
                allocateDirect(chunkSize), pageSize, pageShifts,
                chunkSize, maxPageIdx, 0);
    }
    final ByteBuffer memory = allocateDirect(chunkSize
            + directMemoryCacheAlignment);
    return new PoolChunk<ByteBuffer>(this, memory, pageSize,
            pageShifts, chunkSize, maxPageIdx,
            offsetCacheLine(memory));
}

allocateDirect 办法向操作系统申请内存,取得一个(jvm)ByteBuffer,
PoolChunk#memory 保护了该 ByteBuffer,PoolChunk 的内存实际上都是在该 ByteBuffer 上调配。

最初是 huge 级别的内存申请

private void allocateHuge(PooledByteBuf<T> buf, int reqCapacity) {PoolChunk<T> chunk = newUnpooledChunk(reqCapacity);
    activeBytesHuge.add(chunk.chunkSize());
    buf.initUnpooled(chunk, reqCapacity);
    allocationsHuge.increment();}

比较简单,没有应用内存池,间接向操作系统申请内存。

内存开释

void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity, PoolThreadCache cache) {if (chunk.unpooled) {
        // #1
        int size = chunk.chunkSize();
        destroyChunk(chunk);
        activeBytesHuge.add(-size);
        deallocationsHuge.increment();} else {
        // #2
        SizeClass sizeClass = sizeClass(handle);
        if (cache != null && cache.add(this, chunk, nioBuffer, handle, normCapacity, sizeClass)) {
            // cached so not free it.
            return;
        }

        freeChunk(chunk, handle, normCapacity, sizeClass, nioBuffer, false);
    }
}

#1 非池化内存,间接销毁内存
#2 池化内存,首先尝试加到线程缓存中,胜利则不须要其余操作。失败则调用 freeChunk

void freeChunk(PoolChunk<T> chunk, long handle, int normCapacity, SizeClass sizeClass, ByteBuffer nioBuffer,
               boolean finalizer) {
    final boolean destroyChunk;
    synchronized (this) {
        ...
        destroyChunk = !chunk.parent.free(chunk, handle, normCapacity, nioBuffer);
    }
    if (destroyChunk) {
        // destroyChunk not need to be called while holding the synchronized lock.
        destroyChunk(chunk);
    }
}

chunk.parent 即 PoolChunkList,PoolChunkList#free 会调用 PoolChunk 开释内存,开释内存后,如果内存使用率低于 minUsage,则挪动前一个 PoolChunkList,如果前一个 PoolChunkList 不存在 (q000),则返回 false,由前面的步骤销毁该 PoolChunk。
可回顾后面解析 ByteBuf 文章中对于内存销毁的内容。

如果您感觉本文不错,欢送关注我的微信公众号,您的关注是我保持的能源!

正文完
 0