关于java:Java并发编程之CAS和AQS

35次阅读

共计 6744 个字符,预计需要花费 17 分钟才能阅读完成。

什么是 CAS

CAS(compare and swap), 字面意思比拟并替换, 是解决多线程并行状况下应用锁造成性能损耗的一种机制.

public final boolean compareAndSet(int expect, int update) {return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

CAS有三个操作数,valueOffset内存值,expect期望值,update要更新的值。如果内存值 (valueOffset) 和期望值 (expect) 是一样的。那么处理器会将该地位的值更新为(update), 否则不做任何操作。

CAS 无效地阐明了“我认为地位 valueOffset 应该蕴含值 expect,如果蕴含该值,则将update 放到这个地位;否则,不要更改该地位,只通知我这个地位当初的值即可。”在 Java 中,sun.misc.Unsafe 类提供了硬件级别的原子操作来实现这个 CAS,java.util.concurrent 包下的大量类都应用了这个 Unsafe 类的 CAS 操作

CAS 的利用

java.util.concurrent.atomic包下的类大多数是应用 CAS 实现的, 如 AtomicInteger,AtomicBooleanAtomicLong等。

public class AtomicInteger extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 6214790243416807050L;

    // setup to use Unsafe.compareAndSwapInt for updates
    private static final Unsafe unsafe = Unsafe.getUnsafe();

    private volatile int value;// 初始 int 大小
    // 省略了局部代码...

    // 带参数构造函数,可设置初始 int 大小
    public AtomicInteger(int initialValue) {value = initialValue;}
    // 不带参数构造函数, 初始 int 大小为 0
    public AtomicInteger() {}

    // 获取以后值
    public final int get() {return value;}

    // 设置值为 newValue
    public final void set(int newValue) {value = newValue;}

    // 返回旧值,并设置新值为 newValue
    public final int getAndSet(int newValue) {
        /**
        * 这里应用 for 循环不断通过 CAS 操作来设置新值
        * CAS 实现和加锁实现的关系有点相似乐观锁和乐观锁的关系
        * */
        for (;;) {int current = get();
            if (compareAndSet(current, newValue))
                return current;
        }
    }

    // 原子的设置新值为 update, expect 为冀望的以后的值
    public final boolean compareAndSet(int expect, int update) {return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

    // 获取以后值 current,并设置新值为 current+1
    public final int getAndIncrement() {for (;;) {int current = get();
            int next = current + 1;
            if (compareAndSet(current, next))
                return current;
        }
    }

    // 此处省略局部代码,余下的代码大抵实现原理都是相似的
}

个别,在竞争不是特地强烈的时候,应用该包下的原子操作性能比应用 synchronized 关键字的形式高效的多。通过查看 getAndSet()办法,可知如果资源竞争十分激烈的话,这个 for 循环可能会继续很久都不能胜利跳出。在这种状况下,咱们可能须要思考如何升高对资源的竞争。在较多的场景下,咱们可能会应用到这些原子类操作。一个典型利用就是计数,在多线程的状况下须要思考线程平安问题。

// 有线程平安问题
public class Counter {
    private int count;
    public Counter(){}
    public int getCount(){return count;}
    public void increase(){count++;}
}
// 乐观锁, 线程平安, 毛病性能差
public class Counter {
    private int count;
    public Counter(){}
    public synchronized int getCount(){return count;}
    public synchronized void increase(){count++;}
}

这是乐观锁的实现,如果咱们须要获取这个资源,那么咱们就给它加锁,其余线程都无法访问该资源,直到咱们操作完后开释对该资源的锁。咱们晓得,乐观锁的效率是不如乐观锁的,下面说了 atomic 包下的原子类的实现是乐观锁形式,因而其效率会比应用 synchronized 关键字更高一些,举荐应用这种形式。

// 乐观锁, 线程平安, 性能好
public class Counter {private AtomicInteger count = new AtomicInteger();
    public Counter(){}
    public int getCount(){return count.get();
    }
    public void increase(){count.getAndIncrement();
    }
}

CAS 的三大毛病

ABA 问题

因为 CAS 须要在操作值的时候查看下值有没有发生变化,如果没有发生变化则更新,然而如果一个值原来是 A,变成了 B,又变成了 A,那么应用 CAS 进行查看时会发现它的值没有发生变化,然而实际上却变动了。ABA 问题的解决思路就是应用版本号,在变量后面追加上版本号,每次变量更新的时候把版本号加一,那么 A-B-A 就会变成 1A-2B-3A。

从 Java 1.5 开始 JDK 的 atomic 包里提供了一个类 AtomicStampedReference 来解决 ABA 问题。这个类的 compareAndSet 办法作用是首先查看以后援用是否等于预期援用,并且以后标记是否等于预期标记,如果全副相等,则以原子形式将该援用和该标记的值设置为给定的更新值。

循环工夫长开销大

CAS 自旋如果长时间不胜利,会给 CPU 带来十分大的执行开销。如果 JVM 能反对处理器提供的 pause 指令那么效率会有肯定的晋升,pause 指令有两个作用,一是它能够提早流水线执行指令,使 CPU 不会耗费过多的执行资源,提早的工夫取决于具体实现的版本,在一些处理器上延迟时间是零;二是它能够防止在退出循环的时候因内存程序抵触而引起 CPU 流水线被清空,从而进步 CPU 的执行效率。

只能保障一个共享变量的原子操作

当对一个共享变量执行操作时,咱们能够应用循环 CAS 的形式来保障原子操作,然而对多个共享变量操作时,循环 CAS 就无奈保障操作的原子性,这个时候就须要用锁,或者有一个取巧的方法,就是把多个共享变量合并成一个共享变量来操作。比方有两个共享变量 i=2,j=a,合并一下 ij=2a,而后用 CAS 来操作 ij。从 Java 1.5 开始 JDK 提供了 AtomicReference 类来保障援用对象之间的原子性,咱们能够把多个变量放在一个对象里来进行 CAS 操作。

什么是 AQS

AQS(AbstractQueuedSychronizer), 形象队列同步器, 是 JDK 下提供的一套给予 FIFO 期待队列的阻塞锁和相干同步器的一个同步框架。这个抽象类被设计为作为一些可用原子 int 值来示意状态的同步器的基类。如果咱们看过相似 CountDownLatch 类的源码实现,会发现其外部有一个继承了 AbstractQueuedSynchronizer 的外部类 Sync。可见 CountDownLatch 是基于 AQS 框架来实现的一个同步器,相似的同步器在 JUC 下还有不少,如 Semaphore 等。

AQS 的利用

QS 治理一个对于状态信息的繁多整数,该整数能够体现任何状态。比方,Semaphore 用它来体现残余的许可数,ReentrantLock 用它来体现领有它的线程曾经申请了多少次锁;FutureTask 用它来体现工作的状态等。

如 JDK 的文档中所说,应用 AQS 来实现一个同步器须要笼罩实现如下几个办法,并且应用 getStatesetStatecompareAndSetState这三个办法来操作状态。

boolean tryAcquire(int arg)
boolean tryRelease(int arg)
int tryAcquireShared(int arg)
boolean tryReleaseShared(int arg)
boolean isHeldExclusively()

以上办法不须要全副实现,依据获取的锁的品种能够抉择实现不同的办法,反对独占(排他)获取锁的同步器应该实现 tryAcquire、tryRelease、isHeldExclusively;而反对共享获取的同步器应该实现 tryAcquireShared、tryReleaseShared、isHeldExclusively。上面以 CountDownLatch 举例说明基于 AQS 实现同步器,CountDownLatch 用同步状态持有以后计数,countDown 办法调用 release 从而导致计数器递加;当计数器为 0 时,解除所有线程的期待;await 调用 acquire,如果计数器为 0,acquire 会立刻返回,否则阻塞。通常用于某工作须要期待其余工作都实现后能力继续执行的情景

public class CountDownLatch {
    /**
     * 基于 AQS 的外部 Sync
     * 应用 AQS 的 state 来示意计数 count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {// 应用 AQS 的 getState()办法设置状态
            setState(count);
        }

        int getCount() {// 应用 AQS 的 getState()办法获取状态
            return getState();}

        // 笼罩在共享模式下尝试获取锁
        protected int tryAcquireShared(int acquires) {
            // 这里用状态 state 是否为 0 来示意是否胜利,为 0 的时候能够获取到返回 1,否则不能够返回 -1
            return (getState() == 0) ? 1 : -1;
        }

        // 笼罩在共享模式下尝试开释锁
        protected boolean tryReleaseShared(int releases) {
            // 在 for 循环中 Decrement count 直至胜利;
            // 当状态值即 count 为 0 的时候,返回 false 示意 signal when transition to zero
            for (;;) {int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    private final Sync sync;

    // 应用给定计数值结构 CountDownLatch
    public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    // 让以后线程阻塞直到计数 count 变为 0,或者线程被中断
    public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);
    }

    // 阻塞以后线程,除非 count 变为 0 或者期待了 timeout 的工夫。当 count 变为 0 时,返回 true
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    // count 递加
    public void countDown() {sync.releaseShared(1);
    }

    // 获取以后 count 值
    public long getCount() {return sync.getCount();
    }

    public String toString() {return super.toString() + "[Count =" + sync.getCount() + "]";
    }
}

AQS 原理

AQS 的实现次要在于保护一个 volatile int state(代表共享资源)和一个 FIFO 线程期待队列(多线程争用资源被阻塞时会进入此队列,此队列称之为 CLH 队列)。CLH 队列中的每个节点是对线程的一个封装,蕴含线程根本信息,状态,期待的资源类型等。

CLH 构造如下

 *      +------+  prev +-----+       +-----+
 * head |      | <---- |     | <---- |     |  tail
 *      +------+       +-----+       +-----+

简略源码剖析

    public final void acquire(int arg) {
        // 首先尝试获取, 不胜利的话则将其退出到期待队列,再 for 循环获取
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();}
    
    // 从 clh 当选一个线程获取占用资源
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 当节点的先驱是 head 的时候,就能够尝试获取占用资源了 tryAcquire
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    // 如果获取到资源,则将以后节点设置为头节点 head
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 如果获取失败的话,判断是否能够劳动,能够的话就进入 waiting 状态,直到被 unpark()
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {if (failed)
                cancelAcquire(node);
        }
    }
  
   private Node addWaiter(Node mode) {
        // 封装以后线程和模式为新的节点, 并将其退出到队列中
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }  
    
    private Node enq(final Node node) {for (;;) {
            Node t = tail;
            if (t == null) { 
                // tail 为 null,阐明还没初始化,此时需进行初始化工作
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                // 否则的话,将以后线程节点作为 tail 节点退出到 CLH 中去
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

关注微信公众号:【入门小站】, 解锁更多知识点。

正文完
 0