java.util.concurrent 并发编程包,这个包下都是 Java 解决线程相干的类
虚伪唤醒
多个线程中应用 wait
办法的时候应始终定义在 while
中,wait
在哪里睡就在哪里醒,会持续往下判断,如果应用的是 if
只会执行一次
当初有四个线程,AB 做加法,CD 做减法:
public class Test {public static void main(String[] args) {TestDemo testDemo = new TestDemo();
new Thread(() -> {for (int i = 0; i < 5; i++) {
try {testDemo.incr();
} catch (InterruptedException e) {e.printStackTrace();
}
}
}, "A").start();
new Thread(() -> {for (int i = 0; i < 5; i++) {
try {testDemo.incr();
} catch (InterruptedException e) {e.printStackTrace();
}
}
}, "B").start();
new Thread(() -> {for (int i = 0; i < 5; i++) {
try {testDemo.decr();
} catch (InterruptedException e) {e.printStackTrace();
}
}
}, "C").start();
new Thread(() -> {for (int i = 0; i < 5; i++) {
try {testDemo.decr();
} catch (InterruptedException e) {e.printStackTrace();
}
}
}, "D").start();}
}
class TestDemo {
private int number = 0;
public synchronized void incr() throws InterruptedException {if (number != 0) {this.wait();
}
number++;
System.out.println(Thread.currentThread().getName() + ":" + number);
this.notifyAll();}
public synchronized void decr() throws InterruptedException {if (number == 0) {this.wait();
}
number--;
System.out.println(Thread.currentThread().getName() + ":" + number);
this.notifyAll();}
}
下面的代码会呈现虚伪唤醒的状况,咱们来试着剖析一下为什么?
假如:A 获取锁执行 ++;A 再次获取锁判断 number!=0,这时候阻塞;C 获取锁执行 --;B 获取锁执行 ++;A 获取锁,从以后地位醒来持续往下执行,又对 number 进行了 ++ 操作,所以失去 2
...
为了解决这种状况的产生,咱们应该在每次醒来时都进行判断,将 if
改为 while
即可:
while (number != 0) {this.wait();
}
Lock 实现案例
Lock
跟 synchronized
的区别 →Lock
是接口而 synchronized
是关键字,Lock
有着比 synchronized
更宽泛的锁的操作
// 创立 Lock
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void incr() throws InterruptedException {lock.lock();
try {while (number != 0) {condition.await();
}
number++;
System.out.println(Thread.currentThread().getName() + ":" + number);
condition.signalAll();} finally {lock.unlock();
}
}
Condition
它用来代替传统的 Object 的 wait ()
、notify ()
实现线程间的合作,依赖于 Lock
接口,需注意:传统的 wait
办法会主动开释锁,而应用 lock 需手动开释
线程汇合不平安
汇合自身的办法上并没有 synchronized
关键字,所以是不平安的,看源码:
public boolean add(E var1) {this.ensureCapacityInternal(this.size + 1);
this.elementData[this.size++] = var1;
return true;
}
示例代码:
List<String> list = new ArrayList<>();
for (int i = 0; i < 30; i++) {new Thread(() -> {list.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(list);
}, String.valueOf(i)).start();}
执行下面的代码会失去一个ConcurrentModificationException
异样,因为汇合中的办法并不是同步的,所以在多个线程同时写的时候就会抛出异样,如何解决呢?
计划一:应用 Vector
解决并发批改异样
List<String> list = new Vector<>();
计划二:应用 Collections
解决并发批改异样
List<String> list = Collections.synchronizedList(new ArrayList<>());
计划三:应用 CopyOnWriteArrayList
解决并发批改异样
后面两种办法其实并不罕用,个别都是通过写时复制技术来解决,那何为写时复制呢?
汇合在每次写的时候都会将元素复制一份进去,在新的汇合中写,而后再合并,这样就实现了单写多读的操作
List<String> list = new CopyOnWriteArrayList();
HashSet 和 HashMap 线程不平安
跟汇合一样,办法也没有 synchronized
关键字,也会失去并发批改异样,所以要通过写时复制技术来单写多读
HashSet:
// 通过 CopyOnWriteArraySet 解决
Set<String> set = new CopyOnWriteArraySet<>();
HashMap:
// 通过 ConcurrentHashMap 解决
Map<String, String> map = new ConcurrentHashMap<>();
多线程锁
偏心锁和非偏心锁
偏心锁:多个线程都能失去执行
非偏心锁:谁先抢到谁就执行,其余线程不能执行
ReentrantLock
来配置偏心锁或非偏心锁:
public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();}
能够看到源码中通过 true
或false
来配置锁
可重入锁
synchronized
和 Lock
都是可重入锁,可重入锁即可屡次取得该锁
就比方咱们回家,用钥匙开门之后就能随便进出房间了
Object o = new Object();
new Thread(() -> {synchronized (o) {System.out.println(Thread.currentThread().getName() + "外层");
synchronized (o) {System.out.println(Thread.currentThread().getName() + "中层");
synchronized (o) {System.out.println(Thread.currentThread().getName() + "内层");
}
}
}
}, "t1").start();
ReentrantLock lock = new ReentrantLock();
new Thread(() -> {
try {lock.lock();
System.out.println(Thread.currentThread().getName() + "外层");
try {lock.lock();
System.out.println(Thread.currentThread().getName() + "内层");
} finally {lock.unlock();
}
} finally {lock.unlock();
}
}, "t1").start();
死锁
两个或两个以上线程,因抢夺资源造成相互期待的景象,需外力干预来防止死锁
产生死锁的起因:
- 资源零碎有余
- 过程运行推动程序不适合
- 资源分配不当
public static void main(String[] args) {new Thread(() -> {synchronized (a) {
try {System.out.println(Thread.currentThread().getName() + "waiting...");
TimeUnit.SECONDS.sleep(2);
synchronized (b) {System.out.println(Thread.currentThread().getName() + "get b");
}
} catch (InterruptedException e) {e.printStackTrace();
}
}
}, "线程 A").start();
new Thread(() -> {synchronized (b) {
try {System.out.println(Thread.currentThread().getName() + "waiting...");
TimeUnit.SECONDS.sleep(2);
synchronized (a) {System.out.println(Thread.currentThread().getName() + "get a");
}
} catch (InterruptedException e) {e.printStackTrace();
}
}
}, "线程 B").start();}
两个线程都在尝试获取对方线程资源,就造成了死锁,这是通过代码输入来判断是否为死锁,JDK 中有一个堆栈跟踪工具,能够通过命令查看是否为死锁
Callable
Runnable
接口缺失了一项性能,当线程终止时,无奈取得线程返回的后果,为了反对此性能,Java 中提供了 Callable
接口
这两个接口之间的区别次要是:
- 是否有返回值
- 是否抛出异样
- 实现办法名称不同,一个是 run,一个是 call
class Demo implements Callable<String> {
@Override
public String call() throws Exception {System.out.println("test callable...");
return "hello";
}
}
应用 Callable
就不能间接用Thread
来创立线程了,须要应用FutureTask
FutureTask<String> task = new FutureTask<>(new Demo());
new Thread(task, "callable").start();
System.out.println(task.get()); // 获取 call()中的返回值
弱小的辅助类
CountDownLatch
缩小计数:
public class CountDownLatchDemo {public static void main(String[] args) throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(3);
for (int i = 1; i <= 3; i++) {new Thread(() -> {System.out.println(Thread.currentThread().getName() + "号同学来到");
// 计数器 -1
countDownLatch.countDown();}, String.valueOf(i)).start();}
// 当计数器没有变为 0 时就会始终期待
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + "班长锁门来到了");
}
}
班长总是在最初一个才来到,这就是CountDownLatch
的作用
CyclicBarrier
循环栅栏
public class CyclicBarrierDemo {
private static final intNUMBER= 7;
public static void main(String[] args) {CyclicBarrier cyclicBarrier = new CyclicBarrier(NUMBER, () -> {System.out.println("祝贺你集齐七颗龙珠");
});
for (int i = 1; i <= 7; i++) {new Thread(() -> {
try {System.out.println(Thread.currentThread().getName() + "颗龙珠");
// 期待
cyclicBarrier.await();} catch (Exception e) {e.printStackTrace();
}
}, String.valueOf(i)).start();}
}
}
只有在集齐七颗龙珠后才会执行 CyclicBarrier
中的办法
Semaphore
信号灯
public class SemaphoreDemo {public static void main(String[] args) {
// 设置许可数量,只有三个车位
Semaphore semaphore = new Semaphore(3);
// 模仿六辆汽车
for (int i = 1; i <= 6; i++) {new Thread(() -> {
try {
// 抢占车位
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "号车抢到了车位");
// 设置随机停车工夫
TimeUnit.SECONDS.sleep(new Random().nextInt(5));
System.out.println(Thread.currentThread().getName() + "号车来到了车位");
} catch (Exception e) {e.printStackTrace();
} finally {
// 开释车位
semaphore.release();}
}, String.valueOf(i)).start();}
}
}
用信号灯模仿停车的场景,只有三个车位,只有当某个车位的车来到了之后,其余的车能力抢占车位
读写锁
在多线程环境下对资源进行读写操作的时候,是可能会产生死锁的,须要用 Java 提供的读写锁来上锁和解锁,读写锁在读的时候是不能进行写操作的。
写锁:独占锁(一次只能一个线程进行写操作),读锁:共享锁(可多个线程进行读操作)
class Resource {private Map<String, Object> map = new HashMap<>();
private ReadWriteLock lock = new ReentrantReadWriteLock();
public void put(String key, Object value) {
// 增加写锁
lock.writeLock().lock();
System.out.println(Thread.currentThread().getName() + "正在写操作" + key);
try {TimeUnit.MICROSECONDS.sleep(300);
} catch (InterruptedException e) {e.printStackTrace();
}finally {
// 开释锁
lock.writeLock().unlock();
}
map.put(key, value);
System.out.println(Thread.currentThread().getName() + "写完了" + key);
}
public Object get(String key) {
// 增加读锁
lock.readLock().lock();
Object result = null;
System.out.println(Thread.currentThread().getName() + "正在读操作" + key);
try {TimeUnit.MICROSECONDS.sleep(300);
} catch (InterruptedException e) {e.printStackTrace();
}finally {lock.readLock().unlock();}
result = map.get(key);
System.out.println(Thread.currentThread().getName() + "读完了" + key);
return result;
}
}
锁降级:
读写锁在读的时候是不能进行写操作的。咱们能够将写锁降为读锁,读锁不能降级为写锁
public class DowngradeDemo {public static void main(String[] args) {ReadWriteLock lock = new ReentrantReadWriteLock();
Lock writeLock = lock.writeLock();
Lock readLock = lock.readLock();
// 锁降级
// 1. 获取写锁
writeLock.lock();
System.out.println("write");
// 2. 获取读锁
readLock.lock();
System.out.println("read");
// 3. 开释写锁和读锁
writeLock.unlock();
readLock.unlock();}
}
阻塞队列
当队列为空时,获取元素将阻塞,直到插入新的元素,当队列满时,增加元素将阻塞
应用阻塞队列的益处就是,咱们不须要关怀什么时候阻塞线程,什么时候唤醒线程,这些操作都交给 BlockingQueue
来做
// 创立阻塞队列
BlockingQueue<Object> queue = new ArrayBlockingQueue<>(3);
queue.add("a")
queue.add("b")
queue.add("c")
// Queue full
queue.add("d")
线程池
一种线程应用模式,保护着多个线程,期待着监督管理,防止了频繁创立与销毁线程的代价,不仅能保障内核的充分利用,还能避免过分调度
线程池应用形式
通过Executors
工具类来创立线程
Executors.newFixedThreadPool()
:一池 N 线程
Executors.newSingleThreadExecutor()
:一池一线程
Executors.newCachedThreadPool()
:依据需要创立线程,可扩容
public class ThreadPoolDemo {public static void main(String[] args) {
// 一池 N 线程
ExecutorService threadPool1 = Executors.newFixedThreadPool(5);
// 一池一线程
ExecutorService threadPool2 = Executors.newSingleThreadExecutor();
// 一池可扩容线程
ExecutorService threadPool3 = Executors.newCachedThreadPool();
// 10 个客户申请
for (int i = 1; i <=10 ; i++) {
// 执行
threadPool3.execute(()->{System.out.println(Thread.currentThread().getName()+"正在办理业务");
});
}
threadPool3.shutdown();}
}
查看源码能够发现 Executors
调用的办法底层都应用了ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
构造方法中有 7 个参数,别离是什么意思呢?
corePoolSize
:外围(常驻)的线程数量,比方一个银行有 10 个窗口,平时只凋谢 5 个窗口
maximumPoolSize
:最大线程数量,就好比银行一共有 10 个窗口
keepAliveTime
:线程存活工夫
unit
:搭配keepAliveTime
设置线程存活工夫
workQueue
:阻塞队列
threadFactory
:用于创立线程
handler
:回绝策略(多种)
线程池的工作流程和回绝策略
下面的流程图即为线程池的工作流程:首先通过 execute()
来创立一个池子,外围线程数为 2,如果要创立第三个线程,就会放到 workQueue
中期待,当 workQueue
满时就会创立新的线程直到
maximumPoolSize
满,当 maximumPoolSize
满时就会执行回绝策略。
JDK 内置的回绝策略:
AbortPolicy
:抛出 RejectedExecutionException
来回绝新工作的解决。
CallerRunsPolicy
:“调用者运行”一种调节机制,该策略不会摈弃工作和异样,而是将某些工作回退到调用者,升高新工作的流量。
DiscardPolicy
:摈弃队列中期待最久的工作,而后把当前任务增加到队列中,尝试再次提交当前任务。
DiscardOldestPolicy
:该策略默默地抛弃无奈解决的工作,不予任何解决也不抛出异样,如果容许工作失落,那这是最好的一种策略。
自定义线程
个别都是用自定义线程,在阿里巴巴开发手册中线程池不容许用 Executors
去创立,而是通过 ThreadPoolExecutor
的形式,这样的解决形式让写的人更加明确线程池的运行规定,躲避资源耗尽的危险。
public class CustomThreadPoolDemo {public static void main(String[] args) {ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 5, 2, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardOldestPolicy());
// 10 个客户申请
for (int i = 1; i <=10 ; i++) {
// 执行
threadPoolExecutor.execute(()->{System.out.println(Thread.currentThread().getName()+"正在办理业务");
});
}
threadPoolExecutor.shutdown();}
}
分支合并框架(Fork/Join)
能够将一个大的工作拆分成多个子工作进行并行处理,最初将子工作后果合并成最初的计算结果
public class ForkJoinDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {MyTask task = new MyTask(1, 100);
// 创立分支合并池对象
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Integer> submit = forkJoinPool.submit(task);
// 获取最终合并之后的后果
System.out.println(submit.get());
forkJoinPool.shutdown();}
}
class MyTask extends RecursiveTask<Integer> {
// 拆分时差值不能大于 10
private static final Integer VALUE= 10;
private int begin;
private int end;
private int result;
public MyTask(int begin, int end) {
this.begin = begin;
this.end = end;
}
// 拆分和合并的过程
@Override
protected Integer compute() {if (end - begin <=VALUE) {
// 相加
for (int i = begin; i <= end; i++) {result = result + i;}
} else {
// 进一步做拆分
// 获取两头值
int middle = (begin + end) / 2;
// 拆分右边
MyTask task1 = new MyTask(begin, middle);
// 拆分左边
MyTask task2 = new MyTask(middle + 1, end);
task1.fork();
task2.fork();
// 合并后果
result = task1.join() + task2.join();
}
return result;
}
}
异步回调
public class AsynchronousCallbackDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {
// 异步调用,无返回值
CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(() -> {System.out.println(Thread.currentThread().getName() + "completableFuture1");
});
completableFuture1.get();
// 异步调用,有返回值
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "completableFuture2");
return 1024;
});
completableFuture2.whenComplete((result, exception) -> {System.out.println("--t--" + result); // 办法返回值
System.out.println("--u--" + exception); // 异样信息
}).get();}
}