关于java:AQS在Java中的应用

49次阅读

共计 7138 个字符,预计需要花费 18 分钟才能阅读完成。

上篇文章咱们详细分析了 AQS 的底层实现原理, 这节就来摸索 jdk 中应用 AQS 实现的工具类

从源码看 AQS


ReentrantLock

一, 是什么? 怎么用?

是什么?

是一个独占锁, 也就是在并发环境下同一时刻只能有一个线程取得资源, 也是一个可重入锁.

可重入锁: 一个线程曾经获取到了该资源, 下次再次获取资源时不会呈现期待状况 (上次获取资源没有开释)

怎么用?

在各类并发的场景下, 为了保障资源获取的正确性, 能够保障每个资源同时只能被一个线程获取到.

例如: 宿舍选宿零碎 (每张床位只能有一个学生抢到), 秒杀流动 (同一件商品不能被两个人买走)

二, 类架构

由下面架构图能够看出,ReentrantLock 能够分为偏心锁和非偏心锁, 而底层实现是 AQS, 在前面咱们还能够看到更多的类底层都是由 AQS 实现的, 所以说相熟 AQS 原理对了解这些类是非常有必要的

类的属性

/**
 * 实现锁的同步器
 */
private final Sync sync;
/**
 * 形象同步器
 * 子类可有偏心和非偏心两种形式, 应用 AQS 的 state 字段来示意是否获取到锁和重入次数
 */
abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = -5179523762034025860L;

    /**
     * 依据子类实现, 能够实现偏心锁和非偏心锁
     */
    abstract void lock();

    /**
     * 非偏心形式获取锁
     */
    final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();
        int c = getState();
        // 如果以后状态值为 0 也就是以后锁没有被其余线程持有, 则尝试获取锁
        if (c == 0) {
            // 获取锁, 如果过来胜利, 则设置以后线程为独占线程
            if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 如果以后锁是以后线程所持有, 则将重入次数 +1
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            // 如果冲入次数超过阈值, 则将其置为负值
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

    /**
     * 开释锁
     */
    @Override
    protected final boolean tryRelease(int releases) {int c = getState() - releases;
        // 判断以后线程和锁持有线程是否为同一个线程
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        // 判断以后可重入次数是否为 0, 如果为 0 则革除线程占有标记
        if (c == 0) {
            free = true;
            // 革除掉独占标记
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

    /**
     * 判断以后线程是否持有锁
     */
    @Override
    protected final boolean isHeldExclusively() {return getExclusiveOwnerThread() == Thread.currentThread();}

    /**
     * 创立条件变量
     */
    final ConditionObject newCondition() {return new ConditionObject();
    }

    /**
     * 获取资源持有者
     */
    final Thread getOwner() {return getState() == 0 ? null : getExclusiveOwnerThread();}

    /**
     * 获取重入次数
     */
    final int getHoldCount() {return isHeldExclusively() ? getState() : 0;}

    /**
     * 是否曾经持有锁
     */
    final boolean isLocked() {return getState() != 0;
    }
}

三, 具体实现

偏心式

先获取资源的状态, 如果没有人占用, 判断以后线程是否为队列的首节点, 如果是则尝试获取资源, 获取胜利批改独占线程, 如果有人占用则判断独占线程和以后线程是否雷同, 如果雷同的判断可重入的次数, 超过抛出谬误, 否则重入胜利

/**
 * 偏心锁
 */
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    @Override
    final void lock() {acquire(1);
    }

    /**
     * 获取锁
     */
    @Override
    protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            /**
             * 以后线程为队列的头节点并且获取资源胜利, 设置独占锁
             */
            if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) {throw new Error("Maximum lock count exceeded");
            }
            setState(nextc);
            return true;
        }
        return false;
    }
}

非偏心式

先获取以后资源的状态, 如果没有人占用, 间接获取, 获取胜利批改独占线程的状态, 有人占用查看以后占有线程是否为以后线程, 如果是则进行重入, 此外还须要判断重入次数, 如果超过了阈值, 抛出谬误

/**
 * 非偏心锁
 */
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    @Override
    final void lock() {
        /**
         * 自旋获取资源, 胜利后批改独占状态,
         * 失败后持续尝试获取, 并将其退出到队列中以 CLH 自旋锁形式始终尝试获取资源
         */
        if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());
        }
        else {//AQS 外部办法, 实则调用的是 tryAcquire(), 如果获取资源失败, 则退出到 AQS 队列尾部, 并且以自旋的方             // 式始终尝试获取资源, 不会响应中断, 然而设置了终端标记, 在获取到资源后会开释掉资源, 并且将以后线程               // 状态设置为 CANCELLED, 这一部分具体代码请看 AQS 源码解析
            acquire(1);
        }
    }

    /**
     * 尝试获取资源
     */
    @Override
    protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);
    }
}

结构器

/**
* 默认为非偏心锁, 相比于偏心锁, 性能更高, 因为偏心锁每次还须要查看 AQS 中是否有期待的线程
*/
public ReentrantLock() {sync = new NonfairSync();
}
// 也能够指定创立偏心锁或非偏心锁
public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();}

案例

之前某大厂的一个面试题, 应用三个线程程序打印出 ABC 三个字母, 第一个线程打印 A, 而后第二个线程打印 B, 第三个线程打印 C, 打印 10 轮

public class PrintWord {private static ReentrantLock lock = new ReentrantLock();
    static Condition conditionA = lock.newCondition();
    static Condition conditionB = lock.newCondition();
    static Condition conditionC = lock.newCondition();
    private static int i = 1;

    public static void main(String[] args) {new Thread(() -> {for (int j = 0; j < 10; j++) {printA();
            }
        },"A").start();
        new Thread(() -> {for (int j = 0; j < 10; j++) {printB();
            }
        },"B").start();
        new Thread(() -> {for (int j = 0; j < 10; j++) {printC();
            }
        },"C").start();}

    private static void printA() {lock.lock();
        try {if (i != 1) {conditionA.await();
            }
            System.out.println(Thread.currentThread().getName());
            i = 2;
            conditionB.signal();} catch (InterruptedException e) {e.printStackTrace();
        } finally {lock.unlock();
        }
    }
    private static void printB() {lock.lock();
        try {if (i != 2) {conditionB.await();
            }
            System.out.println(Thread.currentThread().getName());
            i = 3;
            conditionC.signal();} catch (InterruptedException e) {e.printStackTrace();
        } finally {lock.unlock();
        }
    }
    private static void printC() {lock.lock();
        try {if (i != 3) {conditionC.await();
            }
            System.out.println(Thread.currentThread().getName());
            i = 1;
            conditionA.signal();} catch (InterruptedException e) {e.printStackTrace();
        } finally {lock.unlock();
        }
    }
}

Semaphore

一, 是什么? 怎么用?

是什么?

信号量, 从概念上讲, 信号量保护一组许可证, 每个线程都能够来获取许可证, 直至许可证为空

怎么用?

能够应用其管制并发线程的数量

二, 类架构

从下面架构图咱们能够看出,Semaphore 底层也是应用的 AQS, 并且和 ReentrantLock 一样, 都提供了偏心式和非偏心式获取资源

类的属性

/**
 * 实现信号量的同步器
 */
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;

    /**
     * 设置许可证的数量
     */
    Sync(int permits) {setState(permits);
    }

    /**
     * 获取以后残余的许可证数量
     */
    final int getPermits() {return getState();
    }

    /**
     * 以共享的形式非偏心获取许可证
     */
    final int nonfairTryAcquireShared(int acquires) {for (;;) {
            // 获取目前所残余的许可证
            int available = getState();
            // 计算获取之后残余的许可证
            int remaining = available - acquires;
            // 如果许可证数量为负就批改 state 值
            if (remaining < 0 || compareAndSetState(available, remaining))
                return remaining;
        }
    }

    /**
     * 以共享的形式开释许可证
     */
    @Override
    protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();
            int next = current + releases;
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            //CAS 设置许可证数量
            if (compareAndSetState(current, next))
                return true;
        }
    }

    /**
     * 依照具体的数量缩小许可证
     */
    final void reducePermits(int reductions) {for (;;) {int current = getState();
            int next = current - reductions;
            if (next > current) 
                throw new Error("Permit count underflow");
            if (compareAndSetState(current, next))
                return;
        }
    }

    /**
     * 获取以后能够应用的许可证, 如果等于 0 则间接批改 state
     */
    final int drainPermits() {for (;;) {int current = getState();
            if (current == 0 || compareAndSetState(current, 0))
                return current;
        }
    }
}

三, 具体实现

偏心式

获取许可证: 每次获取先都须要看 AQS 中是否有期待的线程, 如果有, 则间接退出, 否则获取许可证, 批改残余许可证的数量, 并且返回残余许可证数量

开释许可证: 由 AQS 的 releaseShared 调用, 开释许可证时, 在原先的根底上加上开释的许可证, 然而开释的数量不能为负, 开释胜利, 调用 AQS 中的 doReleaseShared 办法, 将队列头节点的状态设置为 0 而后从头节点的后继节点中找出一个状态值小于 0 的线程节点开释

/**
 * 偏心式同步器
 */
static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;

    FairSync(int permits) {super(permits);
    }

    /**
     * 获取许可证, 由 AQS 中的 acquireShared 办法调用, 如果许可证数量小于 0,
     * 则将以后线程退出到队列中始终轮询尝试过来许可证
     */
    @Override
    protected int tryAcquireShared(int acquires) {for (;;) {
            // 队列中有期待的线程, 间接返回
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            int remaining = available - acquires;
            // 获取许可证,CAS 批改 state 值
            if (remaining < 0 || compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

非偏心式

获取许可证: 上来间接尝试获取信号量, 如果获取胜利返回残余许可证, 如果许可证数量小于 0 则进入 AQS 队列中

开释许可证: 和非偏心式雷同

/**
 * 非偏心形式下同步器
 */
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    NonfairSync(int permits) {super(permits);
    }

    /**
     * 获取资源, 这一步由 AQS 中的 acquireShared 调用, 每次获取资源后会返回残余的许可证, 下面有写
     * 如果小于等于 0 则以后线程会始终处于 CLH 锁中, 如果大于 0 则会唤醒队列中所有状态为 SIGNAL 的线程
     * 详情见 AQS 源码 948 行开始
     */
    @Override
    protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);
    }
}

获取许可证

public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}
public void acquireUninterruptibly() {sync.acquireShared(1);
}

开释许可证

public void release() {sync.releaseShared(1);
}

正文完
 0