1、概述
NIO(Non-blocking I/O,在 Java 畛域,也称为 New I/O),是一种同步非阻塞的 I / O 模型,也是 I / O 多路复用的根底,曾经被越来越多地利用到大型应用服务器,成为解决高并发与大量连贯、I/ O 解决问题的无效形式。
外围组件次要包含:
- Channel
- Buffer
- Selector
Java NIO: Channels and Buffers(通道和缓冲区)
规范的 IO 基于字节流和字符流进行操作的,而 NIO 是基于通道(Channel)和缓冲区(Buffer)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。
Java NIO: Asynchronous IO(异步 IO)
Java NIO 能够让你异步的应用 IO,例如:当线程从通道读取数据到缓冲区时,线程还是能够进行其余事件。当数据被写入到缓冲区时,线程能够持续解决它。从缓冲区写入通道也相似。
Java NIO: Selectors(选择器)
Java NIO 引入了选择器的概念,选择器用于监听多个通道的事件(比方:连贯关上,数据达到)。因而,单个的线程能够监听多个数据通道。
2、根底组件
2.1、Channel
Channel
与 Stream
的区别
- 你能够通过
Channel
进行读和写,但只能从Stream
中,单向获取数据(读或写) Channels
能够进行异步 读 或 写Channel
通常都是基于Buffer
进行读或写
Channel
在 Java NIO
中的一些 具体实现
- FileChannel : 通过文件获取数据
- DatagramChannel:通过 UDP 进行网络数据传输
- SocketChannel:通过 TCP 进行网络数据传输
- ServerSocketChannel:通监听 TCP 链接,进行数据链接。每个链接都会创立一个 ServerSocketChannel。
FileChannel Demo
RandomAccessFile aFile = new RandomAccessFile("data/nio-data.txt", "rw");
FileChannel inChannel = aFile.getChannel();
ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = inChannel.read(buf);
while (bytesRead != -1) {System.out.println("Read" + bytesRead);
buf.flip();
while(buf.hasRemaining()){System.out.print((char) buf.get());
}
buf.clear();
bytesRead = inChannel.read(buf);
}
aFile.close();
2.2、Buffer
Buffer
实质上是一个内存块,您能够在其中写入数据,而后能够在当前再次读取。该内存块包装在 NIO Buffer 对象中,该对象提供了一组办法,能够更轻松地应用该局部数据。
Buffer
中三个次要属性:
- capacity : Buffer 大小
- position : 以后操作下标
- limit:数据大小
罕用办法:
- flip() : 写数据实现后,将
limit
设置到数据末端,position
设置为 0,进行读操作 - rewind() : 从新设置
position
为 0,进行读操作
Buffer
在 Java NIO
中的一些 具体实现
- ByteBuffer
- CharBuffer
- DoubleBuffer
- FloatBuffer
- IntBuffer
- LongBuffer
- ShortBuffer
2.3、Selector
Selector
的劣势在于,只须要应用一个线程,就能够管控多个 channel
, 线程之间的切换对于操作系统来说是低廉的,并且每个线程也占用操作系统中的一些资源(内存)。因而,应用的线程越少越好。
须要留神的是,channel 必须设置成 non-blocking
的模式,因而对于 FileChannel 不实用以后模式
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
SelectionKey 的 四种状态
- SelectionKey.OP_CONNECT
- SelectionKey.OP_ACCEPT
- SelectionKey.OP_READ
- SelectionKey.OP_WRITE
3、NIO 实现 RPC 通信
3.1、RpcServer
1、创立 ServerSocketChannel 监听信息
public class RPCServer {private Map<String, Object> services = new HashMap<>();
private Selector selector;
private ServerSocketChannel ssc;
public RPCServer() {
try {ssc = ServerSocketChannel.open();
InetSocketAddress address = new InetSocketAddress(3003);
ssc.configureBlocking(false);
ssc.bind(address);
selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
} catch (Exception e) {throw new RuntimeException(e);
}
}
}
2、提供服务类(HelloService.class
)
public interface HelloService {String sayHello();
void sayHi(String temp);
}
public class HelloServiceImpl implements HelloService {
@Override
public String sayHello() {return "jiuling";}
@Override
public void sayHi(String temp) {System.out.println(temp);
}
}
3、ServerSocketChannel
监听申请
public void start() {System.out.println("----- 开始监听申请 ------");
try {while (selector.select() > 0) {for (SelectionKey sk : selector.selectedKeys()) {selector.selectedKeys().remove(sk);
if (sk.isAcceptable()) {
// 通过 accept 办法,获取对应的 SocketChannel
SocketChannel sc = ssc.accept();
// 设置采纳非阻塞模式
sc.configureBlocking(false);
// 将 channel 状态设置为可读状态
sc.register(selector, SelectionKey.OP_READ);
sk.interestOps(SelectionKey.OP_ACCEPT);
} else if (sk.isReadable()) {SocketChannel sc = (SocketChannel) sk.channel();
try {
// 调用反射办法
remoteHandMethod(sk, sc);
} catch (Exception e) {
// 从 Selector 中删除指定的 SelectionKey
sk.cancel();
if (sk.channel() != null) {sk.channel().close();}
}
}
}
}
} catch (Exception e) {throw new RuntimeException(e);
}
}
4、remoteHandMethod
反射调用
private void remoteHandMethod(SelectionKey sk,
SocketChannel sc) throws Exception {
//1、从流中读取数据
ByteBuffer buff = ByteBuffer.allocate(1024);
sc.read(buff);
int postion = buff.position();// 这里获取它真正的大小
byte[] data = buff.array();
String message = new String(data, 0, postion);// class/ 办法名(参数类型: 参数, 参数类型: 参数)
message = message.trim();
//2、解析数据,获取申请内容()
String[] clazzInfo = message.split("/");
String className = clazzInfo[0];
String methodName = clazzInfo[1].substring(0,
clazzInfo[1].indexOf("("));
String temp = clazzInfo[1].substring(clazzInfo[1].indexOf("(") + 1,
clazzInfo[1].indexOf(")"));
String typeValues = decodeParamsTypeAndValue(temp);
//3、反射调用
Object object = services.get(className);
Class clazz = object.getClass();
Object result = null;
if (typeValues == null) {
Method method = clazz.getDeclaredMethod(methodName,
null);
result = method.invoke(object, null);
} else {Class[] types = new Class[typeValues.length];
Object[] values = new Object[typeValues.length];
for (int i = 0; i < typeValues.length; i++) {String[] tv = typeValues[i].split(":");
String type = tv[0];
String value = tv[1];
types[i] = Class.forName(type);
if (type.contains("Integer") || type.contains("int"))
values[i] = Integer.parseInt(value);
else if (type.contains("Float") || type.contains("float"))
values[i] = Float.parseFloat(value);
else if (type.contains("Double") || type.contains("double"))
values[i] = Double.parseDouble(value);
else if (type.contains("Long") || type.contains("long"))
values[i] = Long.parseLong(value);
else
values[i] = value;
}
Method method = clazz.getDeclaredMethod(methodName,
types);
result = method.invoke(object, values);
}
//4、返回内容
sc.write(ByteBuffer.wrap(result.toString()
.getBytes()));
sk.interestOps(SelectionKey.OP_READ);
}
// 它返回的格局是 参数类型:参数值
private String[] decodeParamsTypeAndValue(String params) {if (params == null || params.equals(""))
return null;
if (params.indexOf(",") < 0)
return new String[]{params};
return params.split(",");
}
3.2、RpcClient
1、创立客户端
public class RPCClient {
private SocketChannel channel;
private ByteBuffer buffer = ByteBuffer.allocate(1024);
private static RPCClient client = new RPCClient();
private Selector selector = null;
public RPCClient(String serverIp) {
try {System.out.println("------ 客户端要启动了 --------");
selector = Selector.open();
InetSocketAddress isa = new InetSocketAddress(serverIp, 3003);
// 获取 socket 通道
channel = SocketChannel.open(isa);
// 连贯服务器
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);
} catch (Exception e) {throw new RuntimeException(e);
}
}
}
2、获取代理类
// 获取代理
public Object getRemoteProxy(final Class clazz) {
// 动静产生实现类
return Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[]{clazz}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {String methodName = method.getName();
String clazzName = clazz
.getSimpleName();
Object result = null;
if (args == null || args.length == 0) {// 示意没有参数 它传递的类型
// 接口名 / 办法名()
channel.write(ByteBuffer
.wrap((clazzName + "/" + methodName + "()")
.getBytes()));
} else {
int size = args.length;
String[] types = new String[size];
StringBuffer content = new StringBuffer(clazzName)
.append("/").append(methodName).append("(");
for (int i = 0; i < size; i++) {types[i] = args[i].getClass().getName();
content.append(types[i]).append(":").append(args[i]);
if (i != size - 1)
content.append(",");
}
content.append(")");
channel.write(ByteBuffer
.wrap(content.toString().getBytes()));
}
// 获取后果
result = getresult();
return result;
}
});
}
3、获取返回后果
private Object getresult() {
// 解析后果 如果结尾为 null 或 NULL 则疏忽
try {while (selector.select() > 0) {for (SelectionKey sk : selector.selectedKeys()) {selector.selectedKeys().remove(sk);
if (sk.isReadable()) {SocketChannel sc = (SocketChannel) sk.channel();
buffer.clear();
sc.read(buffer);
int postion = buffer.position();
String result = new String(buffer.array(), 0, postion);
result = result.trim();
buffer.clear();
if (result.endsWith("null") || result.endsWith("NULL"))
return null;
String[] typeValue = result.split(":");
String type = typeValue[0];
String value = typeValue[1];
if (type.contains("Integer") || type.contains("int"))
return Integer.parseInt(value);
else if (type.contains("Float")
|| type.contains("float"))
return Float.parseFloat(value);
else if (type.contains("Long") || type.contains("long"))
return Long.parseLong(value);
else
return value;
}
}
}
} catch (Exception e) {throw new RuntimeException(e);
}
return null;
}
残缺代码能够关注公众号获取~