Java并发22并发设计模式-ThreadPerMessage-与-Worker-Thread-模式

26次阅读

共计 4806 个字符,预计需要花费 13 分钟才能阅读完成。

我们曾经把并发编程领域的问题总结为三个核心问题:分工、同步和互斥。其中,同步和互斥相关问题更多地源自微观,而分工问题则是源自宏观。我们解决问题,往往都是从宏观入手, 同样,解决并发编程问题,首要问题也是解决宏观的分工问题

并发编程领域里,解决分工问题也有一系列的设计模式,比较常用的主要有 Thread-Per-Message 模式、Worker Thread 模式、生产者 – 消费者模式等等。今天我们重点介绍 Thread-Per-Message 模式。

如何理解 Thread-Per-Message 模式

比如写一个 HTTP Server,很显然只能在主线程中接收请求,而不能处理 HTTP 请求,因为如果在主线程中处理 HTTP 请求的话,那同一时间只能处理一个请求,太慢了!怎么办呢?可以利用代办的思路,创建一个子线程,委托子线程去处理 HTTP 请求。

这种委托他人办理的方式,在并发编程领域被总结为一种设计模式,叫做Thread-Per-Message 模式,简言之就是为每个任务分配一个独立的线程。这是一种最简单的分工方法,实现起来也非常简单。

用 Thread 实现 Thread-Per-Message 模式

Thread-Per-Message 模式的一个最经典的应用场景是 网络编程里服务端的实现,服务端为每个客户端请求创建一个独立的线程,当线程处理完请求后,自动销毁,这是一种最简单的并发处理网络请求的方法。

下面我们就以 echo 程序的服务端为例,介绍如何实现 Thread-Per-Message 模式。

final ServerSocketChannel ssc = 
  ServerSocketChannel.open().bind(new InetSocketAddress(8080));
// 处理请求    
try {while (true) {
    // 接收请求
    SocketChannel sc = ssc.accept();
    // 每个请求都创建一个线程
    new Thread(()->{
      try {
        // 读 Socket
        ByteBuffer rb = ByteBuffer
          .allocateDirect(1024);
        sc.read(rb);
        // 模拟处理请求
        Thread.sleep(2000);
        // 写 Socket
        ByteBuffer wb = 
          (ByteBuffer)rb.flip();
        sc.write(wb);
        // 关闭 Socket
        sc.close();}catch(Exception e){throw new UncheckedIOException(e);
      }
    }).start();}
} finally {ssc.close();
}   

如果你熟悉网络编程,相信你一定会提出一个很尖锐的问题:上面这个 echo 服务的实现方案是不具备可行性的。原因在于 Java 中的线程是一个重量级的对象,创建成本很高,一方面创建线程比较耗时,另一方面线程占用的内存也比较大。所以,为每个请求创建一个新的线程并不适合高并发场景。

于是,你开始质疑 Thread-Per-Message 模式,而且开始重新思索解决方案,这时候很可能你会想到 Java 提供的线程池。

Thread-Per-Message 模式虽然作为一种最简单的分工方案,Java 语言支持不了,显然是 Java 语言本身的问题。

Java 语言里,Java 线程是和操作系统线程一一对应的,这种做法本质上是将 Java 线程的调度权完全委托给操作系统,而操作系统在这方面非常成熟,所以这种做法的好处是稳定、可靠,但是也继承了操作系统线程的缺点:创建成本高。为了解决这个缺点,Java 并发包里提供了线程池等工具类。这个思路在很长一段时间里都是很稳妥的方案,但是这个方案并不是唯一的方案。

业界还有另外一种方案,叫做 轻量级线程。这个方案在 Java 领域知名度并不高,但是在其他编程语言里却叫得很响,例如 Go 语言、Lua 语言里的协程,本质上就是一种轻量级的线程。轻量级的线程,创建的成本很低,基本上和创建一个普通对象的成本相似;并且创建的速度和内存占用相比操作系统线程至少有一个数量级的提升,所以基于轻量级线程实现 Thread-Per-Message 模式就完全没有问题了。

Worker Thread 模式及其实现

Thread-Per-Message 模式,对应到现实世界,其实就是委托代办。这种分工模式如果用 Java Thread 实现,频繁地创建、销毁线程非常影响性能,同时无限制地创建线程还可能导致 OOM,所以在 Java 领域使用场景就受限了。

要想有效避免线程的频繁创建、销毁以及 OOM 问题。也是 Java 领域使用最多的 Worker Thread 模式。

Worker Thread 模式可以类比现实世界里车间的工作模式:车间里的工人,有活儿了,大家一起干,没活儿了就聊聊天等着。你可以参考下面的示意图来理解,Worker Thread 模式中Worker Thread 对应到现实世界里,其实指的就是车间里的工人。不过这里需要注意的是,车间里的工人数量往往是确定的。

车间工作示意图 ###

那在编程领域该如何模拟车间的这种工作模式呢?或者说如何去实现 Worker Thread 模式呢?通过上面的图,你很容易就能想到用阻塞队列做任务池,然后创建固定数量的线程消费阻塞队列中的任务。其实你仔细想会发现,这个方案就是 Java 语言提供的线程池。

线程池有很多优点,例如能够避免重复创建、销毁线程,同时能够限制创建线程的上限等等。学习完上一篇文章后你已经知道,用 Java 的 Thread 实现 Thread-Per-Message 模式难以应对高并发场景,原因就在于频繁创建、销毁 Java 线程的成本有点高,而且无限制地创建线程还可能导致应用 OOM。线程池,则恰好能解决这些问题。

下面的示例代码是用线程池实现的 echo 服务端,相比于 Thread-Per-Message 模式的实现,改动非常少,仅仅是创建了一个最多线程数为 500 的线程池 es,然后通过 es.execute() 方法将请求处理的任务提交给线程池处理。

ExecutorService es = Executors
  .newFixedThreadPool(500);
final ServerSocketChannel ssc = 
  ServerSocketChannel.open().bind(new InetSocketAddress(8080));
// 处理请求    
try {while (true) {
    // 接收请求
    SocketChannel sc = ssc.accept();
    // 将请求处理任务提交给线程池
    es.execute(()->{
      try {
        // 读 Socket
        ByteBuffer rb = ByteBuffer
          .allocateDirect(1024);
        sc.read(rb);
        // 模拟处理请求
        Thread.sleep(2000);
        // 写 Socket
        ByteBuffer wb = 
          (ByteBuffer)rb.flip();
        sc.write(wb);
        // 关闭 Socket
        sc.close();}catch(Exception e){throw new UncheckedIOException(e);
      }
    });
  }
} finally {ssc.close();
  es.shutdown();}   

正确地创建线程池

Java 的线程池既能够避免无限制地 创建线程 导致 OOM,也能避免无限制地 接收任务 导致 OOM。只不过后者经常容易被我们忽略,例如在上面的实现中,就被我们忽略了。所以强烈建议你 用创建有界的队列来接收任务

当请求量大于有界队列的容量时,就需要合理地拒绝请求。如何合理地拒绝呢?这需要你结合具体的业务场景来制定,建议你 在创建线程池时,清晰地指明拒绝策略

同时,为了便于调试和诊断问题,我也强烈建议你 在实际工作中给线程赋予一个业务相关的名字.

ExecutorService es = new ThreadPoolExecutor(
  50, 500,
  60L, TimeUnit.SECONDS,
  // 注意要创建有界队列
  new LinkedBlockingQueue<Runnable>(2000),
  // 建议根据业务需求实现 ThreadFactory
  r->{return new Thread(r, "echo-"+ r.hashCode());
  },
  // 建议根据业务需求实现 RejectedExecutionHandler
  new ThreadPoolExecutor.CallerRunsPolicy());

避免线程死锁

使用线程池过程中,还要注意一种 线程死锁 的场景。如果提交到相同线程池的任务不是相互独立的,而是有依赖关系的,那么就有可能导致线程死锁。具体现象是 应用每运行一段时间偶尔就会处于无响应的状态,监控数据看上去一切都正常,但是实际上已经不能正常工作了

我们可以用下面的示例代码来模拟该应用,如果你执行下面的这段代码,会发现它永远执行不到最后一行。执行过程中没有任何异常,但是应用已经停止响应了。

//L1、L2 阶段共用的线程池
ExecutorService es = Executors.
  newFixedThreadPool(2);
//L1 阶段的闭锁    
CountDownLatch l1=new CountDownLatch(2);
for (int i=0; i<2; i++){System.out.println("L1");
  // 执行 L1 阶段任务
  es.execute(()->{
    //L2 阶段的闭锁 
    CountDownLatch l2=new CountDownLatch(2);
    // 执行 L2 阶段子任务
    for (int j=0; j<2; j++){es.execute(()->{System.out.println("L2");
        l2.countDown();});
    }
    // 等待 L2 阶段任务执行完
    l2.await();
    l1.countDown();});
}
// 等着 L1 阶段任务执行完
l1.await();
System.out.println("end");

当应用出现类似问题时,首选的诊断方法是查看线程栈。你会发现线程池中的两个线程全部都阻塞在 l2.await()这里

原因找到了,那如何解决就简单了,最简单粗暴的办法就是将线程池的最大线程数调大,如果能够确定任务的数量不是非常多的话,这个办法也是可行的,否则这个办法就行不通了。其实 这种问题通用的解决方案是为不同的任务创建不同的线程池

提交到相同线程池中的任务一定是相互独立的,否则就一定要慎重

总结

解决并发编程里的分工问题,最好的办法是和现实世界做对比。对比现实世界构建编程领域的模型,能够让模型更容易理解。Thread-Per-Message 模式,类似于现实世界里的委托他人办理。Worker Thread 模式则类似于车间里工人的工作模式。

Worker Thread 模式和 Thread-Per-Message 模式的区别有哪些呢?从现实世界的角度看,你委托代办人做事,往往是和代办人直接沟通的;对应到编程领域,其实现也是主线程直接创建了一个子线程,主子线程之间是可以直接通信的。而车间工人的工作方式则是完全围绕任务展开的,一个具体的任务被哪个工人执行,预先是无法知道的;对应到编程领域,则是主线程提交任务到线程池,但主线程并不关心任务被哪个线程执行。

Worker Thread 模式能避免线程频繁创建、销毁的问题,而且能够限制线程的最大数量。Java 语言里可以直接使用线程池来实现 Worker Thread 模式,线程池是一个非常基础和优秀的工具类,甚至有些大厂的编码规范都不允许用 new Thread() 来创建线程的,必须使用线程池。

使用线程池还是需要格外谨慎的,如何正确创建线程池、如何避免线程死锁问题,还需要注意前面我们曾经提到的 ThreadLocal 内存泄露问题。同时对于提交到线程池的任务,还要做好异常处理,避免异常的任务从眼前溜走,有时没有发现异常的任务后果往往都很严重。

正文完
 0