上篇文章咱们详细分析了AQS的底层实现原理,这节就来摸索jdk中应用AQS实现的工具类
从源码看AQS
ReentrantLock
一, 是什么?怎么用?
是什么?
是一个独占锁,也就是在并发环境下同一时刻只能有一个线程取得资源,也是一个可重入锁.可重入锁: 一个线程曾经获取到了该资源,下次再次获取资源时不会呈现期待状况(上次获取资源没有开释)
怎么用?
在各类并发的场景下,为了保障资源获取的正确性,能够保障每个资源同时只能被一个线程获取到.例如: 宿舍选宿零碎(每张床位只能有一个学生抢到),秒杀流动(同一件商品不能被两个人买走)
二, 类架构
由下面架构图能够看出,ReentrantLock能够分为偏心锁和非偏心锁,而底层实现是AQS,在前面咱们还能够看到更多的类底层都是由AQS实现的,所以说相熟AQS原理对了解这些类是非常有必要的
类的属性
/** * 实现锁的同步器 */private final Sync sync;
/** * 形象同步器 * 子类可有偏心和非偏心两种形式,应用AQS的state字段来示意是否获取到锁和重入次数 */abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; /** * 依据子类实现,能够实现偏心锁和非偏心锁 */ abstract void lock(); /** * 非偏心形式获取锁 */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); //如果以后状态值为0也就是以后锁没有被其余线程持有,则尝试获取锁 if (c == 0) { //获取锁,如果过来胜利,则设置以后线程为独占线程 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //如果以后锁是以后线程所持有,则将重入次数+1 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; //如果冲入次数超过阈值,则将其置为负值 if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } /** * 开释锁 */ @Override protected final boolean tryRelease(int releases) { int c = getState() - releases; //判断以后线程和锁持有线程是否为同一个线程 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //判断以后可重入次数是否为0,如果为0则革除线程占有标记 if (c == 0) { free = true; //革除掉独占标记 setExclusiveOwnerThread(null); } setState(c); return free; } /** * 判断以后线程是否持有锁 */ @Override protected final boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); } /** * 创立条件变量 */ final ConditionObject newCondition() { return new ConditionObject(); } /** * 获取资源持有者 */ final Thread getOwner() { return getState() == 0 ? null : getExclusiveOwnerThread(); } /** * 获取重入次数 */ final int getHoldCount() { return isHeldExclusively() ? getState() : 0; } /** * 是否曾经持有锁 */ final boolean isLocked() { return getState() != 0; }}
三, 具体实现
偏心式
先获取资源的状态,如果没有人占用,判断以后线程是否为队列的首节点,如果是则尝试获取资源,获取胜利批改独占线程,如果有人占用则判断独占线程和以后线程是否雷同,如果雷同的判断可重入的次数,超过抛出谬误,否则重入胜利
/** * 偏心锁 */static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; @Override final void lock() { acquire(1); } /** * 获取锁 */ @Override protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { /** * 以后线程为队列的头节点并且获取资源胜利,设置独占锁 */ if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) { throw new Error("Maximum lock count exceeded"); } setState(nextc); return true; } return false; }}
非偏心式
先获取以后资源的状态,如果没有人占用,间接获取,获取胜利批改独占线程的状态,有人占用查看以后占有线程是否为以后线程,如果是则进行重入,此外还须要判断重入次数,如果超过了阈值,抛出谬误
/** * 非偏心锁 */static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; @Override final void lock() { /** * 自旋获取资源,胜利后批改独占状态, * 失败后持续尝试获取,并将其退出到队列中以CLH自旋锁形式始终尝试获取资源 */ if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); } else { //AQS外部办法,实则调用的是tryAcquire(),如果获取资源失败,则退出到AQS队列尾部,并且以自旋的方 //式始终尝试获取资源,不会响应中断,然而设置了终端标记,在获取到资源后会开释掉资源,并且将以后线程 //状态设置为CANCELLED,这一部分具体代码请看AQS源码解析 acquire(1); } } /** * 尝试获取资源 */ @Override protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }}
结构器
/*** 默认为非偏心锁,相比于偏心锁,性能更高,因为偏心锁每次还须要查看AQS中是否有期待的线程*/public ReentrantLock() { sync = new NonfairSync();}//也能够指定创立偏心锁或非偏心锁public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync();}
案例
之前某大厂的一个面试题,应用三个线程程序打印出ABC三个字母,第一个线程打印A,而后第二个线程打印B,第三个线程打印C,打印10轮
public class PrintWord { private static ReentrantLock lock = new ReentrantLock(); static Condition conditionA = lock.newCondition(); static Condition conditionB = lock.newCondition(); static Condition conditionC = lock.newCondition(); private static int i = 1; public static void main(String[] args) { new Thread(() -> { for (int j = 0; j < 10; j++) { printA(); } },"A").start(); new Thread(() -> { for (int j = 0; j < 10; j++) { printB(); } },"B").start(); new Thread(() -> { for (int j = 0; j < 10; j++) { printC(); } },"C").start(); } private static void printA() { lock.lock(); try { if (i != 1) { conditionA.await(); } System.out.println(Thread.currentThread().getName()); i = 2; conditionB.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } private static void printB() { lock.lock(); try { if (i != 2) { conditionB.await(); } System.out.println(Thread.currentThread().getName()); i = 3; conditionC.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } private static void printC() { lock.lock(); try { if (i != 3) { conditionC.await(); } System.out.println(Thread.currentThread().getName()); i = 1; conditionA.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }}
Semaphore
一, 是什么?怎么用?
是什么?
信号量,从概念上讲,信号量保护一组许可证,每个线程都能够来获取许可证,直至许可证为空
怎么用?
能够应用其管制并发线程的数量
二, 类架构
从下面架构图咱们能够看出,Semaphore底层也是应用的AQS,并且和ReentrantLock一样,都提供了偏心式和非偏心式获取资源
类的属性
/** * 实现信号量的同步器 */private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L; /** * 设置许可证的数量 */ Sync(int permits) { setState(permits); } /** * 获取以后残余的许可证数量 */ final int getPermits() { return getState(); } /** * 以共享的形式非偏心获取许可证 */ final int nonfairTryAcquireShared(int acquires) { for (;;) { //获取目前所残余的许可证 int available = getState(); //计算获取之后残余的许可证 int remaining = available - acquires; //如果许可证数量为负就批改state值 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } /** * 以共享的形式开释许可证 */ @Override protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); //CAS设置许可证数量 if (compareAndSetState(current, next)) return true; } } /** * 依照具体的数量缩小许可证 */ final void reducePermits(int reductions) { for (;;) { int current = getState(); int next = current - reductions; if (next > current) throw new Error("Permit count underflow"); if (compareAndSetState(current, next)) return; } } /** * 获取以后能够应用的许可证,如果等于0则间接批改state */ final int drainPermits() { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0)) return current; } }}
三, 具体实现
偏心式
获取许可证: 每次获取先都须要看AQS中是否有期待的线程,如果有,则间接退出,否则获取许可证,批改残余许可证的数量,并且返回残余许可证数量开释许可证: 由AQS的releaseShared调用,开释许可证时,在原先的根底上加上开释的许可证,然而开释的数量不能为负,开释胜利,调用AQS中的doReleaseShared办法,将队列头节点的状态设置为0而后从头节点的后继节点中找出一个状态值小于0的线程节点开释
/** * 偏心式同步器 */static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync(int permits) { super(permits); } /** * 获取许可证,由AQS中的acquireShared办法调用,如果许可证数量小于0, * 则将以后线程退出到队列中始终轮询尝试过来许可证 */ @Override protected int tryAcquireShared(int acquires) { for (;;) { //队列中有期待的线程,间接返回 if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; //获取许可证,CAS批改state值 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }}
非偏心式
获取许可证: 上来间接尝试获取信号量,如果获取胜利返回残余许可证,如果许可证数量小于0则进入AQS队列中开释许可证: 和非偏心式雷同
/** * 非偏心形式下同步器 */static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int permits) { super(permits); } /** * 获取资源,这一步由AQS中的acquireShared调用,每次获取资源后会返回残余的许可证,下面有写 * 如果小于等于0则以后线程会始终处于CLH锁中,如果大于0则会唤醒队列中所有状态为SIGNAL的线程 * 详情见AQS源码948行开始 */ @Override protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); }}
获取许可证
public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1);}public void acquireUninterruptibly() { sync.acquireShared(1);}
开释许可证
public void release() { sync.releaseShared(1);}