ByteBuf Classification

Pooled: every allocation request is served from a pre-allocated memory pool.
Unpooled: every allocation request allocates a fresh block of memory.

Heap: uses the JVM heap.
Direct: uses direct (off-heap) memory from the operating system.

Unsafe: manipulates memory directly through native methods.
Non-Unsafe: manipulates memory indirectly through the JDK APIs.

Unsafe approach: call the JDK Unsafe instance and compute the total offset as base offset + index:

```java
static byte getByte(byte[] data, int index) {
    return UNSAFE.getByte(data, BYTE_ARRAY_BASE_OFFSET + index);
}
```

Non-Unsafe approach: simply read via the array index:

```java
static byte getByte(byte[] memory, int index) {
    return memory[index];
}
```

Unlike the two classifications above, the user cannot choose between Unsafe and non-Unsafe; Netty decides by checking whether the underlying Unsafe is available:

```java
if (PlatformDependent.hasUnsafe()) {
    buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
} else {
    buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
```

UnpooledByteBufAllocator's allocation is straightforward: each request simply allocates and returns a new block of memory.
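For orientation, a minimal usage sketch of the two allocators (standard Netty API; the buffer sizes are arbitrary):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;

public class AllocatorDemo {
    public static void main(String[] args) {
        // Pooled: buffers come from (and return to) an arena-backed pool.
        ByteBuf pooled = PooledByteBufAllocator.DEFAULT.directBuffer(256);
        // Unpooled: every call allocates a brand-new buffer.
        ByteBuf unpooled = UnpooledByteBufAllocator.DEFAULT.heapBuffer(256);

        pooled.writeInt(42);
        System.out.println(pooled.readInt());

        // Releasing a pooled buffer hands its memory back to the pool.
        pooled.release();
        unpooled.release();
    }
}
```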

The interesting part is PooledByteBufAllocator's allocation flow, examined next.

How PooledByteBufAllocator allocates memory

Take direct (off-heap) allocation as an example:

```java
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    // Get the PoolThreadCache owned by the current thread
    PoolThreadCache cache = threadCache.get();
    PoolArena<ByteBuffer> directArena = cache.directArena;

    ByteBuf buf;
    if (directArena != null) {
        buf = directArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        if (PlatformDependent.hasUnsafe()) {
            buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
        } else {
            buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }
    }
    return toLeakAwareBuffer(buf);
}
```

First, a PoolThreadCache is obtained through threadCache. threadCache is of type PoolThreadLocalCache, which extends Netty's own FastThreadLocal (analogous to the JDK's ThreadLocal), so every thread has its own PoolThreadLocalCache and therefore its own PoolThreadCache.
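As an illustration of that thread-local pattern, a self-contained sketch using FastThreadLocal directly (the StringBuilder payload is a stand-in; PoolThreadLocalCache's real initialValue() builds a PoolThreadCache tied to an arena):

```java
import io.netty.util.concurrent.FastThreadLocal;

final class ThreadLocalCacheDemo {
    static final FastThreadLocal<StringBuilder> CACHE = new FastThreadLocal<StringBuilder>() {
        @Override
        protected StringBuilder initialValue() {
            // Called once per thread on first access.
            return new StringBuilder();
        }
    };

    public static void main(String[] args) throws InterruptedException {
        Runnable task = () -> System.out.println(
                Thread.currentThread().getName() + " -> " + System.identityHashCode(CACHE.get()));
        Thread t1 = new Thread(task);
        Thread t2 = new Thread(task);
        t1.start(); t2.start();
        t1.join(); t2.join(); // the two threads print different instances
    }
}
```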

PoolThreadCache has two main regions: arena and cache.
When allocating through PoolThreadCache, the cache is tried first; the cache holds memory that was used before and then reclaimed.
If the cache has nothing suitable, a block is carved out of the arena; once that block is freed it goes back into the cache, so the next request can take it straight from the cache.

The cache part is organized as several arrays of MemoryRegionCache; different array indices correspond to different MemoryRegionCache size classes.

Each MemoryRegionCache holds a queue. Each queue element (Entry) contains a chunk and a handle: the chunk is a region of memory, and the handle pinpoints the location within that chunk that the Entry refers to.

So, overall, the cache part of PoolThreadCache is a set of MemoryRegionCache arrays indexed by size class, where each queue element (Entry) represents one allocatable block of the corresponding size.
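A condensed structural sketch of that layout (type and field names abbreviated from the Netty source; the real MemoryRegionCache also tracks its size class and an allocation counter, and uses Recycler-backed Entry objects):

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Condensed sketch of the cache layout inside PoolThreadCache.
final class CacheSketch<T> {

    static final class Entry<T> {
        Object chunk;  // the PoolChunk the memory lives in
        long handle;   // locates the exact region inside that chunk
    }

    static final class MemoryRegionCache<T> {
        // Freed blocks of one fixed size, waiting to be reused.
        final Queue<Entry<T>> queue = new ArrayDeque<>();
    }

    // Index i covers tiny size 16 * i, mirroring tinySubPageDirectCaches.
    final MemoryRegionCache<T>[] tinyCaches;

    @SuppressWarnings("unchecked")
    CacheSketch(int numTinyCaches) {
        tinyCaches = new MemoryRegionCache[numTinyCaches];
        for (int i = 0; i < numTinyCaches; i++) {
            tinyCaches[i] = new MemoryRegionCache<>();
        }
    }
}
```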

ByteBufAllocator memory allocation

The PooledByteBufAllocator constructor creates two arrays of PoolArena, one for heap memory and one for direct memory:

```java
heapArenas = newArenaArray(nHeapArena);
directArenas = newArenaArray(nDirectArena);
```

By default, nHeapArena is 2 * the number of CPU cores:

```java
final int defaultMinNumArena = runtime.availableProcessors() * 2;
```

This is presumably tied to NioEventLoopGroup: by default the worker group creates 2 * CPU cores NioEventLoops, so every NioEventLoop thread can have an arena of its own, avoiding contention between threads and the need for locking.

Continuing with the directArena.allocate() method from above:

```java
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
    // Get a PooledByteBuf object: reuse one from the object pool if available, otherwise create one
    PooledByteBuf<T> buf = newByteBuf(maxCapacity);
    // Allocate memory via the PoolThreadCache
    allocate(cache, buf, reqCapacity);
    return buf;
}
```

The allocate() method first tries to allocate from the cache (a cache hit);
if that fails, it carves a new block out of the arena.

Cache-hit allocation flow

1. Find the MemoryRegionCache matching the requested size.
2. Pop an Entry from its queue and initialize the ByteBuf with it.
3. Return the popped Entry to the object pool for reuse.

Every allocation in PoolThreadCache tries the cache first; only when nothing suitable is cached does it carve a block out of the arena.

Memory size classes

```java
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
    // Normalize the requested size
    final int normCapacity = normalizeCapacity(reqCapacity);
    // Dispatch on the normalized size
    if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
        int tableIdx;
        PoolSubpage<T>[] table;
        boolean tiny = isTiny(normCapacity);
        if (tiny) { // < 512
            if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            tableIdx = tinyIdx(normCapacity);
            table = tinySubpagePools;
        } else {
            if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            tableIdx = smallIdx(normCapacity);
            table = smallSubpagePools;
        }

        final PoolSubpage<T> head = table[tableIdx];
        synchronized (head) {
            final PoolSubpage<T> s = head.next;
            // s == head would mean the list is empty; s != head means a usable subpage exists
            if (s != head) {
                assert s.doNotDestroy && s.elemSize == normCapacity;
                long handle = s.allocate();
                assert handle >= 0;
                s.chunk.initBufWithSubpage(buf, handle, reqCapacity);

                if (tiny) {
                    allocationsTiny.increment();
                } else {
                    allocationsSmall.increment();
                }
                return;
            }
        }
        allocateNormal(buf, reqCapacity, normCapacity);
        return;
    }
    if (normCapacity <= chunkSize) {
        if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
            // was able to allocate out of the cache so move on
            return;
        }
        allocateNormal(buf, reqCapacity, normCapacity);
    } else {
        // Huge allocations are never served via the cache so just call allocateHuge
        allocateHuge(buf, reqCapacity);
    }
}
```
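To make the size classes concrete, here is a condensed restatement of the normalization rules (not the actual normalizeCapacity code; defaults of pageSize = 8 KiB and chunkSize = 16 MiB assumed):

```java
// Condensed restatement of the size-class rules:
// tiny   : < 512 B          -> rounded up to a multiple of 16
// small  : 512 B .. 4 KiB   -> rounded up to the next power of two
// normal : 8 KiB .. 16 MiB  -> rounded up to the next power of two
// huge   : > 16 MiB         -> not pooled, allocated as-is
static int normalize(int reqCapacity) {
    if (reqCapacity >= 512) {
        // next power of two >= reqCapacity
        return Integer.highestOneBit(reqCapacity - 1) << 1;
    }
    // round up to a multiple of 16
    return (reqCapacity & 15) == 0 ? reqCapacity : (reqCapacity & ~15) + 16;
}

// normalize(100) == 112, normalize(512) == 512, normalize(1000) == 1024
```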

Take tiny allocation as an example:

```java
boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
    return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
}

private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
    int idx = PoolArena.tinyIdx(normCapacity);
    if (area.isDirect()) {
        return cache(tinySubPageDirectCaches, idx);
    }
    return cache(tinySubPageHeapCaches, idx);
}

// tinyIdx simply computes normCapacity / 16
static int tinyIdx(int normCapacity) {
    return normCapacity >>> 4;
}

private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {
    if (cache == null || idx > cache.length - 1) {
        return null;
    }
    // Get the MemoryRegionCache node
    return cache[idx];
}

private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
    if (cache == null) {
        // no cache found so just return false here
        return false;
    }
    boolean allocated = cache.allocate(buf, reqCapacity);
    if (++ allocations >= freeSweepAllocationThreshold) {
        allocations = 0;
        trim();
    }
    return allocated;
}

public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
    // With the right-sized MemoryRegionCache found, pop an entry from its queue
    Entry<T> entry = queue.poll();
    if (entry == null) {
        return false;
    }
    // Initialize the buffer
    initBuf(entry.chunk, entry.handle, buf, reqCapacity);
    // Recycle the entry object
    entry.recycle();

    // allocations is not thread-safe which is fine as this is only called from the same thread all time.
    ++ allocations;
    return true;
}

protected void initBuf(PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity) {
    chunk.initBufWithSubpage(buf, handle, reqCapacity);
}

private void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int bitmapIdx, int reqCapacity) {
    assert bitmapIdx != 0;

    int memoryMapIdx = memoryMapIdx(handle);

    PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
    assert subpage.doNotDestroy;
    assert reqCapacity <= subpage.elemSize;

    buf.init(
        this, handle,
        runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize, reqCapacity, subpage.elemSize,
        arena.parent.threadCache());
}

void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
    assert handle >= 0;
    assert chunk != null;

    this.chunk = chunk;
    this.handle = handle;
    memory = chunk.memory;
    this.offset = offset;
    this.length = length;
    this.maxLength = maxLength;
    tmpNioBuf = null;
    this.cache = cache;
}

void recycle() {
    chunk = null;
    handle = -1;
    recyclerHandle.recycle(this);
}

public void recycle(Object object) {
    if (object != value) {
        throw new IllegalArgumentException("object does not belong to handle");
    }
    stack.push(this);
}
```
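A worked example of the tiny-index math (numbers are illustrative):

```java
// Worked example:
int req  = 100;          // requested bytes
int norm = 112;          // normalized: next multiple of 16
int idx  = norm >>> 4;   // tinyIdx -> 7
// -> served by the MemoryRegionCache at index 7 of the tiny cache array
```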

Cache miss

The arena data structure

Chunks are grouped by memory usage: chunks in the same usage range form one chunkList, and the chunkLists are linked to one another in a doubly linked list.

Each chunk is 16 MiB. Since it would be wasteful to hand out 16 MiB per request, a chunk is further divided into pages and subPages of various sizes.
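As a quick sanity check on those numbers (default pageSize and maxOrder assumed):

```java
// Default sizing relationships (values assumed from Netty's defaults):
int pageSize  = 8192;                    // 8 KiB per page
int maxOrder  = 11;                      // depth of the allocation tree
int chunkSize = pageSize << maxOrder;    // 8192 << 11 = 16 MiB per chunk
int leaves    = 1 << maxOrder;           // 2048 pages (leaf nodes) per chunk
```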

Page-level allocation

```java
private synchronized void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
    // Try to allocate from the existing chunkLists
    if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
        q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
        q075.allocate(buf, reqCapacity, normCapacity)) {
        ++allocationsNormal;
        return;
    }

    // No suitable chunk exists, so create one
    PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
    // Allocate a block of the requested size from the chunk; the returned handle
    // points at the contiguous region allocated inside the chunk
    long handle = c.allocate(normCapacity);
    ++allocationsNormal;
    assert handle > 0;
    // With the memory in hand, initialize the PooledByteBuf
    c.initBuf(buf, handle, reqCapacity);
    qInit.add(c);
}
```
1. Allocating from an existing chunk:
```java
boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
    if (head == null || normCapacity > maxCapacity) {
        // Either this PoolChunkList is empty or the requested capacity is larger then the capacity which can
        // be handled by the PoolChunks that are contained in this PoolChunkList.
        return false;
    }

    // Walk the chunkList and try each chunk
    for (PoolChunk<T> cur = head;;) {
        long handle = cur.allocate(normCapacity);
        // handle < 0 means this chunk failed; keep walking
        if (handle < 0) {
            cur = cur.next;
            if (cur == null) {
                return false;
            }
        } else {
            // Success: initialize the byteBuf
            cur.initBuf(buf, handle, reqCapacity);
            // Check whether the chunk's usage still falls within this list's range;
            // if not, move it to the appropriate chunkList
            if (cur.usage() >= maxUsage) {
                remove(cur);
                nextList.add(cur);
            }
            return true;
        }
    }
}
```
2. Creating a new chunk to allocate from:
```java
protected PoolChunk<ByteBuffer> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) {
    return new PoolChunk<ByteBuffer>(
            this, allocateDirect(chunkSize),
            pageSize, maxOrder, pageShifts, chunkSize);
}

// Call the JDK API to allocate the memory
private static ByteBuffer allocateDirect(int capacity) {
    return PlatformDependent.useDirectBufferNoCleaner() ?
            PlatformDependent.allocateDirectNoCleaner(capacity) : ByteBuffer.allocateDirect(capacity);
}
```

Netty uses a complete binary tree to track how a chunk has been allocated.

The representation resembles a segment tree: each node covers an interval and records whether that interval is in use.
A leaf node (one page) covers 8 KiB, and the root (the whole chunk) covers 16 MiB, so the tree has height 11 (16 MiB / 8 KiB = 2048 = 2^11 leaves).

A node's value records whether its interval has been allocated:

```java
PoolChunk(PoolArena<T> arena, T memory, int pageSize, int maxOrder, int pageShifts, int chunkSize) {
    unpooled = false;
    this.arena = arena;
    this.memory = memory;
    this.pageSize = pageSize;
    this.pageShifts = pageShifts;
    this.maxOrder = maxOrder;
    this.chunkSize = chunkSize;
    unusable = (byte) (maxOrder + 1);
    log2ChunkSize = log2(chunkSize);
    subpageOverflowMask = ~(pageSize - 1);
    freeBytes = chunkSize;

    assert maxOrder < 30 : "maxOrder should be < 30, but is: " + maxOrder;
    maxSubpageAllocs = 1 << maxOrder;

    // Generate the memory map.
    // memoryMap and depthMap record, for each node, which tree level it sits on
    memoryMap = new byte[maxSubpageAllocs << 1];
    depthMap = new byte[memoryMap.length];
    int memoryMapIndex = 1;
    // Walk every level of the tree
    for (int d = 0; d <= maxOrder; ++ d) { // move down the tree one level at a time
        int depth = 1 << d;
        // Walk every node on this level
        for (int p = 0; p < depth; ++ p) {
            // in each level traverse left to right and set value to the depth of subtree
            memoryMap[memoryMapIndex] = (byte) d;
            depthMap[memoryMapIndex] = (byte) d;
            memoryMapIndex ++;
        }
    }

    subpages = newSubpageArray(maxSubpageAllocs);
}
```
```java
long allocate(int normCapacity) {
    if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSize
        return allocateRun(normCapacity);
    } else {
        return allocateSubpage(normCapacity);
    }
}

private long allocateRun(int normCapacity) {
    // Work out which tree level the requested size lives on
    int d = maxOrder - (log2(normCapacity) - pageShifts);
    int id = allocateNode(d);
    if (id < 0) {
        return id;
    }
    freeBytes -= runLength(id);
    return id;
}

// Allocate a node on the computed level
private int allocateNode(int d) {
    int id = 1;
    int initial = - (1 << d); // has last d bits = 0 and rest all = 1
    byte val = value(id);
    if (val > d) { // unusable
        return -1;
    }
    while (val < d || (id & initial) == 0) { // id & initial == 1 << d for all ids at depth d, for < d it is 0
        id <<= 1;
        val = value(id);
        if (val > d) {
            id ^= 1;
            val = value(id);
        }
    }
    byte value = value(id);
    assert value == d && (id & initial) == 1 << d : String.format("val = %d, id & initial = %d, d = %d",
            value, id & initial, d);
    // Mark this node as used
    setValue(id, unusable); // mark as unusable
    // Walk up the tree and update the parents to reflect the allocation
    updateParentsAlloc(id);
    return id;
}
```
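A worked example of the level calculation in allocateRun (defaults pageShifts = 13, maxOrder = 11 assumed):

```java
// Worked example with the defaults pageShifts = 13, maxOrder = 11:
int normCapacity = 16 * 1024;                                // 16 KiB request (two pages)
int log2 = 31 - Integer.numberOfLeadingZeros(normCapacity);  // 14
int d = 11 - (log2 - 13);                                    // 11 - 1 = 10
// -> allocate a node at depth 10, where each node spans 2 pages (16 KiB)
```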

Initializing the byteBuf

```java
void initBuf(PooledByteBuf<T> buf, long handle, int reqCapacity) {
    int memoryMapIdx = memoryMapIdx(handle);
    int bitmapIdx = bitmapIdx(handle);
    if (bitmapIdx == 0) {
        byte val = value(memoryMapIdx);
        assert val == unusable : String.valueOf(val);
        buf.init(this, handle, runOffset(memoryMapIdx), reqCapacity, runLength(memoryMapIdx),
                    arena.parent.threadCache());
    } else {
        initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity);
    }
}
```
buf.init() then records the chunk, handle, and offset on the PooledByteBuf:

```java
void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
    assert handle >= 0;
    assert chunk != null;

    this.chunk = chunk;
    this.handle = handle;
    memory = chunk.memory;
    this.offset = offset;
    this.length = length;
    this.maxLength = maxLength;
    tmpNioBuf = null;
    this.cache = cache;
}
```

SubPage-level allocation

1. Locate a subPage.
2. Initialize the subPage (find a page in the chunk and divide it into elements of the chosen subPage size).
3. Initialize the ByteBuf.
```java
// tableIdx is simply the requested size divided by 16
tableIdx = tinyIdx(normCapacity);
// An array where each index corresponds to a different subPage size
table = tinySubpagePools;
// Get the head of the linked list of subPages of that size
final PoolSubpage<T> head = table[tableIdx];
```

tinySubpagePools is structured like MemoryRegionCache's tiny array: different indices correspond to different memory sizes.

Allocation first goes through allocateNormal:

```java
// Unlike the page-level path, this falls through to allocateSubpage below
long allocate(int normCapacity) {
    if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSize
        return allocateRun(normCapacity);
    } else {
        return allocateSubpage(normCapacity);
    }
}

private long allocateSubpage(int normCapacity) {
    // Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
    // This is need as we may add it back and so alter the linked-list structure.
    PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);
    synchronized (head) {
        // d is set straight to maxOrder (11): subpages are carved out of the bottom level,
        // because a leaf (one page) is already larger than the requested size
        int d = maxOrder; // subpages are only be allocated from pages i.e., leaves
        int id = allocateNode(d);
        if (id < 0) {
            return id;
        }

        final PoolSubpage<T>[] subpages = this.subpages;
        final int pageSize = this.pageSize;

        freeBytes -= pageSize;

        // subpageIdx: the index of this page among the chunk's leaves
        int subpageIdx = subpageIdx(id);
        PoolSubpage<T> subpage = subpages[subpageIdx];
        if (subpage == null) {
            subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity);
            subpages[subpageIdx] = subpage;
        } else {
            subpage.init(head, normCapacity);
        }
        return subpage.allocate();
    }
}

PoolSubpage(PoolSubpage<T> head, PoolChunk<T> chunk, int memoryMapIdx, int runOffset, int pageSize, int elemSize) {
    this.chunk = chunk;
    this.memoryMapIdx = memoryMapIdx;
    this.runOffset = runOffset;
    this.pageSize = pageSize;
    bitmap = new long[pageSize >>> 10]; // pageSize / 16 / 64
    init(head, elemSize);
}

void init(PoolSubpage<T> head, int elemSize) {
    doNotDestroy = true;
    this.elemSize = elemSize;
    if (elemSize != 0) {
        maxNumElems = numAvail = pageSize / elemSize;
        nextAvail = 0;
        bitmapLength = maxNumElems >>> 6;
        if ((maxNumElems & 63) != 0) {
            bitmapLength ++;
        }

        // bitmap: records which elements of the page have been handed out
        for (int i = 0; i < bitmapLength; i ++) {
            bitmap[i] = 0;
        }
    }
    addToPool(head);
}

// Link the freshly created subPage into the tinySubpagePools list
private void addToPool(PoolSubpage<T> head) {
    assert prev == null && next == null;
    prev = head;
    next = head.next;
    next.prev = this;
    head.next = this;
}
```
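A worked example of the bitmap sizing above (pageSize = 8192 assumed): the bitmap array is allocated at the maximum size it could ever need, and init() then computes how much of it the current element size actually uses.

```java
// Worked example with pageSize = 8192:
int pageSize = 8192;
long[] bitmap = new long[pageSize >>> 10];   // 8 longs = 512 bits, enough for the
                                             // smallest element size (8192 / 16 = 512 elements)
int elemSize = 16;
int maxNumElems = pageSize / elemSize;       // 512 elements
int bitmapLength = maxNumElems >>> 6;        // 512 / 64 = 8 longs actually used
// For elemSize = 32: maxNumElems = 256, bitmapLength = 4 (half the array unused)
```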
subpage.allocate() then claims one element via the bitmap and encodes the result into a handle:

```java
long allocate() {
    if (elemSize == 0) {
        return toHandle(0);
    }

    if (numAvail == 0 || !doNotDestroy) {
        return -1;
    }

    final int bitmapIdx = getNextAvail();
    int q = bitmapIdx >>> 6;
    int r = bitmapIdx & 63;
    assert (bitmap[q] >>> r & 1) == 0;
    bitmap[q] |= 1L << r;

    if (-- numAvail == 0) {
        removeFromPool();
    }

    return toHandle(bitmapIdx);
}

private long toHandle(int bitmapIdx) {
    return 0x4000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx;
}
```
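The handle returned by toHandle() packs both coordinates into one long: the low 32 bits hold memoryMapIdx (which leaf page in the tree) and the high 32 bits hold bitmapIdx (which element within the page); the 0x4000000000000000L marker bit keeps a subpage handle with bitmapIdx == 0 distinguishable from a plain page handle. A small decoding sketch (illustrative values):

```java
// Decoding a subpage handle (mirrors memoryMapIdx()/bitmapIdx() in PoolChunk):
long handle = 0x4000000000000000L | (long) 5 << 32 | 2048; // element 5 of leaf node 2048
int memoryMapIdx = (int) handle;          // low 32 bits  -> 2048
int bitmapIdx = (int) (handle >>> 32);    // high 32 bits -> 0x40000005
int element = bitmapIdx & 0x3FFFFFFF;     // strip the marker bit -> 5
```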
Finally the buffer is initialized with the subpage's offset:

```java
private void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int bitmapIdx, int reqCapacity) {
    assert bitmapIdx != 0;

    int memoryMapIdx = memoryMapIdx(handle);

    PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
    assert subpage.doNotDestroy;
    assert reqCapacity <= subpage.elemSize;

    buf.init(
        this, handle,
        runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize, reqCapacity, subpage.elemSize,
        arena.parent.threadCache());
}
```

Releasing a ByteBuf

1. Add the memory being freed to the cache.
2. If it cannot be cached (the cache is full), mark the memory as unused.
3. Return the ByteBuf object itself to the object pool.
```java
private boolean release0(int decrement) {
    for (;;) {
        int refCnt = this.refCnt;
        if (refCnt < decrement) {
            throw new IllegalReferenceCountException(refCnt, -decrement);
        }

        if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) {
            // refCnt == decrement means the count drops to zero with this release
            if (refCnt == decrement) {
                deallocate();
                return true;
            }
            return false;
        }
    }
}

protected final void deallocate() {
    if (handle >= 0) {
        final long handle = this.handle;
        this.handle = -1;
        memory = null;
        chunk.arena.free(chunk, handle, maxLength, cache);
        recycle();
    }
}

void free(PoolChunk<T> chunk, long handle, int normCapacity, PoolThreadCache cache) {
    if (chunk.unpooled) {
        int size = chunk.chunkSize();
        destroyChunk(chunk);
        activeBytesHuge.add(-size);
        deallocationsHuge.increment();
    } else {
        SizeClass sizeClass = sizeClass(normCapacity);
        if (cache != null && cache.add(this, chunk, handle, normCapacity, sizeClass)) {
            // cached so not free it.
            return;
        }

        freeChunk(chunk, handle, sizeClass);
    }
}

boolean add(PoolArena<?> area, PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass) {
    MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
    if (cache == null) {
        return false;
    }
    return cache.add(chunk, handle);
}

public final boolean add(PoolChunk<T> chunk, long handle) {
    Entry<T> entry = newEntry(chunk, handle);
    boolean queued = queue.offer(entry);
    if (!queued) {
        // If it was not possible to cache the chunk, immediately recycle the entry
        entry.recycle();
    }

    return queued;
}
```

When adding to the cache fails

```java
void freeChunk(PoolChunk<T> chunk, long handle, SizeClass sizeClass) {
    final boolean destroyChunk;
    synchronized (this) {
        switch (sizeClass) {
        case Normal:
            ++deallocationsNormal;
            break;
        case Small:
            ++deallocationsSmall;
            break;
        case Tiny:
            ++deallocationsTiny;
            break;
        default:
            throw new Error();
        }
        destroyChunk = !chunk.parent.free(chunk, handle);
    }
    if (destroyChunk) {
        // destroyChunk not need to be called while holding the synchronized lock.
        destroyChunk(chunk);
    }
}

// Mark the memory as unused
void free(long handle) {
    int memoryMapIdx = memoryMapIdx(handle);
    int bitmapIdx = bitmapIdx(handle);

    if (bitmapIdx != 0) { // free a subpage
        PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
        assert subpage != null && subpage.doNotDestroy;

        // Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
        // This is need as we may add it back and so alter the linked-list structure.
        PoolSubpage<T> head = arena.findSubpagePoolHead(subpage.elemSize);
        synchronized (head) {
            if (subpage.free(head, bitmapIdx & 0x3FFFFFFF)) {
                return;
            }
        }
    }
    freeBytes += runLength(memoryMapIdx);
    setValue(memoryMapIdx, depth(memoryMapIdx));
    updateParentsFree(memoryMapIdx);
}
```

Returning the ByteBuf to the object pool:

```java
private void recycle() {
    recyclerHandle.recycle(this);
}
```
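To close the loop, a minimal sketch of the release path as seen from application code (ReferenceCountUtil.release is the standard Netty helper):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.util.ReferenceCountUtil;

public class ReleaseDemo {
    public static void main(String[] args) {
        ByteBuf buf = PooledByteBufAllocator.DEFAULT.directBuffer(64);
        try {
            buf.writeLong(System.nanoTime());
        } finally {
            // Drops the refCnt to 0: the memory goes back to the thread cache
            // (or is marked free in the chunk), and the ByteBuf object itself
            // is returned to the Recycler object pool.
            ReferenceCountUtil.release(buf);
        }
    }
}
```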