前言

本篇博文是《从0到1学习 Netty》中 NIO 系列的第五篇博文,次要内容是应用多线程对程序进行优化,充分利用 CPU 的能力,往期系列文章请拜访博主的 Netty 专栏,博文中的所有代码全副收集在博主的 GitHub 仓库中;

引入

这前几篇文章中,都是采纳单线程进行设计,尽管能够运行,然而没有充分利用 CPU 的性能,并且如果有一个事件的解决工夫较长,则会影响其余事件的解决。

例如,开发一个我的项目,如果团队只有一个全栈工程师,那么他须要先实现前端,再实现后端,只能循序渐进的实现工作,如果前端开发遭逢艰难,破费了很多工夫,则会大大拉长我的项目开发周期,而如果一个团队里有前端工程师和后端工程师,则前后端的开发能同步进行,这样会大大提高开发效率。

同理,对之前的代码进行优化,分两组选择器:

  • 抉择一个线程配置一个选择器,作为 ‘Boss’,专门解决 accept 事件
  • 创立多个线程(最好与 CPU 外围数始终),作为 ‘Worker’,每个线程配置一个选择器,轮流解决 readwrite 等事件

实现

1、创立一个 Boss 线程,负责解决 accept 事件类型:

Thread.currentThread().setName("Boss");Selector boss = Selector.open();ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);SelectionKey bossKey = ssc.register(boss, 0, null);bossKey.interestOps(SelectionKey.OP_ACCEPT);ssc.bind(new InetSocketAddress(7999));while (true) {    boss.select();    Iterator<SelectionKey> iter = boss.selectedKeys().iterator();    while (iter.hasNext()) {        SelectionKey key = iter.next();        iter.remove();        if (key.isAcceptable()) {            ServerSocketChannel channel = (ServerSocketChannel) key.channel();            SocketChannel sc = channel.accept();            sc.configureBlocking(false);        }    }}

2、创立 Worker 类,用于初始化 Worker 线程和 Selector,负责解决 read 事件类型:

class Worker implements Runnable{    private Thread thread;    private volatile Selector worker;    private String name;    public Worker(String name) {        this.name = name;    }    public void register() throws IOException {        this.thread = new Thread(this, this.name);        this.worker = Selector.open();        this.thread.start();    }        @Override    public void run() {        while (true) {            try {                this.worker.select();                Iterator<SelectionKey> iter = this.worker.selectedKeys().iterator();                while (iter.hasNext()) {                    SelectionKey key = iter.next();                    iter.remove();                    if (key.isReadable()) {                        ByteBuffer buffer = ByteBuffer.allocate(16);                        SocketChannel channel = (SocketChannel) key.channel();                        channel.read(buffer);                        buffer.flip();                        debugAll(buffer);                    }                }            } catch (IOException e) {                throw new RuntimeException(e);            }        }    }}

然而这里会有个问题,每次进行 register() 的时候会新创建一个线程,但咱们只想一个 Worker 对应一个线程,所以咱们须要对上述代码进行优化,应用标志符来进行判断是否实现过初始化:

private volatile boolean start = false;public void register() throws IOException {    if (!this.start) {        this.thread = new Thread(this, this.name);        this.selector = Selector.open();        this.thread.start();        this.start = true;    }}

留神,this.worker = Selector.open();this.thread.start(); 不要写反了,不然之后运行会呈现空指针异样:

Exception in thread "worker-0" java.lang.NullPointerException        at com.sidiot.netty.c3.MultiThreadServer$Worker.run(MultiThreadServer.java:75)        at java.base/java.lang.Thread.run(Thread.java:832)

3、将 Worker 进行关联,先创立一个 worker 线程:

Worker worker0 = new Worker("worker-0");worker0.register();while (true) {    ...    while (iter.hasNext()) {        ...        if (key.isAcceptable()) {            ...            log.debug("connected... {}", sc.getRemoteAddress());            log.debug("before register {}", sc.getRemoteAddress());            sc.register(worker0.selector, SelectionKey.OP_READ, null);            log.debug("after register {}", sc.getRemoteAddress());        }    }}

4、编写客户端:

public class MultiThreadClient {      public static void main(String[] args) throws IOException {          SocketChannel sc = SocketChannel.open();          sc.connect(new InetSocketAddress("localhost", 7999));          sc.write(Charset.defaultCharset().encode("Hello, World! --sidiot."));          System.in.read();      }  }

5、运行服务端和客户端,运行后果如下:

20:30:30 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - connected... /127.0.0.1:5061220:30:30 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - before register /127.0.0.1:5061220:30:30 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - after register /127.0.0.1:50612

发现 worker 并没有进行工作,或者说是客户端发送的数据并没有进入到 worker 的可读事件中,这是因为在 worker 的 run() 办法运行时,SocketChannel 还没有注册到 worker 的 selector 中,导致 worker 线程在 this.selector.select(); 的地位产生了阻塞;

6、因为 sc.register 产生在 boss 线程中,而 select 产生在 worker 线程中,无奈确定两个线程的执行程序,因而须要把两步操作都放入一个线程中;

SocketChannel 传到到 Worker 的 register() 办法中:

public void register(SocketChannel sc) throws IOException {      if (!this.start) {          this.thread = new Thread(this, this.name);          this.selector = Selector.open();          this.thread.start();          this.start = true;      }        sc.register(this.selector, SelectionKey.OP_READ, null);  }

但这样还是不行的,因为 register() 办法还是在 boss 线程中执行,这就须要应用队列来实现线程间的通信了:

private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();public void register(SocketChannel sc) throws IOException {      ...      this.queue.add(() -> {          try {              sc.register(this.selector, SelectionKey.OP_READ, null);          } catch (ClosedChannelException e) {              throw new RuntimeException(e);          }      });          this.selector.wakeup();}@Override  public void run() {      while (true) {          try {              this.selector.select();              Runnable task = this.queue.poll();              if (task != null) {                  task.run();              }              Iterator<SelectionKey> iter = this.selector.selectedKeys().iterator();            ...    }}

留神,这里须要 this.selector.wakeup(); 来唤醒 selector 持续往下走;

还有另一种办法,参考代码点击这里;

7、将单线程 worker 转成多线程:

Worker[] workers = new Worker[4];  for (int i = 0; i < workers.length; i++) {      workers[i] = new Worker("worker-" + i);  }

同时应用计数器来实现各个 worker 线程的轮询应用:

AtomicInteger index = new AtomicInteger();while (true) {      ...    while (iter.hasNext()) {          ...        if (key.isAcceptable()) {              ...            workers[index.getAndIncrement() % workers.length].register(sc);          }      }  }

运行后果:

22:36:13 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - connected... /127.0.0.1:5466822:36:13 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - before register /127.0.0.1:5466822:36:13 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - after register /127.0.0.1:5466822:36:13 [DEBUG] [worker-0] c.s.n.c.MultiThreadServer - read... /127.0.0.1:54668+--------+-------------------- all ------------------------+----------------+position: [0], limit: [7]         +-------------------------------------------------+         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |+--------+-------------------------------------------------+----------------+|00000000| 73 69 64 69 6f 74 2e 00 00 00 00 00 00 00 00 00 |sidiot..........|+--------+-------------------------------------------------+----------------+22:36:20 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - connected... /127.0.0.1:5467622:36:20 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - before register /127.0.0.1:5467622:36:20 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - after register /127.0.0.1:5467622:36:20 [DEBUG] [worker-1] c.s.n.c.MultiThreadServer - read... /127.0.0.1:54676+--------+-------------------- all ------------------------+----------------+position: [0], limit: [7]         +-------------------------------------------------+         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |+--------+-------------------------------------------------+----------------+|00000000| 73 69 64 69 6f 74 2e 00 00 00 00 00 00 00 00 00 |sidiot..........|+--------+-------------------------------------------------+----------------+22:36:30 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - connected... /127.0.0.1:5468722:36:30 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - before register /127.0.0.1:5468722:36:30 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - after register /127.0.0.1:5468722:36:30 [DEBUG] [worker-0] c.s.n.c.MultiThreadServer - read... /127.0.0.1:54687+--------+-------------------- all ------------------------+----------------+position: [0], limit: [7]         +-------------------------------------------------+         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |+--------+-------------------------------------------------+----------------+|00000000| 73 69 64 69 6f 74 2e 00 00 00 00 00 00 00 00 00 |sidiot..........|+--------+-------------------------------------------------+----------------+

后记

以上就是 多线程优化 的所有内容了,心愿本篇博文对大家有所帮忙!

参考:

  • Netty API reference;
  • 黑马程序员Netty全套教程 ;

上篇精讲:「NIO」(四)音讯边界与可写事件

我是 ,期待你的关注;

创作不易,请多多反对;

系列专栏:摸索 Netty:源码解析与利用案例分享