Lock 介绍
Lock 是 juc(java.util.concurrent) 包上面的一个接口类, 是作者 Doug Lea 定义的 api 标准, 次要接口有
api | 阐明 | |
---|---|---|
void lock() | 获取锁。 如果锁不可用,则出于线程调度目标,以后线程将被禁用,并处于休眠状态,直到获取锁为止。 |
|
void lockInterruptibly() throws InterruptedException | 除非以后线程被中断,否则获取锁。 如果锁可用,则获取锁并立刻返回。 如果锁不可用,则出于线程调度目标,以后线程将被禁用,并处于休眠状态,直到产生以下两种状况之一: 锁被以后线程获取;或者 其余线程中断以后线程,反对中断获取锁。 如果以后线程: 在进入此办法时设置其中断状态;或者 获取锁时中断,反对中断获取锁, 而后抛出 InterruptedException 并革除以后线程的中断状态。 |
|
boolean tryLock() | 仅当调用时锁闲暇时才获取锁。 如果锁可用,则获取锁并立刻返回 true 值。如果锁不可用,则此办法将立刻返回 false 值。 |
|
boolean tryLock(long time, TimeUnit unit) throws InterruptedException | 如果在给定的等待时间内锁是闲暇的并且以后线程没有被中断,则获取锁。 如果锁可用,此办法立刻返回 true 值。如果锁不可用,则出于线程调度目标,以后线程将被禁用,并处于休眠状态,直到产生以下三种状况之一: 锁被以后线程获取;或者 其余线程中断以后线程,反对中断获取锁;或者 指定的等待时间已过 如果获取了锁,则返回 true 值。 如果以后线程: 在进入此办法时设置其中断状态;或者 获取锁时中断,反对中断获取锁, 而后抛出 InterruptedException 并革除以后线程的中断状态。 如果指定的等待时间已过,则返回值 false。如果工夫小于或等于零,则该办法基本不会期待。 |
|
void unlock() | 开释锁 | |
Condition newCondition() | 返回绑定到此 Lock 实例的新 Condition 实例。 在期待条件之前,以后线程必须持有锁。对 Condition.await() 的调用将在期待之前主动开释锁,并在期待返回之前从新获取锁。 |
Condition 介绍
Condition 也是 juc 包下的一个接口类, 须要在线程持有 Lock 的状态下操作该接口下的办法, 次要接口有
api | 阐明 | |
---|---|---|
void await() throws InterruptedException | 导致以后线程期待,直到收到信号或中断。 与此条件关联的锁被主动开释,以后线程出于线程调度目标而被禁用,并处于休眠状态,直到产生以下四种状况之一: 其余线程调用该 Condition 的 signal 办法,并且以后线程恰好被选为要唤醒的线程;或者 其余一些线程为此条件调用 signalAll 办法;或者 其余线程中断以后线程,反对线程挂起中断;或者 产生“虚伪唤醒”。 在所有状况下,在此办法返回之前,以后线程必须从新获取与此条件关联的锁。当线程返回时,保障持有该锁。 如果以后线程: 在进入此办法时设置其中断状态;或者 期待时被中断,反对线程挂起中断, 而后抛出 InterruptedException 并革除以后线程的中断状态。在第一种状况下,没有指定是否在开释锁之前进行中断测试。 |
|
void lockInterruptibly() throws InterruptedException | 除非以后线程被中断,否则获取锁。 如果锁可用,则获取锁并立刻返回。 如果锁不可用,则出于线程调度目标,以后线程将被禁用,并处于休眠状态,直到产生以下两种状况之一: 锁被以后线程获取;或者 其余线程中断以后线程,反对中断获取锁。 如果以后线程: 在进入此办法时设置其中断状态;或者 获取锁时中断,反对中断获取锁, 而后抛出 InterruptedException 并革除以后线程的中断状态。 |
|
void awaitUninterruptibly() | 导致以后线程期待,直到收到信号。 与此条件关联的锁被主动开释,以后线程出于线程调度目标而被禁用,并处于休眠状态,直到产生以下三种状况之一: 其余线程调用该 Condition 的 signal 办法,并且以后线程恰好被选为要唤醒的线程;或者 其余一些线程为此条件调用 signalAll 办法;或者 产生“虚伪唤醒”。 在所有状况下,在此办法返回之前,以后线程必须从新获取与此条件关联的锁。当线程返回时,保障持有该锁。 如果以后线程在进入该办法时设置了中断状态,或者在期待时被中断,它将持续期待,直到发出信号。当它最终从此办法返回时,其中断状态仍将被设置。 |
|
long awaitNanos(long nanosTimeout) throws InterruptedException | 使以后线程期待,直到收到信号或中断,或者指定的等待时间过来。 与此条件关联的锁被主动开释,以后线程出于线程调度目标而被禁用,并处于休眠状态,直到产生以下五种状况之一: 其余线程调用该 Condition 的 signal 办法,并且以后线程恰好被选为要唤醒的线程;或者 其余一些线程为此条件调用 signalAll 办法;或者 其余线程中断以后线程,反对线程挂起中断;或者 规定的等待时间已过;或者 产生“虚伪唤醒”。 在所有状况下,在此办法返回之前,以后线程必须从新获取与此条件关联的锁。当线程返回时,保障持有该锁。 如果以后线程: 在进入此办法时设置其中断状态;或者 期待时被中断,反对线程挂起中断。 |
|
boolean await(long time, TimeUnit unit) throws InterruptedException | 使以后线程期待,直到收到信号或中断,或者指定的等待时间过来。 | |
boolean awaitUntil(Date deadline) throws InterruptedException | 导致以后线程期待,直到收到信号或中断,或者指定的截止工夫过来。 与此条件关联的锁被主动开释,以后线程出于线程调度目标而被禁用,并处于休眠状态,直到产生以下五种状况之一: 其余线程调用该 Condition 的 signal 办法,并且以后线程恰好被选为要唤醒的线程;或者 其余一些线程为此条件调用 signalAll 办法;或者 其余线程中断以后线程,反对线程挂起中断;或者 规定的期限已过;或者 产生“虚伪唤醒”。 在所有状况下,在此办法返回之前,以后线程必须从新获取与此条件关联的锁。当线程返回时,保障持有该锁。 如果以后线程: 在进入此办法时设置其中断状态;或者 期待时被中断,反对线程挂起中断。 |
|
void signal() | 唤醒一个期待线程。 如果有任何线程在此条件下期待,则抉择一个线程来唤醒。该线程必须在从期待返回之前从新获取锁。 |
|
void signalAll() | 唤醒所有期待线程。 如果任何线程正在期待这种状况,那么它们都会被唤醒。每个线程必须从新获取锁能力从期待返回。 |
作用
Lock 通过 lock,trylock 和 unlock 接口操作, 能够保障在多线程环境下的代码块的同步执行, 保障执行后果的正确性. 通过 Condition 的 await 和 signal,signallAll 操作, 保障同步代码块中的条件失去满足能力被执行实现. 提醒代码如下:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class LockAndConditionDemo {private List<String> containers = new ArrayList<>();
private int capacity = 10;
private Lock lock = new ReentrantLock();
private Condition emptyCondition = lock.newCondition();
private Condition fullCondition = lock.newCondition();
public void add(String product) {lock.lock();
try {while (containers.size() == capacity) {fullCondition.await();
}
containers.add(product);
emptyCondition.signalAll();} catch (InterruptedException e) {throw new RuntimeException(e);
} finally {lock.unlock();
}
}
public String take() {lock.lock();
String product = null;
try {while (containers.size() == 0) {emptyCondition.await();
}
product = containers.remove(0);
fullCondition.signalAll();} catch (InterruptedException e) {throw new RuntimeException(e);
} finally {lock.unlock();
}
return product;
}
}
ReentrantLock 介绍
ReentrantLock 也是 juc 包下的一个具体类, 它实现了 Lock 的接口. 通常也用它来作为 Lock 接口的具体实现类, 外部通过动态外部类 Sync 继承 AQS(AbstractQueuedSynchronizer) 父类来实现 Lock 的 lock 办法.
AQS 外部是一个同步期待队列, 它通过 state(状态计数位) 和 CAS(compareAndSetState) 乐观锁的形式实现同步, 底层是通过 unsafe 的本地办法 unsafe.compareAndSwapInt 来保障操作的原子性的. 队列是通过属性 head,tail 和动态外部类 Node 的属性 prev,next 来实现的双向队列, 通过 enq 办法将新包装的 Thread 的 Node 节点增加到开端.
简要剖析下 ReentrantLock.lock() 的源代码:
// 办法一:
public void lock() {
// 通过 sync 外部类 lock 办法, 默认为非偏心
sync.lock();}
// 办法二:
final void lock() {
// 通过 cas 乐观锁形式设置锁对象属性 state 的值, 默认值为 0
if (compareAndSetState(0, 1))
// 胜利设置为 1 进入此办法
setExclusiveOwnerThread(Thread.currentThread());
else
// 失败进入此办法
acquire(1);
}
// 办法三:
protected final boolean compareAndSetState(int expect, int update) {
// 通过 unsafe 保障操作原子性
//stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));state 的属性的地址值
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
办法四:
// 就是一个 set 设置属性的办法, 该类 AbstractOwnableSynchronizer 是 AQS 的父类
// 为真到办法四执行实现 lock 办法就实现了
protected final void setExclusiveOwnerThread(Thread thread) {exclusiveOwnerThread = thread;}
办法五:
public final void acquire(int arg) {
//tryAcquire 办法再次尝试获取 state 的值, 获取胜利就退出, 不胜利执行 acquireQueued 办法
//addWaiter 办法
//acquireQueued 办法
//selfInterrupt 办法
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();}
办法六:
// 调用 nonfairTryAcquire 办法
protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);
}
办法七:
final boolean nonfairTryAcquire(int acquires) {
// 获取以后线程
final Thread current = Thread.currentThread();
// 获取 AQS 以后属性 state 值
int c = getState();
// 如果 c 变为 0 了, 阐明第一次 CAS 操作的 state 值曾经被获取锁的线程批改为 0 了
if (c == 0) {
// 再次调用办法三
if (compareAndSetState(0, acquires)) {
// 胜利则再次调用办法四, 同样 lock 办法也执行实现了,true 之后就不会执行 && 前面的办法了
setExclusiveOwnerThread(current);
return true;
}
}
// 如果不为 0, 判断以后线程是否和锁对象保留的 thread 属性统一, 如果相等阐明是同一个线程, 之前的锁还没有开释的状况, 能够重入
else if (current == getExclusiveOwnerThread()) {
// 以后 state 值 +1, 如果超过了 int 的最大值, 则达到了重入的边界值抛出异常中断该线程后续执行
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 设置 state 属性值后 lock 办法也执行完了
setState(nextc);
return true;
}
return false;
}
办法八:
// 办法参数为 static final Node EXCLUSIVE = null;
private Node addWaiter(Node mode) {
// 创立 AQS 的动态外部类的 node 实例
// Node(Thread thread, Node mode) {
// Used by addWaiter
// this.nextWaiter = mode;
// this.thread = thread;
// }
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
// 新退出的节点的上一个节点为该 lock 对象的 AQS 的对尾
node.prev = pred;
// 通过 CAS 的形式保障 AQS 的对尾节点设置为新节点, 胜利则将原对尾节点的下一个节点为新退出的节点返回该节点, 而后开始执行 acquireQueued 办法, 不胜利进入 enq 办法
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 将以后节点增加到 AQS 队列的开端并设置尾节点尾以后节点
enq(node);
return node;
}
办法九:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取新创建节点的前一个节点对象
final Node p = node.predecessor();
// 如果上一个节点为头节点, 则阐明后面曾经没有期待 node 了, 则再次执行办法七
if (p == head && tryAcquire(arg)) {
// 胜利则设置该 node 为头节点, 断开原先头节点指向该节点的援用链接
// 不执行 cancelAcquire 办法
// 不执行 selfInterrupt 办法, 同时 lock 办法也执行实现了
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 办法七没有设置胜利, 则执行阻塞该线程的办法
// 胜利则执行阻塞和查看线程是否被中断的办法
// 线程是被中断的则打断标识位设置为 true, 持续循环执行, 否则打断标记为 false 继续执行
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {if (failed)
// 非正常退出循环会执行获取失败会执行勾销获取节点办法
cancelAcquire(node);
}
}
办法十:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取上一个节点的期待状态
int ws = pred.waitStatus;
// 如果为 - 1 则为 true 退出, 执行暂停和查看打断办法 parkAndCheckInterrupt()
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
// 如果大于 0, 则找到上一个节点之前的节点的期待状态值大于 0 的前节点
// 并将该节点的下一个节点援用设置为该节点, 退出持续循环执行上个办法
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 如果 ws 值为 0, 则通过 cas 设置期待状态的值为 -1, 退出持续循环执行上个办法
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
办法十一:
private final boolean parkAndCheckInterrupt() {
// 线程阻塞办法
LockSupport.park(this);
// 获取线程是否被打断的状态返回, 返回之后线程状态会被重置
return Thread.interrupted();}
办法十二:
public static void park(Object blocker) {
// 获取以后线程
Thread t = Thread.currentThread();
// 通过 unsafe 类将线程 parkBlock 设置为该 lock 对象
setBlocker(t, blocker);
//UNSAFE.park 是阻塞线程的办法
UNSAFE.park(false, 0L);
// 从新将线程 parkBlock 设置为 null
setBlocker(t, null);
}
办法十三:
private static void setBlocker(Thread t, Object arg) {
// Even though volatile, hotspot doesn't need a write barrier here.
// 对应 parkBlockerOffset 搁置 lock 对象
UNSAFE.putObject(t, parkBlockerOffset, arg);
}
办法十四:
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
// 以后节点为空就退出
if (node == null)
return;
// 异样退出将节点线程援用置空
node.thread = null;
// Skip cancelled predecessors
// 取到该节点的上个节点的期待状态小于等于 0
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
// 以后节点的期待状态设置为 1(勾销状态)
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
将以后节点是否和尾节点相等, 相等则将尾节点设置为以后节点的上个节点, 上个节点的下一个节点设置为空
if (node == tail && compareAndSetTail(node, pred)) {compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
前节点不等于头节点并且前节点期待状态不为 - 1 或者将前节点的期待状态设置为 - 1 胜利并且前节点的线程属性不为空, 以后节点的下一个节点不为空且期待状态小于等于 0,CAS 设置前节点的下一个节点为以后节点的下一个节点
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 否则唤醒一个节点的线程
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
办法十五:
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
// 获取获取以后节点的期待状态
int ws = node.waitStatus;
// 小于 0 则进行 CAS 状态设置为 0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
// 以后节点的下一个节点如果为空, 或者下一个期待状态大于 0,s 设置为 null,
if (s == null || s.waitStatus > 0) {
s = null;
// 从尾节点开始便当到以后节点, 取最靠近以后节点的且期待状态小于等于 0 的赋值给 s
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果 s 不为空, 则唤醒该节点的线程办法
if (s != null)
LockSupport.unpark(s.thread);
}
办法十六:
private Node enq(final Node node) {for (;;) {
Node t = tail;
// 尾节点是否为空, 为空则通过 CAS 设置头节点, 并将头节点设置给尾节点, 再次循环
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 不为空则将以后节点的上一个节点援用设置为尾节点, 通过 cas 形式将尾节点设置为以后节点, 原尾节点的下一个节点设置为以后节点, 并返回上一个节点
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
总结
ReentrantLock 通过 CAS 保障原子操作, 通过 AQS 将阻塞线程搁置在队列中期待, 从头取节点线程, 搁置进来的线程节点在对尾.