什么是CAS
CAS(compare and swap)
,字面意思比拟并替换,是解决多线程并行状况下应用锁造成性能损耗的一种机制.
public final boolean compareAndSet(int expect, int update) { return unsafe.compareAndSwapInt(this, valueOffset, expect, update);}
CAS
有三个操作数,valueOffset
内存值,expect
期望值,update
要更新的值。如果内存值(valueOffset
)和期望值(expect
)是一样的。那么处理器会将该地位的值更新为(update
),否则不做任何操作。CAS 无效地阐明了“我认为地位
valueOffset
应该蕴含值expect
,如果蕴含该值,则将update
放到这个地位;否则,不要更改该地位,只通知我这个地位当初的值即可。”在 Java 中,sun.misc.Unsafe类提供了硬件级别的原子操作来实现这个 CAS,java.util.concurrent包下的大量类都应用了这个Unsafe类的 CAS 操作
CAS的利用
java.util.concurrent.atomic
包下的类大多数是应用CAS
实现的,如AtomicInteger
,AtomicBoolean
和AtomicLong
等。
public class AtomicInteger extends Number implements java.io.Serializable { private static final long serialVersionUID = 6214790243416807050L; // setup to use Unsafe.compareAndSwapInt for updates private static final Unsafe unsafe = Unsafe.getUnsafe(); private volatile int value;// 初始int大小 // 省略了局部代码... // 带参数构造函数,可设置初始int大小 public AtomicInteger(int initialValue) { value = initialValue; } // 不带参数构造函数,初始int大小为0 public AtomicInteger() { } // 获取以后值 public final int get() { return value; } // 设置值为 newValue public final void set(int newValue) { value = newValue; } //返回旧值,并设置新值为 newValue public final int getAndSet(int newValue) { /** * 这里应用for循环不断通过CAS操作来设置新值 * CAS实现和加锁实现的关系有点相似乐观锁和乐观锁的关系 * */ for (;;) { int current = get(); if (compareAndSet(current, newValue)) return current; } } // 原子的设置新值为update, expect为冀望的以后的值 public final boolean compareAndSet(int expect, int update) { return unsafe.compareAndSwapInt(this, valueOffset, expect, update); } // 获取以后值current,并设置新值为current+1 public final int getAndIncrement() { for (;;) { int current = get(); int next = current + 1; if (compareAndSet(current, next)) return current; } } // 此处省略局部代码,余下的代码大抵实现原理都是相似的}
个别,在竞争不是特地强烈的时候,应用该包下的原子操作性能比应用synchronized关键字的形式高效的多。通过查看getAndSet()办法,可知如果资源竞争十分激烈的话,这个for循环可能会继续很久都不能胜利跳出。在这种状况下,咱们可能须要思考如何升高对资源的竞争。在较多的场景下,咱们可能会应用到这些原子类操作。一个典型利用就是计数,在多线程的状况下须要思考线程平安问题。
//有线程平安问题public class Counter { private int count; public Counter(){} public int getCount(){ return count; } public void increase(){ count++; }}
//乐观锁,线程平安,毛病性能差public class Counter { private int count; public Counter(){} public synchronized int getCount(){ return count; } public synchronized void increase(){ count++; }}
这是乐观锁的实现,如果咱们须要获取这个资源,那么咱们就给它加锁,其余线程都无法访问该资源,直到咱们操作完后开释对该资源的锁。咱们晓得,乐观锁的效率是不如乐观锁的,下面说了atomic包下的原子类的实现是乐观锁形式,因而其效率会比应用synchronized关键字更高一些,举荐应用这种形式。
//乐观锁,线程平安,性能好public class Counter { private AtomicInteger count = new AtomicInteger(); public Counter(){} public int getCount(){ return count.get(); } public void increase(){ count.getAndIncrement(); }}
CAS的三大毛病
ABA问题
因为 CAS 须要在操作值的时候查看下值有没有发生变化,如果没有发生变化则更新,然而如果一个值原来是A,变成了B,又变成了A,那么应用 CAS 进行查看时会发现它的值没有发生变化,然而实际上却变动了。ABA 问题的解决思路就是应用版本号,在变量后面追加上版本号,每次变量更新的时候把版本号加一,那么A-B-A就会变成1A-2B-3A。从 Java 1.5 开始 JDK 的atomic包里提供了一个类AtomicStampedReference来解决 ABA 问题。这个类的compareAndSet办法作用是首先查看以后援用是否等于预期援用,并且以后标记是否等于预期标记,如果全副相等,则以原子形式将该援用和该标记的值设置为给定的更新值。
循环工夫长开销大
CAS 自旋如果长时间不胜利,会给 CPU 带来十分大的执行开销。如果 JVM 能反对处理器提供的pause指令那么效率会有肯定的晋升,pause指令有两个作用,一是它能够提早流水线执行指令,使 CPU 不会耗费过多的执行资源,提早的工夫取决于具体实现的版本,在一些处理器上延迟时间是零;二是它能够防止在退出循环的时候因内存程序抵触而引起 CPU 流水线被清空,从而进步 CPU 的执行效率。
只能保障一个共享变量的原子操作
当对一个共享变量执行操作时,咱们能够应用循环 CAS 的形式来保障原子操作,然而对多个共享变量操作时,循环 CAS 就无奈保障操作的原子性,这个时候就须要用锁,或者有一个取巧的方法,就是把多个共享变量合并成一个共享变量来操作。比方有两个共享变量i=2,j=a,合并一下ij=2a,而后用 CAS 来操作ij。从 Java 1.5 开始 JDK 提供了AtomicReference类来保障援用对象之间的原子性,咱们能够把多个变量放在一个对象里来进行 CAS 操作。
什么是AQS
AQS(AbstractQueuedSychronizer)
,形象队列同步器,是JDK
下提供的一套给予FIFO
期待队列的阻塞锁和相干同步器的一个同步框架。这个抽象类被设计为作为一些可用原子int
值来示意状态的同步器的基类。如果咱们看过相似CountDownLatch类的源码实现,会发现其外部有一个继承了AbstractQueuedSynchronizer的外部类Sync。可见CountDownLatch是基于 AQS 框架来实现的一个同步器,相似的同步器在 JUC 下还有不少,如Semaphore等。
AQS的利用
QS 治理一个对于状态信息的繁多整数,该整数能够体现任何状态。比方,Semaphore用它来体现残余的许可数,ReentrantLock用它来体现领有它的线程曾经申请了多少次锁;FutureTask用它来体现工作的状态等。如 JDK 的文档中所说,应用 AQS 来实现一个同步器须要笼罩实现如下几个办法,并且应用
getState
、setState
和compareAndSetState
这三个办法来操作状态。
boolean tryAcquire(int arg)boolean tryRelease(int arg)int tryAcquireShared(int arg)boolean tryReleaseShared(int arg)boolean isHeldExclusively()
以上办法不须要全副实现,依据获取的锁的品种能够抉择实现不同的办法,反对独占(排他)获取锁的同步器应该实现tryAcquire、 tryRelease、isHeldExclusively;而反对共享获取的同步器应该实现tryAcquireShared、tryReleaseShared、isHeldExclusively。上面以CountDownLatch举例说明基于 AQS 实现同步器,CountDownLatch用同步状态持有以后计数,countDown办法调用 release从而导致计数器递加;当计数器为 0 时,解除所有线程的期待;await调用acquire,如果计数器为 0,acquire会立刻返回,否则阻塞。通常用于某工作须要期待其余工作都实现后能力继续执行的情景
public class CountDownLatch { /** * 基于AQS的外部Sync * 应用AQS的state来示意计数count. */ private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { // 应用AQS的getState()办法设置状态 setState(count); } int getCount() { // 应用AQS的getState()办法获取状态 return getState(); } // 笼罩在共享模式下尝试获取锁 protected int tryAcquireShared(int acquires) { // 这里用状态state是否为0来示意是否胜利,为0的时候能够获取到返回1,否则不能够返回-1 return (getState() == 0) ? 1 : -1; } // 笼罩在共享模式下尝试开释锁 protected boolean tryReleaseShared(int releases) { // 在for循环中Decrement count直至胜利; // 当状态值即count为0的时候,返回false示意 signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } private final Sync sync; // 应用给定计数值结构CountDownLatch public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } // 让以后线程阻塞直到计数count变为0,或者线程被中断 public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } // 阻塞以后线程,除非count变为0或者期待了timeout的工夫。当count变为0时,返回true public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } // count递加 public void countDown() { sync.releaseShared(1); } // 获取以后count值 public long getCount() { return sync.getCount(); } public String toString() { return super.toString() + "[Count = " + sync.getCount() + "]"; }}
AQS 原理
AQS 的实现次要在于保护一个volatile int state(代表共享资源)和一个 FIFO 线程期待队列(多线程争用资源被阻塞时会进入此队列,此队列称之为CLH队列)。CLH 队列中的每个节点是对线程的一个封装,蕴含线程根本信息,状态,期待的资源类型等。CLH构造如下
* +------+ prev +-----+ +-----+ * head | | <---- | | <---- | | tail * +------+ +-----+ +-----+
简略源码剖析
public final void acquire(int arg) { // 首先尝试获取,不胜利的话则将其退出到期待队列,再for循环获取 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } // 从clh当选一个线程获取占用资源 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // 当节点的先驱是head的时候,就能够尝试获取占用资源了tryAcquire final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { // 如果获取到资源,则将以后节点设置为头节点head setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 如果获取失败的话,判断是否能够劳动,能够的话就进入waiting状态,直到被unpark() if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } private Node addWaiter(Node mode) { // 封装以后线程和模式为新的节点,并将其退出到队列中 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // tail为null,阐明还没初始化,此时需进行初始化工作 if (compareAndSetHead(new Node())) tail = head; } else { // 否则的话,将以后线程节点作为tail节点退出到CLH中去 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
关注微信公众号:【入门小站】,解锁更多知识点。