乐趣区

NIO之Reactor模式Netty序章

Reactor 模式

反应堆模式:“反应”器名字中”反应“的由来:

  • “反应”即“倒置”,“控制逆转”, 具体事件处理程序不调用反应器,而向反应器注册一个事件处理器,表示自己对某些事件感兴趣,有时间来了,具体事件处理程序通过事件处理器对某个指定的事件发生做出反应。

单线程 Reactor 模式流程:

  • ①服务器端的 Reactor 是一个线程对象,该线程会启动事件循环,并使用 Selector(选择器)来实现 IO 的多路复用。channel 注册一个 Acceptor 事件处理器到 Reactor 中,Acceptor 事件处理器所关注的事件是 ACCEPT 事件,这样 Reactor 会监听客户端向服务器端发起的连接请求事件(ACCEPT 事件)。
  • ②客户端向服务器端发起一个连接请求,Reactor 监听到了该 ACCEPT 事件的发生并将该 ACCEPT 事件派发给相应的 Acceptor 处理器来进行处理。Acceptor 处理器通过 accept()方法得到与这个客户端对应的连接(SocketChannel),然后将该连接所关注的 READ 事件以及对应的 READ 事件处理器注册到 Reactor 中,这样一来 Reactor 就会监听该连接的 READ 事件了。
  • ③当 Reactor 监听到有读或者写事件发生时,将相关的事件派发给对应的处理器进行处理。比如,读处理器会通过 SocketChannel 的 read()方法读取数据,此时 read()操作可以直接读取到数据,而不会堵塞与等待可读的数据到来。
  • ④每当处理完所有就绪的感兴趣的 I / O 事件后,Reactor 线程会再次执行 select()阻塞等待新的事件就绪并将其分派给对应处理器进行处理。

注意,Reactor 的单线程模式的单线程主要是针对于 I / O 操作而言,也就是所有的 I / O 的 accept()、read()、write()以及 connect()操作都在一个线程上完成的。

基于单线程反应器模式手写一个 NIO 通信

服务端处理器:

/**
 * 类说明:nio 通信服务端处理器
 */
public class NioServerHandle implements Runnable{
    private Selector selector;
    private ServerSocketChannel serverChannel;
    private volatile boolean started;
    /**
     * 构造方法
     * @param port 指定要监听的端口号
     */
    public NioServerHandle(int port) {

        try {selector = Selector.open();
            serverChannel = ServerSocketChannel.open();
            serverChannel.configureBlocking(false);
            serverChannel.socket().bind(new InetSocketAddress(port));
            serverChannel.register(selector,SelectionKey.OP_ACCEPT);
            started = true;
            System.out.println("服务器已启动,端口号:"+port);
        } catch (IOException e) {e.printStackTrace();
        }

    }
    public void stop(){started = false;}
    @Override
    public void run() {
        // 循环遍历 selector
        while(started){
            try{
                // 阻塞, 只有当至少一个注册的事件发生的时候才会继续.
                selector.select();
                Set<SelectionKey> keys = selector.selectedKeys();
                Iterator<SelectionKey> it = keys.iterator();
                SelectionKey key = null;
                while(it.hasNext()){key = it.next();
                    it.remove();
                    try{handleInput(key);
                    }catch(Exception e){if(key != null){key.cancel();
                            if(key.channel() != null){key.channel().close();}
                        }
                    }
                }
            }catch(Throwable t){t.printStackTrace();
            }
        }
        //selector 关闭后会自动释放里面管理的资源
        if(selector != null)
            try{selector.close();
            }catch (Exception e) {e.printStackTrace();
            }
    }
    private void handleInput(SelectionKey key) throws IOException{if(key.isValid()){
            // 处理新接入的请求消息
            if(key.isAcceptable()){
                // 获得关心当前事件的 channel
                ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
                // 通过 ServerSocketChannel 的 accept 创建 SocketChannel 实例
                // 完成该操作意味着完成 TCP 三次握手,TCP 物理链路正式建立
                SocketChannel sc = ssc.accept();
                System.out.println("======socket channel 建立连接");
                // 设置为非阻塞的
                sc.configureBlocking(false);
                // 连接已经完成了,可以开始关心读事件了
                sc.register(selector,SelectionKey.OP_READ);
            }
            // 读消息
            if(key.isReadable()){
                System.out.println("======socket channel 数据准备完成," +
                        "可以去读 == 读取 =======");
                SocketChannel sc = (SocketChannel) key.channel();
                // 创建 ByteBuffer,并开辟一个 1M 的缓冲区
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                // 读取请求码流,返回读取到的字节数
                int readBytes = sc.read(buffer);
                // 读取到字节,对字节进行编解码
                if(readBytes>0){
                    // 将缓冲区当前的 limit 设置为 position=0,// 用于后续对缓冲区的读取操作
                    buffer.flip();
                    // 根据缓冲区可读字节数创建字节数组
                    byte[] bytes = new byte[buffer.remaining()];
                    // 将缓冲区可读字节数组复制到新建的数组中
                    buffer.get(bytes);
                    String message = new String(bytes,"UTF-8");
                    System.out.println("服务器收到消息:" + message);
                    // 处理数据
                    String result = response(message) ;
                    // 发送应答消息
                    doWrite(sc,result);
                }
                // 链路已经关闭,释放资源
                else if(readBytes<0){key.cancel();
                    sc.close();}
            }

        }
    }
    // 发送应答消息
    private void doWrite(SocketChannel channel,String response)
            throws IOException {
        // 将消息编码为字节数组
        byte[] bytes = response.getBytes();
        // 根据数组容量创建 ByteBuffer
        ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
        // 将字节数组复制到缓冲区
        writeBuffer.put(bytes);
        //flip 操作
        writeBuffer.flip();
        // 发送缓冲区的字节数组
        channel.write(writeBuffer);
    }
}
public class NioServer {

    private static NioServerHandle nioServerHandle;

    public static void start(){if(nioServerHandle !=null)
            nioServerHandle.stop();
        nioServerHandle = new NioServerHandle(DEFAULT_PORT);
        new Thread(nioServerHandle,"Server").start();}
    public static void main(String[] args){start();
    }

}

客户端处理器:

public class NioClientHandle implements Runnable{
    private String host;
    private int port;
    private Selector selector;
    private SocketChannel socketChannel;

    private volatile boolean started;

    public NioClientHandle(String ip, int port) {
        this.host = ip;
        this.port = port;
        try {
            // 创建选择器
            selector = Selector.open();
            // 打开通道
            socketChannel = SocketChannel.open();
            // 如果为 true,则此通道将被置于阻塞模式;// 如果为 false,则此通道将被置于非阻塞模式
            socketChannel.configureBlocking(false);
            started = true;
        } catch (IOException e) {e.printStackTrace();
        }

    }
    public void stop(){started = false;}
    @Override
    public void run() {
        try {doConnect();
        } catch (IOException e) {e.printStackTrace();
            System.exit(1);
        }
        // 循环遍历 selector
        while(started){
            try {
                // 阻塞, 只有当至少一个注册的事件发生的时候才会继续
                selector.select();
                // 获取当前有哪些事件可以使用
                Set<SelectionKey> keys = selector.selectedKeys();
                // 转换为迭代器
                Iterator<SelectionKey> it = keys.iterator();
                SelectionKey key = null;
                while(it.hasNext()){key = it.next();
                    it.remove();
                    try {handleInput(key);
                    } catch (IOException e) {e.printStackTrace();
                        if(key!=null){key.cancel();
                            if(key.channel()!=null){key.channel().close();}
                        }
                    }
                }
            } catch (IOException e) {e.printStackTrace();
            }
        }

        //selector 关闭后会自动释放里面管理的资源
        if(selector!=null){
            try {selector.close();
            } catch (IOException e) {e.printStackTrace();
            }
        }

    }

    // 具体的事件处理方法
    private void handleInput(SelectionKey key) throws IOException{if(key.isValid()){
            // 获得关心当前事件的 channel
            SocketChannel sc = (SocketChannel)key.channel();
            if(key.isConnectable()){// 连接事件
                if(sc.finishConnect()){}
                else{System.exit(1);}
            }
            // 有数据可读事件
            if(key.isReadable()){
                // 创建 ByteBuffer,并开辟一个 1M 的缓冲区
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                // 读取请求码流,返回读取到的字节数
                int readBytes = sc.read(buffer);
                // 读取到字节,对字节进行编解码
                if(readBytes>0){
                    // 将缓冲区当前的 limit 设置为 position,position=0,// 用于后续对缓冲区的读取操作
                    buffer.flip();
                    // 根据缓冲区可读字节数创建字节数组
                    byte[] bytes = new byte[buffer.remaining()];
                    // 将缓冲区可读字节数组复制到新建的数组中
                    buffer.get(bytes);
                    String result = new String(bytes,"UTF-8");
                    System.out.println("accept message:"+result);
                }else if(readBytes<0){key.cancel();
                    sc.close();}
            }
        }
    }

    // 发送消息
    private void doWrite(SocketChannel channel,String request)
            throws IOException {
        // 将消息编码为字节数组
        byte[] bytes = request.getBytes();
        // 根据数组容量创建 ByteBuffer
        ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
        // 将字节数组复制到缓冲区
        writeBuffer.put(bytes);
        //flip 操作
        writeBuffer.flip();
        // 发送缓冲区的字节数组
        channel.write(writeBuffer);
    }

    private void doConnect() throws IOException {
        /* 如果此通道处于非阻塞模式,则调用此方法将启动非阻塞连接操作。如果立即建立连接,就像本地连接可能发生的那样,则此方法返回 true。否则,此方法返回 false,稍后必须通过调用 finishConnect 方法完成连接操作。*/
        if(socketChannel.connect(new InetSocketAddress(host,port))){}
        else{
            // 连接还未完成,所以注册连接就绪事件,向 selector 表示关注这个事件
            socketChannel.register(selector,SelectionKey.OP_CONNECT);
        }
    }

    // 写数据对外暴露的 API
    public void sendMsg(String msg) throws Exception{socketChannel.register(selector,SelectionKey.OP_READ);
        doWrite(socketChannel,msg);
    }
}
public class NioClient {

    private static NioClientHandle nioClientHandle;

    public static void start(){if(nioClientHandle !=null)
            nioClientHandle.stop();
        nioClientHandle = new NioClientHandle(DEFAULT_SERVER_IP,DEFAULT_PORT);
        new Thread(nioClientHandle,"Client").start();}
    // 向服务器发送消息
    public static boolean sendMsg(String msg) throws Exception{nioClientHandle.sendMsg(msg);
        return true;
    }
    public static void main(String[] args) throws Exception {start();
        System.out.println("请输入请求信息:");
        Scanner scanner = new Scanner(System.in);
        while(NioClient.sendMsg(scanner.next()));

    }

}

服务端过程:

  1. 启动服务端, 完成一些初始化工作,ServerSocketChannel 绑定端口并且注册接受连接事件.
  2. 循环里 selector.select()阻塞, 只有当至少一个注册的事件发生的时候才会继续, 循环里面处理发生的注册事件
  3. 注册事件发生时交给处理器, 若为接受连接则 accept 取出 socketChannel 并完成连接, 然后就是关注 read 读取事件即注册, 有数据读取了则处理器读取请求数据并返回.

客户端过程:

  1. 启动客户端, 完成一些初始化工作.
  2. 根据服务端 ip 及端口发起连接.
  3. 往服务端发送数据, 并注册 read 读取事件
  4. 循环里 selector.select()阻塞, 只有当至少一个注册的事件发生的时候才会继续, 循环里面处理发生的注册事件.
  5. 注册事件发生时交给处理器, 若为连接事件并且连接成功则跳过即不予处理等待读取事件发送.

初始化工作如打开 selector,channel, 设置通道模式是否阻塞.


单线程 Reactor,工作者线程池

但在单线程 Reactor 模式中,不仅 I / O 操作在该 Reactor 线程上,连非 I / O 的业务操作也在该线程上进行处理了,这可能会大大延迟 I / O 请求的响应。所以我们应该将非 I / O 的业务逻辑操作从 Reactor 线程上卸载,以此来加速 Reactor 线程对 I / O 请求的响应.

添加了一个工作者线程池,并将非 I / O 操作从 Reactor 线程中移出转交给工作者线程池来执行。这样能够提高 Reactor 线程的 I / O 响应,不至于因为一些耗时的业务逻辑而延迟对后面 I / O 请求的处理。


改进的版本中,所以的 I / O 操作依旧由一个 Reactor 来完成,包括 I / O 的 accept()、read()、write()以及 connect()操作。
对于一些小容量应用场景,可以使用单线程模型。但是对于高负载、大并发或大数据量的应用场景却不合适,主要原因如下:

  • ① 一个 NIO 线程同时处理成百上千的链路,性能上无法支撑,即便 NIO 线程的 CPU 负荷达到 100%,也无法满足海量消息的读取和发送;
  • ②当 NIO 线程负载过重之后,处理速度将变慢,这会导致大量客户端连接超时,超时之后往往会进行重发,这更加重了 NIO 线程的负载,最终会导致大量消息积压和处理超时,成为系统的性能瓶颈;

多 Reactor 线程模式

Reactor 线程池中的每一 Reactor 线程都会有自己的 Selector、线程和分发的事件循环逻辑。
mainReactor 可以只有一个,但 subReactor 一般会有多个。mainReactor 线程主要负责接收客户端的连接请求,然后将接收到的 SocketChannel 传递给 subReactor,由 subReactor 来完成和客户端的通信。


流程:

  • ①注册一个 Acceptor 事件处理器到 mainReactor 中,Acceptor 事件处理器所关注的事件是 ACCEPT 事件,这样 mainReactor 会监听客户端向服务器端发起的连接请求事件(ACCEPT 事件)。启动 mainReactor 的事件循环。
  • ②客户端向服务器端发起一个连接请求,mainReactor 监听到了该 ACCEPT 事件并将该 ACCEPT 事件派发给 Acceptor 处理器来进行处理。Acceptor 处理器通过 accept()方法得到与这个客户端对应的连接(SocketChannel),然后将这个 SocketChannel 传递给 subReactor 线程池。
  • ③subReactor 线程池分配一个 subReactor 线程给这个 SocketChannel,即,将 SocketChannel 关注的 READ 事件以及对应的 READ 事件处理器注册到 subReactor 线程中。当然你也注册 WRITE 事件以及 WRITE 事件处理器到 subReactor 线程中以完成 I / O 写操作。Reactor 线程池中的每一 Reactor 线程都会有自己的 Selector、线程和分发的循环逻辑。
  • ④当有 I / O 事件就绪时,相关的 subReactor 就将事件派发给响应的处理器处理。注意,这里 subReactor 线程只负责完成 I / O 的 read()操作,在读取到数据后将业务逻辑的处理放入到线程池中完成,若完成业务逻辑后需要返回数据给客户端,则相关的 I / O 的 write 操作还是会被提交回 subReactor 线程来完成。

注意,所以的 I / O 操作 (包括,I/ O 的 accept()、read()、write() 以及 connect()操作)依旧还是在 Reactor 线程 (mainReactor 线程 或 subReactor 线程) 中完成的。Thread Pool(线程池)仅用来处理非 I / O 操作的逻辑。
多 Reactor 线程模式将“接受客户端的连接请求”和“与该客户端的通信”分在了两个 Reactor 线程来完成。mainReactor 完成接收客户端连接请求的操作,它不负责与客户端的通信,而是将建立好的连接转交给 subReactor 线程来完成与客户端的通信,这样一来就不会因为 read()数据量太大而导致后面的客户端连接请求得不到即时处理的情况。并且多 Reactor 线程模式在海量的客户端并发请求的情况下,还可以通过实现 subReactor 线程池来将海量的连接分发给多个 subReactor 线程,在多核的操作系统中这能大大提升应用的负载和吞吐量。

Netty 服务端 使用了“多 Reactor 线程模式”

退出移动版