关于线程池:深入浅出线程池-京东云技术团队

一、线程1、什么是线程线程(thread)是操作系统可能进行运算调度的最小单位。它被蕴含在过程之中,是过程中的理论 运作单位。一条线程指的是过程中一个繁多程序的控制流,一个过程中能够并发多个线程,每条线 程并行执行不同的工作。 2、如何创立线程2.1、JAVA中创立线程/** * 继承Thread类,重写run办法 */class MyThread extends Thread { @Override public void run() { System.out.println("myThread..." + Thread.currentThread().getName());} }/** * 实现Runnable接口,实现run办法 */class MyRunnable implements Runnable { @Override public void run() { System.out.println("MyRunnable..." + Thread.currentThread().getName());} }/** * 实现Callable接口,指定返回类型,实现call办法 */class MyCallable implements Callable<String> { @Override public String call() throws Exception { return "MyCallable..." + Thread.currentThread().getName();} }2.2、测试一下public static void main(String[] args) throws Exception { MyThread thread = new MyThread(); thread.run(); //myThread...main thread.start(); //myThread...Thread-0 MyRunnable myRunnable = new MyRunnable(); Thread thread1 = new Thread(myRunnable); myRunnable.run(); //MyRunnable...main thread1.start(); //MyRunnable...Thread-1 MyCallable myCallable = new MyCallable(); FutureTask<String> futureTask = new FutureTask<>(myCallable); Thread thread2 = new Thread(futureTask); thread2.start(); System.out.println(myCallable.call()); //MyCallable...main System.out.println(futureTask.get()); //MyCallable...Thread-2} 2.3、问题既然咱们创立了线程,那为何咱们间接调用办法和咱们调用start()办法的后果不同?new Thread() 是否实在创立了线程? ...

September 22, 2023 · 8 min · jiezi

关于线程池:别再纠结线程池池大小线程数量了哪有什么固定公式-京东云技术团队

可能很多人都看到过一个线程数设置的实践: CPU 密集型的程序 - 外围数 + 1I/O 密集型的程序 - 外围数 * 2不会吧,不会吧,真的有人依照这个实践布局线程数? 线程数和CPU利用率的小测试抛开一些操作系统,计算机原理不谈,说一个根本的实践(不必纠结是否谨严,只为好了解):一个CPU外围,单位工夫内只能执行一个线程的指令 那么实践上,我一个线程只须要不停的执行指令,就能够跑满一个外围的利用率。 来写个死循环空跑的例子验证一下: 测试环境:AMD Ryzen 5 3600, 6 - Core, 12 - Threads public class CPUUtilizationTest { public static void main(String[] args) { //死循环,什么都不做 while (true){ } }}运行这个例子后,来看看当初CPU的利用率: 从图上能够看到,我的3号外围利用率曾经被跑满了 那基于下面的实践,我多开几个线程试试呢? public class CPUUtilizationTest { public static void main(String[] args) { for (int j = 0; j < 6; j++) { new Thread(new Runnable() { @Override public void run() { while (true){ } } }).start(); } }}此时再看CPU利用率,1/2/5/7/9/11 几个外围的利用率曾经被跑满: ...

September 20, 2023 · 2 min · jiezi

关于线程池:再说Java线程池

线程池流程图 execute外围办法这里不必多说,其实就是线程池的流程图中的所说的,如果小于外围线程数量,创立外围线程,否则往队列仍工作,假如队列也满了,那就创立非核心线程,如果大于了maximumPooSize,也就是说非核心线程也创立不了了,不好意思,那就走reject策略 addWorkeraddWorker别看代码比拟多,其实外围代码就是通过CAS来减少一个Worker count数,而后跳出循环,创立Worker加锁的形式退出到线程池中workers(HashSet)数据结构中,而后启动Worker WorkerThreadPoolExecutor创立了一个Worker并启动了他,咱们是不是要看一下Worker是什么玩意?唉吆喂,Worker居然继承了AQS并实现了Runnable,加锁的线程,是不是?好吧,那既然是线程了,那看一下他run办法吧:首先看看一下 task != null || (task = getTask()) != null,这个条件,对于第一次创立外围线程并执行run来说,firstTask是通过 w = new Worker(firstTask)传递过去,给了Worker实例变量了,所以task必定是不为null的,批准吧?而后看外围代码task.run,其实就是调用用户本人定义的run办法的逻辑,了解了吧 好了,那比如说咱们外围线程的第一个工作运行完了呢?那getTask()就上场了: 其实有三局部外围逻辑 : 敞开线程池如果曾经对线程池进行了shutdown了,那好,等我workerQueue队列工作生产结束,那我就线程退出如果曾经对线程池进行了stop,我去,好吧,你太暴力了,那就间接退出吧,会在shutdownNow中具体说 非核心线程超时如果wc > corePoolSize,也就是说有非核心线程,是不是timed为true,而后非核心线程是以workQueue.poll以规定工夫来获取工作,此时如果工作是null,没有工作,是不是timedOut=true了,好,而后再循环timed和timedOut都为true了,上述第二个逻辑的if就成立了,那非核心线程的Worker就会退出 阻塞和非阻塞形式获取工作如果是外围线程,就会阻塞的形式take来拿工作,如果是非核心线程,就会进行poll(time)形式来拿工作 shutdonwNow & shutdownshutdonwNowadvanceRunState(STOP):将线程池的状态扭转为STOP,实质上是一个线程标记位的扭转interruptWorkers():对线程池中的所有线程都进行终止(interrupt)操作这里会对所有的线程都会进行interrupt,那当初就有一个疑难,Worker会在哪里阻塞呢?其实有两处(Worker中): 对于闲暇的线程会在getTask中以take阻塞的形式来拿工作还有就是在task.run中,咱们本人定义业务逻辑如果有可中断的代码,就会被中断而shutdownNow会对所有的Worker进行interrupt,也可能会对正在运行的工作,如果能够响应中断,就会中断以后正在运行的工作,否则会进行下一次getTask的时候,即运行完当前任务,再下一次获取工作前判断退出Worker线程 shutdowninterruptIdleWorkers:是interrupt闲暇的线程,谁是闲暇的线程?怎么来辨别,这里AQS就出场了w.tryLock()是一个很要害的玩意,要晓得,如果Worker正在执行工作,正在执行task.run(),这后面必定是加锁的,所以tryLock是拿不到锁的,所以针对正在运行的Worker线程是不interrupt的

April 2, 2023 · 1 min · jiezi

关于线程池:NacosThreadPoolExecutor构建动态线程池

0 文章概述动静线程池是指能够动静调节线程池某些参数,本文咱们联合Apollo和线程池实现一个动静线程池。 1 线程池根底1.1 七个参数咱们首先回顾Java线程池七大参数,这对后续设置线程池参数有帮忙。咱们查看ThreadPoolExecutor构造函数如下: public class ThreadPoolExecutor extends AbstractExecutorService { public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }}corePoolSize线程池外围线程数,类比业务大厅开设的固定窗口。例如业务大厅开设2个固定窗口,那么这两个窗口不会敞开,全天都会进行业务办理 ...

March 7, 2023 · 6 min · jiezi

关于线程池:线程池调优计算原理

1、背景咱们应用线程池来无效地使零碎工作负载与系统资源保持一致。这个零碎工作负载应该是能够独立运行的工作,比方一个web利用的每一个Http申请都能够属于这个类别,咱们能够解决每一个申请而不必思考另一个Http申请。 咱们冀望咱们的应用程序具备良好的吞吐量和良好的响应能力。为了实现这一点,首先咱们应该将咱们的应用程序工作划分为独立的工作,而后咱们应该以无效利用 CPU、RAM(利用率)等系统资源的形式运行这些工作。通过应用线程池,指标是在无效应用系统资源的同时运行这些独自的工作。 如果疏忽磁盘和网络,给定单个 CPU 资源,按程序执行A和B总是比通过工夫切片“同时”执行A和B快,这是计算的基本定律。一旦线程数超过 CPU 内核数,增加更多线程就会变慢,而不是变快。 比方在8核服务器上,现实状态下将线程数设置为 8 将提供最佳性能,超出此范畴的任何事件都会因为上下文切换的开销而开始变慢。但在理论状况中不能疏忽Disk和Network。 2、Java原生线程池 对于线程池的具体实现原理:线程池的基本原理,线程池生命周期治理,具体设计等等,能想到的根本都有,十分具体; Java 的Executors类提供了一些不同类型的线程池;static ExecutorService newSingleThreadExecutor() 创立一个 Executor,它应用单个工作线程在无界队列中运行。static ExecutorService newCachedThreadPool() newCachedThreadPool创立一个可缓存线程池,如果线程池长度超过解决须要,可灵便回收闲暇线程,若无可回收,则新建线程。实现原理将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,应用的synchronousQueue(无界),也就是说来了工作就创立线程运行,当线程闲暇超过60秒,就销毁线程。是大小无界的线程池。比方,实用于执行很多的短期异步工作的小程序,或者是负载较轻的服务器。static ExecutorService newFixedThreadPool(int nThreads) 创立一个定长线程池,可控制线程最大并发数,超出的线程会在队列中期待。这个创立的线程池corePoolSize和maximum PoolSize 值是相等的,它应用的LinkedBlockingQueue(无界队列)。实用于为了满足资源管理要求,而须要限度以后线程数量的利用场景。static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) 创立一个定长线程池,反对定时及周期性工作执行。实用于多个后盾线程执行周期工作,同时为了满足资源管理的需要而限度后盾线程的数量的利用场景。newSingleThreadExecutor(),因为这个池只有 1 个线程,因而咱们提交到这个线程池的每个工作都是程序工作的,没有并发,如果咱们有能够独立运行的工作,这个配置在咱们的应用程序吞吐量和响应能力方面并不好。 newCachedThreadPool(),因为这个池会为提交到池的每个工作创立一个新线程或应用现有线程。对于某些场景(例如,如果咱们的工作是短期工作,此池应用对于咱们的独立工作可能很有意义。如果咱们的工作不是短暂的,应用这种线程池会导致在应用程序上创立许多线程。如果咱们创立的线程超过阈值,那么咱们就不能无效地应用 CPU 资源,因为 CPU 的大部分工夫都花在线程或上下文切换上,而不是真正的工作。这再次导致咱们的应用程序响应能力和吞吐量降落。 咱们须要线程池newFixedThreadPool(int nThreads),咱们应该抉择现实的大小来减少咱们的应用程序吞吐量和响应能力(我假如咱们有能够独立运行的工作)。重点是抉择不要太多也不要太小。前者导致CPU破费太多工夫进行线程切换而不是真正的工作,也会导致过多的内存应用问题,后者导致CPU闲暇,而咱们有应该解决的工作。 ⚠️:只有工作都是同类型并且互相独立时,线程池的效率达到最佳 2.1、问题2.1.1、线程饥饿死锁在线程池中所有正在执行工作的线程都因为期待其余仍处于工作队列中的工作而阻塞 例1:(饥饿或死锁)在单线程池中,正在执行的工作阻塞期待队列中的某个工作执行结束 例2:(饥饿或死锁)线程池不够大时,通过栅栏机制协调多个工作时 例3:(饥饿)因为其余资源的隐性限度,每个工作都须要应用无限的数据库连贯资源,那么不论线程池多大,都会体现出和连贯资源雷同的大小。 每当提交了一个有依赖性的Executor工作时,要分明地晓得可能会呈现线程"饥饿"死锁,因而须要在代码或配置Executor地配置文件中记录线程池的大小限度或配置限度。以下代码对死锁的产生做了举例。 package com.flydean;import org.junit.Test;import java.util.concurrent.*;public class ThreadPoolDeadlock { ExecutorService executorService= Executors.newSingleThreadExecutor(); public class RenderPageTask implements Callable<String> { public String call() throws Exception{ Future<String> header, footer; header= executorService.submit(()->{ return "加载页眉"; }); footer= executorService.submit(()->{ return "加载页脚"; }); return header.get()+ footer.get(); } } public void submitTask(){ executorService.submit(new RenderPageTask()); }}产生死锁剖析: ...

October 15, 2022 · 2 min · jiezi

关于线程池:ForkJoinPool线程池

ForkJoinPool是自java7开始,jvm提供的一个用于并行执行的工作框架。其宗旨是将大工作分成若干小工作,之后再并行对这些小工作进行计算,最终汇总这些工作的后果,失去最终的后果。1.分治理:大工作拆分小工作,计算小工作把计算结果进行合并,实现大工作计算。2.工作窃取:当前工作线程没有可用线程,则通过利用其余现场队列闲置线程进行本工作队列工作执行,充分利用CPU资源。

July 29, 2022 · 1 min · jiezi

关于线程池:创建线程池

线程池的创立有四种,别离是:1.newCachedThreadPool,最大线程数是Integer最大值,个别不倡议应用该线程池,有OOM危险。2.newFixedThreadPool,指定线程数量。3.newScheduledThreadPool,定时工作线程池,定时执行一些周期性工作。4.newSingleThreadExecutor,队列型线程池,严格依照单线程先进先出执行队列工作。 上述四种线程池创立形式都不倡议,倡议应用ThreadPoolExecutor,通过七大参数创立自定义线程池,七大参数:1.corePoolSize,外围线程数。2.maxPoolSize,最大线程数。3.keepAliveTime:线程存活工夫数。4.unit:工夫单位。5.threadFactory:创立线程工厂。6.workQueue:线程工作队列。7.handler:回绝策略。

July 15, 2022 · 1 min · jiezi

关于线程池:动态调整线程池参数

指标实现动静调整线程池参数对线程池运行状况进行监控实现一,线程池可调整的参数 外围线程数超时工夫最大线程数回绝策略而队列BlockingQueue因为是final类型,所以没有对外批改入口。但能够通过重写LinkedBlockingQueue并把capacity设置为非final。 二,联合配置核心实现动静调整 这里的配置核心应用Apollo, 通过监听配置核心变动,而后更新线程池配置。示例代码如下: @Slf4j@Componentpublic class DynamicThreadPoolConfig { /** 线程执行器 **/ private volatile ThreadPoolExecutor executor; /** 外围线程数 **/ private Integer corePoolSize = 10; /** 最大值线程数 **/ private Integer maximumPoolSize = 20; /** 待执行工作的队列的长度 **/ private Integer workQueueSize = 1000; /** 线程闲暇工夫 **/ private Long keepAliveTime = 1000L; /** 线程名 **/ private String threadName; private Config config = ConfigService.getConfig("lepu-activity-center");; public DynamicThreadPoolConfig() { init(config); } /** * 初始化 */ private void init(Config config) { log.info("线程池初始化中.........."); if (executor == null) { synchronized (DynamicThreadPoolConfig.class) { if (executor == null) { String corePoolSizeProperty = config.getProperty("corePoolSize", corePoolSize.toString()); log.info("批改前的外围线程池:{}",corePoolSizeProperty); String maximumPoolSizeProperty = config.getProperty("maximumPoolSize", maximumPoolSize.toString()); String keepAliveTImeProperty = config.getProperty("keepAliveTime", keepAliveTime.toString()); BlockingQueue<Runnable> workQueueProperty = new LinkedBlockingQueue<>(workQueueSize); executor = new ThreadPoolExecutor(Integer.valueOf(corePoolSizeProperty), Integer.valueOf(maximumPoolSizeProperty), Long.valueOf(keepAliveTImeProperty), TimeUnit.MILLISECONDS, workQueueProperty); } } } } /** * 监听到配置核心发生变化后,更新线程池配置 * @param changeEvent */ @ApolloConfigChangeListener public void onChange(ConfigChangeEvent changeEvent){ log.info("线程池参数配置发生变化,namespace:{}",changeEvent.getNamespace()); for(String key : changeEvent.changedKeys()){ ConfigChange change = changeEvent.getChange(key); String newValue = change.getNewValue(); refreshThreadPool(key,newValue); } } /** * 更新线程池配置 * @param key * @param newValue */ private void refreshThreadPool(String key, String newValue) { if (executor == null) { return; } if (ParamsEnum.CORE_POOL_SIZE.getParam().equals(key)) { executor.setCorePoolSize(Integer.valueOf(newValue)); log.info("批改外围线程数key={},value={}",key,newValue); } if (ParamsEnum.MAXIMUM_POOL_SIZE.getParam().equals(key)) { executor.setMaximumPoolSize(Integer.valueOf(newValue)); log.info("批改最大线程数key={},value={}", key, newValue); } if (ParamsEnum.KEEP_ALIVE_TIME.getParam().equals(key)) { executor.setKeepAliveTime(Integer.valueOf(newValue), TimeUnit.MILLISECONDS); log.info("批改线程闲暇工夫key={},value={}", key, newValue); } } public ThreadPoolExecutor getExecutor() { return executor; }}三,监控形式 ...

June 30, 2022 · 1 min · jiezi

关于线程池:Java多线程与线程池技术

一、序言Java多线程编程线程池被宽泛应用,甚至成为了标配。 线程池实质是池化技术的利用,和连接池相似,创立连贯与敞开连贯属于耗时操作,创立线程与销毁线程也属于重操作,为了提高效率,先提前创立好一批线程,当有须要应用线程时从线程池取出,用完后放回线程池,这样防止了频繁创立与销毁线程。 // 工作Runnable runnable = () -> System.out.println(Thread.currentThread().getId());在利用中优先选用线程池执行异步工作,依据不同的场景选用不同的线程池,进步异步工作执行效率。 1、一般执行new Thread(runnable).start();2、线程池执行Executors.newSingleThreadExecutor().execute(runnable)二、线程池根底(一)外围参数1、外围参数线程池的外围参数决定了池的类型,进而决定了池的个性。 参数解释行为corePoolSize外围线程数池中长期保护的线程数量,不被动回收maximumPoolSize最大线程数最大线程数大于等于外围线程数keepAliveTime线程最大闲暇工夫非核心线程最大闲暇工夫,超时回收线程workQueue工作队列工作队列间接决定线程池的类型2、参数与池的关系Executors类默认创立线程池与参数对应关系。 线程池corePoolSizemaximumPoolSizekeepAliveTimeworkQueuenewCachedThreadPool0Integer.MAX_VALUE60SynchronousQueuenewSingleThreadExecutor110LinkedBlockingQueuenewFixedThreadPoolNN0LinkedBlockingQueuenewScheduledThreadPoolNInteger.MAX_VALUE0DelayedWorkQueue(二)线程池比照依据应用场景抉择对应的线程池。 1、通用比照线程池特点实用场景newCachedThreadPool超时未应用的线程回主动销毁,有新工作时主动创立实用于低频、轻量级的工作。回收线程的目标是节约线程长时间闲暇而占有的资源。newSingleThreadExecutor线程池中有且只有一个线程程序执行工作newFixedThreadPool线程池中有固定数量的线程,且始终存在实用于高频的工作,即线程在大多数工夫里都处于工作状态。newScheduledThreadPool定时线程池与定时调度相关联2、拓展比照保护仅有一个线程的线程池有如下两种形式,失常应用的状况下,二者差别不大;简单应用环境下,二者存在轻微的差别。用newSingleThreadExecutor形式创立的线程池在任何时刻至少只有一个线程,因而能够了解为用异步的形式执行程序工作;后者初始化的时候也只有一个线程,应用过程中可能会呈现最大线程数超过1的状况,这时要求线性执行的工作会并行执行,业务逻辑可能会呈现问题,与理论场景无关。 private final static ExecutorService executor = Executors.newSingleThreadExecutor();private final static ExecutorService executor = Executors.newFixedThreadPool(1);(三)线程池原理 线程池次要解决流程,工作提交之后是怎么执行的。大抵如下: 判断外围线程池是否已满,如果不是,则创立线程执行工作如果外围线程池满了,判断队列是否满了,如果队列没满,将工作放在队列中如果队列满了,则判断线程池是否已满,如果没满,创立线程执行工作如果线程池也满了,则依照回绝策略对工作进行解决(四)提交工作的形式往线程池中提交工作,次要有两种办法:提交无返回值的工作和提交有返回值的工作。 1、无返回值工作execute用于提交不须要返回后果的工作。 public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(2); executor.execute(() -> System.out.println("hello"));}2、有返回值工作submit()用于提交一个须要返回果的工作。 该办法返回一个Future对象,通过调用这个对象的get()办法,咱们就能取得返回后果。get()办法会始终阻塞,直到返回后果返回。 咱们也能够应用它的重载办法get(long timeout, TimeUnit unit),这个办法也会阻塞,然而在超时工夫内依然没有返回后果时,将抛出异样TimeoutException。 public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newFixedThreadPool(2); Future<Long> future = executor.submit(() -> { System.out.println("task is executed"); return System.currentTimeMillis(); }); System.out.println("task execute time is: " + future.get());}在提交工作时,如果无返回值工作,优先应用execute。(无)敞开线程池在线程池应用实现之后,咱们须要对线程池中的资源进行开释操作,这就波及到敞开性能。咱们能够调用线程池对象的shutdown()和shutdownNow()办法来敞开线程池。 ...

April 6, 2022 · 2 min · jiezi

关于线程池:如何判断线程池已经执行完所有任务了

很多场景下,咱们须要期待线程池的所有工作都执行完,而后再进行下一步操作。对于线程 Thread 来说,很好实现,加一个 join 办法就解决了,然而对于线程池的判断就比拟麻烦了。 咱们本文提供 4 种判断线程池工作是否执行完的办法: 应用 isTerminated 办法判断。应用 getCompletedTaskCount 办法判断。应用 CountDownLatch 判断。应用 CyclicBarrier 判断。接下来咱们一个一个来看。 不判断的问题如果不对线程池是否曾经执行完做判断,就会呈现以下问题,如下代码所示: import java.util.Random;import java.util.concurrent.LinkedBlockingDeque;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class ThreadPoolCompleted { public static void main(String[] args) { // 创立线程池 ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 20, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>(1024)); // 增加工作 addTask(threadPool); // 打印后果 System.out.println("线程池工作执行实现!"); } /** * 给线程池增加工作 */ private static void addTask(ThreadPoolExecutor threadPool) { // 工作总数 final int taskCount = 5; // 增加工作 for (int i = 0; i < taskCount; i++) { final int finalI = i; threadPool.submit(new Runnable() { @Override public void run() { try { // 随机休眠 0-4s int sleepTime = new Random().nextInt(5); TimeUnit.SECONDS.sleep(sleepTime); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(String.format("工作%d执行实现", finalI)); } }); } }}复制代码以上程序的执行后果如下: ...

March 30, 2022 · 3 min · jiezi

关于线程池:从简单代码入手分析线程池原理

一、线程池简介1、池化思维在我的项目工程中,基于池化思维的技术利用很多,例如基于线程池的工作并发执行,中间件服务的连接池配置,通过对共享资源的治理,升高资源的占用耗费,晋升效率和服务性能。 池化思维从直观感觉上了解,既有作为容器的存储能力(持续性的承接),也要具备维持一定量的储备能力(初始化的提供),同时作为容器又必然有大小的限度,上面通过这个根底逻辑来详细分析Java中的线程池原理。 2、线程池首先相熟JVM执行周期的都晓得,在内存中频繁的创立和销毁对象是很影响性能的,而线程作为过程中运行的根本单位,通过线程池的形式重复使用已创立的线程,在工作执行动作上防止或缩小线程的频繁创立动作。 线程池中保护多个线程,当收到调度工作时能够防止创立线程间接执行,并以此升高服务资源的耗费,把绝对不确定的并发工作治理在绝对确定的线程池中,进步零碎服务的稳定性。下文基于JDK1.8围绕ThreadPoolExecutor类深入分析。 二、原理与周期1、类图设计 Executor 接口源码正文解读:未来会执行命令,工作提交和执行两个动作会被解耦,传入Runnable工作对象即可,线程池会执行相应调度和工作解决。Executor尽管是ThreadPoolExecutor线程池的顶层接口,然而其自身只是形象了工作的解决思维。 ExecutorService 接口扩大Executor接口,单个或批量的给工作的执行后果生成Future,并削减工作中断或终止的治理办法。 AbstractExecutorService 抽象类提供对ExecutorService接口定义的工作执行办法(submit,invokeAll)等默认实现,提供newTaskFor办法用于构建RunnableFuture对象。 ThreadPoolExecutor 类保护线程池生命周期,治理线程和工作,通过相应调度机制实现工作的并发执行。 2、根本案例示例中创立了一个简略的butte-pool线程池,设置4个外围线程执行工作,队列容器设置256大小;在理论业务中,对于参数设定须要考量工作执行工夫,服务配置,测试数据等。 public class ThrPool implements Runnable { private static final Logger logger = LoggerFactory.getLogger(ThrPool.class) ; /** * 线程池治理,ThreadFactoryBuilder出自Guava工具库 */ private static final ThreadPoolExecutor DEV_POOL; static { ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("butte-pool-%d").build(); DEV_POOL = new ThreadPoolExecutor(0, 8,60L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(256),threadFactory, new ThreadPoolExecutor.AbortPolicy()); DEV_POOL.allowCoreThreadTimeOut(true); } /** * 工作办法 */ @Override public void run() { try { logger.info("Print...Job...Run...;queue_size:{}",DEV_POOL.getQueue().size()); Thread.sleep(5000); } catch (Exception e){ e.printStackTrace(); } }} ...

March 20, 2022 · 2 min · jiezi

关于线程池:使用线程池时限制每个线程间隔启动的一种思路

需要形容最近跑仿真时,为了高效利用服务器的计算资源,启动多个仿真程序以达到并行。我将须要运行的命令一次性加载到Python的线程池中,并限度同时可执行的程序在服务器可承受的范畴内。然而遇到了一个新的问题,在每次启动时,线程池容许的20个并行程序同时启动,导致程序启动时遇到读写文件的抵触。为了防止这种抵触,须要并行的程序顺次启动,而不是同时启动。例如,每个程序的启动工夫距离20秒。 一种解决思路我想到的方法是利用线程锁。在启动线程后,每个线程中首先调用一个函数,这个函数的实现是抢一个线程锁,而后睡眠20秒,之后再解锁。这样,其余线程尽管也被启动了,然而因为只有一个线程可能抢到线程锁,所以抢到锁的线程在睡眠20秒后就能够启动程序进行仿真,后续的所有线程仍然须要抢锁而后再启动。这样就防止了同时有多个线程启动仿真导致读写抵触了。 改良思路我的思路比较简单,其中还存在一个小问题:第一个启动的线程也须要睡眠20秒。这个问题比拟好解决,能够定义一个睡眠时长变量,最开始初始化这个睡眠时长0秒,而后每次睡眠后讲这个变量改写为20秒。这样第一个抢到锁的过程就能够睡眠0秒,后续抢到锁的过程会睡眠20秒。

February 21, 2022 · 1 min · jiezi

关于线程池:Java-JUC-ThreadPoolExecutor解析

线程池 ThreadPoolExecutor介绍线程池次要解决两个问题:一是当执行大量异步工作时线程池可能提供较好的性能。在不应用线程池时,每当须要执行工作时就须要 new 一个线程来执行,频繁的创立与销毁十分耗费性能。而线程池中的线程是能够复用的,不须要在每次须要执行工作时候都从新创立和销毁。二是线程池提供了资源限度和治理的伎俩,比方能够限度线程个数,动静减少线程等。 另外,线程池也提供了许多可调参数和可扩展性接口,以满足不同状况下的须要,咱们能够应用更不便的 Executors 的工厂办法,来创立不同类型的线程池,也能够本人自定义线程池。 线程池的工作机制 线程池刚创立的时候没有任何线程,当来了新的申请的时候才会创立外围线程去解决对应的申请当解决实现之后,外围线程并不会回收在外围线程达到指定的数量之前,每一个申请都会在线程池中创立一个新的外围线程当外围线程全都被占用的时候,新来的申请会放入工作队列中。工作队列实质上是一个阻塞队列当工作队列被占满,再来的新申请会交给长期线程来解决长期线程在应用实现之后会持续存活一段时间,直到没有申请解决才会被销毁类图介绍 如上类图所示,Executors 是一个工具类,提供了多种静态方法,依据咱们抉择的不同提供不同的线程池实例。 ThreadPoolExecutor 继承了 AbstractExecutorService 抽象类,在 ThreadPoolExecutor 中成员变量 ctl 是一个 Integer 的原子性变量,用来记录线程池的状态和线程中线程个数,相似于 ReentrantReadWriteLock 应用一个变量来保留两种信息一样。 假如 Integer 类型是 32 位二进制示意,则其中高 3 位示意线程池的状态,后 29 为示意线程池线程数量。 //默认RUNNING状态,线程个数为0private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));获取高 3 位,运行状态 private static int runStateOf(int c) { return c & ~CAPACITY; }获取低 29 位,线程个数 private static int workerCountOf(int c) { return c & CAPACITY; }线程池状态含意如下: RUNNING:承受新工作并且解决阻塞队列中的工作SHUTDOWN:回绝新工作然而解决阻塞队列中的工作STOP:回绝新工作并且摈弃阻塞队列里的工作,同时中断正在解决的工作TIDYING:所有工作都执行完(包含阻塞队列中的工作)后以后线程池流动线程数量为 0,将调用 terminated 办法TERMINATED:终止状态。terminated 调用实现办法后的状态线程池状态转换如下: ...

February 9, 2022 · 6 min · jiezi

关于线程池:线程池

下面是提交了工作后,线程池解决流程。如果想在工作提交前预热线程池能够应用prestartAllCoreThreads提前创立线程。须要留神的是,上图中两个中央的创立线程都是须要加锁的,相干代码如下图: private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //这里的外部类Worker继承了AbstractQueuedSynchronizer,也实现了Runnable //Worker外面有个Thread属性,每次创立工作的时候都会对应创立一个线程 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; //加锁 mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }这里再列一下几种工作队列: ...

December 10, 2021 · 2 min · jiezi

关于线程池:90的人以为会用ThreadPoolExecutor了看了这10张图再说吧

在阿里巴巴手册中有一条倡议: 【强制】线程池不容许应用 Executors 去创立,而是通过ThreadPoolExecutor的形式,这样的解决形式让写的同学更加明确线程池的运行规定,躲避资源耗尽的危险。如果常常基于Executors提供的工厂办法创立线程池,很容易疏忽线程池外部的实现。特地是回绝策略,因应用Executors创立线程池时不会传入这个参数,间接采纳默认值,所以经常被疏忽。 上面咱们就来理解一下线程池相干的实现原理、API以及实例。 线程池的作用在实际利用中创立线程池次要是为了: 缩小资源开销:缩小每次创立、销毁线程的开销;进步响应速度:申请到来时,线程已创立好,可间接执行,进步响应速度;进步线程的可管理性:线程是稀缺资源,需依据状况加以限度,确保零碎稳固运行;ThreadPoolExecutorThreadPoolExecutor能够实现线程池的创立。ThreadPoolExecutor相干类图如下: 从类图能够看出,ThreadPoolExecutor最终实现了Executor接口,是线程池创立的真正实现者。 Executor两级调度模型 在HotSpot虚拟机中,Java中的线程将会被一一映射为操作系统的线程。在Java虚拟机层面,用户将多个工作提交给Executor框架,Executor负责调配线程执行它们;在操作系统层面,操作系统再将这些线程调配给处理器执行。 ThreadPoolExecutor的三个角色工作ThreadPoolExecutor承受两种类型的工作:Callable和Runnable。 Callable:该类工作有返回后果,能够抛出异样。通过submit办法提交,返回Future对象。通过get获取执行后果。Runnable:该类工作只执行,无奈获取返回后果,在执行过程中无奈抛异样。通过execute或submit办法提交。工作执行器Executor框架最外围的接口是Executor,它示意工作的执行器。 通过下面类图能够看出,Executor的子接口为ExecutorService。再往底层有两大实现类:ThreadPoolExecutor和ScheduledThreadPoolExecutor(集成自ThreadPoolExecutor)。 执行后果Future接口示意异步的执行后果,它的实现类为FutureTask。 三个角色之间的解决逻辑图如下: 线程池解决流程 一个线程从被提交(submit)到执行共经验以下流程: 线程池判断外围线程池里是的线程是否都在执行工作,如果不是,则创立一个新的工作线程来执行工作。如果外围线程池里的线程都在执行工作,则进入下一个流程;线程池判断工作队列是否已满。如果工作队列没有满,则将新提交的工作贮存在这个工作队列里。如果工作队列满了,则进入下一个流程;线程池判断其外部线程是否都处于工作状态。如果没有,则创立一个新的工作线程来执行工作。如果已满了,则交给饱和策略来解决这个工作。线程池在执行execute办法时,次要有以下四种状况: 如果以后运行的线程少于corePoolSize,则创立新线程来执行工作(须要取得全局锁);如果运行的线程等于或多于corePoolSize,则将工作退出BlockingQueue;如果无奈将工作退出BlockingQueue(队列已满),则创立新的线程来解决工作(须要取得全局锁);如果创立新线程将使以后运行的线程超出maxiumPoolSize,工作将被回绝,并调用RejectedExecutionHandler.rejectedExecution()办法。线程池采取上述的流程进行设计是为了缩小获取全局锁的次数。在线程池实现预热(以后运行的线程数大于或等于corePoolSize)之后,简直所有的excute办法调用都执行步骤二。 线程的状态流转顺便再回顾一下线程的状态的转换,在JDK中Thread类中提供了一个枚举类,例举了线程的各个状态: public enum State { NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, TERMINATED; }一共定义了6个枚举值,其实代表的是5种类型的线程状态: NEW:新建;RUNNABLE:运行状态;BLOCKED:阻塞状态;WAITING:期待状态,WAITING和TIMED_WAITING能够归为一类,都属于期待状态,只是后者能够设置等待时间,即期待多久;TERMINATED:终止状态;线程关系转换图: 当new Thread()阐明这个线程处于NEW(新建状态);调用Thread.start()办法示意这个线程处于RUNNABLE(运行状态); 然而RUNNABLE状态中又蕴含了两种状态:READY(就绪状态)和RUNNING(运行中)。调用start()办法,线程不肯定取得了CPU工夫片,这时就处于READY,期待CPU工夫片,当取得了CPU工夫片,就处于RUNNING状态。 在运行中调用synchronized同步的代码块,没有获取到锁,这时会处于BLOCKED(阻塞状态),当从新获取到锁时,又会变为RUNNING状态。在代码执行的过程中可能会碰到Object.wait()等一些期待办法,线程的状态又会转变为WAITING(期待状态),期待被唤醒,当调用了Object.notifyAll()唤醒了之后线程执行完就会变为TERMINATED(终止状态)。 线程池的状态线程池中状态通过2个二进制位(bit)来示意线程池的5个状态:RUNNING、SHUTDOWN、STOP、TIDYING和TERMINATED: RUNNING:线程池失常工作的状态,在 RUNNING 状态下线程池承受新的工作并解决工作队列中的工作;SHUTDOWN:调用shutdown()办法会进入 SHUTDOWN 状态。在 SHUTDOWN 状态下,线程池不承受新的工作,然而会继续执行工作队列中已有的工作;STOP:调用shutdownNow()会进入 STOP 状态。在 STOP 状态下线程池既不承受新的工作,也不解决曾经在队列中的工作。对于还在执行工作的工作线程,线程池会发动中断请求来中断正在执行的工作,同时会清空工作队列中还未被执行的工作;TIDYING:当线程池中的所有执行工作的工作线程都曾经终止,并且工作线程汇合为空的时候,进入 TIDYING 状态;TERMINATED:当线程池执行完terminated()钩子办法当前,线程池进入终态 TERMINATED;ThreadPoolExecutor APIThreadPoolExecutor创立线程池API: public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 参数解释: ...

November 2, 2021 · 3 min · jiezi

关于线程池:解析ThreadPoolExecutor类是如何保证线程池正确运行的

摘要:对于线程池的外围类ThreadPoolExecutor来说,有哪些重要的属性和外部类为线程池的正确运行提供重要的保障呢?本文分享自华为云社区《【高并发】通过源码深度解析ThreadPoolExecutor类是如何保障线程池正确运行的》,作者: 冰 河 。 对于线程池的外围类ThreadPoolExecutor来说,有哪些重要的属性和外部类为线程池的正确运行提供重要的保障呢?明天咱们就一起来深入探讨下这些问题!! ThreadPoolExecutor类中的重要属性在ThreadPoolExecutor类中,存在几个十分重要的属性和办法,接下来,咱们就介绍下这些重要的属性和办法。 ctl相干的属性AtomicInteger类型的常量ctl是贯通线程池整个生命周期的重要属性,它是一个原子类对象,次要用来保留线程的数量和线程池的状态,咱们看下与这个属性相干的代码如下所示。 //次要用来保留线程数量和线程池的状态,高3位保留线程状态,低29位保留线程数量private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//线程池中线程的数量的位数(32-3)private static final int COUNT_BITS = Integer.SIZE - 3;//示意线程池中的最大线程数量//将数字1的二进制值向右移29位,再减去1private static final int CAPACITY = (1 << COUNT_BITS) - 1;//线程池的运行状态private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;//获取线程状态private static int runStateOf(int c) { return c & ~CAPACITY; }//获取线程数量private static int workerCountOf(int c) { return c & CAPACITY; }private static int ctlOf(int rs, int wc) { return rs | wc; }private static boolean runStateLessThan(int c, int s) { return c < s;}private static boolean runStateAtLeast(int c, int s) { return c >= s;}private static boolean isRunning(int c) { return c < SHUTDOWN;}private boolean compareAndIncrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect + 1);}private boolean compareAndDecrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect - 1);}private void decrementWorkerCount() { do {} while (! compareAndDecrementWorkerCount(ctl.get()));}对于线程池的各状态阐明如下所示。 ...

August 31, 2021 · 3 min · jiezi

关于线程池:从源码分析创建线程池的4种方式

摘要:从创立线程池的源码来深入分析到底有哪些形式能够创立线程池。本文分享自华为云社区《【高并发】从源码角度剖析创立线程池到底有哪些形式》,作者:冰 河 。 在Java的高并发畛域,线程池始终是一个绕不开的话题。有些童鞋始终在应用线程池,然而,对于如何创立线程池仅仅停留在应用Executors工具类的形式,那么,创立线程池到底存在哪几种形式呢?就让咱们一起从创立线程池的源码来深入分析到底有哪些形式能够创立线程池。 应用Executors工具类创立线程池在创立线程池时,初学者用的最多的就是Executors 这个工具类,而应用这个工具类创立线程池时非常简单的,不须要关注太多的线程池细节,只须要传入必要的参数即可。Executors 工具类提供了几种创立线程池的办法,如下所示。 • Executors.newCachedThreadPool:创立一个可缓存的线程池,如果线程池的大小超过了须要,能够灵便回收闲暇线程,如果没有可回收线程,则新建线程• Executors.newFixedThreadPool:创立一个定长的线程池,能够控制线程的最大并发数,超出的线程会在队列中期待• Executors.newScheduledThreadPool:创立一个定长的线程池,反对定时、周期性的工作执行• Executors.newSingleThreadExecutor: 创立一个单线程化的线程池,应用一个惟一的工作线程执行工作,保障所有工作依照指定程序(先入先出或者优先级)执行• Executors.newSingleThreadScheduledExecutor:创立一个单线程化的线程池,反对定时、周期性的工作执行• Executors.newWorkStealingPool:创立一个具备并行级别的work-stealing线程池 其中,Executors.newWorkStealingPool办法是Java 8中新增的创立线程池的办法,它可能为线程池设置并行级别,具备更高的并发度和性能。除了此办法外,其余创立线程池的办法实质上调用的是ThreadPoolExecutor类的构造方法。 例如,咱们能够应用如下代码创立线程池。 Executors.newWorkStealingPool();Executors.newCachedThreadPool();Executors.newScheduledThreadPool(3);应用ThreadPoolExecutor类创立线程池从代码构造上看ThreadPoolExecutor类继承自AbstractExecutorService,也就是说,ThreadPoolExecutor类具备AbstractExecutorService类的全副性能。 既然Executors工具类中创立线程池大部分调用的都是ThreadPoolExecutor类的构造方法,所以,咱们也能够间接调用ThreadPoolExecutor类的构造方法来创立线程池,而不再应用Executors工具类。接下来,咱们一起看下ThreadPoolExecutor类的构造方法。 ThreadPoolExecutor类中的所有构造方法如下所示。 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler);}public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler);}public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);}public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler;}由ThreadPoolExecutor类的构造方法的源代码可知,创立线程池最终调用的构造方法如下。 ...

August 26, 2021 · 3 min · jiezi

关于线程池:高并发中那些不得不说的线程池与ThreadPoolExecutor类

摘要:从整体上意识下线程池中最外围的类之一——ThreadPoolExecutor,对于ThreadPoolExecutor的底层原理和源码实现,以及线程池中的其余技术细节的底层原理和源码实现。本文分享自华为云社区《高并发之——不得不说的线程池与ThreadPoolExecutor类浅析》,作者: 冰 河 。 既然Java中反对以多线程的形式来执行相应的工作,但为什么在JDK1.5中又提供了线程池技术呢?这个问题大家自行脑补,多动脑,必定没害处,哈哈哈。。。 说起中的线程池技术,在很多框架和异步解决中间件中都有波及,而且性能禁受起了短暂的考验。能够这样说,Java的线程池技术是Java最外围的技术之一,在Java的高并发畛域中,Java的线程池技术是一个永远绕不开的话题。既然Java的线程池技术这么重要(怎么能说是这么重要呢?那是相当的重要,那家伙老重要了,哈哈哈),那么,本文咱们就来简略的说下线程池与ThreadPoolExecutor类。 一、Thread间接创立线程的弊病(1)每次new Thread新建对象,性能差。(2)线程不足对立治理,可能无限度的新建线程,相互竞争,有可能占用过多系统资源导致死机或OOM。(3)短少更多的性能,如更多执行、定期执行、线程中断。(4)其余弊病,大家自行脑补,多动脑,没害处,哈哈。 二、线程池的益处(1)重用存在的线程,缩小对象创立、沦亡的开销,性能佳。(2)能够无效管制最大并发线程数,进步系统资源利用率,同时能够防止过多资源竞争,防止阻塞。(3)提供定时执行、定期执行、单线程、并发数管制等性能。(4)提供反对线程池监控的办法,可对线程池的资源进行实时监控。(5)其余益处,大家自行脑补,多动脑,没害处,哈哈。 三、线程池1.线程池类构造关系线程池中的一些接口和类的构造关系如下图所示。 后文会死磕这些接口和类的底层原理和源码。 2.创立线程池罕用的类——ExecutorsExecutors.newCachedThreadPool:创立一个可缓存的线程池,如果线程池的大小超过了须要,能够灵便回收闲暇线程,如果没有可回收线程,则新建线程Executors.newFixedThreadPool:创立一个定长的线程池,能够控制线程的最大并发数,超出的线程会在队列中期待Executors.newScheduledThreadPool:创立一个定长的线程池,反对定时、周期性的工作执行Executors.newSingleThreadExecutor: 创立一个单线程化的线程池,应用一个惟一的工作线程执行工作,保障所有工作依照指定程序(先入先出或者优先级)执行Executors.newSingleThreadScheduledExecutor:创立一个单线程化的线程池,反对定时、周期性的工作执行Executors.newWorkStealingPool:创立一个具备并行级别的work-stealing线程池3.线程池实例的几种状态Running:运行状态,能接管新提交的工作,并且也能解决阻塞队列中的工作Shutdown: 敞开状态,不能再接管新提交的工作,然而能够解决阻塞队列中曾经保留的工作,当线程池处于Running状态时,调用shutdown()办法会使线程池进入该状态Stop: 不能接管新工作,也不能解决阻塞队列中曾经保留的工作,会中断正在解决工作的线程,如果线程池处于Running或Shutdown状态,调用shutdownNow()办法,会使线程池进入该状态Tidying: 如果所有的工作都曾经终止,无效线程数为0(阻塞队列为空,线程池中的工作线程数量为0),线程池就会进入该状态。Terminated: 处于Tidying状态的线程池调用terminated()办法,会应用线程池进入该状态留神:不须要对线程池的状态做非凡的解决,线程池的状态是线程池外部依据办法自行定义和解决的。 4.合理配置线程的一些倡议(1)CPU密集型工作,就须要尽量压迫CPU,参考值能够设置为NCPU+1(CPU的数量加1)。(2)IO密集型工作,参考值能够设置为2*NCPU(CPU数量乘以2) 四、线程池最外围的类之一——ThreadPoolExecutor1.构造方法ThreadPoolExecutor参数最多的构造方法如下: public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler rejectHandler) 其余的构造方法都是调用的这个构造方法来实例化对象,能够说,咱们间接剖析这个办法之后,其余的构造方法咱们也明确是怎么回事了!接下来,就对此构造方法进行具体的剖析。 留神:为了更加深刻的剖析ThreadPoolExecutor类的构造方法,会适当调整参数的程序进行解析,以便于大家更能深刻的了解ThreadPoolExecutor构造方法中每个参数的作用。 上述构造方法接管如下参数进行初始化: (1)corePoolSize:外围线程数量。 (2)maximumPoolSize:最大线程数。 (3)workQueue:阻塞队列,存储期待执行的工作,很重要,会对线程池运行过程产生重大影响。 其中,上述三个参数的关系如下所示: 如果运行的线程数小于corePoolSize,间接创立新线程解决工作,即便线程池中的其余线程是闲暇的。如果运行的线程数大于等于corePoolSize,并且小于maximumPoolSize,此时,只有当workQueue满时,才会创立新的线程解决工作。如果设置的corePoolSize与maximumPoolSize雷同,那么创立的线程池大小是固定的,此时,如果有新工作提交,并且workQueue没有满时,就把申请放入到workQueue中,期待闲暇的线程,从workQueue中取出工作进行解决。如果运行的线程数量大于maximumPoolSize,同时,workQueue曾经满了,会通过回绝策略参数rejectHandler来指定解决策略。根据上述三个参数的配置,线程池会对工作进行如下解决形式: 当提交一个新的工作到线程池时,线程池会依据以后线程池中正在运行的线程数量来决定该工作的解决形式。解决形式总共有三种:间接切换、应用有限队列、应用有界队列。 间接切换罕用的队列就是SynchronousQueue。应用有限队列就是应用基于链表的队列,比方:LinkedBlockingQueue,如果应用这种形式,线程池中创立的最大线程数就是corePoolSize,此时maximumPoolSize不会起作用。当线程池中所有的外围线程都是运行状态时,提交新工作,就会放入期待队列中。应用有界队列应用的是ArrayBlockingQueue,应用这种形式能够将线程池的最大线程数量限度为maximumPoolSize,能够升高资源的耗费。然而,这种形式使得线程池对线程的调度更艰难,因为线程池和队列的容量都是无限的了。依据下面三个参数,咱们能够简略得出如何升高系统资源耗费的一些措施: 如果想升高系统资源的耗费,包含CPU使用率,操作系统资源的耗费,上下文环境切换的开销等,能够设置一个较大的队列容量和较小的线程池容量。这样,会升高线程解决工作的吞吐量。如果提交的工作常常产生阻塞,能够思考调用设置最大线程数的办法,从新设置线程池最大线程数。如果队列的容量设置的较小,通常须要将线程池的容量设置的大一些,这样,CPU的使用率会高些。如果线程池的容量设置的过大,并发量就会减少,则须要思考线程调度的问题,反而可能会升高解决工作的吞吐量。接下来,咱们持续看ThreadPoolExecutor的构造方法的参数。 (4)keepAliveTime:线程没有工作执行时最多放弃多久工夫终止当线程池中的线程数量大于corePoolSize时,如果此时没有新的工作提交,外围线程外的线程不会立刻销毁,须要期待,直到期待的工夫超过了keepAliveTime就会终止。 (5)unit:keepAliveTime的工夫单位 (6)threadFactory:线程工厂,用来创立线程默认会提供一个默认的工厂来创立线程,当应用默认的工厂来创立线程时,会使新创建的线程具备雷同的优先级,并且是非守护的线程,同时也设置了线程的名称 (7)rejectHandler:回绝解决工作时的策略 如果workQueue阻塞队列满了,并且没有闲暇的线程池,此时,持续提交工作,须要采取一种策略来解决这个工作。 线程池总共提供了四种策略: 间接抛出异样,这也是默认的策略。实现类为AbortPolicy。用调用者所在的线程来执行工作。实现类为CallerRunsPolicy。抛弃队列中最靠前的工作并执行当前任务。实现类为DiscardOldestPolicy。间接抛弃当前任务。实现类为DiscardPolicy。2.ThreadPoolExecutor提供的启动和进行工作的办法(1)execute():提交工作,交给线程池执行(2)submit():提交工作,可能返回执行后果 execute+Future(3)shutdown():敞开线程池,期待工作都执行完(4)shutdownNow():立刻敞开线程池,不期待工作执行完 3.ThreadPoolExecutor提供的实用于监控的办法(1)getTaskCount():线程池已执行和未执行的工作总数(2)getCompletedTaskCount():已实现的工作数量(3)getPoolSize():线程池以后的线程数量(4)getCorePoolSize():线程池外围线程数(5)getActiveCount():以后线程池中正在执行工作的线程数量 点击关注,第一工夫理解华为云陈腐技术~

August 16, 2021 · 1 min · jiezi

关于线程池:多线程学习Executor框架

前言Executor框架提供了组件来治理Java中的线程,Executor框架将其分为工作,线程执行工作,工作执行后果三局部。本篇文章将对Executor框架中的组件进行学习。 参考:《Java并发编程的艺术》 注释一. Executor的组件前言中曾经提到,Executor框架将线程的治理分为工作,线程执行工作,工作执行后果三局部。上面以图表模式对这三局部进行阐明。 项阐明工作Executor框架提供了Runnable接口和Callable接口,工作须要实现这两个接口能力被线程执行。线程执行工作Executor框架提供了接口Executor和继承于Executor的ExecutorService接口来定义工作执行机制。Executor框架中的线程池类ThreadPoolExecutor和ScheduledThreadPoolExecutor均实现了ExecutorService接口。工作执行后果Executor框架提供了Future接口和实现了Future接口的FutureTask类来定义工作执行后果。组件之间的类图关系如下所示。 二. ThreadPoolExecutor的解析ThreadPoolExecutor继承于AbstractExecutorService,并实现了ExecutorService接口,是Executor框架的外围类,用于治理线程。在多线程学习-线程池应用中曾经对ThreadPoolExecutor的原理,创立,执行和敞开进行了简略学习。在本大节将对ThreadPoolExecutor的具体实现进行学习。 ThreadPoolExecutor应用了原子整型ctl来示意线程池状态和Worker数量。ctl是一个原子整型,前3位示意线程池状态,后29位示意Worker数量。ThreadPoolExecutor中这部分的源码如下所示。 public class ThreadPoolExecutor extends AbstractExecutorService { private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; //取整型前3位,即获取线程池状态 private static int runStateOf(int c) { return c & ~CAPACITY; } //取整型后29位,即获取Worker数量 private static int workerCountOf(int c) { return c & CAPACITY; } //依据线程池状态和Worker数量拼装ctl private static int ctlOf(int rs, int wc) { return rs | wc; } //线程池状态判断 private static boolean runStateLessThan(int c, int s) { return c < s; } //线程池状态判断 private static boolean runStateAtLeast(int c, int s) { return c >= s; } //判断线程池状态是否为RUNNING private static boolean isRunning(int c) { return c < SHUTDOWN; } ...... }在ThreadPoolExecutor中规定了线程池的状态如下。 ...

August 6, 2021 · 12 min · jiezi

关于线程池:多线程学习线程池使用

前言线程池是Java中应用较多的并发框架,正当应用线程池,能够:升高资源耗费,进步响应速度,进步线程的可管理性。本篇文章为《Java并发编程的艺术》第9章的学习笔记,依据原文作者的编写思路,顺次对线程池的原理,线程池的创立,线程池执行工作和敞开线程池进行了学习和总结。最初会给出一个线程池的实现案例,该案例为之前在某微信公众号上看到的线程池实现实战,曾经无奈讲究其出处,原案例对于线程池的敞开存在缺点,我对其进行了一些修改和阐明,分享进去一起学习。 注释一. 线程池的原理当一个工作提交到线程池ThreadPoolExecutor时,该工作的执行如下图所示。 如果以后运行的线程数小于corePoolSzie(外围线程数),则创立新线程来执行工作(须要获取全局锁);如果以后运行的线程数等于或大于corePoolSzie,则将工作退出BlockingQueue(工作阻塞队列);如果BlockingQueue已满,则创立新的线程来执行工作(须要获取全局锁);如果创立新线程会使以后线程数大于maximumPoolSize(最大线程数),则回绝工作并调用RejectedExecutionHandler的rejectedExecution()办法。因为ThreadPoolExecutor存储工作线程应用的汇合是HashSet,因而执行上述步骤1和步骤3时须要获取全局锁来保障线程平安,而获取全局锁会导致线程池性能瓶颈,因而通常状况下,线程池实现预热后(以后线程数大于等于corePoolSize),线程池的execute()办法都是执行步骤2。 二. 线程池的创立通过ThreadPoolExecutor可能创立一个线程池,ThreadPoolExecutor的构造函数签名如下。 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)通过ThreadPoolExecutor创立线程池时,须要指定线程池的外围线程数,最大线程数,线程保活工夫,线程保活工夫单位和工作阻塞队列,并按需指定线程工厂和饱和回绝策略,如果不指定线程工厂和饱和回绝策略,则ThreadPoolExecutor会应用默认的线程工厂和饱和回绝策略。上面别离介绍这些参数的含意。 参数含意corePoolSize外围线程数,即线程池的根本大小。当一个工作被提交到线程池时,如果线程池的线程数小于corePoolSize,那么无论其余线程是否闲暇,也需创立一个新线程来执行工作。maximumPoolSize最大线程数。当线程池中线程数大于等于corePoolSize时,新提交的工作会退出工作阻塞队列,然而如果工作阻塞队列已满且线程数小于maximumPoolSize,此时会持续创立新的线程来执行工作。该参数规定了线程池容许创立的最大线程数keepAliveTime线程保活工夫。当线程池的线程数大于外围线程数时,多余的闲暇线程会最大存活keepAliveTime的工夫,如果超过这个工夫且闲暇线程还没有获取到工作来执行,则该闲暇线程会被回收掉。unit线程保活工夫单位。通过TimeUnit指定线程保活工夫的工夫单位,可选单位有DAYS(天),HOURS(时),MINUTES(分),SECONDS(秒),MILLISECONDS(毫秒),MICROSECONDS(微秒)和NANOSECONDS(纳秒),但无论指定什么工夫单位,ThreadPoolExecutor对立会将其转换为NANOSECONDS。workQueue工作阻塞队列。线程池的线程数大于等于corePoolSize时,新提交的工作会增加到workQueue中,所有线程执行完上一个工作后,会循环从workQueue中获取工作来执行。threadFactory创立线程的工厂。能够通过线程工厂给每个创立进去的线程设置更有意义的名字。handler饱和回绝策略。如果工作阻塞队列已满且线程池中的线程数等于maximumPoolSize,阐明线程池此时处于饱和状态,应该执行一种回绝策略来解决新提交的工作。三. 线程池执行工作线程池应用两个办法执行工作,别离为execute()和submit()。execute()办法用于执行不须要返回值的工作,submit()办法用于执行须要返回值的工作。execute()是接口Executor定义的办法,submit()是接口ExecutorService定义的办法,相干类图如下所示。 ThreadPoolExecutor对execute()的实现如下。 public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command);}AbstractExecutorService对submit()实现如下。 ...

July 15, 2021 · 7 min · jiezi

关于线程池:10问10答你真的了解线程池吗

简介: 《Java开发手册》中强调,线程资源必须通过线程池提供,而创立线程池必须应用ThreadPoolExecutor。手册次要强调利用线程池防止两个问题,一是线程过渡切换,二是防止申请过多时造成OOM。然而如果参数配置谬误,还是会引发下面的两个问题。所以本节咱们次要是探讨ThreadPoolExecutor的一些技术细节,并且给出几个罕用的最佳实际倡议。 作者 | 风楼起源 | 阿里技术公众号 《Java开发手册》中强调,线程资源必须通过线程池提供,而创立线程池必须应用ThreadPoolExecutor。手册次要强调利用线程池防止两个问题,一是线程过渡切换,二是防止申请过多时造成OOM。然而如果参数配置谬误,还是会引发下面的两个问题。所以本节咱们次要是探讨ThreadPoolExecutor的一些技术细节,并且给出几个罕用的最佳实际倡议。 我在查找材料的过程中,发现有些问题存在争议。前面发现,一部分起因是因为不同JDK版本的事实是有差别的。因而,上面的剖析是基于当下最罕用的版本JDK1.8,并且对于存在争议的问题,咱们剖析源码,源码才是最精确的。 1 corePoolSize=0会怎么样这是一个争议点。我发现大部分博文,不论是国内的还是国外的,都是这样答复这个问题的: 提交工作后,先判断以后池中线程数是否小于corePoolSize,如果小于,则创立新线程执行这个工作。否者,判断期待队列是否已满,如果没有满,则增加到期待队列。否者,判断以后池中线程数是否大于maximumPoolSize,如果大于则回绝。否者,创立一个新的线程执行这个工作。依照下面的形容,如果corePoolSize=0,则会判断期待队列的容量,如果还有容量,则排队,并且不会创立新的线程。 —— 但其实,这是老版本的实现形式,从1.6之后,实现形式就变了。咱们间接看execute的源码(submit也依赖它),我备注出了要害一行: int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); // 留神这一行代码,增加到期待队列胜利后,判断以后池内线程数是否为0,如果是则创立一个firstTask为null的worker,这个worker会从期待队列中获取工作并执行。 else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command);线程池提交工作后,首先判断以后池中线程数是否小于corePoolSize。如果小于则尝试创立新的线程执行该工作;否则尝试增加到期待队列。如果增加队列胜利,判断以后池内线程数是否为0,如果是则创立一个firstTask为null的worker,这个worker会从期待队列中获取工作并执行。如果增加到期待队列失败,个别是队列已满,才会再尝试创立新的线程。但在创立之前须要与maximumPoolSize比拟,如果小于则创立胜利。否则执行回绝策略。答 上述问题需辨别JDK版本。在1.6版本之后,如果corePoolSize=0,提交工作时如果线程池为空,则会立刻创立一个线程来执行工作(先排队再获取);如果提交工作的时候,线程池不为空,则先在期待队列中排队,只有队列满了才会创立新线程。 所以,优化在于,在队列没有满的这段时间内,会有一个线程在生产提交的工作;1.6之前的实现是,必须等队列满了之后,才开始生产。 2 线程池创立之后,会立刻创立外围线程么之前有人问过我这个问题,因为他发现利用中有些Bean创立了线程池,然而这个Bean个别状况下用不到,所以征询我是否须要把这个线程池正文掉,以缩小利用运行时的线程数(该利用运行时线程过多。) 答 不会。从下面的源码能够看出,在刚刚创立ThreadPoolExecutor的时候,线程并不会立刻启动,而是要等到有工作提交时才会启动,除非调用了prestartCoreThread/prestartAllCoreThreads当时启动外围线程。 prestartCoreThread:Starts a core thread, causing it to idly wait for work. This overrides the default policy of starting core threads only when new tasks are executed.prestartAllCoreThreads:Starts all core threads.3 外围线程永远不会销毁么这个问题有点tricky。首先咱们要明确一下概念,尽管在JavaDoc中也应用了“core/non-core threads”这样的形容,但其实这是一个动静的概念,JDK并没有给一部分线程打上“core”的标记,做什么特殊化的解决。这个问题我认为想要探讨的是闲置线程终结策略的问题。 ...

June 1, 2021 · 4 min · jiezi

关于线程池:线程池ThreadPoolExecutor

一 应用线程池的益处升高资源耗费:通过重复使用线程缩小线程创立和销毁对资源的造成的耗费。进步响应速度:工作到达时,能够不须要期待线程的创立就能立刻执行。进步线程的可管理性:线程是稀缺资源,无限度的创立会耗费系统资源,还会升高零碎的稳定性,应用线程池能够进行对立的调配,调优和监控。二 线程池的参数1.外围线程数2.最大线程数3.超过外围线程数的线程放弃工夫4.线程放弃工夫的单位5.寄存期待线程的队列6.线程创立工厂7.线程的饱和策略。 三 线程池的工作原理新的工作来到,先判断线程内工作线程是否达到外围线程数,如果没有,间接创立线程执行工作。如果外围线程数已满,队列未满,工作则会退出期待队列。如果期待队列已满,并且没有达到最大线程数,会创立线程,执行工作。如果期待队列已满,也曾经打到了最大线程数,会触发饱和策略 四 有哪些饱和策略• ThreadPoolExecutor.AbortPolicy:抛出 RejectedExecutionException来回绝新工作的解决。• ThreadPoolExecutor.CallerRunsPolicy:调用执行本人的线程运行工作,也就是间接在调用execute办法的线程中运行(run)被回绝的工作,如果执行程序已敞开,则会抛弃该工作。因而这种策略会升高对于新工作提交速度,影响程序的整体性能。如果您的应用程序能够接受此提早并且你要求任何一个工作申请都要被执行的话,你能够抉择这个策略。在以后的调用线程执行工作。提交会提早。• ThreadPoolExecutor.DiscardPolicy: 不解决新工作,间接抛弃掉。• ThreadPoolExecutor.DiscardOldestPolicy: 此策略将抛弃最早的未解决的工作申请。默认的回绝策略是AbortPolicy 抛出RejectedExecutionException。 五 一些比照1.Runnable VS CallableRunnable 不会返回后果或者抛出异样、Callable能够。2.execute() Vs submit();execute() 提交不须要返回值的工作。submit() 提交须要返回值得工作。会返回Future<T> 对象。能够调用 future.get() 获取后果,get()办法会阻塞以后线程。get(long timeout,TimeUnit unit)办法能够阻塞一段时间后就返回,但线程可能并未实现,拿不到后果。3.shutdown() Vs shutdownNow()shutdown: 敞开线程池,线程池状态变为shutdown,不承受新工作,但会期待队列里的工作执行结束。shutdownNow : 敞开线程池,线程池状态变为STOP,会终止正在运行的工作,进行解决排队的工作,并返回期待的List。4.isTerminated() Vs isShutDown();isShutDown 当调用 shutdown()之后就会返回true;isTerminated() ,当调用shutdown()之后,等所有工作执行结束才会返回true; 六 几种常见的线程池及问题1.FixedThreadPool 固定线程池。外围线程数等于最大线程数,无界队列 Integer.MAX_VALUE 工作多可能OOM2.SingleThreadPool 单例线程池。 外围线程数和最大线程数为1 无界队列 工作多可能OOM3.cacheThreadPool 缓存线程池。外围线程数为0 最大线程数为Integer.MAX_VALUE 同步队列,线程过多 七 外围线程数的大小设置计算密集型:CPU个数+1。I/O密集型:2倍CPU个数。

May 25, 2021 · 1 min · jiezi

关于线程池:图解定时任务线程池

线程池概念咱们上篇文章剖析了ThreadPoolExecutor,如果要用一句话阐明它的次要劣势,就是线程置换。还有Executors工具类,极大的简化了研发人员工作。 我用一个图反复形容下线程池概念。多生产-多生产模型。 生产者将线程工作丢进线程池中,生产者就就完结了。线程池管制消费者生产元素,消费者能够是1个或者多个,取决于线程池参数corePoolSize和maxPoolSize设置。阻塞队列是用来装生产者丢进去的线程工作,如ArrayBlockingQueue,LinkedBlockingQueue,DelayedQueue等。如果生产者生产能力超过消费者生产能力,如果阻塞队列有长度限度并且超过队列长度线程池会执行饱和策略,如果队列没有长度限度,可也能呈现OOM哦,因为线程工作可能把内存都撑爆了,这也是面试常考点哦!具体概念能够翻看我上一篇文章《线程池面试必考问题》。 定时工作延时原理还记得咱们下面说的阻塞队列吗?定时工作线程池底层应用DelayedQueue实现的,这种提早队列有一个最大的特点:按时出队列,大家都考过驾照吧,科目三考试的时候都是车上坐的是4集体,假如一个人考试须要花15分钟,那么考试学员队列看起来是这样的。 DelayedQueue底层须要实现Delayed接口同时须要实现getDelay办法和compareTo办法,getDelay办法用于计算出队列工夫,一旦小于0就会出队列;compareTo办法用于按触发工夫从小到大排序。这就是Schedule线程池工作延时原理,如果须要看案例代码,请参考我文章《并发队列:PriorityBlockingQueue和DelayQueue案例应用》。 scheduleWithFixedDelay和scheduleAtFixedRate区别 由上图可知:假如线程工作:耗时1秒,定时3秒执行,scheduleWithFixedDelay其实是4秒执行一次。 scheduleWithFixedDelay:是以工作完结工夫周期运行。scheduleAtFixedRate:是以固定周期运行。FutureTask获取返回值在ScheduledThreadPoolExecutor中,ScheduledFutureTask是获取定时工作返回值,继承FutureTask。咱们看下FutureTask调用get阻塞简化流程图。 向线程池增加工作,工作被封装成ScheduledFutureTask并且实现Callable接口是为了获取工作返回值。当客户端线程通过get形式获取线程池线程执行的返回值,客户端线程会阻塞直到线程池线程工作执行完。在理论使用,咱们个别拿返回值测试多线程性能。 Timer比拟Timer是单线程,而且不带返回值。ScheduledThreadPoolExecutor是多线程的,采纳线程复用代替创立新的线程,并且FutureTask带返回值。Timer线程调用sche办法,如果TimerTask出异样,Timer单线程间接挂掉退出,而ScheduledThreadPoolExecutor会捕捉了异样,不影响其余消费者线程。上面是代码测试。import java.util.Timer;import java.util.TimerTask;import java.util.concurrent.TimeUnit;/** * @author :jiaolian * @date :Created in 2021-02-25 13:50 * @description:Timer工作异样,Timer线程退出! * @modified By: * 公众号:叫练 */public class TimerTaskExceptionTest { public static void main(String[] args) throws InterruptedException { Timer timer = new Timer(); timer.schedule(new TimerTask() { @Override public void run() { System.out.println("first task"); } },0,1000); TimeUnit.SECONDS.sleep(3); timer.schedule(new TimerTask() { @Override public void run() { System.out.println("second task"); int x = 5/0; } },0,1000); }}如上代码:timer提交了first_task和second_task两个工作,3秒后,second_task执行5/0会抛出异样,此时timer线程会退出。如果换成ScheduledThreadPoolExecutor则不会影响first_task。在理论利用中,举荐用定时工作线程池中的办法去解决工作。 ...

February 25, 2021 · 1 min · jiezi

关于线程池:程序员不得不知的线程池附10道面试题

为什么要用线程池呢?上面是一段创立线程并运行的代码: for (int i = 0; i < 100; i++) { new Thread(() -> { System.out.println("run thread->" + Thread.currentThread().getName()); userService.updateUser(....); }).start();} 咱们想应用这种形式去做异步,或者进步性能,而后将某些耗时操作放入一个新线程去运行。 这种思路是没问题的,然而这段代码是存在问题的,有哪些问题呢?上面咱们就来看看有哪些问题; 创立销毁线程资源耗费;咱们应用线程的目标本是出于效率思考,能够为了创立这些线程却耗费了额定的工夫,资源,对于线程的销毁同样须要系统资源。cpu资源无限,上述代码创立线程过多,造成有的工作不能即时实现,响应工夫过长。线程无奈治理,无节制地创立线程对于无限的资源来说仿佛成了“得失相当”的一种作用。既然咱们下面应用手动创立线程会存在问题,那有解决办法吗? 答案:有的,应用线程池。 线程池介绍线程池(Thread Pool):把一个或多个线程通过对立的形式进行调度和重复使用的技术,防止了因为线程过多而带来应用上的开销。 线程池有什么长处?升高资源耗费。通过反复利用已创立的线程升高线程创立和销毁造成的耗费。进步响应速度。当工作达到时,工作能够不须要等到线程创立就能立刻执行。进步线程的可管理性。线程池应用在JDK中rt.jar包下JUC(java.util.concurrent)创立线程池有两种形式:ThreadPoolExecutor 和 Executors,其中 Executors又能够创立 6 种不同的线程池类型。 ThreadPoolExecutor 的应用 线程池应用代码如下: import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class ThreadPoolDemo { private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 10, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue(100)); public static void main(String[] args) { threadPoolExecutor.execute(new Runnable() { @Override public void run() { System.out.println("田学生您好"); } }); }} ...

January 28, 2021 · 3 min · jiezi

关于线程池:不看后悔的项目中线程池实际应用

前言:最近在看线程池方面的内容,联合源码学习完其外部原理后,心想本人在我的项目中有理论应用过线程池吗?想了想,的确在我的项目中很多中央应用到了线程池;上面来简略聊下最近在日志方面中多线程的利用: 服务接口日志异步线程池化入库解决定时工作中应用多线程进行日志清理 本文主线:①、线程池基本原理解读; ②、线程池理论利用例子: 线程池利用 Demo 我的项目构造形容服务接口日志异步线程池化入库解决定时工作中应用多线程进行日志清理线程池基本原理解读:啥也不说,先贴一张脑图,通过脑图对线程池疾速的进行理解;除了看图外,也能够通过此文章《ava线程池实现原理及其在美团业务中的实际》对线程池进行具体的理解; 线程池理论利用例子:上面就来聊聊最近在我的项目日志中线程池的利用;线程池利用Demo我的项目形容:Demo 地址:https://github.com/leishen6/s... 下面说的两种日志方面线程池利用曾经写好了Demo,是一个 SpringBoot 我的项目,我的项目构造如下图: 服务接口日志异步线程池化入库解决:后盾服务接口我的项目中,常常须要对接口的申请报文和响应报文日志做入库保留;上面将通过比照 一般形式入库操作和线程池形式入库操作 ,来说说为什么线程池式入库更加优雅; 一般形式入库操作:一般入库就是间接进行完业务逻辑解决并构建好响应后同时将日志进行入数据库,入库胜利后再将响应返回; 流程图如下: 然而这样存在一个很大的弊病就是因为多了一次数据库操作(日志入库),进而可能会导致响应速度比较慢; 上面就聊聊怎么通过线程池对日志入库进行优化,晋升接口的响应速度; 线程池形式入库操作:线程池形式入库,能够将日志间接放入到队列中,而后就间接返回响应,最初应用线程池中的线程取出队列中的日志数据异步做入库操作; 流程图如下: 应用线程池形式,解决申请的主线程能够将日志放入到队列后,间接将响应返回,而后再应用线程池中的线程取出队列中的日志数据异步的将其进行入库;因为缩小了一次数据库操作,会极大的晋升接口响应速度。 上面来看看代码实现:1、下面说到的寄存申请报文和响应报文日志的队列: LinkedBlockingDeque // 基于链表的双向阻塞队列,在队列的两端都能够插入和移除元素,是线程平安的,多线程并发下效率更高BlockingQueue<TestLogBean> queue = new LinkedBlockingDeque<TestLogBean>(MAX_QUEUE_SIZE);除了 LinkedBlockingDeque 阻塞队列外,还有一些其它常常会用到的阻塞队列,如下图: 2、我的项目中进行日志入库操作的线程池: 单线程的线程池 + 固定数线程的线程池 单线程的线程池:用来循环的监听队列中的日志数量以及决策什么时候将队列中的日志取出交由固定数线程的线程池做入库操作;固定数线程的线程池:次要用来进行日志的入库操作;局部代码实现如下: /** * 初始化 */public void init(){ // 基于链表的双向阻塞队列,在队列的两端都能够插入和移除元素,是线程平安的,多线程并发下效率更高 queue = new LinkedBlockingDeque<TestLogBean>(MAX_QUEUE_SIZE); lastExecuteTime = System.currentTimeMillis(); logger.info("LogPoolManager init successfully......"); logManagerThreadPool.execute(new Runnable() { @Override public void run() { while (run.get()){ try { // 线程休眠,具体工夫依据我的项目的理论状况配置 Thread.sleep(SLEEP_TIME); } catch (InterruptedException e) { logger.error("log Manager Thread sleep fail ", e); } // 满足寄存了10个日志 或 满足工夫距离曾经大于设置的最大工夫距离时 执行日志插入 if (logCount.get() >= BATCH_SIZE || (System.currentTimeMillis() - lastExecuteTime) > MAX_EXE_TiME) { if (logCount.get() > 0) { logger.info("begin drain log queue to database..."); List<TestLogBean> list = new ArrayList<TestLogBean>(); /** * drainTo (): 一次性从BlockingQueue获取所有可用的数据对象(还能够指定获取数据的个数), * 通过该办法,能够晋升获取数据效率;不须要屡次分批加锁或开释锁。 * 将取出的数据放入指定的list汇合中 */ queue.drainTo(list); // 工作队列 中工作数量置为0 logCount.set(0); // 从线程池中取出线程执行日志插入 logWorkerThreadPool.execute(new InsertThread(testLogService, list)); logger.info("end drain log queue to database..."); } // 获取以后执行的工夫 lastExecuteTime = System.currentTimeMillis(); } } logger.info("LogPoolManager shutdown successfully"); } });}本我的项目中测试服务接口日志异步线程池化入库解决,我的项目启动后,在浏览器输出上面URL,并刷新页面即可:http://127.0.0.1:8081/v1/api/log/test ...

December 30, 2020 · 2 min · jiezi

关于线程池:高并发下的大数据处理多线程数据分析实例

之前我的项目中遇到的问题:须要对单日告警量超过四百万条的数据进行逐条剖析和逻辑解决,导致靠繁多过程跑数据基本跑不完,无奈满足零碎和业务要求,因而决定采纳开多线程形式实现大数据处理。数据处理流程:1、申明一个内存队列2、从库中轮巡取数据放入内存队列3、开多个线程逐条取内存队列中的数据,剖析后在库中对该条数据打上标识,下次不取了。 main办法程序入口 /** * */package cn.com.starit.main;import org.apache.log4j.Logger;import cn.com.starit.ge.persistence.cache.AllCacheRefresh;/** * ================================================= <br> * 工程:GessAlarmAnalysis <br> * 类名:Main <br> * 作者:xt<br> * 工夫:2019-10-08上午02:31:17<br> * 版本:Version 1.0 <br><br> * 形容:告警解析入口<br> * ================================================= <br> */public class Main { private static final Logger logger = Logger.getLogger(Main.class); private static Main instance = null; private Main(){} /** * 零碎入口 * @param args */ public static void main(String[] args) { logger.info("过程以后内存占用量(兆):" + (int)(Runtime.getRuntime().totalMemory()/(1048576))); Main.getInstance().sysInit(); logger.info("零碎初始化结束:过程以后内存占用量(兆):" + (int)(Runtime.getRuntime().totalMemory()/(1048576))); logger.info("过程最大内存:" + (int)(Runtime.getRuntime().maxMemory()/(1048576))); //从库中取出流动告警存入内存队列 new LoadGesBsmThread().start(); //告警解析过程 new alarmAnalysis(10).start(); } /** * 零碎初始化 */ public void sysInit() { // 加载静态数据 AllCacheRefresh.refreshAll(); } /** * @return the instance */ public static Main getInstance() { if(null == instance) instance = new Main(); return instance; }}创立内存队列实例及办法 ...

December 23, 2020 · 13 min · jiezi

关于线程池:并发线程池原理与应用

java 线程池常识汇总.线程池参数含意int corePoolSize外围线程数,也即失常状况工作线程数 int maximumPoolSize,最大线程数 long keepAliveTime,须要联合阻塞队列来了解:假如阻塞队列的长度是3,外围数是2,最大线程数是5. 运行时是这样的:大于外围数时,会放到阻塞队列外面排队,如果队列满了才会启用新的工作线程,直到达到最大线程数当达到最大线程数时,如果此时submit到线程池的工作变慢了,外围线程可能应答工作的话,这时线程池会动静缩小工作线程数到外围线程数。这里的keepAliveTime 就是指 除外围线程以外的那几个线程的闲暇工夫。如果大于这个参数所指定的,线程池则会回收这些线程 TimeUnit unit,第三个参数的单位 BlockingQueue<Runnable> workQueue,比外围线程数多进去的线程会进入阻塞队列排队。别用 Executors.newFixedThreadPool 办法结构, 默认指定的阻塞队列(LinkedBlockingQueue)大小是Integer.MAX_VALUE,需显示指定 ThreadFactory threadFactory,用默认的就行,有需要的话辨别下线程名字 RejectedExecutionHandler handler回绝策略: AbortPolicy(默认): 间接抛异样DiscardPolicy: 随机抛弃DiscardOldestPolicy: 抛弃最老的线程CallerRunsPolicy:将执行权回退给调用者线程。 源码剖析准备常识(位运算)原码(带符号位): 最高位为符号位, 正数为1,负数为0反码: 原码除符号位,其余位取反.补码(次要用于示意正数): 带符号的正数反码 + 1, 次要为了打消-0,只有正数才用补码示意. refer: https://www.zhihu.com/questio...线程状态如何保留线程池中,用前三位代表运行状态,用后29位代表工作线程数. // 29 private static final int COUNT_BITS = Integer.SIZE - 3; //1(原码示意)左移29位 + (-1)的补码 (32位1), 前三位都为0, 后29位为1 private static final int CAPACITY = (1 << COUNT_BITS) - 1; //即取后29位 private static int workerCountOf(int c) { return c & CAPACITY; } //CAPACITY 取反即前三位是1, 后29位是0, 故任何数与 ~CAPACITY 做与运算都只保留前3位. private static int runStateOf(int c) { return c & ~CAPACITY; }工作治理submit 提交工作后的执行逻辑 ...

December 22, 2020 · 2 min · jiezi

关于线程池:C-线程池

C++ 实现线程池

December 4, 2020 · 1 min · jiezi

关于线程池:Java并发编程面试必备之线程池

什么是线程池是一种基于池化思维治理线程的工具。池化技术:池化技术简略点来说,就是提前保留大量的资源,以备不时之需。比方咱们的对象池,数据库连接池等。 线程池益处咱们为什么要应用线程池,间接new thread start不好吗? 升高资源耗费: 通过反复利用已创立的线程来升高线程创立和销毁所造成的耗费。进步响应速度: 工作达到时,能够立刻执行,不须要等到线程创立再来执行工作。进步线程的可管理性: 线程是稀缺资源,如果无限度创立,不仅会耗费系统资源,还会因为线程的不合理散布导致资源调度失衡,升高零碎的稳定性。应用线程池能够进行对立的调配、调优和监控。线程池的执行流程咱们先来看看线程池的一个执行流程图,此图来自文末参考1 通过上述图咱们能够得出线程池执行工作能够有以下几种状况: 如果以后的运行线程小于coreSize,则创立新线程来执行工作。如果以后运行的线程等于coreSize或多余coreSize(动静批改了coreSize才会呈现这种状况),把工作放到阻塞队列中。如果队列已满无奈将新退出的工作放进去的话,则须要创立新的线程来执行工作。如果新创建线程曾经达到了最大线程数,工作将会被回绝。怎么是用线程池在java jdk的Executors有提供创立不同线程池的办法(个别不举荐这种做法)阿里巴巴的开发手册也明确强制规定不让通过Executors来创立的,在一些公司的开发标准外面应该也会有这么一条吧。 newFixedThreadPoolnewSingleThreadExecutornewCachedThreadPoolnewScheduledThreadPoolnewWorkStealingPool (jdk1.8新增的)咱们能够应用ThreadPoolExecutor来创立线程池 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 咱们能够看出创立线程池有七个参数,而上述咱们通过Executors工具类来创立的线程池就一两个参数,其余参数它都帮咱们默认写死了,咱们只有真正了解了这几个参数能力更好的去是用线程池。上面咱们来看看这七个参数(线程池参数)。 corePoolSize外围线程数(线程池的根本大小)当咱们提交一个工作到线程池时就会创立一个线程来执行工作.当咱们须要执行的工作数大于外围线程数了就不再创立,如果咱们调用了prestartAllCoreThreads()办法线程池就会为咱们提前创立好所有的根本线程。 maximumPoolSize最大线程数:线程池容许创立的最大线程数。如果队列曾经满了,且已创立的线程数小于最大线程数,则线程池就会创立新的线程来执行工作。这里有个小知识点,如果咱们的队列是用的无界队列,这个参数是不会起作用的,因为咱们的工作会始终往队列中加,队列永远不会满(内存容许的状况)。keepAliveTime闲暇线程最大生存工夫。以后线程数大于外围线程数时,完结多余的闲暇线程期待新工作的最长工夫。默认状况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程闲暇的工夫达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。然而如果调用了allowCoreThreadTimeOut(boolean)办法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;比方以后线程池中最大线程数(maximumPoolSize)为50,外围线程数(corePoolSize)为10,以后正在跑工作的线程数为30.而后是不是空出了20个线程没活干,所以这20个线程就要被消毁,有点卸磨杀驴的感觉。如果剩下的30个线程干完活了也劳动了keepAliveTime这么久,而后这30个线程外面也要被销毁20个,就保留个外围线程。如果设置了allowCoreThreadTimeOut等于true外围线程也会被销毁。就跟咱们做外包我的项目一样,甲方我的项目实现了就得去另外一个甲方,如果短时间内都没有甲方接收你的话,你就要被解雇了,只会留下几个外围人员保护下我的项目,如果甲方我的项目保护的话用本人的人的话,所有的外包人会都会被解雇。 unit线程存活工夫的的单位。可选的单位有days、hours等。workQueue工作队列。能够抉择以下这些队列 threadFactory用户设置创立线程的工厂,咱们能够通过这个工厂来创立有业务意义的线程名字。咱们能够比照下自定义的线程工厂和默认的线程工厂创创立的名字。 默认产生线程的名字自定义线程工厂产生名字pool-5-thread-1testPool-1-thread-1阿里开发手册也有明确说到,须要指定有意义的线程名字。 RejectedExecutionHandler线程池回绝策略。当队列和线程池都满了阐明线程池曾经处于饱和状态。 必须要采取肯定的策略来解决新提交的工作。jdk默认提供了四种回绝策略:其实咱们也能够自定义工作回绝策略(实现下RejectedExecutionHandler接口),比如说如果工作回绝了咱们能够记录下日志,或者重试等,依据本人的业务需要来实现。 dubbo 工作回绝策略 @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { String msg = String.format("Thread pool is EXHAUSTED!" + " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: " + "%d)," + " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!", threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(), e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(), url.getProtocol(), url.getIp(), url.getPort()); logger.warn(msg); dumpJStack(); dispatchThreadPoolExhaustedEvent(msg); throw new RejectedExecutionException(msg); }咱们能够看出dubbo的回绝策略次要记录了具体的级别为warm的日志、输入以后线程堆栈详情、持续抛出回绝工作异样。 ...

November 6, 2020 · 1 min · jiezi

转载面试必问的线程池你懂了吗

前言 在上次和二狗的“HashMap 最强者”PK后,二狗始终耿耿于怀,常常缠着我要复仇,甚至违心出卖本人的屁股???我破口大骂:“这个死基佬”,而后许可了他... 于是“独身狗大厦11楼11室”又是一场血雨腥风。 注释 二狗:为什么要应用线程池?间接new个线程不是很难受? 如果咱们在办法中间接new一个线程来解决,当这个办法被调用频繁时就会创立很多线程,不仅会耗费系统资源,还会升高零碎的稳定性,一不小心把零碎搞崩了,就能够间接去财务那结帐了。 如果咱们正当的应用线程池,则能够防止把零碎搞崩的困境。总得来说,应用线程池能够带来以下几个益处: 升高资源耗费。通过反复利用已创立的线程,升高线程创立和销毁造成的耗费。进步响应速度。当工作达到时,工作能够不须要等到线程创立就能立刻执行。减少线程的可管理型。线程是稀缺资源,应用线程池能够进行统一分配,调优和监控。二狗:线程池的外围属性有哪些? threadFactory(线程工厂):用于创立工作线程的工厂。 corePoolSize(外围线程数):当线程池运行的线程少于 corePoolSize 时,将创立一个新线程来解决申请,即便其余工作线程处于闲暇状态。 workQueue(队列):用于保留工作并移交给工作线程的阻塞队列。 maximumPoolSize(最大线程数):线程池容许开启的最大线程数。 handler(回绝策略):往线程池增加工作时,将在上面两种状况触发回绝策略:1)线程池运行状态不是 RUNNING;2)线程池曾经达到最大线程数,并且阻塞队列已满时。 keepAliveTime(放弃存活工夫):如果线程池以后线程数超过 corePoolSize,则多余的线程闲暇工夫超过 keepAliveTime 时会被终止。 二狗:说下线程池的运作流程 我给你画张图吧。 二狗:(尼玛,这图也太香了,珍藏珍藏)线程池中的各个状态别离代表什么含意?线程池目前有5个状态: RUNNING:承受新工作并解决排队的工作。SHUTDOWN:不承受新工作,但解决排队的工作。STOP:不承受新工作,不解决排队的工作,并中断正在进行的工作。TIDYING:所有工作都已终止,workerCount 为零,线程转换到 TIDYING 状态将运行 terminated() 钩子办法。TERMINATED:terminated() 已实现。二狗:这几个状态之间是怎么流转的? 我再给你画个图,看好了! 二狗:(这图也不错,珍藏就对了)线程池有哪些队列? 常见的阻塞队列有以下几种: ArrayBlockingQueue:基于数组构造的有界阻塞队列,按先进先出对元素进行排序。 LinkedBlockingQueue:基于链表构造的有界/无界阻塞队列,按先进先出对元素进行排序,吞吐量通常高于 ArrayBlockingQueue。Executors.newFixedThreadPool 应用了该队列。 SynchronousQueue:不是一个真正的队列,而是一种在线程之间移交的机制。要将一个元素放入 SynchronousQueue 中,必须有另一个线程正在期待承受这个元素。如果没有线程期待,并且线程池的以后大小小于最大值,那么线程池将创立一个线程,否则依据回绝策略,这个工作将被回绝。应用间接移交将更高效,因为工作会间接移交给执行它的线程,而不是被放在队列中,而后由工作线程从队列中提取工作。只有当线程池是无界的或者能够回绝工作时,该队列才有理论价值。Executors.newCachedThreadPool应用了该队列。 PriorityBlockingQueue:具备优先级的无界队列,按优先级对元素进行排序。元素的优先级是通过天然程序或 Comparator 来定义的。 二狗:应用队列有什么须要留神的吗? 应用有界队列时,须要留神线程池满了后,被回绝的工作如何解决。 应用无界队列时,须要留神如果工作的提交速度大于线程池的处理速度,可能会导致内存溢出。 二狗:线程池有哪些回绝策略? 常见的有以下几种: AbortPolicy:停止策略。默认的回绝策略,间接抛出 RejectedExecutionException。调用者能够捕捉这个异样,而后依据需要编写本人的解决代码。 DiscardPolicy:摈弃策略。什么都不做,间接摈弃被回绝的工作。 DiscardOldestPolicy:摈弃最老策略。摈弃阻塞队列中最老的工作,相当于就是队列中下一个将要被执行的工作,而后从新提交被回绝的工作。如果阻塞队列是一个优先队列,那么“摈弃最旧的”策略将导致摈弃优先级最高的工作,因而最好不要将该策略和优先级队列放在一起应用。 CallerRunsPolicy:调用者运行策略。在调用者线程中执行该工作。该策略实现了一种调节机制,该策略既不会摈弃工作,也不会抛出异样,而是将工作回退到调用者(调用线程池执行工作的主线程),因为执行工作须要肯定工夫,因而主线程至多在一段时间内不能提交工作,从而使得线程池有工夫来解决完正在执行的工作。 二狗:线程只能在工作达到时才启动吗? 默认状况下,即便是外围线程也只能在新工作达到时才创立和启动。然而咱们能够应用 prestartCoreThread(启动一个外围线程)或 prestartAllCoreThreads(启动全副外围线程)办法来提前启动外围线程。 二狗:外围线程怎么实现始终存活? 阻塞队列办法有四种模式,它们以不同的形式解决操作,如下表。 抛出异样 返回非凡值 始终阻塞 超时退出 插入 add(e) ...

July 10, 2020 · 2 min · jiezi

线程池技术相关

线程池技术相关: 线程池的工作原理与源码解读一、线程池创建二、线程池执行流程 1、先看一下线程池的executor方法2、再看下addWorker的方法实现3、再到Worker里看看其实现4、接下来咱们看看runWorker方法的逻辑5、最后在看看getTask方法实现ThreadPoolExecutor 的八种拒绝策略 | 含番外!JDK内置4种线程池拒绝策略 CallerRunsPolicy(调用者运行策略)AbortPolicy(中止策略)DiscardPolicy(丢弃策略)DiscardOldestPolicy(弃老策略)第三方实现的拒绝策略 dubbo中的线程拒绝策略Netty中的线程池拒绝策略activeMq中的线程池拒绝策略pinpoint中的线程池拒绝策略结语

June 17, 2020 · 1 min · jiezi

程序员楼下闲聊某次jvm崩溃排查

大望路某写字楼下。猿A:上家公司的时候,我们组那个项目,每天半夜会跑个大批量数据处理的定时任务,然后程序经常崩溃。我:哦?那怎么处理的猿A:当时的架构有点水,说让调整“伊甸园”和“from-to”的比例……崩溃和这个就没关系我:少年,你成功引起了我的注意。来来来,请你喝饮料,好好聊聊当时的情况。业务场景“猿A”是我的同事兼死党,和他详聊后大概明白了当时的场景。 据我理解,那个定时任务,会从hive里拿出超多的数据(据说2亿左右),按具体业务做数据整合和处理,最终推送到es(elasticsearch)中。(hive什么的我没搞过,但不妨碍要讨论的东西) 业务处理部分,使用了线程池FixedThreadPool。 模拟解决过程问题定位猿A:那时候怀疑是内存OOM导致的jvm崩溃,进而怀疑大量对象GC回收不了,于是打了GC日志。我:嗯,没用hs_err_pid_xxx.log分析吗?猿A:当时小,还不会这个技能……死党“猿A”当时的解决过程比较粗暴,有了怀疑就直接在启动参数增加了-XX:+PrintGC。此命令会打印GC日志,姑且认为生产环境使用GC是CMS,写个demo模拟当时的场景。 public class CMSGCLogs { //启动参数:-Xmx10m -Xms10m -Xmn4M -XX:+PrintGC -XX:+UseConcMarkSweepGC public static void main(String[] args) throws InterruptedException { // 线程数设置为1,起名`T-1` ExecutorService es = Executors.newFixedThreadPool(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r,"T-1"); } }); boolean flag = true; while (flag){ es.execute(()->{ try { byte[] bytes = new byte[1024*1000*1]; //模拟从hive中读取了大量数据(1M) TimeUnit.MILLISECONDS.sleep(50L); //模拟写入es过程 } catch (Exception e) { e.printStackTrace(); } }); } es.shutdown(); }}先背一段线程池的处理过程。 ...

September 9, 2019 · 3 min · jiezi

周期性线程池与主要源码解析

之前学习ThreadPool的使用以及源码剖析,并且从面试的角度去介绍知识点的解答。今天给大家介绍下周期性线程池的使用和重点源码剖析。ScheduledThreadPoolExecutorScheduledThreadPoolExecutor:用来处理延时任务或定时任务定时线程池类的类结构图 ScheduledThreadPoolExecutor接收ScheduleFutureTask类型的任务,是线程池调度任务的最小单位。它采用DelayQueue存储等待的任务:1、DelayQueue内部封装成一个PriorityQueue,它会根据time的先后时间顺序,如果time相同则根绝sequenceNumber排序;2、DelayQueue是无界队列; ScheduleFutureTask接收的参数: private final long sequenceNumber;//任务的序号private long time;//任务开始的时间private final long period;//任务执行的时间间隔工作线程的的执行过程:工作线程会从DelayQueue取出已经到期的任务去执行;执行结束后重新设置任务的到期时间,再次放回DelayQueue; ScheduledThreadPoolExecutor会把待执行的任务放到工作队列DelayQueue中,DelayQueue封装了一个PriorityQueue,PriorityQueue会对队列中的ScheduledFutureTask进行排序,具体的排序算法实现如下: public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; //首先按照time排序,time小的排到前面,time大的排到后面 long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; //time相同,按照sequenceNumber排序; //sequenceNumber小的排在前面,sequenceNumber大的排在后面 else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;}接下来看看ScheduledFutureTask的run方法实现, run方法是调度task的核心,task的执行实际上是run方法的执行。 ...

August 19, 2019 · 2 min · jiezi

手撕ThreadPoolExecutor线程池源码

这篇文章对ThreadPoolExecutor创建的线程池如何操作线程的生命周期通过源码的方式进行详细解析。通过对execute方法、addWorker方法、Worker类、runWorker方法、getTask方法、processWorkerExit从源码角度详细阐述,文末有彩蛋。exexcte方法public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); /** * workerCountOf方法取出低29位的值,表示当前活动的线程数; * 如果当前活动的线程数小于corePoolSize,则新建一个线程放入线程池中,并把该任务放到线程中 */ if (workerCountOf(c) < corePoolSize) { /** * addWorker中的第二个参数表示限制添加线程的数量 是根据据corePoolSize 来判断还是maximumPoolSize来判断; * 如果是ture,根据corePoolSize判断 * 如果是false,根据maximumPoolSize判断 */ if (addWorker(command, true)) return; /** * 如果添加失败,则重新获取ctl值 */ c = ctl.get(); } /** * 如果线程池是Running状态,并且任务添加到队列中 */ if (isRunning(c) && workQueue.offer(command)) { //double-check,重新获取ctl的值 int recheck = ctl.get(); /** * 再次判断线程池的状态,如果不是运行状态,由于之前已经把command添加到阻塞队列中,这时候需要从队列中移除command; * 通过handler使用拒绝策略对该任务进行处理,整个方法返回 */ if (!isRunning(recheck) && remove(command)) reject(command); /** * 获取线程池中的有效线程数,如果数量是0,则执行addWorker方法; * 第一个参数为null,表示在线程池中创建一个线程,但不去启动 * 第二个参数为false,将线程池的线程数量的上限设置为maximumPoolSize,添加线程时根据maximumPoolSize来判断 */ else if (workerCountOf(recheck) == 0) addWorker(null, false); /** * 执行到这里,有两种情况: * 1、线程池的状态不是RUNNING; * 2、线程池状态是RUNNING,但是workerCount >= corePoolSize, workerQueue已满 * 这个时候,再次调用addWorker方法,第二个参数传false,将线程池的有限线程数量的上限设置为maximumPoolSize; * 如果失败则执行拒绝策略; */ } else if (!addWorker(command, false)) reject(command);}简单来说,在执行execute()方法时如果状态一直是RUNNING时,的执行过程如下: ...

August 18, 2019 · 4 min · jiezi

Java并发22并发设计模式-ThreadPerMessage-与-Worker-Thread-模式

我们曾经把并发编程领域的问题总结为三个核心问题:分工、同步和互斥。其中,同步和互斥相关问题更多地源自微观,而分工问题则是源自宏观。我们解决问题,往往都是从宏观入手,同样,解决并发编程问题,首要问题也是解决宏观的分工问题。 并发编程领域里,解决分工问题也有一系列的设计模式,比较常用的主要有 Thread-Per-Message 模式、Worker Thread 模式、生产者 - 消费者模式等等。今天我们重点介绍 Thread-Per-Message 模式。 如何理解 Thread-Per-Message 模式比如写一个 HTTP Server,很显然只能在主线程中接收请求,而不能处理 HTTP 请求,因为如果在主线程中处理 HTTP 请求的话,那同一时间只能处理一个请求,太慢了!怎么办呢?可以利用代办的思路,创建一个子线程,委托子线程去处理 HTTP 请求。 这种委托他人办理的方式,在并发编程领域被总结为一种设计模式,叫做Thread-Per-Message 模式,简言之就是为每个任务分配一个独立的线程。这是一种最简单的分工方法,实现起来也非常简单。 用 Thread 实现 Thread-Per-Message 模式Thread-Per-Message 模式的一个最经典的应用场景是网络编程里服务端的实现,服务端为每个客户端请求创建一个独立的线程,当线程处理完请求后,自动销毁,这是一种最简单的并发处理网络请求的方法。 下面我们就以 echo 程序的服务端为例,介绍如何实现 Thread-Per-Message 模式。 final ServerSocketChannel ssc = ServerSocketChannel.open().bind( new InetSocketAddress(8080));// 处理请求 try { while (true) { // 接收请求 SocketChannel sc = ssc.accept(); // 每个请求都创建一个线程 new Thread(()->{ try { // 读 Socket ByteBuffer rb = ByteBuffer .allocateDirect(1024); sc.read(rb); // 模拟处理请求 Thread.sleep(2000); // 写 Socket ByteBuffer wb = (ByteBuffer)rb.flip(); sc.write(wb); // 关闭 Socket sc.close(); }catch(Exception e){ throw new UncheckedIOException(e); } }).start(); }} finally { ssc.close();} 如果你熟悉网络编程,相信你一定会提出一个很尖锐的问题:上面这个 echo 服务的实现方案是不具备可行性的。原因在于 Java 中的线程是一个重量级的对象,创建成本很高,一方面创建线程比较耗时,另一方面线程占用的内存也比较大。所以,为每个请求创建一个新的线程并不适合高并发场景。 ...

July 12, 2019 · 2 min · jiezi

Java并发19并发设计模式-ThreadLocal-线程本地存储模式

我们曾经重复说到,多个线程同时读写同一共享变量存在并发问题。前面两篇文章我们突破的是写,没有写操作自然没有并发问题了。其实还可以突破共享变量,没有共享变量也不会有并发问题。 那如何避免共享呢?思路其实很简单,并发编程领域,就是每个线程都拥有自己的变量,彼此之间不共享,也就没有并发问题了。 我们知道局部变量可以做到避免共享, 即线程封闭,其本质上就是避免共享。那还有没有其他方法可以做到呢?有的Java 语言提供的线程本地存储(ThreadLocal)就能够做到 ThreadLocal 的使用方法下面这个静态类 ThreadId 会为每个线程分配一个唯一的线程 Id,如果一个线程前后两次调用 ThreadId 的 get() 方法,两次 get() 方法的返回值是相同的。但如果是两个线程分别调用 ThreadId 的 get() 方法,那么两个线程看到的 get() 方法的返回值是不同的。若你是初次接触 ThreadLocal,可能会觉得奇怪,为什么相同线程调用 get() 方法结果就相同,而不同线程调用 get() 方法结果就不同呢? static class ThreadId { static final AtomicLong nextId=new AtomicLong(0); // 定义 ThreadLocal 变量 static final ThreadLocal<Long> tl=ThreadLocal.withInitial( ()->nextId.getAndIncrement()); // 此方法会为每个线程分配一个唯一的 Id static long get(){ return tl.get(); }}在详细解释 ThreadLocal 的工作原理之前,我们再看一个实际工作中可能遇到的例子来加深一下对 ThreadLocal 的理解。你可能知道 SimpleDateFormat 不是线程安全的,那如果需要在并发场景下使用它。 其实有一个办法就是用 ThreadLocal 来解决,下面的示例代码就是 ThreadLocal 解决方案的具体实现,这段代码与前面 ThreadId 的代码高度相似,同样地,不同线程调用 SafeDateFormat 的 get() 方法将返回不同的 SimpleDateFormat 对象实例,由于不同线程并不共享 SimpleDateFormat,所以就像局部变量一样,是线程安全的。 ...

July 9, 2019 · 2 min · jiezi

Java并发13-ThreadPoolExecutor-如何创建正确的线程池

虽然在 Java 语言中创建线程看上去就像创建一个对象一样简单,只需要 new Thread() 就可以了,但实际上创建线程远不是创建一个对象那么简单。创建对象,仅仅是在 JVM 的堆里分配一块内存而已;而创建一个线程,却需要调用操作系统内核的 API,然后操作系统要为线程分配一系列的资源,这个成本就很高了,所以线程是一个重量级的对象,应该避免频繁创建和销毁那如何避免呢?应对方案估计你已经知道了,那就是线程池。 线程池的需求是如此普遍,所以 Java SDK 并发包自然也少不了它。这里我们需要区分 线程池与一般意义上的池化资源是不同的。一般意义上的池化资源,都是下面这样,当你需要资源的时候就调用 acquire() 方法来申请资源,用完之后就调用 release() 释放资源。 class XXXPool{ // 获取池化资源 XXX acquire() { } // 释放池化资源 void release(XXX x){ }} 但是Java 提供的线程池里面压根就没有申请线程和释放线程的方法。所以,线程池的设计,没有办法直接采用一般意义上池化资源的设计方法。那线程池该如何设计呢?目前业界线程池的设计,普遍采用的都是生产者 - 消费者模式. 线程池是一种生产者 - 消费者模式线程池的使用方是生产者,线程池本身是消费者。在下面的示例代码中,我们创建了一个非常简单的线程池 MyThreadPool,你可以通过它来理解线程池的工作原理。 // 简化的线程池,仅用来说明工作原理class MyThreadPool{ // 利用阻塞队列实现生产者 - 消费者模式 BlockingQueue<Runnable> workQueue; // 保存内部工作线程 List<WorkerThread> threads = new ArrayList<>(); // 构造方法 MyThreadPool(int poolSize, BlockingQueue<Runnable> workQueue){ this.workQueue = workQueue; // 创建工作线程 for(int idx=0; idx<poolSize; idx++){ WorkerThread work = new WorkerThread(); work.start(); threads.add(work); } } // 提交任务 void execute(Runnable command){ workQueue.put(command); } // 工作线程负责消费任务,并执行任务 class WorkerThread extends Thread{ public void run() { // 循环取任务并执行 while(true){ ① Runnable task = workQueue.take(); task.run(); } } } }/** 下面是使用示例 **/// 创建有界阻塞队列BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(2);// 创建线程池 MyThreadPool pool = new MyThreadPool( 10, workQueue);// 提交任务 pool.execute(()->{ System.out.println("hello");});在 MyThreadPool 的内部,我们维护了一个阻塞队列 workQueue 和一组工作线程,工作线程的个数由构造函数中的 poolSize 来指定。用户通过调用 execute() 方法来提交 Runnable 任务,execute() 方法的内部实现仅仅是将任务加入到 workQueue 中。MyThreadPool 内部维护的工作线程会消费 workQueue 中的任务并执行任务,相关的代码就是代码①处的 while 循环。线程池主要的工作原理就这些,是不是还挺简单的? ...

June 23, 2019 · 2 min · jiezi

线程池没你想的那么简单续

前言前段时间写过一篇《线程池没你想的那么简单》,和大家一起撸了一个基本的线程池,具备: 线程池基本调度功能。线程池自动扩容缩容。队列缓存线程。关闭线程池。这些功能,最后也留下了三个待实现的 features 。 执行带有返回值的线程。异常处理怎么办?所有任务执行完怎么通知我?这次就实现这三个特性来看看 j.u.c 中的线程池是如何实现这些需求的。 再看本文之前,强烈建议先查看上文《线程池没你想的那么简单》任务完成后的通知大家在用线程池的时候或多或少都会有这样的需求: 线程池中的任务执行完毕后再通知主线程做其他事情,比如一批任务都执行完毕后再执行下一波任务等等。 以我们之前的代码为例: 总共往线程池中提交了 13 个任务,直到他们都执行完毕后再打印 “任务执行完毕” 这个日志。执行结果如下: 为了简单的达到这个效果,我们可以在初始化线程池的时候传入一个接口的实现,这个接口就是用于任务完成之后的回调。 public interface Notify { /** * 回调 */ void notifyListen() ;}以上就是线程池的构造函数以及接口的定义。 所以想要实现这个功能的关键是在何时回调这个接口? 仔细想想其实也简单:只要我们记录提交到线程池中的任务及完成的数量,他们两者的差为 0 时就认为线程池中的任务已执行完毕;这时便可回调这个接口。 所以在往线程池中写入任务时我们需要记录任务数量: 为了并发安全的考虑,这里的计数器采用了原子的 AtomicInteger 。 而在任务执行完毕后就将计数器 -1 ,一旦为 0 时则任务任务全部执行完毕;这时便可回调我们自定义的接口完成通知。 JDK 的实现这样的需求在 jdk 中的 ThreadPoolExecutor 中也有相关的 API ,只是用法不太一样,但本质原理都大同小异。 我们使用 ThreadPoolExecutor 的常规关闭流程如下: executorService.shutdown(); while (!executorService.awaitTermination(100, TimeUnit.MILLISECONDS)) { logger.info("thread running"); }线程提交完毕后执行 shutdown() 关闭线程池,接着循环调用 awaitTermination() 方法,一旦任务全部执行完毕后则会返回 true 从而退出循环。 ...

June 6, 2019 · 2 min · jiezi

你都理解创建线程池的参数吗

微信公众号「后端进阶」,专注后端技术分享:Java、Golang、WEB框架、分布式中间件、服务治理等等。 老司机倾囊相授,带你一路进阶,来不及解释了快上车!多线程可以说是面试官最喜欢拿来问的题目之一了,可谓是老生之常谈,不管你是新手还是老司机,我相信你一定会在面试过程中遇到过有关多线程的一些问题。那我现在就充当一次面试官,我来问你: 现有一个线程池,参数corePoolSize = 5,maximumPoolSize = 10,BlockingQueue阻塞队列长度为5,此时有4个任务同时进来,问:线程池会创建几条线程? 如果4个任务还没处理完,这时又同时进来2个任务,问:线程池又会创建几条线程还是不会创建? 如果前面6个任务还是没有处理完,这时又同时进来5个任务,问:线程池又会创建几条线程还是不会创建? 如果你此时一脸懵逼,请不要慌,问题不大。 创建线程池的构造方法的参数都有哪些?要回答这个问题,我们需要从创建线程池的参数去找答案: java.util.concurrent.ThreadPoolExecutor#ThreadPoolExecutor: public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler;}创建线程池一共有7个参数,从源码可知,corePoolSize和maximumPoolSize都不能小于0,且核心线程数不能大于最大线程数。 ...

June 4, 2019 · 1 min · jiezi

C框架

thrift在网络一节中简单介绍了thrift的协议部分,在工程中会用得到thrift的线程并发,process,server库。定义idl后生成代码和业务编写代码的关系如下: 运行过程: 1.开启threaft线程池 主线程创建n个,开启,数量不够workerMonitor_.wait。到100个就死了(加锁,结束释放) 工作线程开启后,加锁,增加数量,workerMonitor_.notify,任务空monitor_.wait(),否则取任务,判断等待队列长度不到阈值则manager_->maxMonitor_.notify(),释放锁。执行任务。结束后继续抢锁循环2.开启nonblockingserver,io线程就绪 非0号其他线程先start,设置eventbase(iothread),createpipe,注册notify;event_base_loop(eventBase_, 0);【无监听,每个io线程自己的event_base】 0号线程注册事件,设置eventbase(iothread);注册监听;createpipe,注册notify。0号io线程run,开始监听。其他io线程join3.0号监听到handleEvent accept 加锁create connection分配连接给io线程(轮询)释放锁,通知分配的线程notifyhandler4.分配到连接的IO线程notifyhandler(read notifyfd,transition) 本次transition: 读取,调用addtask=>setidle,不需要监听cfd5.addtask thrift,加锁,如果tasks_.size() >= pendingTaskCountMax_,maxMonitor_.wait(timeout);加入task队列,有空闲线程monitor_.notify()。任何一种monitor都公用一个锁。 这里的task就是process然后notifyIOThread(read notifyfd,transition)。6.处理后通知IO线程 transition将cfd改为监听写事件,加入到本线程的事件监听中,调用connenction的回调发送。7.connenction的回调发送之后继续notifyIOThread 本次transition重置缓存区结束。总结:多reactor多线程模式,一个accept,多个读写,单独任务处理。正常只需要一个reactor。单reactor多线程形式。 http_server 关于优雅重启nginx这种多进程的比价好做,因为子进程可以独立于父进程。主进程fork,继承监听fd,锁等,exec()执行完整代码。此时旧的子进程和新的子进程都可以抢锁监听fd处理连接,关闭旧主进程,给旧的子进程发送关闭信号,子进程在处理后才会监听到信号,做到了优雅。线程没办法独立监听信号。 连接池add的就是任意连接对象。实现connect,reconnect.比如 for (int i = 0; i < connectionCount; ++i) { RedisClient* redis = new RedisClient(host, port, conn_timeout_ms, rw_timeout_ms); redis->init();//CONNECT redisPool_.add(redis); }改造的redis_pool连接池+线程池+hiredis分别负责连接管理和并发请求处理。封装目的:一般并发到分片获取数据的代理都有以下缺点:一个失败全部失败,要等所有返回才返回,而mget的失败会被放大。因此自己在业务层控制整个mget的超时时间和返回,到代理层已经拆分为当个get,用线程池实现。 spdlog业务调用 spdlog::set_async_mode(8192*4, spdlog::async_overflow_policy::block_retry,nullptr, std::chrono::seconds(3));std::string info_file = FLAGS_log_path + "/" + FLAGS_info_fileauto debug_logger = spdlog::basic_logger_mt("debug_logger", info_file.c_str());debug_logger->set_pattern("INFO: %Y-%m-%d %H:%M:%S %v");inline std::shared_ptr<spdlog::logger> spdlog::create(const std::string& logger_name, Args... args){ sink_ptr sink = std::make_shared<Sink>(args...); return details::registry::instance().create(logger_name, { sink }); /*锁控制 new_logger = std::make_shared<async_logger>(logger_name, sinks_begin, sinks_end, _async_q_size, _overflow_policy, _worker_warmup_cb, _flush_interval_ms, _worker_teardown_cb); //这里启线程 _loggers[logger_name] = new_logger;*/} auto logger = spdlog::get("warn_logger");\ if (logger != NULL) { \ logger->info("{}:{} {}", cplusutils::servbase_basename(__FILE__), __LINE__, log_info.str()); \ }info()=>log()->push_msg()spdlog的push_msg就是enqueue ...

May 15, 2019 · 2 min · jiezi

程序员笔记如何编写高性能的Java代码

一、并发无法创建新的本机线程...... 问题1:Java的中创建一个线程消耗多少内存? 每个线程有独自的栈内存,共享堆内存 问题2:一台机器可以创建多少线程? CPU,内存,操作系统,JVM,应用服务器 我们编写一段示例代码,来验证下线程池与非线程池的区别: //线程池和非线程池的区别<font></font>public class ThreadPool {<font></font> <font></font> public static int times = 100;//100,1000,10000<font></font> <font></font> public static ArrayBlockingQueue arrayWorkQueue = new ArrayBlockingQueue(1000);<font></font> public static ExecutorService threadPool = new ThreadPoolExecutor(5, //corePoolSize线程池中核心线程数<font></font> 10,<font></font> 60,<font></font> TimeUnit.SECONDS,<font></font> arrayWorkQueue,<font></font> new ThreadPoolExecutor.DiscardOldestPolicy()<font></font> );<font></font> <font></font> public static void useThreadPool() {<font></font> Long start = System.currentTimeMillis();<font></font> for (int i = 0; i < times; i++) {<font></font> threadPool.execute(new Runnable() {<font></font> public void run() {<font></font> System.out.println("说点什么吧...");<font></font> }<font></font> });<font></font> }<font></font> threadPool.shutdown();<font></font> while (true) {<font></font> if (threadPool.isTerminated()) {<font></font> Long end = System.currentTimeMillis();<font></font> System.out.println(end - start);<font></font> break;<font></font> }<font></font> }<font></font> }<font></font> <font></font> public static void createNewThread() {<font></font> Long start = System.currentTimeMillis();<font></font> for (int i = 0; i < times; i++) {<font></font> <font></font> new Thread() {<font></font> public void run() {<font></font> System.out.println("说点什么吧...");<font></font> }<font></font> }.start();<font></font> }<font></font> Long end = System.currentTimeMillis();<font></font> System.out.println(end - start);<font></font> }<font></font> <font></font> public static void main(String args[]) {<font></font> createNewThread();<font></font> //useThreadPool();<font></font> }<font></font> }启动不同数量的线程,然后比较线程池和非线程池的执行结果: ...

May 10, 2019 · 6 min · jiezi

线程池系列-1-让多线程不再坑爹的线程池

背景线程池的来由服务端的程序,例如数据库服务器和Web服务器,每次收到客户端的请求,都会创建一个线程来处理这些请求。 创建线程的方式又很多,例如继承Thread类、实现Runnable或者Callable接口等。 通过创建新的线程来处理客户端的请求,这种看起来很容易的方法,其实是有很大弊端且有很高的风险的。 俗话说,简单的路越走越困难,困难的路越走越简单,就是这个道理。 创建和销毁线程,会消耗大量的服务器资源,甚至创建和销毁线程消耗的时间比线程本身处理任务的时间还要长。 由于启动线程需要消耗大量的服务器资源,如果创建过多的线程会造成系统内存不足(run out of memory),因此限制线程创建的数量十分必要。 什么是线程池线程池通俗来讲就是一个取出和放回提前创建好的线程的池子,概念上,类似数据库的连接池。 那么线程池是如何发挥作用的呢? 实际上,线程池是通过重用之前创建好线程来处理当前任务,来达到大大降低线程频繁创建和销毁导致的资源消耗的目的。 A thread pool reuses previously created threads to execute current tasks and offers a solution to the problem of thread cycle overhead and resource thrashing. Since the thread is already existing when the request arrives, the delay introduced by thread creation is eliminated, making the application more responsive. 背景总结下面总结一下开篇对于线程池的一些介绍。 线程是程序的组成部分,可以帮助我们搞事情。多个线程同时帮我们搞事情,可以通过更大限度地利用服务器资源,用来大大提高我们搞事情的效率。我们创建的每个线程都不是省油的灯,线程越多就会占用越多的系统资源,因此小弟虽好使但不要贪多哦,在有限的系统资源下,线程并不是“韩信点兵,多多益善”的,要限制线程的数量。请记住这一条,因为下面“批判”Java提供的线程池创建解决方案的时候,这就是“罪魁祸首”。创建和销毁线程会耗费大量系统资源,就像大佬招募和遣散小弟,都是要大费周章的。因此聪明的大佬就想到了“池”,把线程缓存起来,用的时候拿出来不用的时候还放回去,这就可以既享受多线程的乐趣,又可以避免使用多线程的痛苦了。但到底怎么使用线程池呢?线程池真的这么简单好用吗?线程池使用的过程中有没有什么坑? 不要着急,下面就结合具体的示例,跟你讲解各种使用线程池的姿势,以及这些姿势爽在哪里,痛在哪里。 准备好纸巾,咳咳...,是笔记本,涛哥要跟你开讲啦! ...

April 29, 2019 · 3 min · jiezi

开发小记-Java-线程池-之-被吃掉的线程异常附源码分析和解决方法

前言今天遇到了一个bug,现象是,一个任务放入线程池中,似乎“没有被执行”,日志也没有打。 经过本地代码调试之后,发现在任务逻辑的前半段,抛出了NPE,但是代码外层没有try-catch,导致这个异常被吃掉。 这个问题解决起来是很简单的,外层加个try-catch就好了,但是这个异常如果没有被catch,线程池内部逻辑是怎么处理这个异常的呢?这个异常最后会跑到哪里呢? 带着疑问和好奇心,我研究了一下线程池那一块的源码,并且做了以下的总结。 源码分析项目中出问题的代码差不多就是下面这个样子 ExecutorService threadPool = Executors.newFixedThreadPool(3);threadPool.submit(() -> { String pennyStr = null; Double penny = Double.valueOf(pennyStr); ...})先进到newFixedThreadPool这个工厂方法中看生成的具体实现类,发现是ThreadPoolExecutor public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }再看这个类的继承关系, 再进到submit方法,这个方法在ExecutorService接口中约定,其实是在AbstractExectorService中实现,ThreadPoolExecutor并没有override这个方法。 public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; }protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); }对应的FutureTask对象的构造方法 ...

April 25, 2019 · 4 min · jiezi

3分钟干货之详解线程池执行流程

我们向线程提交任务时可以使用Execute和Submit,区别就是Submit可以返回一个Future对象,通过Future对象可以了解任务执行情况,可以取消任务的执行,还可获取执行结果或执行异常。Submit最终也是通过Execute执行的。 △线程池提交任务时的执行顺序如下: 向线程池提交任务时,会首先判断线程池中的线程数是否大于设置的核心线程数,如果不大于,就创建一个核心线程来执行任务。 如果大于核心线程数,就会判断缓冲队列是否满了,如果没有满,则放入队列,等待线程空闲时执行任务。如果队列已经满了,则判断是否达到了线程池设置的最大线程数,如果没有达到,就创建新线程来执行任务。如果已经达到了最大线程数,则执行指定的拒绝策略。这里需要注意队列的判断与最大线程数判断的顺序,不要搞反。

April 24, 2019 · 1 min · jiezi

关于线程池你不得不知道的一些设置

看完我上一篇文章「你都理解创建线程池的参数吗?」之后,当遇到这种问题,你觉得你完全能够唬住面试官了,50k轻松到手。殊不知,要是面试官此刻给你来个反杀:初始化线程池时可以预先创建线程吗?线程池的核心线程可以被回收吗?为什么?如果此刻你一脸懵逼,这个要慌,问题很大,50k马上变5k。有细心的网友早就想到了这个问题:在ThreadPoolExecutor线程池中,还有一些不常用的设置。我建议如果您在应用场景中没有特殊的要求,就不需要使用这些设置。初始化线程池时可以预先创建线程吗?prestartAllCoreThreads初始化线程池时是可以预先创建线程的,初始化线程池后,再调用prestartAllCoreThreads()方法,即可预先创建corePoolSize数量的核心线程,我们看源码:public int prestartAllCoreThreads() { int n = 0; while (addWorker(null, true)) ++n; return n;}private boolean addWorker(Runnable firstTask, boolean core) { // ..}addWorker方法目的是在线程池中添加任务并执行,如果task为空,线程获取任务执行时调用getTask()方法,该方法从blockingQueue阻塞队列中阻塞获取任务执行,因此线程不会释放,留存在线程池中,如果core=true,说明任务只能利用核心线程来执行。所以该方法会在线程池总预先创建没有任务执行的线程,数量为corePoolSize。下面我们测试一下:从测试结果来看,线程池中已经预先创建了corePoolSize数量的空闲线程。prestartCoreThreadprestartCoreThread()同样可以预先创建线程,只不过该方法只会与创建1条线程,我们来看源码:public boolean prestartCoreThread() { return workerCountOf(ctl.get()) < corePoolSize && addWorker(null, true);}从方法源码可知,如果此时工作线程数量小于corePoolSize,那么就调用addWorker创建1条空闲核心线程。下面我们测试一下:从测试结果来看,线程池中已经预先创建了1条空闲线程。线程池的核心线程可以被回收吗?你可能会想到将corePoolSize的数量设置为0,从而线程池的所有线程都是“临时”的,只有keepAliveTime存活时间,你的思路也许时正确的,但你有没有想过一个很严重的后果,corePoolSize=0时,任务需要填满阻塞队列才会创建线程来执行任务,阻塞队列有设置长度还好,如果队列长度无限大呢,你就等着OOM异常吧,所以用这种设置行为并不是我们所需要的。有没有什么设置可以回收核心线程呢?allowCoreThreadTimeOutThreadPoolExecutor有一个私有成员变量:private volatile boolean allowCoreThreadTimeOut;如果allowCoreThreadTimeOut=true,核心线程在规定时间内会被回收。上面我也说了,当线程空闲时会从blockingQueue阻塞队列中阻塞获取任务执行,所以我们来看看是保证核心线程不被销毁的,我们直接定位到源码部位:java.util.concurrent.ThreadPoolExecutor#getTask:boolean timedOut = false; // Did the last poll() time out?for (;;) { // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; }}这里的关键值timed,如果allowCoreThreadTimeOut=true或者此时工作线程大于corePoolSize,timed=true,如果timed=true,会调用poll()方法从阻塞队列中获取任务,否则调用take()方法获取任务。下面我来解释这两个方法:poll(long timeout, TimeUnit unit):从BlockingQueue取出一个任务,如果不能立即取出,则可以等待timeout参数的时间,如果超过这个时间还不能取出任务,则返回null;take():从blocking阻塞队列取出一个任务,如果BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的任务被加入为止。到这里,我们就很好地解释了,当allowCoreThreadTimeOut=true或者此时工作线程大于corePoolSize时,线程调用BlockingQueue的poll方法获取任务,若超过keepAliveTime时间,则返回null,timedOut=true,则getTask会返回null,线程中的runWorker方法会退出while循环,线程接下来会被回收。下面我们测试一下:可以看到,核心线程被回收了。写在最后后面我会单独写一篇从源码的角度深度解读线程池的运行原理,敬请期待。另外,我公众号也改名字了,这个公众号的内容源自于我的博客,我的博客域名是objcoding,所以干脆公众号就叫这个名字了,但是很多网友误以为我是objective-c开发的,宝宝心里苦啊,其实这个域名的是面向对象编程的意思,即在Java狗眼里一切皆对象。相信小伙伴们上学时,也有被老师“忽悠”去当科代表的经历吧,比如老师跟你说:「当哪科科代表,哪科成绩就会上升」,「你哪科弱,就当哪科科代表,最好了」其实当科代表,最重要的一个条件是对这门学科有着浓厚的兴趣与热爱。所以,从今天开始,我厚着脸皮,当一次Java科代表。 ...

April 14, 2019 · 1 min · jiezi

任务异常导致线程池中的线程变为waiting状态

背景项目中存在一些定时任务来更新数据库表,借助了线程池提供的一些能力,线上环境偶尔会出现网络波动导致服务实例无法连上数据库,只要出现了这种情况,就会导致数据不会再被更新,通过一些命令发现更新数据库的线程池中的所有线程都处于waiting状态。通过搜索引擎了解到以下观点:提交到线程池的任务如果抛出异常会导致线程挂掉,遂将提交到线程池的任务中可能出现的异常进行了处理,确实解决了问题。同时也留下了一个疑问:为什么任务抛出的异常会导致线程处于waiting状态?本篇文章的关注点主要集中在ScheduledThreadPoolExecutor.scheduleWithFixedDelay(..)这个方法上,对线程池的一些原理性的内容以及相关的术语不做过多描述。执行流程scheduleWithFixedDelay(..) 的大体运行过程(注,ScheduledThreadPoolExecutor类中还包含了execute,submit,schedule等方法,这些方法的逻辑基本是一致的):1、首先对提交的任务(Runnable实例)进行一些包装,生成一个ScheduledFutureTask:2、进入delayedExecute(..)方法,将生成的ScheduledFutureTask 放到线程池的任务队列(注:BlockingQueue)中;3、进入ensurePrestart()方法,创建Worker实例开始处理线程:4、最后就是addWorker方法了,此方法主要关注以下部分:到这里,线程池中已经创建了线程,并且开始执行了。接下来就看看Worker线程是如何执行提交到线程池中的任务的。5、上一步中,可以看到Worker中持有的线程已经开始运行了,而Worker中的线程是这么创建的:所以,Worker中的线程start之后,则开始执行Worker中的run()方法(会进入到runWorker(..)方法)6、上面的第1、2步中,会把构造的ScheduledFutureTask实例放到任务队列中,这里会再从任务队列中取出该实例(图中的while循环条件),然后再去调用该实例的run()方法:getTask()方法:7、ScheduledThreadPoolExecutor.ScheduledFutureTask#run()方法:这里的outerTask就是第1步中的outerTask,其实就是要执行的任务本身。到了这里给出一个小结:对于周期性执行的任务,如果该任务执行失败,则后续其不会再被执行。为了内容的完整性,下面给出上图中两个方法的流程:到此,任务异常导致线程waiting的原因就明了了:由于任务执行过程中抛出了异常,会造成ScheduledFutureTask不会再将自身放入到任务队列(BlockingQueue)中,即执行完之后,任务队列变成了一个空队列,而线程池中的Worker线程会以阻塞的方式从任务队列中去取任务(第6步),当队列为空时,会导致所有的线程都被阻塞而进入waiting状态。

April 11, 2019 · 1 min · jiezi

最新java并发编程高级面试30题:并发队列+可重入锁+线程池+Synchronized

众所周知,在Java的知识体系中,并发编程是非常重要的一环,也是面试的必问题,一个好的Java程序员是必须对并发编程这块有所了解的。然而不论是哪个国家,什么背景的 Java 开发者,都对自己写的并发程序相当自信,但也会在出问题时表现得很诧异甚至一筹莫展。可见,Java 并发编程显然不是一件能速成的能力,基础搭得越好,越全面,在实践中才会有更深刻的理解。因此,大家不难发现 Java 并发问题一直是各个大厂面试的重点之一。我在平时的面试中, 也发现很多候选人对一些基本的并发概念表示没听过,或原理不理解,可能知道一些却又讲不清楚,最终导致面试失败。本文会结合实际中接触到的一些面试题,重点来聊一聊 Java 并发中的相关知识点。Synchronized 相关问题Synchronized 用过吗,其原理是什么?你刚才提到获取对象的锁,这个“锁”到底是什么?如何确定对象的锁?什么是可重入性,为什么说 Synchronized 是可重入锁?JVM 对 Java 的原生锁做了哪些优化?为什么说 Synchronized 是非公平锁?什么是锁消除和锁粗化?为什么说 Synchronized 是一个悲观锁?乐观锁的实现原理又是什么?什么是乐观锁一定就是好的吗?可重入锁 ReentrantLock 及其他显式锁相关问题跟 Synchronized 相比,可重入锁 ReentrantLock 其实现原理有什么不同?那么请谈谈 AQS 框架是怎么回事儿?请尽可能详尽地对比下 Synchronized 和 ReentrantLock 的异同。ReentrantLock 是如何实现可重入性的?除了 ReetrantLock,你还接触过 JUC 中的哪些并发工具?请谈谈 ReadWriteLock 和 StampedLock。如何让 Java 的线程彼此同步?你了解过哪些同步器?请分别介绍下。CyclicBarrier 和 CountDownLatch 看起来很相似,请对比下呢?Java 内存模型相关问题什么是 Java 的内存模型,Java 中各个线程是怎么彼此看到对方的变量的?请谈谈 volatile 有什么特点,为什么它能保证变量对所有线程的可见性?既然 volatile 能够保证线程间的变量可见性,是不是就意味着基于 volatile 变量的运算就是并发安全的?请对比下 volatile 对比 Synchronized 的异同。请谈谈 ThreadLocal 是怎么解决并发安全的?很多人都说要慎用 ThreadLocal,谈谈你的理解,使用 ThreadLocal 需要注意些什么?并发队列相关问题谈下对基于链表的非阻塞无界队列 ConcurrentLinkedQueue 原理的理解?ConcurrentLinkedQueue 内部是如何使用 CAS 非阻塞算法来保证多线程下入队出队操作的线程安全?基于链表的阻塞队列 LinkedBlockingQueue 原理。阻塞队列LinkedBlockingQueue 内部是如何使用两个独占锁 ReentrantLock 以及对应的条件变量保证多线程先入队出队操作的线程安全?为什么不使用一把锁,使用两把为何能提高并发度?基于数组的阻塞队列 ArrayBlockingQueue 原理。ArrayBlockingQueue 内部如何基于一把独占锁以及对应的两个条件变量实现出入队操作的线程安全?谈谈对无界优先级队列 PriorityBlockingQueue 原理?PriorityBlockingQueue内部使用堆算法保证每次出队都是优先级最高的元素,元素入队时候是如何建堆的,元素出队后如何调整堆的平衡的?如何学习并发编程学习java并发就像进入了另外一个学习领域,就像学习一门新的编程语言,或者是学习一套新的语言概念,要理解并发编程,其难度跟理解面向对象编程难度差不多。你花一点功夫,就可以理解它的基本机制,但是要想真正掌握它的本质,就需要深入的学习与理解。 最后在分享一个并发编程知识的学习导图给大家,为了方便大家能看的清楚我把Xmind图缩略了 ...

April 11, 2019 · 1 min · jiezi

多线程批量数据导入示例——基础版

前言当遇到大量数据导入时,为了提高处理的速度,可以选择使用多线程来批量处理这些处理。常见的场景有:大文件导入数据库(这个文件不一定是标准的CSV可导入文件或者需要在内存中经过一定的处理)数据同步(从第三方接口拉取数据处理后写入自己的数据库)以上的场景有一个共性,这类数据导入的场景简单来说就是将数据从一个数据源移动到另外一个数据源,而其中必定可以分为两步数据读取:从数据源读取数据到内存数据写入:将内存中的数据写入到另外一个数据源,可能存在数据处理而且数据读取的速度一般会比数据写入的速度快很多,即读取快,写入慢。设计思路由于场景的特点是读取快,写入慢,如果是使用多线程处理,建议是数据写入部分改造为多线程。而数据读取可以改造成批量读取数据。简单来说就是两个要点:批量读取数据多线程写入数据示例多线程批量处理最简单的方案是使用线程池来进行处理,下面会通过一个模拟批量读取和写入的服务,以及对这个服务的多线程写入调用作为示例,展示如何多线程批量数据导入。模拟服务import java.util.concurrent.atomic.AtomicLong;/** * 数据批量写入用的模拟服务 * * @author RJH * create at 2019-04-01 /public class MockService { /* * 可读取总数 / private long canReadTotal; /* * 写入总数 / private AtomicLong writeTotal=new AtomicLong(0); /* * 写入休眠时间(单位:毫秒) / private final long sleepTime; /* * 构造方法 * * @param canReadTotal * @param sleepTime / public MockService(long canReadTotal, long sleepTime) { this.canReadTotal = canReadTotal; this.sleepTime = sleepTime; } /* * 批量读取数据接口 * * @param num * @return / public synchronized long readData(int num) { long readNum; if (canReadTotal >= num) { canReadTotal -= num; readNum = num; } else { readNum = canReadTotal; canReadTotal = 0; } //System.out.println(“read data size:” + readNum); return readNum; } /* * 写入数据接口 / public void writeData() { try { // 休眠一定时间模拟写入速度慢 Thread.sleep(sleepTime); } catch (InterruptedException e) { e.printStackTrace(); } // 写入总数自增 System.out.println(“thread:” + Thread.currentThread() + " write data:" + writeTotal.incrementAndGet()); } /* * 获取写入的总数 * * @return / public long getWriteTotal() { return writeTotal.get(); }}批量数据处理器import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;/* * 基于线程池的多线程批量写入处理器 * @author RJH * create at 2019-04-01 /public class SimpleBatchHandler { private ExecutorService executorService; private MockService service; /* * 每次批量读取的数据量 / private int batch; /* * 线程个数 / private int threadNum; public SimpleBatchHandler(MockService service, int batch,int threadNum) { this.service = service; this.batch = batch; //使用固定数目的线程池 this.executorService = Executors.newFixedThreadPool(threadNum); } /* * 开始处理 / public void startHandle() { // 开始处理的时间 long startTime = System.currentTimeMillis(); System.out.println(“start handle time:” + startTime); long readData; while ((readData = service.readData(batch)) != 0) {// 批量读取数据,知道读取不到数据才停止 for (long i = 0; i < readData; i++) { executorService.execute(() -> service.writeData()); } } // 关闭线程池 executorService.shutdown(); while (!executorService.isTerminated()) {//等待线程池中的线程执行完 } // 结束时间 long endTime = System.currentTimeMillis(); System.out.println(“end handle time:” + endTime); // 总耗时 System.out.println(“total handle time:” + (endTime - startTime) + “ms”); // 写入总数 System.out.println(“total write num:” + service.getWriteTotal()); }}测试类/* * SimpleBatchHandler的测试类 * @author RJH * create at 2019-04-01 /public class SimpleBatchHandlerTest { public static void main(String[] args) { // 总数 long total=100000; // 休眠时间 long sleepTime=100; // 每次拉取的数量 int batch=100; // 线程个数 int threadNum=16; MockService mockService=new MockService(total,sleepTime); SimpleBatchHandler handler=new SimpleBatchHandler(mockService,batch,threadNum); handler.startHandle(); }}运行结果start handle time:1554298681755thread:Thread[pool-1-thread-2,5,main] write data:1thread:Thread[pool-1-thread-1,5,main] write data:2…省略部分输出thread:Thread[pool-1-thread-4,5,main] write data:100000end handle time:1554299330202total handle time:648447mstotal write num:100000分析在单线程情况下的执行时间应该为totalsleepTime,即10000000ms,而改造为多线程后执行时间为648447ms。示例问题本示例存在一些问题,会在后续的博客中对本示例进行优化,同时分享给大家如何解决这些问题。 ...

April 3, 2019 · 2 min · jiezi

Springboot定时任务踩坑记录

前言在使用Springboot整合定时任务,发现当某个定时任务执行出现执行时间过长的情况时会阻塞其他定时任务的执行。问题定位后续通过翻查Springboot的文档以及打印日志(输出当前线程信息)得知问题是由于Springboot默认使用只要1个线程处理定时任务。问题复盘需要注意示例的Springboot版本为2.1.3.RELEASE。关键pom文件配置 <!–继承父项目–> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.3.RELEASE</version> <relativePath/> <!– lookup parent from repository –> </parent> …省略非关键配置 <!– 引入依赖–> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>定时任务import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.scheduling.annotation.Scheduled;import org.springframework.stereotype.Component;/** * 定时任务 * @author RJH * create at 2019-03-29 /@Componentpublic class SimpleTask { private static Logger logger= LoggerFactory.getLogger(SimpleTask.class); /* * 执行会超时的任务,定时任务间隔为5000ms(等价于5s) / @Scheduled(fixedRate = 5000) public void overtimeTask(){ try { logger.info(“current run by overtimeTask”); //休眠时间为执行间隔的2倍 Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } } /* * 正常的定时任务 / @Scheduled(fixedRate = 5000) public void simpleTask(){ logger.info(“current run by simpleTask”); }}启动类import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.scheduling.annotation.EnableScheduling;@SpringBootApplication@EnableSchedulingpublic class TaskDemoApplication { public static void main(String[] args) { SpringApplication.run(TaskDemoApplication.class, args); }}运行结果…省略非关键信息2019-03-29 21:22:38.410 INFO 59731 — [ scheduling-1] com.rjh.task.SimpleTask : current run by simpleTask2019-03-29 21:22:38.413 INFO 59731 — [ scheduling-1] com.rjh.task.SimpleTask : current run by overtimeTask2019-03-29 21:22:48.413 INFO 59731 — [ scheduling-1] com.rjh.task.SimpleTask : current run by simpleTask2019-03-29 21:22:48.414 INFO 59731 — [ scheduling-1] com.rjh.task.SimpleTask : current run by overtimeTask2019-03-29 21:22:58.418 INFO 59731 — [ scheduling-1] com.rjh.task.SimpleTask : current run by simpleTask2019-03-29 21:22:58.418 INFO 59731 — [ scheduling-1] com.rjh.task.SimpleTask : current run by overtimeTask2019-03-29 21:23:08.424 INFO 59731 — [ scheduling-1] com.rjh.task.SimpleTask : current run by simpleTask2019-03-29 21:23:08.424 INFO 59731 — [ scheduling-1] com.rjh.task.SimpleTask : current run by overtimeTask2019-03-29 21:23:18.425 INFO 59731 — [ scheduling-1] com.rjh.task.SimpleTask : current run by simpleTask2019-03-29 21:23:18.426 INFO 59731 — [ scheduling-1] com.rjh.task.SimpleTask : current run by overtimeTask…结果分析由运行结果可以看出:每次定时任务的运行都是由scheduling-1这个线程处理正常运行的simpleTask被overtimeTask阻塞导致了运行间隔变成了10秒后面通过查阅Springboot的文档也得知了定时任务默认最大运行线程数为1。解决方案由于使用的Springboot版本为2.1.3.RELEASE,所以有两种方法解决这个问题使用Springboot配置在配置文件中可以配置定时任务可用的线程数:## 配置可用线程数为10spring.task.scheduling.pool.size=10自定义定时任务的线程池使用自定义的线程池代替默认的线程池import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.scheduling.TaskScheduler;import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;/* * 定时任务配置类 * @author RJH * create at 2019-03-29 /@Configurationpublic class ScheduleConfig { /* * 此处方法名为Bean的名字,方法名无需固定 * 因为是按TaskScheduler接口自动注入 * @return */ @Bean public TaskScheduler taskScheduler(){ // Spring提供的定时任务线程池类 ThreadPoolTaskScheduler taskScheduler=new ThreadPoolTaskScheduler(); //设定最大可用的线程数目 taskScheduler.setPoolSize(10); return taskScheduler; }} ...

March 29, 2019 · 2 min · jiezi

线程池中你不容错过的一些细节

背景上周分享了一篇《一个线程罢工的诡异事件》,最近也在公司内部分享了这个案例。无独有偶,在内部分享的时候也有小伙伴问了之前分享时所提出的一类问题:这其实是一类共性问题,我认为主要还是两个原因:我自己确实也没讲清楚,之前画的那张图还需要再完善,有些误导。第二还是大家对线程池的理解不够深刻,比如今天要探讨的内容。线程池的工作原理首先还是来复习下线程池的基本原理。我认为线程池它就是一个调度任务的工具。众所周知在初始化线程池会给定线程池的大小,假设现在我们有 1000 个线程任务需要运行,而线程池的大小为 1020,在真正运行任务的过程中他肯定不会创建这1000个线程同时运行,而是充分利用线程池里这 1020 个线程来调度这1000个任务。而这里的 10~20 个线程最后会由线程池封装为 ThreadPoolExecutor.Worker 对象,而这个 Worker 是实现了 Runnable 接口的,所以他自己本身就是一个线程。深入分析这里我们来做一个模拟,创建了一个核心线程、最大线程数、阻塞队列都为2的线程池。这里假设线程池已经完成了预热,也就是线程池内部已经创建好了两个线程 Worker。当我们往一个线程池丢一个任务会发生什么事呢?第一步是生产者,也就是任务提供者他执行了一个 execute() 方法,本质上就是往这个内部队列里放了一个任务。之前已经创建好了的 Worker 线程会执行一个 while 循环 —> 不停的从这个内部队列里获取任务。(这一步是竞争的关系,都会抢着从队列里获取任务,由这个队列内部实现了线程安全。)获取得到一个任务后,其实也就是拿到了一个 Runnable 对象(也就是 execute(Runnable task) 这里所提交的任务),接着执行这个 Runnable 的 run() 方法,而不是 start(),这点需要注意后文分析原因。结合源码来看:从图中其实就对应了刚才提到的二三两步:while 循环,从 getTask() 方法中一直不停的获取任务。拿到任务后,执行它的 run() 方法。这样一个线程就调度完毕,然后再次进入循环从队列里取任务并不断的进行调度。再次解释之前的问题接下来回顾一下我们上一篇文章所提到的,导致一个线程没有运行的根本原因是:在单个线程的线程池中一但抛出了未被捕获的异常时,线程池会回收当前的线程并创建一个新的 Worker;它也会一直不断的从队列里获取任务来执行,但由于这是一个消费线程,根本没有生产者往里边丢任务,所以它会一直 waiting 在从队列里获取任务处,所以也就造成了线上的队列没有消费,业务线程池没有执行的问题。结合之前的那张图来看:这里大家问的最多的一个点是,为什么会没有是根本没有生产者往里边丢任务,图中不是明明画的有一个 product 嘛?这里确实是有些不太清楚,再次强调一次:图中的 product 是往内部队列里写消息的生产者,并不是往这个 Consumer 所在的线程池中写任务的生产者。因为即便 Consumer 是一个单线程的线程池,它依然具有一个常规线程池所具备的所有条件:Worker 调度线程,也就是线程池运行的线程;虽然只有一个。内部的阻塞队列;虽然长度只有1。再次结合图来看:所以之前提到的【没有生产者往里边丢任务】是指右图放大后的那一块,也就是内部队列并没有其他线程往里边丢任务执行 execute() 方法。而一旦发生未捕获的异常后,Worker1 被回收,顺带的它所调度的线程 task1(这个task1 也就是在执行一个 while 循环消费左图中的那个队列) 也会被回收掉。新创建的 Worker2 会取代 Worker1 继续执行 while 循环从内部队列里获取任务,但此时这个队列就一直会是空的,所以也就是处于 Waiting 状态。我觉得这波解释应该还是讲清楚了,欢迎还没搞明白的朋友留言讨论。为什是 run() 而不是 start()问题搞清楚后来想想为什么线程池在调度的时候执行的是 Runnable 的 run() 方法,而不是 start() 方法呢?我相信大部分没有看过源码的同学心中第一个印象就应该是执行的 start() 方法;因为不管是学校老师,还是网上大牛讲的都是只有执行了 start() 方法后操作系统才会给我们创建一个独立的线程来运行,而 run() 方法只是一个普通的方法调用。而在线程池这个场景中却恰好就是要利用它只是一个普通方法调用。回到我在文初中所提到的:我认为线程池它就是一个调度任务的工具。假设这里是调用的 Runnable 的 start 方法,那会发生什么事情。如果我们往一个核心、最大线程数为 2 的线程池里丢了 1000 个任务,那么它会额外的创建 1000 个线程,同时每个任务都是异步执行的,一下子就执行完毕了。从而没法做到由这两个 Worker 线程来调度这 1000 个任务,而只有当做一个同步阻塞的 run() 方法调用时才能满足这个要求。这事也让我发现一个奇特的现象:就是网上几乎没人讲过为什么在线程池里是 run 而不是 start,不知道是大家都觉得这是基操还是没人仔细考虑过。总结针对之前线上事故的总结上次已经写得差不多了,感兴趣的可以翻回去看看。这次呢可能更多是我自己的总结,比如写一篇技术博客时如果大部分人对某一个知识点讨论的比较热烈时,那一定是作者要么讲错了,要么没讲清楚。这点确实是要把自己作为一个读者的角度来看,不然很容易出现之前的一些误解。在这之外呢,我觉得对于线程池把这两篇都看完同时也理解后对于大家理解线程池,利用线程池完成工作也是有很大好处的。如果有在面试中加分的记得回来点赞、分享啊。你的点赞与分享是对我最大的支持 ...

March 26, 2019 · 1 min · jiezi

一个线程罢工的诡异事件

背景事情(事故)是这样的,突然收到报警,线上某个应用里业务逻辑没有执行,导致的结果是数据库里的某些数据没有更新。虽然是前人写的代码,但作为 Bug maker&killer 只能咬着牙上了。<!–more–>因为之前没有接触过出问题这块的逻辑,所以简单理了下如图:有一个生产线程一直源源不断的往队列写数据。消费线程也一直不停的取出数据后写入后续的业务线程池。业务线程池里的线程会对每个任务进行入库操作。整个过程还是比较清晰的,就是一个典型的生产者消费者模型。尝试定位接下来便是尝试定位这个问题,首先例行检查了以下几项:是否内存有内存溢出?应用 GC 是否有异常?通过日志以及监控发现以上两项都是正常的。紧接着便 dump 了线程快照查看业务线程池中的线程都在干啥。结果发现所有业务线程池都处于 waiting 状态,队列也是空的。同时生产者使用的队列却已经满了,没有任何消费迹象。结合上面的流程图不难发现应该是消费队列的 Consumer 出问题了,导致上游的队列不能消费,下有的业务线程池没事可做。review 代码于是查看了消费代码的业务逻辑,同时也发现消费线程是一个单线程。结合之前的线程快照,我发现这个消费线程也是处于 waiting 状态,和后面的业务线程池一模一样。他做的事情基本上就是对消息解析,之后丢到后面的业务线程池中,没有发现什么特别的地方。但是由于里面的分支特别多(switch case),看着有点头疼;所以我与写这个业务代码的同学沟通后他告诉我确实也只是入口处解析了一下数据,后续所有的业务逻辑都是丢到线程池中处理的,于是我便带着这个前提去排查了(埋下了伏笔)。因为这里消费的队列其实是一个 disruptor 队列;它和我们常用的 BlockQueue 不太一样,不是由开发者自定义一个消费逻辑进行处理的;而是在初始化队列时直接丢一个线程池进去,它会在内部使用这个线程池进行消费,同时回调一个方法,在这个方法里我们写自己的消费逻辑。所以对于开发者而言,这个消费逻辑其实是一个黑盒。于是在我反复 review 了消费代码中的数据解析逻辑发现不太可能出现问题后,便开始疯狂怀疑是不是 disruptor 自身的问题导致这个消费线程罢工了。再翻了一阵 disruptor 的源码后依旧没发现什么问题后我咨询对 disruptor 较熟的@咖啡拿铁,在他的帮助下在本地模拟出来和生产一样的情况。本地模拟本地也是创建了一个单线程的线程池,分别执行了两个任务。第一个任务没啥好说的,就是简单的打印。第二个任务会对一个数进行累加,加到 10 之后就抛出一个未捕获的异常。接着我们来运行一下。发现当任务中抛出一个没有捕获的异常时,线程池中的线程就会处于 waiting 状态,同时所有的堆栈都和生产相符。细心的朋友会发现正常运行的线程名称和异常后处于 waiting 状态的线程名称是不一样的,这个后续分析。解决问题当加入异常捕获后又如何呢?程序肯定会正常运行。同时会发现所有的任务都是由一个线程完成的。虽说就是加了一行代码,但我们还是要搞清楚这里面的门门道道。源码分析于是只有直接 debug 线程池的源码最快了;通过刚才的异常堆栈我们进入到 ThreadPoolExecutor.java:1142 处。发现线程池已经帮我们做了异常捕获,但依然会往上抛。在 finally 块中会执行 processWorkerExit(w, completedAbruptly) 方法。看过之前《如何优雅的使用和理解线程池》的朋友应该还会有印象。线程池中的任务都会被包装为一个内部 Worker 对象执行。processWorkerExit 可以简单的理解为是把当前运行的线程销毁(workers.remove(w))、同时新增(addWorker())一个 Worker 对象接着处理;就像是哪个零件坏掉后重新换了一个新的接着工作,但是旧零件负责的任务就没有了。接下来看看 addWorker() 做了什么事情:只看这次比较关心的部分;添加成功后会直接执行他的 start() 的方法。由于 Worker 实现了 Runnable 接口,所以本质上就是调用了 runWorker() 方法。在 runWorker() 其实就是上文 ThreadPoolExecutor 抛出异常时的那个方法。它会从队列里一直不停的获取待执行的任务,也就是 getTask();在 getTask 也能看出它会一直从内置的队列取出任务。而一旦队列是空的,它就会 waiting 在 workQueue.take(),也就是我们从堆栈中发现的 1067 行代码。线程名字的变化上文还提到了异常后的线程名称发生了改变,其实在 addWorker() 方法中可以看到 new Worker()时就会重新命名线程的名称,默认就是把后缀的计数+1。这样一切都能解释得通了,真相只有一个:在单个线程的线程池中一但抛出了未被捕获的异常时,线程池会回收当前的线程并创建一个新的 Worker;它也会一直不断的从队列里获取任务来执行,但由于这是一个消费线程,根本没有生产者往里边丢任务,所以它会一直 waiting 在从队列里获取任务处,所以也就造成了线上的队列没有消费,业务线程池没有执行的问题。总结所以之后线上的那个问题加上异常捕获之后也变得正常了,但我还是有点纳闷的是:既然后续所有的任务都是在线程池中执行的,也就是纯异步了,那即便是出现异常也不会抛到消费线程中啊。这不是把我之前储备的知识点推翻了嘛?不信邪!之后我让运维给了加上异常捕获后的线上错误日志。结果发现在上文提到的众多 switch case 中,最后一个竟然是直接操作的数据库,导致一个非空字段报错了????!!这事也给我个教训,还是得眼见为实啊。虽然这个问题改动很小解决了,但复盘整个过程还是有许多需要改进的:消费队列的线程名称竟然和业务线程的前缀一样,导致我光找它就花了许多时间,命名必须得调整。开发规范,防御式编程大家需要养成习惯。未知的技术栈需要谨慎,比如 disruptor,之前的团队应该只是看了个高性能的介绍就直接使用,并没有深究其原理;导致出现问题后对它拿不准。实例代码:https://github.com/crossoverJie/JCSprout/blob/master/src/main/java/com/crossoverjie/thread/ThreadExceptionTest.java你的点赞与分享是对我最大的支持 ...

March 13, 2019 · 1 min · jiezi

线程池工具类的封装

了解更多学习 ThreadPoolExecutor类ThreadPool.javapackage com.tool.me.thread;import java.util.Hashtable;import java.util.Map;import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class ThreadPool { private static Map<String, ThreadPoolExecutor> map = new Hashtable<>(); private ThreadPoolExecutor executor; /** * 阻塞任务队列数 / private int wattingCount; /* * 线程池的名字,e.g:子系统的包名(com.tool.me) / @SuppressWarnings(“unused”) private String name; /* * 创建线程池 * * @param name * 线程池的名字,eg:子系统的包名(com.tool.me) * @param corePoolSize * 核心线程池大小 the number of threads to keep in the pool, even if * they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize * 最大线程池大小 the maximum number of threads to allow in the pool * @param keepAliveTime * 线程池中超过corePoolSize数目的空闲线程最大存活时间 when the number of threads is * greater than the core, this is the maximum time that excess * idle threads will wait for new tasks before terminating. * @param unit * keepAliveTime时间单位 the time unit for the {@code keepAliveTime} * argument * @param workQueue * 阻塞任务队列 the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} tasks * submitted by the {@code execute} method. / public ThreadPool(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { synchronized (map) { this.name = name; this.wattingCount = workQueue.size(); String key = buildKey(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue.size(), “#”); if (map.containsKey(key)) { executor = map.get(key); } else { executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); map.put(key, executor); } } } /* * 创建线程池 * * @param name * 线程池的名字,eg:子系统的包名(com.tool.me) * @param corePoolSize * 核心线程池大小 the number of threads to keep in the pool, even if * they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize * 最大线程池大小 the maximum number of threads to allow in the pool * @param keepAliveTime * 线程池中超过corePoolSize数目的空闲线程最大存活时间 when the number of threads is * greater than the core, this is the maximum time that excess * idle threads will wait for new tasks before terminating. * @param unit * keepAliveTime时间单位 the time unit for the {@code keepAliveTime} * argument * @param wattingCount * 阻塞任务队列数 / public ThreadPool(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, int wattingCount) { synchronized (map) { this.name = name; this.wattingCount = (int) (wattingCount * 1.5); String key = buildKey(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, wattingCount, “#”); if (map.containsKey(key)) { executor = map.get(key); } else { executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, new LinkedBlockingQueue<Runnable>(this.wattingCount)); map.put(key, executor); } } } /* * 组装map中的key * * @param name * 线程池的名字,eg:子系统的包名(com.tool.me) * @param corePoolSize * 核心线程池大小 the number of threads to keep in the pool, even if * they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize * 最大线程池大小 the maximum number of threads to allow in the pool * @param keepAliveTime * 线程池中超过corePoolSize数目的空闲线程最大存活时间 when the number of threads is * greater than the core, this is the maximum time that excess * idle threads will wait for new tasks before terminating. * @param unit * keepAliveTime时间单位 the time unit for the {@code keepAliveTime} * argument * @param workQueue * 阻塞任务队列 the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} tasks * submitted by the {@code execute} method. * @param delimiter * 分割符 / private String buildKey(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, int wattingCount, String delimiter) { StringBuilder result = new StringBuilder(); result.append(name).append(delimiter); result.append(corePoolSize).append(delimiter); result.append(maximumPoolSize).append(delimiter); result.append(keepAliveTime).append(delimiter); result.append(unit.toString()).append(delimiter); result.append(wattingCount); return result.toString(); } /* * 添加任务到线程池(execute)中 * @param runnable the task to execute / public void execute(Runnable runnable) { checkQueneSize(); executor.execute(runnable); } private void checkQueneSize() { while (getTaskSzie() >= wattingCount) {//如果线程池中的阻塞队列数 > wattingCount 则继续等待 try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } /* * Returns the number of elements in this collection. If this collection * contains more than <tt>Integer.MAX_VALUE</tt> elements, returns * <tt>Integer.MAX_VALUE</tt>. * * @return the number of elements in this collection / public int getTaskSzie(){ return executor.getQueue().size(); }}ThreadPoolManager.javapackage com.tool.me.thread;import java.util.concurrent.TimeUnit;public abstract class ThreadPoolManager { private ThreadPool executor = null; public ThreadPoolManager() { if(executor == null) { executor = new ThreadPool(getThreadPoolName(),corePoolSize(), maximumPoolSize(), keepAliveTime(), TimeUnit.SECONDS, wattingCount()); } } public void execute(Runnable runnable) { executor.execute(runnable); } /* * @return name * 线程池名称 the String of pool name / protected abstract String getThreadPoolName(); /* * @return corePoolSize * 核心线程池大小 the number of threads to keep in the pool, even if * they are idle, unless {@code allowCoreThreadTimeOut} is set / protected int corePoolSize(){ return 5; } /* * @return maximumPoolSize * 最大线程池大小 the maximum number of threads to allow in the pool / protected int maximumPoolSize(){ return 10; } /* * @return wattingCount * 阻塞任务队列数 / protected int wattingCount(){ return 200000; } /* * @return keepAliveTime * 线程池中超过corePoolSize数目的空闲线程最大存活时间 when the number of threads is * greater than the core, this is the maximum time that excess * idle threads will wait for new tasks before terminating. / protected long keepAliveTime(){ return 10; }}子系统创建类 继承ThreadPoolManager,配置参数信息ViThreadPoolManager.javapackage com.tool.me.thread;/* * 当前类(子系统中定义的类)继承 ThreadPoolManager 类,设置相关参数 /public class ViThreadPoolManager extends ThreadPoolManager{ private static ThreadPoolManager threadPool = null; public synchronized static ThreadPoolManager getInstance() { if(threadPool == null) { threadPool = new ViThreadPoolManager(); } return threadPool; } @Override protected String getThreadPoolName() { return “com.tool.me.vi”; } @Override protected int corePoolSize() { /* * 代码 设置返回值 / return 10; } @Override protected int maximumPoolSize() { /* * 代码 设置返回值 */ return 20; }}使用线程池 main public static void main(String[] args) { ViThreadPoolManager.getInstance().execute(new Runnable() { @Override public void run() { io(); } }); } ...

January 29, 2019 · 5 min · jiezi

一次 HashSet 所引起的并发问题

背景上午刚到公司,准备开始一天的摸鱼之旅时突然收到了一封监控中心的邮件。心中暗道不好,因为监控系统从来不会告诉我应用完美无 bug,其实系统挺猥琐。打开邮件一看,果然告知我有一个应用的线程池队列达到阈值触发了报警。由于这个应用出问题非常影响用户体验;于是立马让运维保留现场 dump 线程和内存同时重启应用,还好重启之后恢复正常。于是开始着手排查问题。分析首先了解下这个应用大概是做什么的。简单来说就是从 MQ 中取出数据然后丢到后面的业务线程池中做具体的业务处理。而报警的队列正好就是这个线程池的队列。跟踪代码发现构建线程池的方式如下:ThreadPoolExecutor executor = new ThreadPoolExecutor(coreSize, maxSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());; put(poolName,executor);采用的是默认的 LinkedBlockingQueue 并没有指定大小(这也是个坑),于是这个队列的默认大小为 Integer.MAX_VALUE。由于应用已经重启,只能从仅存的线程快照和内存快照进行分析。内存分析先利用 MAT 分析了内存,的到了如下报告。其中有两个比较大的对象,一个就是之前线程池存放任务的 LinkedBlockingQueue,还有一个则是 HashSet。当然其中队列占用了大量的内存,所以优先查看,HashSet 一会儿再看。由于队列的大小给的够大,所以结合目前的情况来看应当是线程池里的任务处理较慢,导致队列的任务越堆越多,至少这是目前可以得出的结论。线程分析再来看看线程的分析,这里利用 fastthread.io 这个网站进行线程分析。因为从表现来看线程池里的任务迟迟没有执行完毕,所以主要看看它们在干嘛。正好他们都处于 RUNNABLE 状态,同时堆栈如下:发现正好就是在处理上文提到的 HashSet,看这个堆栈是在查询 key 是否存在。通过查看 312 行的业务代码确实也是如此。这里的线程名字也是个坑,让我找了好久。定位分析了内存和线程的堆栈之后其实已经大概猜出一些问题了。这里其实有一个前提忘记讲到:这个告警是凌晨三点发出的邮件,但并没有电话提醒之类的,所以大家都不知道。到了早上上班时才发现并立即 dump 了上面的证据。所有有一个很重要的事实:这几个业务线程在查询 HashSet 的时候运行了 6 7 个小时都没有返回。通过之前的监控曲线图也可以看出:操作系统在之前一直处于高负载中,直到我们早上看到报警重启之后才降低。同时发现这个应用生产上运行的是 JDK1.7 ,所以我初步认为应该是在查询 key 的时候进入了 HashMap 的环形链表导致 CPU 高负载同时也进入了死循环。为了验证这个问题再次 review 了代码。整理之后的伪代码如下://线程池private ExecutorService executor;private Set<String> set = new hashSet();private void execute(){ while(true){ //从 MQ 中获取数据 String key = subMQ(); executor.excute(new Worker(key)) ; }}public class Worker extends Thread{ private String key ; public Worker(String key){ this.key = key; } @Override private void run(){ if(!set.contains(key)){ //数据库查询 if(queryDB(key)){ set.add(key); return; } } //达到某种条件时清空 set if(flag){ set = null ; } } }大致的流程如下:源源不断的从 MQ 中获取数据。将数据丢到业务线程池中。判断数据是否已经写入了 Set。没有则查询数据库。之后写入到 Set 中。这里有一个很明显的问题,那就是作为共享资源的 Set 并没有做任何的同步处理。这里会有多个线程并发的操作,由于 HashSet 其实本质上就是 HashMap,所以它肯定是线程不安全的,所以会出现两个问题:Set 中的数据在并发写入时被覆盖导致数据不准确。会在扩容的时候形成环形链表。第一个问题相对于第二个还能接受。通过上文的内存分析我们已经知道这个 set 中的数据已经不少了。同时由于初始化时并没有指定大小,仅仅只是默认值,所以在大量的并发写入时候会导致频繁的扩容,而在 1.7 的条件下又可能会形成环形链表。不巧的是代码中也有查询操作(contains()),观察上文的堆栈情况:发现是运行在 HashMap 的 465 行,来看看 1.7 中那里具体在做什么:已经很明显了。这里在遍历链表,同时由于形成了环形链表导致这个 e.next 永远不为空,所以这个循环也不会退出了。到这里其实已经找到问题了,但还有一个疑问是为什么线程池里的任务队列会越堆越多。我第一直觉是任务执行太慢导致的。仔细查看了代码发现只有一个地方可能会慢:也就是有一个数据库的查询。把这个 SQL 拿到生产环境执行发现确实不快,查看索引发现都有命中。但我一看表中的数据发现已经快有 7000W 的数据了。同时经过运维得知 MySQL 那台服务器的 IO 压力也比较大。所以这个原因也比较明显了:由于每消费一条数据都要去查询一次数据库,MySQL 本身压力就比较大,加上数据量也很高所以导致这个 IO 响应较慢,导致整个任务处理的就比较慢了。但还有一个原因也不能忽视;由于所有的业务线程在某个时间点都进入了死循环,根本没有执行完任务的机会,而后面的数据还在源源不断的进入,所以这个队列只会越堆越多!这其实是一个老应用了,可能会有人问为什么之前没出现问题。这是因为之前数据量都比较少,即使是并发写入也没有出现并发扩容形成环形链表的情况。这段时间业务量的暴增正好把这个隐藏的雷给揪出来了。所以还是得信墨菲他老人家的话。总结至此整个排查结束,而我们后续的调整措施大概如下:HashSet 不是线程安全的,换为 ConcurrentHashMap同时把 value 写死一样可以达到 set 的效果。根据我们后面的监控,初始化 ConcurrentHashMap 的大小尽量大一些,避免频繁的扩容。MySQL 中很多数据都已经不用了,进行冷热处理。尽量降低单表数据量。同时后期考虑分表。查数据那里调整为查缓存,提高查询效率。线程池的名称一定得取的有意义,不然是自己给自己增加难度。根据监控将线程池的队列大小调整为一个具体值,并且要有拒绝策略。升级到 JDK1.8。再一个是报警邮件酌情考虑为电话通知????。HashMap 的死循环问题在网上层出不穷,没想到还真被我遇到了。现在要满足这个条件还是挺少见的,比如 1.8 以下的 JDK 这一条可能大多数人就碰不到,正好又证实了一次墨菲定律。同时我会将文章更到这里,方便大家阅读和查询。https://crossoverjie.top/JCSprout/你的点赞与分享是对我最大的支持 ...

November 8, 2018 · 1 min · jiezi