前言
上篇文章10分钟从源码级别搞懂AQS(AbstractQueuedSynchronizer)说到JUC并发包中的同步组件大多应用AQS来实现
本篇文章通过AQS本人来实现一个同步组件,并从源码级别聊聊JUC并发包中的罕用同步组件
本篇文章须要的前置常识就是AQS,如果不理解AQS的同学能够看上一篇文章哈~
浏览本篇文章大略须要13分钟
自定义同步组件
为了更容易了解其余同步组件,咱们先来应用AQS本人来实现一个罕用的可重入锁
AQS模板办法流程是固定的,咱们次要只须要来实现它的尝试获取同步状态和尝试开释同步状态办法即可
首先咱们先规定要实现的可重入锁是独占式的
规定同步状态一开始为0,当有线程获取锁胜利同步状态就为1,当这个线程重入时就累加同步状态
规定开释同步状态时每次扣减1个同步状态,只有当同步状态扣减到0时,才是真正的开释独占锁
咱们应用一个外部类Sync 来继承AQS 并重写tryAcquire
尝试获取同步状态、tryRelease
尝试开释同步状态、isHeldExclusively
判断以后线程是否持有同步状态(期待、告诉时会用到该办法)
static class Sync extends AbstractQueuedSynchronizer { /** * 判断以后线程是否持有同步状态 * * @return */ @Override protected boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); } }
在获取同步状态中
- 先判断是否有同步状态(即同步状态是否为0),如果有同步状态就用CAS去获取(0->1),胜利就设置以后线程为获取同步状态的线程
- 如果没有同步状态(即同步状态不为0) ,就查看获取同步状态的线程是否为以后线程,如果是以后线程则阐明此次是重入,累减轻入次数
- 其余状况阐明未获取到同步状态,返回false 后续走AQS流程(构建节点退出AQS)
/** * 尝试获取同步状态 * * @param arg 获取同步状态的数量 * @return */ @Override protected boolean tryAcquire(int arg) { //1.获取同步状态 int state = getState(); //2.如果有同步状态则CAS替换 0->1 if (state == 0) { if (compareAndSetState(state, 1)) { //替换胜利 阐明获取到同步状态 设置以后获取同步状态线程 setExclusiveOwnerThread(Thread.currentThread()); return true; } } else if (getExclusiveOwnerThread() == Thread.currentThread()) { //3.没有同步状态 查看获取同步资源的线程是否为以后线程 可重入 累减轻入次数 setState(state + arg); return true; } //其余状况就是没获取到同步状态 return false; }
在开释同步状态中
只有当同步状态要改成0时才是真正开释,否则状况状况下就是重入扣减次数
/** * 尝试开释同步状态 * * @param arg 开释同步状态的数量 * @return */ @Override protected boolean tryRelease(int arg) { //指标状态 int targetState = getState() - arg; //真正开释锁 if (targetState == 0) { setExclusiveOwnerThread(null); setState(targetState); return true; } //其余状况 扣减状态 setState(targetState); return false; }
应用外部类实现AQS的办法后,咱们在自定义同步组件类中去实现Lock接口,并用外部类实现AQS的办法去实现Lock接口的办法
将要获取、开释的同步状态都设置成1,对应响应中断、超时的办法就用AQS中对应的办法即可
public class MySynchronizedComponent implements Lock { public MySynchronizedComponent() { sync = new Sync(); } private Sync sync; @Override public void lock() { sync.acquire(1); } @Override public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } @Override public boolean tryLock() { return sync.tryAcquire(1); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(time)); } @Override public void unlock() { sync.release(1); } @Override public Condition newCondition() { return sync.new ConditionObject(); }}
实际上咱们只须要去实现尝试获取、开释同步状态办法就可能实现本人的同步组件,这就是应用AQS带来的益处
代码案例能够去git仓库获取,放在本文最初
ReentrantLock
ReentrantLock是并发包中提供的可重入锁,它除了可能实现synchronized的性能外还能够响应中断、超时、实现偏心锁等,其底层也是通过AQS来实现的
ReentrantLock的性能与synchronized相似,可重入的独占锁,用于保障并发场景下同步操作
应用时须要显示加锁、解锁,罕用格局如下:
reentrantLock.lock();try{ //....}finally { reentrantLock.unlock();}
finally中最先去解锁,并且加锁要放在try块的最外层,并保障加锁和try块之间不会抛出异样
加锁不放在try中是因为加锁实现未知可能抛出不受查看unchecked的异样,当加锁抛出异样时,后续finally块解锁也会抛出非法监视器的异样从而导致笼罩
加锁和try块之间如果抛出异样,那么就无奈执行解锁了
ReentrantLock除了提供根本的同步性能,还提供响应中断、超时的API,同学们能够私下去查看
相熟ReentrantLock实现的同学,可能看下面自定义同步组件的代码很相熟,其实就是参考ReentrantLock非偏心锁写的
ReentrantLock中应用外部类Sync来继承AQS,同时外部类NonfairSync和FairSync来继承Sync去实现非偏心、偏心的获取同步状态
非偏心锁尝试获取同步状态 流程相似就不过多形容
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
那偏心锁如何来实现获取同步状态呢?
其实看过上篇AQS文章的同学就晓得了,在上篇文章中曾经说过
只须要在尝试获取同步状态前加上一个条件:队列中是否有前置工作(即在队列中FIFO排队获取)
偏心锁也是这么去实现的,前置条件hasQueuedPredecessors
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
ReentrantReadWriteLock
性能与实现
ReentrantReadWriteLock在ReentrantLock性能的根底上,提供读写锁的性能,让锁的粒度更细
在一些读多写少的场景下是容许同时读的,容许多个线程获取,其实想到了AQS的共享式,读锁也就是共享式
在读读的场景下,都是读锁/共享锁,不会进行阻塞
在读写、写读、写写的场景下,都会进行阻塞
比方要获取写锁时,须要期待读锁、写锁都解锁;要获取读锁时,须要期待写锁解锁
ReentrantReadWriteLock 在 ReentrantLock 的根底上减少ReadLock
和WriteLock
别离作为读锁和写锁
实际上读锁就是共享锁、写锁就是独占锁,在实现加锁、解锁的办法时别离调用共享式、独占式的获取、开释同步状态即可
在结构时,读写锁中理论应用的都是同一个AQS
public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); } //读锁结构 protected ReadLock(ReentrantReadWriteLock lock) { sync = lock.sync; } //写锁结构 protected WriteLock(ReentrantReadWriteLock lock) { sync = lock.sync; }
即同步状态会被读写锁共享,那么它们如何查看/批改本人的那局部同步状态呢?
在读写锁中,同步状态被一分为二,高16位的同步状态是读锁的,低16位的同步状态是写锁的
当线程获取写锁时,写状态+1,因为写状态在低位,相当于同步状态+1
当线程获取读锁时,读状态+1,因为读状态在高位,相当于同步状态+(1<<16)
写锁获取
写锁的获取实现在sync.tryAcquire
中 sync能够是偏心也能够是非偏心,实际上是独占式的获取
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); //失去同步状态c int c = getState(); //失去写状态(同步状态低16位 与上 全1) int w = exclusiveCount(c); if (c != 0) { //同步状态不为0,写状态为0,阐明读状态不为0,读锁曾经被获取,此时获取写锁失败 //同步状态不为0,写状态也不为0,查看以后线程是否是获取写锁的线程,不是的话获取写锁失败 if (w == 0 || current != getExclusiveOwnerThread()) return false; //只有以后线程获取过写锁能力进入这里 //如果原来的写状态+这次重入的写状态 超过了 同步状态的0~15位 则抛出异样 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); //设置同步状态 因为写状态在低16位所以不必左移 (重入累加) setState(c + acquires); return true; } //同步状态为0 无锁时 //writerShouldBlock在非偏心锁下返回false 在偏心锁下查看是否有前驱工作 //如果CAS失败则返回false if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; //CAS胜利则 设置以后线程为取得独占锁(写锁)的线程 setExclusiveOwnerThread(current); return true;}
查看源码能够晓得:
- 当有锁时(同步状态不为0状况),如果只有读锁(没有写锁),那么间接失败;如果只有写锁则查看以后线程是否为获取写锁的线程(重入状况)
- 当无锁时进行CAS获取写锁,胜利则设置获取写锁的线程,失败则返回
依据源码剖析能够晓得,写锁容许重入,并且获取写锁时,如果有读锁会被阻塞
写锁开释
写锁的开释实现在sync.tryRelease
中
protected final boolean tryRelease(int releases) { //判断以后线程是不是获取写(独占)锁的线程 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //新状态 int nextc = getState() - releases; //如果新状态低16位为0(没有写锁)就设置获取写锁的线程为空,而后设置同步状态,再返回 boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free;}
开释其实也相似,只有当写状态为0时才是真正开释,其余状况都是扣减重入次数
读锁获取
读锁的获取也就是共享式的获取
protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); //同步状态 int c = getState(); //exclusiveCount 为获取写锁状态 低16位全与1 //如果有写锁 并且 获取写锁的线程不是以后线程 则失败(阐明容许同一线程获取写锁再获取读锁) if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //获取读状态 (同步状态右移16位) int r = sharedCount(c); //读没被阻塞 没超过最大值 且CAS胜利 记录信息 返回胜利 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); }
在读锁中容许同一线程获取写锁再获取读锁
在某些场景下要先写数据再读数据,比方:
- 获取写锁
- 写数据
- 开释写锁
- 应用(读)数据
这样会导致开释完写锁后,其余线程能够获取写锁,从而导致第四步会呈现脏读
正确的用法应该在开释写锁前获取读锁:
- 获取写锁
- 写数据
- 获取读锁
- 开释写锁
- 读数据
这样其余线程获取写锁时因为都读锁会被阻塞,而其余线程须要读时又不会被阻塞
在读多写少的场景,读写锁粒度更细,读读不阻塞,并发性能更好
信号量
性能
信号量用于管制同时拜访资源的线程数量
线程拜访资源时须要先拿到信号量能力拜访,拜访完开释信号量,信号量容许同时N个线程获取
上面是管制同时只能有2个线程获取到信号量
//初始化信号量 Semaphore semaphore = new Semaphore(2); //每次只有两个线程可能获取到信号量执行 ExecutorService executor = Executors.newFixedThreadPool(4); for (int i = 0; i < 10; i++) { executor.execute(()->{ try { semaphore.acquire(); System.out.println(Thread.currentThread().getName()+"取得资源"); //执行工作 TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println(Thread.currentThread().getName()+"开释资源======"); semaphore.release(); } }); } executor.shutdown();
实现
相熟AQS的同学应该能够猜到信号量其实就是通过共享式实现的
信号量结构时提供初始化信号量的数量,实际上就是初始化同步状态,比方设置2个信号量就是设置同步状态为2;还能够在结构中设置偏心、非偏心
在获取信号量时,应用响应中断的共享式,在非偏心状况下执行nonfairTryAcquireShared
final int nonfairTryAcquireShared(int acquires) { for (;;) { //获取同步状态 int available = getState(); //指标同步状态 int remaining = available - acquires; //没有信号量 或 CAS胜利 都会返回指标同步状态 为正数时获取失败 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
在获取时实际上就是扣减要获取的信号量,可能多个线程同时获取信号量,应用CAS+失败重试保障原子性,直到没有信号量或CAS胜利
在开释信号量时理论就是加上开释的信号量,可能多个线程同时开释信号量,因而开释时应用CAS+失败重试保障原子性
protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } }
CountDownLatch
CountDownLatch 相当于一个计数器,在结构时设置计数数量
性能
调用countDown
办法会对数量进行自减
调用await
办法时,如果还有数量没被扣减完,则会阻塞,直到数量都被扣减完
当一个线程执行N个工作,或者多个线程执行一个工作时,要期待它们执行完再进行下一步操作时,就能够应用CountDownLatch
//初始化10CountDownLatch countDownLatch = new CountDownLatch(10);//固定线程池ExecutorService executor = Executors.newFixedThreadPool(10);for (int i = 1; i <= 10; i++) { final int index = i; executor.execute(() -> { System.out.println(Thread.currentThread() + "解决工作" + index); //执行工作... //数量-1 countDownLatch.countDown(); });}//计数量为0时才能够继续执行countDownLatch.await();System.out.println("解决完工作");executor.shutdown();
实现
其实它的实现与信号量相似,也是通过共享式
在结构中设置初始值时,实际上就是在设置同步状态
当执行countDown
扣减数量时,实际上就是在扣减同步状态 ,因为可能多线程同时执行,应用CAS+失败重试保障扣减同步状态胜利
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; }}
执行await
时,理论就是判断同步状态是否为0,不是则阐明有的线程还未执行完工作,阻塞期待
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1;}
CyclicBarrier
cyclic Barrier 是一个可循环应用的屏障,它经常被用来和countdownlatch作比拟
它就像一个屏障,让线程执行完工作后遇到屏障阻塞,直到所有线程都执行完工作(都达到屏障),并且它是可重复使用的
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> { System.out.println("所有线程达到屏障后,优先执行结构规定的runnable"); }); Thread t1 = new Thread(() -> { //执行工作 task(cyclicBarrier); }, "t1"); Thread t2 = new Thread(() -> { //执行工作 task(cyclicBarrier); }, "t2"); Thread t3 = new Thread(() -> { //执行工作 task(cyclicBarrier); }, "t3"); t1.start(); t2.start(); t3.start();
task办法中会执行await阻塞直到所有线程达到屏障
private static void task(CyclicBarrier cyclicBarrier) { System.out.println(Thread.currentThread() + "执行工作..."); try { TimeUnit.SECONDS.sleep(1); cyclicBarrier.await(); System.out.println("所有线程都执行完, " + Thread.currentThread() + "走出屏障"); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); }}
cyclic barrier会记录须要多少线程达到屏障,并且通过代来达到重复使用
应用reentrant lock 在await中加锁、解锁,每当一个线程达到屏障(执行await时),都会进行自减,如果不为0会阻塞,自减到0时阐明所有线程达到屏障,唤醒其余线程,并更新新的代
Exchange
Exchanger用于线程间的合作,能够用来替换变量
Exchanger<String> exchanger = new Exchanger();new Thread(() -> { String A = "A"; try { //B System.out.println(exchanger.exchange(A)); } catch (InterruptedException e) { e.printStackTrace(); }}).start();String B = "B";try { //A String A = exchanger.exchange(B); System.out.println("A=" + A + " B=" + B);} catch (InterruptedException e) { e.printStackTrace();}
当一个线程先执行exchange时会期待另一个线程执行,等到另一个线程exchange时则唤醒期待的线程
总结
本篇文章围绕前置常识AQS原理,来实现自定义的同步组件,并对并发包中罕用同步组件的性能和原理进行阐明
继承AQS后,只须要实现尝试获取、开释同步状态等办法就能够自定义同步组件
ReentrantLock 是由AQS实现的独占式可重入锁,初始值同步状态为0;获取锁时,如果是无锁则尝试CAS自增,胜利就获取了锁;如果有锁则判断获取锁的线程是不是以后线程,是则阐明是可重入锁自增次数;在开释锁时因为可重入的关系,只有自减为0才是真正开释锁
ReentrantLock 还提供响应中断、超时、偏心锁的其余性能,偏心锁实现只须要加上获取锁的前提:在AQS中FIFO排队,前驱节点为首节点
ReentrantReadWriteLock 提供共享的读锁和独占的写锁,将锁的情况更加细粒度,将同步状态高下16位拆分为读、写的状态,在读多写少的场景并发性能会更好;在获取写锁时,如果有读锁那么会阻塞,如果有写锁会查看是否为可重入;在获取读锁时,没有写锁就能够获取,如果写锁是以后线程也能够获取
信号量用于控制线程拜访资源,初始化自定义的信号量数量,线程拜访资源时先获取信号量,获取到信号量才可能拜访资源;应用共享式来实现,因为可能多个线程同时获取、开释信号量,在实现时都须要应用CAS+失败重试保障原子性
CountDownLatch 用于计数,能够用于一个线程执行N个工作,也能够用于多个线程执行1个工作,当执行完工作应用countdown 来对同步状态进行扣减,执行await办法时只有同步状态不为0就会阻塞线程,直到所有工作执行完(将同步状态扣减完)
CyclicBarrier 是可循环应用的屏障,用于多线程达到屏障后,须要期待其余线程都达到屏障才继续执行;应用reentrant lock 和 代 来实现,调用await时自减,当计数为0时阐明所有线程达到屏障,唤醒其余阻塞的线程
Exchange 用于线程间的合作,可能替换线程间的变量
最初(不要白嫖,一键三连求求拉~)
本篇文章被支出专栏 由点到线,由线到面,深入浅出构建Java并发编程常识体系,感兴趣的同学能够继续关注喔
本篇文章笔记以及案例被支出 gitee-StudyJava、 github-StudyJava 感兴趣的同学能够stat下继续关注喔~
案例地址:
Gitee-JavaConcurrentProgramming/src/main/java/C_AQSComponent
Github-JavaConcurrentProgramming/src/main/java/C_AQSComponent
有什么问题能够在评论区交换,如果感觉菜菜写的不错,能够点赞、关注、珍藏反对一下~
关注菜菜,分享更多干货,公众号:菜菜的后端私房菜
本文由博客一文多发平台 OpenWrite 公布!