NIO 根底
什么是 NIO
- Java NIO 全称 Java non-blocking IO,指的是 JDK 提供的新 API。从 JDK 1.4 开始,Java 提供了一系列改良的输出/输入的新个性,被统称为 NIO,即 New IO,是
同步非阻塞
的。 - NIO 相干类都放在 java.nio 包下,并对原 java.io 包中很多类进行了改写。
- NIO 有三大外围局部:
Channel(管道)
、Buffer(缓冲区)
、Selector(选择器)
。 - NIO 是面向
缓冲区
编程的。数据读取到了一个它略微解决的缓冲区,须要时可在缓冲区中前后挪动,这就减少了处理过程中的灵活性,应用它能够提供非阻塞的高伸缩性网络。 - Java NIO 的非阻塞模式,使一个线程从某通道发送申请读取数据,然而它仅能失去目前可用数据,如果目前没有可用数据时,则阐明不会获取,而不是放弃线程阻塞,所以直到数据变为能够读取之前,该线程能够做其余事件。非阻塞写入同理。
三大外围组件
Channel 的根本介绍
NIO 的通道相似于流,但有如下区别:
- 通道是双向的能够进行读写,而流是单向的只能读,或者写
- 通道能够实现异步读写数据
- 通道能够从缓冲区读取数据,也能够写入数据到缓冲区
四种通道:
- FileChannel :从文件中读写数据
- DatagramChannel:通过 UDP 协定,读写网络中的数据
- SocketChannel:能通过 TCP 协定来读写网络中数据,罕用于客户端
- ServerSocketChannel:监听 TCP 连贯,对每个新的连贯会创立一个 SocketChannel
Buffer(缓冲区)根本介绍
NIO 中的 Buffer 用于 NIO 通道(Channel)进行交互。
缓冲区实质上是一个能够读写数据的内存块,能够了解为是一个容器对象(含数组)
,该对象提供了一组办法
,能够更轻松地应用内存块,缓冲区对象内置了一些机制,可能跟踪和记录缓冲区的状态变动状况。
当向 Buffer 写入数据时,Buffer 会记录下写了多少数据,一旦要读取数据,须要通过flip()
办法将 Buffer 从写模式切换到读模式。在读模式下,能够读取之前写入到 Buffer 的所有数据。
当读完了所有数据,就须要清空缓存区,让它能够再次被写入。有两种形式能清空缓冲区,调用clear()
或者compact()
办法。
clear()办法会清空整个缓冲区。compact()办法只会革除曾经读过的数据。任何未读的数据都被移到缓冲区的起始处,新写入的数据将放到缓冲区未读数据的前面。
Channel 提供从文件、网络读取数据的渠道,然而读取或者都必须通过 Buffer。在 Buffer 子类中保护着一个对应类型的数组,用来存放数据。
Selector 的根本介绍
- Java 的 NIO 应用了非阻塞的 I/O 形式。能够用一个线程解决若干个客户端连贯,就会应用到 Selector(选择器)
- Selector 可能检测到多个注册通道上是否有事件产生(多个 Channel 以事件的模式注册到同一个 selector),如果有事件产生,便获取事件而后针对每个事件进行相应的解决
- 只有在连贯真正有读写事件产生时,才会进行读写,缩小了零碎开销,并且不用为每个连贯都创立一个线程,不必保护多个线程
- 防止了多线程之间上下文切换导致的开销
Selector 的特点
Netty 的 I/O 线程 NioEventLoop 聚合了 Selector(选择器 / 多路复用器),能够并发解决成千盈百个客户端连贯。
当线程从某客户端 Socket 通道进行读写时,若没有数据可用,该线程能够进行其余工作。
线程通常将非阻塞 I/O 的闲暇工夫用于其余通道上执行 I/O 操作,所以独自的线程能够治理多个输入输出通道。
因为读写操作都是非阻塞的,就能够充沛进步 I/O 线程的运行效率,防止因为频繁 I/O 阻塞导致的线程挂起。
一个 I/O 线程能够并发解决 N 个客户端连贯和读写操作,这从根本上解决了传统同步阻塞 I/O 一连贯一线程模型,架构性能、弹性伸缩能力和可靠性都失去极大地晋升。
ByteBuffer 的根本应用
外围依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.36.Final</version>
</dependency>
/**
* @author 神秘杰克
* 公众号: Java菜鸟程序员
* @date 2021/12/28
* @Description ByteBuffer根本应用,读取文件内容并打印
*/
public class ByteBufferTest {
public static void main(String[] args) {
//获取channel
try (FileChannel channel = new FileInputStream("data.txt").getChannel()) {
//创立ByteBuffer
final ByteBuffer buffer = ByteBuffer.allocate(1024);
//读取文件内容,并存入buffer
channel.read(buffer);
//切换为读模式
buffer.flip();
while (buffer.hasRemaining()) {
System.out.print((char) buffer.get());
}
//清空缓冲区,并重置为写模式
buffer.clear();
} catch (IOException e) {
e.printStackTrace();
}
}
}
输入后果:
1234567890abc
ByteBuffer 的构造
Buffer 中定义了四个属性来提供所其蕴含的数据元素。
// Invariants: mark <= position <= limit <= capacity
private int mark = -1;
private int position = 0;
private int limit;
private int capacity;
- capacity:缓冲区的容量。通过构造函数赋予,一旦设置,无奈更改
- limit:缓冲区的界线。位于 limit 后的数据不可读写。缓冲区的限度不能为负,并且不能大于其容量
- position:下一个读写地位的索引(相似 PC)。缓冲区的地位不能为负,并且不能大于 limit
- mark:记录以后 position 的值。position 被扭转后,能够通过调用 reset() 办法复原到 mark 的地位
在一开始的状况下,position 指向第一位写入地位,limit 和 capacity 则等于缓冲区的容量。
在写模式下,position 是写入地位,limit 等于容量,下图示意写入 4 个元素后的状态。
当调用flip()
办法切换为读模式后,position 切换为读取地位,limit 切换为读取限度。
当读取到 limit 地位后,则不能够持续读取。
当调用clear()
办法后,则回归最原始状态。
当调用 compact()办法时,须要留神:此办法为 ByteBuffer 的办法,而不是 Buffer 的办法。
- compact 会把未读完的数据向前压缩,而后切换到写模式
- 数据前移后,原地位的值并未清零,写时会笼罩之前的值
ByteBuffer 的常见办法
调配空间:allocate()
//java.nio.HeapByteBuffer java堆内存,读写效率较低,受到gc影响
System.out.println(ByteBuffer.allocate(1024).getClass());
//java.nio.DirectByteBuffer 间接内存,读写效率较高(少一次拷贝),不会受gc影响,分配内存效率较低,使用不当则可能会产生内存透露
System.out.println(ByteBuffer.allocateDirect(1024).getClass());
flip()
- flip()办法会切换对缓冲区的操作模式,由写->读 / 读->写
put()
- put()办法能够将一个数据放入到缓冲区中。
- 进行该操作后,postition 的值会+1,指向下一个能够放入的地位。
get()
- get()办法会读取缓冲区中的一个值
- 进行该操作后,position 会+1,如果超过了 limit 则会抛出异样
留神:get(i)办法不会扭转 position 的值。
rewind()
- 该办法只能在读模式下应用
- rewind()办法后,会复原 position、limit 和 capacity 的值,变为进行 get()前的值
clear()
- clear()办法会将缓冲区中的各个属性复原为最后的状态,position = 0, capacity = limit
- 此时缓冲区的数据仍然存在,处于“被忘记”状态,下次进行写操作时会笼罩这些数据
mark()和 reset()
- mark()办法会将 postion 的值保留到 mark 属性中
- reset()办法会将 position 的值改为 mark 中保留的值
字符串和 ByteBuffer 互相转换
引入工具类:
import io.netty.util.internal.MathUtil;
import io.netty.util.internal.StringUtil;
import java.nio.ByteBuffer;
/**
* @author 神秘杰克
* 公众号: Java菜鸟程序员
* @date 2021/12/28
* @Description 工具类
*/
public class ByteBufferUtil {
private static final char[] BYTE2CHAR = new char[256];
private static final char[] HEXDUMP_TABLE = new char[256 * 4];
private static final String[] HEXPADDING = new String[16];
private static final String[] HEXDUMP_ROWPREFIXES = new String[65536 >>> 4];
private static final String[] BYTE2HEX = new String[256];
private static final String[] BYTEPADDING = new String[16];
static {
final char[] DIGITS = "0123456789abcdef".toCharArray();
for (int i = 0; i < 256; i++) {
HEXDUMP_TABLE[i << 1] = DIGITS[i >>> 4 & 0x0F];
HEXDUMP_TABLE[(i << 1) + 1] = DIGITS[i & 0x0F];
}
int i;
// Generate the lookup table for hex dump paddings
for (i = 0; i < HEXPADDING.length; i++) {
int padding = HEXPADDING.length - i;
StringBuilder buf = new StringBuilder(padding * 3);
for (int j = 0; j < padding; j++) {
buf.append(" ");
}
HEXPADDING[i] = buf.toString();
}
// Generate the lookup table for the start-offset header in each row (up to 64KiB).
for (i = 0; i < HEXDUMP_ROWPREFIXES.length; i++) {
StringBuilder buf = new StringBuilder(12);
buf.append(StringUtil.NEWLINE);
buf.append(Long.toHexString(i << 4 & 0xFFFFFFFFL | 0x100000000L));
buf.setCharAt(buf.length() - 9, '|');
buf.append('|');
HEXDUMP_ROWPREFIXES[i] = buf.toString();
}
// Generate the lookup table for byte-to-hex-dump conversion
for (i = 0; i < BYTE2HEX.length; i++) {
BYTE2HEX[i] = ' ' + StringUtil.byteToHexStringPadded(i);
}
// Generate the lookup table for byte dump paddings
for (i = 0; i < BYTEPADDING.length; i++) {
int padding = BYTEPADDING.length - i;
StringBuilder buf = new StringBuilder(padding);
for (int j = 0; j < padding; j++) {
buf.append(' ');
}
BYTEPADDING[i] = buf.toString();
}
// Generate the lookup table for byte-to-char conversion
for (i = 0; i < BYTE2CHAR.length; i++) {
if (i <= 0x1f || i >= 0x7f) {
BYTE2CHAR[i] = '.';
} else {
BYTE2CHAR[i] = (char) i;
}
}
}
/**
* 打印所有内容
*
* @param buffer
*/
public static void debugAll(ByteBuffer buffer) {
int oldlimit = buffer.limit();
buffer.limit(buffer.capacity());
StringBuilder origin = new StringBuilder(256);
appendPrettyHexDump(origin, buffer, 0, buffer.capacity());
System.out.println("+--------+-------------------- all ------------------------+----------------+");
System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), oldlimit);
System.out.println(origin);
buffer.limit(oldlimit);
}
/**
* 打印可读取内容
*
* @param buffer
*/
public static void debugRead(ByteBuffer buffer) {
StringBuilder builder = new StringBuilder(256);
appendPrettyHexDump(builder, buffer, buffer.position(), buffer.limit() - buffer.position());
System.out.println("+--------+-------------------- read -----------------------+----------------+");
System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), buffer.limit());
System.out.println(builder);
}
private static void appendPrettyHexDump(StringBuilder dump, ByteBuffer buf, int offset, int length) {
if (MathUtil.isOutOfBounds(offset, length, buf.capacity())) {
throw new IndexOutOfBoundsException(
"expected: " + "0 <= offset(" + offset + ") <= offset + length(" + length
+ ") <= " + "buf.capacity(" + buf.capacity() + ')');
}
if (length == 0) {
return;
}
dump.append(
" +-------------------------------------------------+" +
StringUtil.NEWLINE + " | 0 1 2 3 4 5 6 7 8 9 a b c d e f |" +
StringUtil.NEWLINE + "+--------+-------------------------------------------------+----------------+");
final int startIndex = offset;
final int fullRows = length >>> 4;
final int remainder = length & 0xF;
// Dump the rows which have 16 bytes.
for (int row = 0; row < fullRows; row++) {
int rowStartIndex = (row << 4) + startIndex;
// Per-row prefix.
appendHexDumpRowPrefix(dump, row, rowStartIndex);
// Hex dump
int rowEndIndex = rowStartIndex + 16;
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
}
dump.append(" |");
// ASCII dump
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
}
dump.append('|');
}
// Dump the last row which has less than 16 bytes.
if (remainder != 0) {
int rowStartIndex = (fullRows << 4) + startIndex;
appendHexDumpRowPrefix(dump, fullRows, rowStartIndex);
// Hex dump
int rowEndIndex = rowStartIndex + remainder;
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
}
dump.append(HEXPADDING[remainder]);
dump.append(" |");
// Ascii dump
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
}
dump.append(BYTEPADDING[remainder]);
dump.append('|');
}
dump.append(StringUtil.NEWLINE +
"+--------+-------------------------------------------------+----------------+");
}
private static void appendHexDumpRowPrefix(StringBuilder dump, int row, int rowStartIndex) {
if (row < HEXDUMP_ROWPREFIXES.length) {
dump.append(HEXDUMP_ROWPREFIXES[row]);
} else {
dump.append(StringUtil.NEWLINE);
dump.append(Long.toHexString(rowStartIndex & 0xFFFFFFFFL | 0x100000000L));
dump.setCharAt(dump.length() - 9, '|');
dump.append('|');
}
}
public static short getUnsignedByte(ByteBuffer buffer, int index) {
return (short) (buffer.get(index) & 0xFF);
}
}
测试类:
/**
* @author 神秘杰克
* 公众号: Java菜鸟程序员
* @date 2021/12/28
* @Description 字符串和ByteBuffer互相转换
*/
public class TranslateTest {
public static void main(String[] args) {
String str1 = "hello";
String str2;
String str3;
// 通过StandardCharsets的encode办法取得ByteBuffer
// 此时取得的ByteBuffer为读模式,无需通过flip切换模式
ByteBuffer buffer = StandardCharsets.UTF_8.encode(str1);
//也能够应用wrap办法实现,无需通过flip切换模式
ByteBuffer wrap = ByteBuffer.wrap(str1.getBytes());
ByteBufferUtil.debugAll(wrap);
ByteBufferUtil.debugAll(buffer);
// 将缓冲区中的数据转化为字符串
// 通过StandardCharsets解码,取得CharBuffer,再通过toString取得字符串
str2 = StandardCharsets.UTF_8.decode(buffer).toString();
System.out.println(str2);
str3 = StandardCharsets.UTF_8.decode(wrap).toString();
System.out.println(str3);
}
}
运行后果:
+--------+-------------------- all ------------------------+----------------+
position: [0], limit: [5]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f |hello |
+--------+-------------------------------------------------+----------------+
+--------+-------------------- all ------------------------+----------------+
position: [0], limit: [5]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f |hello |
+--------+-------------------------------------------------+----------------+
hello
hello
粘包与半包
景象
网络上有多条数据发送给服务端,数据之间应用 \n 进行分隔。
但因为某种原因这些数据在接管时,被进行了重新组合,例如原始数据有 3 条为:
- Hello,world\n
- I’m Jack\n
- How are you?\n
变成了上面的两个 byteBuffer (粘包,半包)
- Hello,world\nI’m Jack\nHo
- w are you?\n
呈现起因
粘包
发送方在发送数据时,并不是一条一条地发送数据,而是将数据整合在一起,当数据达到肯定的数量后再一起发送。这就会导致多条信息被放在一个缓冲区中被一起发送进来。
半包
接管方的缓冲区的大小是无限的,当接管方的缓冲区满了当前,就须要将信息截断,等缓冲区空了当前再持续放入数据。这就会产生一段残缺的数据最初被截断的景象。
解决办法
- 通过
get(index)
办法遍历 ByteBuffer,当遇到\n
后进行解决。 - 记录从 position 到 index 的数据长度,申请对应大小的缓冲区。
- 将缓冲区的数据通过
get()
获取写入到 target 缓冲区中。 - 最初,调用 compact()办法切换为写模式,因为缓冲区中可能还存在未读取的数据。
/**
* @author 神秘杰克
* 公众号: Java菜鸟程序员
* @date 2021/12/29
* @Description 解决黏包和半包
*/
public class ByteBufferTest {
public static void main(String[] args) {
ByteBuffer buffer = ByteBuffer.allocate(32);
//模仿黏包和半包
buffer.put("Hello,world\nI'm Jack\nHo".getBytes(StandardCharsets.UTF_8));
split(buffer);
buffer.put("w are you?\n".getBytes(StandardCharsets.UTF_8));
split(buffer);
}
private static void split(ByteBuffer buffer) {
//切换读模式
buffer.flip();
for (int i = 0; i < buffer.limit(); i++) {
//找到残缺音讯
if (buffer.get(i) == '\n') {
int length = i + 1 - buffer.position();
final ByteBuffer target = ByteBuffer.allocate(length);
//从buffer中读取,写入 target
for(int j = 0; j < length; j++) {
// 将buffer中的数据写入target中
target.put(buffer.get());
}
// 打印查看后果
ByteBufferUtil.debugAll(target);
}
}
//清空已读局部,并切换写模式
buffer.compact();
}
}
运行后果:
+--------+-------------------- all ------------------------+----------------+
position: [12], limit: [12]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 65 6c 6c 6f 2c 77 6f 72 6c 64 0a |Hello,world. |
+--------+-------------------------------------------------+----------------+
+--------+-------------------- all ------------------------+----------------+
position: [9], limit: [9]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 49 27 6d 20 4a 61 63 6b 0a |I'm Jack. |
+--------+-------------------------------------------------+----------------+
+--------+-------------------- all ------------------------+----------------+
position: [13], limit: [13]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 6f 77 20 61 72 65 20 79 6f 75 3f 0a |How are you?. |
+--------+-------------------------------------------------+----------------+
文件编程
FileChannel
工作模式
📢:FileChannel 只能工作在阻塞模式下!
获取
不能间接关上 FileChannel,必须通过 FileInputStream、FileOutputStream 或者 RandomAccessFile 来获取 FileChannel,它们都有 getChannel()
办法。
- 通过 FileInputStream 获取的 channel 只能读
- 通过 FileOutputStream 获取的 channel 只能写
- 通过 RandomAccessFile 是否能读写依据结构 RandomAccessFile 时的读写模式决定
读取
通过read()
办法将数据填充到 ByteBuffer 中,返回值示意读到了多少字节,-1
示意读到了文件开端。
int readBytes = channel.read(buffer);
写入
因为 channel 是有写入下限的,所以 write() 办法并不能保障一次将 buffer 中的内容全副写入 channel。必须依照以下规定进行写入。
// 通过hasRemaining()办法查看缓冲区中是否还有数据未写入到通道中
while(buffer.hasRemaining()) {
channel.write(buffer);
}
敞开
Channel 必须敞开,不过调用 FileInputStream、FileOutputStream、 RandomAccessFile 的close()
办法时也会间接的调用 Channel 的 close()办法。
地位
channel 也领有一个保留读取数据地位的属性,即 position。
long pos = channel.position();
能够通过 position(int pos)设置 channel 中 position 的值。
long newPos = 10;
channel.position(newPos);
设置以后地位时,如果设置为文件的开端:
- 这时读取会返回 -1
- 这时写入,会追加内容,但要留神如果 position 超过了文件开端,再写入时在新内容和原开端之间会有空洞(00)
强制写入
操作系统出于性能的思考,会将数据缓存,不是立即写入磁盘,而是等到缓存满了当前将所有数据一次性的写入磁盘。能够调用 force(true)
办法将文件内容和元数据(文件的权限等信息)立即写入磁盘。
常见办法
FileChannel 次要用来对本地文件进行 IO 操作,常见的办法有:
- public int read(ByteBuffer dst) :从通道中读取数据到缓冲区中。
- public int write(ByteBuffer src):把缓冲区中的数据写入到通道中。
- public long transferFrom(ReadableByteChannel src,long position,long count):从指标通道中复制数据到以后通道。
- public long transferTo(long position,long count,WriteableByteChannel target):把数据从以后通道复制给指标通道。
应用 FileChannel 写入文本文件
/**
* @author 神秘杰克
* 公众号: Java菜鸟程序员
* @date 2021/12/29
* @Description FileChannel测试写入文件
*/
public class FileChannelTest {
public static void main(String[] args) {
try (final FileChannel channel = new FileOutputStream("data1.txt").getChannel()) {
String msg = "Hello World!!!";
final ByteBuffer buffer = ByteBuffer.allocate(16);
buffer.put(msg.getBytes(StandardCharsets.UTF_8));
buffer.flip();
channel.write(buffer);
} catch (IOException e) {
e.printStackTrace();
}
}
}
应用 FileChannel 读取文本文件
/**
* @author 神秘杰克
* 公众号: Java菜鸟程序员
* @date 2021/12/29
* @Description FileChannel测试读取文件
*/
public class FileChannelTest {
public static void main(String[] args) {
try (final FileChannel channel = new FileInputStream("data1.txt").getChannel()) {
final ByteBuffer buffer = ByteBuffer.allocate(16);
channel.read(buffer);
buffer.flip();
while (buffer.hasRemaining()) {
System.out.print((char) buffer.get());
}
//清空缓冲区,并重置为写模式
buffer.clear();
} catch (IOException e) {
e.printStackTrace();
}
}
}
应用 FileChannel 进行数据传输
/**
* @author 神秘杰克
* 公众号: Java菜鸟程序员
* @date 2021/12/29
* @Description FileChannel测试文件传输
*/
public class FileChannelTest {
public static void main(String[] args){
try (final FileChannel from = new FileInputStream("data.txt").getChannel();
final FileChannel to = new FileOutputStream("data1.txt").getChannel()) {
// 参数:inputChannel的起始地位,传输数据的大小,目标channel
// 返回值为传输的数据的字节数
// transferTo一次只能传输2G的数据
from.transferTo(0, from.size(), to);
} catch (IOException e) {
e.printStackTrace();
}
}
}
transferTo()办法对应的还有 transferFrom()办法。
尽管 transferTo()办法传输效率较高,底层利用操作系统的零拷贝进行优化,然而 transferTo 办法一次只能传输 2G 的数据。
解决办法:能够依据 transferTo()的返回值来判断,返回值代表传输了多少,通过 from 的 size()大小来每次减去即可。
long size = from.size();
for (long left = size; left > 0; ) {
left -= from.transferTo(size - left, size, to);
}
Channel 和 Buffer 的注意事项
- ByteBuffer 反对类型化的 put 和 get,put 放入什么数据类型,get 就应该应用相应的数据类型来取出,否则可能会产生 ByteUnderflowException 异样。
- 能够将一个一般的 Buffer 转换为只读的 Buffer:asReadOnlyBuffer()办法。
- NIO 提供了 MapperByteBuffer,能够让文件间接在内存(堆外内存)中进行批改,而如何同步到文件由 NIO 来实现。
-
NIO 还反对通过多个 Buffer(即 Buffer 数组)实现读写操作,即Scattering(扩散)和 Gathering(汇集)。
Scattering(扩散)
:在向缓冲区写入数据时,能够应用 Buffer 数组顺次写入,一个 Buffer 数组写满后,持续写入下一个 Buffer 数组。Gathering(汇集)
:从缓冲区读取数据时,能够顺次读取,读完一个 Buffer 再按程序读取下一个。
网络编程
阻塞 vs 非阻塞
阻塞
- 在没有数据可读时,包含数据复制过程中,线程必须阻塞期待,不会占用 CPU,但线程相当于闲置状态
- 32 位 JVM 一个线程 320k,64 位 JVM 一个线程 1024k,为了缩小线程数量,须要采纳线程池技术
- 但即便应用线程池,如果有很多连贯建设,但长时间 inactive,会阻塞线程池中所有线程
非阻塞
- 在某个 Channel 没有可读事件时,线程不用阻塞,它能够去解决其它有可读事件的 Channel
- 数据复制过程中,线程理论还是阻塞的(AIO 改良的中央)
- 写数据时,线程只是期待数据写入 Channel 即可,无需期待 Channel 通过网络把数据发送进来
阻塞案例代码
服务端代码:
/**
* @author 神秘杰克
* 公众号: Java菜鸟程序员
* @date 2021/12/29
* @Description 应用NIO来了解阻塞模式-服务端
*/
public class Server {
public static void main(String[] args) {
//1. 创立服务器
try (ServerSocketChannel ssc = ServerSocketChannel.open()) {
final ByteBuffer buffer = ByteBuffer.allocate(16);
//2. 绑定监听端口
ssc.bind(new InetSocketAddress(7777));
//3. 寄存建设连贯的汇合
List<SocketChannel> channels = new ArrayList<>();
while (true) {
System.out.println("建设连贯...");
//4. accept 建设客户端连贯 , 用来和客户端之间通信
final SocketChannel socketChannel = ssc.accept();
System.out.println("建设连贯实现...");
channels.add(socketChannel);
//5. 接管客户端发送的数据
for (SocketChannel channel : channels) {
System.out.println("正在读取数据...");
channel.read(buffer);
buffer.flip();
ByteBufferUtil.debugRead(buffer);
buffer.clear();
System.out.println("数据读取实现...");
}
}
} catch (IOException e) {
System.out.println("出现异常...");
}
}
}
客户端代码:
/**
* @author 神秘杰克
* 公众号: Java菜鸟程序员
* @date 2021/12/29
* @Description 应用NIO来了解阻塞模式-客户端
*/
public class Client {
public static void main(String[] args) {
try (SocketChannel socketChannel = SocketChannel.open()) {
// 建设连贯
socketChannel.connect(new InetSocketAddress("localhost", 7777));
final ByteBuffer buffer = ByteBuffer.allocate(10);
buffer.put("hello".getBytes(StandardCharsets.UTF_8));
buffer.flip();
socketChannel.write(buffer);
} catch (IOException e) {
e.printStackTrace();
}
}
}
运行后果:
-
在刚开始服务器运行后:服务器端因 accept 阻塞。
-
在客户端和服务器建设连贯后,客户端发送音讯前:服务器端因通道为空被阻塞。
-
客户端发送数据后,服务器解决通道中的数据。之后再次进入循环时,再次被 accept 阻塞。
- 之前的客户端再次发送音讯,服务器端因为被 accept 阻塞,就无奈解决之前客户端再次发送到通道中的信息了。
非阻塞
- 通过 ServerSocketChannel 的
configureBlocking(false)
办法将取得连贯设置为非阻塞的。此时若没有连贯,accept 会返回 null - 通过 SocketChannel 的
configureBlocking(false)
办法将从通道中读取数据设置为非阻塞的。若此时通道中没有数据可读,read 会返回-1
/**
* @author 神秘杰克
* 公众号: Java菜鸟程序员
* @date 2021/12/29
* @Description 应用NIO来了解阻塞模式-服务端
*/
public class Server {
public static void main(String[] args) {
//1. 创立服务器
try (ServerSocketChannel ssc = ServerSocketChannel.open()) {
final ByteBuffer buffer = ByteBuffer.allocate(16);
//2. 绑定监听端口
ssc.bind(new InetSocketAddress(7777));
//3. 寄存建设连贯的汇合
List<SocketChannel> channels = new ArrayList<>();
//设置非阻塞!!
ssc.configureBlocking(false);
while (true) {
System.out.println("建设连贯...");
//4. accept 建设客户端连贯 , 用来和客户端之间通信
final SocketChannel socketChannel = ssc.accept();
//设置非阻塞!!
socketChannel.configureBlocking(false);
System.out.println("建设连贯实现...");
channels.add(socketChannel);
//5. 接管客户端发送的数据
for (SocketChannel channel : channels) {
System.out.println("正在读取数据...");
channel.read(buffer);
buffer.flip();
ByteBufferUtil.debugRead(buffer);
buffer.clear();
System.out.println("数据读取实现...");
}
}
} catch (IOException e) {
System.out.println("出现异常...");
}
}
}
因为设置为了非阻塞,会始终执行
while(true)
中的代码,CPU 始终处于繁忙状态,会使得性能变低,所以理论状况中不应用这种办法解决申请。
Selector
根本介绍
- Java 的 NIO 应用了非阻塞的 I/O 形式。能够用一个线程解决若干个客户端连贯,就会应用到 Selector(选择器)。
- Selector 可能检测到多个注册通道上是否有事件产生(多个 Channel 以事件的模式注册到同一个 selector),如果有事件产生,便获取事件而后针对每个事件进行相应的解决。
- 只有在连贯真正有读写事件产生时,才会进行读写,缩小了零碎开销,并且不用为每个连贯都创立一个线程,不必保护多个线程。
- 防止了多线程之间上下文切换导致的开销。
特点
单线程能够配合 Selector 实现对多个 Channel 可读写事件的监控,这称为多路复用。
- 多路复用仅针对网络 IO,一般文件 IO 无奈利用多路复用
-
如果不必 Selector 的非阻塞模式,线程大部分工夫都在做无用功,而 Selector 可能保障
- 有可连贯事件时才去连贯
- 有可读事件才去读取
- 有可写事件才去写入
限于网络传输能力,Channel 未必随时可写,一旦 Channel 可写,会触发 Selector 的可写事件进行写入。
Selector 相干办法阐明
selector.select()
://若未监听到注册管道中有事件,则继续阻塞selector.select(1000)
://阻塞 1000 毫秒,1000 毫秒后返回selector.wakeup()
://唤醒 selectorselector.selectNow()
: //不阻塞,立刻返回
NIO 非阻塞网络编程过程剖析
- 当客户端连贯时,会通过 SeverSocketChannel 失去对应的 SocketChannel。
- Selector 进行监听,调用
select()
办法,返回注册该 Selector 的所有通道中有事件产生的通道个数。 - 将 SocketChannel 注册到 Selector 上,public final SelectionKey register(Selector sel, int ops),一个 Selector 上能够注册多个 SocketChannel。
- 注册后返回一个 SelectionKey,会和该 Selector 关联(以汇合的模式)。
- 进一步失去各个 SelectionKey,有事件产生。
- 再通过 SelectionKey 反向获取 SocketChannel,应用 channnel()办法。
- 能够通过失去的 channel,实现业务解决。
SelectionKey 中定义了四个操作标记位:
OP_READ
示意通道中产生读事件;OP_WRITE
—示意通道中产生写事件;OP_CONNECT
—示意建设连贯;OP_ACCEPT
—申请新连贯。
SelectionKey 的相干办法
办法 | 形容 |
---|---|
public abstract Selector selector(); | 失去与之关联的 Selector 对象 |
public abstract SelectableChannel channel(); | 失去与之关联的通道 |
public final Object attachment() | 失去与之关联的共享数据 |
public abstract SelectionKey interestOps(int ops); | 设置或扭转监听的事件类型 |
public final boolean isReadable(); | 通道是否可读 |
public final boolean isWritable(); | 通道是否可写 |
public final boolean isAcceptable(); | 是否能够建设连贯 ACCEPT |
Selector 根本应用及 Accpet 事件
接下来咱们应用 Selector 实现多路复用,对服务端代码进行改良。
/**
* @author 神秘杰克
* 公众号: Java菜鸟程序员
* @date 2021/12/29
* @Description Selector根本应用-服务端
*/
public class Server {
public static void main(String[] args) {
try (ServerSocketChannel ssc = ServerSocketChannel.open();
final Selector selector = Selector.open()) {//创立selector 治理多个channel
ssc.bind(new InetSocketAddress(7777));
ssc.configureBlocking(false);
// 将通道注册到选择器中,并设置感兴趣的事件
ssc.register(selector, SelectionKey.OP_ACCEPT);
ByteBuffer buffer = ByteBuffer.allocate(16);
while (true) {
// 如果事件就绪,线程会被阻塞,反之不会被阻塞。从而防止了CPU空转
// 返回值为就绪的事件个数
int ready = selector.select();
System.out.println("selector就绪总数: " + ready);
// 获取所有事件
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
final SelectionKey key = iterator.next();
//判断key的事件类型
if (key.isAcceptable()) {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
final SocketChannel socketChannel = serverSocketChannel.accept();
System.out.println("获取到客户端连贯...");
}
// 处理完毕后移除
iterator.remove();
}
}
} catch (IOException e) {
System.out.println("出现异常...");
}
}
}
事件产生后,要么解决,要么应用 key.cancel()办法勾销,不能什么都不做,否则下次该事件仍会触发,这是因为 nio 底层应用的是程度触发。
当选择器中的通道对应的事件产生后,SelectionKey 会被放到另一个汇合中,然而selecionKey 不会主动移除,所以须要咱们在解决完一个事件后,通过迭代器手动移除其中的 selecionKey。否则会导致已被解决过的事件再次被解决,就会引发谬误。
Read 事件
/**
* @author 神秘杰克
* 公众号: Java菜鸟程序员
* @date 2021/12/29
* @Description Read事件-服务端
*/
public class Server {
public static void main(String[] args) {
try (ServerSocketChannel ssc = ServerSocketChannel.open();
final Selector selector = Selector.open()) {//创立selector 治理多个channel
ssc.bind(new InetSocketAddress(7777));
ssc.configureBlocking(false);
// 将通道注册到选择器中,并设置感兴趣的事件
ssc.register(selector, SelectionKey.OP_ACCEPT);
ByteBuffer buffer = ByteBuffer.allocate(16);
while (true) {
// 如果事件就绪,线程会被阻塞,反之不会被阻塞。从而防止了CPU空转
// 返回值为就绪的事件个数
int ready = selector.select();
System.out.println("selector就绪总数: " + ready);
// 获取所有事件
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
final SelectionKey key = iterator.next();
//判断key的事件类型
if (key.isAcceptable()) {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
final SocketChannel socketChannel = serverSocketChannel.accept();
System.out.println("获取到客户端连贯...");
// 设置为非阻塞模式,同时将连贯的通道也注册到抉择其中
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) { //读事件
SocketChannel channel = (SocketChannel) key.channel();
channel.read(buffer);
buffer.flip();
ByteBufferUtil.debugRead(buffer);
buffer.clear();
}
// 处理完毕后移除
iterator.remove();
}
}
} catch (IOException e) {
System.out.println("出现异常...");
}
}
}
断开解决
当客户端与服务器之间的连贯断开时,会给服务器端发送一个读事件,对异样断开
和失常断开
须要不同的形式进行解决:
-
失常断开
- 失常断开时,服务器端的 channel.read(buffer)办法的返回值为-1,所以当完结到返回值为-1 时,须要调用 key 的 cancel()办法勾销此事件,并在勾销后移除该事件
-
异样断开
- 异样断开时,会抛出 IOException 异样, 在 try-catch 的catch 块中捕捉异样并调用 key 的 cancel()办法即可
音讯边界
⚠️ 不解决音讯边界存在的问题
将缓冲区的大小设置为 4 个字节,发送 2 个汉字(你好),通过 decode 解码并打印时,会呈现乱码
ByteBuffer buffer = ByteBuffer.allocate(4);
// 解码并打印
System.out.println(StandardCharsets.UTF_8.decode(buffer));
你�
��
这是因为 UTF-8 字符集下,1 个汉字占用 3 个字节,此时缓冲区大小为 4 个字节,一次读工夫无奈解决完通道中的所有数据,所以一共会触发两次读事件。这就导致 你好
的 好
字被拆分为了前半部分和后半局部发送,解码时就会呈现问题。
💡 解决音讯边界
传输的文本可能有以下三种状况:
- 文本大于缓冲区大小,此时须要将缓冲区进行扩容
- 产生半包景象
- 产生粘包景象
解决方案:
- 固定音讯长度,数据包大小一样,服务器按预约长度读取,当发送的数据较少时,须要将数据进行填充,直到长度与音讯规定长度统一。毛病是节约带宽
- 另一种思路是按分隔符拆分,毛病是效率低,须要一个一个字符地去匹配分隔符
- TLV 格局,即 Type 类型、Length 长度、Value 数据(也就是在音讯结尾用一些空间寄存前面数据的长度),如 HTTP 申请头中的 Content-Type 与Content-Length。类型和长度已知的状况下,就能够不便获取音讯大小,调配适合的 buffer,毛病是 buffer 须要提前调配,如果内容过大,则影响 server 吞吐量
上面演示第二种解决方案,按分隔符拆分:
咱们须要在 Accept 事件产生后,将通道注册到 Selector 中时,对每个通道增加一个 ByteBuffer 附件,让每个通道产生读事件时都应用本人的通道,防止与其余通道发生冲突而导致问题。
ByteBuffer buffer = ByteBuffer.allocate(16);
// 增加通道对应的Buffer附件
socketChannel.register(selector, SelectionKey.OP_READ, buffer);
当 Channel 中的数据大于缓冲区时,须要对缓冲区进行扩容操作。此代码中的扩容的断定办法:Channel 调用 compact 办法后,的 position 与 limit 相等,阐明缓冲区中的数据并未被读取(容量太小),此时创立新的缓冲区,其大小扩充为两倍。同时还要将旧缓冲区中的数据拷贝到新的缓冲区中,同时调用 SelectionKey 的 attach 办法将新的缓冲区作为新的附件放入 SelectionKey 中。
// 如果缓冲区太小,就进行扩容
if (buffer.position() == buffer.limit()) {
ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity()*2);
// 将旧buffer中的内容放入新的buffer中
buffer.flip();
newBuffer.put(buffer);
// 将新buffer放到key中作为附件
key.attach(newBuffer);
}
改良后的代码如下:
/**
* @author 神秘杰克
* 公众号: Java菜鸟程序员
* @date 2021/12/29
* @Description Read事件完整版-服务端
*/
public class Server {
public static void main(String[] args) {
try (ServerSocketChannel ssc = ServerSocketChannel.open();
final Selector selector = Selector.open()) {//创立selector 治理多个channel
ssc.bind(new InetSocketAddress(7777));
ssc.configureBlocking(false);
// 将通道注册到选择器中,并设置感兴趣的事件
ssc.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// 如果事件就绪,线程会被阻塞,反之不会被阻塞。从而防止了CPU空转
// 返回值为就绪的事件个数
int ready = selector.select();
System.out.println("selector就绪总数: " + ready);
// 获取所有事件
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
final SelectionKey key = iterator.next();
//判断key的事件类型
if (key.isAcceptable()) {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
final SocketChannel socketChannel = serverSocketChannel.accept();
System.out.println("获取到客户端连贯...");
socketChannel.configureBlocking(false);
ByteBuffer byteBuffer = ByteBuffer.allocate(16);
//注册到Selector并且设置读事件,设置附件bytebuffer
socketChannel.register(selector, SelectionKey.OP_READ, byteBuffer);
} else if (key.isReadable()) { //读事件
try {
SocketChannel channel = (SocketChannel) key.channel();
// 通过key取得附件
ByteBuffer buffer = (ByteBuffer) key.attachment();
int read = channel.read(buffer);
if (read == -1) {
key.cancel();
channel.close();
} else {
// 通过分隔符来分隔buffer中的数据
split(buffer);
// 如果缓冲区太小,就进行扩容
if (buffer.position() == buffer.limit()) {
ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
// 将旧buffer中的内容放入新的buffer中
buffer.flip();
newBuffer.put(buffer);
// 将新buffer放到key中作为附件
key.attach(newBuffer);
}
}
} catch (IOException e) {
//异样断开,勾销事件
key.cancel();
}
}
// 处理完毕后移除
iterator.remove();
}
}
} catch (IOException e) {
System.out.println("出现异常...");
}
}
private static void split(ByteBuffer buffer) {
buffer.flip();
for (int i = 0; i < buffer.limit(); i++) {
//找到一条实现数据
if (buffer.get(i) == '\n') {
// 缓冲区长度
int length = i + 1 - buffer.position();
ByteBuffer target = ByteBuffer.allocate(length);
// 将后面的内容写入target缓冲区
for (int j = 0; j < length; j++) {
// 将buffer中的数据写入target中
target.put(buffer.get());
}
ByteBufferUtil.debugAll(target);
}
}
// 切换为写模式,然而缓冲区可能未读完,这里须要应用compact
buffer.compact();
}
}
/**
* @author 神秘杰克
* 公众号: Java菜鸟程序员
* @date 2021/12/29
* @Description Read事件完整版-客户端
*/
public class Client {
public static void main(String[] args) {
try (SocketChannel socketChannel = SocketChannel.open()) {
// 建设连贯
socketChannel.connect(new InetSocketAddress("localhost", 7777));
final ByteBuffer buffer = ByteBuffer.allocate(32);
buffer.put("01234567890abcdef3333\n".getBytes(StandardCharsets.UTF_8));
buffer.flip();
socketChannel.write(buffer);
} catch (IOException e) {
e.printStackTrace();
}
}
}
ByteBuffer 的大小调配
- 每个 channel 都须要记录可能被切分的音讯,因为 ByteBuffer 不能被多个 channel 独特应用,因而须要为每个 channel 保护一个独立的 ByteBuffer
- ByteBuffer 不能太大,比方一个 ByteBuffer 1Mb 的话,要反对百万连贯就要 1Tb 内存,因而须要设计大小可变的 ByteBuffer
-
调配思路:
- 一种思路是首先调配一个较小的 buffer,例如 4k,如果发现数据不够,再调配 8k 的 buffer,将 4k buffer 内容拷贝至 8k buffer,长处是音讯间断容易解决,毛病是数据拷贝消耗性能
- 另一种思路是用多个数组组成 buffer,一个数组不够,把多进去的内容写入新的数组,与后面的区别是音讯存储不间断解析简单,长处是防止了拷贝引起的性能损耗
Write 事件
服务器通过 Buffer 通道中写入数据时,可能因为通道容量小于 Buffer 中的数据大小,导致无奈一次性将 Buffer 中的数据全副写入到 Channel 中,这时便须要分屡次写入,具体步骤如下:
- 执行一次写操作,向将 buffer 中的内容写入到 SocketChannel 中,而后判断 Buffer 中是否还有数据
- 若 Buffer 中还有数据,则须要将 SockerChannel 注册到 Seletor 中,并关注写事件,同时将未写完的 Buffer 作为附件一起放入到 SelectionKey 中。
/**
* @author 神秘杰克
* 公众号: Java菜鸟程序员
* @date 2021/12/29
* @Description Write事件-服务端
*/
public class Server {
public static void main(String[] args) {
try (ServerSocketChannel ssc = ServerSocketChannel.open();
final Selector selector = Selector.open()) {
ssc.bind(new InetSocketAddress(7777));
ssc.configureBlocking(false);
ssc.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
int ready = selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
final SelectionKey key = iterator.next();
//判断key的事件类型
if (key.isAcceptable()) {
final SocketChannel socketChannel = ssc.accept();
socketChannel.configureBlocking(false);
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 3000000; i++) {
sb.append("a");
}
final ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
final int write = socketChannel.write(buffer);
System.out.println("accept事件器写入.."+write);
// 判断是否还有残余内容
if (buffer.hasRemaining()) {
// 注册到Selector中,关注可写事件,并将buffer增加到key的附件中
socketChannel.register(selector, SelectionKey.OP_WRITE, buffer);
}
}else if (key.isWritable()) {
SocketChannel socket = (SocketChannel) key.channel();
// 取得事件
ByteBuffer buffer = (ByteBuffer) key.attachment();
int write = socket.write(buffer);
System.out.println("write事件器写入.."+write);
// 如果曾经实现了写操作,须要移除key中的附件,同时不再对写事件感兴趣
if (!buffer.hasRemaining()) {
key.attach(null);
key.interestOps(0);
}
}
// 处理完毕后移除
iterator.remove();
}
}
} catch (IOException e) {
System.out.println("出现异常...");
}
}
}
/**
* @author 神秘杰克
* 公众号: Java菜鸟程序员
* @date 2021/12/29
* @Description Write事件-客户端
*/
public class Client {
public static void main(String[] args) {
try (SocketChannel socketChannel = SocketChannel.open()) {
// 建设连贯
socketChannel.connect(new InetSocketAddress("localhost", 7777));
int count = 0;
while (true) {
final ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
count += socketChannel.read(buffer);
System.out.println("客户端承受了.."+count);
buffer.clear();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
运行后果:
发表回复