关于并发:AQS-自定义同步锁挺难的

49次阅读

共计 9191 个字符,预计需要花费 23 分钟才能阅读完成。

AQSAbstractQueuedSynchronizer 的简称。

AbstractQueuedSynchronizer 同步状态

AbstractQueuedSynchronizer 外部有一个 state 属性,用于批示同步的状态:

private volatile int state;

state的字段是个 int 型的,它的值在 AbstractQueuedSynchronizer 中是没有具体的定义的,只有子类继承 AbstractQueuedSynchronizer 那么 state 才有意义,如在 ReentrantLock 中,state=0示意资源未被锁住,而 state>=1 的时候,示意此资源曾经被另外一个线程锁住。

AbstractQueuedSynchronizer中尽管没有具体获取、批改 state 的值,然而它为子类提供一些操作 state 的模板办法:

获取状态

    protected final int getState() {return state;}

更新状态

    protected final void setState(int newState) {state = newState;}

CAS 更新状态

    protected final boolean compareAndSetState(int expect, int update) {return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

AQS 期待队列

AQS 期待列队是一个双向队列,队列中的成员都有一个 prevnext成员,别离指向它后面的节点和前面的节点。

队列节点

AbstractQueuedSynchronizer 外部,期待队列节点由外部动态类 Node 示意:

static final class Node {...}
节点模式

队列中的节点有两种模式:

  • 独占节点:同一时刻只能有一个线程拜访资源,如ReentrantLock
  • 共享节点:同一时刻容许多个线程拜访资源,如Semaphore
节点的状态

期待队列中的节点有五种状态:

  • CANCELLED:此节点对应的线程,曾经被勾销
  • SIGNAL:此节点的下一个节点须要一个唤醒信号
  • CONDITION:以后节点正在条件期待
  • PROPAGATE:共享模式下会流传唤醒信号,就是说当一个线程应用共享模式拜访资源时,如果胜利拜访到资源,就会持续唤醒期待队列中的线程。

自定义同步锁

为了便于了解,应用 AQS 本人实现一个简略的同步锁,感受一下应用 AQS 实现同步锁是如许的轻松。

上面的代码自定了一个 CustomLock 类,继承了 AbstractQueuedSynchronizer,并且还实现了Lock 接口。
CustomLock类是一个简略的可重入锁,类中只须要重写 AbstractQueuedSynchronizer 中的 tryAcquiretryRelease办法,而后在批改大量的调用就能够实现一个最根本的同步锁。

public class CustomLock extends AbstractQueuedSynchronizer implements Lock {

    @Override
    protected boolean tryAcquire(int arg) {int state = getState();
        if(state == 0){if( compareAndSetState(state, arg)){setExclusiveOwnerThread(Thread.currentThread());
                System.out.println("Thread:" + Thread.currentThread().getName() + "拿到了锁");
                return true;
            }
        }else if(getExclusiveOwnerThread() == Thread.currentThread()){
            int nextState = state + arg;
            setState(nextState);
            System.out.println("Thread:" + Thread.currentThread().getName() + "重入");
            return true;
        }

        return false;
    }

    @Override
    protected boolean tryRelease(int arg) {int state = getState() - arg;

        if(getExclusiveOwnerThread() != Thread.currentThread()){throw new IllegalMonitorStateException();
        }

        boolean free = false;
        if(state == 0){
            free = true;
            setExclusiveOwnerThread(null);
            System.out.println("Thread:" + Thread.currentThread().getName() + "开释了锁");
        }

        setState(state);

        return free;
    }


    @Override
    public void lock() {acquire(1);
    }
 

  
    @Override
    public void unlock() {release(1);
    }
    ...
}

CustomLock是实现了 Lock 接口,所以要重写 lockunlock办法,不过办法的代码很少只须要调用 AQS 中的 acquirerelease

而后为了演示 AQS 的性能写了一个小演示程序,启动两根线程,别离命名为 线程 A 线程 B ,而后同时启动,调用 runInLock 办法,模仿两条线程同时拜访资源的场景:

public class CustomLockSample {public static void main(String[] args) throws InterruptedException {Lock lock = new CustomLock();
        new Thread(()->runInLock(lock), "线程 A").start();
        new Thread(()->runInLock(lock), "线程 B").start();}

    private static void runInLock(Lock lock){

        try {lock.lock();
            System.out.println("Hello:" + Thread.currentThread().getName());
            Thread.sleep(2000);
        } catch (InterruptedException e) {e.printStackTrace();
        }finally {lock.unlock();
        }

    }
}

拜访资源(acquire)

在 CustomLock 的 lock 办法中,调用了 acquire(1)acquire 的代码如下:

  public final void acquire(int arg) {if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();}
  • CustomLock.tryAcquire(…)CustomLock.tryAcquire 判断以后线程是否可能拜访同步资源
  • addWaiter(…):将以后线程增加到期待队列的队尾,以后节点为独占模型(Node.EXCLUSIVE)
  • acquireQueued(…):如果以后线程可能拜访资源,那么就会放行,如果不能那以后线程就须要阻塞。
  • selfInterrupt:设置线程的中断标记

留神: 在 acquire 办法中,如果 tryAcquire(arg)返回 true, 就间接执行完了,线程被放行了。所以的前面的办法调用 acquireQueued、addWaiter 都是 tryAcquire(arg)返回 false 时才会被调用。

tryAcquire 的作用

tryAcquire在 AQS 类中是一个间接抛出异样的实现:

protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();
}

而在咱们自定义的 CustomLock 中,重写了此办法:

  @Override
    protected boolean tryAcquire(int arg) {int state = getState();
        if(state == 0){if( compareAndSetState(state, arg)){setExclusiveOwnerThread(Thread.currentThread());
                System.out.println("Thread:" + Thread.currentThread().getName() + "拿到了锁");
                return true;
            }
        }else if(getExclusiveOwnerThread() == Thread.currentThread()){
            int nextState = state + arg;
            setState(nextState);
            System.out.println("Thread:" + Thread.currentThread().getName() + "重入");
            return true;
        }

        return false;
    }

tryAcquire办法返回一个布而值,true示意以后线程可能拜访资源,false以后线程不能拜访资源,所以 tryAcquire 的作用:决定线程是否可能拜访受爱护的资源 tryAcquire 外面的逻辑在子类能够自由发挥,AQS 不关怀这些,只须要晓得能不能拜访受爱护的资源,而后来决定线程是放行还是进行期待队列(阻塞)。

因为是在多线程环境下执行,所以不同的线程执行 tryAcquire 时会返回不同的值,假如线程 A 比线程 B 要快一步,先达到 compareAndSetState 设置 state 的值成员并胜利,那线程 A 就会返回 true,而 B 因为 state 的值不为 0 或者 compareAndSetState 执行失败,而返回 false。

线程 B 抢占锁流程

下面拜访到线程 A 胜利取得了锁,那线程 B 就会抢占失败,接着执行前面的办法。

线程的入队

线程的入队是逻辑是在 addWaiter 办法中,addWaiter 办法的具体逻辑也不须要说太多,如果你晓得 链表 的话,就非常容易了解了,最终的后果就是将新线程增加到队尾。AQS 的中有两个属性 headtail 别离指定期待队列的队首和队尾。

private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

  private Node enq(final Node node) {for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

须要留神的是在 enq 办法中,初始化队列的时候,会新建一个 Node 做为 headtail,而后在之后的循环中将参数 node 增加到队尾,队列初始化完后,外面会有两个节点 ,一个是空的结点new Node() 另外一个就是对该当火线程的结点。

因为线程 A 在 tryAcquire 时返回了 true,所以它会被间接放行,那么只有 B 线程会进入addWaiter 办法,此时的期待队列如下:

留神: 期待队列内的节点都是正在期待资源的线程,如果一个线程间接可能拜访资源,那它压根就不须要进入期待队列,会被放行。

线程 B 的阻塞

线程 B 被增加到期待队列的尾部后,会继续执行 acquireQueued 办法,这个办法就是 AQS 阻塞线程的中央,acquireQueued办法代码的一些解释:

  • 里面是一个 for (;;) 有限循环,这个很重要
  • 会从新调用一次 tryAcquire(arg) 判断线程是否可能拜访资源了
  • node.predecessor()获取参数 node 的前一个节点
  • shouldParkAfterFailedAcquire判断以后线程获取锁失败后,需不需要阻塞
  • parkAndCheckInterrupt()应用 LockSupport 阻塞以后线程,
 final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {if (failed)
                cancelAcquire(node);
        }
    }
shouldParkAfterFailedAcquire 判断是否要阻塞

shouldParkAfterFailedAcquire接管两个参数:前一个节点、以后节点,它会判断 前一个节点的 waitStatus 属性 ,如果前一个节点的waitStatus=Node.SIGNAL 就会返回 true:

 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
           do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);
            pred.next = node;
        } else {compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

acquireQueued办法在循环中会屡次调用 shouldParkAfterFailedAcquire,在期待队列中节点的waitStatus 的属性默认为 0,所以第一次执行 shouldParkAfterFailedAcquire 会执行:

compareAndSetWaitStatus(pred, ws, Node.SIGNAL);

更新完 pred.waitStatus 后,节点的状态如下:

而后 shouldParkAfterFailedAcquire 返回 false,回到 acquireQueued 的循环体中,又去抢锁还是失败了,又会执行 shouldParkAfterFailedAcquire,第二次循环时此时的pred.waitStatus 等于 Node.SIGNAL 那么就会返回 true。

parkAndCheckInterrupt 阻塞线程

这个办法就比拟直观了,就是将线程的阻塞住:

  private final boolean parkAndCheckInterrupt() {LockSupport.park(this);
        return Thread.interrupted();}
为什么是一个 for (;;) 有限循环呢

先看一个 for (;;) 的退出条件,只有 node 的前一个节点是 head 并且 tryAcquire 返回 true 时才会退出循环 ,否则的话线程就会被parkAndCheckInterrupt 阻塞。

线程被 parkAndCheckInterrupt 阻塞后就不会向上面执行了,然而等到它被唤醒后,它还在 for (;;) 体中,而后又会持续先去抢占锁,而后如果还是失败,那又会处于期待状态,所以始终循环上来,就只有两个后果:

  1. 抢到锁退出循环
  2. 抢占锁失败,期待下一次唤醒再次抢占锁

线程 A 开释锁

线程 A 的业务代码执行实现后,会调用 CustomLock.unlock 办法,开释锁。unlock 办法外部调用的release(1)

     public void unlock() {release(1);
    }

release是 AQS 类的办法,它跟 acquire 相同是开释的意思:

    public final boolean release(int arg) {if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

办法体中的 tryRelease 是不是有点眼生,没错,它也是在实现 CustomLock 类时重写的办法,首先在 tryRelease 中会判断以后线程是不是曾经取得了锁,如果没有就间接抛出异样,否则的话计算 state 的值,如果 state 为 0 的话就能够开释锁了。

 protected boolean tryRelease(int arg) {int state = getState() - arg;

        if(getExclusiveOwnerThread() != Thread.currentThread()){throw new IllegalMonitorStateException();
        }

        boolean free = false;
        if(state == 0){
            free = true;
            setExclusiveOwnerThread(null);
            System.out.println("Thread:" + Thread.currentThread().getName() + "开释了锁");
        }

        setState(state);

        return free;
    }

release办法只做了两件事:

  1. 调用 tryRelease 判断以后线程开释锁是否胜利
  2. 如果以后线程锁开释锁胜利,唤醒其余线程(也就是正在期待中的 B 线程)

tryRelease返回 true 后,会执行 if 外面的代码块:

if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }

先回顾一下当初的期待队列的样子:

依据下面的图,来走下流程:

  • 首先拿到 head 属性的对象,也就是队列的第一个对象
  • 判断 head 不等于空,并且 waitStatus!=0,很显著当初的 waitStatus 是等于 Node. SIGNAL 的,它的值是 -1

所以 if (h != null && h.waitStatus != 0) 这个 if 必定是满足条件的,接着执行unparkSuccessor(h)

   private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
       
        Node s = node.next;
        
        ...
        
        if (s != null)
            LockSupport.unpark(s.thread);
    }

unparkSuccessor首先将 node.waitStatus 设置为 0,而后 获取 node 的下一个节点 ,最初调用LockSupport.unpark(s.thread) 唤醒线程,至此咱们的 B 线程就被唤醒了。

此时的队列又回到了,线程 B 刚刚入队的样子:

线程 B 唤醒之后

线程 A 开释锁后,会唤醒线程 B,回到线程 B 的阻塞点,acquireQueued的 for 循环中:

  final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {if (failed)
                cancelAcquire(node);
        }
    }

线程唤醒后的第一件事就是,拿到它的上一个节点(以后是 head 结点),而后应用 if 判断

if (p == head && tryAcquire(arg))

依据当初期待队列中的节点状态,p == head是返回 true 的,而后就是 tryAcquire(arg) 了,因为线程 A 曾经开释了锁,那当初的线程 B 天然就能获取到锁了,所以 tryAcquire(arg)也会返回 true。

设置队列头

线路 B 拿到锁后,会调用 setHead(node) 本人设置为队列的头:

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

调用 setHead(node) 后队列会产生些变动:

移除上一个节点

setHead(node)执行完后,接着按上一个节点齐全移除:

p.next = null; 

此时的队列:

线程 B 开释锁

线程 B 开释锁的流程与线程 A 基本一致,只是以后队列中曾经没有须要唤醒的线程,所以不须要执行代码去唤醒其余线程:

if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }

h != null && h.waitStatus != 0这里的 h.waitStatus 曾经是 0 了,不满足条件,不会去唤醒其余线程。

总结

文中通过自定义一个 CustomLock 类,而后通过查看 AQS 源码来学习 AQS 的局部原理。通过残缺的走完锁的获取、开释两个流程,加深对 AQS 的了解,心愿对大家有所帮忙。

欢送关注我的公众号:架构文摘,取得独家整顿 120G 的收费学习资源助力你的架构师学习之路!

公众号后盾回复 arch028 获取材料:

正文完
 0