通过后面几章的学习,咱们曾经 可能把握了JDK NIO的开发方式,咱们来总结一下NIO开发的流程:

  1. 创立一个服务端通道 ServerSocketChannel
  2. 创立一个选择器 Selector
  3. 将服务端通道注册到选择器上,并且关注咱们感兴趣的事件serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
  4. 绑定服务管道的地址 serverSocketChannel.bind(new InetSocketAddress(8989));
  5. 开始进行事件抉择,抉择咱们感兴趣的事件做对应的操作!

具体的代码信息请参照第一章:多路复用模型章节,这里不做太多的赘述!

无关多路复用的概念,咱们也在第一章进行了剖析。多路复用模型可能最大限度的将一个线程的执行能力榨干,一条线程执行所有的数据,包含新连贯的接入、数据的读取、计算与回写,然而假如,咱们的数据计算及其迟缓,那么该工作的执行就势必影响下一个新链接的接入!

传统NIO单线程模型

如图,咱们能理解到,单线程状况下,读事件因为要做一些业务性操作(数据库连贯、图片、文件下载)等操作,导致线程阻塞再,读事件的解决上,此时单线程程序无奈进行下一次新链接的解决!咱们对该线程模型进行优化,select事件处理封装为工作,提交到线程池!

NIO多线程模型

下面的这种数据结构可能解决掉因为计算工作耗时过长,导致新链接接入阻塞的问题,咱们是否再次进行一次优化呢?

咱们是否创立多个事件选择器,每个事件选择器,负责不同的Socket连贯,就像上面这种:

NIO多线程优化模型

这样咱们就能够每一个Select选择器负责多个客户端Socket连贯,主线程只须要将客户端新连贯抉择一个选择器注册到select选择器上就能够了!所以咱们的架构图,就变成了下图这样:

咱们在select选择器外部解决计算工作的时候,也能够将工作封装为task,提交到线程池外面去,彻底将新连贯接入和读写事件处理分来到,互不影响!事实上,这也是Netty的核心思想之一,咱们能够依据下面的图例,本人简略写一个:

代码实现

构建一个事件执行器 对应上图的select选择器
/** * Nio事件处理器 * * @author huangfu * @date */public class MyNioEventLoop implements Runnable {    static final ByteBuffer ALLOCATE = ByteBuffer.allocate(128);    private final Selector selector;    private final LinkedBlockingQueue<Runnable> linkedBlockingQueue;    public MyNioEventLoop(Selector selector) {        this.selector = selector;        linkedBlockingQueue = new LinkedBlockingQueue<>();    }    public Selector getSelector() {        return selector;    }    public LinkedBlockingQueue<Runnable> getLinkedBlockingQueue() {        return linkedBlockingQueue;    }    //疏忽  hashCode和eques    /**     * 工作处理器     */    @Override    public void run() {        while (!Thread.currentThread().isInterrupted()) {            try {                //进行事件抉择  这里咱们只解决读事件                if (selector.select() > 0) {                    Set<SelectionKey> selectionKeys = selector.selectedKeys();                    Iterator<SelectionKey> iterator = selectionKeys.iterator();                    //解决读事件                    while (iterator.hasNext()) {                        SelectionKey next = iterator.next();                        iterator.remove();                        if (next.isReadable()) {                            SocketChannel channel = (SocketChannel) next.channel();                            int read = channel.read(ALLOCATE);                            if(read > 0) {                                System.out.printf("线程%s【%s】发来消-息:",Thread.currentThread().getName(), channel.getRemoteAddress());                                System.out.println(new String(ALLOCATE.array(), StandardCharsets.UTF_8));                            }else if(read == -1) {                                System.out.println("连贯断开");                                channel.close();                            }                            ALLOCATE.clear();                        }                    }                    selectionKeys.clear();                }else {                    //解决异步工作  进行注册                    while (!linkedBlockingQueue.isEmpty()) {                        Runnable take = linkedBlockingQueue.take();                        //异步事件执行                        take.run();                    }                }            } catch (IOException | InterruptedException e) {                e.printStackTrace();            }        }    }}
构建一个选择器组
/** * 选择器组 * * @author huangfu * @date 2021年3月12日09:44:37 */public class SelectorGroup {    private final List<MyNioEventLoop> SELECTOR_GROUP = new ArrayList<>(8);    private static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();    private final static AtomicInteger IDX = new AtomicInteger();    /**     * 初始化选择器     * @param count 处理器数量     * @throws IOException 异样欣慰     */    public SelectorGroup(int count) throws IOException {        for (int i = 0; i < count; i++) {            Selector open = Selector.open();            MyNioEventLoop myNioEventLoop = new MyNioEventLoop(open);            SELECTOR_GROUP.add(myNioEventLoop);        }    }    public SelectorGroup() throws IOException {        this(AVAILABLE_PROCESSORS << 1);    }    /**     * 轮询获取一个选择器     * @return 返回一个选择器     */    public MyNioEventLoop next(){        int andIncrement = IDX.getAndIncrement();        int length = SELECTOR_GROUP.size();        return SELECTOR_GROUP.get(Math.abs(andIncrement % length));    }}
构建一个执行器记录器
/** * @author huangfu * @date */public class ThreadContext {    /**     * 记录以后应用过的选择器     */    public static final Set<MyNioEventLoop> RUN_SELECT = new HashSet<>();}
构建一个新连贯接入选择器
/** * 连接器 * * @author huangfu * @date 2021年3月12日10:15:37 */public class Acceptor implements Runnable {    private final ServerSocketChannel serverSocketChannel;    private final SelectorGroup selectorGroup;    public Acceptor(ServerSocketChannel serverSocketChannel, SelectorGroup selectorGroup) {        this.serverSocketChannel = serverSocketChannel;        this.selectorGroup = selectorGroup;    }    @Override    public void run() {        try {            SocketChannel socketChannel = serverSocketChannel.accept();            MyNioEventLoop next = selectorGroup.next();            //向队列追加一个注册工作            next.getLinkedBlockingQueue().offer(() -> {                try {                    //客户端注册为非阻塞                    socketChannel.configureBlocking(false);                    //注册到选择器 关注一个读事件                    socketChannel.register(next.getSelector(), SelectionKey.OP_READ);                } catch (Exception e) {                    e.printStackTrace();                }            });            //唤醒对应的工作,让其解决异步工作            next.getSelector().wakeup();            System.out.println("检测到连贯:" + socketChannel.getRemoteAddress());            //当以后选择器曾经被应用过了  就不再应用了,间接注册就行了            if (ThreadContext.RUN_SELECT.add(next)) {                //启动工作                new Thread(next).start();            }        } catch (IOException e) {            e.printStackTrace();        }    }}
创立启动器
/** * 反应器 * * @author huangfu * @date 2021年3月12日10:15:14 */public class Reactor implements Runnable {    private final Selector selector;    public Reactor(Selector selector) {        this.selector = selector;    }    @Override    public void run() {        try {            System.out.println("服务启动胜利");            while (!Thread.currentThread().isInterrupted()) {                //d期待连贯事件                selector.select();                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();                while (iterator.hasNext()) {                    SelectionKey next = iterator.next();                    iterator.remove();                    //进行数据散发                    dispatch(next);                }            }        } catch (IOException e) {            e.printStackTrace();        }    }    /**     * 将新连贯散发到新连贯接入器     * @param next 所有事件主键     */    private void dispatch(SelectionKey next) {        Runnable attachment = (Runnable) next.attachment();        if(attachment!=null) {            attachment.run();        }    }}

启动测试

/** * @author huangfu * @date */public class TestMain {    public static void main(String[] args) throws IOException {        //创立一个选择器组   传递选择器组的大小 决定应用多少选择器来实现        SelectorGroup selectorGroup = new SelectorGroup(2);        //开启一个服务端管道        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();        //开启一个服务端专用的选择器        Selector selector = Selector.open();        //设置非阻塞        serverSocketChannel.configureBlocking(false);        //创立一个连接器        Acceptor acceptor = new Acceptor(serverSocketChannel, selectorGroup);        //将服务端通道注册到服务端选择器上  这里会绑定一个新连贯接入器        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, acceptor);        //绑定端口        serverSocketChannel.bind(new InetSocketAddress(8989));        //启动处理器        new Reactor(selector).run();    }}

总结

  1. 单线程下的NIO存在性能瓶颈,当某一计算过程迟缓的时候会阻塞住整个线程,导致影响其余事件的解决!
  2. 为了解决这一缺点,咱们提出了应用异步线程的形式去操作工作,将耗时较长的业务,封装为一个异步工作,提交到线程池执行!
  3. 为了使业务操作和新连贯接入齐全分来到,咱们做了另外一重优化,咱们封装了一个选择器组,轮询的形式获取选择器,每一个选择器都可能解决多个新连贯, socket连贯->selector选择器 = 多 -> 1,在每一个选择器外面又能够应用线程池来解决工作,进一步提高吞吐量!