LockSupport

LockSupport是线程期待唤醒机制(wait/notify)的改进版本。LockSupport中的 park()unpark() 的作用别离是阻塞线程和接触阻塞线程。

3种让线程期待和唤醒的办法(线程通信)

形式1:应用Object中的wait()办法让线程期待,notify()办法唤醒线程

synchronized + wait + notify

形式1:应用Object中的wait()办法让线程期待,notify()办法唤醒线程
static Object objectLock = new Object(); // 创立锁public static void main(String[] args) {    // 创立A线程,进入后打印,并阻塞。    new Thread(() -> {        synchronized (objectLock) {            System.out.println(Thread.currentThread().getName() + " 进来了!");            objectLock.wait();            System.out.println(Thread.currentThread().getName() + " 被唤醒!");        }    }, "A").start();    // 创立B线程,用于唤醒    new Thread(() -> {         synchronized (objectLock) {            objectLock.notify();            System.out.println(Thread.currentThread().getName() + " 告诉!");        }    }, "B").start();}

wait、notify的限度:

  • 咱们发现 wait 和 notify 如果不在一个代码块外面,必须与 synchronized 搭配应用,否则会报错。
  • 如果咱们先应用notify、再应用wait,因为wait是后执行了,所以不能被唤醒
形式2:应用JUC包中Condition的await()办法让线程期待,signal()办法唤醒线程

Lock + await + signal

// 创立Lock对象,失去conditionstatic Lock lock = new ReentrantLock();static Condition condition = lock.newCondition();public static void main(String[] args) {    // 创立A线程,用await办法阻塞    new Thread(() -> {        lock.lock();        try {             System.out.println(Thread.currentThread().getName() + " 进来了!");            condition.await();        } finally {            lock.unlock();        }        System.out.println(Thread.currentThread().getName() + " 被唤醒!");    }, "A").start();    // 创立B线程,用于唤醒    new Thread(() -> {        lock.lock();        try {            System.out.println(Thread.currentThread().getName() + " 告诉!");            condition.signal();        } finally {            lock.unlock();        }    }, "B").start();}

await、signal的限度:

  • 和 wait 、notify 的问题截然不同,他们的底层机制是一样的。
形式3:LockSupport类能够阻塞以后线程以及唤醒指定被阻塞的线程

park + unpark,每个线程都有一个 “许可证” ,只有 0 和 1,默认为 0。unpark(Thread t) 办法发放许可证,没许可证就不容许放行。

public static void main(String[] args) {    // 创立A线程,用park()办法阻塞    Thread a = new Thread(() -> {        System.out.println(Thread.currentThread().getName() + " 进来了!");        LockSupport.park();        System.out.println(Thread.currentThread().getName() + " 被唤醒!");    }, "A");    a.start();    // 创立B线程,用于唤醒    Thread b = new Thread(() -> {        System.out.println(Thread.currentThread().getName() + " 告诉!");        // 唤醒指定线程        LockSupport.unpark(a);    }, "B");    b.start();}

LockSupport的劣势:

  • 既不必synchronized或Lock。
  • 先唤醒,再阻塞,也可能被唤醒。因为线程曾经有了“许可证”了,所以park()办法相当于没执行。

park底层调用了unsafe类的park本地办法。

UNSAFE.park(false, 0L);

调用一次 unpark 就加 1,变为 1。调用一次 park 会生产许可证,变回 0。反复调用 unpark 不会积攒凭证。

AQS实践

  • AQS(AbstractQueuedSynchronizer),形象的队列同步器。ReentrantLock类里,有一个外部类Sync就是继承的AQS类。
  • AQS是用来构建锁 或者 其余同步器组件的重量级根底框架及整个JUC体系的基石。通过内置的FIFO队列来实现资源获取线程的排队工作,并通过一个int类型变量示意持有锁的状态。

AQS的作用

和AQS无关的 :ReentrantLock、CountDownLatch、ReentrantReadWriteLock、Semaphore......

锁和同步器的关系:

  • 锁,面向锁的使用者。
  • 同步器,面向锁的实现者。

如果共享资源被占用,就须要肯定的阻塞期待唤醒机制来保障锁的调配。这个机制次要用的是 CLH队列 的变体实现的,将临时获取不到锁的线程退出到队列中,这个队列就是AQS的形象体现。它将申请共享资源的线程封装成队列的结点(Node),通过 CAS 自旋以及 LockSupport.park() 的形式,保护state变量的状态,使并发达到同步的管制成果。

AQS源码体系

AQS应用一个volatile的int类型的成员变量来示意同步状态,通过内置的FIFO队列来实现资源获取的排队,将每条要去抢占资源的线程封装成一个Node结点来实现锁的调配,通过CAS实现对State值的批改。

  • 示意同步状态的int成员变量
private volatile int state;

state为 0 就是没人占用,能够去获取资源。大于等于 1,有人占用资源,须要排队。

  • CLH队列

CLH队列,是一个双向队列。通过自旋期待,state变量判断是否阻塞。

外部类Node作为载体,装的是须要排队的线程。

  • 外部类Node

队列中每一个排队的个体就是一个Node。Node有前结点prev 、 后结点next 、头指针head 、尾指针tail,用于实现双向队列。

Node类里有两个模式:SHARED(示意线程以共享的模式期待锁)、EXCLUSIVE(示意线程以独占的形式期待锁)

类中也有一个int类型的状态变量 waitStatus。意思是等待区其余线程的期待状态。

volatile int waitStatus;

从ReentrantLock开始解读AQS

ReentrantLock类中有个子类Sync继承了AQS类,NonfairSync类和FairSync继承了Sync类。

new一个ReentrantLock类时,不传参数默认是非偏心锁(NonfairSync),传入true是偏心锁(FairSync)

偏心锁和非偏心锁实现办法的惟一区别就在于:偏心锁在获取同步状态时多了一个限度条件:hasQueuePredecessors()

这个是偏心锁加锁时判断期待队列中是否存在无效节点的办法。因为偏心锁是先排先得。

  • 偏心锁:考究先来先到,线程在获取锁时,如果期待队列中以及有线程在期待,那么以后线程就会进入期待队列中。
  • 非偏心锁:不论是否有期待队列,都会去尝试取得锁。

AQS非偏心锁就是,一来就先去插队,如果插队失败,才去乖乖的排队。

银行办理业务案例

public static void main(String[] args) {    ReentrantLock lock = new ReentrantLock();    // 带入一个银行办理业务的案例来模仿咱们的AQS如果进行线程的治理和告诉唤醒机制    // 3个线程模仿3个来银行,受理窗口办理业务的顾客    // A 顾客就是第一个顾客,此时手里窗口没有任何人,A能够间接去办理    new Thread(() -> {        lock.lock();        try {            System.out.println("A线程 进入");            // 办理20分钟            TimeUnit.MINUTES.sleep(20);        } catch (InterruptedException e) {            e.printStackTrace();        } finally {            lock.unlock();        }    }, "A").start();    // B顾客,因为窗口只有一个(只能一个线程持有锁),B只能期待,进入候客区    new Thread(() -> {        lock.lock();        try {            System.out.println("B线程 进入");        } finally {            lock.unlock();        }    }, "B").start();    // C顾客,进入候客区(当A办理实现后,会与B去抢)    new Thread(() -> {        lock.lock();        try {            System.out.println("C线程 进入");        } finally {            lock.unlock();        }    }, "C").start();}

整个ReentrantLock的加锁过程,能够分为三个阶段:

  1. 尝试加锁
  2. 加锁失败,线程进入AQS队列
  3. 线程进入队列后,进入阻塞状态

lock()上锁

调用的是sync类的lock()办法。

如果是FairSync:

final void lock() {    acquire(1);}

如果是NonfairSync:

/*    底层是unsafe类,尝试用CAS取得锁,胜利后设置这个线程领有拜访权限。否则调用acquire*/final void lock() {    if (compareAndSetState(0, 1))        setExclusiveOwnerThread(Thread.currentThread());    else        acquire(1);}
/*    unsafe类的办法,传入(0,1),如果这个对象的内存偏移量的地位,expect如果为 0,就为改为 1*/protected final boolean compareAndSetState(int expect, int update) {    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);} //CAS批改胜利后,通过AbstractOwnableSynchronizer类的setExclusiveOwnerThread办法,把exclusiveOwnerThread线程设为以后线程。

当第一个顾客发现没人窗口没人后,开始办理业务。state变为1,占用顾客的线程是 currentThread。

第一个客户曾经占用了窗口,没那么快实现。第二个客户也调用lock()办法,发现窗口被占用,只能去排队:

public final void acquire(int arg) {    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))        selfInterrupt();}//外面的addWaiter办法、tryAcquire办法、acquireQueue办法都是重点
  • AQS的tryAcquire办法

咱们进入tryAcquire办法发现没有逻辑代码,间接抛出异样。这就是典型的模板办法设计模式。意思是所有子类必须实现这个办法,不实现父类就抛出异样。

protected boolean tryAcquire(int arg) {    throw new UnsupportedOperationException();}

而后发现 ReentrantLock 的 外部类NofairSync 重写了这个办法:

protected final boolean tryAcquire(int acquires) {    return nonfairTryAcquire(acquires);}

这个办法其实调用的是外部类SyncnonfairTryAcquire 办法。

final boolean nonfairTryAcquire(int acquires) {    final Thread current = Thread.currentThread();    //须要排队的第二位顾客    int c = getState();        // 获取以后窗口的状态state(0闲暇,1占用)        //如果运气十分的好,窗口凑巧闲暇了,就CAS扭转状态,把窗口的线程设为本人。    if (c == 0) {        if (compareAndSetState(0, acquires)) {            setExclusiveOwnerThread(current);            return true;        }    }        //如果以后线程 等于 正在办理业务的线程 (阐明取得了屡次锁,是可重入锁的实践)    else if (current == getExclusiveOwnerThread()) {            int nextc = c + acquires;        // nextc为以后状态加 1        if (nextc < 0) // overflow(溢出)            throw new Error("Maximum lock count exceeded");        setState(nextc);                // 设置状态变量 state        return true;    }    return false;}

咱们传入的第二位顾客再次发现,有人在办理业务,返回 false

在acquire办法中 !tryAcquire(arg) 取反为 true ,持续判断上面的办法。

  • AQS的addWaiter办法

acquire传入的是 Node.EXCLUSIVE 参数(结点的模式);结点进入队列。

 private Node addWaiter(Node mode) {    //结构Node结点(以后线程,模式)    Node node = new Node(Thread.currentThread(), mode);             // 获取Node的尾结点,如果为null,阐明队列没有结点。    Node pred = tail;        // 当第三个顾客进入的时候,等待区曾经有结点了,执行这个代码块。和enq办法类似,尾插法。    if (pred != null) {        node.prev = pred;        if (compareAndSetTail(pred, node)) {            pred.next = node;            return node;        }    }        //如果队列没有结点,调用enq办法筹备进入队列    enq(node);    return node;}

enq 办法(将节点插入队列):

private Node enq(final Node node) {    //相当于自旋    for (;;) {        Node t = tail;        // t 是尾指针                //如果尾指针为null,阐明队列无结点,进行初始化        if (t == null) {             /*    第一个结点并不是咱们传入的结点,而是零碎new了一个结点作为占位符。                  这个结点Thread=null,waitStatus=0,是傀儡结点又称哨兵结点,用于占位。    */            if (compareAndSetHead(new Node()))                    tail = head;                //队列有结点后,持续循环,进入上面这个代码块(尾插法,结点的尾、前、后结点都设置好)            } else {            //传入结点的前一个指针指向尾结点            node.prev = t;            //尾指针 指向 传入的节点            if (compareAndSetTail(t, node)) {                t.next = node;    // 尾结点的下一个节点是 传入的节点                return t;        // 返回新插入的尾结点            }        }    }}
  • AQS的acquireQueued办法

传入的参数是 (addWaiter(Node.EXCLUSIVE), arg)

final boolean acquireQueued(final Node node, int arg) {    boolean failed = true;    try {        boolean interrupted = false;        //自旋        for (;;) {            final Node p = node.predecessor();    //传入结点的上一个结点                         // 如果前结点 == 哨兵结点 && 再看窗口是否抢占,失败就false。            if (p == head && tryAcquire(arg)) {                // 头结点指向以后节点,节点Thread=null,prev=null,即以后节点变成了新的哨兵结点                setHead(node);                    // 原哨兵结点的next=null,没有连贯了,会被GC回收                p.next = null;                 failed = false;                return interrupted;            }            //抢占失败后是否park阻塞            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())                interrupted = true;                        /*                这时自旋锁,抢占又失败后,持续进入shouldParkAfterFailedAcquire办法,                因为第一次循环曾经将前结点的waitStatus的值改为-1,所以返回true。                    而后进入parkAndCheckInterrupt办法。                */                        /*            锁被开释,其余线程被唤醒后!parkAndCheckInterrupt()返回false,持续自旋!           B线程的前结点就是哨兵结点,执行tryAcquire办法,因为A线程走了,所以胜利抢占!返回true               */        }    } finally {        if (failed)            cancelAcquire(node);    }}

shouldParkAfterFailedAcquire 办法:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {    int ws = pred.waitStatus;    //查看前结点的waitStatus状态        //SIGNAL值固定为-1    //如果是SIGNAL状态,即期待中,间接返回true。    if (ws == Node.SIGNAL)                return true;         //waitStatus 大于0阐明是 CANCELLED 状态    if (ws > 0) {        do {            node.prev = pred = pred.prev;        } while (pred.waitStatus > 0);        pred.next = node;            } else {        //把前结点的waitStatus值改为 -1,用于后续唤醒操作        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);        }    return false;}

parkAndCheckInterrupt 办法:

private final boolean parkAndCheckInterrupt() {    //阻塞这个线程!这时能够认为曾经坐在期待区了。    LockSupport.park(this);                    //线程被唤醒后,不被阻塞,这里就返回false    return Thread.interrupted();    }

此时这个acquireQueued办法还没有完结,会被卡在parkAndCheckInterrupt办法外部,如果这个线程被unpark了。就会继续执行acquireQueued办法的代码。

unlock开释锁

调用的是sync类的lock()办法。

public void unlock() {    sync.release(1);}

调用AQS的release办法,arg=1.

public final boolean release(int arg) {    //开释一把锁后,返回true    if (tryRelease(arg)) {        // 头结点就是哨兵结点        Node h = head;                // 哨兵的waitStatus为-1,符合条件进入        if (h != null && h.waitStatus != 0)            unparkSuccessor(h);        //        return true;    }    return false;}

tryRelease 办法,也是一个模板办法,ReentrantLock类的Sync重写了这个办法。

protected boolean tryRelease(int arg) {    throw new UnsupportedOperationException();}
protected final boolean tryRelease(int releases) {    //如果以后State为1,减去1后为0    int c = getState() - releases;    if (Thread.currentThread() != getExclusiveOwnerThread())        throw new IllegalMonitorStateException();        boolean free = false;        if (c == 0) {        free = true;    //c=0,阐明能够解锁,free变为true        setExclusiveOwnerThread(null);    //设置以后窗口的占用线程为 null    }    setState(c);    //把状态改为相应的值    return free;}

unparkSuccessor办法,开释锁!

private void unparkSuccessor(Node node) {    // 传入的是哨兵结点,waitStatus为-1    int ws = node.waitStatus;        // 又把哨兵结点的waitStatus改为0    if (ws < 0)        compareAndSetWaitStatus(node, ws, 0);        // s是哨兵结点的下一个结点。    Node s = node.next;    if (s == null || s.waitStatus > 0) {        s = null;        for (Node t = tail; t != null && t != node; t = t.prev)            if (t.waitStatus <= 0)                s = t;    }        // 如果哨兵结点的下一个结点存在,且waitStatus为0,开释锁!    if (s != null)        LockSupport.unpark(s.thread);}

此时线程被唤醒,后面acquireQueued里阻塞的其余线程就持续往下执行。