乐趣区

关于java:最佳实践NIO知识梳理

1、概述

NIO(Non-blocking I/O,在 Java 畛域,也称为 New I/O),是一种同步非阻塞的 I / O 模型,也是 I / O 多路复用的根底,曾经被越来越多地利用到大型应用服务器,成为解决高并发与大量连贯、I/ O 解决问题的无效形式。

外围组件次要包含:

  • Channel
  • Buffer
  • Selector
Java NIO: Channels and Buffers(通道和缓冲区)

规范的 IO 基于字节流和字符流进行操作的,而 NIO 是基于通道(Channel)和缓冲区(Buffer)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。

Java NIO: Asynchronous IO(异步 IO)

Java NIO 能够让你异步的应用 IO,例如:当线程从通道读取数据到缓冲区时,线程还是能够进行其余事件。当数据被写入到缓冲区时,线程能够持续解决它。从缓冲区写入通道也相似。

Java NIO: Selectors(选择器)

Java NIO 引入了选择器的概念,选择器用于监听多个通道的事件(比方:连贯关上,数据达到)。因而,单个的线程能够监听多个数据通道。

2、根底组件

2.1、Channel

ChannelStream 的区别

  • 你能够通过 Channel 进行读和写,但只能从 Stream 中,单向获取数据(读或写)
  • Channels 能够进行异步
  • Channel 通常都是基于 Buffer 进行读或写

ChannelJava NIO 中的一些 具体实现

  • FileChannel : 通过文件获取数据
  • DatagramChannel:通过 UDP 进行网络数据传输
  • SocketChannel:通过 TCP 进行网络数据传输
  • ServerSocketChannel:通监听 TCP 链接,进行数据链接。每个链接都会创立一个 ServerSocketChannel。

FileChannel Demo

    RandomAccessFile aFile = new RandomAccessFile("data/nio-data.txt", "rw");
    FileChannel inChannel = aFile.getChannel();

    ByteBuffer buf = ByteBuffer.allocate(48);

    int bytesRead = inChannel.read(buf);
    while (bytesRead != -1) {System.out.println("Read" + bytesRead);
      buf.flip();

      while(buf.hasRemaining()){System.out.print((char) buf.get());
      }

      buf.clear();
      bytesRead = inChannel.read(buf);
    }
    aFile.close();

2.2、Buffer

Buffer 实质上是一个内存块,您能够在其中写入数据,而后能够在当前再次读取。该内存块包装在 NIO Buffer 对象中,该对象提供了一组办法,能够更轻松地应用该局部数据。

Buffer 中三个次要属性:

  • capacity : Buffer 大小
  • position : 以后操作下标
  • limit:数据大小

罕用办法:

  • flip() : 写数据实现后,将 limit 设置到数据末端,position设置为 0,进行读操作
  • rewind() : 从新设置 position 为 0,进行读操作

BufferJava NIO 中的一些 具体实现

  • ByteBuffer
  • CharBuffer
  • DoubleBuffer
  • FloatBuffer
  • IntBuffer
  • LongBuffer
  • ShortBuffer

2.3、Selector

Selector 的劣势在于,只须要应用一个线程,就能够管控多个 channel , 线程之间的切换对于操作系统来说是低廉的,并且每个线程也占用操作系统中的一些资源(内存)。因而,应用的线程越少越好。

须要留神的是,channel 必须设置成 non-blocking 的模式,因而对于 FileChannel 不实用以后模式

channel.configureBlocking(false);

SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

SelectionKey 的 四种状态

  • SelectionKey.OP_CONNECT
  • SelectionKey.OP_ACCEPT
  • SelectionKey.OP_READ
  • SelectionKey.OP_WRITE

3、NIO 实现 RPC 通信

3.1、RpcServer

1、创立 ServerSocketChannel 监听信息

public class RPCServer {private Map<String, Object> services = new HashMap<>();
    private Selector selector;
    private ServerSocketChannel ssc;

    public RPCServer() {
        try {ssc = ServerSocketChannel.open();
            InetSocketAddress address = new InetSocketAddress(3003);
            ssc.configureBlocking(false);
            ssc.bind(address);
            selector = Selector.open();
            ssc.register(selector, SelectionKey.OP_ACCEPT);

        } catch (Exception e) {throw new RuntimeException(e);
        }
    }
}

2、提供服务类(HelloService.class)

public interface HelloService {String sayHello();

    void sayHi(String temp);
}

public class HelloServiceImpl implements HelloService {

    @Override
    public String sayHello() {return "jiuling";}

    @Override
    public void sayHi(String temp) {System.out.println(temp);
    }
}

3、ServerSocketChannel 监听申请

 public void start() {System.out.println("----- 开始监听申请 ------");
        try {while (selector.select() > 0) {for (SelectionKey sk : selector.selectedKeys()) {selector.selectedKeys().remove(sk);
                    if (sk.isAcceptable()) {
                        // 通过 accept 办法,获取对应的 SocketChannel
                        SocketChannel sc = ssc.accept();
                        // 设置采纳非阻塞模式
                        sc.configureBlocking(false);
                        // 将 channel 状态设置为可读状态
                        sc.register(selector, SelectionKey.OP_READ);
                        sk.interestOps(SelectionKey.OP_ACCEPT);
                    } else if (sk.isReadable()) {SocketChannel sc = (SocketChannel) sk.channel();
                        try {
                            // 调用反射办法
                            remoteHandMethod(sk, sc);
                        } catch (Exception e) {
                            // 从 Selector 中删除指定的 SelectionKey
                            sk.cancel();
                            if (sk.channel() != null) {sk.channel().close();}
                        }
                    }
                }
            }
        } catch (Exception e) {throw new RuntimeException(e);
        }
    }

4、remoteHandMethod反射调用

private void remoteHandMethod(SelectionKey sk,
                                  SocketChannel sc) throws Exception {
      //1、从流中读取数据                            
      ByteBuffer buff = ByteBuffer.allocate(1024);
        sc.read(buff);

        int postion = buff.position();// 这里获取它真正的大小
        byte[] data = buff.array();
        String message = new String(data, 0, postion);// class/ 办法名(参数类型: 参数, 参数类型: 参数)
        message = message.trim();
    
      //2、解析数据,获取申请内容()
        String[] clazzInfo = message.split("/");
        String className = clazzInfo[0];
        String methodName = clazzInfo[1].substring(0,
                clazzInfo[1].indexOf("("));
        String temp = clazzInfo[1].substring(clazzInfo[1].indexOf("(") + 1,
                clazzInfo[1].indexOf(")"));
        String typeValues = decodeParamsTypeAndValue(temp);
       
       //3、反射调用
      Object object = services.get(className);
      Class clazz = object.getClass();
      Object result = null;                    
     
     if (typeValues == null) {
            Method method = clazz.getDeclaredMethod(methodName,
                    null);
            result = method.invoke(object, null);
        } else {Class[] types = new Class[typeValues.length];
            Object[] values = new Object[typeValues.length];
            for (int i = 0; i < typeValues.length; i++) {String[] tv = typeValues[i].split(":");
                String type = tv[0];
                String value = tv[1];
                types[i] = Class.forName(type);
                if (type.contains("Integer") || type.contains("int"))
                    values[i] = Integer.parseInt(value);
                else if (type.contains("Float") || type.contains("float"))
                    values[i] = Float.parseFloat(value);
                else if (type.contains("Double") || type.contains("double"))
                    values[i] = Double.parseDouble(value);
                else if (type.contains("Long") || type.contains("long"))
                    values[i] = Long.parseLong(value);
                else
                    values[i] = value;
            }
            Method method = clazz.getDeclaredMethod(methodName,
                    types);
            result = method.invoke(object, values);
        }
        
        
       //4、返回内容
        sc.write(ByteBuffer.wrap(result.toString()
                .getBytes()));
        sk.interestOps(SelectionKey.OP_READ);
                                  
}


    // 它返回的格局是 参数类型:参数值
    private String[] decodeParamsTypeAndValue(String params) {if (params == null || params.equals(""))
            return null;
        if (params.indexOf(",") < 0)
            return new String[]{params};
        return params.split(",");


    }

3.2、RpcClient

1、创立客户端

public class RPCClient {
    private SocketChannel channel;
    private ByteBuffer buffer = ByteBuffer.allocate(1024);
    private static RPCClient client = new RPCClient();
    private Selector selector = null;

    public RPCClient(String serverIp) {
     try {System.out.println("------ 客户端要启动了 --------");
            selector = Selector.open();
            InetSocketAddress isa = new InetSocketAddress(serverIp, 3003);
// 获取 socket 通道
            channel = SocketChannel.open(isa);
// 连贯服务器
            channel.configureBlocking(false);


            channel.register(selector, SelectionKey.OP_READ);


        } catch (Exception e) {throw new RuntimeException(e);
        }
    
    }

}

2、获取代理类

   // 获取代理
    public Object getRemoteProxy(final Class clazz) {
// 动静产生实现类

        return Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[]{clazz}, new InvocationHandler() {


            @Override
            public Object invoke(Object proxy, Method method, Object[] args)
                    throws Throwable {String methodName = method.getName();
                String clazzName = clazz
                        .getSimpleName();
                Object result = null;
                if (args == null || args.length == 0) {// 示意没有参数 它传递的类型
// 接口名 / 办法名()
                    channel.write(ByteBuffer
                            .wrap((clazzName + "/" + methodName + "()")
                                    .getBytes()));
                } else {
                    int size = args.length;
                    String[] types = new String[size];
                    StringBuffer content = new StringBuffer(clazzName)
                            .append("/").append(methodName).append("(");
                    for (int i = 0; i < size; i++) {types[i] = args[i].getClass().getName();
                        content.append(types[i]).append(":").append(args[i]);
                        if (i != size - 1)
                            content.append(",");
                    }
                    content.append(")");
                    channel.write(ByteBuffer
                            .wrap(content.toString().getBytes()));
                }
// 获取后果
                result = getresult();


                return result;
            }
        });


    }

3、获取返回后果

private Object getresult() {
// 解析后果 如果结尾为 null 或 NULL 则疏忽
        try {while (selector.select() > 0) {for (SelectionKey sk : selector.selectedKeys()) {selector.selectedKeys().remove(sk);
                    if (sk.isReadable()) {SocketChannel sc = (SocketChannel) sk.channel();
                        buffer.clear();
                        sc.read(buffer);
                        int postion = buffer.position();

                        String result = new String(buffer.array(), 0, postion);
                        result = result.trim();
                        buffer.clear();

                        if (result.endsWith("null") || result.endsWith("NULL"))
                            return null;


                        String[] typeValue = result.split(":");
                        String type = typeValue[0];
                        String value = typeValue[1];
                        if (type.contains("Integer") || type.contains("int"))
                            return Integer.parseInt(value);
                        else if (type.contains("Float")
                                || type.contains("float"))
                            return Float.parseFloat(value);
                        else if (type.contains("Long") || type.contains("long"))
                            return Long.parseLong(value);
                        else
                            return value;
                    }
                }
            }
        } catch (Exception e) {throw new RuntimeException(e);
        }
        return null;
    }

残缺代码能够关注公众号获取~

退出移动版