关注Java后端技术全栈”**

回复“000”获取大量电子书

目录

notify和waitConditionCondition应用案例生产者消费者测试类后果Condition源码剖析await办法addConditionWaiter 办法fullyRelease办法isOnSyncQueue 办法signal办法doSignal 办法transferForSignal 办法从lock、await、signal,release的整个过程Condition期待告诉的实质总结

notify和wait

在后面学习 synchronized 的时候:疾速把握并发编程---synchronized篇(上),有讲到 wait/notify 的根本应用,联合synchronized 能够实现对线程的通信。

waitnotifynotifyAll是Object对象的属性,并不属于线程Thread。

咱们先解释这三个的一个很重要的概念:

wait使持有该对象的线程把该对象的控制权交出去,而后处于期待状态(这句话很重要,也就是说当调用wait的时候会开释锁并处于期待的状态)

notify:告诉某个正在期待这个对象的控制权的线程能够持续运行(这个就是获取锁,使本人的程序开始执行,最初通过notify同样去开释锁,并唤醒正在期待的线程)

notifyAll:会告诉所有期待这个对象控制权的线程持续运行(和下面一样,只不过是唤醒所有期待的线程继续执行)

从下面的解释咱们能够看出通过wait和notify能够做线程之间的通信,当A线程处理完毕告诉B线程执行,B线程执行结束当前A线程能够继续执行。

那么这个时候我就在思考了,既然 J.U.C 外面提供了Lock锁的实现机制,那·J.U.C外面有没有提供相似的线程通信的工具呢?

Condition

Condition 是一个多线程协调通信的工具类,能够让某些线程一起期待某个条件(condition),只有满足条件时,线程才会被唤醒。

Condition应用案例

上面来实现一个十分典型的生产者和消费者模式;

生产者
import java.util.Queue;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;public class Producer implements Runnable{    private Queue<String> msg;    private int maxSize;    Lock lock;    Condition condition;    public Producer(Queue<String> msg, int maxSize, Lock lock, Condition condition) {        this.msg = msg;        this.maxSize = maxSize;        this.lock = lock;        this.condition = condition;    }    @Override    public void run() {        int i=0;        while(true){            i++;            lock.lock();                //队列中音讯满了,此时生产者不能再生产了,因为装不下了                //所以生产者就开始期待状态                while(msg.size()==maxSize){                    System.out.println("生产者队列满了,先期待");                    try {                        condition.await(); //阻塞线程并开释锁                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                }            try {                Thread.sleep(1000);            } catch (InterruptedException e) {                e.printStackTrace();            }            System.out.println("生产音讯:"+i);                msg.add("生产者的音讯内容"+i);                condition.signal(); //唤醒阻塞状态下的线程            lock.unlock();        }    }}
消费者
import java.util.Queue;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;public class Consumer implements Runnable{    private Queue<String> msg;    private int maxSize;    Lock lock;    Condition condition;    public Consumer(Queue<String> msg, int maxSize, Lock lock, Condition condition) {        this.msg = msg;        this.maxSize = maxSize;        this.lock = lock;        this.condition = condition;    }    @Override    public void run() {        int i=0;        while(true){            i++;            lock.lock(); //synchronized            //消费者进来的时候须要判断是有可用的音讯,            //没有可用的音讯就期待状态            while(msg.isEmpty()){                System.out.println("消费者队列空了,先期待");                try {                    condition.await(); //阻塞线程并开释锁   wait                } catch (InterruptedException e) {                    e.printStackTrace();                }            }            try {                Thread.sleep(1000);            } catch (InterruptedException e) {                e.printStackTrace();            }            System.out.println("生产音讯:"+msg.remove());            condition.signal(); //唤醒阻塞状态下的线程            lock.unlock();        }    }}
测试类
public class TestCondition {    public static void main( String[] args ){        Queue<String> queue=new LinkedList<>();        Lock lock=new ReentrantLock(); //重入锁        Condition condition=lock.newCondition();        int maxSize=5;        Producer producer=new Producer(queue,maxSize,lock,condition);        Consumer consumer=new Consumer(queue,maxSize,lock,condition);        Thread t1=new Thread(producer);        Thread t2=new Thread(consumer);        t1.start();        t2.start();    }}
后果

通过这个案例简略实现了 wait 和 notify 的性能,当调用 await 办法后,以后线程会开释锁并期待,而其余线程调用 condition 对象的 signal 或者 signalall 办法告诉并被阻塞的线程,而后本人执行 unlock 开释锁,被唤醒的线程取得之前的锁继续执行,最初开释锁。

所以,condition 中两个最重要的办法,一个是 await,一个是 signal 办法。

await:把以后线程阻塞挂起;

signal:唤醒阻塞的线程。

Condition源码剖析

await办法

在Condition接口只是定义了await办法

void await() throws InterruptedException;

实现类在AQS

public final void await() throws InterruptedException {    //示意 await 容许被中断    if (Thread.interrupted()) throw new InterruptedException();    //创立一个新的节点,节点状态为 condition,采纳的数据结构依然是单向链表     Node node = addConditionWaiter();    //开释以后的锁,失去锁的状态,并唤醒 AQS 队列中的一个线程    //不论重入几次,都把state开释为0    int savedState = fullyRelease(node);    int interruptMode = 0;    //如果以后节点没有在同步队列上,即还没有被 signal,则将以后线程阻塞    while (!isOnSyncQueue(node)) {            ////通过 park 挂起以后线程            LockSupport.park(this);            // 当这个线程醒来,会尝试拿锁, 当 acquireQueued 返回 false 就是拿到锁了.            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)                    break;        }    // 当这个线程醒来,会尝试拿锁, 当 acquireQueued 返回 false 就是拿到锁了.    // interruptMode != THROW_IE -> 示意这个线程没有胜利将 node 入队,但 signal 执行了 enq 办法让其入队了.    // 将这个变量设置成 REINTERRUPT    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)                interruptMode = REINTERRUPT;    // 如果 node 的下一个期待者不是 null, 则进行清理,清理 Condition 队列上的节点.     // 如果是 null ,就没有什么好清理的了.    if (node.nextWaiter != null) {        //清理掉状态为cancelled状态的         nlinkCancelledWaiters();    }    // 如果线程被中断了,须要抛出异样.或者什么都不做    if (interruptMode != 0)                reportInterruptAfterWait(interruptMode);}

接下来吧整个办法里波及到的重要办法走一遍。

addConditionWaiter 办法

这个办法的次要作用是把以后线程封装成 Node,增加到期待队列。这里的队列不再是双向链表,而是单向链表。

 /**         * Adds a new waiter to wait queue.         * @return its new wait node         */        private Node addConditionWaiter() {            Node t = lastWaiter;            // If lastWaiter is cancelled, clean out.            //如果lastWaiter为canclled状态,则把他从链表中清理进来            if (t != null && t.waitStatus != Node.CONDITION) {                unlinkCancelledWaiters();                t = lastWaiter;            }            //构建一个 Node,waitStatus=CONDITION。这里的链表是一个单向的,            //所以相比 AQS 里的双向队来说简略了很多            Node node = new Node(Thread.currentThread(), Node.CONDITION);            if (t == null){                firstWaiter = node;            } else{                t.nextWaiter = node;            }            lastWaiter = node;            return node;        }

下面这段代码用图来展现:

fullyRelease办法

就是彻底的开释锁,什么叫彻底呢,就是如果以后锁存在多次重入,那么在这个办法中只须要开释一次就会把所有的重入次数归零。

final int fullyRelease(Node node) {        boolean failed = true;        try {            //获取AQS中state值            int savedState = getState();            //开释锁并且唤醒下一个同步队列中的线程            //留神这里解决的是同步队列            if (release(savedState)) {                failed = false;                return savedState;            } else {                throw new IllegalMonitorStateException();            }        } finally {            if (failed)                node.waitStatus = Node.CANCELLED;        }    }

此时线程A开释了锁,线程B就取得的锁。上面用一张图来展现:

isOnSyncQueue 办法

判断以后节点是否在同步队列中,返回 false 示意不在,返回 true 示意在。

如果不在 AQS 同步队列,阐明以后节点没有唤醒去争抢同步锁,所以须要把以后线程阻塞起来,直到其余的线程调用 signal 唤醒。

如果在 AQS 同步队列,意味着它须要去竞争同步锁去取得执行程序执行权限。

为什么要做这个判断呢?

因为在 condition 队列中的节点会重新加入到 AQS 队列去竞争锁。也就是当调用 signal的时候,会把以后节点从 condition 队列转移到 AQS 队列。

 final boolean isOnSyncQueue(Node node) {        //如果以后节点状态是CONDITION或node.prev是null,则证实以后节点在期待队列上而不是同步队列上。        //之所以能够用node.prev来判断,是因为一个节点如果要退出同步队列,在退出前就会设置好prev字段。        if (node.waitStatus == Node.CONDITION || node.prev == null)            return false;        //如果node.next不为null,则肯定在同步队列上,        //因为node.next是在节点退出同步队列后设置的        if (node.next != null)            return true;        //后面的两个判断没有返回的话,就        //从同步队列队尾遍历一个一个看是不是以后节点。        return findNodeFromTail(node);    }    //这个办法就相当简略了,就是从同步队列队尾遍历一个一个看是不是以后节点。    private boolean findNodeFromTail(Node node) {        Node t = tail;        for (;;) {            if (t == node)                return true;            if (t == null)                return false;            t = t.prev;        }    }

如何去判断ThreadA这个节点是否存在于 AQS队列中呢?

  1. 如果 ThreadAwaitStatus 的状态为 CONDITION,阐明它存在于 condition 队列中,不在 AQS队列。因为AQS队列的状态肯定不可能有 CONDITION
  2. 如果 node.prev为空,阐明也不存在于 AQS队列,起因是prev=nullAQS队列中只有一种可能性,就是它是head 节点,head 节点意味着它是取得锁的节点。
  3. 如果node.next 不等于空,阐明肯定存在于 AQS队列中,因为只有 AQS队列才会存在 next 和 prev的关系
  4. findNodeFromTail,示意从 tail 节点往前扫描 AQS队列,一旦发现 AQS队列的节点和以后节点相等,阐明节点肯定存在于 AQS队列中
signal办法

await 办法会阻塞 ThreadA,而后 ThreadB抢占到了锁取得了执行权限,这个时候在 ThreadB中调用了 Condition的 signal()办法,将会唤醒在期待队列中节点。

public final void signal() {      //先判断以后线程是否取得了锁,这个判断比较简单,间接用取得锁的线程和以后线程相比即可      if (!isHeldExclusively()){          //如果同步状态不是被以后线程独占,间接抛出异样。从这里也能看进去,Condition只能配合独占类同步组件应用。          throw new IllegalMonitorStateException();       }       // 拿到 Condition 队列上第一个节点       Node first = firstWaiter;       if (first != null){           //告诉期待队列队首的节点。           doSignal(first);        } } 
doSignal 办法
private void doSignal(Node first) {      do {             //从 Condition 队列中删除 first 节点             if ( (firstWaiter = first.nextWaiter) == null){                    // 将 next 节点设置成 null                    lastWaiter = null;              }              first.nextWaiter = null;         //transferForSignal办法尝试唤醒以后节点,如果唤醒失败,则持续尝试唤醒以后节点的后继节点。        } while (!transferForSignal(first) &&(first = firstWaiter) != null);}
transferForSignal 办法
final boolean transferForSignal(Node node) {         //更新节点的状态为 0,如果更新失败,只有一种可能就是节点被 CANCELLED 了        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))            return false;        //调用 enq,把以后节点增加到AQS 队列。并且返回返回按以后节点的上一个节点,也就是原tail 节点        Node p = enq(node);        int ws = p.waitStatus;        // 如果上一个节点的状态被勾销了, 或者尝试设置上一个节点的状态为 SIGNAL        //失败了(SIGNAL 示意: 他的 next 节点须要进行阻塞)        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))            // 唤醒节点上的线程            LockSupport.unpark(node.thread);        //如果 node 的 prev 节点曾经是signal 状态,那么被阻塞的 ThreadA 的唤醒工作由 AQS 队列来实现        return true;}

执行完 doSignal 当前,会把 condition 队列中的节点转移到 AQS队列上,这个时候会判断 ThreadAprev 节点也就是 head 节点的 waitStatus。

如果大于 0 或者设置 SIGNAL 失败,示意点被设置成了 CANCELLED 状态。这个时候会唤醒ThreadA这个线程。否则就基于 AQS队列的机制来唤醒,也就是等到ThreadB开释锁之后来唤醒 ThreadA

逻辑结构图如下:

从lock、await、signal,release的整个过程

Condition期待告诉的实质


总的来说,Condition的实质就是期待队列和同步队列的交互:

当一个持有锁的线程调用Condition.await()时,它会执行以下步骤:

  1. 结构一个新的期待队列节点退出到期待队列队尾
  2. 开释锁,也就是将它的同步队列节点从同步队列队首移除
  3. 自旋,直到它在期待队列上的节点挪动到了同步队列(通过其余线程调用signal())或被中断
  4. 阻塞以后节点,直到它获取到了锁,也就是它在同步队列上的节点排队排到了队首。

当一个持有锁的线程调用Condition.signal()时,它会执行以下操作:

从期待队列的队首开始,尝试对队首节点执行唤醒操作;如果节点CANCELLED,就尝试唤醒下一个节点;如果再CANCELLED则持续迭代。

对每个节点执行唤醒操作时,首先将节点退出同步队列,此时await()操作的步骤3的解锁条件就曾经开启了。而后分两种状况探讨:

  1. 如果先驱节点的状态为CANCELLED(>0) 或设置先驱节点的状态为SIGNAL失败,那么就立刻唤醒以后节点对应的线程,此时await()办法就会实现步骤3,进入步骤4。
  2. 如果胜利把先驱节点的状态设置为了SIGNAL,那么就不立刻唤醒了。等到先驱节点成为同步队列首节点并开释了同步状态后,会主动唤醒以后节点对应线程的,这时候await()的步骤3才执行实现,而且有很大概率疾速实现步骤4。

总结

用一张图来总结:

线程 awaitThread 先通过lock.lock()办法获取锁胜利后调用了 condition.await 办法进入期待队列,而另一个线程signalThread 通过 lock.lock()办法获取锁胜利后调用了 condition.signal 或者 signalAll 办法,使得线程 awaitThread 可能有机会移入到同步队列中。

当其余线程开释 lock 后使得线程 awaitThread 可能有机会获取 lock,从而使得线程 awaitThread 可能从 await 办法中退出执行后续操作。如果 awaitThread 获取 lock 失败会间接进入到同步队列。

阻塞:await()办法中,在线程开释锁资源之后,如果节点不在 AQS 期待队列,则阻塞以后线程,如果在期待队列,则自旋期待尝试获取锁;

开释:signal()后,节点会从 condition 队列挪动到 AQS 期待队列,则进入失常锁的获取流程。

关注公众号“Java后端技术全栈”

**收费获取500G最新学习材料