关于java:并发编程浅析AQS及并发工具类SemaphoreCountDownLatch-CyclicBarrier

29次阅读

共计 19878 个字符,预计需要花费 50 分钟才能阅读完成。

1 AQS 概述

AQS 的全称为(AbstractQueuedSynchronizer),中文即“队列同步器”,这个类放在 java.util.concurrent.locks 包上面。

AQS 是用来构建锁或者其余同步组件的根底框架,它应用了一个 int 成员变量示意同步状态,通过内置的 FIFO 队列来实现资源获取线程的排队工作。应用 AQS 能简略且高效地结构出利用宽泛的大量的同步器,比方上篇文章写的 ReentrantLock 与 ReentrantReadWriteLock。除此之外,AQS 还能结构出 Semaphore,FutureTask(jdk1.7) 等同步器。

2 AQS 原理

2.1 同步队列

AQS 是依赖 CLH 队列锁来实现同步状态的治理 。以后线程获取同步状态失败时,同步器会将以后线程以及期待状态等信息构建为一个 节点 (Node) 并将其退出同步队列,同步会阻塞以后线程,当同步状态开释时,会将首节点中的线程唤醒,使其再次尝试获取同步状态。

CLH(Craig,Landin,and Hagersten)队列是一个虚构的双向队列(FIFO 双向队列)(虚构的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS 是将每条申请共享资源的线程封装成一个 CLH 锁队列的一个结点(Node)来实现锁的调配

同步队列中的节点(Node)用来保留获取同步状态失败的线程援用、期待状态以及前驱和后继节点信息。

属性类型与名称 形容
int waitStatus 期待状态(如 CANCELLED=1、SIGNAL=-1、CONDITION=-2、PROPAGATE=-3、INITIAL=0)
Node prev 前驱节点(当节点退出同步队列时被设置,在尾部增加)
Node next 后继节点
Thread thread 以后获取同步状态的线程

节点源码如下:

static final class Node {
    // 示意该节点期待模式为共享式,通常记录于 nextWaiter,// 通过判断 nextWaiter 的值能够判断以后结点是否处于共享模式
    static final Node SHARED = new Node();
    // 示意节点处于独占式模式,与 SHARED 绝对
    static final Node EXCLUSIVE = null;
    // waitStatus 的不同状态
    // 以后结点是因为超时或者中断勾销的,进入该状态后将无奈复原
    static final int CANCELLED =  1;
    // 以后结点的后继结点是 (或者将要) 由 park 导致阻塞的,当结点被开释或者勾销时,须要通过 unpark 唤醒后继结点
    static final int SIGNAL    = -1;
    // 表明结点在期待队列中,结点线程期待在 Condition 上
    // 当其余线程对 Condition 调用了 signal()办法时,会将其退出到同步队列中   
    static final int CONDITION = -2;
    // 下一次共享式同步状态的获取将会无条件地向后继结点流传
    static final int PROPAGATE = -3;
    volatile int waitStatus;
    // 记录前驱结点
    volatile Node prev;
    // 记录后继结点
    volatile Node next;
    // 记录以后的线程
    volatile Thread thread;
    // 用于记录共享模式(SHARED), 也能够用来记录 CONDITION 队列
    Node nextWaiter;
    // 通过 nextWaiter 的记录值判断以后结点的模式是否为共享模式
    final boolean isShared() {    return nextWaiter == SHARED;}
    // 获取以后结点的前置结点
    final Node predecessor() throws NullPointerException { ...}
    // 用于初始化时创立 head 结点或者创立 SHARED 结点
    Node() {}
    // 在 addWaiter 办法中应用,用于创立一个新的结点
    Node(Thread thread, Node mode) {     
        this.nextWaiter = mode;
        this.thread = thread;
    }
    // 在 CONDITION 队列中应用该构造函数新建结点
    Node(Thread thread, int waitStatus) { 
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}
// 记录头结点
private transient volatile Node head;
// 记录尾结点
private transient volatile Node tail;

节点是形成同步队列的根底,同步器领有 首节点(Head)和尾节点(Tail),没有胜利 获取 同步状态的线程将会成为节点退出该队列的尾部。同步器提供了一个基于 CAS 的设置尾节点的办法:compareAndSetTail(Node expect, Node update),它须要传递以后线程“认为”的尾节点和以后节点,只有设置胜利后,以后节点才正式与之前的尾节点建设关联。

首节点是获取同步状态胜利的节点 ,首节点的线程在 开释 同步状态时,将会唤醒后继节点,而后继节点将会在获取同步状态胜利时将本人设置为首节点。设置首节点是通过获取同步状态胜利的线程来实现的,不须要应用 CAS 来保障,只需将首节点设置成为原首节点的后继节点并断开原首节点的 next 援用即可。

2.2 同步状态

2.2.1 独占式(EXCLUSIVE)

独占式 (EXCLUSIVE) 获取需重写 tryAcquiretryRelease 办法,并拜访 acquirerelease 办法实现相应的性能。

    public final void acquire(int arg) {
        // 如果线程间接获取胜利,或者再尝试获取胜利后都是间接工作,// 如果是从阻塞状态中唤醒开始工作的线程,将以后的线程中断        
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();}
    // 封装线程,新建结点并退出到同步队列中
    private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        // 尝试入队,胜利返回
        if (pred != null) {
            node.prev = pred;
            // CAS 操作设置队尾
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 通过 CAS 操作自旋实现 node 入队操作
        enq(node);
        return node;
    }
    // 在同步队列中期待获取同步状态
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            // 自旋
            for (;;) {final Node p = node.predecessor();
                // 前驱节点是否为头节点 &&tryAcquire 获取同步状态
                if (p == head && tryAcquire(arg)) {setHead(node);
                    p.next = null;
                    failed = false;
                    return interrupted;
                }
                // 获取不到同步状态,将前置结点标为 SIGNAL 状态并且通过 park 操作将 Node 封装的线程阻塞
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {if (failed)
                // 如果获取失败,将 node 标记为 CANCELLED
                cancelAcquire(node);
        }
    }

独占式获取同步状态流程:

通过调用同步器的 release(int arg) 办法能够开释同步状态,该办法在开释了同步状态之后,会唤醒其 后继节点(进而使后继节点从新尝试获取同步状态)。

public final boolean release(int arg) {
    // 首先尝试开释并更新同步状态
    if (tryRelease(arg)) {
        Node h = head;
        // 查看是否须要唤醒后置结点
        if (h != null && h.waitStatus != 0)
            // 唤醒后置结点
            unparkSuccessor(h);
        return true;
    }
    return false;
}
// 唤醒后继结点
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    // 通过 CAS 操作将 waitStatus 更新为 0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    // 查看后置结点,若为空或者状态为 CANCELLED,找到后置非 CANCELLED 结点
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 唤醒后继结点
    if (s != null)
        LockSupport.unpark(s.thread);
}

2.2.2 共享式(SHARED)

共享式获取与独占式获取最次要的区别在于同一时刻是否有多个线程同时获取到同步状态

共享式 (SHARED) 获取需重写 tryAcquireSharedtryReleaseShared 办法,并拜访 acquireSharedreleaseShared 办法实现相应的性能。与独占式绝对,共享式反对多个线程同时获取到同步状态并进行工作,如 Semaphore、CountDownLatch、CyclicBarrier 等。ReentrantReadWriteLock 能够看成是组合式,因为 ReentrantReadWriteLock 也就是读写锁容许多个线程同时对某一资源进行读。

public final void acquireShared(int arg) {
    // 尝试共享式获取同步状态,如果胜利获取则能够继续执行,否则执行 doAcquireShared
    if (tryAcquireShared(arg) < 0)
        // 以共享式不停得尝试获取同步状态
        doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
    // 向同步队列中新增一个共享式的结点
    final Node node = addWaiter(Node.SHARED);
    // 标记获取失败状态
    boolean failed = true;
    try {// 标记中断状态(若在该过程中被中断是不会响应的,须要手动中断)
        boolean interrupted = false;
        // 自旋
        for (;;) {
            // 获取前置结点
            final Node p = node.predecessor();
            // 若前置结点为头结点
            if (p == head) {
                // 尝试获取同步状态
                int r = tryAcquireShared(arg);
                // 若获取到同步状态。if (r >= 0) {
                    // 此时,以后结点存储的线程复原执行,须要将以后结点设置为头结点并且向后流传,// 告诉合乎唤醒条件的结点一起复原执行
                    setHeadAndPropagate(node, r);
                    p.next = null;
                    // 须要中断,中断以后线程
                    if (interrupted)
                        selfInterrupt();
                    // 获取胜利
                    failed = false;
                    return;
                }
            }
            // 获取同步状态失败,须要进入阻塞状态
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // 获取失败,CANCELL node
        if (failed)
            cancelAcquire(node);
    }
}
// 将 node 设置为同步队列的头结点,并且向后告诉以后结点的后置结点,实现流传
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; 
    setHead(node);
    // 向后流传
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if(s == null || s.isShared())
            doReleaseShared();}
}

与独占式一样,共享式获取也须要开释同步状态,通过调用 releaseShared(intarg)办法能够开释同步状态,开释同步状态胜利后,会唤醒后置结点,并且保障流传性。

public final boolean releaseShared(int arg) {
    // 尝试开释同步状态
    if (tryReleaseShared(arg)) {
        // 胜利后唤醒后置结点
        doReleaseShared();
        return true;
    }
    return false;
}
// 唤醒后置结点
private void doReleaseShared() {
    // 循环的目标是为了避免新结点在该过程中进入同步队列产生的影响,同时要保障 CAS 操作的实现
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            
                    unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                
        }
        if (h == head)                   
            break;
    }
}

2.2.3 超时获取形式

通过调用同步器的 doAcquireNanos(int arg, long nanosTimeout) 办法能够 超时获取同步状态,即在指定的时间段内获取同步状态,如果获取到同步状态则返回 true,否则,返回 false。该办法提供了传统 Java 同步操作(比方 synchronized 关键字)所不具备的个性。

private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {if (nanosTimeout <= 0L)
        return false;
    // 计算超时的工夫 = 以后虚拟机的工夫 + 设置的超时工夫
    final long deadline = System.nanoTime() + nanosTimeout;
    // 调用 addWaiter 将以后线程封装成独占模式的节点,并且退出到同步队列尾部。final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        // 自旋
        for (;;) {final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                // 如果以后节点的前驱节点为头结点,则让以后节点去尝试获取锁。setHead(node);
                p.next = null; 
                failed = false;
                return true;
            }
            // 如果以后节点的前驱节点不是头结点,或以后节点获取锁失败,// 则再次判断以后线程是否曾经超时。nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            // 调用 shouldParkAfterFailedAcquire 办法,通知以后节点的前驱节点,马上进入
            // 期待状态了,即做好进入期待状态前的筹备。if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                // 调用 LockSupport.parkNanos 办法,将以后线程设置成超时期待的状态。LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();}
    } finally {if (failed)
            cancelAcquire(node);
    }
}

由下面代码可知,超时获取也是调用 addWaiter 将以后线程封装成 独占模式 的节点,并且退出到同步队列尾部。

超时获取与独占式获取同步状态区别 在于获取同步状态失败后的解决 。如果以后线程获取同步状态失败,则判断是否超时(nanosTimeout 小于等于 0 示意曾经超时);如果没有超时,从新计算超时距离 nanosTimeout,而后使以后线程期待 nanosTimeout 纳秒(当已到设置的超时工夫,该线程会从LockSupport.parkNanos(Object blocker, long nanos) 办法返回)。

独占式超时获取同步状态流程:

2.3 模板办法

AQS 应用一个 int 成员变量来示意同步状态,通过内置的 FIFO 队列来实现获取资源线程的排队工作。AQS 应用 CAS 对该同步状态进行原子操作实现对其值的批改。

private volatile int state;// 共享变量,应用 volatile 润饰保障线程可见性

同步状态 state 通过 protected 类型的 getStatesetStatecompareAndSetState 办法进行操作

// 返回同步状态的以后值
protected final int getState() {return state;}
// 设置同步状态的值
protected final void setState(int newState) {state = newState;}
// CAS 更新同步状态,该办法可能保障状态设置的原子性
protected final boolean compareAndSetState(int expect, int update) {return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

同步器的设计是基于模板办法模式的,也就是说,使用者须要继承同步器并重写指定的办法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板办法,而这些模板办法将会调用使用者重写的办法。

自定义同步器时须要重写上面几个 AQS 提供的模板办法:

isHeldExclusively()// 该线程是否正在独占资源。只有用到 condition 才须要去实现它。tryAcquire(int)// 独占形式。尝试获取资源,胜利则返回 true,失败则返回 false。tryRelease(int)// 独占形式。尝试开释资源,胜利则返回 true,失败则返回 false。tryAcquireShared(int)// 共享形式。尝试获取资源。正数示意失败;0 示意胜利,但没有残余可用资源;负数示意胜利,且有残余资源。tryReleaseShared(int)// 共享形式。尝试开释资源,胜利则返回 true,失败则返回 false。

同步器提供的模板办法基本上分为 3 类:独占式 获取与开释同步状态、共享式 获取与开释同步状态和查问同步队列中的期待线程状况。

一般来说,自定义同步器要么是独占办法,要么是共享形式,他们也只需实现 tryAcquire-tryReleasetryAcquireShared-tryReleaseShared 中的一种即可。

以 ReentrantLock 为例,state 初始化为 0,示意未锁定状态。A 线程 lock()时,会调用 tryAcquire()独占该锁并将 state+1。尔后,其余线程再 tryAcquire()时就会失败,直到 A 线程 unlock()到 state=0(即开释锁)为止,其它线程才有机会获取该锁。当然,开释锁之前,A 线程本人是能够反复获取此锁的(state 会累加),这就是可重入的概念。但要留神,获取多少次就要开释如许次,这样能力保障 state 是能回到零态的。

再以 CountDownLatch 以例,工作分为 N 个子线程去执行,state 也初始化为 N(留神 N 要与线程个数统一)。这 N 个子线程是并行执行的,每个子线程执行完后 countDown()一次,state 会 CAS(Compare and Swap)减 1。等到所有子线程都执行完后 (即 state=0),会 unpark() 主调用线程,而后主调用线程就会从 await()函数返回,持续后续动作。

但 AQS 也反对自定义同步器同时实现独占和共享两种形式,如ReentrantReadWriteLock

上面就来学习几个罕用的并发同步工具。

3 Semaphore(信号量)

Semaphore(信号量)用来管制 同时拜访特定资源的线程数量 ,它通过协调各个线程,以保障正当的应用公共资源。synchronized 和 ReentrantLock 都是一次只容许一个线程拜访某个资源,而Semaphore(信号量) 能够指定多个线程同时拜访某个资源

以停车场为例。假如一个停车场只有 10 个车位,这时如果同时来了 15 辆车,则只容许其中 10 辆不受妨碍的进入。剩下的 5 辆车则必须在入口期待,尔后来的车也都不得不在入口处期待。这时,如果有 5 辆车来到停车场,放入 5 辆;如果又来到 2 辆,则又能够放入 2 辆,如此往返。

在这个停车场零碎中,车位即是共享资源,每辆车就好比一个线程,信号量就是空车位的数目。

Semaphore 中蕴含了一个实现了 AQS 的同步器 Sync,以及它的两个子类 FairSync 和 NonFairSync。查看 Semaphore 类构造:

可见 Semaphore 也是辨别偏心模式和非偏心模式的。

  • 偏心模式: 调用 acquire 的程序就是获取许可证的程序,遵循 FIFO。
  • 非偏心模式: 抢占式的。

Semaphore 对应的两个构造方法如下:

   public Semaphore(int permits) {sync = new NonfairSync(permits);
    }

    public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

这两个构造方法,都必须提供许可的数量,第二个构造方法能够指定是偏心模式还是非偏心模式,默认非偏心模式。

Semaphore 实现原理这里就不剖析了,能够参考死磕 java 同步系列之 Semaphore 源码解析这篇文章。

须要明确的是,Semaphore 也是共享锁的一种实现。它默认结构 AQS 的 state 为 permits。当执行工作的线程数量超出 permits,那么多余的线程将会被放入阻塞队列 Park,并自旋判断 state 是否大于 0。只有当 state 大于 0 的时候,阻塞的线程能力继续执行,此时先前执行工作的线程继续执行 release 办法,release 办法使得 state 的变量会加 1,那么自旋的线程便会判断胜利。如此,每次只有最多不超过 permits 数量的线程能自旋胜利,便限度了执行工作线程的数量。

Semaphore 罕用于做 流量管制,特地是专用资源无限的利用场景。

罕用办法 形容
acquire()/acquire(int permits) 获取许可证。获取许可失败,会进入 AQS 的队列中排队。
tryAcquire()/tryAcquire(int permits) 获取许可证。获取许可失败,间接返回 false。
tryAcquire(long timeout, TimeUnit unit)/
tryAcquire(int permits, long timeout, TimeUnit unit)
超时期待获取许可证。
release() 偿还许可证。
intavailablePermits() 返回此信号量中以后可用的许可证数。
intgetQueueLength() 返回正在期待获取许可证的线程数。
booleanhasQueuedThreads() 是否有线程正在期待获取许可证。
void reducePermits(int reduction) 缩小 reduction 个许可证,是个 protected 办法。
Collection getQueuedThreads() 返回所有期待获取许可证的线程汇合,是个 protected 办法。

应用示例:

public class SemaphoreTest {
    private static final int THREAD_COUNT = 50;

    public static void main(String[] args) throws InterruptedException {
        // 创立一个具备固定线程数量的线程池对象
        ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
        // 一次只能容许执行的线程数量
        final Semaphore semaphore = new Semaphore(10);

        for (int i = 0; i < THREAD_COUNT; i++) {
            final int threadNum = i;
            threadPool.execute(() -> {
                try {semaphore.acquire();// 获取 1 个许可,所以可运行线程数量为 10/1=10
                    test(threadNum);
                    semaphore.release();// 开释 1 个许可,所以可运行线程数量为 10/1=10} catch (InterruptedException e) {e.printStackTrace();
                }
            });
        }
        threadPool.shutdown();}

    public static void test(int threadNum) throws InterruptedException {Thread.sleep(1000);// 模仿申请的耗时操作
        System.out.println("threadNum:" + threadNum);
        Thread.sleep(1000);// 模仿申请的耗时操作
    }
}

在代码中,尽管有 50 个线程在执行,然而只容许 10 个并发执行。Semaphore 的构造方法 Semaphore(int permits)承受一个整型的数字,示意可用的许可证数量。Semaphore(10)示意容许 10 个线程获取许可证,也就是最大并发数是 10。Semaphore 的用法也很简略,首先线程应用 Semaphore 的 acquire()办法获取一个许可证,应用完之后调用 release()办法偿还许可证。

除了 acquire办法之外,另一个比拟罕用的与之对应的办法是 tryAcquire 办法,该办法如果获取不到许可就立刻返回 false。

4 CountDownLatch (倒计时器)

4.1 概述

在日常开发中常常会遇到须要在主线程中开启多个线程去并行执行工作,并且主线程须要期待所有子线程执行结束后再进行汇总的场景。jdk 1.5 之前个别都应用线程的 join()办法来实现这一点,然而 join 办法不够灵便,难以满足不同场景的须要,所以 jdk 1.5 之后 concurrent 包提供了 CountDownLatch 这个类。

CountDownLatch是一种同步辅助工具,它 容许一个或多个线程期待其余线程实现操作

CountDownLatch 是通过一个计数器来实现的,计数器的初始化值为线程的数量。每当一个线程实现了本人的工作后,计数器的值就相应得减 1。当计数器达到 0 时,示意所有的线程都已实现工作,而后在闭锁上期待的线程就能够复原执行工作。

CountDownLatch 的办法:

办法 形容
await() 调用该办法的线程等到构造方法传入的 N 减到 0 的时候,能力持续往下执行。
await(long timeout, TimeUnit unit) 调用该办法的线程等到指定的 timeout 工夫后,不论 N 是否减至为 0,都会持续往下执行。
countDown() 使 CountDownLatch 初始值 N 减 1。
getCount() 获取以后 CountDownLatch 保护的值,也就是 AQS 的 state 的值。

CountDownLatch 的实现原理,能够查看【JUC】JDK1.8 源码剖析之 CountDownLatch(五)一文。

依据源码剖析可知,CountDownLatch 是 AQS 中共享锁的一种实现。AbstractQueuedSynchronizer 中保护了一个 volatile 类型的整数 state,volatile 能够保障多线程环境下该变量的批改对每个线程都可见,并且因为该属性为整型,因此对该变量的批改也是原子的。

CountDownLatch 默认结构 AQS 的 state 值为 count。创立一个 CountDownLatch 对象时,所传入的整数 N 就会赋值给 state 属性。

当调用 countDown()办法时,其实是调用了 tryReleaseShared 办法以 CAS 的操作来对 state 减 1;而调用 await()办法时,以后线程就会判断 state 属性是否为 0。如果为 0,阻塞线程被唤醒持续往下执行;如果不为 0,则使以后线程放入阻塞队列 Park,直至最初一个线程调用了 countDown()办法使得 state == 0,再唤醒在 await()办法中期待的线程。

特地留神的是

CountDownLatch 是一次性的 ,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当 CountDownLatch 应用结束后,它不能再次被应用。如果 须要能重置计数,能够应用 CyclicBarrier

4.2 利用场景

CountDownLatch 次要利用场景:

  1. 实现最大的并行性:同时启动多个线程,实现最大水平的并行性。例如 110 跨栏较量中,所有运动员筹备好起跑姿态,进入到准备状态,期待裁判一声枪响。裁判开了枪,所有运动员才能够开跑。
  2. 开始执行前期待 N 个线程实现各自工作:例如一群学生在教室考试,学生们都实现了作答,老师才能够进行收卷操作。

案例:

public class CountDownLatchTest {
    private static final int THREAD_COUNT = 30;

    public static void main(String[] args) throws InterruptedException {ExecutorService threadPool = Executors.newFixedThreadPool(10);

        final CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT);
        for (int i = 0; i < THREAD_COUNT; i++) {
            final int threadNum = i;
            threadPool.execute(() -> {
                try {Thread.sleep(1000);// 模仿申请的耗时操作
                    System.out.println("子线程:" + threadNum);
                    Thread.sleep(1000);// 模仿申请的耗时操作
                } catch (InterruptedException e) {e.printStackTrace();
                } finally {countDownLatch.countDown();// 示意一个申请曾经被实现
                }
            });
        }
        System.out.println("主线程启动...");
        countDownLatch.await();
        threadPool.shutdown();
        System.out.println("子线程执行结束...");
        System.out.println("主线程执行结束...");
    }
}

下面的代码中,咱们定义了申请的数量为 30,当这 30 个申请被解决实现之后,才会打印 子线程执行结束

主线程在启动其余线程后调用 CountDownLatch.await() 办法进入阻塞状态,直到其余线程实现各自的工作才被唤醒。

开启的 30 个线程必须援用闭锁对象,因为他们须要告诉 CountDownLatch 对象,他们曾经实现了各自的工作。这种告诉机制是通过 CountDownLatch.countDown()办法来实现的;每调用一次这个办法,在构造函数中初始化的 count 值就减 1。所以当 30 个线程都调用了这个办法后,count 的值才等于 0,而后主线程就能通过 await()办法,继续执行本人的工作。

5 CyclicBarrier(循环栅栏)

5.1 概述

CyclicBarrier 的字面意思是可循环应用(Cyclic)的屏障(Barrier)。它要做的事件是,让一组线程达到一个屏障(也能够叫同步点)时被阻塞,直到最初一个线程达到屏障时,屏障才会开门,所有被屏障拦挡的线程才会持续运行。CyclicBarrier 的性能和利用场景与 CountDownLatch 都十分相似。

CyclicBarrier 罕用办法:

罕用办法 形容
await() 在所有线程都曾经在此 barrier 上并调用 await 办法之前,将始终期待。
await(long timeout, TimeUnit unit) 所有线程都曾经在此屏障上调用 await 办法之前将始终期待,或者超出了指定的等待时间。
getNumberWaiting() 返回以后在屏障处期待的线程数目。
getParties() 返回要求启动此 barrier 的线程数目。
isBroken() 查问此屏障是否处于损坏状态。
reset() 将屏障重置为其初始状态。

5.2 源码剖析

构造函数:

// 创立一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于期待状态时启动,但它不会在启动 barrier 时执行预约义的操作。public CyclicBarrier(int parties) {this(parties, null);
}

// 创立一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于期待状态时启动,并在启动 barrier 时执行给定的屏障操作。// 该操作由最初一个进入 barrier 的线程执行。public CyclicBarrier(int parties, Runnable barrierAction) {if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

其中,parties 就示意屏障拦挡的线程数量

CyclicBarrier 的最重要的办法就是 await 办法,await() 办法就像建立起一个栅栏的行为一样,将线程挡住了,当拦住的线程数量达到 parties 的值时,栅栏才会关上,线程才得以通过执行

    public int await() throws InterruptedException, BrokenBarrierException {
        try {return dowait(false, 0L);
        } catch (TimeoutException toe) {throw new Error(toe);
        }
    }

当调用 await() 办法时,实际上调用的是 dowait(false, 0L) 办法。查看dowait(boolean timed, long nanos)

    // 当线程数量或者申请数量达到 count 时 await 之后的办法才会被执行。private int count;

    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        // 获取”独占锁“lock.lock();
        try {
            // 保留“以后的 generation”final Generation g = generation;
            // 如果以后代损坏,抛出异样
            if (g.broken)
                throw new BrokenBarrierException();
            // 如果线程中断,抛出异样
            if (Thread.interrupted()) {
                // 将损坏状态设置为 true,并唤醒所有阻塞在此栅栏上的线程
                breakBarrier();
                throw new InterruptedException();}
            // 将“count 计数器”-1
            int index = --count;
            // 当 count== 0,阐明最初一个线程曾经达到栅栏
            if (index == 0) {
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    // 执行栅栏工作
                    if (command != null)
                        command.run();
                    ranAction = true;
                    // 将 count 重置为 parties 属性的初始化值
                    // 唤醒之前期待的线程,并更新 generation。nextGeneration();
                    // 完结,等价于 return index
                    return 0;
                } finally {if (!ranAction)
                        breakBarrier();}
            }

            // 以后线程始终阻塞,直到“有 parties 个线程达到 barrier”或“以后线程被中断”或“超时”这 3 者条件之一产生
            // 以后线程才继续执行。for (;;) {
                try {
                    // 如果没有工夫限度,则间接期待,直到被唤醒。if (!timed)
                        trip.await();
                    // 如果有工夫限度,则期待指定工夫再唤醒(超时期待)。else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    // 以后代没有损坏
                    if (g == generation && ! g.broken) {
                        // 让栅栏生效
                        breakBarrier();
                        throw ie;
                    } else {
                        // 下面条件不满足, 阐明这个线程不是这代的。// 就不会影响以后这代栅栏执行逻辑。中断。Thread.currentThread().interrupt();
                    }
                }
                // 如果“以后 generation 曾经损坏”,则抛出异样。if (g.broken)
                    throw new BrokenBarrierException();  
                // 如果“generation 曾经换代”,则返回 index。if (g != generation)
                    return index; 
                // 如果是“超时期待”,并且工夫已到,则通过 breakBarrier()终止 CyclicBarrier
                // 唤醒 CyclicBarrier 中所有期待线程。if (timed && nanos <= 0L) {breakBarrier();
                    throw new TimeoutException();}
            }
        } finally {// 开释“独占锁(lock)”lock.unlock();}
    }

generation 是 CyclicBarrier 的一个成员变量:

   /**
     * Generation 一代的意思。* CyclicBarrier 是能够循环应用的,用它来标记本代和下一代。* broken:以后代是否损坏的标记。标记有线程产生了中断,或者异样,就是工作没有实现。*/
    private static class Generation {boolean broken = false;}

    // 实现独占锁
    private final ReentrantLock lock = new ReentrantLock();
    // 实现多个线程之间互相期待告诉,就是满足某些条件之后,线程能力执行,否则就期待
    private final Condition trip = lock.newCondition();
    // 初始化时屏障数量 
    private final int parties;
    // 当条件满足 (即屏障数量为 0) 之后,会回调这个 Runnable
    private final Runnable barrierCommand;
    // 以后代
    private Generation generation = new Generation();

    // 残余的屏障数量 count
    private int count;

在 CyclicBarrier 中,同一批的线程属于同一代 ,即同一个 generation;CyclicBarrier 中通过 generation 对象,记录属于哪一代。
当有 parties 个线程达到 barrier,generation 就会被更新换代。

总结

  1. CyclicBarrier 外部通过一个 count 变量作为计数器,cout 的初始值为 parties 属性的初始化值,每当一个线程到了栅栏,那么就将计数器减 1。如果 count 值为 0 了,示意这是这一代最初一个线程达到栅栏,就会将代更新并重置计数器,并唤醒所有之前期待在栅栏上的线程。
  2. 如果在期待的过程中,线程中断都也会抛出 BrokenBarrierException 异样,并且这个异样会流传到其余所有的线程,CyclicBarrier 会被损坏。
  3. 如果超出指定的等待时间,以后线程会抛出 TimeoutException 异样,其余线程会抛出 BrokenBarrierException 异样,CyclicBarrier 会被损坏。

5.3 利用场景

CyclicBarrier 能够用于多线程计算数据,最初合并计算结果的利用场景。比方咱们用一个 Excel 保留了用户所有银行流水,每个 Sheet 保留一个帐户近一年的每笔银行流水,当初须要统计用户的日均银行流水,先用多线程解决每个 sheet 里的银行流水,都执行完之后,失去每个 sheet 的日均银行流水,最初,再用 barrierAction 用这些线程的计算结果,计算出整个 Excel 的日均银行流水。

示例 1:

public class CyclicBarrierTest1 {
    private static final int THREAD_COUNT = 30;
    // 须要同步的线程数量
    private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5);

    public static void main(String[] args) throws InterruptedException {
        // 创立线程池
        ExecutorService threadPool = Executors.newFixedThreadPool(10);

        for (int i = 0; i < THREAD_COUNT; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            threadPool.execute(() -> {System.out.println("childThread:" + threadNum + "is ready");
                try {
                    // 期待 60 秒,保障子线程齐全执行完结
                    cyclicBarrier.await(60, TimeUnit.SECONDS);
                } catch (InterruptedException e) {e.printStackTrace();
                } catch (BrokenBarrierException e) {e.printStackTrace();
                } catch (TimeoutException e) {e.printStackTrace();
                }
                System.out.println("childThread:" + threadNum + "is finish");
            });
        }
        threadPool.shutdown();}
}

运行后果如下:

childThread:0 is ready
childThread:1 is ready
childThread:2 is ready
childThread:3 is ready
childThread:4 is ready
childThread:4 is finish
childThread:0 is finish
childThread:1 is finish
childThread:3 is finish
childThread:2 is finish
childThread:5 is ready
childThread:6 is ready
childThread:7 is ready
childThread:8 is ready
childThread:9 is ready
childThread:9 is finish
childThread:8 is finish
... ...

能够看到当线程数量也就是申请数量达定义的 5 个的时候,await办法之后的办法才被执行。

另外,CyclicBarrier 还提供一个更高级的构造函数CyclicBarrier(int parties, Runnable barrierAction),用于在线程达到屏障时,优先执行barrierAction,不便解决更简单的业务场景。示例代码如下:

public class CyclicBarrierTest2 {
    private static final int THREAD_COUNT = 30;
    // 须要同步的线程数量
    private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {System.out.println("------ 优先执行 ------");
    });

    public static void main(String[] args) throws InterruptedException {
        // 创立线程池
        ExecutorService threadPool = Executors.newFixedThreadPool(10);

        for (int i = 0; i < THREAD_COUNT; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            threadPool.execute(() -> {System.out.println("childThread:" + threadNum + "is ready");
                try {
                    // 期待 60 秒,保障子线程齐全执行完结
                    cyclicBarrier.await(60, TimeUnit.SECONDS);
                } catch (InterruptedException e) {e.printStackTrace();
                } catch (BrokenBarrierException e) {e.printStackTrace();
                } catch (TimeoutException e) {e.printStackTrace();
                }
                System.out.println("childThread:" + threadNum + "is finish");
            });
        }
        threadPool.shutdown();}
}

运行后果如下:

childThread:0 is ready
childThread:1 is ready
childThread:2 is ready
childThread:3 is ready
childThread:4 is ready
------ 优先执行 ------
childThread:4 is finish
childThread:0 is finish
childThread:1 is finish
childThread:3 is finish
childThread:2 is finish
childThread:5 is ready
childThread:6 is ready
childThread:7 is ready
childThread:8 is ready
childThread:9 is ready
------ 优先执行 ------
childThread:9 is finish
childThread:6 is finish
... ...

5.4 CyclicBarrier 和 CountDownLatch 的区别

  1. CountDownLatch 的计数器只能应用一次。而 CyclicBarrier 的计数器能够应用 reset()办法重置,可屡次应用。
  2. 侧重点不同。CountDownLatch 多用于某一个线程期待若干个其余线程执行完工作之后,它才执行;而 CyclicBarrier 个别用于多个线程相互期待至一个同步点,而后这些线程再持续一起执行。
  3. CyclicBarrier 还提供其余有用的办法,比方 getNumberWaiting 办法能够取得 Cyclic-Barrier 阻塞的线程数量;isBroken()办法用来理解阻塞的线程是否被中断。

参考

JAVA 并发编程的艺术

AQS 原理学习笔记

【JUC】JDK1.8 源码剖析之 AbstractQueuedSynchronizer(二)

死磕 java 同步系列之 Semaphore 源码解析

【JUC】JDK1.8 源码剖析之 CountDownLatch(五)

并发编程之 CyclicBarrier 源码剖析

Java 并发之 CyclicBarrier

正文完
 0