关于java:从零构建netty一步步构建NIO

4次阅读

共计 9320 个字符,预计需要花费 24 分钟才能阅读完成。

自制 nio

通过上篇 socke 根底,咱们回顾了下 socket 的用法。上篇内容很简略,服务端也只是接管了一个客户端的连贯,接下来咱们就降级下咱们的 demo,使其像一个真正的服务器。

首先咱们容许服务端接管多个客户端的连贯。批改 OioServer 如下

代码 2 -1

public class OioServer {

    private ServerSocket serverSocket;

    public void start() {
        Socket socket = null;
        try {openServer(8081);
            if (Objects.isNull(serverSocket)) {return;}
           while (true) {socket = listenAccept();
                handleSocket(socket);
            }
        } catch (Exception e) {e.printStackTrace();
            SocketUtils.closeServerSocketSafely(serverSocket);
            SocketUtils.closeSocketSafely(socket);
        }
    }

    private void handleSocket(Socket socket) {new Thread(() -> {while (!socket.isClosed()) {String msg = SocketUtils.read(socket);
                SocketUtils.write(socket, "I get you" + msg);
            }
        }).start();}

    public void openServer(int port) throws IOException {
        // 1 创立 ServerSocket
        serverSocket = new ServerSocket();
        // 2 绑定端口
        SocketAddress socketAddress = new InetSocketAddress(port);
        serverSocket.bind(socketAddress);
        // 3 accept 客户端
    }

    public Socket listenAccept() throws IOException {return serverSocket.accept();
    }
}

当调用 start()办法后,咱们服务器就开始监听 8081 接口了。而后每次一个客户端连贯进来,咱们就会失去一个 socket,而后咱们创立一个线程去解决这个 socket。

为什么要创立新的线程?因为 socket 读写都是阻塞的,如果不启动新线程,那主线程就会被阻塞。这个时候,有新的客户端连贯进来将不会被解决。然而,咱们为每个 socket 创立一个线程,这样是有代价的,并且咱们服务器是不可能创立无数个线程的。固咱们应用为每个 socket 创立一个线程这种办法在高并发的状况下显然是不可行的。那么有什么办法改良吗?答案是必定的。当初 java 有了 nio,然而我当初不急于把这个王炸展现进去,让咱们一步步凑近它,并揭开它的神秘面纱。

当初咱们晓得了为每个 socket 创立一个线程是因为,socket 的操作(读或写)是阻塞的,那咱们不让它阻塞不就能够了?有方法吗?有。对于读,咱们能够应用 inputStream.available();来判断一下,是否可读,不可读咱们就不调用阻塞办法 inputStream.read(bytes)。于是咱们再SocketUtils 中天加一个办法

代码 2 -2

/**
  * 从 socket 中读数据
*/
public static ReadResult readWithNoBlocking(Socket socket) {
    try {InputStream inputStream = socket.getInputStream();
        byte[] bytes = new byte[1024];
        int len;
        StringBuilder sb = new StringBuilder();
        if (inputStream.available() <= 0) {return ReadResult.unReadableResult();
        }
        while ((len = inputStream.read(bytes)) != -1) {sb.append(new String(bytes, 0, len, "UTF-8"));
            if (inputStream.available() <= 0) {return ReadResult.readableResult(sb.toString());
            }
        }
        return ReadResult.readableResult(sb.toString());
    } catch (IOException e) {e.printStackTrace();
        return ReadResult.unReadableResult();}
}

而后批改 OioServer,

代码 2 -4

public class OioServer {

    private ServerSocket serverSocket;
    private volatile List<Socket> socketList = new ArrayList<>();
    ...
    public void start() {
        Socket socket = null;
        try {openServer(8081);
            // 开启解决 socket 连贯的线程
            startChildHandler();
            // 主线程监听连贯
            while (true) {Socket socket = listenAccept();
                handleSocket(socket);
            }
        } catch (Exception e) {e.printStackTrace();
            SocketUtils.closeServerSocketSafely(serverSocket);
            SocketUtils.closeSocketSafely(socket);
        }
    }
     // 增加 socket 到 socketList 中
    private void handleSocket(Socket socket) {socketList.add(socket);
    }
    // 解决所有 socket
    private void startChildHandler() {new Thread(() -> {while (true) {for (Socket socketToDeal : socketList) {ReadResult readResult = SocketUtils.readWithNoBlocking(socketToDeal);
                    if (readResult.readable()) {System.out.println("收到客户端音讯" + socketToDeal.getInetAddress().toString() + " " + readResult.result());
                        SocketUtils.write(socketToDeal, "Get u:" + readResult.result());
                    }
                }
                try {Thread.sleep(1000);
                } catch (InterruptedException e) {e.printStackTrace();
                }
            }
        }).start();}
    
        
    

首先咱们批改了 handleSocket 办法,是新建的 socket 增加到 socketList 中,因为咱们有了 SocketUtils.readWithNoBlocking 办法,读操作再也不会阻塞住线程了,这样咱们就能够在循环中一直保持所有的 socket 是否有音讯发过来,并解决。

尽管上述代码健壮性有待考据,然而咱们的确失去了一个只有一个线程就能够解决所有 socket 的服务器模型。也能够说,这是简易版的 nio 服务器。

更加通用化

当初咱们曾经有一个 nio 的 server 了,然而,齐全是没有章法的编写的,如果要减少性能,或者定制化一些货色,那必须要批改OioServer, 这违反了开闭准则。因而咱们须要提取一些通用逻辑,将逻辑的解决交给应用方,上面是以可读为例。

代码 2 -5

public class NioServer {

    private ServerSocket serverSocket;
    private volatile List<SocketContext> socketList = new ArrayList<>();
    private volatile List<SocketContext> statusChangedContext = new ArrayList<>();

    public void start(int port) {
        // 监听端口线程
        new Thread(() ->{
            Socket socket = null;
            try {openServer(port);
                startChildHandler();
                while (true) {socket = listenAccept();
                    handleSocket(socket);
                }
            } catch (Exception e) {e.printStackTrace();
                SocketUtils.closeServerSocketSafely(serverSocket);
                SocketUtils.closeSocketSafely(socket);
            }
        }).start();}

    // 监听所有 socket
    private void startChildHandler() {new Thread(() -> {while (true) {for (SocketContext socketToDeal : socketList) {ReadResult readResult = SocketUtils.readWithNoBlocking(socketToDeal.getSocket());
                    if (readResult.readable()) {
                        // 如果 socket 可读,将其退出到 statusChangedContext 中,并唤醒调用线程
                        socketToDeal.setStatus(SocketContext.STATUS_READABLE);
                        socketToDeal.setMsg(readResult.result());
                        statusChangedContext.add(socketToDeal);
                        synchronized (statusChangedContext) {statusChangedContext.notifyAll();
                        }
                    }
                }
                try {Thread.sleep(1000);
                } catch (InterruptedException e) {e.printStackTrace();
                }
            }
        }).start();}

    private void handleSocket(Socket socket) {SocketContext socketContext = new SocketContext();
        socketContext.setSocket(socket);
        socketList.add(socketContext);
    }

    private void openServer(int port) throws IOException {
        // 1 创立 ServerSocket
        serverSocket = new ServerSocket();
        // 2 绑定端口
        SocketAddress socketAddress = new InetSocketAddress(port);
        serverSocket.bind(socketAddress);
        // 3 accept 客户端
    }

    private Socket listenAccept() throws IOException {return serverSocket.accept();
    }

    public List<SocketContext> getStatusChangedContext() {if (statusChangedContext.size() == 0) {
            try {
                // 当 statusChangedContext 为空,也就是没有事件要解决的时候,咱们挂起调用方线程,这样能够节约资源
                synchronized (statusChangedContext) {statusChangedContext.wait();
                }
            } catch (InterruptedException e) {e.printStackTrace();
            }
        }
        return statusChangedContext;
    }

    public static class SocketContext {

        public static final int STATUS_READABLE = 1;
        private Socket socket;
        private int status;
        private String msg;

        public Socket getSocket() {return socket;}

        public void setSocket(Socket socket) {this.socket = socket;}

        public int getStatus() {return status;}

        public void setStatus(int status) {this.status = status;}

        public String read() {return msg;}

        public void setMsg(String msg) {this.msg = msg;}

        public void write(String msg) {SocketUtils.write(this.socket, msg);
        }
    }
}

而后咱们就能够这样应用它了

代码 2 -6

public class NioServerTest {

    @Test
    public void test() {NioSocket server = new NioSocket();
        server.start(8081);
        while (true) {Iterator<SocketContext> socketContexts = server.getStatusChangedContext().iterator();
            while (socketContexts.hasNext()) {SocketContext context = socketContexts.next();
                socketContexts.remove();
                if (context.getStatus() == SocketContext.STATUS_READABLE) {
                    // 解决读
                    System.out.println(context.read());
                    context.write("Ok");
                }
            }
        }


    }
}

代码 2 -4 代码 2 -5逻辑逾越应该不大,这里解释下 2-5 的一些细节.

为了让 NioSocket 在后盾继续监听咱们设定的端口,咱们将 socket = listenAccept(); handleSocket(socket);这两个步骤放入一个独自的线程。每次有客户端接入,便会失去一个新的 socket,将这个新的 socket 退出到 socketList 中,而后在 startChildHandler 启动的线程中遍历所有 socket,并判断其状态扭转(可读)。

为了把业务控制权交于调用方,在本例中也就是 NioSocketTest.test。我定义看一个变量statusChangedContext, 如果有 socket 可读,则将其包装成SocketContext 退出到 statusChangedContext 中取。这样,调用方间接拿到 statusChangedContext 去遍历,就能够解决所有的 socket 的读事件。

当调用方调用 getStatusChangedContext() 办法时,如果此时 statusChangedContext 为空,则调用线程会被挂起,晓得有可读事件呈现,调用线程被唤醒(statusChangedContext.notifyAll())

java nio 实现

如果看官老爷读了下面两局部,那么至多对 nio 的应用曾经有所领悟了。下面咱们自制了一个 nio 的 socket,尽管只能对 read 事件作出反应,然而其余的事件,比方,可写、socket 断开等事件也是能够依照这个思路去做的。那么咱们就能够无缝切入 java nio 了。

代码 2 -7

public class NioServer {

    private Selector selector;
    private Selector chiledSelector;

    public void start(int port) throws IOException {// 通过 open()办法找到 Selector
        selector = Selector.open();
        chiledSelector = Selector.open();
        // 关上服务器套接字通道
        ServerSocketChannel ssc = ServerSocketChannel.open();
        // 服务器配置为非阻塞
        ssc.configureBlocking(false);
        // 进行服务的绑定
        ssc.bind(new InetSocketAddress("localhost", port));
        // 注册到 selector,期待连贯
        SelectionKey selectionKey = ssc.register(selector, 0);
        selectionKey.interestOps(SelectionKey.OP_ACCEPT);
        while (!Thread.currentThread().isInterrupted()) {selector.select();
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> keyIterator = keys.iterator();
            while (keyIterator.hasNext()) {SelectionKey key = keyIterator.next();
                if (!key.isValid()) {continue;}
                if (key.isAcceptable()) {SocketChannel clientChannel = ssc.accept();
                    handleSocket(clientChannel);
                }
                keyIterator.remove(); // 该事件曾经解决,能够抛弃}
        }
    }


    public Set<SelectionKey> getStatusChangedContext() throws IOException {chiledSelector.select();
        return chiledSelector.selectedKeys();}

    private void handleSocket(SocketChannel clientChannel) throws IOException {clientChannel.configureBlocking(false);
        clientChannel.register(chiledSelector, SelectionKey.OP_READ);
        System.out.println("a new client connected" + clientChannel.getRemoteAddress());
    }

    public void write(SelectionKey key, String msg) throws IOException, ClosedChannelException {SocketChannel channel = (SocketChannel) key.channel();
        System.out.println("write:" + msg);
        ByteBuffer sendBuffer = ByteBuffer.allocate(1024);
        sendBuffer.clear();
        sendBuffer.put(msg.getBytes());
        sendBuffer.flip();
        channel.write(sendBuffer);
        channel.register(chiledSelector, SelectionKey.OP_READ);
    }

    public String read(SelectionKey key) throws IOException {SocketChannel socketChannel = (SocketChannel) key.channel();
        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
        readBuffer.clear();
        int numRead;
        try {numRead = socketChannel.read(readBuffer);
        } catch (IOException e) {key.cancel();
            socketChannel.close();

            return null;
        }
        return new String(readBuffer.array(), 0, numRead);
    }
}

代码 2 -8

public class NioServerTest {

    @Test
    public void test() throws IOException {NioServer server = new NioServer();
        server.start(8081);
        while (true) {Iterator<SelectionKey> socketContexts = server.getStatusChangedContext().iterator();
            while (socketContexts.hasNext()) {SelectionKey key = socketContexts.next();
                socketContexts.remove();
                if ((key.readyOps() & SelectionKey.OP_READ) != 0) {System.out.println(server.read(key));
                    server.write(key, "Ok");
                }
            }
        }
    }
    
}

下面利用 java nio 写的 server 跟咱们本人实现的 nio 写的 server 成果是一样的。咱们本人创立监听客户端线程,还有解决 socket 线程的工作,交给了 java nio 外部(当然不是简略的起了两个线程而已,我只是简化了这个模型)。

在 java nio 中,socket 不在是 socket,而是 SocketChannel,这里大家临时了解他俩等价吧。而后一个 Selector 就相当于一个线程,而后咱们将 channel 与 selector 通过 register 办法关联起来,并指定咱们感兴趣的事。留神:这里跟咱们本人实现的 nio 有区别,咱们没有提供注册趣味事件,而是默认对可读事件感兴趣。而后咱们调 selector.select()办法,同样,这个办法没有事件产生会阻塞。而后失去事件汇合去遍历解决。

大节

这篇文章,咱们通过 bio 的 socket 本人通过线程和循环实现了服务端,并有了事件的概念。而后咱们又用 Nio 的形式去实现了雷同的性能。通过两种形式,咱们很天然的了解了 Nio 的应用及基本原理,下一章咱们将会更加粗疏的学习 Java NIO.

正文完
 0