NIO—NonBlocking IO(new IO)
- io 面向流编程,只能作为输入或者输出流的一种,是同步阻塞的,每一个连接过来都要创建一个线程去处理,线程上下文切换开销很大,造成了很大的瓶颈
- 于是有了线程池实现的伪阻塞 IO,一定程度解决了线程创建过多的问题,但是没有从根本上解决阻塞的问题,并且线程过多而线程池过小时也会造成很大的瓶颈
- 既然根本瓶颈原因是线程数和阻塞 IO,那么我们有没有办法只用 1 个线程去处理多个客户端连接呢?这就是 NIO 出现的原因
NIO 主要有 三个核心部分组成:
- buffer 缓冲区
- Channel 管道
- Selector 选择器
nio 面向 block 块,buffer 缓冲区编程,底层是数组,buffer 提供数据访问,channel 读写到 buffer,buffer 读写到 channel,从 buffer 读取到程序 channel 是双向的
理解 NIO 需要理解事件编程模型
NIO 核心:
NIO 由原来的阻塞读写(占用线程)变成了单线程轮询事件,找到可以进行读写的网络描述符进行读写。除了事件的轮询是阻塞的(没有可干的事情必须要阻塞),剩余的 I / O 操作都是纯 CPU 操作,没有必要开启多线程。
单线程处理 I / O 的效率确实非常高,没有线程切换,只是拼命的读、写、选择事件。
NIO 带个我们:
- 事件驱动模型—异步编程都离不开事件
- 单线程处理多连接—多路复用使得处理更加高效
- 非阻塞 IO,只阻塞获取可操作事件
- 基于 block 传输比基于流传输更加高效
- 零拷贝—DirectBuffer
缺点:
NIO 并没有完全屏蔽平台差异,它仍然是基于各个操作系统的 I / O 系统实现的,差异仍然存在。使用 NIO 做网络编程构建事件驱动模型并不容易,陷阱重重。
推荐使用 NIO 成熟框架 Netty
Buffer
缓冲区本质上是一块可以写入数据,然后可以从中读取数据的内存。这块内存被包装成 NIO Buffer 对象,并提供了一组方法,用来方便的访问该块内存。
Capacity、Position、Limit
0 <=
mark <=
position <=
limit <=
capacity
- capacity
作为一个内存块,Buffer 有一个固定的大小值,也叫“capacity”. 你只能往里写 capacity 个 byte、long,char 等类型。一旦 Buffer 满了,需要将其清空(通过读数据或者清除数据)才能继续写数据往里写数据。
- position
当你写数据到 Buffer 中时,position 表示当前的位置。初始的 position 值为 0. 当一个 byte、long 等数据写到 Buffer 后,position 会向前移动到下一个可插入数据的 Buffer 单元。position 最大可为 capacity – 1.
当读取数据时,也是从某个特定位置读。当将 Buffer 从写模式切换到读模式,position 会被重置为 0. 当从 Buffer 的 position 处读取数据时,position 向前移动到下一个可读的位置。
- limit
在写模式下,Buffer 的 limit 表示你最多能往 Buffer 里写多少数据。写模式下,limit 等于 Buffer 的 capacity。
当切换 Buffer 到读模式时,limit 表示你最多能读到多少数据。因此,当切换 Buffer 到读模式时,limit 会被设置成写模式下的 position 值。换句话说,你能读到之前写入的所有数据(limit 被设置成已写数据的数量,这个值在写模式下就是 position)
同一个 buffer 可以存储不同数据类型的数据,但是获取的时候要指定类型获取
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.putInt(1);
buffer.putLong(387524875628742L);
buffer.putChar('s');
buffer.flip();
System.out.println(buffer.getInt());
System.out.println(buffer.getLong());
System.out.println(buffer.getChar());
put 方法只能放入 byte 型,不能放入 int
flip、clear、rewind、mark
- flip
flip 方法将 Buffer 从写模式切换到读模式。调用 flip()方法会将 position 设回 0,并将 limit 设置成之前 position 的值。
public final Buffer flip() {
this.limit = this.position;
this.position = 0;
this.mark = -1;
return this;
}
- clear
position 将被设回 0,limit 被设置成 capacity 的值。换句话说,Buffer 被清空了。Buffer 中的数据并未清除,只是这些标记告诉我们可以从哪里开始往 Buffer 里写数据。
public final Buffer clear() {
this.position = 0;
this.limit = this.capacity;
this.mark = -1;
return this;
}
- rewind
Buffer.rewind()将 position 设回 0,所以你可以重读 Buffer 中的所有数据。limit 保持不变,仍然表示能从 Buffer 中读取多少个元素
public final Buffer rewind() {
this.position = 0;
this.mark = -1;
return this;
}
- mark
可以标记 Buffer 中的一个特定 position。之后可以通过调用 Buffer.reset()方法恢复到这个 position。
public final Buffer mark() {
this.mark = this.position;
return this;
}
- slice 分片
将 buffer 根据设置的 position 和 limit 分片一个 buffer,有自己的 position、limit 和 capacity,数据共用一个内存地址的 buffer 数据
public static void test2(){ByteBuffer buffer = ByteBuffer.allocate(1024);
for(int i=0;i<buffer.capacity();i++){buffer.put((byte)i);
}
buffer.position(10);
buffer.limit(20);
ByteBuffer buffer1 = buffer.slice();//buffer 分片
for(int m=0;m<buffer1.capacity();m++){byte b = buffer1.get();
System.out.print(b+" ");
}
}
输出:10 11 12 13 14 15 16 17 18 19
ReadOnlyBuffer
普通的 Buffer(可读可写)可以随时转换为只读 Buffer,但是只读 Buffer 不可以转换为普通 Buffer
ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();
转换后的 Buffer 是一个新的只读 Buffer,拥有独立的 position、limit 和 capacity
DirectBuffer
堆外内存 buffer,本地 JNI 非 JVM 堆内存 buffer,允许直接访问
普通 ByteBuffer 由 JVM 管理,在 JVM 堆上分配内存
ByteBuffer buf = ByteBuffer.allocate(1024);
DirectBuffer 会在本地内存中分配,脱离 JVM 的堆管理
ByteBuffer buf = ByteBuffer.allocateDirect(1024);
为什么要这样做呢?
———- 又是 GC——–
我们都知道 JVM 在堆上的老年代中,GC 时会采取
标记 - 整理
策略,会使得对象在堆内存中的地址发生变化,整理时会 buffer 太大时会很难 gc 整理
所以出现了 DirectBuffer
,它使用unsafe.allocateMemory
分配内存,是一个 native 方法,由 buffer 的 address
变量记录这个内存的地址来提供访问
比较
- DirectBuffer:本地方法分配内存显然没有 JVM 堆分配快,但是涉及
IO
和网络 IO
的话就是 DirectBuffer 比较快了
DirectByteBuffer 继承了 MappedByteBuffer
缓存的使用可以使用 DirectByteBuffer 和 HeapByteBuffer。如果使用了 DirectByteBuffer,一般来说可以减少一次系统空间到用户空间的拷贝。
数据量比较小的中小应用情况下,可以考虑使用 heapBuffer;反之可以用 directBuffer
MappedByteBuffer
映射到堆外内存的 ByteBuffer,DirectByteBuffer 继承此类实现堆外内存的分配
通过下面方式映射 buffer 到堆外内存
MappedByteBuffer mappedByteBuffer = channel.map(MapMode.READ_WRITE, 0, channel.size());
使用拷贝文件:
RandomAccessFile in = new RandomAccessFile("nio/1.txt", "rw");
RandomAccessFile out = new RandomAccessFile("nio/2.txt", "rw");
FileChannel inChannel = in.getChannel();
FileChannel outChannel = out.getChannel();
MappedByteBuffer inputData = inChannel.map(FileChannel.MapMode.READ_ONLY,0,new File("nio/1.txt").length());
Charset charset = Charset.forName("utf-8");// 编码
CharsetDecoder decoder = charset.newDecoder();
CharsetEncoder encoder = charset.newEncoder();
CharBuffer charBuffer = decoder.decode(inputData);
ByteBuffer buffer = encoder.encode(charBuffer);
outChannel.write(buffer);
in.close();out.close();
Channel—通道
FileChannel
NIO 提供的一种连接到文件的通道,用于文件的读写
在使用 FileChannel 时,需要从 输入输出流或者 RandomAccessFile
中获取 FIleChannel
- 如果要向 FileChannel 中读取数据,需要申请一个 ByteBuffer,将数据从 FileChannel 中读取到缓冲区 ByteBuffer,
read()
返回多少个字节被读取,如果返回 - 1 说明文件已经到达末尾 - 如果要向 FileChannel 中写入数据,需要先将数据写入到 ByteBuffer 中,在从 ByteBuffer 中写入到 FileChannel 中,调用
write()
方法
注意读写之间需要 Buffer.flip();
例子:
1. 读取文件数据并打印
FileInputStream fileInputStream = new FileInputStream("1.log");
FileChannel channel = fileInputStream.getChannel();
ByteBuffer byteBuffer = ByteBuffer.allocate(512);;
channel.read(byteBuffer);
byteBuffer.flip();
while(byteBuffer.remaining()>0){byte b = byteBuffer.get();
System.out.println((char) b);
}
fileInputStream.close();
2. 把 1.txt 数据写入 2.txt
FileInputStream inputStream = new FileInputStream("1.txt");
FileChannel in = inputStream.getChannel();
FileOutputStream outputStream = new FileOutputStream("2.txt");
FileChannel out = outputStream.getChannel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
while(true){byteBuffer.clear();// 没有的话会一直读取
int read = in.read(byteBuffer);
System.out.println("read:"+read);
if(read==-1){break;// 为 - 1 表示文件结束 返回}
byteBuffer.flip();
out.write(byteBuffer);
}
inputStream.close();
outputStream.close();
ServerSockerChannel
NIO 提供了一种可以监听新进入的 TCP 连接的通道,就是ServerSocketChannel
,对应 IO 中ServerSocket
- 打开监听通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(9999));
while(true){SocketChannel socketChannel = serverSocketChannel.accept();
//do something with socketChannel...
}
SocketChannel
NIO 提供的一种连接到 TCP 套接字的通道,就是SocketChannel
,对应 IO 中Socket
- 打开一个 SocketChannel
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("localhost", 8080));
Channel 读写
- 读
ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = socketChannel.read(buf);
- 写
ByteBuffer writeBuffer = ByteBuffer.allocate(48);
String msg = "hello";
writeBuffer.put(msg.getBytes());
writeBuffer.flip();
channel.write(writeBuffer);
- 读完写
ByteBuffer buffer = ByteBuffer.allocate(1024);
int byteRead = channel.read(buffer);
if(byteRead<=0){channel.close();
break;
}
buffer.flip();
channel.write(buffer);
read += byteRead;
buffer.clear();
每次写完 buffer,如果 buffer 数据不需要再使用,建议 clear 清空 buffer,准备下一次写操作
Selector—多路复用器(选择器)
多路复用器,这个名字很形象,使用一个线程去处理多个 channel,从而管理多个 channel
为什么要使用一个线程管理多个 channel?
线程上下文切换开销很大,线程越少处理 channel 更高效
创建 Selector—创建比赛
Selector selector = Selector.open();
注册 channel—购买入场卷
channel 通过注册到 selector 上来把 channel 的事件交给 Selector 管理,并且返回一个 SelectionKey
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
- 与 Selector 一起使用时,Channel 必须处于
非阻塞模式
下。这意味着不能将 FileChannel 与 Selector 一起使用,因为 FileChannel 不能切换到非阻塞模式
channel.configureBlocking(false);
- 通过 SelectionKey 获取 channel 和 selector 以及准备好的事件
Channel channel = selectionKey.channel();
Selector selector = selectionKey.selector();
Selector 执行选择—拿着入场卷入场
把 channel 注册到 Selector 后,我们可以使用 Selector.select();
方法获取准备就绪的通道,返回一个 int 型整数,表示准备好的 channel 数
通过 selector.selectedKeys();
方法获取准备就绪的 SelectionKey,再通过 SelectionKey 获取 channel 和 selector,一般使用迭代器遍历这些准备好的 channel
在每一次处理完一个 SelectionKey,必须把它从迭代器中删除,处理完,这个 SelectionKey 就没有用了,就像一个入场卷,你可以通过它进入赛场并且它上面有入场人和座位对应信息,比赛结束后你无法再通过它执行任何有效的操作。
- 看完比赛,举办者不会回收所有的票据,需要你们自己处理,不能乱丢在场地中,需要自己丢到垃圾桶中或者带回家
iterator.remove();
- wakeUp()方法
某个线程调用 select() 方法后阻塞了,即使没有通道已经就绪,也无法返回,wakeUp 方法使得立马返回。
Scatter、Gather
scatter / gather 经常用于需要将传输的数据分开处理的场合,例如传输一个由消息头和消息体组成的消息,你可能会将消息体和消息头分散到不同的 buffer 中,这样你可以方便的处理消息头和消息体。
Scatter
分散(scatter)从 Channel 中读取是指在读操作时将读取的数据写入多个 buffer 中。因此,Channel 将从 Channel 中读取的数据“分散(scatter)”到多个 Buffer 中。
Gather
聚集(gather)写入 Channel 是指在写操作时将多个 buffer 的数据写入同一个 Channel,因此,Channel 将多个 Buffer 中的数据“聚集(gather)”后发送到 Channel。
例子:用三个长度分别为 3,4,5 的 buffer 存储输入的字符串,前 3 个字符存储在第一个 buffer,4- 7 字符存储在第二个 buffer,长度为 4,8-12 存储在第三个 buffer,长度为 5
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
InetSocketAddress inetSocketAddress = new InetSocketAddress(8899);
serverSocketChannel.socket().bind(inetSocketAddress);
int messageLength = 3 + 4 + 5;
ByteBuffer[] byteBuffer = new ByteBuffer[3];
byteBuffer[0] = ByteBuffer.allocate(3);
byteBuffer[1] = ByteBuffer.allocate(4);
byteBuffer[2] = ByteBuffer.allocate(5);
SocketChannel socketChannel = serverSocketChannel.accept();
while (true){
int byteRead = 0;
while (byteRead<messageLength){long r = socketChannel.read(byteBuffer);
byteRead += r;
System.out.println("byteread:"+byteRead);
Arrays.stream(byteBuffer).map(o->"position:"+o.position()+",limit:"+o.limit()).forEach(System.out::println);
}
Arrays.stream(byteBuffer).forEach(Buffer::flip);
int byteWrite = 0;
while(byteWrite<messageLength){long r = socketChannel.write(byteBuffer);
byteWrite += r;
System.out.println("bytewrite:"+byteWrite);
Arrays.stream(byteBuffer).map(o->"position:"+o.position()+",limit:"+o.limit()).forEach(System.out::println);
}
Arrays.stream(byteBuffer).forEach(Buffer::clear);
}
测试:使用 linux nc localhost 8899 测试
输入:helloworld 回车
输出:byteread:11
position:3,limit:3
position:4,limit:4
position:4,limit:5
解释:回车算一个字符一共 11 个字符,前三个存储到第一个 buffer 了,存满了;中间四个存储到第二个 buffer,存满了;剩下多余的存储到第三个 buffer,没有存满
NIO 服务端客户端
这个程序演示使用 NIO 创建一个聊天室,服务端和多个客户端连接,客户端可以互发消息
- server 服务端
/**
* 可以直接使用 linux nc 命令当做客户端
* nc localhost 端口
*/
public class Server {private static Map<SocketChannel,String> clientMap = new HashMap<>();
public static void main(String[] args) throws IOException {
// 打开服务器 channel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 设置非阻塞 即将使用 selector
serverSocketChannel.configureBlocking(false);
// 获取服务器的 socket
ServerSocket serverSocket = serverSocketChannel.socket();
// 绑定端口
serverSocket.bind(new InetSocketAddress(8089));
// 打开一个多路复用器,使用一条线程处理客户端 channel
Selector selector = Selector.open();
// 注册服务器 channel 到
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true){
// 阻塞获取 channel 事件
// 一旦调用了 select() 方法,并且返回值表明有一个或更多个通道就绪了
int num = selector.select();
/**
* 获取到后 拿到多路复用器的 SelectionKey 核心方法 channel 获取注册在起上的 channel
* SelectionKey 每次注册一个 channel 都会创建一个 SelectionKey 其中常量定义 channel 状态
**/
Set<SelectionKey> selectionKeys = selector.selectedKeys();
// 对其中每一个 SelectionKey 进行操作
selectionKeys.forEach(selectionKey->{
try {
// 如果该服务器 SelectionKey 被接收
if(selectionKey.isAcceptable()){
// 拿到服务器 channel
ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
SocketChannel client = null;
// 拿到本次连接上服务器的客户端
client = server.accept();
client.configureBlocking(false);
// 把客户端注册到多路复用器,监听客户端的可读事件
client.register(selector,SelectionKey.OP_READ);
// 为每个客户端分配 id
String key = "["+ UUID.randomUUID()+"]";
clientMap.put(client,key);
// 如果 SelectionKey 读就绪,执行读操作
}else if(selectionKey.isReadable()){
// 拿到 channel
SocketChannel channel = (SocketChannel) selectionKey.channel();
// 创建读 buffer
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
// 读取 channel 中数据到读 buffer
int read = channel.read(readBuffer);
String reMsg = "";
// 如果有数据
if(read>0){
// 翻转进行写操作
readBuffer.flip();
// 制定解码集 utf-8,对读 buffer 解码打印
Charset charset = Charset.forName("utf-8");
reMsg = String.valueOf(charset.decode(readBuffer).array());
System.out.println(clientMap.get(channel)+"receive:"+reMsg);
}else if(read==-1) channel.close();// 如果客户端关闭就关闭客户端 channel
// 群发:发送数据到其他客户端 channel
for(SocketChannel ch:clientMap.keySet()){if(ch!=channel) {String key = clientMap.get(ch);
ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
writeBuffer.put(("来自"+key + ":" + reMsg).getBytes());
writeBuffer.flip();
ch.write(writeBuffer);
}
}
}
} catch (IOException e) {e.printStackTrace();
}
});
selectionKeys.clear();// 每次处理完一个 SelectionKey 的事件,把该 SelectionKey 删除}
}
}
- 客户端
public class Client {public static void main(String[] args) throws IOException {
// 打开客户端 channel
SocketChannel socketChannel = SocketChannel.open();
// 设置为非阻塞模式,可以配合 selector 使用
socketChannel.configureBlocking(false);
// 打开 selector
Selector selector = Selector.open();
// 注册客户端 channel 到多路复用器,监听连接事件
socketChannel.register(selector, SelectionKey.OP_CONNECT);
// 连接到指定地址
socketChannel.connect(new InetSocketAddress("localhost",8089));
while (true){
try{
// 执行 selector 方法,阻塞获取 channel 事件的触发
int num = selector.select();
// 获取注册到多路复用器上的 SelectionKey
Set<SelectionKey> selectionKeys = selector.selectedKeys();
// 通过迭代器遍历 SelectionKey
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {SelectionKey selectionKey = iterator.next();
// 如果 SelectionKey 触发的事件为连接就绪
if(selectionKey.isConnectable()){
// 拿到 SelectionKey 的客户端 channel
SocketChannel client = (SocketChannel) selectionKey.channel();
if(client.isConnectionPending()){
// 完成连接
client.finishConnect();
// 新建一个写 buffer
ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
// 写入客户端连接成功消息
writeBuffer.put((client.toString()+": 连接成功!").getBytes());
// 翻转读写操作 执行写操作
writeBuffer.flip();
// 写入 buffer 数据刅客户端
client.write(writeBuffer);
// 开辟一个线程写,因为标准输入是阻塞的,当前线程不能阻塞写
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(()->{while (true){writeBuffer.clear();
InputStreamReader reader = new InputStreamReader(System.in);
BufferedReader br = new BufferedReader(reader);
String msg = br.readLine();
// 每次读入一行,写入数据到 buffer 并且写入客户端 channel
writeBuffer.put(msg.getBytes());
writeBuffer.flip();
client.write(writeBuffer);
}
});
}
// 注册客户端可读事件到多路复用器
client.register(selector,SelectionKey.OP_READ);
// 如果多路复用器上的 SelectionKey 处于读就绪状态
}else if(selectionKey.isReadable()){
// 拿到 SelectionKey 触发相应事件对应的客户端 channel,执行读操作
SocketChannel client = (SocketChannel) selectionKey.channel();
// 创建一个新的读 buffer,ByteBuffer readBuffer = ByteBuffer.allocate(1024);
// 从准备好读操作的 channel 中读取数据
int count = client.read(readBuffer);
if (count>0){
// 转码并数据使用 String 保存且打印
String reMsg = new String(readBuffer.array(),0,count);
System.out.println(reMsg);
}else if(count==-1) client.close();// 关闭客户端}
}
selectionKeys.clear();// 每次处理完一个 SelectionKey 的事件,把该 SelectionKey 删除}
catch (Exception e){e.printStackTrace();
}
}
}
}
- 测试
1. 创建一个服务端和三个客户端
2. 客户端 1,2,3 分别发送数据
服务端拿到连接信息和三个客户端发送信息
客户端 1 先创建,拿到 2,3 连接信息和 2,3 发送信息
客户端 2 先于 3 创建,拿到 3 连接信息和 1,3 发送信息
客户端 3 最后创建,只能拿到 1,2 发送信息
3. 此时再使用 nc 命令创建一个客户端
发送信息,客户端可以收到
客户端 2 发送信息,该终端客户端也可以收到
NIO 案例—跨端口传输数据—MultiServer
实现目标:服务端监听两个端口,一个 8089,一个 8090,8089 只有唯一的一个主客户端 A 连接,8090 有多个客户端 B 连接,客户端 A 接收多个客户端 B 连接的发送的消息,实现跨端口的消息转发
- 服务端
我们先看服务端,服务端首先需要监听两个端口,我们创建两个服务端 channel;服务端接收到连接后监听客户端 B 的发送数据事件(也就是客户端 writable 服务端 readable 事件);拿到客户端 B 的消息后,把它发送到客户端 A
服务端怎么发送数据到客户端 A?
保存一个客户端 channel 集合,为不同端口客户端分配不同的 id 的结尾部分,客户端 A 分配为 wxq],客户端 B 分配为 gzh],在他们 channel 创建的时候保存到 HashMap 中,channel 作为 key,id 作为值保存
下面说一下服务端流程:
- 创建两个服务端 channel,绑定不同端口
- 创建一个多路复用器 selector,把两个服务端注册到 selector 上,并监听 acceptable 事件
-
执行 selector.select()方法,拿到 SelectionKey 集合,对不同事件做不停处理
- 如果事件为
接收就绪
,通过SelectionKey.channel()
方法拿到服务端 channel,根据端口不同注册不同的监听事件,如果是 8090 的,说明是客户端 B 的连接完成,拿到客户端 B 的 channel,监听它的可读事件
,并且分配 id 后缀为 gzh] 并且保存;如果是 8089 端口的服务端 channel,说明是客户端 A 的连接完成,客户端客户端 A 的 channel,监听它的可写事件
,并且分配 id 后缀为 wxq],保存到 hashmap - 如果事件是
读就绪
,说明客户端 B 已经完后数据的写操作,可以读取客户端 B 的数据,执行读取;首先把数据读取并写入到readBuffer
,使用new String(readBuffer.array()
创建即将发送的 msg,遍历客户端 channel 的 key,如果后缀为 wxq],说明是客户端 A,则把数据写入 writeBuffer 中,并把数据写入客户端 A 的 channel 中
- 如果事件为
- 每次 SelectionKey 的事件执行完毕,把该 SelectionKey 删除
代码:
public class Server {
private static int CAPACITY = 1024;
private static ByteBuffer readBuffer = ByteBuffer.allocate(CAPACITY);
private static ByteBuffer writeBuffer = ByteBuffer.allocate(CAPACITY);
private static Map<SocketChannel,String> clientMap = new HashMap<>();
public static void main(String[] args) throws IOException {ServerSocketChannel serverSocketChannelWxq = ServerSocketChannel.open();
ServerSocketChannel serverSocketChannelGzh = ServerSocketChannel.open();
serverSocketChannelGzh.configureBlocking(false);
serverSocketChannelWxq.configureBlocking(false);
ServerSocket serverSocketWxq = serverSocketChannelWxq.socket();
ServerSocket serverSocketGzh = serverSocketChannelGzh.socket();
serverSocketWxq.bind(new InetSocketAddress(8089));
System.out.println("监听 8089:微信墙服务端口");
serverSocketGzh.bind(new InetSocketAddress(8090));
System.out.println("监听 8090:公众号服务端口");
Selector selector = Selector.open();
serverSocketChannelWxq.register(selector, SelectionKey.OP_ACCEPT);
serverSocketChannelGzh.register(selector, SelectionKey.OP_ACCEPT);
while (true){int num = selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
selectionKeys.forEach(selectionKey->{
try {if(selectionKey.isAcceptable()){ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
SocketChannel client = null;
client = server.accept();
client.configureBlocking(false);
String key = "";
if(server==serverSocketChannelGzh) {// 如果是公众号 server,注册其客户端的可读事件
client.register(selector, SelectionKey.OP_READ);
key = "["+ UUID.randomUUID()+":gzh]";
}else if(server==serverSocketChannelWxq){// 如果是
client.register(selector,SelectionKey.OP_WRITE);
key = "["+ UUID.randomUUID()+":wxq]";
}
System.out.println(key+": 连接成功!");
clientMap.put(client,key);
}else if(selectionKey.isReadable()){SocketChannel channel = (SocketChannel) selectionKey.channel();
readBuffer.clear();
int read = 0;
while(true){int byteRead = channel.read(readBuffer);
if(byteRead<=0){break;}
readBuffer.flip();
channel.write(readBuffer);
read += byteRead;
readBuffer.clear();}
String reMsg = new String(readBuffer.array(),0,read);
System.out.println(clientMap.get(channel)+"send to wxq:"+reMsg);
// 写入微信墙服务
for(SocketChannel ch:clientMap.keySet()){if(ch!=channel) {String key = clientMap.get(ch);
if(key.endsWith("wxq]")) {writeBuffer.clear();
writeBuffer.put(("来自" + clientMap.get(channel) + ":" + reMsg).getBytes(StandardCharsets.UTF_8));
writeBuffer.flip();
ch.write(writeBuffer);
}
}
}
}
} catch (IOException e) {e.printStackTrace();
}
});
selectionKeys.clear();// 每次处理完一个 SelectionKey 的事件,把该 SelectionKey 删除}
}
}
到此,服务端写完了,你就可以使用 linux 或者 win 下的 nc
命令连接到服务端,模拟客户端 A 和客户端 B 发送消息
客户端发送消息后会会写一条是因为我在接收到消息后把消息写入客户端 B 的 buffer 中了
- 客户端 B—发送消息
客户端 B 负责发送消息,主要事件就是负责写数据
流程:
- 创建一个客户端 channel
SocketChannel
,打开一个多留复用器 selector,绑定可连接事件
,连接到服务端监听的 8090 端口 -
执行
selector.select()
方法,处理连接就绪
和写就绪两个事件
- 如果事件为
连接就绪
,只需要拿到 channel,执行finishConnect
方法完成连接,并且注册监听事件为可写事件
- 如果事件为
写就绪
,执行写操作,使用标准输入从控制台读取输入并且写入 writebuffer 中,通过channel.write()
方法写入数据到客户端
- 如果事件为
- 清理事件的 SelectionKey
代码:
public class GzhClient {public static void main(String[] args) throws IOException {SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
Selector selector = Selector.open();
socketChannel.register(selector, SelectionKey.OP_CONNECT);
socketChannel.connect(new InetSocketAddress("localhost",8090));
while (true){
try{int num = selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {SelectionKey selectionKey = iterator.next();
if(selectionKey.isConnectable()){SocketChannel client = (SocketChannel) selectionKey.channel();
if(client.isConnectionPending()){client.finishConnect();
}
client.register(selector,SelectionKey.OP_WRITE);
}else if(selectionKey.isWritable()){SocketChannel client = (SocketChannel) selectionKey.channel();
ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
writeBuffer.clear();
InputStreamReader reader = new InputStreamReader(System.in);
BufferedReader br = new BufferedReader(reader);
String msg = br.readLine();
// 每次读入一行,写入数据到 buffer 并且写入客户端 channel
writeBuffer.put(msg.getBytes());
writeBuffer.flip();
client.write(writeBuffer);
}
}
selectionKeys.clear();// 每次处理完一个 SelectionKey 的事件,把该 SelectionKey 删除}
catch (Exception e){e.printStackTrace();
}
}
}
}
- 客户端 A—接收服务端转发消息
客户端 A 负责发送消息,主要事件就是负责读数据
流程:
- 创建一个客户端 channel
SocketChannel
,打开一个多留复用器 selector,绑定可连接事件
,连接到服务端监听的 8089 端口 -
执行
selector.select()
方法,处理连接就绪
和读就绪两个事件
- 如果事件为
连接就绪
,只需要拿到 channel,执行finishConnect
方法完成连接,并且注册监听事件为可写事件
- 如果事件为
读就绪
,执行读操作,把 channel 中数据使用read()
方法读取到 readBuffer 中,通过new String(readBuffer.array()
方法接收 String 类型数据,并且打印到控制台
- 如果事件为
- 清理事件的 SelectionKey
代码:
public class WxQClient {public static void main(String[] args) throws IOException {SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
Selector selector = Selector.open();
socketChannel.register(selector, SelectionKey.OP_CONNECT);
socketChannel.connect(new InetSocketAddress("localhost",8089));
while (true){
try{int num = selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {SelectionKey selectionKey = iterator.next();
if(selectionKey.isConnectable()){SocketChannel client = (SocketChannel) selectionKey.channel();
if(client.isConnectionPending()){client.finishConnect();
}
client.register(selector,SelectionKey.OP_READ);
}else if(selectionKey.isReadable()){SocketChannel client = (SocketChannel) selectionKey.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int count = client.read(readBuffer);
if (count>0){String reMsg = new String(readBuffer.array(),0,count);
System.out.println(reMsg);
}else if(count==-1) client.close();// 关闭客户端}
}
selectionKeys.clear();// 每次处理完一个 SelectionKey 的事件,把该 SelectionKey 删除}
catch (Exception e){e.printStackTrace();
}
}
}
}
至此,我们服务端和客户端 AB 都已经完后,现在我们测试一下
- 启动服务端,启动一个 WxQClient 也就是 ClientA,启动两个 GzhClient,也就是 ClientB
服务端显示连接成功
- 客户端 B 发送消息
服务端接收到消息并打印,并转发到客户端 A,客户端 A 打印消息