共计 9398 个字符,预计需要花费 24 分钟才能阅读完成。
上一篇文章分享了 PoolArena 如何通过 PoolChunk,PoolSubpage 治理内存。
本文则分享 PoolChunk,PoolSubpage 中如何分配内存。
源码剖析基于 Netty 4.1
首先阐明 PoolChunk 内存组织形式。
PoolChunk 的内存大小默认是 16M,它将内存组织成为一颗完满二叉树。
二叉树的每一层每个节点所代表的内存大小都是均等的,并且每一层节点所代表的内存大小总和加起来都是 16M。
每一层节点可分配内存是父节点的 1 /2。整颗二叉树的总层数为 12,层数从 0 开始。
示意图如下
(下标 -> 层数,最大可调配块大小)
先看一下 PoolChunk 的构造函数
PoolChunk(PoolArena<T> arena, T memory, int pageSize, int maxOrder, int pageShifts, int chunkSize, int offset) {
unpooled = false;
this.arena = arena;
this.memory = memory;
this.pageSize = pageSize;
this.pageShifts = pageShifts;
this.maxOrder = maxOrder;
this.chunkSize = chunkSize;
this.offset = offset;
unusable = (byte) (maxOrder + 1);
log2ChunkSize = log2(chunkSize);
subpageOverflowMask = ~(pageSize - 1);
freeBytes = chunkSize;
assert maxOrder < 30 : "maxOrder should be < 30, but is:" + maxOrder;
maxSubpageAllocs = 1 << maxOrder;
// Generate the memory map.
memoryMap = new byte[maxSubpageAllocs << 1];
depthMap = new byte[memoryMap.length];
int memoryMapIndex = 1;
for (int d = 0; d <= maxOrder; ++ d) { // move down the tree one level at a time
int depth = 1 << d;
for (int p = 0; p < depth; ++ p) {
// in each level traverse left to right and set value to the depth of subtree
memoryMap[memoryMapIndex] = (byte) d;
depthMap[memoryMapIndex] = (byte) d;
memoryMapIndex ++;
}
}
subpages = newSubpageArray(maxSubpageAllocs);
}
unpooled:是否应用内存池
arena:该 PoolChunk 所属的 PoolArena
memory:底层的内存块,对于堆内存,它是一个 byte 数组,对于间接内存,它是(jvm)ByteBuffer,但无论是哪种模式,其内存大小默认都是 16M。
pageSize:叶子节点大小,默认为 8192,即 8K。
maxOrder:示意二叉树最大的层数,从 0 开始。默认为 11。
chunkSize:整个 PoolChunk 的内存大小,默认为 16777216,即 16M。
offset:为了内存对齐而应用的偏移数量,默认为 0。
unusable:示意节点已被调配,不必了,默认为 12。
freeBytes:闲暇内存字节数。
每个 PoolChunk 都要按内存使用率关联到一个 PoolChunkList 上,内存使用率正是通过 freeBytes 计算。
maxSubpageAllocs:叶子节点数量,默认为 2048,即 2^11。
log2ChunkSize:用于计算偏移量,默认为 24。
subpageOverflowMask:用于判断申请内存是否为 PoolSubpage,默认为 -8192。
pageShifts:用于计算分配内存所在二叉树层数,默认为 13。
memoryMap:初始化内存治理二叉树,将每一层节点值设置为层数 d。
应用数组保护二叉树,第 d 层的开始下标为 1<<d
。(数组第 0 个元素不应用)。
depthMap:保留二叉树的层数,用于通过地位下标找到其在整棵树中对应的层数。
留神:depthMap 的值代表二叉树的层数,初始化后不再变动。
memoryMap 的值代表以后节点最大可申请内存块,在分配内存过程中一直变动。
节点最大可申请内存块能够通过层数 d 计算,即2 ^ (pageShifts + maxOrder - d)
。
PoolChunk#allocate
long allocate(int normCapacity) {if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSize
return allocateRun(normCapacity);
} else {return allocateSubpage(normCapacity);
}
}
若申请内存大于 pageSize,调用 allocateRun 办法调配 Chunk 级别的内存。
否则调用 allocateSubpage 办法调配 PoolSubpage,再在 PoolSubpage 上调配所需内存。
PoolChunk#allocateRun
private long allocateRun(int normCapacity) {
// #1
int d = maxOrder - (log2(normCapacity) - pageShifts);
// #2
int id = allocateNode(d);
if (id < 0) {return id;}
// #2
freeBytes -= runLength(id);
return id;
}
#1
计算应该在哪层调配分配内存maxOrder - (log2(normCapacity) - pageShifts)
,如 16K,即 2^14,计算结果为 10,即在 10 层调配。#2
缩小闲暇内存字节数。
PoolChunk#allocateNode,在 d 层调配一个节点
private int allocateNode(int d) {
int id = 1;
int initial = - (1 << d); // has last d bits = 0 and rest all = 1
// #1
byte val = value(id);
if (val > d) { // unusable
return -1;
}
// #2
while (val < d || (id & initial) == 0) { // id & initial == 1 << d for all ids at depth d, for < d it is 0
// #3
id <<= 1;
val = value(id);
// #4
if (val > d) {
// #5
id ^= 1;
val = value(id);
}
}
byte value = value(id);
assert value == d && (id & initial) == 1 << d : String.format("val = %d, id & initial = %d, d = %d",
value, id & initial, d);
// #6
setValue(id, unusable); // mark as unusable
// #7
updateParentsAlloc(id);
return id;
}
#1
memoryMap[1] > d,第 0 层的可分配内存有余,表明该 PoolChunk 内存不能满足调配,调配失败。#2
遍历二叉树,找到满足内存调配的节点。val < d
,即该节点内存满足调配。id & initial = 0
,即 id < 1<<d
,d 层之前循环继续执行。这里并不会呈现 val > d 的场景,但会呈现 val == d 的场景,如
PoolChunk 以后可分配内存为 2M,即 memoryMap[1] = 3,这时申请 2M 内存,在 0 - 2 层,都是 val == d。可参考前面的实例。#3
向下找到下一层下标,留神,子树左节点的下标是父节点下标的 2 倍。#4
val > d
,示意以后节点不能满足调配#5
id ^= 1
,查找同一父节点下的兄弟节点,在兄弟节点上分配内存。id ^= 1
,当 id 为偶数,即为id+=1
,当 id 为奇数,即为id-=1
。
因为后面通过 id <<= 1
找到下一层下标都是偶数,这里等于 id+=1。#6
因为一开始判断了 PoolChunk 内存是否足以调配,所以这里肯定能够找到一个可调配节点。
这里标注找到的节点已调配。#7
更新找到节点的父节点最大可分配内存块大小
private void updateParentsAlloc(int id) {
// #1
while (id > 1) {
// #2
int parentId = id >>> 1;
byte val1 = value(id);
byte val2 = value(id ^ 1);
byte val = val1 < val2 ? val1 : val2;
setValue(parentId, val);
id = parentId;
}
}
#1
向父节点遍历,直到根节点 #2
id >>> 1,找到父节点
取以后节点和兄弟节点中较小值,作为父节点的值,示意父节点最大可分配内存块大小。
如 memoryMap[1] = 0,示意最大可分配内存块为 16M。
调配 8M 后,memoryMap[1] = 1,示意以后最大可分配内存块为 8M。
上面看一则实例,大家能够联合实例了解下面的代码
上面看一下如何调配 PoolSubpage。
PoolChunk#PoolSubpage
private long allocateSubpage(int normCapacity) {
// #1
PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);
synchronized (head) {
// #2
int d = maxOrder; // subpages are only be allocated from pages i.e., leaves
int id = allocateNode(d);
if (id < 0) {return id;}
// #3
final PoolSubpage<T>[] subpages = this.subpages;
final int pageSize = this.pageSize;
freeBytes -= pageSize;
int subpageIdx = subpageIdx(id);
PoolSubpage<T> subpage = subpages[subpageIdx];
// #4
if (subpage == null) {subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity);
subpages[subpageIdx] = subpage;
} else {subpage.init(head, normCapacity);
}
return subpage.allocate();}
}
#1
新结构的 PoolSubpage,须要搁置到 PoolArena#tinySubpagePools 或 smallSubpagePools 数组中。
这里找到 PoolArena 中 tinySubpagePools 或 smallSubpagePools 数组中对应链表的 head 节点。
能够看到这里应用了 head 节点执行同步操作,这是有必要的,否则可能多个线程同时批改链表。#2
调配一个叶子节点,PoolSubpage 实际上就是 PoolChunk 一个叶子节点。#3
在 subpages 数组中找到对应的 PoolSubpage。
因为 id 是整个二叉树 (memoryMap) 的节点下标,而 subpages 只是叶子节点数组,所以要去掉高位,默认取低 10 个 bit 位的值(小于 2048)。#4
初始化 PoolSubpage。
PoolSubpage
上面看看 PoolSubpage 如何治理 Tiny,Small 级别的内存块。
先阐明 PoolSubpage 如何组织内存。
PoolSubpage 实际上就是 PoolChunk 中的一个 Page 节点,默认大小为 8192。
它中将内存划分为若干内存块,每个内存块大小是雷同的,并应用位图的形式治理这些内存块。
PoolSubpage#bitmap 是一个 long 数组,其中每个 long 元素上每个 bit 位都能够代表一个内存块是否应用。
PoolSubpage
PoolSubpage(PoolSubpage<T> head, PoolChunk<T> chunk, int memoryMapIdx, int runOffset, int pageSize, int elemSize) {
this.chunk = chunk;
this.memoryMapIdx = memoryMapIdx;
this.runOffset = runOffset;
this.pageSize = pageSize;
bitmap = new long[pageSize >>> 10]; // pageSize / 16 / 64
init(head, elemSize);
}
void init(PoolSubpage<T> head, int elemSize) {
doNotDestroy = true;
this.elemSize = elemSize;
if (elemSize != 0) {
// #1
maxNumElems = numAvail = pageSize / elemSize;
nextAvail = 0;
bitmapLength = maxNumElems >>> 6;
if ((maxNumElems & 63) != 0) {bitmapLength ++;}
for (int i = 0; i < bitmapLength; i ++) {bitmap[i] = 0;
}
}
addToPool(head);
}
chunk:该 PoolSubpage 是哪儿 PoolChunk 的 Page 节点
memoryMapIdx:对应 Page 节点在 PoolChunk#memoryMap 的索引
runOffset:对应 Page 节点在 PoolChunk#memory 的偏移量
pageSize:PoolSubpage 整体内存大小,默认为
bitmap:内存治理位图数组,数组长度默认为pageSize >>> 10
,即 pageSize / 16 / 64,
因为每个内存块最小为 16 字节,每个 long 有 64 位,所以这个长度足够了。
elemSize:每个内存块的大小
maxNumElems:内存块总数量。
bitmapLength:bitmap 应用的元素个数,应用 bitmap 中一部分元素足以治理全副内存块。maxNumElems >>> 6
,即 maxNumElems / 64
。(maxNumElems & 63) != 0
,代表 maxNumElems 不能整除 64,所以 bitmapLength 要加 1,用于治理余下的内存块。
每个 PoolSubpage 都要加到 PoolArena#tinySubpagePools 或 smallSubpagePools 数组中,这个两个数组元素都是链表,由内存块大小雷同的 PoolSubpage 组成。
addToPool 办法正是将 PoolSubpage 增加到 PoolArena 中,从这个办法能够看到,结构的 PoolSubpage 会插入到 head 前面,并且这些链表会组成环,即最初一个节点的 next 指向 head。
PoolSubpage#allocate
long allocate() {if (elemSize == 0) {return toHandle(0);
}
// #1
if (numAvail == 0 || !doNotDestroy) {return -1;}
// #2
final int bitmapIdx = getNextAvail();
// #3
int q = bitmapIdx >>> 6;
int r = bitmapIdx & 63;
assert (bitmap[q] >>> r & 1) == 0;
bitmap[q] |= 1L << r;
// #4
if (-- numAvail == 0) {removeFromPool();
}
// #5
return toHandle(bitmapIdx);
}
#1
没有可用内存块,调配失败#2
获取下一个可用内存块的 bit 下标#3
设置对应 bit 为 1,即已应用bitmapIdx >>> 6
,获取该内存块在 bitmap 数组中第 q 元素bitmapIdx & 63
,获取该内存块是 bitmap 数组中第 q 个元素的第 r 个 bit 位bitmap[q] |= 1L << r
将 bitmap 数组中第 q 个元素的第 r 个 bit 位设置为 1,示意曾经应用#4
所有内存块已调配了,则将其从 PoolArena 中移除。#5
toHandle 转换为最终的标记值
PoolSubpage#getNextAvail
private int getNextAvail() {
int nextAvail = this.nextAvail;
if (nextAvail >= 0) {
this.nextAvail = -1;
return nextAvail;
}
return findNextAvail();}
nextAvail 为初始值或 free 时开释的值。
如果 nextAvail 存在,设置为不可用后间接返回该值。
如果不存在,调用 findNextAvail 查找下一个可用内存块。
private int findNextAvail() {final long[] bitmap = this.bitmap;
final int bitmapLength = this.bitmapLength;
// #1
for (int i = 0; i < bitmapLength; i ++) {long bits = bitmap[i];
if (~bits != 0) {return findNextAvail0(i, bits);
}
}
return -1;
}
private int findNextAvail0(int i, long bits) {
final int maxNumElems = this.maxNumElems;
final int baseVal = i << 6;
// #2
for (int j = 0; j < 64; j ++) {if ((bits & 1) == 0) {
int val = baseVal | j;
if (val < maxNumElems) {return val;} else {break;}
}
bits >>>= 1;
}
return -1;
}
#1
遍历 bitmap,~bits != 0
,示意存在一个 bit 位不为 1,即存在可用内存块。#2
遍历 64 个 bit 位,(bits & 1) == 0
,查看最低 bit 位是否为 0(可用),为 0 则返回 val。
val 等于 (i << 6) | j
,即i * 64 + j
,该 bit 位在 bitmap 中是第几个 bit 位。bits >>>= 1
,右移一位,解决下一个 bit 位。
最初解析一下 PoolSubpage#toHandle 办法
private long toHandle(int bitmapIdx) {return 0x4000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx;
}
低 32 个 bit 位存储 memoryMapIdx,即对应 Page 节点在 PoolChunk#memoryMap 的索引。
高 33 ~ 61 个 bit 存储 bitmapIdx,即调配了 PoolSubpage 第几个内存块。
而 0x4000000000000000L,即 2^62。这里设置第 62 个 bit 位为 1,代表内存块调配在 PoolSubpage 中。因为 bitmapIdx 可能为 0,仅应用高 32 位示意 bitmapIdx,无奈辨别 bitmapIdx 为 0 和没有调配在 PoolSubpage 的场景,所以置 62 位为 1 做标记。
内存开释
PoolChunk#free
void free(long handle) {
// #1
int memoryMapIdx = memoryMapIdx(handle);
int bitmapIdx = bitmapIdx(handle);
// #2
if (bitmapIdx != 0) { // free a subpage
PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
assert subpage != null && subpage.doNotDestroy;
PoolSubpage<T> head = arena.findSubpagePoolHead(subpage.elemSize);
synchronized (head) {if (subpage.free(head, bitmapIdx & 0x3FFFFFFF)) {return;}
}
}
freeBytes += runLength(memoryMapIdx);
setValue(memoryMapIdx, depth(memoryMapIdx));
updateParentsFree(memoryMapIdx);
}
#1
获取 memoryMapIdx 和 bitmapIdx#2
bitmapIdx 不为 0,即调配在 PoolSubpage。
找到对应的 PoolSubpage,调用 PoolSubpage#free 是否内存。bitmapIdx & 0x3FFFFFFF
,后面说了,高 33 ~ 61 个 bit 才是真正的 bitmapIdx,这里取出该值。
PoolSubpage#free 办法返回 false,即代表该 PoolSubpage 所有内存块已开释,能够开释对应 Page 节点。#3
解决到这里,就是开释 Chunk 级别的内存块了。
减少闲暇内存字节数。
设置二叉树中对应的节点为未调配
对应批改该节点的父节点。
PoolSubpage#free
boolean free(PoolSubpage<T> head, int bitmapIdx) {if (elemSize == 0) {return true;}
// #1
int q = bitmapIdx >>> 6;
int r = bitmapIdx & 63;
assert (bitmap[q] >>> r & 1) != 0;
bitmap[q] ^= 1L << r;
setNextAvail(bitmapIdx);
// #2
if (numAvail ++ == 0) {addToPool(head);
return true;
}
// #3
if (numAvail != maxNumElems) {return true;} else {
// #4
if (prev == next) {
// Do not remove if this subpage is the only one left in the pool.
return true;
}
// #5
doNotDestroy = false;
removeFromPool();
return false;
}
}
#1
将对应 bit 位设置为能够#2
在 PoolSubpage 已齐全应用时开释了内存,这时重新加入到 PoolArena 中。#3
未齐全开释,即还存在已分配内存块,返回 true#4
解决到这里,是所有内存块曾经齐全开释的场景prev == next
,只有一种场景,就是 prev,next 都指向 head,该 PoolSubpage 是 PoolArena 中该链表最初一个 PoolSubpage,这时不开释该 PoolSubpage,以便下次申请内存时间接从该 PoolSubpage 上调配。#5
从 PoolArena 中移除,并返回 false,这时 PoolChunk 会将开释对应 Page 节点。
如果您感觉本文不错,欢送关注我的微信公众号,您的关注是我保持的能源!