NIO 根底
什么是 NIO
- Java NIO 全称 Java non-blocking IO,指的是 JDK 提供的新 API。从 JDK 1.4 开始,Java 提供了一系列改良的输出/输入的新个性,被统称为 NIO,即 New IO,是
同步非阻塞
的。 - NIO 相干类都放在 java.nio 包下,并对原 java.io 包中很多类进行了改写。
- NIO 有三大外围局部:
Channel(管道)
、Buffer(缓冲区)
、Selector(选择器)
。 - NIO 是面向
缓冲区
编程的。数据读取到了一个它略微解决的缓冲区,须要时可在缓冲区中前后挪动,这就减少了处理过程中的灵活性,应用它能够提供非阻塞的高伸缩性网络。 - Java NIO 的非阻塞模式,使一个线程从某通道发送申请读取数据,然而它仅能失去目前可用数据,如果目前没有可用数据时,则阐明不会获取,而不是放弃线程阻塞,所以直到数据变为能够读取之前,该线程能够做其余事件。非阻塞写入同理。
三大外围组件
Channel 的根本介绍
NIO 的通道相似于流,但有如下区别:
- 通道是双向的能够进行读写,而流是单向的只能读,或者写
- 通道能够实现异步读写数据
- 通道能够从缓冲区读取数据,也能够写入数据到缓冲区
四种通道:
- FileChannel :从文件中读写数据
- DatagramChannel:通过 UDP 协定,读写网络中的数据
- SocketChannel:能通过 TCP 协定来读写网络中数据,罕用于客户端
- ServerSocketChannel:监听 TCP 连贯,对每个新的连贯会创立一个 SocketChannel
Buffer(缓冲区)根本介绍
NIO 中的 Buffer 用于 NIO 通道(Channel)进行交互。
缓冲区实质上是一个能够读写数据的内存块,能够了解为是一个容器对象(含数组)
,该对象提供了一组办法
,能够更轻松地应用内存块,缓冲区对象内置了一些机制,可能跟踪和记录缓冲区的状态变动状况。
当向 Buffer 写入数据时,Buffer 会记录下写了多少数据,一旦要读取数据,须要通过flip()
办法将 Buffer 从写模式切换到读模式。在读模式下,能够读取之前写入到 Buffer 的所有数据。
当读完了所有数据,就须要清空缓存区,让它能够再次被写入。有两种形式能清空缓冲区,调用clear()
或者compact()
办法。
clear()办法会清空整个缓冲区。compact()办法只会革除曾经读过的数据。任何未读的数据都被移到缓冲区的起始处,新写入的数据将放到缓冲区未读数据的前面。
Channel 提供从文件、网络读取数据的渠道,然而读取或者都必须通过 Buffer。在 Buffer 子类中保护着一个对应类型的数组,用来存放数据。
Selector 的根本介绍
- Java 的 NIO 应用了非阻塞的 I/O 形式。能够用一个线程解决若干个客户端连贯,就会应用到 Selector(选择器)
- Selector 可能检测到多个注册通道上是否有事件产生(多个 Channel 以事件的模式注册到同一个 selector),如果有事件产生,便获取事件而后针对每个事件进行相应的解决
- 只有在连贯真正有读写事件产生时,才会进行读写,缩小了零碎开销,并且不用为每个连贯都创立一个线程,不必保护多个线程
- 防止了多线程之间上下文切换导致的开销
Selector 的特点
Netty 的 I/O 线程 NioEventLoop 聚合了 Selector(选择器 / 多路复用器),能够并发解决成千盈百个客户端连贯。
当线程从某客户端 Socket 通道进行读写时,若没有数据可用,该线程能够进行其余工作。
线程通常将非阻塞 I/O 的闲暇工夫用于其余通道上执行 I/O 操作,所以独自的线程能够治理多个输入输出通道。
因为读写操作都是非阻塞的,就能够充沛进步 I/O 线程的运行效率,防止因为频繁 I/O 阻塞导致的线程挂起。
一个 I/O 线程能够并发解决 N 个客户端连贯和读写操作,这从根本上解决了传统同步阻塞 I/O 一连贯一线程模型,架构性能、弹性伸缩能力和可靠性都失去极大地晋升。
ByteBuffer 的根本应用
外围依赖
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.36.Final</version></dependency>
/** * @author 神秘杰克 * 公众号: Java菜鸟程序员 * @date 2021/12/28 * @Description ByteBuffer根本应用,读取文件内容并打印 */public class ByteBufferTest { public static void main(String[] args) { //获取channel try (FileChannel channel = new FileInputStream("data.txt").getChannel()) { //创立ByteBuffer final ByteBuffer buffer = ByteBuffer.allocate(1024); //读取文件内容,并存入buffer channel.read(buffer); //切换为读模式 buffer.flip(); while (buffer.hasRemaining()) { System.out.print((char) buffer.get()); } //清空缓冲区,并重置为写模式 buffer.clear(); } catch (IOException e) { e.printStackTrace(); } }}
输入后果:
1234567890abc
ByteBuffer 的构造
Buffer 中定义了四个属性来提供所其蕴含的数据元素。
// Invariants: mark <= position <= limit <= capacityprivate int mark = -1;private int position = 0;private int limit;private int capacity;
- capacity:缓冲区的容量。通过构造函数赋予,一旦设置,无奈更改
- limit:缓冲区的界线。位于 limit 后的数据不可读写。缓冲区的限度不能为负,并且不能大于其容量
- position:下一个读写地位的索引(相似 PC)。缓冲区的地位不能为负,并且不能大于 limit
- mark:记录以后 position 的值。position 被扭转后,能够通过调用 reset() 办法复原到 mark 的地位
在一开始的状况下,position 指向第一位写入地位,limit 和 capacity 则等于缓冲区的容量。
在写模式下,position 是写入地位,limit 等于容量,下图示意写入 4 个元素后的状态。
当调用flip()
办法切换为读模式后,position 切换为读取地位,limit 切换为读取限度。
当读取到 limit 地位后,则不能够持续读取。
当调用clear()
办法后,则回归最原始状态。
当调用 compact()办法时,须要留神:此办法为 ByteBuffer 的办法,而不是 Buffer 的办法。
- compact 会把未读完的数据向前压缩,而后切换到写模式
- 数据前移后,原地位的值并未清零,写时会笼罩之前的值
ByteBuffer 的常见办法
调配空间:allocate()
//java.nio.HeapByteBuffer java堆内存,读写效率较低,受到gc影响System.out.println(ByteBuffer.allocate(1024).getClass());//java.nio.DirectByteBuffer 间接内存,读写效率较高(少一次拷贝),不会受gc影响,分配内存效率较低,使用不当则可能会产生内存透露System.out.println(ByteBuffer.allocateDirect(1024).getClass());
flip()
- flip()办法会切换对缓冲区的操作模式,由写->读 / 读->写
put()
- put()办法能够将一个数据放入到缓冲区中。
- 进行该操作后,postition 的值会+1,指向下一个能够放入的地位。
get()
- get()办法会读取缓冲区中的一个值
- 进行该操作后,position 会+1,如果超过了 limit 则会抛出异样
留神:get(i)办法不会扭转 position 的值。
rewind()
- 该办法只能在读模式下应用
- rewind()办法后,会复原 position、limit 和 capacity 的值,变为进行 get()前的值
clear()
- clear()办法会将缓冲区中的各个属性复原为最后的状态,position = 0, capacity = limit
- 此时缓冲区的数据仍然存在,处于“被忘记”状态,下次进行写操作时会笼罩这些数据
mark()和 reset()
- mark()办法会将 postion 的值保留到 mark 属性中
- reset()办法会将 position 的值改为 mark 中保留的值
字符串和 ByteBuffer 互相转换
引入工具类:
import io.netty.util.internal.MathUtil;import io.netty.util.internal.StringUtil;import java.nio.ByteBuffer;/** * @author 神秘杰克 * 公众号: Java菜鸟程序员 * @date 2021/12/28 * @Description 工具类 */public class ByteBufferUtil { private static final char[] BYTE2CHAR = new char[256]; private static final char[] HEXDUMP_TABLE = new char[256 * 4]; private static final String[] HEXPADDING = new String[16]; private static final String[] HEXDUMP_ROWPREFIXES = new String[65536 >>> 4]; private static final String[] BYTE2HEX = new String[256]; private static final String[] BYTEPADDING = new String[16]; static { final char[] DIGITS = "0123456789abcdef".toCharArray(); for (int i = 0; i < 256; i++) { HEXDUMP_TABLE[i << 1] = DIGITS[i >>> 4 & 0x0F]; HEXDUMP_TABLE[(i << 1) + 1] = DIGITS[i & 0x0F]; } int i; // Generate the lookup table for hex dump paddings for (i = 0; i < HEXPADDING.length; i++) { int padding = HEXPADDING.length - i; StringBuilder buf = new StringBuilder(padding * 3); for (int j = 0; j < padding; j++) { buf.append(" "); } HEXPADDING[i] = buf.toString(); } // Generate the lookup table for the start-offset header in each row (up to 64KiB). for (i = 0; i < HEXDUMP_ROWPREFIXES.length; i++) { StringBuilder buf = new StringBuilder(12); buf.append(StringUtil.NEWLINE); buf.append(Long.toHexString(i << 4 & 0xFFFFFFFFL | 0x100000000L)); buf.setCharAt(buf.length() - 9, '|'); buf.append('|'); HEXDUMP_ROWPREFIXES[i] = buf.toString(); } // Generate the lookup table for byte-to-hex-dump conversion for (i = 0; i < BYTE2HEX.length; i++) { BYTE2HEX[i] = ' ' + StringUtil.byteToHexStringPadded(i); } // Generate the lookup table for byte dump paddings for (i = 0; i < BYTEPADDING.length; i++) { int padding = BYTEPADDING.length - i; StringBuilder buf = new StringBuilder(padding); for (int j = 0; j < padding; j++) { buf.append(' '); } BYTEPADDING[i] = buf.toString(); } // Generate the lookup table for byte-to-char conversion for (i = 0; i < BYTE2CHAR.length; i++) { if (i <= 0x1f || i >= 0x7f) { BYTE2CHAR[i] = '.'; } else { BYTE2CHAR[i] = (char) i; } } } /** * 打印所有内容 * * @param buffer */ public static void debugAll(ByteBuffer buffer) { int oldlimit = buffer.limit(); buffer.limit(buffer.capacity()); StringBuilder origin = new StringBuilder(256); appendPrettyHexDump(origin, buffer, 0, buffer.capacity()); System.out.println("+--------+-------------------- all ------------------------+----------------+"); System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), oldlimit); System.out.println(origin); buffer.limit(oldlimit); } /** * 打印可读取内容 * * @param buffer */ public static void debugRead(ByteBuffer buffer) { StringBuilder builder = new StringBuilder(256); appendPrettyHexDump(builder, buffer, buffer.position(), buffer.limit() - buffer.position()); System.out.println("+--------+-------------------- read -----------------------+----------------+"); System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), buffer.limit()); System.out.println(builder); } private static void appendPrettyHexDump(StringBuilder dump, ByteBuffer buf, int offset, int length) { if (MathUtil.isOutOfBounds(offset, length, buf.capacity())) { throw new IndexOutOfBoundsException( "expected: " + "0 <= offset(" + offset + ") <= offset + length(" + length + ") <= " + "buf.capacity(" + buf.capacity() + ')'); } if (length == 0) { return; } dump.append( " +-------------------------------------------------+" + StringUtil.NEWLINE + " | 0 1 2 3 4 5 6 7 8 9 a b c d e f |" + StringUtil.NEWLINE + "+--------+-------------------------------------------------+----------------+"); final int startIndex = offset; final int fullRows = length >>> 4; final int remainder = length & 0xF; // Dump the rows which have 16 bytes. for (int row = 0; row < fullRows; row++) { int rowStartIndex = (row << 4) + startIndex; // Per-row prefix. appendHexDumpRowPrefix(dump, row, rowStartIndex); // Hex dump int rowEndIndex = rowStartIndex + 16; for (int j = rowStartIndex; j < rowEndIndex; j++) { dump.append(BYTE2HEX[getUnsignedByte(buf, j)]); } dump.append(" |"); // ASCII dump for (int j = rowStartIndex; j < rowEndIndex; j++) { dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]); } dump.append('|'); } // Dump the last row which has less than 16 bytes. if (remainder != 0) { int rowStartIndex = (fullRows << 4) + startIndex; appendHexDumpRowPrefix(dump, fullRows, rowStartIndex); // Hex dump int rowEndIndex = rowStartIndex + remainder; for (int j = rowStartIndex; j < rowEndIndex; j++) { dump.append(BYTE2HEX[getUnsignedByte(buf, j)]); } dump.append(HEXPADDING[remainder]); dump.append(" |"); // Ascii dump for (int j = rowStartIndex; j < rowEndIndex; j++) { dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]); } dump.append(BYTEPADDING[remainder]); dump.append('|'); } dump.append(StringUtil.NEWLINE + "+--------+-------------------------------------------------+----------------+"); } private static void appendHexDumpRowPrefix(StringBuilder dump, int row, int rowStartIndex) { if (row < HEXDUMP_ROWPREFIXES.length) { dump.append(HEXDUMP_ROWPREFIXES[row]); } else { dump.append(StringUtil.NEWLINE); dump.append(Long.toHexString(rowStartIndex & 0xFFFFFFFFL | 0x100000000L)); dump.setCharAt(dump.length() - 9, '|'); dump.append('|'); } } public static short getUnsignedByte(ByteBuffer buffer, int index) { return (short) (buffer.get(index) & 0xFF); }}
测试类:
/** * @author 神秘杰克 * 公众号: Java菜鸟程序员 * @date 2021/12/28 * @Description 字符串和ByteBuffer互相转换 */public class TranslateTest { public static void main(String[] args) { String str1 = "hello"; String str2; String str3; // 通过StandardCharsets的encode办法取得ByteBuffer // 此时取得的ByteBuffer为读模式,无需通过flip切换模式 ByteBuffer buffer = StandardCharsets.UTF_8.encode(str1); //也能够应用wrap办法实现,无需通过flip切换模式 ByteBuffer wrap = ByteBuffer.wrap(str1.getBytes()); ByteBufferUtil.debugAll(wrap); ByteBufferUtil.debugAll(buffer); // 将缓冲区中的数据转化为字符串 // 通过StandardCharsets解码,取得CharBuffer,再通过toString取得字符串 str2 = StandardCharsets.UTF_8.decode(buffer).toString(); System.out.println(str2); str3 = StandardCharsets.UTF_8.decode(wrap).toString(); System.out.println(str3); }}
运行后果:
+--------+-------------------- all ------------------------+----------------+position: [0], limit: [5] +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f |+--------+-------------------------------------------------+----------------+|00000000| 68 65 6c 6c 6f |hello |+--------+-------------------------------------------------+----------------++--------+-------------------- all ------------------------+----------------+position: [0], limit: [5] +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f |+--------+-------------------------------------------------+----------------+|00000000| 68 65 6c 6c 6f |hello |+--------+-------------------------------------------------+----------------+hellohello
粘包与半包
景象
网络上有多条数据发送给服务端,数据之间应用 \n 进行分隔。
但因为某种原因这些数据在接管时,被进行了重新组合,例如原始数据有 3 条为:
- Hello,world\n
- I’m Jack\n
- How are you?\n
变成了上面的两个 byteBuffer (粘包,半包)
- Hello,world\nI’m Jack\nHo
- w are you?\n
呈现起因
粘包
发送方在发送数据时,并不是一条一条地发送数据,而是将数据整合在一起,当数据达到肯定的数量后再一起发送。这就会导致多条信息被放在一个缓冲区中被一起发送进来。
半包
接管方的缓冲区的大小是无限的,当接管方的缓冲区满了当前,就须要将信息截断,等缓冲区空了当前再持续放入数据。这就会产生一段残缺的数据最初被截断的景象。
解决办法
- 通过
get(index)
办法遍历 ByteBuffer,当遇到\n
后进行解决。 - 记录从 position 到 index 的数据长度,申请对应大小的缓冲区。
- 将缓冲区的数据通过
get()
获取写入到 target 缓冲区中。 - 最初,调用 compact()办法切换为写模式,因为缓冲区中可能还存在未读取的数据。
/** * @author 神秘杰克 * 公众号: Java菜鸟程序员 * @date 2021/12/29 * @Description 解决黏包和半包 */public class ByteBufferTest { public static void main(String[] args) { ByteBuffer buffer = ByteBuffer.allocate(32); //模仿黏包和半包 buffer.put("Hello,world\nI'm Jack\nHo".getBytes(StandardCharsets.UTF_8)); split(buffer); buffer.put("w are you?\n".getBytes(StandardCharsets.UTF_8)); split(buffer); } private static void split(ByteBuffer buffer) { //切换读模式 buffer.flip(); for (int i = 0; i < buffer.limit(); i++) { //找到残缺音讯 if (buffer.get(i) == '\n') { int length = i + 1 - buffer.position(); final ByteBuffer target = ByteBuffer.allocate(length); //从buffer中读取,写入 target for(int j = 0; j < length; j++) { // 将buffer中的数据写入target中 target.put(buffer.get()); } // 打印查看后果 ByteBufferUtil.debugAll(target); } } //清空已读局部,并切换写模式 buffer.compact(); }}
运行后果:
+--------+-------------------- all ------------------------+----------------+position: [12], limit: [12] +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f |+--------+-------------------------------------------------+----------------+|00000000| 48 65 6c 6c 6f 2c 77 6f 72 6c 64 0a |Hello,world. |+--------+-------------------------------------------------+----------------++--------+-------------------- all ------------------------+----------------+position: [9], limit: [9] +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f |+--------+-------------------------------------------------+----------------+|00000000| 49 27 6d 20 4a 61 63 6b 0a |I'm Jack. |+--------+-------------------------------------------------+----------------++--------+-------------------- all ------------------------+----------------+position: [13], limit: [13] +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f |+--------+-------------------------------------------------+----------------+|00000000| 48 6f 77 20 61 72 65 20 79 6f 75 3f 0a |How are you?. |+--------+-------------------------------------------------+----------------+
文件编程
FileChannel
工作模式
:FileChannel 只能工作在阻塞模式下!
获取
不能间接关上 FileChannel,必须通过 FileInputStream、FileOutputStream 或者 RandomAccessFile 来获取 FileChannel,它们都有 getChannel()
办法。
- 通过 FileInputStream 获取的 channel 只能读
- 通过 FileOutputStream 获取的 channel 只能写
- 通过 RandomAccessFile 是否能读写依据结构 RandomAccessFile 时的读写模式决定
读取
通过read()
办法将数据填充到 ByteBuffer 中,返回值示意读到了多少字节,-1
示意读到了文件开端。
int readBytes = channel.read(buffer);
写入
因为 channel 是有写入下限的,所以 write() 办法并不能保障一次将 buffer 中的内容全副写入 channel。必须依照以下规定进行写入。
// 通过hasRemaining()办法查看缓冲区中是否还有数据未写入到通道中while(buffer.hasRemaining()) { channel.write(buffer);}
敞开
Channel 必须敞开,不过调用 FileInputStream、FileOutputStream、 RandomAccessFile 的close()
办法时也会间接的调用 Channel 的 close()办法。
地位
channel 也领有一个保留读取数据地位的属性,即 position。
long pos = channel.position();
能够通过 position(int pos)设置 channel 中 position 的值。
long newPos = 10;channel.position(newPos);
设置以后地位时,如果设置为文件的开端:
- 这时读取会返回 -1
- 这时写入,会追加内容,但要留神如果 position 超过了文件开端,再写入时在新内容和原开端之间会有空洞(00)
强制写入
操作系统出于性能的思考,会将数据缓存,不是立即写入磁盘,而是等到缓存满了当前将所有数据一次性的写入磁盘。能够调用 force(true)
办法将文件内容和元数据(文件的权限等信息)立即写入磁盘。
常见办法
FileChannel 次要用来对本地文件进行 IO 操作,常见的办法有:
- public int read(ByteBuffer dst) :从通道中读取数据到缓冲区中。
- public int write(ByteBuffer src):把缓冲区中的数据写入到通道中。
- public long transferFrom(ReadableByteChannel src,long position,long count):从指标通道中复制数据到以后通道。
- public long transferTo(long position,long count,WriteableByteChannel target):把数据从以后通道复制给指标通道。
应用 FileChannel 写入文本文件
/** * @author 神秘杰克 * 公众号: Java菜鸟程序员 * @date 2021/12/29 * @Description FileChannel测试写入文件 */public class FileChannelTest { public static void main(String[] args) { try (final FileChannel channel = new FileOutputStream("data1.txt").getChannel()) { String msg = "Hello World!!!"; final ByteBuffer buffer = ByteBuffer.allocate(16); buffer.put(msg.getBytes(StandardCharsets.UTF_8)); buffer.flip(); channel.write(buffer); } catch (IOException e) { e.printStackTrace(); } }}
应用 FileChannel 读取文本文件
/** * @author 神秘杰克 * 公众号: Java菜鸟程序员 * @date 2021/12/29 * @Description FileChannel测试读取文件 */public class FileChannelTest { public static void main(String[] args) { try (final FileChannel channel = new FileInputStream("data1.txt").getChannel()) { final ByteBuffer buffer = ByteBuffer.allocate(16); channel.read(buffer); buffer.flip(); while (buffer.hasRemaining()) { System.out.print((char) buffer.get()); } //清空缓冲区,并重置为写模式 buffer.clear(); } catch (IOException e) { e.printStackTrace(); } }}
应用 FileChannel 进行数据传输
/** * @author 神秘杰克 * 公众号: Java菜鸟程序员 * @date 2021/12/29 * @Description FileChannel测试文件传输 */public class FileChannelTest { public static void main(String[] args){ try (final FileChannel from = new FileInputStream("data.txt").getChannel(); final FileChannel to = new FileOutputStream("data1.txt").getChannel()) { // 参数:inputChannel的起始地位,传输数据的大小,目标channel // 返回值为传输的数据的字节数 // transferTo一次只能传输2G的数据 from.transferTo(0, from.size(), to); } catch (IOException e) { e.printStackTrace(); } }}
transferTo()办法对应的还有 transferFrom()办法。
尽管 transferTo()办法传输效率较高,底层利用操作系统的零拷贝进行优化,然而 transferTo 办法一次只能传输 2G 的数据。
解决办法:能够依据 transferTo()的返回值来判断,返回值代表传输了多少,通过 from 的 size()大小来每次减去即可。
long size = from.size();for (long left = size; left > 0; ) { left -= from.transferTo(size - left, size, to);}
Channel 和 Buffer 的注意事项
- ByteBuffer 反对类型化的 put 和 get,put 放入什么数据类型,get 就应该应用相应的数据类型来取出,否则可能会产生 ByteUnderflowException 异样。
- 能够将一个一般的 Buffer 转换为只读的 Buffer:asReadOnlyBuffer()办法。
- NIO 提供了 MapperByteBuffer,能够让文件间接在内存(堆外内存)中进行批改,而如何同步到文件由 NIO 来实现。
NIO 还反对通过多个 Buffer(即 Buffer 数组)实现读写操作,即Scattering(扩散)和 Gathering(汇集)。
Scattering(扩散)
:在向缓冲区写入数据时,能够应用 Buffer 数组顺次写入,一个 Buffer 数组写满后,持续写入下一个 Buffer 数组。Gathering(汇集)
:从缓冲区读取数据时,能够顺次读取,读完一个 Buffer 再按程序读取下一个。
网络编程
阻塞 vs 非阻塞
阻塞
- 在没有数据可读时,包含数据复制过程中,线程必须阻塞期待,不会占用 CPU,但线程相当于闲置状态
- 32 位 JVM 一个线程 320k,64 位 JVM 一个线程 1024k,为了缩小线程数量,须要采纳线程池技术
- 但即便应用线程池,如果有很多连贯建设,但长时间 inactive,会阻塞线程池中所有线程
非阻塞
- 在某个 Channel 没有可读事件时,线程不用阻塞,它能够去解决其它有可读事件的 Channel
- 数据复制过程中,线程理论还是阻塞的(AIO 改良的中央)
- 写数据时,线程只是期待数据写入 Channel 即可,无需期待 Channel 通过网络把数据发送进来
阻塞案例代码
服务端代码:
/** * @author 神秘杰克 * 公众号: Java菜鸟程序员 * @date 2021/12/29 * @Description 应用NIO来了解阻塞模式-服务端 */public class Server { public static void main(String[] args) { //1. 创立服务器 try (ServerSocketChannel ssc = ServerSocketChannel.open()) { final ByteBuffer buffer = ByteBuffer.allocate(16); //2. 绑定监听端口 ssc.bind(new InetSocketAddress(7777)); //3. 寄存建设连贯的汇合 List<SocketChannel> channels = new ArrayList<>(); while (true) { System.out.println("建设连贯..."); //4. accept 建设客户端连贯 , 用来和客户端之间通信 final SocketChannel socketChannel = ssc.accept(); System.out.println("建设连贯实现..."); channels.add(socketChannel); //5. 接管客户端发送的数据 for (SocketChannel channel : channels) { System.out.println("正在读取数据..."); channel.read(buffer); buffer.flip(); ByteBufferUtil.debugRead(buffer); buffer.clear(); System.out.println("数据读取实现..."); } } } catch (IOException e) { System.out.println("出现异常..."); } }}
客户端代码:
/** * @author 神秘杰克 * 公众号: Java菜鸟程序员 * @date 2021/12/29 * @Description 应用NIO来了解阻塞模式-客户端 */public class Client { public static void main(String[] args) { try (SocketChannel socketChannel = SocketChannel.open()) { // 建设连贯 socketChannel.connect(new InetSocketAddress("localhost", 7777)); final ByteBuffer buffer = ByteBuffer.allocate(10); buffer.put("hello".getBytes(StandardCharsets.UTF_8)); buffer.flip(); socketChannel.write(buffer); } catch (IOException e) { e.printStackTrace(); } }}
运行后果:
在刚开始服务器运行后:服务器端因 accept 阻塞。
在客户端和服务器建设连贯后,客户端发送音讯前:服务器端因通道为空被阻塞。
客户端发送数据后,服务器解决通道中的数据。之后再次进入循环时,再次被 accept 阻塞。
- 之前的客户端再次发送音讯,服务器端因为被 accept 阻塞,就无奈解决之前客户端再次发送到通道中的信息了。
非阻塞
- 通过 ServerSocketChannel 的
configureBlocking(false)
办法将取得连贯设置为非阻塞的。此时若没有连贯,accept 会返回 null - 通过 SocketChannel 的
configureBlocking(false)
办法将从通道中读取数据设置为非阻塞的。若此时通道中没有数据可读,read 会返回-1
/** * @author 神秘杰克 * 公众号: Java菜鸟程序员 * @date 2021/12/29 * @Description 应用NIO来了解阻塞模式-服务端 */public class Server { public static void main(String[] args) { //1. 创立服务器 try (ServerSocketChannel ssc = ServerSocketChannel.open()) { final ByteBuffer buffer = ByteBuffer.allocate(16); //2. 绑定监听端口 ssc.bind(new InetSocketAddress(7777)); //3. 寄存建设连贯的汇合 List<SocketChannel> channels = new ArrayList<>(); //设置非阻塞!! ssc.configureBlocking(false); while (true) { System.out.println("建设连贯..."); //4. accept 建设客户端连贯 , 用来和客户端之间通信 final SocketChannel socketChannel = ssc.accept(); //设置非阻塞!! socketChannel.configureBlocking(false); System.out.println("建设连贯实现..."); channels.add(socketChannel); //5. 接管客户端发送的数据 for (SocketChannel channel : channels) { System.out.println("正在读取数据..."); channel.read(buffer); buffer.flip(); ByteBufferUtil.debugRead(buffer); buffer.clear(); System.out.println("数据读取实现..."); } } } catch (IOException e) { System.out.println("出现异常..."); } }}
因为设置为了非阻塞,会始终执行while(true)
中的代码,CPU 始终处于繁忙状态,会使得性能变低,所以理论状况中不应用这种办法解决申请。
Selector
根本介绍
- Java 的 NIO 应用了非阻塞的 I/O 形式。能够用一个线程解决若干个客户端连贯,就会应用到 Selector(选择器)。
- Selector 可能检测到多个注册通道上是否有事件产生(多个 Channel 以事件的模式注册到同一个 selector),如果有事件产生,便获取事件而后针对每个事件进行相应的解决。
- 只有在连贯真正有读写事件产生时,才会进行读写,缩小了零碎开销,并且不用为每个连贯都创立一个线程,不必保护多个线程。
- 防止了多线程之间上下文切换导致的开销。
特点
单线程能够配合 Selector 实现对多个 Channel 可读写事件的监控,这称为多路复用。
- 多路复用仅针对网络 IO,一般文件 IO 无奈利用多路复用
如果不必 Selector 的非阻塞模式,线程大部分工夫都在做无用功,而 Selector 可能保障
- 有可连贯事件时才去连贯
- 有可读事件才去读取
- 有可写事件才去写入
限于网络传输能力,Channel 未必随时可写,一旦 Channel 可写,会触发 Selector 的可写事件进行写入。
Selector 相干办法阐明
selector.select()
://若未监听到注册管道中有事件,则继续阻塞selector.select(1000)
://阻塞 1000 毫秒,1000 毫秒后返回selector.wakeup()
://唤醒 selectorselector.selectNow()
: //不阻塞,立刻返回
NIO 非阻塞网络编程过程剖析
- 当客户端连贯时,会通过 SeverSocketChannel 失去对应的 SocketChannel。
- Selector 进行监听,调用
select()
办法,返回注册该 Selector 的所有通道中有事件产生的通道个数。 - 将 SocketChannel 注册到 Selector 上,public final SelectionKey register(Selector sel, int ops),一个 Selector 上能够注册多个 SocketChannel。
- 注册后返回一个 SelectionKey,会和该 Selector 关联(以汇合的模式)。
- 进一步失去各个 SelectionKey,有事件产生。
- 再通过 SelectionKey 反向获取 SocketChannel,应用 channnel()办法。
- 能够通过失去的 channel,实现业务解决。
SelectionKey 中定义了四个操作标记位:OP_READ
示意通道中产生读事件;OP_WRITE
—示意通道中产生写事件;OP_CONNECT
—示意建设连贯;OP_ACCEPT
—申请新连贯。
SelectionKey 的相干办法
办法 | 形容 |
---|---|
public abstract Selector selector(); | 失去与之关联的 Selector 对象 |
public abstract SelectableChannel channel(); | 失去与之关联的通道 |
public final Object attachment() | 失去与之关联的共享数据 |
public abstract SelectionKey interestOps(int ops); | 设置或扭转监听的事件类型 |
public final boolean isReadable(); | 通道是否可读 |
public final boolean isWritable(); | 通道是否可写 |
public final boolean isAcceptable(); | 是否能够建设连贯 ACCEPT |
Selector 根本应用及 Accpet 事件
接下来咱们应用 Selector 实现多路复用,对服务端代码进行改良。
/** * @author 神秘杰克 * 公众号: Java菜鸟程序员 * @date 2021/12/29 * @Description Selector根本应用-服务端 */public class Server { public static void main(String[] args) { try (ServerSocketChannel ssc = ServerSocketChannel.open(); final Selector selector = Selector.open()) {//创立selector 治理多个channel ssc.bind(new InetSocketAddress(7777)); ssc.configureBlocking(false); // 将通道注册到选择器中,并设置感兴趣的事件 ssc.register(selector, SelectionKey.OP_ACCEPT); ByteBuffer buffer = ByteBuffer.allocate(16); while (true) { // 如果事件就绪,线程会被阻塞,反之不会被阻塞。从而防止了CPU空转 // 返回值为就绪的事件个数 int ready = selector.select(); System.out.println("selector就绪总数: " + ready); // 获取所有事件 Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { final SelectionKey key = iterator.next(); //判断key的事件类型 if (key.isAcceptable()) { ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); final SocketChannel socketChannel = serverSocketChannel.accept(); System.out.println("获取到客户端连贯..."); } // 处理完毕后移除 iterator.remove(); } } } catch (IOException e) { System.out.println("出现异常..."); } }}
事件产生后,要么解决,要么应用 key.cancel()办法勾销,不能什么都不做,否则下次该事件仍会触发,这是因为 nio 底层应用的是程度触发。
当选择器中的通道对应的事件产生后,SelectionKey 会被放到另一个汇合中,然而selecionKey 不会主动移除,所以须要咱们在解决完一个事件后,通过迭代器手动移除其中的 selecionKey。否则会导致已被解决过的事件再次被解决,就会引发谬误。
Read 事件
/** * @author 神秘杰克 * 公众号: Java菜鸟程序员 * @date 2021/12/29 * @Description Read事件-服务端 */public class Server { public static void main(String[] args) { try (ServerSocketChannel ssc = ServerSocketChannel.open(); final Selector selector = Selector.open()) {//创立selector 治理多个channel ssc.bind(new InetSocketAddress(7777)); ssc.configureBlocking(false); // 将通道注册到选择器中,并设置感兴趣的事件 ssc.register(selector, SelectionKey.OP_ACCEPT); ByteBuffer buffer = ByteBuffer.allocate(16); while (true) { // 如果事件就绪,线程会被阻塞,反之不会被阻塞。从而防止了CPU空转 // 返回值为就绪的事件个数 int ready = selector.select(); System.out.println("selector就绪总数: " + ready); // 获取所有事件 Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { final SelectionKey key = iterator.next(); //判断key的事件类型 if (key.isAcceptable()) { ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); final SocketChannel socketChannel = serverSocketChannel.accept(); System.out.println("获取到客户端连贯..."); // 设置为非阻塞模式,同时将连贯的通道也注册到抉择其中 socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); } else if (key.isReadable()) { //读事件 SocketChannel channel = (SocketChannel) key.channel(); channel.read(buffer); buffer.flip(); ByteBufferUtil.debugRead(buffer); buffer.clear(); } // 处理完毕后移除 iterator.remove(); } } } catch (IOException e) { System.out.println("出现异常..."); } }}
断开解决
当客户端与服务器之间的连贯断开时,会给服务器端发送一个读事件,对异样断开
和失常断开
须要不同的形式进行解决:
失常断开
- 失常断开时,服务器端的 channel.read(buffer)办法的返回值为-1,所以当完结到返回值为-1 时,须要调用 key 的 cancel()办法勾销此事件,并在勾销后移除该事件
异样断开
- 异样断开时,会抛出 IOException 异样, 在 try-catch 的catch 块中捕捉异样并调用 key 的 cancel()办法即可
音讯边界
⚠️ 不解决音讯边界存在的问题
将缓冲区的大小设置为 4 个字节,发送 2 个汉字(你好),通过 decode 解码并打印时,会呈现乱码
ByteBuffer buffer = ByteBuffer.allocate(4);// 解码并打印System.out.println(StandardCharsets.UTF_8.decode(buffer));你���
这是因为 UTF-8 字符集下,1 个汉字占用 3 个字节,此时缓冲区大小为 4 个字节,一次读工夫无奈解决完通道中的所有数据,所以一共会触发两次读事件。这就导致 你好
的 好
字被拆分为了前半部分和后半局部发送,解码时就会呈现问题。
解决音讯边界
传输的文本可能有以下三种状况:
- 文本大于缓冲区大小,此时须要将缓冲区进行扩容
- 产生半包景象
- 产生粘包景象
解决方案:
- 固定音讯长度,数据包大小一样,服务器按预约长度读取,当发送的数据较少时,须要将数据进行填充,直到长度与音讯规定长度统一。毛病是节约带宽
- 另一种思路是按分隔符拆分,毛病是效率低,须要一个一个字符地去匹配分隔符
- TLV 格局,即 Type 类型、Length 长度、Value 数据(也就是在音讯结尾用一些空间寄存前面数据的长度),如 HTTP 申请头中的 Content-Type 与Content-Length。类型和长度已知的状况下,就能够不便获取音讯大小,调配适合的 buffer,毛病是 buffer 须要提前调配,如果内容过大,则影响 server 吞吐量
上面演示第二种解决方案,按分隔符拆分:
咱们须要在 Accept 事件产生后,将通道注册到 Selector 中时,对每个通道增加一个 ByteBuffer 附件,让每个通道产生读事件时都应用本人的通道,防止与其余通道发生冲突而导致问题。
ByteBuffer buffer = ByteBuffer.allocate(16);// 增加通道对应的Buffer附件socketChannel.register(selector, SelectionKey.OP_READ, buffer);
当 Channel 中的数据大于缓冲区时,须要对缓冲区进行扩容操作。此代码中的扩容的断定办法:Channel 调用 compact 办法后,的 position 与 limit 相等,阐明缓冲区中的数据并未被读取(容量太小),此时创立新的缓冲区,其大小扩充为两倍。同时还要将旧缓冲区中的数据拷贝到新的缓冲区中,同时调用 SelectionKey 的 attach 办法将新的缓冲区作为新的附件放入 SelectionKey 中。
// 如果缓冲区太小,就进行扩容if (buffer.position() == buffer.limit()) { ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity()*2); // 将旧buffer中的内容放入新的buffer中 buffer.flip(); newBuffer.put(buffer); // 将新buffer放到key中作为附件 key.attach(newBuffer);}
改良后的代码如下:
/** * @author 神秘杰克 * 公众号: Java菜鸟程序员 * @date 2021/12/29 * @Description Read事件完整版-服务端 */public class Server { public static void main(String[] args) { try (ServerSocketChannel ssc = ServerSocketChannel.open(); final Selector selector = Selector.open()) {//创立selector 治理多个channel ssc.bind(new InetSocketAddress(7777)); ssc.configureBlocking(false); // 将通道注册到选择器中,并设置感兴趣的事件 ssc.register(selector, SelectionKey.OP_ACCEPT); while (true) { // 如果事件就绪,线程会被阻塞,反之不会被阻塞。从而防止了CPU空转 // 返回值为就绪的事件个数 int ready = selector.select(); System.out.println("selector就绪总数: " + ready); // 获取所有事件 Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { final SelectionKey key = iterator.next(); //判断key的事件类型 if (key.isAcceptable()) { ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); final SocketChannel socketChannel = serverSocketChannel.accept(); System.out.println("获取到客户端连贯..."); socketChannel.configureBlocking(false); ByteBuffer byteBuffer = ByteBuffer.allocate(16); //注册到Selector并且设置读事件,设置附件bytebuffer socketChannel.register(selector, SelectionKey.OP_READ, byteBuffer); } else if (key.isReadable()) { //读事件 try { SocketChannel channel = (SocketChannel) key.channel(); // 通过key取得附件 ByteBuffer buffer = (ByteBuffer) key.attachment(); int read = channel.read(buffer); if (read == -1) { key.cancel(); channel.close(); } else { // 通过分隔符来分隔buffer中的数据 split(buffer); // 如果缓冲区太小,就进行扩容 if (buffer.position() == buffer.limit()) { ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2); // 将旧buffer中的内容放入新的buffer中 buffer.flip(); newBuffer.put(buffer); // 将新buffer放到key中作为附件 key.attach(newBuffer); } } } catch (IOException e) { //异样断开,勾销事件 key.cancel(); } } // 处理完毕后移除 iterator.remove(); } } } catch (IOException e) { System.out.println("出现异常..."); } } private static void split(ByteBuffer buffer) { buffer.flip(); for (int i = 0; i < buffer.limit(); i++) { //找到一条实现数据 if (buffer.get(i) == '\n') { // 缓冲区长度 int length = i + 1 - buffer.position(); ByteBuffer target = ByteBuffer.allocate(length); // 将后面的内容写入target缓冲区 for (int j = 0; j < length; j++) { // 将buffer中的数据写入target中 target.put(buffer.get()); } ByteBufferUtil.debugAll(target); } } // 切换为写模式,然而缓冲区可能未读完,这里须要应用compact buffer.compact(); }}
/** * @author 神秘杰克 * 公众号: Java菜鸟程序员 * @date 2021/12/29 * @Description Read事件完整版-客户端 */public class Client { public static void main(String[] args) { try (SocketChannel socketChannel = SocketChannel.open()) { // 建设连贯 socketChannel.connect(new InetSocketAddress("localhost", 7777)); final ByteBuffer buffer = ByteBuffer.allocate(32); buffer.put("01234567890abcdef3333\n".getBytes(StandardCharsets.UTF_8)); buffer.flip(); socketChannel.write(buffer); } catch (IOException e) { e.printStackTrace(); } }}
ByteBuffer 的大小调配
- 每个 channel 都须要记录可能被切分的音讯,因为 ByteBuffer 不能被多个 channel 独特应用,因而须要为每个 channel 保护一个独立的 ByteBuffer
- ByteBuffer 不能太大,比方一个 ByteBuffer 1Mb 的话,要反对百万连贯就要 1Tb 内存,因而须要设计大小可变的 ByteBuffer
调配思路:
- 一种思路是首先调配一个较小的 buffer,例如 4k,如果发现数据不够,再调配 8k 的 buffer,将 4k buffer 内容拷贝至 8k buffer,长处是音讯间断容易解决,毛病是数据拷贝消耗性能
- 另一种思路是用多个数组组成 buffer,一个数组不够,把多进去的内容写入新的数组,与后面的区别是音讯存储不间断解析简单,长处是防止了拷贝引起的性能损耗
Write 事件
服务器通过 Buffer 通道中写入数据时,可能因为通道容量小于 Buffer 中的数据大小,导致无奈一次性将 Buffer 中的数据全副写入到 Channel 中,这时便须要分屡次写入,具体步骤如下:
- 执行一次写操作,向将 buffer 中的内容写入到 SocketChannel 中,而后判断 Buffer 中是否还有数据
- 若 Buffer 中还有数据,则须要将 SockerChannel 注册到 Seletor 中,并关注写事件,同时将未写完的 Buffer 作为附件一起放入到 SelectionKey 中。
/** * @author 神秘杰克 * 公众号: Java菜鸟程序员 * @date 2021/12/29 * @Description Write事件-服务端 */public class Server { public static void main(String[] args) { try (ServerSocketChannel ssc = ServerSocketChannel.open(); final Selector selector = Selector.open()) { ssc.bind(new InetSocketAddress(7777)); ssc.configureBlocking(false); ssc.register(selector, SelectionKey.OP_ACCEPT); while (true) { int ready = selector.select(); Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { final SelectionKey key = iterator.next(); //判断key的事件类型 if (key.isAcceptable()) { final SocketChannel socketChannel = ssc.accept(); socketChannel.configureBlocking(false); StringBuilder sb = new StringBuilder(); for (int i = 0; i < 3000000; i++) { sb.append("a"); } final ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString()); final int write = socketChannel.write(buffer); System.out.println("accept事件器写入.."+write); // 判断是否还有残余内容 if (buffer.hasRemaining()) { // 注册到Selector中,关注可写事件,并将buffer增加到key的附件中 socketChannel.register(selector, SelectionKey.OP_WRITE, buffer); } }else if (key.isWritable()) { SocketChannel socket = (SocketChannel) key.channel(); // 取得事件 ByteBuffer buffer = (ByteBuffer) key.attachment(); int write = socket.write(buffer); System.out.println("write事件器写入.."+write); // 如果曾经实现了写操作,须要移除key中的附件,同时不再对写事件感兴趣 if (!buffer.hasRemaining()) { key.attach(null); key.interestOps(0); } } // 处理完毕后移除 iterator.remove(); } } } catch (IOException e) { System.out.println("出现异常..."); } }}
/** * @author 神秘杰克 * 公众号: Java菜鸟程序员 * @date 2021/12/29 * @Description Write事件-客户端 */public class Client { public static void main(String[] args) { try (SocketChannel socketChannel = SocketChannel.open()) { // 建设连贯 socketChannel.connect(new InetSocketAddress("localhost", 7777)); int count = 0; while (true) { final ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024); count += socketChannel.read(buffer); System.out.println("客户端承受了.."+count); buffer.clear(); } } catch (IOException e) { e.printStackTrace(); } }}
运行后果: