BIO编程

回顾下Linux下阻塞IO模型:

再看看Java的BIO编程模型:

/** * 类说明:客户端 */public class BioClient {    public static void main(String[] args) throws InterruptedException,            IOException {        //通过构造函数创建Socket,并且连接指定地址和端口的服务端        Socket socket =  new Socket(DEFAULT_SERVER_IP,DEFAULT_PORT);        System.out.println("请输入请求消息:");        //启动读取服务端输出数据的线程        new ReadMsg(socket).start();        PrintWriter pw = null;        //允许客户端在控制台输入数据,然后送往服务器        while(true){            pw = new PrintWriter(socket.getOutputStream());            pw.println(new Scanner(System.in).next());            pw.flush();        }    }    //读取服务端输出数据的线程    private static class ReadMsg extends Thread {        Socket socket;        public ReadMsg(Socket socket) {            this.socket = socket;        }        @Override        public void run() {            //负责socket读写的输入流            try (BufferedReader br = new BufferedReader(                    new InputStreamReader(socket.getInputStream()))){                String line = null;                //通过输入流读取服务端传输的数据                //如果已经读到输入流尾部,返回null,退出循环                //如果得到非空值,就将结果进行业务处理                while((line=br.readLine())!=null){                    System.out.printf("%s\n",line);                }            } catch (SocketException e) {                System.out.printf("%s\n", "服务器断开了你的连接");            } catch (Exception e) {                e.printStackTrace();            } finally {                clear();            }        }        //必要的资源清理工作        private void clear() {            if (socket != null)            try {                socket.close();            } catch (IOException e) {                e.printStackTrace();            }        }    }}/** * 类说明:bio的服务端主程序 */public class BioServer {    //服务器端必须    private static ServerSocket server;    //线程池,处理每个客户端的请求    private static ExecutorService executorService            = Executors.newFixedThreadPool(5);    private static void start() throws IOException{        try{            //通过构造函数创建ServerSocket            //如果端口合法且空闲,服务端就监听成功            server = new ServerSocket(DEFAULT_PORT);            System.out.println("服务器已启动,端口号:" + DEFAULT_PORT);            while(true){                Socket socket= server.accept();                System.out.println("有新的客户端连接----" );                //当有新的客户端接入时,打包成一个任务,投入线程池                executorService.execute(new BioServerHandler(socket));            }        }finally{            if(server!=null){                server.close();            }        }    }    public static void main(String[] args) throws IOException {        start();    }}/** * 类说明: */public class BioServerHandler implements Runnable{    private Socket socket;    public BioServerHandler(Socket socket) {        this.socket = socket;    }    public void run() {        try(//负责socket读写的输出、输入流            BufferedReader in = new BufferedReader(                new InputStreamReader(socket.getInputStream()));            PrintWriter out = new PrintWriter(socket.getOutputStream(),                    true)){            String message;            String result;            //通过输入流读取客户端传输的数据            //如果已经读到输入流尾部,返回null,退出循环            //如果得到非空值,就将结果进行业务处理            while((message = in.readLine())!=null){                System.out.println("Server accept message:"+message);                result = response(message);                //将业务结果通过输出流返回给客户端                out.println(result);            }        }catch(Exception e){            e.printStackTrace();        }finally{            if(socket != null){                try {                    socket.close();                } catch (IOException e) {                    e.printStackTrace();                }                socket = null;            }        }    }}

过程:

  1. 服务端提供IP和监听端口
  2. 客户端通过连接操作想服务端监听的地址发起连接请求,通过三次握手连接
  3. 如果连接成功建立,双方就可以通过套接字进行通信

最早的时候服务器端是针对一个连接新建一个线程来处理→→服务端针对每个客户端连接把请求丢进线程池来处理任务
缺点:若高并发场景且处理时间稍长则许多请求会阻塞一直等待,严重影响性能.

AIO

先回顾下Linux下AIO模型:

原生JDK网络编程AIO:

异步IO采用“订阅-通知”模式:即应用程序向操作系统注册IO监听,然后继续做自己的事情。当操作系统发生IO事件,并且准备好数据后,在主动通知应用程序,触发相应的函数。
注意:异步IO里面客户端和服务端均采用这种“订阅-通知”模式.

AIO编程几个核心类:
:AsynchronousServerSocketChannel:类似BIO里面的ServerSocket


②:AsynchronousSocketChannel :类似BIO里面的socket用来通信,有三个方法:connect():用于连接到指定端口,指定IP地址的服务器,read()、write():完成读写
注意点:

  • 1.这三个方法会执行就相当于上面图解里面的Subscrible函数向操作系统监听线程。
  • 2.这几个方法里面有个参数,比如write(ByteBuffer src,A attachment,CompletionHandler<Integer,? super A>handler)的attachment,是附加到IO操作里面的对象.

③:CompletionHandler:源码注释是异步IO操作中用来处理消费的结果,其实也就是结果回调函数,连接丶读写都是异步操作都需要实现此接口。

而CompletionHandler接口中定义了两个方法,

  1. completed(V result , A attachment):当IO完成时触发该方法,该方法的第一个参数代表IO操作返回的对象,第二个参数代表发起IO操作时传入的附加参数。
  2. faild(Throwable exc, A attachment):当IO失败时触发该方法,第一个参数代表IO操作失败引发的异常或错误。

先上代码
客户端:

/** * 类说明:aio的客户端主程序 */public class AioClient {    //IO通信处理器    private static AioClientHandler clientHandle;    public static void start(){        if(clientHandle!=null)            return;        clientHandle = new AioClientHandler(DEFAULT_SERVER_IP,DEFAULT_PORT);        //负责网络通讯的线程        new Thread(clientHandle,"Client").start();    }    //向服务器发送消息    public static boolean sendMsg(String msg) throws Exception{        if(msg.equals("q")) return false;        clientHandle.sendMessag(msg);        return true;    }    public static void main(String[] args) throws Exception{        AioClient.start();        System.out.println("请输入请求消息:");        Scanner scanner = new Scanner(System.in);        while(AioClient.sendMsg(scanner.nextLine()));    }}/** * 类说明:IO通信处理器,负责连接服务器,对外暴露对服务端发送数据的API */public class AioClientHandler        implements CompletionHandler<Void,AioClientHandler>,Runnable {    private AsynchronousSocketChannel clientChannel;    private String host;    private int port;    private CountDownLatch latch;//防止线程退出    public AioClientHandler(String host, int port) {        this.host = host;        this.port = port;        try {            //创建一个实际异步的客户端通道            clientChannel = AsynchronousSocketChannel.open();        } catch (IOException e) {            e.printStackTrace();        }    }    @Override    public void run() {        //创建CountDownLatch,因为是异步调用,下面的connect不会阻塞,        // 那么整个run方法会迅速结束,那么负责网络通讯的线程也会迅速结束        latch = new CountDownLatch(1);        //发起异步连接操作,回调参数就是这个实例本身,        // 如果连接成功会回调这个实例的completed方法        clientChannel.connect(new InetSocketAddress(host,port),                        null,this);        try {            latch.await();            clientChannel.close();        } catch (InterruptedException e) {            e.printStackTrace();        } catch (IOException e) {            e.printStackTrace();        }    }    //连接成功,这个方法会被系统调用    @Override    public void completed(Void result, AioClientHandler attachment) {        System.out.println("已经连接到服务端。");    }    //连接失败,这个方法会被系统调用    @Override    public void failed(Throwable exc, AioClientHandler attachment) {        System.err.println("连接失败。");        exc.printStackTrace();        latch.countDown();        try {            clientChannel.close();        } catch (IOException e) {            e.printStackTrace();        }    }    //对外暴露对服务端发送数据的API    public void sendMessag(String msg){        /*为了把msg变成可以在网络传输的格式*/        byte[] bytes = msg.getBytes();        ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);        writeBuffer.put(bytes);        writeBuffer.flip();        /*进行异步写,同样的这个方法会迅速返回,         需要提供一个接口让系统在一次网络写操作完成后通知我们的应用程序。        所以我们传入一个实现了CompletionHandler的AioClientWriteHandler        第1个writeBuffer,表示我们要发送给服务器的数据;        第2个writeBuffer,考虑到网络写有可能无法一次性将数据写完,需要进行多次网络写,        所以将writeBuffer作为附件传递给AioClientWriteHandler。        */        clientChannel.write(writeBuffer,writeBuffer,                new AioClientWriteHandler(clientChannel,latch));    }} /**     * 类说明:网络写的处理器,CompletionHandler<Integer, ByteBuffer>中     * Integer:本次网络写操作完成实际写入的字节数,     * ByteBuffer:写操作的附件,存储了写操作需要写入的数据     */    public class AioClientWriteHandler            implements CompletionHandler<Integer, ByteBuffer> {        private AsynchronousSocketChannel clientChannel;        private CountDownLatch latch;        public AioClientWriteHandler(AsynchronousSocketChannel clientChannel,                                     CountDownLatch latch) {            this.clientChannel = clientChannel;            this.latch = latch;        }        @Override        public void completed(Integer result, ByteBuffer buffer) {            //有可能无法一次性将数据写完,需要检查缓冲区中是否还有数据需要继续进行网络写            if(buffer.hasRemaining()){                clientChannel.write(buffer,buffer,this);            }else{                //写操作已经完成,为读取服务端传回的数据建立缓冲区                ByteBuffer readBuffer = ByteBuffer.allocate(1024);                /*这个方法会迅速返回,需要提供一个接口让                系统在读操作完成后通知我们的应用程序。*/                clientChannel.read(readBuffer,readBuffer,                        new AioClientReadHandler(clientChannel,latch));            }        }        @Override        public void failed(Throwable exc, ByteBuffer attachment) {            System.err.println("数据发送失败...");            try {                clientChannel.close();                latch.countDown();            } catch (IOException e) {            }        }        }/** * 类说明:网络读的处理器 * CompletionHandler<Integer, ByteBuffer>中 * Integer:本次网络读操作实际读取的字节数, * ByteBuffer:读操作的附件,存储了读操作读到的数据 * */public class AioClientReadHandler        implements CompletionHandler<Integer, ByteBuffer> {    private AsynchronousSocketChannel clientChannel;    private CountDownLatch latch;    public AioClientReadHandler(AsynchronousSocketChannel clientChannel,                                CountDownLatch latch) {        this.clientChannel = clientChannel;        this.latch = latch;    }    @Override    public void completed(Integer result,ByteBuffer buffer) {        buffer.flip();        byte[] bytes = new byte[buffer.remaining()];        buffer.get(bytes);        String msg;        try {            msg = new String(bytes,"UTF-8");            System.out.println("accept message:"+msg);        } catch (UnsupportedEncodingException e) {            e.printStackTrace();        }    }    @Override    public void failed(Throwable exc,ByteBuffer attachment) {        System.err.println("数据读取失败...");        try {            clientChannel.close();            latch.countDown();        } catch (IOException e) {        }    }}

服务端

/** * 类说明:服务器主程序 */public class AioServer {    private static AioServerHandler serverHandle;    //统计客户端个数    public volatile static long clientCount = 0;    public static void start(){        if(serverHandle!=null)            return;        serverHandle = new AioServerHandler(DEFAULT_PORT);        new Thread(serverHandle,"Server").start();    }    public static void main(String[] args){        AioServer.start();    }}/** * 类说明:处理用户连接的处理器 */public class AioAcceptHandler        implements CompletionHandler<AsynchronousSocketChannel,        AioServerHandler> {    @Override    public void completed(AsynchronousSocketChannel channel,                          AioServerHandler serverHandler) {        AioServer.clientCount++;        System.out.println("连接的客户端数:" + AioServer.clientCount);        //重新注册监听,让别的客户端也可以连接        serverHandler.channel.accept(serverHandler,this);        ByteBuffer readBuffer = ByteBuffer.allocate(1024);        //1)ByteBuffer dst:接收缓冲区,用于从异步Channel中读取数据包;        //2)  A attachment:异步Channel携带的附件,通知回调的时候作为入参使用;        //3)  CompletionHandler<Integer,? super A>:系统回调的业务handler,进行读操作        channel.read(readBuffer,readBuffer,                new AioReadHandler(channel));    }    @Override    public void failed(Throwable exc, AioServerHandler serverHandler) {        exc.printStackTrace();        serverHandler.latch.countDown();    }}/** * 类说明:读数据的处理器 */public class AioReadHandler        implements CompletionHandler<Integer, ByteBuffer> {    private AsynchronousSocketChannel channel;    public AioReadHandler(AsynchronousSocketChannel channel) {        this.channel = channel;    }    //读取到消息后的处理    @Override    public void completed(Integer result, ByteBuffer attachment) {        //如果条件成立,说明客户端主动终止了TCP套接字,这时服务端终止就可以了        if(result == -1) {            try {                channel.close();            } catch (IOException e) {                e.printStackTrace();            }            return;        }        //flip操作        attachment.flip();        byte[] message = new byte[attachment.remaining()];        attachment.get(message);        try {            System.out.println(result);            String msg = new String(message,"UTF-8");            System.out.println("server accept message:"+msg);            String responseStr = response(msg);            //向客户端发送消息            doWrite(responseStr);        } catch (Exception e) {            e.printStackTrace();        }    }    //发送消息    private void doWrite(String result) {        byte[] bytes = result.getBytes();        ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);        writeBuffer.put(bytes);        writeBuffer.flip();        //异步写数据        channel.write(writeBuffer, writeBuffer,                new CompletionHandler<Integer, ByteBuffer>() {            @Override            public void completed(Integer result, ByteBuffer attachment) {                if(attachment.hasRemaining()){                    channel.write(attachment,attachment,this);                }else{                    //读取客户端传回的数据                    ByteBuffer readBuffer = ByteBuffer.allocate(1024);                    //异步读数据                    channel.read(readBuffer,readBuffer,                            new AioReadHandler(channel));                }            }            @Override            public void failed(Throwable exc, ByteBuffer attachment) {                try {                    channel.close();                } catch (IOException e) {                    e.printStackTrace();                }            }        });    }    @Override    public void failed(Throwable exc, ByteBuffer attachment) {        try {            this.channel.close();        } catch (IOException e) {            e.printStackTrace();        }    }}/** * 类说明:响应网络操作的处理器 */public class AioServerHandler implements Runnable {    public CountDownLatch latch;    /*进行异步通信的通道*/    public AsynchronousServerSocketChannel channel;    public AioServerHandler(int port) {        try {            //创建服务端通道            channel = AsynchronousServerSocketChannel.open();            //绑定端口            channel.bind(new InetSocketAddress(port));            System.out.println("Server is start,port:"+port);        } catch (IOException e) {            e.printStackTrace();        }    }    @Override    public void run() {        latch = new CountDownLatch(1);        //用于接收客户端的连接,异步操作,        // 需要实现了CompletionHandler接口的处理器处理和客户端的连接操作        channel.accept(this,new AioAcceptHandler());        try {            latch.await();        } catch (InterruptedException e) {            e.printStackTrace();        }    }}

疑难点1:


怎么理解这里客户端写操作的处理器回调方法?

  1. 客户端把ByteBuffer里面的数据写到AsynchronousSocketChannel这个管道上,
  2. 如果ByteBuffer里面数据很大,超过了管道容量,这时会先完成写操作,服务端收到数据回调这个completed方法
  3. 则需要ByteBuffer再写入剩下的数据到管道里,每发完一次数据通知一次,这个管道容量取决于网卡的缓冲区。这个completed方法并不是说ByteBuffer的数据写完了,而是当前网卡这份数据写完了.

疑难点2:
Buffer:

查看源码可看到几个重要属性:
capacity:表示分配的内存大小
position:类似指针类的索引,读取或写入的位置标识符
limit:可读或可写的范围,小于capacity,limit到capaticy的最大容量值的这段空间不予写入是放一些初始化值的.

ByteBuffer可以理解为放在内存中的一个数组。
比如图中一开始是写入模式,写入五个字节,地址为0-4,position在4,调用flip方法后切换到读模式,position变为0即开始序列,limit变为4,这样就可以buffer开头开始读取了.

AIO编程相对复杂,代码中一些关键方法都有注释,目前Linux下没有真正意义上的AIO,实际上是用了NIO里面的epoll(true),底层原理还是用了IO复用(NIO).windows实现了AIO,AIO是未来的方向,需待linux内核支持.