关注“Java后端技术全栈”**
回复“000”获取大量电子书
目录
notify和waitConditionCondition应用案例生产者消费者测试类后果Condition源码剖析await办法addConditionWaiter 办法fullyRelease办法isOnSyncQueue 办法signal办法doSignal 办法transferForSignal 办法从lock、await、signal,release的整个过程Condition期待告诉的实质总结
notify和wait
在后面学习 synchronized 的时候:疾速把握并发编程---synchronized篇(上),有讲到 wait/notify 的根本应用,联合synchronized 能够实现对线程的通信。
wait
、notify
、notifyAll
是Object对象的属性,并不属于线程Thread。
咱们先解释这三个的一个很重要的概念:
wait
:使持有该对象的线程把该对象的控制权交出去,而后处于期待状态(这句话很重要,也就是说当调用wait的时候会开释锁并处于期待的状态)
notify
:告诉某个正在期待这个对象的控制权的线程能够持续运行(这个就是获取锁,使本人的程序开始执行,最初通过notify同样去开释锁,并唤醒正在期待的线程)
notifyAll
:会告诉所有期待这个对象控制权的线程持续运行(和下面一样,只不过是唤醒所有期待的线程继续执行)
从下面的解释咱们能够看出通过wait和notify能够做线程之间的通信,当A线程处理完毕告诉B线程执行,B线程执行结束当前A线程能够继续执行。
那么这个时候我就在思考了,既然 J.U.C
外面提供了Lock锁的实现机制,那·J.U.C
外面有没有提供相似的线程通信的工具呢?
Condition
Condition 是一个多线程协调通信的工具类,能够让某些线程一起期待某个条件(condition),只有满足条件时,线程才会被唤醒。
Condition应用案例
上面来实现一个十分典型的生产者和消费者模式;
生产者
import java.util.Queue;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;public class Producer implements Runnable{ private Queue<String> msg; private int maxSize; Lock lock; Condition condition; public Producer(Queue<String> msg, int maxSize, Lock lock, Condition condition) { this.msg = msg; this.maxSize = maxSize; this.lock = lock; this.condition = condition; } @Override public void run() { int i=0; while(true){ i++; lock.lock(); //队列中音讯满了,此时生产者不能再生产了,因为装不下了 //所以生产者就开始期待状态 while(msg.size()==maxSize){ System.out.println("生产者队列满了,先期待"); try { condition.await(); //阻塞线程并开释锁 } catch (InterruptedException e) { e.printStackTrace(); } } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("生产音讯:"+i); msg.add("生产者的音讯内容"+i); condition.signal(); //唤醒阻塞状态下的线程 lock.unlock(); } }}
消费者
import java.util.Queue;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;public class Consumer implements Runnable{ private Queue<String> msg; private int maxSize; Lock lock; Condition condition; public Consumer(Queue<String> msg, int maxSize, Lock lock, Condition condition) { this.msg = msg; this.maxSize = maxSize; this.lock = lock; this.condition = condition; } @Override public void run() { int i=0; while(true){ i++; lock.lock(); //synchronized //消费者进来的时候须要判断是有可用的音讯, //没有可用的音讯就期待状态 while(msg.isEmpty()){ System.out.println("消费者队列空了,先期待"); try { condition.await(); //阻塞线程并开释锁 wait } catch (InterruptedException e) { e.printStackTrace(); } } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("生产音讯:"+msg.remove()); condition.signal(); //唤醒阻塞状态下的线程 lock.unlock(); } }}
测试类
public class TestCondition { public static void main( String[] args ){ Queue<String> queue=new LinkedList<>(); Lock lock=new ReentrantLock(); //重入锁 Condition condition=lock.newCondition(); int maxSize=5; Producer producer=new Producer(queue,maxSize,lock,condition); Consumer consumer=new Consumer(queue,maxSize,lock,condition); Thread t1=new Thread(producer); Thread t2=new Thread(consumer); t1.start(); t2.start(); }}
后果
通过这个案例简略实现了 wait 和 notify 的性能,当调用 await 办法后,以后线程会开释锁并期待,而其余线程调用 condition 对象的 signal 或者 signalall 办法告诉并被阻塞的线程,而后本人执行 unlock 开释锁,被唤醒的线程取得之前的锁继续执行,最初开释锁。
所以,condition 中两个最重要的办法,一个是 await,一个是 signal 办法。
await:把以后线程阻塞挂起;
signal:唤醒阻塞的线程。
Condition源码剖析
await办法
在Condition接口只是定义了await办法
void await() throws InterruptedException;
实现类在AQS
中
public final void await() throws InterruptedException { //示意 await 容许被中断 if (Thread.interrupted()) throw new InterruptedException(); //创立一个新的节点,节点状态为 condition,采纳的数据结构依然是单向链表 Node node = addConditionWaiter(); //开释以后的锁,失去锁的状态,并唤醒 AQS 队列中的一个线程 //不论重入几次,都把state开释为0 int savedState = fullyRelease(node); int interruptMode = 0; //如果以后节点没有在同步队列上,即还没有被 signal,则将以后线程阻塞 while (!isOnSyncQueue(node)) { ////通过 park 挂起以后线程 LockSupport.park(this); // 当这个线程醒来,会尝试拿锁, 当 acquireQueued 返回 false 就是拿到锁了. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 当这个线程醒来,会尝试拿锁, 当 acquireQueued 返回 false 就是拿到锁了. // interruptMode != THROW_IE -> 示意这个线程没有胜利将 node 入队,但 signal 执行了 enq 办法让其入队了. // 将这个变量设置成 REINTERRUPT if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; // 如果 node 的下一个期待者不是 null, 则进行清理,清理 Condition 队列上的节点. // 如果是 null ,就没有什么好清理的了. if (node.nextWaiter != null) { //清理掉状态为cancelled状态的 nlinkCancelledWaiters(); } // 如果线程被中断了,须要抛出异样.或者什么都不做 if (interruptMode != 0) reportInterruptAfterWait(interruptMode);}
接下来吧整个办法里波及到的重要办法走一遍。
addConditionWaiter 办法
这个办法的次要作用是把以后线程封装成 Node,增加到期待队列。这里的队列不再是双向链表,而是单向链表。
/** * Adds a new waiter to wait queue. * @return its new wait node */ private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. //如果lastWaiter为canclled状态,则把他从链表中清理进来 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } //构建一个 Node,waitStatus=CONDITION。这里的链表是一个单向的, //所以相比 AQS 里的双向队来说简略了很多 Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null){ firstWaiter = node; } else{ t.nextWaiter = node; } lastWaiter = node; return node; }
下面这段代码用图来展现:
fullyRelease办法
就是彻底的开释锁,什么叫彻底呢,就是如果以后锁存在多次重入,那么在这个办法中只须要开释一次就会把所有的重入次数归零。
final int fullyRelease(Node node) { boolean failed = true; try { //获取AQS中state值 int savedState = getState(); //开释锁并且唤醒下一个同步队列中的线程 //留神这里解决的是同步队列 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }
此时线程A开释了锁,线程B就取得的锁。上面用一张图来展现:
isOnSyncQueue 办法
判断以后节点是否在同步队列中,返回 false 示意不在,返回 true 示意在。
如果不在 AQS 同步队列,阐明以后节点没有唤醒去争抢同步锁,所以须要把以后线程阻塞起来,直到其余的线程调用 signal 唤醒。
如果在 AQS 同步队列,意味着它须要去竞争同步锁去取得执行程序执行权限。
为什么要做这个判断呢?
因为在 condition 队列中的节点会重新加入到 AQS 队列去竞争锁。也就是当调用 signal的时候,会把以后节点从 condition 队列转移到 AQS 队列。
final boolean isOnSyncQueue(Node node) { //如果以后节点状态是CONDITION或node.prev是null,则证实以后节点在期待队列上而不是同步队列上。 //之所以能够用node.prev来判断,是因为一个节点如果要退出同步队列,在退出前就会设置好prev字段。 if (node.waitStatus == Node.CONDITION || node.prev == null) return false; //如果node.next不为null,则肯定在同步队列上, //因为node.next是在节点退出同步队列后设置的 if (node.next != null) return true; //后面的两个判断没有返回的话,就 //从同步队列队尾遍历一个一个看是不是以后节点。 return findNodeFromTail(node); } //这个办法就相当简略了,就是从同步队列队尾遍历一个一个看是不是以后节点。 private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } }
如何去判断ThreadA
这个节点是否存在于 AQS
队列中呢?
- 如果
ThreadA
的waitStatus
的状态为 CONDITION,阐明它存在于 condition 队列中,不在AQS
队列。因为AQS
队列的状态肯定不可能有 CONDITION - 如果
node.prev
为空,阐明也不存在于AQS
队列,起因是prev=null
在AQS
队列中只有一种可能性,就是它是head 节点,head 节点意味着它是取得锁的节点。 - 如果
node.next
不等于空,阐明肯定存在于AQS
队列中,因为只有AQS
队列才会存在 next 和prev
的关系 findNodeFromTail
,示意从 tail 节点往前扫描AQS
队列,一旦发现AQS
队列的节点和以后节点相等,阐明节点肯定存在于AQS
队列中
signal办法
await 办法会阻塞 ThreadA
,而后 ThreadB
抢占到了锁取得了执行权限,这个时候在 ThreadB
中调用了 Condition的 signal()办法,将会唤醒在期待队列中节点。
public final void signal() { //先判断以后线程是否取得了锁,这个判断比较简单,间接用取得锁的线程和以后线程相比即可 if (!isHeldExclusively()){ //如果同步状态不是被以后线程独占,间接抛出异样。从这里也能看进去,Condition只能配合独占类同步组件应用。 throw new IllegalMonitorStateException(); } // 拿到 Condition 队列上第一个节点 Node first = firstWaiter; if (first != null){ //告诉期待队列队首的节点。 doSignal(first); } }
doSignal 办法
private void doSignal(Node first) { do { //从 Condition 队列中删除 first 节点 if ( (firstWaiter = first.nextWaiter) == null){ // 将 next 节点设置成 null lastWaiter = null; } first.nextWaiter = null; //transferForSignal办法尝试唤醒以后节点,如果唤醒失败,则持续尝试唤醒以后节点的后继节点。 } while (!transferForSignal(first) &&(first = firstWaiter) != null);}
transferForSignal 办法
final boolean transferForSignal(Node node) { //更新节点的状态为 0,如果更新失败,只有一种可能就是节点被 CANCELLED 了 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; //调用 enq,把以后节点增加到AQS 队列。并且返回返回按以后节点的上一个节点,也就是原tail 节点 Node p = enq(node); int ws = p.waitStatus; // 如果上一个节点的状态被勾销了, 或者尝试设置上一个节点的状态为 SIGNAL //失败了(SIGNAL 示意: 他的 next 节点须要进行阻塞) if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // 唤醒节点上的线程 LockSupport.unpark(node.thread); //如果 node 的 prev 节点曾经是signal 状态,那么被阻塞的 ThreadA 的唤醒工作由 AQS 队列来实现 return true;}
执行完 doSignal
当前,会把 condition
队列中的节点转移到 AQS
队列上,这个时候会判断 ThreadA
的 prev
节点也就是 head 节点的 waitStatus。
如果大于 0 或者设置 SIGNAL 失败,示意点被设置成了 CANCELLED
状态。这个时候会唤醒ThreadA
这个线程。否则就基于 AQS
队列的机制来唤醒,也就是等到ThreadB
开释锁之后来唤醒 ThreadA
。
逻辑结构图如下:
从lock、await、signal,release的整个过程
Condition期待告诉的实质
总的来说,Condition的实质就是期待队列和同步队列的交互:
当一个持有锁的线程调用Condition.await()
时,它会执行以下步骤:
- 结构一个新的期待队列节点退出到期待队列队尾
- 开释锁,也就是将它的同步队列节点从同步队列队首移除
- 自旋,直到它在期待队列上的节点挪动到了同步队列(通过其余线程调用signal())或被中断
- 阻塞以后节点,直到它获取到了锁,也就是它在同步队列上的节点排队排到了队首。
当一个持有锁的线程调用Condition.signal()
时,它会执行以下操作:
从期待队列的队首开始,尝试对队首节点执行唤醒操作;如果节点CANCELLED
,就尝试唤醒下一个节点;如果再CANCELLED
则持续迭代。
对每个节点执行唤醒操作时,首先将节点退出同步队列,此时await()操作的步骤3的解锁条件就曾经开启了。而后分两种状况探讨:
- 如果先驱节点的状态为
CANCELLED
(>0) 或设置先驱节点的状态为SIGNAL失败,那么就立刻唤醒以后节点对应的线程,此时await()办法就会实现步骤3,进入步骤4。 - 如果胜利把先驱节点的状态设置为了SIGNAL,那么就不立刻唤醒了。等到先驱节点成为同步队列首节点并开释了同步状态后,会主动唤醒以后节点对应线程的,这时候await()的步骤3才执行实现,而且有很大概率疾速实现步骤4。
总结
用一张图来总结:
线程 awaitThread 先通过lock.lock()办法获取锁胜利后调用了 condition.await 办法进入期待队列,而另一个线程signalThread 通过 lock.lock()办法获取锁胜利后调用了 condition.signal 或者 signalAll 办法,使得线程 awaitThread 可能有机会移入到同步队列中。
当其余线程开释 lock 后使得线程 awaitThread 可能有机会获取 lock,从而使得线程 awaitThread 可能从 await 办法中退出执行后续操作。如果 awaitThread 获取 lock 失败会间接进入到同步队列。
阻塞:await()办法中,在线程开释锁资源之后,如果节点不在 AQS 期待队列,则阻塞以后线程,如果在期待队列,则自旋期待尝试获取锁;
开释:signal()后,节点会从 condition 队列挪动到 AQS 期待队列,则进入失常锁的获取流程。
关注公众号“Java后端技术全栈”
**收费获取500G最新学习材料