同步队列构造

AQS应用的同步队列是基于一种CLH锁算法来实现。

CLH锁也是一种基于链表的可扩大、高性能、偏心的自旋锁,申请线程只在本地变量上自旋,它一直轮询前驱的状态,如果发现前驱开释了锁就完结自旋.

同步器中蕴含了两个节点类型的援用,一个指向头节点(head),一个指向尾节点(tail),没有获取到锁的线程,退出到队列的过程必须保障线程平安,因而同步器提供了一个基于CAS的设置尾节点的办法CompareAndSetTail(Node expect,Node update),它须要传递以后线程认为的尾节点和以后节点,只有设置胜利后,以后节点能力正式与之前的尾节点建设关联。

同步器队列遵循FIFO,首节点是获取锁胜利的节点,首节点的线程在开释锁时,会唤醒后续节点,而后继节点在胜利获取到锁后,会把本人设置成首节点,设置首节点是由获取锁胜利的线程来实现的,因为只有一个线程能胜利获取到锁,所以设置首节点不须要CAS

AQS实现一个线程平安的计数器

自定义互斥锁

package com.rumenz.task.aqs;import java.util.concurrent.locks.AbstractQueuedSynchronizer;public class MyLock {    private static final Sync STATE_HOLDER = new Sync();    /**     * 通过Sync外部类来持有同步状态, 当状态为1示意锁被持有,0示意锁处于闲暇状态     */    private static class Sync extends AbstractQueuedSynchronizer {        /**         * 是否被独占, 有两种示意形式         *  1. 能够依据状态,state=1示意锁被占用,0示意闲暇         *  2. 能够依据以后独占锁的线程来判断,即getExclusiveOwnerThread()!=null 示意被独占         */        @Override        protected boolean isHeldExclusively() {            return getExclusiveOwnerThread() != null;        }        /**         * 尝试获取锁,将状态从0批改为1,操作胜利则将以后线程设置为以后独占锁的线程         */        @Override        protected boolean tryAcquire(int arg) {            if (compareAndSetState(0, 1)) {                setExclusiveOwnerThread(Thread.currentThread());                return true;            }            return false;        }        /**         * 开释锁,将状态批改为0         */        @Override        protected boolean tryRelease(int arg) {            if (getState() == 0) {                throw new UnsupportedOperationException();            }            setExclusiveOwnerThread(null);            setState(0);            return true;        }    }    /**     * 上面的实现Lock接口须要重写的办法,根本是就是调用外部内Sync的办法     */    public void lock() {        STATE_HOLDER.acquire(1);    }    public void unlock() {        STATE_HOLDER.release(1);    }}

测试案例

package com.rumenz.task.aqs;import org.omg.Messaging.SYNC_WITH_TRANSPORT;import java.util.concurrent.CountDownLatch;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Semaphore;public class LockTest {    private final static Integer clientTotal=100000;    private final static Integer threadTotal=200;    private static Count count=new Count();    private static Count unSafe=new Count();    public static void main(String[] args) throws Exception {        ExecutorService executorService = Executors.newCachedThreadPool();        final CountDownLatch countDownLatch=new CountDownLatch(clientTotal);        final Semaphore semaphore=new Semaphore(threadTotal);        for (int i = 0; i < clientTotal; i++) {            executorService.execute(()->{                try{                    semaphore.acquire();                    count.getIncrement();                    unSafe.getUnSafeIncrement();                    semaphore.release();                }catch (Exception e){                    e.printStackTrace();                }                countDownLatch.countDown();            });        }        countDownLatch.await();        System.out.println("safe:"+count.getCount());        System.out.println("unSafe:"+unSafe.getCount());        executorService.shutdown();    }}class Count{    private MyLock myLock;    private volatile int count;     Count() {        this.myLock=new MyLock();    }     int getCount(){        return count;    }     int getIncrement(){        myLock.lock();        count++;        myLock.unlock();        return count;    }     int getUnSafeIncrement(){        count++;        return count;    }}

输入后果

safe:100000unSafe:99995

关注微信公众号:【入门小站】,解锁更多知识点