关于java:Netty二ByteBuf

一、ByteBuf

1.1 ByteBuf介绍

NIO的ByteBuffer大家比拟相熟,它其实就是一个字节容器,然而应用过ByteBuffer的童鞋都晓得,ByteBuffer应用较为简单,例如写模式实现之后须要手动调用flip()办法切换到读模式,再进行相应的读取。

Netty里的ByteBuf是ByteBuffer的升级版,给开发者提供了更不便的API,并拓展了一些其余个性,接下来就来介绍一下ByteBuf

先看一下ByteBuf相干类的类图,对其有个整体的意识,类图如下

能够看到ByteBuf实现了ReferenceCounted接口,这是因为ByteBuf应用了援用计数,当ByteBuf的援用数为0,则示意该ByteBuf已不可用,须要显式开释。

而ByteBuf上面的AbstractByteBuf,实现了ByteBuf的一些罕用办法,比方isReadable、isWritable、readableBytes、writableBytes等。持续往下看是AbstractReferenceCountedByteBuf,它在AbstractByteBuf的根底上实现了援用计数的相干操作。

而最上面的几个类能够分为3种类型的ByteBuf,别离是PooledByteBuf(池化)、UnpooledByteBuf(非池化)及CompositeByteBuf(复合),其中PooledByteBuf又分为PooledHeapByteBuf(池化堆内存)和PooledDirectByteBuf(池化间接内存),UnpooledByteBuf同理。

ByteBuf次要提供了如下操作

办法 形容
alloc() 返回创立该ByteBuf的ByteBufAllocator分配器
capacity() ByteBuf可存储的最大字节数
hasArray() 如果ByteBuf外部是一个字节数组,返回true
array() 返回ByteBuf外部的字节数组,通常与hasArray()配合应用
readerIndex() 返回ByteBuf以后的读取索引
writerIndex() 返回ByteBuf以后的写入索引
isReadable() 是否有字节可供读取
isWritable() 是否有字节可供写入
readableBytes() 可读取的字节数
writableBytes() 可写入的字节数
getByte(int) 返回给定索引处的字节,读索引放弃不变,还提供了与getByte相似办法,如getInt()、getLong、getShort
setByte(int index, int value) 设定给定索引出的字节值,写索引放弃不变,还提供了与setByte相似办法,如setInt、setLong、setShort
readByte() 返回以后读取索引readerIndex处的字节,并将readerIndex加1
writeByte() 在以后写入索引writerIndex处写入一个字节值,并将writerIndex加1

能够通过readerIndex()、writerIndex()办法看出,ByteBuf领有两个索引,一个用于读取,一个用于写入,所以它能够写完之后间接进行读取,不须要像ByteBuffer那样调用flip()办法来切换为读模式,如下图:

1.2 援用计数

下面提到ByteBuf实现了ReferenceCounted接口,ReferenceCounted接口里次要有refCnt()援用数量、retain()减少援用数量、release()缩小援用数量等操作,如下:

// 返回援用数量
int refCnt();
// 援用数量加1
ReferenceCounted retain();
// 援用数量加increment
ReferenceCounted retain(int increment);
// 援用数量减1
boolean release();
// 援用数量减decrement
boolean release(int decrement);

ByteBuf具体援用计数的相干操作是在AbstractReferenceCountedByteBuf中实现的,接下来看下AbstractReferenceCountedByteBuf的源码,局部源码如下:

public int refCnt() {return updater.refCnt(this);}
public ByteBuf retain() {return updater.retain(this);}
public ByteBuf retain(int increment) {return updater.retain(this, increment);}
public boolean release() {return handleRelease(updater.release(this));}
public boolean release(int decrement) {return handleRelease(updater.release(this, decrement));}

能够看到这些办法外部都是通过updater更新器来实现的,所以来看下这个updater更新器是什么?

private static final long REFCNT_FIELD_OFFSET =
        ReferenceCountUpdater.getUnsafeOffset(AbstractReferenceCountedByteBuf.class, "refCnt");
private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> AIF_UPDATER =
        AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");

private static final ReferenceCountUpdater<AbstractReferenceCountedByteBuf> updater =
        new ReferenceCountUpdater<AbstractReferenceCountedByteBuf>() {
    @Override
    protected AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> updater() {
        return AIF_UPDATER;
    }
    @Override
    protected long unsafeOffset() {
        return REFCNT_FIELD_OFFSET;
    }
};

如源码所示,updater更新器是ReferenceCountUpdater类型,其外部就是通过update()办法获取到的AtomicIntegerFieldUpdater来原子地更新 援用数量字段,在这里援用数量字段是refCnt。

ReferenceCountUpdate的计数形式与一般计数有些不同

  1. refCnt初始值是2,代表理论援用数量为1
  2. 每减少一个援用,refCnt对应值会加2,而缩小一个援用,refCnt对应值减2
  3. 如果开释了最初一个援用,则refCnt对应值会变为1

所以如果refCnt值为1,代表理论援用数量为0,否则理论援用数量 = refCnt/2

1.3 PooledByteBuf 和 UnpooledByteBuf

PooledByteBuf(池化)和UnpooledByteBuf(非池化)是两种类型的ByteBuf,顾名思义,一个是从内存池中调配的ByteBuf,应用完之后偿还至池中,而另一个则是从内存中间接调配的ByteBuf,应用完即回收。

UnpooledByteBuf又分为UnpooledHeapByteBuf(非池化堆内存)和UnpooledDirectByteBuf(非池化间接内存)。UnpooledHeapByteBuf和UnpooledDirectByteBuf的实现较为简单,UnpooledHeapByteBuf外部是应用字节数组来实现,而UnpooledDirectByteBuf外部是通过NIO的DirectByteBuffer来实现的。

1.3.1 PooledByteBuf

每个线程绑定一个PoolThreadCache,它蕴含了一个heapArena(堆内存)和一个directArena(间接内存),这两个都是PoolArena类型,每个线程通过PooledByteBufAllocator来分配内存时,会在其绑定的PoolThreadCache里进行调配。PoolArena的结构图如下

上面是如何调配一个池化的PooledByteBuf,源码细节较多,这里只是大略介绍一下

入口在PooledByteBufAllocator的 newHeapBuffer 和 newDirectBuffer 办法,这里以newHeapBuffer举例,如下

@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
    // 获取以后线程的PoolThreadCache
    PoolThreadCache cache = threadCache.get();
    // 获取PoolThreadCache种的heapArena
    PoolArena<byte[]> heapArena = cache.heapArena;

    final ByteBuf buf;
    if (heapArena != null) {
        // 应用heapArena进行调配ByteBuf
        buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        // 如果没有heapArena,调配非池化的ByteBuf
        buf = PlatformDependent.hasUnsafe() ?
                new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
                new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
    }
    return toLeakAwareBuffer(buf);
}

调用heapArena的allocate办法进行调配,先通过所须要的内存大小来计算sizeIdx,再判断sizeIdx,分为3种状况 small、normal、huge。如下:

private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
    // 依据须要调配的内存大小来计算sizeIdx
    final int sizeIdx = size2SizeIdx(reqCapacity);
   
    if (sizeIdx <= smallMaxSizeIdx) {    // 如果sizeIdx <= smallMaxSizeIdx,阐明是所需内存较小,次要应用PoolSubpage来调配
        tcacheAllocateSmall(cache, buf, reqCapacity, sizeIdx);
    } else if (sizeIdx < nSizes) {    // 如果sizeIdx <= nSizes,阐明所需内存中等,次要应用PoolChunk来调配
        tcacheAllocateNormal(cache, buf, reqCapacity, sizeIdx);
    } else {
        // 否则当sizeIdx大于等于nSizes,阐明所需内存较大,则不在池里调配,间接分配内存
        int normCapacity = directMemoryCacheAlignment > 0
                ? normalizeSize(reqCapacity) : reqCapacity;
        allocateHuge(buf, normCapacity);
    }
}

1.4 总结

ByteBuf是netty的根底组件,绝对于ByteBuffer,ByteBuf不仅应用更为不便,提供了更全面的API,并应用了池化技术来优化内存的调配效率。

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理