乐趣区

关于java:IO流中线程模型总结

IO 流模块:常常看、常常用、常常忘;

一、根底简介

在 IO 流的网络模型中,以常见的「客户端 - 服务端」交互场景为例;

客户端与服务端进行通信「交互」,可能是同步或者异步,服务端进行「流」解决时,可能是阻塞或者非阻塞模式,当然也有自定义的业务流程须要执行,从解决逻辑看就是「读取数据 - 业务执行 - 应答写数据」的模式;

Java 提供「三种」IO 网络编程模型,即:「BIO 同步阻塞」、「NIO 同步非阻塞」、「AIO 异步非阻塞」;

二、同步阻塞

1、模型图解

BIO 即同步阻塞,服务端收到客户端的申请时,会启动一个线程解决,「交互」会阻塞直到整个流程完结;

这种模式如果在高并发且流程简单耗时的场景下,客户端的申请响应会存在重大的性能问题,并且占用过多资源;

2、参考案例

服务端】启动 ServerSocket 接管客户端的申请,通过一系列逻辑之后,向客户端发送音讯,留神这里线程的 10 秒休眠;

public class SocketServer01 {public static void main(String[] args) throws Exception {
        // 1、创立 Socket 服务端
        ServerSocket serverSocket = new ServerSocket(8080);
        // 2、办法阻塞期待,直到有客户端连贯
        Socket socket = serverSocket.accept();
        // 3、输出流,输入流
        InputStream inStream = socket.getInputStream();
        OutputStream outStream = socket.getOutputStream();
        // 4、数据接管和响应
        int readLen = 0;
        byte[] buf = new byte[1024];
        if ((readLen=inStream.read(buf)) != -1){
            // 接收数据
            String readVar = new String(buf, 0, readLen) ;
            System.out.println("readVar======="+readVar);
        }
        // 响应数据
        Thread.sleep(10000);
        outStream.write("sever-8080-write;".getBytes());
        // 5、资源敞开
        IoClose.ioClose(outStream,inStream,socket,serverSocket);
    }
}

客户端】Socket 连贯,先向 ServerSocket 发送申请,再接管其响应,因为 Server 端模仿耗时,Client 处于长时间阻塞状态;

public class SocketClient01 {public static void main(String[] args) throws Exception {
        // 1、创立 Socket 客户端
        Socket socket = new Socket(InetAddress.getLocalHost(), 8080);
        // 2、输出流,输入流
        OutputStream outStream = socket.getOutputStream();
        InputStream inStream = socket.getInputStream();
        // 3、数据发送和响应接管
        // 发送数据
        outStream.write("client-hello".getBytes());
        // 接收数据
        int readLen = 0;
        byte[] buf = new byte[1024];
        if ((readLen=inStream.read(buf)) != -1){String readVar = new String(buf, 0, readLen) ;
            System.out.println("readVar======="+readVar);
        }
        // 4、资源敞开
        IoClose.ioClose(inStream,outStream,socket);
    }
}

三、同步非阻塞

1、模型图解

NIO 即同步非阻塞,服务端能够实现一个线程,解决多个客户端申请连贯,服务端的并发能力失去极大的晋升;

这种模式下客户端的申请连贯都会注册到 Selector 多路复用器上,多路复用器会进行轮询,对申请连贯的 IO 流进行解决;

2、参考案例

服务端】单线程能够解决多个客户端申请,通过轮询多路复用器查看是否有 IO 申请;

public class SocketServer01 {public static void main(String[] args) throws Exception {
        try {
            // 启动服务开启监听
            ServerSocketChannel socketChannel = ServerSocketChannel.open();
            socketChannel.socket().bind(new InetSocketAddress("127.0.0.1", 8989));
            // 设置非阻塞,承受客户端
            socketChannel.configureBlocking(false);
            // 关上多路复用器
            Selector selector = Selector.open();
            // 服务端 Socket 注册到多路复用器,指定趣味事件
            socketChannel.register(selector, SelectionKey.OP_ACCEPT);
            // 多路复用器轮询
            ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
            while (selector.select() > 0){Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> selectionKeyIter = selectionKeys.iterator();
                while (selectionKeyIter.hasNext()){SelectionKey selectionKey = selectionKeyIter.next() ;
                    selectionKeyIter.remove();
                    if(selectionKey.isAcceptable()) {
                        // 承受新的连贯
                        SocketChannel client = socketChannel.accept();
                        // 设置读非阻塞
                        client.configureBlocking(false);
                        // 注册到多路复用器
                        client.register(selector, SelectionKey.OP_READ);
                    } else if (selectionKey.isReadable()) {
                        // 通道可读
                        SocketChannel client = (SocketChannel) selectionKey.channel();
                        int len = client.read(buffer);
                        if (len > 0){buffer.flip();
                            byte[] readArr = new byte[buffer.limit()];
                            buffer.get(readArr);
                            System.out.println(client.socket().getPort() + "端口数据:" + new String(readArr));
                            buffer.clear();}
                    }
                }
            }
        } catch (Exception e) {e.printStackTrace();
        }
    }
}

客户端】每隔 3 秒继续的向通道内写数据,服务端通过轮询多路复用器,继续的读取数据;

public class SocketClient01 {public static void main(String[] args) throws Exception {
        try {
            // 连贯服务端
            SocketChannel socketChannel = SocketChannel.open();
            socketChannel.connect(new InetSocketAddress("127.0.0.1", 8989));
            ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
            String conVar = "client-hello";
            writeBuffer.put(conVar.getBytes());
            writeBuffer.flip();
            // 每隔 3S 发送一次数据
            while (true) {Thread.sleep(3000);
                writeBuffer.rewind();
                socketChannel.write(writeBuffer);
                writeBuffer.clear();}
        } catch (Exception e) {e.printStackTrace();
        }
    }
}

四、异步非阻塞

1、模型图解

AIO 即异步非阻塞,对于通道内数据的「读」和「写」动作,都是采纳异步的模式,对于性能的晋升是微小的;

这与惯例的第三方对接模式很类似,本地服务在申请第三方服务时,申请过程耗时很大,会异步执行,第三方第一次回调,确认申请能够被执行;第二次回调则是推送处理结果,这种思维在解决简单问题时,能够很大水平的进步性能,节俭资源:

2、参考案例

服务端】各种「accept」、「read」、「write」动作是异步,通过 Future 来获取计算的后果;

public class SocketServer01 {public static void main(String[] args) throws Exception {
        // 启动服务开启监听
        AsynchronousServerSocketChannel socketChannel = AsynchronousServerSocketChannel.open() ;
        socketChannel.bind(new InetSocketAddress("127.0.0.1", 8989));
        // 指定 30 秒内获取客户端连贯,否则超时
        Future<AsynchronousSocketChannel> acceptFuture = socketChannel.accept();
        AsynchronousSocketChannel asyChannel = acceptFuture.get(30, TimeUnit.SECONDS);

        if (asyChannel != null && asyChannel.isOpen()){
            // 读数据
            ByteBuffer inBuffer = ByteBuffer.allocate(1024);
            Future<Integer> readResult = asyChannel.read(inBuffer);
            readResult.get();
            System.out.println("read:"+new String(inBuffer.array()));

            // 写数据
            inBuffer.flip();
            Future<Integer> writeResult = asyChannel.write(ByteBuffer.wrap("server-hello".getBytes()));
            writeResult.get();}

        // 敞开资源
        asyChannel.close();}
}

客户端】相干「connect」、「read」、「write」办法调用是异步的,通过 Future 来获取计算的后果;

public class SocketClient01 {public static void main(String[] args) throws Exception {
        // 连贯服务端
        AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
        Future<Void> result = socketChannel.connect(new InetSocketAddress("127.0.0.1", 8989));
        result.get();

        // 写数据
        String conVar = "client-hello";
        ByteBuffer reqBuffer = ByteBuffer.wrap(conVar.getBytes());
        Future<Integer> writeFuture = socketChannel.write(reqBuffer);
        writeFuture.get();

        // 读数据
        ByteBuffer inBuffer = ByteBuffer.allocate(1024);
        Future<Integer> readFuture = socketChannel.read(inBuffer);
        readFuture.get();
        System.out.println("read:"+new String(inBuffer.array()));

        // 敞开资源
        socketChannel.close();}
}

五、Reactor 模型

1、模型图解

这部分内容,能够参考「Doug Lea 的《IO》」文档,查看更多细节;

1.1 Reactor 设计原理

Reactor 模式基于事件驱动设计,也称为「反应器」模式或者「分发者」模式;服务端收到多个客户端申请后,会将申请分派给对应的线程解决;

Reactor:负责事件的监听和散发;Handler:负责处理事件,外围逻辑「read 读」、「decode 解码」、「compute 业务计算」、「encode 编码」、「send 应答数据」;

1.2 单 Reactor 单线程

【1】Reactor 线程通过 select 监听客户端的申请事件,收到事件后通过 Dispatch 进行散发;

【2】如果是建设连贯申请事件,Acceptor 通过「accept」办法获取连贯,并创立一个 Handler 对象来解决后续业务;

【3】如果不是连贯申请事件,则 Reactor 会将该事件交由以后连贯的 Handler 来解决;

【4】在 Handler 中,会实现相应的业务流程;

这种模式将所有逻辑「连贯、读写、业务」放在一个线程中解决,防止多线程的通信,资源竞争等问题,然而存在显著的并发和性能问题;

1.3 单 Reactor 多线程

【1】Reactor 线程通过 select 监听客户端的申请事件,收到事件后通过 Dispatch 进行散发;

【2】如果是建设连贯申请事件,Acceptor 通过「accept」办法获取连贯,并创立一个 Handler 对象来解决后续业务;

【3】如果不是连贯申请事件,则 Reactor 会将该事件交由以后连贯的 Handler 来解决;

【4】在 Handler 中,只负责事件响应不解决具体业务,将数据发送给 Worker 线程池来解决;

【5】Worker 线程池会调配具体的线程来解决业务,最初把后果返回给 Handler 做响应;

这种模式将业务从 Reactor 单线程拆散解决,能够让其更专一于事件的散发和调度,Handler 应用多线程也充沛的利用 cpu 的解决能力,导致逻辑变的更加简单,Reactor 单线程仍旧存在高并发的性能问题;

1.4 主从 Reactor 多线程

【1】MainReactor 主线程通过 select 监听客户端的申请事件,收到事件后通过 Dispatch 进行散发;

【2】如果是建设连贯申请事件,Acceptor 通过「accept」办法获取连贯,之后 MainReactor 将连贯调配给 SubReactor;

【3】如果不是连贯申请事件,则 MainReactor 将连贯调配给 SubReactor,SubReactor 调用以后连贯的 Handler 来解决;

【4】在 Handler 中,只负责事件响应不解决具体业务,将数据发送给 Worker 线程池来解决;

【5】Worker 线程池会调配具体的线程来解决业务,最初把后果返回给 Handler 做响应;

这种模式 Reactor 线程分工明确,MainReactor 负责接管新的申请连贯,SubReactor 负责后续的交互业务,适应于高并发的解决场景,是 Netty 组件通信框架的所采纳的模式;

2、参考案例

服务端】提供两个 EventLoopGroup,「ParentGroup」次要是用来接管客户端的申请连贯,真正的解决是转交给「ChildGroup」执行,即 Reactor 多线程模型;

@Slf4j
public class NettyServer {public static void main(String[] args) {
        // EventLoop 组,处理事件和 IO
        EventLoopGroup parentGroup = new NioEventLoopGroup();
        EventLoopGroup childGroup = new NioEventLoopGroup();
        try {
            // 服务端启动疏导类
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(parentGroup, childGroup)
                    .channel(NioServerSocketChannel.class).childHandler(new ServerChannelInit());

            // 异步 IO 的后果
            ChannelFuture channelFuture = serverBootstrap.bind(8989).sync();
            channelFuture.channel().closeFuture().sync();} catch (Exception e){e.printStackTrace();
        } finally {parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();}
    }
}

class ServerChannelInit extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) {
        // 获取管道
        ChannelPipeline pipeline = socketChannel.pipeline();
        // 编码、解码器
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        // 增加自定义的 handler
        pipeline.addLast("serverHandler", new ServerHandler());
    }
}

class ServerHandler extends ChannelInboundHandlerAdapter {
    /**
     * 通道读和写
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("Server-Msg【"+msg+"】");
        TimeUnit.MILLISECONDS.sleep(2000);
        String nowTime = DateTime.now().toString(DatePattern.NORM_DATETIME_PATTERN) ;
        ctx.channel().writeAndFlush("hello-client;time:" + nowTime);
        ctx.fireChannelActive();}
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause) throws Exception {cause.printStackTrace();
        ctx.close();}
}

客户端 】通过 Bootstrap 类,与服务器建设连贯,服务端通过 ServerBootstrap 启动服务,绑定在8989 端口,而后服务端和客户端进行通信;

public class NettyClient {public static void main(String[] args) {
        // EventLoop 处理事件和 IO
        NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            // 客户端通道疏导
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class).handler(new ClientChannelInit());

            // 异步 IO 的后果
            ChannelFuture channelFuture = bootstrap.connect("localhost", 8989).sync();
            channelFuture.channel().closeFuture().sync();} catch (Exception e){e.printStackTrace();
        } finally {eventLoopGroup.shutdownGracefully();
        }
    }
}

class ClientChannelInit extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) {
        // 获取管道
        ChannelPipeline pipeline = socketChannel.pipeline();
        // 编码、解码器
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        // 增加自定义的 handler
        pipeline.addLast("clientHandler", new ClientHandler());
    }
}

class ClientHandler extends ChannelInboundHandlerAdapter {
    /**
     * 通道读和写
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("Client-Msg【"+msg+"】");
        TimeUnit.MILLISECONDS.sleep(2000);
        String nowTime = DateTime.now().toString(DatePattern.NORM_DATETIME_PATTERN) ;
        ctx.channel().writeAndFlush("hello-server;time:" + nowTime);
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.channel().writeAndFlush("channel...active");
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause) throws Exception {cause.printStackTrace();
        ctx.close();}
}

六、参考源码

编程文档:https://gitee.com/cicadasmile/butte-java-note

利用仓库:https://gitee.com/cicadasmile/butte-flyer-parent
退出移动版