ReentrantLock 特色
特点:
1.可重入
2.偏心/非偏心
3.可中断
4.反对条件期待
5.可设置锁超时
罕用 API
应用例子:
public class ReentrantLockTest { static ReentrantLock lock = new ReentrantLock(true); static class ClientThread extends Thread { @Override public void run() { System.out.println(Thread.currentThread() + "开始尝试获取锁"); lock.lock(); try { System.out.println(Thread.currentThread() + "胜利获取锁"); TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); System.out.println(Thread.currentThread() + "实现开释锁"); } } } public static void main(String[] args) throws InterruptedException { ClientThread t1 = new ClientThread(); ClientThread t2 = new ClientThread(); ClientThread t3 = new ClientThread(); t1.start(); t2.start(); t3.start(); TimeUnit.SECONDS.sleep(10); }}
源码剖析
获取锁
如果我应用上面的代码进行获取就行:
ReentrantLock lock = new ReentrantLock();lock.lock();lock.unlock();
ReentrantLock 默认调用的就是非偏心锁 调用栈: java.util.concurrent.locks.ReentrantLock#lock
java.util.concurrent.locks.ReentrantLock.NonfairSync#lock
final void lock() { // 间接尝试加锁 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else // 如果获取锁失败进入 AQS acquire 逻辑 acquire(1);}
如果 compareAndSetState(0, 1)可能间接执行胜利,那么将间接完结办法的执行。 如果失败,那么就会调用acquire 办法如下:
public final void acquire(int arg) { // tryAcquire(arg) 尝试获取锁 // acquireQueued 获取锁失败进行期待队列 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();}
咱们先看 tryAcquire办法: java.util.concurrent.locks.ReentrantLock.NonfairSync#tryAcquire
- java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire
他会间接调用到 nonfairTryAcquire非偏心锁的加锁逻辑 外面有两个逻辑:
- 如果以后状态无锁,间接尝试加锁,加锁胜利返回 true
- 如果以后时锁重入,那么间接批改 AQS 状态共享变量值 state 等于 c + acquires, 加锁胜利返回 ture
如果都不满足,那么返回加锁失败返回 false
// 非偏心锁的逻辑// 如何了解插队, 这里的插队是以后队列中被唤醒的线程, 和以后退出的线程都能够被执行// 如果以后退出线程比队列中唤醒的线程先获取到锁, 就是插队景象final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // 无锁状态, 尝试竞争 if (c == 0) { if (compareAndSetState(0, acquires)) { //是否获取到锁 setExclusiveOwnerThread(current); return true; } } // 以后线程持有锁, state 计数 +1 else if (current == getExclusiveOwnerThread()) { //判断是否是重入 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false;}
如果 tryAcquire 调用实现后是获取锁胜利 acquire办法执行完结,最初代表 lock 办法执行完结。
获取锁失败进入同步队列
如果获取锁失败,那么就会执行 acquire代码前面段 if 逻辑的执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 这里其实能够分为两个办法来看
- addWaiter(Node.EXCLUSIVE)
- acquireQueued(xxx, arg)
依照执行程序,咱们先看 addWaiter(Node.EXCLUSIVE) 这里次要是入队的逻辑。 addWaiter: java.util.concurrent.locks.AbstractQueuedSynchronizer#addWaiter
private Node addWaiter(Node mode) { // 将以后线程转换为 AQS Node 节点 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure // 尝试间接退出到尾节点 Node pred = tail; if (pred != null) { // 以后节点的前驱节点指向尾节点 node.prev = pred; // cas 批改 tail 节点,如果胜利返回 node if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 如果失败,调用 enq enq(node); return node;}
enq是将以后节点插入队列,必要的时候会进行初始化
//将节点插入队列,必要时进行初始化。private Node enq(final Node node) { for (;;) { Node t = tail; // 如果没有尾节点,那么须要进行初始化 if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } // 如果有尾节点/其实就是有头节点/曾经被初始化,通过 CAS 入队 else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } }}
后面咱们看看完了,以后获取锁的线程当获取锁失败的时候,胜利进入 AQS 队列,接下来咱们持续看 acquireQueued又做了什么呢?
- 如果是队列头节点,会再次尝试获取锁
如果批改 java.util.concurrent.locks.AbstractQueuedSynchronizer.Node状态位
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { // 是否中断 boolean interrupted = false; for (;;) { // 获取 node 的前驱节点 final Node p = node.predecessor(); // 如果是头节点,再次尝试获取锁 if (p == head && tryAcquire(arg)) { // 将 node 设置为 头节点 setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 判断是否须要进行阻塞以后线程 if (shouldParkAfterFailedAcquire(p, node) && // 阻塞线程 parkAndCheckInterrupt()) interrupted = true; } } finally { // 是否失败 if (failed) // 如果失败,勾销获取锁 cancelAcquire(node); }}
下面咱们能够看到,for (;;)中有两个判断
- 如果是头节点,就调用tryAcquire尝试获取锁 (之前咱们曾经剖析过 tryAcquire 了,咱们次要看前面个 if )
- 如果不是就进入 shouldParkAfterFailedAcquire 办法
在调用 acquireQueued这个过程中可能调用屡次 shouldParkAfterFailedAcquire 办法。shouldParkAfterFailedAcquire 会执行一下几个操作。
- 能够用来批改以后节点的状态,
和对链表上有效的节点出队
/** * 当获取锁失败后, 查看更新新节点状态如果是须要阻塞返回, true * <p> * 一个前继节点 waitStatus = 0, 第一次将持续设置为 SIGNAL, 通知以后线程筹备进入阻塞, 此时仍旧获取不到, 以后线程进入阻塞 * * @param pred 前继节点 * @param node 以后节点 * @return {@code true} if thread should block */private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // 前继节点的状态, 第一次进入的话, 肯定是 0 if (ws == Node.SIGNAL) return true; if (ws > 0) { do { // 出队, 剔除有效的节点 node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 第一次进来, pred.waitStatus = 0 执行这个分支 // 将前继节点的状态批改为 SIGNAL, 示意 pred.next 节点须要被唤醒(此时筹备进入阻塞, 然而还未被阻塞, 再次获取锁失败之后才会被阻塞) compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false;}
当 Node 被批改 Node.SIGNAL状态后,第一个 if 返回 true , 咱们再次回到 acquireQueued 办法,就会执行 parkAndCheckInterrupt 办法,就是将以后的线程 park 而后返回以后线程的中断状态。
private final boolean parkAndCheckInterrupt() { // 阻塞线程 LockSupport.park(this); // 返回线程中断状态 return Thread.interrupted();}
留神:这里线程 park 过后,其实获取锁就完结了前半段的操作,实现同步队列的入队,并且进入期待。咱们就须要期待解锁唤醒。
开释锁
开释锁的代码如下:
lock.unlock();
开释锁做了什么呢?
- 开释以后锁的状态
- 在 AQS 队列中去唤醒排队的头节点
调用栈如下: java.util.concurrent.locks.ReentrantLock#unlock
- java.util.concurrent.locks.AbstractQueuedSynchronizer#release
咱们能够从 release办法开始
// 解锁public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; // 判断是否有须要唤醒的线程 if (h != null && h.waitStatus != 0) //waitStatus 的值为 0, 只有当后继存在节点才会被设置为该值不为 0, 此时须要唤醒后继线程 unparkSuccessor(h); return true; } return false;}
开释锁,次要是调用 tryRelease, 首先就是思考之前的重入问题,间接对 state 进行 -1 ,而后如果 c == 0示意以后线程不再持有锁,咱们就能够批改 ownerThread == null . 这个时候,最初批改 state 为新值。
// tryRelease protected final boolean tryRelease(int releases) { int c = getState() - releases; // 判断是否是以后线程持有锁 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { // 如果 state == 0 示意以后线程不在占有该锁 free = true; setExclusiveOwnerThread(null); } setState(c); return free;}
开释锁胜利后,再次回到 release办法,会再次判断,如果 AQS 队列不为空,那么就进行排队线程唤醒。 次要是调用 java.util.concurrent.locks.AbstractQueuedSynchronizer#unparkSuccessor
// 唤醒队列中的线程private void unparkSuccessor(Node node) { // 将以后节点状态批改为 0 int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 反向查找 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 唤醒队列中的节点 LockSupport.unpark(s.thread);}
其实这里最要害的就是 LockSupport.unpark(s.thread); 这里就会回到 acquireQueued,执行唤醒后强锁的逻辑,仍然在 acquireQueued外面。
开释锁后唤醒期待节点
以后节点被唤醒逻辑,首先会在 shouldParkAfterFailedAcquire 办法中出队,而后尝试加锁如果加锁胜利就返回 true.
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ // 出队的逻辑 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false;}
再次竞争锁,次要是在acquireQueued办法中调用 tryAcquire办法进行获取锁。如果获取锁失败,就又再次获取锁,如果获取锁胜利返回。
测试和实际
反对锁中断
如果通过 lock_.lockInterruptibly(); 形式加锁,如果以后线程呈现中断过后,会抛出 _java.lang.InterruptedException线程中断异样,所以 ReentrantLock反对可中断。 相干源码:
/** * Convenience method to park and then check if interrupted * * @return {@code true} if interrupted */private final boolean parkAndCheckInterrupt() { // LockSupport.park 会革除中断信号 LockSupport.park(this); return Thread.interrupted();}// private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 抛出中断异样 throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); }}
试验代码:
public class ReentrantLockTest { static ReentrantLock lock = new ReentrantLock(true); static class ClientThread implements Runnable { @SneakyThrows @Override public void run() { System.out.println(Thread.currentThread() + "开始尝试获取锁"); lock.lockInterruptibly(); try { System.out.println(Thread.currentThread() + "胜利获取锁"); TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); System.out.println(Thread.currentThread() + "实现开释锁"); } } } public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread(new ClientThread(), "t1"); Thread t2 = new Thread(new ClientThread(), "t2"); Thread t3 = new Thread(new ClientThread(), "t3"); t1.start(); t2.start(); // 锁中断 //lock.lockInterruptibly(); TimeUnit.SECONDS.sleep(1); t3.start(); TimeUnit.SECONDS.sleep(1); t3.interrupt(); TimeUnit.SECONDS.sleep(10); }}
输入后果:
Thread[t1,5,main]开始尝试获取锁Thread[t2,5,main]开始尝试获取锁Thread[t1,5,main]胜利获取锁Thread[t3,5,main]开始尝试获取锁Exception in thread "t3" java.lang.InterruptedException at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:898) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1222) at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335) at io.zhengsh.juc._1lock.reentrantlock.ReentrantLockTest$ClientThread.run(ReentrantLockTest.java:18) at java.lang.Thread.run(Thread.java:748)Thread[t1,5,main]实现开释锁Thread[t2,5,main]胜利获取锁Thread[t2,5,main]实现开释锁
获取锁设置超时
lock.tryLock(2, TimeUnit.SECONDS)能够反对设置获取锁的超时工夫,能够无效的防止线程饥饿问题 测试代码:
public class ReentrantLockTryTest { static ReentrantLock lock = new ReentrantLock(true); static class ClientThread implements Runnable { @Override public void run() { System.out.println(Thread.currentThread() + "\t" + (System.currentTimeMillis() / 1000) + "\t开始尝试获取锁"); try { if (lock.tryLock(2, TimeUnit.SECONDS)) { System.out.println(Thread.currentThread() + "\t" + (System.currentTimeMillis() / 1000) + "\t获取锁胜利"); TimeUnit.SECONDS.sleep(5); } else { System.out.println(Thread.currentThread() + "\t" + (System.currentTimeMillis() / 1000) + "\t获取锁失败"); } } catch (InterruptedException e) { e.printStackTrace(); } finally { if (lock.isHeldByCurrentThread() && lock.isLocked()) { lock.unlock(); System.out.println(Thread.currentThread() + "\t" + (System.currentTimeMillis() / 1000) + "\t实现开释锁"); } } } } public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread(new ClientThread(), "t1"); Thread t2 = new Thread(new ClientThread(), "t2"); Thread t3 = new Thread(new ClientThread(), "t3"); t1.start(); t2.start(); t3.start(); //t1.interrupt(); TimeUnit.SECONDS.sleep(20); }}
输入后果
Thread[t1,5,main] 1653540581 开始尝试获取锁Thread[t3,5,main] 1653540581 开始尝试获取锁Thread[t2,5,main] 1653540581 开始尝试获取锁Thread[t1,5,main] 1653540581 获取锁胜利Thread[t3,5,main] 1653540583 获取锁失败Thread[t2,5,main] 1653540583 获取锁失败Thread[t1,5,main] 1653540586 实现开释锁
条件期待队列应用
Condition 是在 java 1.5 中才呈现的,它用来代替传统的Object的wait()、notify()实现线程间的合作,相比应用Object的wait()、notify(),应用Condition的await()、signal()这种形式实现线程间合作更加平安和高效。因而通常来说比拟举荐应用 Condition,阻塞队列实际上是应用了 Condition 来模仿线程间合作。 Condition 是个接口,根本的办法就是 await() 和 signal() 办法; Condition 依赖于 Lock 接口,生成一个 Condition 的根本代码是 lock.newCondition() 调用 Condition 的 await() 和 signal() 办法,都必须在 lock 爱护之内,就是说必须在 lock.lock() 和 lock.unlock 之间才能够应用:
- Conditon中的await()对应Object的wait();
- Condition中的signal()对应Object的notify();
- Condition中的signalAll()对应Object的notifyAll()。
测试场景: 上面一个场景,须要ABC3个线程,A线程打印1次,而后是B线程打印2次,再是C线程打印3次,线程交替打印。 ABC线程须要交替执行,咱们须要管制,线程的执行先后顺序 咱们能够应用多条件Condition来管制,每一个线程领有一个condition对象,调用各种的await办法,能够使线程期待,而后让别的线程调用这个condition对象的signal办法,唤醒线程。 代码如下:
public class ReentrantLockConditionTest { private int data = 1; private Lock lock = new ReentrantLock(); Condition condition1 = lock.newCondition(); Condition condition2 = lock.newCondition(); Condition condition3 = lock.newCondition(); public void printA() { lock.lock(); try { while (data != 1) { condition1.await(); } // 打印5次 for (int i = 0; i < 5; i++) { System.out.println(Thread.currentThread().getName() + " ->" + data); } data = 2; // 告诉B线程 condition2.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void printB() { lock.lock(); try { while (data != 2) { condition2.await(); } // 打印10次 for (int i = 0; i < 10; i++) { System.out.println(Thread.currentThread().getName() + " ->" + data); } data = 3; // 告诉C condition3.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void printC() { lock.lock(); try { while (data != 3) { condition3.await(); } // 打印15次 for (int i = 0; i < 15; i++) { System.out.println(Thread.currentThread().getName() + " ->" + data); } data = 1; // 告诉A condition1.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public static void main(String[] args) throws InterruptedException { ReentrantLockConditionTest conditionTest = new ReentrantLockConditionTest(); // A,B,C 交替执行 new Thread(conditionTest::printA, "A").start(); new Thread(conditionTest::printB, "B").start(); new Thread(conditionTest::printC, "C").start(); }}
输入后果如下:
A ->1B ->2B ->2C ->3C ->3C ->3