关于netty:Netty源码解析-PoolSubpage实现原理

11次阅读

共计 5421 个字符,预计需要花费 14 分钟才能阅读完成。

后面文章说了 PoolChunk 如何治理 Normal 内存块,本文分享 PoolSubpage 如何治理 Small 内存块。
源码剖析基于 Netty 4.1.52

内存治理算法

PoolSubpage 负责管理 Small 内存块。一个 PoolSubpage 中的内存块 size 都雷同,该 size 对应 SizeClasses#sizeClasses 表格的一个索引 index。
新创建的 PoolSubpage 都必须退出到 PoolArena#smallSubpagePools[index]链表中。
PoolArena#smallSubpagePools 是一个 PoolSubpage 数组,数组中每个元素都是一个 PoolSubpage 链表,PoolSubpage 之间能够通过 next,prev 组成链表。
感兴趣的同学能够参考《内存对齐类 SizeClasses》。

留神,Small 内存 size 并不一定小于 pageSize(默认为 8K)
默认 Small 内存 size <= 28672(28KB)
对于 Normal 内存块,Small 内存块,pageSize,可参考《PoolChunk 实现原理》。

PoolSubpage 实际上就是 PoolChunk 中的一个 Normal 内存块,大小为其治理的内存块 size 与 pageSize 最小公倍数。
PoolSubpage 应用位图的形式治理内存块。
PoolSubpage#bitmap 是一个 long 数组,其中每个 long 元素上每个 bit 位都能够代表一个内存块是否应用。

内存调配

调配 Small 内存块有两个步骤

  1. PoolChunk 中调配 PoolSubpage。

如果 PoolArena#smallSubpagePools 中曾经有对应的 PoolSubpage 缓冲,则不须要该步骤。

  1. PoolSubpage 上分配内存块

PoolChunk#allocateSubpage

private long allocateSubpage(int sizeIdx) {
    // #1
    PoolSubpage<T> head = arena.findSubpagePoolHead(sizeIdx);
    synchronized (head) {
        //allocate a new run
        // #2
        int runSize = calculateRunSize(sizeIdx);
        //runSize must be multiples of pageSize
        // #3
        long runHandle = allocateRun(runSize);
        if (runHandle < 0) {return -1;}
        // #4
        int runOffset = runOffset(runHandle);
        int elemSize = arena.sizeIdx2size(sizeIdx);

        PoolSubpage<T> subpage = new PoolSubpage<T>(head, this, pageShifts, runOffset,
                           runSize(pageShifts, runHandle), elemSize);

        subpages[runOffset] = subpage;
        // #5
        return subpage.allocate();}
}

#1 这里波及批改 PoolArena#smallSubpagePools 中的 PoolSubpage 链表,须要同步操作
#2 计算内存块 size 和 pageSize 最小公倍数
#3 调配一个 Normal 内存块,作为 PoolSubpage 的底层内存块,大小为 Small 内存块 size 和 pageSize 最小公倍数
#4 构建 PoolSubpage
runOffset,即 Normal 内存块偏移量,也是该 PoolSubpage 在整个 Chunk 中的偏移量
elemSize,Small 内存块 size
#5 在 subpage 上分配内存块

PoolSubpage(PoolSubpage<T> head, PoolChunk<T> chunk, int pageShifts, int runOffset, int runSize, int elemSize) {
    // #1
    this.chunk = chunk;
    this.pageShifts = pageShifts;
    this.runOffset = runOffset;
    this.runSize = runSize;
    this.elemSize = elemSize;
    bitmap = new long[runSize >>> 6 + LOG2_QUANTUM]; // runSize / 64 / QUANTUM
    init(head, elemSize);
}

void init(PoolSubpage<T> head, int elemSize) {
    doNotDestroy = true;
    if (elemSize != 0) {
        // #2
        maxNumElems = numAvail = runSize / elemSize;
        nextAvail = 0;
        bitmapLength = maxNumElems >>> 6;
        if ((maxNumElems & 63) != 0) {bitmapLength ++;}

        for (int i = 0; i < bitmapLength; i ++) {bitmap[i] = 0;
        }
    }
    // #3
    addToPool(head);
}

#1 bitmap 长度为 runSize / 64 / QUANTUM,从《内存对齐类 SizeClasses》能够看到,runSize 都是 2^LOG2_QUANTUM 的倍数。

#2
elemSize:每个内存块的大小
maxNumElems:内存块数量
bitmapLength:bitmap 应用的 long 元素个数,应用 bitmap 中一部分元素足以治理全副内存块。
(maxNumElems & 63) != 0,代表 maxNumElems 不能整除 64,所以 bitmapLength 要加 1,用于治理余下的内存块。
#3 增加到 PoolSubpage 链表中

后面剖析《Netty 内存池与 PoolArena》中说过,在 PoolArena 中调配 Small 内存块时,首先会从 PoolArena#smallSubpagePools 中查找对应的 PoolSubpage​。如果找到了,间接从该 PoolSubpage​上分配内存。否则,调配一个 Normal 内存块,创立 PoolSubpage​,再在下面分配内存块。

PoolSubpage#allocate

long allocate() {
    // #1
    if (numAvail == 0 || !doNotDestroy) {return -1;}
    // #2
    final int bitmapIdx = getNextAvail();
    // #3
    int q = bitmapIdx >>> 6;
    int r = bitmapIdx & 63;
    assert (bitmap[q] >>> r & 1) == 0;
    bitmap[q] |= 1L << r;
    // #4
    if (-- numAvail == 0) {removeFromPool();
    }
    // #5
    return toHandle(bitmapIdx);
}

#1 没有可用内存块,调配失败。通常 PoolSubpage 调配实现后会从 PoolArena#smallSubpagePools 中移除,不再在该 PoolSubpage 上分配内存,所以个别不会呈现这种场景。
#2 获取下一个可用内存块的 bit 下标
#3 设置对应 bit 为 1,即已应用
bitmapIdx >>> 6,获取该内存块在 bitmap 数组中第 q 元素
bitmapIdx & 63,获取该内存块是 bitmap 数组中第 q 个元素的第 r 个 bit 位
bitmap[q] |= 1L << r,将 bitmap 数组中第 q 个元素的第 r 个 bit 位设置为 1,示意曾经应用
#4 所有内存块已调配了,则将其从 PoolArena 中移除。
#5 toHandle 转换为最终的 handle

private int getNextAvail() {
    int nextAvail = this.nextAvail;
    if (nextAvail >= 0) {
        this.nextAvail = -1;
        return nextAvail;
    }
    return findNextAvail();}

nextAvail 为初始值或 free 时开释的值。
如果 nextAvail 存在,设置为不可用后间接返回该值。
如果不存在,调用 findNextAvail 查找下一个可用内存块。

private int findNextAvail() {final long[] bitmap = this.bitmap;
    final int bitmapLength = this.bitmapLength;
    // #1
    for (int i = 0; i < bitmapLength; i ++) {long bits = bitmap[i];
        if (~bits != 0) {return findNextAvail0(i, bits);
        }
    }
    return -1;
}

private int findNextAvail0(int i, long bits) {
    final int maxNumElems = this.maxNumElems;
    final int baseVal = i << 6;

    // #2
    for (int j = 0; j < 64; j ++) {if ((bits & 1) == 0) {
            int val = baseVal | j;
            if (val < maxNumElems) {return val;} else {break;}
        }
        bits >>>= 1;
    }
    return -1;
}

#1 遍历 bitmap,~bits != 0,示意存在一个 bit 位不为 1,即存在可用内存块。
#2 遍历 64 个 bit 位,
(bits & 1) == 0,查看最低 bit 位是否为 0(可用),为 0 则返回 val。
val 等于 (i << 6) | j,即i * 64 + j,该 bit 位在 bitmap 中是第几个 bit 位。
bits >>>= 1,右移一位,解决下一个 bit 位。

内存开释

开释 Small 内存块可能有两个步骤

  1. 开释 PoolSubpage 的上内存块
  2. 如果 PoolSubpage 中的内存块已全副开释,则从 Chunk 中开释该 PoolSubpage,同时从 PoolArena#smallSubpagePools 移除它。

PoolSubpage#free

boolean free(PoolSubpage<T> head, int bitmapIdx) {if (elemSize == 0) {return true;}
    // #1
    int q = bitmapIdx >>> 6;
    int r = bitmapIdx & 63;
    assert (bitmap[q] >>> r & 1) != 0;
    bitmap[q] ^= 1L << r;

    setNextAvail(bitmapIdx);
    // #2
    if (numAvail ++ == 0) {addToPool(head);
        return true;
    }

    // #3
    if (numAvail != maxNumElems) {return true;} else {
        // #4
        if (prev == next) {
            // Do not remove if this subpage is the only one left in the pool.
            return true;
        }

        // #5
        doNotDestroy = false;
        removeFromPool();
        return false;
    }
}

#1 将对应 bit 位设置为能够应用
#2 在 PoolSubpage 的内存块全副被应用时,开释了某个内存块,这时重新加入到 PoolArena 中。
#3 未齐全开释,即还存在已分配内存块,返回 true
#4 逻辑到这里,是解决所有内存块曾经齐全开释的场景。
PoolArena#smallSubpagePools 链表组成双向链表,链表中只有 head 和以后 PoolSubpage 时,以后 PoolSubpage 的 prev,next 都指向 head。
这时以后​PoolSubpage 是 PoolArena 中该链表最初一个 PoolSubpage,不开释该 PoolSubpage,以便下次申请内存时间接从该 PoolSubpage 上调配。
#5 从 PoolArena 中移除,并返回 false,这时 PoolChunk 会将开释对应 Page 节点。

void free(long handle, int normCapacity, ByteBuffer nioBuffer) {if (isSubpage(handle)) {
        // #1
        int sizeIdx = arena.size2SizeIdx(normCapacity);
        PoolSubpage<T> head = arena.findSubpagePoolHead(sizeIdx);

        PoolSubpage<T> subpage = subpages[runOffset(handle)];
        assert subpage != null && subpage.doNotDestroy;

        synchronized (head) {
            // #2
            if (subpage.free(head, bitmapIdx(handle))) {
                //the subpage is still used, do not free it
                return;
            }
        }
    }

    // #3
    ...
}

#1
查找 head 节点,同步
#2
调用 subpage#free 开释 Small 内存块
如果 subpage#free 返回 false,将持续向下执行,这时会开释 PoolSubpage 整个内存块,否则,不开释 PoolSubpage 内存块。
#3 开释 Normal 内存块,就是开释 PoolSubpage 整个内存块。该局部内容可参考《PoolChunk 实现原理》。

如果您感觉本文不错,欢送关注我的微信公众号,系列文章继续更新中。您的关注是我保持的能源!

正文完
 0