关于线程池:深入浅出线程池-京东云技术团队

一、线程1、什么是线程线程(thread)是操作系统可能进行运算调度的最小单位。它被蕴含在过程之中,是过程中的理论 运作单位。一条线程指的是过程中一个繁多程序的控制流,一个过程中能够并发多个线程,每条线 程并行执行不同的工作。 2、如何创立线程2.1、JAVA中创立线程/** * 继承Thread类,重写run办法 */class MyThread extends Thread { @Override public void run() { System.out.println("myThread..." + Thread.currentThread().getName());} }/** * 实现Runnable接口,实现run办法 */class MyRunnable implements Runnable { @Override public void run() { System.out.println("MyRunnable..." + Thread.currentThread().getName());} }/** * 实现Callable接口,指定返回类型,实现call办法 */class MyCallable implements Callable<String> { @Override public String call() throws Exception { return "MyCallable..." + Thread.currentThread().getName();} }2.2、测试一下public static void main(String[] args) throws Exception { MyThread thread = new MyThread(); thread.run(); //myThread...main thread.start(); //myThread...Thread-0 MyRunnable myRunnable = new MyRunnable(); Thread thread1 = new Thread(myRunnable); myRunnable.run(); //MyRunnable...main thread1.start(); //MyRunnable...Thread-1 MyCallable myCallable = new MyCallable(); FutureTask<String> futureTask = new FutureTask<>(myCallable); Thread thread2 = new Thread(futureTask); thread2.start(); System.out.println(myCallable.call()); //MyCallable...main System.out.println(futureTask.get()); //MyCallable...Thread-2} 2.3、问题既然咱们创立了线程,那为何咱们间接调用办法和咱们调用start()办法的后果不同?new Thread() 是否实在创立了线程? ...

September 22, 2023 · 8 min · jiezi

关于线程池:别再纠结线程池池大小线程数量了哪有什么固定公式-京东云技术团队

可能很多人都看到过一个线程数设置的实践: CPU 密集型的程序 - 外围数 + 1I/O 密集型的程序 - 外围数 * 2不会吧,不会吧,真的有人依照这个实践布局线程数? 线程数和CPU利用率的小测试抛开一些操作系统,计算机原理不谈,说一个根本的实践(不必纠结是否谨严,只为好了解):一个CPU外围,单位工夫内只能执行一个线程的指令 那么实践上,我一个线程只须要不停的执行指令,就能够跑满一个外围的利用率。 来写个死循环空跑的例子验证一下: 测试环境:AMD Ryzen 5 3600, 6 - Core, 12 - Threads public class CPUUtilizationTest { public static void main(String[] args) { //死循环,什么都不做 while (true){ } }}运行这个例子后,来看看当初CPU的利用率: 从图上能够看到,我的3号外围利用率曾经被跑满了 那基于下面的实践,我多开几个线程试试呢? public class CPUUtilizationTest { public static void main(String[] args) { for (int j = 0; j < 6; j++) { new Thread(new Runnable() { @Override public void run() { while (true){ } } }).start(); } }}此时再看CPU利用率,1/2/5/7/9/11 几个外围的利用率曾经被跑满: ...

September 20, 2023 · 2 min · jiezi

关于线程池:再说Java线程池

线程池流程图 execute外围办法这里不必多说,其实就是线程池的流程图中的所说的,如果小于外围线程数量,创立外围线程,否则往队列仍工作,假如队列也满了,那就创立非核心线程,如果大于了maximumPooSize,也就是说非核心线程也创立不了了,不好意思,那就走reject策略 addWorkeraddWorker别看代码比拟多,其实外围代码就是通过CAS来减少一个Worker count数,而后跳出循环,创立Worker加锁的形式退出到线程池中workers(HashSet)数据结构中,而后启动Worker WorkerThreadPoolExecutor创立了一个Worker并启动了他,咱们是不是要看一下Worker是什么玩意?唉吆喂,Worker居然继承了AQS并实现了Runnable,加锁的线程,是不是?好吧,那既然是线程了,那看一下他run办法吧:首先看看一下 task != null || (task = getTask()) != null,这个条件,对于第一次创立外围线程并执行run来说,firstTask是通过 w = new Worker(firstTask)传递过去,给了Worker实例变量了,所以task必定是不为null的,批准吧?而后看外围代码task.run,其实就是调用用户本人定义的run办法的逻辑,了解了吧 好了,那比如说咱们外围线程的第一个工作运行完了呢?那getTask()就上场了: 其实有三局部外围逻辑 : 敞开线程池如果曾经对线程池进行了shutdown了,那好,等我workerQueue队列工作生产结束,那我就线程退出如果曾经对线程池进行了stop,我去,好吧,你太暴力了,那就间接退出吧,会在shutdownNow中具体说 非核心线程超时如果wc > corePoolSize,也就是说有非核心线程,是不是timed为true,而后非核心线程是以workQueue.poll以规定工夫来获取工作,此时如果工作是null,没有工作,是不是timedOut=true了,好,而后再循环timed和timedOut都为true了,上述第二个逻辑的if就成立了,那非核心线程的Worker就会退出 阻塞和非阻塞形式获取工作如果是外围线程,就会阻塞的形式take来拿工作,如果是非核心线程,就会进行poll(time)形式来拿工作 shutdonwNow & shutdownshutdonwNowadvanceRunState(STOP):将线程池的状态扭转为STOP,实质上是一个线程标记位的扭转interruptWorkers():对线程池中的所有线程都进行终止(interrupt)操作这里会对所有的线程都会进行interrupt,那当初就有一个疑难,Worker会在哪里阻塞呢?其实有两处(Worker中): 对于闲暇的线程会在getTask中以take阻塞的形式来拿工作还有就是在task.run中,咱们本人定义业务逻辑如果有可中断的代码,就会被中断而shutdownNow会对所有的Worker进行interrupt,也可能会对正在运行的工作,如果能够响应中断,就会中断以后正在运行的工作,否则会进行下一次getTask的时候,即运行完当前任务,再下一次获取工作前判断退出Worker线程 shutdowninterruptIdleWorkers:是interrupt闲暇的线程,谁是闲暇的线程?怎么来辨别,这里AQS就出场了w.tryLock()是一个很要害的玩意,要晓得,如果Worker正在执行工作,正在执行task.run(),这后面必定是加锁的,所以tryLock是拿不到锁的,所以针对正在运行的Worker线程是不interrupt的

April 2, 2023 · 1 min · jiezi

关于线程池:NacosThreadPoolExecutor构建动态线程池

0 文章概述动静线程池是指能够动静调节线程池某些参数,本文咱们联合Apollo和线程池实现一个动静线程池。 1 线程池根底1.1 七个参数咱们首先回顾Java线程池七大参数,这对后续设置线程池参数有帮忙。咱们查看ThreadPoolExecutor构造函数如下: public class ThreadPoolExecutor extends AbstractExecutorService { public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }}corePoolSize线程池外围线程数,类比业务大厅开设的固定窗口。例如业务大厅开设2个固定窗口,那么这两个窗口不会敞开,全天都会进行业务办理 ...

March 7, 2023 · 6 min · jiezi

关于线程池:线程池调优计算原理

1、背景咱们应用线程池来无效地使零碎工作负载与系统资源保持一致。这个零碎工作负载应该是能够独立运行的工作,比方一个web利用的每一个Http申请都能够属于这个类别,咱们能够解决每一个申请而不必思考另一个Http申请。 咱们冀望咱们的应用程序具备良好的吞吐量和良好的响应能力。为了实现这一点,首先咱们应该将咱们的应用程序工作划分为独立的工作,而后咱们应该以无效利用 CPU、RAM(利用率)等系统资源的形式运行这些工作。通过应用线程池,指标是在无效应用系统资源的同时运行这些独自的工作。 如果疏忽磁盘和网络,给定单个 CPU 资源,按程序执行A和B总是比通过工夫切片“同时”执行A和B快,这是计算的基本定律。一旦线程数超过 CPU 内核数,增加更多线程就会变慢,而不是变快。 比方在8核服务器上,现实状态下将线程数设置为 8 将提供最佳性能,超出此范畴的任何事件都会因为上下文切换的开销而开始变慢。但在理论状况中不能疏忽Disk和Network。 2、Java原生线程池 对于线程池的具体实现原理:线程池的基本原理,线程池生命周期治理,具体设计等等,能想到的根本都有,十分具体; Java 的Executors类提供了一些不同类型的线程池;static ExecutorService newSingleThreadExecutor() 创立一个 Executor,它应用单个工作线程在无界队列中运行。static ExecutorService newCachedThreadPool() newCachedThreadPool创立一个可缓存线程池,如果线程池长度超过解决须要,可灵便回收闲暇线程,若无可回收,则新建线程。实现原理将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,应用的synchronousQueue(无界),也就是说来了工作就创立线程运行,当线程闲暇超过60秒,就销毁线程。是大小无界的线程池。比方,实用于执行很多的短期异步工作的小程序,或者是负载较轻的服务器。static ExecutorService newFixedThreadPool(int nThreads) 创立一个定长线程池,可控制线程最大并发数,超出的线程会在队列中期待。这个创立的线程池corePoolSize和maximum PoolSize 值是相等的,它应用的LinkedBlockingQueue(无界队列)。实用于为了满足资源管理要求,而须要限度以后线程数量的利用场景。static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) 创立一个定长线程池,反对定时及周期性工作执行。实用于多个后盾线程执行周期工作,同时为了满足资源管理的需要而限度后盾线程的数量的利用场景。newSingleThreadExecutor(),因为这个池只有 1 个线程,因而咱们提交到这个线程池的每个工作都是程序工作的,没有并发,如果咱们有能够独立运行的工作,这个配置在咱们的应用程序吞吐量和响应能力方面并不好。 newCachedThreadPool(),因为这个池会为提交到池的每个工作创立一个新线程或应用现有线程。对于某些场景(例如,如果咱们的工作是短期工作,此池应用对于咱们的独立工作可能很有意义。如果咱们的工作不是短暂的,应用这种线程池会导致在应用程序上创立许多线程。如果咱们创立的线程超过阈值,那么咱们就不能无效地应用 CPU 资源,因为 CPU 的大部分工夫都花在线程或上下文切换上,而不是真正的工作。这再次导致咱们的应用程序响应能力和吞吐量降落。 咱们须要线程池newFixedThreadPool(int nThreads),咱们应该抉择现实的大小来减少咱们的应用程序吞吐量和响应能力(我假如咱们有能够独立运行的工作)。重点是抉择不要太多也不要太小。前者导致CPU破费太多工夫进行线程切换而不是真正的工作,也会导致过多的内存应用问题,后者导致CPU闲暇,而咱们有应该解决的工作。 ⚠️:只有工作都是同类型并且互相独立时,线程池的效率达到最佳 2.1、问题2.1.1、线程饥饿死锁在线程池中所有正在执行工作的线程都因为期待其余仍处于工作队列中的工作而阻塞 例1:(饥饿或死锁)在单线程池中,正在执行的工作阻塞期待队列中的某个工作执行结束 例2:(饥饿或死锁)线程池不够大时,通过栅栏机制协调多个工作时 例3:(饥饿)因为其余资源的隐性限度,每个工作都须要应用无限的数据库连贯资源,那么不论线程池多大,都会体现出和连贯资源雷同的大小。 每当提交了一个有依赖性的Executor工作时,要分明地晓得可能会呈现线程"饥饿"死锁,因而须要在代码或配置Executor地配置文件中记录线程池的大小限度或配置限度。以下代码对死锁的产生做了举例。 package com.flydean;import org.junit.Test;import java.util.concurrent.*;public class ThreadPoolDeadlock { ExecutorService executorService= Executors.newSingleThreadExecutor(); public class RenderPageTask implements Callable<String> { public String call() throws Exception{ Future<String> header, footer; header= executorService.submit(()->{ return "加载页眉"; }); footer= executorService.submit(()->{ return "加载页脚"; }); return header.get()+ footer.get(); } } public void submitTask(){ executorService.submit(new RenderPageTask()); }}产生死锁剖析: ...

October 15, 2022 · 2 min · jiezi

关于线程池:ForkJoinPool线程池

ForkJoinPool是自java7开始,jvm提供的一个用于并行执行的工作框架。其宗旨是将大工作分成若干小工作,之后再并行对这些小工作进行计算,最终汇总这些工作的后果,失去最终的后果。1.分治理:大工作拆分小工作,计算小工作把计算结果进行合并,实现大工作计算。2.工作窃取:当前工作线程没有可用线程,则通过利用其余现场队列闲置线程进行本工作队列工作执行,充分利用CPU资源。

July 29, 2022 · 1 min · jiezi

关于线程池:创建线程池

线程池的创立有四种,别离是:1.newCachedThreadPool,最大线程数是Integer最大值,个别不倡议应用该线程池,有OOM危险。2.newFixedThreadPool,指定线程数量。3.newScheduledThreadPool,定时工作线程池,定时执行一些周期性工作。4.newSingleThreadExecutor,队列型线程池,严格依照单线程先进先出执行队列工作。 上述四种线程池创立形式都不倡议,倡议应用ThreadPoolExecutor,通过七大参数创立自定义线程池,七大参数:1.corePoolSize,外围线程数。2.maxPoolSize,最大线程数。3.keepAliveTime:线程存活工夫数。4.unit:工夫单位。5.threadFactory:创立线程工厂。6.workQueue:线程工作队列。7.handler:回绝策略。

July 15, 2022 · 1 min · jiezi

关于线程池:动态调整线程池参数

指标实现动静调整线程池参数对线程池运行状况进行监控实现一,线程池可调整的参数 外围线程数超时工夫最大线程数回绝策略而队列BlockingQueue因为是final类型,所以没有对外批改入口。但能够通过重写LinkedBlockingQueue并把capacity设置为非final。 二,联合配置核心实现动静调整 这里的配置核心应用Apollo, 通过监听配置核心变动,而后更新线程池配置。示例代码如下: @Slf4j@Componentpublic class DynamicThreadPoolConfig { /** 线程执行器 **/ private volatile ThreadPoolExecutor executor; /** 外围线程数 **/ private Integer corePoolSize = 10; /** 最大值线程数 **/ private Integer maximumPoolSize = 20; /** 待执行工作的队列的长度 **/ private Integer workQueueSize = 1000; /** 线程闲暇工夫 **/ private Long keepAliveTime = 1000L; /** 线程名 **/ private String threadName; private Config config = ConfigService.getConfig("lepu-activity-center");; public DynamicThreadPoolConfig() { init(config); } /** * 初始化 */ private void init(Config config) { log.info("线程池初始化中.........."); if (executor == null) { synchronized (DynamicThreadPoolConfig.class) { if (executor == null) { String corePoolSizeProperty = config.getProperty("corePoolSize", corePoolSize.toString()); log.info("批改前的外围线程池:{}",corePoolSizeProperty); String maximumPoolSizeProperty = config.getProperty("maximumPoolSize", maximumPoolSize.toString()); String keepAliveTImeProperty = config.getProperty("keepAliveTime", keepAliveTime.toString()); BlockingQueue<Runnable> workQueueProperty = new LinkedBlockingQueue<>(workQueueSize); executor = new ThreadPoolExecutor(Integer.valueOf(corePoolSizeProperty), Integer.valueOf(maximumPoolSizeProperty), Long.valueOf(keepAliveTImeProperty), TimeUnit.MILLISECONDS, workQueueProperty); } } } } /** * 监听到配置核心发生变化后,更新线程池配置 * @param changeEvent */ @ApolloConfigChangeListener public void onChange(ConfigChangeEvent changeEvent){ log.info("线程池参数配置发生变化,namespace:{}",changeEvent.getNamespace()); for(String key : changeEvent.changedKeys()){ ConfigChange change = changeEvent.getChange(key); String newValue = change.getNewValue(); refreshThreadPool(key,newValue); } } /** * 更新线程池配置 * @param key * @param newValue */ private void refreshThreadPool(String key, String newValue) { if (executor == null) { return; } if (ParamsEnum.CORE_POOL_SIZE.getParam().equals(key)) { executor.setCorePoolSize(Integer.valueOf(newValue)); log.info("批改外围线程数key={},value={}",key,newValue); } if (ParamsEnum.MAXIMUM_POOL_SIZE.getParam().equals(key)) { executor.setMaximumPoolSize(Integer.valueOf(newValue)); log.info("批改最大线程数key={},value={}", key, newValue); } if (ParamsEnum.KEEP_ALIVE_TIME.getParam().equals(key)) { executor.setKeepAliveTime(Integer.valueOf(newValue), TimeUnit.MILLISECONDS); log.info("批改线程闲暇工夫key={},value={}", key, newValue); } } public ThreadPoolExecutor getExecutor() { return executor; }}三,监控形式 ...

June 30, 2022 · 1 min · jiezi

关于线程池:Java多线程与线程池技术

一、序言Java多线程编程线程池被宽泛应用,甚至成为了标配。 线程池实质是池化技术的利用,和连接池相似,创立连贯与敞开连贯属于耗时操作,创立线程与销毁线程也属于重操作,为了提高效率,先提前创立好一批线程,当有须要应用线程时从线程池取出,用完后放回线程池,这样防止了频繁创立与销毁线程。 // 工作Runnable runnable = () -> System.out.println(Thread.currentThread().getId());在利用中优先选用线程池执行异步工作,依据不同的场景选用不同的线程池,进步异步工作执行效率。 1、一般执行new Thread(runnable).start();2、线程池执行Executors.newSingleThreadExecutor().execute(runnable)二、线程池根底(一)外围参数1、外围参数线程池的外围参数决定了池的类型,进而决定了池的个性。 参数解释行为corePoolSize外围线程数池中长期保护的线程数量,不被动回收maximumPoolSize最大线程数最大线程数大于等于外围线程数keepAliveTime线程最大闲暇工夫非核心线程最大闲暇工夫,超时回收线程workQueue工作队列工作队列间接决定线程池的类型2、参数与池的关系Executors类默认创立线程池与参数对应关系。 线程池corePoolSizemaximumPoolSizekeepAliveTimeworkQueuenewCachedThreadPool0Integer.MAX_VALUE60SynchronousQueuenewSingleThreadExecutor110LinkedBlockingQueuenewFixedThreadPoolNN0LinkedBlockingQueuenewScheduledThreadPoolNInteger.MAX_VALUE0DelayedWorkQueue(二)线程池比照依据应用场景抉择对应的线程池。 1、通用比照线程池特点实用场景newCachedThreadPool超时未应用的线程回主动销毁,有新工作时主动创立实用于低频、轻量级的工作。回收线程的目标是节约线程长时间闲暇而占有的资源。newSingleThreadExecutor线程池中有且只有一个线程程序执行工作newFixedThreadPool线程池中有固定数量的线程,且始终存在实用于高频的工作,即线程在大多数工夫里都处于工作状态。newScheduledThreadPool定时线程池与定时调度相关联2、拓展比照保护仅有一个线程的线程池有如下两种形式,失常应用的状况下,二者差别不大;简单应用环境下,二者存在轻微的差别。用newSingleThreadExecutor形式创立的线程池在任何时刻至少只有一个线程,因而能够了解为用异步的形式执行程序工作;后者初始化的时候也只有一个线程,应用过程中可能会呈现最大线程数超过1的状况,这时要求线性执行的工作会并行执行,业务逻辑可能会呈现问题,与理论场景无关。 private final static ExecutorService executor = Executors.newSingleThreadExecutor();private final static ExecutorService executor = Executors.newFixedThreadPool(1);(三)线程池原理 线程池次要解决流程,工作提交之后是怎么执行的。大抵如下: 判断外围线程池是否已满,如果不是,则创立线程执行工作如果外围线程池满了,判断队列是否满了,如果队列没满,将工作放在队列中如果队列满了,则判断线程池是否已满,如果没满,创立线程执行工作如果线程池也满了,则依照回绝策略对工作进行解决(四)提交工作的形式往线程池中提交工作,次要有两种办法:提交无返回值的工作和提交有返回值的工作。 1、无返回值工作execute用于提交不须要返回后果的工作。 public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(2); executor.execute(() -> System.out.println("hello"));}2、有返回值工作submit()用于提交一个须要返回果的工作。 该办法返回一个Future对象,通过调用这个对象的get()办法,咱们就能取得返回后果。get()办法会始终阻塞,直到返回后果返回。 咱们也能够应用它的重载办法get(long timeout, TimeUnit unit),这个办法也会阻塞,然而在超时工夫内依然没有返回后果时,将抛出异样TimeoutException。 public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newFixedThreadPool(2); Future<Long> future = executor.submit(() -> { System.out.println("task is executed"); return System.currentTimeMillis(); }); System.out.println("task execute time is: " + future.get());}在提交工作时,如果无返回值工作,优先应用execute。(无)敞开线程池在线程池应用实现之后,咱们须要对线程池中的资源进行开释操作,这就波及到敞开性能。咱们能够调用线程池对象的shutdown()和shutdownNow()办法来敞开线程池。 ...

April 6, 2022 · 2 min · jiezi

关于线程池:如何判断线程池已经执行完所有任务了

很多场景下,咱们须要期待线程池的所有工作都执行完,而后再进行下一步操作。对于线程 Thread 来说,很好实现,加一个 join 办法就解决了,然而对于线程池的判断就比拟麻烦了。 咱们本文提供 4 种判断线程池工作是否执行完的办法: 应用 isTerminated 办法判断。应用 getCompletedTaskCount 办法判断。应用 CountDownLatch 判断。应用 CyclicBarrier 判断。接下来咱们一个一个来看。 不判断的问题如果不对线程池是否曾经执行完做判断,就会呈现以下问题,如下代码所示: import java.util.Random;import java.util.concurrent.LinkedBlockingDeque;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class ThreadPoolCompleted { public static void main(String[] args) { // 创立线程池 ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 20, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>(1024)); // 增加工作 addTask(threadPool); // 打印后果 System.out.println("线程池工作执行实现!"); } /** * 给线程池增加工作 */ private static void addTask(ThreadPoolExecutor threadPool) { // 工作总数 final int taskCount = 5; // 增加工作 for (int i = 0; i < taskCount; i++) { final int finalI = i; threadPool.submit(new Runnable() { @Override public void run() { try { // 随机休眠 0-4s int sleepTime = new Random().nextInt(5); TimeUnit.SECONDS.sleep(sleepTime); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(String.format("工作%d执行实现", finalI)); } }); } }}复制代码以上程序的执行后果如下: ...

March 30, 2022 · 3 min · jiezi

关于线程池:从简单代码入手分析线程池原理

一、线程池简介1、池化思维在我的项目工程中,基于池化思维的技术利用很多,例如基于线程池的工作并发执行,中间件服务的连接池配置,通过对共享资源的治理,升高资源的占用耗费,晋升效率和服务性能。 池化思维从直观感觉上了解,既有作为容器的存储能力(持续性的承接),也要具备维持一定量的储备能力(初始化的提供),同时作为容器又必然有大小的限度,上面通过这个根底逻辑来详细分析Java中的线程池原理。 2、线程池首先相熟JVM执行周期的都晓得,在内存中频繁的创立和销毁对象是很影响性能的,而线程作为过程中运行的根本单位,通过线程池的形式重复使用已创立的线程,在工作执行动作上防止或缩小线程的频繁创立动作。 线程池中保护多个线程,当收到调度工作时能够防止创立线程间接执行,并以此升高服务资源的耗费,把绝对不确定的并发工作治理在绝对确定的线程池中,进步零碎服务的稳定性。下文基于JDK1.8围绕ThreadPoolExecutor类深入分析。 二、原理与周期1、类图设计 Executor 接口源码正文解读:未来会执行命令,工作提交和执行两个动作会被解耦,传入Runnable工作对象即可,线程池会执行相应调度和工作解决。Executor尽管是ThreadPoolExecutor线程池的顶层接口,然而其自身只是形象了工作的解决思维。 ExecutorService 接口扩大Executor接口,单个或批量的给工作的执行后果生成Future,并削减工作中断或终止的治理办法。 AbstractExecutorService 抽象类提供对ExecutorService接口定义的工作执行办法(submit,invokeAll)等默认实现,提供newTaskFor办法用于构建RunnableFuture对象。 ThreadPoolExecutor 类保护线程池生命周期,治理线程和工作,通过相应调度机制实现工作的并发执行。 2、根本案例示例中创立了一个简略的butte-pool线程池,设置4个外围线程执行工作,队列容器设置256大小;在理论业务中,对于参数设定须要考量工作执行工夫,服务配置,测试数据等。 public class ThrPool implements Runnable { private static final Logger logger = LoggerFactory.getLogger(ThrPool.class) ; /** * 线程池治理,ThreadFactoryBuilder出自Guava工具库 */ private static final ThreadPoolExecutor DEV_POOL; static { ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("butte-pool-%d").build(); DEV_POOL = new ThreadPoolExecutor(0, 8,60L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(256),threadFactory, new ThreadPoolExecutor.AbortPolicy()); DEV_POOL.allowCoreThreadTimeOut(true); } /** * 工作办法 */ @Override public void run() { try { logger.info("Print...Job...Run...;queue_size:{}",DEV_POOL.getQueue().size()); Thread.sleep(5000); } catch (Exception e){ e.printStackTrace(); } }} ...

March 20, 2022 · 2 min · jiezi

关于线程池:使用线程池时限制每个线程间隔启动的一种思路

需要形容最近跑仿真时,为了高效利用服务器的计算资源,启动多个仿真程序以达到并行。我将须要运行的命令一次性加载到Python的线程池中,并限度同时可执行的程序在服务器可承受的范畴内。然而遇到了一个新的问题,在每次启动时,线程池容许的20个并行程序同时启动,导致程序启动时遇到读写文件的抵触。为了防止这种抵触,须要并行的程序顺次启动,而不是同时启动。例如,每个程序的启动工夫距离20秒。 一种解决思路我想到的方法是利用线程锁。在启动线程后,每个线程中首先调用一个函数,这个函数的实现是抢一个线程锁,而后睡眠20秒,之后再解锁。这样,其余线程尽管也被启动了,然而因为只有一个线程可能抢到线程锁,所以抢到锁的线程在睡眠20秒后就能够启动程序进行仿真,后续的所有线程仍然须要抢锁而后再启动。这样就防止了同时有多个线程启动仿真导致读写抵触了。 改良思路我的思路比较简单,其中还存在一个小问题:第一个启动的线程也须要睡眠20秒。这个问题比拟好解决,能够定义一个睡眠时长变量,最开始初始化这个睡眠时长0秒,而后每次睡眠后讲这个变量改写为20秒。这样第一个抢到锁的过程就能够睡眠0秒,后续抢到锁的过程会睡眠20秒。

February 21, 2022 · 1 min · jiezi

关于线程池:Java-JUC-ThreadPoolExecutor解析

线程池 ThreadPoolExecutor介绍线程池次要解决两个问题:一是当执行大量异步工作时线程池可能提供较好的性能。在不应用线程池时,每当须要执行工作时就须要 new 一个线程来执行,频繁的创立与销毁十分耗费性能。而线程池中的线程是能够复用的,不须要在每次须要执行工作时候都从新创立和销毁。二是线程池提供了资源限度和治理的伎俩,比方能够限度线程个数,动静减少线程等。 另外,线程池也提供了许多可调参数和可扩展性接口,以满足不同状况下的须要,咱们能够应用更不便的 Executors 的工厂办法,来创立不同类型的线程池,也能够本人自定义线程池。 线程池的工作机制 线程池刚创立的时候没有任何线程,当来了新的申请的时候才会创立外围线程去解决对应的申请当解决实现之后,外围线程并不会回收在外围线程达到指定的数量之前,每一个申请都会在线程池中创立一个新的外围线程当外围线程全都被占用的时候,新来的申请会放入工作队列中。工作队列实质上是一个阻塞队列当工作队列被占满,再来的新申请会交给长期线程来解决长期线程在应用实现之后会持续存活一段时间,直到没有申请解决才会被销毁类图介绍 如上类图所示,Executors 是一个工具类,提供了多种静态方法,依据咱们抉择的不同提供不同的线程池实例。 ThreadPoolExecutor 继承了 AbstractExecutorService 抽象类,在 ThreadPoolExecutor 中成员变量 ctl 是一个 Integer 的原子性变量,用来记录线程池的状态和线程中线程个数,相似于 ReentrantReadWriteLock 应用一个变量来保留两种信息一样。 假如 Integer 类型是 32 位二进制示意,则其中高 3 位示意线程池的状态,后 29 为示意线程池线程数量。 //默认RUNNING状态,线程个数为0private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));获取高 3 位,运行状态 private static int runStateOf(int c) { return c & ~CAPACITY; }获取低 29 位,线程个数 private static int workerCountOf(int c) { return c & CAPACITY; }线程池状态含意如下: RUNNING:承受新工作并且解决阻塞队列中的工作SHUTDOWN:回绝新工作然而解决阻塞队列中的工作STOP:回绝新工作并且摈弃阻塞队列里的工作,同时中断正在解决的工作TIDYING:所有工作都执行完(包含阻塞队列中的工作)后以后线程池流动线程数量为 0,将调用 terminated 办法TERMINATED:终止状态。terminated 调用实现办法后的状态线程池状态转换如下: ...

February 9, 2022 · 6 min · jiezi

关于线程池:线程池

下面是提交了工作后,线程池解决流程。如果想在工作提交前预热线程池能够应用prestartAllCoreThreads提前创立线程。须要留神的是,上图中两个中央的创立线程都是须要加锁的,相干代码如下图: private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //这里的外部类Worker继承了AbstractQueuedSynchronizer,也实现了Runnable //Worker外面有个Thread属性,每次创立工作的时候都会对应创立一个线程 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; //加锁 mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }这里再列一下几种工作队列: ...

December 10, 2021 · 2 min · jiezi

关于线程池:90的人以为会用ThreadPoolExecutor了看了这10张图再说吧

在阿里巴巴手册中有一条倡议: 【强制】线程池不容许应用 Executors 去创立,而是通过ThreadPoolExecutor的形式,这样的解决形式让写的同学更加明确线程池的运行规定,躲避资源耗尽的危险。如果常常基于Executors提供的工厂办法创立线程池,很容易疏忽线程池外部的实现。特地是回绝策略,因应用Executors创立线程池时不会传入这个参数,间接采纳默认值,所以经常被疏忽。 上面咱们就来理解一下线程池相干的实现原理、API以及实例。 线程池的作用在实际利用中创立线程池次要是为了: 缩小资源开销:缩小每次创立、销毁线程的开销;进步响应速度:申请到来时,线程已创立好,可间接执行,进步响应速度;进步线程的可管理性:线程是稀缺资源,需依据状况加以限度,确保零碎稳固运行;ThreadPoolExecutorThreadPoolExecutor能够实现线程池的创立。ThreadPoolExecutor相干类图如下: 从类图能够看出,ThreadPoolExecutor最终实现了Executor接口,是线程池创立的真正实现者。 Executor两级调度模型 在HotSpot虚拟机中,Java中的线程将会被一一映射为操作系统的线程。在Java虚拟机层面,用户将多个工作提交给Executor框架,Executor负责调配线程执行它们;在操作系统层面,操作系统再将这些线程调配给处理器执行。 ThreadPoolExecutor的三个角色工作ThreadPoolExecutor承受两种类型的工作:Callable和Runnable。 Callable:该类工作有返回后果,能够抛出异样。通过submit办法提交,返回Future对象。通过get获取执行后果。Runnable:该类工作只执行,无奈获取返回后果,在执行过程中无奈抛异样。通过execute或submit办法提交。工作执行器Executor框架最外围的接口是Executor,它示意工作的执行器。 通过下面类图能够看出,Executor的子接口为ExecutorService。再往底层有两大实现类:ThreadPoolExecutor和ScheduledThreadPoolExecutor(集成自ThreadPoolExecutor)。 执行后果Future接口示意异步的执行后果,它的实现类为FutureTask。 三个角色之间的解决逻辑图如下: 线程池解决流程 一个线程从被提交(submit)到执行共经验以下流程: 线程池判断外围线程池里是的线程是否都在执行工作,如果不是,则创立一个新的工作线程来执行工作。如果外围线程池里的线程都在执行工作,则进入下一个流程;线程池判断工作队列是否已满。如果工作队列没有满,则将新提交的工作贮存在这个工作队列里。如果工作队列满了,则进入下一个流程;线程池判断其外部线程是否都处于工作状态。如果没有,则创立一个新的工作线程来执行工作。如果已满了,则交给饱和策略来解决这个工作。线程池在执行execute办法时,次要有以下四种状况: 如果以后运行的线程少于corePoolSize,则创立新线程来执行工作(须要取得全局锁);如果运行的线程等于或多于corePoolSize,则将工作退出BlockingQueue;如果无奈将工作退出BlockingQueue(队列已满),则创立新的线程来解决工作(须要取得全局锁);如果创立新线程将使以后运行的线程超出maxiumPoolSize,工作将被回绝,并调用RejectedExecutionHandler.rejectedExecution()办法。线程池采取上述的流程进行设计是为了缩小获取全局锁的次数。在线程池实现预热(以后运行的线程数大于或等于corePoolSize)之后,简直所有的excute办法调用都执行步骤二。 线程的状态流转顺便再回顾一下线程的状态的转换,在JDK中Thread类中提供了一个枚举类,例举了线程的各个状态: public enum State { NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, TERMINATED; }一共定义了6个枚举值,其实代表的是5种类型的线程状态: NEW:新建;RUNNABLE:运行状态;BLOCKED:阻塞状态;WAITING:期待状态,WAITING和TIMED_WAITING能够归为一类,都属于期待状态,只是后者能够设置等待时间,即期待多久;TERMINATED:终止状态;线程关系转换图: 当new Thread()阐明这个线程处于NEW(新建状态);调用Thread.start()办法示意这个线程处于RUNNABLE(运行状态); 然而RUNNABLE状态中又蕴含了两种状态:READY(就绪状态)和RUNNING(运行中)。调用start()办法,线程不肯定取得了CPU工夫片,这时就处于READY,期待CPU工夫片,当取得了CPU工夫片,就处于RUNNING状态。 在运行中调用synchronized同步的代码块,没有获取到锁,这时会处于BLOCKED(阻塞状态),当从新获取到锁时,又会变为RUNNING状态。在代码执行的过程中可能会碰到Object.wait()等一些期待办法,线程的状态又会转变为WAITING(期待状态),期待被唤醒,当调用了Object.notifyAll()唤醒了之后线程执行完就会变为TERMINATED(终止状态)。 线程池的状态线程池中状态通过2个二进制位(bit)来示意线程池的5个状态:RUNNING、SHUTDOWN、STOP、TIDYING和TERMINATED: RUNNING:线程池失常工作的状态,在 RUNNING 状态下线程池承受新的工作并解决工作队列中的工作;SHUTDOWN:调用shutdown()办法会进入 SHUTDOWN 状态。在 SHUTDOWN 状态下,线程池不承受新的工作,然而会继续执行工作队列中已有的工作;STOP:调用shutdownNow()会进入 STOP 状态。在 STOP 状态下线程池既不承受新的工作,也不解决曾经在队列中的工作。对于还在执行工作的工作线程,线程池会发动中断请求来中断正在执行的工作,同时会清空工作队列中还未被执行的工作;TIDYING:当线程池中的所有执行工作的工作线程都曾经终止,并且工作线程汇合为空的时候,进入 TIDYING 状态;TERMINATED:当线程池执行完terminated()钩子办法当前,线程池进入终态 TERMINATED;ThreadPoolExecutor APIThreadPoolExecutor创立线程池API: public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 参数解释: ...

November 2, 2021 · 3 min · jiezi

关于线程池:解析ThreadPoolExecutor类是如何保证线程池正确运行的

摘要:对于线程池的外围类ThreadPoolExecutor来说,有哪些重要的属性和外部类为线程池的正确运行提供重要的保障呢?本文分享自华为云社区《【高并发】通过源码深度解析ThreadPoolExecutor类是如何保障线程池正确运行的》,作者: 冰 河 。 对于线程池的外围类ThreadPoolExecutor来说,有哪些重要的属性和外部类为线程池的正确运行提供重要的保障呢?明天咱们就一起来深入探讨下这些问题!! ThreadPoolExecutor类中的重要属性在ThreadPoolExecutor类中,存在几个十分重要的属性和办法,接下来,咱们就介绍下这些重要的属性和办法。 ctl相干的属性AtomicInteger类型的常量ctl是贯通线程池整个生命周期的重要属性,它是一个原子类对象,次要用来保留线程的数量和线程池的状态,咱们看下与这个属性相干的代码如下所示。 //次要用来保留线程数量和线程池的状态,高3位保留线程状态,低29位保留线程数量private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//线程池中线程的数量的位数(32-3)private static final int COUNT_BITS = Integer.SIZE - 3;//示意线程池中的最大线程数量//将数字1的二进制值向右移29位,再减去1private static final int CAPACITY = (1 << COUNT_BITS) - 1;//线程池的运行状态private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;//获取线程状态private static int runStateOf(int c) { return c & ~CAPACITY; }//获取线程数量private static int workerCountOf(int c) { return c & CAPACITY; }private static int ctlOf(int rs, int wc) { return rs | wc; }private static boolean runStateLessThan(int c, int s) { return c < s;}private static boolean runStateAtLeast(int c, int s) { return c >= s;}private static boolean isRunning(int c) { return c < SHUTDOWN;}private boolean compareAndIncrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect + 1);}private boolean compareAndDecrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect - 1);}private void decrementWorkerCount() { do {} while (! compareAndDecrementWorkerCount(ctl.get()));}对于线程池的各状态阐明如下所示。 ...

August 31, 2021 · 3 min · jiezi

关于线程池:从源码分析创建线程池的4种方式

摘要:从创立线程池的源码来深入分析到底有哪些形式能够创立线程池。本文分享自华为云社区《【高并发】从源码角度剖析创立线程池到底有哪些形式》,作者:冰 河 。 在Java的高并发畛域,线程池始终是一个绕不开的话题。有些童鞋始终在应用线程池,然而,对于如何创立线程池仅仅停留在应用Executors工具类的形式,那么,创立线程池到底存在哪几种形式呢?就让咱们一起从创立线程池的源码来深入分析到底有哪些形式能够创立线程池。 应用Executors工具类创立线程池在创立线程池时,初学者用的最多的就是Executors 这个工具类,而应用这个工具类创立线程池时非常简单的,不须要关注太多的线程池细节,只须要传入必要的参数即可。Executors 工具类提供了几种创立线程池的办法,如下所示。 • Executors.newCachedThreadPool:创立一个可缓存的线程池,如果线程池的大小超过了须要,能够灵便回收闲暇线程,如果没有可回收线程,则新建线程• Executors.newFixedThreadPool:创立一个定长的线程池,能够控制线程的最大并发数,超出的线程会在队列中期待• Executors.newScheduledThreadPool:创立一个定长的线程池,反对定时、周期性的工作执行• Executors.newSingleThreadExecutor: 创立一个单线程化的线程池,应用一个惟一的工作线程执行工作,保障所有工作依照指定程序(先入先出或者优先级)执行• Executors.newSingleThreadScheduledExecutor:创立一个单线程化的线程池,反对定时、周期性的工作执行• Executors.newWorkStealingPool:创立一个具备并行级别的work-stealing线程池 其中,Executors.newWorkStealingPool办法是Java 8中新增的创立线程池的办法,它可能为线程池设置并行级别,具备更高的并发度和性能。除了此办法外,其余创立线程池的办法实质上调用的是ThreadPoolExecutor类的构造方法。 例如,咱们能够应用如下代码创立线程池。 Executors.newWorkStealingPool();Executors.newCachedThreadPool();Executors.newScheduledThreadPool(3);应用ThreadPoolExecutor类创立线程池从代码构造上看ThreadPoolExecutor类继承自AbstractExecutorService,也就是说,ThreadPoolExecutor类具备AbstractExecutorService类的全副性能。 既然Executors工具类中创立线程池大部分调用的都是ThreadPoolExecutor类的构造方法,所以,咱们也能够间接调用ThreadPoolExecutor类的构造方法来创立线程池,而不再应用Executors工具类。接下来,咱们一起看下ThreadPoolExecutor类的构造方法。 ThreadPoolExecutor类中的所有构造方法如下所示。 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler);}public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler);}public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);}public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler;}由ThreadPoolExecutor类的构造方法的源代码可知,创立线程池最终调用的构造方法如下。 ...

August 26, 2021 · 3 min · jiezi

关于线程池:高并发中那些不得不说的线程池与ThreadPoolExecutor类

摘要:从整体上意识下线程池中最外围的类之一——ThreadPoolExecutor,对于ThreadPoolExecutor的底层原理和源码实现,以及线程池中的其余技术细节的底层原理和源码实现。本文分享自华为云社区《高并发之——不得不说的线程池与ThreadPoolExecutor类浅析》,作者: 冰 河 。 既然Java中反对以多线程的形式来执行相应的工作,但为什么在JDK1.5中又提供了线程池技术呢?这个问题大家自行脑补,多动脑,必定没害处,哈哈哈。。。 说起中的线程池技术,在很多框架和异步解决中间件中都有波及,而且性能禁受起了短暂的考验。能够这样说,Java的线程池技术是Java最外围的技术之一,在Java的高并发畛域中,Java的线程池技术是一个永远绕不开的话题。既然Java的线程池技术这么重要(怎么能说是这么重要呢?那是相当的重要,那家伙老重要了,哈哈哈),那么,本文咱们就来简略的说下线程池与ThreadPoolExecutor类。 一、Thread间接创立线程的弊病(1)每次new Thread新建对象,性能差。(2)线程不足对立治理,可能无限度的新建线程,相互竞争,有可能占用过多系统资源导致死机或OOM。(3)短少更多的性能,如更多执行、定期执行、线程中断。(4)其余弊病,大家自行脑补,多动脑,没害处,哈哈。 二、线程池的益处(1)重用存在的线程,缩小对象创立、沦亡的开销,性能佳。(2)能够无效管制最大并发线程数,进步系统资源利用率,同时能够防止过多资源竞争,防止阻塞。(3)提供定时执行、定期执行、单线程、并发数管制等性能。(4)提供反对线程池监控的办法,可对线程池的资源进行实时监控。(5)其余益处,大家自行脑补,多动脑,没害处,哈哈。 三、线程池1.线程池类构造关系线程池中的一些接口和类的构造关系如下图所示。 后文会死磕这些接口和类的底层原理和源码。 2.创立线程池罕用的类——ExecutorsExecutors.newCachedThreadPool:创立一个可缓存的线程池,如果线程池的大小超过了须要,能够灵便回收闲暇线程,如果没有可回收线程,则新建线程Executors.newFixedThreadPool:创立一个定长的线程池,能够控制线程的最大并发数,超出的线程会在队列中期待Executors.newScheduledThreadPool:创立一个定长的线程池,反对定时、周期性的工作执行Executors.newSingleThreadExecutor: 创立一个单线程化的线程池,应用一个惟一的工作线程执行工作,保障所有工作依照指定程序(先入先出或者优先级)执行Executors.newSingleThreadScheduledExecutor:创立一个单线程化的线程池,反对定时、周期性的工作执行Executors.newWorkStealingPool:创立一个具备并行级别的work-stealing线程池3.线程池实例的几种状态Running:运行状态,能接管新提交的工作,并且也能解决阻塞队列中的工作Shutdown: 敞开状态,不能再接管新提交的工作,然而能够解决阻塞队列中曾经保留的工作,当线程池处于Running状态时,调用shutdown()办法会使线程池进入该状态Stop: 不能接管新工作,也不能解决阻塞队列中曾经保留的工作,会中断正在解决工作的线程,如果线程池处于Running或Shutdown状态,调用shutdownNow()办法,会使线程池进入该状态Tidying: 如果所有的工作都曾经终止,无效线程数为0(阻塞队列为空,线程池中的工作线程数量为0),线程池就会进入该状态。Terminated: 处于Tidying状态的线程池调用terminated()办法,会应用线程池进入该状态留神:不须要对线程池的状态做非凡的解决,线程池的状态是线程池外部依据办法自行定义和解决的。 4.合理配置线程的一些倡议(1)CPU密集型工作,就须要尽量压迫CPU,参考值能够设置为NCPU+1(CPU的数量加1)。(2)IO密集型工作,参考值能够设置为2*NCPU(CPU数量乘以2) 四、线程池最外围的类之一——ThreadPoolExecutor1.构造方法ThreadPoolExecutor参数最多的构造方法如下: public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler rejectHandler) 其余的构造方法都是调用的这个构造方法来实例化对象,能够说,咱们间接剖析这个办法之后,其余的构造方法咱们也明确是怎么回事了!接下来,就对此构造方法进行具体的剖析。 留神:为了更加深刻的剖析ThreadPoolExecutor类的构造方法,会适当调整参数的程序进行解析,以便于大家更能深刻的了解ThreadPoolExecutor构造方法中每个参数的作用。 上述构造方法接管如下参数进行初始化: (1)corePoolSize:外围线程数量。 (2)maximumPoolSize:最大线程数。 (3)workQueue:阻塞队列,存储期待执行的工作,很重要,会对线程池运行过程产生重大影响。 其中,上述三个参数的关系如下所示: 如果运行的线程数小于corePoolSize,间接创立新线程解决工作,即便线程池中的其余线程是闲暇的。如果运行的线程数大于等于corePoolSize,并且小于maximumPoolSize,此时,只有当workQueue满时,才会创立新的线程解决工作。如果设置的corePoolSize与maximumPoolSize雷同,那么创立的线程池大小是固定的,此时,如果有新工作提交,并且workQueue没有满时,就把申请放入到workQueue中,期待闲暇的线程,从workQueue中取出工作进行解决。如果运行的线程数量大于maximumPoolSize,同时,workQueue曾经满了,会通过回绝策略参数rejectHandler来指定解决策略。根据上述三个参数的配置,线程池会对工作进行如下解决形式: 当提交一个新的工作到线程池时,线程池会依据以后线程池中正在运行的线程数量来决定该工作的解决形式。解决形式总共有三种:间接切换、应用有限队列、应用有界队列。 间接切换罕用的队列就是SynchronousQueue。应用有限队列就是应用基于链表的队列,比方:LinkedBlockingQueue,如果应用这种形式,线程池中创立的最大线程数就是corePoolSize,此时maximumPoolSize不会起作用。当线程池中所有的外围线程都是运行状态时,提交新工作,就会放入期待队列中。应用有界队列应用的是ArrayBlockingQueue,应用这种形式能够将线程池的最大线程数量限度为maximumPoolSize,能够升高资源的耗费。然而,这种形式使得线程池对线程的调度更艰难,因为线程池和队列的容量都是无限的了。依据下面三个参数,咱们能够简略得出如何升高系统资源耗费的一些措施: 如果想升高系统资源的耗费,包含CPU使用率,操作系统资源的耗费,上下文环境切换的开销等,能够设置一个较大的队列容量和较小的线程池容量。这样,会升高线程解决工作的吞吐量。如果提交的工作常常产生阻塞,能够思考调用设置最大线程数的办法,从新设置线程池最大线程数。如果队列的容量设置的较小,通常须要将线程池的容量设置的大一些,这样,CPU的使用率会高些。如果线程池的容量设置的过大,并发量就会减少,则须要思考线程调度的问题,反而可能会升高解决工作的吞吐量。接下来,咱们持续看ThreadPoolExecutor的构造方法的参数。 (4)keepAliveTime:线程没有工作执行时最多放弃多久工夫终止当线程池中的线程数量大于corePoolSize时,如果此时没有新的工作提交,外围线程外的线程不会立刻销毁,须要期待,直到期待的工夫超过了keepAliveTime就会终止。 (5)unit:keepAliveTime的工夫单位 (6)threadFactory:线程工厂,用来创立线程默认会提供一个默认的工厂来创立线程,当应用默认的工厂来创立线程时,会使新创建的线程具备雷同的优先级,并且是非守护的线程,同时也设置了线程的名称 (7)rejectHandler:回绝解决工作时的策略 如果workQueue阻塞队列满了,并且没有闲暇的线程池,此时,持续提交工作,须要采取一种策略来解决这个工作。 线程池总共提供了四种策略: 间接抛出异样,这也是默认的策略。实现类为AbortPolicy。用调用者所在的线程来执行工作。实现类为CallerRunsPolicy。抛弃队列中最靠前的工作并执行当前任务。实现类为DiscardOldestPolicy。间接抛弃当前任务。实现类为DiscardPolicy。2.ThreadPoolExecutor提供的启动和进行工作的办法(1)execute():提交工作,交给线程池执行(2)submit():提交工作,可能返回执行后果 execute+Future(3)shutdown():敞开线程池,期待工作都执行完(4)shutdownNow():立刻敞开线程池,不期待工作执行完 3.ThreadPoolExecutor提供的实用于监控的办法(1)getTaskCount():线程池已执行和未执行的工作总数(2)getCompletedTaskCount():已实现的工作数量(3)getPoolSize():线程池以后的线程数量(4)getCorePoolSize():线程池外围线程数(5)getActiveCount():以后线程池中正在执行工作的线程数量 点击关注,第一工夫理解华为云陈腐技术~

August 16, 2021 · 1 min · jiezi

关于线程池:多线程学习Executor框架

前言Executor框架提供了组件来治理Java中的线程,Executor框架将其分为工作,线程执行工作,工作执行后果三局部。本篇文章将对Executor框架中的组件进行学习。 参考:《Java并发编程的艺术》 注释一. Executor的组件前言中曾经提到,Executor框架将线程的治理分为工作,线程执行工作,工作执行后果三局部。上面以图表模式对这三局部进行阐明。 项阐明工作Executor框架提供了Runnable接口和Callable接口,工作须要实现这两个接口能力被线程执行。线程执行工作Executor框架提供了接口Executor和继承于Executor的ExecutorService接口来定义工作执行机制。Executor框架中的线程池类ThreadPoolExecutor和ScheduledThreadPoolExecutor均实现了ExecutorService接口。工作执行后果Executor框架提供了Future接口和实现了Future接口的FutureTask类来定义工作执行后果。组件之间的类图关系如下所示。 二. ThreadPoolExecutor的解析ThreadPoolExecutor继承于AbstractExecutorService,并实现了ExecutorService接口,是Executor框架的外围类,用于治理线程。在多线程学习-线程池应用中曾经对ThreadPoolExecutor的原理,创立,执行和敞开进行了简略学习。在本大节将对ThreadPoolExecutor的具体实现进行学习。 ThreadPoolExecutor应用了原子整型ctl来示意线程池状态和Worker数量。ctl是一个原子整型,前3位示意线程池状态,后29位示意Worker数量。ThreadPoolExecutor中这部分的源码如下所示。 public class ThreadPoolExecutor extends AbstractExecutorService { private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; //取整型前3位,即获取线程池状态 private static int runStateOf(int c) { return c & ~CAPACITY; } //取整型后29位,即获取Worker数量 private static int workerCountOf(int c) { return c & CAPACITY; } //依据线程池状态和Worker数量拼装ctl private static int ctlOf(int rs, int wc) { return rs | wc; } //线程池状态判断 private static boolean runStateLessThan(int c, int s) { return c < s; } //线程池状态判断 private static boolean runStateAtLeast(int c, int s) { return c >= s; } //判断线程池状态是否为RUNNING private static boolean isRunning(int c) { return c < SHUTDOWN; } ...... }在ThreadPoolExecutor中规定了线程池的状态如下。 ...

August 6, 2021 · 12 min · jiezi

关于线程池:多线程学习线程池使用

前言线程池是Java中应用较多的并发框架,正当应用线程池,能够:升高资源耗费,进步响应速度,进步线程的可管理性。本篇文章为《Java并发编程的艺术》第9章的学习笔记,依据原文作者的编写思路,顺次对线程池的原理,线程池的创立,线程池执行工作和敞开线程池进行了学习和总结。最初会给出一个线程池的实现案例,该案例为之前在某微信公众号上看到的线程池实现实战,曾经无奈讲究其出处,原案例对于线程池的敞开存在缺点,我对其进行了一些修改和阐明,分享进去一起学习。 注释一. 线程池的原理当一个工作提交到线程池ThreadPoolExecutor时,该工作的执行如下图所示。 如果以后运行的线程数小于corePoolSzie(外围线程数),则创立新线程来执行工作(须要获取全局锁);如果以后运行的线程数等于或大于corePoolSzie,则将工作退出BlockingQueue(工作阻塞队列);如果BlockingQueue已满,则创立新的线程来执行工作(须要获取全局锁);如果创立新线程会使以后线程数大于maximumPoolSize(最大线程数),则回绝工作并调用RejectedExecutionHandler的rejectedExecution()办法。因为ThreadPoolExecutor存储工作线程应用的汇合是HashSet,因而执行上述步骤1和步骤3时须要获取全局锁来保障线程平安,而获取全局锁会导致线程池性能瓶颈,因而通常状况下,线程池实现预热后(以后线程数大于等于corePoolSize),线程池的execute()办法都是执行步骤2。 二. 线程池的创立通过ThreadPoolExecutor可能创立一个线程池,ThreadPoolExecutor的构造函数签名如下。 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)通过ThreadPoolExecutor创立线程池时,须要指定线程池的外围线程数,最大线程数,线程保活工夫,线程保活工夫单位和工作阻塞队列,并按需指定线程工厂和饱和回绝策略,如果不指定线程工厂和饱和回绝策略,则ThreadPoolExecutor会应用默认的线程工厂和饱和回绝策略。上面别离介绍这些参数的含意。 参数含意corePoolSize外围线程数,即线程池的根本大小。当一个工作被提交到线程池时,如果线程池的线程数小于corePoolSize,那么无论其余线程是否闲暇,也需创立一个新线程来执行工作。maximumPoolSize最大线程数。当线程池中线程数大于等于corePoolSize时,新提交的工作会退出工作阻塞队列,然而如果工作阻塞队列已满且线程数小于maximumPoolSize,此时会持续创立新的线程来执行工作。该参数规定了线程池容许创立的最大线程数keepAliveTime线程保活工夫。当线程池的线程数大于外围线程数时,多余的闲暇线程会最大存活keepAliveTime的工夫,如果超过这个工夫且闲暇线程还没有获取到工作来执行,则该闲暇线程会被回收掉。unit线程保活工夫单位。通过TimeUnit指定线程保活工夫的工夫单位,可选单位有DAYS(天),HOURS(时),MINUTES(分),SECONDS(秒),MILLISECONDS(毫秒),MICROSECONDS(微秒)和NANOSECONDS(纳秒),但无论指定什么工夫单位,ThreadPoolExecutor对立会将其转换为NANOSECONDS。workQueue工作阻塞队列。线程池的线程数大于等于corePoolSize时,新提交的工作会增加到workQueue中,所有线程执行完上一个工作后,会循环从workQueue中获取工作来执行。threadFactory创立线程的工厂。能够通过线程工厂给每个创立进去的线程设置更有意义的名字。handler饱和回绝策略。如果工作阻塞队列已满且线程池中的线程数等于maximumPoolSize,阐明线程池此时处于饱和状态,应该执行一种回绝策略来解决新提交的工作。三. 线程池执行工作线程池应用两个办法执行工作,别离为execute()和submit()。execute()办法用于执行不须要返回值的工作,submit()办法用于执行须要返回值的工作。execute()是接口Executor定义的办法,submit()是接口ExecutorService定义的办法,相干类图如下所示。 ThreadPoolExecutor对execute()的实现如下。 public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command);}AbstractExecutorService对submit()实现如下。 ...

July 15, 2021 · 7 min · jiezi

关于线程池:10问10答你真的了解线程池吗

简介: 《Java开发手册》中强调,线程资源必须通过线程池提供,而创立线程池必须应用ThreadPoolExecutor。手册次要强调利用线程池防止两个问题,一是线程过渡切换,二是防止申请过多时造成OOM。然而如果参数配置谬误,还是会引发下面的两个问题。所以本节咱们次要是探讨ThreadPoolExecutor的一些技术细节,并且给出几个罕用的最佳实际倡议。 作者 | 风楼起源 | 阿里技术公众号 《Java开发手册》中强调,线程资源必须通过线程池提供,而创立线程池必须应用ThreadPoolExecutor。手册次要强调利用线程池防止两个问题,一是线程过渡切换,二是防止申请过多时造成OOM。然而如果参数配置谬误,还是会引发下面的两个问题。所以本节咱们次要是探讨ThreadPoolExecutor的一些技术细节,并且给出几个罕用的最佳实际倡议。 我在查找材料的过程中,发现有些问题存在争议。前面发现,一部分起因是因为不同JDK版本的事实是有差别的。因而,上面的剖析是基于当下最罕用的版本JDK1.8,并且对于存在争议的问题,咱们剖析源码,源码才是最精确的。 1 corePoolSize=0会怎么样这是一个争议点。我发现大部分博文,不论是国内的还是国外的,都是这样答复这个问题的: 提交工作后,先判断以后池中线程数是否小于corePoolSize,如果小于,则创立新线程执行这个工作。否者,判断期待队列是否已满,如果没有满,则增加到期待队列。否者,判断以后池中线程数是否大于maximumPoolSize,如果大于则回绝。否者,创立一个新的线程执行这个工作。依照下面的形容,如果corePoolSize=0,则会判断期待队列的容量,如果还有容量,则排队,并且不会创立新的线程。 —— 但其实,这是老版本的实现形式,从1.6之后,实现形式就变了。咱们间接看execute的源码(submit也依赖它),我备注出了要害一行: int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); // 留神这一行代码,增加到期待队列胜利后,判断以后池内线程数是否为0,如果是则创立一个firstTask为null的worker,这个worker会从期待队列中获取工作并执行。 else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command);线程池提交工作后,首先判断以后池中线程数是否小于corePoolSize。如果小于则尝试创立新的线程执行该工作;否则尝试增加到期待队列。如果增加队列胜利,判断以后池内线程数是否为0,如果是则创立一个firstTask为null的worker,这个worker会从期待队列中获取工作并执行。如果增加到期待队列失败,个别是队列已满,才会再尝试创立新的线程。但在创立之前须要与maximumPoolSize比拟,如果小于则创立胜利。否则执行回绝策略。答 上述问题需辨别JDK版本。在1.6版本之后,如果corePoolSize=0,提交工作时如果线程池为空,则会立刻创立一个线程来执行工作(先排队再获取);如果提交工作的时候,线程池不为空,则先在期待队列中排队,只有队列满了才会创立新线程。 所以,优化在于,在队列没有满的这段时间内,会有一个线程在生产提交的工作;1.6之前的实现是,必须等队列满了之后,才开始生产。 2 线程池创立之后,会立刻创立外围线程么之前有人问过我这个问题,因为他发现利用中有些Bean创立了线程池,然而这个Bean个别状况下用不到,所以征询我是否须要把这个线程池正文掉,以缩小利用运行时的线程数(该利用运行时线程过多。) 答 不会。从下面的源码能够看出,在刚刚创立ThreadPoolExecutor的时候,线程并不会立刻启动,而是要等到有工作提交时才会启动,除非调用了prestartCoreThread/prestartAllCoreThreads当时启动外围线程。 prestartCoreThread:Starts a core thread, causing it to idly wait for work. This overrides the default policy of starting core threads only when new tasks are executed.prestartAllCoreThreads:Starts all core threads.3 外围线程永远不会销毁么这个问题有点tricky。首先咱们要明确一下概念,尽管在JavaDoc中也应用了“core/non-core threads”这样的形容,但其实这是一个动静的概念,JDK并没有给一部分线程打上“core”的标记,做什么特殊化的解决。这个问题我认为想要探讨的是闲置线程终结策略的问题。 ...

June 1, 2021 · 4 min · jiezi

关于线程池:线程池ThreadPoolExecutor

一 应用线程池的益处升高资源耗费:通过重复使用线程缩小线程创立和销毁对资源的造成的耗费。进步响应速度:工作到达时,能够不须要期待线程的创立就能立刻执行。进步线程的可管理性:线程是稀缺资源,无限度的创立会耗费系统资源,还会升高零碎的稳定性,应用线程池能够进行对立的调配,调优和监控。二 线程池的参数1.外围线程数2.最大线程数3.超过外围线程数的线程放弃工夫4.线程放弃工夫的单位5.寄存期待线程的队列6.线程创立工厂7.线程的饱和策略。 三 线程池的工作原理新的工作来到,先判断线程内工作线程是否达到外围线程数,如果没有,间接创立线程执行工作。如果外围线程数已满,队列未满,工作则会退出期待队列。如果期待队列已满,并且没有达到最大线程数,会创立线程,执行工作。如果期待队列已满,也曾经打到了最大线程数,会触发饱和策略 四 有哪些饱和策略• ThreadPoolExecutor.AbortPolicy:抛出 RejectedExecutionException来回绝新工作的解决。• ThreadPoolExecutor.CallerRunsPolicy:调用执行本人的线程运行工作,也就是间接在调用execute办法的线程中运行(run)被回绝的工作,如果执行程序已敞开,则会抛弃该工作。因而这种策略会升高对于新工作提交速度,影响程序的整体性能。如果您的应用程序能够接受此提早并且你要求任何一个工作申请都要被执行的话,你能够抉择这个策略。在以后的调用线程执行工作。提交会提早。• ThreadPoolExecutor.DiscardPolicy: 不解决新工作,间接抛弃掉。• ThreadPoolExecutor.DiscardOldestPolicy: 此策略将抛弃最早的未解决的工作申请。默认的回绝策略是AbortPolicy 抛出RejectedExecutionException。 五 一些比照1.Runnable VS CallableRunnable 不会返回后果或者抛出异样、Callable能够。2.execute() Vs submit();execute() 提交不须要返回值的工作。submit() 提交须要返回值得工作。会返回Future<T> 对象。能够调用 future.get() 获取后果,get()办法会阻塞以后线程。get(long timeout,TimeUnit unit)办法能够阻塞一段时间后就返回,但线程可能并未实现,拿不到后果。3.shutdown() Vs shutdownNow()shutdown: 敞开线程池,线程池状态变为shutdown,不承受新工作,但会期待队列里的工作执行结束。shutdownNow : 敞开线程池,线程池状态变为STOP,会终止正在运行的工作,进行解决排队的工作,并返回期待的List。4.isTerminated() Vs isShutDown();isShutDown 当调用 shutdown()之后就会返回true;isTerminated() ,当调用shutdown()之后,等所有工作执行结束才会返回true; 六 几种常见的线程池及问题1.FixedThreadPool 固定线程池。外围线程数等于最大线程数,无界队列 Integer.MAX_VALUE 工作多可能OOM2.SingleThreadPool 单例线程池。 外围线程数和最大线程数为1 无界队列 工作多可能OOM3.cacheThreadPool 缓存线程池。外围线程数为0 最大线程数为Integer.MAX_VALUE 同步队列,线程过多 七 外围线程数的大小设置计算密集型:CPU个数+1。I/O密集型:2倍CPU个数。

May 25, 2021 · 1 min · jiezi

关于线程池:图解定时任务线程池

线程池概念咱们上篇文章剖析了ThreadPoolExecutor,如果要用一句话阐明它的次要劣势,就是线程置换。还有Executors工具类,极大的简化了研发人员工作。 我用一个图反复形容下线程池概念。多生产-多生产模型。 生产者将线程工作丢进线程池中,生产者就就完结了。线程池管制消费者生产元素,消费者能够是1个或者多个,取决于线程池参数corePoolSize和maxPoolSize设置。阻塞队列是用来装生产者丢进去的线程工作,如ArrayBlockingQueue,LinkedBlockingQueue,DelayedQueue等。如果生产者生产能力超过消费者生产能力,如果阻塞队列有长度限度并且超过队列长度线程池会执行饱和策略,如果队列没有长度限度,可也能呈现OOM哦,因为线程工作可能把内存都撑爆了,这也是面试常考点哦!具体概念能够翻看我上一篇文章《线程池面试必考问题》。 定时工作延时原理还记得咱们下面说的阻塞队列吗?定时工作线程池底层应用DelayedQueue实现的,这种提早队列有一个最大的特点:按时出队列,大家都考过驾照吧,科目三考试的时候都是车上坐的是4集体,假如一个人考试须要花15分钟,那么考试学员队列看起来是这样的。 DelayedQueue底层须要实现Delayed接口同时须要实现getDelay办法和compareTo办法,getDelay办法用于计算出队列工夫,一旦小于0就会出队列;compareTo办法用于按触发工夫从小到大排序。这就是Schedule线程池工作延时原理,如果须要看案例代码,请参考我文章《并发队列:PriorityBlockingQueue和DelayQueue案例应用》。 scheduleWithFixedDelay和scheduleAtFixedRate区别 由上图可知:假如线程工作:耗时1秒,定时3秒执行,scheduleWithFixedDelay其实是4秒执行一次。 scheduleWithFixedDelay:是以工作完结工夫周期运行。scheduleAtFixedRate:是以固定周期运行。FutureTask获取返回值在ScheduledThreadPoolExecutor中,ScheduledFutureTask是获取定时工作返回值,继承FutureTask。咱们看下FutureTask调用get阻塞简化流程图。 向线程池增加工作,工作被封装成ScheduledFutureTask并且实现Callable接口是为了获取工作返回值。当客户端线程通过get形式获取线程池线程执行的返回值,客户端线程会阻塞直到线程池线程工作执行完。在理论使用,咱们个别拿返回值测试多线程性能。 Timer比拟Timer是单线程,而且不带返回值。ScheduledThreadPoolExecutor是多线程的,采纳线程复用代替创立新的线程,并且FutureTask带返回值。Timer线程调用sche办法,如果TimerTask出异样,Timer单线程间接挂掉退出,而ScheduledThreadPoolExecutor会捕捉了异样,不影响其余消费者线程。上面是代码测试。import java.util.Timer;import java.util.TimerTask;import java.util.concurrent.TimeUnit;/** * @author :jiaolian * @date :Created in 2021-02-25 13:50 * @description:Timer工作异样,Timer线程退出! * @modified By: * 公众号:叫练 */public class TimerTaskExceptionTest { public static void main(String[] args) throws InterruptedException { Timer timer = new Timer(); timer.schedule(new TimerTask() { @Override public void run() { System.out.println("first task"); } },0,1000); TimeUnit.SECONDS.sleep(3); timer.schedule(new TimerTask() { @Override public void run() { System.out.println("second task"); int x = 5/0; } },0,1000); }}如上代码:timer提交了first_task和second_task两个工作,3秒后,second_task执行5/0会抛出异样,此时timer线程会退出。如果换成ScheduledThreadPoolExecutor则不会影响first_task。在理论利用中,举荐用定时工作线程池中的办法去解决工作。 ...

February 25, 2021 · 1 min · jiezi

关于线程池:程序员不得不知的线程池附10道面试题

为什么要用线程池呢?上面是一段创立线程并运行的代码: for (int i = 0; i < 100; i++) { new Thread(() -> { System.out.println("run thread->" + Thread.currentThread().getName()); userService.updateUser(....); }).start();} 咱们想应用这种形式去做异步,或者进步性能,而后将某些耗时操作放入一个新线程去运行。 这种思路是没问题的,然而这段代码是存在问题的,有哪些问题呢?上面咱们就来看看有哪些问题; 创立销毁线程资源耗费;咱们应用线程的目标本是出于效率思考,能够为了创立这些线程却耗费了额定的工夫,资源,对于线程的销毁同样须要系统资源。cpu资源无限,上述代码创立线程过多,造成有的工作不能即时实现,响应工夫过长。线程无奈治理,无节制地创立线程对于无限的资源来说仿佛成了“得失相当”的一种作用。既然咱们下面应用手动创立线程会存在问题,那有解决办法吗? 答案:有的,应用线程池。 线程池介绍线程池(Thread Pool):把一个或多个线程通过对立的形式进行调度和重复使用的技术,防止了因为线程过多而带来应用上的开销。 线程池有什么长处?升高资源耗费。通过反复利用已创立的线程升高线程创立和销毁造成的耗费。进步响应速度。当工作达到时,工作能够不须要等到线程创立就能立刻执行。进步线程的可管理性。线程池应用在JDK中rt.jar包下JUC(java.util.concurrent)创立线程池有两种形式:ThreadPoolExecutor 和 Executors,其中 Executors又能够创立 6 种不同的线程池类型。 ThreadPoolExecutor 的应用 线程池应用代码如下: import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class ThreadPoolDemo { private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 10, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue(100)); public static void main(String[] args) { threadPoolExecutor.execute(new Runnable() { @Override public void run() { System.out.println("田学生您好"); } }); }} ...

January 28, 2021 · 3 min · jiezi

关于线程池:不看后悔的项目中线程池实际应用

前言:最近在看线程池方面的内容,联合源码学习完其外部原理后,心想本人在我的项目中有理论应用过线程池吗?想了想,的确在我的项目中很多中央应用到了线程池;上面来简略聊下最近在日志方面中多线程的利用: 服务接口日志异步线程池化入库解决定时工作中应用多线程进行日志清理 本文主线:①、线程池基本原理解读; ②、线程池理论利用例子: 线程池利用 Demo 我的项目构造形容服务接口日志异步线程池化入库解决定时工作中应用多线程进行日志清理线程池基本原理解读:啥也不说,先贴一张脑图,通过脑图对线程池疾速的进行理解;除了看图外,也能够通过此文章《ava线程池实现原理及其在美团业务中的实际》对线程池进行具体的理解; 线程池理论利用例子:上面就来聊聊最近在我的项目日志中线程池的利用;线程池利用Demo我的项目形容:Demo 地址:https://github.com/leishen6/s... 下面说的两种日志方面线程池利用曾经写好了Demo,是一个 SpringBoot 我的项目,我的项目构造如下图: 服务接口日志异步线程池化入库解决:后盾服务接口我的项目中,常常须要对接口的申请报文和响应报文日志做入库保留;上面将通过比照 一般形式入库操作和线程池形式入库操作 ,来说说为什么线程池式入库更加优雅; 一般形式入库操作:一般入库就是间接进行完业务逻辑解决并构建好响应后同时将日志进行入数据库,入库胜利后再将响应返回; 流程图如下: 然而这样存在一个很大的弊病就是因为多了一次数据库操作(日志入库),进而可能会导致响应速度比较慢; 上面就聊聊怎么通过线程池对日志入库进行优化,晋升接口的响应速度; 线程池形式入库操作:线程池形式入库,能够将日志间接放入到队列中,而后就间接返回响应,最初应用线程池中的线程取出队列中的日志数据异步做入库操作; 流程图如下: 应用线程池形式,解决申请的主线程能够将日志放入到队列后,间接将响应返回,而后再应用线程池中的线程取出队列中的日志数据异步的将其进行入库;因为缩小了一次数据库操作,会极大的晋升接口响应速度。 上面来看看代码实现:1、下面说到的寄存申请报文和响应报文日志的队列: LinkedBlockingDeque // 基于链表的双向阻塞队列,在队列的两端都能够插入和移除元素,是线程平安的,多线程并发下效率更高BlockingQueue<TestLogBean> queue = new LinkedBlockingDeque<TestLogBean>(MAX_QUEUE_SIZE);除了 LinkedBlockingDeque 阻塞队列外,还有一些其它常常会用到的阻塞队列,如下图: 2、我的项目中进行日志入库操作的线程池: 单线程的线程池 + 固定数线程的线程池 单线程的线程池:用来循环的监听队列中的日志数量以及决策什么时候将队列中的日志取出交由固定数线程的线程池做入库操作;固定数线程的线程池:次要用来进行日志的入库操作;局部代码实现如下: /** * 初始化 */public void init(){ // 基于链表的双向阻塞队列,在队列的两端都能够插入和移除元素,是线程平安的,多线程并发下效率更高 queue = new LinkedBlockingDeque<TestLogBean>(MAX_QUEUE_SIZE); lastExecuteTime = System.currentTimeMillis(); logger.info("LogPoolManager init successfully......"); logManagerThreadPool.execute(new Runnable() { @Override public void run() { while (run.get()){ try { // 线程休眠,具体工夫依据我的项目的理论状况配置 Thread.sleep(SLEEP_TIME); } catch (InterruptedException e) { logger.error("log Manager Thread sleep fail ", e); } // 满足寄存了10个日志 或 满足工夫距离曾经大于设置的最大工夫距离时 执行日志插入 if (logCount.get() >= BATCH_SIZE || (System.currentTimeMillis() - lastExecuteTime) > MAX_EXE_TiME) { if (logCount.get() > 0) { logger.info("begin drain log queue to database..."); List<TestLogBean> list = new ArrayList<TestLogBean>(); /** * drainTo (): 一次性从BlockingQueue获取所有可用的数据对象(还能够指定获取数据的个数), * 通过该办法,能够晋升获取数据效率;不须要屡次分批加锁或开释锁。 * 将取出的数据放入指定的list汇合中 */ queue.drainTo(list); // 工作队列 中工作数量置为0 logCount.set(0); // 从线程池中取出线程执行日志插入 logWorkerThreadPool.execute(new InsertThread(testLogService, list)); logger.info("end drain log queue to database..."); } // 获取以后执行的工夫 lastExecuteTime = System.currentTimeMillis(); } } logger.info("LogPoolManager shutdown successfully"); } });}本我的项目中测试服务接口日志异步线程池化入库解决,我的项目启动后,在浏览器输出上面URL,并刷新页面即可:http://127.0.0.1:8081/v1/api/log/test ...

December 30, 2020 · 2 min · jiezi

关于线程池:高并发下的大数据处理多线程数据分析实例

之前我的项目中遇到的问题:须要对单日告警量超过四百万条的数据进行逐条剖析和逻辑解决,导致靠繁多过程跑数据基本跑不完,无奈满足零碎和业务要求,因而决定采纳开多线程形式实现大数据处理。数据处理流程:1、申明一个内存队列2、从库中轮巡取数据放入内存队列3、开多个线程逐条取内存队列中的数据,剖析后在库中对该条数据打上标识,下次不取了。 main办法程序入口 /** * */package cn.com.starit.main;import org.apache.log4j.Logger;import cn.com.starit.ge.persistence.cache.AllCacheRefresh;/** * ================================================= <br> * 工程:GessAlarmAnalysis <br> * 类名:Main <br> * 作者:xt<br> * 工夫:2019-10-08上午02:31:17<br> * 版本:Version 1.0 <br><br> * 形容:告警解析入口<br> * ================================================= <br> */public class Main { private static final Logger logger = Logger.getLogger(Main.class); private static Main instance = null; private Main(){} /** * 零碎入口 * @param args */ public static void main(String[] args) { logger.info("过程以后内存占用量(兆):" + (int)(Runtime.getRuntime().totalMemory()/(1048576))); Main.getInstance().sysInit(); logger.info("零碎初始化结束:过程以后内存占用量(兆):" + (int)(Runtime.getRuntime().totalMemory()/(1048576))); logger.info("过程最大内存:" + (int)(Runtime.getRuntime().maxMemory()/(1048576))); //从库中取出流动告警存入内存队列 new LoadGesBsmThread().start(); //告警解析过程 new alarmAnalysis(10).start(); } /** * 零碎初始化 */ public void sysInit() { // 加载静态数据 AllCacheRefresh.refreshAll(); } /** * @return the instance */ public static Main getInstance() { if(null == instance) instance = new Main(); return instance; }}创立内存队列实例及办法 ...

December 23, 2020 · 13 min · jiezi

关于线程池:并发线程池原理与应用

java 线程池常识汇总.线程池参数含意int corePoolSize外围线程数,也即失常状况工作线程数 int maximumPoolSize,最大线程数 long keepAliveTime,须要联合阻塞队列来了解:假如阻塞队列的长度是3,外围数是2,最大线程数是5. 运行时是这样的:大于外围数时,会放到阻塞队列外面排队,如果队列满了才会启用新的工作线程,直到达到最大线程数当达到最大线程数时,如果此时submit到线程池的工作变慢了,外围线程可能应答工作的话,这时线程池会动静缩小工作线程数到外围线程数。这里的keepAliveTime 就是指 除外围线程以外的那几个线程的闲暇工夫。如果大于这个参数所指定的,线程池则会回收这些线程 TimeUnit unit,第三个参数的单位 BlockingQueue<Runnable> workQueue,比外围线程数多进去的线程会进入阻塞队列排队。别用 Executors.newFixedThreadPool 办法结构, 默认指定的阻塞队列(LinkedBlockingQueue)大小是Integer.MAX_VALUE,需显示指定 ThreadFactory threadFactory,用默认的就行,有需要的话辨别下线程名字 RejectedExecutionHandler handler回绝策略: AbortPolicy(默认): 间接抛异样DiscardPolicy: 随机抛弃DiscardOldestPolicy: 抛弃最老的线程CallerRunsPolicy:将执行权回退给调用者线程。 源码剖析准备常识(位运算)原码(带符号位): 最高位为符号位, 正数为1,负数为0反码: 原码除符号位,其余位取反.补码(次要用于示意正数): 带符号的正数反码 + 1, 次要为了打消-0,只有正数才用补码示意. refer: https://www.zhihu.com/questio...线程状态如何保留线程池中,用前三位代表运行状态,用后29位代表工作线程数. // 29 private static final int COUNT_BITS = Integer.SIZE - 3; //1(原码示意)左移29位 + (-1)的补码 (32位1), 前三位都为0, 后29位为1 private static final int CAPACITY = (1 << COUNT_BITS) - 1; //即取后29位 private static int workerCountOf(int c) { return c & CAPACITY; } //CAPACITY 取反即前三位是1, 后29位是0, 故任何数与 ~CAPACITY 做与运算都只保留前3位. private static int runStateOf(int c) { return c & ~CAPACITY; }工作治理submit 提交工作后的执行逻辑 ...

December 22, 2020 · 2 min · jiezi

关于线程池:C-线程池

C++ 实现线程池

December 4, 2020 · 1 min · jiezi

关于线程池:Java并发编程面试必备之线程池

什么是线程池是一种基于池化思维治理线程的工具。池化技术:池化技术简略点来说,就是提前保留大量的资源,以备不时之需。比方咱们的对象池,数据库连接池等。 线程池益处咱们为什么要应用线程池,间接new thread start不好吗? 升高资源耗费: 通过反复利用已创立的线程来升高线程创立和销毁所造成的耗费。进步响应速度: 工作达到时,能够立刻执行,不须要等到线程创立再来执行工作。进步线程的可管理性: 线程是稀缺资源,如果无限度创立,不仅会耗费系统资源,还会因为线程的不合理散布导致资源调度失衡,升高零碎的稳定性。应用线程池能够进行对立的调配、调优和监控。线程池的执行流程咱们先来看看线程池的一个执行流程图,此图来自文末参考1 通过上述图咱们能够得出线程池执行工作能够有以下几种状况: 如果以后的运行线程小于coreSize,则创立新线程来执行工作。如果以后运行的线程等于coreSize或多余coreSize(动静批改了coreSize才会呈现这种状况),把工作放到阻塞队列中。如果队列已满无奈将新退出的工作放进去的话,则须要创立新的线程来执行工作。如果新创建线程曾经达到了最大线程数,工作将会被回绝。怎么是用线程池在java jdk的Executors有提供创立不同线程池的办法(个别不举荐这种做法)阿里巴巴的开发手册也明确强制规定不让通过Executors来创立的,在一些公司的开发标准外面应该也会有这么一条吧。 newFixedThreadPoolnewSingleThreadExecutornewCachedThreadPoolnewScheduledThreadPoolnewWorkStealingPool (jdk1.8新增的)咱们能够应用ThreadPoolExecutor来创立线程池 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 咱们能够看出创立线程池有七个参数,而上述咱们通过Executors工具类来创立的线程池就一两个参数,其余参数它都帮咱们默认写死了,咱们只有真正了解了这几个参数能力更好的去是用线程池。上面咱们来看看这七个参数(线程池参数)。 corePoolSize外围线程数(线程池的根本大小)当咱们提交一个工作到线程池时就会创立一个线程来执行工作.当咱们须要执行的工作数大于外围线程数了就不再创立,如果咱们调用了prestartAllCoreThreads()办法线程池就会为咱们提前创立好所有的根本线程。 maximumPoolSize最大线程数:线程池容许创立的最大线程数。如果队列曾经满了,且已创立的线程数小于最大线程数,则线程池就会创立新的线程来执行工作。这里有个小知识点,如果咱们的队列是用的无界队列,这个参数是不会起作用的,因为咱们的工作会始终往队列中加,队列永远不会满(内存容许的状况)。keepAliveTime闲暇线程最大生存工夫。以后线程数大于外围线程数时,完结多余的闲暇线程期待新工作的最长工夫。默认状况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程闲暇的工夫达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。然而如果调用了allowCoreThreadTimeOut(boolean)办法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;比方以后线程池中最大线程数(maximumPoolSize)为50,外围线程数(corePoolSize)为10,以后正在跑工作的线程数为30.而后是不是空出了20个线程没活干,所以这20个线程就要被消毁,有点卸磨杀驴的感觉。如果剩下的30个线程干完活了也劳动了keepAliveTime这么久,而后这30个线程外面也要被销毁20个,就保留个外围线程。如果设置了allowCoreThreadTimeOut等于true外围线程也会被销毁。就跟咱们做外包我的项目一样,甲方我的项目实现了就得去另外一个甲方,如果短时间内都没有甲方接收你的话,你就要被解雇了,只会留下几个外围人员保护下我的项目,如果甲方我的项目保护的话用本人的人的话,所有的外包人会都会被解雇。 unit线程存活工夫的的单位。可选的单位有days、hours等。workQueue工作队列。能够抉择以下这些队列 threadFactory用户设置创立线程的工厂,咱们能够通过这个工厂来创立有业务意义的线程名字。咱们能够比照下自定义的线程工厂和默认的线程工厂创创立的名字。 默认产生线程的名字自定义线程工厂产生名字pool-5-thread-1testPool-1-thread-1阿里开发手册也有明确说到,须要指定有意义的线程名字。 RejectedExecutionHandler线程池回绝策略。当队列和线程池都满了阐明线程池曾经处于饱和状态。 必须要采取肯定的策略来解决新提交的工作。jdk默认提供了四种回绝策略:其实咱们也能够自定义工作回绝策略(实现下RejectedExecutionHandler接口),比如说如果工作回绝了咱们能够记录下日志,或者重试等,依据本人的业务需要来实现。 dubbo 工作回绝策略 @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { String msg = String.format("Thread pool is EXHAUSTED!" + " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: " + "%d)," + " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!", threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(), e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(), url.getProtocol(), url.getIp(), url.getPort()); logger.warn(msg); dumpJStack(); dispatchThreadPoolExhaustedEvent(msg); throw new RejectedExecutionException(msg); }咱们能够看出dubbo的回绝策略次要记录了具体的级别为warm的日志、输入以后线程堆栈详情、持续抛出回绝工作异样。 ...

November 6, 2020 · 1 min · jiezi

转载面试必问的线程池你懂了吗

前言 在上次和二狗的“HashMap 最强者”PK后,二狗始终耿耿于怀,常常缠着我要复仇,甚至违心出卖本人的屁股???我破口大骂:“这个死基佬”,而后许可了他... 于是“独身狗大厦11楼11室”又是一场血雨腥风。 注释 二狗:为什么要应用线程池?间接new个线程不是很难受? 如果咱们在办法中间接new一个线程来解决,当这个办法被调用频繁时就会创立很多线程,不仅会耗费系统资源,还会升高零碎的稳定性,一不小心把零碎搞崩了,就能够间接去财务那结帐了。 如果咱们正当的应用线程池,则能够防止把零碎搞崩的困境。总得来说,应用线程池能够带来以下几个益处: 升高资源耗费。通过反复利用已创立的线程,升高线程创立和销毁造成的耗费。进步响应速度。当工作达到时,工作能够不须要等到线程创立就能立刻执行。减少线程的可管理型。线程是稀缺资源,应用线程池能够进行统一分配,调优和监控。二狗:线程池的外围属性有哪些? threadFactory(线程工厂):用于创立工作线程的工厂。 corePoolSize(外围线程数):当线程池运行的线程少于 corePoolSize 时,将创立一个新线程来解决申请,即便其余工作线程处于闲暇状态。 workQueue(队列):用于保留工作并移交给工作线程的阻塞队列。 maximumPoolSize(最大线程数):线程池容许开启的最大线程数。 handler(回绝策略):往线程池增加工作时,将在上面两种状况触发回绝策略:1)线程池运行状态不是 RUNNING;2)线程池曾经达到最大线程数,并且阻塞队列已满时。 keepAliveTime(放弃存活工夫):如果线程池以后线程数超过 corePoolSize,则多余的线程闲暇工夫超过 keepAliveTime 时会被终止。 二狗:说下线程池的运作流程 我给你画张图吧。 二狗:(尼玛,这图也太香了,珍藏珍藏)线程池中的各个状态别离代表什么含意?线程池目前有5个状态: RUNNING:承受新工作并解决排队的工作。SHUTDOWN:不承受新工作,但解决排队的工作。STOP:不承受新工作,不解决排队的工作,并中断正在进行的工作。TIDYING:所有工作都已终止,workerCount 为零,线程转换到 TIDYING 状态将运行 terminated() 钩子办法。TERMINATED:terminated() 已实现。二狗:这几个状态之间是怎么流转的? 我再给你画个图,看好了! 二狗:(这图也不错,珍藏就对了)线程池有哪些队列? 常见的阻塞队列有以下几种: ArrayBlockingQueue:基于数组构造的有界阻塞队列,按先进先出对元素进行排序。 LinkedBlockingQueue:基于链表构造的有界/无界阻塞队列,按先进先出对元素进行排序,吞吐量通常高于 ArrayBlockingQueue。Executors.newFixedThreadPool 应用了该队列。 SynchronousQueue:不是一个真正的队列,而是一种在线程之间移交的机制。要将一个元素放入 SynchronousQueue 中,必须有另一个线程正在期待承受这个元素。如果没有线程期待,并且线程池的以后大小小于最大值,那么线程池将创立一个线程,否则依据回绝策略,这个工作将被回绝。应用间接移交将更高效,因为工作会间接移交给执行它的线程,而不是被放在队列中,而后由工作线程从队列中提取工作。只有当线程池是无界的或者能够回绝工作时,该队列才有理论价值。Executors.newCachedThreadPool应用了该队列。 PriorityBlockingQueue:具备优先级的无界队列,按优先级对元素进行排序。元素的优先级是通过天然程序或 Comparator 来定义的。 二狗:应用队列有什么须要留神的吗? 应用有界队列时,须要留神线程池满了后,被回绝的工作如何解决。 应用无界队列时,须要留神如果工作的提交速度大于线程池的处理速度,可能会导致内存溢出。 二狗:线程池有哪些回绝策略? 常见的有以下几种: AbortPolicy:停止策略。默认的回绝策略,间接抛出 RejectedExecutionException。调用者能够捕捉这个异样,而后依据需要编写本人的解决代码。 DiscardPolicy:摈弃策略。什么都不做,间接摈弃被回绝的工作。 DiscardOldestPolicy:摈弃最老策略。摈弃阻塞队列中最老的工作,相当于就是队列中下一个将要被执行的工作,而后从新提交被回绝的工作。如果阻塞队列是一个优先队列,那么“摈弃最旧的”策略将导致摈弃优先级最高的工作,因而最好不要将该策略和优先级队列放在一起应用。 CallerRunsPolicy:调用者运行策略。在调用者线程中执行该工作。该策略实现了一种调节机制,该策略既不会摈弃工作,也不会抛出异样,而是将工作回退到调用者(调用线程池执行工作的主线程),因为执行工作须要肯定工夫,因而主线程至多在一段时间内不能提交工作,从而使得线程池有工夫来解决完正在执行的工作。 二狗:线程只能在工作达到时才启动吗? 默认状况下,即便是外围线程也只能在新工作达到时才创立和启动。然而咱们能够应用 prestartCoreThread(启动一个外围线程)或 prestartAllCoreThreads(启动全副外围线程)办法来提前启动外围线程。 二狗:外围线程怎么实现始终存活? 阻塞队列办法有四种模式,它们以不同的形式解决操作,如下表。 抛出异样 返回非凡值 始终阻塞 超时退出 插入 add(e) ...

July 10, 2020 · 2 min · jiezi

线程池技术相关

线程池技术相关: 线程池的工作原理与源码解读一、线程池创建二、线程池执行流程 1、先看一下线程池的executor方法2、再看下addWorker的方法实现3、再到Worker里看看其实现4、接下来咱们看看runWorker方法的逻辑5、最后在看看getTask方法实现ThreadPoolExecutor 的八种拒绝策略 | 含番外!JDK内置4种线程池拒绝策略 CallerRunsPolicy(调用者运行策略)AbortPolicy(中止策略)DiscardPolicy(丢弃策略)DiscardOldestPolicy(弃老策略)第三方实现的拒绝策略 dubbo中的线程拒绝策略Netty中的线程池拒绝策略activeMq中的线程池拒绝策略pinpoint中的线程池拒绝策略结语

June 17, 2020 · 1 min · jiezi