关于java:AQS学习记录

38次阅读

共计 5015 个字符,预计需要花费 13 分钟才能阅读完成。

AQS 学习记录

一、AQS 的独占模式

1、获取资源

acquire办法用于线程获取资源,获取不到就进入期待队列,并设置为独占模式,胜利后,设置线程中断

    public final void acquire(int arg) {if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();}

AQS 只是一个框架,具体资源的获取 / 开释形式交由自定义同步器去实现,这里的接口没有定义成 abstract 是因为独占模式下只需实现tryAcquire-tryRelease,而共享模式下只用实现tryAcquireShared-tryReleaseShared。因而没必要定义成 abstract 类型。

   protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();
    }

如果获取资源失败,就调用 addWaiter 办法,结构以后线程的 Node 节点,设置为期待队列的尾节点

    private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);
        // 通过 CAS,将新的 Node 设置为 tail 节点
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node); // 如果设置尾节点失败,则通过自旋的形式来设置尾节点
        return node;
    }

设置尾节点失败时,则调用 enq(final Node node) 办法,通过自旋的形式,将以后 node 设置为尾节点直到胜利。

private Node enq(final Node node) {for (;;) {
            // 这里有个细节,将 volatile 变量 tail 赋给 t, 这样不必每次去主存读取,进步了效率
            Node t = tail;
            // 如果 tail 是 null,就创立一个虚构节点,同时指向 head 和 tail,称为 初始化。// 每个节点都必须设置前置节点的 ws 状态为 SIGNAL,所以必须要一个前置节点,// 对于第一个节点怎么办?它是没有前置节点的,这里须要创立一个假的。if (t == null) {if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                 // 通过 CAS 将新节点追加到 tail 节点前面,并更新队列的 tail 为新节点, 直到胜利
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

当初再回头看 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 办法,该办法则是通过自旋形式获取资源,for 循环中:判断以后 node 的前驱节点是不是 Head 节点(Head 节点仅仅代表头结点,外面没有寄存线程援用,示意以后线程曾经获取到资源),如果是,则以后 node 尝试获取资源,获取资源胜利,则将本人更新为 Head 节点,未获取到资源,则 waite。

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true; // 标记是否胜利拿到资源
        try {
            boolean interrupted = false;
            for (;;) {final Node p = node.predecessor(); // 获取 node 的前驱节点,留神 node 当初是尾节点了
   
                if (p == head && tryAcquire(arg)) {setHead(node); //node 设置为 Head 节点(Head 节点不寄存线程援用)p.next = null; // 旧的头结点出列,帮忙 GC 回收
                    failed = false;
                    return interrupted; // 返回 node 中线程在期待过程中是否被中断过
                }
                // 以后线程进入 waite 状态,且将前驱节点的转态设置为 signal,用于唤醒本人,返回中断状态状态
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true; // 如果期待过程中被中断过就将 interrupted 标记置为 true
            }
        } finally {if (failed)
                cancelAcquire(node); // 仅在获取锁产生异样的时候,勾销以后 node 获取资源的资格
        }
    }

shouldParkAfterFailedAcquire办法次要是将前驱节点的转态设置为 signal(如果前驱转态 >0,即勾销转态,那么就跳过它,找到未被勾销的节点作为以后 node 的前驱节点),这样以后 node 就能够平安地进入阻塞状态,一旦 waiteStatus 为 signal 的前驱节点开释锁,就会唤醒以后 Node。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
         // 如果他的上一个节点的 ws 是 SIGNAL,他就须要阻塞。return true;
    if (ws > 0) {
         // 如果前驱节点的状态为勾销,则将前驱节点的前驱作为 node 的前驱
         // 按此流程,始终找到未勾销状态的前驱节点
        do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);
        pred.next = node;
    } else {
         // 如果没有勾销 || 0 || CONDITION || PROPAGATE,那么就将后任的 ws 设置成 SIGNAL.
         // 必须是 SIGNAL,是心愿本人的上一个节点在开释锁的时候,告诉本人(让本人获取锁)compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

parkAndCheckInterrupt该办法用于阻塞以后的线程,并返回以后线程的中断状态(留神:Thread.interrupted()办法会革除中断状态)

private final boolean parkAndCheckInterrupt() {LockSupport.park(this);  // 将以后线程阻塞
    return Thread.interrupted();// 如果被唤醒,查看本人是不是被中断的。}

下面提到 acquireQueued(final Node node, int arg) 中,获取锁的形式是一个死循环,胜利:会将的标记位 failed(标记以后 Node 是否胜利获取资源)置为 false,如果失败:抛出异样,则执行执行 finally 中 cancelAcquire(node) 办法,勾销以后线程申请资源的操作。该办法负责将须要勾销的 node 踢出期待队列

private void cancelAcquire(Node node) {if (node == null)
            return;

        node.thread = null;

        // 跳过被勾销的前驱节点
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        //
        Node predNext = pred.next;

        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        node.waitStatus = Node.CANCELLED;

        // 如果 node 是 tail 节点,则比较简单,间接移出队列,将 pred 设置为 tailed 节点
        if (node == tail && compareAndSetTail(node, pred)) {compareAndSetNext(pred, predNext, null);
        } else {
            int ws;
          // 如果 pred 不是 head 节点,则肯定要设置它的 ws 为 signal,用于告诉前面节点去获取资源
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                // 走到这里,阐明 node 不是 tail,则 next!=null? 这里将满足条件的 next 设置为 pred.next, 否则维持不变
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }

当下面的办法 cancelAcquire(node) 中,pred 为 head 节点,或者未胜利设置 signal 时,如果勾销了 node, 那么后继节点就没人能唤醒,unparkSuccessor(node) 办法负责从 tail 开始遍历,找到 node 前面第一个未被勾销的节点, 将其唤醒。
这里解析下:之所以从尾部开始遍历,是为了能保障遍历到所有的节点. 比方,以后 node 是前 tail 节点,新的 node2 正在变成 tail 节点,然而 addWaiterpred.next = node;并不是原子操作,很可能这步还未来得及执行,如果正向遍历,node.next,会为 null, 就会脱漏新退出的节点,然而从 tail 开始遍历必定没问题,因为在设置 tail 节点时,compareAndSetTail(pred, node)是原子操作。

 private void unparkSuccessor(Node node) {

        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        // 从 tail 开始遍历,找到一个未被勾销的节点 node, 将其唤醒
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

开释资源

开释资源的的办法 tryRelease(arg)tryAcquire(arg)一样,都是交给子类去具体实现,胜利开释资源后,须要调用unparkSuccessor(h),从 tail 开始,唤醒一个期待状态为非勾销状态的 node。

public final boolean release(int arg) {if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

外部类 Node 中几个参数

  • static final int CANCELLED = 1 : 示意以后的线程被勾销
  • static final int SIGNAL = -1 : 前驱节点期待状态设置为 SIGNAL,在开释锁时,会告诉后继节点。
  • static final int CONDITION = -2:该标识的结点处于期待队列中,结点的线程期待在 Condition 上,当其余线程调用了 Condition 的 signal()办法后,CONDITION 状态的结点将从期待队列转移到同步队列中,期待获取同步锁。
  • static final int PROPAGATE = -3:与共享模式相干,在共享模式中,该状态标识结点的线程处于可运行状态。
  • 0 状态:值为 0,代表初始化状态,AQS 在判断状态时,通过用 waitStatus>0 示意勾销状态,而 waitStatus<0 示意无效状态。

正文完
 0