1、概述

NIO(Non-blocking I/O,在Java畛域,也称为New I/O),是一种同步非阻塞的I/O模型,也是I/O多路复用的根底,曾经被越来越多地利用到大型应用服务器,成为解决高并发与大量连贯、I/O解决问题的无效形式。

外围组件次要包含:

  • Channel
  • Buffer
  • Selector
Java NIO: Channels and Buffers(通道和缓冲区)

规范的IO基于字节流和字符流进行操作的,而NIO是基于通道(Channel)和缓冲区(Buffer)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。

Java NIO: Asynchronous IO(异步IO)

Java NIO能够让你异步的应用IO,例如:当线程从通道读取数据到缓冲区时,线程还是能够进行其余事件。当数据被写入到缓冲区时,线程能够持续解决它。从缓冲区写入通道也相似。

Java NIO: Selectors(选择器)

Java NIO引入了选择器的概念,选择器用于监听多个通道的事件(比方:连贯关上,数据达到)。因而,单个的线程能够监听多个数据通道。

2、根底组件

2.1、Channel

ChannelStream 的区别

  • 你能够通过 Channel 进行读和写,但只能从Stream中,单向获取数据(读或写)
  • Channels 能够进行异步
  • Channel 通常都是基于 Buffer 进行读或写

ChannelJava NIO 中的一些 具体实现

  • FileChannel : 通过文件获取数据
  • DatagramChannel : 通过UDP进行网络数据传输
  • SocketChannel : 通过TCP进行网络数据传输
  • ServerSocketChannel : 通监听TCP链接,进行数据链接。每个链接都会创立一个 ServerSocketChannel。

FileChannel Demo

    RandomAccessFile aFile = new RandomAccessFile("data/nio-data.txt", "rw");    FileChannel inChannel = aFile.getChannel();    ByteBuffer buf = ByteBuffer.allocate(48);    int bytesRead = inChannel.read(buf);    while (bytesRead != -1) {      System.out.println("Read " + bytesRead);      buf.flip();      while(buf.hasRemaining()){          System.out.print((char) buf.get());      }      buf.clear();      bytesRead = inChannel.read(buf);    }    aFile.close();

2.2、Buffer

Buffer 实质上是一个内存块,您能够在其中写入数据,而后能够在当前再次读取。该内存块包装在NIO Buffer对象中,该对象提供了一组办法,能够更轻松地应用该局部数据。

Buffer 中三个次要属性:

  • capacity : Buffer 大小
  • position : 以后操作下标
  • limit : 数据大小

罕用办法:

  • flip() : 写数据实现后,将limit设置到数据末端, position设置为0,进行读操作
  • rewind() : 从新设置 position 为0 ,进行读操作

BufferJava NIO 中的一些 具体实现

  • ByteBuffer
  • CharBuffer
  • DoubleBuffer
  • FloatBuffer
  • IntBuffer
  • LongBuffer
  • ShortBuffer

2.3、Selector

Selector 的劣势在于,只须要应用一个线程,就能够管控多个 channel ,线程之间的切换对于操作系统来说是低廉的,并且每个线程也占用操作系统中的一些资源(内存)。因而,应用的线程越少越好。

须要留神的是, channel 必须设置成 non-blocking 的模式,因而对于 FileChannel 不实用以后模式

channel.configureBlocking(false);SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

SelectionKey 的 四种状态

  • SelectionKey.OP_CONNECT
  • SelectionKey.OP_ACCEPT
  • SelectionKey.OP_READ
  • SelectionKey.OP_WRITE

3、NIO 实现RPC通信

3.1、RpcServer

1、创立 ServerSocketChannel 监听信息

public class RPCServer {    private Map<String, Object> services = new HashMap<>();    private Selector selector;    private ServerSocketChannel ssc;    public RPCServer() {        try {            ssc = ServerSocketChannel.open();            InetSocketAddress address = new InetSocketAddress(3003);            ssc.configureBlocking(false);            ssc.bind(address);            selector = Selector.open();            ssc.register(selector, SelectionKey.OP_ACCEPT);        } catch (Exception e) {            throw new RuntimeException(e);        }    }}

2、提供服务类(HelloService.class)

public interface HelloService {    String sayHello();    void sayHi(String temp);}public class HelloServiceImpl implements HelloService {    @Override    public String sayHello() {        return "jiuling";    }    @Override    public void sayHi(String temp) {        System.out.println(temp);    }}

3、ServerSocketChannel 监听申请

 public void start() {        System.out.println("-----开始监听申请------");        try {            while (selector.select() > 0) {                for (SelectionKey sk : selector.selectedKeys()) {                    selector.selectedKeys().remove(sk);                    if (sk.isAcceptable()) {                        // 通过accept办法,获取对应的SocketChannel                        SocketChannel sc = ssc.accept();                        // 设置采纳非阻塞模式                        sc.configureBlocking(false);                        // 将channel 状态设置为可读状态                        sc.register(selector, SelectionKey.OP_READ);                        sk.interestOps(SelectionKey.OP_ACCEPT);                    } else if (sk.isReadable()) {                                            SocketChannel sc = (SocketChannel) sk.channel();                        try {                            //调用反射办法                            remoteHandMethod(sk, sc);                        } catch (Exception e) {                            //从Selector中删除指定的SelectionKey                            sk.cancel();                            if (sk.channel() != null) {                                sk.channel().close();                            }                        }                    }                }            }        } catch (Exception e) {            throw new RuntimeException(e);        }    }

4、remoteHandMethod反射调用

private void remoteHandMethod(SelectionKey sk,                                  SocketChannel sc) throws Exception {      //1、从流中读取数据                                  ByteBuffer buff = ByteBuffer.allocate(1024);        sc.read(buff);        int postion = buff.position();//这里获取它真正的大小        byte[] data = buff.array();        String message = new String(data, 0, postion);// class/办法名(参数类型:参数,参数类型:参数)        message = message.trim();          //2、解析数据,获取申请内容()        String[] clazzInfo = message.split("/");        String className = clazzInfo[0];        String methodName = clazzInfo[1].substring(0,                clazzInfo[1].indexOf("("));        String temp = clazzInfo[1].substring(                clazzInfo[1].indexOf("(") + 1,                clazzInfo[1].indexOf(")"));        String typeValues = decodeParamsTypeAndValue(temp);              //3、反射调用      Object object = services.get(className);      Class clazz = object.getClass();      Object result = null;                              if (typeValues == null) {            Method method = clazz.getDeclaredMethod(methodName,                    null);            result = method.invoke(object, null);        } else {            Class[] types = new Class[typeValues.length];            Object[] values = new Object[typeValues.length];            for (int i = 0; i < typeValues.length; i++) {                String[] tv = typeValues[i].split(":");                String type = tv[0];                String value = tv[1];                types[i] = Class.forName(type);                if (type.contains("Integer") || type.contains("int"))                    values[i] = Integer.parseInt(value);                else if (type.contains("Float") || type.contains("float"))                    values[i] = Float.parseFloat(value);                else if (type.contains("Double") || type.contains("double"))                    values[i] = Double.parseDouble(value);                else if (type.contains("Long") || type.contains("long"))                    values[i] = Long.parseLong(value);                else                    values[i] = value;            }            Method method = clazz.getDeclaredMethod(methodName,                    types);            result = method.invoke(object, values);        }                       //4、 返回内容        sc.write(ByteBuffer.wrap(result.toString()                .getBytes()));        sk.interestOps(SelectionKey.OP_READ);                                  }    // 它返回的格局是 参数类型:参数值    private String[] decodeParamsTypeAndValue(String params) {        if (params == null || params.equals(""))            return null;        if (params.indexOf(",") < 0)            return new String[]{params};        return params.split(",");    }

3.2、RpcClient

1、创立客户端

public class RPCClient {    private SocketChannel channel;    private ByteBuffer buffer = ByteBuffer.allocate(1024);    private static RPCClient client = new RPCClient();    private Selector selector = null;    public RPCClient(String serverIp) {     try {            System.out.println("------客户端要启动了--------");            selector = Selector.open();            InetSocketAddress isa = new InetSocketAddress(serverIp, 3003);// 获取socket通道            channel = SocketChannel.open(isa);// 连贯服务器            channel.configureBlocking(false);            channel.register(selector, SelectionKey.OP_READ);        } catch (Exception e) {            throw new RuntimeException(e);        }        }}

2、获取代理类

   // 获取代理    public Object getRemoteProxy(final Class clazz) {//动静产生实现类        return Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[]{clazz}, new InvocationHandler() {            @Override            public Object invoke(Object proxy, Method method, Object[] args)                    throws Throwable {                String methodName = method.getName();                String clazzName = clazz                        .getSimpleName();                Object result = null;                if (args == null || args.length == 0) {// 示意没有参数 它传递的类型// 接口名/办法名()                    channel.write(ByteBuffer                            .wrap((clazzName + "/" + methodName + "()")                                    .getBytes()));                } else {                    int size = args.length;                    String[] types = new String[size];                    StringBuffer content = new StringBuffer(clazzName)                            .append("/").append(methodName).append("(");                    for (int i = 0; i < size; i++) {                        types[i] = args[i].getClass().getName();                        content.append(types[i]).append(":").append(args[i]);                        if (i != size - 1)                            content.append(",");                    }                    content.append(")");                    channel.write(ByteBuffer                            .wrap(content.toString().getBytes()));                }// 获取后果                result = getresult();                return result;            }        });    }

3、获取返回后果

private Object getresult() {// 解析后果 如果结尾为null或NULL则疏忽        try {            while (selector.select() > 0) {                for (SelectionKey sk : selector.selectedKeys()) {                    selector.selectedKeys().remove(sk);                    if (sk.isReadable()) {                        SocketChannel sc = (SocketChannel) sk.channel();                        buffer.clear();                        sc.read(buffer);                        int postion = buffer.position();                        String result = new String(buffer.array(), 0, postion);                        result = result.trim();                        buffer.clear();                        if (result.endsWith("null") || result.endsWith("NULL"))                            return null;                        String[] typeValue = result.split(":");                        String type = typeValue[0];                        String value = typeValue[1];                        if (type.contains("Integer") || type.contains("int"))                            return Integer.parseInt(value);                        else if (type.contains("Float")                                || type.contains("float"))                            return Float.parseFloat(value);                        else if (type.contains("Long") || type.contains("long"))                            return Long.parseLong(value);                        else                            return value;                    }                }            }        } catch (Exception e) {            throw new RuntimeException(e);        }        return null;    }

残缺代码能够关注公众号获取~