同步队列构造
AQS 应用的同步队列是基于一种 CLH 锁算法来实现。
CLH 锁也是一种基于链表的可扩大、高性能、偏心的自旋锁,申请线程只在本地变量上自旋,它一直轮询前驱的状态,如果发现前驱开释了锁就完结自旋.
同步器中蕴含了两个节点类型的援用,一个指向头节点 (head),一个指向尾节点 (tail), 没有获取到锁的线程,退出到队列的过程必须保障线程平安,因而同步器提供了一个基于 CAS 的设置尾节点的办法
CompareAndSetTail(Node expect,Node update)
, 它须要传递以后线程认为的尾节点和以后节点,只有设置胜利后,以后节点能力正式与之前的尾节点建设关联。
同步器队列遵循
FIFO
, 首节点是获取锁胜利的节点, 首节点的线程在开释锁时, 会唤醒后续节点, 而后继节点在胜利获取到锁后, 会把本人设置成首节点, 设置首节点是由获取锁胜利的线程来实现的, 因为只有一个线程能胜利获取到锁, 所以设置首节点不须要CAS
。
AQS 实现一个线程平安的计数器
自定义互斥锁
package com.rumenz.task.aqs;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public class MyLock {private static final Sync STATE_HOLDER = new Sync();
/**
* 通过 Sync 外部类来持有同步状态,当状态为 1 示意锁被持有,0 示意锁处于闲暇状态
*/
private static class Sync extends AbstractQueuedSynchronizer {
/**
* 是否被独占,有两种示意形式
* 1. 能够依据状态,state= 1 示意锁被占用,0 示意闲暇
* 2. 能够依据以后独占锁的线程来判断,即 getExclusiveOwnerThread()!=null 示意被独占
*/
@Override
protected boolean isHeldExclusively() {return getExclusiveOwnerThread() != null;
}
/**
* 尝试获取锁,将状态从 0 批改为 1,操作胜利则将以后线程设置为以后独占锁的线程
*/
@Override
protected boolean tryAcquire(int arg) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
/**
* 开释锁,将状态批改为 0
*/
@Override
protected boolean tryRelease(int arg) {if (getState() == 0) {throw new UnsupportedOperationException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
}
/**
* 上面的实现 Lock 接口须要重写的办法,根本是就是调用外部内 Sync 的办法
*/
public void lock() {STATE_HOLDER.acquire(1);
}
public void unlock() {STATE_HOLDER.release(1);
}
}
测试案例
package com.rumenz.task.aqs;
import org.omg.Messaging.SYNC_WITH_TRANSPORT;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class LockTest {
private final static Integer clientTotal=100000;
private final static Integer threadTotal=200;
private static Count count=new Count();
private static Count unSafe=new Count();
public static void main(String[] args) throws Exception {ExecutorService executorService = Executors.newCachedThreadPool();
final CountDownLatch countDownLatch=new CountDownLatch(clientTotal);
final Semaphore semaphore=new Semaphore(threadTotal);
for (int i = 0; i < clientTotal; i++) {executorService.execute(()->{
try{semaphore.acquire();
count.getIncrement();
unSafe.getUnSafeIncrement();
semaphore.release();}catch (Exception e){e.printStackTrace();
}
countDownLatch.countDown();});
}
countDownLatch.await();
System.out.println("safe:"+count.getCount());
System.out.println("unSafe:"+unSafe.getCount());
executorService.shutdown();}
}
class Count{
private MyLock myLock;
private volatile int count;
Count() {this.myLock=new MyLock();
}
int getCount(){return count;}
int getIncrement(){myLock.lock();
count++;
myLock.unlock();
return count;
}
int getUnSafeIncrement(){
count++;
return count;
}
}
输入后果
safe:100000
unSafe:99995
关注微信公众号:【入门小站】,解锁更多知识点