JUC包下有很多的工具类都是基于 AQS(AbstractQueuedSynchronizer) 实现. 故深刻理解这部分内容十分重要. 尽管从代码角度AQS只是一个模板类,但波及的概念和细节特地多,防止忘记,做个总结. 会继续补充
- AQS 实现原理
AQS 是由一个双端链表形成, 每个节点(Node)蕴含指向前后两个节点的指针(pre,next),一个代表以后竞争的资源状态(state),一个代表期待队列外面节点的期待状态(waitStatus)
对于期待状态的含意值,形容如下:
waitStatus:
- CANCEL(1): 在队列中期待的线程被中断或勾销. 这类节点会被移除队列
- SIGNAL(-1): 示意以后节点的后继节点处于阻塞状态,须要被我唤醒
- CONDITION(-2): 示意自身再 期待队列 中, 期待一个condition, 当其余线程调用condition.signal 办法时,才会将这样的节点搁置在同步队列中
- PROPAGATE(-3): 共享模式下应用,示意在可运行状态.
0: 初始化状态
重点剖析以下几个办法
public final void acquire(int arg) { //调用子类尝试获取资源 if (!tryAcquire(arg) && //没获取到的话,放入队列, 并且再队列里自旋获取锁资源(acquireQueued) acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //如果期待途中被中断,则复原中断 selfInterrupt(); } //入期待队列 private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { //调整新入队节点的前置指针 node.prev = pred; //调整尾指针指向新入队的节点, 并发故用cas if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //自旋入队 enq(node); return node; } **重要** /** * 在队列里自旋查看可能获取资源,然而也不是始终自旋, 如果线程很多,始终自旋会耗费cpu资源, 对于前置节点 的waitStatus是 Signal 的话,就意味着我须要parking(parking操作会将上下文由用户态转化为内核态,频繁park/unpark会增多上下文切换) */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; //自旋 判断前驱节点是不是头结点, 即判断 是不是轮到我来竞争资源了 for (;;) { final Node p = node.predecessor(); //如果是并且胜利获取了资源, 调整指针, 设置队头是以后节点 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } //依据前置节点的waitStatus是不是 signal 判断是否须要park if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } //如果前置节点 waitStatus 是 signal 状态,则以后节点park 期待. // 否则向前查问,将以后节点排到最近的signal状态节点的前面 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ //设置之前失常的节点,状态为SIGNAL. compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } //从头结点开始查找下一个须要唤醒的就"非勾销" 节点 private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; // 第一次没找到的话, 从队尾开始找 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
AQS 实现原理再总结
同步队列调用tryAcquire可重写办法来判断是否曾经获取竞争资源,如果没有获取,就将以后线程包装成节点入队列,而后再自旋获取资源.是否自旋取决于前置节点的waitStatus,如果前置节点的waitStatus的状态是signal,则代表,以后节点须要parking期待,parkding期待的目标是为了缩小cpu空转,但会减少线程上下文切换,因为parking的原理是将用户态数据转为内核态. 前面unpark的操作则是将线程状态数据由内核态转为用户态. 等到前置节点release开释掉竞争状态后,前面的自旋判断就会竞争获取状态反复以上过程
画外音:AQS 其实是操作系统外面管程 模型的一种体现,将线程之间的同步
帮助用一个同一个的对象来进行治理. AQS 中的waitStatus其实就是这种思维的体现.
AQS 利用
AQS 根底章节 介绍了AQS 模板类的原理,以下介绍下juc上面基于AQS实现的并发工具类,从而对本人实现AQS有些启发
- ReentrantLock
// 默认的非偏心实现: cas 获取竞争状态.设置以后独占线程. 偏心版的则是入队列 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //如果还没有占用锁, cas 占用锁资源,并设置互斥线程 为本人 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //如果互斥线程曾经是本人了,则减少重入次数 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } //开释锁,则是减去须要开释的状态值并更新状态和独占线程 protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
- ReentrantReadWriteLock
读写锁实现中, 一个整型32位代表锁状态,前16位代表读锁数, 后16位代表写锁.读读可同时进行(有条件:前置节点不是写),写互斥,读写互斥
readLock.lock实现
protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); //如果曾经有写锁了并且不是本人,则间接返回 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; int r = sharedCount(c); //偏心和非偏心实现读是否阻塞稍后剖析 if (!readerShouldBlock() && r < MAX_COUNT && //如果不须要阻塞,则cas 加上要读锁个数 c, SHARED_UNIT为2进制的16左移一位,即第17位为1其它位都为0 //故 c + SHARED_UNIT 简略的 +c compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current);}
writeLock.lock 实现
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); //取低位16位值.也即写锁的个数 int w = exclusiveCount(c); //代表曾经有读锁或写锁存在 if (c != 0) { // (Note: if c != 0 and w == 0 then shared count != 0) //如果写锁为0, 或者独占线程不是本人,则间接失败. // c!=0 并且 w = 0 代表,此刻有读但没有写,有读的时候不容许写,故间接返回false if (w == 0 || current != getExclusiveOwnerThread()) return false; //重入不能超过最大次数. if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire setState(c + acquires); return true; } // fair 的写阻塞策略则是老老实实排队.如果后面没有期待节点的话,则不阻塞 // unfair 的写阻塞策略是不阻塞,间接竞争 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true;}
readLock 逻辑
protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); //有写存在,并且不是本人间接返回. if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //读未阻塞并且没有超过高位16位最大值并且cas减少读的个数胜利的话,则获取到读锁 int r = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { //设置第一个读的线程 firstReader = current; //初始化第一个读线程的重入数 firstReaderHoldCount = 1; } else if (firstReader == current) { //减少第一个读线程的重入数 firstReaderHoldCount++; } else { //以下则是其余的读线程逻辑: //cachedHoldCounter 是最初一个读线程的重入数 HoldCounter rh = cachedHoldCounter; //cachedHoldCounter 始终存储最初一个读线程的重入数 if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); //读重入数++ rh.count++; } return 1; } return fullTryAcquireShared(current);}
读写锁的留神点
默认的unfair策略 读写读 场景, 第二个读不会因为本人是读锁就获取读锁. 因为读的时候不能写, 为了防止写饥渴, 如果读后面是写的话,须要等后面的写做完当前能力读. 对应代码如下
final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; return (h = head) != null && (s = h.next) != null && //后面一个如果是写的话,读就会排队阻塞 !s.isShared() && s.thread != null; }
- countDownLatch 实现
await 办法
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } // await办法期待,会回调这个办法. 当count降为0的时,不阻塞,故返回大于0(1) protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } //countdown 回调这个办法, 故是state -1 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); //曾经减为0,曾经全副开释,间接放回false if (c == 0) return false; int nextc = c-1; //cas 设置 减掉的 count.count = 0 则唤醒await if (compareAndSetState(c, nextc)) return nextc == 0; } } }