乐趣区

关于Netty:NettyByteBuf

ByteBuf 分类

Pooled: 每次申请内存都是从曾经调配好的内存池中取
Unpooled: 每次申请内存都重新分配

Heap: 应用 jvm 的堆内存
Direct: 操作系统间接内存

Unsafe: 调用 native 办法间接操作内存
非 Unsafe: 通过 jdk 的 api 间接操作内存

Unsafe 形式: 调用 jdk 的 unsafe 实例,依据根底偏移量 +index 算出总偏移量

static byte getByte(byte[] data, int index) {return UNSAFE.getByte(data, BYTE_ARRAY_BASE_OFFSET + index);
}

非 Unsafe 形式: 就是通过数组下标获取

static byte getByte(byte[] memory, int index) {return memory[index];
}

Unsafe 与下面两种分类形式不同,用户无奈决定用哪种形式;是否选用 unsafe 是由 Netty 判断能不能拿到底层的 unsafe 决定的。

if (PlatformDependent.hasUnsafe()) {buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
} else {buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}

UnpooledByteBufAllocator 调配形式比较简单,就是每次申请内存的时候都重新分配一块新的内存返回。

重点看 PooledByteBufAllocator 内存调配流程

PooledByteBufAllocator 分配内存流程

以堆外内存非配为例

protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    // 拿到线程独有的 PoolThreadCache
    PoolThreadCache cache = threadCache.get();
    PoolArena<ByteBuffer> directArena = cache.directArena;

    ByteBuf buf;
    if (directArena != null) {buf = directArena.allocate(cache, initialCapacity, maxCapacity);
    } else {if (PlatformDependent.hasUnsafe()) {buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
        } else {buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }
    }

    return toLeakAwareBuffer(buf);
}

先通过 threadCache 拿到一个 PoolThreadCache 类型的属性,threadCache 是 PoolThreadLocalCache 类型的,PoolThreadLocalCache 继承自 netty 自定义的 FastThreadLocal,相似 jdk 的 ThreadLocal,每个线程都有本人的 PoolThreadLocalCache,这样每个线程也就有本人的 PoolThreadCache。

PoolThreadCache 次要有两块区域,arena 和 cache
通过 PoolThreadCache 分配内存时,会先从 cache 获取,cache 是之前被应用过被回收的内存;
cache 获取不到,就从 arena 开拓一块内存,用完后再回收到 cache,不便下次间接从 cache 获取。

cache 局部是由多个 MemoryRegionCache 类型的数组组织起来的。不同数组下标代表不同的 MemoryRegionCache 内存规格。

MemoryRegionCache 里有一个队列,队列元素 Entry 蕴含一个 chunk 和一个 handle,chunk 是一块内存区域,而后通过 handle 能够定位到这个 Entry 它所援用的内存地位。

所以整个 PoolThreadCache 的 cache 局部的构造如下:

队列元素 Entry 代表一块对应大小的可分配内存。

ByteBufAllocator 内存调配

在 PooledByteBufAllocator 构造函数里,会创立两个 PoolArena 类型的数组,别离用于堆内和堆外

heapArenas = newArenaArray(nHeapArena);
directArenas = newArenaArray(nDirectArena);

nHeapArena 默认状况下是 2*CPU 核数

final int defaultMinNumArena = runtime.availableProcessors() * 2;

这个应该是跟 NioEventLoopGroup 无关,默认 workGroup 会创立 2*CPU 核数 个 NioEventLoop,这样就能保障每个 NioEventLoop 线程都有一个本人独享的 Arena 防止线程争用,不必加锁。

接着下面的 directArena.allocate()办法

PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
    // 获取一个 PooledByteBuf 对象,如果对象池里有间接拿来用,否则就创立
    PooledByteBuf<T> buf = newByteBuf(maxCapacity);
    // 在 PoolThreadCache 里分配内存
    allocate(cache, buf, reqCapacity);
    return buf;
}

allocate()办法先尝试从缓存中调配,也就是命中缓存;
如果失败,再从 arena 里开拓一块内存调配

命中缓存调配流程

  1. 找到对应 size 的 MemoryRegionCache
  2. 从队列中弹出一个 entry 给 ByteBuf 初始化
  3. 将弹出的 entry 扔到对象池复用

在 PoolThreadCache 里分配内存时,都是先尝试从缓存上进行调配,如果缓存上没有适合的,就在 arena 开拓一块内存。

内存规格

private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
    // 对申请的内存大小进行规格化
    final int normCapacity = normalizeCapacity(reqCapacity);
    // 依据申请内存大小进行调配
    if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
        int tableIdx;
        PoolSubpage<T>[] table;
        boolean tiny = isTiny(normCapacity);
        if (tiny) { // < 512
            if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            tableIdx = tinyIdx(normCapacity);
            table = tinySubpagePools;
        } else {if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            tableIdx = smallIdx(normCapacity);
            table = smallSubpagePools;
        }

        final PoolSubpage<T> head = table[tableIdx];

        synchronized (head) {
            final PoolSubpage<T> s = head.next;
            // 示意以后链表没有元素
            if (s != head) {
                assert s.doNotDestroy && s.elemSize == normCapacity;
                long handle = s.allocate();
                assert handle >= 0;
                s.chunk.initBufWithSubpage(buf, handle, reqCapacity);

                if (tiny) {allocationsTiny.increment();
                } else {allocationsSmall.increment();
                }
                return;
            }
        }
        allocateNormal(buf, reqCapacity, normCapacity);
        return;
    }
    if (normCapacity <= chunkSize) {if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
            // was able to allocate out of the cache so move on
            return;
        }
        allocateNormal(buf, reqCapacity, normCapacity);
    } else {
        // Huge allocations are never served via the cache so just call allocateHuge
        allocateHuge(buf, reqCapacity);
    }
}

以 tiny 类型的内存调配为例

boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
}

private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {int idx = PoolArena.tinyIdx(normCapacity);
    if (area.isDirect()) {return cache(tinySubPageDirectCaches, idx);
    }
    return cache(tinySubPageHeapCaches, idx);
}
// tinyIdx 办法间接把 normCapacity/16
static int tinyIdx(int normCapacity) {return normCapacity >>> 4;}

private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {if (cache == null || idx > cache.length - 1) {return null;}
    // 拿到 MemoryRegionCache 节点
    return cache[idx];
}

private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {if (cache == null) {
        // no cache found so just return false here
        return false;
    }
    boolean allocated = cache.allocate(buf, reqCapacity);
    if (++ allocations >= freeSweepAllocationThreshold) {
        allocations = 0;
        trim();}
    return allocated;
}

public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
    // 找到对应 size 的 MemoryRegionCache 后,用它的 queue 弹出一个 entry
    Entry<T> entry = queue.poll();
    if (entry == null) {return false;}
    // 初始化
    initBuf(entry.chunk, entry.handle, buf, reqCapacity);
    // 回收 entry 对象
    entry.recycle();

    // allocations is not thread-safe which is fine as this is only called from the same thread all time.
    ++ allocations;
    return true;
}

protected void initBuf(PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity) {chunk.initBufWithSubpage(buf, handle, reqCapacity);
}

private void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int bitmapIdx, int reqCapacity) {
    assert bitmapIdx != 0;

    int memoryMapIdx = memoryMapIdx(handle);

    PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
    assert subpage.doNotDestroy;
    assert reqCapacity <= subpage.elemSize;

    buf.init(
        this, handle,
        runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize, reqCapacity, subpage.elemSize,
        arena.parent.threadCache());
}

void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
    assert handle >= 0;
    assert chunk != null;

    this.chunk = chunk;
    this.handle = handle;
    memory = chunk.memory;
    this.offset = offset;
    this.length = length;
    this.maxLength = maxLength;
    tmpNioBuf = null;
    this.cache = cache;
}

void recycle() {
    chunk = null;
    handle = -1;
    recyclerHandle.recycle(this);
}
public void recycle(Object object) {if (object != value) {throw new IllegalArgumentException("object does not belong to handle");
    }
    stack.push(this);
}

未命中缓存

arena 数据结构

依照内存使用率,把雷同内存使用率的 chunk 归为一个 chunkList,而后 chunkList 之间双向连贯。

每个 chunk 的大小是 16M,分配内存不可能每次申请都调配 16M,所以 chunk 又能够划分为不同大小的 subPage。

page 级别内存调配

private synchronized void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
    // 尝试在 chunkList 链表上进行调配
    if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
        q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
        q075.allocate(buf, reqCapacity, normCapacity)) {
        ++allocationsNormal;
        return;
    }

    // 以后没有适合的 chunk,就创立一个 chunk
    PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
    // 通过 chunk 调配指定大小的一块内存,返回一个 handle
    // handle 指向 chunk 内调配的一块间断内存
    long handle = c.allocate(normCapacity);
    ++allocationsNormal;
    assert handle > 0;
    // 拿到内存后,初始化 PoolByteBuf
    c.initBuf(buf, handle, reqCapacity);
    qInit.add(c);
}
  1. 现有 chunk 上调配
boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {if (head == null || normCapacity > maxCapacity) {
        // Either this PoolChunkList is empty or the requested capacity is larger then the capacity which can
        // be handled by the PoolChunks that are contained in this PoolChunkList.
        return false;
    }
    // 遍历 chunkList,尝试调配
    for (PoolChunk<T> cur = head;;) {long handle = cur.allocate(normCapacity);
        // handle<0 阐明调配失败,持续遍历
        if (handle < 0) {
            cur = cur.next;
            if (cur == null) {return false;}
        } else {
            // 调配胜利,初始化 byteBuf
            cur.initBuf(buf, handle, reqCapacity);
            // 查看以后 chunk 调配后的使用率是否还满足原来的使用率范畴
            // 不满足就将它转移到别的 chunkList
            if (cur.usage() >= maxUsage) {remove(cur);
                nextList.add(cur);
            }
            return true;
        }
    }
}
  1. 创立一个 chunk 调配
protected PoolChunk<ByteBuffer> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) {
    return new PoolChunk<ByteBuffer>(this, allocateDirect(chunkSize),
            pageSize, maxOrder, pageShifts, chunkSize);
}
// 调用 JDK API,分配内存
private static ByteBuffer allocateDirect(int capacity) {return PlatformDependent.useDirectBufferNoCleaner() ?
            PlatformDependent.allocateDirectNoCleaner(capacity) : ByteBuffer.allocateDirect(capacity);
}

netty 用一个齐全二叉树示意一个 chunk 被调配的状况。

示意形式有点像线段树,树的每个节点代表一段区间,通过这个节点来示意这段区间有没有被应用。
叶子节点 (一个 page) 示意的区间大小为 8k,一个 chunk 也就是根节点的大小是 16M,所以树的高度为 11。

节点示意相应的区间是否被调配

PoolChunk(PoolArena<T> arena, T memory, int pageSize, int maxOrder, int pageShifts, int chunkSize) {
    unpooled = false;
    this.arena = arena;
    this.memory = memory;
    this.pageSize = pageSize;
    this.pageShifts = pageShifts;
    this.maxOrder = maxOrder;
    this.chunkSize = chunkSize;
    unusable = (byte) (maxOrder + 1);
    log2ChunkSize = log2(chunkSize);
    subpageOverflowMask = ~(pageSize - 1);
    freeBytes = chunkSize;

    assert maxOrder < 30 : "maxOrder should be < 30, but is:" + maxOrder;
    maxSubpageAllocs = 1 << maxOrder;

    // Generate the memory map.
    // memoryMap、depthMap 示意第几个节点在树的第几层
    memoryMap = new byte[maxSubpageAllocs << 1];
    depthMap = new byte[memoryMap.length];
    int memoryMapIndex = 1;
    // 遍历树的每一层
    for (int d = 0; d <= maxOrder; ++ d) { // move down the tree one level at a time
        int depth = 1 << d;
        // 遍历这一层的每个节点
        for (int p = 0; p < depth; ++ p) {
            // in each level traverse left to right and set value to the depth of subtree
            memoryMap[memoryMapIndex] = (byte) d;
            depthMap[memoryMapIndex] = (byte) d;
            memoryMapIndex ++;
        }
    }

    subpages = newSubpageArray(maxSubpageAllocs);
}
long allocate(int normCapacity) {if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSize
        return allocateRun(normCapacity);
    } else {return allocateSubpage(normCapacity);
    }
}

private long allocateRun(int normCapacity) {
    // 算出要调配的内存在树的第几层
    int d = maxOrder - (log2(normCapacity) - pageShifts);
    int id = allocateNode(d);
    if (id < 0) {return id;}
    freeBytes -= runLength(id);
    return id;
}
// 在算出的层数上进行内存调配
private int allocateNode(int d) {
    int id = 1;
    int initial = - (1 << d); // has last d bits = 0 and rest all = 1
    byte val = value(id);
    if (val > d) { // unusable
        return -1;
    }
    while (val < d || (id & initial) == 0) { // id & initial == 1 << d for all ids at depth d, for < d it is 0
        id <<= 1;
        val = value(id);
        if (val > d) {
            id ^= 1;
            val = value(id);
        }
    }
    byte value = value(id);
    assert value == d && (id & initial) == 1 << d : String.format("val = %d, id & initial = %d, d = %d",
            value, id & initial, d);
    // 标记这个节点已被应用
    setValue(id, unusable); // mark as unusable
    // 逐层向上标记它的父节点已被应用
    updateParentsAlloc(id);
    return id;
}

初始化 byteBuf

void initBuf(PooledByteBuf<T> buf, long handle, int reqCapacity) {int memoryMapIdx = memoryMapIdx(handle);
    int bitmapIdx = bitmapIdx(handle);
    if (bitmapIdx == 0) {byte val = value(memoryMapIdx);
        assert val == unusable : String.valueOf(val);
        buf.init(this, handle, runOffset(memoryMapIdx), reqCapacity, runLength(memoryMapIdx),
                    arena.parent.threadCache());
    } else {initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity);
    }
}
void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
    assert handle >= 0;
    assert chunk != null;

    this.chunk = chunk;
    this.handle = handle;
    memory = chunk.memory;
    this.offset = offset;
    this.length = length;
    this.maxLength = maxLength;
    tmpNioBuf = null;
    this.cache = cache;
}

subPage 级别的内存调配

  1. 定位到一个 subPage
  2. 初始化 subPage(在 chunk 中找到一个 page,对这个 page 依照自定义的 subPage 大小进行划分)
  3. 初始化 ByteBuf
// tableIdx 是间接将申请的内存大小除以 16
tableIdx = tinyIdx(normCapacity);
// 数组,不同下标示意不同大小的 subPage
table = tinySubpagePools;
// 拿到对应大小的的 subPage 的链表头结点
final PoolSubpage<T> head = table[tableIdx];

tinySubpagePools 构造和 MemoryRegionCache 的 tiny 数组相似,不同下标代表不同大小的内存。

先通过 allocateNormal 调配

// 与 page 级别不同的是,这里会走到上面的 allocateSubpage
long allocate(int normCapacity) {if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSize
        return allocateRun(normCapacity);
    } else {return allocateSubpage(normCapacity);
    }
}
private long allocateSubpage(int normCapacity) {
    // Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
    // This is need as we may add it back and so alter the linked-list structure.
    PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);
    synchronized (head) {// d 间接赋值为 11,在树的最初一层调配(因为最初一次层的大小是一个 page,大于要调配的大小)
        int d = maxOrder; // subpages are only be allocated from pages i.e., leaves
        int id = allocateNode(d);
        if (id < 0) {return id;}

        final PoolSubpage<T>[] subpages = this.subpages;
        final int pageSize = this.pageSize;

        freeBytes -= pageSize;
        // subpageIdx: 这个 subPage 在 page 中的地位
        int subpageIdx = subpageIdx(id);
        PoolSubpage<T> subpage = subpages[subpageIdx];
        if (subpage == null) {subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity);
            subpages[subpageIdx] = subpage;
        } else {subpage.init(head, normCapacity);
        }
        return subpage.allocate();}
}

PoolSubpage(PoolSubpage<T> head, PoolChunk<T> chunk, int memoryMapIdx, int runOffset, int pageSize, int elemSize) {
    this.chunk = chunk;
    this.memoryMapIdx = memoryMapIdx;
    this.runOffset = runOffset;
    this.pageSize = pageSize;
    bitmap = new long[pageSize >>> 10]; // pageSize / 16 / 64
    init(head, elemSize);
}

void init(PoolSubpage<T> head, int elemSize) {
    doNotDestroy = true;
    this.elemSize = elemSize;
    if (elemSize != 0) {
        maxNumElems = numAvail = pageSize / elemSize;
        nextAvail = 0;
        bitmapLength = maxNumElems >>> 6;
        if ((maxNumElems & 63) != 0) {bitmapLength ++;}
        // bitmap: 示意 page 中那个 subPage 已被调配
        for (int i = 0; i < bitmapLength; i ++) {bitmap[i] = 0;
        }
    }
    addToPool(head);
}

// 将创立好的 subPage 加到 tinySubpagePools 的链表中
private void addToPool(PoolSubpage<T> head) {
    assert prev == null && next == null;
    prev = head;
    next = head.next;
    next.prev = this;
    head.next = this;
}
long allocate() {if (elemSize == 0) {return toHandle(0);
    }

    if (numAvail == 0 || !doNotDestroy) {return -1;}

    final int bitmapIdx = getNextAvail();
    int q = bitmapIdx >>> 6;
    int r = bitmapIdx & 63;
    assert (bitmap[q] >>> r & 1) == 0;
    bitmap[q] |= 1L << r;

    if (-- numAvail == 0) {removeFromPool();
    }

    return toHandle(bitmapIdx);
}

private long toHandle(int bitmapIdx) {return 0x4000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx;
}
private void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int bitmapIdx, int reqCapacity) {
    assert bitmapIdx != 0;

    int memoryMapIdx = memoryMapIdx(handle);

    PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
    assert subpage.doNotDestroy;
    assert reqCapacity <= subpage.elemSize;

    buf.init(
        this, handle,
        runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize, reqCapacity, subpage.elemSize,
        arena.parent.threadCache());
}

ByteBuf 的开释

  1. 将要开释的内存加到缓存
  2. 如果退出缓存失败(缓存已满),就标记该内存未应用
  3. 将 ByteBuf 放到对象池
private boolean release0(int decrement) {for (;;) {
        int refCnt = this.refCnt;
        if (refCnt < decrement) {throw new IllegalReferenceCountException(refCnt, -decrement);
        }

        if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) {
            // 援用计数缩小到 decrement
            if (refCnt == decrement) {deallocate();
                return true;
            }
            return false;
        }
    }
}

protected final void deallocate() {if (handle >= 0) {
        final long handle = this.handle;
        this.handle = -1;
        memory = null;
        chunk.arena.free(chunk, handle, maxLength, cache);
        recycle();}
}

void free(PoolChunk<T> chunk, long handle, int normCapacity, PoolThreadCache cache) {if (chunk.unpooled) {int size = chunk.chunkSize();
        destroyChunk(chunk);
        activeBytesHuge.add(-size);
        deallocationsHuge.increment();} else {SizeClass sizeClass = sizeClass(normCapacity);
        if (cache != null && cache.add(this, chunk, handle, normCapacity, sizeClass)) {
            // cached so not free it.
            return;
        }

        freeChunk(chunk, handle, sizeClass);
    }
}

boolean add(PoolArena<?> area, PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass) {MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
    if (cache == null) {return false;}
    return cache.add(chunk, handle);
}

public final boolean add(PoolChunk<T> chunk, long handle) {Entry<T> entry = newEntry(chunk, handle);
    boolean queued = queue.offer(entry);
    if (!queued) {
        // If it was not possible to cache the chunk, immediately recycle the entry
        entry.recycle();}

    return queued;
}

增加缓存失败

void freeChunk(PoolChunk<T> chunk, long handle, SizeClass sizeClass) {
    final boolean destroyChunk;
    synchronized (this) {switch (sizeClass) {
        case Normal:
            ++deallocationsNormal;
            break;
        case Small:
            ++deallocationsSmall;
            break;
        case Tiny:
            ++deallocationsTiny;
            break;
        default:
            throw new Error();}
        destroyChunk = !chunk.parent.free(chunk, handle);
    }
    if (destroyChunk) {
        // destroyChunk not need to be called while holding the synchronized lock.
        destroyChunk(chunk);
    }
}
// 标记内存为未应用
void free(long handle) {int memoryMapIdx = memoryMapIdx(handle);
    int bitmapIdx = bitmapIdx(handle);

    if (bitmapIdx != 0) { // free a subpage
        PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
        assert subpage != null && subpage.doNotDestroy;

        // Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
        // This is need as we may add it back and so alter the linked-list structure.
        PoolSubpage<T> head = arena.findSubpagePoolHead(subpage.elemSize);
        synchronized (head) {if (subpage.free(head, bitmapIdx & 0x3FFFFFFF)) {return;}
        }
    }
    freeBytes += runLength(memoryMapIdx);
    setValue(memoryMapIdx, depth(memoryMapIdx));
    updateParentsFree(memoryMapIdx);
}

将 ByteBuf 退出到对象池

private void recycle() {recyclerHandle.recycle(this);
}
退出移动版