共计 47980 个字符,预计需要花费 120 分钟才能阅读完成。
对于 Java 局部的面试来说,忽然想到并发这一块的内容是不太残缺的,这篇文章会通篇把多线程和并发都大抵论述一遍,至多可能达到理解原理和应用的目标,内容会比拟多,从最根本的线程到咱们罕用的类会对立说一遍,缓缓看。
过程 & 线程
对于根本的概念,大家应该都很相熟了,过程是资源分配的单位,线程是 CPU 调度的单位,线程是过程中的一个实体。
对于咱们的 Java 程序来说,天生就是多线程的,咱们通过 main 办法启动,就是启动了一个 JVM 的过程,同时创立一个名为 main
的线程,main 就是 JVM 过程中的一个实体线程。
线程生命周期
线程几种根本状态:
- New,初始状态,就是 New 了一个线程,然而还没有调用 start 办法
- Runnable,可运行 Ready 或者运行 Running 状态,线程的就绪和运行中状态咱们统称为 Runnable 运行状态
- Blocked/Wating/Timed_Wating,这些状态对立就叫做休眠状态
- Terminated,终止状态
几个状态之间的转换咱们别离来说。
New:咱们创立一个线程,然而线程没有调用 start 办法,就是初始化状态。
Runnable:调用 start()启动线程进入 Ready 可运行状态,期待 CPU 调度之后进入到 Running 状态。
Blocked:阻塞状态,当线程在期待进入 synchronized 锁的时候,进入阻塞状态。
Waiting:期待状态须要被显示的唤醒,进入该状态分为三种状况,在 synchonized 中调用 Object.wait(),调用 Thread.join(),调用 LockSupport.park()。
Timed_Waiting:和 Waiting 的区别就是多了超时工夫,不须要显示唤醒,达到超时工夫之后主动唤醒,调用图中的一些带有超时参数的办法则会进入该状态。
Terminated:终止状态,线程执行结束。
守护线程 & 用户线程
Java 中的线程分为守护线程和用户线程,下面咱们提到的 main 线程其实就是一个用户线程。
他们最次要的区别就在于,只有有非守护线程没有完结,JVM 就不会失常退出,而守护线程则不会影响 JVM 的退出。
能够通过简略的办法设置一个线程为守护线程。
Thread t = new Thread();
t.setDaemon(true);
锁
锁是管制多线程并发访问共享资源的形式,为了更简略疾速的理解 Java 中的锁,咱们能够依照显示锁和隐式锁来做一个大抵的辨别。
隐式锁
在没有 Lock
接口之前,加锁通过 synchronzied 实现,在之前的 Java 根底系列中我曾经说过了,就不在这里过多的论述,此处援用之前写过的,更多具体能够看《我想进大厂》之 Java 根底夺命连环 16 问。
synchronized 是 java 提供的原子性内置锁,这种内置的并且使用者看不到的锁也被称为 监视器锁,应用 synchronized 之后,会在编译之后在同步的代码块前后加上 monitorenter 和 monitorexit 字节码指令,他依赖操作系统底层互斥锁实现,次要作用就是实现原子性操作和解决共享变量的内存可见性问题。
执行 monitorenter 指令时会尝试获取对象锁,如果对象没有被锁定或者曾经取得了锁,锁的计数器 +1。此时其余竞争锁的线程则会进入期待队列中。
执行 monitorexit 指令时则会把计数器 -1,当计数器值为 0 时,则锁开释,处于期待队列中的线程再持续竞争锁。
如果再深刻到源码来说,synchronized 实际上有两个队列 waitSet 和 entryList。
- 当多个线程进入同步代码块时,首先进入 entryList
- 有一个线程获取到 monitor 锁后,就赋值给以后线程,并且计数器 +1
- 如果线程调用 wait 办法,将开释锁,以后线程置为 null,计数器 -1,同时进入 waitSet 期待被唤醒,调用 notify 或者 notifyAll 之后又会进入 entryList 竞争锁
- 如果线程执行结束,同样开释锁,计数器 -1,以后线程置为 null
显示锁
尽管 synchronized 应用简略,然而也使得加锁的流程固化了,显示锁在 Java1.5 版本之后退出了 Lock 接口,能够通过申明式显示的加锁和解锁。
Lock lock = new ReentrantLock();
lock.lock(); // 加锁
lock.unlock(); // 解锁
独占锁
在上述的伪代码中,咱们应用到了ReentrantLock
,它其实就是独占锁,独占锁保障任何时候都只有一个线程能取得锁,当然了,synchronized 也是独占锁。
这里咱们看 ReentrantLock 的几个加锁接口。
void lock(); // 阻塞加锁
void lockInterruptibly() throws InterruptedException; // 可中断
boolean tryLock(); // 非阻塞
boolean tryLock(long time, TimeUnit unit) throws InterruptedException; // 超时加锁
这几个加锁接口,向咱们明确地展现了他和 synchronized 的区别。
- 可中断加锁
lockInterruptibly
,synchronized 可能会有死锁的问题,那么解决方案就是能响应中断。以后线程加锁时,如果其余线程调用以后线程的中断办法,则会抛出异样。 - 非阻塞加锁
tryLock
,调用后立即返回,获取锁则返回 true,否则返回 false - 反对超时加锁
tryLock(long time, TimeUnit unit)
,超时工夫内获取锁返回 true,否则返回 false - 反对偏心和非偏心锁,偏心指的是获取锁依照申请锁的工夫程序决定,先到先得,非偏心则是间接竞争锁,先到不肯定先得
- 反对 Condition
如果你看过阻塞队列的源码,那么你对 Condition 应该挺理解了,咱们举个栗子来看看,咱们须要实现:
- 如果队列满了,那么写入阻塞
- 如果队列空了,那么删除(取元素)阻塞
咱们给阻塞队列提供一个 put 写入元素和 take 删除元素的办法。
put 时候加锁且响应中断,如果队列满了,notFull.await 开释锁,进入阻塞状态,反之,则把元素增加到队列中,notEmpty.signal 唤醒阻塞在删除元素的线程。
take 的时候一样加锁且响应中断,如果队列空了,notEmpty.await 进入开释锁,进入阻塞状态,反之,则删除元素,notFull.signal 唤醒阻塞在增加元素的线程。
public class ConditionTest {public static void main(String[] args) throws Exception {ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(10);
}
static class ArrayBlockingQueue<E> {private Object[] items;
int takeIndex;
int putIndex;
int count;
private ReentrantLock lock;
private Condition notEmpty;
private Condition notFull;
public ArrayBlockingQueue(int capacity) {this.items = new Object[capacity];
lock = new ReentrantLock();
notEmpty = lock.newCondition();
notFull = lock.newCondition();}
public void put(E e) throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {while (count == items.length) {notFull.await();
}
enqueue(e);
} finally {lock.unlock();
}
}
private void enqueue(E x) {final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length){putIndex = 0;}
count++;
notEmpty.signal();}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {while (count == 0) {notEmpty.await();
}
return dequeue();} finally {lock.unlock();
}
}
private E dequeue() {final Object[] items = this.items;
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length){takeIndex = 0;}
count--;
notFull.signal();
return x;
}
}
}
读写锁
读写锁,也能够称作共享锁,区别于独占锁,共享锁则能够容许多个线程同时持有,如 ReentrantReadWriteLock
容许多线程并发读,要简略概括就是:读读不互斥,读写互斥,写写互斥。
ReentrantReadWriteLock
通过浏览源码发现它外部保护了两个锁:读锁和写锁。
private final ReentrantReadWriteLock.ReadLock readerLock;
private final ReentrantReadWriteLock.WriteLock writerLock;
实质上,不论是 ReentrantLock 还是 ReentrantReadWriteLock 都是基于 AQS,AQS 只有一个状态位 state,对于 ReentrantReadWriteLock 实现读锁和写锁则是对 state 做出了辨别,高 16 位示意的是读锁的状态,低 16 示意的是写锁的状态。
咱们能够看一个源码中给出的应用例子。
class CacheData {
Object data;
volatile boolean cacheValid;
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData() {rwl.readLock().lock();
if (!cacheValid) {
// 必须先开释读锁,再加写锁
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
// 从新校验状态,避免并发问题
if (!cacheValid) {
data = ...
cacheValid = true;
}
// 写锁降级为读锁
rwl.readLock().lock();
} finally {rwl.writeLock().unlock(); // 写锁开释,依然持有读锁}
} try {use(data);
} finally {rwl.readLock().unlock();}
}
}
这个例子嵌套写的其实不太好了解,因为他蕴含了一个写锁降级的概念,实际上咱们本人写最简略的例子就是这样,例子中给到的示例其实是一个意思,只是在写锁开释前先降级为读锁,明确意思就好。
rwl.readLock().lock();
doSomething();
rwl.readLock().unlock();
rwl.writeLock().lock();
doSomething();
rwl.writeLock().unlock();
额定须要留神的是,写锁能够降级为读锁,然而读锁不能降级为写锁,比方上面这种写法是不反对的。
rwl.readLock().lock();
doSomething();
rwl.writeLock().lock();
doSomething();
rwl.writeLock().unlock();
rwl.readLock().unlock();
StampedLock
这是 JDK1.8 之后新增的一个锁,相比 ReentrantReadWriteLock
他的性能更好,在读锁和写锁的根底上减少了一个乐观读锁。
写锁:他的写锁基本上和 ReentrantReadWriteLock 一样,然而不可重入。
读锁:也和 ReentrantReadWriteLock 一样,然而不可重入。
乐观读锁:一般的读锁通过 CAS 去批改以后 state 状态,乐观锁实现原理则是加锁的时候返回一个 stamp(锁状态),而后还须要调用一次 validate(stamp)
判断以后是否有其余线程持有了写锁,通过的话则能够间接操作数据,反之降级到一般的读锁,之前咱们说到读写锁也是互斥的,那么乐观读和写就不是这样的了,他能反对一个线程去写。所以,他性能更高的起因就来自于没有 CAS 的操作,只是简略的位运算拿到以后的锁状态 stamp,并且能反对另外的一个线程去写。
总结下来能够了解为:读读不互斥,读写不互斥,写写互斥 ,另外通过tryConvertToReadLock()
和tryConvertToWriteLock()
等办法反对锁的升降级。
还是依照官网的文档举个栗子,不便了解,两个办法别离示意乐观锁的应用和锁降级的应用。
public class StampedLockTest {
private double x, y;
private final StampedLock sl = new StampedLock();
double distanceFromOrigin() {
// 乐观锁
long stamp = sl.tryOptimisticRead();
double currentX = x, currentY = y;
if (!sl.validate(stamp)) {
// 状态曾经扭转,降级到读锁,从新读取一次最新的数据
stamp = sl.readLock();
try {
currentX = x;
currentY = y;
} finally {sl.unlockRead(stamp);
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
void moveIfAtOrigin(double newX, double newY) {
// 能够应用乐观锁代替
long stamp = sl.readLock();
try {while (x == 0.0 && y == 0.0) {
// 尝试降级到写锁
long ws = sl.tryConvertToWriteLock(stamp);
if (ws != 0L) {
// 降级胜利,替换以后 stamp 标记
stamp = ws;
x = newX;
y = newY;
break;
} else {
// 降级失败,再次获取写锁
sl.unlockRead(stamp);
stamp = sl.writeLock();}
}
} finally {sl.unlock(stamp);
}
}
}
LockSupport
LockSupport 是一个比拟根底的工具类,基于 Unsafe
实现,次要就是提供线程阻塞和唤醒的能力,下面咱们提到对线程生命周期状态的时候也说过了,LockSupport 的几个 park 性能将会把线程阻塞,直到被唤醒。
看看他的几个外围办法:
public static void park(); // 阻塞以后线程
public static void parkNanos(long nanos); // 阻塞以后线程加上了超时工夫,达到超时工夫之后返回
public static void parkUntil(long deadline); // 和下面相似,参数 deadline 代表的是从 1970 到当初工夫的毫秒数
public static void unpark(Thread thread);// 唤醒线程
举个栗子:
public class Test {public static void main(String[] args) throws Exception {
int sleepTime = 3000;
Thread t = new Thread(() -> {System.out.println(Thread.currentThread().getName() + "挂起");
LockSupport.park();
System.out.println(Thread.currentThread().getName() + "持续工作");
});
t.start();
System.out.println("主线程 sleep" + sleepTime);
Thread.sleep(sleepTime);
System.out.println("主线程唤醒阻塞线程");
LockSupport.unpark(t);
}
}
// 输入如下
主线程 sleep3000
Thread- 0 挂起
主线程唤醒阻塞线程
Thread- 0 持续工作
原子类
多线程环境下操作变量,除了能够用咱们下面始终说的加锁的形式,还有其余更简略快捷的方法吗?
JDK1.5 之后引入的原子操作包上面的一些类提供给了咱们一种无锁操作变量的形式,这种通过 CAS 操作的形式更高效并且线程平安。
根本数据类型
咱们先说针对根本数据类型提供的 AtomicInteger
、AtomicLong
、AtomicBoolean
,看名字都晓得是干嘛的,因为基本上没什么区别,以AtomicInteger
的办法举例来说明。
public final int getAndIncrement(); // 旧值 +1,返回旧值
public final int getAndDecrement(); // 旧值 -1,返回旧值
public final int getAndAdd(int delta); // 旧值 +delta,返回旧值
public final int getAndSet(int newValue); // 旧值设置为 newValue,返回旧值
public final int getAndAccumulate(int x,IntBinaryOperator accumulatorFunction); // 旧值依据传入办法进行计算,返回旧值
public final int getAndUpdate(IntUnaryOperator updateFunction)// 旧值依据传入进行计算,返回旧值
与之绝对应的还有一套办法比方 incrementAndGet()
等等,规定齐全一样,只是返回的是新值。
咱们看看上面的例子,针对自定义规定传参,比方咱们能够把计算规定改成乘法。
public class AtomicIntegerTest {public static void main(String[] args) {AtomicInteger atomic = new AtomicInteger(10);
System.out.println(atomic.getAndIncrement()); //10
System.out.println(atomic.getAndDecrement()); //11
System.out.println(atomic.getAndAdd(2));//10
System.out.println(atomic.getAndSet(10)); //12
System.out.println(atomic.get()); //10
System.out.println("=====================");
System.out.println(atomic.getAndAccumulate(3, (left, right) -> left * right)); // 10
System.out.println(atomic.get()); //30
System.out.println(atomic.getAndSet(10)); //30
System.out.println("=====================");
System.out.println(atomic.getAndUpdate(operand -> operand * 20)); // 10
System.out.println(atomic.get()); //200
}
}
另外提到一嘴,根本数据类型只给了 Integer、Long、Boolean,那其余的根本数据类型呢?其实看下 AtomicBoolean 的源码咱们发现其实他实质上是转成了 Integer 解决的,那么针对其余的类型也能够参考这个思路来实现。
数组
针对数组类型的原子操作提供了 3 个,能够不便的更新数组中的某个元素。
AtomicIntegerArray
:针对 Integer 数组的原子操作。
AtomicLongArray
:针对 Long 数组的原子操作。
AtomicReferenceArray
:针对援用类型数组的原子操作。
和下面说的 Atomic 其实也没有太大的区别,还是以 AtomicIntegerArray 举例说明,次要办法也根本一样。
public final int getAndIncrement(int i);
public final int getAndDecrement(int i);
public final int getAndAdd(int i, int delta);
public final int getAndSet(int i, int newValue);
public final int getAndAccumulate(int i, int x,IntBinaryOperator accumulatorFunction);
public final int getAndUpdate(int i, IntUnaryOperator updateFunction);
操作截然不同,只是多了一个参数示意以后索引的地位,同样有 incrementAndGet
等一套办法,返回最新值,没有区别,对于援用类型 AtomicReferenceArray
来说只是没有了 increment 和 decrement 这些办法,其余的也都大同小异,不再赘述。
说实话,这个都没有举栗子的必要。
public class AtomicIntegerArrayTest {public static void main(String[] args) {int[] array = {10};
AtomicIntegerArray atomic = new AtomicIntegerArray(array);
System.out.println(atomic.getAndIncrement(0)); //10
System.out.println(atomic.get(0));//11
System.out.println(atomic.getAndDecrement(0)); //11
System.out.println(atomic.getAndAdd(0, 2));//10
System.out.println(atomic.getAndSet(0, 10)); //12
System.out.println(atomic.get(0)); //10
System.out.println("=====================");
System.out.println(atomic.getAndAccumulate(0, 3, (left, right) -> left * right)); // 10
System.out.println(atomic.get(0)); //30
System.out.println(atomic.getAndSet(0, 10)); //30
System.out.println("=====================");
System.out.println(atomic.getAndUpdate(0, operand -> operand * 20)); // 10
System.out.println(atomic.get(0)); //200
}
}
援用类型
像 AtomicInteger 那种,只能原子更新一个变量,如果须要同时更新多个变量,就须要应用咱们的援用类型的原子类,针对援用类型的原子操作提供了 3 个。
AtomicReference
:针对援用类型的原子操作。
AtomicMarkableReference
:针对带有标记位的援用类型的原子操作。
AtomicStampedReference
:针对带有标记位的援用类型的原子操作。
AtomicMarkableReference 和 AtomicStampedReference 十分相似,他们是为了解决 CAS 中的 ABA 的问题(别说你不晓得啥是 ABA 问题),只不过这个标记的类型不同,咱们看下源码。
AtomicMarkableReference 标记类型是布尔类型,所以其实他版本就俩,true 和 false。
AtomicMarkableReference 标记类型是整型,那可不就是失常的版本号嘛。
public class AtomicMarkableReference<V> {
private static class Pair<T> {
final T reference;
final boolean mark; // 标记
}
}
public class AtomicStampedReference<V> {
private static class Pair<T> {
final T reference;
final int stamp; // 标记
}
}
办法还是那几个,老样子。
public final V getAndSet(V newValue);
public final V getAndUpdate(UnaryOperator<V> updateFunction);
public final V getAndAccumulate(V x, BinaryOperator<V> accumulatorFunction);
public final boolean compareAndSet(V expect, V update);
简略举个栗子:
public class AtomicReferenceTest {public static void main(String[] args) {User user = new User(1L, "test", "test");
AtomicReference<User> atomic = new AtomicReference<>(user);
User pwdUpdateUser = new User(1L,"test","newPwd");
System.out.println(atomic.getAndSet(pwdUpdateUser));
System.out.println(atomic.get());
}
@NoArgsConstructor
@AllArgsConstructor
@Data
@ToString
static class User {
private Long id;
private String username;
private String password;
}
}
// 输入
AtomicReferenceTest.User(id=1, username=test, password=test)
AtomicReferenceTest.User(id=1, username=test, password=newPwd)
对象属性
针对对象属性的原子操作也还是提供了 3 个。
AtomicIntegerFieldUpdater
:针对援用类型里的整型属性的原子操作。
AtomicLongFieldUpdater
:针对援用类型里的长整型属性的原子操作。
AtomicReferenceFieldUpdater
:针对援用类型里的属性的原子操作。
须要留神的是,须要更新的属性字段不能是 private,并且必须用 volatile
润饰,否则会报错。
举个栗子:
public class AtomicReferenceFieldTest {public static void main(String[] args) {AtomicReferenceFieldUpdater<User, String> atomic = AtomicReferenceFieldUpdater.newUpdater(User.class, String.class, "password");
User user = new User(1L, "test", "test");
System.out.println(atomic.getAndSet(user, "newPwd"));
System.out.println(atomic.get(user));
}
@NoArgsConstructor
@AllArgsConstructor
@Data
@ToString
static class User {
private Long id;
private String username;
volatile String password;
}
}
// 输入
test
newPwd
累加器
累加器有 4 个,都来自 JDK1.8 新增的,为啥新增呢?因为 Doug 大佬感觉 AtomicLong 还不够快,尽管说通过 CAS 操作曾经很快了,然而众所知周,高并发同时操作一个共享变量只有一个胜利,那其余的线程都在有限自旋,大量的节约了 CPU 的资源,所以累加器 Accumulator 的思路就是把一个变量拆成多个变量,这样多线程去操作竞争多个变量资源,性能不就晋升了嘛。
也就是说,在高并发的场景下,能够尽量的应用上面这些类来替换根底类型操作的那些 AtomicLong 之类的,能够进步性能。
LongAdder
:Long 类型的累加,LongAccumulator 的特例。
LongAccumulator
:Long 类型的累加。
DoubleAdder
:Double 类型的累加,DoubleAccumulator 的特例。
DoubleAccumulator
:Double 类型的累加。
因为 LongAdder 和 DoubleAdder 都是一样的,咱们以 LongAdder 和 LongAccumulator 举例来说明它的一些简略的原理。
LongAdder
它继承自 Striped64
,外部保护了一个Cell
数组,核心思想就是把单个变量的竞争拆分,多线程下如果一个 Cell
竞争失败,转而去其余 Cell
再次 CAS 重试。
transient volatile Cell[] cells;
transient volatile long base;
在计算以后值的时候,则是累加所有 cell 的 value 再加上 base。
public long sum() {Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
这里还波及到一个伪共享的概念,至于啥是伪共享,看看之前我写的实在字节二面:什么是伪共享?。
解决伪共享的真正的外围就在 Cell
数组,能够看到,Cell
数组应用了 Contented
注解。
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) {value = x;}
}
在下面咱们提到数组的内存地址都是间断的,所以数组内的元素常常会被放入一个缓存行,这样的话就会带来伪共享的问题,影响性能,这里应用 Contented
进行填充,就防止了伪共享的问题,使得数组中的元素不再共享一个缓存行。
LongAccumulator
下面说到,LongAdder 其实就是 LongAccumulator 的一个特例,相比 LongAdder 他的性能会更加弱小,能够自定义累加的规定,在下面演示 AtomicInteger 性能的时候其实咱们也应用过了。
,实际上就是实现了一个 LongAdder 的性能,初始值咱们传入 0,而 LongAdder 的初始值就是 0 并且只能是 0。
public class LongAdderTest {public static void main(String[] args) {LongAdder longAdder = new LongAdder();
LongAccumulator accumulator = new LongAccumulator((left, right) -> 0, 0);
}
}
工具类 & 容器类
这里要说到一些咱们在平时开发中常常应用到的一些类以及他们的实现原理。
CountDownLatch
CountDownLatch 实用于在多线程的场景须要期待所有子线程全副执行结束之后再做操作的场景。
假如当初咱们有一个业务场景,咱们须要调用多个 RPC 接口去查问数据并且写入 excel,最初把所有 excel 打包压缩发送邮件进来。
public class CountDownLatchTest {public static void main(String[] args) throws Exception{ExecutorService executorService = Executors.newFixedThreadPool(10);
CountDownLatch countDownLatch = new CountDownLatch(2);
executorService.submit(()->{
try {Thread.sleep(1000);
System.out.println("写 excelA 实现");
countDownLatch.countDown();} catch (InterruptedException e) {throw new RuntimeException(e);
}
});
executorService.submit(()->{
try {Thread.sleep(3000);
System.out.println("写 excelB 实现");
countDownLatch.countDown();} catch (InterruptedException e) {throw new RuntimeException(e);
}
});
System.out.println("期待 excel 写入实现");
countDownLatch.await();
System.out.println("开始打包发送数据..");
executorService.shutdown();}
}
// 输入
期待 excel 写入实现
写 excelA 实现
写 excelB 实现
开始打包发送数据..
整个过程如下:
初始化一个 CountDownLatch 实例传参 2,因为咱们有 2 个子线程,每次子线程执行结束之后调用 countDown()办法给计数器 -1,主线程调用 await()办法后会被阻塞,直到最初计数器变为 0,await()办法返回,执行结束。
他和 join 有个区别,像咱们这里用的是 ExecutorService 创立线程池,是没法应用 join 的,相比起来,CountDownLatch 的应用会显得更加灵便。
CountDownLatch 基于 AQS 实现,用 volatile 润饰 state 变量维持倒数状态,多线程共享变量可见。
- CountDownLatch 通过构造函数初始化传入参数理论为 AQS 的 state 变量赋值,维持计数器倒数状态
- 当主线程调用 await()办法时,以后线程会被阻塞,当 state 不为 0 时进入 AQS 阻塞队列期待。
- 其余线程调用 countDown()时,通过 CAS 批改 state 值 -1,当 state 值为 0 的时候,唤醒所有调用 await()办法阻塞的线程
CyclicBarrier
CyclicBarrier 叫做回环屏障,它的作用是 让一组线程全副达到一个状态之后再全副同时执行,他和 CountDownLatch 次要区别在于,CountDownLatch 的计数器只能用一次,而 CyclicBarrier 的计数器状态则是能够始终重用的。
咱们能够应用 CyclicBarrier 一样实现下面的需要。
public class CyclicBarrierTest {public static void main(String[] args) throws Exception{ExecutorService executorService = Executors.newFixedThreadPool(10);
CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> {System.out.println("开始打包发送数据..");
});
executorService.submit(()->{
try {Thread.sleep(1000);
System.out.println("写 excelA 实现");
cyclicBarrier.await();} catch (Exception e) {throw new RuntimeException(e);
}
});
executorService.submit(()->{
try {Thread.sleep(3000);
System.out.println("写 excelB 实现");
cyclicBarrier.await();} catch (Exception e) {throw new RuntimeException(e);
}
});
System.out.println("期待 excel 写入实现");
executorService.shutdown();}
}
// 输入
期待 excel 写入实现
写 excelA 实现
写 excelB 实现
开始打包发送数据..
初始化的时候咱们传入 2 个线程和一个回调办法,线程调用 await()之后进入阻塞状态并且计数器 -1,这个阻塞点被称作为屏障点或者同步点,只有最初一个线程达到屏障点的时候,所有被屏障拦挡的线程能力持续运行,这也是叫做回环屏障的名称起因。
而当计数器为 0 时,就去执行 CyclicBarrier 构造函数中的回调办法,回调办法执行实现之后,就会退出屏障点,唤醒其余阻塞中的线程。
CyclicBarrier 基于 ReentrantLock 实现,实质上还是基于 AQS 实现的,外部保护 parties 记录总线程数,count 用于计数,最开始 count=parties,调用 await()之后 count 原子递加,当 count 为 0 之后,再次将 parties 赋值给 count,这就是复用的原理。
- 当子线程调用 await()办法时,获取独占锁 ReentrantLock,同时对 count 递加,进入阻塞队列,而后开释锁
- 当第一个线程被阻塞同时开释锁之后,其余子线程竞争获取锁,操作同 1
- 直到最初 count 为 0,执行 CyclicBarrier 构造函数中的工作,执行结束之后子线程持续向下执行,计数重置,开始下一轮循环
Semaphore
Semaphore 叫做信号量,和后面两个不同的是,他的计数器是递增的,信号量这玩意儿在限流中就常常应用到。
public class SemaphoreTest {public static void main(String[] args) throws Exception {ExecutorService executorService = Executors.newFixedThreadPool(10);
Semaphore semaphore = new Semaphore(0);
executorService.submit(() -> {
try {Thread.sleep(1000);
System.out.println("写 excelA 实现");
semaphore.release();} catch (InterruptedException e) {throw new RuntimeException(e);
}
});
executorService.submit(() -> {
try {Thread.sleep(3000);
System.out.println("写 excelB 实现");
semaphore.release();} catch (InterruptedException e) {throw new RuntimeException(e);
}
});
System.out.println("期待 excel 写入实现");
semaphore.acquire(2);
System.out.println("开始打包发送数据..");
executorService.shutdown();}
}
// 输入
期待 excel 写入实现
写 excelA 实现
写 excelB 实现
开始打包发送数据..
略微和前两个有点区别,构造函数承受参数示意可用的许可证的数量,acquire 办法示意获取一个许可证,应用完之后 release 偿还许可证。
当子线程调用 release()办法时,计数器递增,主线程 acquire()传参为 2 则阐明主线程始终阻塞,直到计数器为 2 才会返回。
Semaphore 还还还是基于 AQS 实现的,同时获取信号量有偏心和非偏心两种策略,通过构造函数的传参能够批改,默认则是非偏心的策略。
- 先说非偏心的策略,主线程调用 acquire()办法时,用以后信号量值 - 须要获取的值,如果小于 0,阐明还没有达到信号量的要求值,则会进入 AQS 的阻塞队列,大于 0 则通过 CAS 设置以后信号量为残余值,同时返回残余值。而对于偏心策略来说,如果以后有其余线程在期待获取资源,那么本人就会进入 AQS 阻塞队列排队。
- 子线程调用 release()给以后信号量值计数器 +1(减少的值数量由传参决定),同时不停的尝试唤醒因为调用 acquire()进入阻塞的线程
Exchanger
Exchanger 用于两个线程之间替换数据,如果两个线程都达到同步点,这两个线程能够互相交换他们的数据。
举个栗子,A 和 B 两个线程须要替换他们本人写的数据以便核查数据是否统一。
public class ExchangerTest {public static void main(String[] args) throws Exception {ExecutorService executorService = Executors.newFixedThreadPool(10);
Exchanger<String> exchanger = new Exchanger<>();
executorService.submit(() -> {
try {Thread.sleep(1000);
System.out.println("写 excelA 实现");
System.out.println("A 获取到数据 =" + exchanger.exchange("excelA"));
} catch (InterruptedException e) {throw new RuntimeException(e);
}
});
executorService.submit(() -> {
try {Thread.sleep(3000);
System.out.println("写 excelB 实现");
System.out.println("B 获取到数据 =" + exchanger.exchange("excelB"));
} catch (InterruptedException e) {throw new RuntimeException(e);
}
});
executorService.shutdown();}
}
// 输入
写 excelA 实现
写 excelB 实现
B 获取到数据 =excelA
A 获取到数据 =excelB
A 写完之后 exchange 会始终阻塞期待,直到另外一个线程也 exchange 之后,才会继续执行。
ThreadLocalRandom
通常咱们都会用 Random 去生成随机数,然而 Random 有点小问题,在多线程并发的状况下为了保障生成的随机性,通过 CAS 的形式保障生成新种子的原子性,然而这样带来了性能的问题,多线程并发去生成随机数,然而只有一个线程能胜利,其余的线程会始终自旋,性能不高,所以 ThreadLocalRandom 就是为了解决这个问题而诞生。
// 多线程下通过 CAS 保障新种子生成的原子性
protected int next(int bits) {
long oldseed, nextseed;
AtomicLong seed = this.seed;
do {oldseed = seed.get();
nextseed = (oldseed * multiplier + addend) & mask;
} while (!seed.compareAndSet(oldseed, nextseed));
return (int)(nextseed >>> (48 - bits));
}
ThreadLocalRandom 咱们从名字就能看进去,必定应用了 ThreadLocal,作用就是用 ThreadLocal 保留每个种子的变量,避免在高并发下对同一个种子的抢夺。
应用也非常简单:
ThreadLocalRandom.current().nextInt(100);
看下源码实现,current 办法获取以后的 ThreadLocalRandom 实例。
public static ThreadLocalRandom current() {if (UNSAFE.getInt(Thread.currentThread(), PROBE) == 0)
localInit();
return instance;
}
nextInt 办法和 Random 看起来差不多,下面是生成新的种子,上面是固定的基于新种子计算随机数,次要看 nextSeed。
public int nextInt(int bound) {if (bound <= 0)
throw new IllegalArgumentException(BadBound);
int r = mix32(nextSeed()); // 生成新种子
int m = bound - 1;
if ((bound & m) == 0) // power of two
r &= m;
else { // reject over-represented candidates
for (int u = r >>> 1;
u + m - (r = u % bound) < 0;
u = mix32(nextSeed()) >>> 1)
;
}
return r;
}
r = UNSAFE.getLong(t, SEED) + GAMMA 计算出新的种子,而后应用 UNSAFE 的办法放入以后线程中。
final long nextSeed() {
Thread t; long r; // read and update per-thread seed
UNSAFE.putLong(t = Thread.currentThread(), SEED,
r = UNSAFE.getLong(t, SEED) + GAMMA);
return r;
}
ConcurrentHashMap
这个咱们就不说了,说的太多了,之前的文章也写过了,能够参考之前写过的。
CopyOnWriteArrayList&CopyOnWriteArraySet
这是线程平安的 ArrayList,从名字咱们就能看进去,写的时候复制,这叫做 写时复制,也就是写的操作是对拷贝的数组的操作。
先看构造函数,有 3 个,别离是无参,传参为汇合和传参数组,其实都差不多,无参构造函数创立一个新的数组,汇合则是把汇合类的元素拷贝到新的数组,数组也是一样。
public CopyOnWriteArrayList() {setArray(new Object[0]);
}
public CopyOnWriteArrayList(Collection<? extends E> c) {Object[] elements;
if (c.getClass() == CopyOnWriteArrayList.class)
elements = ((CopyOnWriteArrayList<?>)c).getArray();
else {elements = c.toArray();
if (c.getClass() != ArrayList.class)
elements = Arrays.copyOf(elements, elements.length, Object[].class);
}
setArray(elements);
}
public CopyOnWriteArrayList(E[] toCopyIn) {setArray(Arrays.copyOf(toCopyIn, toCopyIn.length, Object[].class));
}
咱们看 add 办法,你一眼就能看进去非常简单的实现,通过 ReentrantLock 加锁,而后拷贝出一个新的数组,数组长度 +1,再把新数组赋值,所以这就是名字的由来,写入的时候操作的是数组的拷贝,其余的删除批改就不看了,基本上是一样的。
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {lock.unlock();
}
}
再看看 get 办法,也非常简单,间接获取数组以后索引的值,这里须要留神的是,读数据是没有加锁的,所以会有一致性的问题,它并不能保障读到的肯定是最新的数据。
public E get(int index) {return get(getArray(), index);
}
private E get(Object[] a, int index) {return (E) a[index];
}
final Object[] getArray() {return array;}
至于 CopyOnWriteArraySet,他就是基于 CopyOnWriteArrayList 实现的,这里咱们不再赘述。
public CopyOnWriteArraySet() {al = new CopyOnWriteArrayList<E>();
}
public boolean add(E e) {return al.addIfAbsent(e);
}
public boolean addIfAbsent(E e) {Object[] snapshot = getArray();
return indexOf(e, snapshot, 0, snapshot.length) >= 0 ? false :
addIfAbsent(e, snapshot);
}
Fork/Join
Fork/Join 是一个并行执行工作的框架,利用的 分而治之 的思维。
Fork 是把一个大的工作拆分成若干个小工作并行执行,Join 则是合并拆分的子工作的后果集,最终计算出大工作的后果。
所以整个 Fork/Join 的流程能够认为就是两步:
- Fork 拆分工作,直到拆分到最小粒度不可拆分为止
- Join 计算结果,把每个子工作的后果进行合并
这里咱们须要介绍一下次要的几个类:
ForkJoinTask:就是咱们的分治工作的抽象类
RecursiveTask:继承于 ForkJoinTask,用于计算有返回后果的工作
RecursiveAction:继承于 ForkJoinTask,用于计算没有返回后果的工作
ForkJoinPool:用于执行 ForkJoinTask 工作的线程池,通常咱们能够用 ForkJoinPool.commonPool() 去创立一个 Fork/Join 的线程池,而后用 submit 或者 invoke 去提交执行工作。
这里咱们写一个测试程序,用于计算 [0,999] 的求和后果,所以咱们写一个类继承 RecursiveTask,并且实现他的 compute 办法。
invokeAll() 相当于每个工作都执行 fork,fork 之后会再次执行 compute 判断是否要持续拆分,如果无需拆分那么则应用 join 办法计算汇总后果。
public class ForkJoinTest {public static void main(String[] args) throws Exception {List<Integer> list = new LinkedList<>();
Integer sum = 0;
for (int i = 0; i < 1000; i++) {list.add(i);
sum += i;
}
CalculateTask task = new CalculateTask(0, list.size(), list);
Future<Integer> future = ForkJoinPool.commonPool().submit(task);
System.out.println("sum=" + sum + ",Fork/Join result=" + future.get());
}
@Data
static class CalculateTask extends RecursiveTask<Integer> {
private Integer start;
private Integer end;
private List<Integer> list;
public CalculateTask(Integer start, Integer end, List<Integer> list) {
this.start = start;
this.end = end;
this.list = list;
}
@Override
protected Integer compute() {
Integer sum = 0;
if (end - start < 200) {for (int i = start; i < end; i++) {sum += list.get(i);
}
} else {int middle = (start + end) / 2;
System.out.println(String.format("从 [%d,%d] 拆分为:[%d,%d],[%d,%d]", start, end, start, middle, middle, end));
CalculateTask task1 = new CalculateTask(start, middle, list);
CalculateTask task2 = new CalculateTask(middle, end, list);
invokeAll(task1, task2);
sum = task1.join() + task2.join();
}
return sum;
}
}
}
// 输入
从 [0,1000] 拆分为:[0,500],[500,1000]
从 [0,500] 拆分为:[0,250],[250,500]
从 [500,1000] 拆分为:[500,750],[750,1000]
从 [0,250] 拆分为:[0,125],[125,250]
从 [250,500] 拆分为:[250,375],[375,500]
从 [500,750] 拆分为:[500,625],[625,750]
从 [750,1000] 拆分为:[750,875],[875,1000]
sum=499500,Fork/Join result=499500
应用实现之后,咱们再来谈一下 Fork/Join 的原理。
先看 fork 的代码,调用 fork 之后,应用 workQueue.push() 把工作增加到队列中,留神 push 之后调用 signalWork 唤醒一个线程去执行工作。
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
final ForkJoinPool.WorkQueue workQueue; // 工作窃取
final void push(ForkJoinTask<?> task) {ForkJoinTask<?>[] a; ForkJoinPool p;
int b = base, s = top, n;
if ((a = array) != null) { // ignore if queue removed
int m = a.length - 1; // fenced write for task visibility
U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
U.putOrderedInt(this, QTOP, s + 1);
if ((n = s - b) <= 1) {if ((p = pool) != null)
p.signalWork(p.workQueues, this);
}
else if (n >= m)
growArray();}
}
下面咱们看到了 workQueue,这个其实就是咱们说的工作队列,它是一个双端队列,并且有一个工作线程和他对应。
@sun.misc.Contended
static final class WorkQueue {
volatile int base; // 下一个出队列索引
int top; // 下一个入队列索引
ForkJoinTask<?>[] array; // 队列中的 task
final ForkJoinPool pool;
final ForkJoinWorkerThread owner; // 工作队列中的工作线程
volatile Thread parker; // == owner during call to park; else null
volatile ForkJoinTask<?> currentJoin; // 以后 join 的工作
volatile ForkJoinTask<?> currentSteal; // 以后偷到的工作
}
那如果工作线程本人队列的做完了怎么办?只能傻傻地期待吗?并不是,这时候有一个叫做 工作窃取 的机制,所以他就会去其余线程的队列里偷一个工作来执行。
为了防止偷工作线程和本人的线程产生竞争,所以本人的工作线程是从队列头部获取工作执行,而偷工作线程则从队列尾部偷工作。
Executor
Executor 是并发编程中重要的一环,工作创立后提交到 Executor 执行并最终返回后果。
工作
线程两种创立形式:Runnable 和 Callable。
Runnable 是最后创立线程的形式,在 JDK1.1 的版本就曾经存在,Callable 则在 JDK1.5 版本之后退出,他们的次要区别在于 Callable 能够返回工作的执行后果。
工作执行
工作的执行次要靠 Executor,ExecutorService 继承自 Executor,ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 别离实现了 ExecutorService。
那说到线程池之前,咱们必定要提及到线程池的几个外围参数和原理,这个之前的文章也写到过,属于根底中的根底局部。
首先线程池有几个外围的参数概念:
- 最大线程数 maximumPoolSize
- 外围线程数 corePoolSize
- 沉闷工夫 keepAliveTime
- 阻塞队列 workQueue
- 回绝策略 RejectedExecutionHandler
当提交一个新工作到线程池时,具体的执行流程如下:
- 当咱们提交工作,线程池会依据 corePoolSize 大小创立若干工作数量线程执行工作
- 当工作的数量超过 corePoolSize 数量,后续的工作将会进入阻塞队列阻塞排队
- 当阻塞队列也满了之后,那么将会持续创立 (maximumPoolSize-corePoolSize) 个数量的线程来执行工作,如果工作解决实现,maximumPoolSize-corePoolSize 额定创立的线程期待 keepAliveTime 之后被主动销毁
- 如果达到 maximumPoolSize,阻塞队列还是满的状态,那么将依据不同的回绝策略对应解决
回绝策略次要有四种:
- AbortPolicy:间接抛弃工作,抛出异样,这是默认策略
- CallerRunsPolicy:应用调用者所在的线程来解决工作
- DiscardOldestPolicy:抛弃期待队列中最老的工作,并执行当前任务
- DiscardPolicy:间接抛弃工作,也不抛出异样
ThreadPoolExecutor
通常为了快捷咱们会用 Executors 工具类提供的创立线程池的办法疾速地创立一个线程池进去,次要有几个办法,然而个别咱们不举荐这样应用,非常容易导致呈现问题,生产环境中咱们个别举荐本人实现,参数本人定义,而不要应用这些办法。
创立
// 创立固定线程数大小的线程池,外围线程数 = 最大线程数,阻塞队列长度 =Integer.MAX_VALUE
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
// 创立只有一个线程的线程池,阻塞队列长度 =Integer.MAX_VALUE
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
// 创立外围线程数为 0,最大线程数 =Integer.MAX_VALUE 的线程池,阻塞队列为同步队列
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
最好的方法就是本人创立,并且指定线程名称:
new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors()*2,
1000L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(100),
new ThreadFactoryBuilder().setNameFormat("thread-name").build());
提交工作
重点说一下几个办法:
submit(Runnable task, T result):能够用于主线程和子线程之间的通信,数据共享。
submit(Runnable task):返回 null,相当于调用 submit(Runnable task, null)。
invokeAll(Collection<? extends Callable<T>> tasks):批量提交工作,阻塞期待所有工作执行实现之后返回,带超时工夫的则是在超时之后返回,并且勾销没有执行实现的工作。
invokeAny(Collection<? extends Callable<T>> tasks):批量提交工作,只有一个工作有返回,那么其余的工作都会被终止。
public void execute(Runnable command); // 提交 runnable 工作,无返回
public <T> Future<T> submit(Callable<T> task); // 提交 callable 工作,有返回
public Future<?> submit(Runnable task); // 提交 runnable,有返回
public <T> Future<T> submit(Runnable task, T result); // 提交 runnable,有返回
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks); // 批量提交工作
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit);
public <T> T invokeAny(Collection<? extends Callable<T>> tasks);
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit);
敞开
shutdown:线程池状态设置为SHUTDOWN
,不再承受新工作,间接返回,线程池中工作会执行实现,遍历线程池中的线程,一一调用 interrupt 办法去中断线程。
shutdownNow:线程池状态设置为STOP
,不再承受新工作,间接返回,线程池中工作会被中断,返回值为被抛弃的工作列表。
isShutdown:只有调用了 shutdown 或者 shutdownNow,都会返回 true
isTerminating:所有工作都敞开后,才返回 true
public void shutdown();
public List<Runnable> shutdownNow();
public boolean isShutdown();
public boolean isTerminating();
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor 继承于 ThreadPoolExecutor,从名字咱们也晓得,他是用于定时执行工作的线程池。
外部实现了一个 DelayedWorkQueue 作为工作的阻塞队列,ScheduledFutureTask 作为调度的工作,保留到队列中。
咱们先看下他的构造函数,4 个构造函数都不反对传队列进来,所以默认的就是应用他的外部类 DelayedWorkQueue,因为 DelayedWorkQueue 是一个无界队列,所以这里最大线程数都是设置的为 Integer.MAX,因为没有意义。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
执行定时工作的办法次要有 4 个,后面两个 schedule 传参辨别 Runnable 和 Callable 其实并没有区别,最终 Runnable 会通过 Executors.callable(runnable, result) 转换为 Callable,实质上咱们能够当做只有 3 个执行办法来看。
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit);
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
schedule:提交一个延时工作,从工夫单位为 unit 的 delay 工夫开始执行,并且工作只会执行一次。
scheduleWithFixedDelay:以固定的 延迟时间 反复执行工作,initialDelay 示意提交工作后多长时间开始执行,delay 示意工作执行工夫距离。
scheduleAtFixedRate:以固定的 工夫频率 反复执行工作,指的是以起始工夫开始,而后以固定的工夫距离反复执行工作,initialDelay 示意提交工作后多长时间开始执行,而后从 initialDelay + N * period
执行。
这两个特地容易搞混,很难了解到底是个啥意思,记住了。
scheduleAtFixedRate 是 上次执行实现之后立即执行 ,scheduleWithFixedDelay 则是 上次执行实现 +delay 后执行。
看个例子,两个工作都会提早 1 秒,而后以 2 秒的距离开始反复执行,工作睡眠 1 秒的工夫。
scheduleAtFixedRate 因为工作执行的耗时比工夫距离小,所以始终是以 2 秒的距离在执行。
scheduleWithFixedDelay 因为工作耗时用了 1 秒,导致前面的工夫距离都成了 3 秒。
public class ScheduledThreadPoolTest {public static void main(String[] args) throws Exception {ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10);
executorService.scheduleAtFixedRate(() -> {
try {System.out.println("scheduleAtFixedRate=" + new SimpleDateFormat("HH:mm:ss").format(new Date()));
Thread.sleep(1000);
} catch (InterruptedException e) {throw new RuntimeException(e);
}
}, 1000, 2000, TimeUnit.MILLISECONDS);
executorService.scheduleWithFixedDelay(() -> {
try {System.err.println("scheduleWithFixedDelay=" + new SimpleDateFormat("HH:mm:ss").format(new Date()));
Thread.sleep(1000);
} catch (InterruptedException e) {throw new RuntimeException(e);
}
}, 1000, 2000, TimeUnit.MILLISECONDS);
// executorService.shutdown();}
}
// 输入
scheduleAtFixedRate=01:17:05
scheduleWithFixedDelay=01:17:05
scheduleAtFixedRate=01:17:07
scheduleWithFixedDelay=01:17:08
scheduleAtFixedRate=01:17:09
scheduleAtFixedRate=01:17:11
scheduleWithFixedDelay=01:17:11
scheduleAtFixedRate=01:17:13
scheduleWithFixedDelay=01:17:14
scheduleAtFixedRate=01:17:15
scheduleAtFixedRate=01:17:17
scheduleWithFixedDelay=01:17:17
scheduleAtFixedRate=01:17:19
scheduleWithFixedDelay=01:17:20
scheduleAtFixedRate=01:17:21
咱们把工作耗时调整到超过工夫距离,比方改成睡眠 3 秒,察看输入后果。
scheduleAtFixedRate 因为工作执行的耗时比工夫距离长,依照规定上次工作执行完结之后立即执行,所以变成以 3 秒的工夫距离执行。
scheduleWithFixedDelay 因为工作耗时用了 3 秒,导致前面的工夫距离都成了 5 秒。
scheduleWithFixedDelay=01:46:21
scheduleAtFixedRate=01:46:21
scheduleAtFixedRate=01:46:24
scheduleWithFixedDelay=01:46:26
scheduleAtFixedRate=01:46:27
scheduleAtFixedRate=01:46:30
scheduleWithFixedDelay=01:46:31
scheduleAtFixedRate=01:46:33
scheduleWithFixedDelay=01:46:36
scheduleAtFixedRate=01:46:36
OK,最初来说说实现原理:
- 首先咱们通过调用 schedule 的几个办法,把工作增加到 ScheduledThreadPoolExecutor 去执行
- 接管到工作之后,会通过申请参数的延迟时间计算出真正须要执行工作的工夫,而后把工作封装成 RunnableScheduledFuture
- 而后把封装之后的工作增加到提早队列中,工作 ScheduledFutureTask 实现了 comparable 接口,把工夫越小的工作放在队列头,如果工夫一样,则会通过 sequenceNumber 去比拟,也就是执行工夫雷同,先提交的先执行
- 最初线程池会从提早队列中去获取工作执行,如果是一次性的工作,执行之后删除队列中的工作,如果是反复执行的,则再次计算工夫,而后把工作增加到提早队列中
CompletionService
记得下面我将 ThreadPoolExecutor 的办法吗,其中有一个 invokeAny 的办法,批量提交工作,只有有一个实现了,就间接返回,而不必始终傻傻地等,他的实现就是应用了 CompletionService,我给你看一段源码。
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {if (tasks == null)
throw new NullPointerException();
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(this);
}
看到了吧,OK,在咱们想试试应用这个类之前,咱们先试试 invokeAny 好使不。
public class CompletionServiceTest {
private static final int TOTAL = 10;
private static ExecutorService executorService = Executors.newFixedThreadPool(TOTAL);
public static void main(String[] args) throws Exception {testInvokeAny();
}
private static void testInvokeAny() throws Exception {List<TestTask> taskList = new LinkedList<>();
for (int i = 0; i < TOTAL; i++) {taskList.add(new TestTask(i));
}
String value = executorService.invokeAny(taskList, 60, TimeUnit.SECONDS);
System.out.println("get value =" + value);
executorService.shutdown();}
static class TestTask implements Callable<String> {
private Integer index;
public TestTask(Integer index) {this.index = index;}
@Override
public String call() throws Exception {long sleepTime = ThreadLocalRandom.current().nextInt(1000, 10000);
System.out.println("task-" + index + "sleep" + sleepTime + "Ms");
Thread.sleep(sleepTime);
return "task-" + index;
}
}
}
// 输入
task-7 sleep 3072 Ms
task-4 sleep 1186 Ms
task-3 sleep 6182 Ms
task-9 sleep 7411 Ms
task-0 sleep 1882 Ms
task-1 sleep 8274 Ms
task-2 sleep 4789 Ms
task-5 sleep 8894 Ms
task-8 sleep 7211 Ms
task-6 sleep 5959 Ms
get value = task-4
看到成果了吧,耗时最短的工作返回,整个流程就完结了,那咱们试试本人用 CompletionService 来实现这个成果看看。
public static void main(String[] args) throws Exception {// testInvokeAny();
testCompletionService();}
private static void testCompletionService() {CompletionService<String> completionService = new ExecutorCompletionService(executorService);
List<Future> taskList = new LinkedList<>();
for (int i = 0; i < TOTAL; i++) {taskList.add(completionService.submit(new TestTask(i)));
}
String value = null;
try {for (int i = 0; i < TOTAL; i++) {value = completionService.take().get();
if (value != null) {System.out.println("get value =" + value);
break;
}
}
} catch (Exception e) {e.printStackTrace();
} finally {
taskList.forEach(task -> {task.cancel(true);
});
}
executorService.shutdown();}
// 输入
task-4 sleep 5006 Ms
task-1 sleep 4114 Ms
task-2 sleep 4865 Ms
task-5 sleep 1592 Ms
task-3 sleep 6190 Ms
task-7 sleep 2482 Ms
task-8 sleep 9405 Ms
task-9 sleep 8798 Ms
task-6 sleep 2040 Ms
task-0 sleep 2111 Ms
get value = task-5
成果是一样的,咱们只是实现了一个简化版的 invokeAny 性能,应用起来也挺简略的。
实现原理也挺简略的,哪个工作先实现,就把他丢到阻塞队列里,这样取工作后果的时候间接从队列里拿,必定是拿到最新的那一个。
异步后果
通常,咱们都会用 FutureTask 来获取线程异步执行的后果,基于 AQS 实现。
这个没有说太多的必要,看看几个办法就行了。
public V get();
public V get(long timeout, TimeUnit unit);
public boolean cancel(boolean mayInterruptIfRunning);
get 会阻塞的获取线程异步执行的后果,个别不倡议间接应用,最好是应用带超时工夫的 get 办法。
咱们能够通过 cancel 办法去尝试勾销工作的执行,参数代表是否反对中断,如果工作未执行,那么能够间接勾销,如果工作执行中,应用 cancel(true) 会尝试中断工作。
CompletableFuture
之前咱们都在应用 Future,要么只能用 get 办法阻塞,要么就用 isDone 来判断,JDK1.8 之后新增了 CompletableFuture 用于 异步编程,它针对 Future 的性能减少了回调能力,能够帮忙咱们简化异步编程。
CompletableFuture 次要蕴含四个静态方法去创建对象,次要区别在于 supplyAsync 返回计算结果,runAsync 不返回,另外两个办法则是能够指定线程池,如果不指定线程池则默认应用 ForkJoinPool,默认线程数为 CPU 核数。
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
上面看看他的那些恶心人的几十个办法,我预计能疯。
串行
串行就不必解释了,A->B->C 依照程序执行,下一个工作必须等上一个工作执行实现才能够。
次要蕴含 thenApply、thenAccept、thenRun 和 thenCompose,以及他们对应的带 async 的异步办法。
为了不便记忆咱们要记住,有 apply 的有传参有返回值,带 accept 的有传参然而没有返回值,带 run 的啥也没有,带 compose 的会返回一个新的 CompletableFuture 实例。
public static void main(String[] args) throws Exception {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {Thread.sleep(1000);
System.out.println(Thread.currentThread() + "工作实现");
return "supplyAsync";
} catch (InterruptedException e) {throw new RuntimeException(e);
}
});
CompletableFuture newFuture = future.thenApply((ret) -> {System.out.println(Thread.currentThread() + "thenApply=>" + ret);
return "thenApply";
}).thenAccept((ret) -> {System.out.println(Thread.currentThread() + "thenAccept=>" + ret);
}).thenRun(() -> {System.out.println(Thread.currentThread() + "thenRun");
});
CompletableFuture<String> composeFuture = future.thenCompose((ret) -> {System.out.println(Thread.currentThread() + "thenCompose=>" + ret);
return CompletableFuture.supplyAsync(() -> {
try {Thread.sleep(1000);
System.out.println(Thread.currentThread() + "thenCompose 工作实现");
return "thenCompose";
} catch (InterruptedException e) {throw new RuntimeException(e);
}
});
});
System.out.println(future.get());
System.out.println(newFuture.get());
System.out.println(composeFuture.get());
}
// 输入
Thread[ForkJoinPool.commonPool-worker-9,5,main]工作实现
Thread[ForkJoinPool.commonPool-worker-9,5,main]thenCompose=>supplyAsync
Thread[main,5,main]thenApply=>supplyAsync
Thread[main,5,main]thenAccept=>thenApply
Thread[main,5,main]thenRun
supplyAsync
null
Thread[ForkJoinPool.commonPool-worker-2,5,main]thenCompose 工作实现
thenCompose
AND 聚合
这个意思是下一个工作执行必须等前两个工作实现能够。
次要蕴含 thenCombine、thenAcceptBoth、runAfterBoth,以及他们对应的带 async 的异步办法,区别和下面一样。
public static void main(String[] args) throws Exception {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {Thread.sleep(1000);
System.out.println(Thread.currentThread() + "A 工作实现");
return "A";
} catch (InterruptedException e) {throw new RuntimeException(e);
}
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {Thread.sleep(2000);
System.out.println(Thread.currentThread() + "B 工作实现");
return "B";
} catch (InterruptedException e) {throw new RuntimeException(e);
}
});
CompletableFuture newFuture = future.thenCombine(future2, (ret1, ret2) -> {System.out.println(Thread.currentThread() + "thenCombine=>" + ret1 + "," + ret2);
return "thenCombine";
}).thenAcceptBoth(future2, (ret1, ret2) -> {System.out.println(Thread.currentThread() + "thenAcceptBoth=>" + ret1 + "," + ret2);
}).runAfterBoth(future2, () -> {System.out.println(Thread.currentThread() + "runAfterBoth");
});
System.out.println(future.get());
System.out.println(future2.get());
System.out.println(newFuture.get());
}
// 输入
Thread[ForkJoinPool.commonPool-worker-9,5,main]A 工作实现
A
Thread[ForkJoinPool.commonPool-worker-2,5,main]B 工作实现
B
Thread[ForkJoinPool.commonPool-worker-2,5,main]thenCombine=>A,B
Thread[ForkJoinPool.commonPool-worker-2,5,main]thenAcceptBoth=>thenCombine,B
Thread[ForkJoinPool.commonPool-worker-2,5,main]runAfterBoth
null
Or 聚合
Or 聚合代表只有多个工作中有一个实现了,就能够持续上面的工作。
次要蕴含 applyToEither、acceptEither、runAfterEither,以及他们对应的带 async 的异步办法,区别和下面一样,不再举例了。
回调 / 异样解决
whenComplete、handle 代表执行实现的回调,肯定会执行,exceptionally 则是工作执行产生异样的回调。
public static void main(String[] args) throws Exception {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {Thread.sleep(1000);
int a = 1 / 0;
return "success";
} catch (Exception e) {throw new RuntimeException(e);
}
});
CompletableFuture newFuture = future.handle((ret, exception) -> {System.out.println(Thread.currentThread() + "handle exception=>" + exception.getMessage());
return "handle";
});
future.whenComplete((ret, exception) -> {System.out.println(Thread.currentThread() + "whenComplete exception=>" + exception.getMessage());
});
CompletableFuture exceptionFuture = future.exceptionally((e) -> {System.out.println(Thread.currentThread() + "exceptionally exception=>" + e.getMessage());
return "exception";
});
System.out.println("task future =" + future.get());
System.out.println("handle future =" + newFuture.get());
System.out.println("exception future =" + exceptionFuture.get());
}
// 输入
Thread[ForkJoinPool.commonPool-worker-9,5,main]exceptionally exception=>java.lang.RuntimeException: java.lang.ArithmeticException: / by zero
Thread[main,5,main]whenComplete exception=>java.lang.RuntimeException: java.lang.ArithmeticException: / by zero
Thread[ForkJoinPool.commonPool-worker-9,5,main]handle exception=>java.lang.RuntimeException: java.lang.ArithmeticException: / by zero
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.lang.ArithmeticException: / by zero
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at com.example.demo.CompletableFutureTest3.main(CompletableFutureTest3.java:31)
Caused by: java.lang.RuntimeException: java.lang.ArithmeticException: / by zero
at com.example.demo.CompletableFutureTest3.lambda$main$0(CompletableFutureTest3.java:13)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1596)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
Caused by: java.lang.ArithmeticException: / by zero
at com.example.demo.CompletableFutureTest3.lambda$main$0(CompletableFutureTest3.java:10)
... 6 more
阻塞队列
并发编程中,队列是其中不可短少的一环,其实后面在说到线程池的时候,就曾经提及到了阻塞队列了,这里咱们要一起看看 JUC 包下提供的这些队列。
阻塞队列中的阻塞蕴含两层意思:
- 插入的时候,如果阻塞队列满,插入元素阻塞
- 删除 / 查问的时候,如果阻塞队列空,删除 / 查问元素阻塞
上面列出队列的一些插入和删除元素的办法,一个个来说:
add:向队列尾部插入元素,插入胜利返回 true,队列满则抛出 IllegalStateException("Queue full")
异样
offer:向队列尾部插入元素,队列满返回 false,否则返回 true,带超时的则是会阻塞,达到超时工夫后返回
put:向队列尾部插入元素,队列满会始终阻塞
remove:删除队列头部元素,删除胜利返回 true,队列空则抛出 NoSuchElementException
异样
poll:删除队列头部元素,删除胜利返回队列头部元素,队列空返回 null,带超时的则是会阻塞,达到超时工夫后返回
take:删除队列头部元素,队列空会始终阻塞
element:查问队列头部元素,并且返回,队列空则抛出 NoSuchElementException
异样
peek:查问队列头部元素,并且返回
ArrayBlockingQueue
ArrayBlockingQueue 从名字就晓得,基于数组实现的有界阻塞队列,基于 AQS 反对偏心和非偏心策略。
还是看构造函数吧,能够传入初始数组大小,一旦设置之后大小就不能扭转了,传参能够反对偏心和非偏心,最初一个构造函数能够反对传入汇合进行初始化,然而长度不能超过 capacity,否则抛出 ArrayIndexOutOfBoundsException
异样。
public ArrayBlockingQueue(int capacity);
public ArrayBlockingQueue(int capacity, boolean fair);
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c);
这个其实在下面介绍 Condition 的时候咱们就曾经实现过他了,这里就不再说了,能够参考下面 Condition 的局部。
LinkedBlockingQueue
LinkedBlockingQueue 基于链表实现的有界阻塞队列。
应用无参构造函数则链表长度为 Integer.MAX_VALUE,另外两个构造函数和 ArrayBlockingQueue 差不多。
public LinkedBlockingQueue();
public LinkedBlockingQueue(int capacity);
public LinkedBlockingQueue(Collection<? extends E> c);
咱们能够看看 put 和 take 的源码。
- 首先加锁中断
- 而后判断如果达到了队列的最大长度,那么就阻塞期待,否则就把元素插入到队列的尾部
- 留神这里和 ArrayBlockingQueue 有个区别,这里再次做了一次判断,如果队列没满,唤醒因为 put 阻塞的线程,为什么要做判断,因为他们不是一把锁
- 最初的逻辑是一样的,notEmpty 唤醒
public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {while (count.get() == capacity) {notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();} finally {putLock.unlock();
}
if (c == 0)
signalNotEmpty();}
private void enqueue(Node<E> node) {// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {notEmpty.signal();
} finally {takeLock.unlock();
}
}
take 的逻辑也是十分相似啊。
- 加锁中断
- 判断队列是不是空了,空了的话就阻塞期待,否则就从队列移除一个元素
- 而后再次做一次判断,队列要是不空,就唤醒阻塞的线程
- 最初唤醒 notFull 的线程
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {while (count.get() == 0) {notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();} finally {takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
private E dequeue() {// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {notFull.signal();
} finally {putLock.unlock();
}
}
PriorityBlockingQueue
PriorityBlockingQueue 是反对优先级的无界阻塞队列,默认排序依照天然排序升序排列。
几个构造函数,无参构造函数初始容量为 11,能够自定义,也能够在创立的时候传入 comparator 自定义排序规定。
public PriorityBlockingQueue();
public PriorityBlockingQueue(int initialCapacity);
public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator);
public PriorityBlockingQueue(Collection<? extends E> c);
间接看 put 和 take 办法吧,前面都这样,其余的就疏忽好了,找到 put 之后,发现间接就是调用的 offer,那咱们就间接看 offer 的实现。
- 首先还是加锁,而后看以后元素个数是否达到了数组的下限,到了就调用 tryGrow 去扩容。
- 看是否实现了 Comparator 接口,是的话就用 Comparator 去排序,否则就用 Comparable 去比拟,如果两个都没有,会报错
- notEmpty 唤醒,最初解锁
public void put(E e) {offer(e); // never need to block
}
public boolean offer(E e) {if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
notEmpty.signal();} finally {lock.unlock();
}
return true;
}
这里,咱们要持续关注一下这个扩容的逻辑,到底是怎么解决的?代码不长,然而看着很方的样子。
- 首先,先开释锁,因为上面用 CAS 解决,预计怕扩容工夫太长阻塞的线程太多
- 而后 CAS 批改 allocationSpinLock 为 1
- CAS 胜利的话,进行扩容的逻辑,如果长度小于 64 就扩容一倍,否则扩容一半
- 之前咱们说他无界,其实不太对,这里就判断是否超过了最大长度,MAX_ARRAY_SIZE = Integer.MAX_VALUE – 8,判断一下有可能会抛出内存溢出异样
- 而后创立一个新的对象数组,并且 allocationSpinLock 从新复原为 0
- 执行了一次 Thread.yield(),让出 CPU,因为有可能其余线程正在扩容,让大家争抢一下
- 最初确保新的对象数组创立胜利了,也就是扩容是没有问题的,再次加锁,数组拷贝,完结
private void tryGrow(Object[] array, int oldCap) {lock.unlock(); // must release and then re-acquire main lock
Object[] newArray = null;
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {allocationSpinLock = 0;}
}
if (newArray == null) // back off if another thread is allocating
Thread.yield();
lock.lock();
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
take 的逻辑根本一样,最多有个排序的逻辑在外面,就不再多说了。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {while ( (result = dequeue()) == null)
notEmpty.await();} finally {lock.unlock();
}
return result;
}
DelayQueue
DelayQueue 是反对延时的无界阻塞队列,这个在咱们聊 ScheduledThreadPoolExecutor 也谈到过,外面也应用了提早队列,只不过是它本人的一个外部类,DelayQueue 外部其实应用 PriorityQueue 来实现。
DelayQueue 的用法是增加元素的时候能够设置一个延迟时间,当工夫到了之后能力从队列中取出来,应用 DelayQueue 中的对象必须实现 Delayed 接口,重写 getDelay 和 compareTo 办法,就像这样,那实现其实能够看 ScheduledThreadPoolExecutor 外面是怎么做的,这里我就不论那么多,示意一下就好了。
public class Test {public static void main(String[] args) throws Exception {DelayQueue<User> delayQueue = new DelayQueue<>();
delayQueue.put(new User(1, "a"));
}
@Data
@NoArgsConstructor
@AllArgsConstructor
static class User implements Delayed {
private Integer id;
private String username;
@Override
public long getDelay(TimeUnit unit) {return 0;}
@Override
public int compareTo(Delayed o) {return 0;}
}
}
咱们能够看看他的属性和构造函数,呐看到了吧,应用的 PriorityQueue,另外构造函数比较简单了,不说了。
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private Thread leader = null;
private final Condition available = lock.newCondition();
public DelayQueue();
public DelayQueue(Collection<? extends E> c);
OK,没啥故障,这里咱们要先看 take 办法,不能先看 put,否则我感觉闹不明确。
- 来第一步加锁,如果头结点是空的,也就是队列是空的话,阻塞,没啥好说的
- 反之队列有货色,咱们就要去取了嘛,然而这里要看对象本人实现的 getDelay 办法取得提早的工夫,如果提早的工夫小于 0,那阐明到工夫了,能够执行了,poll 返回
- 第一次,leader 线程必定是空的,线程阻塞 delay 的工夫之后才开始执行,齐全没故障,而后 leader 从新 置为 null
- 当 leader 不是 null 的时候,阐明其余线程在操作了,所以阻塞期待唤醒
- 最初,leader 为 null,唤醒阻塞中的线程,解锁
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {for (;;) {E first = q.peek();
if (first == null)
available.await();
else {long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {Thread thisThread = Thread.currentThread();
leader = thisThread;
try {available.awaitNanos(delay);
} finally {if (leader == thisThread)
leader = null;
}
}
}
}
} finally {if (leader == null && q.peek() != null)
available.signal();
lock.unlock();}
}
而后再来看 put 就会简略多了,put 还是间接调用的 offer,看 offer 办法。
这里应用的是 PriorityQueue 的 offer 办法,其实和咱们下面说到的 PriorityBlockingQueue 差不多,不再多说了,增加到队列头部之后,leader 置为 null,唤醒,完结了。
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();}
return true;
} finally {lock.unlock();
}
}
SynchronousQueue&LinkedTransferQueue
为什么这两个放一起说呢。。。因为这源码真的不想在这里说一遍,这俩源码能够独自出一个专题来写,长篇精悍文章不适宜他他们,就简略先理解下。
SynchronousQueue 是一个不存储元素的阻塞队列,每个 put 必须期待 take,否则不能持续增加元素。
如果你还记得咱们下面说到线程池的中央,newCachedThreadPool 默认就是应用的 SynchronousQueue。
他就两个构造方法,你一看就晓得,对吧,反对偏心和非偏心,当然你也别问默认是啥,问就是非偏心。
public SynchronousQueue();
public SynchronousQueue(boolean fair);
次要靠外部抽象类 Transferer,他的实现次要有两个,TransferQueue 和 TransferStack。
留神:如果是偏心模式,应用的是 TransferQueue 队列,非偏心则应用 TransferStack 栈。
abstract static class Transferer<E> {abstract E transfer(E e, boolean timed, long nanos);
}
LinkedTransferQueue 是链表组成的无界阻塞队列,看他外部类就晓得了,这是个链表实现。
static final class Node {
final boolean isData; // 标记生产者或者消费者
volatile Object item; // 值
volatile Node next; // 下一个节点
volatile Thread waiter;
}
LinkedBlockingDeque
LinkedBlockingDeque 是链表组成的双向阻塞队列,它反对从队列的头尾进行进行插入和删除元素。
构造函数有 3 个,不传初始容量就是 Integer 最大值。
public LinkedBlockingDeque() {this(Integer.MAX_VALUE);
}
public LinkedBlockingDeque(int capacity);
public LinkedBlockingDeque(Collection<? extends E> c);
看下双向链表的构造:
static final class Node<E> {
E item;
Node<E> prev;
Node<E> next;
Node(E x) {item = x;}
}
因为是双向链表,所以比其余的队列多了一些办法,比方 add、addFirst、addLast,add 其实就是 addLast,offer、put 也是相似。
咱们能够辨别看一下 putFirst 和 putLast,次要区别就是 linkFirst 和 linkLast,别离去队列头部和尾部增加新节点,其余基本一致。
public void putFirst(E e) throws InterruptedException {if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {while (!linkFirst(node))
notFull.await();} finally {lock.unlock();
}
}
public void putLast(E e) throws InterruptedException {if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {while (!linkLast(node))
notFull.await();} finally {lock.unlock();
}
}
结尾
本次长篇内容参考书籍和文档
- Java 并发编程的艺术
- Java 并发编程之美
- Java 并发编程实战
- Java 8 实战
- 极客工夫:Java 并发编程实战
OK,本期内容到此结束,我是艾小仙,咱们过两个月再见。