关于netty:Netty源码解析-PoolChunk实现原理

64次阅读

共计 5755 个字符,预计需要花费 15 分钟才能阅读完成。

本文次要分享 Netty 中 PoolChunk 如何治理内存。
源码剖析基于 Netty 4.1.52

内存治理算法

首先阐明 PoolChunk 内存组织形式。
PoolChunk 的内存大小默认是 16M,Netty 将它划分为 2048 个 page,每个 page 为 8K。
PoolChunk 上能够调配 Small 内存块。
Normal 内存块大小必须是 page 的倍数。

PoolChunk 通过 runsAvail 字段治理内存块。
runsAvail 是 PriorityQueue<Long> 数组,其中 PriorityQueue 寄存的是 handle。
handle 能够了解为一个句柄,保护一个内存块的信息,由以下局部组成

  • o: runOffset,在 chunk 中 page 偏移索引,从 0 开始,15bit
  • s: size,以后地位可调配的 page 数量,15bit
  • u: isUsed,是否应用?,1bit
  • e: isSubpage,是否在 subpage 中,1bit
  • b: bitmapIdx,内存块在 subpage 中的索引,不在 subpage 则为 0,32bit

后面《内存对齐类 SizeClasses》文章说过,SizeClasses 将 sizeClasses 表格中 isMultipageSize 为 1 的行取出能够组成一个新表格,这里称为 Page 表格

runsAvail 数组默认长度为 40,每个地位 index 上放的 handle 代表了存在一个可用内存块,并且可调配 pageSize 大于等于 (pageIdx=index) 上的 pageSize,小于 (pageIdex=index+1) 的 pageSize。
如 runsAvail[11]上的 handle 的 size 可调配 pageSize 可能为 16 ~ 19,
如果 runsAvail[11]上 handle 的 size 为 18,如果该 handle 调配了 7 个 page,剩下的 11 个 page,这时要将 handle 挪动 runsAvail[8](当然,handle 的信息要调整)。
这时如果要找调配 6 个 page,就能够从 runsAvail[5]开始查找 runsAvail 数组,如果后面 runsAvail[5]~runsAvail[7]都没有 handle,就找到了 runsAvail[8]。
调配 6 个 page 之后,剩下的 5 个 page,handle 挪动 runsAvail[4]。

先看一下 PoolChunk 的构造函数

PoolChunk(PoolArena<T> arena, T memory, int pageSize, int pageShifts, int chunkSize, int maxPageIdx, int offset) {
    // #1
    unpooled = false;
    this.arena = arena;
    this.memory = memory;
    this.pageSize = pageSize;
    this.pageShifts = pageShifts;
    this.chunkSize = chunkSize;
    this.offset = offset;
    freeBytes = chunkSize;

    runsAvail = newRunsAvailqueueArray(maxPageIdx);
    runsAvailMap = new IntObjectHashMap<Long>();
    subpages = new PoolSubpage[chunkSize >> pageShifts];

    // #2
    int pages = chunkSize >> pageShifts;
    long initHandle = (long) pages << SIZE_SHIFT;
    insertAvailRun(0, pages, initHandle);

    cachedNioBuffers = new ArrayDeque<ByteBuffer>(8);
}

unpooled:是否应用内存池
arena:该 PoolChunk 所属的 PoolArena
memory:底层的内存块,对于堆内存,它是一个 byte 数组,对于间接内存,它是(jvm)ByteBuffer,但无论是哪种模式,其内存大小默认都是 16M。
pageSize:page 大小,默认为 8K。
chunkSize:整个 PoolChunk 的内存大小,默认为 16777216,即 16M。
offset:底层内存对齐偏移量,默认为 0。
runsAvail:初始化 runsAvail
runsAvailMap:记录了每个内存块开始地位和完结地位的 runOffset 和 handle 映射。

#2 insertAvailRun 办法在 runsAvail 数组最初地位插入一个 handle,该 handle 代表 page 偏移地位为 0 的中央能够调配 16M 的内存块

内存调配

PoolChunk#allocate

boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolThreadCache cache) {
    final long handle;
    // #1
    if (sizeIdx <= arena.smallMaxSizeIdx) {
        // small
        handle = allocateSubpage(sizeIdx);
        if (handle < 0) {return false;}
        assert isSubpage(handle);
    } else {
        // #2
        int runSize = arena.sizeIdx2size(sizeIdx);
        handle = allocateRun(runSize);
        if (handle < 0) {return false;}
    }

    // #3
    ByteBuffer nioBuffer = cachedNioBuffers != null? cachedNioBuffers.pollLast() : null;
    initBuf(buf, nioBuffer, handle, reqCapacity, cache);
    return true;
}

#1 解决 Small 内存块申请,调用 allocateSubpage 办法解决,后续文章解析。
#2 解决 Normal 内存块申请
sizeIdx2size 办法依据内存块索引查找对应内存块 size。sizeIdx2size 是 PoolArena 父类 SizeClasses 提供的办法,可参考系列文章《内存对齐类 SizeClasses》。
allocateRun 办法负责调配 Normal 内存块,返回 handle 存储了调配的内存块大小和偏移量。

#3 应用 handle 和底层内存类 (ByteBuffer) 初始化 ByteBuf 了。

private long allocateRun(int runSize) {
    // #1
    int pages = runSize >> pageShifts;
    // #2
    int pageIdx = arena.pages2pageIdx(pages);

    synchronized (runsAvail) {
        //find first queue which has at least one big enough run
        // #3
        int queueIdx = runFirstBestFit(pageIdx);
        if (queueIdx == -1) {return -1;}

        //get run with min offset in this queue
        PriorityQueue<Long> queue = runsAvail[queueIdx];
        long handle = queue.poll();

        assert !isUsed(handle);
        // #4
        removeAvailRun(queue, handle);
        // #5
        if (handle != -1) {handle = splitLargeRun(handle, pages);
        }
        // #6
        freeBytes -= runSize(pageShifts, handle);
        return handle;
    }
}

#1 计算所需的 page 数量
#2 计算对应的 pageIdx
留神,pages2pageIdx 办法会将申请内存大小对齐为上述 Page 表格中的一个 size。例如申请 172032 字节 (21 个 page) 的内存块,pages2pageIdx 办法计算结果为 13,理论调配 196608(24 个 page)的内存块。
#3 从 pageIdx 开始遍历 runsAvail,找到第一个 handle。
该 handle 上能够调配所需内存块。
#4 从 runsAvail,runsAvailMap 移除该 handle 信息
#5#3 步骤找到的 handle 上划分出所要的内存块。
#6 缩小可用内存字节数

private long splitLargeRun(long handle, int needPages) {
    assert needPages > 0;

    // #1
    int totalPages = runPages(handle);
    assert needPages <= totalPages;

    int remPages = totalPages - needPages;

    // #2 
    if (remPages > 0) {int runOffset = runOffset(handle);

        // keep track of trailing unused pages for later use
        int availOffset = runOffset + needPages;
        long availRun = toRunHandle(availOffset, remPages, 0);
        insertAvailRun(availOffset, remPages, availRun);

        // not avail
        return toRunHandle(runOffset, needPages, 1);
    }

    //mark it as used
    handle |= 1L << IS_USED_SHIFT;
    return handle;
}

#1 totalPages,从 handle 中获取以后地位可用 page 数。
remPages,调配后残余 page 数。
#2 残余 page 数大于 0
availOffset,计算残余 page 开始偏移量
生成一个新的 handle,availRun
insertAvailRun 将 availRun 插入到 runsAvail,runsAvailMap 中

内存开释

void free(long handle, int normCapacity, ByteBuffer nioBuffer) {
    ...

    // #1
    int pages = runPages(handle);

    synchronized (runsAvail) {
        // collapse continuous runs, successfully collapsed runs
        // will be removed from runsAvail and runsAvailMap
        // #2
        long finalRun = collapseRuns(handle);

        // #3
        finalRun &= ~(1L << IS_USED_SHIFT);
        //if it is a subpage, set it to run
        finalRun &= ~(1L << IS_SUBPAGE_SHIFT);
        insertAvailRun(runOffset(finalRun), runPages(finalRun), finalRun);
        freeBytes += pages << pageShifts;
    }

    if (nioBuffer != null && cachedNioBuffers != null &&
        cachedNioBuffers.size() < PooledByteBufAllocator.DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK) {cachedNioBuffers.offer(nioBuffer);
    }
}

#1 计算开释的 page 数
#2 如果能够,将前后的可用内存块进行合并
#3 插入新的 handle

collapseRuns

private long collapseRuns(long handle) {return collapseNext(collapsePast(handle));
}

collapsePast 办法合并后面的可用内存块
collapseNext 办法合并前面的可用内存块

private long collapseNext(long handle) {for (;;) {
        // #1
        int runOffset = runOffset(handle);
        int runPages = runPages(handle);

        Long nextRun = getAvailRunByOffset(runOffset + runPages);
        if (nextRun == null) {return handle;}

        int nextOffset = runOffset(nextRun);
        int nextPages = runPages(nextRun);

        //is continuous
        // #2
        if (nextRun != handle && runOffset + runPages == nextOffset) {
            //remove next run
            removeAvailRun(nextRun);
            handle = toRunHandle(runOffset, runPages + nextPages, 0);
        } else {return handle;}
    }
}

#1 getAvailRunByOffset 办法从 runsAvailMap 中找到下一个内存块的 handle。
#2 如果是间断的内存块,则移除下一个内存块 handle,并将其 page 合并生成一个新的 handle。

上面来看一个例子

大家能够联合例子中 runsAvail 和内存应用状况的变动,了解下面的代码。
实际上,2 个 Page 的内存块是通过 Subpage 调配,回收时会放回线程缓存中而不是间接开释存块,但为了展现 PoolChunk 中内存治理过程,图中不思考这些场景。

PoolChunk 在 Netty 4.1.52 版本批改了算法,引入了 jemalloc 4 的算法 — https://github.com/netty/nett…
Netty 4.1.52 之前的版本,PoolChunk 引入的是 jemalloc 3 的算法,应用二叉树治理内存块。有趣味的同学能够参考我后续的文章《PoolChunk 实现(jemalloc 3 的算法)》

如果您感觉本文不错,欢送关注我的微信公众号,系列文章继续更新中。您的关注是我保持的能源!

正文完
 0