关于netty:netty-in-action学习笔记第一章-了解java-NIO1

【netty in action】学习笔记-第一章 理解java NIO(1)

学习netty,java nio是根底,因为前者是对后者的封装,当然又不只是封装。随着学习的深刻你会了解这句话的含意。

下图是netty的架构图,让你对netty波及的模型,传输,协定有个根本印象。

netty的个性能够总结为一下几点:

  • 统计的API操作阻塞和非阻塞的socket
  • 接口易用
  • 线程模型简略而弱小
  • 链式调用逻辑,复用性搞高
  • 文档和示例丰盛
  • 除了JDK之外不依赖别的组件
  • 相比于java api有更好的吞吐量以及低提早
  • 缩小不必要的内存占用,不会再因为疾速的,慢速的或者超负荷的连贯导致OOM
  • 反对SSL/TLS
  • 社区沉闷

两种实现异步API罕用的设计

基于callbacks

FetchCalback.java

public interface FetchCalback {
    void onData(Data data);
    void onError(Throwable cause);
}

Fetcher.java

public interface Fetcher {
    void fetchData(FetchCalback fetchCalback);
}

MyFetcher.java

public class MyFetcher implements Fetcher{
    @Override
    public void fetchData(FetchCalback fetchCalback) {
        try {
            //模仿获取数据
            Thread.sleep(1000);
            Data data = new Data(1, 2);

            fetchCalback.onData(data);
        } catch (Exception e) {
            fetchCalback.onError(e);
        }
    }
}

Worker.java

public class Worker {

    public void doWorker() {

        Fetcher fetcher = new MyFetcher();
        fetcher.fetchData(new FetchCalback() {
            @Override
            public void onData(Data data) {
                System.out.println("获取到数据:" + data);
            }

            @Override
            public void onError(Throwable cause) {

                System.err.println(cause.getMessage());
            }
        });
    }

    public static void main(String[] args) {
        Worker worker = new Worker();
        worker.doWorker();
    }
}

Data.java

@Getter
@Setter
@AllArgsConstructor
@ToString
public class Data {
    int a;
    int b;
}

代码比较简单,调用fetchdata获取数据,拿到数据之后通过FetchCalback回调过去,调用方不须要等着获取后果做下一步的解决。

这里可能有人疑难:下面的fetchData办法也会阻塞啊?是的,不过那是因为这只是个示例,理论的我的项目中获取数据齐全能够放在线程池里异步解决。

基于回调的异步设计方案在很多其它语言也很风行,比拟典型的就是javascript,外面大量的回调,给你一段代码感触下:

let p = new Promise((resolve, reject) => {
    setTimeout(resolve, 1000, 'success');
});
p.then(
    res => {
        console.log(res);
        return `${res} again`;
    }
)
    .then(
        res => console.log(res)
    );

基于futures的异步设计

Futures是一个形象的概念,它示意一个值可能在某一点变得可用。一个Future要么取得
计算完的后果,要么取得计算失败后的异样。Java在java.util.concurrent包中附带了Future接口。
还是下面的例子,用future的形式革新下。上面的代码只贴出来不一样的中央。

public interface Fetcher {
    Future<Data> fetchData();
}
public class MyFetcher implements Fetcher{

    ExecutorService executor = Executors.newFixedThreadPool(1);
    @Override
    public Future<Data> fetchData() {
        return executor.submit(task);
    }

    Callable<Data> task = new Callable<Data>() {
        @Override
        public Data call() throws Exception {
            //模仿获取数据
            Thread.sleep(1000);
            Data data = new Data(1, 2);
            return data;
        }
    };
}

看一个基于阻塞形式实现的服务端示例,

//基于阻塞模式的EchoServer
public class PlainEchoServer {

    public void server(int port) throws IOException {
        final ServerSocket serverSocket = new ServerSocket(port);

        try {
            final Socket clientSocket = serverSocket.accept();
            System.out.println("新的连贯:" + clientSocket);
            new Thread(new Runnable() {
                @Override
                public void run() {
                    //读取数据,业务解决
                }
            }).start();
        }catch (Exception e) {
            e.printStackTrace();
        }


    }
}

这段代码,给每个新连贯的客户端创立一个解决线程。这种计划的问题在于客户端的连接数受限于零碎所能反对的线程数。

上面这个示例是基于java nio异步形式实现的服务端示例。

public class PlainNIOEchoServer {

    public void server(int port) throws IOException {
        System.out.println("listening for connection on port " + port);
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        ServerSocket ss = serverChannel.socket();

        InetSocketAddress address = new InetSocketAddress(port);
        ss.bind(address);

        serverChannel.configureBlocking(false);

        Selector selector = Selector.open();
        serverChannel.register(selector, SelectionKey.OP_ACCEPT); //监听承受连贯的事件


        while (true) {
            selector.select();
            Set readyKeys = selector.selectedKeys();
            Iterator iterator = readyKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = (SelectionKey)iterator.next();
                iterator.remove();
                try {
                    if (key.isAcceptable()) {
                        ServerSocketChannel server = (ServerSocketChannel)key.channel();
                        SocketChannel client = server.accept();
                        System.out.println("accept:" + client);
                        client.configureBlocking(false); //配置非阻塞

                        client.register(selector, SelectionKey.OP_READ|SelectionKey.OP_WRITE, ByteBuffer.allocate(100));
                    }
                    if (key.isReadable()) {
                        SocketChannel client = (SocketChannel)key.channel();
                        ByteBuffer out = (ByteBuffer)key.attachment();
                        client.read(out);
                    }
                    if (key.isWritable()) {
                        SocketChannel client = (SocketChannel)key.channel();
                        ByteBuffer out = (ByteBuffer)key.attachment();
                        out.flip();
                        client.write(out);
                        out.compact();
                    }
                }catch (Exception e) {
                    key.cancel();
                    key.channel().close();
                }
            }


        }
    }
}

这段代码有几个点须要解释下。

ByteBuffer是java nio里的一个外围概念,能够认为它是一个字节容器。咱们能够用它来调配堆内的空间,能够调配对外的空间。后者效率更高,然而须要特地留神OOM的问题。

另外,须要留神到ByteBuffer须要利用flip来切换读写模式,netty是不须要咱们关怀这个的。

另外一个须要关注的概念是selector,java nio里应用selector来判断一个或者多个channel是否能够读或者写。一个selector能够监听多个connection,这种后面的一个线程解决一个连贯的形式要好的多。

应用selector的步骤如下:

  • 创立一个或者多个selector,并注册channel到下面
  • 注册的时候指定感兴趣的事件监听,包含OP_CONNECT,OP_ACCEPT,OP_READ,OP_WRITE
  • 轮询selector.select() 查看是否有事件产生
  • 如果上一步阻塞,阐明没有事件产生,否则就拿到selectkey判断事件的类型进一步解决。

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理