关于java:Lock介绍

38次阅读

共计 10540 个字符,预计需要花费 27 分钟才能阅读完成。

Lock 介绍

Lock 是 juc(java.util.concurrent) 包上面的一个接口类, 是作者 Doug Lea 定义的 api 标准, 次要接口有

api 阐明
void lock() 获取锁。
如果锁不可用,则出于线程调度目标,以后线程将被禁用,并处于休眠状态,直到获取锁为止。
void lockInterruptibly() throws InterruptedException 除非以后线程被中断,否则获取锁。
如果锁可用,则获取锁并立刻返回。
如果锁不可用,则出于线程调度目标,以后线程将被禁用,并处于休眠状态,直到产生以下两种状况之一:
锁被以后线程获取;或者
其余线程中断以后线程,反对中断获取锁。
如果以后线程:
在进入此办法时设置其中断状态;或者
获取锁时中断,反对中断获取锁,
而后抛出 InterruptedException 并革除以后线程的中断状态。
boolean tryLock() 仅当调用时锁闲暇时才获取锁。
如果锁可用,则获取锁并立刻返回 true 值。如果锁不可用,则此办法将立刻返回 false 值。
boolean tryLock(long time, TimeUnit unit) throws InterruptedException 如果在给定的等待时间内锁是闲暇的并且以后线程没有被中断,则获取锁。
如果锁可用,此办法立刻返回 true 值。如果锁不可用,则出于线程调度目标,以后线程将被禁用,并处于休眠状态,直到产生以下三种状况之一:
锁被以后线程获取;或者
其余线程中断以后线程,反对中断获取锁;或者
指定的等待时间已过
如果获取了锁,则返回 true 值。
如果以后线程:
在进入此办法时设置其中断状态;或者
获取锁时中断,反对中断获取锁,
而后抛出 InterruptedException 并革除以后线程的中断状态。
如果指定的等待时间已过,则返回值 false。如果工夫小于或等于零,则该办法基本不会期待。
void unlock() 开释锁
Condition newCondition() 返回绑定到此 Lock 实例的新 Condition 实例。
在期待条件之前,以后线程必须持有锁。对 Condition.await() 的调用将在期待之前主动开释锁,并在期待返回之前从新获取锁。

Condition 介绍

Condition 也是 juc 包下的一个接口类, 须要在线程持有 Lock 的状态下操作该接口下的办法, 次要接口有

api 阐明
void await() throws InterruptedException 导致以后线程期待,直到收到信号或中断。
与此条件关联的锁被主动开释,以后线程出于线程调度目标而被禁用,并处于休眠状态,直到产生以下四种状况之一:
其余线程调用该 Condition 的 signal 办法,并且以后线程恰好被选为要唤醒的线程;或者
其余一些线程为此条件调用 signalAll 办法;或者
其余线程中断以后线程,反对线程挂起中断;或者
产生“虚伪唤醒”。
在所有状况下,在此办法返回之前,以后线程必须从新获取与此条件关联的锁。当线程返回时,保障持有该锁。
如果以后线程:
在进入此办法时设置其中断状态;或者
期待时被中断,反对线程挂起中断,
而后抛出 InterruptedException 并革除以后线程的中断状态。在第一种状况下,没有指定是否在开释锁之前进行中断测试。
void lockInterruptibly() throws InterruptedException 除非以后线程被中断,否则获取锁。
如果锁可用,则获取锁并立刻返回。
如果锁不可用,则出于线程调度目标,以后线程将被禁用,并处于休眠状态,直到产生以下两种状况之一:
锁被以后线程获取;或者
其余线程中断以后线程,反对中断获取锁。
如果以后线程:
在进入此办法时设置其中断状态;或者
获取锁时中断,反对中断获取锁,
而后抛出 InterruptedException 并革除以后线程的中断状态。
void awaitUninterruptibly() 导致以后线程期待,直到收到信号。
与此条件关联的锁被主动开释,以后线程出于线程调度目标而被禁用,并处于休眠状态,直到产生以下三种状况之一:
其余线程调用该 Condition 的 signal 办法,并且以后线程恰好被选为要唤醒的线程;或者
其余一些线程为此条件调用 signalAll 办法;或者
产生“虚伪唤醒”。
在所有状况下,在此办法返回之前,以后线程必须从新获取与此条件关联的锁。当线程返回时,保障持有该锁。
如果以后线程在进入该办法时设置了中断状态,或者在期待时被中断,它将持续期待,直到发出信号。当它最终从此办法返回时,其中断状态仍将被设置。
long awaitNanos(long nanosTimeout) throws InterruptedException 使以后线程期待,直到收到信号或中断,或者指定的等待时间过来。
与此条件关联的锁被主动开释,以后线程出于线程调度目标而被禁用,并处于休眠状态,直到产生以下五种状况之一:
其余线程调用该 Condition 的 signal 办法,并且以后线程恰好被选为要唤醒的线程;或者
其余一些线程为此条件调用 signalAll 办法;或者
其余线程中断以后线程,反对线程挂起中断;或者
规定的等待时间已过;或者
产生“虚伪唤醒”。
在所有状况下,在此办法返回之前,以后线程必须从新获取与此条件关联的锁。当线程返回时,保障持有该锁。
如果以后线程:
在进入此办法时设置其中断状态;或者
期待时被中断,反对线程挂起中断。
boolean await(long time, TimeUnit unit) throws InterruptedException 使以后线程期待,直到收到信号或中断,或者指定的等待时间过来。
boolean awaitUntil(Date deadline) throws InterruptedException 导致以后线程期待,直到收到信号或中断,或者指定的截止工夫过来。
与此条件关联的锁被主动开释,以后线程出于线程调度目标而被禁用,并处于休眠状态,直到产生以下五种状况之一:
其余线程调用该 Condition 的 signal 办法,并且以后线程恰好被选为要唤醒的线程;或者
其余一些线程为此条件调用 signalAll 办法;或者
其余线程中断以后线程,反对线程挂起中断;或者
规定的期限已过;或者
产生“虚伪唤醒”。
在所有状况下,在此办法返回之前,以后线程必须从新获取与此条件关联的锁。当线程返回时,保障持有该锁。
如果以后线程:
在进入此办法时设置其中断状态;或者
期待时被中断,反对线程挂起中断。
void signal() 唤醒一个期待线程。
如果有任何线程在此条件下期待,则抉择一个线程来唤醒。该线程必须在从期待返回之前从新获取锁。
void signalAll() 唤醒所有期待线程。
如果任何线程正在期待这种状况,那么它们都会被唤醒。每个线程必须从新获取锁能力从期待返回。

作用

Lock 通过 lock,trylock 和 unlock 接口操作, 能够保障在多线程环境下的代码块的同步执行, 保障执行后果的正确性. 通过 Condition 的 await 和 signal,signallAll 操作, 保障同步代码块中的条件失去满足能力被执行实现. 提醒代码如下:


import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LockAndConditionDemo {private List<String> containers = new ArrayList<>();
    private int capacity = 10;

    private Lock lock = new ReentrantLock();
    private Condition emptyCondition = lock.newCondition();
    private Condition fullCondition = lock.newCondition();

    public void add(String product) {lock.lock();
        try {while (containers.size() == capacity) {fullCondition.await();
            }
            containers.add(product);
            emptyCondition.signalAll();} catch (InterruptedException e) {throw new RuntimeException(e);
        } finally {lock.unlock();
        }
    }

    public String take() {lock.lock();
        String product = null;
        try {while (containers.size() == 0) {emptyCondition.await();
            }
            product = containers.remove(0);
            fullCondition.signalAll();} catch (InterruptedException e) {throw new RuntimeException(e);
        } finally {lock.unlock();

        }
        return product;
    }
}

ReentrantLock 介绍

ReentrantLock 也是 juc 包下的一个具体类, 它实现了 Lock 的接口. 通常也用它来作为 Lock 接口的具体实现类, 外部通过动态外部类 Sync 继承 AQS(AbstractQueuedSynchronizer) 父类来实现 Lock 的 lock 办法.
AQS 外部是一个同步期待队列, 它通过 state(状态计数位) 和 CAS(compareAndSetState) 乐观锁的形式实现同步, 底层是通过 unsafe 的本地办法 unsafe.compareAndSwapInt 来保障操作的原子性的. 队列是通过属性 head,tail 和动态外部类 Node 的属性 prev,next 来实现的双向队列, 通过 enq 办法将新包装的 Thread 的 Node 节点增加到开端.
简要剖析下 ReentrantLock.lock() 的源代码:

// 办法一:
public void lock() {
    // 通过 sync 外部类 lock 办法, 默认为非偏心
    sync.lock();}
// 办法二:
final void lock() {
    // 通过 cas 乐观锁形式设置锁对象属性 state 的值, 默认值为 0
    if (compareAndSetState(0, 1))
        // 胜利设置为 1 进入此办法
        setExclusiveOwnerThread(Thread.currentThread());
    else
        // 失败进入此办法
        acquire(1);
}

// 办法三:
protected final boolean compareAndSetState(int expect, int update) {
    // 通过 unsafe 保障操作原子性
    //stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));state 的属性的地址值
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

办法四:
// 就是一个 set 设置属性的办法, 该类 AbstractOwnableSynchronizer 是 AQS 的父类
// 为真到办法四执行实现 lock 办法就实现了
protected final void setExclusiveOwnerThread(Thread thread) {exclusiveOwnerThread = thread;}

办法五:
public final void acquire(int arg) {
    //tryAcquire 办法再次尝试获取 state 的值, 获取胜利就退出, 不胜利执行 acquireQueued 办法
    //addWaiter 办法
    //acquireQueued 办法
    //selfInterrupt 办法
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();}

办法六:
// 调用 nonfairTryAcquire 办法
protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);
}

办法七:
final boolean nonfairTryAcquire(int acquires) {
    // 获取以后线程
    final Thread current = Thread.currentThread();
    // 获取 AQS 以后属性 state 值
    int c = getState();
    // 如果 c 变为 0 了, 阐明第一次 CAS 操作的 state 值曾经被获取锁的线程批改为 0 了
    if (c == 0) {
        // 再次调用办法三
        if (compareAndSetState(0, acquires)) {
            // 胜利则再次调用办法四, 同样 lock 办法也执行实现了,true 之后就不会执行 && 前面的办法了
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 如果不为 0, 判断以后线程是否和锁对象保留的 thread 属性统一, 如果相等阐明是同一个线程, 之前的锁还没有开释的状况, 能够重入
    else if (current == getExclusiveOwnerThread()) {
        // 以后 state 值 +1, 如果超过了 int 的最大值, 则达到了重入的边界值抛出异常中断该线程后续执行
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        // 设置 state 属性值后 lock 办法也执行完了
        setState(nextc);
        return true;
    }
    return false;
}

办法八:
// 办法参数为 static final Node EXCLUSIVE = null;
private Node addWaiter(Node mode) {
    // 创立 AQS 的动态外部类的 node 实例       
    // Node(Thread thread, Node mode) {     
            // Used by addWaiter
    //        this.nextWaiter = mode;
    //        this.thread = thread;
    //    }
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        // 新退出的节点的上一个节点为该 lock 对象的 AQS 的对尾
        node.prev = pred;
        // 通过 CAS 的形式保障 AQS 的对尾节点设置为新节点, 胜利则将原对尾节点的下一个节点为新退出的节点返回该节点, 而后开始执行 acquireQueued 办法, 不胜利进入 enq 办法
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 将以后节点增加到 AQS 队列的开端并设置尾节点尾以后节点
    enq(node);
    return node;
}

办法九:
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 获取新创建节点的前一个节点对象
            final Node p = node.predecessor();
            // 如果上一个节点为头节点, 则阐明后面曾经没有期待 node 了, 则再次执行办法七
            if (p == head && tryAcquire(arg)) {
                // 胜利则设置该 node 为头节点, 断开原先头节点指向该节点的援用链接
                // 不执行 cancelAcquire 办法
                // 不执行 selfInterrupt 办法, 同时 lock 办法也执行实现了
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 办法七没有设置胜利, 则执行阻塞该线程的办法
            // 胜利则执行阻塞和查看线程是否被中断的办法
            // 线程是被中断的则打断标识位设置为 true, 持续循环执行, 否则打断标记为 false 继续执行
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {if (failed)
            // 非正常退出循环会执行获取失败会执行勾销获取节点办法
            cancelAcquire(node);
    }
}

办法十:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 获取上一个节点的期待状态
    int ws = pred.waitStatus;
    // 如果为 - 1 则为 true 退出, 执行暂停和查看打断办法 parkAndCheckInterrupt()
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    // 如果大于 0, 则找到上一个节点之前的节点的期待状态值大于 0 的前节点
    // 并将该节点的下一个节点援用设置为该节点, 退出持续循环执行上个办法
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        // 如果 ws 值为 0, 则通过 cas 设置期待状态的值为 -1, 退出持续循环执行上个办法
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

办法十一:
private final boolean parkAndCheckInterrupt() {
    // 线程阻塞办法
    LockSupport.park(this);
    // 获取线程是否被打断的状态返回, 返回之后线程状态会被重置
    return Thread.interrupted();}

办法十二:
public static void park(Object blocker) {
    // 获取以后线程
    Thread t = Thread.currentThread();
    // 通过 unsafe 类将线程 parkBlock 设置为该 lock 对象
    setBlocker(t, blocker);
    //UNSAFE.park 是阻塞线程的办法
    UNSAFE.park(false, 0L);
    // 从新将线程 parkBlock 设置为 null
    setBlocker(t, null);
}

办法十三:
private static void setBlocker(Thread t, Object arg) {
    // Even though volatile, hotspot doesn't need a write barrier here.
    // 对应 parkBlockerOffset 搁置 lock 对象
    UNSAFE.putObject(t, parkBlockerOffset, arg);
}

办法十四:
private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    // 以后节点为空就退出
    if (node == null)
        return;

    // 异样退出将节点线程援用置空
    node.thread = null;

    // Skip cancelled predecessors
    // 取到该节点的上个节点的期待状态小于等于 0
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    // 以后节点的期待状态设置为 1(勾销状态)
    node.waitStatus = Node.CANCELLED;

    // If we are the tail, remove ourselves.
    将以后节点是否和尾节点相等, 相等则将尾节点设置为以后节点的上个节点, 上个节点的下一个节点设置为空
    if (node == tail && compareAndSetTail(node, pred)) {compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        前节点不等于头节点并且前节点期待状态不为 - 1 或者将前节点的期待状态设置为 - 1 胜利并且前节点的线程属性不为空, 以后节点的下一个节点不为空且期待状态小于等于 0,CAS 设置前节点的下一个节点为以后节点的下一个节点
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            // 否则唤醒一个节点的线程
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}

办法十五:
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    // 获取获取以后节点的期待状态
    int ws = node.waitStatus;
    // 小于 0 则进行 CAS 状态设置为 0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    // 以后节点的下一个节点如果为空, 或者下一个期待状态大于 0,s 设置为 null,
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 从尾节点开始便当到以后节点, 取最靠近以后节点的且期待状态小于等于 0 的赋值给 s
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 如果 s 不为空, 则唤醒该节点的线程办法
    if (s != null)
        LockSupport.unpark(s.thread);
}

办法十六:
private Node enq(final Node node) {for (;;) {
        Node t = tail;
        // 尾节点是否为空, 为空则通过 CAS 设置头节点, 并将头节点设置给尾节点, 再次循环
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 不为空则将以后节点的上一个节点援用设置为尾节点, 通过 cas 形式将尾节点设置为以后节点, 原尾节点的下一个节点设置为以后节点, 并返回上一个节点
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

总结

ReentrantLock 通过 CAS 保障原子操作, 通过 AQS 将阻塞线程搁置在队列中期待, 从头取节点线程, 搁置进来的线程节点在对尾.

正文完
 0