本文次要分享Netty中PoolChunk如何治理内存。
源码剖析基于Netty 4.1.52

内存治理算法

首先阐明PoolChunk内存组织形式。
PoolChunk的内存大小默认是16M,Netty将它划分为2048个page,每个page为8K。
PoolChunk上能够调配Small内存块。
Normal内存块大小必须是page的倍数。

PoolChunk通过runsAvail字段治理内存块。
runsAvail是PriorityQueue<Long>数组,其中PriorityQueue寄存的是handle。
handle能够了解为一个句柄,保护一个内存块的信息,由以下局部组成

  • o: runOffset ,在chunk中page偏移索引,从0开始,15bit
  • s: size,以后地位可调配的page数量,15bit
  • u: isUsed,是否应用?, 1bit
  • e: isSubpage,是否在subpage中, 1bit
  • b: bitmapIdx,内存块在subpage中的索引,不在subpage则为0, 32bit

后面《内存对齐类SizeClasses》文章说过,SizeClasses将sizeClasses表格中isMultipageSize为1的行取出能够组成一个新表格,这里称为Page表格

runsAvail数组默认长度为40,每个地位index上放的handle代表了存在一个可用内存块,并且可调配pageSize大于等于(pageIdx=index)上的pageSize,小于(pageIdex=index+1)的pageSize。
如runsAvail[11]上的handle的size可调配pageSize可能为16 ~ 19,
如果runsAvail[11]上handle的size为18,如果该handle调配了7个page,剩下的11个page,这时要将handle挪动runsAvail[8](当然,handle的信息要调整)。
这时如果要找调配6个page,就能够从runsAvail[5]开始查找runsAvail数组,如果后面runsAvail[5]~runsAvail[7]都没有handle,就找到了runsAvail[8]。
调配6个page之后,剩下的5个page,handle挪动runsAvail[4]。

先看一下PoolChunk的构造函数

PoolChunk(PoolArena<T> arena, T memory, int pageSize, int pageShifts, int chunkSize, int maxPageIdx, int offset) {    // #1    unpooled = false;    this.arena = arena;    this.memory = memory;    this.pageSize = pageSize;    this.pageShifts = pageShifts;    this.chunkSize = chunkSize;    this.offset = offset;    freeBytes = chunkSize;    runsAvail = newRunsAvailqueueArray(maxPageIdx);    runsAvailMap = new IntObjectHashMap<Long>();    subpages = new PoolSubpage[chunkSize >> pageShifts];    // #2    int pages = chunkSize >> pageShifts;    long initHandle = (long) pages << SIZE_SHIFT;    insertAvailRun(0, pages, initHandle);    cachedNioBuffers = new ArrayDeque<ByteBuffer>(8);}

unpooled: 是否应用内存池
arena:该PoolChunk所属的PoolArena
memory:底层的内存块,对于堆内存,它是一个byte数组,对于间接内存,它是(jvm)ByteBuffer,但无论是哪种模式,其内存大小默认都是16M。
pageSize:page大小,默认为8K。
chunkSize:整个PoolChunk的内存大小,默认为16777216,即16M。
offset:底层内存对齐偏移量,默认为0。
runsAvail:初始化runsAvail
runsAvailMap:记录了每个内存块开始地位和完结地位的runOffset和handle映射。

#2 insertAvailRun办法在runsAvail数组最初地位插入一个handle,该handle代表page偏移地位为0的中央能够调配16M的内存块

内存调配

PoolChunk#allocate

boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolThreadCache cache) {    final long handle;    // #1    if (sizeIdx <= arena.smallMaxSizeIdx) {        // small        handle = allocateSubpage(sizeIdx);        if (handle < 0) {            return false;        }        assert isSubpage(handle);    } else {        // #2        int runSize = arena.sizeIdx2size(sizeIdx);        handle = allocateRun(runSize);        if (handle < 0) {            return false;        }    }    // #3    ByteBuffer nioBuffer = cachedNioBuffers != null? cachedNioBuffers.pollLast() : null;    initBuf(buf, nioBuffer, handle, reqCapacity, cache);    return true;}

#1 解决Small内存块申请,调用allocateSubpage办法解决,后续文章解析。
#2 解决Normal内存块申请
sizeIdx2size办法依据内存块索引查找对应内存块size。sizeIdx2size是PoolArena父类SizeClasses提供的办法,可参考系列文章《内存对齐类SizeClasses》。
allocateRun办法负责调配Normal内存块,返回handle存储了调配的内存块大小和偏移量。

#3 应用handle和底层内存类(ByteBuffer)初始化ByteBuf了。

private long allocateRun(int runSize) {    // #1    int pages = runSize >> pageShifts;    // #2    int pageIdx = arena.pages2pageIdx(pages);    synchronized (runsAvail) {        //find first queue which has at least one big enough run        // #3        int queueIdx = runFirstBestFit(pageIdx);        if (queueIdx == -1) {            return -1;        }        //get run with min offset in this queue        PriorityQueue<Long> queue = runsAvail[queueIdx];        long handle = queue.poll();        assert !isUsed(handle);        // #4        removeAvailRun(queue, handle);        // #5        if (handle != -1) {            handle = splitLargeRun(handle, pages);        }        // #6        freeBytes -= runSize(pageShifts, handle);        return handle;    }}

#1 计算所需的page数量
#2 计算对应的pageIdx
留神,pages2pageIdx办法会将申请内存大小对齐为上述Page表格中的一个size。例如申请172032字节(21个page)的内存块,pages2pageIdx办法计算结果为13,理论调配196608(24个page)的内存块。
#3 从pageIdx开始遍历runsAvail,找到第一个handle。
该handle上能够调配所需内存块。
#4 从runsAvail,runsAvailMap移除该handle信息
#5#3步骤找到的handle上划分出所要的内存块。
#6 缩小可用内存字节数

private long splitLargeRun(long handle, int needPages) {    assert needPages > 0;    // #1    int totalPages = runPages(handle);    assert needPages <= totalPages;    int remPages = totalPages - needPages;    // #2     if (remPages > 0) {        int runOffset = runOffset(handle);        // keep track of trailing unused pages for later use        int availOffset = runOffset + needPages;        long availRun = toRunHandle(availOffset, remPages, 0);        insertAvailRun(availOffset, remPages, availRun);        // not avail        return toRunHandle(runOffset, needPages, 1);    }    //mark it as used    handle |= 1L << IS_USED_SHIFT;    return handle;}

#1 totalPages,从handle中获取以后地位可用page数。
remPages,调配后残余page数。
#2 残余page数大于0
availOffset,计算残余page开始偏移量
生成一个新的handle,availRun
insertAvailRun将availRun插入到runsAvail,runsAvailMap中

内存开释

void free(long handle, int normCapacity, ByteBuffer nioBuffer) {    ...    // #1    int pages = runPages(handle);    synchronized (runsAvail) {        // collapse continuous runs, successfully collapsed runs        // will be removed from runsAvail and runsAvailMap        // #2        long finalRun = collapseRuns(handle);        // #3        finalRun &= ~(1L << IS_USED_SHIFT);        //if it is a subpage, set it to run        finalRun &= ~(1L << IS_SUBPAGE_SHIFT);        insertAvailRun(runOffset(finalRun), runPages(finalRun), finalRun);        freeBytes += pages << pageShifts;    }    if (nioBuffer != null && cachedNioBuffers != null &&        cachedNioBuffers.size() < PooledByteBufAllocator.DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK) {        cachedNioBuffers.offer(nioBuffer);    }}

#1 计算开释的page数
#2 如果能够,将前后的可用内存块进行合并
#3 插入新的handle

collapseRuns

private long collapseRuns(long handle) {    return collapseNext(collapsePast(handle));}

collapsePast办法合并后面的可用内存块
collapseNext办法合并前面的可用内存块

private long collapseNext(long handle) {    for (;;) {        // #1        int runOffset = runOffset(handle);        int runPages = runPages(handle);        Long nextRun = getAvailRunByOffset(runOffset + runPages);        if (nextRun == null) {            return handle;        }        int nextOffset = runOffset(nextRun);        int nextPages = runPages(nextRun);        //is continuous        // #2        if (nextRun != handle && runOffset + runPages == nextOffset) {            //remove next run            removeAvailRun(nextRun);            handle = toRunHandle(runOffset, runPages + nextPages, 0);        } else {            return handle;        }    }}

#1 getAvailRunByOffset办法从runsAvailMap中找到下一个内存块的handle。
#2 如果是间断的内存块,则移除下一个内存块handle,并将其page合并生成一个新的handle。

上面来看一个例子

大家能够联合例子中runsAvail和内存应用状况的变动,了解下面的代码。
实际上,2个Page的内存块是通过Subpage调配,回收时会放回线程缓存中而不是间接开释存块,但为了展现PoolChunk中内存治理过程,图中不思考这些场景。

PoolChunk在Netty 4.1.52版本批改了算法,引入了jemalloc 4的算法 -- https://github.com/netty/nett...
Netty 4.1.52之前的版本,PoolChunk引入的是jemalloc 3的算法,应用二叉树治理内存块。有趣味的同学能够参考我后续的文章《PoolChunk实现(jemalloc 3的算法)》

如果您感觉本文不错,欢送关注我的微信公众号,系列文章继续更新中。您的关注是我保持的能源!