关于java:Lock底层原理ReentrantLockAQSCondition

Lock底层原理—ReentrantLock、AQS、Condition

先来看看J.U.C包下的构造

  • juc-locks 锁框架
  • juc-atomic 原子类框架
  • juc-sync 同步器框架
  • juc-collections 汇合框架
  • juc-executors 执行器框架

而咱们明天的配角ReentrantLock,就是juc-locks包下的。上一篇刚算理解一下synchronized的底层原理,所以就想看看ReentrantLock和它的区别到底是什么,改良在哪里,实用于什么场景。

1. Lock

LockReadWriteLock是两大锁的根接口,上面看一下JDK 1.8 API中如何形容的

通过API的介绍,sychronizedLock的区别能够分为如下:

  • Lock减少了灵活性,最次要的就是反对多个Condition。
  • Lock提供了非阻塞取得锁的形式,synchronized有自旋锁。
  • Lock更适宜应用在简单的同步,而synchronized更简略、简杰。
  • Lock 能够设置偏心锁和非偏心锁,synchronized只有非偏心锁。

浏览一下Lock 源码

package java.util.concurrent.locks;

import java.util.concurrent.TimeUnit;

public interface Lock {
    //取得锁
    void lock();
    //获取锁定,除非以后线程中断
    void lockInterruptibly() throws InterruptedException;
    //只有在调用时才能够取得锁
    boolean tryLock();
    //如果在给定的等待时间内是闲暇的,并且以后的线程尚未失去 interrupted,则获取该锁
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    //开释锁。
    void unlock();
    //返回一个新Condition绑定到该实例Lock实例
    Condition newCondition();
}

2. ReentrantLock

Lock理解之后,看一下实现类是如何实现接口的办法的。最罕用的实现类就是ReentrantLock

2.1. 偏心锁和非偏心锁

    public ReentrantLock() {
        sync = new NonfairSync();
    }
    
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

从构造函数上能够看进去ReentrantLock实现了偏心锁和非偏心锁两种,默认个别应用非偏心锁,它的效率和吞吐量都比偏心锁高的多。

2.2. Sync


    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        abstract void lock();

        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

        protected final boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        final ConditionObject newCondition() {
            return new ConditionObject();
        }

        final Thread getOwner() {
            return getState() == 0 ? null : getExclusiveOwnerThread();
        }

        final int getHoldCount() {
            return isHeldExclusively() ? getState() : 0;
        }

        final boolean isLocked() {
            return getState() != 0;
        }

        private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
    }

2.3. NonfairSync

非偏心锁实现

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        final void lock() {
            /**
             * 先抢锁
             */
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

2.4. FairSync

说完了非偏心锁,那就不得不提偏心锁的实现了。

    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }
        
        //非偏心锁调用的是父类sync的nonfairTryAcquire办法
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

有了非偏心锁做铺垫,理解偏心锁就不便多了。偏心锁和非偏心锁不同之处在于,偏心锁在获取锁的时候,不会先去查看state状态。

偏心锁和非偏心锁最次要的区别在于lock(),一个是先排队,一个是先抢锁。然而非偏心锁的吞吐量更大。

通过查看ReentrantLock中的底层实现,咱们发现了AQS的形迹,先看一下AQS是什么,而后剖析一下整个锁的流程。

3. AbtractQueuedSynchronizer

AbtractQueuedSynchronizer形象的队列式的同步器,AQS定义了一套多线程访问共享资源的同步器框架,是JUC包下整体的根底框架。AQS是一个形象的队列同步器,保护着一个同步队列和State状态 。底层应用了Unsafe类进行CAS操作。Sync就是AbstractQueuedSynchronizer的子类,AQS采纳了模板办法模式,进步了扩展性有保障了规范性。

AQS只是一个框架,具体资源的获取/开释形式交由自定义同步器去实现

JDK 1.8 API中的介绍如下

从上述介绍中能够看出:

  • 同步器依赖于每个Node的状态,状态都是通过CAS来设置的。
  • AQS反对独占模式共享模式

还有一些对于Condition,上面再去钻研。

看一下其外部图片:

AbtractQueuedSynchronizer 外部维持着一个虚构的双向同步队列,因为队列中只有头节点和尾节点的指针。

Node是AQS中的节点类

        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;
        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;
        volatile int waitStatus;
        volatile Node prev;
        volatile Node next;
        volatile Thread thread;
        Node nextWaiter;

waitstatus

查看状态值

  • CANCELLED:当线程期待超时或者被中断,则勾销期待,设期待状态为-1,进入勾销状态则不再变动
  • SIGNAL:后继节点处于期待状态,以后节点被勾销或中断时会告诉后继节点,使后继节点的线程得以运行
  • CONDITION:以后节点处于期待队列,节点线程期待在Condition上,当其余线程对condition执行signall办法时,期待队列转移到同步队列,退出到对同步状态的获取
  • PROPAGATE:与共享模式相干,在共享模式中,该状态标识结点的线程处于可运行状态
  • 0状态:值为0,代表初始化状态。

那么SHAREDEXCLUSIVE是干什么的呢?

SHARED标识的是共享,而EXCLUSIVE是独占。然而ReentrantLock是排他锁。

看一下最次要的lock办法

通过CAS判断以后状态是否为闲暇状态(0),如果是则将状态设置为独占线程(1),这样获取锁胜利。CAS操作只保障一个线程获取锁,如果多个线程竞争,失败的就须要排队。

所以else就是失败者走的,古有名言:失败是胜利之母。那就看一下失败者的经验吧!!!

acquire是父类AbstractQueuedSynchronizer的办法。

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

tryAcquire办法Nonfair重写了

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
        final boolean nonfairTryAcquire(int acquires) {
            //咱们走下来 acquires = 1
            final Thread current = Thread.currentThread();
            //获取锁状态
            int c = getState();
            //如果是0,示意没有线程领有
            if (c == 0) {
                //获取锁
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //以后线程曾经占用了此锁
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                //更新state值
                setState(nextc);
                return true;
            }
            //尝试获取失败
            return false;
        }

第一个吃螃蟹的当不上,只能当下一个了。再次尝试获取,如果此时状态为闲暇,则获取;如果有线程占用,判断是不是本人;如果是,更新状态值;如果不是,则失败。

!tryAcquire(arg) -> acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

缓缓往下撸

    private Node addWaiter(Node mode) {
        //包装成一个结点
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //无头节点,进入enq创立队列也就是头节点
        enq(node);
        return node;
    }

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { 
                //CAS操作设置头节点
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                //设置成尾节点
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }


    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //获取前驱结点
                final Node p = node.predecessor();
                //作为第二节点,能够尝试获取锁
                if (p == head && tryAcquire(arg)) {
                    //获取锁胜利
                    setHead(node);//将以后节点设置为头节点
                    p.next = null;// help GC
                    failed = false;//更新failed
                    return interrupted;//返回是否被中断过
                }
                //锁未获取后,判断是否能够被挂起
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

到当初有点走晕了,先捋一捋

  1. 通过lock获取锁,如果获取失败,则进入acquire办法尝试再次获取。
  2. acquire中通过!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)进入两个办法,第一个是尝试第二次尝试获取锁,如果失败再次看第二个办法。
  3. addWaiter将以后线程包装成结点退出同步队列。
  4. acquireQueued办法addWaiter返回、已入队的结点作为参数,而后第三次尝试获取锁(获取的条件是成为二号结点)。
  5. 最初通过shouldParkAfterFailedAcquireparkAndCheckInterrupt判断是否须要被挂起。

此时咱们再看shouldParkAfterFailedAcquireparkAndCheckInterrupt如何实现的

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //前驱结点的状态
        int ws = pred.waitStatus;
        //如果前驱结点能够符合条件(能够唤醒以后线程)
        if (ws == Node.SIGNAL)
            return true;
        //如果不符合条件,则始终向前遍历,找到能够唤醒本人的结点
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //如果都没有符合条件的结点,那就通过CAS条件将前驱结点的设置为signal
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    //挂起以后线程,返回线程中断状态并重置
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

这两个办法执行实现后,就代表着线程曾经被挂起了,只能期待signal标记的线程来唤醒它。

    static void selfInterrupt() {
        //中断以后线程
        Thread.currentThread().interrupt();
    }

最初回到selfInterrupt办法。

4. 通过ReentrantLock来理解AQS

首先明确其调用过程很重要

先以非偏心锁举例子:

4.1. lock()

        final void lock() {
            /** 上来先抢锁 */
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            /** 抢锁失败 */
            else
                acquire(1);
        }

4.2. acquire(int arg)

    public final void acquire(int arg) { /** arg = 1 */
        if (!tryAcquire(arg) && /** 尝试获取锁 */
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) /** 退出同步队列、在期待队列中排队拿资源 */
            selfInterrupt();
    }

​ 此办法是独占模式下线程获取共享资源的顶层入口。如果获取到资源,线程间接返回,否则进入期待队列,直到获取到资源为止,且整个过程疏忽中断的影响。这也正是lock()的语义,当然不仅仅只限于lock()。获取到资源后,线程就能够去执行其临界区代码了。

4.2.1. tryAcquire(int arg)

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }

tryAcquire是在NonfairSync中实现的,其中调用的是父类Sync中实现的nonfairTryAcquire办法。

4.2.1.1. nonfairTryAcquire(int acquires)
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            /** 以后资源的状态为无锁状态  则以后线程尝试获取锁状态 */
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    /** 将以后线程设置成独占的 */
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            /**  如果以后线程是资源的持有者 则进行重入 */
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

​ 此办法尝试去获取独占资源。如果获取胜利,则间接返回true,否则间接返回false。这也正是tryLock()的语义,还是那句话,当然不仅仅只限于tryLock()。

4.2.2. addWaiter(Node mode)

    private Node addWaiter(Node mode) {
        /** 为以后线程创立一个节点 */
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        /** 尝试疾速形式间接放到队尾 */
        if (pred != null) {
            node.prev = pred;
            /** 是否能连贯胜利 */
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        /** 疾速如队尾失败 则走enq办法 */
        enq(node);
        return node;
    }

​ 此办法用于将以后线程退出到期待队列的队尾,并返回以后线程所在的结点。

4.2.2.1. enq(Node node)
    private Node enq(final Node node) {
        /** 自旋 放入队尾 */
        for (;;) {
            Node t = tail;
            if (t == null) { /** 队列为空 创立一个空的标记结点作为head结点 并将tail也指向它 */
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else { /** 失常流程,放入队尾 */
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

CAS自旋volatile变量,是一种很经典的用法。

4.2.3. acquireQueued(Node node , int arg)

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            /** 标记是否被中断过 */
            boolean interrupted = false;
            /** 自旋 */
            for (;;) {
                /** 获取前驱节点 */
                final Node p = node.predecessor();
                /** 前驱节点是头节点(阐明是二号节点) 且 尝试获取锁(头节点可能开释资源) */
                /** 可能被唤醒的、也可能是被中断奥的 */
                if (p == head && tryAcquire(arg)) {
                    /** 拿到资源后 将head指向该结点 所以head所指的标杆结点*/
                    /** 就是以后获取到资源的那个结点或null */
                    setHead(node);
                    /** setHead 中 曾经将 node.pred 设置为 null */
                    /** 就是为了不便GC收集该节点 也就是 拿完资源的节点须要出队列了 */
                    p.next = null; // help GC
                    /** 胜利获取资源 */
                    failed = false;
                    /** 返回在期待期间是否被中断过 */
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) && /** 判断本人是否能够劳动 */
                    parkAndCheckInterrupt()) /** 如果能够劳动 LockSupport.park() 进入 waiting状态 如果不可中断的状况下被中断了,那么会从park()中醒过来,发现拿不到资源,从而持续进入park()期待。*/
                    /** 如果期待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true */
                    interrupted = true;
            }
        } finally {
            /** 如果期待过程中没有胜利获取资源(如timeout,或者可中断的状况下被中断了),
            那么勾销结点在队列中的期待。 */
            if (failed)
                cancelAcquire(node);
        }
    }

到这里咱们先进入shouldParkAfterFailedAcquireparkAndCheckInterrupt看看具体操作。

4.2.4.1. shouldParkAfterFailedAcquire(Node pred , Node node)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus; /** 前驱节点的状态*/
        if (ws == Node.SIGNAL)
            /*
             * 如果曾经通知前驱拿完号后告诉本人一下,那就能够安心劳动了
             */
            return true;
        if (ws > 0) {
            /*
             *  如果前驱放弃了,那就始终往前找,直到找到最近一个失常期待的状态,并排在它的后边。
             *  留神:那些放弃的结点,因为被“加塞”,它们相当于造成一个无援用链,稍后就会被GC回收
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * 如果前驱失常,那就把前驱的状态设置成SIGNAL,通知它拿完号后告诉本人一下。
             * 有可能失败,人家说不定刚刚开释完呢
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

如果前驱节点不是SIGNAL,则不能安心劳动,须要找到离本人最近的失常节点,而后接在它前面。

4.2.4.2. parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        /** Thread.interrupted()会革除以后线程的中断标记位  */
        return Thread.interrupted();
    }

让线程安心的进入期待状态。park()会让以后线程进入waiting状态。在此状态下,有两种路径能够唤醒该线程:被unpark()、被interrupt()。须要留神的是,Thread.interrupted()会革除以后线程的中断标记位。

这里插入一下线程的状态转换:

4.2.4. selfInterrupt()

    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

以后线程进行中断。

4.2.5 acquire小结

仍旧是这个图片,lock的全过程。

lock -> acquire -> tryAcquire -> addWaiter -> enq -> acquireQueued -> shouldParkAfterFailedAcquire -> parkAndCheckInterrupt -> setHead -> 拿到锁、进行业务代码

4.3. unlock()

加锁搞定了,上面开始解锁 – unlock。还是独占模式下开释共享资源。

    public void unlock() {
        sync.release(1);
    }

unlock算是解锁的顶级入口,其中调用的还是AQS中的release办法

4.4. release(int arg)

加锁搞定了,上面开始解锁 – release 了。还是独占模式下开释共享资源,并且唤醒期待队列其余线程来获取共享资源。

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            /** 找到头节点 */
            Node h = head;
            if (h != null && h.waitStatus != 0)
                /** 唤醒期待队列中的下一个线程 */
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

逻辑并不简单。它调用tryRelease()来开释资源。有一点须要留神的是,它是依据tryRelease()的返回值来判断该线程是否曾经实现开释掉资源了!所以自定义同步器在设计tryRelease()的时候要明确这一点

4.4.1. tryRelease(int arg)

尝试开释指定量的资源。这个办法是交由子类来实现的。在ReentrantLock实现的。

    protected final boolean tryRelease(int releases) {
            /** 以后的状态值 - 须要开释的状态值 因为这外面会有重入状况 */
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            /** 开释胜利 */
            if (c == 0) {
                free = true;
                /** 独占线程设置为 null */
                setExclusiveOwnerThread(null);
            }
            /** 设置为开释后的状态 可能不为0 */
            setState(c);
            return free;
        }

​ 跟tryAcquire()一样,这个办法是须要独占模式的自定义同步器去实现的。失常来说,tryRelease()都会胜利的,因为这是独占模式,该线程来开释资源,那么它必定曾经拿到独占资源了,间接减掉相应量的资源即可(state-=arg),也不须要思考线程平安的问题。但要留神它的返回值,下面曾经提到了,release()是依据tryRelease()的返回值来判断该线程是否曾经实现开释掉资源了!所以自义定同步器在实现时,如果曾经彻底开释资源(state=0),要返回true,否则返回false。

4.4.2. unparkSuccessor(Node node)

private void unparkSuccessor(Node node) {
        /** node个别为以后线程所在的结点 */
        int ws = node.waitStatus;
        /** 置零以后线程所在的结点状态 容许失败 */
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        /** 找到下一个须要唤醒的结点s */
        Node s = node.next;
        /** 如果为空或已勾销 */
        if (s == null || s.waitStatus > 0) {
            s = null;
            /** 从后往前找 */
            for (Node t = tail; t != null && t != node; t = t.prev)
                /** waitStatus <= 0 都是无效节点 */
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            /** 唤醒下一个节点 */
            LockSupport.unpark(s.thread);
    }

 这个函数并不简单。一句话概括:用unpark()唤醒期待队列中最前边的那个未放弃线程,这里咱们也用s来示意吧。此时,再和acquireQueued()分割起来,s被唤醒后,进入if (p == head && tryAcquire(arg))的判断(即便p!=head也没关系,它会再进入shouldParkAfterFailedAcquire()寻找一个平安点。这里既然s曾经是期待队列中最前边的那个未放弃线程了,那么通过shouldParkAfterFailedAcquire()的调整,s也必然会跑到head的next结点,下一次自旋p==head就成立啦),而后s把本人设置成head标杆结点,示意本人曾经获取到资源了,acquire()也返回了!!And then, DO what you WANT!

4.4.3. release总结

​ release()是独占模式下线程开释共享资源的顶层入口。它会开释指定量的资源,如果彻底开释了(即state=0),它会唤醒期待队列里的其余线程来获取资源。

问题:如果获取锁的线程在release时异样了,没有unpark队列中的其余结点,这时队列中的其余结点会怎么办?是不是没法再被唤醒了?

  1. 线程忽然死掉了?能够通过thread.stop来进行线程的执行,但该函数的执行条件要严苛的多,而且函数注明是非线程平安的,曾经表明Deprecated;
  2. 线程被interupt了?线程在运行态是不响应中断的,所以也不会抛出异样;
  3. release代码有bug,抛出异样了?目前来看,Doug Lea的release办法还是比拟强壮的,没有看出能引发异样的情景(如果有,恐怕早被用户吐槽了)。除非本人写的tryRelease()有bug,那就没啥说的,本人写的bug只能本人含着泪去接受了

第四局部原文:

4.5. MyLock

仿照ReentrantLock来实现一个MyLock

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/**
 * @author gyh
 * @csdn https://blog.csdn.net/qq_40788718
 * @date 2020/6/4 10:15
 */

public class MyLock extends AbstractQueuedSynchronizer {

    public void lock() {
        this.acquire(1);
    }

    public void unlock() {
        this.release(1);
    }

    @Override
    protected boolean tryAcquire(int arg) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0){
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }else if (current == getExclusiveOwnerThread()){
            setState(1);
            return true ;
        }
        return false ;
    }

    @Override
    protected boolean tryRelease(int arg) {
        if (getExclusiveOwnerThread() != Thread.currentThread())
            throw new IllegalMonitorStateException() ;
        setExclusiveOwnerThread(null);
        setState(0);
        return true ;
    }

}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author gyh
 * @csdn https://blog.csdn.net/qq_40788718
 * @date 2020/6/4 13:59
 */

public class MyLockDemo {

    static int count = 0 ;

    public static void main(String[] args) throws InterruptedException {
        final MyLock lock = new MyLock() ;
        ExecutorService executorsService = Executors.newCachedThreadPool() ;
        CountDownLatch countDownLatch = new CountDownLatch(1000) ;
        for (int i=0 ; i<1000 ; i++){
            executorsService.execute(()->{
                try{
                    lock.lock();
                    count++ ;
                    System.out.println("线程:"+Thread.currentThread().getName()+" 数值:"+count);
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    lock.unlock();
                    countDownLatch.countDown();
                }
            });

        }
        countDownLatch.await();
        executorsService.shutdown();
        System.out.println(count);
    }
}

测试了几次,感觉胜利了,如果有问题请指出来。

4. Condition

Condition是一个接口,从1.5的时候呈现的,是用来代替Objectwaitnotify。所以不言而喻,Conditionawaitsignal必定效率更高、安全性更好。Condition是依赖于lock实现的。并且awaitsignal只能在lock的爱护之内应用。

sychronized + Object.wait = Lock + Condition

package java.util.concurrent.locks;
import java.util.concurrent.TimeUnit;
import java.util.Date;

public interface Condition {
    //导致以后线程等到发信号或 interrupted 。
    void await() throws InterruptedException;
    //使以后线程期待直到发出信号
    void awaitUninterruptibly();
    //使以后线程期待直到发出信号或中断,或指定的等待时间过来。 
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    //使以后线程期待直到发出信号或中断,或指定的等待时间过来。 
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    //使以后线程期待直到发出信号或中断,或者指定的最初期限过来。 
    boolean awaitUntil(Date deadline) throws InterruptedException;
    //唤醒一个期待线程。 
    void signal();
    //唤醒所有期待线程。 
    void signalAll();
}

(网图)

从图片中能够看出AQS的整体框架的大抵轮廓,ConditionObjectCondition的实现类。

        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;

能够看出这是队列的头和尾。

看一下两个最罕用的办法

ConditionObject.await

        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            //1
            Node node = addConditionWaiter();
            //2
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            //3
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
             //4
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
  1. 把以后线程的节点退出到期待队列中
  2. 因为调用await()办法的线程是曾经获取了锁的,所以在退出到期待队列之后,须要去开释锁,并且唤醒后继节点线程
  3. 挂起以后线程,当别的线程调用了signal(),并且是以后线程被唤醒的时候才从park()办法返回
  4. 当被唤醒后,该线程会尝试去获取锁,只有获取到了才会从await()办法返回,否则的话,会挂起本人

ConditionObject.signal

        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

从first开始遍历期待队列,把第一个非空、没勾销的节点transfer到同步队列

5. 同步队列与期待队列

AQS中存有同步队列,而Condition中存有期待队列。他们二者有什么区别?

同步队列寄存着竞争同步资源的线程的援用;期待队列寄存着待唤醒的线程的援用。

同步队列中寄存着一个个节点,当线程获取同步状态失败时,同步器会将以后线程以及期待状态等信息结构成为一个节点并将其退出同步队列,首节点示意的获取同步状态胜利的线程节点。

Condition保护着一个期待队列,与同步队列类似。次要针对await和signal的操作。

借助博客:https://blog.csdn.net/MakeCon… 的例子可能更好了解

public class CondtionTest {

    public static class ThreadA extends Thread{
        @Override
        public void run(){
            try{
                lock.lock();
                for (int i=0 ;i<3;i++){
                    System.out.println("A过程输入" + " : " + ++index);
                    conditionB.signal();
                    conditionA.await();
                }
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                lock.unlock();
            }
        }
    }

    public static class ThreadB extends Thread{
        @Override
        public void run(){
            try{
                lock.lock();
                for (int i=0 ;i<3;i++){
                    System.out.println("B过程输入" + " : " + ++index);
                    conditionC.signal();
                    conditionB.await();
                }
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                lock.unlock();
            }
        }
    }

    public static class ThreadC extends Thread{
        @Override
        public void run(){
            try{
                lock.lock();
                for (int i=0 ;i<3;i++){
                    System.out.println("C过程输入" + " : " + ++index);
                    conditionA.signal();
                    conditionC.await();
                }
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                lock.unlock();
            }
        }
    }

    public static ReentrantLock lock = new ReentrantLock();
    public static Condition conditionA = lock.newCondition();
    public static Condition conditionB = lock.newCondition();
    public static Condition conditionC = lock.newCondition();
    public static int index = 0;
    public static void main(String[] args){
        ThreadA threadA = new ThreadA();
        ThreadB threadB = new ThreadB();
        ThreadC threadC = new ThreadC();

        threadA.start();//(1)
        threadB.start();//(2)
        threadC.start();//(3)
    }
}

输入后果为:

这种状况是B线程先占到了锁了。

参考:

https://blog.csdn.net/m478387…

https://blog.csdn.net/zhangdo…

https://blog.csdn.net/qq_2860…

https://blog.csdn.net/fuyuwei…

https://www.cnblogs.com/yanlo…

https://blog.csdn.net/andyzha…

https://blog.csdn.net/MakeCon…

【腾讯云】轻量 2核2G4M,首年65元

阿里云限时活动-云数据库 RDS MySQL  1核2G配置 1.88/月 速抢

本文由乐趣区整理发布,转载请注明出处,谢谢。

您可能还喜欢...

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据