Scala并发编程实战Executor线程池

24次阅读

共计 4437 个字符,预计需要花费 12 分钟才能阅读完成。

创建线程是一个重量级操作,因为需要调用操作系统内核的 API,所以最好不要频繁的创建和销毁线程,为了能够复用创建的线程,常用的办法的就是创建线程池。

Executor

java.util.concurren 包中提供了若干接口和类来实现线程池,最常用的有 Executor,ExecutorService,ThreadPoolExecutor。

Executor 接口很简单定义如下:

public interface Executor {void execute(Runnable command);
}

这个接口的目的在于将任务与执行机制解耦,使得用户不需要手动创建线程,只要交给 Executor 就行了。

ExecutorService

ExecutorService 接口则扩展了 Executor 接口,增加了若干实用的方法,最常用的两个方法:

// 关闭线程池
void shutdown();
// 提交 Callable 任务以获取返回值
<T> Future<T> submit(Callable<T> task);

AbstractExecutorService 抽象类是 ExecutorService 的实现,实现了若干模板方法。

最重要的类莫过于 ThreadPoolExecutor,它是最最常用的 ExecutorService 实现类,下面重点说说。

ThreadPoolExecutor

ThreadPoolExecutor 在构造时可以指定的参数最多有 7 个,另外还有 3 个使用一些默认参数的简化版本。

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
  • corePoolSize 是保留的核心线程数,即使线程处于空闲也不会被回收,除非设置了 allowCoreThreadTimeOut 属性。
  • maximumPoolSize 最大线程数。当 workQueue 满了,会给新提交的任务创建新线程,这种情况下线程数会超过 corePoolSize,但整个线程池的线程数必须有个上限,就是 maximumPoolSize 了。
  • keepAliveTime 回收线程前,允许保留空闲线程的时长。
  • workQueue 存储提交的任务的队列
  • threadFactory 创建线程的工厂类(ThreadFactory 这个接口就定义了一个方法Thread newThread(Runnable r);)
  • handler handler 用于没有可用线程(线程数达到最大值,没有空闲线程)且 workQueue 队列满了的时候。

ThreadPoolExecutor 已经提供了以下 4 种策略。
CallerRunsPolicy:提交任务的线程自己去执行该任务。
AbortPolicy:默认的拒绝策略,会 throws RejectedExecutionException。
DiscardPolicy:直接丢弃任务,没有任何异常抛出。
DiscardOldestPolicy:丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列。

ThreadPoolExecutory 的构造函数一共有四种,使得用户可以省略 threadFactory 和 handler 中的一个或两个。

需要注意的情况
当 maximumPoolSize>corePoolSize 时,如果 workQueue 满了,新提交的任务会被新线程马上执行,而之前提交的在队列中等待的队列则继续等待。
也就是说后提交的任务可能先执行了。
当新线程执行完新提交的这个任务后,会转去执行队列中的数据,这时消费任务队列的线程数可能会大于 corePoolSize,消费速度加快了。
下面做个实验。

package io.github.liam8.con

import java.util.concurrent.{ArrayBlockingQueue, Callable, Future, RejectedExecutionException, ThreadPoolExecutor, TimeUnit}

object ExecutorDemo {def main(args: Array[String]): Unit = {
    // corePoolSize=1,maximumPoolSize=2,queue capacity=1
    val executor = new ThreadPoolExecutor(
      1,
      2,
      10,
      TimeUnit.SECONDS,
      new ArrayBlockingQueue[Runnable](1)
    )
    val task1 = new Runnable {override def run(): Unit = {println("task1 running")
        Thread.sleep(3000)
        println("task1 complete")
      }
    }
    val task2 = new Runnable {override def run(): Unit = {println("task2 running")
        Thread.sleep(3000)
        println("task2 complete")
      }
    }
    val task3 = new Callable[String] {override def call(): String = {println("task3 running")
        Thread.sleep(3000)
        println("task3 complete")
        "xxx"
      }
    }
    val task4 = new Runnable {override def run(): Unit = {println("task4 running")
        Thread.sleep(3000)
        println("task4 complete")
      }
    }
    var task2Result: Future[String] = null
    var taskCount = 1
    try {executor.execute(task1)
      println("task1 submitted")
      taskCount += 1
      executor.execute(task2)
      println("task2 submitted")
      taskCount += 1
      task2Result = executor.submit(task3)
      println("task3 submitted")
      taskCount += 1
      executor.execute(task4)
      println("task4 submitted")
    } catch {case e: RejectedExecutionException => println(s"task $taskCount be rejected")
    }
    // 起一个线程跟踪线程池大小
    val th = new Thread {
      var threadNum = 0

      override def run(): Unit =
        while (true) {if (executor.getPoolSize != threadNum) {
            threadNum = executor.getPoolSize
            println("pool size:" + threadNum)
          }
          Thread.sleep(100)
        }
    }
    th.setDaemon(true)
    th.start()
    if (task2Result != null) {println(task2Result.get(7, TimeUnit.SECONDS))
    }
    Thread.sleep(5000)
    executor.shutdown()}


}

output

task1 running
task1 submitted
task2 submitted
task3 submitted
task3 running  //task3 在 task2 之前运行了!task 4 be rejected // 线程数达到最大值,任务队列也满了,task4 被拒绝(默认的 handler)
pool size:2
task1 complete
task3 complete
xxx
task2 running // 空闲的线程开始消费队列
task2 complete
pool size:0

Executors

Executors 是 JUC 包中的一个静态工厂类,其中除了 newFixedThreadPool,newSingleThreadExecutor 方法,其他方法都不推荐使用,因为其他方法创建的线程池使用的是无界队列,可能会占用过多内存,甚至 OOM,所以建议使用有界队列。

ExecutionContext

Scala 另外提供了 ExecutionContext 和 Future 来简化线程池的使用,Future 可以接受一个 ExecutionContext 类型的隐式参数,将传入的函数提交到 ExecutionContext 的线程池中运行。
下面举个栗子,不做深入探讨。

package io.github.liam8.con

import java.util.concurrent.Executors

import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

object ExecutionContextDemo {def main(args: Array[String]): Unit = {val pool = Executors.newFixedThreadPool(2)
    implicit val ec: ExecutionContextExecutorService = ExecutionContext.fromExecutorService(pool)

    val f = Future {val t = Thread.currentThread().getName
      println(s"$t: future is coming")
      123
    }

    val re = f.map(r => {val t = Thread.currentThread().getName
      println(s"$t: mapping")
      r * r
    })
    re.onSuccess {case x: Int => println(x) }

    Await.result(f, 3.seconds)
    ec.shutdown()}

}

output

pool-1-thread-1: future is coming
pool-1-thread-2: mapping
15129

参考文献

Executor 与线程池:如何创建正确的线程池?

Futures Made Easy with Scala

本文代码

Github 仓库

转载请注明原文地址:https://liam-blog.ml/2019/09/22/Scala-Concurrency-Executor/

查看更多博主文章

正文完
 0