关于java:Java并发编程解析-基于JDK源码解析Java领域中并发锁之三大同步器的设计思想与实现原理-四

11次阅读

共计 29725 个字符,预计需要花费 75 分钟才能阅读完成。

天穹之边,浩瀚之挚,眰恦之美;悟心悟性,虎头蛇尾,惟善惟道!—— 朝槿《朝槿兮年说》


写在结尾

在并发编程畛域,有两大外围问题:一个是互斥,即同一时刻只容许一个线程访问共享资源;另一个是同步,即线程之间如何通信、合作。
次要起因是,对于多线程实现实现并发,始终以来,多线程都存在 2 个问题:

  • 线程之间内存共享,须要通过加锁进行管制,然而加锁会导致性能降落,同时简单的加锁机制也会减少编程编码难度
  • 过多线程造成线程之间的上下文切换,导致效率低下

因而,在并发编程畛域中,始终有一个很重要的设计准则:“不要通过内存共享来实现通信,而应该通过通信来实现内存共享。”
简略来说,就是尽可能通过音讯通信,而不是内存共享来实现过程或者线程之间的同步。

关健术语

本文用到的一些要害词语以及罕用术语,次要如下:

  • 并发(Concurrent): 在操作系统中,是指一个时间段中有几个程序都处于已启动运行到运行结束之间,且这几个程序都是在同一个处理机上运行。
  • 并行(Parallel): 当零碎有一个以上 CPU 时,当一个 CPU 执行一个过程时,另一个 CPU 能够执行另一个过程,两个过程互不抢占 CPU 资源,能够同时进行。
  • 信号量(Semaphore):  是在多线程环境下应用的一种设施,是能够用来保障两个或多个要害代码段不被并发调用,也是作零碎用来解决并发中的互斥和同步问题的一种办法。
  • 信号量机制(Semaphores):用来解决同步 / 互斥的问题的,它是 1965 年, 荷兰学者 Dijkstra 提出了一种行之有效的实现过程互斥与同步的办法。
  • 管程(Monitor) :  个别是指治理共享变量以及对共享变量的操作过程,让它们反对并发的一种机制。
  • 互斥(Mutual Exclusion):一个公共资源同一时刻只能被一个过程或线程应用,多个过程或线程不能同时应用公共资源。即就是同一时刻只容许一个线程访问共享资源的问题。
  • 同步(Synchronization):两个或两个以上的过程或线程在运行过程中协同步调,按预约的先后秩序运行。即就是线程之间如何通信、合作的问题。
  • 对象池(Object Pool): 指的是一次性创立出 N 个对象,之后所有的线程反复利用这 N 个对象,当然对象在被开释前,也是不容许其余线程应用的, 个别指保留实例对象的容器。

根本概述

在 Java 畛域中,咱们能够将锁大抵分为基于 Java 语法层面 (关键词) 实现的锁和基于 JDK 层面实现的锁。

在 Java 畛域中, 尤其是在并发编程畛域,对于多线程并发执行始终有两大外围问题:同步和互斥。其中:

  • 互斥(Mutual Exclusion):一个公共资源同一时刻只能被一个过程或线程应用,多个过程或线程不能同时应用公共资源。即就是同一时刻只容许一个线程访问共享资源的问题。
  • 同步(Synchronization):两个或两个以上的过程或线程在运行过程中协同步调,按预约的先后秩序运行。即就是线程之间如何通信、合作的问题。

针对对于这两大外围问题,利用管程是可能解决和实现的,因而能够说,管程是并发编程的万能钥匙。
尽管,Java 在基于语法层面 (synchronized 关键字) 实现了对管程技术, 然而从应用形式和性能上来说,内置锁 (synchronized 关键字) 的粒度绝对过大,不反对超时和中断等问题。
为了补救这些问题,从 JDK 层面对其“反复造轮子”,在 JDK 外部对其从新设计和定义,甚至实现了新的个性。
在 Java 畛域中,从 JDK 源码剖析来看,基于 JDK 层面实现的锁大抵次要能够分为以下 4 种形式:

  • 基于 Lock 接口实现的锁:JDK1.5 版本提供的 ReentrantLock 类
  • 基于 ReadWriteLock 接口实现的锁:JDK1.5 版本提供的 ReentrantReadWriteLock 类
  • 基于 AQS 根底同步器实现的锁:JDK1.5 版本提供的并发相干的同步器 Semaphore,CyclicBarrier 以及 CountDownLatch 等
  • 基于自定义 API 操作实现的锁:JDK1.8 版本中提供的 StampedLock 类

从浏览源码不难发现,在 Java SDK 并发包次要通过 AbstractQueuedSynchronizer(AQS)实现多线程同步机制的封装与定义,而通过 Lock 和 Condition 两个接口来实现管程,其中 Lock 用于解决互斥问题,Condition 用于解决同步问题。


一.AQS 根底同步器根本实践

在 Java 畛域中, 同步器是专门为多线程并发设计的同步机制,次要是多线程并发执行时线程之间通过某种共享状态来实现同步,只有当状态满足这种条件时线程才往下执行的一种同步机制。

一个规范的 AQS 同步器次要有同步状态机制,期待队列,条件队列,独占模式,共享模式等五大外围因素组成。
在 Java 畛域中,JDK 的 JUC(java.util.concurrent.)包中提供了各种并发工具,然而大部分同步工具的实现基于 AbstractQueuedSynchronizer 类实现,其内部结构次要如下:

  • 同步状态机制 (Synchronization Status):次要用于实现锁(Lock) 机制,是指同步状态,其要求对于状态的更新必须原子性的
  • 期待队列 (Wait Queue):次要用于寄存期待线程获取到的锁资源,并且把线程保护到一个 Node(节点) 外面和保护一个非阻塞的 CHL Node FIFO(先进先出)队列,次要是采纳自旋锁 +CAS 操作来保障节点插入和移除的原子性操作。
  • 条件队列(Condition Queue):用于实现锁的条件机制,个别次要是指替换“期待 - 告诉”工作机制,次要是通过 ConditionObject 对象实现 Condition 接口提供的办法实现。
  • 独占模式(Exclusive Mode):次要用于实现独占锁,次要是基于动态外部类 Node 的常量标记 EXCLUSIVE 来标识该节点是独占模式
  • 共享模式(Shared Mode):次要用于实现共享锁,次要是基于动态外部类 Node 的常量标记 SHARED 来标识该节点是共享模式

咱们能够失去一个比拟通用的并发同步工具根底模型,大抵蕴含如下几个内容,其中:

  • 条件变量(Conditional Variable):利用线程间共享的变量进行同步的一种工作机制
  • 共享变量((Shared Variable)):个别指对象实体对象的成员变量和属性
  • 阻塞队列 (Blocking Queue):共享变量(Shared Variable) 及其对共享变量的操作对立封装
  • 期待队列 (Wait Queue):每个条件变量都对应有一个期待队列(Wait Queue), 外部须要实现入队操作(Enqueue) 和出队操作 (Dequeue) 办法
  • 变量状态形容机(Synchronization Status):形容条件变量和共享变量之间状态变动,又能够称其为同步状态
  • 工作模式(Operation Mode):线程资源具备排他性,因而定义独占模式和共享模式两种工作模式

综上所述,条件变量和期待队列的作用是解决线程之间的同步问题;共享变量与阻塞队列的作用是解决线程之间的互斥问题。

二. JDK 显式锁对立概念模型

在并发编程畛域,有两大外围问题:一个是互斥,即同一时刻只容许一个线程访问共享资源;另一个是同步,即线程之间如何通信、合作。

综合 Java 畛域中的并发锁的各种实现与利用剖析来看,一把锁或者一种锁,基本上都会蕴含以下几个方面:

  • 锁的同步器工作机制:次要是思考共享模式还是独享模式,是否反对超时机制,以及是否反对超时机制?
  • 锁的同步器工作模式:次要是基于 AQS 根底同步器封装外部同步器,是否思考偏心 / 非偏心模式?
  • 锁的状态变量机制:次要锁的状态设置,是否共享状态变量?
  • 锁的队列封装定义:次要是指期待队列和条件队列,是否须要条件队列或者期待队列定义?
  • 锁的底层实现操作:次要是指底层 CL 锁和 CAS 操作,是否须要思考自旋锁或者 CAS 操作实例对象办法?
  • 锁的组合实现新锁:次要是基于独占锁和共享锁,是否思考对应 API 自定义操作实现?

综上所述,大抵能够根据上述这些方向,咱们便能够分明🉐️晓得 Java 畛域中各种锁实现的根本实践时和实现思维。

六.CountDownLatch(闭锁)的设计与实现

在 Java 畛域中,CountDownLatch(闭锁)是针对于 Java 多线程并发管制中倒计数器的具体数量,次要是采纳递加计数形式的倒计数器思维和基于 AQS 根底同步器来实现的一种同步器工具类。

CountDownLatch(闭锁)是 Java 多线程并发中最常见的一种同步器,从锁的性质上来看,属于共享锁,其性能相当于一个多线程环境下的倒数门闩。
CountDownLatch 通过定义一个倒计数器,在并发环境下由线程进行递加 1 操作,当计数值变为 0 之后,被 await 办法阻塞的线程将会唤醒。
通过 CountDownLatch 能够实现线程间的计数同步。

1. 设计思维

一般来说,通过定义一个倒计数器,为了让某个线程或者多个线程在某个运行节点上期待 N 个条件都满足后,才让所有的线程持续往下执行,其中倒计数器的数量则为 N,每满足一个条件,倒计数器就顺次逐步递加 1,直到 N -1= 0 的时,所有期待的线程才往下继续执行。
CountDownLatch 类最早是在 JDK1.5 版本提供的,从设计思维上来看,次要包含倒计数器的同步器,管制阻塞期待的办法,倒计数器的递加操作方法等 3 个外围因素。其中:

  • 倒计数器的同步器:基于 AQS 根底形象队列同步器封装内置实现一个动态的内置同步类, 次要用于设置倒计数器的初始值以及定制 AQS 根底同步器的获取和开释共享锁。
  • 倒计数器的初始值:个别在构建 CountDownLatch 类时指定,示意的是须要期待条件的个数,即就是倒计数器的具体的资源数量 Source(N)。
  • 控制线程阻塞期待的办法:定义一个控制线程阻塞期待的办法,当倒计数器的具体的资源数量 Source(N)>0 时,调用办法使其线程进入阻塞期待状态。
  • 倒计数器的递加操作方法:定义一个倒计数器的递加操作方法,调用办法就会把倒计数器递加 1,当倒计数器的具体的资源数量 Source(N)-1= 0 时,所有期待的线程才往下继续执行。

简略来说,CountDownLatch 次要是让某个线程或者多个线程,期待其余线程实现某件事情或者某个工作完结之后能力继续执行。

2. 根本实现

在 CountDownLatch 类的 JDK1.8 版本中,对于 CountDownLatch 的根本实现如下:


public class CountDownLatch {

    private final Sync sync;

    /**
     * CountDownLatch 锁 - 结构一个倒计数器
     */
    public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    /**
     * CountDownLatch 锁 - 基于 AQS 定义反对同步器实现
     */
    private  static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860 L;

        //...... 其余办法代码
    }

    /**
     * CountDownLatch 锁 - 线程期待办法
     */
    public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);
    }

    /**
     * CountDownLatch 锁 - 倒计数器递加操作
     */
    public void countDown() {sync.releaseShared(1);
    }

    //... 其余代码
}
  • 倒计数器同步器:基于 AQS 根底定义反对同步器实现一个动态私有化的同步器 Sync 类,其中定义了获取和开释共享锁的两个办法
  • 线程期待办法:次要是提供了一个 await()办法,其本质是调用的是 AQS 根底同步器中的 acquireSharedInterruptibly(int arg)办法,否则 throws InterruptedException 异样
  • 倒计数器递加操作方法:次要是提供了一个 countDown()办法,其本质是调用的是 AQS 根底同步器中的 releaseShared(int arg) 办法
2.1 基于 AQS 同步器封装动态外部 Sync 抽象类

        /**
     * CountDownLatch 锁 - 基于 AQS 同步器封装一个外部的同步器
     */
private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {setState(count);
        }

        int getCount() {return getState();
        }

        /**
     * CountDownLatch 锁 - 获取共享锁办法
     */
        protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;
        }

        /**
     * CountDownLatch 锁 - 开释共享锁办法
     */
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
                
    }
  • 实现形式:次要基于 AQS 封装的外部动态形象 Sync 同步类实现,应用的 AQS 的共享模式
  • 次要办法:次要定制适配提供了 tryAcquireShared()和 tryReleaseShared()办法,即就是 tryAcquireShared()用于获取共享锁,tryReleaseShared()办法用于开释共享锁,其中:

    • 获取共享锁 tryAcquireShared()办法:首先获取状态变量 status,这里是指倒计数器中的数量,当 status= 0 时,返回值 =1,示意获取锁胜利;否则,status !=0 时,返回值 =-1,示意获取共享锁失败进行入队。
    • 开释共享锁 tryReleaseShared()办法:通过自旋来实现递加操作,其中会获取状态变量 status,将其递加 1 后应用 compareAndSetState(c, nextc)办法通过 CAS 批改状态值
  • 锁获取形式:次要是利用 getCount()来获取倒计数器中的数量,同时还能够利用构造方法领导一个倒计数器中的数量。

3. 具体实现


public class CountDownLatch {

  private final Sync sync;

  /**
* CountDownLatch 锁 - 基于 AQS 根底同步器实现一个外部同步器
*/
  private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374 L;

    Sync(int count) {setState(count);
    }

    int getCount() {return getState();
    }

    protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;
    }

    protected boolean tryReleaseShared(int releases) {
      // Decrement count; signal when transition to zero
      for (;;) {int c = getState();
        if (c == 0)
          return false;
        int nextc = c - 1;
        if (compareAndSetState(c, nextc))
          return nextc == 0;
      }
    }
  }


  /**
* CountDownLatch 锁 - 结构一个倒计数器
*/
  public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
  }

  /**
* CountDownLatch 锁 - 基于 AQS 定义反对同步器实现
*/
  private static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = -5179523762034025860 L;

    //...... 其余办法代码
  }

  /**
* CountDownLatch 锁 - 线程期待办法
*/
  public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);
  }

  /**
* CountDownLatch 锁 - 返回以后计数器
*/
  public long getCount() {return sync.getCount();
  }

  /**
* CountDownLatch 锁 - 线程期待办法(反对超时机制)
*/
  public boolean await(long timeout, TimeUnit unit)
  throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
  }

  /**
* CountDownLatch 锁 - 倒计数器递加操作
*/
  public void countDown() {sync.releaseShared(1);
  }
}
  • 倒计数初始值:通过构造方法 CountDownLatch(int count)指定一个倒计数器的初始值,其必须大于 0,否则会 throw new IllegalArgumentException(“count < 0”)
  • 线程期待办法:次要提供了 await() 办法和 await(long timeout, TimeUnit unit)办法,其中:

    • 无参数 await() 办法:个别默认的办法,其本质是调用 AQS 同步器中的 acquireSharedInterruptibly()办法,次要示意反对中断机制
    • 有参数 await(long timeout, TimeUnit unit)办法:是用于实现超时机制,其本质是调用 AQS 同步器中的 tryAcquireSharedNanos(int arg, long nanosTimeout)办法
  • 倒计数递加操作方法:次要是 countDown() 办法,其本质是调用 AQS 同步器中的 releaseShared(int arg) 办法,外围实现是 AQS 根底同步器的 doReleaseShared 办法。
  • 其余办法:次要是 getCount() 办法,用来获取倒计数个数,其本质是调用 AQS 同步器中 getCount()办法,来获取状态变量

综上所述,从肯定意义上讲,CountDownLatch 是一种共享锁,属于 AQS 根底形象队列同步器中共享模式孵化的产物,没有反对偏心模式与非偏心模式的实现。


七.CyclicBarrier(循环屏障)的设计与实现

在 Java 畛域中,CyclicBarrier(循环屏障)是针对于 Java 多线程并发管制中倒计数器的线程数量,次要是采纳递加计数形式的倒计数器思维和基于 AQS 根底同步器实现的 ReentrantLock 锁来实现的一种同步器工具类。

CyclicBarrier(循环屏障)是 Java 中通过对线程预约义设置一个屏障,只有当达到屏障的线程数量达到指定的最大屏障时,屏障才会让这些线程通过执行。
从肯定意义上来讲,这里的屏障实质上还是一个倒计数器,倒计数器的最大限度反对的数量就是咱们为线程设置屏障大小,其工作原理与 CountDownLatch(闭锁)相似,都是通过让线程阻塞期待时,倒计数器执行递加 1 运算。
然而与 CountDownLatch 不同是,CyclicBarrier(循环屏障)是基于 ReentrantLock(可重入锁)来实现的,更精确的说,CyclicBarrier 是对 ReentrantLock 的利用实例。

1. 设计思维

一般来说,通过定义一个倒计数器,为了让某个线程或者多个线程在某个运行节点上束缚 N 个线程,须要让指定数量的线程独特到达某一个节点之后,这些线程才会一起被执行。
CyclicBarrier(循环屏障)最早是在 JDK1.5 版本中提供的,从设计思维上来看,次要包含倒计数器的最大屏障,管制阻塞期待的办法,倒计数器的递加操作方法,和触发点线程工作等 4 个外围因素。其中:

  • 倒计数器的同步器:次要基于 ReentrantLock 来实现控制线程对象,其本质还是基于 AQS 根底同步器实现。
  • 倒计数器的最大屏障数量:个别是在构建 CyclicBarrier(循环屏障)对象是预约义设置,示意须要在某个运行节点上束缚的线程数量。
  • 控制线程阻塞期待的办法:定义一个办法,使得实现阻塞线程让其进入期待状态。
  • 倒计数器的递加操作方法:定义一个办法,使得让倒计数器进行递加 1 运算,直到达到屏障时,期待的线程才继续执行。
  • 触发点线程工作:个别指的是当指定数量的线程达到设置的屏障时,才会去触发执行的工作。

简略来说,CyclicBarrier(循环屏障)是让多个线程相互期待,直到达到一个同步的运行节点。再持续一起执行。

2. 根本实现

在 CyclicBarrier 类的 JDK1.8 版本中,对于 CountDownLatch 的根本实现如下:


public class CyclicBarrier {

    /** CyclicBarrier 锁—屏障 lock 实体 */
    private final ReentrantLock lock = new ReentrantLock();

    /** CyclicBarrier 锁—屏障条件队列 */
    private final Condition trip = lock.newCondition();

    /**  CyclicBarrier 锁—屏障最大值 */
    private final int parties;

    /**  CyclicBarrier 锁—屏障触发线程工作指标 */
    private final Runnable barrierCommand;

    /**  CyclicBarrier 锁—以后计数器的最大值屏障实例 */
    private Generation generation = new Generation();

    /**  CyclicBarrier 锁—以后计数器的最大值屏障实例 */
    private int count;

    /**  CyclicBarrier 锁—屏障实例 */
    private static class Generation {boolean broken = false;}

    /**  CyclicBarrier 锁—结构一个屏障实例(不带触发工作的) */
    public CyclicBarrier(int parties) {this(parties, null);
    }

    /**  CyclicBarrier 锁—结构一个屏障实例(带触发工作的) */
    public CyclicBarrier(int parties, Runnable barrierAction) {if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    /**  CyclicBarrier 锁—无参数结构一个期待办法(默认模式) */
    public int await() throws InterruptedException, BrokenBarrierException {
        try {return dowait(false, 0L);
        } catch (TimeoutException toe) {throw new Error(toe); // cannot happen
        }
    }

    /**  CyclicBarrier 锁—有参数结构一个期待办法(反对超时机制) */
    public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
    BrokenBarrierException,
    TimeoutException {return dowait(true, unit.toNanos(timeout));
    }

    /**  CyclicBarrier 锁—更新状态变量 */
    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();}

    /**  CyclicBarrier 锁—阻塞屏障 */
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();}
    //... 其余代码
}
  • 预约义设置屏障最大值:次要是通过变量 parties 来实现预约义设置屏障最大值
  • 设置以后屏障数量:次要是通过变量 count 来实现
  • 控制线程的对象实例:次要是通过 ReentrantLock 和 Condition 来控制线程间通信
  • 触发指标工作对象:次要是通过 Runable 来定义 barrierCommand 变量
  • 提供了两个构造方法:都须要预约义指定屏障最大值 parties,其中一个须要传入 barrierAction 触发点工作
  • 线程阻塞期待办法:次要提供了 2 个 await()办法,其中:

    • 无参数 await()办法:默认解决形式,不反对超时机制,其外围解决逻辑在 dowait(boolean timed, long nanos)办法中实现
    • 有参数 await()办法:指定参数解决,反对超时机制,其外围解决逻辑在 dowait(boolean timed, long nanos)办法中实现
  • 屏障设置关健办法:次要是 breakBarrier() 来实现,其中:

    • 告诉达到屏障的所有线程:次要是通过 Condition 中的 signalAll()来告诉屏障中所有线程曾经满足条件
    • 屏障设置:默认预约义设置屏障最大值与设置以后屏障数雷同,次要设置 count = parties
    • 更新屏障状态:次要是通过 generation.broken = true 来实现
  • 更新屏障的状态:次要是提供了 nextGeneration() 办法,示意曾经达到预约义设置屏障最大值,其中:

    • 告诉达到屏障的所有线程:次要是通过 Condition 中的 signalAll()来告诉屏障中所有线程曾经满足条件
    • 筹备下一轮屏障设置:意味着预约义设置屏障最大值与设置以后屏障数雷同,次要设置 count = parties
    • 重置屏障状态:次要是通过 generation = new Generation()来实现

一般来说,假如咱们容许管制的最大线程数量为 N,预约义设置屏障最大值为 Parties(N), 以后屏障的线程数量为 Current(N) , 以后屏障中的期待线程数量为 Waiting(N), 那么咱们会失去一个计算公式:

2.1 结构 Generation 屏障实例标记
private static class Generation {boolean broken = false;}

次要是结构了一个动态私有化的 Generation 类,其中定义了一个 broken 变量来作为屏障标记,默认初始值为 false,示意还没达到屏障最大值。

2.1 线程阻塞期待外围 dowait 办法
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
    TimeoutException {// [1]. 实例化构建 ReentrantLock 的对象
    final ReentrantLock lock = this.lock;

    // [2]. 通过 lock()获取锁或者说加锁操作
    lock.lock();

    try {// [3]. 实例化构建 Generation 屏障实例对象
        final Generation g = generation;

        // [4]. 判断 Generation 屏障实例标记状态
        if (g.broken)
            throw new BrokenBarrierException();

        // [5]. 判断 Thread 是蕴含中断标记位
        if (Thread.interrupted()) {breakBarrier();
            throw new InterruptedException();}

        // [6]. 对倒计数器的屏障数量递加 1 运算
        int index = --count;

        // [7]. 根据后果 index == 0 示意以后指定的线程数量达到屏障最大值,须要触发 Runnable 工作
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;

                // 进行下一轮屏障设置
                nextGeneration();

                return 0;
            } finally {if (!ranAction)
                    breakBarrier();}
        }

        // [7]. 自旋操作

        for (;;) {
            try {
                // 判断是否超时
                if (!timed)

                    trip.await();
                else if (nanos > 0L)
                    // 进行下一轮屏障设置
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {if (g == generation && ! g.broken) {breakBarrier();
                    throw ie;
                } else {
                    // 是否产生线程中断
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;

            // 如果等待时间超过指定超时工夫,throw new TimeoutException
            if (timed && nanos <= 0L) {breakBarrier();
                throw new TimeoutException();}
        }
    } finally {

        // 最初开释锁操作
        lock.unlock();}
}
  • 加锁操作:实例化构建 ReentrantLock 的对象,通过 lock()办法进行加锁操作
  • 判断屏障实例标记状态:实例化构建 Generation 实例标记,判断屏障实例标记状态是否统一,如果不统一则 throw new BrokenBarrierException();
  • 判断以后线程是否被中断:判断 Thread 是蕴含中断标记位,如果中断 throw new InterruptedException()并调用 breakBarrier()从新设置屏障
  • 屏障倒计数器递加运算:对倒计数器的屏障数量递加 1 运算,即就是对以后倒计数器的以后值减去 1
  • 触发节点线程工作:以后倒计数器的以后值为 0 时,须要触发 Runnable 工作,并调用 nextGeneration 办法开启下一轮操作;否则,以后倒计数器的以后值不为 0 时,调用 awaitNanos(nanos)办法进入期待状态
  • 自旋操作判断超时:如果应用了超时参数,调用 awaitNanos(nanos)办法进入期待状态,其中如果产生中断则调用 Thread.currentThread().interrupt()设置中断标记。如果等待时间 > 指定超时工夫,抛出 throw new TimeoutException()异样
  • 开释锁:通过 unlock()办法进行解锁操作,并开释锁

3. 具体实现

在 CyclicBarrier 类的 JDK1.8 版本中,对于 CyclicBarrier 的具体实现如下:

public class CyclicBarrier {

    /** CyclicBarrier 锁—屏障 lock 实体 */
    private final ReentrantLock lock = new ReentrantLock();

    /** CyclicBarrier 锁—屏障条件队列 */
    private final Condition trip = lock.newCondition();

    /**  CyclicBarrier 锁—屏障最大值 */
    private final int parties;

    /**  CyclicBarrier 锁—屏障触发线程工作指标 */
    private final Runnable barrierCommand;

    /**  CyclicBarrier 锁—以后计数器的最大值屏障实例 */
    private Generation generation = new Generation();

    /**  CyclicBarrier 锁—以后计数器的最大值屏障实例 */
    private int count;

    /**  CyclicBarrier 锁—屏障实例 */
    private static class Generation {boolean broken = false;}

    /**  CyclicBarrier 锁—结构一个屏障实例(不带触发工作的) */
    public CyclicBarrier(int parties) {this(parties, null);
    }

    /**  CyclicBarrier 锁—结构一个屏障实例(带触发工作的) */
    public CyclicBarrier(int parties, Runnable barrierAction) {if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    /**  CyclicBarrier 锁—无参数结构一个期待办法(默认模式) */
    public int await() throws InterruptedException, BrokenBarrierException {
        try {return dowait(false, 0L);
        } catch (TimeoutException toe) {throw new Error(toe); // cannot happen
        }
    }

    /**  CyclicBarrier 锁—有参数结构一个期待办法(反对超时机制) */
    public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
    BrokenBarrierException,
    TimeoutException {return dowait(true, unit.toNanos(timeout));
    }

    /**  CyclicBarrier 锁—更新状态变量 */
    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();}

    /**  CyclicBarrier 锁—阻塞屏障 */
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();}

    /**  CyclicBarrier 锁—阻塞屏障 */
    private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
    TimeoutException {// [1]. 实例化构建 ReentrantLock 的对象
        final ReentrantLock lock = this.lock;

        // [2]. 通过 lock()获取锁或者说加锁操作
        lock.lock();

        try {// [3]. 实例化构建 Generation 屏障实例对象
            final Generation g = generation;

            // [4]. 判断 Generation 屏障实例标记状态是否为 true
            if (g.broken)
                throw new BrokenBarrierException();

            // [5]. 判断 Thread 是蕴含中断标记位
            if (Thread.interrupted()) {breakBarrier();
                throw new InterruptedException();}

            // [6]. 对倒计数器的屏障数量递加 1 运算
            int index = --count;

            // [7]. 根据后果 index == 0 示意以后指定的线程数量达到屏障最大值,须要触发 Runnable 工作
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;

                    // 进行下一轮屏障设置
                    nextGeneration();
                    return 0;
                } finally {if (!ranAction)
                        breakBarrier();}
            }

            // [7]. 自旋操作

            for (;;) {
                try {
                    // 判断是否超时
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {if (g == generation && ! g.broken) {breakBarrier();
                        throw ie;
                    } else {Thread.currentThread().interrupt();}
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                // 如果等待时间超过指定超时工夫,throw new TimeoutException
                if (timed && nanos <= 0L) {breakBarrier();
                    throw new TimeoutException();}
            }
        } finally {

            // 最初开释锁操作
            lock.unlock();}
    }


    /**  CyclicBarrier 锁—获取以后等屏障期待数量 */
    public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {return parties - count;} finally {lock.unlock();
        }
    }

    /**  CyclicBarrier 锁—获取以后等屏障数量 */
    public int getParties() {return parties;}

    /**  CyclicBarrier 锁—判断以后屏障 */
    public boolean isBroken() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {return generation.broken;} finally {lock.unlock();
        }
    }

    /**  CyclicBarrier 锁—重置屏障数量 */
    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation} finally {lock.unlock();
        }
    }

}

次要须要留神如下几个办法,都是基于 ReentrantLock 来实现加锁和解锁操作的,其中:

  • getNumberWaiting()办法:获取以后屏障中期待的线程数量
  • reset() 办法:当一轮屏障操作完结,须要重置屏障中最大线程数量
  • isBroken() 办法:判断是否达到屏障最大值

综上所述,从肯定意义上讲,CyclicBarrier 是一种可重入锁,属于 ReentrantLock 的利用实例,其中加锁和解锁操作都是独占模式的。


八.Semaphore(信号量)的设计与实现

在 Java 畛域中,Semaphore(信号量)是针对于 Java 多线程并发管制中实现对公共资源的线程数量进行并发同时访问控制,次要是采纳指定一个最大许可数的思维和基于 AQS 根底同步器来实现的一种同步器工具类。

Semaphore 能够用来管制在同一时刻访问共享资源的线程数量,通过协调各个线程以保障共享资源的正当应用。
Semaphore 保护了一组虚构许可,它的数量能够通过结构器的参数指定。
线程在访问共享资源前,必须调用 Semaphore 的 acquire()办法取得许可,如果许可数量为 0,该线程就始终阻塞。
线程在访问共享资源后,必须调用 Semaphore 的 release()办法开释许可。

1. 设计思维

一般来说,通过定义一个倒计数器,为了管制最多 N 个线程同时拜访公共资源,其计数器的最大值 Max(N)是被许可的最多 N 个线程数量,即就是许可的最大值 N。
Semaphore 类最早是在 JDK1.5 版本提供的,从设计思维上来看,次要包含倒计数器的最大许可数,同步器工作模式,获取锁办法,开释锁办法等 4 个外围因素。其中:

  • 同步器工作模式:基于 AQS 根底形象队列同步器封装内置实现一个动态的内置同步器抽象类,而后基于这个抽象类别离实现了偏心同步器和非偏心同步器,用来指定和形容同步器工作模式是偏心模式还是非偏心模式。
  • 偏心 / 非偏心模式:次要形容的是多个线程在同时获取锁时是否依照先到先得的程序获取锁,如果是则为偏心模式,否则为非偏心模式。
  • 获取锁办法:次要定义了一个 lock()办法来获取锁,示意如果锁曾经被其余线程占有或持有,其以后获取锁的线程则进入期待状态。
  • 开释锁办法:次要定义了一个 unlock()办法来开释锁,示意如果锁曾经被其余线程放弃或开释,其以后获取锁的线程则取得该锁。

2. 根本实现

在 JDK1.8 版本中,对于 Semaphore 的根本实现如下:


public class Semaphore implements java.io.Serializable {

    private static final long serialVersionUID = -3222578661600680210 L;

    /**
     * Semaphore 锁 - 封装同步器
     */
    private final Sync sync;

    /**
     * Semaphore 锁 - 封装同步器
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {//.... 其余代码}

    /**
     * Semaphore 锁 - 结构一个令牌许可(默认非公模式)
     */
    public Semaphore(int permits) {sync = new NonfairSync(permits);
    }

    /**
     * Semaphore 锁 - 结构一个令牌许可(可选偏心 / 非公模式)
     */
    public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

    /**
     * Semaphore 锁 - 获取锁办法(默认一个且可中断机制)
     */
    public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);
    }
        
    /**
     * Semaphore 锁 - 获取锁办法(可选指定多个且可中断机制)
     */
    public void acquire(int permits) throws InterruptedException {if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }

    /**
     * Semaphore 锁 - 获取锁办法(默认多个且不可中断机制)
     */
    public void acquireUninterruptibly() {sync.acquireShared(1);
    }
    
    /**
     * Semaphore 锁 - 获取锁办法(指定多个且不可中断机制)
     */
    public void acquireUninterruptibly(int permits) {if (permits < 0) throw new IllegalArgumentException();
        sync.acquireShared(permits);
    }

    /**
     * Semaphore 锁 - 开释锁办法(默认一个)
     */
    public void release() {sync.releaseShared(1);
    }

    /**
     * Semaphore 锁 - 开释锁办法(可选指定多个)
     */
    public void release(int permits) {if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }

}
  • 外部同步器:基于 AQS 根底同步器封装和定义了一个动态外部 Sync 抽象类,其中形象了一个内置锁 lock()办法
  • 同步器工作模式:次要提供了 2 个构造方法,其中无参数构造方法示意的是默认的工作模式,有参数构造方法次要根据参数来实现指定的工作模式
  • 获取锁办法:次要提供了 3 个基于 acquire 办法,用于获取锁共享锁,其中:

    • 无参数 acquire()办法:获取共享锁的个别模式,默认指定一个许可和反对可中断机制
    • 有参数 acquire()办法:获取共享锁的指定模式,可选指定多个许可且反对可中断机制
    • 无参数 acquireUninterruptibly()办法:获取共享锁的指定模式,默认指定一个许可且不反对可中断机制
  • 开释锁办法:次要是提供了 2 个 release()办法用于开释锁共享锁,其中:

    • 无参数 release()办法:开释共享锁的个别模式,默认指定一个许可和反对可中断机制
    • 有参数 release()办法:开释共享锁的指定模式,可选指定多个许可且反对可中断机制
2.1 基于 AQS 同步器封装动态外部 Sync 抽象类

    /**
     * Semaphore 锁 - 基于 AQS 根底同步器封装同步器
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933 L;

        Sync(int permits) {setState(permits);
        }

        final int getPermits() {return getState();
        }

        /**
         * Semaphore 锁 - 非偏心模式获取共享锁
         */
        final int nonfairTryAcquireShared(int acquires) {for (;;) {int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        /**
         * Semaphore 锁 - 开释共享锁
         */
        protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

        /**
         * Semaphore 锁 - 自旋 +compareAndSetState 通过 CAS 操作计算操作令牌许可数
         */
        final void reducePermits(int reductions) {for (;;) {int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }

        /**
         * Semaphore 锁 - 自旋 +compareAndSetState 通过 CAS 操作重置令牌许可数
         */
        final int drainPermits() {for (;;) {int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }
  • 实现形式:次要是基于 AQS 根底同步器封装了一个动态的的 Sync 抽象类,通过构造方法指定一个最大的令牌许可数量
  • 次要办法:次要是看共享锁的获取 nonfairTryAcquireShared()办法和开释锁 tryReleaseShared()办法,其中:

    • 获取锁 nonfairTryAcquireShared()办法:非偏心模式下获取共享锁,利用自旋 +compareAndSetState()办法通过 CAS 操作,保障并发批改令牌许可数量
    • 开释锁 tryReleaseShared(i)办法:偏心 / 非偏心模式下开释共享锁,利用自旋 +compareAndSetState()办法通过 CAS 操作开释,会把开释的令牌许可数量减少到以后残余的令牌许可数量中。
  • 令牌许可操作方法:次要提供了 drainPermits() 办法 和 reducePermits() 办法,其中:

    • drainPermits() 办法:次要是利用自旋 +compareAndSetState()办法通过 CAS 操作重置令牌许可数
    • reducePermits() 办法:次要是自旋 +compareAndSetState)办法通过 CAS 操作递加计算操作令牌许可数
  • 获取锁形式:令牌许可数量 QS 根底同步器状态变量对应,通过 getPermits() 办法来获取令牌许可数量,实质是调用 AQS 根底同步器中的 getState()来获取状态变量。

特地指出的是,这里的非偏心模式次要形容的是,在令牌许可数量容许的状况下,让所有线程进行自旋操作,其实就是不关怀线程到来的程序,将全副线程放到一起去参加竞争令牌许可。
其中,次要还利用 compareAndSetState 办法来进行 CAS 操作,保障批改令牌许可数量的原子性操作。
一般来说,假如咱们容许管制的最大线程数量为 N,残余令牌许可数量为 Remanent(N), 以后可用令牌许可数量为 Current(N) , 耗费令牌许可数量为 Reduction(N),那么咱们会失去一个计算公式:

即就意味着,残余令牌许可数量等于以后可用令牌许可数量与耗费令牌许可数量之差。
由此可见,在偏心 / 非偏心模式下,咱们对于对于获取锁和开释锁时,对于残余令牌许可数量 Remanent(N)计算都满足以下公式:

  • 首先,在线程在访问共享资源前,咱们能够容许的最大值为 Available(N), 自旋获取锁的数量为 Acquires(N),那么咱们在获取锁时:
  • 其次,在线程在访问共享资源后,自旋开释锁的数量为 Releases(N),那么咱们在开释锁时:

当然,须要留神的的一个问题,就是当残余令牌许可数量 Remanent(N) < 0 时,示意以后线程会进入阻塞期待状态。

2.2 基于 Sync 抽象类封装 FairSync 偏心同步器

    /**
     * Semaphore 锁 - 基于 Sync 抽象类封装 FairSync 偏心同步器
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944 L;

        /**
         * Semaphore 锁 - Semaphore 锁 - 通过构造方法指定令牌许可
         */
        FairSync(int permits) {super(permits);
        }

        /**
         * Semaphore 锁 - Semaphore 锁 - 偏心模式开释共享锁
         */
        protected int tryAcquireShared(int acquires) {for (;;) {if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }
  • 实现形式:次要是在基于动态外部 Sync 抽象类来实现,结构了一个可指定大小的的令牌许可
  • 次要办法:次要是提供了一个 tryAcquireShared 办法,其中利用 hasQueuedPredecessors()来保障公平性
  • 工作机制:通过基于 AQS 根底同步器中的期待队列来实现偏心机制

须要留神的是,在未达到最大的令牌许可数量时,所有线程都不会进入期待队列中。

2.3 基于 Sync 抽象类封装 NonfairSync 非偏心同步器

    /**
     * Semaphore 锁 - 基于 Sync 抽象类封装 NonfairSync 非偏心同步器
     */
    static final class NonfairSync extends Sync {

        private static final long serialVersionUID = -2694183684443567898 L;

        /**
         * Semaphore 锁 - Semaphore 锁 - 通过构造方法指定令牌许可
         */
        NonfairSync(int permits) {super(permits);
        }


        /**
         * Semaphore 锁 - Semaphore 锁 - 非偏心模式开释共享锁
         */
        protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);
        }
    }
  • 实现形式:次要是在基于动态外部 Sync 抽象类来实现,结构了一个可指定大小的的令牌许可
  • 次要办法:次要是提供了一个 tryAcquireShared 办法,其中次要是调用了动态外部 Sync 抽象类 nonfairTryAcquireShared 办法。
  • 工作机制:通过自旋操作让所有线程竞争获取令牌许可,实质还是采纳了 AQS 根底同步器中闯入策略到突破偏心的

3. 具体实现

在 JDK1.8 版本中,对于 Semaphore 的具体实现如下:


public class Semaphore implements java.io.Serializable {

    private static final long serialVersionUID = -3222578661600680210 L;

    /**
     * Semaphore 锁 - 封装同步器
     */
    private final Sync sync;


    /**
     * Semaphore 锁 - 基于 AQS 根底同步器封装同步器
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933 L;

        Sync(int permits) {setState(permits);
        }

        final int getPermits() {return getState();
        }

        /**
         * Semaphore 锁 - 非偏心模式获取共享锁
         */
        final int nonfairTryAcquireShared(int acquires) {for (;;) {int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        /**
         * Semaphore 锁 - 开释共享锁
         */
        protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

        /**
         * Semaphore 锁 - 自旋 +compareAndSetState 通过 CAS 操作计算操作令牌许可数
         */
        final void reducePermits(int reductions) {for (;;) {int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }

        /**
         * Semaphore 锁 - 自旋 +compareAndSetState 通过 CAS 操作重置令牌许可数
         */
        final int drainPermits() {for (;;) {int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }


    /**
     * Semaphore 锁 - 基于 Sync 抽象类封装 FairSync 偏心同步器
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944 L;

        /**
         * Semaphore 锁 - Semaphore 锁 - 通过构造方法指定令牌许可
         */
        FairSync(int permits) {super(permits);
        }

        /**
         * Semaphore 锁 - Semaphore 锁 - 偏心模式开释共享锁
         */
        protected int tryAcquireShared(int acquires) {for (;;) {if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }

    /**
     * Semaphore 锁 - 基于 Sync 抽象类封装 NonfairSync 非偏心同步器
     */
    static final class NonfairSync extends Sync {

        private static final long serialVersionUID = -2694183684443567898 L;

        /**
         * Semaphore 锁 - Semaphore 锁 - 通过构造方法指定令牌许可
         */
        NonfairSync(int permits) {super(permits);
        }


        /**
         * Semaphore 锁 - Semaphore 锁 - 非偏心模式开释共享锁
         */
        protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);
        }
    }

    /**
     * Semaphore 锁 - 结构一个令牌许可(默认非公模式)
     */
    public Semaphore(int permits) {sync = new NonfairSync(permits);
    }

    /**
     * Semaphore 锁 - 结构一个令牌许可(可选偏心 / 非公模式)
     */
    public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

    /**
     * Semaphore 锁 - 获取锁办法(默认一个且可中断机制)
     */
    public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);
    }

    /**
     * Semaphore 锁 - 获取锁办法(可选指定多个且可中断机制)
     */
    public void acquire(int permits) throws InterruptedException {if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }

    /**
     * Semaphore 锁 - 获取锁办法(默认多个且不可中断机制)
     */
    public void acquireUninterruptibly() {sync.acquireShared(1);
    }
    
    /**
     * Semaphore 锁 - 获取锁办法(指定多个且不可中断机制)
     */
    public void acquireUninterruptibly(int permits) {if (permits < 0) throw new IllegalArgumentException();
        sync.acquireShared(permits);
    }

    /**
     * Semaphore 锁 - 开释锁办法(默认一个)
     */
    public void release() {sync.releaseShared(1);
    }

    /**
     * Semaphore 锁 - 开释锁办法(可选指定多个)
     */
    public void release(int permits) {if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }


    /**
     * Semaphore 锁 - 尝试获取锁办法(默认一个)
     */
    public boolean tryAcquire() {return sync.nonfairTryAcquireShared(1) >= 0;
    }

    /**
     * Semaphore 锁 - 尝试获取锁办法(可选指定多个)
     */
    public boolean tryAcquire(int permits) {if (permits < 0) throw new IllegalArgumentException();
        return sync.nonfairTryAcquireShared(permits) >= 0;
    }

    /**
     * Semaphore 锁 - 尝试获取锁办法(可选指定多个并且反对超时机制)
     */
    public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
    throws InterruptedException {if (permits < 0) throw new IllegalArgumentException();
        return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
    }

    /**
     * Semaphore 锁 - 尝试获取锁办法(默认一个并且反对超时机制)
     */
    public boolean tryAcquire(long timeout, TimeUnit unit)
    throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    /**
     * Semaphore 锁 - 统计能够令牌许可数
     */
    public int availablePermits() {return sync.getPermits();
    }

    /**
     * Semaphore 锁 - 重置令牌许可数
     */
    public int drainPermits() {return sync.drainPermits();
    }

    /**
     * Semaphore 锁 - 递加计算令牌许可数
     */
    protected void reducePermits(int reduction) {if (reduction < 0) throw new IllegalArgumentException();
        sync.reducePermits(reduction);
    }

    /**
     * Semaphore 锁 - 判断是否偏心模式
     */
    public boolean isFair() {return sync instanceof FairSync;}

    /**
     * Semaphore 锁 - 判断队列中是否存在线程对象
     */
    public final boolean hasQueuedThreads() {return sync.hasQueuedThreads();
    }

    /**
     * Semaphore 锁 - 获取队列长度
     */
    public final int getQueueLength() {return sync.getQueueLength();
    }

    /**
     * Semaphore 锁 - 获取队列的线程对象
     */
    protected Collection < Thread > getQueuedThreads() {return sync.getQueuedThreads();
    }

}
  • 信号量同步器:次要是提供了 2 个构造方法来实现令牌许可的治理,其中:

    • 默认非偏心模式:根据指定传入的令牌许可数量 permits 间接实例化 NonfairSync 非偏心同步器
    • 可选偏心 / 非偏心模式:根据指定传入的令牌许可数量 permits 和偏心标记 fair 来实例化 NonfairSync 非偏心同步器和 FairSync 偏心同步器,其中,当 fair=true 时,是偏心平模式,否则为非偏心模式
  • 反对可中断机制:次要是提供了 2 个 acquire()办法来获取锁,其中:

    • 无参数 acquire()办法:个别模式获取共享锁,次要是基于 AQS 根底同步器中的 acquireSharedInterruptibly(int arg)来实现,其外围逻辑是 doAcquireSharedInterruptibly(int arg)来操纵。
    • 有参数 acquire()办法:根据指定传入的令牌许可数量 permits 来判断,当 permits< 0 时,间接 throw new IllegalArgumentException();否则,间接调用 acquireSharedInterruptibly(permits)办法。
  • 反对不可中断机制:次要是提供了 2 个 acquireUninterruptibly() 办法,其中:

    • 无参数 acquireUninterruptibly() 办法:个别模式获取共享锁,次要是基于 AQS 根底同步器中 acquireShared(int arg)办法来实现,其外围逻辑是 doAcquireShared(int arg) 来操纵。
    • 有参数 acquireUninterruptibly() 办法:根据指定传入的令牌许可数量 permits 来判断,当 permits< 0 时,间接 throw new IllegalArgumentException();否则,间接调用 acquireShared(int arg)办法。
  • 非偏心模式获取锁形式:次要提供了 2 个 tryAcquire() 办法,其中:

    • 无参数 tryAcquire() 办法:非偏心模式尝试获取共享锁,间接调用的是非偏心同步器中的 nonfairTryAcquireShared(int acquires) 办法。
    • 有参数 tryAcquire() 办法:根据指定传入的令牌许可数量 permits 来判断,当 permits< 0 时,间接 throw new IllegalArgumentException();否则,间接调用 nonfairTryAcquireShared(int acquires) 办法。
  • 偏心模式获取锁形式:次要提供了 2 个 tryAcquire() 办法,反对超时机制。其中:

    • 无参数 tryAcquire() 办法:偏心模式尝试获取共享锁,根据指定传入的令牌许可数量 permits 来判断,当 permits< 0 时,间接 throw new IllegalArgumentException();否则,间接调用的是 AQS 根底同步器中的 tryAcquire(int permits, long timeout, TimeUnit unit)办法,其外围逻辑是 tryAcquireSharedNanos(int arg, long nanosTimeout)来操纵。
    • 有参数 tryAcquire() 办法:偏心模式尝试获取共享锁,默认反对一个许可,间接调用的是 AQS 根底同步器中的 tryAcquire(1,long timeout, TimeUnit unit)办法,其外围逻辑是 tryAcquireSharedNanos(int arg, long nanosTimeout)来操纵。
  • 开释锁操作形式:次要提供了 2 个 release()办法, 其中:

    • 无参数 release() 办法:偏心 / 非偏心模式示范锁操作,默认反对一个许可,次要是间接调用 AQS 根底同步器中的 releaseShared(int arg) 办法
    • 有参数 release() 办法:偏心 / 非偏心模式示范锁操作,根据指定传入的令牌许可数量 permits 来判断,当 permits< 0 时,间接 throw new IllegalArgumentException();否则,次要是间接调用 AQS 根底同步器中的 releaseShared(int arg) 办法
  • 令牌许可操作方法:次要提供了 availablePermits() 办法,reducePermits(int reduction)办法 以及 drainPermits() 办法,其中:

    • availablePermits() 办法:获取可用的令牌许可数量,次要是调用外部同步器中 getPermits()办法。
    • reducePermits()办法:计算残余可用令牌许可数量,根据指定传入的令牌许可数量 reduction 来判断,当 reduction< 0 时,间接 throw new IllegalArgumentException();否则,调用外部同步器中 reducePermits()办法。
    • drainPermits() 办法:重置可用令牌许可数量,次要是调用外部同步器中 drainPermits()办法。
  • 队列操作方法:次要提供了 hasQueuedThreads()办法,getQueuedThreads() 办法以及 getQueueLength() 办法,其中:

    • hasQueuedThreads()办法:次要是用于获取队列中是否存在期待获取令牌许可的线程对象,次要是间接应用 AQS 根底同步器的 hasQueuedThreads()来实现。
    • getQueuedThreads() 办法:次要是用于获取队列中期待获取令牌许可的线程对象,次要是间接应用 AQS 根底同步器的 getQueuedThreads()来实现。
    • getQueueLength() 办法:次要是用于获取队列中期待获取令牌许可的数量,次要是间接应用 AQS 根底同步器的 getQueueLength()来实现。

综上所述,从肯定意义上讲,Semaphore 是一种共享锁,属于 AQS 根底形象队列同步器中共享模式孵化的产物,反对偏心模式与非偏心模式,默认是应用非偏心模式。

写在最初

通过对 Java 畛域中,JDK 外部提供的各种锁的实现来看,始终围绕的外围次要还是基于 AQS 根底同步器来实现的,然而 AQS 根底同步器不是一种非它不可的技术标准规范,更多的只是一套技术参考指南。

然而,实际上,Java 对于锁的实现与使用远远不止这些,还有相位器 (Phaser) 和交换器(Exchanger), 以及在 Java JDK1.8 版本之前并发容器 ConcurrentHashMap 中应用的分段锁(Segment)。

不论是何种实现和利用,在 Java 并发编程畛域来讲,都是围绕线程平安问题的角度去思考的,只是针对于各种各样的业务场景做的具体的实现。

肯定意义上来讲,对线程加锁只是并发编程的实现形式之一,绝对于理论利用来说,Java 畛域中的锁都只是一种繁多利用的锁,只是给咱们把握 Java 并发编程提供一种思维没,喋喋不休也不可能详尽。

到此为止,这算是对于 Java 畛域中并发锁的最终章,文中表述均为集体认识和集体了解,如有不到之处,忘请谅解也请给予批评指正。

最初,技术钻研之路任重而道远,愿咱们熬的每一个通宵,都撑得起咱们想在这条路上走上来的勇气,将来依然可期,与各位程序编程君共勉!

正文完
 0