关于java:快速掌握并发编程深入学习Condition

6次阅读

共计 10909 个字符,预计需要花费 28 分钟才能阅读完成。

关注Java 后端技术全栈”**

回复“000”获取大量电子书

目录

notify 和 waitConditionCondition 应用案例生产者消费者测试类后果 Condition 源码剖析 await 办法 addConditionWaiter 办法 fullyRelease 办法 isOnSyncQueue 办法 signal 办法 doSignal 办法 transferForSignal 办法从 lock、await、signal,release 的整个过程 Condition 期待告诉的实质总结

notify 和 wait

在后面学习 synchronized 的时候:疾速把握并发编程 —synchronized 篇(上),有讲到 wait/notify 的根本应用,联合 synchronized 能够实现对线程的通信。

waitnotifynotifyAll是 Object 对象的属性,并不属于线程 Thread。

咱们先解释这三个的一个很重要的概念:

wait使持有该对象的线程把该对象的控制权交出去,而后处于期待状态(这句话很重要,也就是说当调用 wait 的时候会开释锁并处于期待的状态)

notify:告诉某个正在期待这个对象的控制权的线程能够持续运行(这个就是获取锁,使本人的程序开始执行,最初通过 notify 同样去开释锁,并唤醒正在期待的线程)

notifyAll:会告诉所有期待这个对象控制权的线程持续运行(和下面一样,只不过是唤醒所有期待的线程继续执行)

从下面的解释咱们能够看出通过 wait 和 notify 能够做线程之间的通信,当 A 线程处理完毕告诉 B 线程执行,B 线程执行结束当前 A 线程能够继续执行。

那么这个时候我就在思考了,既然 J.U.C 外面提供了 Lock 锁的实现机制,那·J.U.C外面有没有提供相似的线程通信的工具呢?

Condition

Condition 是一个多线程协调通信的工具类,能够让某些线程一起期待某个条件(condition),只有满足条件时,线程才会被唤醒。

Condition 应用案例

上面来实现一个十分典型的生产者和消费者模式;

生产者
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class Producer implements Runnable{
    private Queue<String> msg;
    private int maxSize;
    Lock lock;
    Condition condition;
    public Producer(Queue<String> msg, int maxSize, Lock lock, Condition condition) {
        this.msg = msg;
        this.maxSize = maxSize;
        this.lock = lock;
        this.condition = condition;
    }
    @Override
    public void run() {
        int i=0;
        while(true){
            i++;
            lock.lock();
                // 队列中音讯满了,此时生产者不能再生产了,因为装不下了
                // 所以生产者就开始期待状态
                while(msg.size()==maxSize){System.out.println("生产者队列满了,先期待");
                    try {condition.await(); // 阻塞线程并开释锁
                    } catch (InterruptedException e) {e.printStackTrace();
                    }
                }
            try {Thread.sleep(1000);
            } catch (InterruptedException e) {e.printStackTrace();
            }
            System.out.println("生产音讯:"+i);
                msg.add("生产者的音讯内容"+i);
                condition.signal(); // 唤醒阻塞状态下的线程
            lock.unlock();}
    }
}
消费者
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class Consumer implements Runnable{
    private Queue<String> msg;
    private int maxSize;
    Lock lock;
    Condition condition;
    public Consumer(Queue<String> msg, int maxSize, Lock lock, Condition condition) {
        this.msg = msg;
        this.maxSize = maxSize;
        this.lock = lock;
        this.condition = condition;
    }
    @Override
    public void run() {
        int i=0;
        while(true){
            i++;
            lock.lock(); //synchronized
            // 消费者进来的时候须要判断是有可用的音讯,// 没有可用的音讯就期待状态
            while(msg.isEmpty()){System.out.println("消费者队列空了,先期待");
                try {condition.await(); // 阻塞线程并开释锁   wait
                } catch (InterruptedException e) {e.printStackTrace();
                }
            }
            try {Thread.sleep(1000);
            } catch (InterruptedException e) {e.printStackTrace();
            }
            System.out.println("生产音讯:"+msg.remove());
            condition.signal(); // 唤醒阻塞状态下的线程
            lock.unlock();}
    }
}
测试类
public class TestCondition {public static void main( String[] args ){Queue<String> queue=new LinkedList<>();
        Lock lock=new ReentrantLock(); // 重入锁
        Condition condition=lock.newCondition();
        int maxSize=5;
        Producer producer=new Producer(queue,maxSize,lock,condition);
        Consumer consumer=new Consumer(queue,maxSize,lock,condition);
        Thread t1=new Thread(producer);
        Thread t2=new Thread(consumer);
        t1.start();
        t2.start();}
}
后果

通过这个案例简略实现了 wait 和 notify 的性能,当调用 await 办法后,以后线程会开释锁并期待,而其余线程调用 condition 对象的 signal 或者 signalall 办法告诉并被阻塞的线程,而后本人执行 unlock 开释锁,被唤醒的线程取得之前的锁继续执行,最初开释锁。

所以,condition 中两个最重要的办法,一个是 await,一个是 signal 办法。

await:把以后线程阻塞挂起;

signal:唤醒阻塞的线程。

Condition 源码剖析

await 办法

在 Condition 接口只是定义了 await 办法

void await() throws InterruptedException;

实现类在 AQS

public final void await() throws InterruptedException {
    // 示意 await 容许被中断
    if (Thread.interrupted()) throw new InterruptedException();
    // 创立一个新的节点,节点状态为 condition,采纳的数据结构依然是单向链表
     Node node = addConditionWaiter();
    // 开释以后的锁,失去锁的状态,并唤醒 AQS 队列中的一个线程
    // 不论重入几次,都把 state 开释为 0
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 如果以后节点没有在同步队列上,即还没有被 signal,则将以后线程阻塞
    while (!isOnSyncQueue(node)) {
            //// 通过 park 挂起以后线程
            LockSupport.park(this);
            // 当这个线程醒来, 会尝试拿锁, 当 acquireQueued 返回 false 就是拿到锁了.
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
        }
    // 当这个线程醒来, 会尝试拿锁, 当 acquireQueued 返回 false 就是拿到锁了.
    // interruptMode != THROW_IE -> 示意这个线程没有胜利将 node 入队, 但 signal 执行了 enq 办法让其入队了.
    // 将这个变量设置成 REINTERRUPT
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
    // 如果 node 的下一个期待者不是 null, 则进行清理, 清理 Condition 队列上的节点. 
    // 如果是 null , 就没有什么好清理的了.
    if (node.nextWaiter != null) {
        // 清理掉状态为 cancelled 状态的 
        nlinkCancelledWaiters();}
    // 如果线程被中断了, 须要抛出异样. 或者什么都不做
    if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
}

接下来吧整个办法里波及到的重要办法走一遍。

addConditionWaiter 办法

这个办法的次要作用是把以后线程封装成 Node,增加到期待队列。这里的队列不再是双向链表,而是单向链表。

 /**
         * Adds a new waiter to wait queue.
         * @return its new wait node
         */
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            // 如果 lastWaiter 为 canclled 状态,则把他从链表中清理进来
            if (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters();
                t = lastWaiter;
            }
            // 构建一个 Node,waitStatus=CONDITION。这里的链表是一个单向的,// 所以相比 AQS 里的双向队来说简略了很多
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null){firstWaiter = node;} else{t.nextWaiter = node;}
            lastWaiter = node;
            return node;
        }

下面这段代码用图来展现:

fullyRelease办法

就是彻底的开释锁,什么叫彻底呢,就是如果以后锁存在多次重入,那么在这个办法中只须要开释一次就会把所有的重入次数归零。

final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            // 获取 AQS 中 state 值
            int savedState = getState();
            // 开释锁并且唤醒下一个同步队列中的线程
            // 留神这里解决的是同步队列
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {throw new IllegalMonitorStateException();
            }
        } finally {if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

此时线程 A 开释了锁,线程 B 就取得的锁。上面用一张图来展现:

isOnSyncQueue 办法

判断以后节点是否在同步队列中,返回 false 示意不在,返回 true 示意在。

如果不在 AQS 同步队列,阐明以后节点没有唤醒去争抢同步锁,所以须要把以后线程阻塞起来,直到其余的线程调用 signal 唤醒。

如果在 AQS 同步队列,意味着它须要去竞争同步锁去取得执行程序执行权限。

为什么要做这个判断呢?

因为在 condition 队列中的节点会重新加入到 AQS 队列去竞争锁。也就是当调用 signal 的时候,会把以后节点从 condition 队列转移到 AQS 队列。

 final boolean isOnSyncQueue(Node node) {
        // 如果以后节点状态是 CONDITION 或 node.prev 是 null,则证实以后节点在期待队列上而不是同步队列上。// 之所以能够用 node.prev 来判断,是因为一个节点如果要退出同步队列,在退出前就会设置好 prev 字段。if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        // 如果 node.next 不为 null,则肯定在同步队列上,// 因为 node.next 是在节点退出同步队列后设置的
        if (node.next != null)
            return true;
        // 后面的两个判断没有返回的话,就
        // 从同步队列队尾遍历一个一个看是不是以后节点。return findNodeFromTail(node);
    } 
   // 这个办法就相当简略了,就是从同步队列队尾遍历一个一个看是不是以后节点。private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (;;) {if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }

如何去判断 ThreadA 这个节点是否存在于 AQS队列中呢?

  1. 如果 ThreadAwaitStatus 的状态为 CONDITION,阐明它存在于 condition 队列中,不在 AQS 队列。因为 AQS 队列的状态肯定不可能有 CONDITION
  2. 如果 node.prev为空,阐明也不存在于 AQS队列,起因是 prev=nullAQS队列中只有一种可能性,就是它是 head 节点,head 节点意味着它是取得锁的节点。
  3. 如果 node.next 不等于空,阐明肯定存在于 AQS 队列中,因为只有 AQS队列才会存在 next 和 prev的关系
  4. findNodeFromTail,示意从 tail 节点往前扫描 AQS队列,一旦发现 AQS队列的节点和以后节点相等,阐明节点肯定存在于 AQS队列中
signal办法

await 办法会阻塞 ThreadA,而后 ThreadB抢占到了锁取得了执行权限,这个时候在 ThreadB中调用了 Condition 的 signal()办法,将会唤醒在期待队列中节点。

public final void signal() {
      // 先判断以后线程是否取得了锁,这个判断比较简单,间接用取得锁的线程和以后线程相比即可
      if (!isHeldExclusively()){
          // 如果同步状态不是被以后线程独占,间接抛出异样。从这里也能看进去,Condition 只能配合独占类同步组件应用。throw new IllegalMonitorStateException();}
       // 拿到 Condition 队列上第一个节点
       Node first = firstWaiter;
       if (first != null){
           // 告诉期待队列队首的节点。doSignal(first);
        }
 } 
doSignal 办法
private void doSignal(Node first) {
      do {
             // 从 Condition 队列中删除 first 节点
             if ((firstWaiter = first.nextWaiter) == null){
                    // 将 next 节点设置成 null
                    lastWaiter = null;
              }
              first.nextWaiter = null;
         //transferForSignal 办法尝试唤醒以后节点,如果唤醒失败,则持续尝试唤醒以后节点的后继节点。} while (!transferForSignal(first) &&(first = firstWaiter) != null);
}
transferForSignal 办法
final boolean transferForSignal(Node node) {
         // 更新节点的状态为 0,如果更新失败,只有一种可能就是节点被 CANCELLED 了
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        // 调用 enq,把以后节点增加到 AQS 队列。并且返回返回按以后节点的上一个节点,也就是原 tail 节点
        Node p = enq(node);
        int ws = p.waitStatus;
        // 如果上一个节点的状态被勾销了, 或者尝试设置上一个节点的状态为 SIGNAL 
       // 失败了(SIGNAL 示意: 他的 next 节点须要进行阻塞)
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            // 唤醒节点上的线程
            LockSupport.unpark(node.thread);
        // 如果 node 的 prev 节点曾经是 signal 状态,那么被阻塞的 ThreadA 的唤醒工作由 AQS 队列来实现
        return true;
}

执行完 doSignal 当前,会把 condition 队列中的节点转移到 AQS队列上,这个时候会判断 ThreadAprev 节点也就是 head 节点的 waitStatus。

如果大于 0 或者设置 SIGNAL 失败,示意点被设置成了 CANCELLED 状态。这个时候会唤醒 ThreadA 这个线程。否则就基于 AQS队列的机制来唤醒,也就是等到 ThreadB 开释锁之后来唤醒 ThreadA

逻辑结构图如下:

从 lock、await、signal,release 的整个过程

Condition 期待告诉的实质


总的来说,Condition 的实质就是期待队列和同步队列的交互:

当一个持有锁的线程调用 Condition.await() 时,它会执行以下步骤:

  1. 结构一个新的期待队列节点退出到期待队列队尾
  2. 开释锁,也就是将它的同步队列节点从同步队列队首移除
  3. 自旋,直到它在期待队列上的节点挪动到了同步队列(通过其余线程调用 signal())或被中断
  4. 阻塞以后节点,直到它获取到了锁,也就是它在同步队列上的节点排队排到了队首。

当一个持有锁的线程调用 Condition.signal() 时,它会执行以下操作:

从期待队列的队首开始,尝试对队首节点执行唤醒操作;如果节点 CANCELLED,就尝试唤醒下一个节点;如果再CANCELLED 则持续迭代。

对每个节点执行唤醒操作时,首先将节点退出同步队列,此时 await()操作的步骤 3 的解锁条件就曾经开启了。而后分两种状况探讨:

  1. 如果先驱节点的状态为 CANCELLED(>0) 或设置先驱节点的状态为 SIGNAL 失败,那么就立刻唤醒以后节点对应的线程,此时 await() 办法就会实现步骤 3,进入步骤 4。
  2. 如果胜利把先驱节点的状态设置为了 SIGNAL,那么就不立刻唤醒了。等到先驱节点成为同步队列首节点并开释了同步状态后,会主动唤醒以后节点对应线程的,这时候 await()的步骤 3 才执行实现,而且有很大概率疾速实现步骤 4。

总结

用一张图来总结:

线程 awaitThread 先通过 lock.lock()办法获取锁胜利后调用了 condition.await 办法进入期待队列,而另一个线程 signalThread 通过 lock.lock()办法获取锁胜利后调用了 condition.signal 或者 signalAll 办法,使得线程 awaitThread 可能有机会移入到同步队列中。

当其余线程开释 lock 后使得线程 awaitThread 可能有机会获取 lock,从而使得线程 awaitThread 可能从 await 办法中退出执行后续操作。如果 awaitThread 获取 lock 失败会间接进入到同步队列。

阻塞 :await() 办法中,在线程开释锁资源之后,如果节点不在 AQS 期待队列,则阻塞以后线程,如果在期待队列,则自旋期待尝试获取锁;

开释 :signal() 后,节点会从 condition 队列挪动到 AQS 期待队列,则进入失常锁的获取流程。

关注公众号“Java 后端技术全栈”

** 收费获取 500G 最新学习材料


正文完
 0