关于java:Netty二ByteBuf

40次阅读

共计 4968 个字符,预计需要花费 13 分钟才能阅读完成。

一、ByteBuf

1.1 ByteBuf 介绍

NIO 的 ByteBuffer 大家比拟相熟,它其实就是一个 字节容器 ,然而应用过 ByteBuffer 的童鞋都晓得,ByteBuffer 应用较为简单,例如写模式实现之后须要 手动调用 flip()办法 切换到读模式,再进行相应的读取。

Netty 里的 ByteBuf 是 ByteBuffer 的升级版,给开发者提供了更不便的 API,并拓展了一些其余个性,接下来就来介绍一下 ByteBuf

先看一下 ByteBuf 相干类的类图,对其有个整体的意识,类图如下

能够看到 ByteBuf 实现了 ReferenceCounted 接口,这是因为 ByteBuf 应用了 援用计数,当 ByteBuf 的援用数为 0,则示意该 ByteBuf 已不可用,须要显式开释。

而 ByteBuf 上面的 AbstractByteBuf,实现了 ByteBuf 的一些罕用办法,比方 isReadable、isWritable、readableBytes、writableBytes 等。持续往下看是 AbstractReferenceCountedByteBuf,它在 AbstractByteBuf 的根底上实现了援用计数的相干操作。

而最上面的几个类能够分为 3 种类型的 ByteBuf,别离是PooledByteBuf(池化)、UnpooledByteBuf(非池化)及 CompositeByteBuf(复合),其中 PooledByteBuf 又分为 PooledHeapByteBuf(池化堆内存)和 PooledDirectByteBuf(池化间接内存),UnpooledByteBuf 同理。

ByteBuf 次要提供了如下操作

办法 形容
alloc() 返回创立该 ByteBuf 的 ByteBufAllocator 分配器
capacity() ByteBuf 可存储的最大字节数
hasArray() 如果 ByteBuf 外部是一个字节数组,返回 true
array() 返回 ByteBuf 外部的字节数组,通常与 hasArray()配合应用
readerIndex() 返回 ByteBuf 以后的读取索引
writerIndex() 返回 ByteBuf 以后的写入索引
isReadable() 是否有字节可供读取
isWritable() 是否有字节可供写入
readableBytes() 可读取的字节数
writableBytes() 可写入的字节数
getByte(int) 返回给定索引处的字节,读索引放弃不变,还提供了与 getByte 相似办法,如 getInt()、getLong、getShort
setByte(int index, int value) 设定给定索引出的字节值,写索引放弃不变,还提供了与 setByte 相似办法,如 setInt、setLong、setShort
readByte() 返回以后读取索引 readerIndex 处的字节,并将 readerIndex 加 1
writeByte() 在以后写入索引 writerIndex 处写入一个字节值,并将 writerIndex 加 1

能够通过 readerIndex()、writerIndex()办法看出,ByteBuf 领有两个索引,一个用于读取,一个用于写入,所以它能够写完之后间接进行读取,不须要像 ByteBuffer 那样调用 flip()办法来切换为读模式,如下图:

1.2 援用计数

下面提到 ByteBuf 实现了 ReferenceCounted 接口,ReferenceCounted 接口里次要有 refCnt() 援用数量、retain()减少援用数量、release()缩小援用数量 等操作,如下:

// 返回援用数量
int refCnt();
// 援用数量加 1
ReferenceCounted retain();
// 援用数量加 increment
ReferenceCounted retain(int increment);
// 援用数量减 1
boolean release();
// 援用数量减 decrement
boolean release(int decrement);

ByteBuf 具体援用计数的相干操作是在 AbstractReferenceCountedByteBuf 中实现的,接下来看下 AbstractReferenceCountedByteBuf 的源码,局部源码如下:

public int refCnt() {return updater.refCnt(this);}
public ByteBuf retain() {return updater.retain(this);}
public ByteBuf retain(int increment) {return updater.retain(this, increment);}
public boolean release() {return handleRelease(updater.release(this));}
public boolean release(int decrement) {return handleRelease(updater.release(this, decrement));}

能够看到这些办法外部都是通过 updater 更新器来实现的,所以来看下这个 updater 更新器是什么?

private static final long REFCNT_FIELD_OFFSET =
        ReferenceCountUpdater.getUnsafeOffset(AbstractReferenceCountedByteBuf.class, "refCnt");
private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> AIF_UPDATER =
        AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");

private static final ReferenceCountUpdater<AbstractReferenceCountedByteBuf> updater =
        new ReferenceCountUpdater<AbstractReferenceCountedByteBuf>() {
    @Override
    protected AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> updater() {return AIF_UPDATER;}
    @Override
    protected long unsafeOffset() {return REFCNT_FIELD_OFFSET;}
};

如源码所示,updater 更新器是 ReferenceCountUpdater 类型,其外部就是通过 update()办法获取到的 AtomicIntegerFieldUpdater 来原子地更新 援用数量字段,在这里援用数量字段是 refCnt。

ReferenceCountUpdate 的计数形式与一般计数有些不同

  1. refCnt 初始值是 2,代表理论援用数量为 1
  2. 每减少一个援用,refCnt 对应值会加 2,而缩小一个援用,refCnt 对应值减 2
  3. 如果开释了最初一个援用,则 refCnt 对应值会变为 1

所以 如果 refCnt 值为 1,代表理论援用数量为 0,否则理论援用数量 = refCnt/2

1.3 PooledByteBuf 和 UnpooledByteBuf

PooledByteBuf(池化)和 UnpooledByteBuf(非池化)是两种类型的 ByteBuf,顾名思义,一个是从内存池中调配的 ByteBuf,应用完之后偿还至池中,而另一个则是从内存中间接调配的 ByteBuf,应用完即回收。

UnpooledByteBuf 又分为 UnpooledHeapByteBuf(非池化堆内存)和 UnpooledDirectByteBuf(非池化间接内存)。UnpooledHeapByteBuf 和 UnpooledDirectByteBuf 的实现较为简单,UnpooledHeapByteBuf 外部是应用 字节数组 来实现,而 UnpooledDirectByteBuf 外部是通过 NIO 的 DirectByteBuffer 来实现的。

1.3.1 PooledByteBuf

每个线程绑定一个 PoolThreadCache,它蕴含了一个 heapArena(堆内存)和一个 directArena(间接内存),这两个都是 PoolArena 类型,每个线程通过 PooledByteBufAllocator 来分配内存时,会在其绑定的 PoolThreadCache 里进行调配。PoolArena 的结构图如下

上面是如何调配一个池化的 PooledByteBuf,源码细节较多,这里只是大略介绍一下

入口在 PooledByteBufAllocator 的 newHeapBuffer 和 newDirectBuffer 办法,这里以 newHeapBuffer 举例,如下

@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
    // 获取以后线程的 PoolThreadCache
    PoolThreadCache cache = threadCache.get();
    // 获取 PoolThreadCache 种的 heapArena
    PoolArena<byte[]> heapArena = cache.heapArena;

    final ByteBuf buf;
    if (heapArena != null) {
        // 应用 heapArena 进行调配 ByteBuf
        buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        // 如果没有 heapArena,调配非池化的 ByteBuf
        buf = PlatformDependent.hasUnsafe() ?
                new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
                new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
    }
    return toLeakAwareBuffer(buf);
}

调用 heapArena 的 allocate 办法进行调配,先通过所须要的内存大小来计算 sizeIdx,再判断 sizeIdx,分为 3 种状况 small、normal、huge。如下:

private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
    // 依据须要调配的内存大小来计算 sizeIdx
    final int sizeIdx = size2SizeIdx(reqCapacity);
   
    if (sizeIdx <= smallMaxSizeIdx) {    // 如果 sizeIdx <= smallMaxSizeIdx,阐明是所需内存较小,次要应用 PoolSubpage 来调配
        tcacheAllocateSmall(cache, buf, reqCapacity, sizeIdx);
    } else if (sizeIdx < nSizes) {    // 如果 sizeIdx <= nSizes,阐明所需内存中等,次要应用 PoolChunk 来调配
        tcacheAllocateNormal(cache, buf, reqCapacity, sizeIdx);
    } else {
        // 否则当 sizeIdx 大于等于 nSizes,阐明所需内存较大,则不在池里调配,间接分配内存
        int normCapacity = directMemoryCacheAlignment > 0
                ? normalizeSize(reqCapacity) : reqCapacity;
        allocateHuge(buf, normCapacity);
    }
}

1.4 总结

ByteBuf 是 netty 的根底组件,绝对于 ByteBuffer,ByteBuf 不仅应用更为不便,提供了更全面的 API,并应用了池化技术来优化内存的调配效率。

正文完
 0