对于linux零拷贝技术能够先看下后面一篇文章IO零拷贝,因为java里的零拷贝底层也是依赖的操作系统实现,须要阐明下,Linux提供的零拷贝技术Java并不是全反对,只反对2种:mmap内存映射、sendfile,别离是由FileChannel.map()与FileChannel.transferTo()/transferFrom()实现。
波及的类次要有FileChannel,MappedByteBuffer,DirectByteBuffer。

MappedByteBuffer
先看下ChannelFile的map办法:

public abstract MappedByteBuffer map(MapMode mode,                         long position, long size)throws IOException;
  • mode 限定内存映射区域(MappedByteBuffer)对内存映像文件的拜访模式,有只读,读写与写时拷贝三种。
  • position 文件映射的起始地址,对应内存映射区域的首地址
  • size 文件映射的字节长度,从position往后的字节数,对应内存映射区域的大小

map办法正是NIO基于内存映射(mmap)这种零拷贝形式的一种实现形式。办法返回一个MappedByteBuffer,MappedByteBuffer继承于ByteBuffer,扩大的办法有force(),load(),isLoad()这三个办法:

  • force(),对于处于READ_WRITE模式下的缓冲区,将对缓冲区内容共性强制刷新到本地文件
  • load(),将缓冲区的内容载入物理内存中,并返回这个缓冲区的援用
  • isLoad(),判断缓冲区的内容是否在物理内存中,是返回true,不是返回false

看个示例

public class MappedByteBufferDemo {    public static final String CONTENT = "zero copy by MappedByteBuffer";    public static final String FILE_NAME= "zero_copy/mmap.txt";    public static final String CHARSET = "UTF-8";    /**     * 写文件数据:关上文件通道 fileChannel 并提供读权限、写权限和数据清空权限,     * 通过 fileChannel 映射到一个可写的内存缓冲区 mappedByteBuffer,     * 将指标数据写入 mappedByteBuffer,通过 force() 办法把缓冲区更改的内容强制写入本地文件。     */    @Test    public void writeToFileByMappedByteBuffer(){        //文件门路依据理论来定,我是放在我的项目的resources目录下        Path path = Paths.get(getClass().getResource("/"+FILE_NAME).getPath());        byte[] bytes = CONTENT.getBytes(Charset.forName(CHARSET));        try(FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.READ,                        StandardOpenOption.WRITE,StandardOpenOption.TRUNCATE_EXISTING)){            MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, bytes.length);            if (mappedByteBuffer != null){                mappedByteBuffer.put(bytes);                mappedByteBuffer.force();            }        }catch (IOException e){            e.printStackTrace();        }    }    /**     *     * 读文件数据:关上文件通道 fileChannel 并提供只读权限,通过 fileChannel 映射到一个     * 只可读的内存缓冲区 mappedByteBuffer,读取 mappedByteBuffer 中的字节数组即可失去文件数据。     */    @Test    public void readFileFromMappedByteBuffer(){        Path path = Paths.get(getClass().getResource("/"+FILE_NAME).getPath());        int length = CONTENT.getBytes(Charset.forName(CHARSET)).length;        try(FileChannel fileChannel = FileChannel.open(path,StandardOpenOption.READ)){            MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, length);            if (mappedByteBuffer != null){                byte[] bytes = new byte[length];                mappedByteBuffer.get(bytes);                String content = new String(bytes, StandardCharsets.UTF_8);                assertEquals(content,"zero copy by MappedByteBuffer");            }        }catch (IOException e){            e.printStackTrace();        }    }}

这里咱们再来看看map()办法,它是在FileChannelImpl类里实现的,来看下外围代码:

public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException {    int pagePosition = (int)(position % allocationGranularity);    long mapPosition = position - pagePosition;    long mapSize = size + pagePosition;    try {        //第一次文件映射导致OOM,手动触发垃圾回收,100S后再尝试映射,如果再失败则抛出异样        addr = map0(imode, mapPosition, mapSize);    } catch (OutOfMemoryError x) {        System.gc();        try {            Thread.sleep(100);        } catch (InterruptedException y) {            Thread.currentThread().interrupt();        }        try {            //addr为内存映射区域的起始地址,通过起始地址+偏移量能够获取指定内存数据。底层是JNI调用C实现            addr = map0(imode, mapPosition, mapSize);        } catch (OutOfMemoryError y) {            throw new IOException("Map failed", y);        }    }    int isize = (int)size;    Unmapper um = new Unmapper(addr, mapSize, isize, mfd);    //通过Util工人反射创立一个DirectByteBuffer实例    if ((!writable) || (imode == MAP_RO)) {        return Util.newMappedByteBufferR(isize, addr + pagePosition, mfd, um);    } else {        return Util.newMappedByteBuffer(isize, addr + pagePosition, mfd, um);    }}  

总结:

  1. MappedByteBuffer底层应用DirectByteBuffer申请堆外虚拟内存,调配的内存不受JVM的-Xmx限度
  2. MappedByteBuffer关上的文件只有在垃圾回收的时候才会被敞开
  3. MappedByteBuffer映射的内存须要用户程序通过java反射调用sum.misc.Cleaner的clean()办法手动开释

DirectByteBuffer
DirectByteBuffer能够调配堆外内存,它是通过Unsafe本地办法allocateMemory()进行调配的,底层调用的是操作系统的malloc()函数。创立DirectByteBuffer对象时还会创立一个Deallocate线程,并通过Cleaner的freeMemory()办法对间接内存进行回收操作,freeMomery()底层调用的是操作系统的free()函数。

DirectByteBuffer(int cap) {    super(-1, 0, cap, cap);    boolean pa = VM.isDirectMemoryPageAligned();    int ps = Bits.pageSize();    long size = Math.max(1L, (long)cap + (pa ? ps : 0));    Bits.reserveMemory(size, cap);    long base = 0;    try {        base = unsafe.allocateMemory(size);    } catch (OutOfMemoryError x) {        Bits.unreserveMemory(size, cap);        throw x;    }    unsafe.setMemory(base, size, (byte) 0);    if (pa && (base % ps != 0)) {        address = base + ps - (base & (ps - 1));    } else {        address = base;    }    cleaner = Cleaner.create(this, new Deallocator(base, size, cap));    att = null;}

DirectByteBuffer是MappedByteBuffer的子类,之前咱们有提到的FileChannel#map()办法中

Util.newMappedByteBuffer(isize, addr + pagePosition, mfd, um);

它底层就是通过反射创立了DirectByteBuffer实例,而后调配的堆外内存:

static MappedByteBuffer newMappedByteBuffer(int size, long addr, FileDescriptor fd,                                            Runnable unmapper) {    MappedByteBuffer dbb;    if (directByteBufferConstructor == null)        initDBBConstructor();    try {        dbb = (MappedByteBuffer)directByteBufferConstructor.newInstance(            new Object[] { new Integer(size), new Long(addr), fd, unmapper });    } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {        throw new InternalError(e);    }    return dbb;}private static void initDBBRConstructor() {    AccessController.doPrivileged(new PrivilegedAction<Void>() {        public Void run() {            try {                Class<?> cl = Class.forName("java.nio.DirectByteBufferR");                Constructor<?> ctor = cl.getDeclaredConstructor(                    new Class<?>[] { int.class, long.class, FileDescriptor.class,                                    Runnable.class });                ctor.setAccessible(true);                directByteBufferRConstructor = ctor;            } catch (ClassNotFoundException | NoSuchMethodException |                     IllegalArgumentException | ClassCastException x) {                throw new InternalError(x);            }            return null;        }});}

DirectByteBuffer自身也有文件内存映射的性能,另外还提供了MappedByteBuffer所没有的能够在内存映像文件进行随机读取get()与写入write()操作。

public byte get() {    return ((unsafe.getByte(ix(nextGetIndex()))));}public byte get(int i) {    return ((unsafe.getByte(ix(checkIndex(i)))));}public ByteBuffer put(byte x) {    unsafe.putByte(ix(nextPutIndex()), ((x)));    return this;}public ByteBuffer put(int i, byte x) {    unsafe.putByte(ix(checkIndex(i)), ((x)));    return this;}

内存映像文件的随机读写都是借助 ix() 办法实现定位的, ix() 办法通过内存映射空间的内存首地址(address)和给定偏移量 i 计算出指针地址,而后由 unsafe 类的 get() 和 put() 办法和对指针指向的数据进行读取或写入。

总结:

  1. DirectByteBuffer是MappedByteBuffer子类,它自身有文件映射内存性能,同时它还具备MappedByteBuffer所没有的在内存映像文件进行随机读取get()与写入write()性能。
  2. DirectByteBuffer是通过Unsafe本地办法申请的堆外内存,回收时须要应用程序自身应用Cleaner类进行回收

FileChannel
FileChannel是一个用于文件读写,映射和操作的通道,它定义了transferFrom()和transferTo()两个形象办法,它通过在通道和通道之间建设连贯实现数据传输。

//通过FileChannel将文件外面的数据写入一个WritableByteChannel的目标通道public abstract long transferTo(long position, long count, WritableByteChannel target)        throws IOException;//将一个源通道ReadableByteChannel中的数据读取到以后FileChannel的文件外面public abstract long transferFrom(ReadableByteChannel src, long position, long count)        throws IOException;

示例:

public class FileChannelDemo {    public static final String CONTENT = "zero copy by FileChannel";    //两个文件放在我的项目的resources目录下    public static final String SOURCE_FILE = "/zero_copy/source.txt";    public static final String TARGET_FILE = "/zero_copy/target.txt";    public static final String CHARSET = "UTF-8";    //先将内容写入source.txt    @Before    public void setup(){        Path path = Paths.get(getClass().getResource(SOURCE_FILE).getPath());        byte[] bytes = CONTENT.getBytes(Charset.forName(CHARSET));        try(FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.READ,                                        StandardOpenOption.WRITE,StandardOpenOption.TRUNCATE_EXISTING)){            fileChannel.write(ByteBuffer.wrap(bytes));        }catch (IOException e){            e.printStackTrace();        }    }    //通过transferTo将fromChannel上的数据拷贝到toChannel    @Test    public void transferTo()throws Exception{        try(FileChannel fromChannel = new RandomAccessFile(getClass().getResource(SOURCE_FILE).getPath()                                                            ,"rw").getChannel();            FileChannel toChannel = new RandomAccessFile(getClass().getResource(TARGET_FILE).getPath()                                                            ,"rw").getChannel()){            long position = 0L;            long offset = fromChannel.size();            fromChannel.transferTo(position,offset,toChannel);        }    }    //通过transferFrom将fromChannel中的数据拷贝到toChannel    @Test    public void transferFrom()throws Exception{        try(FileChannel fromChannel = new RandomAccessFile(getClass().getResource(SOURCE_FILE).getPath()                                                            ,"rw").getChannel();            FileChannel toChannel = new RandomAccessFile(getClass().getResource(TARGET_FILE).getPath()                                                            ,"rw").getChannel()){            long position = 0L;            long offset = fromChannel.size();            toChannel.transferFrom(fromChannel,position,offset);        }    }}

transferTo()与transferFrom()底层都是基于sendfile实现数据传输的。上面以transferTo()源码为例进行阐明:

public long transferTo(long position, long count, WritableByteChannel target)        throws IOException {    // 计算文件的大小    long sz = size();    // 校验起始地位    if (position > sz)        return 0;    int icount = (int)Math.min(count, Integer.MAX_VALUE);    // 校验偏移量    if ((sz - position) < icount)        icount = (int)(sz - position);    long n;    //内核如果反对sendfile,则应用transferToDirectly    if ((n = transferToDirectly(position, icount, target)) >= 0)        return n;    //内核不反对sendfile,则应用mmap的形式    if ((n = transferToTrustedChannel(position, icount, target)) >= 0)        return n;    //内核不反对sendfile与mmap,则应用传统的IO形式实现读写    return transferToArbitraryChannel(position, icount, target);}//先获取文件描述符targetFD,接着获取同步锁后执行transferToDirectlyInternalprivate long transferToDirectly(long position, int icount, WritableByteChannel target)        throws IOException {    // 省略从target获取targetFD的过程    if (nd.transferToDirectlyNeedsPositionLock()) {        synchronized (positionLock) {            long pos = position();            try {                return transferToDirectlyInternal(position, icount,                        target, targetFD);            } finally {                position(pos);            }        }    } else {        return transferToDirectlyInternal(position, icount, target, targetFD);    }}//transferToDirectlyInternal会调用本地办法transferTo0()尝试以sendfile的形式传输数据private long transferToDirectlyInternal(long position, int icount,                                        WritableByteChannel target,                                        FileDescriptor targetFD) throws IOException {    assert !nd.transferToDirectlyNeedsPositionLock() ||            Thread.holdsLock(positionLock);    long n = -1;    int ti = -1;    try {        begin();        ti = threads.add();        if (!isOpen())            return -1;        do {            n = transferTo0(fd, position, icount, targetFD);        } while ((n == IOStatus.INTERRUPTED) && isOpen());        if (n == IOStatus.UNSUPPORTED_CASE) {            if (target instanceof SinkChannelImpl)                pipeSupported = false;            if (target instanceof FileChannelImpl)                fileSupported = false;            return IOStatus.UNSUPPORTED_CASE;        }        if (n == IOStatus.UNSUPPORTED) {            transferSupported = false;            return IOStatus.UNSUPPORTED;        }        return IOStatus.normalize(n);    } finally {        threads.remove(ti);        end (n > -1);    }}

总结:

  1. FileChannel的transferTo()与transferFrom()底层都是基于sendfile实现数据传输的
  2. transferTo办法外部会判断零碎是否反对sendfile,如果不反对会应用mmap的形式;如果零碎也不反对mmap的形式,则会应用传统的IO形式进行数据传输
  3. transferToyyfi会调用本地办法transferTo0()

Netty的零拷贝实现

netty的零拷贝次要是通过对java.nio.channels.FileChannel的tranferTo()的包装,在文件传输时将文件缓冲区的数据间接发送到目标通道(Channel)。

  1. 应用Direct Buffers,Netty采纳间接缓冲区间接在内存区域调配空间,防止数据的屡次拷贝
  2. 应用CompositeByteBuf,它保留了多个ByteBuf的援用,对外提供对立封装后的ByteBuf接口,防止数据拷贝
  3. Netty的文件传输类DefaultFileRegion通过调用FileChannel.transferTo()办法实现零拷贝,文件缓冲区的数据会间接发送给指标Channel

九,RocketMQ与Kafka里的零拷贝

mmap应用的是非阻塞式IO,基于多路复用解决,实用于小数据块/高频率的IO传输,大块数据会阻塞多路复用线程,sendfile应用的是阻塞式IO,实用于大数据块/低频率的IO传输。

零拷贝形式长处毛病
RocketMQmmap+write实用于小块文件传输,频繁调用时效率高不能很好利用DMA形式,会比sendfile多耗费CPU,内存安全性管制简单,须要防止JVM Crash问题
Kafkasendfile能够利用DMA形式,耗费CPU较少,大块文件传输效率高,无内存平安问题小块文件效率低于mmap形式,只能是BIO形式传输,不能应用NIO形式

总结:

  1. FileChannel调用map()办法最终是应用DirectByteBuffer映射的堆外内存,而后应用MappedByteBuffer进行读写,这就是mmap实现的形式
  2. FileChannel调用transferTo()办法时底层应用本地办法transferTo0()实现的sendfile办法
  3. Netty,RocketMQ,Kafka里的零拷贝也简略提了一下

本文次要参考的文章:
Java NIO-零拷贝实现

深刻分析Linux IO原理和几种零拷贝机制的实现