一、基于AQS实现的锁

AQS(AbstractQueuedSynchronizer) 是Java并发包JUC中十分重要的一个类,大部分锁都是基于AQS实现的,次要实现的类如下:

  • ReentrantLock:可重入锁,独占锁,实现了偏心锁和非偏心锁,这个是上篇内容介绍的类,也是最罕用类,通常会和synchronized作比拟。
  • ReentrantReadWriteLock:读写锁,可共享也可独占锁,读是共享锁,写是独占锁,也实现了偏心锁和非偏心锁。
  • Semaphore:信号锁,共享锁,也实现了偏心锁和非偏心锁,次要同于管制流量,比方:数据库连接池给你调配10个链接,来一个调配一个,如果10个都调配完了且没有开释那就期待开释。
  • CountDownLatch:闭锁,共享锁,也实现了偏心锁和非偏心锁,Latch门闩的意思,比方:说四个人一个漂流艇,坐满了就推上水。

二、Semaphore

Semaphore是什么

下面曾经介绍了Semaphore,基于AQS实现的信号锁,是共享锁,实现了偏心锁和非偏心锁。能够用来管制同时拜访特定资源的线程数,通过协调各个线程以保障正当的应用资源。

Semaphore应用场景

通常用于资源有明确拜访数量限度的场景,罕用于限流。

比方:数据库连接池,同时进行连贯的线程数量有限度,连贯不能超过肯定的数量,当连贯达到了限度的数量后,前面的线程只能排队期待后面的线程开释了数据库链接能力获取数据库链接。

比方:停车场场景,车位数量无限,同时只能听肯定数量的车,当停满了之后里面的车只能等外面的车进去能力进去停车。

Semaphore的罕用办法

// 从信号锁获取获取一个锁,在获取到锁之前始终解决阻塞状态,除非线程被中断acquire();// 从信号锁获取获取指定数量锁,在获取到锁之前始终解决阻塞状态,除非线程被中断acquire(int permits);// 从信号锁获取一个锁,在获取到锁之前线程始终处于阻塞状态(疏忽中断)acquireUninterruptibly();// 尝试从信号锁获取锁,返回获取胜利或者失败,不会阻塞线程tryAcquire();// 尝试从信好锁获取锁,指定获取工夫,在指定工夫内没有获取到则超时返回,不会阻塞线程tryAcquire(long timeount, TimeUnit unit);// 开释锁release();// 获取期待队列中是否还有期待线程hadQueuedThreads();// 获取期待队列里阻塞线程的数量getQueuedLength();// 清空锁,返回清空锁的数量drainPermits();// 返回可用的锁的数量availabelPermits();

Semaphore实现原理

初始化

Semaphore提供了两个构造方法,默认构造方法创立指定锁数量的非偏心信号锁,另外一个构造方法多了一个指定是偏心锁还是非偏心锁的参数,源码如下:

// 构建指定数量锁的非偏心信号锁public Semaphore(int permits) {    sync = new NonfairSync(permits);}// 构建指定数量锁的偏心/非偏心信号锁public Semaphore(int permits, boolean fair) {    sync = fair ? new FairSync(permits) : new NonfairSync(permits);}
获取锁过程
acquire

acquire()是获取锁办法,调用了外部类同步器Sync继承了AQS理论调用AQSacquireSharedInterruptibly()外围,源码如下:

 public void acquire() throws InterruptedExcetion {     sync.acquireSharedInterruptibly(); }

加JDK中,与锁相干的办法,Interruptibly示意可中断,也就是可中断锁。可中断锁的意思是线程在期待获取锁的过程中是能够被中断的,换言之,线程在期待锁的过程中能够响应中断。

acquireSharedInterruptiby

acquireSharedInterruptibly办法是获取可中断锁,源码如下:

public final void acquireSharedInterruptibly(int arg)    throws InterruptedException {    if (Thread.interrupted())        // 检测线程的中断状态,如果曾经被中断了,就响应中断,该办法会革除线程中的中断标识        throw new InterruptedException();    // 尝试获取锁,arg为锁的数量    // 当锁被获取完了之后,则为以后线程创立一个节点退出阻塞队列    if (tryAcquireShared(arg) < 0)        doAcquireSharedInterruptibly(arg);}

acquireSharedInterruptibly办法首先会判断以后线程的中断状态如果是中断状态则响应中断,抛异样,而后调用tryAcquireShared()办法获取锁,如果锁被获取完了就为以后线程创立一个节点退出期待队列。

tryAcquireShared

tryAcquireShared()AQS定义的一个模版办法,具体由子类实现,Semaphore也实现了偏心锁和非偏心锁,两种锁大同小异,咱们具体来看一下偏心锁的具体实现,源码如下:

protected int tryAcquireShared(int acquires) {    // 自旋    for (;;) {        // 判断是否有排在本人后面的线程,如果有间接返回-1,进入阻塞状态        if (hasQueuedPredecessors())            return -1;        // 获取同步状态的值(以后可用锁数量)        int available = getState();        // 残余锁数量, 可用的-申请的        int remaining = available - acquires;        // 如果残余的锁小于0, 或者设置胜利就返回,如果设置失败持续循环设置        // 如果残余锁数量小于0,返回正数,示意获取锁失败        // 如果残余锁数量大于0,且设置状态胜利,示意获取锁胜利        if (remaining < 0 ||            compareAndSetState(available, remaining))            return remaining;    }}

tryAcquireShared()通过自旋+CAS的形式获取锁和保障线程平安。

doAcquireSharedInterruptibly

doAcquireSharedInterruptibly()办法在偏心锁的时候如果以后线程后面有期待线程或者锁被获取完了之后,以后线程须要进入期待状态时会被调用,用于为以后线程创立节点并退出期待队列,源码如下:

private void doAcquireSharedInterruptibly(int arg)        throws InterruptedException {    // 为以后线程创立共享模式节点退出队列结尾    final Node node = addWaiter(Node.SHARED);    // 操作失败标记    boolean failed = true;    try {        //自旋        for (;;) {            // 获取以后节点的前一节点            final Node p = node.predecessor();            if (p == head) {                // 如果前一节点是头节点, 则尝试获取锁                int r = tryAcquireShared(arg);                if (r >= 0) {                    //如果获取胜利,设置头节点和共享模式流传                    setHeadAndPropagate(node, r);                    p.next = null;                    failed = false;                    return;                }            }            // 如果前一节点不是头节点或者没有获取到锁            // shouldParkAfterFailedAcquire办法判断以后线程是否须要被阻塞            // parkAndCheckInterrupt办法用于阻塞线程并检测线程是否被中断,如果被中断抛错            if (shouldParkAfterFailedAcquire(p, node) &&                    parkAndCheckInterrupt())                    throw new InterruptedException();        }    } finally {        if (failed)            // 自旋异样退出 勾销线程获取锁            cancelAcquire(node);    }}
获取锁过程总结
  1. 首先调用Semaphore.acquire()办法获取令牌,该办法调用内置同步器Sync.acquireSharedInterruptibly()办法,该同步器继承AQS理论调用的是AQSacquireSharedInterruptibly()办法。
  2. acquireSharedInterruptibly()办法首先判断以后线程是否被中断,如果中断了就抛InterruptedException异样,如果没有被中单,就调用tryAcquireShared()办法尝试获取锁。
  3. tryAcquireShared()办法AQS只是定了一个模版办法由子类实现,Semaphore.Sync同步器提供了两种实现,别离是FairSync(偏心锁)和NonfairSync(非偏心锁),这两种锁实现差不多,偏心锁就是多一个hasQueuedPredecessor()办法判断是否有排在本人后面的线程,如果有则返回-1。
  4. tryAcquireShared()办法通过自旋或者锁,先获取以后可用锁,减去须要获取的锁获取到残余锁,如果残余锁小于0间接返回,示意获取失败,否则通过CAS去获取锁,胜利获取返回胜利,失败获取返回失败。
  5. tryAcquireShared()如果获取锁失败,就要调用doAcquireSharedInterruptibly()办法用于为以后线程创立节点退出期待队列,该办法首先创立共享模式的节点退出队列,而后自旋,判断以后节点是不是头节点,如果是头节点也尝试获取锁,获取胜利的话设置头节点并胜利返回,如果获取失败则会调用shouldParkAfterFailedAcquire()判断以后线程是否须要期待,如果须要期待而后调用parkAndCheckInterrupt()办法阻塞线程并判断线程是否中断,如果中断则抛错,如果没有中断就阻塞再通过自旋获取锁。如果自旋异样退出,则调用cancelAcquire()办法勾销线程获取锁。
开释锁过程
release

Semaphore.relese()办法用于开释锁,开释一个锁,源码如下:

public void release(){    // 开释一个共享锁    sync.releaseShared(1);}

release()办法调用Semaphore.Sync同步器的releaseShared()办法,该同步器继承与AQS理论调用的是AQS.releaseShared()办法。

releaseShared

releaseShared()办法开释指定数量的共享锁,开释胜利之后会唤醒期待队列中的一个线程,源码如下:

public final boolean releaseShared(int arg) {    // 尝试开释锁    if (tryReleaseShared(arg)) {        // 开释胜利,唤醒期待队列中的线程        doReleaseShared();        return true;    }    return false;}
tryReleaseShared

tryReleaseShared()办法是AQS定义的模版办法由子类实现,调用了Semaphore.tryReleaseShared(),该办法通过自旋+CAS开释锁,源码如下:

protected final boolean tryReleaseShared(int releases) {    // 自旋    for(;;){        // 获取以后可用的锁数量        int current = getState();        // 可用的+开释的        int next = current + releases;        if (next < current) {            throw new Error("Maximum permit count exceeded");        }        // 通过CAS批改状态值开释锁        if (compareAndSetState(current, next))            return true;    }}
doReleaseShared

doReleaseShared()办法用于开释锁胜利之后唤醒期待队列中的线程,源码如下:

private void doReleaseShared() {    // 自旋    for (;;) {        // 将队列头节点赋值与节点h        Node h = head;        // 如果节点不为null,且不等于尾节点        if (h != null && h != tail) {            // 失去h节点的状态            int ws = h.waitStatus;            // 如果节点状态是Node.SIGNAL,就要唤醒节点h下一节点            if (ws == Node.SIGNAL) {                // 设置节点h的状态为勾销状态,如果失败就循环再试一次                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))                    continue;                    // 唤醒节点h下一节点线程                unparkSuccessor(h);            }            // 如果节点h状态为0,就设置ws的状态为PROPAGATE,阐明下次循环的时候节点h应该无条件被流传            // 在shouldParkAfterFailedAcquire办法中应用            else if (ws == 0 &&                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))                continue;                        }        // 如果队列中头节点发生变化就持续循环        // 如果没有发生变化就跳出循环        if (h == head)                               break;    }}
开释锁过程总结
  1. 首先调用Semaphore.release()办法开释锁,该办法调用同步器SyncreleasShared()办法,因为同步器继承与AQS所以理论调用的是AQS.releaseShared()办法。
  2. AQS.releaseShared()办法首先调用tryReleaseShared()办法尝试开释锁,该办法是AQS定义的模版办法,通过子类实现,调用的是Semaphore.tryReleaseShared()办法。
  3. Semaphore.tryReleaseShared()办法通过自旋+CAS开释锁,先获取以后的锁数量加上开释的锁数量,会判断超过判断,而后通过CAS批改锁的数量达到开释锁的目标。tryReleaseShared()开释锁胜利之后,会调用AQS.doReleaseShared()唤醒期待队列中的线程。
  4. AQS.doReleaseShared()用于开释锁胜利之后唤醒期待队列中的线程,也是通过自旋+CAS实现,首先获取头节点h,先判断节点h不为null且不是尾节点,失去节点h的状态,如果状态是Node.SIGNAL阐明这个节点曾经要被唤醒应该唤醒下一节点,通过CAS操作设置节点h状态为勾销,而后调用unparkSuccessor()办法唤醒下一节点。如果节点h的状态为0,就设置状态为PROPAGATE阐明下一次应该被无条件流传。如果队列中头节点发生变化就持续循环,没有发生变化就终止循环。

三、应用Semaphor实现停车场提示牌性能

每个停车场入口都有一个提示牌,下面显示着停车场残余的车位是多少,当残余车位为0时,则不容许车辆进入停车场,直到停车场有车来到停车场,这是提示牌显示新的残余车位数。

业务场景

  1. 停车场包容量为10。
  2. 当一辆车进入停车场后,显示牌的残余车位数响应的减1.
  3. 每有一辆车驶出停车场后,显示牌的残余车位数响应的加1。
  4. 停车场残余车位有余时,车辆只能在里面期待。

代码实现

public class SemaphoreDemo {    /**     * 停车场同时包容10辆车     */    private static Semaphore semaphore = new Semaphore(10);    public static void main(String[] args) {        // 模仿15辆车同时停车        for (int i = 0; i < 15; i++) {            Thread thread = new Thread(new Runnable() {                @Override                public void run() {                    try {                        System.out.println("=====" + Thread.currentThread().getName() + "车辆来到停车场");                        if (semaphore.availablePermits() == 0) {                            // 没有车位了                            System.out.println("车位有余,请急躁期待," + Thread.currentThread().getName() + "车辆正在期待");                        }                        // 获取车位停车                        semaphore.acquire();                        System.out.println(Thread.currentThread().getName()+"胜利进入停车场");                        //模仿车辆在停车场停留的工夫                        Thread.sleep(1000);                        //驶出停车场                        semaphore.release();                        System.out.println(Thread.currentThread().getName()+"驶出停车场");                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                }            });            thread.start();        }    }}

下面代码输入如下:

=====Thread-0车辆来到停车场=====Thread-3车辆来到停车场Thread-3胜利进入停车场=====Thread-1车辆来到停车场=====Thread-2车辆来到停车场Thread-1胜利进入停车场=====Thread-5车辆来到停车场=====Thread-4车辆来到停车场Thread-0胜利进入停车场=====Thread-7车辆来到停车场Thread-4胜利进入停车场Thread-7胜利进入停车场Thread-5胜利进入停车场=====Thread-6车辆来到停车场Thread-2胜利进入停车场Thread-6胜利进入停车场=====Thread-8车辆来到停车场Thread-8胜利进入停车场=====Thread-10车辆来到停车场Thread-10胜利进入停车场=====Thread-9车辆来到停车场=====Thread-14车辆来到停车场车位有余,请急躁期待,Thread-14车辆正在期待=====Thread-12车辆来到停车场车位有余,请急躁期待,Thread-12车辆正在期待=====Thread-11车辆来到停车场车位有余,请急躁期待,Thread-11车辆正在期待=====Thread-13车辆来到停车场车位有余,请急躁期待,Thread-9车辆正在期待车位有余,请急躁期待,Thread-13车辆正在期待Thread-14胜利进入停车场Thread-10驶出停车场Thread-2驶出停车场Thread-6驶出停车场Thread-11胜利进入停车场Thread-7驶出停车场Thread-3驶出停车场Thread-9胜利进入停车场Thread-13胜利进入停车场Thread-1驶出停车场Thread-0驶出停车场Thread-4驶出停车场Thread-5驶出停车场Thread-8驶出停车场Thread-12胜利进入停车场Thread-13驶出停车场Thread-9驶出停车场Thread-12驶出停车场Thread-14驶出停车场Thread-11驶出停车场

从下面输入能够看出,当10个车位被停满了之后,再进来的5辆车进入期待状态直到有车驶出停车场,而后再停车,达到了咱们预期的成果。