关于java:说说-Java-线程池

33次阅读

共计 5488 个字符,预计需要花费 14 分钟才能阅读完成。

一、引言

池的概念大家并不生疏,数据库连接池、线程池等 … 大体来说,有三个长处:

  1. 升高资源耗费。
  2. 进步响应速度。
  3. 便于对立治理。

以上是“池化”技术的雷同特点,至于他们之间的不同点这里不讲,两者都是为了进步性能和效率,抛开理论做连连看找不同,没有意义。

同样,类比于线程池来说:

  • 升高资源耗费:
    反复利用线程池中曾经创立的线程,相比之下省去了线程创立和销毁的性能耗费。
  • 进步响应速度:
    当有工作创立时,不用期待线程创立,能够立刻执行。
  • 便于对立治理:
    应用线程池,能够对线程对立治理,对线程的执行状态做对立监控。

二、线程池的应用

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
              ThreadFactory threadFactory,
                          RejectedExecutionHandler handler);
1、要害参数
  • corePoolSize 外围线程数
    当向线程池中提交一个工作时,如果线程池中的线程数量小于外围线程数,即便存在闲暇线程,也会新建一个线程来执行当前任务,直到线程数量大于或等于外围线程数。
  • maximunPoolSize 最大线程数
    当工作队列满了,线程池中的线程数量小于最大线程数时,创立新线程执行工作。对于无界队列,疏忽该参数。
  • keepAliveTime 线程存活工夫
    大于外围线程数的那一部分线程的存活工夫,如果这部分线程闲暇超过这段时间,则进行销毁。
  • workqueue 工作队列
    线程池中的线程数大于外围线程数时,将工作放入此队列期待执行。
  • threadFactory 线程工厂
    用于创立线程,工厂应用 new Threa() 的形式创立线程,并为每个线程做对立规定的命名:pool-m-thread-n(m 为线程池的编号,n 为线程池内的线程编号)。
  • handler 饱和策略
    当线程池和队列都满了,则依据此策略解决工作。
2、工作队列类型
名称 形容
ArrayBlockingQueue 基于数组构造的有界阻塞队列,此队列按 FIFO(先进先出)准则对元素进行排序。
LinkedBlockingQueue 基于链表构造的阻塞队列,此队列按 FIFO(先进先出)排序元素,吞吐量通常要高于 ArrayBlockingQueue。Executors.newFixedThreadPool() 应用了这个队列。
SynchronousQueue 不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作始终处于阻塞状态,吞吐量通常要高于 LinkedBlockingQueue,动态工厂办法 Executors.newCachedThreadPool() 应用了这个队列。
PriorityBlockingQueue 具备优先级的有限阻塞队列。
3、饱和策略类型
策略名称 个性
AbortPolicy 默认的饱和策略,间接抛出 RejectedExecutionException 异样
DiscardPolicy 不解决,间接抛弃工作
CallerRunsPolicy 应用调用者的线程执行工作
DiscardOldestPolicy 抛弃队列里最近的一个工作,执行当前任务

同时,还能够自行实现 RejectedExecutionHandler 接口来自定义饱和策略,比方记录日志、长久化等等。

  • void execute(Runnable command)
    ❀ 示例:

    ThreadFactory namedThreadFactory =
    new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();
    ExecutorService executor =
    new ThreadPoolExecutor(
    10,    
    1000,
    60L,
    TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(10),
    namedThreadFactory,
    new ThreadPoolExecutor.AbortPolicy());
    executor.execute(() -> {System.out.println(1111);
    });

留神应用 execute 办法提交工作时,没有返回值。

  • Future<?> submit(Runnable task)
    ❀ 示例:

    Future<Integer> future = executor.submit(() -> {return 1 + 1;});
    Integer result = future.get();

    还能够应用 submit 办法提交工作,该办法返回一个 Future 对象,通过 Future#get() 办法能够取得工作的返回值,该办法会始终阻塞晓得工作执行结束。还能够应用 Future#get(long timeout, TimeUnit unit) 办法,该办法会阻塞一段时间后立刻返回,而这时工作可能没有执行结束。

5、敞开线程池

ThreadPoolExecutor 提供了 shutdown() 和 shutdownNow() 两个办法敞开线程池。原理是首先遍历线程池的工作线程,顺次调用 interrupt() 办法中断线程,这样看来如果无奈响应中断的工作就不能终止。

两者区别是:

  • shutdownNow() 首先将线程池的状态设置成 STOP,而后尝试进行所有的正在执行或暂停工作的线程,并返回期待执行工作的列表。
  • shutdown() 首先将线程池的状态设置成 SHUTDOWN 状态,而后中断所有没有正在执行工作的线程。

如果调用了其中一种办法,isShutdown 办法就会返回 true。当所有的工作都已敞开后, 才示意线程池敞开胜利,这时调用 isTerminaed 办法会返回 true。理论利用中能够依据工作是否 肯定要执行结束 的个性,决定应用哪种办法敞开线程池。

6、正当的配置线程池

通常咱们能够 依据 CPU 外围数量来设计线程池数量

能够通过 Runtime.getRuntime().availableProcessors() 办法取得以后设施的物理外围数量。值得注意的是,如果利用运行在一些 docker 或虚拟机容器上时,该办法获得的是以后物理机的 CPU 外围数。

  • IO 密集型 2nCPU
  • 计算密集型 nCPU+1

    • 其中 n 为 CPU 外围数量。
    • 为什么加 1:即便当计算密集型的线程偶然因为缺失故障或者其余起因而暂停时,这个额定的线程也能确保 CPU 的时钟周期不会被节约。

三、线程池的运行过程

当提交一个新工作时,线程池的解决步骤:

  1. 判断以后线程池内的线程数量是否小于外围线程数,如果小于则新建线程执行工作。否则,进入下个阶段。
  2. 判断队列是否已满,如果没满,则将工作退出期待队列。否则,进入下个阶段。
  3. 在下面根底上判断是否大于最大线程数,如果是依据响应的策略解决。否则,新建线程执行当前任务。

线程池的源码比较简单易懂,感兴趣的小伙伴能够自行查看 java.util.concurrent.ThreadPoolExecutor,在线程池中每个工作都被包装为一个一个的 Worker,上面简略看下 Worker 的 run() 办法:

try {while (task != null || (task = getTask()) != null) {w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {task.run();
                    } catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);
                    } finally {afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();}
            }
            completedAbruptly = false;
        } finally {processWorkerExit(w, completedAbruptly);
        }

能够看到一直的循环取出 Task 并执行,而在工作的执行前后,有 beforeExecute 和 afterExecute 办法,咱们能够实现两个办法实现一些监控逻辑。除此之外还能够汇合线程池的一些属性或者重写 terminated() 办法在线程池敞开时进行监控。

四、常见的几种线程池实现

Executors 中提供了集中常见的线程池,别离利用在不同的场景。

  • FixThreadPool 固定数量的线程池,实用于对线程治理,高负载的零碎
  • SingleThreadPool 只有一个线程的线程池,实用于保障工作程序执行
  • CacheThreadPool 创立一个不限度线程数量的线程池,实用于执行短期异步工作的小程序,低负载零碎
  • ScheduledThreadPool 定时工作应用的线程池,实用于定时工作

下面几种线程池的个性次要依赖于 ThreadPoolExecutor 的几个参数来实现,不同的外围线程数量,以及不同类型的阻塞队列,同时咱们还能够自行实现本人的线程池满足业务需要。

值得注意的是,并不举荐应用 Executors 创立线程池,详见下:

  • Executors.newFixedThreadPool(int nThread)
public static ExecutorService newFixedThreadPool(int nThreads) {
       return new ThreadPoolExecutor(nThreads, nThreads,
                                     0L, TimeUnit.MILLISECONDS,
                               new LinkedBlockingQueue<Runnable>());
 }

❀ 持续来看 LinkedBlockingQueue:

public LinkedBlockingQueue() {this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
}

能够看到应用 LinkedBlockingQueue 创立的是 Integer.MAX_VALUE 大小的队列,会沉积大量的申请,从而造成 OOM

  • Executors.newSingleThreadExexutor()
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
}

同样,应用的 LinkedBlockingQueue,一样的状况

  • Executors.newCachedThreadPool()
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}

代码课件线程池应用的最大线程数是 Integer.MAX_VALUE,可能会创立大量线程,导致 OOM

  • Executors.newScheduleThreadPool()
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
}

和下面是一样的问题,最大线程数是 Integer.MAX_VALUE

所以原则上来说禁止应用 Executors 创立线程池,而应用 ThreadPoolExecutor 的构造函数来创立线程池。

五、结语

线程池在开发中还是比拟常见的,联合不同的业务场景,联合最佳实际配置正确的参数,能够帮忙咱们的利用性能失去晋升。

欢送拜访集体博客 获取更多常识分享。

正文完
 0