NettyByteBuf-一

6次阅读

共计 19832 个字符,预计需要花费 50 分钟才能阅读完成。

欢迎关注公众号:【爱编码
如果有需要后台回复 2019 赠送 1T 的学习资料 哦!!

简介

所有的网路通信都涉及字节序列的移动,所以高效易用的数据结构明显是必不可少的。Netty 的 ByteBuf 实现满足并超越了这些需求。

ByteBuf 结构

ByteBuf 维护了两个不同的索引:一个是用于读取,一个用于写入 。当你从 ByteBuf 读取是,它的readerIndex 将会被递增已经被读取的字节数。同样地,当你写入 ByteBuf 时,它的 witerIndex 也会被递增。

作为一个容器,源码中的如下。有三块区域
discardable bytes:无效空间(已经读取过的空间),可丢弃字节的区域,由readerIndex 指针 控制
readable bytes:内容空间,可读字节的区域, 由 readerIndex 和 writerIndex 指针控制控制
writable bytes:空闲空间,可写入字节的区域, 由 writerIndex 指针和 capacity 容量控制

 * <pre>
 *      +-------------------+------------------+------------------+
 *      | discardable bytes |  readable bytes  |  writable bytes  |
 *      |                   |     (CONTENT)    |                  |
 *      +-------------------+------------------+------------------+
 *      |                   |                  |                  |
 *      0      <=      readerIndex   <=   writerIndex    <=    capacity
 * </pre>

ByteBuf 使用模式

总体分类划分是可根据 JVM 堆内存来区分的。

1. 堆内内存(JVM 堆空间内)
2. 堆外内存(本机直接内存)
3. 复合缓冲区(以上 2 种缓冲区多个混合)

1. 堆内内存

最常用的 ByteBuf 模式是将 数据存储在 JVM 的堆空间中。它能在没有使用池化的情况下提供快速的分配和释放。

2. 堆外内存

JDK 允许 JVM 实现通过本地调用来分配内存。主要是为了避免每次调用本地 I / O 操作之前(或者之后)将缓冲区的内容复制到一个中间缓冲区(或者从中间缓冲区把内容复制到缓冲区)。
** 最大的特点: 它的内容将驻留在常规的会被垃圾回收的堆之外。
最大的缺点:相对于堆缓冲区,它的分配和释放都是较为昂贵的。**

3. 复合缓冲区

常用类:CompositeByteBuf,它为多个 ByteBuf 提供一个聚合视图,将多个缓冲区表示为单个合并缓冲区的虚拟表示。
比如:HTTP 协议:头部和主体这两部分由应用程序的不同模块产生。这个时候把这两部分合并的话,选择 CompositeByteBuf 是比较好的。

ByteBuf 分类

主要分为三大类

Pooled 和 Unpooled(池化)
unsafe 和非 unsafe()
Heap 和 Direct(堆内和堆外)

Pooled 和 Unpooled
Pooled:每次都从预先分配好的内存中去取出一段连续内存封装成一个 ByteBuf 给应用程序使用
Unpooled:每次分配内存的时候,直接调用系统 api,向操作系统申请一
块内存

Heap 和 Direct:
Head:是调用 jvm 的堆内存进行分配的,需要被 gc 进行管理
Direct:是调用 jdk 的 api 进行内存分配,不受 jvm 控制,不会参与到 gc 的过程

Unsafe 和非 Unsafe
jdk 中有 Unsafe 对象可以直接拿到对象的内存地址,并且基于这个内存地址进行读写操作。那么对应的分类的区别就是是否可以拿到 jdk 底层的 Unsafe 进行读写操作了。

Java 为什么会引入及如何使用 Unsafe

内存分配 ByteBufAllocator

这个接口实现负责分配缓冲区并且是线程安全的。从下面的接口方法以及注释可以总结出主要是围绕上面的三种 ByteBuf 内存模式:堆内,堆外以及复合型的内存分配。

/**
 * Implementations are responsible to allocate buffers. Implementations of this interface are expected to be
 * thread-safe.
 */
public interface ByteBufAllocator {

    ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR;

    /**
     * Allocate a {@link ByteBuf}. If it is a direct or heap buffer
     * depends on the actual implementation.
     */
    ByteBuf buffer();

    /**
     * Allocate a {@link ByteBuf} with the given initial capacity.
     * If it is a direct or heap buffer depends on the actual implementation.
     */
    ByteBuf buffer(int initialCapacity);

    /**
     * Allocate a {@link ByteBuf} with the given initial capacity and the given
     * maximal capacity. If it is a direct or heap buffer depends on the actual
     * implementation.
     */
    ByteBuf buffer(int initialCapacity, int maxCapacity);

    /**
     * Allocate a {@link ByteBuf}, preferably a direct buffer which is suitable for I/O.
     */
    ByteBuf ioBuffer();

    /**
     * Allocate a {@link ByteBuf}, preferably a direct buffer which is suitable for I/O.
     */
    ByteBuf ioBuffer(int initialCapacity);

    /**
     * Allocate a {@link ByteBuf}, preferably a direct buffer which is suitable for I/O.
     */
    ByteBuf ioBuffer(int initialCapacity, int maxCapacity);

    /**
     * Allocate a heap {@link ByteBuf}.
     */
    ByteBuf heapBuffer();

    /**
     * Allocate a heap {@link ByteBuf} with the given initial capacity.
     */
    ByteBuf heapBuffer(int initialCapacity);

    /**
     * Allocate a heap {@link ByteBuf} with the given initial capacity and the given
     * maximal capacity.
     */
    ByteBuf heapBuffer(int initialCapacity, int maxCapacity);

    /**
     * Allocate a direct {@link ByteBuf}.
     */
    ByteBuf directBuffer();

    /**
     * Allocate a direct {@link ByteBuf} with the given initial capacity.
     */
    ByteBuf directBuffer(int initialCapacity);

    /**
     * Allocate a direct {@link ByteBuf} with the given initial capacity and the given
     * maximal capacity.
     */
    ByteBuf directBuffer(int initialCapacity, int maxCapacity);

    /**
     * Allocate a {@link CompositeByteBuf}.
     * If it is a direct or heap buffer depends on the actual implementation.
     */
    CompositeByteBuf compositeBuffer();

    /**
     * Allocate a {@link CompositeByteBuf} with the given maximum number of components that can be stored in it.
     * If it is a direct or heap buffer depends on the actual implementation.
     */
    CompositeByteBuf compositeBuffer(int maxNumComponents);

    /**
     * Allocate a heap {@link CompositeByteBuf}.
     */
    CompositeByteBuf compositeHeapBuffer();

    /**
     * Allocate a heap {@link CompositeByteBuf} with the given maximum number of components that can be stored in it.
     */
    CompositeByteBuf compositeHeapBuffer(int maxNumComponents);

    /**
     * Allocate a direct {@link CompositeByteBuf}.
     */
    CompositeByteBuf compositeDirectBuffer();

    /**
     * Allocate a direct {@link CompositeByteBuf} with the given maximum number of components that can be stored in it.
     */
    CompositeByteBuf compositeDirectBuffer(int maxNumComponents);

    /**
     * Returns {@code true} if direct {@link ByteBuf}'s are pooled
     */
    boolean isDirectBufferPooled();

    /**
     * Calculate the new capacity of a {@link ByteBuf} that is used when a {@link ByteBuf} needs to expand by the
     * {@code minNewCapacity} with {@code maxCapacity} as upper-bound.
     */
    int calculateNewCapacity(int minNewCapacity, int maxCapacity);
 }

其中 ByteBufAllocator 的具体实现可以查看其子类,如下图

下面来看看各自子类的功能以及区别

UnpooledByteBufAllocator

heap 内存分配
入口 new InstrumentedUnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
发现分配 Unpooled、Unsafe、Heap 内存,其实是分配了一个 byte 数组,并保存在 UnpooledHeapByteBuf#array 成员变量中。该内存的初始值容量和最大可扩展容量可以指定。

   public UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {super(maxCapacity);

        checkNotNull(alloc, "alloc");

        if (initialCapacity > maxCapacity) {
            throw new IllegalArgumentException(String.format("initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
        }

        this.alloc = alloc;
        setArray(allocateArray(initialCapacity));
        setIndex(0, 0);
    }


    protected byte[] allocateArray(int initialCapacity) {return new byte[initialCapacity];
    }

查看 UnpooledHeapByteBuf#getByte() 方法,堆内存类型的 ByteBuf 获取的时候。直接通过下标获取 byte 数组中的 byte

 @Override
    public byte getByte(int index) {ensureAccessible();
        return _getByte(index);
    }

   @Override
    protected byte _getByte(int index) {// 该 array 为初始化的时候,实例化的 byte[]
        return HeapByteBufUtil.getByte(array, index);
    }

    static byte getByte(byte[] memory, int index) {
        // 直接拿到一个数组
        return memory[index];
    }

direct 内存分配

入口UnpooledByteBufAllocator#newDirectBuffer() --> UnpooledUnsafeDirectByteBuf

  public UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {super(maxCapacity);
        if (alloc == null) {throw new NullPointerException("alloc");
        }
        checkPositiveOrZero(initialCapacity, "initialCapacity");
        checkPositiveOrZero(maxCapacity, "maxCapacity");
        if (initialCapacity > maxCapacity) {
            throw new IllegalArgumentException(String.format("initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
        }

        this.alloc = alloc;
        setByteBuffer(allocateDirect(initialCapacity), false);
    }


  protected ByteBuffer allocateDirect(int initialCapacity) {return ByteBuffer.allocateDirect(initialCapacity);
    }

  final void setByteBuffer(ByteBuffer buffer, boolean tryFree) {if (tryFree) {
            ByteBuffer oldBuffer = this.buffer;
            if (oldBuffer != null) {if (doNotFree) {doNotFree = false;} else {freeDirect(oldBuffer);
                }
            }
        }
        this.buffer = buffer;
        memoryAddress = PlatformDependent.directBufferAddress(buffer);
        tmpNioBuf = null;
        capacity = buffer.remaining();}

可以发现,Unpooled、Direct 类型得内存分配实际上是维护了一个底层 jdk 的一个 DirectByteBuffer。分配内存的时候就创建它,并将他保存到 buffer 成员变量。

跟踪iUnpooledHeapByteBuf#_getByte(),就比较简单了,直接使用 jdk 的 api 获取

 @Override
    protected byte _getByte(int index) {
        // 使用 buffer
        return buffer.get(index);
    }

更加详细的分析可以查看下面这篇文章
https://www.jianshu.com/p/158…

PooledByteBufAllocator

入口 PooledByteBufAllocator#newHeapBuffer()PooledByteBufAllocator#newDirectBuffer(),堆内内存和堆外内存分配的模式都比较固定。

1. 拿到线程局部缓存 PoolThreadCache
2. 拿到不同类型的 rena
3. 使用不同类型的 arena 进行内存分配

@Override
    protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
        // 拿到线程局部缓存
        PoolThreadCache cache = threadCache.get();
        // 拿到 heapArena
        PoolArena<byte[]> heapArena = cache.heapArena;

        final ByteBuf buf;
        if (heapArena != null) {
            // 使用 heapArena 分配内存
            buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
        } else {buf = PlatformDependent.hasUnsafe() ?
                    new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
                    new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
        }

        return toLeakAwareBuffer(buf);
    }

    @Override
    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
        // 拿到线程局部缓存
        PoolThreadCache cache = threadCache.get();
        // 拿到 directArena
        PoolArena<ByteBuffer> directArena = cache.directArena;

        final ByteBuf buf;
        if (directArena != null) {
            // 使用 directArena 分配内存
            buf = directArena.allocate(cache, initialCapacity, maxCapacity);
        } else {buf = PlatformDependent.hasUnsafe() ?
                    UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                    new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }

        return toLeakAwareBuffer(buf);
    }

跟踪 threadCache.get() 调用的是 FastThreadLocal#get() 方法。那么其实 threadCache 也是一个 FastThreadLocal, 可以看成是 jdk 的 ThreadLocal,get 方法调用了初始化方法initializel

public final V get() {InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
        Object v = threadLocalMap.indexedVariable(index);
        if (v != InternalThreadLocalMap.UNSET) {return (V) v;
        }
        // 调用初始化方法
        V value = initialize(threadLocalMap);
        registerCleaner(threadLocalMap);
        return value;
    }

initialValue()方法的逻辑如下

1. 从预先准备好的 heapArenasdirectArenas中获取最少使用的 arena
2. 使用获取到的arean 为参数,实例化一个 PoolThreadCache 并返回

   final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
        private final boolean useCacheForAllThreads;

        PoolThreadLocalCache(boolean useCacheForAllThreads) {this.useCacheForAllThreads = useCacheForAllThreads;}

        @Override
        protected synchronized PoolThreadCache initialValue() {
            /**
             * arena 翻译成竞技场,关于内存非配的逻辑都在这个竞技场中进行分配
             */
            // 获取 heapArena:从 heapArenas 堆内竞技场中拿出使用最少的一个 arena
            final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
            // 获取 directArena:从 directArena 堆内竞技场中拿出使用最少的一个 arena
            final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);

            Thread current = Thread.currentThread();
            if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
                // 创建 PoolThreadCache:该 Cache 最终被一个线程使用
                // 通过 heapArena 和 directArena 维护两大块内存:堆和堆外内存
                // 通过 tinyCacheSize,smallCacheSize,normalCacheSize 维护 ByteBuf 缓存列表维护反复使用的内存块
                return new PoolThreadCache(
                        heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
                        DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
            }
            // No caching so just use 0 as sizes.
            return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
        }

      // 省略代码......

      }

查看 PoolThreadCache 其维护了两种类型的内存分配策略,一种是上述通过持有 heapArenadirectArena,另一种是通过维护 tiny,small,normal 对应的缓存列表来维护反复使用的内存。

final class PoolThreadCache {private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);

    // 通过 arena 的方式维护内存
    final PoolArena<byte[]> heapArena;
    final PoolArena<ByteBuffer> directArena;

    // 维护了 tiny, small, normal 三种类型的缓存列表
    // Hold the caches for the different size classes, which are tiny, small and normal.
    private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
    private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
    private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
    private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
    private final MemoryRegionCache<byte[]>[] normalHeapCaches;
    private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;

    // Used for bitshifting when calculate the index of normal caches later
    private final int numShiftsNormalDirect;
    private final int numShiftsNormalHeap;
    private final int freeSweepAllocationThreshold;
    private final AtomicBoolean freed = new AtomicBoolean();

    private int allocations;

    // TODO: Test if adding padding helps under contention
    //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;

    PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
                    int tinyCacheSize, int smallCacheSize, int normalCacheSize,
                    int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
        this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;

        // 通过持有 heapArena 和 directArena,arena 的方式管理内存分配
        this.heapArena = heapArena;
        this.directArena = directArena;

        // 通过 tinyCacheSize,smallCacheSize,normalCacheSize 创建不同类型的缓存列表并保存到成员变量
        if (directArena != null) {
            tinySubPageDirectCaches = createSubPageCaches(tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
            smallSubPageDirectCaches = createSubPageCaches(smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);

            numShiftsNormalDirect = log2(directArena.pageSize);
            normalDirectCaches = createNormalCaches(normalCacheSize, maxCachedBufferCapacity, directArena);

            directArena.numThreadCaches.getAndIncrement();} else {
            // No directArea is configured so just null out all caches
            tinySubPageDirectCaches = null;
            smallSubPageDirectCaches = null;
            normalDirectCaches = null;
            numShiftsNormalDirect = -1;
        }
        if (heapArena != null) {
            // Create the caches for the heap allocations
            // 创建规格化缓存队列
            tinySubPageHeapCaches = createSubPageCaches(tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
            // 创建规格化缓存队列
            smallSubPageHeapCaches = createSubPageCaches(smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);

            numShiftsNormalHeap = log2(heapArena.pageSize);
            // 创建规格化缓存队列
            normalHeapCaches = createNormalCaches(normalCacheSize, maxCachedBufferCapacity, heapArena);

            heapArena.numThreadCaches.getAndIncrement();} else {
            // No heapArea is configured so just null out all caches
            tinySubPageHeapCaches = null;
            smallSubPageHeapCaches = null;
            normalHeapCaches = null;
            numShiftsNormalHeap = -1;
        }

        // Only check if there are caches in use.
        if ((tinySubPageDirectCaches != null || smallSubPageDirectCaches != null || normalDirectCaches != null
                || tinySubPageHeapCaches != null || smallSubPageHeapCaches != null || normalHeapCaches != null)
                && freeSweepAllocationThreshold < 1) {
            throw new IllegalArgumentException("freeSweepAllocationThreshold:"
                    + freeSweepAllocationThreshold + "(expected: > 0)");
        }
    }

    private static <T> MemoryRegionCache<T>[] createSubPageCaches(int cacheSize, int numCaches, SizeClass sizeClass) {if (cacheSize > 0 && numCaches > 0) {
            //MemoryRegionCache 维护缓存的一个对象
            @SuppressWarnings("unchecked")
            MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
            for (int i = 0; i < cache.length; i++) {
                // TODO: maybe use cacheSize / cache.length
                // 每一种 MemoryRegionCache(tiny,small,normal)都表示不同内存大小(不同规格)的一个队列
                cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
            }
            return cache;
        } else {return null;}
    }

    private static <T> MemoryRegionCache<T>[] createNormalCaches(int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {if (cacheSize > 0 && maxCachedBufferCapacity > 0) {int max = Math.min(area.chunkSize, maxCachedBufferCapacity);
            int arraySize = Math.max(1, log2(max / area.pageSize) + 1);
            //MemoryRegionCache 维护缓存的一个对象
            @SuppressWarnings("unchecked")
            MemoryRegionCache<T>[] cache = new MemoryRegionCache[arraySize];
            for (int i = 0; i < cache.length; i++) {// 每一种 MemoryRegionCache(tiny,small,normal)都表示不同内存(不同规格)大小的一个队列
                cache[i] = new NormalMemoryRegionCache<T>(cacheSize);
            }
            return cache;
        } else {return null;}
    }

......
}

更加详细分析可参考以下文章
https://www.jianshu.com/p/1cd…

directArena 分配 direct 内存的流程

上一步拿到 PoolThreadCache 之后,获取对应的Arena。那么之后就是 Arena 具体分配内存的步骤。

入口 PooledByteBufAllocator#newDirectBuffer() 方法种有如下代码

 PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
        // 拿到 PooledByteBuf 对象, 仅仅是一个对象
        PooledByteBuf<T> buf = newByteBuf(maxCapacity);
        // 从 cache 种分配内存,并初始化 buf 种内存地址相关的属性
        allocate(cache, buf, reqCapacity);
        return buf;
    }

可以看到分配的过程如下:拿到 PooledByteBuf 对象从 cache 中分配内存,并重置相关属性.

1.newByteBuf(maxCapacity);拿到 PooledByteBuf 对象

  @Override
        protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {if (HAS_UNSAFE) {
                // 获取一个 PooledByteBuf
                return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
            } else {return PooledDirectByteBuf.newInstance(maxCapacity);
            }
        }

    static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
        // 从带有回收特性的对象池 RECYCLER 获取一个 PooledUnsafeDirectByteBuf
        PooledUnsafeDirectByteBuf buf = RECYCLER.get();
        //buf 可能是从回收站拿出来的,要进行复用
        buf.reuse(maxCapacity);
        return buf;
    }

2.Recycler 是一个基于线程本地堆栈的对象池。Recycler 维护了一个 ThreadLocal 成员变量,用于返回一个 stack 给回收处理 DefaultHandle,该处理器通过维护这个堆栈来维护PooledUnsafeDirectByteBuf 缓存。

private static final Recycler<PooledUnsafeDirectByteBuf> RECYCLER = new Recycler<PooledUnsafeDirectByteBuf>() {
        @Override
        protected PooledUnsafeDirectByteBuf newObject(Handle<PooledUnsafeDirectByteBuf> handle) {
            //Recycler 负责用回收处理器 handler 维护 PooledUnsafeDirectByteBuf
            //handler 底层持有一个 stack 作为对象池,维护对象池,handle 同时负责对象回收
            // 存储 handler 为成员变量,使用完该 ByteBuf 可以调用回收方法回收
            return new PooledUnsafeDirectByteBuf(handle, 0);
        }
    };
// 维护了一个 `ThreadLocal`,`initialValue` 方法返回一个堆栈。private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
        @Override
        protected Stack<T> initialValue() {return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor,
                    ratioMask, maxDelayedQueuesPerThread);
        }

        @Override
        protected void onRemoval(Stack<T> value) {
            // Let us remove the WeakOrderQueue from the WeakHashMap directly if its safe to remove some overhead
            if (value.threadRef.get() == Thread.currentThread()) {if (DELAYED_RECYCLED.isSet()) {DELAYED_RECYCLED.get().remove(value);
               }
            }
        }
    };

3. 再看 Recycler#get()方法

  public final T get() {if (maxCapacityPerThread == 0) {return newObject((Handle<T>) NOOP_HANDLE);
        }
        // 获取对应的堆栈,相当一个回收站
        Stack<T> stack = threadLocal.get();

        // 从栈顶拿出一个来 DefaultHandle(回收处理器)//DefaultHandle 持有一个 value,其实是 PooledUnsafeDirectByteBuf
        DefaultHandle<T> handle = stack.pop();
        // 没有回收处理器,说明没有闲置的 ByteBuf
        if (handle == null) {
            // 新增一个处理器
            handle = stack.newHandle();
            
            // 回调,还记得么?该回调返回一个 PooledUnsafeDirectByteBuf
            // 让处理器持有一个新的 PooledUnsafeDirectByteBuf
            handle.value = newObject(handle);
        }
        // 如果有,则可直接重复使用
        return (T) handle.value;
    }

    public final V get() {InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
        Object v = threadLocalMap.indexedVariable(index);
        if (v != InternalThreadLocalMap.UNSET) {return (V) v;
        }
        // 回调 initialize
        V value = initialize(threadLocalMap);
        registerCleaner(threadLocalMap);
        return value;
    }

        private V initialize(InternalThreadLocalMap threadLocalMap) {
        V v = null;
        try {
            // 回调
            v = initialValue();} catch (Exception e) {PlatformDependent.throwException(e);
        }

        threadLocalMap.setIndexedVariable(index, v);
        addToVariablesToRemove(threadLocalMap, this);
        return v;
    }


        DefaultHandle<T> newHandle() {
            // 实例化一个处理器并并且初四话成员变量,该成员变量 stack 从 threalocal 中初始化
            return new DefaultHandle<T>(this);
        }

DefaultHandle 用 stack 作为缓存池维护 PooledUnsafeDirectByteBuf, 同理PooledDirectByteBuf 也是一样的。只不过实例化的对象的实现不一样而已。同时,处理器定义了回收的方法是将兑现存回栈内,使用的时候则是从栈顶取出。

static final class DefaultHandle<T> implements Handle<T> {
        private int lastRecycledId;
        private int recycleId;

        boolean hasBeenRecycled;
        // 对象缓存池
        private Stack<?> stack;
        private Object value;

        DefaultHandle(Stack<?> stack) {this.stack = stack;}

        /**
         * 定义回收方法,回收对象到 stack
         * @param object
         */
        @Override
        public void recycle(Object object) {if (object != value) {throw new IllegalArgumentException("object does not belong to handle");
            }

            Stack<?> stack = this.stack;
            if (lastRecycledId != recycleId || stack == null) {throw new IllegalStateException("recycled already");
            }
            // 回收:将自己存进栈中缓存起来
            stack.push(this);
        }
    }

到这我们刚刚看完第一步,到第二步重置缓存内指针的时候了 , 获取到 PooledUnsafeDirectByteBuf 的时候,有可能是从缓存中取出来的。因此需要复用.

static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
        // 从带有回收特性的对象池 RECYCLER 获取一个 PooledUnsafeDirectByteBuf
        PooledUnsafeDirectByteBuf buf = RECYCLER.get();
        //buf 可能是从回收站拿出来的,要进行复用
        buf.reuse(maxCapacity);
        return buf;
    }

    final void reuse(int maxCapacity) {
        // 重置最大容量
        maxCapacity(maxCapacity);
        // 设置引用
        setRefCnt(1);
        // 重置指针
        setIndex0(0, 0);
        // 重置标记值
        discardMarks();}

到这才刚刚完成分配内存的第一步(拿到 PooledByteBuf 对象),以上都是仅仅是获取并且用回收站和回收处理器管理这些对象,这些对象仍然只是一个对象,还没有分配实际的内存。

跟踪PoolArena#allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity)

  private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {final int normCapacity = normalizeCapacity(reqCapacity);

        // 不同的规格大小进行内存分配
        /**
         * 分配整体逻辑(先判断 tiny 和 small 规格的,再判断 normal 规格的)* 1. 尝试从缓存上进行内存分配,成功则返回
         * 2. 失败则再从内存堆中进行分配内存
         */
        if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
            int tableIdx;
            PoolSubpage<T>[] table;
            boolean tiny = isTiny(normCapacity);

            // 尝试 tiny 和 small 规格的缓存内存分配
            if (tiny) { // < 512
                if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                tableIdx = tinyIdx(normCapacity);
                table = tinySubpagePools;
            } else {if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                tableIdx = smallIdx(normCapacity);
                table = smallSubpagePools;
            }

            final PoolSubpage<T> head = table[tableIdx];

            /**
             * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
             * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
             */
            synchronized (head) {
                final PoolSubpage<T> s = head.next;
                if (s != head) {
                    assert s.doNotDestroy && s.elemSize == normCapacity;
                    long handle = s.allocate();
                    assert handle >= 0;
                    s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);
                    incTinySmallAllocation(tiny);
                    return;
                }
            }
            //tiny 和 small 规格的缓存内存分配尝试失败
            // 从内存堆中分配内存
            synchronized (this) {allocateNormal(buf, reqCapacity, normCapacity);
            }

            incTinySmallAllocation(tiny);
            return;
        }
        //normal 规格
        // 如果分配处出来的内存大于一个值(chunkSize),则执行 allocateHuge
        if (normCapacity <= chunkSize) {
            // 从缓存上进行内存分配
            if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            // 缓存没有再从内存堆中分配内存
            synchronized (this) {allocateNormal(buf, reqCapacity, normCapacity);
                ++allocationsNormal;
            }
        } else {
            // Huge allocations are never served via the cache so just call allocateHuge
            allocateHuge(buf, reqCapacity);
        }
    }

其整体分配内存的逻辑是根据不同规格大小的内存需要来的,显示 tinysmall规格的,再是 normal 规格的。分配也是先尝试从缓存中进行内存分配,如果分配失败再从内存堆中进行内存分配。当然,分配出来的内存回和第一步拿到的 PooledByteBuf 进行绑定起来。

总结

主要学习了 ByteBuf 的 基本结构、使用模式、分类、基本的内存分配

下次再学习 ByteBuf 的 命中逻辑以及内存回收

参考文章

https://www.cnblogs.com/xiang…
https://www.jianshu.com/u/fc9…

最后

如果对 Java、大数据感兴趣请长按二维码关注一波,我会努力带给你们价值。觉得对你哪怕有一丁点帮助的请帮忙点个赞或者转发哦。
关注公众号 【爱编码】,回复2019 有相关资料哦。

正文完
 0