共计 9083 个字符,预计需要花费 23 分钟才能阅读完成。
LockSupport
LockSupport 是线程期待唤醒机制(wait/notify)的改进版本。LockSupport 中的 park()
和 unpark()
的作用别离是阻塞线程和接触阻塞线程。
3 种让线程期待和唤醒的办法(线程通信)
形式 1:应用 Object 中的 wait()办法让线程期待,notify()办法唤醒线程
synchronized + wait + notify
形式 1:应用 Object 中的 wait()办法让线程期待,notify()办法唤醒线程
static Object objectLock = new Object(); // 创立锁
public static void main(String[] args) {
// 创立 A 线程,进入后打印,并阻塞。new Thread(() -> {synchronized (objectLock) {System.out.println(Thread.currentThread().getName() + "进来了!");
objectLock.wait();
System.out.println(Thread.currentThread().getName() + "被唤醒!");
}
}, "A").start();
// 创立 B 线程,用于唤醒
new Thread(() -> {synchronized (objectLock) {objectLock.notify();
System.out.println(Thread.currentThread().getName() + "告诉!");
}
}, "B").start();}
wait、notify 的限度:
- 咱们发现 wait 和 notify 如果不在一个代码块外面,必须与 synchronized 搭配应用,否则会报错。
- 如果咱们先应用 notify、再应用 wait,因为 wait 是后执行了,所以 不能被唤醒。
形式 2:应用 JUC 包中 Condition 的 await()办法让线程期待,signal()办法唤醒线程
Lock + await + signal
// 创立 Lock 对象,失去 condition
static Lock lock = new ReentrantLock();
static Condition condition = lock.newCondition();
public static void main(String[] args) {
// 创立 A 线程,用 await 办法阻塞
new Thread(() -> {lock.lock();
try {System.out.println(Thread.currentThread().getName() + "进来了!");
condition.await();} finally {lock.unlock();
}
System.out.println(Thread.currentThread().getName() + "被唤醒!");
}, "A").start();
// 创立 B 线程,用于唤醒
new Thread(() -> {lock.lock();
try {System.out.println(Thread.currentThread().getName() + "告诉!");
condition.signal();} finally {lock.unlock();
}
}, "B").start();}
await、signal 的限度:
- 和 wait、notify 的问题截然不同,他们的底层机制是一样的。
形式 3:LockSupport 类能够阻塞以后线程以及唤醒指定被阻塞的线程
park + unpark,每个线程都有一个“许可证”,只有 0 和 1,默认为 0。unpark(Thread t)
办法发放许可证,没许可证就不容许放行。
public static void main(String[] args) {// 创立 A 线程,用 park()办法阻塞
Thread a = new Thread(() -> {System.out.println(Thread.currentThread().getName() + "进来了!");
LockSupport.park();
System.out.println(Thread.currentThread().getName() + "被唤醒!");
}, "A");
a.start();
// 创立 B 线程,用于唤醒
Thread b = new Thread(() -> {System.out.println(Thread.currentThread().getName() + "告诉!");
// 唤醒指定线程
LockSupport.unpark(a);
}, "B");
b.start();}
LockSupport 的劣势:
- 既不必 synchronized 或 Lock。
- 先唤醒,再阻塞,也可能被唤醒。因为线程曾经有了 “许可证” 了,所以 park()办法相当于没执行。
park 底层调用了 unsafe 类的 park 本地办法。
UNSAFE.park(false, 0L);
调用一次 unpark
就加 1,变为 1。调用一次 park
会生产许可证,变回 0。反复调用 unpark
不会积攒凭证。
AQS 实践
- AQS(AbstractQueuedSynchronizer),形象的队列同步器。ReentrantLock 类里,有一个外部类 Sync 就是继承的 AQS 类。
- AQS 是用来构建锁 或者 其余同步器组件的 重量级根底框架及整个 JUC 体系的基石 。通过内置的 FIFO 队列 来实现资源获取线程的排队工作,并通过一个 int 类型变量 示意持有锁的状态。
AQS 的作用
和 AQS 无关的:ReentrantLock、CountDownLatch、ReentrantReadWriteLock、Semaphore……
锁和同步器的关系:
- 锁,面向锁的使用者。
- 同步器,面向锁的实现者。
如果共享资源被占用,就须要肯定的 阻塞期待唤醒机制来保障锁的调配 。这个机制次要用的是 CLH 队列 的变体实现的,将临时获取不到锁的线程退出到队列中,这个队列就是 AQS 的形象体现。它将申请共享资源的线程封装成 队列的结点(Node),通过 CAS
自旋以及 LockSupport.park()
的形式,保护 state 变量的状态,使并发达到同步的管制成果。
AQS 源码体系
AQS 应用一个 volatile 的 int 类型的成员变量来示意同步状态,通过内置的 FIFO 队列来实现资源获取的排队,将每条要去抢占资源的线程封装成一个 Node 结点来实现锁的调配,通过 CAS 实现对 State 值的批改。
-
示意同步状态的 int 成员变量
private volatile int state;
state 为 0 就是没人占用,能够去获取资源。大于等于 1,有人占用资源,须要排队。
-
CLH 队列
CLH 队列,是一个双向队列。通过自旋期待,state 变量判断是否阻塞。
外部类 Node 作为载体,装的是须要排队的线程。
-
外部类 Node
队列中每一个排队的个体就是一个 Node。Node 有前结点prev
、后结点next
、头指针head
、尾指针tail
,用于实现双向队列。
Node 类里有两个模式:SHARED(示意线程以共享的模式期待锁)、EXCLUSIVE(示意线程以独占的形式期待锁)
类中也有一个 int 类型的状态变量 waitStatus
。意思是等待区其余线程的期待状态。
volatile int waitStatus;
从 ReentrantLock 开始解读 AQS
ReentrantLock 类中有个子类 Sync 继承了 AQS 类,NonfairSync 类和 FairSync 继承了 Sync 类。
new 一个 ReentrantLock 类时,不传参数默认是非偏心锁(NonfairSync),传入 true 是偏心锁(FairSync)
偏心锁和非偏心锁实现办法的惟一区别就在于:偏心锁在获取同步状态时多了一个限度条件:hasQueuePredecessors()
。
这个是偏心锁加锁时判断期待队列中是否存在无效节点的办法。因为偏心锁是先排先得。
- 偏心锁:考究先来先到,线程在获取锁时,如果期待队列中以及有线程在期待,那么以后线程就会进入期待队列中。
- 非偏心锁:不论是否有期待队列,都会去尝试取得锁。
AQS 非偏心锁就是,一来就先去插队,如果插队失败,才去乖乖的排队。
银行办理业务案例
public static void main(String[] args) {ReentrantLock lock = new ReentrantLock();
// 带入一个银行办理业务的案例来模仿咱们的 AQS 如果进行线程的治理和告诉唤醒机制
// 3 个线程模仿 3 个来银行,受理窗口办理业务的顾客
// A 顾客就是第一个顾客,此时手里窗口没有任何人,A 能够间接去办理
new Thread(() -> {lock.lock();
try {System.out.println("A 线程 进入");
// 办理 20 分钟
TimeUnit.MINUTES.sleep(20);
} catch (InterruptedException e) {e.printStackTrace();
} finally {lock.unlock();
}
}, "A").start();
// B 顾客,因为窗口只有一个(只能一个线程持有锁),B 只能期待,进入候客区
new Thread(() -> {lock.lock();
try {System.out.println("B 线程 进入");
} finally {lock.unlock();
}
}, "B").start();
// C 顾客,进入候客区(当 A 办理实现后,会与 B 去抢)new Thread(() -> {lock.lock();
try {System.out.println("C 线程 进入");
} finally {lock.unlock();
}
}, "C").start();}
整个 ReentrantLock 的加锁过程,能够分为三个阶段:
- 尝试加锁
- 加锁失败,线程进入 AQS 队列
- 线程进入队列后,进入阻塞状态
lock()上锁
调用的是 sync
类的 lock()
办法。
如果是 FairSync:
final void lock() {acquire(1);
}
如果是 NonfairSync:
/*
底层是 unsafe 类,尝试用 CAS 取得锁,胜利后设置这个线程领有拜访权限。否则调用 acquire
*/
final void lock() {if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
/*
unsafe 类的办法,传入(0,1),如果这个对象的内存偏移量的地位,expect 如果为 0,就为改为 1
*/
protected final boolean compareAndSetState(int expect, int update) {return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
//CAS 批改胜利后,通过 AbstractOwnableSynchronizer 类的 setExclusiveOwnerThread 办法,把 exclusiveOwnerThread 线程设为以后线程。
当第一个顾客发现没人窗口没人后,开始办理业务。state 变为 1,占用顾客的线程是 currentThread。
第一个客户曾经占用了窗口,没那么快实现。第二个客户也调用 lock()办法,发现窗口被占用,只能去排队:
public final void acquire(int arg) {if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();}
// 外面的 addWaiter 办法、tryAcquire 办法、acquireQueue 办法都是重点
-
AQS 的 tryAcquire 办法
咱们进入 tryAcquire 办法发现没有逻辑代码,间接抛出异样。这就是典型的 模板办法设计模式。意思是所有子类必须实现这个办法,不实现父类就抛出异样。
protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();
}
而后发现 ReentrantLock 的 外部类 NofairSync 重写了这个办法:
protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);
}
这个办法其实调用的是外部类Sync
的 nonfairTryAcquire
办法。
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread(); // 须要排队的第二位顾客
int c = getState(); // 获取以后窗口的状态 state(0 闲暇,1 占用)// 如果运气十分的好,窗口凑巧闲暇了,就 CAS 扭转状态,把窗口的线程设为本人。if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);
return true;
}
}
// 如果以后线程 等于 正在办理业务的线程(阐明取得了屡次锁,是可重入锁的实践)else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires; // nextc 为以后状态加 1
if (nextc < 0) // overflow(溢出)throw new Error("Maximum lock count exceeded");
setState(nextc); // 设置状态变量 state
return true;
}
return false;
}
咱们传入的第二位顾客再次发现,有人在办理业务,返回 false
。
在 acquire 办法中 !tryAcquire(arg)
取反为 true
,持续判断上面的办法。
-
AQS 的 addWaiter 办法
acquire 传入的是 Node.EXCLUSIVE
参数(结点的模式);结点进入队列。
private Node addWaiter(Node mode) {
// 结构 Node 结点(以后线程,模式)Node node = new Node(Thread.currentThread(), mode);
// 获取 Node 的尾结点,如果为 null,阐明队列没有结点。Node pred = tail;
// 当第三个顾客进入的时候,等待区曾经有结点了,执行这个代码块。和 enq 办法类似,尾插法。if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果队列没有结点,调用 enq 办法筹备进入队列
enq(node);
return node;
}
enq
办法(将节点插入队列):
private Node enq(final Node node) {
// 相当于自旋
for (;;) {
Node t = tail; // t 是尾指针
// 如果尾指针为 null,阐明队列无结点,进行初始化
if (t == null) {
/* 第一个结点并不是咱们传入的结点,而是零碎 new 了一个结点作为占位符。这个结点 Thread=null,waitStatus=0,是傀儡结点又称哨兵结点,用于占位。*/
if (compareAndSetHead(new Node()))
tail = head;
// 队列有结点后,持续循环,进入上面这个代码块(尾插法,结点的尾、前、后结点都设置好)} else {
// 传入结点的前一个指针指向尾结点
node.prev = t;
// 尾指针 指向 传入的节点
if (compareAndSetTail(t, node)) {
t.next = node; // 尾结点的下一个节点是 传入的节点
return t; // 返回新插入的尾结点
}
}
}
}
- AQS 的 acquireQueued 办法
传入的参数是 (addWaiter(Node.EXCLUSIVE), arg)
。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 自旋
for (;;) {final Node p = node.predecessor(); // 传入结点的上一个结点
// 如果前结点 == 哨兵结点 && 再看窗口是否抢占,失败就 false。if (p == head && tryAcquire(arg)) {
// 头结点指向以后节点,节点 Thread=null,prev=null,即以后节点变成了新的哨兵结点
setHead(node);
// 原哨兵结点的 next=null,没有连贯了,会被 GC 回收
p.next = null;
failed = false;
return interrupted;
}
// 抢占失败后是否 park 阻塞
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
/*
这时自旋锁,抢占又失败后,持续进入 shouldParkAfterFailedAcquire 办法,因为第一次循环曾经将前结点的 waitStatus 的值改为 -1,所以返回 true。而后进入 parkAndCheckInterrupt 办法。*/
/*
锁被开释,其余线程被唤醒后!parkAndCheckInterrupt()返回 false,持续自旋!B 线程的前结点就是哨兵结点,执行 tryAcquire 办法,因为 A 线程走了,所以胜利抢占!返回 true
*/
}
} finally {if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire
办法:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; // 查看前结点的 waitStatus 状态
//SIGNAL 值固定为 -1
// 如果是 SIGNAL 状态,即期待中,间接返回 true。if (ws == Node.SIGNAL)
return true;
//waitStatus 大于 0 阐明是 CANCELLED 状态
if (ws > 0) {
do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 把前结点的 waitStatus 值改为 -1,用于后续唤醒操作
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt
办法:
private final boolean parkAndCheckInterrupt() {
// 阻塞这个线程!这时能够认为曾经坐在期待区了。LockSupport.park(this);
// 线程被唤醒后,不被阻塞,这里就返回 false
return Thread.interrupted();}
此时这个 acquireQueued 办法还没有完结,会被卡在 parkAndCheckInterrupt
办法外部,如果这个线程被 unpark 了。就会继续执行 acquireQueued 办法的代码。
unlock 开释锁
调用的是 sync
类的 lock()
办法。
public void unlock() {sync.release(1);
}
调用 AQS 的 release
办法,arg=1.
public final boolean release(int arg) {
// 开释一把锁后,返回 true
if (tryRelease(arg)) {
// 头结点就是哨兵结点
Node h = head;
// 哨兵的 waitStatus 为 -1,符合条件进入
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); //
return true;
}
return false;
}
tryRelease 办法,也是一个模板办法,ReentrantLock 类的 Sync 重写了这个办法。
protected boolean tryRelease(int arg) {throw new UnsupportedOperationException();
}
protected final boolean tryRelease(int releases) {
// 如果以后 State 为 1,减去 1 后为 0
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true; //c=0,阐明能够解锁,free 变为 true
setExclusiveOwnerThread(null); // 设置以后窗口的占用线程为 null
}
setState(c); // 把状态改为相应的值
return free;
}
unparkSuccessor 办法,开释锁!
private void unparkSuccessor(Node node) {
// 传入的是哨兵结点,waitStatus 为 -1
int ws = node.waitStatus;
// 又把哨兵结点的 waitStatus 改为 0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// s 是哨兵结点的下一个结点。Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果哨兵结点的下一个结点存在,且 waitStatus 为 0,开释锁!if (s != null)
LockSupport.unpark(s.thread);
}
此时线程被唤醒,后面 acquireQueued 里阻塞的 其余线程 就持续往下执行。