乐趣区

【J2SE】java NIO 基础学习

NIO 与 IO 的区别

IO
NIO

阻塞式
非阻塞式、选择器 selectors

面向流:单向流动,直接将数据从一方流向另一方
面向缓存:将数据放到缓存区中进行存取,经通道进行数据的传输

缓冲 Buffer
根据数据类型的不同,提供了对应的类型缓冲区(boolean 类型除外),每一个 Buffer 类都是 Buffer 接口的一个实例。通过 Buffer 类.allocate() 方法获取缓冲区;对缓冲区的数据进行操作可以使用 put 方法和 get 方法。
四个核心属性
// Invariants: mark <= position <= limit <= capacity
private int mark = -1;
private int position = 0;
private int limit;
private int capacity;

capacity:容量,表示缓冲区中最大存储容量,一旦声明不可更改。
limit:界限,表示限制可对缓冲区操作数据的范围,范围外的数据不可被操作。
position:位置,表示当前操作的数据位于缓冲区中的位置。
mark:标记,表示记录当前 position 的位置。

常用方法(以 ByteBuffer 为例)
public static ByteBuffer allocateDirect(int capacity):分配一个直接缓冲区 public static ByteBuffer allocate(int capacity):分配一个间接缓冲区

当分配一个缓冲区时,capacity=capacity,mark=-1,position=0,limit=capacity,源码分析如下:
public static ByteBuffer allocate(int capacity) {

return new HeapByteBuffer(capacity, capacity);
}

// class HeapByteBuffer extends ByteBuffer
HeapByteBuffer(int cap, int lim) {
// 调用 ByteBuffer 的构造函数传入默认参数:mark=-1,position=0,limit=capacity
super(-1, 0, lim, cap, new byte[cap], 0);
};

// public abstract class ByteBuffer extends Buffer
ByteBuffer(int mark, int pos, int lim, int cap, byte[] hb, int offset) {
super(mark, pos, lim, cap);
this.hb = hb; // final byte[] hb;
this.offset = offset; // final int offset;
}

Buffer(int mark, int pos, int lim, int cap) {

this.capacity = cap;
limit(lim); // 设置 limit
position(pos); // 设置 position
if (mark >= 0) {

this.mark = mark;
}
}
public final ByteBuffer put(byte[] src):将一个字节数组放入缓冲区。
每当放置一个字节时,position 将会 +1,保证 position 的值就是下一个可插入数据的 buffer 单元位置。源码分析如下:
public final ByteBuffer put(byte[] src) {
return put(src, 0, src.length);
}

// 由 allocate 方法调用分配缓冲区可知,返回的是 Buffer 的实现类 HeapByteBuffer 对象
public ByteBuffer put(byte[] src, int offset, int length) {
checkBounds(offset, length, src.length); // 检查是否下标越界
if (length > remaining()) // 检查是否超出了可操作的数据范围 = limit-position
throw new BufferOverflowException();
System.arraycopy(src, offset, hb, ix(position()), length);
position(position() + length); // 重设 position
return this;
}
public ByteBuffer get(byte[] dst):从缓冲区中读取数据到 dst 中。应在 flip() 方法后调用。
获取数据,是在缓冲区字节数组中的 position 位置处开始,读取一次完毕后,并会记录当前读取的位置,即 position,以便于下一次调用 get 方法继续读取。
public ByteBuffer get(byte[] dst) {
return get(dst, 0, dst.length);
}

// 调用 HeapByteBuffer 对象的 get 方法
public ByteBuffer get(byte[] dst, int offset, int length) {

// 从缓冲区的字节数组 final byte[] hb 中,拷贝从 hb 的 offset+position(注:offset=0)处的长度为 length 的数据到 dst 中
System.arraycopy(hb, ix(position()), dst, offset, length);
position(position() + length); // 设置 position
return this;
}
通过源码分析可知,当 put 操作后,position 记录的是下一个可用的 buffer 单元,而 get 会从 position 位置处开始获取数据,这显然是无法获得的,因此需要重新设置 position,即 flip() 方法。
public final Buffer flip():翻转缓冲区,在一个通道读取或 PUT 操作序列之后,调用此方法以准备一个通道写入或相对获取操作的序列
将此通道的缓冲区的界限设置为当前 position,保证了有可操作的数据。
public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}
public final Buffer mark():标记当前 position
可用于在 put 操作转 get 操作时标记当前的 position 位置,以便于调用 reset 方法从该位置继续操作
public final Buffer mark() {
mark = position;
return this;
}
public final Buffer reset():回到 mark 标记的位置
public final Buffer reset() {
int m = mark;
if (m < 0)
throw new InvalidMarkException();
position = m;
return this;
}
public final Buffer clear():清除缓冲,重置初始化原始状态
public final Buffer clear() {
position = 0;
limit = capacity;
mark = -1;
return this;
}
public final Buffer rewind():倒回,用于重新读取数据
public final Buffer rewind() {
position = 0;
mark = -1;
return this;
}
直接缓冲区与间接缓冲区
间接缓冲:通过 allocate 方法分配的缓冲区。当程序发起 read 请求获取磁盘文件时,该文件首先被 OS 读取到内核地址空间中,并 copy 一份原始数据传入 JVM 用户地址空间,再传给应用程序。增加了一个 copy 操作,导致效率降低。
直接缓冲:通过 allocateDirecr 方法分配的缓冲区,此缓冲区建立在物理内存中。直接在两个空间中开辟内存空间,创建映射文件,去除了在内核地址空间和用户地址空间中的 copy 操作,使得直接通过物理内存传输数据。虽然有效提高了效率,但是分配和销毁该缓冲区的成本高于间接缓冲,且对于缓冲区中的数据将交付给 OS 管理,程序员无法控制。
通道 Channel
用于源节点与目标节点之间的连接,负责对缓冲区中的数据提供传输服务。
常用类
​ FileChannel:用于读取、写入、映射和操作文件的通道。
​ SocketChannel:通过 TCP 读写网络中的数据。
​ ServerSocketChannerl:通过 UDP 读写网络中的数据通道。
​ DatagramChannel:通过 UDP 读写网络中的数据通道。

​ 本地 IO:FileInputStream、FileOutputStream、RandomAccessFile
​ 网络 IO:Socket、ServerSocket、DatagramSocket
获取 Channel 方式(以 FileChannel 为例)
​ 1. Files.newByteChannel 工具类静态方法
​ 2. getChannel 方法:通过对象动态获取,使用间接缓冲区。
FileInputStream fis = new FileInputStream(ORIGINAL_FILE);
FileOutputStream fos = new FileOutputStream(OUTPUT_FILE);

// 获取通道
FileChannel inChannel = fis.getChannel();
FileChannel outChannel = fos.getChannel();

// 提供缓冲区(间接缓冲区)
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (inChannel.read(buffer) != -1) {
buffer.flip();
outChannel.write(buffer);
buffer.clear();
}
​ 3. 静态 open 方法:使用 open 获取到的 Channel 通道,使用直接缓冲区。
FileChannel inChannel =
FileChannel.open(Paths.get(ORIGINAL_FILE), StandardOpenOption.READ);
FileChannel outChannel =
FileChannel.open(Paths.get(OUTPUT_FILE), StandardOpenOption.READ,
StandardOpenOption.CREATE, StandardOpenOption.WRITE);

// 使用物理内存 内存映射文件
MappedByteBuffer inBuffer =
inChannel.map(MapMode.READ_ONLY, 0, inChannel.size());
MappedByteBuffer outBuffer =
outChannel.map(MapMode.READ_WRITE, 0, inChannel.size());

byte[] dst = new byte[inBuffer.limit()];
inBuffer.get(dst);
outBuffer.put(dst);

// 使用 DMA 直接存储器存储
inChannel.transferTo(0, inChannel.size(), outChannel);
outChannel.transferFrom(inChannel, 0, inChannel.size());
public static FileChannel open(Path path, OpenOption… options):从 path 路径中以某种方式获取文件的 Channel

StandardOpenOption
描述

CREATE
创建一个新的文件,如果存在,则覆盖。

CREATE_NEW
创建一个新的文件,如果该文件已经存在则失败。

DELETE_ON_CLOSE
关闭时删除。

DSYNC
要求将文件内容的每次更新都与底层存储设备同步写入。

READ
读方式

SPARSE
稀疏文件

SYNC
要求将文件内容或元数据的每次更新都同步写入底层存储设备。

TRUNCATE_EXISTING
如果文件已经存在,并且打开 wirte 访问,则其长度将截断为 0。

WRITE
写方式

APPEND
如果文件以 wirte 访问打开,则字节将被写入文件的末尾而不是开头。

public abstract MappedByteBuffer map(MapMode mode, long position, long size):将通道的文件区域映射到内从中。当操作较大的文件时,将数据映射到物理内存中才是值得的,因为映射到内存是需要开销的。

FileChannel.MapMode
描述

PRIVATE
专用映射模式(写入时拷贝)

READ_ONLY
只读模式

READ_WRIT
读写模式

public abstract long transferFrom(ReadableByteChannel src, long position, long count):从给定的可读取通道 src,传输到本通道中。直接使用直接存储器(DMA)对数据进行存储。public abstract long transferTo(long position, long count, WritableByteChannel target):将本通道的文件传输到可写入的 target 通道中。

分散(Scatter)与聚集(Gather)
​ 分散读取:将通道中的数据分散到多个缓冲区中。public final long read(ByteBuffer[] dsts)
​ 聚集写入:将多个缓冲区中的数据聚集到一个 Channel 通道中。public final long write(ByteBuffer[] srcs)
字符集(Charset)
public final ByteBuffer encode(CharBuffer cb):编码 public final CharBuffer decode(ByteBuffer bb):解码

网络通信的阻塞与非阻塞
阻塞是相对网络传输而言的。传统的 IO 流都是阻塞的,在网络通信中,由于 IO 阻塞,需要为每一个客户端创建一个独立的线程来进行数据传输,性能大大降低;而 NIO 是非阻塞的,当存在空闲线程时,可以转去操作其他通道,因此不必非要创建一个独立的线程来服务每一个客户端请求。
选择器(Selector)
SelectableChannle 对象的多路复用器,可同时对多个 SelectableChannle 对象的 IO 状态监听,每当创建一个 Channel 时,就向 Selector 进行注册,交由 Selector 进行管理,只有 Channel 准备就绪时,Selector 可会将任务分配给一个或多个线程去执行。Selector 可以同时管理多个 Channel,是非阻塞 IO 的核心。
NIO 阻塞式
服务器 Server 不断监听客户端 Client 的请求,当建立了一个 Channel 时,服务器进行 read 操作,接收客户端发送的数据,只有当客户端断开连接 close,或者执行 shutdownOutput 操作时,服务器才知晓没有数据了,否则会一直进行 read 操作;当客户端在 read 操作获取服务器的反馈时,若服务器没有关闭连接或者 shutdownInput 时也会一直阻塞。示例代码如下:
static final String ORIGINAL_FILE = “F:/1.png”;
static final String OUTPUT_FILE = “F:/2.jpg”;
public void server() throws Exception {
// 打开 TCP 通道,绑定端口监听
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.bind(new InetSocketAddress(9988));

ByteBuffer buf = ByteBuffer.allocate(1024);
// 获取连接
SocketChannel accept = null;
while ((accept= serverChannel.accept()) != null) {
FileChannel fileChannel = FileChannel.open(
Paths.get(OUTPUT_FILE), StandardOpenOption.CREATE,
StandardOpenOption.WRITE);

// 读取客户端的请求数据
while (accept.read(buf) != -1) {
buf.flip();
fileChannel.write(buf);
buf.clear();
}

// 发送执行结果
buf.put(“ 成功接收 ”.getBytes());
buf.flip();
accept.write(buf);
buf.clear();

fileChannel.close();
// 关闭连接,否则客户端会一直等待读取导致阻塞,可使用 shutdownInput,但任务已结束,该 close
accept.close();
}
serverChannel.close();
}
public void client() throws Exception {
// 打开一个 socket 通道
SocketChannel clientChannel = SocketChannel.open(
new InetSocketAddress(“127.0.0.1”, 9988));
// 创建缓冲区和文件传输通道
FileChannel fileChannel = FileChannel.open(Paths.get(ORIGINAL_FILE),
StandardOpenOption.READ);
ByteBuffer buf = ByteBuffer.allocate(1024);

while (fileChannel.read(buf) != -1) {
buf.flip();
clientChannel.write(buf);
buf.clear();
}

// 关闭输出(不关闭通道),告知服务器已经发送完毕,去掉下面一行代码服务区将一直读取导致阻塞
clientChannel.shutdownOutput();

int len = 0;
while ((len = clientChannel.read(buf)) != -1) {
buf.flip();
System.out.println(new String(buf.array(), 0, len));
buf.clear();
}

fileChannel.close();
clientChannel.close();
}
NIO 非阻塞式
通过在通道 Channel 中调用 configureBlocking 将 blocking 设置为 false,让 Channel 可以进行异步 I/O 操作。
public void client() throws Exception {
// 打开一个 socket 通道
SocketChannel clientChannel = SocketChannel.open(
new InetSocketAddress(“127.0.0.1”, 9988));
ByteBuffer buf = ByteBuffer.allocate(1024);

// 告知服务器,已经发送完毕
// clientChannel.shutdownOutput();
// 设置非阻塞
clientChannel.configureBlocking(Boolean.FALSE);

buf.put(“ 哈哈 ”.getBytes());
buf.flip();
clientChannel.write(buf);

clientChannel.close();
}
public void server() throws Exception {
// 打开 TCP 通道,绑定端口监听
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(Boolean.FALSE);
serverChannel.bind(new InetSocketAddress(9988));

// 创建一个 Selector 用于管理 Channel
Selector selector = Selector.open();
// 将服务器的 Channel 注册到 selector 中,并添加 OP_ACCEPT 事件,让 selector 监听通道的请求
serverChannel.register(selector, SelectionKey.OP_ACCEPT);

// 一直判断是否有已经准备就绪的 Channel
while (selector.select() > 0) {
// 存在一个已经准备就绪的 Channel,获取 SelectionKey 集合中获取触发该事件的所有 key
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while (keys.hasNext()) {
SelectionKey sk = keys.next();
SocketChannel accept = null;
ByteBuffer buffer = null;
// 针对不同的状态进行操作
if (sk.isAcceptable()) {
// 可被连接,设置非阻塞并注册到 selector 中
accept = serverChannel.accept();
accept.configureBlocking(Boolean.FALSE);
accept.register(selector, SelectionKey.OP_READ);
} else if (sk.isReadable()) {
// 可读,获取该选择器上的 Channel 进行读操作
accept = (SocketChannel) sk.channel();
buffer = ByteBuffer.allocate(1024);
int len = 0;
while ((len = accept.read(buffer)) != -1) {
buffer.flip();
System.out.println(new String(buffer.array(), 0, len));
buffer.clear();
}
}
}
// 移除本次操作的 SelectionKey
keys.remove();
}
serverChannel.close();
}
方法使用说明

ServerSocketChannel 对象只能注册 accept 事件。
设置 configureBlocking 为 false,才能使套接字通道中进行异步 I/O 操作。
调用 selectedKeys 方法,返回发生了 SelectionKey 对象的集合。
调用 remove 方法,用于从 SelectionKey 集合中移除已经被处理的 key,若不处理,那么它将继续以当前的激活事件状态继续存在。

Pipe 管道
Channel 都是双向通道传输,而 Pipe 就是为了实现单向管道传送的通道对,有一个 source 通道(Pipe.SourceChannel)和一个 sink 通道(Pipe.SinkChannel)。sink 用于写数据,source 用于读数据。直接使用 Pipe.open() 获取 Pipe 对象,操作和 FileChannel 一样。

退出移动版