JUC包下有很多的工具类都是基于 AQS(AbstractQueuedSynchronizer) 实现. 故深刻理解这部分内容十分重要. 尽管从代码角度AQS只是一个模板类,但波及的概念和细节特地多,防止忘记,做个总结. 会继续补充
  • AQS 实现原理

AQS 是由一个双端链表形成, 每个节点(Node)蕴含指向前后两个节点的指针(pre,next),一个代表以后竞争的资源状态(state),一个代表期待队列外面节点的期待状态(waitStatus)
对于期待状态的含意值,形容如下:

waitStatus:

  1. CANCEL(1): 在队列中期待的线程被中断或勾销. 这类节点会被移除队列
  2. SIGNAL(-1): 示意以后节点的后继节点处于阻塞状态,须要被我唤醒
  3. CONDITION(-2): 示意自身再 期待队列 中, 期待一个condition, 当其余线程调用condition.signal 办法时,才会将这样的节点搁置在同步队列中
  4. PROPAGATE(-3): 共享模式下应用,示意在可运行状态.

0: 初始化状态

重点剖析以下几个办法

    public final void acquire(int arg) {        //调用子类尝试获取资源    if (!tryAcquire(arg) &&        //没获取到的话,放入队列, 并且再队列里自旋获取锁资源(acquireQueued)        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))        //如果期待途中被中断,则复原中断         selfInterrupt();    }     //入期待队列     private Node addWaiter(Node mode) {        Node node = new Node(Thread.currentThread(), mode);        // Try the fast path of enq; backup to full enq on failure        Node pred = tail;        if (pred != null) {            //调整新入队节点的前置指针            node.prev = pred;            //调整尾指针指向新入队的节点, 并发故用cas             if (compareAndSetTail(pred, node)) {                pred.next = node;                return node;            }        }        //自旋入队        enq(node);        return node;    }    **重要**    /**    * 在队列里自旋查看可能获取资源,然而也不是始终自旋, 如果线程很多,始终自旋会耗费cpu资源,      对于前置节点 的waitStatus是 Signal 的话,就意味着我须要parking(parking操作会将上下文由用户态转化为内核态,频繁park/unpark会增多上下文切换)    */    final boolean acquireQueued(final Node node, int arg) {        boolean failed = true;        try {            boolean interrupted = false;            //自旋 判断前驱节点是不是头结点, 即判断 是不是轮到我来竞争资源了            for (;;) {                final Node p = node.predecessor();                //如果是并且胜利获取了资源, 调整指针, 设置队头是以后节点                if (p == head && tryAcquire(arg)) {                    setHead(node);                    p.next = null; // help GC                    failed = false;                    return interrupted;                }                //依据前置节点的waitStatus是不是 signal 判断是否须要park                if (shouldParkAfterFailedAcquire(p, node) &&                    parkAndCheckInterrupt())                    interrupted = true;            }        } finally {            if (failed)                cancelAcquire(node);        }    }    //如果前置节点 waitStatus 是 signal 状态,则以后节点park 期待.    // 否则向前查问,将以后节点排到最近的signal状态节点的前面    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {        int ws = pred.waitStatus;        if (ws == Node.SIGNAL)            /*             * This node has already set status asking a release             * to signal it, so it can safely park.             */            return true;        if (ws > 0) {            /*             * Predecessor was cancelled. Skip over predecessors and             * indicate retry.             */            do {                node.prev = pred = pred.prev;            } while (pred.waitStatus > 0);            pred.next = node;        } else {            /*             * waitStatus must be 0 or PROPAGATE.  Indicate that we             * need a signal, but don't park yet.  Caller will need to             * retry to make sure it cannot acquire before parking.             */             //设置之前失常的节点,状态为SIGNAL.             compareAndSetWaitStatus(pred, ws, Node.SIGNAL);        }        return false;    }    public final boolean release(int arg) {        if (tryRelease(arg)) {            Node h = head;            if (h != null && h.waitStatus != 0)                unparkSuccessor(h);            return true;        }        return false;    }     //从头结点开始查找下一个须要唤醒的就"非勾销" 节点     private void unparkSuccessor(Node node) {        /*         * If status is negative (i.e., possibly needing signal) try         * to clear in anticipation of signalling.  It is OK if this         * fails or if status is changed by waiting thread.         */        int ws = node.waitStatus;        if (ws < 0)            compareAndSetWaitStatus(node, ws, 0);        /*         * Thread to unpark is held in successor, which is normally         * just the next node.  But if cancelled or apparently null,         * traverse backwards from tail to find the actual         * non-cancelled successor.         */        Node s = node.next;        if (s == null || s.waitStatus > 0) {            s = null;            // 第一次没找到的话, 从队尾开始找            for (Node t = tail; t != null && t != node; t = t.prev)                if (t.waitStatus <= 0)                    s = t;        }        if (s != null)            LockSupport.unpark(s.thread);    }

AQS 实现原理再总结
同步队列调用tryAcquire可重写办法来判断是否曾经获取竞争资源,如果没有获取,就将以后线程包装成节点入队列,而后再自旋获取资源.是否自旋取决于前置节点的waitStatus,如果前置节点的waitStatus的状态是signal,则代表,以后节点须要parking期待,parkding期待的目标是为了缩小cpu空转,但会减少线程上下文切换,因为parking的原理是将用户态数据转为内核态. 前面unpark的操作则是将线程状态数据由内核态转为用户态. 等到前置节点release开释掉竞争状态后,前面的自旋判断就会竞争获取状态反复以上过程

画外音:AQS 其实是操作系统外面管程 模型的一种体现,将线程之间的同步
帮助用一个同一个的对象来进行治理. AQS 中的waitStatus其实就是这种思维的体现.

AQS 利用

AQS 根底章节 介绍了AQS 模板类的原理,以下介绍下juc上面基于AQS实现的并发工具类,从而对本人实现AQS有些启发
  1. ReentrantLock
// 默认的非偏心实现: cas 获取竞争状态.设置以后独占线程. 偏心版的则是入队列       final boolean nonfairTryAcquire(int acquires) {            final Thread current = Thread.currentThread();            int c = getState();            if (c == 0) {                //如果还没有占用锁, cas 占用锁资源,并设置互斥线程 为本人                if (compareAndSetState(0, acquires)) {                    setExclusiveOwnerThread(current);                    return true;                }            }            //如果互斥线程曾经是本人了,则减少重入次数            else if (current == getExclusiveOwnerThread()) {                int nextc = c + acquires;                if (nextc < 0) // overflow                    throw new Error("Maximum lock count exceeded");                setState(nextc);                return true;            }            return false;        }    //开释锁,则是减去须要开释的状态值并更新状态和独占线程   protected final boolean tryRelease(int releases) {            int c = getState() - releases;            if (Thread.currentThread() != getExclusiveOwnerThread())                throw new IllegalMonitorStateException();            boolean free = false;            if (c == 0) {                free = true;                setExclusiveOwnerThread(null);            }            setState(c);            return free;        } 
  1. ReentrantReadWriteLock
读写锁实现中, 一个整型32位代表锁状态,前16位代表读锁数, 后16位代表写锁.读读可同时进行(有条件:前置节点不是写),写互斥,读写互斥

readLock.lock实现

protected final int tryAcquireShared(int unused) {            Thread current = Thread.currentThread();            int c = getState();            //如果曾经有写锁了并且不是本人,则间接返回            if (exclusiveCount(c) != 0 &&                getExclusiveOwnerThread() != current)                return -1;            int r = sharedCount(c);            //偏心和非偏心实现读是否阻塞稍后剖析            if (!readerShouldBlock() &&                r < MAX_COUNT &&                //如果不须要阻塞,则cas 加上要读锁个数 c, SHARED_UNIT为2进制的16左移一位,即第17位为1其它位都为0                //故  c + SHARED_UNIT 简略的 +c                 compareAndSetState(c, c + SHARED_UNIT)) {                if (r == 0) {                    firstReader = current;                    firstReaderHoldCount = 1;                } else if (firstReader == current) {                    firstReaderHoldCount++;                } else {                    HoldCounter rh = cachedHoldCounter;                    if (rh == null || rh.tid != getThreadId(current))                        cachedHoldCounter = rh = readHolds.get();                    else if (rh.count == 0)                        readHolds.set(rh);                    rh.count++;                }                return 1;            }            return fullTryAcquireShared(current);}

writeLock.lock 实现

protected final boolean tryAcquire(int acquires) {                    Thread current = Thread.currentThread();            int c = getState();            //取低位16位值.也即写锁的个数            int w = exclusiveCount(c);            //代表曾经有读锁或写锁存在            if (c != 0) {                // (Note: if c != 0 and w == 0 then shared count != 0)                //如果写锁为0, 或者独占线程不是本人,则间接失败.                 // c!=0 并且 w = 0 代表,此刻有读但没有写,有读的时候不容许写,故间接返回false                if (w == 0 || current != getExclusiveOwnerThread())                    return false;                //重入不能超过最大次数.                 if (w + exclusiveCount(acquires) > MAX_COUNT)                    throw new Error("Maximum lock count exceeded");                // Reentrant acquire                setState(c + acquires);                return true;            }            // fair 的写阻塞策略则是老老实实排队.如果后面没有期待节点的话,则不阻塞            // unfair 的写阻塞策略是不阻塞,间接竞争             if (writerShouldBlock() ||                !compareAndSetState(c, c + acquires))                return false;            setExclusiveOwnerThread(current);            return true;}

readLock 逻辑

 protected final int tryAcquireShared(int unused) {            Thread current = Thread.currentThread();            int c = getState();            //有写存在,并且不是本人间接返回.             if (exclusiveCount(c) != 0 &&                getExclusiveOwnerThread() != current)                return -1;            //读未阻塞并且没有超过高位16位最大值并且cas减少读的个数胜利的话,则获取到读锁            int r = sharedCount(c);            if (!readerShouldBlock() &&                r < MAX_COUNT &&                compareAndSetState(c, c + SHARED_UNIT)) {                if (r == 0) {                    //设置第一个读的线程                    firstReader = current;                    //初始化第一个读线程的重入数                    firstReaderHoldCount = 1;                } else if (firstReader == current) {                    //减少第一个读线程的重入数                    firstReaderHoldCount++;                } else {                    //以下则是其余的读线程逻辑:                    //cachedHoldCounter 是最初一个读线程的重入数                    HoldCounter rh = cachedHoldCounter;                    //cachedHoldCounter 始终存储最初一个读线程的重入数                    if (rh == null || rh.tid != getThreadId(current))                        cachedHoldCounter = rh = readHolds.get();                    else if (rh.count == 0)                        readHolds.set(rh);                   //读重入数++                                         rh.count++;                }                return 1;            }            return fullTryAcquireShared(current);}

读写锁的留神点

默认的unfair策略 读写读 场景, 第二个读不会因为本人是读锁就获取读锁. 因为读的时候不能写, 为了防止写饥渴, 如果读后面是写的话,须要等后面的写做完当前能力读. 对应代码如下

  final boolean apparentlyFirstQueuedIsExclusive() {        Node h, s;        return (h = head) != null &&            (s = h.next)  != null &&            //后面一个如果是写的话,读就会排队阻塞            !s.isShared()         &&            s.thread != null;    }
  1. countDownLatch 实现

await 办法

private static final class Sync extends AbstractQueuedSynchronizer {        private static final long serialVersionUID = 4982264981922014374L;        Sync(int count) {            setState(count);        }        int getCount() {            return getState();        }        // await办法期待,会回调这个办法. 当count降为0的时,不阻塞,故返回大于0(1)        protected int tryAcquireShared(int acquires) {            return (getState() == 0) ? 1 : -1;        }        //countdown 回调这个办法, 故是state -1        protected boolean tryReleaseShared(int releases) {            // Decrement count; signal when transition to zero            for (;;) {                int c = getState();                //曾经减为0,曾经全副开释,间接放回false                if (c == 0)                    return false;                int nextc = c-1;                //cas 设置 减掉的 count.count = 0 则唤醒await                 if (compareAndSetState(c, nextc))                    return nextc == 0;            }        }    }