【netty in action】学习笔记 - 第一章 理解 java NIO(1)
学习 netty,java nio 是根底,因为前者是对后者的封装,当然又不只是封装。随着学习的深刻你会了解这句话的含意。
下图是 netty 的架构图,让你对 netty 波及的模型,传输,协定有个根本印象。
netty 的个性能够总结为一下几点:
- 统计的 API 操作阻塞和非阻塞的 socket
- 接口易用
- 线程模型简略而弱小
- 链式调用逻辑,复用性搞高
- 文档和示例丰盛
- 除了 JDK 之外不依赖别的组件
- 相比于 java api 有更好的吞吐量以及低提早
- 缩小不必要的内存占用,不会再因为疾速的,慢速的或者超负荷的连贯导致 OOM
- 反对 SSL/TLS
- 社区沉闷
两种实现异步 API 罕用的设计
基于 callbacks
FetchCalback.java
public interface FetchCalback {void onData(Data data);
void onError(Throwable cause);
}
Fetcher.java
public interface Fetcher {void fetchData(FetchCalback fetchCalback);
}
MyFetcher.java
public class MyFetcher implements Fetcher{
@Override
public void fetchData(FetchCalback fetchCalback) {
try {
// 模仿获取数据
Thread.sleep(1000);
Data data = new Data(1, 2);
fetchCalback.onData(data);
} catch (Exception e) {fetchCalback.onError(e);
}
}
}
Worker.java
public class Worker {public void doWorker() {Fetcher fetcher = new MyFetcher();
fetcher.fetchData(new FetchCalback() {
@Override
public void onData(Data data) {System.out.println("获取到数据:" + data);
}
@Override
public void onError(Throwable cause) {System.err.println(cause.getMessage());
}
});
}
public static void main(String[] args) {Worker worker = new Worker();
worker.doWorker();}
}
Data.java
@Getter
@Setter
@AllArgsConstructor
@ToString
public class Data {
int a;
int b;
}
代码比较简单,调用 fetchdata 获取数据,拿到数据之后通过 FetchCalback
回调过去,调用方不须要等着获取后果做下一步的解决。
这里可能有人疑难:下面的 fetchData
办法也会阻塞啊?是的,不过那是因为这只是个示例,理论的我的项目中获取数据齐全能够放在线程池里异步解决。
基于回调的异步设计方案在很多其它语言也很风行,比拟典型的就是 javascript,外面大量的回调,给你一段代码感触下:
let p = new Promise((resolve, reject) => {setTimeout(resolve, 1000, 'success');
});
p.then(
res => {console.log(res);
return `${res} again`;
}
)
.then(res => console.log(res)
);
基于 futures 的异步设计
Futures 是一个形象的概念,它示意一个值可能在某一点变得可用。一个 Future 要么取得
计算完的后果,要么取得计算失败后的异样。Java 在 java.util.concurrent 包中附带了 Future 接口。
还是下面的例子,用 future
的形式革新下。上面的代码只贴出来不一样的中央。
public interface Fetcher {Future<Data> fetchData();
}
public class MyFetcher implements Fetcher{ExecutorService executor = Executors.newFixedThreadPool(1);
@Override
public Future<Data> fetchData() {return executor.submit(task);
}
Callable<Data> task = new Callable<Data>() {
@Override
public Data call() throws Exception {
// 模仿获取数据
Thread.sleep(1000);
Data data = new Data(1, 2);
return data;
}
};
}
看一个基于阻塞形式实现的服务端示例,
// 基于阻塞模式的 EchoServer
public class PlainEchoServer {public void server(int port) throws IOException {final ServerSocket serverSocket = new ServerSocket(port);
try {final Socket clientSocket = serverSocket.accept();
System.out.println("新的连贯:" + clientSocket);
new Thread(new Runnable() {
@Override
public void run() {// 读取数据,业务解决}
}).start();}catch (Exception e) {e.printStackTrace();
}
}
}
这段代码,给每个新连贯的客户端创立一个解决线程。这种计划的问题在于客户端的连接数受限于零碎所能反对的线程数。
上面这个示例是基于 java nio 异步形式实现的服务端示例。
public class PlainNIOEchoServer {public void server(int port) throws IOException {System.out.println("listening for connection on port" + port);
ServerSocketChannel serverChannel = ServerSocketChannel.open();
ServerSocket ss = serverChannel.socket();
InetSocketAddress address = new InetSocketAddress(port);
ss.bind(address);
serverChannel.configureBlocking(false);
Selector selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT); // 监听承受连贯的事件
while (true) {selector.select();
Set readyKeys = selector.selectedKeys();
Iterator iterator = readyKeys.iterator();
while (iterator.hasNext()) {SelectionKey key = (SelectionKey)iterator.next();
iterator.remove();
try {if (key.isAcceptable()) {ServerSocketChannel server = (ServerSocketChannel)key.channel();
SocketChannel client = server.accept();
System.out.println("accept:" + client);
client.configureBlocking(false); // 配置非阻塞
client.register(selector, SelectionKey.OP_READ|SelectionKey.OP_WRITE, ByteBuffer.allocate(100));
}
if (key.isReadable()) {SocketChannel client = (SocketChannel)key.channel();
ByteBuffer out = (ByteBuffer)key.attachment();
client.read(out);
}
if (key.isWritable()) {SocketChannel client = (SocketChannel)key.channel();
ByteBuffer out = (ByteBuffer)key.attachment();
out.flip();
client.write(out);
out.compact();}
}catch (Exception e) {key.cancel();
key.channel().close();
}
}
}
}
}
这段代码有几个点须要解释下。
ByteBuffer
是 java nio 里的一个外围概念,能够认为它是一个字节容器。咱们能够用它来调配堆内的空间,能够调配对外的空间。后者效率更高,然而须要特地留神 OOM 的问题。
另外,须要留神到 ByteBuffer
须要利用 flip 来切换读写模式,netty 是不须要咱们关怀这个的。
另外一个须要关注的概念是 selector
,java nio 里应用selector
来判断一个或者多个 channel 是否能够读或者写。一个 selector
能够监听多个 connection,这种后面的一个线程解决一个连贯的形式要好的多。
应用 selector 的步骤如下:
- 创立一个或者多个 selector,并注册 channel 到下面
- 注册的时候指定感兴趣的事件监听,包含 OP_CONNECT,OP_ACCEPT,OP_READ,OP_WRITE
- 轮询
selector.select()
查看是否有事件产生 - 如果上一步阻塞,阐明没有事件产生,否则就拿到 selectkey 判断事件的类型进一步解决。