上篇文章咱们详细分析了AQS的底层实现原理,这节就来摸索jdk中应用AQS实现的工具类

从源码看AQS


ReentrantLock

一, 是什么?怎么用?

是什么?

是一个独占锁,也就是在并发环境下同一时刻只能有一个线程取得资源,也是一个可重入锁.

可重入锁: 一个线程曾经获取到了该资源,下次再次获取资源时不会呈现期待状况(上次获取资源没有开释)

怎么用?

在各类并发的场景下,为了保障资源获取的正确性,能够保障每个资源同时只能被一个线程获取到.

例如: 宿舍选宿零碎(每张床位只能有一个学生抢到),秒杀流动(同一件商品不能被两个人买走)

二, 类架构

由下面架构图能够看出,ReentrantLock能够分为偏心锁和非偏心锁,而底层实现是AQS,在前面咱们还能够看到更多的类底层都是由AQS实现的,所以说相熟AQS原理对了解这些类是非常有必要的

类的属性

/** * 实现锁的同步器 */private final Sync sync;
/** * 形象同步器 * 子类可有偏心和非偏心两种形式,应用AQS的state字段来示意是否获取到锁和重入次数 */abstract static class Sync extends AbstractQueuedSynchronizer {    private static final long serialVersionUID = -5179523762034025860L;    /**     * 依据子类实现,能够实现偏心锁和非偏心锁     */    abstract void lock();    /**     * 非偏心形式获取锁     */    final boolean nonfairTryAcquire(int acquires) {        final Thread current = Thread.currentThread();        int c = getState();        //如果以后状态值为0也就是以后锁没有被其余线程持有,则尝试获取锁        if (c == 0) {            //获取锁,如果过来胜利,则设置以后线程为独占线程            if (compareAndSetState(0, acquires)) {                setExclusiveOwnerThread(current);                return true;            }        }        //如果以后锁是以后线程所持有,则将重入次数+1        else if (current == getExclusiveOwnerThread()) {            int nextc = c + acquires;            //如果冲入次数超过阈值,则将其置为负值            if (nextc < 0) // overflow                throw new Error("Maximum lock count exceeded");            setState(nextc);            return true;        }        return false;    }    /**     * 开释锁     */    @Override    protected final boolean tryRelease(int releases) {        int c = getState() - releases;        //判断以后线程和锁持有线程是否为同一个线程        if (Thread.currentThread() != getExclusiveOwnerThread())            throw new IllegalMonitorStateException();        boolean free = false;        //判断以后可重入次数是否为0,如果为0则革除线程占有标记        if (c == 0) {            free = true;            //革除掉独占标记            setExclusiveOwnerThread(null);        }        setState(c);        return free;    }    /**     * 判断以后线程是否持有锁     */    @Override    protected final boolean isHeldExclusively() {        return getExclusiveOwnerThread() == Thread.currentThread();    }    /**     * 创立条件变量     */    final ConditionObject newCondition() {        return new ConditionObject();    }    /**     * 获取资源持有者     */    final Thread getOwner() {        return getState() == 0 ? null : getExclusiveOwnerThread();    }    /**     * 获取重入次数     */    final int getHoldCount() {        return isHeldExclusively() ? getState() : 0;    }    /**     * 是否曾经持有锁     */    final boolean isLocked() {        return getState() != 0;    }}

三, 具体实现

偏心式

先获取资源的状态,如果没有人占用,判断以后线程是否为队列的首节点,如果是则尝试获取资源,获取胜利批改独占线程,如果有人占用则判断独占线程和以后线程是否雷同,如果雷同的判断可重入的次数,超过抛出谬误,否则重入胜利
/** * 偏心锁 */static final class FairSync extends Sync {    private static final long serialVersionUID = -3000897897090466540L;    @Override    final void lock() {        acquire(1);    }    /**     * 获取锁     */    @Override    protected final boolean tryAcquire(int acquires) {        final Thread current = Thread.currentThread();        int c = getState();        if (c == 0) {            /**             * 以后线程为队列的头节点并且获取资源胜利,设置独占锁             */            if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {                setExclusiveOwnerThread(current);                return true;            }        } else if (current == getExclusiveOwnerThread()) {            int nextc = c + acquires;            if (nextc < 0) {                throw new Error("Maximum lock count exceeded");            }            setState(nextc);            return true;        }        return false;    }}

非偏心式

先获取以后资源的状态,如果没有人占用,间接获取,获取胜利批改独占线程的状态,有人占用查看以后占有线程是否为以后线程,如果是则进行重入,此外还须要判断重入次数,如果超过了阈值,抛出谬误
/** * 非偏心锁 */static final class NonfairSync extends Sync {    private static final long serialVersionUID = 7316153563782823691L;    @Override    final void lock() {        /**         * 自旋获取资源,胜利后批改独占状态,         * 失败后持续尝试获取,并将其退出到队列中以CLH自旋锁形式始终尝试获取资源         */        if (compareAndSetState(0, 1)) {            setExclusiveOwnerThread(Thread.currentThread());        }        else {            //AQS外部办法,实则调用的是tryAcquire(),如果获取资源失败,则退出到AQS队列尾部,并且以自旋的方             //式始终尝试获取资源,不会响应中断,然而设置了终端标记,在获取到资源后会开释掉资源,并且将以后线程               //状态设置为CANCELLED,这一部分具体代码请看AQS源码解析            acquire(1);        }    }    /**     * 尝试获取资源     */    @Override    protected final boolean tryAcquire(int acquires) {        return nonfairTryAcquire(acquires);    }}

结构器

/*** 默认为非偏心锁,相比于偏心锁,性能更高,因为偏心锁每次还须要查看AQS中是否有期待的线程*/public ReentrantLock() {    sync = new NonfairSync();}//也能够指定创立偏心锁或非偏心锁public ReentrantLock(boolean fair) {    sync = fair ? new FairSync() : new NonfairSync();}

案例

之前某大厂的一个面试题,应用三个线程程序打印出ABC三个字母,第一个线程打印A,而后第二个线程打印B,第三个线程打印C,打印10轮
public class PrintWord {    private static ReentrantLock lock = new ReentrantLock();    static Condition conditionA = lock.newCondition();    static Condition conditionB = lock.newCondition();    static Condition conditionC = lock.newCondition();    private static int i = 1;    public static void main(String[] args) {        new Thread(() -> {            for (int j = 0; j < 10; j++) {                printA();            }        },"A").start();        new Thread(() -> {            for (int j = 0; j < 10; j++) {                printB();            }        },"B").start();        new Thread(() -> {            for (int j = 0; j < 10; j++) {                printC();            }        },"C").start();    }    private static void printA() {        lock.lock();        try {            if (i != 1) {                conditionA.await();            }            System.out.println(Thread.currentThread().getName());            i = 2;            conditionB.signal();        } catch (InterruptedException e) {            e.printStackTrace();        } finally {            lock.unlock();        }    }    private static void printB() {        lock.lock();        try {            if (i != 2) {                conditionB.await();            }            System.out.println(Thread.currentThread().getName());            i = 3;            conditionC.signal();        } catch (InterruptedException e) {            e.printStackTrace();        } finally {            lock.unlock();        }    }    private static void printC() {        lock.lock();        try {            if (i != 3) {                conditionC.await();            }            System.out.println(Thread.currentThread().getName());            i = 1;            conditionA.signal();        } catch (InterruptedException e) {            e.printStackTrace();        } finally {            lock.unlock();        }    }}

Semaphore

一, 是什么?怎么用?

是什么?

信号量,从概念上讲,信号量保护一组许可证,每个线程都能够来获取许可证,直至许可证为空

怎么用?

能够应用其管制并发线程的数量

二, 类架构

从下面架构图咱们能够看出,Semaphore底层也是应用的AQS,并且和ReentrantLock一样,都提供了偏心式和非偏心式获取资源

类的属性

/** * 实现信号量的同步器 */private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {    private static final long serialVersionUID = 1192457210091910933L;    /**     * 设置许可证的数量     */    Sync(int permits) {        setState(permits);    }    /**     * 获取以后残余的许可证数量     */    final int getPermits() {        return getState();    }    /**     * 以共享的形式非偏心获取许可证     */    final int nonfairTryAcquireShared(int acquires) {        for (;;) {            //获取目前所残余的许可证            int available = getState();            //计算获取之后残余的许可证            int remaining = available - acquires;            //如果许可证数量为负就批改state值            if (remaining < 0 || compareAndSetState(available, remaining))                return remaining;        }    }    /**     * 以共享的形式开释许可证     */    @Override    protected final boolean tryReleaseShared(int releases) {        for (;;) {            int current = getState();            int next = current + releases;            if (next < current) // overflow                throw new Error("Maximum permit count exceeded");            //CAS设置许可证数量            if (compareAndSetState(current, next))                return true;        }    }    /**     * 依照具体的数量缩小许可证     */    final void reducePermits(int reductions) {        for (;;) {            int current = getState();            int next = current - reductions;            if (next > current)                 throw new Error("Permit count underflow");            if (compareAndSetState(current, next))                return;        }    }    /**     * 获取以后能够应用的许可证,如果等于0则间接批改state     */    final int drainPermits() {        for (;;) {            int current = getState();            if (current == 0 || compareAndSetState(current, 0))                return current;        }    }}

三, 具体实现

偏心式

获取许可证: 每次获取先都须要看AQS中是否有期待的线程,如果有,则间接退出,否则获取许可证,批改残余许可证的数量,并且返回残余许可证数量

开释许可证: 由AQS的releaseShared调用,开释许可证时,在原先的根底上加上开释的许可证,然而开释的数量不能为负,开释胜利,调用AQS中的doReleaseShared办法,将队列头节点的状态设置为0而后从头节点的后继节点中找出一个状态值小于0的线程节点开释

/** * 偏心式同步器 */static final class FairSync extends Sync {    private static final long serialVersionUID = 2014338818796000944L;    FairSync(int permits) {        super(permits);    }    /**     * 获取许可证,由AQS中的acquireShared办法调用,如果许可证数量小于0,     * 则将以后线程退出到队列中始终轮询尝试过来许可证     */    @Override    protected int tryAcquireShared(int acquires) {        for (;;) {            //队列中有期待的线程,间接返回            if (hasQueuedPredecessors())                return -1;            int available = getState();            int remaining = available - acquires;            //获取许可证,CAS批改state值            if (remaining < 0 || compareAndSetState(available, remaining))                return remaining;        }    }}

非偏心式

获取许可证: 上来间接尝试获取信号量,如果获取胜利返回残余许可证,如果许可证数量小于0则进入AQS队列中

开释许可证: 和非偏心式雷同

/** * 非偏心形式下同步器 */static final class NonfairSync extends Sync {    private static final long serialVersionUID = -2694183684443567898L;    NonfairSync(int permits) {        super(permits);    }    /**     * 获取资源,这一步由AQS中的acquireShared调用,每次获取资源后会返回残余的许可证,下面有写     * 如果小于等于0则以后线程会始终处于CLH锁中,如果大于0则会唤醒队列中所有状态为SIGNAL的线程     * 详情见AQS源码948行开始     */    @Override    protected int tryAcquireShared(int acquires) {        return nonfairTryAcquireShared(acquires);    }}

获取许可证

public void acquire() throws InterruptedException {    sync.acquireSharedInterruptibly(1);}public void acquireUninterruptibly() {    sync.acquireShared(1);}

开释许可证

public void release() {    sync.releaseShared(1);}