舒适提醒:本文内容较长废话较多,如有心脏病、精神病史等请酌情查看。
一、概述
本文源码基于openJDK8u。在浏览本文前,你须要对并发有所理解。
在并发中,为了解决程序中多个过程和线程对资源的抢占问题,在 Java 中引入了锁的概念。
各种各样的锁,对于初碰 Java 并发的同学来说,面对多达 20 种的锁,霎时懵逼,退游戏这把鸡劳资不吃了......
其实不要缓和,尽管锁的品种很多,然而都是依据其个性衍生进去的概念而已,如果你对 Java 锁不是很清晰,心愿这篇文章可能对你有所帮忙。敌人们,如果你不会做饭或者不晓得吃什么请关注我麻辣德子感谢您的双...呸,学好 Java , 回绝沉迷某音,哈哈哈哈哈~
上面,是一张对于锁的思维导图,带大家有一个总体的意识,配合此图食用成果更佳哈。
ps:如果看不清楚能够点击点击原图查看哦。
二、synchronized
带你跑毒
为了不便大家由浅入深(狐疑作者开车然而没有证据...),咱们从大家比拟相熟的 synchronized
说起。对于 Java 使用者来说,synchronized
关键字是实现锁的一种重要形式。
package com.aysaml.demo.test;import com.google.common.util.concurrent.ThreadFactoryBuilder;import java.util.concurrent.ExecutorService;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.ThreadFactory;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;/** * SynchronizedDemo 示例 * * @author wangning * @date 2019-11-26 */public class SynchronizedDemo { private static int count = 0; private static void addCount() { count++; } public static void main(String[] args) { int loopCount = 1000; ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat("demo-pool-%d").build(); ExecutorService executorService = new ThreadPoolExecutor(10, 1000, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()); for (int i = 0; i < loopCount; i++) { Runnable r = SynchronizedDemo::addCount; executorService.execute(r); } executorService.shutdown(); System.out.println(SynchronizedDemo.count); }}
措不迭防的代码粘贴,哈哈哈。
下面是比拟经典的线程并发问题示例。运行这段代码,失去的后果多种多样,996、997、998.....
后果并不像咱们预期的那样是1000,遇到多线程资源竞争时咱们可能第一反馈的就是加 synchronized
,简略粗犷,于是变成下边这样:
private static synchronized void addCount() { count++; }
在累加办法加上 synchronized
,就给这个办法加了锁,如此,每次执行的后果就合乎了咱们的预期。
Java 内置了一个反编译工具 javap
能够反编译字节码文件,通过执行命令 javap -verbose -p SynchronizedDemo.class
能够看到上述这个类的字节码文件。
查找被加锁的办法 addCount()
发现,synchronized
润饰办法 时是通过 ACC_SYNCHRONIZED 标记符号指定该办法是一个同步办法,从而执行相应的同步调用。
同样查看字节码,能够晓得 synchronized
润饰代码块 时 通过monitorenter 和 monitorexit 指令来解决同步问题,其中 monitorenter 指令指向同步代码块的开始地位,monitorexit 指令指明同步代码块的完结地位。
三、各种锁的解释,猥琐发育捡枪捡子弹
这部分只是简略说一下各种锁以及相干概念,让大家有一个简略理解,其中相应的在 Java 中的实现会用较长的篇幅做介绍。
偏差锁、轻量级锁、重量级锁
在程序第一次执行到 synchronized
代码块的时候,锁对象变成 偏差锁 ,即偏差于第一个取得它的线程的锁。在程序第二次执行到改代码块时,线程会判断此时持有锁的线程是否就是它本人,如果是就持续往下面执行。值得注意的是,在第一次执行完同步代码块时,并不会开释这个偏差锁。从效率角度来看,如果第二次执行同步代码块的线程始终是一个,并不需要从新做加锁操作,没有额定开销,效率极高。
后面说的只有一个线程同步执行代码块只是现实的状态下(如果只有一个线程也不必思考并发的问题了,尽管这么说有点不太谨严哈...),一旦有第二个线程退出 锁竞争 ,偏差锁就主动降级为 轻量级锁 。而这里不同状况需值得注意:当第二个线程想要获取锁时,且这个锁是偏差锁时,会判断以后持有锁的线程是否依然存活,如果该持有锁的线程没有存活,那么偏差锁并不会降级为轻量级锁 。什么是锁竞争:即一个线程想要获取另一个线程持有的锁 。
在此状态下各个线程持续做锁竞争,没有抢到锁的线程循环判断是否可能胜利获取锁,这种状态称为 自旋 ,故轻量级锁是一种 自旋锁 。虚拟机中有个计数器用来记录自旋次数,默认容许循环10次,这个值能够通过虚拟机参数-XX:PreBlockSpin
来进行批改。如果锁竞争特地重大,达到这个自旋次数最大的限度,轻量级锁就会降级为重量级锁。当其余线程尝试获取锁的时候,发现当初的锁是重量级锁,则间接将本人挂起,期待未来被唤醒。
对于这块的更多信息,能够参考《Synchronized与三种锁态》
偏心锁、非偏心锁
当一个线程持有的锁开释时,其余线程依照先后顺序,先申请的先失去锁,那么这个锁就是偏心锁。反之,如果后申请的线程有可能先获取到锁,就是非偏心锁 。
Java 中的 ReentrantLock
能够通过其构造函数来指定是否是偏心锁,默认是非偏心锁。一般来说,应用非偏心锁能够取得较大的吞吐量,所以举荐优先应用非偏心锁。
synchronized
就是一种非偏心锁。
乐观锁、乐观锁
先说乐观锁,即在读数据的时候总认为其余线程会对数据进行批改,所以采取加锁的模式,一旦本线程要读取数据时,就加锁,其余线程被阻塞,期待锁的开释。所以乐观锁总结为乐观加锁阻塞线程。
在读数据时总认为其余线程不会对数据做批改,在更新数据时会判断其余线程有没有更新数据,如果有更新,则从新读取,再次尝试更新,循环上述步骤直到更新胜利,即为乐观锁。
这样来看乐观锁实际上是没有锁的,只是通过一种比拟替换的办法来保证数据同步,总结为乐观无锁回滚重试。
CAS(比拟和替换)
CAS, 英文直译为 compare and swap,即比拟和替换。下面乐观锁也说了,其实就是一种比拟与替换的过程。
简略形容一下就是:读取到一个值为 A ,在要将这个值更新为B 之前,查看是否等于 A (比拟),如果是则将 A 更新为 B(替换) ,否则什么都不做。
通过这种形式,能够实现不用应用加锁的形式,就能保障资源在多线程之间的同步,显然,不阻塞线程,能够大大提高吞吐量。形式虽好,然而也存在问题。
ABA 问题,即如果一个值从 A 变为 B 再变回 A 时,这样 CAS 就会认为值没有发生变化。
- 对于这个问题,曾经有了应用版本号的解决形式,即每次变量更新的时候变量的版本号都 +1,即由
A->B->A
就变成了1A->2B->3A
。
- 对于这个问题,曾经有了应用版本号的解决形式,即每次变量更新的时候变量的版本号都 +1,即由
- 循环工夫长开销大,如果锁的竞争比拟强烈,就会导致 CAS 一直的反复执行,始终循环,消耗 CPU 资源。
- 只能保障一个变量的同步,显然,因为其个性,CAS 只能保障一个共享变量的原子操作。
可重入锁
可重入锁即容许多个线程屡次获取同一把锁,那从锁自身的角度来看,就是能够从新进入该锁。比方有一个递归函数外面有加锁操作,如果这个锁不阻塞本人,就是可重入锁,故也称递归锁 。
再看下面的偏差锁、轻量级锁、重量级锁能够晓得synchronized关键字加锁是可重入的 ,不仅如此,JDK 中实现 Lock 接口的锁都是可重入的 。感兴趣的读者能够自行理解怎么实现不可重入锁,这里只讲一下锁的定义。
可中断锁
如果线程A持有锁,线程B期待获取该锁。因为线程A持有锁的工夫过长,线程B不想持续期待了,咱们能够让线程B中断本人或者在别的线程里中断它,这种就是可中断锁。
在 Java 中,synchronized就是不可中断锁,而Lock的实现类都是可中断锁。
独享锁、共享锁
独享锁亦称互斥锁,排它锁,容易了解这种锁每次只容许一个线程持有。反之,就是共享锁啦。
读锁、写锁
下面说独享锁和共享锁,其实读写锁就是其最典型的锁。写锁是独享锁,读锁是共享锁。在前面咱们会着重说一下Java 中的读写锁实现。
三、CAS 在 Java 中的实现,带上这把 M4-CAS
通过上面对 CAS 的简略介绍,置信大家对 CAS 也有了一个比较简单的概念:通过比拟和替换实现单个变量的线程平安 。
JDK 中对 CAS 的实现在 java.util.concurrent.atomic
包中:
类名 | 形容 |
---|---|
AtomicBoolean | 能够用原子形式更新的 boolean 值。 |
AtomicInteger | 能够用原子形式更新的 int 值。 |
AtomicIntegerArray | 能够用原子形式更新其元素的 int 数组。 |
AtomicIntegerFieldUpdater | 基于反射的实用工具,能够对指定类的指定 volatile int 字段进行原子更新。 |
AtomicLong | 能够用原子形式更新的 long 值。 |
AtomicLongArray | 能够用原子形式更新其元素的 long 数组。 |
AtomicLongFieldUpdater | 基于反射的实用工具,能够对指定类的指定 volatile long 字段进行原子更新。 |
AtomicMarkableReference | AtomicMarkableReference 保护带有标记位的对象援用,能够原子形式对其进行更新。用来解决 ABA 问题,只关怀有没有被批改过。 |
AtomicReference | 能够用原子形式更新的对象援用。 |
AtomicReferenceArray | 能够用原子形式更新其元素的对象援用数组。 |
AtomicReferenceFieldUpdater | 基于反射的实用工具,能够对指定类的指定 volatile 字段进行原子更新。 |
AtomicStampedReference | AtomicStampedReference 保护带有整数“标记”的对象援用,能够用原子形式对其进行更新。用来解决 ABA 问题,与下面的 AtomicMarkableReference 相比,除了关系有没有被批改过之外,还关怀批改了几次。 |
以 AtomicInteger
为例,看看它是如何保障对 int 的操作线程平安的。
package java.util.concurrent.atomic;import java.util.function.IntUnaryOperator;import java.util.function.IntBinaryOperator;import sun.misc.Unsafe;public class AtomicInteger extends Number implements java.io.Serializable { private static final long serialVersionUID = 6214790243416807050L; // 应用 Unsafe.compareAndSwapInt 进行数据更新 private static final Unsafe unsafe = Unsafe.getUnsafe(); // 内存偏移量,即内存地址 private static final long valueOffset; static { try { valueOffset = unsafe.objectFieldOffset (AtomicInteger.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } private volatile int value; /** * 带有初值的构造函数 * * @param initialValue the initial value */ public AtomicInteger(int initialValue) { value = initialValue; } /** * 无参构造函数默认值为0 */ public AtomicInteger() { } /** * 取得以后值 * * @return the current value */ public final int get() { return value; } /** * 设置所给值,因为value是应用volatile关键字润饰,所以一经批改,其余线程会立刻看到value的批改 * * @param newValue the new value */ public final void set(int newValue) { value = newValue; } /** * 设置给定的值,通过调用Unsafe的提早设置办法不保障后果被其余线程立刻看到 * * @param newValue the new value * @since 1.6 */ public final void lazySet(int newValue) { unsafe.putOrderedInt(this, valueOffset, newValue); } /** * 原子形式设置新值,返回旧值 * * @param newValue the new value * @return the previous value */ public final int getAndSet(int newValue) { return unsafe.getAndSetInt(this, valueOffset, newValue); } /** * 应用 CAS 形式设置新值,胜利返回true * * @param expect the expected value * @param update the new value * @return {@code true} if successful. False return indicates that * the actual value was not equal to the expected value. */ public final boolean compareAndSet(int expect, int update) { return unsafe.compareAndSwapInt(this, valueOffset, expect, update); } /** * 应用 CAS 形式更新值 * * <p><a href="package-summary.html#weakCompareAndSet">May fail * spuriously and does not provide ordering guarantees</a>, so is * only rarely an appropriate alternative to {@code compareAndSet}. * * @param expect the expected value * @param update the new value * @return {@code true} if successful */ public final boolean weakCompareAndSet(int expect, int update) { return unsafe.compareAndSwapInt(this, valueOffset, expect, update); } /** * 原子减少 * * @return the previous value */ public final int getAndIncrement() { return unsafe.getAndAddInt(this, valueOffset, 1); } /** * 原子减1 * * @return the previous value */ public final int getAndDecrement() { return unsafe.getAndAddInt(this, valueOffset, -1); } /** * 原子减少给定值 * * @param delta the value to add * @return the previous value */ public final int getAndAdd(int delta) { return unsafe.getAndAddInt(this, valueOffset, delta); } /** * Atomically increments by one the current value. * * @return the updated value */ public final int incrementAndGet() { return unsafe.getAndAddInt(this, valueOffset, 1) + 1; } /** * Atomically decrements by one the current value. * * @return the updated value */ public final int decrementAndGet() { return unsafe.getAndAddInt(this, valueOffset, -1) - 1; } /** * Atomically adds the given value to the current value. * * @param delta the value to add * @return the updated value */ public final int addAndGet(int delta) { return unsafe.getAndAddInt(this, valueOffset, delta) + delta; } /** * Atomically updates the current value with the results of * applying the given function, returning the previous value. The * function should be side-effect-free, since it may be re-applied * when attempted updates fail due to contention among threads. * * @param updateFunction a side-effect-free function * @return the previous value * @since 1.8 */ public final int getAndUpdate(IntUnaryOperator updateFunction) { int prev, next; do { prev = get(); next = updateFunction.applyAsInt(prev); } while (!compareAndSet(prev, next)); return prev; } /** * Atomically updates the current value with the results of * applying the given function, returning the updated value. The * function should be side-effect-free, since it may be re-applied * when attempted updates fail due to contention among threads. * * @param updateFunction a side-effect-free function * @return the updated value * @since 1.8 */ public final int updateAndGet(IntUnaryOperator updateFunction) { int prev, next; do { prev = get(); next = updateFunction.applyAsInt(prev); } while (!compareAndSet(prev, next)); return next; } /** * Atomically updates the current value with the results of * applying the given function to the current and given values, * returning the previous value. The function should be * side-effect-free, since it may be re-applied when attempted * updates fail due to contention among threads. The function * is applied with the current value as its first argument, * and the given update as the second argument. * * @param x the update value * @param accumulatorFunction a side-effect-free function of two arguments * @return the previous value * @since 1.8 */ public final int getAndAccumulate(int x, IntBinaryOperator accumulatorFunction) { int prev, next; do { prev = get(); next = accumulatorFunction.applyAsInt(prev, x); } while (!compareAndSet(prev, next)); return prev; } /** * Atomically updates the current value with the results of * applying the given function to the current and given values, * returning the updated value. The function should be * side-effect-free, since it may be re-applied when attempted * updates fail due to contention among threads. The function * is applied with the current value as its first argument, * and the given update as the second argument. * * @param x the update value * @param accumulatorFunction a side-effect-free function of two arguments * @return the updated value * @since 1.8 */ public final int accumulateAndGet(int x, IntBinaryOperator accumulatorFunction) { int prev, next; do { prev = get(); next = accumulatorFunction.applyAsInt(prev, x); } while (!compareAndSet(prev, next)); return next; } /** * Returns the String representation of the current value. * @return the String representation of the current value */ public String toString() { return Integer.toString(get()); } /** * Returns the value of this {@code AtomicInteger} as an {@code int}. */ public int intValue() { return get(); } /** * Returns the value of this {@code AtomicInteger} as a {@code long} * after a widening primitive conversion. * @jls 5.1.2 Widening Primitive Conversions */ public long longValue() { return (long)get(); } /** * Returns the value of this {@code AtomicInteger} as a {@code float} * after a widening primitive conversion. * @jls 5.1.2 Widening Primitive Conversions */ public float floatValue() { return (float)get(); } /** * Returns the value of this {@code AtomicInteger} as a {@code double} * after a widening primitive conversion. * @jls 5.1.2 Widening Primitive Conversions */ public double doubleValue() { return (double)get(); }}
源码中的正文写的很具体,写正文写到一半决定不做谷歌翻译了...次要是通过 Unsafe 提供的 compareAndSwapInt(Object var1, long var2, int var4, int var5)
等办法来实现 CAS 原子操作,Unsafe 提供了执行低级别、不平安操作的办法,如间接拜访零碎内存资源、自主治理内存资源等。
CAS操作蕴含三个操作数---内存地位、预期原值及新值。执行 CAS 操作的时候,将内存地位的值与预期原值比拟,如果相匹配,那么处理器会主动将该地位值更新为新值,否则,处理器不做任何操作。咱们都晓得,CAS 是一条 CPU 的原子指令( cmpxchg 指令),不会造成所谓的数据不统一问题,Unsafe 提供的 CAS 办法(如 compareAndSwapXXX )底层实现即为 CPU 指令 cmpxchg 。
如果小伙伴对其感兴趣能够参考 《Java魔法类:Unsafe利用解析》
四、AQS 给你穿上三级甲
AQS,全名 AbstractQueuedSynchronizer
,直译为形象队列同步器,是构建锁或者其余同步组件的根底框架,能够解决大部分同步问题。实现原理能够简略了解为:同步状态( state ) + FIFO 线程期待队列 。
资源 state
AQS应用了一个 int 类型的成员变量 state 来示意同步状态,应用了volatile
关键字来保障线程间的可见性,当 state > 0 时示意曾经获取了锁,当 state = 0 时示意开释了锁。它提供了三个办法(getState()、setState(int newState)、compareAndSetState(int expect,int update))来对同步状态state进行操作,确保对state的操作是平安的。
❀ 而对于不同的锁,state 也有不同的值:- 独享锁中 state =0 代表开释了锁,state = 1 代表获取了锁。
- 共享锁中 state 即持有锁的数量。
- 可重入锁 state 即代表重入的次数。
- 读写锁比拟非凡,因 state 是 int 类型的变量,为 32 位,所以采取了两头切割的形式,高 16 位标识读锁的数量 ,低 16 位标识写锁的数量 。
- FIFO 线程期待队列
实现队列的形式无外乎两种,一是应用数组,二是应用 Node 。AQS 应用了 Node 的形式实现队列。
static final class Node { /** 标记一个节点是在共享模式,默认为共享模式 */ static final Node SHARED = new Node(); /** 标记一个节点为独占模式 */ static final Node EXCLUSIVE = null; static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; /** 以此变量来示意以后线程的状态 */ volatile int waitStatus; /** 前驱 */ volatile Node prev; /** 后继 */ volatile Node next; /** 用于保留线程 */ volatile Thread thread; /** 保留下一个处于期待状态的Node */ Node nextWaiter; /** * 用来判断是否是共享模式 */ final boolean isShared() { return nextWaiter == SHARED; } /** * Returns previous node, or throws NullPointerException if null. * Use when predecessor cannot be null. The null check could * be elided, but is present to help the VM. * * @return the predecessor of this node */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // 无参构造方法,默认为共享模式 } Node(Thread thread, Node mode) { // 用于结构下一个期待线程Node节点 this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // 用于结构带有本线程状态的Node this.waitStatus = waitStatus; this.thread = thread; } }
在 AQS 中定义了两个节点,别离为头尾节:
/** 期待队列的头结点,作为队列的初始化节点,只能通过setHead()办法设置值, * 而这个办法将Node的变量值都置空,便于及时GC。当其有值时,必须保障waiteStatus为CANCELLED状态。 */ private transient volatile Node head; /** * 用于保留线程期待队列的尾结点。 * 通过enq()办法设置值。 */ private transient volatile Node tail;
这个队列的构造见下图:
AQS 的一堆办法,依照获取锁和解锁的维度能够分为上面这样:
获取锁相干办法
办法 | 形容 |
---|---|
acquire(int arg) | 独占模式获取锁,疏忽中断。 |
acquireInterruptibly(int arg) | 独占模式获取锁,如果被中断则停止。 |
acquireShared(int arg) | 共享模式获取锁,疏忽中断。 |
acquireSharedInterruptibly(int arg) | 共享模式获取锁,如果被中断则停止。 |
tryAcquire(int arg) | 尝试在独占模式获取锁。由子类自行实现。 |
tryAcquireNanos(int arg, long nanosTimeout) | 尝试在独占模式获取锁,如果被中断则停止,如果到了给定超时工夫 nanosTimeout ,则会失败。 |
tryAcquireShared(int arg) | 尝试在共享模式获取锁。 |
tryAcquireSharedNanos(int arg, long nanosTimeout) | 尝试在共享模式获取锁,如果被中断则停止,如果到了给定超时工夫 nanosTimeout ,则会失败。 |
addWaiter(Node mode) | 将以后线程退出到CLH队列队尾。 |
acquireQueued(final Node node, int arg) | 以后线程会依据公平性准则来进行阻塞期待,直到获取锁为止;并且返回以后线程在期待过程中有没有中断过。 |
selfInterrupt() | 产生一个中断。 |
解锁相干办法
办法 | 形容 |
---|---|
release(int arg) | 以独占模式开释对象。 |
releaseShared(int arg) | 以共享模式开释对象。 |
tryRelease(int arg) | 试图设置状态来反映独占模式下的一个开释。由子类自行实现。 |
tryReleaseShared(int arg) | 试图设置状态来反映共享模式下的一个开释。 |
unparkSuccessor(Node node) | 用来唤醒节点。 |
五、Lock
带你吃鸡
除了 synchronized
外,在 Java 中还有 Lock
接口的一系列实现来加锁。
如上绿色虚线示意实现,WriteLock
、ReadLock
、ReentrantLock
都实现了 Lock
接口,三者别离对应读锁、写锁和可重入锁,其中 ReadWriteLock
定义了读锁和写锁,ReentrantReadWriteLock
以动态外部类的模式实现了读写锁。
首先创立一个单例读写锁:
package com.aysaml.demo.test;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReadWriteLock;import java.util.concurrent.locks.ReentrantReadWriteLock;/** * 单例读写锁 * * @author wangning * @date 2019-11-26 */public enum Locker { instance; private Locker() { } private static final ReadWriteLock lock = new ReentrantReadWriteLock(); public Lock writeLock() { return lock.writeLock(); }}
如此能够在上述的累加办法下面加锁做同步:
private static void addCount() { Lock locker = Locker.instance.writeLock(); locker.lock(); count++; locker.unlock(); }
看下运行后果,如愿以偿(总感觉这个词用在这不太对,真是切实想不出什么词了,哈哈,意思你们懂就好)。
两种加锁形式比拟:
synchronized
属于互斥锁,任何时候只容许一个线程的读写操作,其余线程必须期待;ReadWriteLock
容许多个线程取得读锁,但只容许一个线程取得写锁,效率绝对较高。
看了下面的 Lock 接口的实现图,咱们晓得在 Java 中锁有三个重要实现,上面一一来看。
读锁写锁形象队列同步器三级甲
下面说了读写锁属于共享锁,即容许同一时刻有多个线程获取锁。在一些业务中,读的操作比写的操作多,相比拟 synchronized 而言,读写锁采纳 CAS 形式保障资源同步,所以应用读写锁能够大大增加吞吐量。
在Java 中 ReentrantReadWriteLock
中以内部类的模式实现了读写锁,如下:
再接着别离看他们的实现:
- ReadLock
- WriteLock
能够看到两个锁中的加锁操作都有一个要害的货色 Sync :
Sync 是 ReentrantReadWriteLock
的一个形象外部类,它继承了 AbstractQueuedSynchronizer
并实现了共享与独享形式的同步操作。读写锁正好是一对共享&独占锁,而同步的队列也有共享和独占之分,那咱们就从它们的加锁和解锁别离来看 AQS 的工作流程:
写锁(独占式)
加锁
public void lock() { sync.acquire(1); }
这是 WriteLock 的加锁办法,能够看到加锁实际上是调用了 Sync 的 acquire(int arg) 办法,而这个办法是在 AQS 中实现的,它应用 final 关键字来做润饰,在子类中不可重写。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
那 acquire(int arg)
做了什么呢,能够看到执行了四个办法:
- tryAcquire:尝试获取锁,获取胜利则设置锁状态并返回 true ,否则返回 false 。需子类自行实现。
- addWaiter:将以后线程退出到 CLH 队列队尾。已有实现。
- acquireQueued:以后线程会依据公平性准则来进行阻塞期待,直到获取锁为止;并且返回以后线程在期待过程中有没有中断过。已有实现。
- selfInterrupt:产生中断。已有实现。
后面咱们说 AQS 由线程状态 state 和 线程期待队列组成,AQS 加锁解锁的过程实际上就是对线程状态的批改和期待队列的出入队列操作,而 AQS 的子类能够通过重写 tryAcquire(int acquires)
办法来对 state 进行批改操作。于是就有 ReentrantReadWriteLock 中的 Sync 重写了 tryAcquire 办法:
protected final boolean tryAcquire(int acquires) { // 获取以后线程 Thread current = Thread.currentThread(); // 拿到state变量,即锁的个数 int c = getState(); // 取得写锁的数量,前边说了低16位示意写锁个数 int w = exclusiveCount(c); // 若该线程曾经持有锁 if (c != 0) { // (Note: if c != 0 and w == 0 then shared count != 0) // 如果写锁数量为0或者持有锁的线程不是以后线程,返回 false if (w == 0 || current != getExclusiveOwnerThread()) return false; // 如果写入锁数量大于最大数量(65535),跑出异样 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire setState(c + acquires); return true; } // 如果写线程数为0,并且以后线程须要阻塞那么就返回失败;或者如果通过CAS减少写线程数失败也返回失败。 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; // 设置以后线程为锁的拥有者 setExclusiveOwnerThread(current); return true; }
再来看下 addWaiter 办法:
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; // 通过 CAS 设置尾结点 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 如果不胜利则屡次尝试 enq(node); return node; }
enq(Node node) 办法如下:
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
能够看到 enq 办法应用了死循环的形式统一尝试设置尾结点,直到胜利。
如此入队操作就能够简略了解为:tail 指向新节点、新节点的 prev 指向以后最初的节点,以后最初一个节点的 next 指向以后节点。
解锁
public void unlock() { sync.release(1); }
解锁调用了 Sync 的 release 办法,上面看看这个办法都做了什么:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) // 唤醒节点 unparkSuccessor(h); return true; } return false; }
与加锁相似,tryRelease(arg) 由 AQS 的子类自行重写,
protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; }
开释锁比较简单,开释锁的实现是通过 CAS 批改 waitStatus 为 0 来实现的,而后通过 LockSupport.unpark(s.thread)
唤醒线程 。
为了不便了解,在这里总结一下 WriteLock 的工作流程图:
❀ 加锁操作
读锁 (共享式)
读锁的加锁操作与写锁相似:
public void lock() { sync.acquireShared(1); }
调用自定义的 tryAcquireShared(arg) 办法获取同步状态
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
如果获取失败,调用 doAcquireShared(int arg) 办法自旋形式获取同步状态:
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
再看一下 tryAcquireShared(int unused) 办法:
protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); // 如果其余线程曾经获取了写锁,则以后线程获取读锁失败,进入期待状态 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; int r = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); }
在这个办法中能够晓得,如果其余线程曾经获取了写锁,则以后线程获取读锁失败,进入期待状态。如果以后线程获取了写锁或者写锁未被获取,则以后线程减少读状态,胜利获取读锁。
六、结语
可能读完之后小伙伴并没有整明确,反而更懵逼了;或者基本没有读完。概念比拟多,贴的代码也比拟多,能够配合其中几个比拟重要的图去了解,几个比拟要害的点:锁的分类思维导图、AQS、CAS、写锁的加锁实现过程图。因为笔者程度所限,如上都是在学习的过程中总结、整顿所得,仅供参考。
欢送拜访集体博客 获取更多常识分享。