IO模型

IO申请的两个阶段(Linux)

  • IO调用阶段:用户过程向内核发动零碎调用
  • IO执行阶段:此时用户进行期待IO申请解决实现返回,此阶段分为两步

    1. 期待数据就绪,并写入内核缓冲区
    2. 数据从内核缓冲区 到 用户态缓冲区

      • 内核态:运行操作系统程序,操作硬件
      • 用户态:运行用户程序

Linux五种IO模型

1.同步阻塞IO(BIO)

内核只能同时解决一个申请,分两个阶段(即上述的IO执行阶段):

  1. 零碎调用
  2. 数据从内核缓冲区读取到用户缓冲区

这个两个操作都是阻塞的所以只有等这两个操作都实现后能力解决其余IO

2.同步非阻塞IO(NIO)

过程的申请不会始终期待而是有专门的线程来轮询这些IO过程是否存有数据,然而轮询过程中会存在着零碎调用导致的高低问切换,如果申请过多会存在重大的零碎性能耗费

3.IO多路复用

多路是指多个数据通道,复用指的是一个或多个固定的线程来解决每一Socket连贯, select poll epoll都是IO多路复用的实现,线程一次能够select多个数据通道的数据状态,解决了NIO性能耗费过重的问题

-文件描述符fd

文件描述符(File descriptor)模式上是一个非负整数,是一个索引值,指向内核为每一个过程所保护的该过程所关上文件的记录表.

- select

这个函数会监督3类文件描述符,别离是writefds,readfds,exceptfds 调用select函数时会阻塞,直到select有以上3中描述符文件就绪或者超时,一旦某个描述符就绪了,会告诉程序进行相干的读写操作,因为select poll epoll都是同步IO,所以它们都须要在事件就绪后本人负责读写.也就是select会阻塞监听相干事件,直到解决完读写操作或者超时后才会解除阻塞.select单个过程可能监听的文件数量是无限的,linux个别默认是1024

int select(int n,fd_set *readfds,fd_set *writefds,fd_set *exceptfds,struct timeval *timeout);

- poll

int poll (struct pollfd *fds, unsigned int nfds, int timeout);

struct pollfd {    int fd; /* file descriptor */    short events; /* requested events to watch */    short revents; /* returned events witnessed */};

poll应用一个pollfd的构造体来传导须要监听的事件和要产生的事件,另外poll监听的文件描述符个数是没有限度的

- epoll

不须要轮询,工夫复杂度为O(1)

epoll_create 创立一个白板 寄存fd_events
epoll_ctl 用于向内核注册新的描述符或者是扭转某个文件描述符的状态。已注册的描述符在内核中会被保护在一棵红黑树上
epoll_wait 通过回调函数内核会将 I/O 筹备好的描述符退出到一个链表中治理,过程调用 epoll_wait() 便能够失去事件实现的描述符

两种触发模式:
LT:程度触发
epoll_wait() 检测到描述符事件达到时,将此事件告诉过程,过程能够不立刻解决该事件,下次调用 epoll_wait() 会再次告诉过程。是默认的一种模式,并且同时反对 BlockingNo-Blocking
ET:边缘触发
和 LT 模式不同的是,告诉之后过程必须立刻处理事件。
下次再调用 epoll_wait() 时不会再失去事件达到的告诉。很大水平上缩小了 epoll 事件被反复触发的次数,因而效率要比 LT 模式高。只反对 No-Blocking,以防止因为一个文件句柄的阻塞读/阻塞写操作把解决多个文件描述符的工作饿死。

4.信号驱动模型

信号驱动模型并不罕用,是一种半异步IO.当数据准备就绪后,内核会发送一个SIGIO音讯给利用过程,过程而后开始读写音讯.

5.异步IO

零碎调用会被立刻返回后果,而后读取写音讯由异步实现.

BIO

BIO - Block-IO 阻塞同步的通信形式

BIO的问题:

阻塞\同步,BIO很依赖于网络,网速不好阻塞工夫会很长;每次申请都由程序执行并返回,这是同步的缺点

BIO的工作流程:

  • 服务端启动
  • 阻塞期待客户端连贯
  • 客户端连贯
  • 监听客户端内容
  • 客户端断开
  • 回到第一步

BioServer

public class BioServer {    public static void main(String[] args) {        try {            // 服务端绑定端口            ServerSocket server  = new ServerSocket(9000);            while (true) {                // 创立一个Socket接管连贯 - 当没有时阻塞                Socket socket = server.accept();                // 获取输出流                InputStream inputStream = socket.getInputStream();                BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));                String message;                while (null != (message = reader.readLine())) {                    System.out.println(message);                }                inputStream.close();                socket.close();            }        } catch (IOException e) {            e.printStackTrace();        }    }}

BioClient

public class BioClient {    public static void main(String[] args) {        try {            // 创立socket            Socket socket  = new Socket("localhost",9000);            // 获取Socket输入流            OutputStream  outputStream = socket.getOutputStream();            // 输入流            outputStream.write("hello socket".getBytes());            // 敞开            outputStream.close();            socket.close();        } catch (IOException e) {            e.printStackTrace();        }    }}

多线程解决BIO阻塞问

  • 解决了的问题:多个线程解决当一个客户端迟迟不退出时,其余线程仍然能够解决其它客户端发送过去的申请.防止了一个申请阻塞导致其余客户端申请始终期待的问题
  • 依然存在问题:退出服务端给定固定线程数是10,有10个客户端创立了连贯 然而没有一个人发送音讯 那么10个线程将全副阻塞,或者有些客户端迟迟没有操作会造成不必要的资源占用.

多线程BioServer代码

public class BioServer {    private static ExecutorService executorService = Executors.newFixedThreadPool(10);    public static void main(String[] args) {        ServerSocket serverSocket;        try {            serverSocket  = new ServerSocket(9000);            while (true){                //new Thread(new BioHandler(serverSocket.accept()){}).start();                executorService.execute(new BioHandler(serverSocket.accept()));            }        } catch (IOException e) {            e.printStackTrace();        }    }}
public class BioHandler implements Runnable {    private Socket  socket;    public BioHandler(Socket socket) {        this.socket = socket;    }    public void run() {        try {            InputStream input  = socket.getInputStream();            BufferedReader reader  = new BufferedReader(new InputStreamReader(input));            String m;            while (null != (m = reader.readLine())){                System.out.println(m);            }            input.close();            socket.close();        } catch (IOException e) {            e.printStackTrace();        }    }}

NIO

java 1.4版本引入,给予缓冲区面 \ 向通道的io操作
bionio
面向流面向缓冲区(buffer)
阻塞io非阻塞io
同步同步
Selector(选择器)

缓冲区(Buffer)

缓冲区介绍

缓冲区是一个特定数据类型的容器,有java.nio包定义,所有的缓冲区都是Buffer抽象类的子类

Buffer次要用于和NIO通道进行通信,数据从通道读入到缓冲区,再从缓冲区读取到通道

Buffer就像是一个数据能够保留多个类型雷同的数据

子类

ByteBuffer CharBuffer ShortBuffer IntBuffer LongBuffer FloatBuffer DoubleBuffer

根本属性

1.容量(capacity):示意缓冲区的最大容量 一旦创立不能批改
2.限度(limit):第一个不可读的索引,即位于limit前面的数据不可读
3.地位(position):下一个要读取或写入数据的索引
4.flip:将此时的position设为limit,position置为0 ,个别是从inputChannel将数据读入到buffer 而后将buffer flip后 为了从buffer中读取数据outputChannel
5.标记(mark)和复原(reset):标记是一个索引,通过Buffer.mark()指定一个特定的地位,应用reset办法能够复原到这个地位

public class BufferSample {    public static void main(String[] args) {        ByteBuffer buffer = ByteBuffer.allocate(1024);        System.out.println("capacity:" + buffer.capacity());        System.out.println("limit:" + buffer.limit(10));        System.out.println("position:" +  buffer.position());        /**         * 后果:         * capacity:1024         * limit:java.nio.HeapByteBuffer[pos=0 lim=10 cap=1024]         * position:0         */        System.out.println("==============================");        String str = "hello";        buffer.put(str.getBytes());        System.out.println("position:" +  buffer.position());        /**         * 后果:         * position:5         */        System.out.println("==============================");        System.out.println("pos 和 limit之间元素的个数:" + buffer.remaining());        buffer.mark();        buffer.put("oo".getBytes());        System.out.println("reset前position:" +  buffer.position());        buffer.reset();        System.out.println("reset后position:" +  buffer.position());        /**         * 后果:         * pos 和 limit之间元素的个数:5         * reset前position:7         * reset后position:5         */        System.out.println("==============================");        buffer.rewind();        System.out.println("position:" + buffer.position());        /**         * 后果:         * position:0         */        System.out.println("==============================");        byte[] dst = new byte[3];        buffer.get(dst);        System.out.println(new String(dst));        System.out.println("position:" + buffer.position());        /**         * 后果:         * hel         * position:3         */        System.out.println("==============================");        //将此时的position转为limit,并将position置为0 - 个别flip当前就是开始读取缓冲区类        buffer.flip();        System.out.println("capacity:" + buffer.capacity());        System.out.println("limit:" + buffer.limit());        System.out.println("position:" +  buffer.position());        byte[] b = new byte[buffer.limit()];        buffer.get(b,0,2);        System.out.println(new String(b,0,2));        /**         * 后果:         * capacity:1024         * limit:3         * position:0         * he         */    }}

间接/非间接缓冲区

  • 间接缓冲区:程序间接操作物理映射文件
  • 非间接缓冲区:jvm - 操作系统 - 物理内存

通道(Channel)

Channel:相似于流,然而Channel不能间接拜访数据,只能与缓冲区进行交互

通道主体实现类

1.FileChannel:用于读取 写入 映射和操作文件的通道
2.DataGramChannel:通过UDP读取网络中的数据通道
3.SocketChannel:通过Tcp读写通道的数据
4.ServerSocketChannel:能够监听新进入的Tcp连贯,对每一个新连贯创立一个SocketChannel

提供getChannel()办法的类

1.FileInputStream
2.FileOutputStream
3.RandomAccessFile
4.Socket
5.ServerSocket
6.DataGramSocket

通道间接传输

1.transferFrom()
2.transferTo()

public class ChannelSimple {    /**     * 利用通道实现文件复制(非间接缓冲区)     */    public static void FileNoDirectBufferTest(){        try {            //创立输入输出流            FileInputStream inputStream = new FileInputStream("../test.txt");            FileOutputStream outputStream = new FileOutputStream("../test2.txt");            //依据流获取通道            FileChannel inputChannel = inputStream.getChannel();            FileChannel outputChannel = outputStream.getChannel();            //创立缓冲区            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);            //从通道读取数据到缓冲区            while (-1 != inputChannel.read(byteBuffer)){                //limit - position,position - 0                byteBuffer.flip();                //将缓冲区中的数据写出                outputChannel.write(byteBuffer);                byteBuffer.clear();            }            outputChannel.close();            inputChannel.close();            outputStream.close();            inputStream.close();        } catch (IOException  e) {            e.printStackTrace();        }    }    /**     * 利用间接缓冲区实现文件复制(内存映射文件)     * @throws IOException     */    public static void FileMpDirectBufferTest() throws IOException{        //创立通道        FileChannel inputChannel = FileChannel.open(Paths.get("../test.txt"), StandardOpenOption.READ);        FileChannel outputChannel = FileChannel.open(Paths.get("../test2.txt"),StandardOpenOption.CREATE,StandardOpenOption.WRITE,StandardOpenOption.READ);        //内存映射文件        MappedByteBuffer inputBuffer = inputChannel.map(FileChannel.MapMode.READ_ONLY,0,inputChannel.size());        MappedByteBuffer outputBuffer =  outputChannel.map(FileChannel.MapMode.READ_WRITE,0,inputChannel.size());        //间接对缓冲区进行数据读写操作        byte [] dst = new byte[inputBuffer.limit()];        inputBuffer.get(dst);        outputBuffer.put(dst);        outputChannel.close();        inputChannel.close();    }    /**     * 利用间接缓冲区复制     * @throws IOException     */    public static void FileDirectBufferTest() throws IOException {        //创立通道        FileChannel inputChannel = FileChannel.open(Paths.get("../test.txt"), StandardOpenOption.READ);        FileChannel outputChannel = FileChannel.open(Paths.get("../test2.txt"),StandardOpenOption.CREATE,StandardOpenOption.WRITE,StandardOpenOption.READ);        //inputChannel.transferTo(0,inputChannel.size(),outputChannel);        //等同 下面的正文        outputChannel.transferFrom(inputChannel,0,inputChannel.size());        outputChannel.close();        inputChannel.close();    }}

扩散读取和汇集写入

  • 扩散读取(Scatter):将一个Channel 中的数据扩散贮存到多个Buffer
  • 汇集写入(Gather):将多个Buffer中的数据写入同一个Channel
public class ScatterAndGather {    public static void main(String[] args) {        try {            //创立输入输出流            FileInputStream inputStream = new FileInputStream("../test.txt");            FileOutputStream outputStream = new FileOutputStream("../test2.txt");            //依据流获取通道            FileChannel inputChannel = inputStream.getChannel();            FileChannel outputChannel = outputStream.getChannel();            //创立缓冲区            ByteBuffer byteBuffer1 = ByteBuffer.allocate((int)inputChannel.size()/2);            ByteBuffer byteBuffer2 = ByteBuffer.allocate((int)inputChannel.size()/2);            ByteBuffer[] byteBuffers = new ByteBuffer[]{byteBuffer1,byteBuffer2};            //从通道读取数据到缓冲区 - 扩散写入            while (-1 != inputChannel.read(byteBuffers)){                for (ByteBuffer buffer:byteBuffers){                    //limit - position,position - 0                    buffer.flip();                }                //汇集写出                for (ByteBuffer buffer:byteBuffers) {                    //将缓冲区中的数据写出                    outputChannel.write(buffer);                    buffer.clear();                }            }            outputChannel.close();            inputChannel.close();            outputStream.close();            inputStream.close();        } catch (IOException e) {            e.printStackTrace();        }    }}

选择器(Selector)

Selector个别被称为选择器,也被称为多路复用器.用于查看一个或多个通道是否处于可读\可写,如此能够实现一个线程治理多个Channel

应用Selector带来的益处

应用更少的线程来解决Channel,能够避免上下文切换带来的性能耗费

能够多路复用的Channel

能够被抉择(多路复用)的Channel都继承自SelectableChannel

                         SelectableChannel                               ||                    AbstractSelectableChannel                  ||           ||            ||      DataGramChannel    SocketChannel     ServerSocketChannel

所以FileChannel不适应与Selector,即不能切换为非阻塞模式

Selector应用根本步骤

1.创立Selector: Selector selector = Selector.open();
2.设置为非阻塞为:

`channel.configureBlocking(false);`

3.注册ChannelSelector:

/*** 参数-1:要注册到的多路复用器* 参数-2:是一个"interest汇合",即要监听事件的汇合(有以下四种)* OP_CONNECT 连贯    * OP_ACEEPT  接管* OP_READ    读* OP_WRITE   写*/SelectionKey key = channel.register(selector,SelectionKey.OP_READ);如果要监听多种事件如下:SelectionKey key = channel.register(selector,SelectionKey.OP_CONNECT | SelectionKey.OP_READ); 

4.而后就 连贯就绪 | 接管就绪 | 读就绪 | 写就绪

Selector次要办法

办法形容
Set<SelectKey> keys()返回所有SelectionKey汇合,代表 注册在这个Selector上Channel
Set<SelectKey> selectedKeys()返回已抉择了的(即有io操作的)SelectionKey
int select()监控所有注册了的Channel,如果有须要 io的操作时会将对应的selectKey退出到 selectedKeys汇合中,返回的则是被抉择 (有io操作的)Channel数量,这个操作时阻 塞的即只有被抉择的Channel数量>=1才 返回
int select(timeout)有超时时长,始终没有io操作的Channel呈现, 达到timeout呈现的工夫后将主动返回
int selectNow()无阻塞 立刻返回
Selector wakeUp()使正在select()立刻返回
void close()敞开

SelectionKey次要办法

SelectionKey示意ChannelSelector之间的关系,ChannelSelector注册就会产生一个SelectionKey

办法形容
int interestOps()感兴趣事件的汇合 boolean isInterested = interestSet & SelectionKey.OP_CONNECT ...
int readyOps()获取通道筹备好就绪的操作
SelectableChannel channel()获取注册通道
Selector selector()获取选择器
boolean isConnectable()检测Channel中是否有连贯事件就绪
boolean isAcceptable()检测Channel中是否有接管事件就绪
boolean isReadaable()检测Channel中是否有读事件就绪
boolean isWriteable()检测Channel中是否有写事件就绪
Object attach()将一个对象附着到SelectionKey上, 次要是一些用于标识的信息
Object attachment()获取注册信息 也能够在Channel注册的时候附着信息 SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);
void cancel()申请勾销此键的通道到其选择器的注册

NioServer

public class NioServer {    public static void main(String[] args) throws IOException {        Integer flag = 0;        //创立服务端通道        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();        //非阻塞模式        serverSocketChannel.configureBlocking(false);        //绑定端口        serverSocketChannel.bind(new InetSocketAddress(9021));        //创立选择器        Selector selector = Selector.open();        //注册 接管        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);        //有一个事件时就操作        while (selector.select() > 0) {            //获取事件汇合            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();            while (iterator.hasNext()) {                SelectionKey selectionKey = iterator.next();                //如果是接管就绪                if (selectionKey.isAcceptable()) {                    //获取客户端连贯                    SocketChannel socketChannel = serverSocketChannel.accept();                    //切换成非阻塞                    socketChannel.configureBlocking(false);                    //注册在多路复用器上 读                    socketChannel.register(selector, SelectionKey.OP_READ);                    //读事件                } else if (selectionKey.isReadable()) {                    //获取客户端连贯                    SocketChannel socketChannel = (SocketChannel) selectionKey.channel();                    //设置缓存                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);                    int len = 0;                    while (-1 != (len = socketChannel.read(byteBuffer))) {                        flag = 0;                        byteBuffer.flip();                        System.out.println(new String(byteBuffer.array(),0,len));                        byteBuffer.clear();                    }                    flag++;                    //判断此时是否有io事件,陷入空轮询 - 间断空轮询100次                    //申请勾销此键的通道在其选择器的注册,也就是 selector.select();的数量 -1                    if(flag == 100){                        selectionKey.cancel();                        socketChannel.close();                    }                }            }            iterator.remove();        }    }}

NioClient

package com.yuan.nio.selector;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SocketChannel;public class NioClient {    public static void main(String[] args) {        try {            SocketChannel socketChannel = SocketChannel.open();            socketChannel.connect(new InetSocketAddress("localhost",9021));            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);            byteBuffer.put("hello".getBytes());            byteBuffer.flip();            socketChannel.write(byteBuffer);            socketChannel.close();        } catch (IOException e) {            e.printStackTrace();        }    }}