关于并发编程:并发AQS原理及应用

43次阅读

共计 7162 个字符,预计需要花费 18 分钟才能阅读完成。

JUC 包下有很多的工具类都是基于 AQS(AbstractQueuedSynchronizer) 实现. 故深刻理解这部分内容十分重要. 尽管从代码角度 AQS 只是一个模板类, 但波及的概念和细节特地多, 防止忘记, 做个总结. 会继续补充

  • AQS 实现原理

AQS 是由一个双端链表形成, 每个节点 (Node) 蕴含指向前后两个节点的指针 (pre,next), 一个代表以后竞争的资源状态(state), 一个代表期待队列外面节点的期待状态(waitStatus)
对于期待状态的含意值, 形容如下:

waitStatus:

  1. CANCEL(1): 在队列中期待的线程被中断或勾销. 这类节点会被移除队列
  2. SIGNAL(-1): 示意以后节点的后继节点处于阻塞状态, 须要被我唤醒
  3. CONDITION(-2): 示意自身再 期待队列 中, 期待一个 condition, 当其余线程调用 condition.signal 办法时, 才会将这样的节点搁置在同步队列中
  4. PROPAGATE(-3): 共享模式下应用, 示意在可运行状态.

0: 初始化状态

重点剖析以下几个办法

    public final void acquire(int arg) {
        // 调用子类尝试获取资源
    if (!tryAcquire(arg) &&
        // 没获取到的话, 放入队列, 并且再队列里自旋获取锁资源(acquireQueued)
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // 如果期待途中被中断, 则复原中断 
        selfInterrupt();}

     // 入期待队列
     private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            // 调整新入队节点的前置指针
            node.prev = pred;
            // 调整尾指针指向新入队的节点, 并发故用 cas 
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 自旋入队
        enq(node);
        return node;
    }

    ** 重要 **
    /**
    * 在队列里自旋查看可能获取资源, 然而也不是始终自旋, 如果线程很多, 始终自旋会耗费 cpu 资源,
      对于前置节点 的 waitStatus 是 Signal 的话, 就意味着我须要 parking(parking 操作会将上下文由用户态转化为内核态, 频繁 park/unpark 会增多上下文切换)
    */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            // 自旋 判断前驱节点是不是头结点, 即判断 是不是轮到我来竞争资源了
            for (;;) {final Node p = node.predecessor();
                // 如果是并且胜利获取了资源, 调整指针, 设置队头是以后节点
                if (p == head && tryAcquire(arg)) {setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 依据前置节点的 waitStatus 是不是 signal 判断是否须要 park
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {if (failed)
                cancelAcquire(node);
        }
    }

    // 如果前置节点 waitStatus 是 signal 状态, 则以后节点 park 期待.
    // 否则向前查问, 将以后节点排到最近的 signal 状态节点的前面
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
             // 设置之前失常的节点, 状态为 SIGNAL. 
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

    public final boolean release(int arg) {if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

     // 从头结点开始查找下一个须要唤醒的就 "非勾销" 节点
     private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            // 第一次没找到的话, 从队尾开始找
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

AQS 实现原理再总结
同步队列调用 tryAcquire 可重写办法来判断是否曾经获取竞争资源, 如果没有获取, 就将以后线程包装成节点入队列, 而后再自旋获取资源. 是否自旋取决于前置节点的 waitStatus, 如果前置节点的 waitStatus 的状态是 signal, 则代表, 以后节点须要 parking 期待,parkding 期待的目标是为了缩小 cpu 空转, 但会减少线程上下文切换, 因为 parking 的原理是将用户态数据转为内核态. 前面 unpark 的操作则是将线程状态数据由内核态转为用户态. 等到前置节点 release 开释掉竞争状态后, 前面的自旋判断就会竞争获取状态反复以上过程

画外音:AQS 其实是操作系统外面管程 模型的一种体现, 将线程之间的同步
帮助用一个同一个的对象来进行治理. AQS 中的 waitStatus 其实就是这种思维的体现.

AQS 利用

AQS 根底章节 介绍了 AQS 模板类的原理, 以下介绍下 juc 上面基于 AQS 实现的并发工具类, 从而对本人实现 AQS 有些启发

  1. ReentrantLock
// 默认的非偏心实现: cas 获取竞争状态. 设置以后独占线程. 偏心版的则是入队列
    
   final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                // 如果还没有占用锁, cas 占用锁资源, 并设置互斥线程 为本人
                if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 如果互斥线程曾经是本人了, 则减少重入次数
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    // 开释锁, 则是减去须要开释的状态值并更新状态和独占线程
   protected final boolean tryRelease(int releases) {int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        } 
  1. ReentrantReadWriteLock

读写锁实现中, 一个整型 32 位代表锁状态, 前 16 位代表读锁数, 后 16 位代表写锁. 读读可同时进行(有条件: 前置节点不是写), 写互斥, 读写互斥

readLock.lock 实现

protected final int tryAcquireShared(int unused) {Thread current = Thread.currentThread();
            int c = getState();
            // 如果曾经有写锁了并且不是本人, 则间接返回
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            int r = sharedCount(c);
            // 偏心和非偏心实现读是否阻塞稍后剖析
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                // 如果不须要阻塞, 则 cas 加上要读锁个数 c, SHARED_UNIT 为 2 进制的 16 左移一位, 即第 17 位为 1 其它位都为 0
                // 故  c + SHARED_UNIT 简略的 +c 
                compareAndSetState(c, c + SHARED_UNIT)) {if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {firstReaderHoldCount++;} else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
}

writeLock.lock 实现


protected final boolean tryAcquire(int acquires) {Thread current = Thread.currentThread();
            int c = getState();
            // 取低位 16 位值. 也即写锁的个数
            int w = exclusiveCount(c);
            // 代表曾经有读锁或写锁存在
            if (c != 0) {// (Note: if c != 0 and w == 0 then shared count != 0)
                // 如果写锁为 0, 或者独占线程不是本人, 则间接失败. 
                // c!=0 并且 w = 0 代表, 此刻有读但没有写, 有读的时候不容许写, 故间接返回 false
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                // 重入不能超过最大次数. 
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                setState(c + acquires);
                return true;
            }
            // fair 的写阻塞策略则是老老实实排队. 如果后面没有期待节点的话, 则不阻塞
            // unfair 的写阻塞策略是不阻塞, 间接竞争 
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
}

readLock 逻辑

 protected final int tryAcquireShared(int unused) {Thread current = Thread.currentThread();
            int c = getState();
            // 有写存在, 并且不是本人间接返回. 
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            // 读未阻塞并且没有超过高位 16 位最大值并且 cas 减少读的个数胜利的话, 则获取到读锁
            int r = sharedCount(c);
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {if (r == 0) {
                    // 设置第一个读的线程
                    firstReader = current;
                    // 初始化第一个读线程的重入数
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    // 减少第一个读线程的重入数
                    firstReaderHoldCount++;
                } else {
                    // 以下则是其余的读线程逻辑:
                    //cachedHoldCounter 是最初一个读线程的重入数
                    HoldCounter rh = cachedHoldCounter;
                    //cachedHoldCounter 始终存储最初一个读线程的重入数
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                   // 读重入数 ++                     
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
}

读写锁的留神点

默认的 unfair 策略 读写读 场景, 第二个读不会因为本人是读锁就获取读锁. 因为读的时候不能写, 为了防止写饥渴, 如果读后面是写的话, 须要等后面的写做完当前能力读. 对应代码如下

  final boolean apparentlyFirstQueuedIsExclusive() {
        Node h, s;
        return (h = head) != null &&
            (s = h.next)  != null &&
            // 后面一个如果是写的话, 读就会排队阻塞
            !s.isShared()         &&
            s.thread != null;
    }
  1. countDownLatch 实现

await 办法

private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {setState(count);
        }

        int getCount() {return getState();
        }

        // await 办法期待, 会回调这个办法. 当 count 降为 0 的时, 不阻塞, 故返回大于 0(1)
        protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;
        }
        //countdown 回调这个办法, 故是 state -1
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {int c = getState();
                // 曾经减为 0, 曾经全副开释, 间接放回 false
                if (c == 0)
                    return false;
                int nextc = c-1;
                //cas 设置 减掉的 count.count = 0 则唤醒 await 
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

正文完
 0