文章会首发公众号,搜寻关注:Different Java
摘要
- 如何保障线程平安
- JDK并发包中的同步控制
- JDK并发包中锁的特点
- 为什么要讲AQS
- AQS的外围数据结构
- AQS排他锁如何申请
- AQS排他锁如何开释
1. 如何保障线程平安
Java多线程在对共享资源进行拜访时,如果不加以控制会存在线程平安问题,当咱们应用多线程对共享资源拜访时,通常会线程共享资源的进行拜访线程数的管制:
- 共享锁:咱们会对共享资源分派许可,只有拿到许可的线程才能够拜访到共享资源,否则就须要期待有许可可用(当然这里也能够放弃期待)。拿到许可的线程在来到共享区间(不再访问共享资源)时必须要偿还许可,这样处于期待的线程才有可能拿到许可执行。(信号量采纳的就是这种机制)
- 排他锁:只有一个线程能够拜访到共享资源,其余线程在未拿到排他锁时只能在共享区间外期待。当持有锁的线程开释锁当前,其余期待的线程才有可能获取到锁执行
2. JDK并发包中的同步控制
Java提供了大量的工具让咱们对共享资源进行多线程访问控制,其中很多人最相熟的是synchronized关键字,除了该关键字,JDK的并发包中也提供了大量的类来进行同步控制,次要有:
- ReentrantLock
- Semaphore
- ReadWriteLock
- CountDownLatch
- CyclicBarrier
- LockSupport
- Condition
3. JDK包中的锁的特点
3.1 ReentrantLock
ReentrantLock的作用和synchronized的作用基本一致,然而ReentrantLock应该须要手动lock(如果锁已被占用则须要期待)和unlock,并且反对线程中断,反对偏心锁(依照申请锁的程序取得锁),尝试取得锁的时候能够指定最大等待时间(如果不指定工夫则不进行期待,拿到锁返回true,否则返回false)。
ReentrantLock搭配Condition的await()以及signal()相当于synchronized搭配Object.wait()以及notify()。
3.2 Semaphore
Semaphore(信号量)容许多个线程同时拜访一个共享资源,在结构信号量时必须指定容许的最大线程数,在应用信号量时,咱们会尝试获取一个许可,如果获取失败,则须要期待,直到有线程开释许可或者以后线程被中断。
3.3 ReadWriteLock
ReadWriteLock(读写锁)是读写拆散的锁,读与读之间不须要阻塞互斥,但读写、写写之间须要阻塞互斥。ReadWriteLock中能够获取一个读锁和一个写锁,通过读写锁能够进步读多写少的程序的并发运行效率。
3.4 CountDownLatch
CountDownLatch是一个倒计数器,在结构CountDownLatch时咱们须要传一个数字,通过调用CountDownLatch的await办法,咱们能够阻塞以后线程,直至倒计数器减到0,通过CountDownLatch的countdown()办法,咱们能够将倒计数器的值减1。
CountDownLatch的作用就好比乘坐飞机,咱们必须等所有的旅客检票实现当前,而后飞机能力腾飞。
3.4 CyclicBarrier
CyclicBarrier(循环栅栏),循环栅栏的作用相似CountDownLatch,然而他会在将计数器减到0当前从新复原原始计数器的值,循环的概念也就是这么来的,而且循环栅栏反对在计数器减到0当前触发一个动作(Runnable接口的实现类)。
3.5 LockSupport
LockSupport是一个线程阻塞工具,机制相似于信号量,通过park()能够生产一个许可,如果以后没有许可,线程会被阻塞,unpark()开释一个许可。LockSupport中的许可最多只有一个。
LockSupport不会造成死锁,假如咱们的线程限先执行了unpark,而后再调用park也不会阻塞(因为unpark曾经开释了一个许可可被取得应用)。
Thread.resume()假如先于Thread.suspend()办法先执行,则会导致线程死锁。
4. 为什么要讲AQS
通过上图咱们能够看出,前文咱们所提到的锁在其外部都有一个Sync类(采纳了组合的形式),而这个Sync类都继承自AbstractQueuedSynchronizer类。
4.1 AQS外围数据结构
在AQS中与两个外部类:
- ConditionObject:保留着条件变量期待队列(由Condition.await()引起的期待),最终实现也是Node
- Node:同步期待队列的具体实现类(保留在期待在这个锁上的线程)
4.2 Node
Node的数据结构如下:
- volatile Node prev:期待链表中的上一个元素
- volatile Node next:期待链表中的下一个元素
- volatile Thread thread:以后线程对象
- Node nextWaiter:下一个期待在条件变量队列中的节点
除了上述4个属性外,还有一个重要的属性就是:
- volatile int waitStatus
该属性示意的是节点在队列中的状态,括号中的为状态的int值:
- 初始状态(0)
- CANCELLED(1):线程勾销了期待,如果在获得锁的过程中产生了一些异样,则可能呈现勾销的状况,比方期待过程中呈现了中断异样或者timeout
- SIGNAL(-1):示意节点须要被唤醒
- CONDITION(-2):示意线程期待在条件变量中
- PROPAGATE(-3)
4.3 ConditionObject
因为咱们这篇文章咱们只专一剖析一般的锁,不波及条件变量期待,所以读者敌人们可自行浏览源码。
5. 排他锁申请
上面这张图是ReentrantLock应用排他锁的流程图:
上面咱们剖析一下排他锁的应用过程,咱们能够以ReentrantLock为切入点,剖析一下AQS的实现:
public void lock() { sync.lock();}
咱们在调用ReentrantLock的lock办法时,将会调用Sync的lock办法(),这里的Sync是一个抽象类,咱们这里次要看非偏心锁的实现形式,也就是NonfairSync的lock办法,如下:
final void lock() { // 1 if (compareAndSetState(0, 1)) // 2 setExclusiveOwnerThread(Thread.currentThread()); else // 3 acquire(1);}
- 通过CAS尝试性取得锁(扭转AQS中的state属性值为1),如果尝试取得锁胜利,执行第2步骤,否则执行第三步骤
- CAS获取锁胜利当前,设置持有锁的线程为以后线程
- 如果没有获取到锁,则调用acquire尝试性取得许可,因为这里是排他锁,有且只有一个线程能够持有该锁
5.1 acquire
acquire办法是AQS中的一个办法,如下:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();}
首先调用tryAcquire再次尝试获取锁,该办法在AQS中的实现是抛出UnsupportedOperationException异样,也就是说子类必须要实现这个办法,上面咱们看一下tryAcquire在ReentrantLock$NonfairSync办法中的实现,该办法最终会调用ReentrantLock$Sync中的nonfairTryAcquire(int acquires)办法如下:
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false;}
这里的次要逻辑就是先获取一下锁的状态,如果锁的状态是未占有(state值为0),则再次应用CAS尝试去占有锁,如果尝试胜利,则返回true。
通过CAS尝试获取锁失败当前,判断锁是否被以后线程持有,如果是,调用setState(nextc)将AQS中的state的值+1,而后返回true(这里解决了重入的问题)。
上述两种状况如果都不成立,则返回false,示意以后线程获取锁失败。
5.2 acquireQueued办法
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();}
当咱们尝试获取锁失败当前,会调用acquireQueued该线程增加至期待队列,那么这个期待队列中的Node是怎么构建进去的呢?答案就是addWaiter办法。
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node;}
addWaiter办法中首先构建了一个Node节点,Node节点中的线程为以后线程,而后尝试将该节点增加到AQS的期待队列中,如果原来期待队列的尾节点不为NULL的会采纳CAS疾速失败的办法进行增加,如果胜利增加,则返回这个新节点,否则调用enq增加新节点到期待队列中,办法如下:
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize // 如果队列中还没有节点,CAS初始化一个空的头结点,并把尾节点设置为头结点 if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; // CAS设置以后节点node为尾节点,并把原来的尾节点的next指向以后节点node if (compareAndSetTail(t, node)) { t.next = node; return t; } } }}
如果队列中临时还没有节点的话,就初始化一个空的头节点,并将尾节点设置为头结点,而后将以后节点node的前一个节点改成tail节点,并通过CAS直至胜利将node改为tail节点,并且原先的tail节点的next批改为以后节点note,而后返回以后节点node的前置节点(也就是原来的tail节点)。
当节点增加胜利当前,咱们会被动调用acquireQueued尝试获取锁,办法如下:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; // 开启自旋 for (;;) { // 获取以后节点node的前继节点 final Node p = node.predecessor(); // 如果前继节点是头结点(虚节点),那么意味着以后节点是第一个数据节点,那么久尝试获取锁,如果获取锁胜利,将以后节点设置为头结点,setHead办法会将节点中的线程和prev节点都置为空 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 如果node的前继节点不是头结点或者获取锁失败,那么须要判断以后节点是否须要挂起 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // 获取如果产生异样,则勾销获取锁申请 if (failed) cancelAcquire(node); }}
如果判断以后节点线程是否须要挂起,次要依附shouldParkAfterFailedAcquire来判断,如下:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false;}
通过上述源码,咱们能够看到,只有以后节点的前继节点的状态是SINGAL(待唤醒状态)时,以后节点线程能够被挂起(这里次要是为了避免有限循环资源节约)。上述代码还解决前置节点被勾销了非凡状况,将以后节点的前置节点批改为队列中最初一个waitSatus不是勾销状态的节点。
将以后节点的线程挂起办法如下,实现采纳的LockSupport。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted();}
勾销获取所申请的办法是cancelAcquire,如下:
private void cancelAcquire(Node node) { // 过滤有效节点 if (node == null) return; node.thread = null; // 跳过所有处于勾销的节点 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 找到最初一个无效节点的后继节点 Node predNext = pred.next; // 将以后节点的状态设置为勾销 node.waitStatus = Node.CANCELLED; // 如果以后节点是尾节点,设置尾节点为最初一个无效节点 if (node == tail && compareAndSetTail(node, pred)) { // 将最初一个无效节点的next设置为null compareAndSetNext(pred, predNext, null); } else { // 如果最初一个无效节点不是头结点并且(最初一个无效节点是SINGAL状态或者能够被设置为SGINAL状态),并且最初一个无效节点的线程不为null int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; // 如果以后节点后的后置节点是无效节点(存在且状态不为勾销状态),设置最初一个无效节点的next为以后节点的后继节点 if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { // 上述条件如果均不成立,唤醒以后节点的后继节点 unparkSuccessor(node); } node.next = node; // help GC }}
6. 排他锁开释
public void unlock() { sync.release(1);}
ReentrantLock通过unlock办法来解锁,该办法会调用AQS中的release办法,如下:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false;}
tryRelease是由具体的AQS的子类来实现,上面看一下ReentrantLock中的Sync中的实现,如下:
protected final boolean tryRelease(int releases) { int c = getState() - releases; // 只有持有锁的线程才能够执行unlock操作 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { // 解锁胜利,设置独占线程为空 free = true; setExclusiveOwnerThread(null); } setState(c); return free;}
上面再回到AQS的release办法,如下:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false;}
如果解锁胜利,并且以后节点状态不是初始状态,而后就调用unparkSuccessor办法唤醒head后继节点的线程,办法如下:
private void unparkSuccessor(Node node) { // 获取以后节点状态 int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 获取节点的后继节点 Node s = node.next; // 如果节点的后继节点为null或者曾经被勾销,则从尾节点找到一个未勾销的节点进行唤醒 if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 唤醒线程 LockSupport.unpark(s.thread);}
解锁的过程还是绝对比较简单,当一个线程在开释ReentrantLock的时候,须要从期待队列中唤醒其余须要唤醒的节点。
本期的Java AQS介绍到这,我是shysh95,咱们下期再见!