关于java:Netty源码一Netty介绍及Reactor模式

4次阅读

共计 6800 个字符,预计需要花费 17 分钟才能阅读完成。

一、Netty 介绍

1.1 简介

在官网结尾,有这样一句话来介绍 Netty

Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.

意为 Netty 是一个 异步 事件驱动 的网络应用程序框架,用于疾速开发高性能协定的服务端和客户端。

如何了解这句话?
异步 是因为 Netty 中简直所有操作都是异步的。
而所谓 事件驱动,简略了解的话就比方咱们应用电脑,你点什么按钮(即产生什么事件),电脑执行什么操作(即调用什么函数),当然事件不仅限于用户的操作,而在网络中,事件能够是建设连贯、读写数据等等。

1.2 Netty 的架构

Netty 的架构图如下,摘自官网

外围局部

  1. 反对零拷贝的升级版 ByteBuffer(对零拷贝概念不懂的童鞋可自行查阅材料)
  2. 对立交互 API
  3. 可拓展的事件模型

传输服务

  1. Socket & Datagram,对应于 TCP 和 UDP
  2. HTTP 隧道
  3. JVM 外部管道传输

协定反对:还有对泛滥协定的反对,包含 HTTP、WebSocket,SSL、google protobuf 等协定

1.3 Netty 的次要个性

  1. 基于 NIO 实现,具备 高吞吐、低提早 个性(没有学习 NIO 的童鞋须要先学习一下 NIO)
  2. 对泛滥协定的反对,开箱即用
  3. 各种传输类型的 对立 API
  4. 可定制的线程模型(与接下来要说的 Reactor 模型无关)

1.4 Netty 中的组件

这里先介绍一下 Netty 中的组件,先眼生一下,后续再进行具体介绍

  1. ByteBuf:缓冲区,包含堆内存和间接内存
  2. ChannelPromise:异步操作相干
  3. EventLoop 和 EventLoopGroup:EventLoop 意为事件循环,在单个线程中执行各种 IO 事件,一个 EventLoopGroup 对应多个 EventLoop
  4. Channel:每个 Channel 都会注册到一个 EventLoop 上,一个 EventLoop 能够对应多个 Channel,每个 Channel 蕴含了一个 ChannelPipeline 流水线
  5. ChannelPipeline 和 ChannelHandler:ChannelPipeline 流水线是一个双向链表,由一连串的 ChannelHandler 组成
  6. Bootstrap:启动相干

在下面 Netty 的次要个性中,看到 Netty 具备可定制的线程模型的个性,那么这是何意?
其实是 Netty 能够很轻松的在 Reactor 的几种线程模型中进行切换,所以咱们须要先理解一下 Reactor 模式

二、Reactor 模式

2.1 什么是 Reactor 模式

定义如下:

The reactor design pattern is an event handling pattern for handling service requests delivered concurrently by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to associated request handlers

大抵意思:Reactor 是一种专门用于 处理事件 的模式,用于解决一个或多个输出源的并发服务申请,Service Handler 将传入的服务申请 散发 给关联的 Request Handlers 来解决。如下图

那平时所说的 Reactor 模式是基于 IO 多路复用 + 非阻塞 IO 来实现的,与之绝对应的是 多线程 + 传统 IO的模式(即每个线程解决一个连贯)

IO 多路复用:通过 Selector 选择器来实现单个线程治理多个连贯的形式,称为 IO 多路复用
非阻塞 IO:当调用 read、write 等 IO 操作时,不阻塞以后线程,称为非阻塞 IO

而 Reactor 模式个别实现形式有三种,别离为 单 Reactor 单线程、单 Reactor 多线程 主从 Reactor 多线程,接下来逐个进行介绍

2.2 常见的三种 Reactor 模型

Reactor 译为反应器,也叫散发器,它的作用其实就是监听多个连贯,并将监听到的 IO 事件分发给对应的处理器来解决。

2.2.1 单 Reactor 单线程

单 Reactor 单线程是指只有一个 Reactor,由该 Reactor 来 注册新连贯 以及 监听连贯的 IO 事件 ,并在 单个线程 中来解决这些 IO 事件,如下图

大抵流程

  1. Reactor 监听客户端的申请事件
  2. 如果是建设连贯事件,将该事件分发给 acceptor 解决,acceptor 会将该新连贯注册到 Reactor 上
  3. 如果是读或写事件,将该事件分发给对应的 handler 解决

样例代码
基于 Java NIO 的样例代码如下,能够应用 Telnet localhost 9999 来测试,服务器将会打印出接管到的内容,并返回接管到的数量

/**
 * Description: 单 Reactor 单线程
 * Created by kamier
 */
public class SingleReactorSingleThread {public static void main(String[] args) throws IOException {
        // 启动服务器
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().bind(new InetSocketAddress(9999));
        serverSocketChannel.configureBlocking(false);

        // 获取一个 Selector
        Selector selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        int recCount = 1;
        while (true) {int readyNum = selector.select();
            if (readyNum <= 0) continue;

            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {SelectionKey key = iterator.next();

                if (key.isAcceptable()) {ServerSocketChannel serverSocketChannel1 = (ServerSocketChannel) key.channel();
                    SocketChannel socketChannel = serverSocketChannel1.accept();
                    socketChannel.configureBlocking(false);
                    // 监听该 SocketChannel 的读事件
                    socketChannel.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {SocketChannel socketChannel = (SocketChannel) key.channel();
                    ByteBuffer byteBuffer = ByteBuffer.allocate(10);
                    // 打印读取到的内容
                    while (socketChannel.read(byteBuffer) > 0) {byteBuffer.flip();
                        while (byteBuffer.hasRemaining()) {System.out.print((char) byteBuffer.get());
                        }
                        byteBuffer.clear();}
                    // 批改对该 SocketChannel 感兴趣的事件集为读写事件
                    socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                } else if (key.isWritable()) {SocketChannel socketChannel = (SocketChannel) key.channel();
                    ByteBuffer byteBuffer = ByteBuffer.wrap((":" + ++recCount + ";").getBytes());
                    while (byteBuffer.hasRemaining()) {socketChannel.write(byteBuffer);
                    }
                    // 批改对该 SocketChannel 感兴趣的事件集为读事件
                    socketChannel.register(selector, SelectionKey.OP_READ);
                }

                iterator.remove();}
        }
    }
}

优缺点

长处:实现简略,不须要解决并发问题
毛病:单线程无奈利用多 CPU 的解决能力,性能较低

2.2.2 单 Reactor 多线程

单 Reactor 多线程也是只有一个 Reactor,与单线程的次要区别在于,应用 多线程来进行业务逻辑的解决,如下图:

大抵流程

  1. Reactor 监听客户端的申请事件
  2. 如果是建设连贯事件,将该事件分发给 acceptor 解决,acceptor 会将该新连贯注册到 Reactor 上
  3. 如果是读或写事件,将该事件分发给对应的 handler 解决,对应的 handler 将事件交由线程池来解决

样例代码
处理事件的业务逻辑和单线程是一样的,只是将事件交由线程池来解决

/**
 * Description: 单 Reactor 多线程
 * Created by kamier on 2022/6/13 22:20
 */
public class SingleReactorMultiThread {public static AtomicInteger recCount = new AtomicInteger();

    public static void main(String[] args) throws IOException, InterruptedException {
        // 创立线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

        // 启动服务器
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().bind(new InetSocketAddress(9999));
        serverSocketChannel.configureBlocking(false);

        // 获取一个 Selector
        Selector selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {int readyNum = selector.select();
            if (readyNum <= 0) continue;

            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            CountDownLatch latch = new CountDownLatch(selectionKeys.size());
            for (SelectionKey key : selectionKeys) {
                // 提交工作给线程池
                threadPoolExecutor.submit(new SelectionKeyTask(key, selector, latch));
            }

            // 这里期待本轮事件全副解决实现
            latch.await();
            selectionKeys.clear();}
    }

    private static class SelectionKeyTask implements Runnable {

        SelectionKey key;
        Selector selector;
        CountDownLatch latch;
        SelectionKeyTask (SelectionKey key, Selector selector, CountDownLatch latch) {
            this.key = key;
            this.selector = selector;
            this.latch = latch;
        }
        @Override
        public void run() {
            try {if (key.isAcceptable()) {ServerSocketChannel serverSocketChannel1 = (ServerSocketChannel) key.channel();
                    SocketChannel socketChannel = serverSocketChannel1.accept();
                    socketChannel.configureBlocking(false);
                    // 监听该 SocketChannel 的读事件
                    socketChannel.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {SocketChannel socketChannel = (SocketChannel) key.channel();
                    ByteBuffer byteBuffer = ByteBuffer.allocate(10);
                    // 打印读取到的内容
                    while (socketChannel.read(byteBuffer) > 0) {byteBuffer.flip();
                        while (byteBuffer.hasRemaining()) {System.out.print((char) byteBuffer.get());
                        }
                        byteBuffer.clear();}
                    // 批改对该 SocketChannel 感兴趣的事件集为读写事件
                    socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                } else if (key.isWritable()) {SocketChannel socketChannel = (SocketChannel) key.channel();
                    ByteBuffer byteBuffer = ByteBuffer.wrap((":" + recCount.incrementAndGet() + ";").getBytes());
                    while (byteBuffer.hasRemaining()) {socketChannel.write(byteBuffer);
                    }
                    // 批改对该 SocketChannel 感兴趣的事件集为读事件
                    socketChannel.register(selector, SelectionKey.OP_READ);
                }
            } catch (Exception e) {e.printStackTrace();
            } finally {latch.countDown();
            }
        }
    }
}

优缺点

长处:

  1. 充分利用多 CPU 的解决能力

毛病:

  1. 实现绝对简单,须要解决多线程解决带来的并发问题
  2. 单个 Reactor 解决所有连贯的 IO 事件,在连贯数量较多的状况下,可能存在性能问题

2.2.3 主从 Reactor 多线程

主从 Reactor 多线程是指有 一个 MainReactor以及 多个 SubReactor,由 MainReactor 来 监听建设连贯事件 ,而 SubReactor 用于 监听注册到本身上的连贯的读写事件,分工更为明确。如下图:

大抵流程

  1. MainReactor 监听客户端的申请事件
  2. 如果是建设连贯事件,将该事件分发给 acceptor 解决,acceptor 会将该新连贯注册到某一个 SubReactor 上(SubReactor 能够有多个),并由该 SubReactor 监听该新连贯的读写事件
  3. SubReactor 监听注册到本身上的连贯的读写事件
  4. 如果是读或写事件,将该事件分发给对应的 handler 解决,对应的 handler 将事件交由线程池来解决

优缺点

长处:

  1. 主从 Reactor 分工明确
  2. 可拓展性强

毛病:

  1. 实现简单

三、总结

本文对 Netty 进行了一个简略介绍,并对 Reactor 的三种模型进行了解说,后续章节会先介绍每个 Netty 组件的作用以及对其要害代码的解读,最初再通过一个 http 服务器的样例,来看一下整个服务器的启动流程是什么样的,以及它是如何来解决 http 申请的?

正文完
 0