BIO与AIO模型在JDK实现Netty序章

52次阅读

共计 11106 个字符,预计需要花费 28 分钟才能阅读完成。

BIO 编程

回顾下 Linux 下阻塞 IO 模型:

再看看 Java 的 BIO 编程模型:

/**
* 类说明:客户端
*/
public class BioClient {public static void main(String[] args) throws InterruptedException,
IOException {
// 通过构造函数创建 Socket,并且连接指定地址和端口的服务端
Socket socket = new Socket(DEFAULT_SERVER_IP,DEFAULT_PORT);
System.out.println("请输入请求消息:");
// 启动读取服务端输出数据的线程
new ReadMsg(socket).start();
PrintWriter pw = null;
// 允许客户端在控制台输入数据,然后送往服务器
while(true){pw = new PrintWriter(socket.getOutputStream());
pw.println(new Scanner(System.in).next());
pw.flush();}
}
// 读取服务端输出数据的线程
private static class ReadMsg extends Thread {
Socket socket;
public ReadMsg(Socket socket) {this.socket = socket;}
@Override
public void run() {
// 负责 socket 读写的输入流
try (BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()))){
String line = null;
// 通过输入流读取服务端传输的数据
// 如果已经读到输入流尾部,返回 null, 退出循环
// 如果得到非空值,就将结果进行业务处理
while((line=br.readLine())!=null){System.out.printf("%s\n",line);
}
} catch (SocketException e) {System.out.printf("%s\n", "服务器断开了你的连接");
} catch (Exception e) {e.printStackTrace();
} finally {clear();
}
}
// 必要的资源清理工作
private void clear() {if (socket != null)
try {socket.close();
} catch (IOException e) {e.printStackTrace();
}
}
}
}
/**
* 类说明:bio 的服务端主程序
*/
public class BioServer {
// 服务器端必须
private static ServerSocket server;
// 线程池,处理每个客户端的请求
private static ExecutorService executorService
= Executors.newFixedThreadPool(5);
private static void start() throws IOException{
try{
// 通过构造函数创建 ServerSocket
// 如果端口合法且空闲,服务端就监听成功
server = new ServerSocket(DEFAULT_PORT);
System.out.println("服务器已启动,端口号:" + DEFAULT_PORT);
while(true){Socket socket= server.accept();
System.out.println("有新的客户端连接 ----");
// 当有新的客户端接入时,打包成一个任务,投入线程池
executorService.execute(new BioServerHandler(socket));
}
}finally{if(server!=null){server.close();
}
}
}
public static void main(String[] args) throws IOException {start();
}
}
/**
* 类说明:*/
public class BioServerHandler implements Runnable{
private Socket socket;
public BioServerHandler(Socket socket) {this.socket = socket;}
public void run() {
try(// 负责 socket 读写的输出、输入流
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintWriter out = new PrintWriter(socket.getOutputStream(),
true)){
String message;
String result;
// 通过输入流读取客户端传输的数据
// 如果已经读到输入流尾部,返回 null, 退出循环
// 如果得到非空值,就将结果进行业务处理
while((message = in.readLine())!=null){System.out.println("Server accept message:"+message);
result = response(message);
// 将业务结果通过输出流返回给客户端
out.println(result);
}
}catch(Exception e){e.printStackTrace();
}finally{if(socket != null){
try {socket.close();
} catch (IOException e) {e.printStackTrace();
}
socket = null;
}
}
}
}

过程:

  1. 服务端提供 IP 和监听端口
  2. 客户端通过连接操作想服务端监听的地址发起连接请求,通过三次握手连接
  3. 如果连接成功建立,双方就可以通过套接字进行通信

最早的时候服务器端是针对一个连接新建一个线程来处理 →→ 服务端针对每个客户端连接把请求丢进线程池来处理任务
缺点:若高并发场景且处理时间稍长则许多请求会阻塞一直等待,严重影响性能.

AIO

先回顾下 Linux 下 AIO 模型:

原生 JDK 网络编程 AIO:

异步 IO 采用 “订阅 - 通知” 模式:即应用程序向操作系统注册 IO 监听,然后继续做自己的事情。当操作系统发生 IO 事件,并且准备好数据后,在主动通知应用程序,触发相应的函数。
注意:异步 IO 里面客户端和服务端均采用这种 “订阅 - 通知” 模式.

AIO 编程几个核心类:
:AsynchronousServerSocketChannel: 类似 BIO 里面的 ServerSocket


②:AsynchronousSocketChannel : 类似 BIO 里面的 socket 用来通信,有三个方法:connect(): 用于连接到指定端口,指定 IP 地址的服务器,read()、write(): 完成读写
注意点:

  • 1. 这三个方法会执行就相当于上面图解里面的 Subscrible 函数向操作系统监听线程。
  • 2. 这几个方法里面有个参数, 比如 write(ByteBuffer src,A attachment,CompletionHandler<Integer,? super A>handler)的 attachment, 是附加到 IO 操作里面的对象.

③:CompletionHandler: 源码注释是异步 IO 操作中用来处理消费的结果, 其实也就是结果回调函数, 连接丶读写都是异步操作都需要实现此接口。

而 CompletionHandler 接口中定义了两个方法,

  1. completed(V result , A attachment):当 IO 完成时触发该方法,该方法的第一个参数代表 IO 操作返回的对象,第二个参数代表发起 IO 操作时传入的附加参数。
  2. faild(Throwable exc, A attachment):当 IO 失败时触发该方法,第一个参数代表 IO 操作失败引发的异常或错误。

先上代码
客户端:

/**
* 类说明:aio 的客户端主程序
*/
public class AioClient {
//IO 通信处理器
private static AioClientHandler clientHandle;
public static void start(){if(clientHandle!=null)
return;
clientHandle = new AioClientHandler(DEFAULT_SERVER_IP,DEFAULT_PORT);
// 负责网络通讯的线程
new Thread(clientHandle,"Client").start();}
// 向服务器发送消息
public static boolean sendMsg(String msg) throws Exception{if(msg.equals("q")) return false;
clientHandle.sendMessag(msg);
return true;
}
public static void main(String[] args) throws Exception{AioClient.start();
System.out.println("请输入请求消息:");
Scanner scanner = new Scanner(System.in);
while(AioClient.sendMsg(scanner.nextLine()));
}
}
/**
* 类说明:IO 通信处理器,负责连接服务器,对外暴露对服务端发送数据的 API
*/
public class AioClientHandler
implements CompletionHandler<Void,AioClientHandler>,Runnable {
private AsynchronousSocketChannel clientChannel;
private String host;
private int port;
private CountDownLatch latch;// 防止线程退出
public AioClientHandler(String host, int port) {
this.host = host;
this.port = port;
try {
// 创建一个实际异步的客户端通道
clientChannel = AsynchronousSocketChannel.open();} catch (IOException e) {e.printStackTrace();
}
}
@Override
public void run() {
// 创建 CountDownLatch,因为是异步调用,下面的 connect 不会阻塞,// 那么整个 run 方法会迅速结束,那么负责网络通讯的线程也会迅速结束
latch = new CountDownLatch(1);
// 发起异步连接操作,回调参数就是这个实例本身,// 如果连接成功会回调这个实例的 completed 方法
clientChannel.connect(new InetSocketAddress(host,port),
null,this);
try {latch.await();
clientChannel.close();} catch (InterruptedException e) {e.printStackTrace();
} catch (IOException e) {e.printStackTrace();
}
}
// 连接成功,这个方法会被系统调用
@Override
public void completed(Void result, AioClientHandler attachment) {System.out.println("已经连接到服务端。");
}
// 连接失败,这个方法会被系统调用
@Override
public void failed(Throwable exc, AioClientHandler attachment) {System.err.println("连接失败。");
exc.printStackTrace();
latch.countDown();
try {clientChannel.close();
} catch (IOException e) {e.printStackTrace();
}
}
// 对外暴露对服务端发送数据的 API
public void sendMessag(String msg){
/* 为了把 msg 变成可以在网络传输的格式 */
byte[] bytes = msg.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
/* 进行异步写,同样的这个方法会迅速返回,需要提供一个接口让系统在一次网络写操作完成后通知我们的应用程序。所以我们传入一个实现了 CompletionHandler 的 AioClientWriteHandler
第 1 个 writeBuffer,表示我们要发送给服务器的数据;第 2 个 writeBuffer,考虑到网络写有可能无法一次性将数据写完,需要进行多次网络写,所以将 writeBuffer 作为附件传递给 AioClientWriteHandler。*/
clientChannel.write(writeBuffer,writeBuffer,
new AioClientWriteHandler(clientChannel,latch));
}
}
/**
* 类说明:网络写的处理器,CompletionHandler<Integer, ByteBuffer> 中
* Integer:本次网络写操作完成实际写入的字节数,* ByteBuffer:写操作的附件,存储了写操作需要写入的数据
*/
public class AioClientWriteHandler
implements CompletionHandler<Integer, ByteBuffer> {
private AsynchronousSocketChannel clientChannel;
private CountDownLatch latch;
public AioClientWriteHandler(AsynchronousSocketChannel clientChannel,
CountDownLatch latch) {
this.clientChannel = clientChannel;
this.latch = latch;
}
@Override
public void completed(Integer result, ByteBuffer buffer) {
// 有可能无法一次性将数据写完, 需要检查缓冲区中是否还有数据需要继续进行网络写
if(buffer.hasRemaining()){clientChannel.write(buffer,buffer,this);
}else{
// 写操作已经完成,为读取服务端传回的数据建立缓冲区
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
/* 这个方法会迅速返回,需要提供一个接口让
系统在读操作完成后通知我们的应用程序。*/
clientChannel.read(readBuffer,readBuffer,
new AioClientReadHandler(clientChannel,latch));
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {System.err.println("数据发送失败...");
try {clientChannel.close();
latch.countDown();} catch (IOException e) {}}
}
/**
* 类说明:网络读的处理器
* CompletionHandler<Integer, ByteBuffer> 中
* Integer:本次网络读操作实际读取的字节数,* ByteBuffer:读操作的附件,存储了读操作读到的数据 *
*/
public class AioClientReadHandler
implements CompletionHandler<Integer, ByteBuffer> {
private AsynchronousSocketChannel clientChannel;
private CountDownLatch latch;
public AioClientReadHandler(AsynchronousSocketChannel clientChannel,
CountDownLatch latch) {
this.clientChannel = clientChannel;
this.latch = latch;
}
@Override
public void completed(Integer result,ByteBuffer buffer) {buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
String msg;
try {msg = new String(bytes,"UTF-8");
System.out.println("accept message:"+msg);
} catch (UnsupportedEncodingException e) {e.printStackTrace();
}
}
@Override
public void failed(Throwable exc,ByteBuffer attachment) {System.err.println("数据读取失败...");
try {clientChannel.close();
latch.countDown();} catch (IOException e) {}}
}

服务端

/**
* 类说明:服务器主程序
*/
public class AioServer {
private static AioServerHandler serverHandle;
// 统计客户端个数
public volatile static long clientCount = 0;
public static void start(){if(serverHandle!=null)
return;
serverHandle = new AioServerHandler(DEFAULT_PORT);
new Thread(serverHandle,"Server").start();}
public static void main(String[] args){AioServer.start();
}
}
/**
* 类说明:处理用户连接的处理器
*/
public class AioAcceptHandler
implements CompletionHandler<AsynchronousSocketChannel,
AioServerHandler> {
@Override
public void completed(AsynchronousSocketChannel channel,
AioServerHandler serverHandler) {
AioServer.clientCount++;
System.out.println("连接的客户端数:" + AioServer.clientCount);
// 重新注册监听,让别的客户端也可以连接
serverHandler.channel.accept(serverHandler,this);
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
//1)ByteBuffer dst:接收缓冲区,用于从异步 Channel 中读取数据包;//2)  A attachment:异步 Channel 携带的附件,通知回调的时候作为入参使用;//3)  CompletionHandler<Integer,? super A>:系统回调的业务 handler,进行读操作
channel.read(readBuffer,readBuffer,
new AioReadHandler(channel));
}
@Override
public void failed(Throwable exc, AioServerHandler serverHandler) {exc.printStackTrace();
serverHandler.latch.countDown();}
}
/**
* 类说明:读数据的处理器
*/
public class AioReadHandler
implements CompletionHandler<Integer, ByteBuffer> {
private AsynchronousSocketChannel channel;
public AioReadHandler(AsynchronousSocketChannel channel) {this.channel = channel;}
// 读取到消息后的处理
@Override
public void completed(Integer result, ByteBuffer attachment) {
// 如果条件成立,说明客户端主动终止了 TCP 套接字,这时服务端终止就可以了
if(result == -1) {
try {channel.close();
} catch (IOException e) {e.printStackTrace();
}
return;
}
//flip 操作
attachment.flip();
byte[] message = new byte[attachment.remaining()];
attachment.get(message);
try {System.out.println(result);
String msg = new String(message,"UTF-8");
System.out.println("server accept message:"+msg);
String responseStr = response(msg);
// 向客户端发送消息
doWrite(responseStr);
} catch (Exception e) {e.printStackTrace();
}
}
// 发送消息
private void doWrite(String result) {byte[] bytes = result.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
// 异步写数据
channel.write(writeBuffer, writeBuffer,
new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {if(attachment.hasRemaining()){channel.write(attachment,attachment,this);
}else{
// 读取客户端传回的数据
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
// 异步读数据
channel.read(readBuffer,readBuffer,
new AioReadHandler(channel));
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {channel.close();
} catch (IOException e) {e.printStackTrace();
}
}
});
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {this.channel.close();
} catch (IOException e) {e.printStackTrace();
}
}
}
/**
* 类说明:响应网络操作的处理器
*/
public class AioServerHandler implements Runnable {
public CountDownLatch latch;
/* 进行异步通信的通道 */
public AsynchronousServerSocketChannel channel;
public AioServerHandler(int port) {
try {
// 创建服务端通道
channel = AsynchronousServerSocketChannel.open();
// 绑定端口
channel.bind(new InetSocketAddress(port));
System.out.println("Server is start,port:"+port);
} catch (IOException e) {e.printStackTrace();
}
}
@Override
public void run() {latch = new CountDownLatch(1);
// 用于接收客户端的连接,异步操作,// 需要实现了 CompletionHandler 接口的处理器处理和客户端的连接操作
channel.accept(this,new AioAcceptHandler());
try {latch.await();
} catch (InterruptedException e) {e.printStackTrace();
}
}
}

疑难点 1:


怎么理解这里客户端写操作的处理器回调方法?

  1. 客户端把 ByteBuffer 里面的数据写到 AsynchronousSocketChannel 这个管道上,
  2. 如果 ByteBuffer 里面数据很大, 超过了管道容量,这时会先完成写操作,服务端收到数据回调这个 completed 方法
  3. 则需要 ByteBuffer 再写入剩下的数据到管道里, 每发完一次数据通知一次, 这个管道容量取决于网卡的缓冲区。这个 completed 方法并不是说 ByteBuffer 的数据写完了, 而是当前网卡这份数据写完了.

疑难点 2:
Buffer:

查看源码可看到几个重要属性:
capacity: 表示分配的内存大小
position: 类似指针类的索引, 读取或写入的位置标识符
limit: 可读或可写的范围,小于 capacity,limit 到 capaticy 的最大容量值的这段空间不予写入是放一些初始化值的.

ByteBuffer 可以理解为放在内存中的一个数组。
比如图中一开始是写入模式, 写入五个字节, 地址为 0 -4,position 在 4, 调用 flip 方法后切换到读模式,position 变为 0 即开始序列,limit 变为 4,这样就可以 buffer 开头开始读取了.

AIO 编程相对复杂, 代码中一些关键方法都有注释, 目前 Linux 下没有真正意义上的 AIO, 实际上是用了 NIO 里面的 epoll(true), 底层原理还是用了 IO 复用(NIO).windows 实现了 AIO,AIO 是未来的方向, 需待 linux 内核支持.

正文完
 0