后面文章说了PoolChunk如何治理Normal内存块,本文分享PoolSubpage如何治理Small内存块。
源码剖析基于Netty 4.1.52

内存治理算法

PoolSubpage负责管理Small内存块。一个PoolSubpage中的内存块size都雷同,该size对应SizeClasses#sizeClasses表格的一个索引index。
新创建的PoolSubpage都必须退出到PoolArena#smallSubpagePools[index]链表中。
PoolArena#smallSubpagePools是一个PoolSubpage数组,数组中每个元素都是一个PoolSubpage链表,PoolSubpage之间能够通过next,prev组成链表。
感兴趣的同学能够参考《内存对齐类SizeClasses》。

留神,Small内存size并不一定小于pageSize(默认为8K)
默认Small内存size <= 28672(28KB)
对于Normal内存块,Small内存块,pageSize,可参考《PoolChunk实现原理》。

PoolSubpage实际上就是PoolChunk中的一个Normal内存块,大小为其治理的内存块size与pageSize最小公倍数。
PoolSubpage应用位图的形式治理内存块。
PoolSubpage#bitmap是一个long数组,其中每个long元素上每个bit位都能够代表一个内存块是否应用。

内存调配

调配Small内存块有两个步骤

  1. PoolChunk中调配PoolSubpage。

如果PoolArena#smallSubpagePools中曾经有对应的PoolSubpage缓冲,则不须要该步骤。

  1. PoolSubpage上分配内存块

PoolChunk#allocateSubpage

private long allocateSubpage(int sizeIdx) {    // #1    PoolSubpage<T> head = arena.findSubpagePoolHead(sizeIdx);    synchronized (head) {        //allocate a new run        // #2        int runSize = calculateRunSize(sizeIdx);        //runSize must be multiples of pageSize        // #3        long runHandle = allocateRun(runSize);        if (runHandle < 0) {            return -1;        }        // #4        int runOffset = runOffset(runHandle);        int elemSize = arena.sizeIdx2size(sizeIdx);        PoolSubpage<T> subpage = new PoolSubpage<T>(head, this, pageShifts, runOffset,                           runSize(pageShifts, runHandle), elemSize);        subpages[runOffset] = subpage;        // #5        return subpage.allocate();    }}

#1 这里波及批改PoolArena#smallSubpagePools中的PoolSubpage链表,须要同步操作
#2 计算内存块size和pageSize最小公倍数
#3 调配一个Normal内存块,作为PoolSubpage的底层内存块,大小为Small内存块size和pageSize最小公倍数
#4 构建PoolSubpage
runOffset,即Normal内存块偏移量,也是该PoolSubpage在整个Chunk中的偏移量
elemSize,Small内存块size
#5 在subpage上分配内存块

PoolSubpage(PoolSubpage<T> head, PoolChunk<T> chunk, int pageShifts, int runOffset, int runSize, int elemSize) {    // #1    this.chunk = chunk;    this.pageShifts = pageShifts;    this.runOffset = runOffset;    this.runSize = runSize;    this.elemSize = elemSize;    bitmap = new long[runSize >>> 6 + LOG2_QUANTUM]; // runSize / 64 / QUANTUM    init(head, elemSize);}void init(PoolSubpage<T> head, int elemSize) {    doNotDestroy = true;    if (elemSize != 0) {        // #2        maxNumElems = numAvail = runSize / elemSize;        nextAvail = 0;        bitmapLength = maxNumElems >>> 6;        if ((maxNumElems & 63) != 0) {            bitmapLength ++;        }        for (int i = 0; i < bitmapLength; i ++) {            bitmap[i] = 0;        }    }    // #3    addToPool(head);}

#1 bitmap长度为runSize / 64 / QUANTUM,从《内存对齐类SizeClasses》能够看到,runSize都是2^LOG2_QUANTUM的倍数。

#2
elemSize:每个内存块的大小
maxNumElems:内存块数量
bitmapLength:bitmap应用的long元素个数,应用bitmap中一部分元素足以治理全副内存块。
(maxNumElems & 63) != 0,代表maxNumElems不能整除64,所以bitmapLength要加1,用于治理余下的内存块。
#3 增加到PoolSubpage链表中

后面剖析《Netty内存池与PoolArena》中说过,在PoolArena中调配Small内存块时,首先会从PoolArena#smallSubpagePools中查找对应的PoolSubpage。如果找到了,间接从该PoolSubpage上分配内存。否则,调配一个Normal内存块,创立PoolSubpage,再在下面分配内存块。

PoolSubpage#allocate

long allocate() {    // #1    if (numAvail == 0 || !doNotDestroy) {        return -1;    }    // #2    final int bitmapIdx = getNextAvail();    // #3    int q = bitmapIdx >>> 6;    int r = bitmapIdx & 63;    assert (bitmap[q] >>> r & 1) == 0;    bitmap[q] |= 1L << r;    // #4    if (-- numAvail == 0) {        removeFromPool();    }    // #5    return toHandle(bitmapIdx);}

#1 没有可用内存块,调配失败。通常PoolSubpage调配实现后会从PoolArena#smallSubpagePools中移除,不再在该PoolSubpage上分配内存,所以个别不会呈现这种场景。
#2 获取下一个可用内存块的bit下标
#3 设置对应bit为1,即已应用
bitmapIdx >>> 6,获取该内存块在bitmap数组中第q元素
bitmapIdx & 63,获取该内存块是bitmap数组中第q个元素的第r个bit位
bitmap[q] |= 1L << r,将bitmap数组中第q个元素的第r个bit位设置为1,示意曾经应用
#4 所有内存块已调配了,则将其从PoolArena中移除。
#5 toHandle 转换为最终的handle

private int getNextAvail() {    int nextAvail = this.nextAvail;    if (nextAvail >= 0) {        this.nextAvail = -1;        return nextAvail;    }    return findNextAvail();}

nextAvail为初始值或free时开释的值。
如果nextAvail存在,设置为不可用后间接返回该值。
如果不存在,调用findNextAvail查找下一个可用内存块。

private int findNextAvail() {    final long[] bitmap = this.bitmap;    final int bitmapLength = this.bitmapLength;    // #1    for (int i = 0; i < bitmapLength; i ++) {        long bits = bitmap[i];        if (~bits != 0) {            return findNextAvail0(i, bits);        }    }    return -1;}private int findNextAvail0(int i, long bits) {    final int maxNumElems = this.maxNumElems;    final int baseVal = i << 6;    // #2    for (int j = 0; j < 64; j ++) {        if ((bits & 1) == 0) {            int val = baseVal | j;            if (val < maxNumElems) {                return val;            } else {                break;            }        }        bits >>>= 1;    }    return -1;}

#1 遍历bitmap,~bits != 0,示意存在一个bit位不为1,即存在可用内存块。
#2 遍历64个bit位,
(bits & 1) == 0,查看最低bit位是否为0(可用),为0则返回val。
val等于 (i << 6) | j,即i * 64 + j,该bit位在bitmap中是第几个bit位。
bits >>>= 1,右移一位,解决下一个bit位。

内存开释

开释Small内存块可能有两个步骤

  1. 开释PoolSubpage的上内存块
  2. 如果PoolSubpage中的内存块已全副开释,则从Chunk中开释该PoolSubpage,同时从PoolArena#smallSubpagePools移除它。

PoolSubpage#free

boolean free(PoolSubpage<T> head, int bitmapIdx) {    if (elemSize == 0) {        return true;    }    // #1    int q = bitmapIdx >>> 6;    int r = bitmapIdx & 63;    assert (bitmap[q] >>> r & 1) != 0;    bitmap[q] ^= 1L << r;    setNextAvail(bitmapIdx);    // #2    if (numAvail ++ == 0) {        addToPool(head);        return true;    }    // #3    if (numAvail != maxNumElems) {        return true;    } else {        // #4        if (prev == next) {            // Do not remove if this subpage is the only one left in the pool.            return true;        }        // #5        doNotDestroy = false;        removeFromPool();        return false;    }}

#1 将对应bit位设置为能够应用
#2 在PoolSubpage的内存块全副被应用时,开释了某个内存块,这时重新加入到PoolArena中。
#3 未齐全开释,即还存在已分配内存块,返回true
#4 逻辑到这里,是解决所有内存块曾经齐全开释的场景。
PoolArena#smallSubpagePools链表组成双向链表,链表中只有head和以后PoolSubpage时,以后PoolSubpage的prev,next都指向head。
这时以后PoolSubpage是PoolArena中该链表最初一个PoolSubpage,不开释该PoolSubpage,以便下次申请内存时间接从该PoolSubpage上调配。
#5 从PoolArena中移除,并返回false,这时PoolChunk会将开释对应Page节点。

void free(long handle, int normCapacity, ByteBuffer nioBuffer) {    if (isSubpage(handle)) {        // #1        int sizeIdx = arena.size2SizeIdx(normCapacity);        PoolSubpage<T> head = arena.findSubpagePoolHead(sizeIdx);        PoolSubpage<T> subpage = subpages[runOffset(handle)];        assert subpage != null && subpage.doNotDestroy;        synchronized (head) {            // #2            if (subpage.free(head, bitmapIdx(handle))) {                //the subpage is still used, do not free it                return;            }        }    }    // #3    ...}

#1
查找head节点,同步
#2
调用subpage#free开释Small内存块
如果subpage#free返回false,将持续向下执行,这时会开释PoolSubpage整个内存块,否则,不开释PoolSubpage内存块。
#3 开释Normal内存块,就是开释PoolSubpage整个内存块。该局部内容可参考《PoolChunk实现原理》。

如果您感觉本文不错,欢送关注我的微信公众号,系列文章继续更新中。您的关注是我保持的能源!