IO流模块:常常看、常常用、常常忘;
一、根底简介
在IO流的网络模型中,以常见的「客户端-服务端」交互场景为例;
客户端与服务端进行通信「交互」,可能是同步或者异步,服务端进行「流」解决时,可能是阻塞或者非阻塞模式,当然也有自定义的业务流程须要执行,从解决逻辑看就是「读取数据-业务执行-应答写数据」的模式;
Java提供「三种」IO网络编程模型,即:「BIO同步阻塞」、「NIO同步非阻塞」、「AIO异步非阻塞」;
二、同步阻塞
1、模型图解
BIO即同步阻塞,服务端收到客户端的申请时,会启动一个线程解决,「交互」会阻塞直到整个流程完结;
这种模式如果在高并发且流程简单耗时的场景下,客户端的申请响应会存在重大的性能问题,并且占用过多资源;
2、参考案例
【服务端】启动ServerSocket接管客户端的申请,通过一系列逻辑之后,向客户端发送音讯,留神这里线程的10秒休眠;
public class SocketServer01 { public static void main(String[] args) throws Exception { // 1、创立Socket服务端 ServerSocket serverSocket = new ServerSocket(8080); // 2、办法阻塞期待,直到有客户端连贯 Socket socket = serverSocket.accept(); // 3、输出流,输入流 InputStream inStream = socket.getInputStream(); OutputStream outStream = socket.getOutputStream(); // 4、数据接管和响应 int readLen = 0; byte[] buf = new byte[1024]; if ((readLen=inStream.read(buf)) != -1){ // 接收数据 String readVar = new String(buf, 0, readLen) ; System.out.println("readVar======="+readVar); } // 响应数据 Thread.sleep(10000); outStream.write("sever-8080-write;".getBytes()); // 5、资源敞开 IoClose.ioClose(outStream,inStream,socket,serverSocket); }}
【客户端】Socket连贯,先向ServerSocket发送申请,再接管其响应,因为Server端模仿耗时,Client处于长时间阻塞状态;
public class SocketClient01 { public static void main(String[] args) throws Exception { // 1、创立Socket客户端 Socket socket = new Socket(InetAddress.getLocalHost(), 8080); // 2、输出流,输入流 OutputStream outStream = socket.getOutputStream(); InputStream inStream = socket.getInputStream(); // 3、数据发送和响应接管 // 发送数据 outStream.write("client-hello".getBytes()); // 接收数据 int readLen = 0; byte[] buf = new byte[1024]; if ((readLen=inStream.read(buf)) != -1){ String readVar = new String(buf, 0, readLen) ; System.out.println("readVar======="+readVar); } // 4、资源敞开 IoClose.ioClose(inStream,outStream,socket); }}
三、同步非阻塞
1、模型图解
NIO即同步非阻塞,服务端能够实现一个线程,解决多个客户端申请连贯,服务端的并发能力失去极大的晋升;
这种模式下客户端的申请连贯都会注册到Selector多路复用器上,多路复用器会进行轮询,对申请连贯的IO流进行解决;
2、参考案例
【服务端】单线程能够解决多个客户端申请,通过轮询多路复用器查看是否有IO申请;
public class SocketServer01 { public static void main(String[] args) throws Exception { try { //启动服务开启监听 ServerSocketChannel socketChannel = ServerSocketChannel.open(); socketChannel.socket().bind(new InetSocketAddress("127.0.0.1", 8989)); // 设置非阻塞,承受客户端 socketChannel.configureBlocking(false); // 关上多路复用器 Selector selector = Selector.open(); // 服务端Socket注册到多路复用器,指定趣味事件 socketChannel.register(selector, SelectionKey.OP_ACCEPT); // 多路复用器轮询 ByteBuffer buffer = ByteBuffer.allocateDirect(1024); while (selector.select() > 0){ Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> selectionKeyIter = selectionKeys.iterator(); while (selectionKeyIter.hasNext()){ SelectionKey selectionKey = selectionKeyIter.next() ; selectionKeyIter.remove(); if(selectionKey.isAcceptable()) { // 承受新的连贯 SocketChannel client = socketChannel.accept(); // 设置读非阻塞 client.configureBlocking(false); // 注册到多路复用器 client.register(selector, SelectionKey.OP_READ); } else if (selectionKey.isReadable()) { // 通道可读 SocketChannel client = (SocketChannel) selectionKey.channel(); int len = client.read(buffer); if (len > 0){ buffer.flip(); byte[] readArr = new byte[buffer.limit()]; buffer.get(readArr); System.out.println(client.socket().getPort() + "端口数据:" + new String(readArr)); buffer.clear(); } } } } } catch (Exception e) { e.printStackTrace(); } }}
【客户端】每隔3秒继续的向通道内写数据,服务端通过轮询多路复用器,继续的读取数据;
public class SocketClient01 { public static void main(String[] args) throws Exception { try { // 连贯服务端 SocketChannel socketChannel = SocketChannel.open(); socketChannel.connect(new InetSocketAddress("127.0.0.1", 8989)); ByteBuffer writeBuffer = ByteBuffer.allocate(1024); String conVar = "client-hello"; writeBuffer.put(conVar.getBytes()); writeBuffer.flip(); // 每隔3S发送一次数据 while (true) { Thread.sleep(3000); writeBuffer.rewind(); socketChannel.write(writeBuffer); writeBuffer.clear(); } } catch (Exception e) { e.printStackTrace(); } }}
四、异步非阻塞
1、模型图解
AIO即异步非阻塞,对于通道内数据的「读」和「写」动作,都是采纳异步的模式,对于性能的晋升是微小的;
这与惯例的第三方对接模式很类似,本地服务在申请第三方服务时,申请过程耗时很大,会异步执行,第三方第一次回调,确认申请能够被执行;第二次回调则是推送处理结果,这种思维在解决简单问题时,能够很大水平的进步性能,节俭资源:
2、参考案例
【服务端】各种「accept」、「read」、「write」动作是异步,通过Future来获取计算的后果;
public class SocketServer01 { public static void main(String[] args) throws Exception { // 启动服务开启监听 AsynchronousServerSocketChannel socketChannel = AsynchronousServerSocketChannel.open() ; socketChannel.bind(new InetSocketAddress("127.0.0.1", 8989)); // 指定30秒内获取客户端连贯,否则超时 Future<AsynchronousSocketChannel> acceptFuture = socketChannel.accept(); AsynchronousSocketChannel asyChannel = acceptFuture.get(30, TimeUnit.SECONDS); if (asyChannel != null && asyChannel.isOpen()){ // 读数据 ByteBuffer inBuffer = ByteBuffer.allocate(1024); Future<Integer> readResult = asyChannel.read(inBuffer); readResult.get(); System.out.println("read:"+new String(inBuffer.array())); // 写数据 inBuffer.flip(); Future<Integer> writeResult = asyChannel.write(ByteBuffer.wrap("server-hello".getBytes())); writeResult.get(); } // 敞开资源 asyChannel.close(); }}
【客户端】相干「connect」、「read」、「write」办法调用是异步的,通过Future来获取计算的后果;
public class SocketClient01 { public static void main(String[] args) throws Exception { // 连贯服务端 AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open(); Future<Void> result = socketChannel.connect(new InetSocketAddress("127.0.0.1", 8989)); result.get(); // 写数据 String conVar = "client-hello"; ByteBuffer reqBuffer = ByteBuffer.wrap(conVar.getBytes()); Future<Integer> writeFuture = socketChannel.write(reqBuffer); writeFuture.get(); // 读数据 ByteBuffer inBuffer = ByteBuffer.allocate(1024); Future<Integer> readFuture = socketChannel.read(inBuffer); readFuture.get(); System.out.println("read:"+new String(inBuffer.array())); // 敞开资源 socketChannel.close(); }}
五、Reactor模型
1、模型图解
这部分内容,能够参考「Doug Lea的《IO》」文档,查看更多细节;
1.1 Reactor设计原理
Reactor模式基于事件驱动设计,也称为「反应器」模式或者「分发者」模式;服务端收到多个客户端申请后,会将申请分派给对应的线程解决;
Reactor:负责事件的监听和散发;Handler:负责处理事件,外围逻辑「read读」、「decode解码」、「compute业务计算」、「encode编码」、「send应答数据」;
1.2 单Reactor单线程
【1】Reactor线程通过select监听客户端的申请事件,收到事件后通过Dispatch进行散发;
【2】如果是建设连贯申请事件,Acceptor通过「accept」办法获取连贯,并创立一个Handler对象来解决后续业务;
【3】如果不是连贯申请事件,则Reactor会将该事件交由以后连贯的Handler来解决;
【4】在Handler中,会实现相应的业务流程;
这种模式将所有逻辑「连贯、读写、业务」放在一个线程中解决,防止多线程的通信,资源竞争等问题,然而存在显著的并发和性能问题;
1.3 单Reactor多线程
【1】Reactor线程通过select监听客户端的申请事件,收到事件后通过Dispatch进行散发;
【2】如果是建设连贯申请事件,Acceptor通过「accept」办法获取连贯,并创立一个Handler对象来解决后续业务;
【3】如果不是连贯申请事件,则Reactor会将该事件交由以后连贯的Handler来解决;
【4】在Handler中,只负责事件响应不解决具体业务,将数据发送给Worker线程池来解决;
【5】Worker线程池会调配具体的线程来解决业务,最初把后果返回给Handler做响应;
这种模式将业务从Reactor单线程拆散解决,能够让其更专一于事件的散发和调度,Handler应用多线程也充沛的利用cpu的解决能力,导致逻辑变的更加简单,Reactor单线程仍旧存在高并发的性能问题;
1.4 主从Reactor多线程
【1】 MainReactor主线程通过select监听客户端的申请事件,收到事件后通过Dispatch进行散发;
【2】如果是建设连贯申请事件,Acceptor通过「accept」办法获取连贯,之后MainReactor将连贯调配给SubReactor;
【3】如果不是连贯申请事件,则MainReactor将连贯调配给SubReactor,SubReactor调用以后连贯的Handler来解决;
【4】在Handler中,只负责事件响应不解决具体业务,将数据发送给Worker线程池来解决;
【5】Worker线程池会调配具体的线程来解决业务,最初把后果返回给Handler做响应;
这种模式Reactor线程分工明确,MainReactor负责接管新的申请连贯,SubReactor负责后续的交互业务,适应于高并发的解决场景,是Netty组件通信框架的所采纳的模式;
2、参考案例
【服务端】提供两个EventLoopGroup,「ParentGroup」次要是用来接管客户端的申请连贯,真正的解决是转交给「ChildGroup」执行,即Reactor多线程模型;
@Slf4jpublic class NettyServer { public static void main(String[] args) { // EventLoop组,处理事件和IO EventLoopGroup parentGroup = new NioEventLoopGroup(); EventLoopGroup childGroup = new NioEventLoopGroup(); try { // 服务端启动疏导类 ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(parentGroup, childGroup) .channel(NioServerSocketChannel.class).childHandler(new ServerChannelInit()); // 异步IO的后果 ChannelFuture channelFuture = serverBootstrap.bind(8989).sync(); channelFuture.channel().closeFuture().sync(); } catch (Exception e){ e.printStackTrace(); } finally { parentGroup.shutdownGracefully(); childGroup.shutdownGracefully(); } }}class ServerChannelInit extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) { // 获取管道 ChannelPipeline pipeline = socketChannel.pipeline(); // 编码、解码器 pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); // 增加自定义的handler pipeline.addLast("serverHandler", new ServerHandler()); }}class ServerHandler extends ChannelInboundHandlerAdapter { /** * 通道读和写 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("Server-Msg【"+msg+"】"); TimeUnit.MILLISECONDS.sleep(2000); String nowTime = DateTime.now().toString(DatePattern.NORM_DATETIME_PATTERN) ; ctx.channel().writeAndFlush("hello-client;time:" + nowTime); ctx.fireChannelActive(); } @Override public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}
【客户端】通过Bootstrap类,与服务器建设连贯,服务端通过ServerBootstrap启动服务,绑定在8989
端口,而后服务端和客户端进行通信;
public class NettyClient { public static void main(String[] args) { // EventLoop处理事件和IO NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(); try { // 客户端通道疏导 Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventLoopGroup) .channel(NioSocketChannel.class).handler(new ClientChannelInit()); // 异步IO的后果 ChannelFuture channelFuture = bootstrap.connect("localhost", 8989).sync(); channelFuture.channel().closeFuture().sync(); } catch (Exception e){ e.printStackTrace(); } finally { eventLoopGroup.shutdownGracefully(); } }}class ClientChannelInit extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) { // 获取管道 ChannelPipeline pipeline = socketChannel.pipeline(); // 编码、解码器 pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); // 增加自定义的handler pipeline.addLast("clientHandler", new ClientHandler()); }}class ClientHandler extends ChannelInboundHandlerAdapter { /** * 通道读和写 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("Client-Msg【"+msg+"】"); TimeUnit.MILLISECONDS.sleep(2000); String nowTime = DateTime.now().toString(DatePattern.NORM_DATETIME_PATTERN) ; ctx.channel().writeAndFlush("hello-server;time:" + nowTime); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.channel().writeAndFlush("channel...active"); } @Override public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}
六、参考源码
编程文档:https://gitee.com/cicadasmile/butte-java-note利用仓库:https://gitee.com/cicadasmile/butte-flyer-parent