自制nio

通过上篇socke根底,咱们回顾了下socket的用法。上篇内容很简略,服务端也只是接管了一个客户端的连贯,接下来咱们就降级下咱们的demo,使其像一个真正的服务器。

首先咱们容许服务端接管多个客户端的连贯。批改OioServer如下

代码2-1

public class OioServer {    private ServerSocket serverSocket;    public void start() {        Socket socket = null;        try {            openServer(8081);            if (Objects.isNull(serverSocket)) {                return;            }           while (true) {                socket = listenAccept();                handleSocket(socket);            }        } catch (Exception e) {            e.printStackTrace();            SocketUtils.closeServerSocketSafely(serverSocket);            SocketUtils.closeSocketSafely(socket);        }    }    private void handleSocket(Socket socket) {        new Thread(() -> {            while (!socket.isClosed()) {                String msg = SocketUtils.read(socket);                SocketUtils.write(socket, " I get you" + msg);            }        }).start();    }    public void openServer(int port) throws IOException {        // 1 创立ServerSocket        serverSocket = new ServerSocket();        // 2 绑定端口        SocketAddress socketAddress = new InetSocketAddress(port);        serverSocket.bind(socketAddress);        // 3 accept客户端    }    public Socket listenAccept() throws IOException {        return serverSocket.accept();    }}

当调用start()办法后,咱们服务器就开始监听8081接口了。而后每次一个客户端连贯进来,咱们就会失去一个socket,而后咱们创立一个线程去解决这个socket。

为什么要创立新的线程?因为socket读写都是阻塞的,如果不启动新线程,那主线程就会被阻塞。这个时候,有新的客户端连贯进来将不会被解决。然而,咱们为每个socket创立一个线程,这样是有代价的,并且咱们服务器是不可能创立无数个线程的。固咱们应用为每个socket创立一个线程这种办法在高并发的状况下显然是不可行的。那么有什么办法改良吗?答案是必定的。当初java有了nio,然而我当初不急于把这个王炸展现进去,让咱们一步步凑近它,并揭开它的神秘面纱。

当初咱们晓得了为每个socket创立一个线程是因为,socket的操作(读或写)是阻塞的,那咱们不让它阻塞不就能够了?有方法吗?有。对于读,咱们能够应用inputStream.available();来判断一下,是否可读,不可读咱们就不调用阻塞办法 inputStream.read(bytes)。于是咱们再SocketUtils中天加一个办法

代码2-2

/**  * 从socket中读数据*/public static ReadResult readWithNoBlocking(Socket socket) {    try {        InputStream inputStream = socket.getInputStream();        byte[] bytes = new byte[1024];        int len;        StringBuilder sb = new StringBuilder();        if (inputStream.available() <= 0) {            return ReadResult.unReadableResult();        }        while ((len = inputStream.read(bytes)) != -1) {            sb.append(new String(bytes, 0, len, "UTF-8"));            if (inputStream.available() <= 0) {                return ReadResult.readableResult(sb.toString());            }        }        return ReadResult.readableResult(sb.toString());    } catch (IOException e) {        e.printStackTrace();        return ReadResult.unReadableResult();    }}

而后批改OioServer,

代码2-4

public class OioServer {    private ServerSocket serverSocket;    private volatile List<Socket> socketList = new ArrayList<>();    ...    public void start() {        Socket socket = null;        try {            openServer(8081);            // 开启解决socket连贯的线程            startChildHandler();            // 主线程监听连贯            while (true) {                Socket socket = listenAccept();                handleSocket(socket);            }        } catch (Exception e) {            e.printStackTrace();            SocketUtils.closeServerSocketSafely(serverSocket);            SocketUtils.closeSocketSafely(socket);        }    }     // 增加socket到socketList中    private void handleSocket(Socket socket) {        socketList.add(socket);    }    // 解决所有socket    private void startChildHandler() {        new Thread(() -> {            while (true) {                for (Socket socketToDeal : socketList) {                    ReadResult readResult = SocketUtils.readWithNoBlocking(socketToDeal);                    if (readResult.readable()) {                        System.out.println("收到客户端音讯" + socketToDeal.getInetAddress().toString() + " " + readResult.result());                        SocketUtils.write(socketToDeal, "Get u:" + readResult.result());                    }                }                try {                    Thread.sleep(1000);                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        }).start();    }                

首先咱们批改了handleSocket办法,是新建的socket增加到socketList中,因为咱们有了SocketUtils.readWithNoBlocking办法,读操作再也不会阻塞住线程了,这样咱们就能够在循环中一直保持所有的socket是否有音讯发过来,并解决。

尽管上述代码健壮性有待考据,然而咱们的确失去了一个只有一个线程就能够解决所有socket的服务器模型。也能够说,这是简易版的nio服务器。

更加通用化

当初咱们曾经有一个nio 的server了,然而,齐全是没有章法的编写的,如果要减少性能,或者定制化一些货色,那必须要批改OioServer,这违反了开闭准则。因而咱们须要提取一些通用逻辑,将逻辑的解决交给应用方,上面是以可读为例。

代码2-5

public class NioServer {    private ServerSocket serverSocket;    private volatile List<SocketContext> socketList = new ArrayList<>();    private volatile List<SocketContext> statusChangedContext = new ArrayList<>();    public void start(int port) {        // 监听端口线程        new Thread(() ->{            Socket socket = null;            try {                openServer(port);                startChildHandler();                while (true) {                    socket = listenAccept();                    handleSocket(socket);                }            } catch (Exception e) {                e.printStackTrace();                SocketUtils.closeServerSocketSafely(serverSocket);                SocketUtils.closeSocketSafely(socket);            }        }).start();    }    // 监听所有socket    private void startChildHandler() {        new Thread(() -> {            while (true) {                for (SocketContext socketToDeal : socketList) {                    ReadResult readResult = SocketUtils.readWithNoBlocking(socketToDeal.getSocket());                    if (readResult.readable()) {                        // 如果socket可读,将其退出到statusChangedContext中,并唤醒调用线程                        socketToDeal.setStatus(SocketContext.STATUS_READABLE);                        socketToDeal.setMsg(readResult.result());                        statusChangedContext.add(socketToDeal);                        synchronized (statusChangedContext) {                            statusChangedContext.notifyAll();                        }                    }                }                try {                    Thread.sleep(1000);                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        }).start();    }    private void handleSocket(Socket socket) {        SocketContext socketContext = new SocketContext();        socketContext.setSocket(socket);        socketList.add(socketContext);    }    private void openServer(int port) throws IOException {        // 1 创立ServerSocket        serverSocket = new ServerSocket();        // 2 绑定端口        SocketAddress socketAddress = new InetSocketAddress(port);        serverSocket.bind(socketAddress);        // 3 accept客户端    }    private Socket listenAccept() throws IOException {        return serverSocket.accept();    }    public List<SocketContext> getStatusChangedContext() {        if (statusChangedContext.size() == 0) {            try {                // 当statusChangedContext为空,也就是没有事件要解决的时候,咱们挂起调用方线程,这样能够节约资源                synchronized (statusChangedContext) {                    statusChangedContext.wait();                }            } catch (InterruptedException e) {                e.printStackTrace();            }        }        return statusChangedContext;    }    public static class SocketContext {        public static final int STATUS_READABLE = 1;        private Socket socket;        private int status;        private String msg;        public Socket getSocket() {            return socket;        }        public void setSocket(Socket socket) {            this.socket = socket;        }        public int getStatus() {            return status;        }        public void setStatus(int status) {            this.status = status;        }        public String read() {            return msg;        }        public void setMsg(String msg) {            this.msg = msg;        }        public void write(String msg) {            SocketUtils.write(this.socket, msg);        }    }}

而后咱们就能够这样应用它了

代码2-6

public class NioServerTest {    @Test    public void test() {        NioSocket server = new NioSocket();        server.start(8081);        while (true) {            Iterator<SocketContext> socketContexts = server.getStatusChangedContext().iterator();            while (socketContexts.hasNext()) {                SocketContext context = socketContexts.next();                socketContexts.remove();                if (context.getStatus() == SocketContext.STATUS_READABLE) {                    // 解决读                    System.out.println(context.read());                    context.write("Ok");                }            }        }    }}

代码2-4代码2-5逻辑逾越应该不大,这里解释下2-5的一些细节.

为了让NioSocket在后盾继续监听咱们设定的端口,咱们将 socket = listenAccept(); handleSocket(socket);这两个步骤放入一个独自的线程。每次有客户端接入,便会失去一个新的socket,将这个新的socket退出到socketList中,而后在startChildHandler启动的线程中遍历所有socket,并判断其状态扭转(可读)。

为了把业务控制权交于调用方,在本例中也就是NioSocketTest.test。我定义看一个变量statusChangedContext,如果有socket可读,则将其包装成SocketContext退出到statusChangedContext中取。这样,调用方间接拿到statusChangedContext去遍历,就能够解决所有的socket的读事件。

当调用方调用getStatusChangedContext()办法时,如果此时statusChangedContext为空,则调用线程会被挂起,晓得有可读事件呈现,调用线程被唤醒(statusChangedContext.notifyAll())

java nio实现

如果看官老爷读了下面两局部,那么至多对nio的应用曾经有所领悟了。下面咱们自制了一个nio 的socket,尽管只能对read事件作出反应,然而其余的事件,比方,可写、socket断开等事件也是能够依照这个思路去做的。那么咱们就能够无缝切入java nio了。

代码2-7

public class NioServer {    private Selector selector;    private Selector chiledSelector;    public void start(int port) throws IOException {        // 通过open()办法找到Selector        selector = Selector.open();        chiledSelector = Selector.open();        // 关上服务器套接字通道        ServerSocketChannel ssc = ServerSocketChannel.open();        // 服务器配置为非阻塞        ssc.configureBlocking(false);        // 进行服务的绑定        ssc.bind(new InetSocketAddress("localhost", port));        // 注册到selector,期待连贯        SelectionKey selectionKey = ssc.register(selector, 0);        selectionKey.interestOps(SelectionKey.OP_ACCEPT);        while (!Thread.currentThread().isInterrupted()) {            selector.select();            Set<SelectionKey> keys = selector.selectedKeys();            Iterator<SelectionKey> keyIterator = keys.iterator();            while (keyIterator.hasNext()) {                SelectionKey key = keyIterator.next();                if (!key.isValid()) {                    continue;                }                if (key.isAcceptable()) {                    SocketChannel clientChannel = ssc.accept();                    handleSocket(clientChannel);                }                keyIterator.remove(); //该事件曾经解决,能够抛弃            }        }    }    public Set<SelectionKey> getStatusChangedContext() throws IOException {        chiledSelector.select();        return chiledSelector.selectedKeys();    }    private void handleSocket(SocketChannel clientChannel) throws IOException {        clientChannel.configureBlocking(false);        clientChannel.register(chiledSelector, SelectionKey.OP_READ);        System.out.println("a new client connected " + clientChannel.getRemoteAddress());    }    public void write(SelectionKey key, String msg) throws IOException, ClosedChannelException {        SocketChannel channel = (SocketChannel) key.channel();        System.out.println("write:" + msg);        ByteBuffer sendBuffer = ByteBuffer.allocate(1024);        sendBuffer.clear();        sendBuffer.put(msg.getBytes());        sendBuffer.flip();        channel.write(sendBuffer);        channel.register(chiledSelector, SelectionKey.OP_READ);    }    public String read(SelectionKey key) throws IOException {        SocketChannel socketChannel = (SocketChannel) key.channel();        ByteBuffer readBuffer = ByteBuffer.allocate(1024);        readBuffer.clear();        int numRead;        try {            numRead = socketChannel.read(readBuffer);        } catch (IOException e) {            key.cancel();            socketChannel.close();            return null;        }        return new String(readBuffer.array(), 0, numRead);    }}

代码2-8

public class NioServerTest {    @Test    public void test() throws IOException {        NioServer server = new NioServer();        server.start(8081);        while (true) {            Iterator<SelectionKey> socketContexts = server.getStatusChangedContext().iterator();            while (socketContexts.hasNext()) {                SelectionKey key = socketContexts.next();                socketContexts.remove();                if ((key.readyOps() & SelectionKey.OP_READ) != 0) {                    System.out.println(server.read(key));                    server.write(key, "Ok");                }            }        }    }    }

下面利用java nio写的server跟咱们本人实现的nio写的server成果是一样的。咱们本人创立监听客户端线程,还有解决socket线程的工作,交给了java nio外部(当然不是简略的起了两个线程而已,我只是简化了这个模型)。

在java nio中,socket不在是socket,而是SocketChannel,这里大家临时了解他俩等价吧。而后一个Selector就相当于一个线程,而后咱们将channel与selector通过register办法关联起来,并指定咱们感兴趣的事。留神:这里跟咱们本人实现的nio有区别,咱们没有提供注册趣味事件,而是默认对可读事件感兴趣。而后咱们调selector.select()办法,同样,这个办法没有事件产生会阻塞。而后失去事件汇合去遍历解决。

大节

这篇文章,咱们通过bio的socket本人通过线程和循环实现了服务端,并有了事件的概念。而后咱们又用Nio的形式去实现了雷同的性能。通过两种形式,咱们很天然的了解了Nio的应用及基本原理,下一章咱们将会更加粗疏的学习Java NIO.