关于java:最佳实践NIO知识梳理

1、概述

NIO(Non-blocking I/O,在Java畛域,也称为New I/O),是一种同步非阻塞的I/O模型,也是I/O多路复用的根底,曾经被越来越多地利用到大型应用服务器,成为解决高并发与大量连贯、I/O解决问题的无效形式。

外围组件次要包含:

  • Channel
  • Buffer
  • Selector
Java NIO: Channels and Buffers(通道和缓冲区)

规范的IO基于字节流和字符流进行操作的,而NIO是基于通道(Channel)和缓冲区(Buffer)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。

Java NIO: Asynchronous IO(异步IO)

Java NIO能够让你异步的应用IO,例如:当线程从通道读取数据到缓冲区时,线程还是能够进行其余事件。当数据被写入到缓冲区时,线程能够持续解决它。从缓冲区写入通道也相似。

Java NIO: Selectors(选择器)

Java NIO引入了选择器的概念,选择器用于监听多个通道的事件(比方:连贯关上,数据达到)。因而,单个的线程能够监听多个数据通道。

2、根底组件

2.1、Channel

ChannelStream 的区别

  • 你能够通过 Channel 进行读和写,但只能从Stream中,单向获取数据(读或写)
  • Channels 能够进行异步
  • Channel 通常都是基于 Buffer 进行读或写

ChannelJava NIO 中的一些 具体实现

  • FileChannel : 通过文件获取数据
  • DatagramChannel : 通过UDP进行网络数据传输
  • SocketChannel : 通过TCP进行网络数据传输
  • ServerSocketChannel : 通监听TCP链接,进行数据链接。每个链接都会创立一个 ServerSocketChannel。

FileChannel Demo

    RandomAccessFile aFile = new RandomAccessFile("data/nio-data.txt", "rw");
    FileChannel inChannel = aFile.getChannel();

    ByteBuffer buf = ByteBuffer.allocate(48);

    int bytesRead = inChannel.read(buf);
    while (bytesRead != -1) {

      System.out.println("Read " + bytesRead);
      buf.flip();

      while(buf.hasRemaining()){
          System.out.print((char) buf.get());
      }

      buf.clear();
      bytesRead = inChannel.read(buf);
    }
    aFile.close();

2.2、Buffer

Buffer 实质上是一个内存块,您能够在其中写入数据,而后能够在当前再次读取。该内存块包装在NIO Buffer对象中,该对象提供了一组办法,能够更轻松地应用该局部数据。

Buffer 中三个次要属性:

  • capacity : Buffer 大小
  • position : 以后操作下标
  • limit : 数据大小

罕用办法:

  • flip() : 写数据实现后,将limit设置到数据末端, position设置为0,进行读操作
  • rewind() : 从新设置 position 为0 ,进行读操作

BufferJava NIO 中的一些 具体实现

  • ByteBuffer
  • CharBuffer
  • DoubleBuffer
  • FloatBuffer
  • IntBuffer
  • LongBuffer
  • ShortBuffer

2.3、Selector

Selector 的劣势在于,只须要应用一个线程,就能够管控多个 channel ,线程之间的切换对于操作系统来说是低廉的,并且每个线程也占用操作系统中的一些资源(内存)。因而,应用的线程越少越好。

须要留神的是, channel 必须设置成 non-blocking 的模式,因而对于 FileChannel 不实用以后模式

channel.configureBlocking(false);

SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

SelectionKey 的 四种状态

  • SelectionKey.OP_CONNECT
  • SelectionKey.OP_ACCEPT
  • SelectionKey.OP_READ
  • SelectionKey.OP_WRITE

3、NIO 实现RPC通信

3.1、RpcServer

1、创立 ServerSocketChannel 监听信息

public class RPCServer {
    private Map<String, Object> services = new HashMap<>();
    private Selector selector;
    private ServerSocketChannel ssc;

    public RPCServer() {
        try {
            ssc = ServerSocketChannel.open();
            InetSocketAddress address = new InetSocketAddress(3003);
            ssc.configureBlocking(false);
            ssc.bind(address);
            selector = Selector.open();
            ssc.register(selector, SelectionKey.OP_ACCEPT);

        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

2、提供服务类(HelloService.class)

public interface HelloService {

    String sayHello();

    void sayHi(String temp);
}

public class HelloServiceImpl implements HelloService {

    @Override
    public String sayHello() {
        return "jiuling";
    }

    @Override
    public void sayHi(String temp) {
        System.out.println(temp);
    }
}

3、ServerSocketChannel 监听申请

 public void start() {
        System.out.println("-----开始监听申请------");
        try {
            while (selector.select() > 0) {
                for (SelectionKey sk : selector.selectedKeys()) {
                    selector.selectedKeys().remove(sk);
                    if (sk.isAcceptable()) {
                        // 通过accept办法,获取对应的SocketChannel
                        SocketChannel sc = ssc.accept();
                        // 设置采纳非阻塞模式
                        sc.configureBlocking(false);
                        // 将channel 状态设置为可读状态
                        sc.register(selector, SelectionKey.OP_READ);
                        sk.interestOps(SelectionKey.OP_ACCEPT);
                    } else if (sk.isReadable()) {
                    
                        SocketChannel sc = (SocketChannel) sk.channel();
                        try {
                            //调用反射办法
                            remoteHandMethod(sk, sc);
                        } catch (Exception e) {
                            //从Selector中删除指定的SelectionKey
                            sk.cancel();
                            if (sk.channel() != null) {
                                sk.channel().close();
                            }
                        }
                    }
                }
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

4、remoteHandMethod反射调用

private void remoteHandMethod(SelectionKey sk,
                                  SocketChannel sc) throws Exception {
      //1、从流中读取数据                            
      ByteBuffer buff = ByteBuffer.allocate(1024);
        sc.read(buff);

        int postion = buff.position();//这里获取它真正的大小
        byte[] data = buff.array();
        String message = new String(data, 0, postion);// class/办法名(参数类型:参数,参数类型:参数)
        message = message.trim();
    
      //2、解析数据,获取申请内容()
        String[] clazzInfo = message.split("/");
        String className = clazzInfo[0];
        String methodName = clazzInfo[1].substring(0,
                clazzInfo[1].indexOf("("));
        String temp = clazzInfo[1].substring(
                clazzInfo[1].indexOf("(") + 1,
                clazzInfo[1].indexOf(")"));
        String typeValues = decodeParamsTypeAndValue(temp);
       
       //3、反射调用
      Object object = services.get(className);
      Class clazz = object.getClass();
      Object result = null;                    
     
     if (typeValues == null) {
            Method method = clazz.getDeclaredMethod(methodName,
                    null);
            result = method.invoke(object, null);
        } else {
            Class[] types = new Class[typeValues.length];
            Object[] values = new Object[typeValues.length];
            for (int i = 0; i < typeValues.length; i++) {
                String[] tv = typeValues[i].split(":");
                String type = tv[0];
                String value = tv[1];
                types[i] = Class.forName(type);
                if (type.contains("Integer") || type.contains("int"))
                    values[i] = Integer.parseInt(value);
                else if (type.contains("Float") || type.contains("float"))
                    values[i] = Float.parseFloat(value);
                else if (type.contains("Double") || type.contains("double"))
                    values[i] = Double.parseDouble(value);
                else if (type.contains("Long") || type.contains("long"))
                    values[i] = Long.parseLong(value);
                else
                    values[i] = value;
            }
            Method method = clazz.getDeclaredMethod(methodName,
                    types);
            result = method.invoke(object, values);
        }
        
        
       //4、 返回内容
        sc.write(ByteBuffer.wrap(result.toString()
                .getBytes()));
        sk.interestOps(SelectionKey.OP_READ);
                                  
}


    // 它返回的格局是 参数类型:参数值
    private String[] decodeParamsTypeAndValue(String params) {
        if (params == null || params.equals(""))
            return null;
        if (params.indexOf(",") < 0)
            return new String[]{params};
        return params.split(",");


    }

3.2、RpcClient

1、创立客户端

public class RPCClient {
    private SocketChannel channel;
    private ByteBuffer buffer = ByteBuffer.allocate(1024);
    private static RPCClient client = new RPCClient();
    private Selector selector = null;

    public RPCClient(String serverIp) {
     try {
            System.out.println("------客户端要启动了--------");
            selector = Selector.open();
            InetSocketAddress isa = new InetSocketAddress(serverIp, 3003);
// 获取socket通道
            channel = SocketChannel.open(isa);
// 连贯服务器
            channel.configureBlocking(false);


            channel.register(selector, SelectionKey.OP_READ);


        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    
    }

}

2、获取代理类

   // 获取代理
    public Object getRemoteProxy(final Class clazz) {
//动静产生实现类

        return Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[]{clazz}, new InvocationHandler() {


            @Override
            public Object invoke(Object proxy, Method method, Object[] args)
                    throws Throwable {

                String methodName = method.getName();
                String clazzName = clazz
                        .getSimpleName();
                Object result = null;
                if (args == null || args.length == 0) {// 示意没有参数 它传递的类型
// 接口名/办法名()
                    channel.write(ByteBuffer
                            .wrap((clazzName + "/" + methodName + "()")
                                    .getBytes()));
                } else {
                    int size = args.length;
                    String[] types = new String[size];
                    StringBuffer content = new StringBuffer(clazzName)
                            .append("/").append(methodName).append("(");
                    for (int i = 0; i < size; i++) {
                        types[i] = args[i].getClass().getName();
                        content.append(types[i]).append(":").append(args[i]);
                        if (i != size - 1)
                            content.append(",");
                    }
                    content.append(")");
                    channel.write(ByteBuffer
                            .wrap(content.toString().getBytes()));
                }
// 获取后果
                result = getresult();


                return result;
            }
        });


    }

3、获取返回后果

private Object getresult() {
// 解析后果 如果结尾为null或NULL则疏忽
        try {
            while (selector.select() > 0) {
                for (SelectionKey sk : selector.selectedKeys()) {
                    selector.selectedKeys().remove(sk);
                    if (sk.isReadable()) {
                        SocketChannel sc = (SocketChannel) sk.channel();
                        buffer.clear();
                        sc.read(buffer);
                        int postion = buffer.position();

                        String result = new String(buffer.array(), 0, postion);
                        result = result.trim();
                        buffer.clear();

                        if (result.endsWith("null") || result.endsWith("NULL"))
                            return null;


                        String[] typeValue = result.split(":");
                        String type = typeValue[0];
                        String value = typeValue[1];
                        if (type.contains("Integer") || type.contains("int"))
                            return Integer.parseInt(value);
                        else if (type.contains("Float")
                                || type.contains("float"))
                            return Float.parseFloat(value);
                        else if (type.contains("Long") || type.contains("long"))
                            return Long.parseLong(value);
                        else
                            return value;
                    }
                }
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return null;
    }

残缺代码能够关注公众号获取~

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理