乐趣区

Lock的独占锁和共享锁的比较分析

Lock 锁底层依赖于 AQS 实现,AQS 提供了多种锁的实现模式,其中独占锁和共享锁是主要的两种模式。AQS 本身是一种模板方法设计模式,即 AQS 对外部提供了一些模板方法,而这些模板方法又会调用由子类实现的抽象方法。今天我们主要是比较 AQS 中共享锁和独占锁的底层实现方面的不同。
public final void acquire(int arg){/* 对外提供的独占锁的模板方法 */ public final void acquireShared(int arg){// 对外提供的共享锁的模板方式
if(!tryAcquire(arg) if(tryAcquireShared(arg)<0)
&&acquireQueued(addWaiter(Node.EXCLUSIVE),arg)) doAcquireShared(arg);
selfInterrupt()/* 中断当前调用线程 */}
}
先来分析 acuqire(arg) 方法,首先我们要理解 java 中的短路运算符 &&,也就是说当 tryAcquire(arg) 方法返回 false 时,即获取锁失败时,才会执行 acquireQueued(addWaiter(Node.EXCLUSIVE),arg), 剖开语句 acquireQueued(**), 先执行 addWaiter(Node.EXCLUSIVE), 然后执行 acquireQueued(), 所以一句 if 基本上就调用了所有的后续处理,这种编码方式,在 java 源码实现中非常常见。相比之下,acquireShared(arg) 方法更加符合我们平时的编码习惯。
addWaiter 方法的目的是将未成功获取到锁的线程中加入到同步队列中去,先看源码:
private Node addWaiter(Node mode){private Node enq(final Node node){
Node node=new Node(Thread.currentThread(),mode); for(;;){
Node pred=tail; Node t=tail;
if(pred!=null){if(t==null){
node.prev=pred; if(compareAndSetHead(new Node()))
if(compareAndSetTail(pred,node)){/* 注意该方式是原子方式 */ tail=head;
pred.next=node; }else{
return node; node.prev=t;
} if(compareAndSetTail(t,node)){
} t.next=node;
enq(node); return t;
return node; }
} }
}
}
上述的 addWaiter 方法首先构造一个新的节点,并先尝试插入同步队列,如果成功后,直接返回,如果不成功,则调用 enq 方法进行循环插入。节点既然已经被加入到同步队列中去了,那么接下来就需要将线程阻塞,阻塞之前需要再次尝试获取锁,如果仍然失败则阻塞,具体的处理方法在 acquireQueued(node,arg);
final boolean acquireQueued(final Node node,int arg){
boolean failed=true;
try{
boolean interrupted=false;
for(;;){
final Node p=node.predecessor();
if(p==head&&tryAcquire(arg)){
setHead(node);// 注意这一段代码并没有进行并发控制,因为这一句是由获取锁的线程设置,所以不需要进行同步控制
p.next=null;
failed=false;
return interrupted;
}
if(shouldParkAfterFailedAcquire(p,node)&&parkAndCheckInterrupt())
interrupted=true;
}
}finally{
if(failed)
cancelAcquire(node);
}
}
在上述代码中,关键的一点是 shouParkAfterFailedAcquire 方法和 parkAndCheckInterrupt 方法,接下来我们看下这两个函数的源码实现:
private static boolean shouldParkAfterFailedAcquire(Node pred,Node node){
int ws=pred.waitStatus;
if(ws==Node.SIGNAL) return true;// SIGNAL 表示该节点的后继节点正在阻塞中,当该节点释放时,将唤醒后继节点。此时 node 可以安全地进行阻塞,因为可以保证会被唤醒
if(ws>0){// 表示前置节点已经被取消
do{// 循环找到一个未被取消的节点
node.prev=pred=pred.prev;
}while(pred.waitStatus>0);
pred.next=node; // 执行到这一句时,acquireQueued 方法会循环一次,再次尝试获取锁
}else{
compareAndSetWaitStatus(pred,ws,Node.SIGNAL);
}
return false;
}
规则 1:如果前继的节点状态为 SIGNAL,表明当前节点可以安全地进行阻塞,则返回成功,此时 acquireQueued 方法的第 12 行(parkAndCheckInterrupt)将导致线程阻塞
规则 2:如果前继节点状态为 CANCELLED(ws>0),说明前置节点已经被放弃,则回溯到一个非取消的前继节点,返回 false,acquireQueued 方法的无限循环将递归调用该方法,直至规则 1 返回 true,导致线程阻塞
规则 3:如果前继节点状态为非 SIGNAL、非 CANCELLED,则设置前继的状态为 SIGNAL,返回 false 后进入 acquireQueued 的无限循环,与规则 2 同

下面我们再来分析一下,共享锁 acquireShared() 方法中的 doAcquireShared(arg), 调用该方法说明,共享锁已经用完了,当前线程需要进行等待重新获取:
private void doAcquireShared(int arg){
final Node node=addWaiter(Node.SHARED);// 构造一个新的节点,并将新的节点加入到同步队列中
boolean failed=true;
try{
boolean interrupted=false;
for(;;){
final Node p=node.predecessor();
if(p==head){
int r=tryAcquireShared(arg);// 再次尝试获取共享锁
if(r>=0){
setHeadAndPropagate(node,r);// 这一句很关键
p.next=null;
if(interrupted) selfInterrupt();
failed=false;
return;
}
if(shouldParkAfterFailedAcquire(p,node)&&parkAndCheckInterrupt())// 同独占锁的规则一样
interrupted=true;
}
}
}finally{
if(failed)
cancelAcquire(node);
}
}
上面的代码中主要的一句关键代码是 setHeadAndPropagate 方法,主要能够调用 setHeadAndPropagate 方法,说明当前线程已经活到了锁,下面我们来看看这句代码的实现:
private void setHeadAndPropagate(Node node,int propagate){
Node h=head;
setHead(node);// 因为有多个线程可能同时获取了共享锁,setHead 方法可能会设置不成功,不过已经获取了锁,也不用关心是否设置成功
if(propagate>0||h==null||h.waitStatus<0){
Node s=node.next;
if(s==null||s.isShared())
doReleaseShared();
}
}
独占锁某个节点被唤醒之后,它只需要将这个节点设置成 head 就完事了,而共享锁不一样,某个节点被设置为 head 之后,如果它的后继节点是 SHARED 状态的,那么将继续通过 doReleaseShared 方法尝试往后唤醒节点,实现了共享状态的向后传播。

退出移动版