关于java:Lock底层原理ReentrantLockAQSCondition

42次阅读

共计 18878 个字符,预计需要花费 48 分钟才能阅读完成。

Lock 底层原理—ReentrantLock、AQS、Condition

先来看看 J.U.C 包下的构造

  • juc-locks 锁框架
  • juc-atomic 原子类框架
  • juc-sync 同步器框架
  • juc-collections 汇合框架
  • juc-executors 执行器框架

而咱们明天的配角 ReentrantLock,就是 juc-locks 包下的。上一篇刚算理解一下synchronized 的底层原理,所以就想看看 ReentrantLock 和它的区别到底是什么,改良在哪里,实用于什么场景。

1. Lock

LockReadWriteLock 是两大锁的根接口,上面看一下 JDK 1.8 API 中如何形容的

通过 API 的介绍,sychronizedLock 的区别能够分为如下:

  • Lock减少了灵活性,最次要的就是反对多个 Condition。
  • Lock提供了非阻塞取得锁的形式,synchronized有自旋锁。
  • Lock更适宜应用在简单的同步,而 synchronized 更简略、简杰。
  • Lock 能够设置偏心锁和非偏心锁,synchronized只有非偏心锁。

浏览一下Lock 源码

package java.util.concurrent.locks;

import java.util.concurrent.TimeUnit;

public interface Lock {
    // 取得锁
    void lock();
    // 获取锁定,除非以后线程中断
    void lockInterruptibly() throws InterruptedException;
    // 只有在调用时才能够取得锁
    boolean tryLock();
    // 如果在给定的等待时间内是闲暇的,并且以后的线程尚未失去 interrupted,则获取该锁
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    // 开释锁。void unlock();
    // 返回一个新 Condition 绑定到该实例 Lock 实例
    Condition newCondition();}

2. ReentrantLock

Lock理解之后,看一下实现类是如何实现接口的办法的。最罕用的实现类就是ReentrantLock

2.1. 偏心锁和非偏心锁

    public ReentrantLock() {sync = new NonfairSync();
    }
    
    public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();}

从构造函数上能够看进去 ReentrantLock 实现了偏心锁和非偏心锁两种,默认个别应用 非偏心锁,它的效率和吞吐量都比偏心锁高的多。

2.2. Sync


    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        abstract void lock();

        final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

        protected final boolean tryRelease(int releases) {int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

        protected final boolean isHeldExclusively() {return getExclusiveOwnerThread() == Thread.currentThread();}

        final ConditionObject newCondition() {return new ConditionObject();
        }

        final Thread getOwner() {return getState() == 0 ? null : getExclusiveOwnerThread();}

        final int getHoldCount() {return isHeldExclusively() ? getState() : 0;}

        final boolean isLocked() {return getState() != 0;
        }

        private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
    }

2.3. NonfairSync

非偏心锁实现

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        final void lock() {
            /**
             * 先抢锁
             */
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);
        }
    }

2.4. FairSync

说完了非偏心锁,那就不得不提偏心锁的实现了。

    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {acquire(1);
        }
        
        // 非偏心锁调用的是父类 sync 的 nonfairTryAcquire 办法
        protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

有了非偏心锁做铺垫,理解偏心锁就不便多了。偏心锁和非偏心锁不同之处在于,偏心锁在获取锁的时候,不会先去查看 state 状态。

偏心锁和非偏心锁最次要的区别在于lock(),一个是先排队,一个是先抢锁。然而非偏心锁的吞吐量更大。

通过查看 ReentrantLock 中的底层实现,咱们发现了 AQS 的形迹,先看一下 AQS 是什么,而后剖析一下整个锁的流程。

3. AbtractQueuedSynchronizer

AbtractQueuedSynchronizer形象的队列式的同步器,AQS 定义了一套多线程访问共享资源的同步器框架,是 JUC 包下整体的根底框架。AQS 是一个形象的队列同步器,保护着一个同步队列和 State 状态。底层应用了 Unsafe 类进行 CAS 操作。Sync就是 AbstractQueuedSynchronizer 的子类,AQS 采纳了模板办法模式,进步了扩展性有保障了规范性。

AQS 只是一个框架,具体资源的获取 / 开释形式交由自定义同步器去实现

JDK 1.8 API 中的介绍如下

从上述介绍中能够看出:

  • 同步器依赖于每个 Node 的状态,状态都是通过 CAS 来设置的。
  • AQS 反对 独占模式 共享模式

还有一些对于Condition,上面再去钻研。

看一下其外部图片:

AbtractQueuedSynchronizer 外部维持着一个 虚构的双向同步队列,因为队列中只有头节点和尾节点的指针。

Node是 AQS 中的节点类

        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;
        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;
        volatile int waitStatus;
        volatile Node prev;
        volatile Node next;
        volatile Thread thread;
        Node nextWaiter;

waitstatus

查看状态值

  • CANCELLED:当线程期待超时或者被中断,则勾销期待,设期待状态为 -1,进入勾销状态则不再变动
  • SIGNAL:后继节点处于期待状态,以后节点被勾销或中断时会告诉后继节点,使后继节点的线程得以运行
  • CONDITION:以后节点处于期待队列,节点线程期待在 Condition 上,当其余线程对 condition 执行 signall 办法时,期待队列转移到同步队列,退出到对同步状态的获取
  • PROPAGATE:与共享模式相干,在共享模式中,该状态标识结点的线程处于可运行状态
  • 0 状态:值为 0,代表初始化状态。

那么 SHAREDEXCLUSIVE是干什么的呢?

SHARED标识的是共享,而 EXCLUSIVE 是独占。然而 ReentrantLock 是排他锁。

看一下最次要的 lock 办法

通过 CAS 判断以后状态是否为闲暇状态(0),如果是则将状态设置为独占线程(1),这样获取锁胜利。CAS 操作只保障一个线程获取锁,如果多个线程竞争,失败的就须要排队。

所以 else 就是失败者走的,古有名言:失败是胜利之母。那就看一下失败者的经验吧!!!

acquire是父类 AbstractQueuedSynchronizer 的办法。

    public final void acquire(int arg) {if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();}

tryAcquire办法 Nonfair 重写了

        protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);
        }
        final boolean nonfairTryAcquire(int acquires) {
            // 咱们走下来 acquires = 1
            final Thread current = Thread.currentThread();
            // 获取锁状态
            int c = getState();
            // 如果是 0,示意没有线程领有
            if (c == 0) {
                // 获取锁
                if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 以后线程曾经占用了此锁
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                // 更新 state 值
                setState(nextc);
                return true;
            }
            // 尝试获取失败
            return false;
        }

第一个吃螃蟹的当不上,只能当下一个了。再次尝试获取,如果此时状态为闲暇,则获取;如果有线程占用,判断是不是本人;如果是,更新状态值;如果不是,则失败。

!tryAcquire(arg) -> acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

缓缓往下撸

    private Node addWaiter(Node mode) {
        // 包装成一个结点
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 无头节点,进入 enq 创立队列也就是头节点
        enq(node);
        return node;
    }

    private Node enq(final Node node) {for (;;) {
            Node t = tail;
            if (t == null) { 
                //CAS 操作设置头节点
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                // 设置成尾节点
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }


    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 获取前驱结点
                final Node p = node.predecessor();
                // 作为第二节点,能够尝试获取锁
                if (p == head && tryAcquire(arg)) {
                    // 获取锁胜利
                    setHead(node);// 将以后节点设置为头节点
                    p.next = null;// help GC
                    failed = false;// 更新 failed
                    return interrupted;// 返回是否被中断过
                }
                // 锁未获取后,判断是否能够被挂起
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {if (failed)
                cancelAcquire(node);
        }
    }

到当初有点走晕了,先捋一捋

  1. 通过 lock 获取锁,如果获取失败,则进入 acquire 办法尝试再次获取。
  2. acquire中通过 !tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 进入两个办法,第一个是尝试第二次尝试获取锁,如果失败再次看第二个办法。
  3. addWaiter将以后线程包装成结点退出同步队列。
  4. acquireQueued办法 addWaiter 返回、已入队的结点作为参数,而后第三次尝试获取锁(获取的条件是成为二号结点)。
  5. 最初通过 shouldParkAfterFailedAcquireparkAndCheckInterrupt判断是否须要被挂起。

此时咱们再看 shouldParkAfterFailedAcquireparkAndCheckInterrupt如何实现的

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 前驱结点的状态
        int ws = pred.waitStatus;
        // 如果前驱结点能够符合条件(能够唤醒以后线程)if (ws == Node.SIGNAL)
            return true;
        // 如果不符合条件,则始终向前遍历,找到能够唤醒本人的结点
        if (ws > 0) {
            do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // 如果都没有符合条件的结点,那就通过 CAS 条件将前驱结点的设置为 signal
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    // 挂起以后线程, 返回线程中断状态并重置
    private final boolean parkAndCheckInterrupt() {LockSupport.park(this);
        return Thread.interrupted();}

这两个办法执行实现后,就代表着线程曾经被挂起了,只能期待 signal 标记的线程来唤醒它。

    static void selfInterrupt() {
        // 中断以后线程
        Thread.currentThread().interrupt();
    }

最初回到 selfInterrupt 办法。

4. 通过 ReentrantLock 来理解 AQS

首先明确其调用过程很重要

先以非偏心锁举例子:

4.1. lock()

        final void lock() {
            /** 上来先抢锁 */
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            /** 抢锁失败 */
            else
                acquire(1);
        }

4.2. acquire(int arg)

    public final void acquire(int arg) { /** arg = 1 */
        if (!tryAcquire(arg) && /** 尝试获取锁 */
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) /** 退出同步队列、在期待队列中排队拿资源 */
            selfInterrupt();}

​ 此办法是独占模式下线程获取共享资源的顶层入口。如果获取到资源,线程间接返回,否则进入期待队列,直到获取到资源为止,且整个过程疏忽中断的影响 。这也正是 lock() 的语义,当然不仅仅只限于lock()。获取到资源后,线程就能够去执行其临界区代码了。

4.2.1. tryAcquire(int arg)

        protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);
        }

tryAcquire是在 NonfairSync 中实现的,其中调用的是父类 Sync 中实现的 nonfairTryAcquire 办法。

4.2.1.1. nonfairTryAcquire(int acquires)
        final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();
            int c = getState();
            /** 以后资源的状态为无锁状态  则以后线程尝试获取锁状态 */
            if (c == 0) {if (compareAndSetState(0, acquires)) {
                    /** 将以后线程设置成独占的 */
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            /**  如果以后线程是资源的持有者 则进行重入 */
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

​ 此办法尝试去获取独占资源。如果获取胜利,则间接返回 true,否则间接返回 false。这也正是 tryLock()的语义,还是那句话,当然不仅仅只限于 tryLock()。

4.2.2. addWaiter(Node mode)

    private Node addWaiter(Node mode) {
        /** 为以后线程创立一个节点 */
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        /** 尝试疾速形式间接放到队尾 */
        if (pred != null) {
            node.prev = pred;
            /** 是否能连贯胜利 */
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        /** 疾速如队尾失败 则走 enq 办法 */
        enq(node);
        return node;
    }

​ 此办法用于将以后线程退出到期待队列的队尾,并返回以后线程所在的结点。

4.2.2.1. enq(Node node)
    private Node enq(final Node node) {
        /** 自旋 放入队尾 */
        for (;;) {
            Node t = tail;
            if (t == null) { /** 队列为空 创立一个空的标记结点作为 head 结点 并将 tail 也指向它 */
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else { /** 失常流程,放入队尾 */
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

CAS 自旋 volatile 变量,是一种很经典的用法。

4.2.3. acquireQueued(Node node , int arg)

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            /** 标记是否被中断过 */
            boolean interrupted = false;
            /** 自旋 */
            for (;;) {
                /** 获取前驱节点 */
                final Node p = node.predecessor();
                /** 前驱节点是头节点(阐明是二号节点) 且 尝试获取锁(头节点可能开释资源) */
                /** 可能被唤醒的、也可能是被中断奥的 */
                if (p == head && tryAcquire(arg)) {
                    /** 拿到资源后 将 head 指向该结点 所以 head 所指的标杆结点 */
                    /** 就是以后获取到资源的那个结点或 null */
                    setHead(node);
                    /** setHead 中 曾经将 node.pred 设置为 null */
                    /** 就是为了不便 GC 收集该节点 也就是 拿完资源的节点须要出队列了 */
                    p.next = null; // help GC
                    /** 胜利获取资源 */
                    failed = false;
                    /** 返回在期待期间是否被中断过 */
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) && /** 判断本人是否能够劳动 */
                    parkAndCheckInterrupt()) /** 如果能够劳动 LockSupport.park() 进入 waiting 状态 如果不可中断的状况下被中断了,那么会从 park()中醒过来,发现拿不到资源,从而持续进入 park()期待。*/
                    /** 如果期待过程中被中断过,哪怕只有那么一次,就将 interrupted 标记为 true */
                    interrupted = true;
            }
        } finally {
            /** 如果期待过程中没有胜利获取资源(如 timeout,或者可中断的状况下被中断了),那么勾销结点在队列中的期待。*/
            if (failed)
                cancelAcquire(node);
        }
    }

到这里咱们先进入 shouldParkAfterFailedAcquireparkAndCheckInterrupt看看具体操作。

4.2.4.1. shouldParkAfterFailedAcquire(Node pred , Node node)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus; /** 前驱节点的状态 */
        if (ws == Node.SIGNAL)
            /*
             * 如果曾经通知前驱拿完号后告诉本人一下,那就能够安心劳动了
             */
            return true;
        if (ws > 0) {
            /*
             *  如果前驱放弃了,那就始终往前找,直到找到最近一个失常期待的状态,并排在它的后边。*  留神:那些放弃的结点,因为被“加塞”,它们相当于造成一个无援用链,稍后就会被 GC 回收
             */
            do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * 如果前驱失常,那就把前驱的状态设置成 SIGNAL,通知它拿完号后告诉本人一下。* 有可能失败,人家说不定刚刚开释完呢
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

如果前驱节点不是SIGNAL,则不能安心劳动,须要找到离本人最近的失常节点,而后接在它前面。

4.2.4.2. parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);
        /** Thread.interrupted()会革除以后线程的中断标记位  */
        return Thread.interrupted();}

让线程安心的进入期待状态。park()会让以后线程进入 waiting 状态。在此状态下,有两种路径能够唤醒该线程:被 unpark()、被 interrupt()。须要留神的是,Thread.interrupted()会革除以后线程的中断标记位。

这里插入一下线程的状态转换:

4.2.4. selfInterrupt()

    static void selfInterrupt() {Thread.currentThread().interrupt();}

以后线程进行中断。

4.2.5 acquire 小结

仍旧是这个图片,lock 的全过程。

lock -> acquire -> tryAcquire -> addWaiter -> enq -> acquireQueued -> shouldParkAfterFailedAcquire -> parkAndCheckInterrupt -> setHead -> 拿到锁、进行业务代码

4.3. unlock()

加锁搞定了,上面开始解锁 – unlock。还是独占模式下开释共享资源。

    public void unlock() {sync.release(1);
    }

unlock 算是解锁的顶级入口,其中调用的还是 AQS 中的 release 办法

4.4. release(int arg)

加锁搞定了,上面开始解锁 – release 了。还是独占模式下开释共享资源,并且唤醒期待队列其余线程来获取共享资源。

    public final boolean release(int arg) {if (tryRelease(arg)) {
            /** 找到头节点 */
            Node h = head;
            if (h != null && h.waitStatus != 0)
                /** 唤醒期待队列中的下一个线程 */
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

逻辑并不简单。它调用 tryRelease()来开释资源。有一点须要留神的是,它是依据 tryRelease()的返回值来判断该线程是否曾经实现开释掉资源了!所以自定义同步器在设计 tryRelease()的时候要明确这一点

4.4.1. tryRelease(int arg)

尝试开释指定量的资源。这个办法是交由子类来实现的。在 ReentrantLock 实现的。

    protected final boolean tryRelease(int releases) {
            /** 以后的状态值 - 须要开释的状态值 因为这外面会有重入状况 */
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            /** 开释胜利 */
            if (c == 0) {
                free = true;
                /** 独占线程设置为 null */
                setExclusiveOwnerThread(null);
            }
            /** 设置为开释后的状态 可能不为 0 */
            setState(c);
            return free;
        }

​ 跟 tryAcquire()一样,这个办法是须要独占模式的自定义同步器去实现的。失常来说,tryRelease()都会胜利的,因为这是独占模式,该线程来开释资源,那么它必定曾经拿到独占资源了,间接减掉相应量的资源即可 (state-=arg),也不须要思考线程平安的问题。但要留神它的返回值,下面曾经提到了,release() 是依据 tryRelease()的返回值来判断该线程是否曾经实现开释掉资源了!所以自义定同步器在实现时,如果曾经彻底开释资源(state=0),要返回 true,否则返回 false。

4.4.2. unparkSuccessor(Node node)

private void unparkSuccessor(Node node) {
        /** node 个别为以后线程所在的结点 */
        int ws = node.waitStatus;
        /** 置零以后线程所在的结点状态 容许失败 */
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        /** 找到下一个须要唤醒的结点 s */
        Node s = node.next;
        /** 如果为空或已勾销 */
        if (s == null || s.waitStatus > 0) {
            s = null;
            /** 从后往前找 */
            for (Node t = tail; t != null && t != node; t = t.prev)
                /** waitStatus <= 0 都是无效节点 */
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            /** 唤醒下一个节点 */
            LockSupport.unpark(s.thread);
    }

这个函数并不简单。一句话概括:用 unpark()唤醒期待队列中最前边的那个未放弃线程 ,这里咱们也用 s 来示意吧。此时, 再和 acquireQueued()分割起来 ,s 被唤醒后, 进入 if (p == head && tryAcquire(arg))的判断 (即便 p!=head 也没关系,它会再进入 shouldParkAfterFailedAcquire() 寻找一个平安点。这里既然 s 曾经是期待队列中最前边的那个未放弃线程了,那么通过 shouldParkAfterFailedAcquire()的调整,s 也必然会跑到 head 的 next 结点,下一次自旋 p ==head 就成立啦),而后 s 把本人设置成 head 标杆结点,示意本人曾经获取到资源了,acquire()也返回了!!And then, DO what you WANT!

4.4.3. release 总结

​ release()是独占模式下线程开释共享资源的顶层入口。它会开释指定量的资源,如果彻底开释了(即 state=0), 它会唤醒期待队列里的其余线程来获取资源。

问题:如果获取锁的线程在 release 时异样了,没有 unpark 队列中的其余结点,这时队列中的其余结点会怎么办?是不是没法再被唤醒了?

  1. 线程忽然死掉了?能够通过 thread.stop 来进行线程的执行,但该函数的执行条件要严苛的多,而且函数注明是非线程平安的,曾经表明 Deprecated;
  2. 线程被 interupt 了?线程在运行态是不响应中断的,所以也不会抛出异样;
  3. release 代码有 bug,抛出异样了?目前来看,Doug Lea 的 release 办法还是比拟强壮的,没有看出能引发异样的情景(如果有,恐怕早被用户吐槽了)。除非本人写的 tryRelease()有 bug,那就没啥说的,本人写的 bug 只能本人含着泪去接受了

第四局部原文:

4.5. MyLock

仿照 ReentrantLock 来实现一个MyLock

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/**
 * @author gyh
 * @csdn https://blog.csdn.net/qq_40788718
 * @date 2020/6/4 10:15
 */

public class MyLock extends AbstractQueuedSynchronizer {public void lock() {this.acquire(1);
    }

    public void unlock() {this.release(1);
    }

    @Override
    protected boolean tryAcquire(int arg) {final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0){if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(current);
                return true;
            }
        }else if (current == getExclusiveOwnerThread()){setState(1);
            return true ;
        }
        return false ;
    }

    @Override
    protected boolean tryRelease(int arg) {if (getExclusiveOwnerThread() != Thread.currentThread())
            throw new IllegalMonitorStateException() ;
        setExclusiveOwnerThread(null);
        setState(0);
        return true ;
    }

}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author gyh
 * @csdn https://blog.csdn.net/qq_40788718
 * @date 2020/6/4 13:59
 */

public class MyLockDemo {

    static int count = 0 ;

    public static void main(String[] args) throws InterruptedException {final MyLock lock = new MyLock() ;
        ExecutorService executorsService = Executors.newCachedThreadPool() ;
        CountDownLatch countDownLatch = new CountDownLatch(1000) ;
        for (int i=0 ; i<1000 ; i++){executorsService.execute(()->{
                try{lock.lock();
                    count++ ;
                    System.out.println("线程:"+Thread.currentThread().getName()+"数值:"+count);
                }catch (Exception e){e.printStackTrace();
                }finally {lock.unlock();
                    countDownLatch.countDown();}
            });

        }
        countDownLatch.await();
        executorsService.shutdown();
        System.out.println(count);
    }
}

测试了几次,感觉胜利了,如果有问题请指出来。

4. Condition

Condition是一个接口,从 1.5 的时候呈现的,是用来代替 Objectwaitnotify。所以不言而喻,Conditionawaitsignal必定效率更高、安全性更好。Condition是依赖于 lock 实现的。并且 awaitsignal只能在 lock 的爱护之内应用。

sychronized + Object.wait = Lock + Condition

package java.util.concurrent.locks;
import java.util.concurrent.TimeUnit;
import java.util.Date;

public interface Condition {
    // 导致以后线程等到发信号或 interrupted。void await() throws InterruptedException;
    // 使以后线程期待直到发出信号
    void awaitUninterruptibly();
    // 使以后线程期待直到发出信号或中断,或指定的等待时间过来。long awaitNanos(long nanosTimeout) throws InterruptedException;
    // 使以后线程期待直到发出信号或中断,或指定的等待时间过来。boolean await(long time, TimeUnit unit) throws InterruptedException;
    // 使以后线程期待直到发出信号或中断,或者指定的最初期限过来。boolean awaitUntil(Date deadline) throws InterruptedException;
    // 唤醒一个期待线程。void signal();
    // 唤醒所有期待线程。void signalAll();}

(网图)

从图片中能够看出 AQS 的整体框架的大抵轮廓,ConditionObjectCondition 的实现类。

        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;

能够看出这是队列的头和尾。

看一下两个最罕用的办法

ConditionObject.await

        public final void await() throws InterruptedException {if (Thread.interrupted())
                throw new InterruptedException();
            //1
            Node node = addConditionWaiter();
            //2
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            //3
            while (!isOnSyncQueue(node)) {LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
             //4
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
  1. 把以后线程的节点退出到期待队列中
  2. 因为调用 await()办法的线程是曾经获取了锁的,所以在退出到期待队列之后,须要去开释锁,并且唤醒后继节点线程
  3. 挂起以后线程,当别的线程调用了 signal(),并且是以后线程被唤醒的时候才从 park()办法返回
  4. 当被唤醒后,该线程会尝试去获取锁,只有获取到了才会从 await()办法返回,否则的话,会挂起本人

ConditionObject.signal

        public final void signal() {if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

从 first 开始遍历期待队列,把第一个非空、没勾销的节点 transfer 到同步队列

5. 同步队列与期待队列

AQS 中存有同步队列,而 Condition 中存有期待队列。他们二者有什么区别?

同步队列寄存着竞争同步资源的线程的援用;期待队列寄存着待唤醒的线程的援用。

同步队列中寄存着一个个节点,当线程获取同步状态失败时,同步器会将以后线程以及期待状态等信息结构成为一个节点并将其退出同步队列,首节点示意的获取同步状态胜利的线程节点。

Condition 保护着一个期待队列,与同步队列类似。次要针对 await 和 signal 的操作。

借助博客:https://blog.csdn.net/MakeCon… 的例子可能更好了解

public class CondtionTest {

    public static class ThreadA extends Thread{
        @Override
        public void run(){
            try{lock.lock();
                for (int i=0 ;i<3;i++){System.out.println("A 过程输入" + ":" + ++index);
                    conditionB.signal();
                    conditionA.await();}
            }catch (Exception e){e.printStackTrace();
            }finally {lock.unlock();
            }
        }
    }

    public static class ThreadB extends Thread{
        @Override
        public void run(){
            try{lock.lock();
                for (int i=0 ;i<3;i++){System.out.println("B 过程输入" + ":" + ++index);
                    conditionC.signal();
                    conditionB.await();}
            }catch (Exception e){e.printStackTrace();
            }finally {lock.unlock();
            }
        }
    }

    public static class ThreadC extends Thread{
        @Override
        public void run(){
            try{lock.lock();
                for (int i=0 ;i<3;i++){System.out.println("C 过程输入" + ":" + ++index);
                    conditionA.signal();
                    conditionC.await();}
            }catch (Exception e){e.printStackTrace();
            }finally {lock.unlock();
            }
        }
    }

    public static ReentrantLock lock = new ReentrantLock();
    public static Condition conditionA = lock.newCondition();
    public static Condition conditionB = lock.newCondition();
    public static Condition conditionC = lock.newCondition();
    public static int index = 0;
    public static void main(String[] args){ThreadA threadA = new ThreadA();
        ThreadB threadB = new ThreadB();
        ThreadC threadC = new ThreadC();

        threadA.start();//(1)threadB.start();//(2)threadC.start();//(3)}
}

输入后果为:

这种状况是 B 线程先占到了锁了。

参考:

https://blog.csdn.net/m478387…

https://blog.csdn.net/zhangdo…

https://blog.csdn.net/qq_2860…

https://blog.csdn.net/fuyuwei…

https://www.cnblogs.com/yanlo…

https://blog.csdn.net/andyzha…

https://blog.csdn.net/MakeCon…

正文完
 0