共计 13393 个字符,预计需要花费 34 分钟才能阅读完成。
并发简述
并发通常是用于提高运行在单处理器上的程序的性能。在单 CPU 机器上使用多任务的程序在任意时刻只在执行一项工作。
并发编程使得一个程序可以被划分为多个分离的、独立的任务。一个线程就是在进程中的一个单一的顺序控制流。java 的线程机制是抢占式。
线程的好处是提供了轻量级的执行上下文切换,只改变了程序的执行序列和局部变量。
多线程的主要缺陷:<!– java 编程思想 –>
等待共享资源的时候性能降低。
需要处理线程的额外 CPU 花费。
糟糕的程序设计导致不必要的复杂度。
有可能产生一些病态行为,若饿死、竞争、死锁和活锁。
不同平台导致的不一样。
volatile 关键字
源来:
当程序运行,JVM 会为每一个线程分配一个独立的缓存用于提高执行效率,每一个线程都在自己独立的缓存中操作各自的数据。一个线程在缓冲中对数据进行修改,写入到主存后,其他线程无法得知数据已被更改,仍在操作缓存中已过时的数据,为了解决这个问题,提供了 volatile 关键字,实现内存可见,一旦主存数据被修改,便致使其他线程缓存数据行无效,强制前往主存获取新数据。
Example:内存不可见,导致主线程无法结束。
class ThreadDemo implements Runnable {
// 添加 volatile 关键字可实现内存可见性 public volatile boolean flag = false;
public boolean flag = Boolean.false;
@Override
public void run() {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
}
flag = Boolean.true;
System.out.println(“ThreadDemo over”);
}
public boolean isFlag() {
return flag;
}
}
public class TestVolatile {
public static void main(String[] args) {
ThreadDemo demo = new ThreadDemo();
new Thread(demo).start();
while (true) {
if (demo.flag || demo.isFlag()) {
System.out.println(“Main over”);
break;
}
}
}
}/*output:打印 ThreadDemo over,主线程持续循环 */
作用:
当多个线程操作共享数据时,保证内存中的数据可见性。采用底层的内存栅栏,及时的将缓存中修改的数据刷新到主存中,并导致其他线程所缓存的数据无效,使得这些线程必须去主存中获取修改的数据。
优缺点:
保证内存可见性,让各个线程能够彼此获取最新的内存数据。
较传统 synchronized 加锁操作提高了效率,若有线程正在操作被 synchronized 修饰的代码块数据时,其他线程试图进行操作,发现已被其他线程占用,试图操作的线程必须挂起,等到下一次继续尝试操作。
对 volatile 修饰的数据被修改后,其他线程必须前往主存中读取,若修改频繁,需要不断读取主存数据,效率将会降低。
使用 volatile,底层采用内存栅栏,JVM 将不会对其提供指令重排序及其优化。
不具备互斥性。多个线程可以同时对数据进行操作,只是由原来的在缓存操作转变成了直接在主存中操作。(synchronized 是互斥的,一个线程正在执行,其他线程必须挂起等待)
不保证变量的原子性。使用 volatile 仅仅是一个能保证可见性的轻量级同步策略。
原子变量与 CAS 算法
Example:使用 volatile 修饰,number 自增问题。
class ThreadDemo implements Runnable {
public volatile int number = 0;
@Override
public void run() {
try {
Thread.sleep(200);
} catch (Exception e) {
}
System.out.print(getIncrementNumber() + ” “);
}
public int getIncrementNumber() {
return ++number;
}
}
public class TestAtomic {
public static void main(String[] args) {
ThreadDemo demo = new ThreadDemo();
for (int i = 0; i < 10; i++) {
new Thread(demo).start();
}
}
}/*output: 1 5 4 7 3 9 2 1 8 6 */
// ++number 底层原理思想
int temp = number; // ①
number = number + 1; // ②
temp = number; // ③
return temp; // ④
由 ++number 可知,返回的是 temp 中存储的值,且自增是一个多步操作,当多个线程调用 incrementNumber 方法时,方法去主存中获取 number 值放入 temp 中,根据 CPU 时间片切换,当 A 线程完成了 ③ 操作时,时间片到了被中断,A 线程开始执行 ① 时不幸被中断,接着 A 获取到了 CPU 执行权,继续执行完成 ④ 操作更新了主存中的值,紧接着 B 线程开始执行,但是 B 线程 temp 中存储的值已经过时了。注意:自增操作为四步,只有在第四步的时候才会刷新主存的值,而不是 number = number + 1 操作就反映到主存中去。如图所示:
源来:
volatile 只能保证内存可见性,对多步操作的变量,无法保证其原子性,为了解决这个问题,提供了原子变量。
作用:
原子变量既含有 volatile 的内存可见性,又提供了对变量原子性操作的支持,采用底层硬件对并发操作共享数据的 CAS(Compare-And-Swap)算法,保证数据的原子性。
提供的原子类:
类
描述
AtomicBoolean
一个 boolean 值可以用原子更新。
AtomicInteger
可能原子更新的 int 值。
AtomicIntegerArray
一个 int 数组,其中元素可以原子更新。
AtomicIntegerFieldUpdater<T>
基于反射的实用程序,可以对指定类的指定的 volatile int 字段进行原子更新。
AtomicLong
一个 long 值可以用原子更新。
AtomicLongArray
可以 long 地更新元素的 long 数组。
AtomicLongFieldUpdater<T>
基于反射的实用程序,可以对指定类的指定的 volatile long 字段进行原子更新。
AtomicMarkableReference<V>
AtomicMarkableReference 维护一个对象引用以及可以原子更新的标记位。
AtomicReference<V>
可以原子更新的对象引用。
AtomicReferenceArray<E>
可以以原子方式更新元素的对象引用数组。
AtomicReferenceFieldUpdater<T,V>
一种基于反射的实用程序,可以对指定类的指定的 volatile volatile 引用原子更新。
AtomicStampedReference<V>
AtomicStampedReference 维护对象引用以及可以原子更新的整数“印记”。
DoubleAccumulator
一个或多个变量一起维护使用提供的功能更新的运行的值 double。
DoubleAdder
一个或多个变量一起保持初始为零 double 和。
LongAccumulator
一个或多个变量,它们一起保持运行 long 使用所提供的功能更新值。
LongAdder
一个或多个变量一起保持初始为零 long 总和。
CAS 算法:
CAS(Compare-And-Swap)是底层硬件对于原子操作的一种算法,其包含了三个操作数:内存值(V),预估值(A),更新值(B)。当且仅当 V == A 时,执行 V = B 操作;否则不执行任何结果。这里需要注意,A 和 B 两个操作数是原子性的,同一时刻只能有一个线程进行 AB 操作。
优缺点:
操作失败时,直接放弃结果,并不释放对 CPU 的控制权,进而可以继续尝试操作,不必挂起等待。(synchronized 会让出 CPU)
当多个线程并发的对主存中的数据进行操作时,有且只有一个会成功,其余均失败。
原子变量中封装了用于对数据的原子操作,简化了代码的编写。
Collection 并发类
HashMap 与 HashTable 简述
HashMap 是线程不安全的,而 HashTable 是线程安全的,因为 HashTable 所维护的 Hash 表存在着独占锁,当多个线程并发访问时,只能有一个线程可进行操作,但是对于复合操作时,HashTable 仍然存在线程安全问题,不使用 HashTable 的主要原因还是效率低下。
// 功能:不包含 obj,则添加
if (!hashTable.contains(obj)) {
// 复合操作,执行此处时线程中断,obj 被其他线程添加至容器中,此处继续执行将导致重复添加
hashTable.put(obj);
}
可知上述两个操作需要“原子性”,为了达到效果,还不是得对代码块进行同步
ConcurrentHashMap
采用锁分段机制,分为 16 个段(并发级别),每一个段下有一张表,该表采用链表结构链接着各个元素,每个段都使用独立的锁。当多个线程并发操作的时候,根据各自的级别不同,操作不同的段,多个线程并行操作,明显提高了效率,其次还提供了复合操作的诸多方法。注:jdk1.8 由原来的数组 + 单向链表结构转换成数据 + 单向链表 + 红黑树结构。
ConcurrentSkipListMap 和 ConcurrentSkipListSet
有序的哈希表,通过跳表实现,不允许 null 作为键或值。ConcurrentSkipListMap 详解
CopyOnWriteArrayList 和 CopyOnWriteArraySet
对 collection 进行写入操作时,将导致创建整个底层数组的副本,而源数组将保留在原地,使得复制的数组在被修改时,读取操作可以安全的执行。当修改完成时,一个原子性的操作将把心的数组换人,使得新的读取操作可以看到新的修改。<!–Java 编程思想 –>
好处之一是当多个迭代器同时遍历和修改列表时,不会抛出 ConcurrentModificationException。
小结:
当期望许多线程访问一个给定 collection 时,ConcurrentHashMap 通常优于同步的 HashMap
ConcurrentSkipListMap 通常优于同步的 TreeMap。
当期望的读数和遍历远远大于列表的更新数时,CopyOnWriteArrayList 优于同步的 ArrayList。
并发迭代操作多时,可选择 CopyOnWriteArrayList 和 CopyOnWriteArraySet。
高并发情况下,可选择 ConcurrentSkipListMap 和 ConcurrentSkipListSet
CountDownLatch 闭锁
源由:
当一个修房子的 A 线程正在执行,需要砖头时,开启了一个线程 B 去拉砖头,此时 A 线程需要等待 B 线程的结果后才能继续执行时,但是线程之间都是并行操作的,为了解决这个问题,提供了 CountDownLatch。
作用:
一个同步辅助类,为了保证执行某些操作时,“所有准备事项都已就绪”,仅当某些操作执行完毕后,才能执行后续的代码块,否则一直等待。
CountDownLatch 中存在一个锁计数器,如果锁计数器不为 0 的话,它会阻塞任何一个调用 await() 方法的线程。也就是说,当一个线程调用 await() 方法时,如果锁计数器不等于 0,那么就会一直等待锁计数器为 0 的那一刻,这样就解决了需要等待其他线程执行完毕才执行的需求。
Example:
class ThreadDemo implements Runnable {
private CountDownLatch latch = null;
public ThreadDemo(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
try {
System.out.println(“execute over”);
} finally {
latch.countDown(); // 必须保证计数器减一
}
}
}
public class TestCountDownLatch {
public static void main(String[] args) {
final int count = 10;
final CountDownLatch latch = new CountDownLatch(count);
ThreadDemo demo = new ThreadDemo(latch);
for (int i = 0; i < count; ++i) {
new Thread(demo).start();
}
try {
latch.await(); // 等待计数器为 0
System.out.println(“ 其他线程结束,继续往下执行 …”);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}/**output:
execute over
…
其他线程结束,继续往下执行 …
*/
细节:
子线程完毕后,必须调用 countDown() 方法使得 锁计数器减一,否则将会导致调用 await() 方法的线程持续等待,尽可能的放置在 finally 中。
锁计数器的个数与子线程数最好相等,只要计数器等于 0,不论是否还存在子线程,await() 方法将得到响应,继续执行后续代码。
Callable 接口
源由:
当开启一个线程执行运算时,可能会需要该线程的计算结果,之前的 implements Runnable 和 extends Thread 的 run() 方法并没有提供可以返回的功能,因此提供了 Callable 接口。Callable 的运行结果,需要使用 FutureTask 类来接受。
Example:
class ThreadDemo implements Callable<Integer> {
private Integer cycleValue;
public ThreadDemo(Integer cycleValue) {
this.cycleValue = cycleValue;
}
@Override
public Integer call() throws Exception {
int result = 0;
for (int i=0; i<cycleValue; ++i) {
result += i;
}
return result;
}
}
public class TestCallable {
public static void main(String[] args) throws Exception {
ThreadDemo demo = new ThreadDemo(Integer.MAX_VALUE);
// 使用 FutureTask 接受结果
FutureTask<Integer> task = new FutureTask<>(demo);
new Thread(task).start();
Integer result = task.get(); // 等待计算结果返回,闭锁
System.out.println(result);
}
}/*output:1073741825 */
Lock 同步锁和 Condition 线程通信控制对象
Lock:在进行性能测试时,使用 Lock 通常会比使用 synchronized 要高效许多,并且 synchronized 的开销变化范围很大,而 Lock 相对稳定。只有在性能调优时才使用 Lock 对象。<!–Java 编程思想 –>
Condition:替代了 Object 监视器方法的使用,描述了可能会与锁有关的条件标量,相比 Object 的 notifyAll(),Condition 的 signalAll() 更安全。Condition 实质上被绑定到一个锁上,使用 newCondition() 方法为 Lock 实例获取 Condition。
Lock 和 Condition 对象只有在困难的多线程问题中才是必须的。<!–Java 编程思想 –>
synchonized 与 Lock 的区别:
synchonized
Lock
隐式锁
显示锁
JVM 底层实现,由 JVM 维护
由程序员手动维护
灵活控制(也有风险)
“虚假唤醒”:当一个线程 A 在等待时,被另一个线程唤醒,被唤醒的线程不一定满足了可继续向下执行的条件,如果被唤醒的线程未满足条件,而又向下执行了,那么称这个现象为“虚假唤醒”。
// 安全的方式,保证退出等待循环前,一定能满足条件
while (条件) {
wait();
}
Example:生产消费者 <!– 参考 Java 编程思想 P712–>
// 产品 car
class Car {
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
private boolean available = false; // false:无货;true 有货
public void put(){
lock.lock();
try {
while (available) {// 有货等待
condition.await();
}
System.out.println(Thread.currentThread().getName() + “put():进货 ”);
available = true;
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void get() {
lock.lock();
try {
while (!available) {// 无货等待
condition.await();
}
System.out.println(Thread.currentThread().getName() + “get():出货 ”);
available = false;
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
// 消费者
class Consume implements Runnable {
private Car car;
public Consume(Car car) {
this.car = car;
}
@Override
public void run() {
for (int i=0; i<TestProduceAndConsume.LOOP_SIZE; ++i) {
car.get();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
}
}
// 生产者
class Produce implements Runnable {
private Car car;
public Produce(Car car) {
this.car = car;
}
@Override
public void run() {
for (int i=0; i<TestProduceAndConsume.LOOP_SIZE; i++) {
car.put();
}
}
}
public class TestProduceAndConsume {
public static final int LOOP_SIZE = 10;
public static void main(String[] args) {
Car car = new Car();
for (int i=0; i<5; ++i) {
Consume consume = new Consume(car);
Produce produce = new Produce(car);
new Thread(consume, i + “–“).start();
new Thread(produce, i + “–“).start();
}
}
}
每一个 对 lock() 的调用都必须紧跟着一个 try-finally 子句,用以保证可以在任何情况下都能释放锁,任务在调用 await()、signal()、signalAll() 之前,必须拥有锁。
lock.lock();
try {
… // 业务代码
} finally {
lock.unlock();
}
ReadWriteLock 读写锁
源由:
上述讲解的锁都是读写一把锁,不论是读或写,都是一把锁解决,当多线程访问数据时,若发生了一千次操作,其中的写操作只执行了一次,数据的更新率非常低,那么每次进行读操作时,都要加锁读取”不会更改的“数据,显然是不必要的开销,因此出现了 ReadWriteLock 读写锁,该对象提供读锁和写锁。
作用:
ReadWriteLock 维护了一对相关的锁,一个用于只读操作,另一个用于写入操作。只要没有 write 写入操作,那么多个线程可以同时进行持有读锁。而写入锁是独占的,当执行写操作时,其他线程不可写,也不可读。
性能的提升取决于读写操作期间读取数据相对于修改数据的频率,如果读取操作远远大于写入操作时,便能增强并发性。
Example:
class Demo {
private int value = 0;
private ReadWriteLock lock = new ReentrantReadWriteLock();
public void read() {
lock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + ” : ” + value);
} finally {
lock.readLock().unlock();
}
}
public void write(int value) {
lock.writeLock().lock();
try {
this.value = value;
System.out.println(“write(” + value + “)”);
} finally {
lock.writeLock().unlock();
}
}
}
class ReadLock implements Runnable {
private Demo demo = null;
public ReadLock(Demo demo) {
this.demo = demo;
}
@Override
public void run() {
for (int i=0; i<20; ++i) {
demo.read();
try {
Thread.sleep(320);
} catch (InterruptedException e) {
}
}
}
}
class WriteLock implements Runnable {
private Demo demo = null;
public WriteLock(Demo demo) {
this.demo = demo;
}
@Override
public void run() {
for (int i=0; i<10; ++i) {
demo.write(i);
try {
Thread.sleep(200);
} catch (InterruptedException e) {
}
}
}
}
public class TestReadWriteLock {
public static void main(String[] args) {
Demo demo = new Demo();
ReadLock readLock = new ReadLock(demo);
WriteLock writeLock = new WriteLock(demo);
for (int i=0; i<3; ++i) {
new Thread(readLock, i + “–“).start();
}
new Thread(writeLock).start();
}
}/**output:
0– : 0
1– : 0
2– : 0
write(0)
write(1)
1– : 1
2– : 1
0– : 1
write(2)
write(3)
1– : 3
0– : 3
…
*/
线程池与线程调度
源来:
在传统操作中(如连接数据库),当我们需要使用一个线程的时候,就 直接创建一个线程,线程完毕后被垃圾收集器回收。每一次需要线程的时候,不断的创建与销毁,大大增加了资源的开销。
作用:
线程池维护着一个线程队列,该队列中保存着所有等待着的线程,避免了重复的创建与销毁而带来的开销。
体系结构:
Execuotr:负责线程的使用与调度的根接口。
|- ExecutorService:线程池的主要接口。
|- ForkJoinPool:采用分而治之技术将任务分解。
|- ThreadPoolExecutor:线程池的实现类。
|- ScheduledExecutorService:负责线程调度的子接口。
|- ScheduledThreadPoolExecutor:负责线程池的调度。继承 ThreadPoolExecutor 并实现 ScheduledExecutorService 接口
Executors 工具类 API 描述:
方法
描述
ExecutorService newFixedThreadPool(int nThreads)
创建一个可重用固定数量的无界队列线程池。使用了有限的线程集来执行所提交的所有任务。创建的时候可以一次性预先进行代价高昂的线程分配。
ExecutorService newWorkStealingPool(int parallelism)
创建一个维护足够的线程以支持给定的 parallelism 并行级别的线程池。
ExecutorService newSingleThreadExecutor()
创建一个使用单个线程运行的无界队列的执行程序。
ExecutorService newCachedThreadPool()
创建一个根据需要创建新线程的线程池,当有可用线程时将重新使用以前构造的线程。
ScheduledExecutorService newSingleThreadScheduledExecutor()
创建一个单线程执行器,可以调度命令在给定的延迟之后运行,或定期执行。
ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
创建一个线程池,可以调度命令在给定的延迟之后运行,或定期执行。
ThreadFactory privilegedThreadFactory()
返回一个用于创建与当前线程具有相同权限的新线程的线程工厂。
补充:
ExecutorService.shutdown():防止新任务被提交,并继续运行被调用之前所提交的所有任务,待任务都完成后退出。
CachedThreadPoo 在程序执行过程中通常会创建与所需数量相同的线程,然后在它回收旧线程时停止创建新线程,是 Executor 的首选。仅当这个出现问题时,才需切换 FixedThreadPool。
SingleThreadExecutor:类似于线程数量为 1 的 FixedThreadPool,但它提供了不会存在两个及以上的线程被并发调用的并发。
Example:线程池
public class TestThreadPool {
public static void main(String[] args) throws Exception {
ExecutorService pool = Executors.newFixedThreadPool(2);
for (int i = 0; i < 10; ++i) {
Future<String> future = pool.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return Thread.currentThread().getName();
}
});
String threadName = future.get();
System.out.println(threadName);
}
pool.shutdown(); // 拒绝新任务并等待正在执行的线程完成当前任务后关闭。
}
}/**output:
pool-1-thread-1
pool-1-thread-2
pool-1-thread-1
pool-1-thread-2
…
*/
Example:线程调度
public class TestThreadPool {
public static void main(String[] args) throws Exception {
ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
for (int i = 0; i < 5; ++i) {
ScheduledFuture<String> future = pool.schedule(new Callable<String>() {
@Override
public String call() throws Exception {
return Thread.currentThread().getName() + ”:” + Instant.now();
}
}, 1, TimeUnit.SECONDS); // 延迟执行单位为 1 秒的任务
String result = future.get();
System.out.println(result);
}
pool.shutdown();
}
}/**output:
pool-1-thread-1:2019-03-18T12:10:31.260Z
pool-1-thread-1:2019-03-18T12:10:32.381Z
pool-1-thread-2:2019-03-18T12:10:33.382Z
pool-1-thread-1:2019-03-18T12:10:34.383Z
pool-1-thread-2:2019-03-18T12:10:35.387Z
*/
<span style=”color: red”> 注意:若没有执行 shutdown() 方法,则线程会一直等待而不停止。</span>
ForkJoinPool 分支 / 合并框架
源由:
在一个线程队列中,假如队头的线程由于某种原因导致了阻塞,那么在该队列中的后继线程需要等待队头线程结束,只要队头一直阻塞,这个队列中的所有线程都将等待。此时,可能其他线程队列都已经完成了任务而空闲,这种情况下,就大大减少了吞吐量。
ForkJoin 的“工作窃取”模式:
当执行一个新任务时,采用分而治之的思想,将其分解成更小的任务执行,并将分解的任务加入到线程队列中,当某一个线程队列没有任务时,会随机从其他线程队列中“偷取”一个任务,放入自己的队列中执行。
Example:
// 求次方:value 为底,size 为次方数
class CountPower extends RecursiveTask<Long> {
private static final long serialVersionUID = 1L;
public Long value = 0L;
public int size = 0;
public static final Long CRITICAL = 10L; // 阈值
public CountPower(Long value, int size) {
this.value = value;
this.size = size;
}
@Override
protected Long compute() {
// 当要开方的此时 小于 阈值,则计算(视为最小的任务单元)
if(size <= CRITICAL) {
Long sum = 1L;
for (int i=0; i<size; ++i) {
sum *= value;
}
return sum;
} else {
int mid = size / 2;
// 拆分任务,并压入线程队列
CountPower leftPower = new CountPower(value, mid);
leftPower.fork();
CountPower rightPower = new CountPower(value, size – mid);
rightPower.fork();
// 将当前两个任务返回的执行结果再相乘
return leftPower.join() * rightPower.join();
}
}
}
public class TestForkJoinPool {
public static void main(String[] args) throws Exception {
ForkJoinPool pool = new ForkJoinPool();
CountPower task = new CountPower(2L, 11);
Long result = pool.invoke(task);
System.out.println(result);
}
}/**output: 2048*/
根据分而治之的思想进行分解,需要一个结束递归的条件,该条件内的代码就是被分解的最小单元。使用 fork() 在当前任务正在运行的池中异步执行此任务,即将该任务压入线程队列。调用 join()` 返回计算结果。RecursiveTask 是有返回值的 task,RecursiveAction 则是没有返回值的。
参考
尚硅谷 JUC 视频教程
《java 编程思想》第 21 章 并发