关于java:基础篇高并发一瞥线程和线程池的总结

24次阅读

共计 8498 个字符,预计需要花费 22 分钟才能阅读完成。

  • 过程是执行程序的实体,零碎的调度执行单元,领有独属的过程空间(内存、磁盘等)。而线程是过程的一个执行流程,一个过程可蕴含多个线程,共享该过程的所有资源:代码段,数据段(全局变量和动态变量),堆存储;但每个线程领有本人的执行栈和局部变量
  • 过程创立要分配资源,过程切换既要保留以后过程环境,也要设置新过程环境,开销大;而线程共享过程的资源,共享局部不需重调配、切换,线程的创立切换是小于过程的。因而更偏差应用线程晋升程序的并发性
  • 线程又分内核态和用户态,内核态可被零碎感知调度执行;用户态是用户程序级别的,零碎不知线程的存在,线程调度由程序负责

1 JAVA 线程的实现原理

  • java 的线程是基于操作系统原生的线程模型(非用户态),通过零碎调用,将线程交给系统调度执行
  • java 线程领有属于本人的虚拟机栈,当 JVM 将栈、程序计数器、工作内存等筹备好后,会调配一个零碎原生线程来执行。Java 线程完结,原生线程随之被回收
  • 原生线程初始化结束,会调 Java 线程的 run 办法。当 JAVA 线程完结时,则开释原生线程和 Java 线程的所有资源
  • java 办法的执行对应虚拟机栈的一个栈帧,用于存储局部变量、操作数栈、动静链接、办法进口等

2 JAVA 线程的生命周期

  • New(新建状态):用 new 关键字创立线程之后,该线程处于新建状态,此时仅由 JVM 为其分配内存,并初始化其成员变量
  • Runnable(就绪状态):当调用 Thread.start 办法后,该线程处于就绪状态。JVM 会为其调配虚拟机栈等,而后期待系统调度
  • Running(运行状态):处于就绪状态的线程取得 CPU,执行 run 办法时,则线程处于运行状态
  • Blocked(阻塞状态):阻塞状态是指线程放弃了 cpu 的使用权(join,sleep 函数的调用),处于暂进行状态。Blocked 状态的线程须要复原到 Runnable 状态,能力再次被系统调度执行变成 Running
  • Dead(线程死亡):线程失常 run 完结、或抛出一个未捕捉的 Throwable、调用 Thread.stop 来完结该线程,都会导致线程的死亡

  • java 线程和 linux 线程的生命周期根本是一一对应了,就是多了 new 阶段

3 JAVA 线程的罕用办法

  • 线程启动函数
//Thread.java
// 调用 start 启动线程,进入 Runnable 状态,期待系统调度执行
public synchronized void start(){//synchronized 同步执行
    if (threadStatus != 0) //0 代表 new 状态,非 0 则抛出谬误
            throw new IllegalThreadStateException();
    ...
    start0(); // 本地办法办法 private native void start0()
    ...
}
//Running 状态,新线程执行的代码办法,可被子类重写
public void run() {if (target != null) {//target 是 Runnable,new Thread(Runnable)时传入
        target.run();}
}
  • 线程终止函数
//Thread.java
@Deprecated public final void stop();
// 中断线程
public void interrupt()
// 判断的是以后线程是否处于中断状态
public static boolean interrupted()
  • 用 stop 会强行终止线程,导致线程所持有的全副锁忽然开释(不可管制),而被锁突同步的逻辑受到毁坏。不倡议应用
  • interrupt 函数中断线程,但它不肯定会让线程退出的。它比 stop 函数优雅,可管制

    • 当线程处于调用 sleep、wait 的阻塞状态时,会抛出 InterruptedException,代码外部捕捉,而后完结线程
    • 线程处于非阻塞状态,则须要程序本人调用 interrupted()判断,再决定是否退出
  • 其余罕用办法
//Thread.java
// 阻塞期待其余线程
public final synchronized void join(final long millis)
// 临时让出 CPU 执行
public static native void yield();
// 休眠一段时间
public static native void sleep(long millis) throws InterruptedException;
  • start 与 run 办法的区别

    • start 是 Thread 类的办法,从线程的生命周期来看,start 的执行并不意味着新线程的执行,而是让 JVM 调配虚拟机栈,进入 Runnable 状态,start 的执行还是在旧线程上
    • run 则是新线程被系统调度,获取 CPU 时执行的办法,函数 run 则是继承 Thread 重写的 run 或者实现接口 Runnable 的 run
  • Thread.sleep 与 Object.wait 区别

    • Thread.sleep 须要指定休眠工夫,工夫一到可持续运行;和锁机制无关,没有加锁也不必开释锁
    • Object.wait 须要在 synchronized 中调用,否则报 IllegalMonitorStateException 谬误。wait 办法会开释锁,须要调用雷同锁对象 Object.notify 来唤醒线程

4 线程池及其长处

  • 线程的每次应用创立,完结销毁是十分微小的开销。若用缓存的策略(线程池),暂存已经创立的线程,复用这些线程,能够缩小程序的耗费,进步线程的利用率
  • 升高资源耗费:反复利用线程可升高线程创立和销毁造成的耗费
  • 进步响应速度:当工作达到时,不须要期待线程创立就能立刻执行
  • 进步线程的可管理性:应用线程池能够进行对立的调配,监控和调优

5 JDK 封装的线程池

//ThreadPoolExecutor.java
public ThreadPoolExecutor(
    int corePoolSize, 
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory,
    RejectedExecutionHandler handler) 
  • 1 corePoolSize:外围线程数,线程池维持的线程数量
  • 2 maximumPoolSize:最大的线程数,当阻塞队列不可再接受任务时且 maximumPoolSize 大于 corePoolSize 则会创立非核心线程来执行。但工作执行时,会被销毁
  • 3 keepAliveTime:非核心线程在空闲间的存活工夫
  • 4 TimeUnit:和 keepAliveTime 配合应用,示意 keepAliveTime 参数的工夫单位
  • 5 workQueue:工作的期待阻塞队列,正在执行的工作数超过 corePoolSize 时,退出该队列
  • 6 threadFactory:线程的创立工厂
  • 7 handler:回绝策略,线程数达到了 maximumPoolSize,还有工作提交则应用回绝策略解决

6 线程池原理之执行流程

//ThreadPoolExecutor.java
public void execute(Runnable command) {
    ...
    if (workerCountOf(c) < corePoolSize) { //plan A
        if (addWorker(command, true))  
            return;
        c = ctl.get();}
    if (isRunning(c) && workQueue.offer(command)) { //plan B
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //addWorker(command, false) false 代表可创立非核心线程来执行工作
    else if (!addWorker(command, false)) //plan C
        reject(command);    // //plan D
}
  • plan A:工作的 execute,先判断外围线程数量达到下限;否,则创立外围线程来执行工作;是,则执行 plan B
  • plan B:当工作数大于外围数时,工作被退出阻塞队列,如果超过阻塞队列的容量下限,执行 C
  • plan C: 阻塞队列不能接受任务时,且设置的 maximumPoolSize 大于 corePoolSize,创立新的非核心线程执行工作
  • plan D:当 plan A、B、C 都无能为力时,应用回绝策略解决

7 阻塞队列的简略理解

  • 队列的阻塞插入:当队列满时,队列会阻塞插入元素的线程,直到队列不满
  • 队列的阻塞移除:当队列为空时,获取元素的线程会期待队列变为非空
  • BlockingQueue 提供的办法如下,其中 put 和 take 是阻塞操作
操作方法 抛出异样 返回非凡值 阻塞线程 超时退出
插入元素 add(e) offer(e) put(e) offer(e, timeout, unit)
移除元素 remove() poll() take() pull(timeout, unit)
查看 element() peek()
  • ArrayBlockingQueue

    • ArrayBlockingQueue 是用数组实现的 有界阻塞队列 ,必须指定队列大小,先进先出(FIFO) 准则排队
  • LinkedBlockingQueue

    • 是用链表实现的 有界阻塞队列,如果结构 LinkedBlockingQueue 时没有指定大小,则默认是 Integer.MAX_VALUE,无限大
    • 该队列生产端和生产端应用独立的锁来控制数据操作,以此来进步队列的并发性
  • PriorityBlockingQueue

    • public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator)
    • 基于数组,元素具备优先级的 无界阻塞队列,优先级由 Comparator 决定
    • PriorityBlockingQueue 不会阻塞生产者,却会在没有可生产的工作时,阻塞消费者
  • DelayQueue

    • 反对延时获取元素的 无界阻塞队列,基于 PriorityQueue 实现
    • 元素必须实现 Delayed 接口,指定多久能力从队列中获取该元素。
    • 可用于缓存零碎的设计、定时任务调度等场景的应用
  • SynchronousQueue

    • SynchronousQueue 是一种无缓冲的期待队列,增加一个元素必须期待被取走后能力持续增加元素
  • LinkedTransferQueue

    • 由链表组成的 TransferQueue无界阻塞队列,相比其余队列多了 tryTransfer 和 transfer 函数
    • transfer:以后有消费者正在期待元素,则间接传给消费者,否则存入队尾,并阻塞期待元素被生产才返回
    • tryTransfer:试探传入的元素是否能间接传给消费者。如果没消费者期待生产元素,元素退出队尾,返回 false
  • LinkedBlockingDeque

    • LinkedBlockingDeque 是由链表构建的双向阻塞队列,多了一端可操作入队出队,少了一半的竞争,进步并发性

8 Executors 的四种线程池浅析

  • newFixedThreadPool
//Executors.java
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
}
  • 指定外围线程数,队列是 LinkedBlockingQueue 无界阻塞队列,永远不可能回绝工作;适宜用在稳固且固定的并发场景,倡议线程设置为 CPU 核数
  • newCachedThreadPool
//Executors.java
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
}
  • 外围池大小为 0,线程池最大线程数为最大整型,工作提交先退出到阻塞队列中,非核心线程 60s 没工作执行则销毁,阻塞队列为 SynchronousQueue。newCachedThreadPool 会一直的创立新线程来执行工作,不倡议用
  • newScheduledThreadPool
//Executors.java
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue(), threadFactory);
}
// 指定提早执行工夫    
public <V> ScheduledFuture<V> 
schedule(Callable<V> callable, long delay, TimeUnit unit)    
  • ScheduledThreadPoolExecutor(STPE)其实是 ThreadPoolExecutor 的子类,可指定外围线程数,队列是 STPE 的外部类 DelayedWorkQueue。STPE 的益处是 A 延时可执行工作,B 可执行带有返回值的工作
  • newSingleThreadExecutor
//Executors.java
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                        0L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<Runnable>())); // 无界队列
}
  • 和 newFixedThreadPool 构造方法统一,不过线程数被设置为 1 了。SingleThreadExecutor 比 new 个线程的益处是;线程运行时抛出异样的时候会有新的线程退出线程池实现接下来的工作;阻塞队列能够保障工作按 FIFO 执行

9 如果优雅地敞开线程池

  • 线程池的敞开,就要先敞开池中的线程,上文第三点有提,暴力强制性 stop 线程会导致同步数据的不统一,因而咱们要调用 interrupt 敞开线程
  • 而线程池提供了两个敞开办法,shutdownNow 和 shuwdown
  • shutdownNow:线程池拒接管新工作,同时立马敞开线程池(进行中的工作会执行完),队列的工作不再执行,返回未执行工作 List
public List<Runnable> shutdownNow() {
    ...
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock(); // 加锁
    try {checkShutdownAccess();
        advanceRunState(STOP);
        interruptWorkers(); //interrupt 敞开线程
        tasks = drainQueue(); // 未执行工作
    ...    
  • shuwdown:线程池拒接管新工作,同时期待线程池里的工作执行结束后敞开线程池,代码和 shutdownNow 相似就不贴了

10 线程池为什么应用的是阻塞队列

先思考下为啥线程池的线程不会被开释,它是怎么治理线程的生命周期的呢

//ThreadPoolExecutor.Worker.class
final void runWorker(Worker w) {
    ...
    // 工作线程会进入一个循环获取工作执行的逻辑
    while (task != null || (task = getTask()) != null)
    ...
}

private Runnable getTask(){
    ...
    Runnable r = timed ? 
        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 
        : workQueue.take(); // 线程会阻塞挂起期待工作,...    
}

能够看出,无工作执行时,线程池其实是利用阻塞队列的 take 办法挂起,从而维持外围线程的存活

11 线程池的 worker 继承 AQS 的意义

//Worker class,一个 worker 一个线程
Worker(Runnable firstTask) {
    // 禁止新线程未开始就被中断
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

final void runWorker(Worker w) {
    ....
    // 对应结构 Worker 是的 setState(-1)
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
        ....
        w.lock(); // 加锁同步
        ....
        try {
            ...
            task.run();
            afterExecute(task, null);
        } finally {
            ....
            w.unlock(); // 开释锁}

worker 继承 AQS 的意义:A 禁止线程未开始就被中断;B 同步 runWorker 办法的解决逻辑

12 回绝策略

  • AbortPolicy 抛弃工作并抛出 RejectedExecutionException 异样
  • DiscardOldestPolicy 抛弃队列最后面的工作,而后从新提交被回绝的工作
  • DiscardPolicy 抛弃工作,然而不抛出异样
  • CallerRunsPolicy

A handler for rejected tasks that runs the rejected task directly in the calling thread of the {@code execute} method, unless the executor has been shut down, in which case the task is discarded.

如果工作被回绝了,则由 提交工作的线程 执行此工作

13 ForkJoinPool 理解一波

  • ForkJoinPool 和 ThreadPoolExecutor 不同,它适宜执行能够合成子工作的工作,如树的遍历,归并排序等一些递归场景


  • ForkJoinPool 每个线程有一个对应的双端队列 deque;当线程中的工作被 fork 决裂,决裂进去的子工作会放入线程本人的 deque,缩小线程的竞争
  • work-stealing 工作窃取算法


当线程执行完本人 deque 的工作,且其余线程 deque 还有多的工作,则会启动窃取策略,从其余线程 deque 队尾获取线程

  • 应用 RecursiveTask 实现 ForkJoin 流程 demo
// 该 demo 代码是援用别人的,如有侵权,请分割我
public class ForkJoinPoolTest {public static void main(String[] args) throws ExecutionException, InterruptedException {ForkJoinPool forkJoinPool = new ForkJoinPool();
        for (int i = 0; i < 10; i++) {ForkJoinTask task = forkJoinPool.submit(new Fibonacci(i));
            System.out.println(task.get());
        }
    }
    static class Fibonacci extends RecursiveTask<Integer> {
        int n;
        public Fibonacci(int n) {this.n = n;}
        @Override
        protected Integer compute() {if (n <= 1) {return n;}
            Fibonacci fib1 = new Fibonacci(n - 1);
            fib1.fork(); // 相当于开启新线程执行
            Fibonacci fib2 = new Fibonacci(n - 2);
            fib2.fork(); // 相当于开启新线程执行
            return fib1.join() + fib2.join(); // 合并返回后果
        }
    }
}

首发掘金网站,心愿大家反对下

https://juejin.im/post/5f016b…

欢送指注释中谬误

关注公众号,一起交换

参考文章

  • Java 线程和操作系统线程的关系
  • 线程的 3 种实现形式
  • 如何优雅的敞开 Java 线程池
  • Java 程序员必备的一些流程图
  • JDK 提供的四种线程池
  • 7 种阻塞队列相干整顿
  • 六种常见的线程池含 ForkJoinPool

正文完
 0