关于linux:Java-并发编程解析-基于JDK源码解析Java领域中的并发锁我们可以从中学习到什么内容

28次阅读

共计 30774 个字符,预计需要花费 77 分钟才能阅读完成。

天穹之边,浩瀚之挚,眰恦之美;悟心悟性,虎头蛇尾,惟善惟道!—— 朝槿《朝槿兮年说》

写在结尾

在 Java 畛域中, 尤其是在并发编程畛域,对于多线程并发执行始终有两大外围问题:同步和互斥。其中:

  • 互斥(Mutual Exclusion):一个公共资源同一时刻只能被一个过程或线程应用,多个过程或线程不能同时应用公共资源。即就是同一时刻只容许一个线程访问共享资源的问题。
  • 同步(Synchronization):两个或两个以上的过程或线程在运行过程中协同步调,按预约的先后秩序运行。即就是线程之间如何通信、合作的问题。

针对对于这两大外围问题,利用管程是可能解决和实现的,因而能够说,管程是并发编程的万能钥匙。

尽管,Java 在基于语法层面 (synchronized 关键字) 实现了对管程技术, 然而从应用形式和性能上来说,内置锁 (synchronized 关键字) 的粒度绝对过大,不反对超时和中断等问题。

为了补救这些问题,从 JDK 层面对其“反复造轮子”,在 JDK 外部对其从新设计和定义,甚至实现了新的个性。

关健术语

本文用到的一些要害词语以及罕用术语,次要如下:

  • 信号量(Semaphore): 是在多线程环境下应用的一种设施,是能够用来保障两个或多个要害代码段不被并发调用,也是作零碎用来解决并发中的互斥和同步问题的一种办法。
  • 信号量机制(Semaphores):用来解决同步 / 互斥的问题的,它是 1965 年, 荷兰学者 Dijkstra 提出了一种行之有效的实现过程互斥与同步的办法。
  • 管程(Monitor) : 个别是指治理共享变量以及对共享变量的操作过程,让它们反对并发的一种机制。

根本概述

在 Java 畛域中,咱们能够将锁大抵分为基于 Java 语法层面 (关键词) 实现的锁和基于 JDK 层面实现的锁。

在 Java 畛域中,从 JDK 源码剖析来看,基于 JDK 层面实现的锁大抵次要能够分为以下 4 种形式:

  • 基于 Lock 接口实现的锁
  • 基于 ReadWriteLock 接口实现的锁
  • 基于 AQS 根底同步器实现的锁
  • 基于自定义 API 操作实现的锁

从浏览源码不难发现,在 Java SDK 并发包次要通过 AbstractQueuedSynchronizer(AQS)实现多线程同步机制的封装与定义,而通过 Lock 和 Condition 两个接口来实现管程,其中 Lock 用于解决互斥问题,Condition 用于解决同步问题。

一. 根本实践

在并发编程畛域,有两大外围问题:一个是互斥,即同一时刻只容许一个线程访问共享资源;另一个是同步,即线程之间如何通信、合作。

在操作系统中,个别有如果 I / O 操作时,对于阻塞和非阻塞是从函数调用角度来说的,其中:

  • 阻塞:如果读写操作没有就绪或者实现,则函数始终期待。
  • 非阻塞:函数立刻调用,而后让应用程序轮询循环。

而同步和异步则是从“读写是次要是由谁实现”的角度来说的,其中:

  • 同步:读写操作次要交给应用程序实现
  • 异步:读写操作次要由操作系统实现,个别实现之后,回调函数和事件告诉应用程序。

其中,信号量机制 (Semaphores) 是用来解决同步 / 互斥的问题的,然而信号量 (Semaphore) 的操作扩散在各个过程或线程中,不不便进行治理,因每次需调用 P /V(来自荷兰语 proberen 和 verhogen)操作,还可能导致死锁或毁坏互斥申请的问题。

因为 PV 操作对于解决过程互斥 / 同步编程简单,因此在此基础上提出了与信号量等价的——“管程技术”。

其中,管程 (Monitor) 当中定义了共享数据结构只能被管程外部定义的函数所批改,所以如果咱们想批改管程外部的共享数据结构的话,只能调用管程外部提供的函数来间接的批改这些数据结构。

一般来说,管程 (Monitor) 和信号量 (Semaphore) 是等价的,所谓等价指的是用管程可能实现信号量,也能用信号量实现管程。

在管程的倒退历程上,先后呈现过 Hasen 模型、Hoare 模型和 MESA 模型等三种不同的管程模型,当初正在宽泛应用的是 MESA 模型。

在 MESA 模型中,管程中引入了条件变量 (Conditional Variable) 的概念,而且每个条件变量都对应有一个期待队列(Wait Queue)。其中,条件变量和期待队列的作用是解决线程之间的同步问题。

而对于解决线程之间的互斥问题,将共享变量 (Shared Variable) 及其对共享变量的操作对立封装起来,个别次要是实现一个线程平安的阻塞队列 (Blocking Queue),将线程不平安的队列封装起来,对外提供线程平安的操作方法,例如入队操作(Enqueue) 和出队操作(Dequeue)。

在 Java 畛域中,对于 Java 语法层面实现的锁 (synchronized 关键字),其实就是参考了 MESA 模型,并且对 MESA 模型进行了精简,个别在 MESA 模型中,条件变量能够有多个,Java 语言内置的管程(synchronized) 里只有一个条件变量。

这就意味着,被 synchronized 关键字润饰的代码块或者间接标记静态方法以及实例办法,在编译期会主动生成相干加锁 (lock) 和解锁 (unlock) 的代码,即就是 monitorenter 和 monitorexit 指令。

对于 synchronized 关键字来说,次要是在 Java HotSpot(TM) VM 虚拟机通过 Monitor(监视器)来实现 monitorenter 和 monitorexit 指令的。

同时,在 Java HotSpot(TM) VM 虚拟机中,每个对象都会有一个监视器,监视器和对象一起创立、销毁。

监视器相当于一个用来监督这些线程进入的非凡房间,其任务是保障(同一时间)只有一个线程能够拜访被爱护的临界区代码块。

实质上,监视器是一种同步工具,也能够说是 JVM 对管程的同步机制的封装实现,次要特点是:

  • 同步:监视器所爱护的临界区代码是互斥地执行的。一个监视器是一个运行许可,任一线程进入临界区代码都须要取得这个许可,来到时把许可偿还。
  • 合作:监视器提供 Signal 机制,容许正持有许可的线程临时放弃许可进入阻塞期待状态,期待其余线程发送 Signal 去唤醒;其余领有许可的线程能够发送 Signal,唤醒正在阻塞期待的线程,让它能够从新取得许可并启动执行。

在 Hotspot 虚拟机中,监视器是由 C ++ 类 ObjectMonitor 实现的,ObjectMonitor 类定义在 ObjectMonitor.hpp 文件中,其中:

  • Owner:指向的线程即为取得锁的线程
  • Cxq:竞争队列(Contention Queue),所有申请锁的线程首先被放在这个竞争队列中
  • EntryList:对象实体列表,示意 Cxq 中那些有资格成为候选资源的线程被挪动到 EntryList 中。
  • WaitSet:相似于期待队列,某个领有 ObjectMonitor 的线程在调用 Object.wait()办法之后将被阻塞,而后该线程将被搁置在 WaitSet 链表中。

同时,管程与 Java 中面向对象准则 (Object Oriented Principle) 也是十分符合的,次要体现在 java.lang.Object 类中 wait()、notify()、notifyAll() 这三个办法,其中:

  • wait()办法:阻塞线程并且进入期待队列
  • notify()办法:随机地告诉期待队列中的一个线程
  • notifyAll()办法:告诉期待队列中的所有线程

不难发现,在 Java 中 synchronized 关键字及 java.lang.Object 类中 wait()、notify()、notifyAll() 这三个办法都是管程的组成部分。

由此可见,咱们能够失去一个比拟通用的并发同步工具根底模型,大抵蕴含如下几个内容,其中:

  • 条件变量(Conditional Variable):利用线程间共享的变量进行同步的一种工作机制
  • 共享变量((Shared Variable)):个别指对象实体对象的成员变量和属性
  • 阻塞队列 (Blocking Queue):共享变量(Shared Variable) 及其对共享变量的操作对立封装
  • 期待队列 (Wait Queue):每个条件变量都对应有一个期待队列(Wait Queue), 外部须要实现入队操作(Enqueue) 和出队操作 (Dequeue) 办法
  • 变量状态形容机(Synchronization Status):形容条件变量和共享变量之间状态变动,又能够称其为同步状态
  • 工作模式(Operation Mode):线程资源具备排他性,因而定义独占模式和共享模式两种工作模式

综上所述,条件变量和期待队列的作用是解决线程之间的同步问题;共享变量与阻塞队列的作用是解决线程之间的互斥问题。

二.AQS 根底同步器的设计与实现

在 Java 畛域中, 同步器是专门为多线程并发设计的同步机制,次要是多线程并发执行时线程之间通过某种共享状态来实现同步,只有当状态满足这种条件时线程才往下执行的一种同步机制。

对于多线程实现实现并发解决机制来说,始终以来,多线程都存在 2 个问题:

  • 线程之间内存共享,须要通过加锁进行管制,然而加锁会导致性能降落,同时简单的加锁机制也会减少编程编码难度
  • 过多线程造成线程之间的上下文切换,导致效率低下

因而,在并发编程畛域中,始终有一个很重要的设计准则:“不要通过内存共享来实现通信,而应该通过通信来实现内存共享。”

简略来说,就是尽可能通过音讯通信,而不是内存共享来实现过程或者线程之间的同步。

其中,同步器是专门为多线程并发设计的同步机制,次要是多线程并发执行时线程之间通过某种共享状态来实现同步,只有当状态满足这种条件时线程才往下执行的一种同步机制。

因为在不同的利用场景中,对于同步器的需要也会有所不同,个别在咱们本人去实现和设计一种并发工具的时候,都需会思考以下几个问题:

  • 是否反对响应中断?如果阻塞状态的线程可能响应中断信号,也就是说当咱们给阻塞的线程发送中断信号的时候,可能唤醒它,那它就有机会开释已经持有的锁。
  • 是否反对超时?如果线程在一段时间之内没有获取到锁,不是进入阻塞状态,而是返回一个谬误,那这个线程也有机会开释已经持有的锁。
  • 是否反对非阻塞地获取锁资源?如果尝试获取锁失败,并不进入阻塞状态,而是间接返回,那这个线程也有机会开释已经持有的锁。

从浏览 JDK 源码不难发现,次要是采纳设计模式中模板模式的准则,JDK 将各种同步器中雷同的局部形象封装成了一个对立的根底同步器,而后基于根底同步器为模板通过继承的形式来实现不同的同步器。

也就是说,在理论开发过程中,除了间接应用 JDK 实现的同步器,还能够基于这个根底同步器咱们也能够本人自定义实现合乎咱们业务需要的同步器。

在 JDK 源码中,同步器位于 java.util.concurrent.locks 包下,其根本定义是 AbstractQueuedSynchronizer 类,即就是咱们常说的 AQS 同步器。

1. 设计思维

一个规范的 AQS 同步器次要有同步状态机制,期待队列,条件队列,独占模式,共享模式等五大外围因素组成。

JDK 的 JUC(java.util.concurrent.)包中提供了各种并发工具,然而大部分同步工具的实现基于 AbstractQueuedSynchronizer 类实现,其内部结构次要如下:

  • 同步状态机制 (Synchronization Status):次要用于实现锁(Lock) 机制,是指同步状态,其要求对于状态的更新必须原子性的
  • 期待队列 (Wait Queue):次要用于寄存期待线程获取到的锁资源,并且把线程保护到一个 Node(节点) 外面和保护一个非阻塞的 CHL Node FIFO(先进先出)队列,次要是采纳自旋锁 +CAS 操作来保障节点插入和移除的原子性操作。
  • 条件队列(Condition Queue):用于实现锁的条件机制,个别次要是指替换“期待 - 告诉”工作机制,次要是通过 ConditionObject 对象实现 Condition 接口提供的办法实现。
  • 独占模式(Exclusive Mode):次要用于实现独占锁,次要是基于动态外部类 Node 的常量标记 EXCLUSIVE 来标识该节点是独占模式
  • 共享模式(Shared Mode):次要用于实现共享锁,次要是基于动态外部类 Node 的常量标记 SHARED 来标识该节点是共享模式

其中,对于 AbstractQueuedSynchronizer 类的实现原理,咱们能够从如下几个方面来看:


public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {

    private static final long serialVersionUID = 7373984972572414691 L;


    protected AbstractQueuedSynchronizer() {}

    /**
     * 期待队列:head- 头节点
     */
    private transient volatile Node head;

    /**
     * 期待队列:tail- 尾节点
     */
    private transient volatile Node tail;

    /**
     * 同步状态:32 位整数类型,更新同步状态 (state) 时必须保障其是原子性的
     */
    private volatile int state;

    /**
     * 自旋锁耗费超时工夫阀值(threshold):threshold < 1000ns 时,示意竞争时抉择自旋;threshold > 1000ns 时,示意竞争时抉择零碎阻塞
     */
    static final long spinForTimeoutThreshold = 1000 L;

    /**
     * CAS 原子性操作
     */
    private static final Unsafe unsafe = Unsafe.getUnsafe();

    /**
     * stateOffset
     */
    private static final long stateOffset;

    /**
     * headOffset
     */
    private static final long headOffset;

    /**
     * tailOffset
     */
    private static final long tailOffset;

    /**
     * waitStatusOffset
     */
    private static final long waitStatusOffset;

    /**
     * nextOffset
     */
    private static final long nextOffset;


    static {
        try {stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));

        } catch (Exception ex) {throw new Error(ex);
        }
    }
        
            private final boolean compareAndSetHead(Node update)  {return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }
        
            private final boolean compareAndSetTail(Node expect, Node update) {return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }
        
            private static final boolean compareAndSetWaitStatus(Node node,
                                                         int expect,
                                                         int update) {
        return unsafe.compareAndSwapInt(node, waitStatusOffset,
                                        expect, update);
    }
        
            private static final boolean compareAndSetNext(Node node,
                                                   Node expect,
                                                   Node update) {return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
    }
        
            protected final boolean compareAndSetState(int expect, int update) {return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

}

[1]. AbstractQueuedSynchronizer 类的实现原理是继承了基于 AbstractOwnableSynchronizer 类的抽象类,其中次要对 AQS 同步器的通用个性和办法进行形象封装定义,次要包含如下办法:

public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {


    private static final long serialVersionUID = 3737899427754241961 L;


    protected AbstractOwnableSynchronizer() {}

    /**
     *  同步器拥有者
     */
    private transient Thread exclusiveOwnerThread;

    /**
     * 设置同步器拥有者:把线程当作参数传入,指定某个线程为独享
     */
    protected final void setExclusiveOwnerThread(Thread thread) {exclusiveOwnerThread = thread;}

    /**
     * 获取同步器拥有者:获取指定的某个线程
     */
    protected final Thread getExclusiveOwnerThread() {return exclusiveOwnerThread;}
}
  • setExclusiveOwnerThread(Thread thread)办法:把某个线程作为参数传入,从而设置 AQS 同步器的所有者,即就是咱们设置的某个线程
  • getExclusiveOwnerThread()办法:获取以后 AQS 同步器的所有者,即就是咱们指定的某个线程

[2]. 对于同步状态 (state),其类型是 32 位整数类型,并且是被 volatile 润饰的,示意在更新同步状态(state) 时必须保障其是原子性的。

[3]. 对于期待队列的构造,次要是在 Node 定义了 head 和 tail 变量,其中 head 示意头部节点,tail 示意尾部节点

[4]. 对于期待队列的构造提到的 Node 类来说,次要内容如下:

  static final class Node {
      /** Marker to indicate a node is waiting in shared mode */
      static final Node SHARED = new Node();
            
      /** Marker to indicate a node is waiting in exclusive mode */
      static final Node EXCLUSIVE = null;

      /** waitStatus value to indicate thread has cancelled */
      static final int CANCELLED = 1;
            
      /** waitStatus value to indicate successor's thread needs unparking */
      static final int SIGNAL = -1;
            
      /** waitStatus value to indicate thread is waiting on condition */
      static final int CONDITION = -2;
            
      /**
       * waitStatus value to indicate the next acquireShared should
       * unconditionally propagate
       */
      static final int PROPAGATE = -3;

      /**
       * Status field, taking on only the values:
       *   SIGNAL:     The successor of this node is (or will soon be)
       *               blocked (via park), so the current node must
       *               unpark its successor when it releases or
       *               cancels. To avoid races, acquire methods must
       *               first indicate they need a signal,
       *               then retry the atomic acquire, and then,
       *               on failure, block.
       *   CANCELLED:  This node is cancelled due to timeout or interrupt.
       *               Nodes never leave this state. In particular,
       *               a thread with cancelled node never again blocks.
       *   CONDITION:  This node is currently on a condition queue.
       *               It will not be used as a sync queue node
       *               until transferred, at which time the status
       *               will be set to 0. (Use of this value here has
       *               nothing to do with the other uses of the
       *               field, but simplifies mechanics.)
       *   PROPAGATE:  A releaseShared should be propagated to other
       *               nodes. This is set (for head node only) in
       *               doReleaseShared to ensure propagation
       *               continues, even if other operations have
       *               since intervened.
       *   0:          None of the above
       *
       *
       * The field is initialized to 0 for normal sync nodes, and
       * CONDITION for condition nodes.  It is modified using CAS
       * (or when possible, unconditional volatile writes).
       */
      volatile int waitStatus;

      /**
       * Link to predecessor node that current node/thread relies on
       */
      volatile Node prev;

      /**
       * Link to the successor node that the current node/thread
       */
      volatile Node next;

      /**
       * The thread that enqueued this node.  Initialized on
       * construction and nulled out after use.
       */
      volatile Thread thread;

      /**
       * Link to next node waiting on condition, or the special
       */
      Node nextWaiter;

      /**
       * Returns true if node is waiting in shared mode.
       */
      final boolean isShared() {return nextWaiter == SHARED;}


      final Node predecessor() throws NullPointerException {
          Node p = prev;
          if (p == null)
              throw new NullPointerException();
          else
              return p;
      }

      Node() { // Used to establish initial head or SHARED marker}

      Node(Thread thread, Node mode) { // Used by addWaiter
          this.nextWaiter = mode;
          this.thread = thread;
      }

      Node(Thread thread, int waitStatus) { // Used by Condition
          this.waitStatus = waitStatus;
          this.thread = thread;
      }
  }
  • 标记 Node 的工作模式常量标记:次要保护了 SHARED 和 EXCLUSIVE 等 2 个动态字面常量,其中 SHARED 用于标记 Node 中是共享模式,EXCLUSIVE: 用于标记 Node 中是独享模式
  • 标记期待状态的动态字面常量标记:次要保护了 0(示意无状态),SIGNAL(-1, 示意后续节点中的线程通过 park 进入期待,以后节点在开释和勾销时,须要通过 unpark 解除后后续节点的期待),CANCELLED(1, 示意以后节点中的线程因为超时和中断被勾销),CONDITION(-2, 示意以后节点在条件队列中),PROPAGATE(-3,SHARED 共享模式的头节点形容状态,示意无条件往下流传)等 5 个动态字面常量
  • 保护了一个期待状态 (waitStatus): 次要用于形容期待队列中节点的状态,其取值范畴为 0(waitStatus=0, 示意无状态),SIGNAL(waitStatus=-1, 示意期待信号状态),CANCELLED(waitStatus=1, 示意勾销状态),CONDITION(waitStatus=-2, 示意条件状态),PROPAGATE(waitStatus=-3, 示意 SHARED 共享模式状态) 等 5 个动态字面常量,CAS 操作时写入,默认值为 0。
  • 保护了 Node 的 2 个构造节点变量:次要是 prev 和 next,其中,prev 示意前驱节点,next 示意后续节点,示意形成双向向链表,形成了期待队列的数据结构
  • 保护了一个状态工作模式标记:次要是保护了一个 nextWaiter,用于示意在期待队列中以后节点在是共享模式还是独享模式,而对于条件队列来说,用于组成单向链表构造
  • 保护了一个线程对象变量:次要用于记录以后节点中的线程 thread

[5]. 对于自旋锁耗费超时工夫阀值(spinForTimeoutThreshold), 次要示意零碎根据这个阀值来抉择自旋形式还是零碎阻塞。个别假如这个 threshold,当 threshold < 1000ns 时,示意竞争时抉择自旋;否则,当 threshold > 1000ns 时,示意竞争时抉择零碎阻塞

[6]. 对于带有 Offset 等变量对应各自的句柄,次要用于执行 CAS 操作。在 JDK1.8 版本之前,CAS 操作次要通过 Unsafe 类来说实现;在 JDK1.8 版本之后,曾经开始利用 VarHandle 来代替 Unsafe 类操作实现。

[7]. 对于 CAS 操作来说,次要提供了如下几个办法:

  • compareAndSetState(int expect, int update)办法:CAS 操作原子更新状态
  • compareAndSetHead(Node update)办法:CAS 操作原子更新头部节点
  • compareAndSetTail(Node expect, Node update)办法:CAS 操作原子更新尾部节点
  • compareAndSetWaitStatus(Node node, int expect,int update)办法:CAS 操作原子更新期待状态
  • compareAndSetNext(Node node,Node expect,Node update)办法:CAS 操作原子更新后续节点

[8]. 对于条件队列 (ConditionObject) 来说,次要内容如下:

  public class ConditionObject implements Condition, java.io.Serializable {

      private static final long serialVersionUID = 1173984872572414699 L;

      /** First node of condition queue. */
      private transient Node firstWaiter;

      /** Last node of condition queue. */
      private transient Node lastWaiter;

      /** Mode meaning to reinterrupt on exit from wait */
      private static final int REINTERRUPT = 1;

      /** Mode meaning to throw InterruptedException on exit from wait */
      private static final int THROW_IE = -1;

      /**
       * Creates a new {@code ConditionObject} instance.
       */
      public ConditionObject() {}
            
  }
  • 基于 Condition 的接口实现条件队列,其外围次要是实现阻塞和唤醒的工作机制
  • 基于 Node 定义了 firstWaiter 和 lastWaiter 变量,其中,firstWaiter 示意的是头节点,lastWaiter 是尾节点
  • 还定义了 2 个字面常量 REINTERRUPT 和 THROW_IE,其中 REINTERRUPT=1,形容的是当中断是退出条件队列,THROW_IE=- 1 示意的是产生异样时退出

[8]. 除此之外,在 AQS 根底同步器中,个别能够通过构造方法间接将参数值赋给对应变量,也能够通过变量句柄进行赋值操作:

  • isShared()办法:用于判断期待队列是否为共享模式
  • predecessor()办法:用于获取以后节点对应的前驱节点,如果为空,则 throw new NullPointerException();

2. 根本实现

一个规范的 AQS 同步器最外围底层设计实现是一个非阻塞的 CHL Node FIFO(先进先出)队列数据结构,通过采纳自旋锁 +CAS 操作的办法来保障原子性操作。

总的来说,一个 AQS 根底同步器,底层的数据结构采纳的是一个非阻塞的 CHL Node FIFO(先进先出)队列数据结构,而实现的外围算法则是采纳自旋锁 +CAS 操作的办法。

首先,对于非阻塞的 CHL Node FIFO(先进先出)队列数据结构,一般来说,FIFO(First In First Out, 先进先出)队列是一个有序列表,属于形象型数据类型 (Abstract Data Type,ADT), 所有的插入和删除操作都产生在队首(Front) 和队尾 (Rear) 两端,具备先进先出的个性。


    /**
     * 期待队列:head- 头节点
     */
    private transient volatile Node head;

    /**
     * 期待队列:tail- 尾节点
     */
    private transient volatile Node tail;
        
    /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

    /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

在 AQS 同步器的源码中,次要是通过动态外部类 Node 来实现的这个非阻塞的 CHL Node FIFO(先进先出)队列数据结构,保护了两个变量 head 和 tail,其中 head 对应队首 (Front),tail 对应队尾(Rear)。同时,还定义了 addWaiter(Node mode) 办法来示意入队操作,其中有个 enq(final Node node)办法,次要用于初始化队列中 head 和 tail 的设置。

其次,AQS 同步器以 CLH 锁为根底,其中 CLH 锁是一种自旋锁,对于自旋锁的实现形式来看,次要能够分为一般自旋锁和自适应自旋锁,CLH 锁和 MCS 锁等 4 种,其中:

  • 一般自旋锁:多个线程一直自旋,一直尝试获取锁,其不具备公平性和因为要保障 CPU 和缓存以及主存之间的数据一致性,其开销较大。
  • 自适应自旋锁:次要是为解决一般自旋锁的公平性问题,引入了一个排队机制,个别称为排他自旋锁,其具备公平性,然而没有解决保障 CPU 和缓存以及主存之间的数据一致性问题,其开销较大。
  • CLH 锁:通过肯定伎俩将线程对于某一个共享变量的轮询竞争转化为一个线程队列,且队列中的线程各自轮询本人本地变量。
  • MCS 锁:宗旨在于解决 CLH 锁的问题,也是基于 FIFO 队列,与 CLH 锁不同是,只对本地变量自旋,前驱节点负责告诉 MCS 锁中线程自适完结。

自旋锁是一种实现同步的计划,属于一种非阻塞锁,与惯例锁次要的区别就在于获取锁失败之后的解决形式不同,次要体现在:

  • 个别状况下,惯例锁在获取锁失败之后,会将线程阻塞并适当时从新唤醒
  • 而自旋锁则是应用自旋来替换阻塞操作,次要是线程会一直循环查看该锁是否被开释,一旦开释线程便会获取锁资源。

从实质上讲,自旋是一钟忙期待状态,会始终耗费 CPU 的执行工夫。个别状况下,惯例互斥锁实用于持有锁长时间的状况,自旋锁适宜持有工夫短的状况。

其中,对于 CLH 锁来说,其外围是为解决同步带来的花销问题,Craig,Landim,Hagersten 三人创造了 CLH 锁,其中次要是:

  • 构建一个 FIFO(先进先出)队列,构建时次要通过挪动尾部节点 tail 来实现队列的排队,每个想取得锁的线程都会创立一个新节点 (next) 并通过 CAS 操作原子操作将新节点赋予给 tail,以后线程轮询前一个节点的状态。
  • 执行完线程后,只需将以后线程对应节点状态设置为解锁即可,次要是判断以后节点是否为尾部节点,如果是间接设置尾部节点设置为空。因为下一个节点始终在轮询,所以能够取得锁。

CLH 锁将众多线程长时间对资源的竞争,通过有序化这些线程将其转化为只须要对本地变量检测。惟一存在竞争的中央就是入队之前对尾部节点 tail 的竞争,相对来说,以后线程对资源的竞争次数缩小,这节俭了 CPU 缓存同步的耗费,从而晋升了零碎性能。

然而同时也有一个问题,CLH 锁尽管解决了大量线程同时操作同一个变量时带来的开销问题,如果前驱节点和以后节点在本地主存中不存在,则拜访工夫过长,也会引起性能问题。

为了让 CLH 锁更容易实现勾销和超时的性能,AQS 同步器在设计时进行了革新,次要体现在:节点的构造和节点期待机制。其中:

  • 节点的构造:次要引入了头节点和尾节点,别离指向队列头部和尾部,对于锁的相干操作都与其非亲非故,并且每个节点都引入了前驱节点和后继节点。
  • 节点期待机制:次要在原来的自旋根底上减少了零碎阻塞唤醒,次要体现在 自旋锁耗费超时工夫阀值(threshold):threshold < 1000ns 时,示意竞争时抉择自旋;threshold > 1000ns 时,示意竞争时抉择零碎阻塞。

由此可见,次要是通过前驱节点和后继节点的援用连接起来造成一个链表队列,其中对于入队,检测节点,出队,判断超时,勾销节点等操作次要如下:

  • 入队(enqueue): 次要采纳一个有限循环进行 CAS 操作,即就是应用自旋形式竞争直到胜利。
  • 检测节点(checkedPrev): 个别在入队实现后,次要是检测判断以后节点的前驱节点是否为头节点,个别自旋形式是间接进入循环检测,而零碎阻塞形式是以后线程先检测,其中如果是头节点并胜利获取锁,则间接返回,以后线程不阻塞,否则对以后线程进行阻塞。
  • 出队(dequeue): 次要负责唤醒期待队列中的后继节点,并且依照条件往下流传有序执行
  • 判断超时(checkedTimeout): 队列中期待锁的线程可能因为中断或者超时的状况,当总耗时大于等于自定义耗时就间接返回,即就是
  • 勾销节点 (cancel): 次要是对于中断和超时而波及到勾销操作,而且这样的状况不再参加锁竞争,即就是个别通过调用 compareAndSetNext(Node node, Node expect,Node update) 来进行 CAS 操作。

特地值得注意的是,AQS 根底同步器中次要有期待队列和条件队列两种对列构造,比照便不难发现:期待队列采纳的底层数据结构是双向链表构造,而对于条件队列则是单向链表构造。

最初,AQS 同步器中应用了 CAS 操作,其中 CAS(Compare And Swap, 比拟并替换)操作时一种乐观锁策略,次要波及三个操作数据:内存值,预期值,新值,次要是指当且仅当预期值和内存值相等时才去批改内存值为新值。

一般来说,CAS 操作的具体逻辑,次要能够分为三个步骤:

  • 首先,查看某个内存值是否与该线程之前取到值一样。
  • 其次,如果不一样,示意此内存值曾经被别的线程批改,须要舍弃本次操作。
  • 最初,如果时一样,示意期间没有线程更改过,则须要用新值执行更新内存值。

除此之外,须要留神的是 CAS 操作具备原子性,次要是由 CPU 硬件指令来保障,并且通过 Java 本地接口 (Java Native Interface,JNI) 调用本地硬件指令实现。

当然,CAS 操作防止了乐观策略独占对象的 问题,同时进步了并发性能,然而也有以下三个问题:

  • 乐观策略只能保障一个共享变量的原子操作,如果是多个变量,CAS 便不如互斥锁,次要是 CAS 操作的局限所致。
  • 长时间循环操作可能导致开销过大。
  • 经典的 ABA 问题:次要是查看某个内存值是否与该线程之前取到值一样,这个判断逻辑不谨严。解决 ABA 问题的外围在于,引入版本号,每次更新变量值更新版本号。

而在 AQS 同步器中,为了保障并发实现保障原子性,而且是硬件级别的原子性,个别是通过 JNI(Java native interface,Java 本地接口)形式让 Java 代码调用 C /C++ 本地代码。

通过剖析源码可知,个别应用 Unsafe 类须要只用关注如下办法即可:


public final class Unsafe {

    private static final Unsafe theUnsafe;

    private static native void registerNatives();

    private Unsafe() {}

    @CallerSensitive
    public static Unsafe getUnsafe() {Class var0 = Reflection.getCallerClass();
        if (!VM.isSystemDomainLoader(var0.getClassLoader())) {throw new SecurityException("Unsafe");
        } else {return theUnsafe;}
    }

    public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);

    public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

    public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);

    public native void unpark(Object var1);

    public native void park(boolean var1, long var2);

}
  • rgisterNatives()办法:是一个动态 办法,次要用于注册本地办法
  • 构造函数是 private 私有化的,个别无奈通过构造函数来实例化 Unsafe 对象
  • getUnsafe()办法:是用来获取 Unsafe 对象的,尽管是公有化的,然而如果 Java 语言开发层面的对进行安全检查

个别地,因为 Unsafe 类的操作波及到硬件底层的操作,JDK 对其实例化做了平安校验,只有受零碎信赖的代码才对其实例化,次要是通过类加载器来解析,其实例化形式次要有如下形式:

  • 第一种:间接调用该办法,次要旧式是 Unsafe.getUnsafe()。然而,对于咱们理论开发来说,这种形式无奈通过平安校验行不通,零碎会抛出 throw new SecurityException(“Unsafe”)信息
  • 第二种:通过反射机制绕过安全检查,次要是批改 Unsafe 类中 theUnsafe 字段的拜访权限,让其能被拜访从而达到获取 Unsafe 对象的目标

须要留神的是,在 Java 畛域中,对于 CAS 操作实现,次要有两点问题:

  • JDK1.8 版本之前,CAS 操作次要应用 Unsafe 类来执行底层操作,个别并发和线程操作时,次要用 compareAndSwapObject,compareAndSwapInt,compareAndSwapLong 等来实现 CAS,而对于线程调度次要是 park 和 unpark 办法,其次要在 sun.misc 包上面。
  • JDK1.8 版本之后,JDK1.9 的 CAS 操作次要应用 VarHandle 类,只是用 VarHandle 代替了一部分 Unsafe 类的操作,然而对于新版本中 Unsafe,实质上 Unsafe 类会间接调用 jdk.internal.misc 包上面 Unsafe 类来实现。

3. 具体实现

在 Java 畛域中,AQS 同步器利用独享模式和共享模式来实现同步机制,次要为解决多线并发执行中数据竞争和竞争条件问题。

为解决多线并发执行中数据竞争和竞争条件问题,引入了同步机制,次要是通过管制共享数据和临界区的拜访,个别比拟通用的形式是通过锁机制来实现。

在 Java 畛域中,JDK 对于 AQS 根底同步器形象封装了锁的获取和开释操作,次要提供了独享和共享两种工作模式:

  • 独享模式(Exclusive Mode):对应着独享锁(Exclusive Lock),示意着对于锁的获取和开释,一次只能至少一个或者只有一个线程把持,其余线程无奈取得并取得持有,必须期待持有线程开释锁。
  • 共享模式(Shared Mode):对应着共享锁(Shared Lock),示意着对于锁的获取和开释,一次能够至多一个或者容许多个线程把持,其余线程能够取得并取得持有,不必期待持有线程开释锁。

其中,AQS 根底同步器对于独享模式和共享模式的工作模式的根本流程,次要如下:

  • 获取锁流程:先尝试获取锁,如果获取胜利则往下持续进行,否则把线程保护到期待队列中,线程可能会挂起。
  • 开释锁流程:唤醒期待队列中的一个或者多个线程去尝试获取须要开释的锁。

个别地,AQS 根底同步器对于独享模式和共享模式的封装和实现,其中:

3.1. 独享模式的技术实现

[1]. 获取锁操作相干的外围逻辑,次要如下:


public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {

    /**
     * 独占模式:[1]. 通过 acquire 获取锁操作
     */

    public final void acquire(int arg) {if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();}


    /**
     * 独占模式:[2]. 通过 tryAcquire 尝试获取锁操作
     */
    protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();
    }



    /**
     * 独占模式:[3]. 通过 addWaiter 入队操作
     */
    private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

    /**
     * 独占模式:[4]. 通过 acquireQueued 检测节点
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * 独占模式:[5]. 通过 cancelAcquire 勾销锁获取
     */
    private void cancelAcquire(Node node) {if (node == null)
            return;

        node.thread = null;


        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;


        Node predNext = pred.next;


        node.waitStatus = Node.CANCELLED;


        if (node == tail && compareAndSetTail(node, pred)) {compareAndSetNext(pred, predNext, null);
        } else {

            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                    (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {unparkSuccessor(node);
            }

            node.next = node; 
        }
    }


}

[2]. 开释锁操作相干的外围逻辑,次要如下:


public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {


    /**
     * 独占模式:[1]. 通过 release 开释锁操作
     */
    public final boolean release(int arg) {if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    /**
     * 独占模式:[2]. 通过 tryRelease 尝试开释锁操作
     */
    protected boolean tryRelease(int arg) {throw new UnsupportedOperationException();
    }

    /**
     * 独占模式:[3]. 通过 unparkSuccessor 唤醒后继节点
     */
    private void unparkSuccessor(Node node) {

        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);


        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

}

由此可见,对于独占模式的锁获取和开释,次要是根据 acquire 和 release 等办法来实现。

3.1. 共享模式的技术实现

[1]. 获取锁操作相干的外围逻辑,次要如下:


public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {

    /**
     * 共享模式:[1]. 通过 acquireShared 获取锁操作
     */

    public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

    /**
     * 共享模式:[2]. 通过 tryAcquireShared 尝试获取锁操作
     */
    protected int tryAcquireShared(int arg) {throw new UnsupportedOperationException();
    }

    /**
     * 共享模式:[3]. 通过 doAcquireShared 入队操作
     */
    private void doAcquireShared(int arg) {final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {final Node p = node.predecessor();
                if (p == head) {int r = tryAcquireShared(arg);
                    if (r >= 0) {setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {if (failed)
                cancelAcquire(node);
        }
    }

}

[2]. 开释锁操作相干的外围逻辑,次要如下:


public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {

    /**
     * 共享模式:[1]. 通过 releaseShared 开释锁操作
     */

    public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();
            return true;
        }
        return false;
    }

    /**
     * 共享模式:[2]. 通过 tryReleaseShared 尝试开释锁操作
     */


    protected boolean tryReleaseShared(int arg) {throw new UnsupportedOperationException();
    }

    /**
     * 共享模式:[3]. 通过 doReleaseShared 开释锁就绪操作
     */
    private void doReleaseShared() {for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;
                    unparkSuccessor(h);
                } else if (ws == 0 &&
                    !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;
            }
            if (h == head)
                break;
        }
    }
        
}

由此可见,对于共享模式的锁获取和开释,次要是根据 acquireShared 和 releaseShared 等办法来实现。

综上所述,不难看出,AQS 同步器设计思维是通过继承的形式提供一个模板,其外围原理是治理一个共享状态,通过对状态的管制来实现不同的管制。

二. LockSupport 的设计与实现

在 Java 畛域中,LockSupport 次要从线程资源角度为同步器和锁提供根本线程阻塞和唤醒原语,是“期待 - 告诉”工作机制的实现。

一般来说,当一个线程 (Thread) 只有参加锁竞争时,其经验的次要流程有:

  • 一旦以后线程进行锁竞争时,线程都会尝试获取锁,依据获取锁的状况进行后续解决。
  • 如果获取锁失败,则会创立节点插入到队列的尾部,会二次尝试从新获取锁,并不会阻塞以后线程。
  • 如果获取锁胜利,则间接返回,否则会将节点设置为待运行状态(SIGNAL)。
  • 最初对以后线程进行阻塞,当前驱节点运行实现后会唤醒后继节点。

在 Java 畛域中,对于线程的阻塞和唤醒,兴许咱们最早在学习面向对象准则的时候,个别都应用 java.lang.Object 类中 wait()、notify()、notifyAll() 这三个办法,能够用它们帮忙咱们实现期待 - 告诉”工作机制。

同时,在解说 AQS 根底同步器的实现时,提到说 CAS 操作的外围是应用 Unsafe 类来执行底层操作,对于线程调度次要是 park 和 unpark 办法,然而个别的 Java 语言层 main 开发对其调用又有安全检查的限度。

然而,在 AQS 根底同步的的阻塞和唤醒操作咋在获取锁饿的锁操作中须要应用,个别地:

  • 如果获取不到锁的以后线程在进入排到队列之后须要阻塞以后线程。
  • 并且,排到队列中前驱节点运行实现后,须要负责唤醒后继节点。

于是,在 AQS 根底同步器的设计与实现中,封装一个专门用于实现“期待 - 告诉”工作机制的 LockSupport 类。

对于 LockSupport 类,次要是为同步器和锁提供根本线程阻塞和唤醒原语,AQS 同步器和锁都是应用它来阻塞和唤醒线程。次要源码如下:


public class LockSupport {

    /**
     *  阻塞操作:利用 park 阻塞某个线程(指定参数)
     */
    public static void park(Object blocker) {Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0 L);
        setBlocker(t, null);
    }

    /**
     *  阻塞操作:利用 park 阻塞某个线程(无指定参数)
     */
    public static void park() {UNSAFE.park(false, 0 L);
    }


    /**
     *  阻塞操作:依据 nanos 许可,利用 parkNanos 阻塞某个线程
     */
    public static void parkNanos(long nanos) {if (nanos > 0)
            UNSAFE.park(false, nanos);
    }

    /**
     *  阻塞操作:依据 nanos 许可,利用 parkNanos 阻塞某个线程,然而指定阻塞对象
     */
    public static void parkNanos(Object blocker, long nanos) {if (nanos > 0) {Thread t = Thread.currentThread();
            setBlocker(t, blocker);
            UNSAFE.park(false, nanos);
            setBlocker(t, null);
        }
    }

    /**
     *  阻塞操作:依据 deadline 最大等待时间,利用 parkUntil 阻塞某个线程
     */
    public static void parkUntil(long deadline) {UNSAFE.park(true, deadline);
    }

    /**
     *  阻塞操作:依据 deadline 最大等待时间,利用 parkUntil 阻塞某个线程,须要指定阻塞对象
     */
    public static void parkUntil(Object blocker, long deadline) {Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(true, deadline);
        setBlocker(t, null);
    }


    /**
     *  唤醒操作:利用 unpark 唤醒某个线程
     */
    public static void unpark(Thread thread) {if (thread != null)
            UNSAFE.unpark(thread);
    }

    /**
     *  设置阻塞器:指定线程和对象
     */
    private static void setBlocker(Thread t, Object arg) {
        // Even though volatile, hotspot doesn't need a write barrier here.
        UNSAFE.putObject(t, parkBlockerOffset, arg);
    }

    /**
     *  获取阻塞器中线程对象
     */
    public static Object getBlocker(Thread t) {if (t == null)
            throw new NullPointerException();
        return UNSAFE.getObjectVolatile(t, parkBlockerOffset);
    }

    /**
     *  获取阻塞器中线程对象
     */
    static final int nextSecondarySeed() {
        int r;
        Thread t = Thread.currentThread();
        if ((r = UNSAFE.getInt(t, SECONDARY)) != 0) {
            r ^= r << 13; // xorshift
            r ^= r >>> 17;
            r ^= r << 5;
        } else if ((r = java.util.concurrent.ThreadLocalRandom.current().nextInt()) == 0)
            r = 1; // avoid zero
        UNSAFE.putInt(t, SECONDARY, r);
        return r;
    }

    // Hotspot implementation via intrinsics API
    private static final sun.misc.Unsafe UNSAFE;

    private static final long parkBlockerOffset;

    private static final long SEED;

    private static final long PROBE;

    private static final long SECONDARY;

    static {
        try {UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class <? > tk = Thread.class;
            parkBlockerOffset = UNSAFE.objectFieldOffset(tk.getDeclaredField("parkBlocker"));
            SEED = UNSAFE.objectFieldOffset(tk.getDeclaredField("threadLocalRandomSeed"));
            PROBE = UNSAFE.objectFieldOffset(tk.getDeclaredField("threadLocalRandomProbe"));
            SECONDARY = UNSAFE.objectFieldOffset(tk.getDeclaredField("threadLocalRandomSecondarySeed"));
        } catch (Exception ex) {throw new Error(ex);
        }
    }

}
  • 设计思维:相比用 java.lang.Object 类中的 wait/notify 形式,其 LockSupport 类更关注线程自身,解耦了线程之间的同步。
  • 实现原理:次要还是应用 Unsafe 类来执行底层操作,次要是间接调用是 park 和 unpark 本地办法
  • 阻塞操作波及办法:个别以 park 结尾的办法来阻塞线程操作,大抵能够分为自定义阻塞对象参数和非自定义阻塞对象参数等阻塞办法
  • 唤醒线程操作:次要通过 unpark 办法来对以后线程设置可用,绝对于唤醒操作

三. Condition 接口的设计与实现

在 Java 畛域中,Condition 接口是用来实现管程技术,其中 Condition 用于解决同步问题。

绝对于 LockSupport 的设计与实现来说,Condition 接口只是在 JDK 层面对于阻塞和唤醒提供了一个模板的定义,是 AQS 根底同步器中条件队列的定义,而 ConditionObject 是在 AQS 根底同步器具体实现。

对于 Condition 接口而言,是提供了可代替 wait/notify 机制的条件队列模式,其中:


public interface Condition {

    /**
     * 条件队列模式:期待 await 操作
     */
    void await() throws InterruptedException;

    /**
     * 条件队列模式:期待 awaitUninterruptibly 操作,可中断模式
     */
    void awaitUninterruptibly();

    /**
     * 条件队列模式:期待 awaitNanos 操作,可超时模式
     */
    long awaitNanos(long nanosTimeout) throws InterruptedException;

    /**
     * 条件队列模式:期待 await 操作,可超时模式
     */
    boolean await(long time, TimeUnit unit) throws InterruptedException;

    /**
     * 条件队列模式:期待 awaitUntil 操作,可超时模式
     */
    boolean awaitUntil(Date deadline) throws InterruptedException;

    /**
     * 条件队列模式:告诉 signal 操作
     */
    void signal();

    /**
     * 条件队列模式:告诉 signalAll 操作
     */
    void signalAll();}
  • 定义了对于“期待(wait)”机制的相干实现:而以 await 结尾的所有办法都是对于期待机制的定义。
  • 定义了对于“告诉 (signal)”机制的相干实现:signal() 办法和 signalAll()办法,其中 signal()办法是随机地告诉期待队列中的一个线程,而 signalAll()办法是告诉期待队列中的所有线程。

四. Lock 接口的设计与实现

在 Java 畛域中,Lock 接口是用来实现管程技术,其中 Lock 用于解决互斥问题。

Lock 接口位于 java.util.concurrent.locks 包中,是 JUC 显式锁的一个形象,Lock 接口的次要形象办法:

public interface Lock {

    /**
     * Lock 接口 - 获取锁
     */
    void lock();

    /**
     * Lock 接口 - 获取锁(可中断)
     */
    void lockInterruptibly() throws InterruptedException;

    /**
     * Lock 接口 - 尝试获取锁
     *
     */
    boolean tryLock();

    /**
     *Lock 接口 - 尝试获取锁(反对超时)
     */
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

    /**
     *Lock 接口 - 开释锁
     *
     */
    void unlock();

    /**
     *  Lock 接口 - 设置条件变量
     */
    Condition newCondition();}
  • 获取锁:lock()
  • 开释锁:unlock()
  • 条件变量:Condition

在 JDK 中,对于 Lock 接口的具体实现次要是 ReentrantLock 类,其中:


public class ReentrantLock implements Lock, java.io.Serializable {

    private static final long serialVersionUID = 7373984872572414699 L;

    /** Synchronizer providing all implementation mechanics */
    private final Sync sync;


    /**
     * 结构锁的非偏心模式(默认模式)
     */
    public ReentrantLock() {sync = new NonfairSync();
        }
                
    /**
     * 结构锁的偏心和非偏心模式(可选偏心或者非偏心)
     */
    public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();}

    /**
     * Lock 接口 - 实现尝试获取锁
     */
    public void lock() {sync.lock();
    }

    /**
     * Lock 接口 - 实现尝试获取锁
     */
    public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);
    }

    /**
     * Lock 接口 - 实现尝试获取锁
     */
    public boolean tryLock() {return sync.nonfairTryAcquire(1);
    }

    /**
     * Lock 接口 - 实现尝试获取锁
     */
    public boolean tryLock(long timeout, TimeUnit unit)
    throws InterruptedException {return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }

    /**
     * Lock 接口 - 开释锁
     */
    public void unlock() {sync.release(1);
    }

    /**
     * Lock 接口 - 创立条件变量
     */
    public Condition newCondition() {return sync.newCondition();
    }


}
  • 蕴含了一个同步器 Sync,次要是基于 AbstractQueuedSynchronizer 实现,同时基于 Sync 类还实现 FairSync 类和 NonfairSync 类,其中 FairSync 类对应着偏心模式,NonfairSync 类对应非偏心模式。
  • 实现 Lock 接口设计和定义的相干办法,能够设定其锁是偏心和非公的,默认是非偏心模式的。

绝对于 Java 内置锁,Java SDK 并发包里的 Lock 接口次要区别有可能响应中断、反对超时和非阻塞地获取锁等三个个性。

五. ReadWriteLock 接口的设计与实现

在 Java 畛域中,ReadWriteLock 接口次要是基于 Lock 接口来封装了 ReadLock 锁和 WriteLock 锁等 2 种锁的实现办法,其中 ReadLock 锁是读锁的接口定义,WriteLock 锁是写锁的接口定义。

ReadWriteLock 接口的外部实现,次要是基于 Lock 接口来获取 ReadLock 锁和 WriteLock 锁的,其具体代码如下:


public interface ReadWriteLock {
    /**
     * ReadWriteLock 接口 - 基于 Lock 接口实现 ReadLock
     */
    Lock readLock();

    /**
     * ReadWriteLock 接口 - 基于 Lock 接口实现 WriteLock
     */
    Lock writeLock();}
  • readLock()办法:获取读锁,次要是基于基于 Lock 接口来定义,示意着其具体实现类都会基于 AQS 根底同步器实现
  • writeLock()办法:获取写锁,次要是基于基于 Lock 接口来定义,示意着其具体实现类都会基于 AQS 根底同步器实现

在 JDK 中,对于 ReadWriteLock 接口的具体实现次要是 ReentrantReadWriteLock 类,其中:


public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {

    private static final long serialVersionUID = -6992448646407690164 L;

    /** Inner class providing readlock */
    private final ReentrantReadWriteLock.ReadLock readerLock;

    /** Inner class providing writelock */
    private final ReentrantReadWriteLock.WriteLock writerLock;

    /** Performs all synchronization mechanics */
    final Sync sync;


    /**
     * 同步器 - 基于 AbstractQueuedSynchronizer 实现
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        //.....

        abstract boolean readerShouldBlock();

        abstract boolean writerShouldBlock();}

    /**
     * 同步器 - 非偏心模式
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -8159625535654395037 L;

        final boolean writerShouldBlock() {return false;}

        final boolean readerShouldBlock() {return apparentlyFirstQueuedIsExclusive();
        }
    }

    /**
     * 同步器 - 偏心模式
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -2274990926593161451 L;
        final boolean writerShouldBlock() {return hasQueuedPredecessors();
        }
        final boolean readerShouldBlock() {return hasQueuedPredecessors();
        }
    }


    /**
     * 结构锁的非偏心模式(默认预设)
     */
    public ReentrantReadWriteLock() {this(false);
    }

    /**
     * 结构锁的偏心和非偏心模式(可选偏心或者非偏心)
     */
    public ReentrantReadWriteLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

    /**
     * ReadWriteLock 接口 - 基于 Lock 具体实现的 writeLock()
     */
    public ReentrantReadWriteLock.WriteLock writeLock() {return writerLock;}

    /**
     * ReadWriteLock 接口 - 基于 Lock 具体实现的 readLock()
     */
    public ReentrantReadWriteLock.ReadLock readLock() {return readerLock;}

    /**
     * ReadWriteLock 接口 -ReadLock 内置类
     */
    public static class ReadLock implements Lock, java.io.Serializable {
        private static final long serialVersionUID = -5992448646407690164 L;

        private final Sync sync;

        //.....
    }

    /**
     * ReadWriteLock 接口 -WriteLock 内置类
     */
    public static class WriteLock implements Lock, java.io.Serializable {
        private static final long serialVersionUID = -4992448646407690164 L;

        private final Sync sync;

        //.....
    }

}
  • 蕴含了一个同步器 Sync,次要是基于 AbstractQueuedSynchronizer 实现,同时基于 Sync 类还实现 FairSync 类和 NonfairSync 类,其中 FairSync 类对应着偏心模式,NonfairSync 类对应非偏心模式。
  • 次要是内置定义了 ReadLock 类和 WriteLock 类等两个外部类,其中 ReadLock 类为读锁,而而 WriteLock 类为写锁,能够设定其锁是偏心和非公的,默认是非偏心模式的。
  • 实现了 ReadWriteLock 接口,通过外部类的定义来具体实现对应的锁,其中 ReadLock 类对应 readLock()办法,而 WriteLock 类对应 writeLock()办法。

综上所述,从 JDK 中对于 ReadWriteLock 接口的具体实现来看,咱们不难发现,ReadWriteLock 接口是实现读写锁一种模板定义,咱们能够基于这个接口来实现满足咱们理论业务需要的读写锁。

写在最初

在并发编程畛域,有两大外围问题:一个是互斥,即同一时刻只容许一个线程访问共享资源;另一个是同步,即线程之间如何通信、合作。

次要起因是,对于多线程实现实现并发,始终以来,多线程都存在 2 个问题:

  • 线程之间内存共享,须要通过加锁进行管制,然而加锁会导致性能降落,同时简单的加锁机制也会减少编程编码难度
  • 过多线程造成线程之间的上下文切换,导致效率低下

因而,在并发编程畛域中,始终有一个很重要的设计准则:“不要通过内存共享来实现通信,而应该通过通信来实现内存共享。”

简略来说,就是尽可能通过音讯通信,而不是内存共享来实现过程或者线程之间的同步。

最初,技术钻研之路任重而道远,愿咱们熬的每一个通宵,都撑得起咱们想在这条路上走上来的勇气,将来依然可期,与各位程序编程君共勉!

版权申明:本文为博主原创文章,遵循相干版权协定,如若转载或者分享请附上原文出处链接和链接起源。

正文完
 0