Reactor模式

反应堆模式:“反应”器名字中”反应“的由来:

  • “反应”即“倒置”,“控制逆转”,具体事件处理程序不调用反应器,而向反应器注册一个事件处理器,表示自己对某些事件感兴趣,有时间来了,具体事件处理程序通过事件处理器对某个指定的事件发生做出反应。

单线程Reactor模式流程:

  • ①服务器端的Reactor是一个线程对象,该线程会启动事件循环,并使用Selector(选择器)来实现IO的多路复用。channel注册一个Acceptor事件处理器到Reactor中,Acceptor事件处理器所关注的事件是ACCEPT事件,这样Reactor会监听客户端向服务器端发起的连接请求事件(ACCEPT事件)。
  • ②客户端向服务器端发起一个连接请求,Reactor监听到了该ACCEPT事件的发生并将该ACCEPT事件派发给相应的Acceptor处理器来进行处理。Acceptor处理器通过accept()方法得到与这个客户端对应的连接(SocketChannel),然后将该连接所关注的READ事件以及对应的READ事件处理器注册到Reactor中,这样一来Reactor就会监听该连接的READ事件了。
  • ③当Reactor监听到有读或者写事件发生时,将相关的事件派发给对应的处理器进行处理。比如,读处理器会通过SocketChannel的read()方法读取数据,此时read()操作可以直接读取到数据,而不会堵塞与等待可读的数据到来。
  • ④每当处理完所有就绪的感兴趣的I/O事件后,Reactor线程会再次执行select()阻塞等待新的事件就绪并将其分派给对应处理器进行处理。

注意,Reactor的单线程模式的单线程主要是针对于I/O操作而言,也就是所有的I/O的accept()、read()、write()以及connect()操作都在一个线程上完成的。

基于单线程反应器模式手写一个NIO通信

服务端处理器:

/** * 类说明:nio通信服务端处理器 */public class NioServerHandle implements Runnable{    private Selector selector;    private ServerSocketChannel serverChannel;    private volatile boolean started;    /**     * 构造方法     * @param port 指定要监听的端口号     */    public NioServerHandle(int port) {        try {            selector = Selector.open();            serverChannel = ServerSocketChannel.open();            serverChannel.configureBlocking(false);            serverChannel.socket().bind(new InetSocketAddress(port));            serverChannel.register(selector,SelectionKey.OP_ACCEPT);            started = true;            System.out.println("服务器已启动,端口号:"+port);        } catch (IOException e) {            e.printStackTrace();        }    }    public void stop(){        started = false;    }    @Override    public void run() {        //循环遍历selector        while(started){            try{                //阻塞,只有当至少一个注册的事件发生的时候才会继续.                selector.select();                Set<SelectionKey> keys = selector.selectedKeys();                Iterator<SelectionKey> it = keys.iterator();                SelectionKey key = null;                while(it.hasNext()){                    key = it.next();                    it.remove();                    try{                        handleInput(key);                    }catch(Exception e){                        if(key != null){                            key.cancel();                            if(key.channel() != null){                                key.channel().close();                            }                        }                    }                }            }catch(Throwable t){                t.printStackTrace();            }        }        //selector关闭后会自动释放里面管理的资源        if(selector != null)            try{                selector.close();            }catch (Exception e) {                e.printStackTrace();            }    }    private void handleInput(SelectionKey key) throws IOException{        if(key.isValid()){            //处理新接入的请求消息            if(key.isAcceptable()){                //获得关心当前事件的channel                ServerSocketChannel ssc = (ServerSocketChannel)key.channel();                //通过ServerSocketChannel的accept创建SocketChannel实例                //完成该操作意味着完成TCP三次握手,TCP物理链路正式建立                SocketChannel sc = ssc.accept();                System.out.println("======socket channel 建立连接" );                //设置为非阻塞的                sc.configureBlocking(false);                //连接已经完成了,可以开始关心读事件了                sc.register(selector,SelectionKey.OP_READ);            }            //读消息            if(key.isReadable()){                System.out.println("======socket channel 数据准备完成," +                        "可以去读==读取=======");                SocketChannel sc = (SocketChannel) key.channel();                //创建ByteBuffer,并开辟一个1M的缓冲区                ByteBuffer buffer = ByteBuffer.allocate(1024);                //读取请求码流,返回读取到的字节数                int readBytes = sc.read(buffer);                //读取到字节,对字节进行编解码                if(readBytes>0){                    //将缓冲区当前的limit设置为position=0,                    // 用于后续对缓冲区的读取操作                    buffer.flip();                    //根据缓冲区可读字节数创建字节数组                    byte[] bytes = new byte[buffer.remaining()];                    //将缓冲区可读字节数组复制到新建的数组中                    buffer.get(bytes);                    String message = new String(bytes,"UTF-8");                    System.out.println("服务器收到消息:" + message);                    //处理数据                    String result = response(message) ;                    //发送应答消息                    doWrite(sc,result);                }                //链路已经关闭,释放资源                else if(readBytes<0){                    key.cancel();                    sc.close();                }            }        }    }    //发送应答消息    private void doWrite(SocketChannel channel,String response)            throws IOException {        //将消息编码为字节数组        byte[] bytes = response.getBytes();        //根据数组容量创建ByteBuffer        ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);        //将字节数组复制到缓冲区        writeBuffer.put(bytes);        //flip操作        writeBuffer.flip();        //发送缓冲区的字节数组        channel.write(writeBuffer);    }}public class NioServer {    private static NioServerHandle nioServerHandle;    public static void start(){        if(nioServerHandle !=null)            nioServerHandle.stop();        nioServerHandle = new NioServerHandle(DEFAULT_PORT);        new Thread(nioServerHandle,"Server").start();    }    public static void main(String[] args){        start();    }}

客户端处理器:

public class NioClientHandle implements Runnable{    private String host;    private int port;    private Selector selector;    private SocketChannel socketChannel;    private volatile boolean started;    public NioClientHandle(String ip, int port) {        this.host = ip;        this.port = port;        try {            //创建选择器            selector = Selector.open();            //打开通道            socketChannel = SocketChannel.open();            //如果为 true,则此通道将被置于阻塞模式;            // 如果为 false,则此通道将被置于非阻塞模式            socketChannel.configureBlocking(false);            started = true;        } catch (IOException e) {            e.printStackTrace();        }    }    public void stop(){        started = false;    }    @Override    public void run() {        try {            doConnect();        } catch (IOException e) {            e.printStackTrace();            System.exit(1);        }        //循环遍历selector        while(started){            try {                //阻塞,只有当至少一个注册的事件发生的时候才会继续                selector.select();                //获取当前有哪些事件可以使用                Set<SelectionKey> keys = selector.selectedKeys();                //转换为迭代器                Iterator<SelectionKey> it = keys.iterator();                SelectionKey key = null;                while(it.hasNext()){                    key = it.next();                    it.remove();                    try {                        handleInput(key);                    } catch (IOException e) {                        e.printStackTrace();                        if(key!=null){                            key.cancel();                            if(key.channel()!=null){                                key.channel().close();                            }                        }                    }                }            } catch (IOException e) {                e.printStackTrace();            }        }        //selector关闭后会自动释放里面管理的资源        if(selector!=null){            try {                selector.close();            } catch (IOException e) {                e.printStackTrace();            }        }    }    //具体的事件处理方法    private void handleInput(SelectionKey key) throws IOException{        if(key.isValid()){            //获得关心当前事件的channel            SocketChannel sc = (SocketChannel)key.channel();            if(key.isConnectable()){//连接事件                if(sc.finishConnect()){}                else{System.exit(1);}            }            //有数据可读事件            if(key.isReadable()){                //创建ByteBuffer,并开辟一个1M的缓冲区                ByteBuffer buffer = ByteBuffer.allocate(1024);                //读取请求码流,返回读取到的字节数                int readBytes = sc.read(buffer);                //读取到字节,对字节进行编解码                if(readBytes>0){                    //将缓冲区当前的limit设置为position,position=0,                    // 用于后续对缓冲区的读取操作                    buffer.flip();                    //根据缓冲区可读字节数创建字节数组                    byte[] bytes = new byte[buffer.remaining()];                    //将缓冲区可读字节数组复制到新建的数组中                    buffer.get(bytes);                    String result = new String(bytes,"UTF-8");                    System.out.println("accept message:"+result);                }else if(readBytes<0){                    key.cancel();                    sc.close();                }            }        }    }    //发送消息    private void doWrite(SocketChannel channel,String request)            throws IOException {        //将消息编码为字节数组        byte[] bytes = request.getBytes();        //根据数组容量创建ByteBuffer        ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);        //将字节数组复制到缓冲区        writeBuffer.put(bytes);        //flip操作        writeBuffer.flip();        //发送缓冲区的字节数组        channel.write(writeBuffer);    }    private void doConnect() throws IOException {        /*如果此通道处于非阻塞模式,        则调用此方法将启动非阻塞连接操作。        如果立即建立连接,就像本地连接可能发生的那样,则此方法返回true。        否则,此方法返回false,        稍后必须通过调用finishConnect方法完成连接操作。*/        if(socketChannel.connect(new InetSocketAddress(host,port))){}        else{            //连接还未完成,所以注册连接就绪事件,向selector表示关注这个事件            socketChannel.register(selector,SelectionKey.OP_CONNECT);        }    }    //写数据对外暴露的API    public void sendMsg(String msg) throws Exception{        socketChannel.register(selector,SelectionKey.OP_READ);        doWrite(socketChannel,msg);    }}public class NioClient {    private static NioClientHandle nioClientHandle;    public static void start(){        if(nioClientHandle !=null)            nioClientHandle.stop();        nioClientHandle = new NioClientHandle(DEFAULT_SERVER_IP,DEFAULT_PORT);        new Thread(nioClientHandle,"Client").start();    }    //向服务器发送消息    public static boolean sendMsg(String msg) throws Exception{        nioClientHandle.sendMsg(msg);        return true;    }    public static void main(String[] args) throws Exception {        start();        System.out.println("请输入请求信息:");        Scanner scanner = new Scanner(System.in);        while(NioClient.sendMsg(scanner.next()));    }}

服务端过程:

  1. 启动服务端,完成一些初始化工作,ServerSocketChannel绑定端口并且注册接受连接事件.
  2. 循环里selector.select()阻塞,只有当至少一个注册的事件发生的时候才会继续,循环里面处理发生的注册事件
  3. 注册事件发生时交给处理器,若为接受连接则accept取出socketChannel并完成连接,然后就是关注read读取事件即注册,有数据读取了则处理器读取请求数据并返回.

客户端过程:

  1. 启动客户端,完成一些初始化工作.
  2. 根据服务端ip及端口发起连接.
  3. 往服务端发送数据,并注册read读取事件
  4. 循环里selector.select()阻塞,只有当至少一个注册的事件发生的时候才会继续,循环里面处理发生的注册事件.
  5. 注册事件发生时交给处理器,若为连接事件并且连接成功则跳过即不予处理等待读取事件发送.

初始化工作如打开selector,channel,设置通道模式是否阻塞.


单线程Reactor,工作者线程池

但在单线程Reactor模式中,不仅I/O操作在该Reactor线程上,连非I/O的业务操作也在该线程上进行处理了,这可能会大大延迟I/O请求的响应。所以我们应该将非I/O的业务逻辑操作从Reactor线程上卸载,以此来加速Reactor线程对I/O请求的响应.

添加了一个工作者线程池,并将非I/O操作从Reactor线程中移出转交给工作者线程池来执行。这样能够提高Reactor线程的I/O响应,不至于因为一些耗时的业务逻辑而延迟对后面I/O请求的处理。


改进的版本中,所以的I/O操作依旧由一个Reactor来完成,包括I/O的accept()、read()、write()以及connect()操作。
对于一些小容量应用场景,可以使用单线程模型。但是对于高负载、大并发或大数据量的应用场景却不合适,主要原因如下:

  • ① 一个NIO线程同时处理成百上千的链路,性能上无法支撑,即便NIO线程的CPU负荷达到100%,也无法满足海量消息的读取和发送;
  • ②当NIO线程负载过重之后,处理速度将变慢,这会导致大量客户端连接超时,超时之后往往会进行重发,这更加重了NIO线程的负载,最终会导致大量消息积压和处理超时,成为系统的性能瓶颈;

多Reactor线程模式

Reactor线程池中的每一Reactor线程都会有自己的Selector、线程和分发的事件循环逻辑。
mainReactor可以只有一个,但subReactor一般会有多个。mainReactor线程主要负责接收客户端的连接请求,然后将接收到的SocketChannel传递给subReactor,由subReactor来完成和客户端的通信。


流程:

  • ①注册一个Acceptor事件处理器到mainReactor中,Acceptor事件处理器所关注的事件是ACCEPT事件,这样mainReactor会监听客户端向服务器端发起的连接请求事件(ACCEPT事件)。启动mainReactor的事件循环。
  • ②客户端向服务器端发起一个连接请求,mainReactor监听到了该ACCEPT事件并将该ACCEPT事件派发给Acceptor处理器来进行处理。Acceptor处理器通过accept()方法得到与这个客户端对应的连接(SocketChannel),然后将这个SocketChannel传递给subReactor线程池。
  • ③subReactor线程池分配一个subReactor线程给这个SocketChannel,即,将SocketChannel关注的READ事件以及对应的READ事件处理器注册到subReactor线程中。当然你也注册WRITE事件以及WRITE事件处理器到subReactor线程中以完成I/O写操作。Reactor线程池中的每一Reactor线程都会有自己的Selector、线程和分发的循环逻辑。
  • ④当有I/O事件就绪时,相关的subReactor就将事件派发给响应的处理器处理。注意,这里subReactor线程只负责完成I/O的read()操作,在读取到数据后将业务逻辑的处理放入到线程池中完成,若完成业务逻辑后需要返回数据给客户端,则相关的I/O的write操作还是会被提交回subReactor线程来完成。

注意,所以的I/O操作(包括,I/O的accept()、read()、write()以及connect()操作)依旧还是在Reactor线程(mainReactor线程 或 subReactor线程)中完成的。Thread Pool(线程池)仅用来处理非I/O操作的逻辑。
多Reactor线程模式将“接受客户端的连接请求”和“与该客户端的通信”分在了两个Reactor线程来完成。mainReactor完成接收客户端连接请求的操作,它不负责与客户端的通信,而是将建立好的连接转交给subReactor线程来完成与客户端的通信,这样一来就不会因为read()数据量太大而导致后面的客户端连接请求得不到即时处理的情况。并且多Reactor线程模式在海量的客户端并发请求的情况下,还可以通过实现subReactor线程池来将海量的连接分发给多个subReactor线程,在多核的操作系统中这能大大提升应用的负载和吞吐量。

Netty服务端使用了“多Reactor线程模式”