关于netty:深入学习Netty一NIO基础篇

74次阅读

共计 28156 个字符,预计需要花费 71 分钟才能阅读完成。

NIO 根底

什么是 NIO

  1. Java NIO 全称 Java non-blocking IO,指的是 JDK 提供的新 API。从 JDK 1.4 开始,Java 提供了一系列改良的输出 / 输入的新个性,被统称为 NIO,即 New IO,是 同步非阻塞 的。
  2. NIO 相干类都放在 java.nio 包下,并对原 java.io 包中很多类进行了改写。
  3. NIO 有 三大外围 局部:Channel(管道)Buffer(缓冲区)Selector(选择器)
  4. NIO 是面向 缓冲区 编程的。数据读取到了一个它略微解决的缓冲区,须要时可在缓冲区中前后挪动,这就减少了处理过程中的灵活性,应用它能够提供非阻塞的高伸缩性网络。
  5. Java NIO 的非阻塞模式,使一个线程从某通道发送申请读取数据,然而它仅能失去目前可用数据,如果目前没有可用数据时,则阐明不会获取,而不是放弃线程阻塞,所以直到数据变为能够读取之前,该线程能够做其余事件。非阻塞写入同理。

三大外围组件

Channel 的根本介绍

NIO 的通道相似于流,但有如下区别:

  1. 通道是双向的能够进行读写,而流是单向的只能读,或者写
  2. 通道能够实现异步读写数据
  3. 通道能够从缓冲区读取数据,也能够写入数据到缓冲区

四种通道:

  • FileChannel:从文件中读写数据
  • DatagramChannel:通过 UDP 协定,读写网络中的数据
  • SocketChannel:能通过 TCP 协定来读写网络中数据,罕用于客户端
  • ServerSocketChannel:监听 TCP 连贯,对每个新的连贯会创立一个 SocketChannel

Buffer(缓冲区)根本介绍

NIO 中的 Buffer 用于 NIO 通道(Channel)进行交互。

缓冲区实质上是一个能够读写数据的内存块,能够了解为是一个 容器对象(含数组),该对象提供了 一组办法,能够更轻松地应用内存块,缓冲区对象内置了一些机制,可能跟踪和记录缓冲区的状态变动状况。

当向 Buffer 写入数据时,Buffer 会记录下写了多少数据,一旦要读取数据,须要通过 flip() 办法将 Buffer 从写模式切换到读模式。在读模式下,能够读取之前写入到 Buffer 的所有数据。

当读完了所有数据,就须要清空缓存区,让它能够再次被写入。有两种形式能清空缓冲区,调用 clear() 或者 compact() 办法。

clear()办法会清空整个缓冲区。compact()办法只会革除曾经读过的数据。任何未读的数据都被移到缓冲区的起始处,新写入的数据将放到缓冲区未读数据的前面。

Channel 提供从文件、网络读取数据的渠道,然而读取或者都必须通过 Buffer。在 Buffer 子类中保护着一个对应类型的数组,用来存放数据。

Selector 的根本介绍

  1. Java 的 NIO 应用了非阻塞的 I/O 形式。能够用一个线程解决若干个客户端连贯,就会应用到 Selector(选择器)
  2. Selector 可能检测到多个注册通道上是否有事件产生(多个 Channel 以事件的模式注册到同一个 selector),如果有事件产生,便获取事件而后针对每个事件进行相应的解决
  3. 只有在连贯真正有读写事件产生时,才会进行读写,缩小了零碎开销,并且不用为每个连贯都创立一个线程,不必保护多个线程
  4. 防止了多线程之间上下文切换导致的开销

Selector 的特点

Netty 的 I/O 线程 NioEventLoop 聚合了 Selector(选择器 / 多路复用器),能够并发解决成千盈百个客户端连贯。

当线程从某客户端 Socket 通道进行读写时,若没有数据可用,该线程能够进行其余工作。

线程通常将非阻塞 I/O 的闲暇工夫用于其余通道上执行 I/O 操作,所以独自的线程能够治理多个输入输出通道。

因为读写操作都是非阻塞的,就能够充沛进步 I/O 线程的运行效率,防止因为频繁 I/O 阻塞导致的线程挂起。

一个 I/O 线程能够并发解决 N 个客户端连贯和读写操作,这从根本上解决了传统同步阻塞 I/O 一连贯一线程模型,架构性能、弹性伸缩能力和可靠性都失去极大地晋升。

ByteBuffer 的根本应用

外围依赖

<dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.36.Final</version>
</dependency>
/**
 * @author 神秘杰克
 * 公众号: Java 菜鸟程序员
 * @date 2021/12/28
 * @Description ByteBuffer 根本应用,读取文件内容并打印
 */
public class ByteBufferTest {public static void main(String[] args) {
        // 获取 channel
        try (FileChannel channel = new FileInputStream("data.txt").getChannel()) {
            // 创立 ByteBuffer
            final ByteBuffer buffer = ByteBuffer.allocate(1024);
            // 读取文件内容, 并存入 buffer
            channel.read(buffer);
            // 切换为读模式
            buffer.flip();
            while (buffer.hasRemaining()) {System.out.print((char) buffer.get());
            }
            // 清空缓冲区, 并重置为写模式
            buffer.clear();} catch (IOException e) {e.printStackTrace();
        }

    }
}

输入后果:

1234567890abc

ByteBuffer 的构造

Buffer 中定义了四个属性来提供所其蕴含的数据元素。

// Invariants: mark <= position <= limit <= capacity
private int mark = -1;
private int position = 0;
private int limit;
private int capacity;
  • capacity:缓冲区的容量。通过构造函数赋予,一旦设置,无奈更改
  • limit:缓冲区的界线。位于 limit 后的数据不可读写。缓冲区的限度不能为负,并且 不能大于其容量
  • position下一个 读写地位的索引(相似 PC)。缓冲区的地位不能为负,并且 不能大于 limit
  • mark:记录以后 position 的值。position 被扭转后,能够通过调用 reset() 办法复原到 mark 的地位

在一开始的状况下,position 指向第一位写入地位,limit 和 capacity 则等于缓冲区的容量。

在写模式下,position 是写入地位,limit 等于容量,下图示意写入 4 个元素后的状态。

当调用 flip() 办法切换为读模式后,position 切换为读取地位,limit 切换为读取限度。

当读取到 limit 地位后,则不能够持续读取。

当调用 clear() 办法后,则回归最原始状态。

当调用 compact()办法时,须要留神:此办法为 ByteBuffer 的办法,而不是 Buffer 的办法

  • compact 会把未读完的数据向前压缩,而后切换到写模式
  • 数据前移后,原地位的值并未清零,写时会 笼罩 之前的值

ByteBuffer 的常见办法

调配空间:allocate()

//java.nio.HeapByteBuffer java 堆内存,读写效率较低,受到 gc 影响
System.out.println(ByteBuffer.allocate(1024).getClass());
//java.nio.DirectByteBuffer 间接内存,读写效率较高(少一次拷贝),不会受 gc 影响,分配内存效率较低,使用不当则可能会产生内存透露
System.out.println(ByteBuffer.allocateDirect(1024).getClass());

flip()

  • flip()办法会 切换对缓冲区的操作模式,由写 -> 读 / 读 -> 写

put()

  • put()办法能够将一个数据放入到缓冲区中。
  • 进行该操作后,postition 的值会 +1,指向下一个能够放入的地位。

get()

  • get()办法会读取缓冲区中的一个值
  • 进行该操作后,position 会 +1,如果超过了 limit 则会抛出异样

留神:get(i)办法不会扭转 position 的值。

rewind()

  • 该办法 只能在读模式下应用
  • rewind()办法后,会复原 position、limit 和 capacity 的值,变为进行 get()前的值

clear()

  • clear()办法会将缓冲区中的各个属性复原为最后的状态,position = 0, capacity = limit
  • 此时缓冲区的数据仍然存在,处于“被忘记”状态,下次进行写操作时会笼罩这些数据

mark()和 reset()

  • mark()办法会将 postion 的值保留到 mark 属性中
  • reset()办法会将 position 的值改为 mark 中保留的值

字符串和 ByteBuffer 互相转换

引入工具类:

import io.netty.util.internal.MathUtil;
import io.netty.util.internal.StringUtil;

import java.nio.ByteBuffer;

/**
 * @author 神秘杰克
 * 公众号: Java 菜鸟程序员
 * @date 2021/12/28
 * @Description 工具类
 */
public class ByteBufferUtil {private static final char[] BYTE2CHAR = new char[256];
    private static final char[] HEXDUMP_TABLE = new char[256 * 4];
    private static final String[] HEXPADDING = new String[16];
    private static final String[] HEXDUMP_ROWPREFIXES = new String[65536 >>> 4];
    private static final String[] BYTE2HEX = new String[256];
    private static final String[] BYTEPADDING = new String[16];

    static {final char[] DIGITS = "0123456789abcdef".toCharArray();
        for (int i = 0; i < 256; i++) {HEXDUMP_TABLE[i << 1] = DIGITS[i >>> 4 & 0x0F];
            HEXDUMP_TABLE[(i << 1) + 1] = DIGITS[i & 0x0F];
        }

        int i;

        // Generate the lookup table for hex dump paddings
        for (i = 0; i < HEXPADDING.length; i++) {
            int padding = HEXPADDING.length - i;
            StringBuilder buf = new StringBuilder(padding * 3);
            for (int j = 0; j < padding; j++) {buf.append(" ");
            }
            HEXPADDING[i] = buf.toString();}

        // Generate the lookup table for the start-offset header in each row (up to 64KiB).
        for (i = 0; i < HEXDUMP_ROWPREFIXES.length; i++) {StringBuilder buf = new StringBuilder(12);
            buf.append(StringUtil.NEWLINE);
            buf.append(Long.toHexString(i << 4 & 0xFFFFFFFFL | 0x100000000L));
            buf.setCharAt(buf.length() - 9, '|');
            buf.append('|');
            HEXDUMP_ROWPREFIXES[i] = buf.toString();}

        // Generate the lookup table for byte-to-hex-dump conversion
        for (i = 0; i < BYTE2HEX.length; i++) {BYTE2HEX[i] = ' ' + StringUtil.byteToHexStringPadded(i);
        }

        // Generate the lookup table for byte dump paddings
        for (i = 0; i < BYTEPADDING.length; i++) {
            int padding = BYTEPADDING.length - i;
            StringBuilder buf = new StringBuilder(padding);
            for (int j = 0; j < padding; j++) {buf.append(' ');
            }
            BYTEPADDING[i] = buf.toString();}

        // Generate the lookup table for byte-to-char conversion
        for (i = 0; i < BYTE2CHAR.length; i++) {if (i <= 0x1f || i >= 0x7f) {BYTE2CHAR[i] = '.';
            } else {BYTE2CHAR[i] = (char) i;
            }
        }
    }

    /**
     * 打印所有内容
     *
     * @param buffer
     */
    public static void debugAll(ByteBuffer buffer) {int oldlimit = buffer.limit();
        buffer.limit(buffer.capacity());
        StringBuilder origin = new StringBuilder(256);
        appendPrettyHexDump(origin, buffer, 0, buffer.capacity());
        System.out.println("+--------+-------------------- all ------------------------+----------------+");
        System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), oldlimit);
        System.out.println(origin);
        buffer.limit(oldlimit);
    }

    /**
     * 打印可读取内容
     *
     * @param buffer
     */
    public static void debugRead(ByteBuffer buffer) {StringBuilder builder = new StringBuilder(256);
        appendPrettyHexDump(builder, buffer, buffer.position(), buffer.limit() - buffer.position());
        System.out.println("+--------+-------------------- read -----------------------+----------------+");
        System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), buffer.limit());
        System.out.println(builder);
    }

    private static void appendPrettyHexDump(StringBuilder dump, ByteBuffer buf, int offset, int length) {if (MathUtil.isOutOfBounds(offset, length, buf.capacity())) {
            throw new IndexOutOfBoundsException("expected:" + "0 <= offset(" + offset + ") <= offset + length(" + length
                            + ") <=" + "buf.capacity(" + buf.capacity() + ')');
        }
        if (length == 0) {return;}
        dump.append(
                "+-------------------------------------------------+" +
                        StringUtil.NEWLINE + "|  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |" +
                        StringUtil.NEWLINE + "+--------+-------------------------------------------------+----------------+");

        final int startIndex = offset;
        final int fullRows = length >>> 4;
        final int remainder = length & 0xF;

        // Dump the rows which have 16 bytes.
        for (int row = 0; row < fullRows; row++) {int rowStartIndex = (row << 4) + startIndex;

            // Per-row prefix.
            appendHexDumpRowPrefix(dump, row, rowStartIndex);

            // Hex dump
            int rowEndIndex = rowStartIndex + 16;
            for (int j = rowStartIndex; j < rowEndIndex; j++) {dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
            }
            dump.append("|");

            // ASCII dump
            for (int j = rowStartIndex; j < rowEndIndex; j++) {dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
            }
            dump.append('|');
        }

        // Dump the last row which has less than 16 bytes.
        if (remainder != 0) {int rowStartIndex = (fullRows << 4) + startIndex;
            appendHexDumpRowPrefix(dump, fullRows, rowStartIndex);

            // Hex dump
            int rowEndIndex = rowStartIndex + remainder;
            for (int j = rowStartIndex; j < rowEndIndex; j++) {dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
            }
            dump.append(HEXPADDING[remainder]);
            dump.append("|");

            // Ascii dump
            for (int j = rowStartIndex; j < rowEndIndex; j++) {dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
            }
            dump.append(BYTEPADDING[remainder]);
            dump.append('|');
        }

        dump.append(StringUtil.NEWLINE +
                "+--------+-------------------------------------------------+----------------+");
    }

    private static void appendHexDumpRowPrefix(StringBuilder dump, int row, int rowStartIndex) {if (row < HEXDUMP_ROWPREFIXES.length) {dump.append(HEXDUMP_ROWPREFIXES[row]);
        } else {dump.append(StringUtil.NEWLINE);
            dump.append(Long.toHexString(rowStartIndex & 0xFFFFFFFFL | 0x100000000L));
            dump.setCharAt(dump.length() - 9, '|');
            dump.append('|');
        }
    }

    public static short getUnsignedByte(ByteBuffer buffer, int index) {return (short) (buffer.get(index) & 0xFF);
    }

}

测试类:

/**
 * @author 神秘杰克
 * 公众号: Java 菜鸟程序员
 * @date 2021/12/28
 * @Description 字符串和 ByteBuffer 互相转换
 */
public class TranslateTest {public static void main(String[] args) {
        String str1 = "hello";
        String str2;
        String str3;
        // 通过 StandardCharsets 的 encode 办法取得 ByteBuffer
        // 此时取得的 ByteBuffer 为读模式,无需通过 flip 切换模式
        ByteBuffer buffer = StandardCharsets.UTF_8.encode(str1);
        // 也能够应用 wrap 办法实现, 无需通过 flip 切换模式
        ByteBuffer wrap = ByteBuffer.wrap(str1.getBytes());
        ByteBufferUtil.debugAll(wrap);
        ByteBufferUtil.debugAll(buffer);

        // 将缓冲区中的数据转化为字符串
        // 通过 StandardCharsets 解码,取得 CharBuffer,再通过 toString 取得字符串
        str2 = StandardCharsets.UTF_8.decode(buffer).toString();
        System.out.println(str2);

        str3 = StandardCharsets.UTF_8.decode(wrap).toString();
        System.out.println(str3);
    }

}

运行后果:

+--------+-------------------- all ------------------------+----------------+
position: [0], limit: [5]
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f                                  |hello           |
+--------+-------------------------------------------------+----------------+
+--------+-------------------- all ------------------------+----------------+
position: [0], limit: [5]
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f                                  |hello           |
+--------+-------------------------------------------------+----------------+
hello
hello

粘包与半包

景象

网络上有多条数据发送给服务端,数据之间应用 \n 进行分隔。
但因为某种原因这些数据在接管时,被进行了重新组合,例如原始数据有 3 条为:

  • Hello,world\n
  • I’m Jack\n
  • How are you?\n

变成了上面的两个 byteBuffer (粘包,半包)

  • Hello,world\nI’m Jack\nHo
  • w are you?\n

呈现起因

粘包

发送方在发送数据时,并不是一条一条地发送数据,而是 将数据整合在一起,当数据达到肯定的数量后再一起发送。这就会导致多条信息被放在一个缓冲区中被一起发送进来。

半包

接管方的缓冲区的大小是无限的,当接管方的缓冲区满了当前,就须要 将信息截断,等缓冲区空了当前再持续放入数据。这就会产生一段残缺的数据最初被截断的景象。

解决办法

  1. 通过 get(index) 办法遍历 ByteBuffer,当遇到 \n 后进行解决。
  2. 记录从 position 到 index 的数据长度,申请对应大小的缓冲区。
  3. 将缓冲区的数据通过 get() 获取写入到 target 缓冲区中。
  4. 最初,调用 compact()办法切换为写模式,因为缓冲区中可能还存在未读取的数据。
/**
 * @author 神秘杰克
 * 公众号: Java 菜鸟程序员
 * @date 2021/12/29
 * @Description 解决黏包和半包
 */
public class ByteBufferTest {public static void main(String[] args) {ByteBuffer buffer = ByteBuffer.allocate(32);
        // 模仿黏包和半包
        buffer.put("Hello,world\nI'm Jack\nHo".getBytes(StandardCharsets.UTF_8));
        split(buffer);
        buffer.put("w are you?\n".getBytes(StandardCharsets.UTF_8));
        split(buffer);
    }

    private static void split(ByteBuffer buffer) {
        // 切换读模式
        buffer.flip();
        for (int i = 0; i < buffer.limit(); i++) {
            // 找到残缺音讯
            if (buffer.get(i) == '\n') {int length = i + 1 - buffer.position();
                final ByteBuffer target = ByteBuffer.allocate(length);
                // 从 buffer 中读取, 写入 target
                for(int j = 0; j < length; j++) {
                    // 将 buffer 中的数据写入 target 中
                    target.put(buffer.get());
                }
                // 打印查看后果
                ByteBufferUtil.debugAll(target);
            }
        }
        // 清空已读局部, 并切换写模式
        buffer.compact();}
}

运行后果:

+--------+-------------------- all ------------------------+----------------+
position: [12], limit: [12]
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 65 6c 6c 6f 2c 77 6f 72 6c 64 0a             |Hello,world.    |
+--------+-------------------------------------------------+----------------+
+--------+-------------------- all ------------------------+----------------+
position: [9], limit: [9]
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 49 27 6d 20 4a 61 63 6b 0a                      |I'm Jack.       |
+--------+-------------------------------------------------+----------------+
+--------+-------------------- all ------------------------+----------------+
position: [13], limit: [13]
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 6f 77 20 61 72 65 20 79 6f 75 3f 0a          |How are you?.   |
+--------+-------------------------------------------------+----------------+

文件编程

FileChannel

工作模式

📢:FileChannel 只能工作在 阻塞模式 下!

获取

不能间接关上 FileChannel,必须 通过 FileInputStream、FileOutputStream 或者 RandomAccessFile 来获取 FileChannel,它们都有 getChannel() 办法。

  • 通过 FileInputStream 获取的 channel 只能读
  • 通过 FileOutputStream 获取的 channel 只能写
  • 通过 RandomAccessFile 是否能读写 依据结构 RandomAccessFile 时的读写模式决定

读取

通过 read() 办法将数据填充到 ByteBuffer 中,返回值示意读到了多少字节,-1示意读到了文件开端。

int readBytes = channel.read(buffer);

写入

因为 channel 是有写入下限的,所以 write() 办法并不能保障一次将 buffer 中的内容全副写入 channel。必须 依照以下规定进行写入

// 通过 hasRemaining()办法查看缓冲区中是否还有数据未写入到通道中
while(buffer.hasRemaining()) {channel.write(buffer);
}

敞开

Channel 必须敞开,不过调用 FileInputStream、FileOutputStream、RandomAccessFile 的 close() 办法时也会间接的调用 Channel 的 close()办法。

地位

channel 也领有一个保留读取数据地位的属性,即 position。

long pos = channel.position();

能够通过 position(int pos)设置 channel 中 position 的值。

long newPos = 10;
channel.position(newPos);

设置以后地位时,如果设置为文件的开端:

  • 这时读取会返回 -1
  • 这时写入,会追加内容,但要留神如果 position 超过了文件开端,再写入时在新内容和原开端之间会有空洞(00)

强制写入

操作系统出于性能的思考,会将数据缓存,不是立即写入磁盘,而是等到缓存满了当前将所有数据一次性的写入磁盘。能够调用 force(true) 办法将文件内容和元数据(文件的权限等信息)立即写入磁盘。

常见办法

FileChannel 次要用来对本地文件进行 IO 操作,常见的办法有:

  1. public int read(ByteBuffer dst):从通道中读取数据到缓冲区中。
  2. public int write(ByteBuffer src):把缓冲区中的数据写入到通道中。
  3. public long transferFrom(ReadableByteChannel src,long position,long count):从指标通道中复制数据到以后通道。
  4. public long transferTo(long position,long count,WriteableByteChannel target):把数据从以后通道复制给指标通道。

应用 FileChannel 写入文本文件

/**
 * @author 神秘杰克
 * 公众号: Java 菜鸟程序员
 * @date 2021/12/29
 * @Description FileChannel 测试写入文件
 */
public class FileChannelTest {public static void main(String[] args) {try (final FileChannel channel = new FileOutputStream("data1.txt").getChannel()) {
            String msg = "Hello World!!!";
            final ByteBuffer buffer = ByteBuffer.allocate(16);
            buffer.put(msg.getBytes(StandardCharsets.UTF_8));
            buffer.flip();
            channel.write(buffer);
        } catch (IOException e) {e.printStackTrace();
        }
    }

}

应用 FileChannel 读取文本文件

/**
 * @author 神秘杰克
 * 公众号: Java 菜鸟程序员
 * @date 2021/12/29
 * @Description FileChannel 测试读取文件
 */
public class FileChannelTest {public static void main(String[] args) {try (final FileChannel channel = new FileInputStream("data1.txt").getChannel()) {final ByteBuffer buffer = ByteBuffer.allocate(16);
            channel.read(buffer);
            buffer.flip();
            while (buffer.hasRemaining()) {System.out.print((char) buffer.get());
            }
            // 清空缓冲区, 并重置为写模式
            buffer.clear();} catch (IOException e) {e.printStackTrace();
        }
    }

}

应用 FileChannel 进行数据传输

/**
 * @author 神秘杰克
 * 公众号: Java 菜鸟程序员
 * @date 2021/12/29
 * @Description FileChannel 测试文件传输
 */
public class FileChannelTest {public static void main(String[] args){try (final FileChannel from = new FileInputStream("data.txt").getChannel();
             final FileChannel to = new FileOutputStream("data1.txt").getChannel()) {
            // 参数:inputChannel 的起始地位,传输数据的大小,目标 channel
            // 返回值为传输的数据的字节数
            // transferTo 一次只能传输 2G 的数据
            from.transferTo(0, from.size(), to);
        } catch (IOException e) {e.printStackTrace();
        }
    }

}

transferTo()办法对应的还有 transferFrom()办法。

尽管 transferTo()办法传输效率较高,底层利用操作系统的零拷贝进行优化,然而 transferTo 办法一次只能传输 2G 的数据。

解决办法:能够依据 transferTo()的返回值来判断,返回值代表传输了多少,通过 from 的 size()大小来每次减去即可。

long size = from.size();
for (long left = size; left > 0;) {left -= from.transferTo(size - left, size, to);
}

Channel 和 Buffer 的注意事项

  1. ByteBuffer 反对 类型化 的 put 和 get,put 放入什么数据类型,get 就应该应用相应的数据类型来取出,否则可能会产生 ByteUnderflowException 异样。
  2. 能够将一个一般的 Buffer 转换为只读的 Buffer:asReadOnlyBuffer()办法
  3. NIO 提供了 MapperByteBuffer,能够让文件间接在 内存(堆外内存)中进行批改,而如何同步到文件由 NIO 来实现。
  4. NIO 还反对通过多个 Buffer(即 Buffer 数组)实现读写操作,即Scattering(扩散)和 Gathering(汇集)

    • Scattering(扩散):在向缓冲区写入数据时,能够应用 Buffer 数组顺次写入,一个 Buffer 数组写满后,持续写入下一个 Buffer 数组。
    • Gathering(汇集):从缓冲区读取数据时,能够顺次读取,读完一个 Buffer 再按程序读取下一个。

网络编程

阻塞 vs 非阻塞

阻塞

  • 在没有数据可读时,包含数据复制过程中,线程必须阻塞期待,不会占用 CPU,但线程相当于闲置状态
  • 32 位 JVM 一个线程 320k,64 位 JVM 一个线程 1024k,为了缩小线程数量,须要采纳线程池技术
  • 但即便应用线程池,如果有很多连贯建设,但长时间 inactive,会阻塞线程池中所有线程

非阻塞

  • 在某个 Channel 没有可读事件时,线程不用阻塞,它能够去解决其它有可读事件的 Channel
  • 数据复制过程中,线程理论还是阻塞的(AIO 改良的中央)
  • 写数据时,线程只是期待数据写入 Channel 即可,无需期待 Channel 通过网络把数据发送进来

阻塞案例代码

服务端代码:

/**
 * @author 神秘杰克
 * 公众号: Java 菜鸟程序员
 * @date 2021/12/29
 * @Description 应用 NIO 来了解阻塞模式 - 服务端
 */
public class Server {public static void main(String[] args) {
        //1. 创立服务器
        try (ServerSocketChannel ssc = ServerSocketChannel.open()) {final ByteBuffer buffer = ByteBuffer.allocate(16);
            //2. 绑定监听端口
            ssc.bind(new InetSocketAddress(7777));
            //3. 寄存建设连贯的汇合
            List<SocketChannel> channels = new ArrayList<>();
            while (true) {System.out.println("建设连贯...");
                //4. accept 建设客户端连贯 , 用来和客户端之间通信
                final SocketChannel socketChannel = ssc.accept();
                System.out.println("建设连贯实现...");
                channels.add(socketChannel);
                //5. 接管客户端发送的数据
                for (SocketChannel channel : channels) {System.out.println("正在读取数据...");
                    channel.read(buffer);
                    buffer.flip();
                    ByteBufferUtil.debugRead(buffer);
                    buffer.clear();
                    System.out.println("数据读取实现...");
                }
            }
        } catch (IOException e) {System.out.println("出现异常...");
        }
    }

}

客户端代码:

/**
 * @author 神秘杰克
 * 公众号: Java 菜鸟程序员
 * @date 2021/12/29
 * @Description 应用 NIO 来了解阻塞模式 - 客户端
 */
public class Client {public static void main(String[] args) {try (SocketChannel socketChannel = SocketChannel.open()) {
            // 建设连贯
            socketChannel.connect(new InetSocketAddress("localhost", 7777));
            final ByteBuffer buffer = ByteBuffer.allocate(10);
            buffer.put("hello".getBytes(StandardCharsets.UTF_8));
            buffer.flip();
            socketChannel.write(buffer);
        } catch (IOException e) {e.printStackTrace();
        }
    }

}

运行后果:

  • 在刚开始服务器运行后:服务器端因 accept 阻塞。

  • 在客户端和服务器建设连贯后,客户端发送音讯前:服务器端因通道为空被阻塞。

  • 客户端发送数据后,服务器解决通道中的数据。之后再次进入循环时,再次被 accept 阻塞。

  • 之前的客户端再次发送音讯,服务器端因为被 accept 阻塞,就无奈解决之前客户端再次发送到通道中的信息了。

非阻塞

  • 通过 ServerSocketChannel 的 configureBlocking(false) 办法将 取得连贯设置为非阻塞的。此时若没有连贯,accept 会返回 null
  • 通过 SocketChannel 的 configureBlocking(false) 办法将从通道中 读取数据设置为非阻塞的。若此时通道中没有数据可读,read 会返回 -1
/**
 * @author 神秘杰克
 * 公众号: Java 菜鸟程序员
 * @date 2021/12/29
 * @Description 应用 NIO 来了解阻塞模式 - 服务端
 */
public class Server {public static void main(String[] args) {
        //1. 创立服务器
        try (ServerSocketChannel ssc = ServerSocketChannel.open()) {final ByteBuffer buffer = ByteBuffer.allocate(16);
            //2. 绑定监听端口
            ssc.bind(new InetSocketAddress(7777));
            //3. 寄存建设连贯的汇合
            List<SocketChannel> channels = new ArrayList<>();
            // 设置非阻塞!!
            ssc.configureBlocking(false);
            while (true) {System.out.println("建设连贯...");
                //4. accept 建设客户端连贯 , 用来和客户端之间通信
                final SocketChannel socketChannel = ssc.accept();
                // 设置非阻塞!!
                socketChannel.configureBlocking(false);
                System.out.println("建设连贯实现...");
                channels.add(socketChannel);
                //5. 接管客户端发送的数据
                for (SocketChannel channel : channels) {System.out.println("正在读取数据...");
                    channel.read(buffer);
                    buffer.flip();
                    ByteBufferUtil.debugRead(buffer);
                    buffer.clear();
                    System.out.println("数据读取实现...");
                }
            }
        } catch (IOException e) {System.out.println("出现异常...");
        }
    }

}

因为设置为了非阻塞,会始终执行 while(true) 中的代码,CPU 始终处于繁忙状态,会使得性能变低,所以理论状况中不应用这种办法解决申请。

Selector

根本介绍

  1. Java 的 NIO 应用了非阻塞的 I/O 形式。能够用一个线程解决若干个客户端连贯,就会应用到 Selector(选择器)。
  2. Selector 可能检测到多个注册通道上是否有事件产生(多个 Channel 以事件的模式注册到同一个 selector),如果有事件产生,便获取事件而后针对每个事件进行相应的解决。
  3. 只有在连贯真正有读写事件产生时,才会进行读写,缩小了零碎开销,并且不用为每个连贯都创立一个线程,不必保护多个线程。
  4. 防止了多线程之间上下文切换导致的开销。

特点

单线程能够配合 Selector 实现对多个 Channel 可读写事件的监控,这称为 多路复用

  • 多路复用仅针对网络 IO,一般文件 IO 无奈利用多路复用
  • 如果不必 Selector 的非阻塞模式,线程大部分工夫都在做无用功,而 Selector 可能保障

    • 有可连贯事件时才去连贯
    • 有可读事件才去读取
    • 有可写事件才去写入

限于网络传输能力,Channel 未必随时可写,一旦 Channel 可写,会触发 Selector 的可写事件进行写入。

Selector 相干办法阐明

  • selector.select():// 若未监听到注册管道中有事件,则继续阻塞
  • selector.select(1000):// 阻塞 1000 毫秒,1000 毫秒后返回
  • selector.wakeup():// 唤醒 selector
  • selector.selectNow():// 不阻塞,立刻返回

NIO 非阻塞网络编程过程剖析

  1. 当客户端连贯时,会通过 SeverSocketChannel 失去对应的 SocketChannel。
  2. Selector 进行监听,调用 select()办法,返回注册该 Selector 的所有通道中 有事件产生 的通道个数。
  3. 将 SocketChannel 注册到 Selector 上,public final SelectionKey register(Selector sel, int ops),一个 Selector 上能够注册 多个 SocketChannel。
  4. 注册后返回一个 SelectionKey,会和该 Selector 关联 (以 汇合 的模式)。
  5. 进一步失去各个 SelectionKey,有事件产生。
  6. 再通过 SelectionKey 反向获取 SocketChannel,应用 channnel()办法。
  7. 能够通过失去的 channel,实现业务解决。

SelectionKey 中定义了四个操作标记位:OP_READ示意通道中产生读事件;OP_WRITE—示意通道中产生写事件;OP_CONNECT—示意建设连贯;OP_ACCEPT—申请新连贯。

SelectionKey 的相干办法

办法 形容
public abstract Selector selector(); 失去与之关联的 Selector 对象
public abstract SelectableChannel channel(); 失去与之关联的通道
public final Object attachment() 失去与之关联的共享数据
public abstract SelectionKey interestOps(int ops); 设置或扭转监听的事件类型
public final boolean isReadable(); 通道是否可读
public final boolean isWritable(); 通道是否可写
public final boolean isAcceptable(); 是否能够建设连贯 ACCEPT

Selector 根本应用及 Accpet 事件

接下来咱们应用 Selector 实现多路复用,对服务端代码进行改良。

/**
 * @author 神秘杰克
 * 公众号: Java 菜鸟程序员
 * @date 2021/12/29
 * @Description Selector 根本应用 - 服务端
 */
public class Server {public static void main(String[] args) {try (ServerSocketChannel ssc = ServerSocketChannel.open();
             final Selector selector = Selector.open()) {// 创立 selector 治理多个 channel
            ssc.bind(new InetSocketAddress(7777));
            ssc.configureBlocking(false);
            // 将通道注册到选择器中,并设置感兴趣的事件
            ssc.register(selector, SelectionKey.OP_ACCEPT);
            ByteBuffer buffer = ByteBuffer.allocate(16);
            while (true) {
                // 如果事件就绪,线程会被阻塞,反之不会被阻塞。从而防止了 CPU 空转
                // 返回值为就绪的事件个数
                int ready = selector.select();
                System.out.println("selector 就绪总数:" + ready);
                // 获取所有事件
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {final SelectionKey key = iterator.next();
                    // 判断 key 的事件类型
                    if (key.isAcceptable()) {ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
                        final SocketChannel socketChannel = serverSocketChannel.accept();
                        System.out.println("获取到客户端连贯...");
                    }
                    // 处理完毕后移除
                    iterator.remove();}
            }
        } catch (IOException e) {System.out.println("出现异常...");
        }
    }
}

事件产生后,要么解决,要么应用 key.cancel()办法勾销 ,不能什么都不做, 否则下次该事件仍会触发,这是因为 nio 底层应用的是程度触发。

选择器中的通道对应的事件产生后,SelectionKey 会被放到另一个汇合中,然而selecionKey 不会主动移除,所以须要咱们在解决完一个事件后,通过迭代器手动移除其中的 selecionKey。否则会导致已被解决过的事件再次被解决,就会引发谬误。

Read 事件

/**
 * @author 神秘杰克
 * 公众号: Java 菜鸟程序员
 * @date 2021/12/29
 * @Description Read 事件 - 服务端
 */
public class Server {public static void main(String[] args) {try (ServerSocketChannel ssc = ServerSocketChannel.open();
             final Selector selector = Selector.open()) {// 创立 selector 治理多个 channel
            ssc.bind(new InetSocketAddress(7777));
            ssc.configureBlocking(false);
            // 将通道注册到选择器中,并设置感兴趣的事件
            ssc.register(selector, SelectionKey.OP_ACCEPT);
            ByteBuffer buffer = ByteBuffer.allocate(16);
            while (true) {
                // 如果事件就绪,线程会被阻塞,反之不会被阻塞。从而防止了 CPU 空转
                // 返回值为就绪的事件个数
                int ready = selector.select();
                System.out.println("selector 就绪总数:" + ready);
                // 获取所有事件
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {final SelectionKey key = iterator.next();
                    // 判断 key 的事件类型
                    if (key.isAcceptable()) {ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
                        final SocketChannel socketChannel = serverSocketChannel.accept();
                        System.out.println("获取到客户端连贯...");
                        // 设置为非阻塞模式,同时将连贯的通道也注册到抉择其中
                        socketChannel.configureBlocking(false);
                        socketChannel.register(selector, SelectionKey.OP_READ);
                    } else if (key.isReadable()) { // 读事件
                        SocketChannel channel = (SocketChannel) key.channel();
                        channel.read(buffer);
                        buffer.flip();
                        ByteBufferUtil.debugRead(buffer);
                        buffer.clear();}
                    // 处理完毕后移除
                    iterator.remove();}
            }
        } catch (IOException e) {System.out.println("出现异常...");
        }
    }
}

断开解决

当客户端与服务器之间的连贯 断开时,会给服务器端发送一个读事件 ,对 异样断开 失常断开 须要不同的形式进行解决:

  • 失常断开

    • 失常断开时,服务器端的 channel.read(buffer)办法的返回值为 -1,所以当完结到返回值为 -1 时,须要调用 key 的 cancel()办法勾销此事件,并在勾销后移除该事件
  • 异样断开

    • 异样断开时,会抛出 IOException 异样,在 try-catch 的 catch 块中捕捉异样并调用 key 的 cancel() 办法即可

音讯边界

⚠️ 不解决音讯边界存在的问题

将缓冲区的大小设置为 4 个字节,发送 2 个汉字(你好),通过 decode 解码并打印时,会呈现乱码

ByteBuffer buffer = ByteBuffer.allocate(4);
// 解码并打印
System.out.println(StandardCharsets.UTF_8.decode(buffer));
你�
��

这是因为 UTF-8 字符集下,1 个汉字占用 3 个字节,此时缓冲区大小为 4 个字节,一次读工夫无奈解决完通道中的所有数据,所以一共会触发两次读事件 。这就导致 你好 字被拆分为了前半部分和后半局部发送,解码时就会呈现问题。

💡 解决音讯边界

传输的文本可能有以下三种状况:

  • 文本大于缓冲区大小,此时须要将缓冲区进行扩容
  • 产生半包景象
  • 产生粘包景象

解决方案:

  • 固定音讯长度,数据包大小一样,服务器按预约长度读取,当发送的数据较少时,须要将数据进行填充,直到长度与音讯规定长度统一。毛病是节约带宽
  • 另一种思路是按分隔符拆分,毛病是效率低,须要一个一个字符地去匹配分隔符
  • TLV 格局,即 Type 类型、Length 长度、Value 数据 (也就是在音讯结尾 用一些空间寄存前面数据的长度),如 HTTP 申请头中的 Content-Type 与Content-Length。类型和长度已知的状况下,就能够不便获取音讯大小,调配适合的 buffer,毛病是 buffer 须要提前调配,如果内容过大,则影响 server 吞吐量

上面演示第二种解决方案,按 分隔符拆分

咱们须要在 Accept 事件产生后,将通道注册到 Selector 中时,对每个通道增加一个 ByteBuffer 附件,让每个通道产生读事件时都应用本人的通道,防止与其余通道发生冲突而导致问题。

ByteBuffer buffer = ByteBuffer.allocate(16);
// 增加通道对应的 Buffer 附件
socketChannel.register(selector, SelectionKey.OP_READ, buffer);

当 Channel 中的数据大于缓冲区时,须要对缓冲区进行 扩容 操作。此代码中的扩容的断定办法:Channel 调用 compact 办法后,的 position 与 limit 相等,阐明缓冲区中的数据并未被读取(容量太小),此时创立新的缓冲区,其大小扩充为两倍。同时还要将旧缓冲区中的数据拷贝到新的缓冲区中,同时调用 SelectionKey 的 attach 办法将新的缓冲区作为新的附件放入 SelectionKey 中

// 如果缓冲区太小,就进行扩容
if (buffer.position() == buffer.limit()) {ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity()*2);
  // 将旧 buffer 中的内容放入新的 buffer 中
  buffer.flip();
  newBuffer.put(buffer);
  // 将新 buffer 放到 key 中作为附件
  key.attach(newBuffer);
}

改良后的代码如下

/**
 * @author 神秘杰克
 * 公众号: Java 菜鸟程序员
 * @date 2021/12/29
 * @Description Read 事件完整版 - 服务端
 */
public class Server {public static void main(String[] args) {try (ServerSocketChannel ssc = ServerSocketChannel.open();
             final Selector selector = Selector.open()) {// 创立 selector 治理多个 channel
            ssc.bind(new InetSocketAddress(7777));
            ssc.configureBlocking(false);
            // 将通道注册到选择器中,并设置感兴趣的事件
            ssc.register(selector, SelectionKey.OP_ACCEPT);
            while (true) {
                // 如果事件就绪,线程会被阻塞,反之不会被阻塞。从而防止了 CPU 空转
                // 返回值为就绪的事件个数
                int ready = selector.select();
                System.out.println("selector 就绪总数:" + ready);
                // 获取所有事件
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {final SelectionKey key = iterator.next();
                    // 判断 key 的事件类型
                    if (key.isAcceptable()) {ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
                        final SocketChannel socketChannel = serverSocketChannel.accept();
                        System.out.println("获取到客户端连贯...");
                        socketChannel.configureBlocking(false);
                        ByteBuffer byteBuffer = ByteBuffer.allocate(16);
                        // 注册到 Selector 并且设置读事件, 设置附件 bytebuffer
                        socketChannel.register(selector, SelectionKey.OP_READ, byteBuffer);
                    } else if (key.isReadable()) { // 读事件
                        try {SocketChannel channel = (SocketChannel) key.channel();
                            // 通过 key 取得附件
                            ByteBuffer buffer = (ByteBuffer) key.attachment();
                            int read = channel.read(buffer);
                            if (read == -1) {key.cancel();
                                channel.close();} else {
                                // 通过分隔符来分隔 buffer 中的数据
                                split(buffer);
                                // 如果缓冲区太小,就进行扩容
                                if (buffer.position() == buffer.limit()) {ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
                                    // 将旧 buffer 中的内容放入新的 buffer 中
                                    buffer.flip();
                                    newBuffer.put(buffer);
                                    // 将新 buffer 放到 key 中作为附件
                                    key.attach(newBuffer);
                                }
                            }
                        } catch (IOException e) {
                            // 异样断开, 勾销事件
                            key.cancel();}
                    }
                    // 处理完毕后移除
                    iterator.remove();}
            }
        } catch (IOException e) {System.out.println("出现异常...");
        }
    }

    private static void split(ByteBuffer buffer) {buffer.flip();
        for (int i = 0; i < buffer.limit(); i++) {
            // 找到一条实现数据
            if (buffer.get(i) == '\n') {
                // 缓冲区长度
                int length = i + 1 - buffer.position();
                ByteBuffer target = ByteBuffer.allocate(length);
                // 将后面的内容写入 target 缓冲区
                for (int j = 0; j < length; j++) {
                    // 将 buffer 中的数据写入 target 中
                    target.put(buffer.get());
                }
                ByteBufferUtil.debugAll(target);
            }
        }
        // 切换为写模式,然而缓冲区可能未读完,这里须要应用 compact
        buffer.compact();}
}
/**
 * @author 神秘杰克
 * 公众号: Java 菜鸟程序员
 * @date 2021/12/29
 * @Description Read 事件完整版 - 客户端
 */
public class Client {public static void main(String[] args) {try (SocketChannel socketChannel = SocketChannel.open()) {
            // 建设连贯
            socketChannel.connect(new InetSocketAddress("localhost", 7777));
            final ByteBuffer buffer = ByteBuffer.allocate(32);
            buffer.put("01234567890abcdef3333\n".getBytes(StandardCharsets.UTF_8));
            buffer.flip();
            socketChannel.write(buffer);
        } catch (IOException e) {e.printStackTrace();
        }
    }
}

ByteBuffer 的大小调配

  • 每个 channel 都须要记录可能被切分的音讯,因为 ByteBuffer 不能被多个 channel 独特应用,因而须要为每个 channel 保护一个独立的 ByteBuffer
  • ByteBuffer 不能太大,比方一个 ByteBuffer 1Mb 的话,要反对百万连贯就要 1Tb 内存,因而须要设计大小可变的 ByteBuffer
  • 调配思路:

    • 一种思路是首先调配一个较小的 buffer,例如 4k,如果发现数据不够,再调配 8k 的 buffer,将 4k buffer 内容拷贝至 8k buffer,长处是音讯间断容易解决,毛病是数据拷贝消耗性能
    • 另一种思路是用多个数组组成 buffer,一个数组不够,把多进去的内容写入新的数组,与后面的区别是音讯存储不间断解析简单,长处是防止了拷贝引起的性能损耗

Write 事件

服务器通过 Buffer 通道中写入数据时,可能因为通道容量小于 Buffer 中的数据大小,导致无奈一次性将 Buffer 中的数据全副写入到 Channel 中,这时便须要分屡次写入,具体步骤如下:

  1. 执行一次写操作,向将 buffer 中的内容写入到 SocketChannel 中,而后判断 Buffer 中是否还有数据
  2. 若 Buffer 中还有数据,则 须要将 SockerChannel 注册到 Seletor 中,并关注写事件,同时将未写完的 Buffer 作为附件一起放入到 SelectionKey 中。
/**
 * @author 神秘杰克
 * 公众号: Java 菜鸟程序员
 * @date 2021/12/29
 * @Description Write 事件 - 服务端
 */
public class Server {public static void main(String[] args) {try (ServerSocketChannel ssc = ServerSocketChannel.open();
             final Selector selector = Selector.open()) {ssc.bind(new InetSocketAddress(7777));
            ssc.configureBlocking(false);
            ssc.register(selector, SelectionKey.OP_ACCEPT);
            while (true) {int ready = selector.select();
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {final SelectionKey key = iterator.next();
                    // 判断 key 的事件类型
                    if (key.isAcceptable()) {final SocketChannel socketChannel = ssc.accept();
                        socketChannel.configureBlocking(false);
                        StringBuilder sb = new StringBuilder();
                        for (int i = 0; i < 3000000; i++) {sb.append("a");
                        }
                        final ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
                        final int write = socketChannel.write(buffer);
                        System.out.println("accept 事件器写入.."+write);
                        // 判断是否还有残余内容
                        if (buffer.hasRemaining()) {
                            // 注册到 Selector 中,关注可写事件,并将 buffer 增加到 key 的附件中
                            socketChannel.register(selector, SelectionKey.OP_WRITE, buffer);
                        }
                    }else if (key.isWritable()) {SocketChannel socket = (SocketChannel) key.channel();
                        // 取得事件
                        ByteBuffer buffer = (ByteBuffer) key.attachment();
                        int write = socket.write(buffer);
                        System.out.println("write 事件器写入.."+write);
                        // 如果曾经实现了写操作,须要移除 key 中的附件,同时不再对写事件感兴趣
                        if (!buffer.hasRemaining()) {key.attach(null);
                            key.interestOps(0);
                        }
                    }
                    // 处理完毕后移除
                    iterator.remove();}
            }
        } catch (IOException e) {System.out.println("出现异常...");
        }
    }
}
/**
 * @author 神秘杰克
 * 公众号: Java 菜鸟程序员
 * @date 2021/12/29
 * @Description Write 事件 - 客户端
 */
public class Client {public static void main(String[] args) {try (SocketChannel socketChannel = SocketChannel.open()) {
            // 建设连贯
            socketChannel.connect(new InetSocketAddress("localhost", 7777));
            int count = 0;
            while (true) {final ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
                count += socketChannel.read(buffer);
                System.out.println("客户端承受了.."+count);
                buffer.clear();}
        } catch (IOException e) {e.printStackTrace();
        }
    }
}

运行后果:

正文完
 0