在多线程编程中我们会遇到很多需要使用线程同步机制去解决的并发问题,而这些同步机制就是多线程编程中影响正确性和运行效率的重中之重。这不禁让我感到好奇,这些同步机制是如何实现的呢?好奇心是进步的源泉,就让我们一起来揭开同步机制源码的神秘面纱吧。
在本文中,我们会从 JDK 中大多数同步机制的共同基础 AbstractQueuedSynchronizer
类开始说起,然后通过源码了解我们最常用的两个同步类可重入锁 ReentrantLock
和闭锁 CountDownLatch
的具体实现。通过这篇文章我们将可以了解到 ReentrantLock
和CountDownLatch
两个常用同步类的源代码实现,并且掌握阅读其他基于 AQS 实现的同步工具类源码的能力,甚至可以利用 AQS 写出自己的同步工具类。
阅读这篇文章需要了解基本的线程同步机制,有兴趣的读者可以参考一下这篇文章《多线程中那些看不到的陷阱》。
同步机制的核心——AQS
同步机制源码初探
ReentrantLock
是我们常用的一种可重入互斥锁,是 synchronized 关键字的一个很好的替代品。互斥指的就是同一时间只能有一个线程获取到这个锁,而可重入是指如果一个线程再次获取一个它已经持有的互斥锁,那么仍然会成功。
这个类的源码在 JDK 的 java.util.concurrent
包下,我们可以在 IDE 中点击类名跳转到具体的类定义,比如下面就是在我的电脑上跳转之后看到的 ReentrantLock 类的源代码。在这里我们可以看到在 ReentrantLock 类中还包含了一个继承自 AbstractQueuedSynchronizer 类的内部类,而且有一个该内部类 Sync
类型的字段sync
。实际上 ReentrantLock 类就是通过这个内部类对象来实现线程同步的。
如果打开 CountDownLatch
的源代码,我们会发现这个类里也同样有一个继承自 AbstractQueuedSynchronizer 类的子类 Sync,并且也有一个 Sync
类型的字段 sync
。在java.util.concurrent
包下的大多数同步工具类的底层都是通过在内部定义一个 AbstractQueuedSynchronizer
类的子类来实现的,包括我们在本文中没提到的许多其他常用类也是如此,比如:读写锁 ReentrantReadWriteLock、信号量 Semaphore 等。
AQS 是什么?
那么这个 AbstractQueuedSynchronizer
类也就是我们所说的 AQS,到底是何方神圣呢?这个类首先像我们上面提到的,是大多数多线程同步工具类的基础。它内部包含了一个对同步器的等待队列,其中包含了所有在等待获取同步器的线程,在这个等待队列中的线程将会在同步器释放时被唤醒。比如一个线程在获取互斥锁失败时就会被放入到等待队列中等待被唤醒,这也就是 AQS 中的 Q——“Queued”的由来。
而类名中的第一个单词 Abstract
是因为 AQS 是一个抽象类,它的使用方法就是实现继承它的子类,然后使用这个子类类型的对象。在这个子类中我们会通过重写下列的五个方法中的一部分或者全部来指定这个同步器的行为策略:
-
boolean tryAcquire(int arg)
,独占式获取同步器,独占式指同一时间只能有一个线程获取到同步器; -
boolean tryRelease(int arg)
,独占式释放同步器; -
boolean isHeldExclusively()
,同步器是否被当前线程独占式地持有; -
int tryAcquireShared(int arg)
,共享式获取同步器,共享式指的是同一时间可能有多个线程同时获取到同步器,但是可能会有数量的限制; -
boolean tryReleaseShared(int arg)
,共享式释放同步器。
这五个方法之所以能指定同步器的行为,则是因为 AQS 中的其他方法就是通过对这五个方法的调用来实现的。比如在下面的 acquire 方法中就调用了 tryAcquire 来获取同步器,并且在被调用的 acquireQueued
方法内部也是通过 tryAcquire
方法来循环尝试获取同步器的。
public final void acquire(int arg) {
// 1. 调用 tryAcquire 方法尝试获取锁
// 2. 如果获取失败(tryAcquire 返回 false),则调用 addWaiter 方法将当前线程保存到等待队列中
// 3. 之后调用 acquireQueued 方法来循环执行“获取同步器 -> 获取失败休眠 -> 被唤醒重新获取”过程
// 直到成功获取到同步器返回 false;或者被中断返回 true
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 如果 acquireQueued 方法返回 true 说明线程被中断了
// 所以调用 selfInterrupt 方法中断当前线程
selfInterrupt();}
下面,我们就来看看在 ReentrantLock
和CountDownLatch
两个类中定义的 AQS 子类到底是如何重写这五个方法的。
CountDownLatch 的实现
CountDownLatch
是一种典型的闭锁,比如我需要使用四个线程完成四种不同的计算,然后把四个线程的计算结果相加后返回,这种情况下主线程就需要等待四个完成不同任务的工作线程完成之后才能继续执行。那么我们就可以创建一个初始的 count 值为 4 的 CountDownLatch
,然后在每个工作线程完成任务时都对这个CountDownLatch
执行一个 countDown
操作,这样 CountDownLatch 中的 count 值就会减 1。当 count 值减到 0 时,主线程就会从阻塞中恢复,然后将四个任务的结果相加后返回。
下面是 CountDownLath
的几个常用方法:
-
void await()
,等待操作,如果 count 值目前已经是 0 了,那么就直接返回;否则就进入阻塞状态,等待 count 值变为 0; -
void countDown()
,减少计数操作,会让 count 减 1。
调用多次 countDown()
方法让 count 值变为 0 之后,被 await()
方法阻塞的线程就可以继续执行了。了解了 CountDownLatch
的基本用法之后我们就来看看这个闭锁到底是怎么实现的,首先,我们来看一下 CountDownLatch
中 AQS 的子类,内部类 Sync
的定义。
CountDownLatch 的内部 Sync 类
下面的代码是 CountDownLatch
中 AQS 的子类 Sync
的定义,Sync
是 CountDownLatch
类中的一个内部类。在这个类中重写了 AQS 的 tryAcquireShared
和tryReleaseShared
两个方法,这两个都是共享模式需要重写的方法,因为 CountDownLatch
在 count 值为 0 时可以被任意多个线程同时获取成功,所以应该实现共享模式的方法。
在 CountDownLatch
的Sync
中使用了 AQS 的 state 值用来存放 count 值,在初始化时会把 state 值初始化为 n。然后在调用 tryReleaseShared
时会将 count 值减 1,但是因为这个方法可能会被多个线程同时调用,所以要用 CAS 操作保证更新操作的原子性,就像我们用 AtomicInteger
一样。在 CAS 失败时我们需要通过重试来保证把 state 减 1,如果 CAS 成功时,即使有许多线程同时执行这个操作最后的结果也一定是正确的。在这里,tryReleaseShared
方法的返回值表示这个释放操作是否可以让等待中的线程成功获取同步器,所以只有在 count 为 0 时才能返回 true。
tryAcquireShared
方法就比较简单了,直接返回 state 是否等于 0 即可,因为只有在 CountDownLatch
中的 count 值为 0 时所有希望获取同步器的线程才能获取成功并继续执行。如果 count 不为 0,那么线程就需要进入阻塞状态,等到 count 值变为 0 才能继续执行。
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
// 构造器,初始化 count 值
// 在这个子类中把 count 值保存到了 AQS 的 state 中
Sync(int count) {setState(count);
}
// 获取当前的 count 值
int getCount() {return getState();
}
// 获取操作在 state 为 0 时会成功,否则失败
// tryAcquireShared 失败时,线程会进入阻塞状态等待获取成功
protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;
}
// 对闭锁执行释放操作减小计数值
protected boolean tryReleaseShared(int releases) {
// 减小 coun 值,在 count 值归零时唤醒等待的线程
for (;;) {int c = getState();
// 如果计数已经归零,则直接释放失败
if (c == 0)
return false;
// 将计数值减 1
int nextc = c-1;
// 为了线程安全,以 CAS 循环尝试更新
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
CounDownLatch 对 Sync 类对象的使用
看了 CountDownLatch
中的 Sync
内部类定义之后,我们再来看看 CountDownLatch
是如何使用这个内部类的。
在 CountDownLatch
的构造器中,初始化 CountDownLatch
对象时会同时在其内部初始化保存一个 Sync
类型的对象到 sync
字段用于之后的同步操作。并且传入 Sync
类构造器的 count 一定会大于等于 0。
public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
有了 Sync 类型的对象之后,我们在 await()
方法里就可以直接调用 sync
的acquireSharedInterruptibly
方法来获取同步器并陷入阻塞,等待 count 值变为 0 了。在 AQS 的 acquireSharedInterruptibly
方法中会在调用我们重写的 tryAcquireShared
方法获取失败时进入阻塞状态,直到 CountDownLatch
的 count 值变为 0 时才能成功获取到同步器。
public void await() throws InterruptedException {
// 调用 sync 对象的获取方法来进入锁等待
sync.acquireSharedInterruptibly(1);
}
而在 CountDownLatch
的另一个减少 count 值的重要方法 countDown()
中,我们同样是通过调用 sync
上的方法来实现具体的同步功能。在这里,AQS 的 releaseShared(1)
方法中同样会调用我们在 Sync
类中重写的 tryReleaseShared
方法来执行释放操作,并在 tryReleaseShared
方法返回 true 时去唤醒等待队列中的阻塞等待线程,让它们在 count 值为 0 时能够继续执行。
public void countDown() {sync.releaseShared(1);
}
从上文中可以看出,CoundDownLatch
中的各种功能都是通过内部类 Sync
来实现的,而这个 Sync
类就是一个继承自 AQS 的子类。通过在内部类 Sync
中重写了 AQS 的 tryAcquireShared
和tryReleaseShared
两个方法,我们就指定了 AQS 的行为策略,使其能够符合我们对 CountDownLatch
功能的期望。这就是 AQS 的使用方法,下面我们来看一个大家可能会更熟悉的例子,来进一步了解 AQS 在独占模式下的用法。
ReentrantLock 的实现
可重入锁 ReentrantLock
可以说是我们的老朋友了,从最早的 synchronized
关键字开始,我们就开始使用类似的功能了。可重入锁的特点主要有两点:
-
同一时间只能有一个线程持有
- 如果我想保护一段代码同一时间只能被一个线程所访问,比如对一个队列的插入操作。那么如果有一个线程已经获取了锁之后在修改队列了,那么其他也想要修改队列的线程就会陷入阻塞,等待之前的这个线程执行完成。
-
同一线程可以对一个锁重复获取成功多次
- 而如果一个线程对同一个队列执行了两个插入操作,那么第二次获取锁时仍然会成功,而不会被第一次成功获取到的锁所阻塞。
ReentrantLock
类的常用操作主要有三种:
-
获取锁,一个线程一旦获取锁成功后就会阻塞其他线程获取同一个锁的操作,所以一旦获取失败,那么当前线程就会被阻塞
- 最简单的获取锁方法就是调用
public void lock()
方法
- 最简单的获取锁方法就是调用
-
释放锁,获取锁之后就要在使用完之后释放它,否则别的线程都将会因无法获取锁而被阻塞,所以我们一般会在 finally 中进行锁的释放操作
- 可以通过调用
ReentrantLock
对象的unlock
方法来释放锁
- 可以通过调用
-
获取条件变量,条件变量是和互斥锁搭配使用的一种非常有用的数据结构,有兴趣的读者可以通过《从 0 到 1 实现自己的阻塞队列(上)》这篇文章来了解条件变量具体的使用方法
- 我们可以通过
Condition newCondition()
方法来获取条件变量对象,然后调用条件变量对象上的await()
、signal()
、signalAll()
方法来进行使用
- 我们可以通过
ReentrantLock 的内部 Sync 类
在 ReentrantLock
类中存在两种 AQS 的子类,一个实现了非公平锁,一个实现了公平锁。所谓的“公平”指的就是获取互斥锁成功返回的时间会和获取锁操作发起的时间顺序一致,例如有线程 A 已经持有了互斥锁,当线程 B、C、D 按字母顺序获取锁并进入等待,线程 A 释放锁后一定是线程 B 被唤醒,线程 B 释放锁后一定是 C 先被唤醒。也就是说锁被释放后对等待线程的唤醒顺序和获取锁操作的顺序一致。而且如果在这个过程中,有其他线程发起了获取锁操作,因为等待队列中已经有线程在等待了,那么这个线程一定要排到等待队列最后去,而不能直接抢占刚刚被释放还未被刚刚被唤醒的线程锁持有的锁。
下面我们同样先看一下 ReentrantLock
类中定义的 AQS 子类 Sync
的具体源代码。下面是上一段说到的非公平 Sync 类和公平 Sync 类两个类的共同父类 Sync
的带注释源代码,里面包含了大部分核心功能的实现。虽然下面包含了该类完整的源代码,但是我们现在只需要关心三个核心操作,也是我们在独占模式下需要重写的三个 AQS 方法:tryAcquire
、tryRelease
和isHeldExclusively
。建议在看完文章之后再回来回顾该类中其他的方法实现,直接跳过其他的方法当然也是完全没有问题的。
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/**
* 实现 Lock 接口的 lock 方法,子类化的主要原因是为了非公平版本的快速实现
*/
abstract void lock();
/**
* 执行非公平的 tryLock。tryAcquire 方法在子类中被实现,但是两者都需要非公平版本的 trylock 方法实现。*/
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();
int c = getState();
// 如果锁还未被持有
if (c == 0) {
// 通过 CAS 尝试获取锁
if (compareAndSetState(0, acquires)) {
// 如果锁获取成功则将锁持有者改为当前线程,并返回 true
setExclusiveOwnerThread(current);
return true;
}
}
// 锁已经被持有,则判断锁的持有者是否是当前线程
else if (current == getExclusiveOwnerThread()) {
// 可重入锁,如果锁的持有者是当前线程,那就在 state 上加上新的获取数
int nextc = c + acquires;
// 判断新的 state 值有没有溢出
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 将新的 state 更新为新的值,因为可以进入这段代码的只有一个线程
// 所以不需要线程安全措施
setState(nextc);
return true;
}
return false;
}
// 重写了 AQS 的独占式释放锁方法
protected final boolean tryRelease(int releases) {
// 计算剩余的锁持有量
// 因为只有当前线程持有该锁的情况下才能执行这个方法,所以不需要做多线程保护
int c = getState() - releases;
// 如果当前线程未持有锁,则直接抛出错误
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 如果锁持有数已经减少到 0,则释放该锁,并清空锁持有者
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// 更新 state 值,只有 state 值被设置为 0 才是真正地释放了锁
// 所以 setState 和 setExclusiveOwnerThread 之间不需要额外的同步措施
setState(c);
return free;
}
// 当前线程是否持有该锁
protected final boolean isHeldExclusively() {return getExclusiveOwnerThread() == Thread.currentThread();}
// 创建对应的条件变量
final ConditionObject newCondition() {return new ConditionObject();
}
// 从外层传递进来的方法
// 获取当前的锁持有者
final Thread getOwner() {return getState() == 0 ? null : getExclusiveOwnerThread();}
// 获取锁的持有计数
// 如果当前线程持有了该锁则返回 state 值,否则返回 0
final int getHoldCount() {return isHeldExclusively() ? getState() : 0;}
// 判断锁是否已经被持有
final boolean isLocked() {return getState() != 0;
}
}
实际的 tryAcquire
方法将在公平 Sync 类与非公平 Sync 类两个子类中实现,但是这两个子类都需要调用父类 Sync
中的非公平版本的 tryAcquire——nonfairTryAcquire
方法。在这个方法中,我们主要做两件事:
-
当前锁还未被人持有。在
ReentrantLock
中使用 AQS 的 state 来保存锁的状态,state 等于 0 时代表锁没有被任何线程持有,如果 state 大于 0,那么就代表持有者对该锁的重复获取次数- 如果当前锁还未被线程持有,那么就会通过
compareAndSetState
来原子性地修改 state 值,修改成功则需要设置当前线程为锁的持有线程并返回 true 代表获取成功;否则就返回
- 如果当前锁还未被线程持有,那么就会通过
-
锁已被当前线程持有
- 在锁已被当前线程持有的情况下,就需要将 state 值加 1 代表持有者线程对锁的重复获取次数。
而对于独占式释放同步器的 tryRelease
方法,则在父类 Sync
中直接实现了,两个公平 / 非公平子类调用的都是同一段代码。首先,只有锁的持有者才能释放锁,所以如果当前线程不是所有者线程在释放操作中就会抛出异常。如果释放操作会将持有计数清零,那么当前线程就不再是该锁的持有者了,锁会被完全释放,而锁的所有者会被设置为 null。最后,Sync
会将减掉入参中的释放数之后的新持有计数更新到 AQS 的 state 中,并返回锁是否已经被完全释放了。
isHeldExclusively
方法比较简单,它只是检查锁的持有者是否是当前线程。
非公平 Sync 类的实现
Sync
的两个公平 / 非公平子类的实现比较简单,下面是非公平版本子类的源代码。在非公平版本的实现中,调用 lock
方法首先会尝试通过 CAS 修改 AQS 的 state 值来直接抢占锁,如果抢占成功就直接将持有者设置为当前线程;如果抢占失败就调用 acquire
方法走正常流程来获取锁。而在 acquire
方法中就会调用子类中的 tryAcquire
方法并进一步调用到上文提到的父类中的 nonfairTryAcquire
方法来完成锁获取操作。
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* 执行锁操作。尝试直接抢占,如果失败的话就回到正常的获取流程进行
*/
final void lock() {
// 尝试直接抢占
if (compareAndSetState(0, 1))
// 抢占成功设置锁所有者
setExclusiveOwnerThread(Thread.currentThread());
else
// 抢占失败走正常获取流程
acquire(1);
}
// 实现 AQS 方法,使用 nonfairTryAcquire 实现
protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);
}
}
公平 Sync 类的实现
而在公平版本的 Sync 子类 FairSync
中,为了保证成功获取到锁的顺序一定要和发起获取锁操作的顺序一致,所以自然不能在 lock
方法中进行 CAS 方式的抢占,只能老老实实调用 acquire
方法走正式流程。而 acquire
方法最终就会调用子类中定义的 tryAcquire
来真正获取锁。
在 tryAcquire
方法中,代码主要处理了两种情况:
-
当前锁还没有被线程锁持有
- 只有在确保等待队列为空的情况下才能尝试用 CAS 方式直接抢占锁,而在等待队列不为空的情况下,最后返回了 false,之后
acquire
方法中的代码会将当前线程放入到等待队列中阻塞等待锁的释放。这就保证了在获取锁时已经有线程等待的情况下,任何线程都要进入等待队列去等待获取锁,而不能直接对锁进行获取。
- 只有在确保等待队列为空的情况下才能尝试用 CAS 方式直接抢占锁,而在等待队列不为空的情况下,最后返回了 false,之后
-
当前线程已经持有了该锁
- 如果当前线程已经是该锁的持有者了,那么就会在 state 值上加上本次的获取数量来更新锁的重复获取次数,并返回 true 代表获取锁成功。
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
// 直接使用 acquire 进行获取锁操作
final void lock() {acquire(1);
}
/**
* 公平版本的 tryAcquire 方法。不要授予访问权限,除非是递归调用或者没有等待线程或者这是第一个调用
*/
protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();
int c = getState();
// 如果锁没有被持有
if (c == 0) {
// 为了实现公平特性,所以只有在等待队列为空的情况下才能直接抢占
// 否则只能进入队列等待
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);
return true;
}
}
// 如果锁已被持有,且当前线程就是持有线程
else if (current == getExclusiveOwnerThread()) {
// 计算新的 state 值
int nextc = c + acquires;
// 如果锁计数溢出,则抛出异常
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
// 设置 state 状态值
setState(nextc);
return true;
}
return false;
}
}
ReentrantLock 对 Sync 类对象的使用
最后,我们来看看 ReentrantLock 类中的 lock()
、unlock()
、newCondition
方法对 Sync 类对象的使用方式。
首先是在构造器中,根据入参指定的公平 / 非公平模式创建不同的内部 Sync
类对象,如果是公平模式就是用 FairSync
类,如果是非公平模式就是用 NonfairSync
类。
public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();}
然后在互斥锁的锁定方法 lock()
中,ReentrantLock
直接使用 Sync 类中的 lock
方法来实现了锁的获取功能。
public void lock() {
// 调用 sync 对象的 lock 方法实现
sync.lock();}
在 unlock()
方法中也是一样的情况,ReentrantLock
直接依赖 Sync 类对象来实现这个功能。
public void unlock() {
// 调用了 sync 对象的 release 方法实现
sync.release(1);
}
最后一个创建条件变量的方法则直接依赖于 AQS 中定义的方法,我们在 ReentranctLock
的Sync
类中并不需要做任务额外的工作,AQS 就能为我们做好所有的事情。
public Condition newCondition() {
// 调用了 sync 对象继承自 AQS 的 `newCondition` 方法实现
return sync.newCondition();}
通过 ReentrantLock
的例子我们能够更明显地感受到,这些基于 AQS 实现同步功能的类中并不需要做太多额外的工作,大多数操作都是通过直接调用 Sync
类对象上的方法来实现的。只要定义好了继承自 AQS 的子类 Sync
,并通过Sync
类重写几个 AQS 的关键方法来指定 AQS 的行为策略,就可以实现风格迥异的各种同步工具类了。
总结
在这篇文章中,我们从 AQS 的基本概念说起,简单介绍了 AQS 的具体用法,然后通过 CountDownLatch
和ReentrantLock
两个常用的多线程同步工具类的源码来具体了解了 AQS 的使用方式。我们不仅可以完全弄明白这两个线程同步类的实现原理与细节,而且最重要的是找到了 AQS 这个幕后大 BOSS。通过 AQS,我们不仅可以更容易地阅读并理解其他同步工具类的使用与实现,而且甚至可以动手开发出我们自己的自定义同步工具类。
到了这里,这一系列多线程编程相关的技术文章就接近尾声了。后续我还会发布一篇囊括这个系列所有内容的总结性文章,里面会对多线程编程相关的知识脉络做一次全面的梳理,然后将每个知识点链接到具体阐释这个主题的文章中去。让读者可以在宏观和微观两个层面理解多线程编程的原理与技巧,帮助大家建立完整的 Java 多线程理论与实践知识体系。有兴趣的读者可以关注一下后续的文章,感谢大家的支持。