前言
本篇博文是《从 0 到 1 学习 Netty》中 NIO 系列的第四篇博文,次要内容是 介绍如何解决音讯边界以及通过可写事件解决写入内容过多的问题,往期系列文章请拜访博主的 Netty 专栏,博文中的所有代码全副收集在博主的 GitHub 仓库中;
音讯边界
将缓冲区的大小设置为 4 个字节,发送以下音讯时:
sc.write(Charset.defaultCharset().encode("你好,sidiot!"));
运行后果:
你�
���
�sid
iot�
��
这是因为 UTF-8 字符集下,1 个汉字占用 3 个字节,此时缓冲区大小为 4 个字节,一次读工夫无奈解决完通道中的所有数据,所以会触发屡次读事件。这导致其余几个中文字符被拆分开来发送,因而解码时就会呈现如上问题。
个别的解决思路有以下三种:
- 固定音讯长度,数据包大小一样,服务器按预约长度读取,当发送的数据较少时,须要将数据进行填充,直到长度与音讯规定长度统一,毛病是节约带宽;
- 按分隔符拆分,毛病是效率低,须要一个一个字符地去匹配分隔符;
-
TLV 格局,即 Type 类型、Length 长度、Value 数据,也就是在音讯结尾用一些空间寄存前面数据的长度,如 HTTP 申请头中的 Content-Type 与 Content-Length。类型和长度已知的状况下,就能够不便获取音讯大小,调配适合的 buffer,毛病是 buffer 须要提前调配,如果内容过大,则影响 server 的吞吐量;
- Http 1.1 是 TLV 格局;
- Http 2.0 是 LTV 格局;
接下来通过 按分隔符拆分 的形式来解决音讯边界问题;
先编写一个 split()
函数,用于解决将 buffer 的内容按分隔符进行拆分,代码如下:
private static void split(ByteBuffer buffer) {buffer.flip();
for(int i=0; i<buffer.limit(); i++) {if (buffer.get(i) == '\n') {int length = i + 1 - buffer.position();
ByteBuffer target = ByteBuffer.allocate(length);
for(int j=0; j<length; j++) {target.put(buffer.get());
}
debugAll(target);
}
}
buffer.compact();}
而后再看到 ByteBuffer 类,尽管 ByteBuffer 是线程平安的,然而它并不是设计用于多线程并发拜访,如果多个线程同时拜访同一个 ByteBuffer 对象,那么可能会呈现数据竞争和一致性问题,因而,咱们须要确保每个 Channel 都有本人的 ByteBuffer 对象,来防止共享;
这时就要看到 register
函数:
public abstract SelectionKey register(Selector sel, int ops, Object att)
throws ClosedChannelException;
这第三个参数 att
示意附件的意思,即能够向其中放入一个 Object 类型的对象,该对象会与注销的 Channel 以及其对应的 SelectionKey 绑定,能够从 SelectionKey 获取到对应 Channel 的附件,咱们能够将 buffer 放入其中:
ByteBuffer buffer = ByteBuffer.allocate(16);
channel.register(selector, SelectionKey.OP_READ, buffer);
之后能够通过 SelectionKey 的 attachment()
办法取得附件:
ByteBuffer buf = (ByteBuffer) key.attachment();
此外,还要留神 buffer 的大小,如果发送内容的大小要大于 buffer 的大小,则会呈现音讯失落的状况,比方要发送 "Hello, World! --sid10t.\n"
,因为 buffer 为 16,最初接管到的只有 Hello, World! --
,然而因为采纳了 按分隔符拆分,控制台不会输入任何字符;
因而,须要对 buffer 进行动静扩容,代码如下:
if (buf.position() == buf.limit()) {ByteBuffer newBuf = ByteBuffer.allocate(buf.capacity() * 2);
buf.flip();
newBuf.put(buf);
key.attach(newBuf);
}
上述代码是思考到 split()
函数中应用的是 compact()
办法,因而当 position
与 limit
相等时,阐明缓冲区中的数据并未被读取(容量太小),此时创立新的缓冲区,其大小扩充至原先的两倍。同时还要将旧缓冲区中的数据拷贝到新的缓冲区中,并调用 SelectionKey 的 attach()
办法,将新的缓冲区作为新的附件放入 SelectionKey 中;
整体代码如下所示:
@Slf4j
public class MSGBoundary {public static void main(String[] args) {
try {Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
SelectionKey sscKey = ssc.register(selector, SelectionKey.OP_ACCEPT);
log.debug("Register Key: {}", sscKey);
ssc.bind(new InetSocketAddress(7999));
while (true) {selector.select();
Set<SelectionKey> keySet = selector.selectedKeys();
Iterator<SelectionKey> iter = keySet.iterator();
// log.debug("count: {}", keySet.size());
while (iter.hasNext()) {SelectionKey key = iter.next();
log.debug("Selection Key: {}", key);
if (key.isAcceptable()) {ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(16);
sc.register(selector, SelectionKey.OP_READ, buffer);
log.debug("sc Key: {}", sc);
iter.remove();} else if (key.isReadable()) {SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buf = (ByteBuffer) key.attachment();
try {int read = channel.read(buf);
log.debug("read: {}", read);
if (read <= 0) {key.cancel();
channel.close();} else {split(buf);
if (buf.position() == buf.limit()) {ByteBuffer newBuf = ByteBuffer.allocate(buf.capacity() * 2);
buf.flip();
newBuf.put(buf);
key.attach(newBuf);
}
}
} catch (IOException e) {e.printStackTrace();
key.cancel();} finally {iter.remove();
}
}
}
}
} catch (IOException e) {e.printStackTrace();
}
}
private static void split(ByteBuffer buffer) {buffer.flip();
for(int i=0; i<buffer.limit(); i++) {if (buffer.get(i) == '\n') {int length = i + 1 - buffer.position();
ByteBuffer target = ByteBuffer.allocate(length);
for(int j=0; j<length; j++) {target.put(buffer.get());
}
debugAll(target);
}
}
buffer.compact();}
}
运行后果:
22:18:16 [DEBUG] [main] c.s.n.c.MSGBoundary - Register Key: channel=sun.nio.ch.ServerSocketChannelImpl[unbound], selector=sun.nio.ch.WindowsSelectorImpl@4f51b3e0, interestOps=16, readyOps=0
22:18:20 [DEBUG] [main] c.s.n.c.MSGBoundary - Selection Key: channel=sun.nio.ch.ServerSocketChannelImpl[/[0:0:0:0:0:0:0:0]:7999], selector=sun.nio.ch.WindowsSelectorImpl@4f51b3e0, interestOps=16, readyOps=16
22:18:20 [DEBUG] [main] c.s.n.c.MSGBoundary - sc Key: java.nio.channels.SocketChannel[connected local=/127.0.0.1:7999 remote=/127.0.0.1:52604]
22:18:20 [DEBUG] [main] c.s.n.c.MSGBoundary - Selection Key: channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:7999 remote=/127.0.0.1:52604], selector=sun.nio.ch.WindowsSelectorImpl@4f51b3e0, interestOps=1, readyOps=1
22:18:20 [DEBUG] [main] c.s.n.c.MSGBoundary - read: 16
22:18:20 [DEBUG] [main] c.s.n.c.MSGBoundary - Selection Key: channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:7999 remote=/127.0.0.1:52604], selector=sun.nio.ch.WindowsSelectorImpl@4f51b3e0, interestOps=1, readyOps=1
22:18:20 [DEBUG] [main] c.s.n.c.MSGBoundary - read: 8
+--------+-------------------- all ------------------------+----------------+
position: [24], limit: [24]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 65 6c 6c 6f 2c 20 57 6f 72 6c 64 21 20 2d 2d |Hello, World! --|
|00000010| 73 69 64 31 30 74 2e 0a |sid10t.. |
+--------+-------------------------------------------------+----------------+
这里还须要思考一个问题,就是 Bytebuffer 的大小,ByteBuffer 不能太大,比方一个 ByteBuffer 的大小为 1MB 的话,如果要反对百万连贯就要 1TB 内存,因而须要设计大小可变的 ByteBuffer:
- 一种思路是首先调配一个较小的 buffer,例如 4k,如果发现数据不够,再调配 8k 的 buffer,将 4k buffer 内容拷贝至 8k buffer,长处是音讯间断容易解决,毛病是数据拷贝消耗性能;
- 另一种思路是用多个数组组成 buffer,一个数组不够,把多进去的内容写入新的数组,与后面的区别是音讯存储不间断解析简单,长处是防止了拷贝引起的性能损耗;
可写事件
服务器通过 Buffer 向通道中写入数据时,可能会遇到通道容量小于 Buffer 中的数据大小,导致无奈一次性将 Buffer 中的数据全副写入到 Channel 中,这时便须要分屡次写入,通过 hasRemaining()
办法来判断 Buffer 中是否还有数据,代码如下:
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 5000000; i++) {sb.append("sidiot");
}
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
while (buffer.hasRemaining()) {int write = sc.write(buffer);
System.out.println(write);
}
客户端通过循环来接收数据,代码如下:
int cnt = 0;
while (true) {ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
cnt += sc.read(buffer);
System.out.println(cnt);
buffer.clear();}
运行后果:
# WriteServer
4718556
3014633
4063201
0
4718556
0
2490349
0
0
2621420
0
2621420
0
2621420
0
2621420
...
509025
上述后果呈现 0 是因为缓冲区还没生产完,无奈进行写入,这样子会导致滞留在此,性能低下;
接下来,优化一下代码,通过 Selector 进行解决,提高效率:
while (iter.hasNext()) {SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
SelectionKey scKey = sc.register(selector, 0, null);
scKey.interestOps(SelectionKey.OP_READ);
// 1. 向客户端发送大量数据
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 5000000; i++) {sb.append("sidiot");
}
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
// 2. 返回值代表理论写入的字节数
int write = sc.write(buffer);
System.out.println(write);
// 3. 判断是否有残余内存
if (buffer.hasRemaining()) {
// 4. 关注可写事件 1+4
scKey.interestOps(scKey.interestOps() + SelectionKey.OP_WRITE);
// 5. 将未写完的数据挂到 scKey 上
scKey.attach(buffer);
}
} else if (key.isWritable()) {ByteBuffer buffer = (ByteBuffer) key.attachment();
SocketChannel sc = (SocketChannel) key.channel();
int write = sc.write(buffer);
System.out.println(write);
// 6. 清理操作
if (!buffer.hasRemaining()) {key.attach(null);
key.interestOps(key.interestOps() - SelectionKey.OP_WRITE); // 不在关注写事件
}
}
}
留神,这里须要应用组合 事件类型,即 scKey.interestOps(scKey.interestOps() + SelectionKey.OP_WRITE);
,如果只有 SelectionKey.OP_WRITE
意味着写事件笼罩原先的读事件;
运行后果:
# WriteServer
2097136
2621420
3014633
3801059
6815692
393213
2228207
393213
2228207
5242840
1164380
# WriteClient
131071
262142
393213
524284
...
29622046
29753117
29884188
30000000
后记
以上就是 音讯边界与可写事件 的所有内容了,心愿本篇博文对大家有所帮忙!
参考:
- Netty API reference;
- 黑马程序员 Netty 全套教程;
📝 上篇精讲:「NIO」(三)分析 Selector
💖 我是 𝓼𝓲𝓭𝓲𝓸𝓽,期待你的关注;
👍 创作不易,请多多反对;
🔥 系列专栏:摸索 Netty:源码解析与利用案例分享