关于nio:Java里的零拷贝

58次阅读

共计 10429 个字符,预计需要花费 27 分钟才能阅读完成。

对于 linux 零拷贝技术能够先看下后面一篇文章 IO 零拷贝,因为 java 里的零拷贝底层也是依赖的操作系统实现,须要阐明下,Linux 提供的零拷贝技术 Java 并不是全反对,只反对 2 种:mmap 内存映射、sendfile,别离是由 FileChannel.map()与 FileChannel.transferTo()/transferFrom()实现。
波及的类次要有 FileChannel,MappedByteBuffer,DirectByteBuffer。

MappedByteBuffer
先看下 ChannelFile 的 map 办法:

public abstract MappedByteBuffer map(MapMode mode,
                         long position, long size)throws IOException;
  • mode 限定内存映射区域 (MappedByteBuffer) 对内存映像文件的拜访模式,有只读,读写与写时拷贝三种。
  • position 文件映射的起始地址,对应内存映射区域的首地址
  • size 文件映射的字节长度,从 position 往后的字节数,对应内存映射区域的大小

map 办法正是 NIO 基于内存映射 (mmap) 这种零拷贝形式的一种实现形式。办法返回一个 MappedByteBuffer,MappedByteBuffer 继承于 ByteBuffer,扩大的办法有 force(),load(),isLoad()这三个办法:

  • force(), 对于处于 READ_WRITE 模式下的缓冲区,将对缓冲区内容共性强制刷新到本地文件
  • load(), 将缓冲区的内容载入物理内存中,并返回这个缓冲区的援用
  • isLoad(), 判断缓冲区的内容是否在物理内存中,是返回 true, 不是返回 false

看个示例

public class MappedByteBufferDemo {
    public static final String CONTENT = "zero copy by MappedByteBuffer";
    public static final String FILE_NAME= "zero_copy/mmap.txt";
    public static final String CHARSET = "UTF-8";

    /**
     * 写文件数据:关上文件通道 fileChannel 并提供读权限、写权限和数据清空权限,* 通过 fileChannel 映射到一个可写的内存缓冲区 mappedByteBuffer,* 将指标数据写入 mappedByteBuffer,通过 force() 办法把缓冲区更改的内容强制写入本地文件。*/
    @Test
    public void writeToFileByMappedByteBuffer(){
        // 文件门路依据理论来定,我是放在我的项目的 resources 目录下
        Path path = Paths.get(getClass().getResource("/"+FILE_NAME).getPath());
        byte[] bytes = CONTENT.getBytes(Charset.forName(CHARSET));
        try(FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.READ,
                        StandardOpenOption.WRITE,StandardOpenOption.TRUNCATE_EXISTING)){MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, bytes.length);
            if (mappedByteBuffer != null){mappedByteBuffer.put(bytes);
                mappedByteBuffer.force();}
        }catch (IOException e){e.printStackTrace();
        }
    }

    /**
     *
     * 读文件数据:关上文件通道 fileChannel 并提供只读权限,通过 fileChannel 映射到一个
     * 只可读的内存缓冲区 mappedByteBuffer,读取 mappedByteBuffer 中的字节数组即可失去文件数据。*/
    @Test
    public void readFileFromMappedByteBuffer(){Path path = Paths.get(getClass().getResource("/"+FILE_NAME).getPath());
        int length = CONTENT.getBytes(Charset.forName(CHARSET)).length;
        try(FileChannel fileChannel = FileChannel.open(path,StandardOpenOption.READ)){MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, length);
            if (mappedByteBuffer != null){byte[] bytes = new byte[length];
                mappedByteBuffer.get(bytes);
                String content = new String(bytes, StandardCharsets.UTF_8);
                assertEquals(content,"zero copy by MappedByteBuffer");
            }
        }catch (IOException e){e.printStackTrace();
        }
    }
}

这里咱们再来看看 map()办法,它是在 FileChannelImpl 类里实现的,来看下外围代码:

public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException {int pagePosition = (int)(position % allocationGranularity);
    long mapPosition = position - pagePosition;
    long mapSize = size + pagePosition;
    try {
        // 第一次文件映射导致 OOM,手动触发垃圾回收,100S 后再尝试映射,如果再失败则抛出异样
        addr = map0(imode, mapPosition, mapSize);
    } catch (OutOfMemoryError x) {System.gc();
        try {Thread.sleep(100);
        } catch (InterruptedException y) {Thread.currentThread().interrupt();}
        try {
            //addr 为内存映射区域的起始地址,通过起始地址 + 偏移量能够获取指定内存数据。底层是 JNI 调用 C 实现
            addr = map0(imode, mapPosition, mapSize);
        } catch (OutOfMemoryError y) {throw new IOException("Map failed", y);
        }
    }

    int isize = (int)size;
    Unmapper um = new Unmapper(addr, mapSize, isize, mfd);
    // 通过 Util 工人反射创立一个 DirectByteBuffer 实例
    if ((!writable) || (imode == MAP_RO)) {return Util.newMappedByteBufferR(isize, addr + pagePosition, mfd, um);
    } else {return Util.newMappedByteBuffer(isize, addr + pagePosition, mfd, um);
    }
}
  

总结:

  1. MappedByteBuffer 底层应用 DirectByteBuffer 申请堆外虚拟内存,调配的内存不受 JVM 的 -Xmx 限度
  2. MappedByteBuffer 关上的文件只有在垃圾回收的时候才会被敞开
  3. MappedByteBuffer 映射的内存须要用户程序通过 java 反射调用 sum.misc.Cleaner 的 clean()办法手动开释

DirectByteBuffer
DirectByteBuffer 能够调配堆外内存,它是通过 Unsafe 本地办法 allocateMemory()进行调配的,底层调用的是操作系统的 malloc()函数。创立 DirectByteBuffer 对象时还会创立一个 Deallocate 线程,并通过 Cleaner 的 freeMemory()办法对间接内存进行回收操作,freeMomery()底层调用的是操作系统的 free()函数。

DirectByteBuffer(int cap) {super(-1, 0, cap, cap);
    boolean pa = VM.isDirectMemoryPageAligned();
    int ps = Bits.pageSize();
    long size = Math.max(1L, (long)cap + (pa ? ps : 0));
    Bits.reserveMemory(size, cap);

    long base = 0;
    try {base = unsafe.allocateMemory(size);
    } catch (OutOfMemoryError x) {Bits.unreserveMemory(size, cap);
        throw x;
    }
    unsafe.setMemory(base, size, (byte) 0);
    if (pa && (base % ps != 0)) {address = base + ps - (base & (ps - 1));
    } else {address = base;}
    cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
    att = null;
}

DirectByteBuffer 是 MappedByteBuffer 的子类,之前咱们有提到的 FileChannel#map()办法中

Util.newMappedByteBuffer(isize, addr + pagePosition, mfd, um);

它底层就是通过反射创立了 DirectByteBuffer 实例,而后调配的堆外内存:

static MappedByteBuffer newMappedByteBuffer(int size, long addr, FileDescriptor fd,
                                            Runnable unmapper) {
    MappedByteBuffer dbb;
    if (directByteBufferConstructor == null)
        initDBBConstructor();
    try {dbb = (MappedByteBuffer)directByteBufferConstructor.newInstance(new Object[] {new Integer(size), new Long(addr), fd, unmapper });
    } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {throw new InternalError(e);
    }
    return dbb;
}

private static void initDBBRConstructor() {AccessController.doPrivileged(new PrivilegedAction<Void>() {public Void run() {
            try {Class<?> cl = Class.forName("java.nio.DirectByteBufferR");
                Constructor<?> ctor = cl.getDeclaredConstructor(new Class<?>[] { int.class, long.class, FileDescriptor.class,
                                    Runnable.class });
                ctor.setAccessible(true);
                directByteBufferRConstructor = ctor;
            } catch (ClassNotFoundException | NoSuchMethodException |
                     IllegalArgumentException | ClassCastException x) {throw new InternalError(x);
            }
            return null;
        }});
}

DirectByteBuffer 自身也有文件内存映射的性能,另外还提供了 MappedByteBuffer 所没有的能够在内存映像文件进行随机读取 get()与写入 write()操作。

public byte get() {return ((unsafe.getByte(ix(nextGetIndex()))));
}

public byte get(int i) {return ((unsafe.getByte(ix(checkIndex(i)))));
}

public ByteBuffer put(byte x) {unsafe.putByte(ix(nextPutIndex()), ((x)));
    return this;
}

public ByteBuffer put(int i, byte x) {unsafe.putByte(ix(checkIndex(i)), ((x)));
    return this;
}

内存映像文件的随机读写都是借助 ix() 办法实现定位的,ix() 办法通过内存映射空间的内存首地址(address)和给定偏移量 i 计算出指针地址,而后由 unsafe 类的 get() 和 put() 办法和对指针指向的数据进行读取或写入。

总结:

  1. DirectByteBuffer 是 MappedByteBuffer 子类,它自身有文件映射内存性能,同时它还具备 MappedByteBuffer 所没有的在内存映像文件进行随机读取 get()与写入 write()性能。
  2. DirectByteBuffer 是通过 Unsafe 本地办法申请的堆外内存,回收时须要应用程序自身应用 Cleaner 类进行回收

FileChannel
FileChannel 是一个用于文件读写,映射和操作的通道,它定义了 transferFrom()和 transferTo()两个形象办法,它通过在通道和通道之间建设连贯实现数据传输。

// 通过 FileChannel 将文件外面的数据写入一个 WritableByteChannel 的目标通道
public abstract long transferTo(long position, long count, WritableByteChannel target)
        throws IOException;

// 将一个源通道 ReadableByteChannel 中的数据读取到以后 FileChannel 的文件外面
public abstract long transferFrom(ReadableByteChannel src, long position, long count)
        throws IOException;

示例:

public class FileChannelDemo {
    public static final String CONTENT = "zero copy by FileChannel";
    // 两个文件放在我的项目的 resources 目录下
    public static final String SOURCE_FILE = "/zero_copy/source.txt";
    public static final String TARGET_FILE = "/zero_copy/target.txt";
    public static final String CHARSET = "UTF-8";

    // 先将内容写入 source.txt
    @Before
    public void setup(){Path path = Paths.get(getClass().getResource(SOURCE_FILE).getPath());
        byte[] bytes = CONTENT.getBytes(Charset.forName(CHARSET));
        try(FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.READ,
                                        StandardOpenOption.WRITE,StandardOpenOption.TRUNCATE_EXISTING)){fileChannel.write(ByteBuffer.wrap(bytes));
        }catch (IOException e){e.printStackTrace();
        }
    }

    // 通过 transferTo 将 fromChannel 上的数据拷贝到 toChannel
    @Test
    public void transferTo()throws Exception{try(FileChannel fromChannel = new RandomAccessFile(getClass().getResource(SOURCE_FILE).getPath()
                                                            ,"rw").getChannel();
            FileChannel toChannel = new RandomAccessFile(getClass().getResource(TARGET_FILE).getPath()
                                                            ,"rw").getChannel()){
            long position = 0L;
            long offset = fromChannel.size();
            fromChannel.transferTo(position,offset,toChannel);
        }
    }
    // 通过 transferFrom 将 fromChannel 中的数据拷贝到 toChannel
    @Test
    public void transferFrom()throws Exception{try(FileChannel fromChannel = new RandomAccessFile(getClass().getResource(SOURCE_FILE).getPath()
                                                            ,"rw").getChannel();
            FileChannel toChannel = new RandomAccessFile(getClass().getResource(TARGET_FILE).getPath()
                                                            ,"rw").getChannel()){
            long position = 0L;
            long offset = fromChannel.size();
            toChannel.transferFrom(fromChannel,position,offset);
        }
    }
}

transferTo()与 transferFrom()底层都是基于 sendfile 实现数据传输的。上面以 transferTo()源码为例进行阐明:

public long transferTo(long position, long count, WritableByteChannel target)
        throws IOException {
    // 计算文件的大小
    long sz = size();
    // 校验起始地位
    if (position > sz)
        return 0;
    int icount = (int)Math.min(count, Integer.MAX_VALUE);
    // 校验偏移量
    if ((sz - position) < icount)
        icount = (int)(sz - position);

    long n;
    // 内核如果反对 sendfile,则应用 transferToDirectly
    if ((n = transferToDirectly(position, icount, target)) >= 0)
        return n;
    // 内核不反对 sendfile,则应用 mmap 的形式
    if ((n = transferToTrustedChannel(position, icount, target)) >= 0)
        return n;
    // 内核不反对 sendfile 与 mmap,则应用传统的 IO 形式实现读写
    return transferToArbitraryChannel(position, icount, target);
}
// 先获取文件描述符 targetFD,接着获取同步锁后执行 transferToDirectlyInternal
private long transferToDirectly(long position, int icount, WritableByteChannel target)
        throws IOException {
    // 省略从 target 获取 targetFD 的过程
    if (nd.transferToDirectlyNeedsPositionLock()) {synchronized (positionLock) {long pos = position();
            try {
                return transferToDirectlyInternal(position, icount,
                        target, targetFD);
            } finally {position(pos);
            }
        }
    } else {return transferToDirectlyInternal(position, icount, target, targetFD);
    }
}
//transferToDirectlyInternal 会调用本地办法 transferTo0()尝试以 sendfile 的形式传输数据
private long transferToDirectlyInternal(long position, int icount,
                                        WritableByteChannel target,
                                        FileDescriptor targetFD) throws IOException {assert !nd.transferToDirectlyNeedsPositionLock() ||
            Thread.holdsLock(positionLock);

    long n = -1;
    int ti = -1;
    try {begin();
        ti = threads.add();
        if (!isOpen())
            return -1;
        do {n = transferTo0(fd, position, icount, targetFD);
        } while ((n == IOStatus.INTERRUPTED) && isOpen());
        if (n == IOStatus.UNSUPPORTED_CASE) {if (target instanceof SinkChannelImpl)
                pipeSupported = false;
            if (target instanceof FileChannelImpl)
                fileSupported = false;
            return IOStatus.UNSUPPORTED_CASE;
        }
        if (n == IOStatus.UNSUPPORTED) {
            transferSupported = false;
            return IOStatus.UNSUPPORTED;
        }
        return IOStatus.normalize(n);
    } finally {threads.remove(ti);
        end (n > -1);
    }
}

总结:

  1. FileChannel 的 transferTo()与 transferFrom()底层都是基于 sendfile 实现数据传输的
  2. transferTo 办法外部会判断零碎是否反对 sendfile,如果不反对会应用 mmap 的形式;如果零碎也不反对 mmap 的形式,则会应用传统的 IO 形式进行数据传输
  3. transferToyyfi 会调用本地办法 transferTo0()

Netty 的零拷贝实现

netty 的零拷贝次要是通过对 java.nio.channels.FileChannel 的 tranferTo()的包装,在文件传输时将文件缓冲区的数据间接发送到目标通道(Channel)。

  1. 应用 Direct Buffers,Netty 采纳间接缓冲区间接在内存区域调配空间,防止数据的屡次拷贝
  2. 应用 CompositeByteBuf,它保留了多个 ByteBuf 的援用,对外提供对立封装后的 ByteBuf 接口,防止数据拷贝
  3. Netty 的文件传输类 DefaultFileRegion 通过调用 FileChannel.transferTo()办法实现零拷贝,文件缓冲区的数据会间接发送给指标 Channel

九,RocketMQ 与 Kafka 里的零拷贝

mmap 应用的是非阻塞式 IO,基于多路复用解决,实用于小数据块 / 高频率的 IO 传输,大块数据会阻塞多路复用线程,sendfile 应用的是阻塞式 IO,实用于大数据块 / 低频率的 IO 传输。

零拷贝形式 长处 毛病
RocketMQ mmap+write 实用于小块文件传输,频繁调用时效率高 不能很好利用 DMA 形式,会比 sendfile 多耗费 CPU,内存安全性管制简单,须要防止 JVM Crash 问题
Kafka sendfile 能够利用 DMA 形式,耗费 CPU 较少,大块文件传输效率高,无内存平安问题 小块文件效率低于 mmap 形式,只能是 BIO 形式传输,不能应用 NIO 形式

总结:

  1. FileChannel 调用 map()办法最终是应用 DirectByteBuffer 映射的堆外内存,而后应用 MappedByteBuffer 进行读写,这就是 mmap 实现的形式
  2. FileChannel 调用 transferTo()办法时底层应用本地办法 transferTo0()实现的 sendfile 办法
  3. Netty,RocketMQ,Kafka 里的零拷贝也简略提了一下

本文次要参考的文章:
Java NIO- 零拷贝实现

深刻分析 Linux IO 原理和几种零拷贝机制的实现

正文完
 0