关于java:快速掌握并发编程CountDownLatch原理和实战

34次阅读

共计 12956 个字符,预计需要花费 33 分钟才能阅读完成。

关注 Java 后端技术全栈 ”**

回复“000”获取大量电子书

常见面试题

如何实现让主线程等所有子线程执行完了后,次要线程再继续执行?即如何实现一个线程等其余线程执行完了后再继续执行?

办法一

在后面的文章中咱们介绍了 Thread 类的 join 办法:疾速把握并发编程 —Thread 罕用办法,join 的工作原理是,不停查看 thread 是否存活,如果存活则让以后线程永远 wait,直到 thread 线程终止,线程的 notifyAll 就会被调用。

上面咱们就应用 join 来实现下面面试题。

import java.util.Random;
import java.util.concurrent.CountDownLatch;
public class CountDownLatchDemo {public static void main(String[] args) {System.out.println("次要线程开始期待其余子线程执行");
        try {test();
        } catch (InterruptedException e) {e.printStackTrace();
        }
    }
    public static void test() throws InterruptedException {Thread thread1 = new Thread(() -> {System.out.println(Thread.currentThread().getName() + "线程开始");
            Random random = new Random();
            try {Thread.sleep(random.nextInt(10000) + 1000);
            } catch (InterruptedException e) {e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "线程执行结束");
        },"线程 1");
       thread1.start();
        Thread thread2 = new Thread(() -> {System.out.println(Thread.currentThread().getName() + "线程开始");
            Random random = new Random();
            try {Thread.sleep(random.nextInt(10000) + 1000);
            } catch (InterruptedException e) {e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "线程执行结束");
        },"线程 2");
        thread2.start();
        Thread thread3 = new Thread(() -> {System.out.println(Thread.currentThread().getName() + "线程开始");
            Random random = new Random();
            try {Thread.sleep(random.nextInt(10000) + 1000);
            } catch (InterruptedException e) {e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "线程执行结束");
        },"线程 3");
        thread3.start();
        Thread thread4 = new Thread(() -> {System.out.println(Thread.currentThread().getName() + "线程开始");
            Random random = new Random();
            try {Thread.sleep(random.nextInt(10000) + 1000);
            } catch (InterruptedException e) {e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "线程执行结束");
        },"线程 4");
        thread4.start();
        // 启动了四个线程,而后让四个线程始终检测本人是否曾经完结
        thread1.join();
        thread2.join();
        thread3.join();
        thread4.join();
        System.out.println("主线程继续执行");
        //todo 业务代码
    }
}

运行后果

 次要线程开始期待其余子线程执行
线程 1 线程开始
线程 2 线程开始
线程 3 线程开始
线程 4 线程开始
线程 3 线程执行结束
线程 2 线程执行结束
线程 1 线程执行结束
线程 4 线程执行结束
主线程继续执行

主线程持续干活是要等后面四个线程全副执行结束后再持续的。然而这么搞有点麻烦,那就是每个线程都得调用 join 办法,有没有更好玩的的呢?

答案是有的,它来了。

它就是 juc 上面的一个很牛逼的并发工具类 CountDownLatch。是 JDK1.5 的时候有的,话中有话就是在 JDK1.5 之前就只能用 join 办法了。

办法二

CountDownLatch 中咱们次要用到两个办法一个是 await() 办法,调用这个办法的线程会被阻塞,另外一个是 countDown() 办法,调用这个办法会使计数器减一,当计数器的值为 0 时,因调用 await() 办法被阻塞的线程会被唤醒,继续执行。请看代码:

import java.util.Random;
import java.util.concurrent.CountDownLatch;
public class CountDownLatchDemo {public static void main(String[] args) {System.out.println("次要线程开始期待其余子线程执行");
        test();}
    public static void test() {
        int threadCount = 5;
        CountDownLatch countDownLatch = new CountDownLatch(threadCount);
        for (int i = 0; i < threadCount; i++) {
            final int finalI = i + 1;
            new Thread(() -> {System.out.println("第" + finalI + "线程开始");
                Random random = new Random();
                try {Thread.sleep(random.nextInt(10000) + 1000);
                } catch (InterruptedException e) {e.printStackTrace();
                }
                System.out.println("第" + finalI + "线程执行结束");
                countDownLatch.countDown();}).start();}
        try {countDownLatch.await();
        } catch (InterruptedException e) {e.printStackTrace();
        }
        System.out.println(threadCount + "个线程全副执行结束");
        System.out.println("主线程继续执行");
        //todo 业务代码
    }
}

输入

 次要线程开始期待其余子线程执行
第 1 线程开始
第 2 线程开始
第 3 线程开始
第 4 线程开始
第 5 线程开始
第 1 线程执行结束
第 2 线程执行结束
第 5 线程执行结束
第 4 线程执行结束
第 3 线程执行结束
5 个线程全副执行结束
主线程继续执行

面试能把这两种形式说进去,证实你还是能够解决这个问题。

但问题来了,如果面试官问你实现原理,你却答复不上来,就会给人你在瞎用的感觉,这样好不容易后面拿到点好印象后果被打回原型。

至于 join 的原理,倡议去看看我之前发的线程罕用办法里:疾速把握并发编程 —Thread 罕用办法,那外面说的很分明了,所这里就不在反复了。

明天咱们着重了了 CountDownLatch。

CountDownLatch

概念

CountDownLatch 能够使一个获多个线程期待其余线程各自执行结束后再执行。

CountDownLatch 定义了一个计数器,和一个阻塞队列,当计数器的值递加为 0 之前,阻塞队列外面的线程处于挂起状态,当计数器递加到 0 时会唤醒阻塞队列所有线程,这里的计数器是一个标记,能够示意一个工作一个线程,也能够示意一个倒计时器,CountDownLatch 能够解决那些一个或者多个线程在执行之前必须依赖于某些必要的前提业务先执行的场景。

整体

罕用办法

构造方法

咱们在下面的案例中

 int threadCount = 5;
 CountDownLatch countDownLatch = new CountDownLatch(threadCount);

有用到 new CountDownLatch(threadCount); 来创立一个 CountDownLatch 实例对象。咱们看看这个构造方法

private final Sync sync;
public CountDownLatch(int count) { 
    // 记者 count 值不能小于 0
    if (count < 0) throw new IllegalArgumentException("count < 0");
    // 创立一个 Sync 实例对象入参就是 count
    this.sync = new Sync(count);
}

而后这里有个外部类 Sync,这个 Sync 外部类也没几行代码,Sync 继承了 AbstractQueuedSynchronizer 形象队列同步器(以下简称 AQS)。

private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
        // 入参 count
        Sync(int count) {// 这个 setState 办法还记得否?就是上篇文章 AQS 中的 setState() 办法
            // 就是给 AQS 中的 state 赋值,state=count
            setState(count);
        }
        // 获取 AQS 中 state 的值
        int getCount() {return getState();
        }
        protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;
        }
        // 死循环
        protected boolean tryReleaseShared(int releases) {for (;;) {
                // 获取 AQS 中的 state
                int c = getState();
                // 如果 AQS 中的 state==0, 就返回 false
                if (c == 0)  return false;
                int nextc = c-1;
                //nextc=state-1
                //
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
 }
countDown 办法
public void countDown() {
    // 调用的就是 AQS 中的办法
    sync.releaseShared(1);
}

AQS 中 releaseShared 办法

public final boolean releaseShared(int arg) {
    // arg 为固定值 1
    // 如果计数器 state 为 0 返回 true,前提是调用 countDown() 之前不能曾经为 0
    //tryReleaseShared 在 AQS 是空办法
    if (tryReleaseShared(arg)) {
      // 唤醒期待队列的线程
        doReleaseShared();
         return true;
    }
    return false;
}
protected boolean tryReleaseShared(int arg) {throw new UnsupportedOperationException();
}

这个办法 tryReleaseShared() 是在 CountDownLatch 中外部类 Sync 中实现的

// 其实也没什么新招
// 还是死循环 +CAS 配合 实现计数器 state 减 1
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {int c = getState();
        if (c == 0)  return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
           return nextc == 0;
     }
}

办法 doReleaseShared 却是 AQS 种实现的(因为 CountDownLatch 和其内部类都没有实现,只有 AQS 实现了,那就只认 AQS 中的实现了)。

// 实现思路就是从头到尾的遍历列队中所有的节点为 shared 状态的
private void doReleaseShared() {
        // 死循环
        for (;;) {
            // 获取以后列队的头节点
            Node h = head;
            // 列队中可能为空列队,也有可能只有一个 node 节点
            if (h != null && h != tail) {
                // 获取头节点的状态
                int ws = h.waitStatus;
                // 如果头节点为 SIGNAL 状态, 阐明后继节点须要唤醒
                if (ws == Node.SIGNAL) {
                    // 将头结点的 waitstatue 设置为 0,当前就不会再次唤醒后继节点了。// 这一步是为了解决并发问题,保障只 unpark 一次!!不胜利就持续
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    //(开释) 唤醒头节点的后继节点
                    unparkSuccessor(h);
                }// 状态为 0 并且不胜利,持续
                else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;// loop on failed CAS
            }
            // 若头结点扭转,持续循环 
            if (h == head) // loop if head changed
                break;
        }
}

整个调用逻辑大抵为

await 办法

在 CountDownLatch 中 await 犯法

public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}

而后调用 AQS 中的

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
      // 判断是否被中断过
      if (Thread.interrupted()) throw new InterruptedException();
      // 如果 state 不等于 0 的时候
      if (tryAcquireShared(arg) < 0){doAcquireSharedInterruptibly(arg);
      }
}

其中办法 tryAcquireShared(arg) 是 CountDownLatch 的外部类 Sync 的 tryAcquireShared 办法

protected int tryAcquireShared(int acquires) {
  // 判断 AQS 中的 state 是否曾经等于 0 了,等于翻译 1 否则返回 -1
  return (getState() == 0) ? 1 : -1;
}

再调用 AQS 中的 doAcquireSharedInterruptibly 办法

 // 这个办法就是将以后线程封装成 node 节点退出到列队中,并判断是否须要阻塞以后线程
 // 这个节点都会被设置成 shared 状态,这样做的目标时当 state 值为 0 时会唤醒所有 shared 的节点
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        // 这个办法应该很相熟了吧,后面的文章都介绍过,将以后线程封装成节点退出到列队中
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            //(又是死循环)始终执行, 直到获取锁, 返回
            for (;;) {
                // 获取前驱节点
                final Node p = node.predecessor();
                // 前驱节点为头结点
                if (p == head) {
                    // 所以再次尝试获取信号量,这就是下面剖析的那个获取办法
                    int r = tryAcquireShared(arg);
                    // 如果 r 大于 0 证实获取信号量获取胜利了证实下一个能够获取信号量的线程是以后线程
                    if (r >= 0) {
                        // 将以后节点变成列队的 head 节点而后返回
                        setHeadAndPropagate(node, r);
                        // 不便 GC
                        p.next = null; 
                        failed = false;
                        return;
                    }
                }
               // 判断是否要进入阻塞状态. 如果 shouldParkAfterFailedAcquire 办法
               // 返回 true, 示意须要进入阻塞
               // 调用 parkAndCheckInterrupt;否则示意还能够再次尝试获取锁, 持续进行 for 循环
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();}
        } finally {
            // 失败就放弃
            if (failed){cancelAcquire(node);
            }
        }
}

办法 shouldParkAfterFailedAcquire 是 AQS 的

// p 是前驱结点,node 是以后结点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus; // 获取前驱节点的状态
    if (ws == Node.SIGNAL) // 表明前驱节点能够运行
        return true;
    if (ws > 0) { // 如果前驱节点状态大于 0 表明曾经中断,do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 等于 0 进入这里
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL); 
    }
    // 只有前节点状态为 NodeSIGNAL 才返回真
    return false; 
}

咱们对 shouldParkAfterFailedAcquire 来进行一个整体的概述,首先应该明确节点的状态,节点的状态是为了表明以后线程的良好度,如果以后线程被打断了,在唤醒的过程中是不是应该疏忽该线程

 static final class Node {
        static final int CANCELLED =  1;
        static final int SIGNAL    = -1;
        static final int CONDITION = -2;
        static final int PROPAGATE = -3;
       //....

目前你只需晓得大于 0 时表明该线程已近被勾销,已近是有效节点,不应该被唤醒,留神:初始化链头节点时头节点状态值为 0。

shouldParkAfterFailedAcquire 是位于有限 for 循环内的,这一点须要留神个别每个节点都会经验两次循环后而后被阻塞。

在 AQS 的 doAcquireSharedInterruptibly 中可能会再次调用 CountDownLatch 的外部类 Sync 的 tryAcquireShared 办法和 AQS 的 setHeadAndPropagate 办法。setHeadAndPropagate 办法源码如下。

private void setHeadAndPropagate(Node node, int propagate) {
        // 获取头结点
        Node h = head; 
        // 设置头结点
        setHead(node);
        // 进行判断
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            // 获取节点的后继
            Node s = node.next;
            if (s == null || s.isShared()) // 后继为空或者为共享模式
                // 以共享模式进行开释
                doReleaseShared();}
    }

该办法设置头结点并且开释头结点前面的满足条件的结点,该办法中可能会调用到 AQS 的 doReleaseShared 办法,其源码如下。

private void doReleaseShared() {
        // 有限循环
        for (;;) {
            // 保留头结点
            Node h = head;
            if (h != null && h != tail) { // 头结点不为空并且头结点不为尾结点
                // 获取头结点的期待状态
                int ws = h.waitStatus; 
                if (ws == Node.SIGNAL) { // 状态为 SIGNAL
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 不胜利就持续
                        continue;            // loop to recheck cases
                    // 开释后继结点
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 状态为 0 并且不胜利,持续
                    continue;                // loop on failed CAS
            }
            if (h == head) // 若头结点扭转,持续循环 
                break;
        }
    }

CountDownLatch 的 await 调用大抵会有如下的调用链

经典应用场景

CountDownLatch 的一个十分典型的利用场景是:有一个工作想要往下执行,但必须要等到其余的工作执行结束后才能够持续往下执行。如果咱们这个想要持续往下执行的工作调用一个 CountDownLatch 对象的 await() 办法,其余的工作执行完本人的工作后调用同一个 CountDownLatch 对象上的 countDown() 办法,这个调用 await() 办法的工作将始终阻塞期待,直到这个 CountDownLatch 对象的计数值减到 0 为止。

案例 1

举个例子,有三个工人在为老板干活,这个老板有一个习惯,就是当三个工人把一天的活都干完了的时候,他就来查看所有工人所干的活。记住这个条件:三个工人先全副干完活,老板才查看。

案例 2

比方读取 excel 表格,须要把 execl 表格中的多个 sheet 进行数据汇总, 为了进步汇总的效率咱们个别会开启多个线程,每个线程去读取一个 sheet,可是线程执行是异步的,咱们不晓得什么时候数据处理完结了。那么这个时候咱们就能够使用 CountDownLatch, 有几个 sheet 就把 state 初始化几。每个线程执行完就调用 countDown() 办法,在汇总的中央加上 await() 办法, 当所有线程执行完了,就能够进行数据的汇总了。

END

扫描关注公众号“Java 后端技术全栈”

解锁程序员的狂野世界

正文完
 0