1. 多线程 J.U.C

1.1 线程池

1.1.1 线程回顾

1)回顾线程创立形式

  • 继承Thread
  • 实现Runnable

2)线程的状态

  • NEW:刚刚创立,没做任何操作

    Thread thread = new Thread();System.out.println(thread.getState());
  • RUNNABLE:调用run,能够执行,但不代表肯定在执行(RUNNING,READY)

    thread.start();System.out.println(thread.getState());
  • BLOCKED:抢不到锁

            final byte[] lock = new byte[0];        new Thread(new Runnable() {            public void run() {                synchronized (lock){                    try {                        Thread.sleep(3000);                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                }            }        }).start();        Thread thread2 = new Thread(new Runnable() {            public void run() {                synchronized (lock){                }            }        });        thread2.start();        Thread.sleep(1000);        System.out.println(thread2.getState());
  • WAITING

    Thread thread2 = new Thread(new Runnable() {    public void run() {        LockSupport.park();    }});thread2.start();Thread.sleep(500);System.out.println(thread2.getState());LockSupport.unpark(thread2);Thread.sleep(500);System.out.println(thread2.getState());
  • TIMED_WAITING

    Thread thread3 = new Thread(new Runnable() {    public void run() {        try {            Thread.sleep(10000);        } catch (InterruptedException e) {            e.printStackTrace();        }    }});thread3.start();Thread.sleep(500);System.out.println(thread3.getState());
  • TERMINATED

    //期待1s后再来看Thread.sleep(1000);System.out.println(thread.getState());

3)线程池

依据下面的状态,一般线程执行完,就会进入TERMINATED销毁掉,而线程池就是创立一个缓冲池寄存线程,执行完结当前,该线程并不会死亡,而是再次返回线程池中成为闲暇状态,等待下次工作降临,这使得线程池比手动创立线程有着更多的劣势:

  • 升高系统资源耗费,通过重用已存在的线程,升高线程创立和销毁造成的耗费;
  • 进步零碎响应速度,当有工作达到时,通过复用已存在的线程,无需期待新线程的创立便能立刻执行;
  • 不便线程并发数的管控。因为线程若是无限度的创立,可能会导致内存占用过多而产生OOM
  • 节俭cpu切换线程的工夫老本(须要放弃以后执行线程的现场,并复原要执行线程的现场)。
  • 提供更弱小的性能,延时定时线程池。(Timer vs ScheduledThreadPoolExecutor)

4)线程池体系(查看:ScheduledThreadPoolExecutor,ForkJoinPool类图

阐明:

  • 最罕用的是ThreadPoolExecutor
  • 调度用ScheduledThreadPoolExecutor
  • 工作拆分合并用ForkJoinPool
  • Executors是工具类,帮助你创立线程池的

1.1.2 工作机制

1)线程池状态

  • RUNNING:初始化状态是RUNNING。线程池被一旦被创立,就处于RUNNING状态,并且线程池中的工作数为0。RUNNING状态下,可能接管新工作,以及对已增加的工作进行解决。
  • SHUTDOWN:SHUTDOWN状态时,不接管新工作,但能解决已增加的工作。调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。

    //shutdown后不承受新工作,然而task1,依然能够执行实现ExecutorService poolExecutor = Executors.newFixedThreadPool(5);poolExecutor.execute(new Runnable() {    public void run() {        try {            Thread.sleep(1000);            System.out.println("finish task 1");        } catch (InterruptedException e) {            e.printStackTrace();        }    }});poolExecutor.shutdown();poolExecutor.execute(new Runnable() {    public void run() {        try {            Thread.sleep(1000);        } catch (InterruptedException e) {            e.printStackTrace();        }    }});System.out.println("ok");
  • STOP:不接管新工作,不解决已增加的工作,并且会中断正在解决的工作。调用线程池的shutdownNow()接口时,线程池由(RUNNING 或 SHUTDOWN ) -> STOP

    留神:容易引发不可预知的后果!运行中的工作兴许还会打印,直到完结,因为调的是Thread.interrupt

    //改为shutdownNow后,工作立马终止,sleep被打断,新工作无奈提交,task1进行poolExecutor.shutdownNow();
  • TIDYING:所有的工作已终止,队列中的”工作数量”为0,线程池会变为TIDYING。线程池变为TIDYING状态时,会执行钩子函数terminated(),能够通过重载terminated()函数来实现自定义行为

    //自定义类,重写terminated办法public class MyExecutorService extends ThreadPoolExecutor {    public MyExecutorService(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);    }    @Override    protected void terminated() {        super.terminated();        System.out.println("terminated");    }        //调用 shutdownNow, ternimated办法被调用打印    public static void main(String[] args) throws InterruptedException {        MyExecutorService service = new MyExecutorService(1,2,10000,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(5));        service.shutdownNow();    }}
  • TERMINATED:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED

2)构造阐明

在线程池的编程模式下,工作是提交给整个线程池,而不是间接提交给某个线程,线程池在拿到工作后,就在外部协调闲暇的线程,如果有,则将工作交给某个闲暇的线程。一个线程同时只能执行一个工作,但能够同时向一个线程池提交多个工作。

(源码查看:两个汇合,一个queue,一个hashset)

3)工作的提交

  • 增加工作,如果线程池中线程数没达到coreSize,间接创立新线程执行
  • 达到core,放入queue
  • queue已满,未达到maxSize持续创立线程
  • 达到maxSize,依据reject策略解决
  • 超时后,线程被开释,降落到coreSize

1.1.3 源码分析

//工作提交阶段:(4个if条件路线)public void execute(Runnable command) {  if (command == null)            throw new NullPointerException();  int c = ctl.get();  //判断工作数,如果小于coreSize,addWork,留神第二个参数core=true  if (workerCountOf(c) < corePoolSize) {      if (addWorker(command, true))          return;      c = ctl.get();  }  //否则,如果线程池还在运行,offer到队列  if (isRunning(c) && workQueue.offer(command)) {      //再检查一下状态      int recheck = ctl.get();      //如果线程池曾经终止,间接移除工作,不再响应      if (! isRunning(recheck) && remove(command))          reject(command);      //否则,如果没有可用线程的话(比方coreSize=0),创立一个空work        //该work创立时不会给指派工作(为null),然而会被放入works汇合,进而从队列获取工作去执行      else if (workerCountOf(recheck) == 0)          addWorker(null, false);  }  //队列也满,持续调addWork,然而留神,core=false,开启到maxSize的大门  //超出max的话,addWork会返回false,进入reject  else if (!addWorker(command, false))      reject(command);}
//线程创立private boolean addWorker(Runnable firstTask, boolean core) {    //第一步,计数判断,不符合条件打回false    retry:    for (;;) {        int c = ctl.get();        int rs = runStateOf(c);        // Check if queue empty only if necessary.        for (;;) {            int wc = workerCountOf(c);            //判断线程数,留神这里!            //也就阐明线程池的线程数是不可能设置任意大的。            //最大29位(CAPACITY=29位二进制)              //超出规定范畴,返回false,示意不容许再开启新工作线程,创立worker失败!            if (wc >= CAPACITY ||                wc >= (core ? corePoolSize : maximumPoolSize))                return false;            if (compareAndIncrementWorkerCount(c))                break retry;            c = ctl.get();  // Re-read ctl            if (runStateOf(c) != rs)                continue retry;            // else CAS failed due to workerCount change; retry inner loop        }    }    //第二步,创立新work放入线程汇合works(一个HashSet)    boolean workerStarted = false;    boolean workerAdded = false;    Worker w = null;    try {        //符合条件,创立新的work并包装task        w = new Worker(firstTask);        final Thread t = w.thread;        if (t != null) {            final ReentrantLock mainLock = this.mainLock;              //加锁,workers是一个hashset,这里要保障线程安全性            mainLock.lock();            try {                             //...                    //在这里!!!                    workers.add(w);                                          //...                                        workerAdded = true;                            } finally {                mainLock.unlock();            }            if (workerAdded) {                //留神,只有是胜利add了新的work,那么将该新work立刻启动,工作失去执行                t.start();                workerStarted = true;            }        }    } finally {        if (! workerStarted)            addWorkerFailed(w);    }    return workerStarted;}
//工作获取与执行  //在worker执行runWorker()的时候,不停循环,先查看本人有没有携带Task,如果有,执行while (task != null || (task = getTask()) != null)//如果没用,会调用getTask,从队列获取工作private Runnable getTask() {    boolean timedOut = false; // Did the last poll() time out?    for (;;) {        int c = ctl.get();        int rs = runStateOf(c);        // ...        int wc = workerCountOf(c);        // Are workers subject to culling? - 很形象,要不要乖乖的被“捕杀”?        //判断是不是要超时解决,重点!!!决定了以后线程要不要被开释        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;                //线程数超出max,并且上次循环中poll期待超时了,那么阐明该线程已终止        //将线程队列数量原子性减        if ((wc > maximumPoolSize || (timed && timedOut))            && (wc > 1 || workQueue.isEmpty())) {            //计数器做原子递加,递加胜利后,返回null,for被停止            if (compareAndDecrementWorkerCount(c))                return null;            //递加失败,持续下一轮循环,直到胜利            continue;        }        try {            //重点!!!            //如果线程可被开释,那就poll,开释的工夫为:keepAliveTime            //否则,线程是不会被开释的,take始终被阻塞在这里,直到来了新工作持续工作            Runnable r = timed ?                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :                workQueue.take();            if (r != null)                return r;            //到这里阐明可被开释的线程期待超时,曾经销毁,设置该标记,下次循环将线程数缩小            timedOut = true;        } catch (InterruptedException retry) {            timedOut = false;        }    }}

1.1.4 Executors

以上构造函数比拟多,为了方便使用,juc提供了一个Executors工具类,外部提供静态方法

1)newCachedThreadPool() : 弹性线程数

2)newFixedThreadPool(int nThreads) : 固定线程数

3)newSingleThreadExecutor() : 繁多线程数

4)newScheduledThreadPool(int corePoolSize) : 可调度,罕用于定时

1.1.5 经典面试

1)线程池是如何保障线程不被销毁的呢?

答案:如果队列中没有工作时,外围线程会始终阻塞在获取工作的办法,直到返回工作。而工作执行完后,又会进入下一轮 work.runWork()中循环

验证:机密就藏在外围源码里 ThreadPoolExecutor.getTask()

//work.runWork():while (task != null || (task = getTask()) != null)        //work.getTask():boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;Runnable r = timed ?    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :    workQueue.take();

2)那么线程池中的线程会处于什么状态?

答案:RUNNABLE,WAITING

验证:起一个线程池,搁置一个工作sleep,debug查看完结前后的状态

//debug add watcher:((ThreadPoolExecutor) poolExecutor).workers.iterator().next().thread.getState()
ThreadPoolExecutor poolExecutor = Executors.newFixedThreadPool(1);poolExecutor.execute(new Runnable() {    public void run() {        try {            Thread.sleep(5000);        } catch (InterruptedException e) {            e.printStackTrace();        }    }});System.out.println("ok");

3)外围线程与非核心线程有区别吗?

答案:没有。被销毁的线程和创立的先后无关。即使是第一个被创立的外围线程,依然有可能被销毁

验证:看源码,每个work在runWork()的时候去getTask(),在getTask外部,并没有针对性的辨别以后work是否是外围线程或者相似的标记。只有判断works数量超出core,就会调用poll(),否则take()

1.2 Fork/Join

1.2.1 概念

ForkJoin是由JDK1.7后提供多线并发解决框架。能够了解为一种非凡的线程池。

1.工作宰割:Fork(分岔),先把大的工作宰割成足够小的子工作,如果子工作比拟大的话还要对子工作进行持续宰割。

  2.合并后果:join,宰割后的子工作被多个线程执行后,再合并后果,失去最终的残缺输入。

1.2.2 组成

  • ForkJoinTask:次要提供fork和join两个办法用于工作拆分与合并;个别用子类 RecursiveAction(无返回值的工作)和RecursiveTask(须要返回值)来实现compute办法。

  • ForkJoinPool:调度ForkJoinTask的线程池;

  • ForkJoinWorkerThread:Thread的子类,寄存于线程池中的工作线程(Worker);

  • WorkQueue:工作队列,用于保留工作;

1.2.3 根本应用

一个典型的例子:计算1-1000的和

package com.itheima.thread;import java.util.concurrent.*;public class SumTask {    private static final Integer MAX = 100;    static class SubTask extends RecursiveTask<Integer> {        // 子工作开始计算的值        private Integer start;        // 子工作完结计算的值        private Integer end;        public SubTask(Integer start , Integer end) {            this.start = start;            this.end = end;        }        @Override        protected Integer compute() {            if(end - start < MAX) {                //小于边界,开始计算                System.out.println("start = " + start + ";end = " + end);                Integer totalValue = 0;                for(int index = this.start ; index <= this.end  ; index++) {                    totalValue += index;                }                return totalValue;            }else {                //否则,两头劈开持续拆分                SubTask subTask1 = new SubTask(start, (start + end) / 2);                subTask1.fork();                SubTask subTask2 = new SubTask((start + end) / 2 + 1 , end);                subTask2.fork();                return subTask1.join() + subTask2.join();            }        }    }    public static void main(String[] args) {        ForkJoinPool pool = new ForkJoinPool();        Future<Integer> taskFuture =  pool.submit(new SubTask(1,1000));        try {            Integer result = taskFuture.get();            System.out.println("result = " + result);        } catch (InterruptedException | ExecutionException e) {            e.printStackTrace(System.out);        }    }}

1.2.4 设计思维

  • 一般线程池外部有两个重要汇合:工作线程汇合(一般线程),和工作队列。
  • ForkJoinPool也相似,线程汇合里放的是非凡线程ForkJoinWorkerThread,工作队列里放的是特殊任务ForkJoinTask
  • 不同之处在于,一般线程池只有一个队列。而ForkJoinPool的工作线程ForkJoinWorkerThread每个线程内都绑定一个双端队列。

  • 在fork的时候,也就是工作拆分,将拆分的task会被以后线程放到本人的队列中。
  • 如果有工作,那么线程优先从本人的队列里取工作执行,默认从队尾
  • 当本人队列中执行完后,工作线程会跑到其余队列的队首偷工作来执行。 也就是所说的 “窃取”

1.2.5 留神点

应用ForkJoin将雷同的计算工作通过多线程执行。然而在应用中须要留神:

  • 留神工作切分的粒度,也就是fork的界线。并非越小越好
  • 判断要不要应用ForkJoin。任务量不是太大的话,串行可能优于并行。因为多线程会波及到上下文的切换

1.3 原子操作

1.3.1 概念

原子(atom)本意是“不能被进一步宰割的最小粒子”,而原子操作(atomic operation)意为"不可被中断的一个或一系列操作" 。类比于数据库事务,redis的multi。

1.3.2 CAS

Compare And Set(或Compare And Swap),翻译过去就是比拟并替换,CAS操作蕴含三个操作数——内存地位(V)、预期原值(A)、新值(B)。从第一视角来看,了解为:我认为地位 V 应该是 A,如果是A,则将 B 放到这个地位;否则,不要更改,只通知我这个地位当初的值即可。所以cas外部个别随同着while循环操作,不停的去尝试

juc中提供了Atomic结尾的类,基于cas实现原子性操作,最根本的利用就是计数器

package com.itheima;import java.util.concurrent.atomic.AtomicInteger;public class AtomicCounter {    private static AtomicInteger i = new AtomicInteger(0);    public int get(){        return i.get();    }    public void inc(){        i.incrementAndGet();    }    public static void main(String[] args) throws InterruptedException {        final AtomicCounter counter = new AtomicCounter();        for (int i = 0; i < 10; i++) {            new Thread(new Runnable() {                public void run() {                    counter.inc();                }            }).start();        }        Thread.sleep(3000);        //能够正确输入10        System.out.println(counter.i.get());    }}

注:AtomicInteger源码。基于unsafe类cas思维实现,性能篇会讲到

1.3.3 atomic

下面展现了AtomicInteger,对于atomic包,还有很多其余类型:

  • 根本类型

    • AtomicBoolean:以原子更新的形式更新boolean;
    • AtomicInteger:以原子更新的形式更新Integer;
    • AtomicLong:以原子更新的形式更新Long;
  • 援用类型

    • AtomicReference : 原子更新援用类型
    • AtomicReferenceFieldUpdater :原子更新援用类型的字段
    • AtomicMarkableReference : 原子更新带有标记位的援用类型
  • 数组

    • AtomicIntegerArray:原子更新整型数组里的元素。
    • AtomicLongArray:原子更新长整型数组里的元素。
    • AtomicReferenceArray:原子更新援用类型数组里的元素。
  • 字段

    • AtomicIntegerFieldUpdater:原子更新整型的字段的更新器。
    • AtomicLongFieldUpdater:原子更新长整型字段的更新器。
    • AtomicStampedReference:原子更新带有版本号的援用类型。

1.3.4 留神!

应用atomic要留神原子性的边界,把握不好会起不到应有的成果,原子性被毁坏。

案例:原子性被毁坏景象

package com.itheima;import java.util.concurrent.atomic.AtomicInteger;public class BadAtomic {    AtomicInteger i = new AtomicInteger(0);    static int j=0;    public void badInc(){        int k = i.incrementAndGet();        try {            Thread.sleep(new Random().nextInt(100));        } catch (InterruptedException e) {            e.printStackTrace();        }        j=k;    }    public static void main(String[] args) throws InterruptedException {        BadAtomic atomic = new BadAtomic();        for (int i = 0; i < 10; i++) {            new Thread(()->{                atomic.badInc();            }).start();        }        Thread.sleep(3000);        System.out.println(atomic.j);    }}

后果剖析:

  • 每次都不一样,总之不是10
  • i是原子性的,没问题。然而再赋值,变成了两部操作,原子性被突破
  • 在badInc上加synchronized,问题解决

1.4 AQS

1.4.1 概念

首先搞清楚,AbstractQueuedSynchronizer形象的队列式同步器,是一个抽象类,这个类在java.util.concurrent.locks包。

除了java自带的synchronized关键字之外,jdk提供的另外一种锁机制。如果须要本人实现灵便的锁逻辑,能够思考应用AQS,十分的便捷。

public abstract class AbstractQueuedSynchronizer    extends AbstractOwnableSynchronizer    implements java.io.Serializable

jdk中应用AQS的线程工具类很多,自旋锁、互斥锁、读锁写锁、信号量、通过类继承关系能够轻松查看,所以说,AQS是juc中很多类的基石。

1.4.2 原理

  • state:状态,int类型的成员变量,当state>0时示意锁有人占着,当state = 0时示意开释了锁。
  • 队列:拿不到锁的线程进队列。

1.4.3 源码

AQS应用了模板设计模式。只须要实现指定的锁获取办法即可,外部的机制AQS已帮你封装好。

(AQS源码idea中查看)

须要子类继承AQS,并实现的办法(protected):

protected boolean tryAcquire(int arg) //独占式获取同步状态protected boolean tryRelease(int arg) //独占式开释同步状态protected int tryAcquireShared(int arg) //共享式获取同步状态protected boolean tryReleaseShared(int arg) //共享式开释同步状态

应用时,调用的是父类的办法(public)

public final void acquire(int arg) //独享锁获取public final boolean release(int arg) //独享锁开释public final void acquireShared(int arg) //共享锁获取public final boolean releaseShared(int arg) //共享锁开释

源码剖析

public abstract class AbstractQueuedSynchronizer    extends AbstractOwnableSynchronizer    implements java.io.Serializable {    //可共享式获取锁,内部调用,模板模式    public final void acquireShared(int arg) {        if (tryAcquireShared(arg) < 0)            doAcquireShared(arg);    }    //须要实现的局部,空protected办法,被下面的对外办法所调用    protected int tryAcquireShared(int arg) {        throw new UnsupportedOperationException();    }            //同理,锁的开释,模板模式    public final boolean releaseShared(int arg) {        if (tryReleaseShared(arg)) {            doReleaseShared();            return true;        }        return false;    }    protected boolean tryReleaseShared(int arg) {        throw new UnsupportedOperationException();    }            //独占式获取    public final void acquire(int arg) {        if (!tryAcquire(arg) &&            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))            selfInterrupt();    }    protected boolean tryAcquire(int arg) {        throw new UnsupportedOperationException();    }    //独占式开释    public final boolean release(int arg) {        if (tryRelease(arg)) {            Node h = head;            if (h != null && h.waitStatus != 0)                unparkSuccessor(h);            return true;        }        return false;    }    protected boolean tryRelease(int arg) {        throw new UnsupportedOperationException();    }       }    

1.4.4 经典面试

一个阿里面试题:本人实现一个锁,最大容许指定数量的线程并行运作。其余排队等待

package com.itheima;import java.util.concurrent.locks.AbstractQueuedSynchronizer;public class MyLock extends AbstractQueuedSynchronizer {    public MyLock(int count){        setState(count);    }    @Override    protected int tryAcquireShared(int arg) {        //自旋,cas形式不停获取数量        for (; ; ) {            int current = getState();            int newCount = current - arg;            if (newCount < 0 || compareAndSetState(current, newCount)) {                return newCount;            }        }    }    @Override    protected boolean tryReleaseShared(int arg) {        for (; ; ) {            int current = getState();            int newState = current + arg;            if (compareAndSetState(current, newState)) {                return true;            }        }    }    public static void main(String[] args) {        final MyLock lock = new MyLock(3);        for (int i = 0; i < 30; i++) {            new Thread(new Runnable() {                public void run() {                    lock.acquireShared(1);                    try {                        Thread.sleep(1000);                        System.out.println("ok");                    } catch (InterruptedException e) {                        e.printStackTrace();                    } finally {                        lock.releaseShared(1);                    }                }            }).start();        }    }}

验证后果:尽管30个一次性start,然而会每1s输入3个ok,达到了并发管制

1.5 并发容器

juc中还蕴含很多其余的并发容器(理解)

1.ConcurrentHashMap

对应:HashMap

指标:代替Hashtable、synchronizedMap,应用最多,源码篇会具体解说

原理:JDK7中采纳Segment分段锁,JDK8中采纳CAS+synchronized

2.CopyOnWriteArrayList

对应:ArrayList

指标:代替Vector、synchronizedList

原理:高并发往往是读多写少的个性,读操作不加锁,而对写操作加Lock独享锁,先复制一份新的汇合,在新的汇合下面批改,而后将新汇合赋值给旧的援用,并通过volatile 保障其可见性。

查看源码:volatile array,lock加锁,数组复制

3.CopyOnWriteArraySet

对应:HashSet

指标:代替synchronizedSet

原理:与CopyOnWriteArrayList实现原理相似。

4.ConcurrentSkipListMap

对应:TreeMap

指标:代替synchronizedSortedMap(TreeMap)

原理:基于Skip list(跳表)来代替均衡树,依照分层key高低链接指针来实现。

附加:跳表

5.ConcurrentSkipListSet

对应:TreeSet

指标:代替synchronizedSortedSet(TreeSet)

原理:外部基于ConcurrentSkipListMap实现,原理统一

6.ConcurrentLinkedQueue

对应:LinkedList

对应:无界限程平安队列

原理:通过队首队尾指针,以及Node类元素的next实现FIFO队列

7.BlockingQueue

对应:Queue

特点:拓展了Queue,减少了可阻塞的插入和获取等操作

原理:通过ReentrantLock实现线程平安,通过Condition实现阻塞和唤醒

实现类:

  • LinkedBlockingQueue:基于链表实现的可阻塞的FIFO队列
  • ArrayBlockingQueue:基于数组实现的可阻塞的FIFO队列
  • PriorityBlockingQueue:按优先级排序的队列
本文由传智教育博学谷 - 狂野架构师教研团队公布
转载请注明出处!