天穹之边,浩瀚之挚,眰恦之美; 悟心悟性,虎头蛇尾,惟善惟道! —— 朝槿《朝槿兮年说》


写在结尾

在并发编程畛域,有两大外围问题:一个是互斥,即同一时刻只容许一个线程访问共享资源;另一个是同步,即线程之间如何通信、合作。
次要起因是,对于多线程实现实现并发,始终以来,多线程都存在2个问题:

  • 线程之间内存共享,须要通过加锁进行管制,然而加锁会导致性能降落,同时简单的加锁机制也会减少编程编码难度
  • 过多线程造成线程之间的上下文切换,导致效率低下

因而,在并发编程畛域中,始终有一个很重要的设计准则: “ 不要通过内存共享来实现通信,而应该通过通信来实现内存共享。”
简略来说,就是尽可能通过音讯通信,而不是内存共享来实现过程或者线程之间的同步。

关健术语


本文用到的一些要害词语以及罕用术语,次要如下:

  • 并发(Concurrent): 在操作系统中,是指一个时间段中有几个程序都处于已启动运行到运行结束之间,且这几个程序都是在同一个处理机上运行。
  • 并行(Parallel): 当零碎有一个以上CPU时,当一个CPU执行一个过程时,另一个CPU能够执行另一个过程,两个过程互不抢占CPU资源,能够同时进行。
  • 信号量(Semaphore):  是在多线程环境下应用的一种设施,是能够用来保障两个或多个要害代码段不被并发调用,也是作零碎用来解决并发中的互斥和同步问题的一种办法。
  • 信号量机制(Semaphores): 用来解决同步/互斥的问题的,它是1965年,荷兰学者 Dijkstra提出了一种行之有效的实现过程互斥与同步的办法。
  • 管程(Monitor) :  个别是指治理共享变量以及对共享变量的操作过程,让它们反对并发的一种机制。
  • 互斥(Mutual Exclusion):一个公共资源同一时刻只能被一个过程或线程应用,多个过程或线程不能同时应用公共资源。即就是同一时刻只容许一个线程访问共享资源的问题。
  • 同步(Synchronization):两个或两个以上的过程或线程在运行过程中协同步调,按预约的先后秩序运行。即就是线程之间如何通信、合作的问题。
  • 对象池(Object Pool): 指的是一次性创立出 N 个对象,之后所有的线程反复利用这 N 个对象,当然对象在被开释前,也是不容许其余线程应用的, 个别指保留实例对象的容器。

根本概述

在Java畛域中,咱们能够将锁大抵分为基于Java语法层面(关键词)实现的锁和基于JDK层面实现的锁。

在Java畛域中, 尤其是在并发编程畛域,对于多线程并发执行始终有两大外围问题:同步和互斥。其中:

  • 互斥(Mutual Exclusion):一个公共资源同一时刻只能被一个过程或线程应用,多个过程或线程不能同时应用公共资源。即就是同一时刻只容许一个线程访问共享资源的问题。
  • 同步(Synchronization):两个或两个以上的过程或线程在运行过程中协同步调,按预约的先后秩序运行。即就是线程之间如何通信、合作的问题。

针对对于这两大外围问题,利用管程是可能解决和实现的,因而能够说,管程是并发编程的万能钥匙。
尽管,Java在基于语法层面(synchronized 关键字)实现了对管程技术,然而从应用形式和性能上来说,内置锁(synchronized 关键字)的粒度绝对过大,不反对超时和中断等问题。
为了补救这些问题,从JDK层面对其“反复造轮子”,在JDK外部对其从新设计和定义,甚至实现了新的个性。
在Java畛域中,从JDK源码剖析来看,基于JDK层面实现的锁大抵次要能够分为以下4种形式:

  • 基于Lock接口实现的锁:JDK1.5版本提供的ReentrantLock类
  • 基于ReadWriteLock接口实现的锁:JDK1.5版本提供的ReentrantReadWriteLock类
  • 基于AQS根底同步器实现的锁:JDK1.5版本提供的并发相干的同步器Semaphore,CyclicBarrier以及CountDownLatch等
  • 基于自定义API操作实现的锁:JDK1.8版本中提供的StampedLock类

从浏览源码不难发现,在Java SDK 并发包次要通过AbstractQueuedSynchronizer(AQS)实现多线程同步机制的封装与定义,而通过Lock 和 Condition 两个接口来实现管程,其中 Lock 用于解决互斥问题,Condition 用于解决同步问题。


一.AQS根底同步器根本实践

在Java畛域中,同步器是专门为多线程并发设计的同步机制,次要是多线程并发执行时线程之间通过某种共享状态来实现同步,只有当状态满足这种条件时线程才往下执行的一种同步机制。


一个规范的AQS同步器次要有同步状态机制,期待队列,条件队列,独占模式,共享模式等五大外围因素组成。
在Java畛域中,JDK的JUC(java.util.concurrent.)包中提供了各种并发工具,然而大部分同步工具的实现基于AbstractQueuedSynchronizer类实现,其内部结构次要如下:

  • 同步状态机制(Synchronization Status):次要用于实现锁(Lock)机制,是指同步状态,其要求对于状态的更新必须原子性的
  • 期待队列(Wait Queue):次要用于寄存期待线程获取到的锁资源,并且把线程保护到一个Node(节点)外面和保护一个非阻塞的CHL Node FIFO(先进先出)队列,次要是采纳自旋锁+CAS操作来保障节点插入和移除的原子性操作。
  • 条件队列(Condition Queue):用于实现锁的条件机制,个别次要是指替换“期待-告诉”工作机制,次要是通过ConditionObject对象实现Condition接口提供的办法实现。
  • 独占模式(Exclusive Mode):次要用于实现独占锁,次要是基于动态外部类Node的常量标记EXCLUSIVE来标识该节点是独占模式
  • 共享模式(Shared Mode):次要用于实现共享锁,次要是基于动态外部类Node的常量标记SHARED来标识该节点是共享模式

咱们能够失去一个比拟通用的并发同步工具根底模型,大抵蕴含如下几个内容,其中:

  • 条件变量(Conditional Variable): 利用线程间共享的变量进行同步的一种工作机制
  • 共享变量((Shared Variable)):个别指对象实体对象的成员变量和属性
  • 阻塞队列(Blocking Queue):共享变量(Shared Variable)及其对共享变量的操作对立封装
  • 期待队列(Wait Queue):每个条件变量都对应有一个期待队列(Wait Queue),外部须要实现入队操作(Enqueue)和出队操作(Dequeue)办法
  • 变量状态形容机(Synchronization Status):形容条件变量和共享变量之间状态变动,又能够称其为同步状态
  • 工作模式(Operation Mode): 线程资源具备排他性,因而定义独占模式和共享模式两种工作模式

综上所述,条件变量和期待队列的作用是解决线程之间的同步问题;共享变量与阻塞队列的作用是解决线程之间的互斥问题。

二. JDK显式锁对立概念模型

在并发编程畛域,有两大外围问题:一个是互斥,即同一时刻只容许一个线程访问共享资源;另一个是同步,即线程之间如何通信、合作。

综合Java畛域中的并发锁的各种实现与利用剖析来看,一把锁或者一种锁,基本上都会蕴含以下几个方面:

  • 锁的同步器工作机制:次要是思考共享模式还是独享模式,是否反对超时机制,以及是否反对超时机制?
  • 锁的同步器工作模式:次要是基于AQS根底同步器封装外部同步器,是否思考偏心/非偏心模式?
  • 锁的状态变量机制: 次要锁的状态设置,是否共享状态变量?
  • 锁的队列封装定义:次要是指期待队列和条件队列,是否须要条件队列或者期待队列定义?
  • 锁的底层实现操作: 次要是指底层CL锁和CAS操作,是否须要思考自旋锁或者CAS操作实例对象办法?
  • 锁的组合实现新锁: 次要是基于独占锁和共享锁,是否思考对应API自定义操作实现?

综上所述,大抵能够根据上述这些方向,咱们便能够分明️晓得Java畛域中各种锁实现的根本实践时和实现思维。

六.CountDownLatch(闭锁)的设计与实现

在Java畛域中,CountDownLatch(闭锁)是针对于Java多线程并发管制中倒计数器的具体数量,次要是采纳递加计数形式的倒计数器思维和基于AQS根底同步器来实现的一种同步器工具类。

CountDownLatch(闭锁)是Java多线程并发中最常见的一种同步器,从锁的性质上来看,属于共享锁,其性能相当于一个多线程环境下的倒数门闩。
CountDownLatch通过定义一个倒计数器,在并发环境下由线程进行递加1操作,当计数值变为0之后,被await办法阻塞的线程将会唤醒。
通过CountDownLatch能够实现线程间的计数同步。

1. 设计思维

一般来说,通过定义一个倒计数器,为了让某个线程或者多个线程在某个运行节点上期待N个条件都满足后,才让所有的线程持续往下执行,其中倒计数器的数量则为N,每满足一个条件,倒计数器就顺次逐步递加1,直到N-1=0的时,所有期待的线程才往下继续执行。
CountDownLatch类最早是在JDK1.5版本提供的,从设计思维上来看,次要包含倒计数器的同步器,管制阻塞期待的办法,倒计数器的递加操作方法等3个外围因素。其中:

  • 倒计数器的同步器:基于AQS根底形象队列同步器封装内置实现一个动态的内置同步类,次要用于设置倒计数器的初始值以及定制AQS根底同步器的获取和开释共享锁。
  • 倒计数器的初始值: 个别在构建CountDownLatch类时指定,示意的是须要期待条件的个数,即就是倒计数器的具体的资源数量Source(N)。
  • 控制线程阻塞期待的办法:定义一个控制线程阻塞期待的办法,当倒计数器的具体的资源数量 Source(N)>0时,调用办法使其线程进入阻塞期待状态。
  • 倒计数器的递加操作方法:定义一个倒计数器的递加操作方法,调用办法就会把倒计数器递加1,当倒计数器的具体的资源数量 Source(N)-1=0时,所有期待的线程才往下继续执行。

简略来说,CountDownLatch次要是让某个线程或者多个线程,期待其余线程实现某件事情或者某个工作完结之后能力继续执行。

2. 根本实现

在CountDownLatch类的JDK1.8版本中,对于CountDownLatch的根本实现如下:

public class CountDownLatch {    private final Sync sync;    /**     * CountDownLatch锁-结构一个倒计数器     */    public CountDownLatch(int count) {        if (count < 0) throw new IllegalArgumentException("count < 0");        this.sync = new Sync(count);    }    /**     * CountDownLatch锁-基于AQS定义反对同步器实现     */    private  static class Sync extends AbstractQueuedSynchronizer {        private static final long serialVersionUID = -5179523762034025860 L;        //......其余办法代码    }    /**     * CountDownLatch锁-线程期待办法     */    public void await() throws InterruptedException {        sync.acquireSharedInterruptibly(1);    }    /**     * CountDownLatch锁-倒计数器递加操作     */    public void countDown() {        sync.releaseShared(1);    }    //... 其余代码}
  • 倒计数器同步器:基于AQS根底定义反对同步器实现一个动态私有化的同步器Sync类,其中定义了获取和开释共享锁的两个办法
  • 线程期待办法:次要是提供了一个await()办法,其本质是调用的是AQS根底同步器中的acquireSharedInterruptibly(int arg)办法,否则throws InterruptedException异样
  • 倒计数器递加操作方法: 次要是提供了一个countDown()办法,其本质是调用的是AQS根底同步器中的releaseShared(int arg) 办法
2.1 基于AQS同步器封装动态外部Sync抽象类
        /**     * CountDownLatch锁-基于AQS同步器封装一个外部的同步器     */private static final class Sync extends AbstractQueuedSynchronizer {        private static final long serialVersionUID = 4982264981922014374L;        Sync(int count) {            setState(count);        }        int getCount() {            return getState();        }        /**     * CountDownLatch锁-获取共享锁办法     */        protected int tryAcquireShared(int acquires) {            return (getState() == 0) ? 1 : -1;        }        /**     * CountDownLatch锁-开释共享锁办法     */        protected boolean tryReleaseShared(int releases) {            // Decrement count; signal when transition to zero            for (;;) {                int c = getState();                if (c == 0)                    return false;                int nextc = c-1;                if (compareAndSetState(c, nextc))                    return nextc == 0;            }        }                    }
  • 实现形式: 次要基于AQS封装的外部动态形象Sync同步类实现,应用的AQS的共享模式
  • 次要办法: 次要定制适配提供了tryAcquireShared()和tryReleaseShared()办法,即就是tryAcquireShared()用于获取共享锁,tryReleaseShared()办法用于开释共享锁,其中:

    • 获取共享锁tryAcquireShared()办法:首先获取状态变量status,这里是指倒计数器中的数量,当status=0时,返回值=1,示意获取锁胜利;否则,status !=0 时,返回值=-1,示意获取共享锁失败进行入队。
    • 开释共享锁tryReleaseShared()办法: 通过自旋来实现递加操作,其中会获取状态变量status,将其递加1后应用compareAndSetState(c, nextc)办法通过CAS批改状态值
  • 锁获取形式: 次要是利用getCount()来获取倒计数器中的数量,同时还能够利用构造方法领导一个倒计数器中的数量。

3. 具体实现

public class CountDownLatch {  private final Sync sync;  /*** CountDownLatch锁-基于AQS根底同步器实现一个外部同步器*/  private static final class Sync extends AbstractQueuedSynchronizer {    private static final long serialVersionUID = 4982264981922014374 L;    Sync(int count) {      setState(count);    }    int getCount() {      return getState();    }    protected int tryAcquireShared(int acquires) {      return (getState() == 0) ? 1 : -1;    }    protected boolean tryReleaseShared(int releases) {      // Decrement count; signal when transition to zero      for (;;) {        int c = getState();        if (c == 0)          return false;        int nextc = c - 1;        if (compareAndSetState(c, nextc))          return nextc == 0;      }    }  }  /*** CountDownLatch锁-结构一个倒计数器*/  public CountDownLatch(int count) {    if (count < 0) throw new IllegalArgumentException("count < 0");    this.sync = new Sync(count);  }  /*** CountDownLatch锁-基于AQS定义反对同步器实现*/  private static class Sync extends AbstractQueuedSynchronizer {    private static final long serialVersionUID = -5179523762034025860 L;    //......其余办法代码  }  /*** CountDownLatch锁-线程期待办法*/  public void await() throws InterruptedException {    sync.acquireSharedInterruptibly(1);  }  /*** CountDownLatch锁-返回以后计数器*/  public long getCount() {    return sync.getCount();  }  /*** CountDownLatch锁-线程期待办法(反对超时机制)*/  public boolean await(long timeout, TimeUnit unit)  throws InterruptedException {    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));  }  /*** CountDownLatch锁-倒计数器递加操作*/  public void countDown() {    sync.releaseShared(1);  }}
  • 倒计数初始值:通过构造方法CountDownLatch(int count)指定一个倒计数器的初始值,其必须大于0,否则会throw new IllegalArgumentException("count < 0")
  • 线程期待办法: 次要提供了await() 办法和await(long timeout, TimeUnit unit)办法,其中:

    • 无参数await() 办法: 个别默认的办法,其本质是调用AQS同步器中的acquireSharedInterruptibly()办法,次要示意反对中断机制
    • 有参数await(long timeout, TimeUnit unit)办法: 是用于实现超时机制,其本质是调用AQS同步器中的tryAcquireSharedNanos(int arg, long nanosTimeout)办法
  • 倒计数递加操作方法:次要是countDown() 办法, 其本质是调用AQS同步器中的releaseShared(int arg) 办法,外围实现是AQS根底同步器的doReleaseShared办法。
  • 其余办法: 次要是getCount() 办法,用来获取倒计数个数,其本质是调用AQS同步器中getCount()办法,来获取状态变量

综上所述,从肯定意义上讲,CountDownLatch是一种共享锁,属于AQS根底形象队列同步器中共享模式孵化的产物,没有反对偏心模式与非偏心模式的实现。


七.CyclicBarrier(循环屏障)的设计与实现

在Java畛域中,CyclicBarrier(循环屏障)是针对于Java多线程并发管制中倒计数器的线程数量,次要是采纳递加计数形式的倒计数器思维和基于AQS根底同步器实现的ReentrantLock锁来实现的一种同步器工具类。

CyclicBarrier(循环屏障)是Java中通过对线程预约义设置一个屏障,只有当达到屏障的线程数量达到指定的最大屏障时,屏障才会让这些线程通过执行。
从肯定意义上来讲,这里的屏障实质上还是一个倒计数器,倒计数器的最大限度反对的数量就是咱们为线程设置屏障大小,其工作原理与CountDownLatch(闭锁)相似,都是通过让线程阻塞期待时,倒计数器执行递加1运算。
然而与CountDownLatch不同是,CyclicBarrier(循环屏障)是基于ReentrantLock(可重入锁)来实现的,更精确的说,CyclicBarrier是对ReentrantLock的利用实例。

1. 设计思维

一般来说,通过定义一个倒计数器,为了让某个线程或者多个线程在某个运行节点上束缚N个线程,须要让指定数量的线程独特到达某一个节点之后,这些线程才会一起被执行。
CyclicBarrier(循环屏障)最早是在JDK1.5版本中提供的,从设计思维上来看,次要包含倒计数器的最大屏障,管制阻塞期待的办法,倒计数器的递加操作方法,和触发点线程工作等4个外围因素。其中:

  • 倒计数器的同步器: 次要基于ReentrantLock来实现控制线程对象,其本质还是基于AQS根底同步器实现。
  • 倒计数器的最大屏障数量:个别是在构建CyclicBarrier(循环屏障)对象是预约义设置,示意须要在某个运行节点上束缚的线程数量。
  • 控制线程阻塞期待的办法:定义一个办法,使得实现阻塞线程让其进入期待状态。
  • 倒计数器的递加操作方法:定义一个办法,使得让倒计数器进行递加1运算,直到达到屏障时,期待的线程才继续执行。
  • 触发点线程工作:个别指的是当指定数量的线程达到设置的屏障时,才会去触发执行的工作。

简略来说,CyclicBarrier(循环屏障)是让多个线程相互期待,直到达到一个同步的运行节点。再持续一起执行。

2. 根本实现

在CyclicBarrier类的JDK1.8版本中,对于CountDownLatch的根本实现如下:

public class CyclicBarrier {    /** CyclicBarrier锁—屏障lock实体 */    private final ReentrantLock lock = new ReentrantLock();    /** CyclicBarrier锁—屏障条件队列 */    private final Condition trip = lock.newCondition();    /**  CyclicBarrier锁—屏障最大值 */    private final int parties;    /**  CyclicBarrier锁—屏障触发线程工作指标 */    private final Runnable barrierCommand;    /**  CyclicBarrier锁—以后计数器的最大值屏障实例 */    private Generation generation = new Generation();    /**  CyclicBarrier锁—以后计数器的最大值屏障实例 */    private int count;    /**  CyclicBarrier锁—屏障实例 */    private static class Generation {        boolean broken = false;    }    /**  CyclicBarrier锁—结构一个屏障实例(不带触发工作的) */    public CyclicBarrier(int parties) {        this(parties, null);    }    /**  CyclicBarrier锁—结构一个屏障实例(带触发工作的) */    public CyclicBarrier(int parties, Runnable barrierAction) {        if (parties <= 0) throw new IllegalArgumentException();        this.parties = parties;        this.count = parties;        this.barrierCommand = barrierAction;    }    /**  CyclicBarrier锁—无参数结构一个期待办法(默认模式) */    public int await() throws InterruptedException, BrokenBarrierException {        try {            return dowait(false, 0L);        } catch (TimeoutException toe) {            throw new Error(toe); // cannot happen        }    }    /**  CyclicBarrier锁—有参数结构一个期待办法(反对超时机制) */    public int await(long timeout, TimeUnit unit)    throws InterruptedException,    BrokenBarrierException,    TimeoutException {        return dowait(true, unit.toNanos(timeout));    }    /**  CyclicBarrier锁—更新状态变量 */    private void nextGeneration() {        // signal completion of last generation        trip.signalAll();        // set up next generation        count = parties;        generation = new Generation();    }    /**  CyclicBarrier锁—阻塞屏障 */    private void breakBarrier() {        generation.broken = true;        count = parties;        trip.signalAll();    }    //...其余代码}
  • 预约义设置屏障最大值: 次要是通过变量parties来实现预约义设置屏障最大值
  • 设置以后屏障数量:次要是通过变量count来实现
  • 控制线程的对象实例: 次要是通过ReentrantLock和Condition来控制线程间通信
  • 触发指标工作对象: 次要是通过Runable来定义barrierCommand变量
  • 提供了两个构造方法:都须要预约义指定屏障最大值parties,其中一个须要传入barrierAction触发点工作
  • 线程阻塞期待办法:次要提供了2个await()办法,其中:

    • 无参数await()办法:默认解决形式,不反对超时机制,其外围解决逻辑在dowait(boolean timed, long nanos)办法中实现
    • 有参数await()办法:指定参数解决,反对超时机制,其外围解决逻辑在dowait(boolean timed, long nanos)办法中实现
  • 屏障设置关健办法:次要是breakBarrier() 来实现,其中:

    • 告诉达到屏障的所有线程:次要是通过Condition中的signalAll()来告诉屏障中所有线程曾经满足条件
    • 屏障设置:默认预约义设置屏障最大值与设置以后屏障数雷同,次要设置count = parties
    • 更新屏障状态:次要是通过generation.broken = true来实现
  • 更新屏障的状态:次要是提供了nextGeneration() 办法,示意曾经达到预约义设置屏障最大值,其中:

    • 告诉达到屏障的所有线程:次要是通过Condition中的signalAll()来告诉屏障中所有线程曾经满足条件
    • 筹备下一轮屏障设置:意味着预约义设置屏障最大值与设置以后屏障数雷同,次要设置count = parties
    • 重置屏障状态:次要是通过generation = new Generation()来实现

一般来说,假如咱们容许管制的最大线程数量为N,预约义设置屏障最大值为Parties(N), 以后屏障的线程数量为Current(N) ,以后屏障中的期待线程数量为Waiting(N),那么咱们会失去一个计算公式:

2.1 结构Generation屏障实例标记
private static class Generation {    boolean broken = false;}

次要是结构了一个动态私有化的Generation类,其中定义了一个broken变量来作为屏障标记,默认初始值为false,示意还没达到屏障最大值。

2.1 线程阻塞期待外围dowait办法
private int dowait(boolean timed, long nanos)    throws InterruptedException, BrokenBarrierException,    TimeoutException {    // [1].实例化构建ReentrantLock的对象    final ReentrantLock lock = this.lock;    // [2].通过lock()获取锁或者说加锁操作    lock.lock();    try {        // [3].实例化构建Generation屏障实例对象        final Generation g = generation;        // [4].判断Generation屏障实例标记状态        if (g.broken)            throw new BrokenBarrierException();        // [5].判断Thread是蕴含中断标记位        if (Thread.interrupted()) {            breakBarrier();            throw new InterruptedException();        }        // [6].对倒计数器的屏障数量递加1运算        int index = --count;        // [7].根据后果index == 0示意以后指定的线程数量达到屏障最大值,须要触发Runnable工作        if (index == 0) {  // tripped            boolean ranAction = false;            try {                final Runnable command = barrierCommand;                if (command != null)                    command.run();                ranAction = true;                // 进行下一轮屏障设置                nextGeneration();                return 0;            } finally {                if (!ranAction)                    breakBarrier();            }        }        // [7].自旋操作        for (;;) {            try {                // 判断是否超时                if (!timed)                    trip.await();                else if (nanos > 0L)                    // 进行下一轮屏障设置                    nanos = trip.awaitNanos(nanos);            } catch (InterruptedException ie) {                if (g == generation && ! g.broken) {                    breakBarrier();                    throw ie;                } else {                    // 是否产生线程中断                    Thread.currentThread().interrupt();                }            }            if (g.broken)                throw new BrokenBarrierException();            if (g != generation)                return index;            // 如果等待时间超过指定超时工夫,throw new TimeoutException            if (timed && nanos <= 0L) {                breakBarrier();                throw new TimeoutException();            }        }    } finally {        // 最初开释锁操作        lock.unlock();    }}
  • 加锁操作: 实例化构建ReentrantLock的对象,通过lock()办法进行加锁操作
  • 判断屏障实例标记状态:实例化构建Generation实例标记,判断屏障实例标记状态是否统一,如果不统一则throw new BrokenBarrierException();
  • 判断以后线程是否被中断: 判断Thread是蕴含中断标记位,如果中断throw new InterruptedException()并调用breakBarrier()从新设置屏障
  • 屏障倒计数器递加运算:对倒计数器的屏障数量递加1运算,即就是对以后倒计数器的以后值减去1
  • 触发节点线程工作: 以后倒计数器的以后值为0时,须要触发Runnable工作,并调用nextGeneration办法开启下一轮操作;否则,以后倒计数器的以后值不为0时,调用awaitNanos(nanos)办法进入期待状态
  • 自旋操作判断超时: 如果应用了超时参数,调用awaitNanos(nanos)办法进入期待状态,其中如果产生中断则调用Thread.currentThread().interrupt()设置中断标记。如果等待时间> 指定超时工夫,抛出throw new TimeoutException()异样
  • 开释锁: 通过unlock()办法进行解锁操作,并开释锁

3. 具体实现

在CyclicBarrier类的JDK1.8版本中,对于CyclicBarrier的具体实现如下:

public class CyclicBarrier {    /** CyclicBarrier锁—屏障lock实体 */    private final ReentrantLock lock = new ReentrantLock();    /** CyclicBarrier锁—屏障条件队列 */    private final Condition trip = lock.newCondition();    /**  CyclicBarrier锁—屏障最大值 */    private final int parties;    /**  CyclicBarrier锁—屏障触发线程工作指标 */    private final Runnable barrierCommand;    /**  CyclicBarrier锁—以后计数器的最大值屏障实例 */    private Generation generation = new Generation();    /**  CyclicBarrier锁—以后计数器的最大值屏障实例 */    private int count;    /**  CyclicBarrier锁—屏障实例 */    private static class Generation {        boolean broken = false;    }    /**  CyclicBarrier锁—结构一个屏障实例(不带触发工作的) */    public CyclicBarrier(int parties) {        this(parties, null);    }    /**  CyclicBarrier锁—结构一个屏障实例(带触发工作的) */    public CyclicBarrier(int parties, Runnable barrierAction) {        if (parties <= 0) throw new IllegalArgumentException();        this.parties = parties;        this.count = parties;        this.barrierCommand = barrierAction;    }    /**  CyclicBarrier锁—无参数结构一个期待办法(默认模式) */    public int await() throws InterruptedException, BrokenBarrierException {        try {            return dowait(false, 0L);        } catch (TimeoutException toe) {            throw new Error(toe); // cannot happen        }    }    /**  CyclicBarrier锁—有参数结构一个期待办法(反对超时机制) */    public int await(long timeout, TimeUnit unit)    throws InterruptedException,    BrokenBarrierException,    TimeoutException {        return dowait(true, unit.toNanos(timeout));    }    /**  CyclicBarrier锁—更新状态变量 */    private void nextGeneration() {        // signal completion of last generation        trip.signalAll();        // set up next generation        count = parties;        generation = new Generation();    }    /**  CyclicBarrier锁—阻塞屏障 */    private void breakBarrier() {        generation.broken = true;        count = parties;        trip.signalAll();    }    /**  CyclicBarrier锁—阻塞屏障 */    private int dowait(boolean timed, long nanos)    throws InterruptedException, BrokenBarrierException,    TimeoutException {        // [1].实例化构建ReentrantLock的对象        final ReentrantLock lock = this.lock;        // [2].通过lock()获取锁或者说加锁操作        lock.lock();        try {            // [3].实例化构建Generation屏障实例对象            final Generation g = generation;            // [4].判断Generation屏障实例标记状态是否为true            if (g.broken)                throw new BrokenBarrierException();            // [5].判断Thread是蕴含中断标记位            if (Thread.interrupted()) {                breakBarrier();                throw new InterruptedException();            }            // [6].对倒计数器的屏障数量递加1运算            int index = --count;            // [7].根据后果index == 0示意以后指定的线程数量达到屏障最大值,须要触发Runnable工作            if (index == 0) {  // tripped                boolean ranAction = false;                try {                    final Runnable command = barrierCommand;                    if (command != null)                        command.run();                    ranAction = true;                    // 进行下一轮屏障设置                    nextGeneration();                    return 0;                } finally {                    if (!ranAction)                        breakBarrier();                }            }            // [7].自旋操作            for (;;) {                try {                    // 判断是否超时                    if (!timed)                        trip.await();                    else if (nanos > 0L)                        nanos = trip.awaitNanos(nanos);                } catch (InterruptedException ie) {                    if (g == generation && ! g.broken) {                        breakBarrier();                        throw ie;                    } else {                        Thread.currentThread().interrupt();                    }                }                if (g.broken)                    throw new BrokenBarrierException();                if (g != generation)                    return index;                // 如果等待时间超过指定超时工夫,throw new TimeoutException                if (timed && nanos <= 0L) {                    breakBarrier();                    throw new TimeoutException();                }            }        } finally {            // 最初开释锁操作            lock.unlock();        }    }    /**  CyclicBarrier锁—获取以后等屏障期待数量 */    public int getNumberWaiting() {        final ReentrantLock lock = this.lock;        lock.lock();        try {            return parties - count;        } finally {            lock.unlock();        }    }    /**  CyclicBarrier锁—获取以后等屏障数量 */    public int getParties() {        return parties;    }    /**  CyclicBarrier锁—判断以后屏障 */    public boolean isBroken() {        final ReentrantLock lock = this.lock;        lock.lock();        try {            return generation.broken;        } finally {            lock.unlock();        }    }    /**  CyclicBarrier锁—重置屏障数量 */    public void reset() {        final ReentrantLock lock = this.lock;        lock.lock();        try {            breakBarrier();   // break the current generation            nextGeneration(); // start a new generation        } finally {            lock.unlock();        }    }}

次要须要留神如下几个办法,都是基于ReentrantLock来实现加锁和解锁操作的,其中:

  • getNumberWaiting()办法: 获取以后屏障中期待的线程数量
  • reset() 办法:当一轮屏障操作完结,须要重置屏障中最大线程数量
  • isBroken() 办法:判断是否达到屏障最大值

综上所述,从肯定意义上讲,CyclicBarrier是一种可重入锁,属于ReentrantLock的利用实例,其中加锁和解锁操作都是独占模式的。


八.Semaphore(信号量)的设计与实现

在Java畛域中,Semaphore(信号量)是针对于Java多线程并发管制中实现对公共资源的线程数量进行并发同时访问控制,次要是采纳指定一个最大许可数的思维和基于AQS根底同步器来实现的一种同步器工具类。

Semaphore能够用来管制在同一时刻访问共享资源的线程数量,通过协调各个线程以保障共享资源的正当应用。
Semaphore保护了一组虚构许可,它的数量能够通过结构器的参数指定。
线程在访问共享资源前,必须调用Semaphore的acquire()办法取得许可,如果许可数量为0,该线程就始终阻塞。
线程在访问共享资源后,必须调用Semaphore的release()办法开释许可。

1. 设计思维

一般来说,通过定义一个倒计数器,为了管制最多N个线程同时拜访公共资源,其计数器的最大值Max(N)是被许可的最多N个线程数量,即就是许可的最大值N。
Semaphore类最早是在JDK1.5版本提供的,从设计思维上来看,次要包含倒计数器的最大许可数,同步器工作模式,获取锁办法,开释锁办法等4个外围因素。其中:

  • 同步器工作模式:基于AQS根底形象队列同步器封装内置实现一个动态的内置同步器抽象类,而后基于这个抽象类别离实现了偏心同步器和非偏心同步器,用来指定和形容同步器工作模式是偏心模式还是非偏心模式。
  • 偏心/非偏心模式:次要形容的是多个线程在同时获取锁时是否依照先到先得的程序获取锁,如果是则为偏心模式,否则为非偏心模式。
  • 获取锁办法:次要定义了一个lock()办法来获取锁,示意如果锁曾经被其余线程占有或持有,其以后获取锁的线程则进入期待状态。
  • 开释锁办法:次要定义了一个unlock()办法来开释锁,示意如果锁曾经被其余线程放弃或开释,其以后获取锁的线程则取得该锁。

2. 根本实现

在JDK1.8版本中,对于Semaphore的根本实现如下:

public class Semaphore implements java.io.Serializable {    private static final long serialVersionUID = -3222578661600680210 L;    /**     * Semaphore锁- 封装同步器     */    private final Sync sync;    /**     * Semaphore锁- 封装同步器     */    abstract static class Sync extends AbstractQueuedSynchronizer {        //....其余代码    }    /**     * Semaphore锁- 结构一个令牌许可(默认非公模式)     */    public Semaphore(int permits) {        sync = new NonfairSync(permits);    }    /**     * Semaphore锁- 结构一个令牌许可(可选偏心/非公模式)     */    public Semaphore(int permits, boolean fair) {        sync = fair ? new FairSync(permits) : new NonfairSync(permits);    }    /**     * Semaphore锁- 获取锁办法(默认一个且可中断机制)     */    public void acquire() throws InterruptedException {        sync.acquireSharedInterruptibly(1);    }            /**     * Semaphore锁- 获取锁办法(可选指定多个且可中断机制)     */    public void acquire(int permits) throws InterruptedException {        if (permits < 0) throw new IllegalArgumentException();        sync.acquireSharedInterruptibly(permits);    }    /**     * Semaphore锁- 获取锁办法(默认多个且不可中断机制)     */    public void acquireUninterruptibly() {        sync.acquireShared(1);    }        /**     * Semaphore锁- 获取锁办法(指定多个且不可中断机制)     */    public void acquireUninterruptibly(int permits) {        if (permits < 0) throw new IllegalArgumentException();        sync.acquireShared(permits);    }    /**     * Semaphore锁-开释锁办法(默认一个)     */    public void release() {        sync.releaseShared(1);    }    /**     * Semaphore锁-开释锁办法(可选指定多个)     */    public void release(int permits) {        if (permits < 0) throw new IllegalArgumentException();        sync.releaseShared(permits);    }}
  • 外部同步器:基于AQS根底同步器封装和定义了一个动态外部Sync抽象类,其中形象了一个内置锁lock()办法
  • 同步器工作模式:次要提供了 2个构造方法,其中无参数构造方法示意的是默认的工作模式,有参数构造方法次要根据参数来实现指定的工作模式
  • 获取锁办法: 次要提供了3个基于acquire办法,用于获取锁共享锁,其中:

    • 无参数acquire()办法:获取共享锁的个别模式,默认指定一个许可和反对可中断机制
    • 有参数acquire()办法:获取共享锁的指定模式,可选指定多个许可且反对可中断机制
    • 无参数acquireUninterruptibly()办法:获取共享锁的指定模式,默认指定一个许可且不反对可中断机制
  • 开释锁办法: 次要是提供了2个release()办法用于开释锁共享锁,其中:

    • 无参数release()办法:开释共享锁的个别模式,默认指定一个许可和反对可中断机制
    • 有参数release()办法:开释共享锁的指定模式,可选指定多个许可且反对可中断机制
2.1 基于AQS同步器封装动态外部Sync抽象类
    /**     * Semaphore锁- 基于AQS根底同步器封装同步器     */    abstract static class Sync extends AbstractQueuedSynchronizer {        private static final long serialVersionUID = 1192457210091910933 L;        Sync(int permits) {            setState(permits);        }        final int getPermits() {            return getState();        }        /**         * Semaphore锁- 非偏心模式获取共享锁         */        final int nonfairTryAcquireShared(int acquires) {            for (;;) {                int available = getState();                int remaining = available - acquires;                if (remaining < 0 ||                    compareAndSetState(available, remaining))                    return remaining;            }        }        /**         * Semaphore锁- 开释共享锁         */        protected final boolean tryReleaseShared(int releases) {            for (;;) {                int current = getState();                int next = current + releases;                if (next < current) // overflow                    throw new Error("Maximum permit count exceeded");                if (compareAndSetState(current, next))                    return true;            }        }        /**         * Semaphore锁- 自旋+compareAndSetState通过CAS操作计算操作令牌许可数         */        final void reducePermits(int reductions) {            for (;;) {                int current = getState();                int next = current - reductions;                if (next > current) // underflow                    throw new Error("Permit count underflow");                if (compareAndSetState(current, next))                    return;            }        }        /**         * Semaphore锁- 自旋+compareAndSetState通过CAS操作重置令牌许可数         */        final int drainPermits() {            for (;;) {                int current = getState();                if (current == 0 || compareAndSetState(current, 0))                    return current;            }        }    }
  • 实现形式:次要是基于AQS根底同步器封装了一个动态的的Sync抽象类,通过构造方法指定一个最大的令牌许可数量
  • 次要办法:次要是看共享锁的获取nonfairTryAcquireShared()办法和开释锁tryReleaseShared()办法,其中:

    • 获取锁nonfairTryAcquireShared()办法:非偏心模式下获取共享锁,利用自旋+compareAndSetState()办法通过CAS操作,保障并发批改令牌许可数量
    • 开释锁tryReleaseShared(i)办法: 偏心/非偏心模式下开释共享锁,利用自旋+compareAndSetState()办法通过CAS操作开释,会把开释的令牌许可数量减少到以后残余的令牌许可数量中。
  • 令牌许可操作方法:次要提供了drainPermits() 办法 和reducePermits() 办法,其中:

    • drainPermits() 办法:次要是利用自旋+compareAndSetState()办法通过CAS操作重置令牌许可数
    • reducePermits() 办法:次要是自旋+compareAndSetState)办法通过CAS操作递加计算操作令牌许可数
  • 获取锁形式:令牌许可数量QS根底同步器状态变量对应,通过getPermits() 办法来获取令牌许可数量,实质是调用AQS根底同步器中的getState()来获取状态变量。

特地指出的是,这里的非偏心模式次要形容的是,在令牌许可数量容许的状况下,让所有线程进行自旋操作,其实就是不关怀线程到来的程序,将全副线程放到一起去参加竞争令牌许可。
其中,次要还利用compareAndSetState办法来进行CAS操作,保障批改令牌许可数量的原子性操作。
一般来说,假如咱们容许管制的最大线程数量为N,残余令牌许可数量为Remanent(N), 以后可用令牌许可数量为Current(N) , 耗费令牌许可数量为Reduction(N),那么咱们会失去一个计算公式:

即就意味着,残余令牌许可数量等于以后可用令牌许可数量与耗费令牌许可数量之差。
由此可见,在偏心/非偏心模式下,咱们对于对于获取锁和开释锁时,对于残余令牌许可数量Remanent(N)计算都满足以下公式:

  • 首先,在线程在访问共享资源前,咱们能够容许的最大值为Available(N),自旋获取锁的数量为Acquires(N),那么咱们在获取锁时:

  • 其次,在线程在访问共享资源后,自旋开释锁的数量为Releases(N),那么咱们在开释锁时:


当然,须要留神的的一个问题,就是当残余令牌许可数量Remanent(N) < 0时,示意以后线程会进入阻塞期待状态。

2.2 基于Sync抽象类封装FairSync偏心同步器
    /**     * Semaphore锁- 基于Sync抽象类封装FairSync偏心同步器     */    static final class FairSync extends Sync {        private static final long serialVersionUID = 2014338818796000944 L;        /**         * Semaphore锁- Semaphore锁- 通过构造方法指定令牌许可         */        FairSync(int permits) {            super(permits);        }        /**         * Semaphore锁- Semaphore锁- 偏心模式开释共享锁         */        protected int tryAcquireShared(int acquires) {            for (;;) {                if (hasQueuedPredecessors())                    return -1;                int available = getState();                int remaining = available - acquires;                if (remaining < 0 ||                    compareAndSetState(available, remaining))                    return remaining;            }        }    }
  • 实现形式: 次要是在基于动态外部Sync抽象类来实现,结构了一个可指定大小的的令牌许可
  • 次要办法: 次要是提供了一个tryAcquireShared办法,其中利用hasQueuedPredecessors()来保障公平性
  • 工作机制: 通过基于AQS根底同步器中的期待队列来实现偏心机制

须要留神的是,在未达到最大的令牌许可数量时,所有线程都不会进入期待队列中。

2.3 基于Sync抽象类封装NonfairSync非偏心同步器
    /**     * Semaphore锁- 基于Sync抽象类封装NonfairSync非偏心同步器     */    static final class NonfairSync extends Sync {        private static final long serialVersionUID = -2694183684443567898 L;        /**         * Semaphore锁- Semaphore锁- 通过构造方法指定令牌许可         */        NonfairSync(int permits) {            super(permits);        }        /**         * Semaphore锁- Semaphore锁- 非偏心模式开释共享锁         */        protected int tryAcquireShared(int acquires) {            return nonfairTryAcquireShared(acquires);        }    }
  • 实现形式: 次要是在基于动态外部Sync抽象类来实现,结构了一个可指定大小的的令牌许可
  • 次要办法: 次要是提供了一个tryAcquireShared办法,其中次要是调用了动态外部Sync抽象类nonfairTryAcquireShared办法。
  • 工作机制: 通过自旋操作让所有线程竞争获取令牌许可,实质还是采纳了AQS根底同步器中闯入策略到突破偏心的

3. 具体实现

在JDK1.8版本中,对于Semaphore的具体实现如下:

public class Semaphore implements java.io.Serializable {    private static final long serialVersionUID = -3222578661600680210 L;    /**     * Semaphore锁- 封装同步器     */    private final Sync sync;    /**     * Semaphore锁- 基于AQS根底同步器封装同步器     */    abstract static class Sync extends AbstractQueuedSynchronizer {        private static final long serialVersionUID = 1192457210091910933 L;        Sync(int permits) {            setState(permits);        }        final int getPermits() {            return getState();        }        /**         * Semaphore锁- 非偏心模式获取共享锁         */        final int nonfairTryAcquireShared(int acquires) {            for (;;) {                int available = getState();                int remaining = available - acquires;                if (remaining < 0 ||                    compareAndSetState(available, remaining))                    return remaining;            }        }        /**         * Semaphore锁- 开释共享锁         */        protected final boolean tryReleaseShared(int releases) {            for (;;) {                int current = getState();                int next = current + releases;                if (next < current) // overflow                    throw new Error("Maximum permit count exceeded");                if (compareAndSetState(current, next))                    return true;            }        }        /**         * Semaphore锁- 自旋+compareAndSetState通过CAS操作计算操作令牌许可数         */        final void reducePermits(int reductions) {            for (;;) {                int current = getState();                int next = current - reductions;                if (next > current) // underflow                    throw new Error("Permit count underflow");                if (compareAndSetState(current, next))                    return;            }        }        /**         * Semaphore锁- 自旋+compareAndSetState通过CAS操作重置令牌许可数         */        final int drainPermits() {            for (;;) {                int current = getState();                if (current == 0 || compareAndSetState(current, 0))                    return current;            }        }    }    /**     * Semaphore锁- 基于Sync抽象类封装FairSync偏心同步器     */    static final class FairSync extends Sync {        private static final long serialVersionUID = 2014338818796000944 L;        /**         * Semaphore锁- Semaphore锁- 通过构造方法指定令牌许可         */        FairSync(int permits) {            super(permits);        }        /**         * Semaphore锁- Semaphore锁- 偏心模式开释共享锁         */        protected int tryAcquireShared(int acquires) {            for (;;) {                if (hasQueuedPredecessors())                    return -1;                int available = getState();                int remaining = available - acquires;                if (remaining < 0 ||                    compareAndSetState(available, remaining))                    return remaining;            }        }    }    /**     * Semaphore锁- 基于Sync抽象类封装NonfairSync非偏心同步器     */    static final class NonfairSync extends Sync {        private static final long serialVersionUID = -2694183684443567898 L;        /**         * Semaphore锁- Semaphore锁- 通过构造方法指定令牌许可         */        NonfairSync(int permits) {            super(permits);        }        /**         * Semaphore锁- Semaphore锁- 非偏心模式开释共享锁         */        protected int tryAcquireShared(int acquires) {            return nonfairTryAcquireShared(acquires);        }    }    /**     * Semaphore锁- 结构一个令牌许可(默认非公模式)     */    public Semaphore(int permits) {        sync = new NonfairSync(permits);    }    /**     * Semaphore锁- 结构一个令牌许可(可选偏心/非公模式)     */    public Semaphore(int permits, boolean fair) {        sync = fair ? new FairSync(permits) : new NonfairSync(permits);    }    /**     * Semaphore锁- 获取锁办法(默认一个且可中断机制)     */    public void acquire() throws InterruptedException {        sync.acquireSharedInterruptibly(1);    }    /**     * Semaphore锁- 获取锁办法(可选指定多个且可中断机制)     */    public void acquire(int permits) throws InterruptedException {        if (permits < 0) throw new IllegalArgumentException();        sync.acquireSharedInterruptibly(permits);    }    /**     * Semaphore锁- 获取锁办法(默认多个且不可中断机制)     */    public void acquireUninterruptibly() {        sync.acquireShared(1);    }        /**     * Semaphore锁- 获取锁办法(指定多个且不可中断机制)     */    public void acquireUninterruptibly(int permits) {        if (permits < 0) throw new IllegalArgumentException();        sync.acquireShared(permits);    }    /**     * Semaphore锁-开释锁办法(默认一个)     */    public void release() {        sync.releaseShared(1);    }    /**     * Semaphore锁-开释锁办法(可选指定多个)     */    public void release(int permits) {        if (permits < 0) throw new IllegalArgumentException();        sync.releaseShared(permits);    }    /**     * Semaphore锁-尝试获取锁办法(默认一个)     */    public boolean tryAcquire() {        return sync.nonfairTryAcquireShared(1) >= 0;    }    /**     * Semaphore锁-尝试获取锁办法(可选指定多个)     */    public boolean tryAcquire(int permits) {        if (permits < 0) throw new IllegalArgumentException();        return sync.nonfairTryAcquireShared(permits) >= 0;    }    /**     * Semaphore锁-尝试获取锁办法(可选指定多个并且反对超时机制)     */    public boolean tryAcquire(int permits, long timeout, TimeUnit unit)    throws InterruptedException {        if (permits < 0) throw new IllegalArgumentException();        return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));    }    /**     * Semaphore锁-尝试获取锁办法(默认一个并且反对超时机制)     */    public boolean tryAcquire(long timeout, TimeUnit unit)    throws InterruptedException {        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));    }    /**     * Semaphore锁-统计能够令牌许可数     */    public int availablePermits() {        return sync.getPermits();    }    /**     * Semaphore锁-重置令牌许可数     */    public int drainPermits() {        return sync.drainPermits();    }    /**     * Semaphore锁-递加计算令牌许可数     */    protected void reducePermits(int reduction) {        if (reduction < 0) throw new IllegalArgumentException();        sync.reducePermits(reduction);    }    /**     * Semaphore锁-判断是否偏心模式     */    public boolean isFair() {        return sync instanceof FairSync;    }    /**     * Semaphore锁-判断队列中是否存在线程对象     */    public final boolean hasQueuedThreads() {        return sync.hasQueuedThreads();    }    /**     * Semaphore锁-获取队列长度     */    public final int getQueueLength() {        return sync.getQueueLength();    }    /**     * Semaphore锁-获取队列的线程对象     */    protected Collection < Thread > getQueuedThreads() {        return sync.getQueuedThreads();    }}
  • 信号量同步器: 次要是提供了2个构造方法来实现令牌许可的治理,其中:

    • 默认非偏心模式:根据指定传入的令牌许可数量permits间接实例化NonfairSync非偏心同步器
    • 可选偏心/非偏心模式:根据指定传入的令牌许可数量permits和偏心标记fair来实例化NonfairSync非偏心同步器和FairSync偏心同步器,其中,当fair=true时,是偏心平模式,否则为非偏心模式
  • 反对可中断机制:次要是提供了2个acquire()办法来获取锁,其中:

    • 无参数acquire()办法:个别模式获取共享锁,次要是基于AQS根底同步器中的acquireSharedInterruptibly(int arg)来实现,其外围逻辑是doAcquireSharedInterruptibly(int arg)来操纵。
    • 有参数acquire()办法:根据指定传入的令牌许可数量permits来判断,当permits< 0时,间接throw new IllegalArgumentException();否则,间接调用acquireSharedInterruptibly(permits)办法。
  • 反对不可中断机制:次要是提供了2个acquireUninterruptibly() 办法,其中:

    • 无参数acquireUninterruptibly() 办法:个别模式获取共享锁,次要是基于AQS根底同步器中acquireShared(int arg)办法来实现,其外围逻辑是doAcquireShared(int arg) 来操纵。
    • 有参数acquireUninterruptibly() 办法:根据指定传入的令牌许可数量permits来判断,当permits< 0时,间接throw new IllegalArgumentException();否则,间接调用acquireShared(int arg)办法。
  • 非偏心模式获取锁形式: 次要提供了2个tryAcquire() 办法,其中:

    • 无参数tryAcquire() 办法:非偏心模式尝试获取共享锁,间接调用的是非偏心同步器中的nonfairTryAcquireShared(int acquires) 办法。
    • 有参数tryAcquire() 办法:根据指定传入的令牌许可数量permits来判断,当permits< 0时,间接throw new IllegalArgumentException();否则,间接调用nonfairTryAcquireShared(int acquires) 办法。
  • 偏心模式获取锁形式:次要提供了2个tryAcquire() 办法,反对超时机制。其中:

    • 无参数tryAcquire() 办法:偏心模式尝试获取共享锁,根据指定传入的令牌许可数量permits来判断,当permits< 0时,间接throw new IllegalArgumentException();否则,间接调用的是AQS根底同步器中的tryAcquire(int permits, long timeout, TimeUnit unit)办法,其外围逻辑是tryAcquireSharedNanos(int arg, long nanosTimeout)来操纵。
    • 有参数tryAcquire() 办法:偏心模式尝试获取共享锁,默认反对一个许可,间接调用的是AQS根底同步器中的tryAcquire(1,long timeout, TimeUnit unit)办法,其外围逻辑是tryAcquireSharedNanos(int arg, long nanosTimeout)来操纵。
  • 开释锁操作形式:次要提供了2个release()办法,其中:

    • 无参数release() 办法:偏心/非偏心模式示范锁操作,默认反对一个许可,次要是间接调用AQS根底同步器中的releaseShared(int arg) 办法
    • 有参数release() 办法:偏心/非偏心模式示范锁操作,根据指定传入的令牌许可数量permits来判断,当permits< 0时,间接throw new IllegalArgumentException();否则,次要是间接调用AQS根底同步器中的releaseShared(int arg) 办法
  • 令牌许可操作方法:次要提供了availablePermits() 办法,reducePermits(int reduction)办法 以及drainPermits() 办法,其中:

    • availablePermits() 办法:获取可用的令牌许可数量,次要是调用外部同步器中getPermits()办法。
    • reducePermits()办法:计算残余可用令牌许可数量,根据指定传入的令牌许可数量reduction来判断,当reduction< 0时,间接throw new IllegalArgumentException();否则,调用外部同步器中reducePermits()办法。
    • drainPermits() 办法:重置可用令牌许可数量,次要是调用外部同步器中drainPermits()办法。
  • 队列操作方法:次要提供了hasQueuedThreads()办法,getQueuedThreads() 办法以及getQueueLength() 办法,其中:

    • hasQueuedThreads()办法:次要是用于获取队列中是否存在期待获取令牌许可的线程对象,次要是间接应用AQS根底同步器的hasQueuedThreads()来实现。
    • getQueuedThreads() 办法:次要是用于获取队列中期待获取令牌许可的线程对象,次要是间接应用AQS根底同步器的getQueuedThreads()来实现。
    • getQueueLength() 办法:次要是用于获取队列中期待获取令牌许可的数量,次要是间接应用AQS根底同步器的getQueueLength()来实现。

综上所述,从肯定意义上讲,Semaphore是一种共享锁,属于AQS根底形象队列同步器中共享模式孵化的产物,反对偏心模式与非偏心模式,默认是应用非偏心模式。

写在最初

通过对Java畛域中,JDK外部提供的各种锁的实现来看,始终围绕的外围次要还是基于AQS根底同步器来实现的,然而AQS根底同步器不是一种非它不可的技术标准规范,更多的只是一套技术参考指南。

然而,实际上,Java对于锁的实现与使用远远不止这些,还有相位器(Phaser)和交换器(Exchanger),以及在Java JDK1.8版本之前并发容器ConcurrentHashMap中应用的分段锁(Segment)。

不论是何种实现和利用,在Java并发编程畛域来讲,都是围绕线程平安问题的角度去思考的,只是针对于各种各样的业务场景做的具体的实现。

肯定意义上来讲,对线程加锁只是并发编程的实现形式之一,绝对于理论利用来说,Java畛域中的锁都只是一种繁多利用的锁,只是给咱们把握Java并发编程提供一种思维没,喋喋不休也不可能详尽。

到此为止,这算是对于Java畛域中并发锁的最终章,文中表述均为集体认识和集体了解,如有不到之处,忘请谅解也请给予批评指正。

最初,技术钻研之路任重而道远,愿咱们熬的每一个通宵,都撑得起咱们想在这条路上走上来的勇气,将来依然可期,与各位程序编程君共勉!