关于aqs:AQS场景共享锁和独占锁4

1、java中,共享锁有哪些Java 中,常见的共享锁包含读写锁(ReentrantReadWriteLock)和计数器(CountDownLatch、CyclicBarrier、Semaphore 等)等。 1)读写锁读写锁是一种非凡的锁,它容许多个线程同时读取共享资源,但只容许一个线程写入共享资源。ReentrantReadWriteLock 是 Java 中实现读写锁的一种形式,它通过保护两个锁来实现读写锁的性能,即读锁和写锁。读锁能够同时被多个线程获取,而写锁只能被一个线程获取。 2)计数器计数器是一种用于管制并发拜访的同步工具,它容许多个线程并发地访问共享资源,但须要满足肯定的条件能力继续执行。常见的计数器包含 CountDownLatch、CyclicBarrier 和 Semaphore。CountDownLatch 是一种基于计数的同步工具,它能够让一个或多个线程期待一个或多个事件的产生。CyclicBarrier 是一种栅栏同步工具,它能够让多个线程在同一个工夫点上期待。Semaphore 是一种计数信号量,它能够管制同时访问共享资源的线程数量。 2、java 中,有哪些独占锁ava 中常见的独占锁包含 synchronized、ReentrantLock 和 ReentrantReadWriteLock 的写锁等。它们的特点是同一时间只能有一个线程持有锁,其余线程无法访问被锁定的代码块或资源。 1)synchronizedsynchronized 是 Java 中最根本的同步机制之一,它是一种独占锁,只能被一个线程持有。在 synchronized 关键字润饰的代码块或办法上,同一时刻只能有一个线程取得锁,其余线程须要期待取得锁的线程开释锁能力继续执行。 2)ReentrantLockReentrantLock 是 Java 中常见的独占锁实现之一,与 synchronized 相比,ReentrantLock 具备更高的灵活性和扩展性。ReentrantLock 能够设置偏心锁和非偏心锁,默认是非偏心锁。ReentrantLock 还反对可重入锁,即一个线程能够屡次获取同一个锁,而不会导致死锁。 3)ReentrantReadWriteLockReentrantReadWriteLock 是一种读写锁,读写锁容许多个线程同时读取共享资源,但只容许一个线程写入共享资源。在 ReentrantReadWriteLock 中,写锁是独占锁,即只能被一个线程持有,而读锁是共享锁,能够被多个线程同时持有。 AQS-根本篇(1)AQS-基本原理17问(2)AQS-Condition对象的应用(3)

March 7, 2023 · 1 min · jiezi

关于aqs:AQSCondition对象的使用3

AQS 中的 Condition 对象是用于实现线程间通信和同步的对象,它能够和锁对象一起应用。Condition 对象的实现依赖于期待队列,它的次要作用是将期待某个条件的线程从期待队列中挪动到条件队列中,同时在条件满足时将它们从条件队列中移回到期待队列中。 1、办法介绍1)Condition 对象的创立:Condition 对象是通过 Lock 接口中的 newCondition() 办法创立的。具体实现能够查看 ReentrantLock 和 ReentrantReadWriteLock 类中的 newCondition() 办法。 2)await() 办法的实现:await() 办法用于将以后线程退出到条件队列中,并且开释锁。具体实现中,它首先须要获取到以后线程对应的节点(Node 对象),而后将节点从期待队列中挪动到条件队列中,最初调用 release() 办法开释锁,并且挂起以后线程。 3)signal() 办法的实现:signal() 办法用于唤醒一个期待在条件队列中的线程,并且将它从条件队列中挪动到期待队列中。具体实现中,它首先须要获取到条件队列的头节点(firstWaiter),而后遍历条件队列,找到第一个不为 null 的节点,并且将它从条件队列中挪动到期待队列中,最初调用 unpark() 办法唤醒线程。 4)signalAll() 办法的实现:signalAll() 办法用于唤醒所有期待在条件队列中的线程,并且将它们从条件队列中挪动到期待队列中。具体实现中,它和 signal() 办法相似,只是在遍历条件队列时,将所有节点都挪动到期待队列中。 须要留神的是,Condition 对象的实现依赖于期待队列和 AQS 的实现,它的底层原理比较复杂。在应用 Condition 对象的时候,须要特地留神锁的开释和获取程序,以防止死锁等问题的呈现。 2、AQS的哪些锁的实现用到了condition对象AQS 中的 ReentrantLock 和 ReentrantReadWriteLock 都应用了 Condition 对象实现期待/告诉机制。1)在 ReentrantLock 中,Condition 对象是通过 newCondition() 办法创立的,它是基于 AQS 的期待/告诉机制实现的。ReentrantLock 中的 Condition 对象和 ReentrantLock 对象是绑定的,即同一个 ReentrantLock 对象中能够创立多个 Condition 对象。线程在应用 Condition 对象期待某个条件时,须要先获取对应的锁,而后通过 Condition 对象的 await() 办法进入期待状态,并且开释锁。当条件满足时,线程能够通过 signal() 或 signalAll() 办法唤醒期待在条件队列中的线程,让它们从新进入到期待队列中期待获取锁。2)在 ReentrantReadWriteLock 中,ReadLock 和 WriteLock 都实现了 Condition 接口。它们别离应用了两个 Condition 对象,别离用于实现读线程期待写线程开释锁的机制和写线程期待读线程开释锁的机制。 ...

March 7, 2023 · 2 min · jiezi

关于aqs:AQS基本原理17问

1、AQS是如何实现锁的?AQS通过定义了一个State变量来示意同步状态,同时保护了一个期待队列,当一个线程获取锁失败时,会将该线程退出到期待队列中,并将其阻塞,当锁的状态产生扭转时,AQS会从期待队列中抉择一个线程唤醒,使其有机会获取锁。 2、AQS的外围办法是哪些?AQS的外围办法包含tryAcquire(int)、tryRelease(int)、tryAcquireShared(int)、tryReleaseShared(int)、isHeldExclusively()等。 3、如何应用AQS实现独占锁?能够通过继承AbstractQueuedSynchronizer类并实现tryAcquire(int)和tryRelease(int)办法来实现独占锁。 4、如何应用AQS实现共享锁?能够通过继承AbstractQueuedSynchronizer类并实现tryAcquireShared(int)和tryReleaseShared(int)办法来实现共享锁。 5、AQS是如何实现同步的?AQS应用了一种基于FIFO期待队列的机制,当一个线程获取锁失败时,会将该线程退出到期待队列中,并将其阻塞,当锁的状态产生扭转时,AQS会从期待队列中抉择一个线程唤醒,使其有机会获取锁。 6、AQS中的State变量有什么作用?AQS中的State变量用于示意同步状态,能够用一个整型数值示意同步状态的不同状况,比方独占锁和共享锁的状态。 7、AQS中的Node节点有什么作用?AQS中的Node节点用于示意期待队列中的一个节点,其中蕴含了前驱节点、后继节点、期待状态等信息,AQS会依据这些信息进行线程的唤醒和期待。 8、AQS的期待队列是如何实现的?AQS的期待队列是通过一个双向链表来实现的,每个节点都蕴含了前驱节点和后继节点的信息,当一个线程退出期待队列时,会将一个节点退出到队列尾部,当一个线程被唤醒时,会从队列头部抉择一个节点进行唤醒。 9、AQS的tryAcquire办法和tryRelease办法的实现原理是什么?tryAcquire办法和tryRelease办法都是基于CAS(Compare and Swap,比拟并替换)操作实现的,通过原子操作批改State变量的值,以实现对锁的获取和开释。 10、AQS中的Condition对象是如何实现的?AQS中的Condition对象是基于期待队列的机制实现的,每个Condition对象都有一个期待队列,当一个线程调用Condition的await办法时,会将该线程退出到Condition的期待队列中,并开释锁,当另一个线程调用Condition的signal办法时,会从期待队列中抉择一个节点唤醒,并将其退出到主期待队列中,使其有机会获取锁。 11、AQS如何实现可重入锁?AQS实现可重入锁的形式是通过记录持有锁的线程和持有次数,在每次获取锁时判断以后线程是否曾经持有锁,如果是,则间接减少持有次数,如果不是,则通过失常的获取锁流程来获取锁。 12、AQS如何实现偏心锁?AQS实现偏心锁的形式是通过保护一个期待队列,所有期待锁的线程都会被退出到该队列的尾部,当锁开释时,AQS会从期待队列的头部抉择一个线程进行唤醒,保障先期待的线程先获取锁,实现公平性。 13、AQS如何实现读写锁?AQS实现读写锁的形式是通过保护两个State变量,一个示意读锁的数量,一个示意写锁的数量,在获取读锁和写锁时都须要判断以后状态是否容许获取锁,以及期待队列中是否有期待线程,从而实现读写锁的性能。 14、AQS如何实现信号量?AQS实现信号量的形式是通过保护一个State变量,示意以后可用的许可证数量,以及一个期待队列,当一个线程须要获取许可证时,如果以后可用的许可证数量大于0,则间接获取许可证,否则退出期待队列中期待,当有其余线程开释许可证时,AQS会从期待队列中抉择一个线程唤醒,并使其获取许可证。 15、AQS如何实现可中断锁?AQS实现可中断锁的形式是通过在期待队列中为每个线程都保护一个标识位,示意该线程是否被中断,当一个线程期待锁时,如果被中断,则AQS会将该线程从期待队列中移除,并抛出InterruptedException异样,使该线程可能退出期待状态。 16、AQS如何实现自旋锁?AQS实现自旋锁的形式是通过CAS操作来实现,当一个线程获取锁失败时,会尝试屡次通过CAS操作来获取锁,直到获取锁胜利或者达到肯定的次数后才进入期待队列。这种形式能够防止线程的上下文切换和期待队列的开销 17、AQS如何保障并发平安?AQS通过保护一个状态变量和一个期待队列来实现锁的管制,所有对共享资源的拜访都须要通过获取锁来进行,从而保障同一时刻只有一个线程可能访问共享资源,防止了多线程竞争的状况,保障了并发平安。 一、AQS介绍-根本篇

March 7, 2023 · 1 min · jiezi

关于aqs:AQS介绍基本篇

AQS(Abstract Queued Synchronizer,形象队列同步器)是Java中的一个并发编程工具类,用于实现自定义的同步器。AQS提供了一种基于FIFO期待队列的机制,能够通过实现其形象办法,定义独占锁(如ReentrantLock)或共享锁(如CountDownLatch、Semaphore)等各种同步器。 AQS通过定义了一个State变量来示意同步状态,同时保护了一个期待队列,当一个线程获取锁失败时,会将该线程退出到期待队列中,并将其阻塞,当锁的状态产生扭转时,AQS会从期待队列中抉择一个线程唤醒,使其有机会获取锁。 在应用AQS时,须要继承AbstractQueuedSynchronizer类,并实现以下几个办法: 1)tryAcquire(int): 尝试获取锁的操作,返回true示意胜利获取锁,false示意获取锁失败,须要进入期待队列。 2)tryRelease(int): 尝试开释锁的操作,返回true示意开释胜利,false示意开释失败。 3)tryAcquireShared(int): 尝试获取共享锁的操作,返回值的含意与tryAcquire(int)雷同。 4)tryReleaseShared(int): 尝试开释共享锁的操作,返回值的含意与tryRelease(int)雷同。 5)isHeldExclusively(): 判断以后线程是否持有独占锁。 AQS还提供了一些外部办法用于实现同步器的逻辑,如enq(Node)用于将节点退出期待队列,deq(Node)用于将节点从期待队列中移除等等。 总的来说,AQS是一个十分弱小和灵便的同步器工具类,能够用于实现各种各样的同步器,如独占锁、共享锁、读写锁、信号量、倒计时门闩等。 链接一、AQS-基本原理17问

March 7, 2023 · 1 min · jiezi

关于aqs:AQS及其组件的核心原理

AQS,全称AbstractQuenedSynchronizer,能够了解为形象的队列同步器。一、AQS的核心思想AQS核心思想是,如果被申请的共享资源闲暇,则将以后申请资源的线程设置为无效的工作线程,并且将共享资源设置为锁定状态。如果被申请的共享资源被占用,那么就须要一套线程阻塞期待以及被唤醒时锁调配的机制,这个机制AQS是用CLH队列锁实现的,行将临时获取不到锁的线程退出到队列中。二、实现原理 1、private volatile int state:AQS应用一个int成员变量来示意同步状态,通过内置的FIFO队列来实现获取资源线程的排队工作。AQS应用CAS对该同步状态进行原子操作实现对其值的批改。 //获取状态    protected final int getState() {        return state; } //设置状态值 protected final void setState(int newState) { state = newState; } //通过CAS批改状态 protected final boolean compareAndSetState(int expect, int update) {        // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }在ReentrantLock中,stste示意获取锁的线程数,如果state=0,示意还没有线程获取锁,1示意有线程获取了锁。大于1示意重入锁的数量。 2、CLH(Craig,Landin,and Hagersten)队列是一个虚构的双向队列(虚构的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS是将每条申请共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的调配。三、ReentrantLock中的偏心锁与非偏心锁 偏心锁:以后线程在抢锁之前先看看队列中是否有排队的线程,如果有则不容许抢,间接退出队列。 非偏心锁:以后线程在抢锁之前不须要查看队列中是否有排队的线程。利用待补充

February 28, 2023 · 1 min · jiezi

关于aqs:ReentrantLock-源码分析

ReentrantLock 源码剖析阐明本文基于 jdk 8 写作。@author JellyfishMIX - github / blog.jellyfishmix.comLICENSE GPL-2.0锁机制的外围: Sync(锁) /** * Synchronizer providing all implementation mechanics * 提供所有实现机制的同步器,这是 ReentrantLock 能实现锁机制的外围 */ private final Sync sync; /** * Base of synchronization control for this lock. Subclassed * into fair and nonfair versions below. Uses AQS state to * represent the number of holds on the lock. */ abstract static class Sync extends AbstractQueuedSynchronizerReentrantLock 的 lock 与 unlock 办法,是通过 Sync 实现的,Sync 是一个 ReentrantLock 的外部类,继承了 AbstractQueuedSynchronizer(简称 AQS)。Sync 是 ReentrantLock 能实现锁机制的外围。 ...

September 23, 2022 · 9 min · jiezi

关于aqs:AQSAbstractQueuedSynchronizer-源码分析

AQS(AbstractQueuedSynchronizer) 源码剖析阐明本文基于 jdk 8 写作。@author JellyfishMIX - github / blog.jellyfishmix.comLICENSE GPL-2.0AQS 须要关注的点有哪些?AQS 全称 AbstractQueuedSynchronizer,是 juc 包(java.util.concurrent)中一个同步器开发框架,用于反对下层的锁。要害的属性:state 同步状态,CHL 同步队列。两种模式:独占模式,共享模式。AQS 的要害属性sate 同步状态,是锁的表述,示意有多少线程获取了锁。CHL 队列(同步队列),由链表实现,以 CAS + 自旋的形式获取资源,是可阻塞的先进先出的双向队列。通过 CAS + 自旋保障节点插入和移除的原子性。当有线程获取锁失败,就被增加到队列开端。当有线程开释了锁,会从 CHL 队头出队一个线程。state 同步状态/** * The synchronization state. */private volatile int state;AQS 锁反对的子类,能够借助操作同步状态 state 来实现尝试获取同步状态(tryAcquire)、尝试开释同步状态(tryRelease)的逻辑。 也就是说,AQS 的子类(各种锁)判断何时能获取同步状态,何时能开释同步状态,须要借助于判断 state 的值。而后能把获取同步状态进一步封装成获取锁(lock),把开释同步状态封装成开释锁(unlock)。至于 state 为何值时能够获取同步状态,state 为何值时能够开释同步状态,这些都是交由子类判断的。 AQS 只是当子类判断为未获取到同步状态时,通过 CHL 入队操作,把线程退出到同步队列中,而后让线程休眠。当子类判断为能够开释同步状态时,从 CHL 同步队列中唤醒一个线程。 CHL 队列在剖析前,可能会有如下纳闷: 节点的数据结构是什么样的?是单向还是双向?有无头节点和尾节点?NodeCHL 是链表,链表是由节点 node 组成的。 static final class Node { /** * 标记一个节点正在共享模式下期待 */ static final Node SHARED = new Node(); /** * 标记一个节点正在独占模式下期待 */ static final Node EXCLUSIVE = null; /** * waitStatus value,标记以后节点的线程被 cancel 勾销 */ static final int CANCELLED = 1; /** * waitStatus value,标记后继节点的线程须要被唤醒 */ static final int SIGNAL = -1; /** * waitStatus value,标记以后节点的线程进入期待队列中 */ static final int CONDITION = -2; /** * waitStatus value,示意下一次共享式的同步状态获取将会无条件流传上来 */ static final int PROPAGATE = -3; /** * 节点的状态 */ volatile int waitStatus; /** * 以后节点的前驱节点 */ volatile Node prev; /** * 以后节点的后继节点 */ volatile Node next; /** * 以后节点援用的线程 */ volatile Thread thread; /** * 期待队列中的下一个节点 */ Node nextWaiter; }留神到属性 prev, next, 阐明 CHL 是一个双向队列。 ...

September 23, 2022 · 9 min · jiezi

关于aqs:AQS源码解读番外篇四种自旋锁原理详解Java代码实现SpinLockTicketSpinLockCLHMCS

何为自旋锁自旋锁是为实现爱护共享资源而提出的一种锁机制。自旋锁与Java中的synchronized和Lock不同,不会引起调用线程阻塞睡眠。如果有线程持有自旋锁,调用线程就会始终循环检测锁的状态,直到其余线程开释锁,调用线程才进行自旋,获取锁。 自旋锁的劣势和缺点自旋锁的长处很显著:自旋锁不会使线程状态进行切换,始终处于用户态,即不会频繁产生上下文切换,执行速度快,性能高。 正是因为其不进行上下文切换的长处,也使得某些状况下,缺点也很显著:如果某个线程持有锁的工夫过长,或者线程间竞争强烈,就会导致某些期待获取锁的线程进入长时间循环期待,耗费CPU,从而造成CPU占用率极高。 自旋锁的实用场景自旋锁实用于被锁代码块执行工夫很短,即加锁工夫很短的场景。 常见自旋锁实现比拟有名的四种自旋锁:传统自旋锁SpinLock,排队自旋锁TicketSpinLock,CLH自旋锁,MCS自旋锁。这四种自旋锁的基本原理都是在CAS的根底上实现的,各有各的特点,且逐渐优化。 SpinLock传统自旋锁的劣势和有余实现原理SpinLock原理很简略,多个线程循环CAS批改一个共享变量,批改胜利则进行自旋获取锁。 代码实现实现接口参考了Java的Lock,外围办法在tryAcquire和tryRelease。获取锁的形式实现了自旋,可中断自旋,自旋超时中断,不自旋。共享变量state不仅作为锁的状态标记(state=0锁闲暇,state>0有线程持有锁),同时可作为自旋锁重入的次数。exclusiveOwnerThread记录以后持有锁的线程。 public class SpinLock implements Lock { protected volatile int state = 0; private volatile Thread exclusiveOwnerThread; @Override public void lock() { for(;;) { //直到获取锁胜利,才完结循环 if (tryAcquire(1)) { return; } } } @Override public void lockInterruptibly() throws InterruptedException { for(;;) { if (Thread.interrupted()) { //有被中断 抛异样 throw new InterruptedException(); } if (tryAcquire(1)) { return; } } } /** * 返回获取锁的后果,不会自旋 * @return */ @Override public boolean tryLock() { return tryAcquire(1); } /** * 返回获取自旋锁的后果,会自旋一段时间,超时后进行自旋 * @param time * @param unit * @return * @throws InterruptedException */ @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { long nanosTimeout = unit.toNanos(time); if (nanosTimeout <= 0L) { return false; } final long deadline = System.nanoTime() + nanosTimeout; for(;;) { if (Thread.interrupted()) { //有被中断 抛异样 throw new InterruptedException(); } if (tryAcquire(1)) { return true; } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) { //超时自旋,间接返回false return false; } } } @Override public void unlock() { tryRelease(1); } @Override public Condition newCondition() { throw new UnsupportedOperationException(); } public int getState() { return state; } /** * 获取持有锁的以后线程 * @return */ public Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; } /** * 获取以后线程重入次数 * @return */ public int getHoldCount() { return isHeldExclusively() ? getState() : 0; } /** * 开释锁 * @param releases * @return */ protected boolean tryRelease(int releases) { int c = getState() - releases; Thread current = Thread.currentThread(); if (current != getExclusiveOwnerThread()) //不是以后线程,不能unLock 抛异样 throw new IllegalMonitorStateException(); boolean free = false; if (c <= 0) { //每次减一,c = 0,证实没有线程持有锁了,能够开释了 free = true; c = 0; setExclusiveOwnerThread(null); System.out.println(String.format("spin un lock ok, thread=%s;", current.getName())); } //排它锁,只有以后线程才会走到这,是线程平安的 批改state setState(c); return free; } /** * 获取锁 * @param acquires * @return */ protected boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //若此时锁空着,则再次尝试抢锁 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); System.out.println(String.format("spin lock ok, thread=%s;", current.getName())); return true; } } //若以后持锁线程是以后线程(重入性) else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); //重入 setState(nextc); System.out.println(String.format("spin re lock ok, thread=%s;state=%d;", current.getName(), getState())); return true; } return false; } /** * 判断以后线程是否持有锁 * @return */ protected final boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); } protected void setState(int state) { this.state = state; } protected void setExclusiveOwnerThread(Thread exclusiveOwnerThread) { this.exclusiveOwnerThread = exclusiveOwnerThread; } protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } protected static final Unsafe getUnsafe() { try { //不能够间接应用Unsafe,须要通过反射,当然也能够间接应用atomic类 Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe"); theUnsafe.setAccessible(true); Unsafe unsafe = (Unsafe) theUnsafe.get(null); if (unsafe != null) { return unsafe; } } catch (NoSuchFieldException | IllegalAccessException e) { e.printStackTrace(); } throw new RuntimeException("get unsafe is null"); } private static final Unsafe unsafe = getUnsafe(); private static final long stateOffset; static { try { stateOffset = unsafe.objectFieldOffset (SpinLock.class.getDeclaredField("state")); } catch (Exception ex) { throw new Error(ex); } }SpinLock的特点劣势:SpinLock实现原理简略,线程间没有频繁的上下文切换,执行速度快,性能高。缺点:SpinLock是不偏心的,无奈满足等待时间最长的线程优先获取锁,造成 “线程饥饿”。缺点:因为每个申请自旋锁的处理器均在一个全局变量上自旋检测,系统总线将因为处理器间的缓存同步而导致沉重的流量,从而升高了零碎整体的性能。因为传统自旋锁无序竞争的实质特点,内核执行线程无奈保障何时能够取到锁,某些执行线程可能须要期待很长时间,导致“不偏心”问题的产生。有两个方面的起因: ...

May 7, 2022 · 7 min · jiezi

关于aqs:Lock简介与初识AQS

Lock简介咱们下来看concurent包下的lock子包。锁是用来管制多个线程访问共享资源的形式,一般来说,一个锁可能避免多个线程同时访问共享资源。在Lock接口呈现之前,java程序次要是靠synchronized关键字实现锁性能的,而java SE5之后,并发包中减少了lock接口,它提供了与synchronized一样的锁性能。尽管它失去了像synchronize关键字隐式加锁解锁的便捷性,然而却领有了锁获取和开释的可操作性,可中断的获取锁以及超时获取锁等多种synchronized关键字所不具备的同步个性。通常应用显示应用lock的模式如下: Lock lock = new ReentrantLock(); lock.lock(); try { … } finally{lock.unlock(); } 须要留神的是synchronized同步块执行实现或者遇到异样是锁会主动开释,而lock必须调用unlock()办法开释锁,因而在finally块中开释锁。 Lock接口API 咱们当初就来看看lock接口定义了哪些办法: //获取锁void lock();//获取锁的过程可能响应中断void lockInterruptibly() throws InterruptedException;//非阻塞式响应中断能立刻返回,获取锁放回true反之返回fasleboolean tryLock();//超时获取锁,在超时内或者未中断的状况下可能获取锁boolean tryLock(long time, TimeUnit unit) throws InterruptedException;//获取与lock绑定的期待告诉组件,以后线程必须取得了锁能力进行期待,进行期待时会先开释锁,当再次获取锁时能力从期待中返回Condition newCondition();// 开释锁。unlock(); 下面是lock接口下的六个办法,也只是从源码中英译中翻译了一遍,感兴趣的能够本人的去看看。那么在locks包下有哪些类实现了该接口了?先从最相熟的ReentrantLock说起。 public class ReentrantLock implements Lock, java.io.Serializable全选代码复制很显然ReentrantLock实现了lock接口,接下来咱们来认真钻研一下它是怎么实现的。java培训当你查看源码时你会诧异的发现ReentrantLock并没有多少代码,另外有一个很显著的特点是:基本上所有的办法的实现实际上都是调用了其动态内存类Sync中的办法,而Sync类继承了AbstractQueuedSynchronizer(AQS)。能够看出要想了解ReentrantLock要害外围在于对队列同步器AbstractQueuedSynchronizer(简称同步器)的了解。 初识AQS 对于AQS在源码中有非常具体的解释: Provides a framework for implementing blocking locks and relatedsynchronizers (semaphores, events, etc) that rely onfirst-in-first-out (FIFO) wait queues. This class is designed to bea useful basis for most kinds of synchronizers that rely on a singleatomic {@code int} value to represent state. Subclasses must definethe protected methods that change this state, and which define whatthat state means in terms of this object being acquired or released.Given these, the other methods in this class carry out all queuingand blocking mechanics. Subclasses can maintain other state fields,but only the atomically updated {@code int} value manipulated usingmethods {@link #getState}, {@link #setState} and {@link ...

February 22, 2022 · 1 min · jiezi

关于aqs:Java-JUC-抽象同步队列AQS解析

形象同步队列 AQS 解析AQS——锁的底层反对AbstractQueuedSynchronizer 形象同步队列简称 AQS,它是实现同步器的根底组件,并发包中的锁底层都是应用 AQS 来实现的,上面看下 AQS 的类图构造。 该图可知,AQS 是一个FIFO的双向队列,其外部通过节点 head 和 tail 记录队首和队尾的元素,队列元素类型为Node。 其中 Node 里的 thread 变量用来寄存进入 AQS 队列里的线程,而 SHARED 用来标记线程是获取共享资源时被阻塞挂起放入 AQS 队列的;EXCLUSIVE 用来标记该线程是获取独占资源时被挂起放入 AQS 队列中;waitStatus 记录以后线程期待状态,能够为CANCELLED(线程勾销)、SIGNAL(线程须要被唤醒)、CONDITION(线程在条件队列中期待)、PROPAGATE(开释共享资源时告诉其余节点);prev 记录以后节点的前驱节点,next 则是后驱节点。 在 AQS 中维持了一个繁多的状态信息state,能够通过 getState、setState、compareAndSetState 函数批改值。 对于 ReentrantLock 的实现,state 能够示意以后线程获取锁的次数;对于读写锁 ReentrantReadWriteLock,state 的高 16 位示意读状态,也就是获取该锁的次数,低 16 位示意获取到写锁线程可重入的次数;对于 Semaphore 来说,state 示意以后可用信号的个数;对于 CountDownlatch 来说,state 用来示意计数器以后的值;AQS 有个外部类 ConditionObject,它用来联合锁实现线程同步。ConditionObject 能够间接拜访 AQS 对象外部的变量,比方 state 状态值和队列。 ConditionObject 是条件变量,每个条件变量对应一个条件队列(单向链表队列),用来寄存调用条件变量的 await 办法后被阻塞的线程,而 firstWaiter 示意队首元素,lastWaiter 示意队尾元素。 ...

January 20, 2022 · 6 min · jiezi

关于aqs:谈谈JVM内部锁升级过程

简介: 对象在内存中的内存布局是什么样的?如何形容synchronized和ReentrantLock的底层实现和重入的底层原理?为什么AQS底层是CAS+volatile?锁的四种状态和锁降级过程应该如何形容?Object o = new Object() 在内存中占用多少字节?自旋锁是不是肯定比重量级锁效率高?关上偏差锁是否效率肯定会晋升?重量级锁到底重在哪里?重量级锁什么时候比轻量级锁效率高,同样反之呢?带着这些问题往下读。 作者 | 洋锅起源 | 阿里技术公众号 一 为什么讲这个?总结AQS之后,对这方面顺带的温习一下。本文从以下几个高频问题登程: 对象在内存中的内存布局是什么样的?形容synchronized和ReentrantLock的底层实现和重入的底层原理。谈谈AQS,为什么AQS底层是CAS+volatile?形容下锁的四种状态和锁降级过程?Object o = new Object() 在内存中占用多少字节?自旋锁是不是肯定比重量级锁效率高?关上偏差锁是否效率肯定会晋升?重量级锁到底重在哪里?重量级锁什么时候比轻量级锁效率高,同样反之呢?二 加锁产生了什么?有意识中用到锁的状况: //System.out.println都加了锁public void println(String x) { synchronized (this) { print(x); newLine(); }}简略加锁产生了什么? 要弄清楚加锁之后到底产生了什么须要看一下对象创立之后再内存中的布局是个什么样的? 一个对象在new进去之后在内存中次要分为4个局部: markword这部分其实就是加锁的外围,同时还蕴含的对象的一些生命信息,例如是否GC、通过了几次Young GC还存活。klass pointer记录了指向对象的class文件指针。instance data记录了对象外面的变量数据。padding作为对齐应用,对象在64位服务器版本中,规定对象内存必须要能被8字节整除,如果不能整除,那么就靠对齐来补。举个例子:new出了一个对象,内存只占用18字节,然而规定要能被8整除,所以padding=6。 晓得了这4个局部之后,咱们来验证一下底层。借助于第三方包 JOL = Java Object Layout java内存布局去看看。很简略的几行代码就能够看到内存布局的款式: public class JOLDemo { private static Object o; public static void main(String[] args) { o = new Object(); synchronized (o){ System.out.println(ClassLayout.parseInstance(o).toPrintable()); } }}将后果打印进去: ...

July 2, 2021 · 3 min · jiezi

关于aqs:Semaphore实战

简介Semaphore信号量计数器。和CountDownLatch,CyclicBarrier相似,是多线程合作的工具类,绝对于join,wait,notify办法应用起来简略高效。上面咱们次要看看它的用法吧! 实战限流。限度线程的并发数。比方在一个零碎中同时只能保障5个用户同时在线。 import java.util.concurrent.Semaphore;/** * @author :jiaolian * @date :Created in 2021-03-04 11:13 * @description:Semaphore限流 * @modified By: * 公众号:叫练 */public class LimitCurrnet { public static void main(String[] args) throws InterruptedException { //定义20个线程,每次最多只能执行5个线程; Semaphore semaphore = new Semaphore(5); for (int i=0; i<20; i++) { new Thread(()->{ try { //获取凭证 semaphore.acquire(); System.out.println(Thread.currentThread().getName()+"登录胜利"); Thread.sleep(2000); //开释凭证 semaphore.release(); System.out.println(Thread.currentThread().getName()+"用户退出"); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } }}如上代码所示:咱们定义了20个用户同时拜访零碎,Semaphore参数是5,示意同时只能有5个用户能够获取凭证,其余用户必须期待直到有在线用户退出。调用semaphore.acquire()示意获取凭证,此时凭证数会减一,调用semaphore.release()示意开释凭证,凭证数会加一,如果零碎中有期待的用户,操作此办法会告诉期待的一个用户获取凭证胜利,执行登录操作。最初打印局部后果如下:证实零碎最多能放弃5个用户同时在线。 留神:下面举出的这个案例,出个思考题:线程池是否能够实现呢? 模仿CyclicBarrier,CountDownLatch重用!Semaphore能够轻松实现CountDownLatch计数器,CyclicBarrier回环屏障,还记得CountDownLatch用法么?它是个计数器,能够帮咱们统计线程执行工夫,罕用来测试多线程高并发执行接口效率,咱们上面用Semaphore模仿多线程主线程期待子线程执行结束再返回。 import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Semaphore;/** * @author :jiaolian * @date :Created in 2021-03-01 21:04 * @description:信号量测试 * @modified By: * 公众号:叫练 */public class SemaphoreTest { //定义线程数量; private static final int THREAD_COUNT = 2; //初始化信号量为0,默认是非偏心锁 private static Semaphore semaphore = new Semaphore(0,false); private static ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT); public static void main(String[] args) throws InterruptedException { for (int i=0; i<THREAD_COUNT; i++) { executorService.submit(()->{ try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"执行"); semaphore.release(); }); } //获取2个信号量 semaphore.acquire(2); System.out.println("主线程执行结束"); executorService.shutdown(); }}如上代码所示:咱们定义了Semaphore初始化信号量为0,默认是非偏心锁,在主线程中用线程池提交2个线程,主线程调用semaphore.acquire(2)示意须要获取两个信号量,但此时初始化信号量为0,此时AQS中的state会是0-2=-2,state值小于0,所以主线程执行这句话会阻塞将其退出AQS同步队列,线程池两个线程期待2秒后会调用semaphore.release()开释2个信号量,此时AQS中的state会自增到0,会告诉主线程退出期待持续往下执行。执行后果如下图所示。 ...

March 4, 2021 · 1 min · jiezi

关于aqs:核酸检测让我明白AQS原理

独占锁早上叫练带着一家三口来到了南京市第一医院做核酸检测,护士小姐姐站在医院门口拦着通知咱们人比拟多,无论小孩儿小孩,须要排队一个个期待医生采集唾液检测,OK,上面咱们用代码+图看看咱们一家三口是怎么排队的! import java.util.concurrent.locks.ReentrantReadWriteLock; /** * _@author_ :__jiaolian * _@date_ :__Created in 2021-01-22 10:33 * _@description__:独占锁测试_ * _@modified_ By__: * _公众号__:__叫练_ */ public class ExclusiveLockTest { **private** **static** `ReentrantReadWriteLock lock =` **new** `ReentrantReadWriteLock();` **private** **static** `ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();` _//__医院_ **private** **static** **class** **Hospital** `{` **private** `String name;` **public** **Hospital**(String name) `{` **this**`.name = name;` } _//__核酸检测排队测试_ **public** **void** **checkUp**() `{` **try** `{` writeLock.lock(); `System.out.println(Thread.currentThread().getName()+`"正在做核酸检测"`);` _//__核酸过程__...__好受__..._ `Thread.sleep(`3000`);` `}` **catch** `(InterruptedException e) {` e.printStackTrace(); `}` **finally** `{` writeLock.unlock(); } } } **public** **static** **void** **main**(String[] args) **throws** InterruptedException `{` `Hospital hospital =` **new** `Hospital(`"南京市第一医院"`);` `Thread JLWife =` **new** `Thread(()->hospital.checkUp(),`"叫练妻"`);` JLWife.start(); _//__睡眠__100__毫秒是让一家三口是有程序的排队去检测_ `Thread.sleep(`100`);` `Thread JLSon =` **new** `Thread(()->hospital.checkUp(),`"叫练子"`);` JLSon.start(); `Thread.sleep(`100`);` `Thread JL =` **new** `Thread(()->hospital.checkUp(),`"叫练"`);` JL.start(); }`}`如上代码:在主线程启动三个线程去医院门口排队,女士优先,叫练妻是排在最后面的,两头站的是叫练的孩子,最初就是叫练本人了。咱们假如模仿了下核酸检测一次须要3秒。代码中咱们用了独占锁,独占锁能够了解成医院只有一个医生,一个医生同时只能为一个人做核酸,所以须要一一排队检测,所以代码执行结束一共须要破费9秒,核酸检测就能够全副做完。代码逻辑还是比较简单,和咱们之前文章形容synchronized同理。核酸排队咱们用图形容下吧! ...

January 25, 2021 · 3 min · jiezi

关于aqs:或许是史上最好的AQS源码分析了AQS基础一

CC最新分享——《并发编程之透彻了解AQS源码剖析》,这相对是我见过的、讲并发编程、讲AQS讲的最好的视频了,没有之一,千万不要错过!是不是吹牛,听过就晓得,欢送围观! CC,20年Java开发和应用教训,多年的首席架构师和CTO,滞销原创书籍《研磨设计模式》的作者。参加和领导了上百个大中型我的项目的设计和开发,在互联网利用零碎架构、零碎设计、利用级框架和中间件开发等方面具备很多教训和领悟。更为难得的是,入行20年,依然奋战在技术一线,深知一线架构师须要把握哪些技术、把握到什么水平、一线架构设计会遇到哪些坑、如何能力做出最合适的架构设计,教训最难得!这次先来分享《并发编程和源码剖析》方面的内容,欢送品鉴!吹牛的话就不说了,做技术的人,要低调的弱小! 谈并发编程,必谈AQS,要想透彻了解AQS,必然要深刻了解AQS的原理、流程,而后细细研读它的源码,没有比源码更好的材料了。接下来,将通过一系列的内容,带着大家去一步一步了解AQS,一行一行去品读源码,一步一步去画图剖析流程!这些内容是CC在《高级互联网架构师技术实战培训》系列课程外面讲述的内容,当初分享给大家,心愿大家都能有所播种!学习倡议:在设计和安顿课程内容的时候,会逐渐浸透,让常识的曲线不会忽然变得那么平缓,因而,不倡议跳过一大片内容,间接去看所谓的“精髓”局部,跟着课程,能够自然而然,瓜熟蒂落的了解这些常识。预计你再也找不到比这更好的精品内容了,后续CC还会陆续推出更多的精品内容,连忙上车吧,抓稳了,持续飙车模式。 AQS的内容还是有肯定的难度的,为了让大家学习起来更轻松,这里从零讲起,一步一步深刻,本节内容包含:1:了解AQS是什么、能干什么2:了解独占锁和共享锁3:了解AQS根本的设计思路:图示 具体的内容,请参看视频吧!文字的货色总是不如视频来得直观和清晰立刻退出公众号架构设计一起学,观看《并发编程之AQS源码剖析》,同时还能够取得独家《架构师成长秘籍阶段一》一份,后续还会持续赠送《架构师成长秘籍阶段二》,心动不如口头,连忙退出吧!退出公众:架构设计一起学

September 10, 2020 · 1 min · jiezi

关于aqs:转载浅谈Java的AQS

所谓AQS,指的是AbstractQueuedSynchronizer,它提供了一种实现阻塞锁和一系列依赖FIFO期待队列的同步器的框架,ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier等并发类均是基于AQS来实现的,具体用法是通过继承AQS实现其模板办法,而后将子类作为同步组件的外部类。 理解一个框架最好的形式是读源码,说干就干。 AQS是JDK1.5之后才呈现的,由赫赫有名的Doug Lea李大爷来操刀设计并开发实现,全副源代码(加正文)2315行,整体难度中等。 * @since 1.5* @author Doug Lea根本框架在浏览源码前,首先论述AQS的根本思维及其相干概念。 AQS根本框架如下图所示: AQS保护了一个volatile语义(反对多线程下的可见性)的共享资源变量state和一个FIFO线程期待队列(多线程竞争state被阻塞时会进入此队列)。 State首先说一下共享资源变量state,它是int数据类型的,其拜访形式有3种: getState()setState(int newState)compareAndSetState(int expect, int update)上述3种形式均是原子操作,其中compareAndSetState()的实现依赖于Unsafe的compareAndSwapInt()办法。 private volatile int state;// 具备内存读可见性语义protected final int getState() { return state;}// 具备内存写可见性语义protected final void setState(int newState) { state = newState;}// 具备内存读/写可见性语义protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update);}资源的共享形式分为2种: 独占式(Exclusive)只有单个线程可能胜利获取资源并执行,如ReentrantLock。 共享式(Shared)多个线程可胜利获取资源并执行,如Semaphore/CountDownLatch等。 AQS将大部分的同步逻辑均曾经实现好,继承的自定义同步器只须要实现state的获取(acquire)和开释(release)的逻辑代码就能够,次要包含上面办法: tryAcquire(int):独占形式。尝试获取资源,胜利则返回true,失败则返回false。tryRelease(int):独占形式。尝试开释资源,胜利则返回true,失败则返回false。tryAcquireShared(int):共享形式。尝试获取资源。正数示意失败;0示意胜利,但没有残余可用资源;负数示意胜利,且有残余资源。tryReleaseShared(int):共享形式。尝试开释资源,如果开释后容许唤醒后续期待结点返回true,否则返回false。isHeldExclusively():该线程是否正在独占资源。只有用到condition才须要去实现它。AQS须要子类复写的办法均没有申明为abstract,目标是防止子类须要强制性覆写多个办法,因为个别自定义同步器要么是独占办法,要么是共享方法,只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。 当然,AQS也反对子类同时实现独占和共享两种模式,如ReentrantReadWriteLock。 ...

August 27, 2020 · 5 min · jiezi

深入分析AQS实现原理

简单解释一下J.U.C,是JDK中提供的并发工具包,java.util.concurrent。里面提供了很多并发编程中很常用的实用工具类,比如atomic原子操作、比如lock同步锁、fork/join等。从Lock作为切入点我想以lock作为切入点来讲解AQS,毕竟同步锁是解决线程安全问题的通用手段,也是我们工作中用得比较多的方式。Lock APILock是一个接口,方法定义如下void lock() // 如果锁可用就获得锁,如果锁不可用就阻塞直到锁释放void lockInterruptibly() // 和 lock()方法相似, 但阻塞的线程可中断,抛出 java.lang.InterruptedException异常boolean tryLock() // 非阻塞获取锁;尝试获取锁,如果成功返回trueboolean tryLock(long timeout, TimeUnit timeUnit) //带有超时时间的获取锁方法void unlock() // 释放锁Lock的实现实现Lock接口的类有很多,以下为几个常见的锁实现ReentrantLock:表示重入锁,它是唯一一个实现了Lock接口的类。重入锁指的是线程在获得锁之后,再次获取该锁不需要阻塞,而是直接关联一次计数器增加重入次数ReentrantReadWriteLock:重入读写锁,它实现了ReadWriteLock接口,在这个类中维护了两个锁,一个是ReadLock,一个是WriteLock,他们都分别实现了Lock接口。读写锁是一种适合读多写少的场景下解决线程安全问题的工具,基本原则是:读和读不互斥、读和写互斥、写和写互斥。也就是说涉及到影响数据变化的操作都会存在互斥。StampedLock: stampedLock是JDK8引入的新的锁机制,可以简单认为是读写锁的一个改进版本,读写锁虽然通过分离读和写的功能使得读和读之间可以完全并发,但是读和写是有冲突的,如果大量的读线程存在,可能会引起写线程的饥饿。stampedLock是一种乐观的读策略,使得乐观锁完全不会阻塞写线程ReentrantLock的简单实用如何在实际应用中使用ReentrantLock呢?我们通过一个简单的demo来演示一下public class Demo { private static int count=0; static Lock lock=new ReentrantLock(); public static void inc(){ lock.lock(); try { Thread.sleep(1); count++; } catch (InterruptedException e) { e.printStackTrace(); }finally{ lock.unlock(); } }这段代码主要做一件事,就是通过一个静态的incr()方法对共享变量count做连续递增,在没有加同步锁的情况下多线程访问这个方法一定会存在线程安全问题。所以用到了ReentrantLock来实现同步锁,并且在finally语句块中释放锁。那么我来引出一个问题,大家思考一下多个线程通过lock竞争锁时,当竞争失败的锁是如何实现等待以及被唤醒的呢?什么是AQSaqs全称为AbstractQueuedSynchronizer,它提供了一个FIFO队列,可以看成是一个用来实现同步锁以及其他涉及到同步功能的核心组件,常见的有:ReentrantLock、CountDownLatch等。AQS是一个抽象类,主要是通过继承的方式来使用,它本身没有实现任何的同步接口,仅仅是定义了同步状态的获取以及释放的方法来提供自定义的同步组件。可以这么说,只要搞懂了AQS,那么J.U.C中绝大部分的api都能轻松掌握。AQS的两种功能从使用层面来说,AQS的功能分为两种:独占和共享独占锁,每次只能有一个线程持有锁,比如前面给大家演示的ReentrantLock就是以独占方式实现的互斥锁共享锁,允许多个线程同时获取锁,并发访问共享资源,比如ReentrantReadWriteLockReentrantLock的类图仍然以ReentrantLock为例,来分析AQS在重入锁中的使用。毕竟单纯分析AQS没有太多的含义。先理解这个类图,可以方便我们理解AQS的原理AQS的内部实现AQS的实现依赖内部的同步队列,也就是FIFO的双向队列,如果当前线程竞争锁失败,那么AQS会把当前线程以及等待状态信息构造成一个Node加入到同步队列中,同时再阻塞该线程。当获取锁的线程释放锁以后,会从队列中唤醒一个阻塞的节点(线程)。AQS队列内部维护的是一个FIFO的双向链表,这种结构的特点是每个数据结构都有两个指针,分别指向直接的后继节点和直接前驱节点。所以双向链表可以从任意一个节点开始很方便的访问前驱和后继。每个Node其实是由线程封装,当线程争抢锁失败后会封装成Node加入到ASQ队列中去Node类的组成如下static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; volatile int waitStatus; volatile Node prev; //前驱节点 volatile Node next; //后继节点 volatile Thread thread;//当前线程 Node nextWaiter; //存储在condition队列中的后继节点 //是否为共享锁 final boolean isShared() { return nextWaiter == SHARED; } final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } //将线程构造成一个Node,添加到等待队列 Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } //这个方法会在Condition队列使用,后续单独写一篇文章分析condition Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }释放锁以及添加线程对于队列的变化添加节点当出现锁竞争以及释放锁的时候,AQS同步队列中的节点会发生变化,首先看一下添加节点的场景。这里会涉及到两个变化新的线程封装成Node节点追加到同步队列中,设置prev节点以及修改当前节点的前置节点的next节点指向自己通过CAS讲tail重新指向新的尾部节点释放锁移除节点head节点表示获取锁成功的节点,当头结点在释放同步状态时,会唤醒后继节点,如果后继节点获得锁成功,会把自己设置为头结点,节点的变化过程如下这个过程也是涉及到两个变化修改head节点指向下一个获得锁的节点新的获得锁的节点,将prev的指针指向null这里有一个小的变化,就是设置head节点不需要用CAS,原因是设置head节点是由获得锁的线程来完成的,而同步锁只能由一个线程获得,所以不需要CAS保证,只需要把head节点设置为原首节点的后继节点,并且断开原head节点的next引用即可AQS的源码分析清楚了AQS的基本架构以后,我们来分析一下AQS的源码,仍然以ReentrantLock为模型。ReentrantLock的时序图调用ReentrantLock中的lock()方法,源码的调用过程我使用了时序图来展现从图上可以看出来,当锁获取失败时,会调用addWaiter()方法将当前线程封装成Node节点加入到AQS队列,基于这个思路,我们来分析AQS的源码实现分析源码ReentrantLock.lock()public void lock() { sync.lock();}这个是获取锁的入口,调用sync这个类里面的方法,sync是什么呢?abstract static class Sync extends AbstractQueuedSynchronizersync是一个静态内部类,它继承了AQS这个抽象类,前面说过AQS是一个同步工具,主要用来实现同步控制。我们在利用这个工具的时候,会继承它来实现同步控制功能。通过进一步分析,发现Sync这个类有两个具体的实现,分别是NofairSync(非公平锁),FailSync(公平锁).公平锁 表示所有线程严格按照FIFO来获取锁非公平锁 表示可以存在抢占锁的功能,也就是说不管当前队列上是否存在其他线程等待,新线程都有机会抢占锁公平锁和非公平锁的实现上的差异,我会在文章后面做一个解释,接下来的分析仍然以非公平锁作为主要分析逻辑。NonfairSync.lockfinal void lock() { if (compareAndSetState(0, 1)) //通过cas操作来修改state状态,表示争抢锁的操作 setExclusiveOwnerThread(Thread.currentThread());//设置当前获得锁状态的线程 else acquire(1); //尝试去获取锁}这段代码简单解释一下由于这里是非公平锁,所以调用lock方法时,先去通过cas去抢占锁如果抢占锁成功,保存获得锁成功的当前线程抢占锁失败,调用acquire来走锁竞争逻辑compareAndSetStatecompareAndSetState的代码实现逻辑如下// See below for intrinsics setup to support thisreturn unsafe.compareAndSwapInt(this, stateOffset, expect, update);}这段代码其实逻辑很简单,就是通过cas乐观锁的方式来做比较并替换。上面这段代码的意思是,如果当前内存中的state的值和预期值expect相等,则替换为update。更新成功返回true,否则返回false.这个操作是原子的,不会出现线程安全问题,这里面涉及到Unsafe这个类的操作,一级涉及到state这个属性的意义。state当state=0时,表示无锁状态当state>0时,表示已经有线程获得了锁,也就是state=1,但是因为ReentrantLock允许重入,所以同一个线程多次获得同步锁的时候,state会递增,比如重入5次,那么state=5。 而在释放锁的时候,同样需要释放5次直到state=0其他线程才有资格获得锁private volatile int state;需要注意的是:不同的AQS实现,state所表达的含义是不一样的。UnsafeUnsafe类是在sun.misc包下,不属于Java标准。但是很多Java的基础类库,包括一些被广泛使用的高性能开发库都是基于Unsafe类开发的,比如Netty、Hadoop、Kafka等;Unsafe可认为是Java中留下的后门,提供了一些低层次操作,如直接内存访问、线程调度等public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);这个是一个native方法, 第一个参数为需要改变的对象,第二个为偏移量(即之前求出来的headOffset的值),第三个参数为期待的值,第四个为更新后的值整个方法的作用是如果当前时刻的值等于预期值var4相等,则更新为新的期望值 var5,如果更新成功,则返回true,否则返回false;acquireacquire是AQS中的方法,如果CAS操作未能成功,说明state已经不为0,此时继续acquire(1)操作,这里大家思考一下,acquire方法中的1的参数是用来做什么呢?如果没猜中,往前面回顾一下state这个概念 public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }这个方法的主要逻辑是通过tryAcquire尝试获取独占锁,如果成功返回true,失败返回false如果tryAcquire失败,则会通过addWaiter方法将当前线程封装成Node添加到AQS队列尾部acquireQueued,将Node作为参数,通过自旋去尝试获取锁。如果大家看过我写的Synchronized源码分析的文章,就应该能够明白自旋存在的意义NonfairSync.tryAcquire这个方法的作用是尝试获取锁,如果成功返回true,不成功返回false它是重写AQS类中的tryAcquire方法,并且大家仔细看一下AQS中tryAcquire方法的定义,并没有实现,而是抛出异常。按照一般的思维模式,既然是一个不实现的模版方法,那应该定义成abstract,让子类来实现呀?大家想想为什么protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires);}nonfairTryAcquiretryAcquire(1)在NonfairSync中的实现代码如下ffinal boolean nonfairTryAcquire(int acquires) { //获得当前执行的线程 final Thread current = Thread.currentThread(); int c = getState(); //获得state的值 if (c == 0) { //state=0说明当前是无锁状态 //通过cas操作来替换state的值改为1,大家想想为什么要用cas呢? //理由是,在多线程环境中,直接修改state=1会存在线程安全问题,你猜到了吗? if (compareAndSetState(0, acquires)) { //保存当前获得锁的线程 setExclusiveOwnerThread(current); return true; } } //这段逻辑就很简单了。如果是同一个线程来获得锁,则直接增加重入次数 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; //增加重入次数 if (nextc < 0) // overflow throw new Error(“Maximum lock count exceeded”); setState(nextc); return true; } return false;}获取当前线程,判断当前的锁的状态如果state=0表示当前是无锁状态,通过cas更新state状态的值如果当前线程是属于重入,则增加重入次数addWaiter当tryAcquire方法获取锁失败以后,则会先调用addWaiter将当前线程封装成Node,然后添加到AQS队列private Node addWaiter(Node mode) { //mode=Node.EXCLUSIVE //将当前线程封装成Node,并且mode为独占锁 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure // tail是AQS的中表示同步队列队尾的属性,刚开始为null,所以进行enq(node)方法 Node pred = tail; if (pred != null) { //tail不为空的情况,说明队列中存在节点数据 node.prev = pred; //讲当前线程的Node的prev节点指向tail if (compareAndSetTail(pred, node)) {//通过cas讲node添加到AQS队列 pred.next = node;//cas成功,把旧的tail的next指针指向新的tail return node; } } enq(node); //tail=null,将node添加到同步队列中 return node; }将当前线程封装成Node判断当前链表中的tail节点是否为空,如果不为空,则通过cas操作把当前线程的node添加到AQS队列如果为空或者cas失败,调用enq将节点添加到AQS队列enqenq就是通过自旋操作把当前节点加入到队列中private Node enq(final Node node) { //自旋,不做过多解释,不清楚的关注公众号[架构师修炼宝典] for (;;) { Node t = tail; //如果是第一次添加到队列,那么tail=null if (t == null) { // Must initialize //CAS的方式创建一个空的Node作为头结点 if (compareAndSetHead(new Node())) //此时队列中只一个头结点,所以tail也指向它 tail = head; } else {//进行第二次循环时,tail不为null,进入else区域。将当前线程的Node结点的prev指向tail,然后使用CAS将tail指向Node node.prev = t; if (compareAndSetTail(t, node)) {//t此时指向tail,所以可以CAS成功,将tail重新指向Node。此时t为更新前的tail的值,即指向空的头结点,t.next=node,就将头结点的后续结点指向Node,返回头结点 t.next = node; return t; } } } }假如有两个线程t1,t2同时进入enq方法,t==null表示队列是首次使用,需要先初始化另外一个线程cas失败,则进入下次循环,通过cas操作将node添加到队尾到目前为止,通过addwaiter方法构造了一个AQS队列,并且将线程添加到了队列的节点中acquireQueued将添加到队列中的Node作为参数传入acquireQueued方法,这里面会做抢占锁的操作final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor();// 获取prev节点,若为null即刻抛出NullPointException if (p == head && tryAcquire(arg)) {// 如果前驱为head才有资格进行锁的抢夺 setHead(node); // 获取锁成功后就不需要再进行同步操作了,获取锁成功的线程作为新的head节点//凡是head节点,head.thread与head.prev永远为null, 但是head.next不为null p.next = null; // help GC failed = false; //获取锁成功 return interrupted; }//如果获取锁失败,则根据节点的waitStatus决定是否需要挂起线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())// 若前面为true,则执行挂起,待下次唤醒的时候检测中断的标志 interrupted = true; } } finally { if (failed) // 如果抛出异常则取消锁的获取,进行出队(sync queue)操作 cancelAcquire(node); }}获取当前节点的prev节点如果prev节点为head节点,那么它就有资格去争抢锁,调用tryAcquire抢占锁抢占锁成功以后,把获得锁的节点设置为head,并且移除原来的初始化head节点如果获得锁失败,则根据waitStatus决定是否需要挂起线程最后,通过cancelAcquire取消获得锁的操作前面的逻辑都很好理解,主要看一下shouldParkAfterFailedAcquire这个方法和parkAndCheckInterrupt的作用shouldParkAfterFailedAcquire从上面的分析可以看出,只有队列的第二个节点可以有机会争用锁,如果成功获取锁,则此节点晋升为头节点。对于第三个及以后的节点,if (p == head)条件不成立,首先进行shouldParkAfterFailedAcquire(p, node)操作 shouldParkAfterFailedAcquire方法是判断一个争用锁的线程是否应该被阻塞。它首先判断一个节点的前置节点的状态是否为Node.SIGNAL,如果是,是说明此节点已经将状态设置-如果锁释放,则应当通知它,所以它可以安全的阻塞了,返回true。private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; //前继节点的状态 if (ws == Node.SIGNAL)//如果是SIGNAL状态,意味着当前线程需要被unpark唤醒 return true;如果前节点的状态大于0,即为CANCELLED状态时,则会从前节点开始逐步循环找到一个没有被“CANCELLED”节点设置为当前节点的前节点,返回false。在下次循环执行shouldParkAfterFailedAcquire时,返回true。这个操作实际是把队列中CANCELLED的节点剔除掉。 if (ws > 0) {// 如果前继节点是“取消”状态,则设置 “当前节点”的 “当前前继节点” 为 “‘原前继节点’的前继节点”。 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 如果前继节点为“0”或者“共享锁”状态,则设置前继节点为SIGNAL状态。 /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don’t park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false;}parkAndCheckInterrupt如果shouldParkAfterFailedAcquire返回了true,则会执行:parkAndCheckInterrupt()方法,它是通过LockSupport.park(this)将当前线程挂起到WATING状态,它需要等待一个中断、unpark方法来唤醒它,通过这样一种FIFO的机制的等待,来实现了Lock的操作。private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted();}LockSupportLockSupport类是Java6引入的一个类,提供了基本的线程同步原语。LockSupport实际上是调用了Unsafe类里的函数,归结到Unsafe里,只有两个函数:public native void unpark(Thread jthread); public native void park(boolean isAbsolute, long time); unpark函数为线程提供“许可(permit)”,线程调用park函数则等待“许可”。这个有点像信号量,但是这个“许可”是不能叠加的,“许可”是一次性的。permit相当于0/1的开关,默认是0,调用一次unpark就加1变成了1.调用一次park会消费permit,又会变成0。 如果再调用一次park会阻塞,因为permit已经是0了。直到permit变成1.这时调用unpark会把permit设置为1.每个线程都有一个相关的permit,permit最多只有一个,重复调用unpark不会累积锁的释放ReentrantLock.unlock加锁的过程分析完以后,再来分析一下释放锁的过程,调用release方法,这个方法里面做两件事,1,释放锁 ;2,唤醒park的线程public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false;}tryRelease这个动作可以认为就是一个设置锁状态的操作,而且是将状态减掉传入的参数值(参数是1),如果结果状态为0,就将排它锁的Owner设置为null,以使得其它的线程有机会进行执行。在排它锁中,加锁的时候状态会增加1(当然可以自己修改这个值),在解锁的时候减掉1,同一个锁,在可以重入后,可能会被叠加为2、3、4这些值,只有unlock()的次数与lock()的次数对应才会将Owner线程设置为空,而且也只有这种情况下才会返回true。protected final boolean tryRelease(int releases) { int c = getState() - releases; // 这里是将锁的数量减1 if (Thread.currentThread() != getExclusiveOwnerThread())// 如果释放的线程和获取锁的线程不是同一个,抛出非法监视器状态异常 throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { // 由于重入的关系,不是每次释放锁c都等于0, // 直到最后一次释放锁时,才会把当前线程释放 free = true; setExclusiveOwnerThread(null); } setState(c); return free;}unparkSuccessor在方法unparkSuccessor(Node)中,就意味着真正要释放锁了,它传入的是head节点(head节点是占用锁的节点),当前线程被释放之后,需要唤醒下一个节点的线程private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) {//判断后继节点是否为空或者是否是取消状态, s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) //然后从队列尾部向前遍历找到最前面的一个waitStatus小于0的节点, 至于为什么从尾部开始向前遍历,因为在doAcquireInterruptibly.cancelAcquire方法的处理过程中只设置了next的变化,没有设置prev的变化,在最后有这样一行代码:node.next = node,如果这时执行了unparkSuccessor方法,并且向后遍历的话,就成了死循环了,所以这时只有prev是稳定的 s = t; }//内部首先会发生的动作是获取head节点的next节点,如果获取到的节点不为空,则直接通过:“LockSupport.unpark()”方法来释放对应的被挂起的线程,这样一来将会有一个节点唤醒后继续进入循环进一步尝试tryAcquire()方法来获取锁 if (s != null) LockSupport.unpark(s.thread); //释放许可}总结通过这篇文章基本将AQS队列的实现过程做了比较清晰的分析,主要是基于非公平锁的独占锁实现。在获得同步锁时,同步器维护一个同步队列,获取状态失败的线程都会被加入到队列中并在队列中进行自旋;移出队列(或停止自旋)的条件是前驱节点为头节点且成功获取了同步状态。在释放同步状态时,同步器调用tryRelease(int arg)方法释放同步状态,然后唤醒头节点的后继节点。 ...

December 14, 2018 · 4 min · jiezi

并发编程面试必备:AQS 原理以及 AQS 同步组件总结

常见问题:AQS 原理?;CountDownLatch和CyclicBarrier了解吗,两者的区别是什么?用过Semaphore吗?本节思维导图:【强烈推荐!非广告!】阿里云双11褥羊毛活动:https://m.aliyun.com/act/team1111/#/share?params=N.FF7yxCciiM.hf47liqn 差不多一折,不过仅限阿里云新人购买,不是新人的朋友自己找方法买哦!1 AQS 简单介绍AQS的全称为(AbstractQueuedSynchronizer),这个类在java.util.concurrent.locks包下面。AQS是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的ReentrantLock,Semaphore,其他的诸如ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的。当然,我们自己也能利用AQS非常轻松容易地构造出符合我们自己需求的同步器。2 AQS 原理在面试中被问到并发知识的时候,大多都会被问到“请你说一下自己对于AQS原理的理解”。下面给大家一个示例供大家参加,面试不是背题,大家一定要假如自己的思想,即使加入不了自己的思想也要保证自己能够通俗的讲出来而不是背出来。下面大部分内容其实在AQS类注释上已经给出了,不过是英语看着比较吃力一点,感兴趣的话可以看看源码。2.1 AQS 原理概览AQS核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。看个AQS(AbstractQueuedSynchronizer)原理图:AQS使用一个int成员变量来表示同步状态,通过内置的FIFO队列来完成获取资源线程的排队工作。AQS使用CAS对该同步状态进行原子操作实现对其值的修改。private volatile int state;//共享变量,使用volatile修饰保证线程可见性状态信息通过procted类型的getState,setState,compareAndSetState进行操作//返回同步状态的当前值protected final int getState() { return state;} // 设置同步状态的值protected final void setState(int newState) { state = newState;}//原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update);}2.2 AQS 对资源的共享方式AQS定义两种资源共享方式Exclusive(独占):只有一个线程能执行,如ReentrantLock。又可分为公平锁和非公平锁:公平锁:按照线程在队列中的排队顺序,先到者先拿到锁非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的Share(共享):多个线程可同时执行,如Semaphore/CountDownLatch。Semaphore、CountDownLatCh、 CyclicBarrier、ReadWriteLock 我们都会在后面讲到。ReentrantReadWriteLock 可以看成是组合式,因为ReentrantReadWriteLock也就是读写锁允许多个线程同时对某一资源进行读。不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在上层已经帮我们实现好了。2.3 AQS底层使用了模板方法模式同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方式是这样(模板方法模式很经典的一个应用):使用者继承AbstractQueuedSynchronizer并重写指定的方法。(这些重写方法很简单,无非是对于共享资源state的获取和释放)将AQS组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。这和我们以往通过实现接口的方式有很大区别,这是模板方法模式很经典的一个运用,下面简单的给大家介绍一下模板方法模式,模板方法模式是一个很容易理解的设计模式之一。模板方法模式是基于”继承“的,主要是为了在不改变模板结构的前提下在子类中重新定义模板中的内容以实现复用代码。举个很简单的例子假如我们要去一个地方的步骤是:购票buyTicket()->安检securityCheck()->乘坐某某工具回家ride()->到达目的地arrive()。我们可能乘坐不同的交通工具回家比如飞机或者火车,所以除了ride()方法,其他方法的实现几乎相同。我们可以定义一个包含了这些方法的抽象类,然后用户根据自己的需要继承该抽象类然后修改 ride()方法。AQS使用了模板方法模式,自定义同步器时需要重写下面几个AQS提供的模板方法:isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。默认情况下,每个方法都抛出 UnsupportedOperationException。 这些方法的实现必须是内部线程安全的,并且通常应该简短而不是阻塞。AQS类中的其他方法都是final ,所以无法被其他类使用,只有这几个方法可以被其他类使用。 以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS(Compare and Swap)减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。推荐两篇 AQS 原理和相关源码分析的文章:http://www.cnblogs.com/watery…https://www.cnblogs.com/cheng...3 Semaphore(信号量)-允许多个线程同时访问synchronized 和 ReentrantLock 都是一次只允许一个线程访问某个资源,Semaphore(信号量)可以指定多个线程同时访问某个资源。示例代码如下:/** * * @author Snailclimb * @date 2018年9月30日 * @Description: 需要一次性拿一个许可的情况 /public class SemaphoreExample1 { // 请求的数量 private static final int threadCount = 550; public static void main(String[] args) throws InterruptedException { // 创建一个具有固定线程数量的线程池对象(如果这里线程池的线程数量给太少的话你会发现执行的很慢) ExecutorService threadPool = Executors.newFixedThreadPool(300); // 一次只能允许执行的线程数量。 final Semaphore semaphore = new Semaphore(20); for (int i = 0; i < threadCount; i++) { final int threadnum = i; threadPool.execute(() -> {// Lambda 表达式的运用 try { semaphore.acquire();// 获取一个许可,所以可运行线程数量为20/1=20 test(threadnum); semaphore.release();// 释放一个许可 } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } }); } threadPool.shutdown(); System.out.println(“finish”); } public static void test(int threadnum) throws InterruptedException { Thread.sleep(1000);// 模拟请求的耗时操作 System.out.println(“threadnum:” + threadnum); Thread.sleep(1000);// 模拟请求的耗时操作 }}执行 acquire 方法阻塞,直到有一个许可证可以获得然后拿走一个许可证;每个 release 方法增加一个许可证,这可能会释放一个阻塞的acquire方法。然而,其实并没有实际的许可证这个对象,Semaphore只是维持了一个可获得许可证的数量。 Semaphore经常用于限制获取某种资源的线程数量。当然一次也可以一次拿取和释放多个许可,不过一般没有必要这样做: semaphore.acquire(5);// 获取5个许可,所以可运行线程数量为20/5=4 test(threadnum); semaphore.release(5);// 获取5个许可,所以可运行线程数量为20/5=4除了 acquire方法之外,另一个比较常用的与之对应的方法是tryAcquire方法,该方法如果获取不到许可就立即返回false。Semaphore 有两种模式,公平模式和非公平模式。公平模式: 调用acquire的顺序就是获取许可证的顺序,遵循FIFO;非公平模式: 抢占式的。Semaphore 对应的两个构造方法如下: public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }这两个构造方法,都必须提供许可的数量,第二个构造方法可以指定是公平模式还是非公平模式,默认非公平模式。 由于篇幅问题,如果对 Semaphore 源码感兴趣的朋友可以看下面这篇文章:https://blog.csdn.net/qq_1943…4 CountDownLatch (倒计时器)CountDownLatch是一个同步工具类,用来协调多个线程之间的同步。这个工具通常用来控制线程等待,它可以让某一个线程等待直到倒计时结束,再开始执行。4.1 CountDownLatch 的两种典型用法①某一线程在开始运行前等待n个线程执行完毕。将 CountDownLatch 的计数器初始化为n :new CountDownLatch(n) ,每当一个任务线程执行完毕,就将计数器减1 countdownlatch.countDown(),当计数器的值变为0时,在CountDownLatch上 await() 的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。②实现多个线程开始执行任务的最大并行性。注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的 CountDownLatch 对象,将其计数器初始化为 1 :new CountDownLatch(1) ,多个线程在开始执行任务前首先 coundownlatch.await(),当主线程调用 countDown() 时,计数器变为0,多个线程同时被唤醒。4.2 CountDownLatch 的使用示例/* * * @author SnailClimb * @date 2018年10月1日 * @Description: CountDownLatch 使用方法示例 /public class CountDownLatchExample1 { // 请求的数量 private static final int threadCount = 550; public static void main(String[] args) throws InterruptedException { // 创建一个具有固定线程数量的线程池对象(如果这里线程池的线程数量给太少的话你会发现执行的很慢) ExecutorService threadPool = Executors.newFixedThreadPool(300); final CountDownLatch countDownLatch = new CountDownLatch(threadCount); for (int i = 0; i < threadCount; i++) { final int threadnum = i; threadPool.execute(() -> {// Lambda 表达式的运用 try { test(threadnum); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { countDownLatch.countDown();// 表示一个请求已经被完成 } }); } countDownLatch.await(); threadPool.shutdown(); System.out.println(“finish”); } public static void test(int threadnum) throws InterruptedException { Thread.sleep(1000);// 模拟请求的耗时操作 System.out.println(“threadnum:” + threadnum); Thread.sleep(1000);// 模拟请求的耗时操作 }}上面的代码中,我们定义了请求的数量为550,当这550个请求被处理完成之后,才会执行System.out.println(“finish”);。4.3 CountDownLatch 的不足CountDownLatch是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当CountDownLatch使用完毕后,它不能再次被使用。5 CyclicBarrier(循环栅栏)CyclicBarrier 和 CountDownLatch 非常类似,它也可以实现线程间的技术等待,但是它的功能比 CountDownLatch 更加复杂和强大。主要应用场景和 CountDownLatch 类似。CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。CyclicBarrier默认的构造方法是 CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。5.1 CyclicBarrier 的应用场景CyclicBarrier 可以用于多线程计算数据,最后合并计算结果的应用场景。比如我们用一个Excel保存了用户所有银行流水,每个Sheet保存一个帐户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个sheet里的银行流水,都执行完之后,得到每个sheet的日均银行流水,最后,再用barrierAction用这些线程的计算结果,计算出整个Excel的日均银行流水。5.2 CyclicBarrier 的使用示例示例1:/* * * @author Snailclimb * @date 2018年10月1日 * @Description: 测试 CyclicBarrier 类中带参数的 await() 方法 /public class CyclicBarrierExample2 { // 请求的数量 private static final int threadCount = 550; // 需要同步的线程数量 private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5); public static void main(String[] args) throws InterruptedException { // 创建线程池 ExecutorService threadPool = Executors.newFixedThreadPool(10); for (int i = 0; i < threadCount; i++) { final int threadNum = i; Thread.sleep(1000); threadPool.execute(() -> { try { test(threadNum); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (BrokenBarrierException e) { // TODO Auto-generated catch block e.printStackTrace(); } }); } threadPool.shutdown(); } public static void test(int threadnum) throws InterruptedException, BrokenBarrierException { System.out.println(“threadnum:” + threadnum + “is ready”); try { cyclicBarrier.await(2000, TimeUnit.MILLISECONDS); } catch (Exception e) { System.out.println("—–CyclicBarrierException——"); } System.out.println(“threadnum:” + threadnum + “is finish”); }}运行结果,如下:threadnum:0is readythreadnum:1is readythreadnum:2is readythreadnum:3is readythreadnum:4is readythreadnum:4is finishthreadnum:0is finishthreadnum:1is finishthreadnum:2is finishthreadnum:3is finishthreadnum:5is readythreadnum:6is readythreadnum:7is readythreadnum:8is readythreadnum:9is readythreadnum:9is finishthreadnum:5is finishthreadnum:8is finishthreadnum:7is finishthreadnum:6is finish……可以看到当线程数量也就是请求数量达到我们定义的 5 个的时候, await方法之后的方法才被执行。 另外,CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties, Runnable barrierAction),用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。示例代码如下:/* * * @author SnailClimb * @date 2018年10月1日 * @Description: 新建 CyclicBarrier 的时候指定一个 Runnable */public class CyclicBarrierExample3 { // 请求的数量 private static final int threadCount = 550; // 需要同步的线程数量 private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> { System.out.println("——当线程数达到之后,优先执行——"); }); public static void main(String[] args) throws InterruptedException { // 创建线程池 ExecutorService threadPool = Executors.newFixedThreadPool(10); for (int i = 0; i < threadCount; i++) { final int threadNum = i; Thread.sleep(1000); threadPool.execute(() -> { try { test(threadNum); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (BrokenBarrierException e) { // TODO Auto-generated catch block e.printStackTrace(); } }); } threadPool.shutdown(); } public static void test(int threadnum) throws InterruptedException, BrokenBarrierException { System.out.println(“threadnum:” + threadnum + “is ready”); cyclicBarrier.await(); System.out.println(“threadnum:” + threadnum + “is finish”); }}运行结果,如下:threadnum:0is readythreadnum:1is readythreadnum:2is readythreadnum:3is readythreadnum:4is ready——当线程数达到之后,优先执行——threadnum:4is finishthreadnum:0is finishthreadnum:2is finishthreadnum:1is finishthreadnum:3is finishthreadnum:5is readythreadnum:6is readythreadnum:7is readythreadnum:8is readythreadnum:9is ready——当线程数达到之后,优先执行——threadnum:9is finishthreadnum:5is finishthreadnum:6is finishthreadnum:8is finishthreadnum:7is finish……5.3 CyclicBarrier和CountDownLatch的区别CountDownLatch是计数器,只能使用一次,而CyclicBarrier的计数器提供reset功能,可以多次使用。但是我不那么认为它们之间的区别仅仅就是这么简单的一点。我们来从jdk作者设计的目的来看,javadoc是这么描述它们的:CountDownLatch: A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.(CountDownLatch: 一个或者多个线程,等待其他多个线程完成某件事情之后才能执行;)CyclicBarrier : A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.(CyclicBarrier : 多个线程互相等待,直到到达同一个同步点,再继续一起执行。)对于CountDownLatch来说,重点是“一个线程(多个线程)等待”,而其他的N个线程在完成“某件事情”之后,可以终止,也可以等待。而对于CyclicBarrier,重点是多个线程,在任意一个线程没有完成,所有的线程都必须等待。CountDownLatch是计数器,线程完成一个记录一个,只不过计数不是递增而是递减,而CyclicBarrier更像是一个阀门,需要所有线程都到达,阀门才能打开,然后继续执行。CyclicBarrier和CountDownLatch的区别这部分内容参考了如下两篇文章:https://blog.csdn.net/u010185…https://blog.csdn.net/tolcf/a...6 ReentrantLock 和 ReentrantReadWriteLockReentrantLock 和 synchronized 的区别在上面已经讲过了这里就不多做讲解。另外,需要注意的是:读写锁 ReentrantReadWriteLock 可以保证多个线程可以同时读,所以在读操作远大于写操作的时候,读写锁就非常有用了。由于篇幅问题,关于 ReentrantLock 和 ReentrantReadWriteLock 详细内容可以查看我的这篇原创文章。ReentrantLock 和 ReentrantReadWriteLock ...

November 2, 2018 · 4 min · jiezi