关于java:并发编程浅析ReentrantLock与ReadWriteLock

30次阅读

共计 17269 个字符,预计需要花费 44 分钟才能阅读完成。

1 Lock 接口

1.1 Lock 与 synchronized

在 Lock 接口呈现之前,Java 程序是靠 synchronized 关键字用来实现锁性能,应用时 隐式地获取和开释锁,然而它将锁的获取和开释固化了。

所以,如果占有锁的线程因为要期待 I / O 或者其余起因(比方调用 sleep 办法)被阻塞了,其余线程就会只能始终期待,直到占有锁的线程开释掉锁,开释锁有以下几种状况:

(1)获取锁的线程执行完了该代码块,而后会主动开释锁。

(2)执行线程产生了异样,JVM 会主动开释掉线程的锁。

(3)占有锁的线程进入 WAITING 状态从而开释锁,例如调用了 wait()办法等。

这会极大影响程序执行效率。因而,须要有一种机制保障期待的线程不是始终处于无期限地期待的状态(解决方案:tryLock(long time, TimeUnit unit))/lockInterruptibly())。

应用 synchronized 的局限性 还有:

  • 当多个线程读写文件时,读操作与读操作之间不会发生冲突。但采纳 synchronized 关键字实现同步时,还是只能一个线程进行读操作,其余读线程只能期待锁的开释而无奈进行读操作。因而,须要一种机制来保障多线程都只是进行读操作时,线程之间不会发生冲突(解决方案:ReentrantReadWriteLock)。
  • synchronized 无奈得悉线程是否胜利获取到锁 (解决方案:ReentrantLock)。
  • … …

为补救 synchronized 应用的局限性,Java SE 5 之后,并发包中新增了 Lock、ReadWriteLock 等接口(以及相干实现)。尽管它们短少了隐式获取开释锁的便捷性,然而却领有了 锁获取与开释的可操作性、可中断的获取锁以及超时获取锁 等多种 synchronized 关键字所不具备的同步个性。

1.2 Lock 接口办法

Lock 接口有 6 个办法:

// 获取锁  
void lock()   

// 如果以后线程未被中断,则获取锁,能够响应中断  
void lockInterruptibly()   

// 返回绑定到此 Lock 实例的新 Condition 实例  
Condition newCondition()   

// 仅在调用时锁为闲暇状态才获取该锁,能够响应中断  
boolean tryLock()   

// 如果锁在给定的等待时间内闲暇,并且以后线程未被中断,则获取锁  
boolean tryLock(long time, TimeUnit unit)   

// 开释锁  
void unlock()

lock()、tryLock()、tryLock(long time, TimeUnit unit)和 lockInterruptibly()办法是用来获取锁的。unLock()办法是用来开释锁的。

(1)lock() & unlock()

lock()用来获取锁。如果锁已被其余线程获取,则进行期待。应用 Lock,必须被动去开释锁 ,并且在产生异样时,不会主动开释锁。因而,一般来说,应用 Lock 必须在 try()/catch() 块中进行,并且将开释锁的操作放在 finally 块中进行,以保障锁肯定被被开释,避免死锁的产生。通常应用 Lock 来进行同步的话,是以上面这种模式去应用的:

Lock lock = ...;
lock.lock();// 获取锁
try{// 解决工作}catch(Exception ex){

}finally{lock.unlock();   // 开释锁
}

(2)tryLock() & tryLock(long time, TimeUnit unit)

tryLock()办法有返回值。它示意尝试获取锁,如果获取胜利,则返回 true;如果获取失败(即锁已被其余线程获取),则返回 false。值得注意的是,这个办法 无论如何都会立刻返回(在拿不到锁时不会始终在那期待)。

tryLock(long time, TimeUnit unit)办法和 tryLock() 办法相似,区别在于 tryLock(long time, TimeUnit unit) 在拿不到锁时会期待肯定的工夫,在工夫期限之内如果还拿不到锁,就返回 false,同时能够响应中断。如果一开始拿到锁或者在期待期间内拿到了锁,则返回 true。

个别状况下,通过 tryLock()是这样应用的:

Lock lock = ...;
if(lock.tryLock()) {
     try{// 解决工作}catch(Exception ex){ }finally{lock.unlock();   // 开释锁
     } 
}else {// 如果不能获取锁,则间接做其余事件}

(3)lockInterruptibly()

应用 lockInterruptibly() 办法可能响应中断,即中断线程的期待状态。例如,当两个线程 A、B 同时通过 lock.lockInterruptibly() 获取锁时,假若此时线程 A 获取到了锁,而线程 B 进入期待状态,那么线程 B 就可调用 interrupt() 办法中断线程 B 的期待过程。(interrupt()办法只能中断阻塞过程中的线程)

lockInterruptibly()个别的应用模式如下:

public void method() throws InterruptedException {lock.lockInterruptibly();
    try {//.....}
    finally {lock.unlock();
    }
}

Lock 接口的实现类有:

2 ReentrantLock

ReentrantLock 是 Lock 接口的次要实现类,ReentrantLock 是 可重入锁 ,顾名思义,就是反对重进入的锁,它示意该锁 可能反对一个线程对资源的反复加锁

它实现了 Lock 接口,外部类 Sync 是 AQS 的子类;Sync 的两个子类 NonfairSync 和 FairSync 别离对应 偏心锁和非偏心锁两种策略。(如果在相对工夫上,先对锁进行获取的申请肯定先被满足,那么这个锁是偏心的;反之,是不偏心的。)偏心的获取锁,也就是等待时间最长的线程最优先获取锁,也能够说锁获取是程序的。ReentrantLock 提供了一个构造函数,可能管制锁是否是偏心的。

    public ReentrantLock() {sync = new NonfairSync();
    }

    public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();}

也就是说,调用 ReentrantLock 时,不传参数或者传入参数true,即是偏心锁;传入参数false,就是非偏心锁。(ReentrantLock 默认采纳非偏心的策略

2.1 可重入

可重入锁 又名递归锁。可重入指的是 任意线程在获取到锁之后可能再次获取该锁而不会被锁所阻塞(前提锁对象得是同一个对象或者 class)。Java 中ReentrantLock 和 synchronized 都是可重入锁,可重入锁的一个长处是可肯定水平防止死锁。

例如:

    public synchronized void fun1() {System.out.println("办法 1 执行...");
        fun2();}

    public synchronized void fun2() {System.out.println("办法 2 执行...");
    }

类中的两个办法都是被内置锁 synchronized 润饰的,fun1()办法中调用 fun2()办法。因为内置锁是可重入的,所以同一个线程在调用 fun2()时能够间接取得以后对象的锁,进入 fun2()进行操作。

如果是一个不可重入锁,那么以后线程在调用 fun2()之前须要将执行 fun1()时获取以后对象的锁开释掉,实际上该对象锁已被以后线程所持有,且无奈开释,这种状况下会呈现死锁。

那么 ReentrantLock 是如何实现可重入的呢?上面以非偏心锁为例,剖析可重入实现原理。

首先查看 NonfairSync 办法:

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        final void lock() {
            // 利用 CAS 尝试设置 AQS 的 state 为 1。设置胜利,示意获取锁胜利;如果设置失败,示意 state 曾经 >=1。if (compareAndSetState(0, 1))
                // 线程获取 AQS 锁胜利,须要设置 AQS 中的变量 exclusiveOwnerThread 为以后持有锁的线程,做保留记录
                setExclusiveOwnerThread(Thread.currentThread());
            else
                // 调用 acquire(),再次尝试或者线程进入期待队列。acquire(1);
        }

        // 子类重写的 tryAcquire 办法
        protected final boolean tryAcquire(int acquires) {
            // 调用 nonfairTryAcquire 办法
            return nonfairTryAcquire(acquires);
        }
    }

查看 acquire 办法:

public final void acquire(int arg) {
    // 调用子类重写的 tryAcquire 办法,如果 tryAcquire 办法返回 false,那么线程就会进入同步队列。if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();}

查看 nonfairTryAcquire 办法:

    final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();
            int c = getState();// 获取锁状态(0 未加锁;1 已加锁)if (c == 0) {if (compareAndSetState(0, acquires)) {// 间接 CAS 尝试获取锁,间接返回 true,以后线程不会进入同步队列。setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) { // 如果以后线程已占用锁,再次获取锁
                int nextc = c + acquires;// status+1(可重入性)if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);// 从新设置锁的状态
                return true;
            }
            return false;
        }

ReentrantLock 继承父类 AQS,重写了父类 tryAcquire 办法。其父类 AQS 中保护了一个同步状态 status 来计数重入次数,status 初始值为 0。

当线程尝试获取锁时,可重入锁先尝试获取并更新 status 值,如果 status == 0 示意没有其余线程在执行同步代码,则把 status 置为 1,以后线程开始执行。如果 status != 0,则判断以后线程是否是获取到这个锁的线程,如果是的话执行 status+1,且以后线程能够再次获取锁。

开释锁时,调用 tryRelease()办法:

    protected final boolean tryRelease(int releases) {int c = getState() - releases;// 开释锁时 status-1
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();// 先判断以后线程是否已是占用锁的线程
        boolean free = false;
        if (c == 0) {// 只有 status= 0 时才开释锁。free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

开释锁时,可重入锁同样先获取以后 status 的值,在以后线程是持有锁的线程的前提下。如果 status-1 == 0,则示意以后线程所有反复获取锁的操作都曾经执行结束,而后该线程才会真正开释锁。如果该锁被获取了 n 次,那么前 (n-1) 次 tryRelease(int releases)办法必然返回 false,而只有同步状态齐全开释了,能力返回 true。

2.2 偏心 / 非偏心

偏心锁 是指多个线程 依照申请锁的程序来获取锁 ,线程间接进入队列中排队,队列中的第一个线程能力取得锁(不可插队,等待时间越长,申请锁时会被优先满足)。 偏心锁的长处是期待锁的线程不会饥饿。毛病是整体吞吐效率绝对非偏心锁要低,期待队列中除第一个线程以外的所有线程都会阻塞,CPU 唤醒阻塞线程的开销比非偏心锁大。

非偏心锁 是多个线程 加锁时间接尝试获取锁 ,获取不到才会到期待队列的队尾期待(可插队的)。但如果此时锁刚好可用,那么这个线程能够无需阻塞间接获取到锁,所以非偏心锁有可能呈现后申请锁的线程先获取锁的场景。非偏心锁的长处是 能够缩小唤起线程的开销,整体的吞吐效率高,因为线程有几率不阻塞间接取得锁,CPU 不用唤醒所有线程。毛病是处于期待队列中的线程可能会饥饿,或者等很久才会取得锁。

Tips:如果一个过程被屡次回滚,迟迟不能占用必须的系统资源,可能会导致过程饥饿

导致线程饥饿常见起因:

  1. 高优先级线程吞噬所有的低优先级线程的 CPU 工夫。
  2. 线程被永恒梗塞在一个期待进入同步快的状态。
  3. 期待的线程永远不被唤醒。

查看偏心加锁办法的源码:

        protected final boolean tryAcquire(int acquires) {
            // 获取以后的线程
            final Thread current = Thread.currentThread();
             // 获取锁的状态
            int c = getState();
            if (c == 0) {
                // hasQueuedPredecessors 判断队列还有没有其它 node
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    // 设置获取锁的线程
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 设置获取锁的线程
            else if (current == getExclusiveOwnerThread()) {
                // 获取过了就累加,因为能够重入
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                // 从新设置锁的状态
                setState(nextc);
                return true;
            }
            return false;
        }

非偏心加锁办法的源码:

        final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

unlock()开释锁,其实就是把 state 从 n(可能产生了锁的重入,须要屡次开释)变成 0,此办法 不辨别偏心与否

偏心锁与非偏心锁解锁办法的源码:

    public void unlock() {sync.release(1);
    }

    public final boolean release(int arg) {
        // 子类重写的 tryRelease 办法,须要等锁的 state=0,即 tryRelease 返回 true 的时候,才会去唤醒其它线程进行尝试获取锁。if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    protected final boolean tryRelease(int releases) {
        // 获取锁的状态
        int c = getState() - releases;
        // 判断锁的所有者是不是该线程
        if (Thread.currentThread() != getExclusiveOwnerThread())
            // 如果所的所有者不是该线程 则抛出异样 也就是锁开释的前提是线程领有这个锁
            throw new IllegalMonitorStateException();
        boolean free = false;
        // 直到锁的状态是 0,阐明锁开释胜利。即锁没有重入,那么间接将将锁的所有者设置成 null
        // 咱们在一个线程外面调用几次 lock,就要调用几次 unlock,能力最终开释锁
        if (c == 0) {
            free = true;
            // 开释线程的拥有者
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

可见,平锁的开释和非偏心锁的开释一样的。偏心锁与非偏心锁的 lock()办法惟一的区别就在于 偏心锁在获取同步状态时多了一个限度条件:hasQueuedPredecessors()

    public final boolean hasQueuedPredecessors() {
        // 判断以后线程是否位于同步队列中的第一个。如果是则返回 true,否则返回 false。Node t = tail; 
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

由此可见,偏心锁就是通过同步队列来实现多个线程依照申请锁的程序来获取锁,从而实现偏心的个性。非偏心锁加锁时不思考排队期待问题,间接尝试获取锁,所以才会存在线程后申请却先取得锁的状况

2.3 小结

ReentrantLock 重入锁执行流程:

2.4 中断与超时期待

(1)lockInterruptibly 可中断形式获取锁

lockInterruptibly()反对中断的获取锁,其实是调用了 AQS 的 lockInterruptibly 办法。

    public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);
    }

最终调用 AQS 的 doAcquireInterruptibly(int arg)办法:

    public final void acquireInterruptibly(int arg)
        // 以后线程曾经中断了,抛出异样。if (Thread.interrupted())
            throw new InterruptedException();
        // 以后线程依然未胜利获取锁,则调用 doAcquireInterruptibly 办法,这个办法和
        // acquireQueued 办法没什么区别,就是线程在期待状态的过程中,如果线程被中断,线程会抛出异样。if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

(2)tryLock 超时期待形式获取锁

tryLock(long timeout, TimeUnit unit)也反对中断,并且在这个根底上减少了超时设置,其实也是调用了 AQS 的 tryAcquireNanos 办法。

     public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }

最终调用 AQS 的 doAcquireNanos(int arg, long nanosTimeout)办法:

    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        // 如果以后线程曾经中断,则抛出异样
        if (Thread.interrupted())
            throw new InterruptedException();
        // 再尝试获取一次,如果不胜利则调用 doAcquireNanos 办法进行超时期待获取锁。return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }

查看 tryAcquireNanos 办法:

private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {if (nanosTimeout <= 0L)
        return false;
    // 计算超时的工夫 = 以后虚拟机的工夫 + 设置的超时工夫
    final long deadline = System.nanoTime() + nanosTimeout;
    // 调用 addWaiter 将以后线程封装成独占模式的节点,并且退出到同步队列尾部。final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {for (;;) {final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                // 如果以后节点的前驱节点为头结点,则让以后节点去尝试获取锁。setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            // 如果以后节点的前驱节点不是头结点,或以后节点获取锁失败,// 则再次判断以后线程是否曾经超时。nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            // 调用 shouldParkAfterFailedAcquire 办法,通知以后节点的前驱节点,马上进入
            // 期待状态了,即做好进入期待状态前的筹备。if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                // 调用 LockSupport.parkNanos 办法,将以后线程设置成超时期待的状态。LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();}
    } finally {if (failed)
            cancelAcquire(node);
    }
}

3 ReadWriteLock

之前提到锁(如 Mutex 和 ReentrantLock)根本都是 排他锁 ,这些锁在同一时刻只容许一个线程进行拜访。而 读写锁(ReadWriteLock) 同一时刻能够容许多个读线程拜访,然而在写线程拜访时,所有的读线程和其余写线程均被阻塞

ReadWriteLock 保护了一组锁,一个是只读的锁,一个是写锁。读锁能够在没有写锁的时候被多个线程同时持有,写锁是独占的。

如何用一个共享变量来辨别锁是写锁还是读锁呢?答案就是 按位拆分

因为 state 是 int 类型的变量,在内存中 占用 4 个字节,也就是 32 位 。将其拆分为两局部:高 16 位和低 16 位,其中 高 16 位用来示意读锁状态,低 16 位用来示意写锁状态。当设置读锁胜利时,就将高 16 位加 1,开释读锁时,将高 16 位减 1;当设置写锁胜利时,就将低 16 位加 1,开释写锁时,将第 16 位减 1。

ReadWriteLock 接口只有两个办法:

// 返回读锁  
Lock readLock()   
// 返回写锁  
Lock writeLock() 

Java 并发库中 ReetrantReadWriteLock 实现了 ReadWriteLock 接口并增加了可重入的个性。

ReentrantReadWriteLock 有两个构造方法:

public ReentrantReadWriteLock() {this(false);// 默认为 false,采纳非偏心模式
}

public ReentrantReadWriteLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();
    readerLock = new ReadLock(this);
    writerLock = new WriteLock(this);
}

能够看到,默认的构造方法应用的是非偏心模式,创立的 Sync 是 NonfairSync 对象,而后初始化读锁和写锁。一旦初始化后,ReadWriteLock 接口中的两个办法就有返回值了,如下:

    public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock;}
    public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock;}

从下面能够看到,构造方法决定了 Sync 是 FairSync 还是 NonfairSync。Sync 继承了 AQS,而 Sync 是一个抽象类,NonfairSync 和 FairSync 继承了 Sync,并重写了其中的形象办法。

Sync 中提供了很多办法,然而有两个办法是形象的,子类必须实现。

abstract boolean readerShouldBlock();
abstract boolean writerShouldBlock();

FairSync/NonfairSync 实现办法如下:

    /**
     * 非偏心
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -8159625535654395037L;
        final boolean writerShouldBlock() {return false; // 间接返回 false,阐明不须要排队}
        final boolean readerShouldBlock() {
            /* 以后线程是写锁占用的线程时,返回 true;否则返回 false。* 如果以后有一个写线程正在写,那么该读线程应该阻塞。*/
            return apparentlyFirstQueuedIsExclusive();}
    }

    /**
     * 偏心
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -2274990926593161451L;
        final boolean writerShouldBlock() {return hasQueuedPredecessors();// 判断同步队列中是否有人在排队
        }
        final boolean readerShouldBlock() {return hasQueuedPredecessors();
        }
    }

writerShouldBlock()办法的作用是判断以后线程是否应该阻塞,对于偏心的写锁和非偏心写锁的具体实现不一样。

  • 对于非偏心写锁而言,间接返回 false,因为非偏心锁获取锁之前不须要去判断是否排队。
  • 对于偏心锁写锁而言,它会判断同步队列中是否有人在排队,有人排队,就返回 true,示意以后线程须要阻塞。无人排队就返回 false。

3.1 读锁

3.1.1 读锁加锁

获取读锁时,首先调用 ReadLock 类中的 lock 办法:

public void lock() {sync.acquireShared(1);
}

读锁应用的也是 AQS 的共享模式,AQS 的 acquireShared 办法如下:

    public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

当 tryAcquireShared()办法小于 0 时,那么会执行 doAcquireShared 办法将该线程退出到期待队列中。

Sync 实现了 tryAcquireShared 办法,如下:

        protected final int tryAcquireShared(int unused) {Thread current = Thread.currentThread();
            int c = getState();
            // exclusiveCount(c)返回的是写锁的数量,如果它不为 0,阐明写锁被占用
            // 如果此时占用写锁的线程不是以后线程,就返回 -1,示意获取锁失败
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            // 失去读锁的数量
            int r = sharedCount(c);
            
            /**
             * 在上面的代码中进行了三个判断:* 1、读锁是否应该排队。没有排队,就进行 if 前面的判断。排队,就间接调用 fullTryAcquireShared()办法。* 2、读锁数量是否超过最大值。(最大数量为 2 的 16 次方 -1=65535)* 3、是否获取到同步变量的最新状态值
             */          
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                // 如果读锁数量为 0 时,以后线程设置为 firstReader,即第一个读线程就是以后线程
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    // 如果以后读线程重入了,firstReaderHoldCount 累加
                    firstReaderHoldCount++; 
                } else {
                    // 读锁数量不为 0,且第一个获取到读锁的线程不是以后线程
                    // 上面这一段逻辑就是保留以后线程获取读锁的次数,如何保留的呢?// 通过 ThreadLocal 来实现的,readHolds 就是一个 ThreadLocal 的实例
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                // 返回 1 示意获取读锁胜利
                return 1;
            }
            // 否则,循环尝试
            return fullTryAcquireShared(current);
        }

查看 fullTryAcquiredShared 办法:

  final int fullTryAcquireShared(Thread current) {   
            HoldCounter rh = null;
             // for 死循环,直到满足相应的条件才会 return 退出,否则始终循环
            for (;;) {int c = getState();
                // 锁的状态为写锁时,持有锁的线程不等于以后线程,阐明以后线程获取锁失败,返回 -1
                if (exclusiveCount(c) != 0) {if (getExclusiveOwnerThread() != current)
                        return -1;
                } 
                // 如果读锁须要排队
                else if (readerShouldBlock()) {
                    // Make sure we're not acquiring read lock reentrantly
                    if (firstReader == current) {// assert firstReaderHoldCount > 0;}
                    // 阐明有别的读线程占有了锁
                    else {if (rh == null) {
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();}
                        }
                        if (rh.count == 0)
                            return -1;
                    }
                }
                // 如果读锁达到了最大值,抛出异样
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // 尝试设置同步变量的值,只有设置胜利了,就示意以后线程获取到了锁,而后就设置锁的获取次数等相干信息
                if (compareAndSetState(c, c + SHARED_UNIT)) {if (sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {firstReaderHoldCount++;} else {if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }

从下面能够看到屡次调用了 readerShouldBlock 办法,对于偏心锁,只有队列中有线程在期待,那么将会返回 true,也就意味着读线程须要阻塞;对于非偏心锁,如果以后有线程获取了写锁,则返回 true。一旦不阻塞,那么读线程将会有机会取得读锁。

3.1.2 读锁开释

当调用 readLock.unlock()办法时,会先调用到 AQS 的 releaseShared()办法,在 releaseShared()办法中会先调用子类的 tryReleaseShared()办法。在这里会调用的是 ReentrantReadWriteLock 的外部类 Sync 的 tryReleaseShared() 办法。该办法的源码如下。

protected final boolean tryReleaseShared(int unused) {Thread current = Thread.currentThread();
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();}
        --rh.count;
    }
    for (;;) {int c = getState();
        // 将批改同步变量的值(读锁状态减去 1 <<16)int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            return nextc == 0;
    }
}

3.2 写锁

3.2.1 写锁加锁

首先调用 WriteLock 类中的 lock 办法:

public void lock() {sync.acquire(1);
}

AQS 的 acquire 办法如下:

public final void acquire(int arg) {if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // 写锁应用的是 AQS 的独占模式。首先尝试获取锁,如果获取失败,那么将会把该线程退出到期待队列中。selfInterrupt();}

Sync 实现了 tryAcquire 办法用于尝试获取一把锁,如下:

protected final boolean tryAcquire(int acquires) {
            // 失去调用 lock 办法的以后线程
            Thread current = Thread.currentThread();
            int c = getState();
            // exclusiveCount()办法的作用是将同步变量与 0xFFFF 做 & 运算,计算结果就是写锁的数量。// w 即是写锁的数量。int w = exclusiveCount(c);
            // 如果线程占有了写锁或者读锁
            if (c != 0) {
                // 如果写锁数量为 0,线程占有的必是读锁,而且这个线程并不是以后线程(齐全不合乎重入),返回 false
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                // 如果写锁的数量超过了最大值,抛出异样
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // 写锁重入,返回 true
                setState(c + acquires);
                return true;
            }
    
            /**
             * 1. 当 writerShouldBlock()返回 true 时,示意以后线程还不能间接获取锁,因而 tryAcquire()办法间接返回 false。* 2. 当 writerShouldBlock()返回 false 时,示意以后线程能够尝试去获取锁,因而会执行 if 判断中前面的逻辑
             *       即通过 CAS 办法尝试去批改同步变量的值。* 3. 如果批改同步变量胜利,则示意以后线程获取到了锁,最终 tryAcquire()办法会返回 true。*      如果批改失败,那么 tryAcquire()会返回 false,示意获取锁失败。*/
            // 如果以后没有写锁或者读锁,如果写线程应该阻塞或者 CAS 失败,返回 false
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            // 否则将以后线程置为取得写锁的线程,返回 true
            setExclusiveOwnerThread(current);
            return true;
        }

从下面能够看到调用了 writerShouldBlock 办法,FairSync 的实现是如果期待队列中有期待线程,则返回 false,阐明偏心模式下,只有队列中有线程在期待,那么起初的这个线程也是须要记入队列期待的;NonfairSync 中的间接返回的间接是 false,阐明不须要阻塞。从下面的代码能够得出,当没有锁时,如果应用的非偏心模式下的写锁的话,那么返回 false,间接通过 CAS 就能够取得写锁。

3.2.2 写锁开释

写锁的开释与排他锁的开释逻辑也简直一样。当调用 writeLock.unlock()时,先调用到 AQS 的 release()办法,在 release()办法中会先调用子类的 tryRelease()办法。在这里调用的是 ReentrantReadWriteLock 的外部类 Sync 的 tryRelease()办法。写锁的开释逻辑比较简单,能够参考上面源码中的正文。办法的源码和正文如下。

protected final boolean tryRelease(int releases) {
    // 判断是否是以后线程持有锁
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 将 state 的值减去 releases
    int nextc = getState() - releases;
    // 调用 exclusiveCount()办法,计算写锁的数量。如果写锁的数量为 0,示意写锁被齐全开释,此时将 AQS 的 exclusiveOwnerThread 属性置为 null
    // 并返回 free 标识,示意写锁是否被齐全开释
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}

3.3 小结

ReadWriteLock 读写锁执行流程:

读写锁不反对锁降级,反对锁降级。锁降级指的是线程获取到了写锁,在没有开释写锁的状况下,又获取读锁。以上面代码为例:

public class WriteReadLockTest {
    Object data;
    // 是否无效,如果生效,须要从新计算 data
    volatile boolean cacheValid;
    final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

    void processCachedData() {rwl.readLock().lock();
        if (!cacheValid) {
            // 获取写锁前必须开释读锁
            rwl.readLock().unlock();
            rwl.writeLock().lock();
            try {
                // 判断是否有其它线程曾经获取了写锁、更新了缓存, 防止反复更新
                if (!cacheValid) {
                    data = ...
                    cacheValid = true;
                }
                // 降级为读锁, 开释写锁, 这样可能让其它线程读取缓存
                rwl.readLock().lock();
            } finally {rwl.writeLock().unlock();}
        }
        // 本人用完数据, 开释读锁
        try {use(data);
        } finally {rwl.readLock().unlock();}
    }
}

参考

JAVA 并发编程的艺术

不可不说的 Java“锁”事

深刻了解 ReentrantLock 的实现原理

读写锁 ReadWriteLock 的实现原理

深刻了解读写锁—ReadWriteLock 源码剖析

正文完
 0