1、概述
NIO(Non-blocking I/O,在Java畛域,也称为New I/O),是一种同步非阻塞的I/O模型,也是I/O多路复用的根底,曾经被越来越多地利用到大型应用服务器,成为解决高并发与大量连贯、I/O解决问题的无效形式。
外围组件次要包含:
- Channel
- Buffer
- Selector
Java NIO: Channels and Buffers(通道和缓冲区)
规范的IO基于字节流和字符流进行操作的,而NIO是基于通道(Channel)和缓冲区(Buffer)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。
Java NIO: Asynchronous IO(异步IO)
Java NIO能够让你异步的应用IO,例如:当线程从通道读取数据到缓冲区时,线程还是能够进行其余事件。当数据被写入到缓冲区时,线程能够持续解决它。从缓冲区写入通道也相似。
Java NIO: Selectors(选择器)
Java NIO引入了选择器的概念,选择器用于监听多个通道的事件(比方:连贯关上,数据达到)。因而,单个的线程能够监听多个数据通道。
2、根底组件
2.1、Channel
Channel
与 Stream
的区别
- 你能够通过
Channel
进行读和写,但只能从Stream
中,单向获取数据(读或写) Channels
能够进行异步 读 或 写Channel
通常都是基于Buffer
进行读或写
Channel
在 Java NIO
中的一些 具体实现
- FileChannel : 通过文件获取数据
- DatagramChannel : 通过UDP进行网络数据传输
- SocketChannel : 通过TCP进行网络数据传输
- ServerSocketChannel : 通监听TCP链接,进行数据链接。每个链接都会创立一个 ServerSocketChannel。
FileChannel Demo
RandomAccessFile aFile = new RandomAccessFile("data/nio-data.txt", "rw"); FileChannel inChannel = aFile.getChannel(); ByteBuffer buf = ByteBuffer.allocate(48); int bytesRead = inChannel.read(buf); while (bytesRead != -1) { System.out.println("Read " + bytesRead); buf.flip(); while(buf.hasRemaining()){ System.out.print((char) buf.get()); } buf.clear(); bytesRead = inChannel.read(buf); } aFile.close();
2.2、Buffer
Buffer
实质上是一个内存块,您能够在其中写入数据,而后能够在当前再次读取。该内存块包装在NIO Buffer对象中,该对象提供了一组办法,能够更轻松地应用该局部数据。
Buffer
中三个次要属性:
- capacity : Buffer 大小
- position : 以后操作下标
- limit : 数据大小
罕用办法:
- flip() : 写数据实现后,将
limit
设置到数据末端,position
设置为0,进行读操作 - rewind() : 从新设置
position
为0 ,进行读操作
Buffer
在 Java NIO
中的一些 具体实现
- ByteBuffer
- CharBuffer
- DoubleBuffer
- FloatBuffer
- IntBuffer
- LongBuffer
- ShortBuffer
2.3、Selector
Selector
的劣势在于,只须要应用一个线程,就能够管控多个 channel
,线程之间的切换对于操作系统来说是低廉的,并且每个线程也占用操作系统中的一些资源(内存)。因而,应用的线程越少越好。
须要留神的是, channel 必须设置成 non-blocking
的模式,因而对于 FileChannel 不实用以后模式
channel.configureBlocking(false);SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
SelectionKey 的 四种状态
- SelectionKey.OP_CONNECT
- SelectionKey.OP_ACCEPT
- SelectionKey.OP_READ
- SelectionKey.OP_WRITE
3、NIO 实现RPC通信
3.1、RpcServer
1、创立 ServerSocketChannel 监听信息
public class RPCServer { private Map<String, Object> services = new HashMap<>(); private Selector selector; private ServerSocketChannel ssc; public RPCServer() { try { ssc = ServerSocketChannel.open(); InetSocketAddress address = new InetSocketAddress(3003); ssc.configureBlocking(false); ssc.bind(address); selector = Selector.open(); ssc.register(selector, SelectionKey.OP_ACCEPT); } catch (Exception e) { throw new RuntimeException(e); } }}
2、提供服务类(HelloService.class
)
public interface HelloService { String sayHello(); void sayHi(String temp);}public class HelloServiceImpl implements HelloService { @Override public String sayHello() { return "jiuling"; } @Override public void sayHi(String temp) { System.out.println(temp); }}
3、ServerSocketChannel
监听申请
public void start() { System.out.println("-----开始监听申请------"); try { while (selector.select() > 0) { for (SelectionKey sk : selector.selectedKeys()) { selector.selectedKeys().remove(sk); if (sk.isAcceptable()) { // 通过accept办法,获取对应的SocketChannel SocketChannel sc = ssc.accept(); // 设置采纳非阻塞模式 sc.configureBlocking(false); // 将channel 状态设置为可读状态 sc.register(selector, SelectionKey.OP_READ); sk.interestOps(SelectionKey.OP_ACCEPT); } else if (sk.isReadable()) { SocketChannel sc = (SocketChannel) sk.channel(); try { //调用反射办法 remoteHandMethod(sk, sc); } catch (Exception e) { //从Selector中删除指定的SelectionKey sk.cancel(); if (sk.channel() != null) { sk.channel().close(); } } } } } } catch (Exception e) { throw new RuntimeException(e); } }
4、remoteHandMethod
反射调用
private void remoteHandMethod(SelectionKey sk, SocketChannel sc) throws Exception { //1、从流中读取数据 ByteBuffer buff = ByteBuffer.allocate(1024); sc.read(buff); int postion = buff.position();//这里获取它真正的大小 byte[] data = buff.array(); String message = new String(data, 0, postion);// class/办法名(参数类型:参数,参数类型:参数) message = message.trim(); //2、解析数据,获取申请内容() String[] clazzInfo = message.split("/"); String className = clazzInfo[0]; String methodName = clazzInfo[1].substring(0, clazzInfo[1].indexOf("(")); String temp = clazzInfo[1].substring( clazzInfo[1].indexOf("(") + 1, clazzInfo[1].indexOf(")")); String typeValues = decodeParamsTypeAndValue(temp); //3、反射调用 Object object = services.get(className); Class clazz = object.getClass(); Object result = null; if (typeValues == null) { Method method = clazz.getDeclaredMethod(methodName, null); result = method.invoke(object, null); } else { Class[] types = new Class[typeValues.length]; Object[] values = new Object[typeValues.length]; for (int i = 0; i < typeValues.length; i++) { String[] tv = typeValues[i].split(":"); String type = tv[0]; String value = tv[1]; types[i] = Class.forName(type); if (type.contains("Integer") || type.contains("int")) values[i] = Integer.parseInt(value); else if (type.contains("Float") || type.contains("float")) values[i] = Float.parseFloat(value); else if (type.contains("Double") || type.contains("double")) values[i] = Double.parseDouble(value); else if (type.contains("Long") || type.contains("long")) values[i] = Long.parseLong(value); else values[i] = value; } Method method = clazz.getDeclaredMethod(methodName, types); result = method.invoke(object, values); } //4、 返回内容 sc.write(ByteBuffer.wrap(result.toString() .getBytes())); sk.interestOps(SelectionKey.OP_READ); } // 它返回的格局是 参数类型:参数值 private String[] decodeParamsTypeAndValue(String params) { if (params == null || params.equals("")) return null; if (params.indexOf(",") < 0) return new String[]{params}; return params.split(","); }
3.2、RpcClient
1、创立客户端
public class RPCClient { private SocketChannel channel; private ByteBuffer buffer = ByteBuffer.allocate(1024); private static RPCClient client = new RPCClient(); private Selector selector = null; public RPCClient(String serverIp) { try { System.out.println("------客户端要启动了--------"); selector = Selector.open(); InetSocketAddress isa = new InetSocketAddress(serverIp, 3003);// 获取socket通道 channel = SocketChannel.open(isa);// 连贯服务器 channel.configureBlocking(false); channel.register(selector, SelectionKey.OP_READ); } catch (Exception e) { throw new RuntimeException(e); } }}
2、获取代理类
// 获取代理 public Object getRemoteProxy(final Class clazz) {//动静产生实现类 return Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[]{clazz}, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); String clazzName = clazz .getSimpleName(); Object result = null; if (args == null || args.length == 0) {// 示意没有参数 它传递的类型// 接口名/办法名() channel.write(ByteBuffer .wrap((clazzName + "/" + methodName + "()") .getBytes())); } else { int size = args.length; String[] types = new String[size]; StringBuffer content = new StringBuffer(clazzName) .append("/").append(methodName).append("("); for (int i = 0; i < size; i++) { types[i] = args[i].getClass().getName(); content.append(types[i]).append(":").append(args[i]); if (i != size - 1) content.append(","); } content.append(")"); channel.write(ByteBuffer .wrap(content.toString().getBytes())); }// 获取后果 result = getresult(); return result; } }); }
3、获取返回后果
private Object getresult() {// 解析后果 如果结尾为null或NULL则疏忽 try { while (selector.select() > 0) { for (SelectionKey sk : selector.selectedKeys()) { selector.selectedKeys().remove(sk); if (sk.isReadable()) { SocketChannel sc = (SocketChannel) sk.channel(); buffer.clear(); sc.read(buffer); int postion = buffer.position(); String result = new String(buffer.array(), 0, postion); result = result.trim(); buffer.clear(); if (result.endsWith("null") || result.endsWith("NULL")) return null; String[] typeValue = result.split(":"); String type = typeValue[0]; String value = typeValue[1]; if (type.contains("Integer") || type.contains("int")) return Integer.parseInt(value); else if (type.contains("Float") || type.contains("float")) return Float.parseFloat(value); else if (type.contains("Long") || type.contains("long")) return Long.parseLong(value); else return value; } } } } catch (Exception e) { throw new RuntimeException(e); } return null; }
残缺代码能够关注公众号获取~