关于java:快速掌握并发编程CountDownLatch原理和实战

关注Java后端技术全栈”**

回复“000”获取大量电子书

常见面试题

如何实现让主线程等所有子线程执行完了后,次要线程再继续执行?即如何实现一个线程等其余线程执行完了后再继续执行?

办法一

在后面的文章中咱们介绍了Thread类的join办法:疾速把握并发编程—Thread罕用办法,join的工作原理是,不停查看thread是否存活,如果存活则让以后线程永远wait,直到thread线程终止,线程的notifyAll就会被调用。

上面咱们就应用join来实现下面面试题。

import java.util.Random;
import java.util.concurrent.CountDownLatch;
public class CountDownLatchDemo {
    public static void main(String[] args) {
        System.out.println("次要线程开始期待其余子线程执行");
        try {
            test();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public static void test() throws InterruptedException {
       Thread thread1 = new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + " 线程开始");
            Random random = new Random();
            try {
                Thread.sleep(random.nextInt(10000) + 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println( Thread.currentThread().getName() + " 线程执行结束");
        },"线程1");
       thread1.start();
        Thread thread2 = new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + " 线程开始");
            Random random = new Random();
            try {
                Thread.sleep(random.nextInt(10000) + 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " 线程执行结束");
        },"线程2");
        thread2.start();
        Thread thread3 = new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + " 线程开始");
            Random random = new Random();
            try {
                Thread.sleep(random.nextInt(10000) + 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println( Thread.currentThread().getName() + " 线程执行结束");
        },"线程3");
        thread3.start();
        Thread thread4 = new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + " 线程开始");
            Random random = new Random();
            try {
                Thread.sleep(random.nextInt(10000) + 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " 线程执行结束");
        },"线程4");
        thread4.start();
        //启动了四个线程,而后让四个线程始终检测本人是否曾经完结
        thread1.join();
        thread2.join();
        thread3.join();
        thread4.join();
        System.out.println("主线程继续执行");
        //todo 业务代码
    }
}

运行后果

次要线程开始期待其余子线程执行
线程1 线程开始
线程2 线程开始
线程3 线程开始
线程4 线程开始
线程3 线程执行结束
线程2 线程执行结束
线程1 线程执行结束
线程4 线程执行结束
主线程继续执行

主线程持续干活是要等后面四个线程全副执行结束后再持续的。然而这么搞有点麻烦,那就是每个线程都得调用join办法,有没有更好玩的的呢?

答案是有的,它来了。

它就是juc上面的一个很牛逼的并发工具类CountDownLatch。是JDK1.5的时候有的,话中有话就是在JDK1.5之前就只能用join办法了。

办法二

CountDownLatch中咱们次要用到两个办法一个是await()办法,调用这个办法的线程会被阻塞,另外一个是countDown()办法,调用这个办法会使计数器减一,当计数器的值为0时,因调用await()办法被阻塞的线程会被唤醒,继续执行。请看代码:

import java.util.Random;
import java.util.concurrent.CountDownLatch;
public class CountDownLatchDemo {
    public static void main(String[] args) {
        System.out.println("次要线程开始期待其余子线程执行");
        test();
    }
    public static void test() {
        int threadCount = 5;
        CountDownLatch countDownLatch = new CountDownLatch(threadCount);
        for (int i = 0; i < threadCount; i++) {
            final int finalI = i + 1;
            new Thread(() -> {
                System.out.println("第 " + finalI + " 线程开始");
                Random random = new Random();
                try {
                    Thread.sleep(random.nextInt(10000) + 1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("第 " + finalI + " 线程执行结束");
                countDownLatch.countDown();
            }).start();
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(threadCount + " 个线程全副执行结束");
        System.out.println("主线程继续执行");
        //todo业务代码
    }
}

输入

次要线程开始期待其余子线程执行
第 1 线程开始
第 2 线程开始
第 3 线程开始
第 4 线程开始
第 5 线程开始
第 1 线程执行结束
第 2 线程执行结束
第 5 线程执行结束
第 4 线程执行结束
第 3 线程执行结束
5 个线程全副执行结束
主线程继续执行

面试能把这两种形式说进去,证实你还是能够解决这个问题。

但问题来了,如果面试官问你实现原理,你却答复不上来,就会给人你在瞎用的感觉,这样好不容易后面拿到点好印象后果被打回原型。

至于join的原理,倡议去看看我之前发的线程罕用办法里:疾速把握并发编程—Thread罕用办法,那外面说的很分明了,所这里就不在反复了。

明天咱们着重了了CountDownLatch。

CountDownLatch

概念

CountDownLatch能够使一个获多个线程期待其余线程各自执行结束后再执行。

CountDownLatch 定义了一个计数器,和一个阻塞队列, 当计数器的值递加为0之前,阻塞队列外面的线程处于挂起状态,当计数器递加到0时会唤醒阻塞队列所有线程,这里的计数器是一个标记,能够示意一个工作一个线程,也能够示意一个倒计时器,CountDownLatch能够解决那些一个或者多个线程在执行之前必须依赖于某些必要的前提业务先执行的场景。

整体

罕用办法

构造方法

咱们在下面的案例中

 int threadCount = 5;
 CountDownLatch countDownLatch = new CountDownLatch(threadCount);

有用到new CountDownLatch(threadCount);来创立一个CountDownLatch实例对象。咱们看看这个构造方法

private final Sync sync;
public CountDownLatch(int count) { 
    //记者count值不能小于0
    if (count < 0) throw new IllegalArgumentException("count < 0");
    //创立一个Sync实例对象入参就是count
    this.sync = new Sync(count);
}

而后这里有个外部类Sync,这个Sync外部类也没几行代码,Sync继承了AbstractQueuedSynchronizer形象队列同步器(以下简称AQS)。

private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
        //入参count
        Sync(int count) {
            //这个setState办法还记得否?就是上篇文章AQS中的setState()办法
            //就是给AQS中的state赋值,state=count
            setState(count);
        }
        //获取AQS中state的值
        int getCount() {
            return getState();
        }
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
        //死循环
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                //获取AQS中的state
                int c = getState();
                //如果AQS中的state==0,就返回false
                if (c == 0)  return false;
                int nextc = c-1;
                //nextc=state-1
                //
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
 }
countDown办法
public void countDown() {
    //调用的就是AQS中的办法
    sync.releaseShared(1);
}

AQS中releaseShared办法

public final boolean releaseShared(int arg) {
    // arg 为固定值 1
    // 如果计数器state 为0 返回true,前提是调用 countDown() 之前不能曾经为0
    //tryReleaseShared在AQS是空办法
    if (tryReleaseShared(arg)) {
      // 唤醒期待队列的线程
        doReleaseShared();
         return true;
    }
    return false;
}
protected boolean tryReleaseShared(int arg) {
   throw new UnsupportedOperationException();
}

这个办法tryReleaseShared()是在CountDownLatch中外部类Sync中实现的

//其实也没什么新招
//还是死循环+CAS配合 实现计数器state减1
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)  return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
           return nextc == 0;
     }
}

办法doReleaseShared却是AQS种实现的(因为CountDownLatch和其内部类都没有实现,只有AQS实现了,那就只认AQS中的实现了)。

//实现思路就是从头到尾的遍历列队中所有的节点为shared状态的
private void doReleaseShared() {
        //死循环
        for (;;) {
            //获取以后列队的头节点
            Node h = head;
            //列队中可能为空列队,也有可能只有一个node节点
            if (h != null && h != tail) {
                //获取头节点的状态
                int ws = h.waitStatus;
                //如果头节点为SIGNAL状态, 阐明后继节点须要唤醒
                if (ws == Node.SIGNAL) {
                    //将头结点的waitstatue设置为0,当前就不会再次唤醒后继节点了。
                    //这一步是为了解决并发问题,保障只unpark一次!!不胜利就持续
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    //(开释)唤醒头节点的后继节点
                    unparkSuccessor(h);
                }// 状态为0并且不胜利,持续
                else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;// loop on failed CAS
            }
            // 若头结点扭转,持续循环 
            if (h == head) // loop if head changed
                break;
        }
}

整个调用逻辑大抵为

await办法

在CountDownLatch中await犯法

public void await() throws InterruptedException {
   sync.acquireSharedInterruptibly(1);
}

而后调用AQS中的

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
      //判断是否被中断过
      if (Thread.interrupted()) throw new InterruptedException();
      //如果state不等于0的时候
      if (tryAcquireShared(arg) < 0){
            doAcquireSharedInterruptibly(arg);
      }
}

其中办法tryAcquireShared(arg)是CountDownLatch的外部类Sync的tryAcquireShared办法

protected int tryAcquireShared(int acquires) {
  //判断AQS中的state是否曾经等于0了,等于翻译1否则返回-1
  return (getState() == 0) ? 1 : -1;
}

再调用AQS中的doAcquireSharedInterruptibly办法

 //这个办法就是将以后线程封装成node节点退出到列队中,并判断是否须要阻塞以后线程
 //这个节点都会被设置成shared状态,这样做的目标时当state值为0时会唤醒所有shared的节点
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        //这个办法应该很相熟了吧,后面的文章都介绍过,将以后线程封装成节点退出到列队中
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            //(又是死循环)始终执行,直到获取锁,返回
            for (;;) {
                //获取前驱节点
                final Node p = node.predecessor();
                //前驱节点为头结点
                if (p == head) {
                    //所以再次尝试获取信号量,这就是下面剖析的那个获取办法
                    int r = tryAcquireShared(arg);
                    //如果r大于0证实获取信号量获取胜利了证实下一个能够获取信号量的线程是以后线程
                    if (r >= 0) {
                        //将以后节点变成列队的head节点而后返回
                        setHeadAndPropagate(node, r);
                        //不便GC
                        p.next = null; 
                        failed = false;
                        return;
                    }
                }
               //判断是否要进入阻塞状态.如果shouldParkAfterFailedAcquire办法
               //返回true,示意须要进入阻塞
               //调用parkAndCheckInterrupt;否则示意还能够再次尝试获取锁,持续进行for循环
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            //失败就放弃
            if (failed){
                cancelAcquire(node);
            }
        }
}

办法shouldParkAfterFailedAcquire是AQS的

//p是前驱结点,node是以后结点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus; //获取前驱节点的状态
    if (ws == Node.SIGNAL) //表明前驱节点能够运行
        return true;
    if (ws > 0) { //如果前驱节点状态大于0表明曾经中断,
        do {
            node.prev = pred = pred.prev; 
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        //等于0进入这里
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL); 
    }
    //只有前节点状态为NodeSIGNAL才返回真
    return false; 
}

咱们对shouldParkAfterFailedAcquire来进行一个整体的概述,首先应该明确节点的状态,节点的状态是为了表明以后线程的良好度,如果以后线程被打断了,在唤醒的过程中是不是应该疏忽该线程

 static final class Node {
        static final int CANCELLED =  1;
        static final int SIGNAL    = -1;
        static final int CONDITION = -2;
        static final int PROPAGATE = -3;
       //....

目前你只需晓得大于0时表明该线程已近被勾销,已近是有效节点,不应该被唤醒,留神:初始化链头节点时头节点状态值为0。

shouldParkAfterFailedAcquire是位于有限for循环内的,这一点须要留神个别每个节点都会经验两次循环后而后被阻塞。

在AQS的doAcquireSharedInterruptibly中可能会再次调用CountDownLatch的外部类Sync的tryAcquireShared办法和AQS的setHeadAndPropagate办法。setHeadAndPropagate办法源码如下。

private void setHeadAndPropagate(Node node, int propagate) {
        // 获取头结点
        Node h = head; 
        // 设置头结点
        setHead(node);
        // 进行判断
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            // 获取节点的后继
            Node s = node.next;
            if (s == null || s.isShared()) // 后继为空或者为共享模式
                // 以共享模式进行开释
                doReleaseShared();
        }
    }

该办法设置头结点并且开释头结点前面的满足条件的结点,该办法中可能会调用到AQS的doReleaseShared办法,其源码如下。

private void doReleaseShared() {
        // 有限循环
        for (;;) {
            // 保留头结点
            Node h = head;
            if (h != null && h != tail) { // 头结点不为空并且头结点不为尾结点
                // 获取头结点的期待状态
                int ws = h.waitStatus; 
                if (ws == Node.SIGNAL) { // 状态为SIGNAL
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 不胜利就持续
                        continue;            // loop to recheck cases
                    // 开释后继结点
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 状态为0并且不胜利,持续
                    continue;                // loop on failed CAS
            }
            if (h == head) // 若头结点扭转,持续循环 
                break;
        }
    }

CountDownLatch的await调用大抵会有如下的调用链

经典应用场景

CountDownLatch的一个十分典型的利用场景是:有一个工作想要往下执行,但必须要等到其余的工作执行结束后才能够持续往下执行。如果咱们这个想要持续往下执行的工作调用一个CountDownLatch对象的await()办法,其余的工作执行完本人的工作后调用同一个CountDownLatch对象上的countDown()办法,这个调用await()办法的工作将始终阻塞期待,直到这个CountDownLatch对象的计数值减到0为止。

案例1

举个例子,有三个工人在为老板干活,这个老板有一个习惯,就是当三个工人把一天的活都干完了的时候,他就来查看所有工人所干的活。记住这个条件:三个工人先全副干完活,老板才查看。

案例2

比方读取excel表格,须要把execl表格中的多个sheet进行数据汇总,为了进步汇总的效率咱们个别会开启多个线程,每个线程去读取一个sheet,可是线程执行是异步的,咱们不晓得什么时候数据处理完结了。那么这个时候咱们就能够使用CountDownLatch,有几个sheet就把state初始化几。每个线程执行完就调用countDown()办法,在汇总的中央加上await()办法,当所有线程执行完了,就能够进行数据的汇总了。

END

扫描关注公众号“Java后端技术全栈”

解锁程序员的狂野世界