乐趣区

关于java:Java并发编程线程池相关知识点整理

为什么要用线程池?

池化技术:缩小每次获取资源的耗费,进步对资源的利用率。

线程池 提供了一种限度和治理资源(包含执行一个工作)。每个 线程池 还保护一些根本统计信息,例如已实现工作的数量。

应用线程池的益处:

  • 升高资源耗费。通过反复利用已创立的线程升高线程创立和销毁造成的耗费。
  • 进步响应速度。当工作达到时,工作能够不须要的等到线程创立就能立刻执行。
  • 进步线程的可管理性。线程是稀缺资源,如果无限度的创立,不仅会耗费系统资源,还会升高零碎的稳定性,应用线程池能够进行对立的调配,调优和监控。

线程池的实现原理?

execute 办法源码

 public void execute(Runnable command) {// 如果工作为 null,则抛出异样。if (command == null)
            throw new NullPointerException();        // ctl 中保留的线程池以后的一些状态信息  AtomicInteger        int c = ctl.get();        // 判断以后线程池中执行的工作数量是否小于 corePoolSize        if (workerCountOf(c) < corePoolSize) {// 如果小于,则通过 addWorker 新建一个线程,而后,启动该线程从而执行工作。if (addWorker(command, true))
                return;
            c = ctl.get();}        // 通过 isRunning 办法判断线程池状态        // 线程池处于 RUNNING 状态才会被并且队列能够退出工作        if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();            // 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就须要从工作队列中移除工作。// 并尝试判断线程是否全副执行结束。同时执行回绝策略。if (! isRunning(recheck) && remove(command))
                reject(command);
            // 如果以后线程池为空就新创建一个线程并执行。else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }        // 通过 addWorker 新建一个线程,并将工作 (command) 增加到该线程中;// 而后,启动该线程从而执行工作。// 如果 addWorker 执行失败,则通过 reject()执行相应的回绝策略的内容。else if (!addWorker(command, false))
            reject(command);
    }

线程池创立线程的时候,会将线程封装成工作线程 Worker,Worker 在执行实现工作之后,还会循环获取工作队列利的工作来执行。

 final void runWorker(Worker w) {Thread wt = Thread.currentThread();        Runnable task = w.firstTask;        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {while (task != null || (task = getTask()) != null) {// 循环执行工作
                w.lock();                // 如果线程池正在进行,确保线程被中断
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&                      runStateAtLeast(ctl.get(), STOP))) &&                    !wt.isInterrupted())                    wt.interrupt();                try {beforeExecute(wt, task);                    Throwable thrown = null;
                    try {task.run();                    } catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);
                    } finally {afterExecute(task, thrown);                    }                } finally {
                    task = null;
                    w.completedTasks++;                    w.unlock();}            }            completedAbruptly = false;
        } finally {processWorkerExit(w, completedAbruptly);        }    }

  1. 线程池判断 外围线程池【corePoolSize】里的线程是否都在执行工作。如果不是,则创立一个新的工作线程来执行工作。如果外围线程池里的线程都在执行工作,则进入下个流程。
  2. 线程池判断 工作队列【BlockingQueue】是否曾经满。如果工作队列没有满,则将新提交的工作存储在这个工作队列里。如果工作队列满了,则进入下个流程。
  3. 线程池判断 线程池 【maximumPoolSize】的线程是否都处于工作状态。如果没有,则创立一个新的工作线程来执行工作。如果曾经满了,则交给 饱和策略【RejectedExecutionHandler.rejectedExecution()】来解决这个工作。

如何应用线程池?

ThreadPoolExecutor 重要剖析

构造方法的重要参数

ThreadPoolExecutor 办法的结构参数有很多,咱们看看最长的那个就能够了:

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,                              long keepAliveTime,                              TimeUnit unit,                              BlockingQueue<Runnable> workQueue,                              ThreadFactory threadFactory,                              RejectedExecutionHandler handler) {        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
  • corePoolSize:外围线程数定义了 最小能够同时运行的线程数量
  • maximumPoolSize:当队列中寄存的工作达到队列容量的时候 ,以后能够同时运行的线程数量变为 最大线程数。【如果应用的无界队列,这个参数就没啥成果】
  • workQueue: 当新工作来的时候会先判断以后运行的线程数量是否达到外围线程数,如果达到外围线程数的话,新工作就会被寄存在队列中
  • keepAliveTime: 当线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的工作提交,外围线程外的线程不会立刻销毁,而是会期待,直到期待的工夫超过了 keepAliveTime 才会被回收销毁。
  • unit:keepAliveTime 的工夫单位。
  • threadFactory:用于设置创立线程的工厂,能够通过线程工厂给每个创立进去的线程设置更有意义的名字。
  • handler:饱和策略,以后同时运行的线程数量达到最大线程数量【maximumPoolSize】并且队列也曾经被放满时,执行饱和策略。

线程池的简略应用

public class ThreadPoolTest {
    private static final int CORE_POOL_SIZE = 5; // 外围线程数
    private static final int MAX_POOL_SIZE = 10; // 最大线程数
    private static final int QUEUE_CAPACITY = 100; // 工作队列的容量
    private static final Long KEEP_ALIVE_TIME = 1L; // 等待时间
    public static void main(String[] args) {ThreadPoolExecutor threadPool = new ThreadPoolExecutor(                CORE_POOL_SIZE,                MAX_POOL_SIZE,                KEEP_ALIVE_TIME,                TimeUnit.SECONDS,                new ArrayBlockingQueue<>(QUEUE_CAPACITY),                new ThreadPoolExecutor.AbortPolicy());        for(int i = 0; i < 10 ; i ++){Runnable worker = new MyRunnable(""+ i); // 创立工作
            threadPool.execute(worker); // 通过 execute 提交
        }        threadPool.shutdown();        while(!threadPool.isTerminated()){}        System.out.println("Finished all threads");
    }}class MyRunnable implements Runnable {private String command;    MyRunnable(String s) {this.command = s;}    @Override    public void run() {        System.out.println(Thread.currentThread().getName() + "Start. Time =" + new Date());
        processCommand();        System.out.println(Thread.currentThread().getName() + "End. Time =" + new Date());
    }    private void processCommand() {        try {            Thread.sleep(5000);
        } catch (InterruptedException e) {e.printStackTrace();        }    }    @Override    public String toString() {        return this.command;}}

工作队列有哪些?

  • ArrayBlockingQueue:基于数组构造的有界阻塞队列,按 FIFO 准则对元素进行排序。
  • LinkedBlockingQueue:基于链表构造的阻塞队列,按 FIFO 排序元素,吞吐量通常要高于 ArrayBlockingQueue,Executors.newFixedThreadPool()就是应用了这个队列。
  • SynchronousQueue:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作始终处于阻塞状态,吞吐量通常要高于 LinkedBlockingQueue,Executors.newCachedThreadPool()就是应用了这个队列。
  • PriorityBlockingQueue:一个具备优先级的有限阻塞队列。

饱和策略有哪些呢?

  • ThreadPoolExecutor.AbortPolicy:抛出 RejectedExecutionException 来回绝新工作的解决。【默认的饱和策略】
  • ThreadPoolExecutor.CallerRunsPolicy【提供可伸缩队列】:调用执行本人的线程运行工作,也就是间接在调用 execute 办法的线程中运行 (run) 被回绝的工作,如果执行程序已敞开,则会抛弃该工作。因而这种策略会升高对于新工作提交速度,影响程序的整体性能。如果您的应用程序能够接受此提早并且你要求任何一个工作申请都要被执行的话,你能够抉择这个策略。public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {r.run(); } }
  • ThreadPoolExecutor.DiscardPolicy: 不解决新工作,间接抛弃掉。
  • ThreadPoolExecutor.DiscardOldestPolicy: 此策略将抛弃最早的未解决的工作申请,并执行当前任务。public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {e.getQueue().poll(); e.execute(r); } }

当然,也能够依据须要自定义回绝策略,须要实现 RejectedExecutionHandler。

如何创立线程池?

一、应用 ThreadPoolExecutor 的各种构造方法。

二、通过 Executor 框架的工具类 Executors 能够创立三种类型的 ThreadPoolExecutor。

《阿里巴巴 Java 开发手册》中 强制线程池不容许应用 Executors 去创立,而是通过 ThreadPoolExecutor 的形式,这样的解决形式让写的同学更加明确线程池的运行规定,躲避资源耗尽的危险

Executors 返回线程池对象的弊病如下:FixedThreadPool 和 SingleThreadExecutor:容许申请的队列长度为 Integer.MAX_VALUE,可能沉积大量的申请,从而导致 OOM。CachedThreadPool 和 ScheduledThreadPool:容许创立的线程数量为 Integer.MAX_VALUE,可能会创立大量线程,从而导致 OOM。

执行 execute 办法和 submit 办法的区别?

  • execute()办法用于提交不须要返回值的工作,所以无奈判断工作是否被线程池执行胜利与否;
threadPool.execute(new Runnable() {
    @Override
    public void run() {}}); // 通过 execute 提交
  • submit()办法用于提交须要返回值的工作。线程池会返回一个 Future 类型的对象,通过这个 Future 对象能够判断工作是否执行胜利 ,并且能够通过 Future 的 get() 办法来获取返回值,get()办法会阻塞以后线程直到工作实现,而应用 get(long timeout,TimeUnit unit)办法则会阻塞以后线程一段时间后立刻返回,这时候有可能工作没有执行完。
Future<Object> future = threadPool.submit(hasReturnValueTask);
try{Object s = future.get();
}catch(InterruptedException e){// 解决中断异样}catch(ExecutionException e){// 解决无奈执行工作异样}finally{threadPool.shutdown();
}

如果感觉本文对你有帮忙,能够点赞关注反对一下

退出移动版