关于后端:探索抽象同步队列-AQS

13次阅读

共计 5193 个字符,预计需要花费 13 分钟才能阅读完成。

by emanjusaka from https://www.emanjusaka.top/archives/8 彼岸花开可奈何
本文欢送分享与聚合,全文转载请留下原文地址。

前言

AbstractQueuedSynchronizer 形象同步队列简称 AQS,它是实现同步器的根底组件,并发包中锁的底层就是应用 AQS 实现的。大多数开发者可能永远不会间接应用 AQS,然而晓得其原理对于架构设计还是很有帮忙的。AQS 是 Java 中的一个抽象类,全称是 AbstractQueuedSynchronizer,即形象队列同步器。它定义了两种资源共享模式:独占式和共享式。独占式每次只能有一个线程持有锁,例如 ReentrantLock 实现的就是独占式的锁资源;共享式容许多个线程同时获取锁,并发访问共享资源,ReentrantReadWriteLock 和 CountDownLatch 等就是实现的这种模式。AQS 保护了一个 volatile 的 state 变量和一个 FIFO(先进先出)的队列。其中 state 变量代表的是竞争资源标识,而队列代表的是竞争资源失败的线程排队时寄存的容器。

一、原理

AQS 的核心思想是通过一个 FIFO 的队列来治理线程的期待和唤醒,同时保护了一个 state 变量来示意同步状态,能够通过 getState、setState、compareAndSetState 函数批改其值。当一个线程想要获取锁时,如果 state 为 0,则示意该线程获取锁胜利,否则示意该线程获取锁失败。它将被放入期待队列中,直到满足特定条件能力再次尝试获取。当一个线程开释锁时,如果 state 为 1,则示意该线程开释锁胜利,否则示意该线程开释锁失败。AQS 通过 CAS 操作来实现加锁和解锁。

1.1 CLH 队列

​​

AQS 中的 CLH 队列锁是 CLH 锁的一种变体,将自旋操作改成了阻塞线程操作。AQS 中的对 CLH 锁数据结构的改良次要包含三方面:扩大每个节点的状态、显式的保护前驱节点和后继节点以及诸如出队节点显式设为 null 等辅助 GC 的优化。

在 AQS(AbstractQueuedSynchronizer)中应用的 CLH 队列,head 指针和 tail 指针别离指向 CLH 队列中的两个要害节点。

  1. head 指针:head 指针指向 CLH 队列中的首个节点,该节点示意以后持有锁的线程。当一个线程胜利地获取到锁时,它就成为了持有锁的线程,并且会将该信息记录在 head 指针所指向的节点中。
  2. tail 指针:tail 指针指向 CLH 队列中的最初一个节点,该节点示意队列中最初一个期待获取锁的线程。当一个线程尝试获取锁时,它会生成一个新的节点,并将其插入到 CLH 队列的尾部,而后成为 tail 指针所指向的节点。这样,tail 指针的作用是标记以后 CLH 队列中最初一个期待获取锁的线程。

通过 head 指针和 tail 指针,CLH 队列可能保护一种有序的期待队列构造,保障线程获取锁的程序和互斥拜访的正确性。当一个线程开释锁时,它会批改以后节点的状态,并唤醒后继节点上的线程,让后续的线程可能及时感知锁的开释,并抢夺获取锁的机会。

1.2 线程同步

对于 AQS 来说,线程同步的要害是对状态值 state 进行操作。state 为 0 时示意没有线程持有锁,大于 0 时示意有线程持有锁。依据 state 是否属于一个线程,操作 state 的形式分为独占形式和共享形式。

在独占形式下获取和开释资源应用的办法为:

  • void acquire(int arg)
  • void acquireInterruptibly(int arg)
  • boolean release(int arg)

应用独占形式获取的资源是与具体线程绑定的,就是说如果一个线程获取到了资源,就会标记是这个线程获取到了,其余线程再尝试操作 state 获取资源时会发现以后该资源不是本人持有的,就会在获取失败后被阻塞。

在共享形式下获取和开释资源的办法为:

  • void acquireShared(int arg)
  • voidacquireSharedInterruptibly(int arg)
  • boolean releaseShared(int arg)。

对应共享形式的资源与具体线程是不相干的,当多个线程去申请资源时通过 CAS 形式竞争获取资源,当一个线程获取到了资源后,另外一个线程再次去获取时如果以后资源还能满足它的须要,则以后线程只须要应用 CAS 形式进行获取即可。

二、资源获取与开释

2.1 独占式

  1. 当一个线程调用 acquire(int arg)办法获取独占资源时,会首先应用 tryAcquire 办法尝试获取资源,具体是设置状态变量 state 的值,胜利则间接返回,失败则将以后线程封装为类型为 Node.EXCLUSIVE 的 Node 节点后插入到 AQS 阻塞队列的尾部,并调用 LockSupport.park(this)办法挂起本人。

        public final void acquire(int arg) {if (! tryAcquire(arg) &&
              acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
              selfInterrupt();}

<!—->

  1. 当一个线程调用 release(int arg)办法时会尝试应用 tryRelease 操作开释资源,这里是设置状态变量 state 的值,而后调用 LockSupport.unpark(thread)办法激活 AQS 队列外面被阻塞的一个线程(thread)。被激活的线程则应用 tryAcquire 尝试,看以后状态变量 state 的值是否能满足本人的须要,满足则该线程被激活,而后持续向下运行,否则还是会被放入 AQS 队列并被挂起。

        public final boolean release(int arg) {if (tryRelease(arg)) {
                Node h = head;
                if (h ! = null && h.waitStatus ! = 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }

2.2 共享式

  1. 当线程调用 acquireShared(int arg)获取共享资源时,会首先应用 tryAcquireShared 尝试获取资源,具体是设置状态变量 state 的值,胜利则间接返回,失败则将以后线程封装为类型为 Node.SHARED 的 Node 节点后插入到 AQS 阻塞队列的尾部,并应用 LockSupport.park(this)办法挂起本人。

        public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }

<!—->

  1. 当一个线程调用 releaseShared(int arg)时会尝试应用 tryReleaseShared 操作开释资源,这里是设置状态变量 state 的值,而后应用 LockSupport.unpark(thread)激活 AQS 队列外面被阻塞的一个线程(thread)。被激活的线程则应用 tryReleaseShared 查看以后状态变量 state 的值是否能满足本人的须要,满足则该线程被激活,而后持续向下运行,否则还是会被放入 AQS 队列并被挂起。

        public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();
                  return true;
              }
              return false;
          }

三、基于 AQS 实现自定义同步器

基于 AQS 实现一个不可重入的独占锁,自定义 AQS 须要重写一系列函数,还须要定义原子变量 state 的含意。这里定义,state 为 0 示意目前锁没有被线程持有,state 为 1 示意锁曾经被某一个线程持有,因为是不可重入锁,所以不须要记录持有锁的线程获取锁的次数。

package top.emanjusaka;

import java.io.Serializable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class UnReentrantLock implements Lock, Serializable {
    // 借助 AbstractQueuedSynchronizer 实现
    private static class Sync extends AbstractQueuedSynchronizer {
        // 查看是否有线程持有锁
        protected boolean isHeldExclusively() {return getState() == 1;
        }
        // 尝试获取锁
        public boolean tryAcquire(int acquires) {
            assert acquires == 1;
            // 应用 CAS 设置 state
            if (compareAndSetState(0, 1)) {
                // 如果 CAS 操作胜利,示意胜利取得了锁。这时,通过 setExclusiveOwnerThread 办法将以后线程设置为独占锁的拥有者
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            // 如果 CAS 操作失败,即无奈将 state 的值从 0 设置为 1,示意锁已被其余线程占用,无奈获取锁,于是返回 false。return false;
        }
        // 尝试开释锁
        protected boolean tryRelease(int releases) {
            assert releases == 1;
            if (getState() == 0)
                throw new IllegalMonitorStateException();
            // 开释胜利,将独占锁的拥有者设为 null
            setExclusiveOwnerThread(null);
            // 将 state 的值设为 0
            setState(0);
            return true;
        }

        Condition newCondition() {return new ConditionObject();
        }
    }

    private final Sync sync = new Sync();

    @Override
    public void lock() {sync.acquire(1);
    }

    @Override
    public boolean tryLock() {return sync.tryAcquire(1);
    }

    @Override
    public void unlock() {sync.release(1);
    }

    @Override
    public Condition newCondition() {return sync.newCondition();
    }

    public boolean isLocked() {return sync.isHeldExclusively();
    }

    public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);
    }

    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

}

在开释锁时并没有应用 CAS(Compare and Swap)操作,而是间接应用了 setState​ 办法来将 state​ 的值设置为 0。

这是因为在开释锁的过程中,并不需要波及到多线程并发的问题。只有持有锁的线程才可能开释锁,其余线程无奈对锁进行操作。因而,不须要应用 CAS 来进行原子性的状态更新。

在这种状况下,能够间接应用一般的办法来设置 state​ 的值为 0,将独占锁的拥有者设为 null。因为只有一个线程能够操作这个锁,不存在并发竞争的状况,也就不须要应用 CAS 来保障原子性。

须要留神的是,当调用 tryRelease​ 办法时,应该保障以后线程是持有锁的线程,否则会抛出 IllegalMonitorStateException​ 异样。这是为了确保只有领有锁的线程能力开释锁,避免误开释其余线程的锁。

四、参考资料

  1. 《并发编程之美》
  2. ​AbstractQueuedSynchronizer​​抽象类的源码

本文原创,满腹经纶,如有纰漏,欢送斧正。尊贵的敌人,如果本文对您有所帮忙,欢送点赞,并期待您的反馈,以便于一直优化。
原文地址:https://www.emanjusaka.top/archives/8
微信公众号:emanjusaka 的编程栈

正文完
 0