java.util.concurrent并发编程包,这个包下都是Java解决线程相干的类

虚伪唤醒

多个线程中应用wait办法的时候应始终定义在while中,wait在哪里睡就在哪里醒,会持续往下判断,如果应用的是if只会执行一次

当初有四个线程,AB做加法,CD做减法:

public class Test {    public static void main(String[] args) {        TestDemo testDemo = new TestDemo();        new Thread(() -> {            for (int i = 0; i < 5; i++) {                try {                    testDemo.incr();                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        }, "A").start();        new Thread(() -> {            for (int i = 0; i < 5; i++) {                try {                    testDemo.incr();                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        }, "B").start();        new Thread(() -> {            for (int i = 0; i < 5; i++) {                try {                    testDemo.decr();                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        }, "C").start();        new Thread(() -> {            for (int i = 0; i < 5; i++) {                try {                    testDemo.decr();                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        }, "D").start();    }}class TestDemo {    private int number = 0;    public synchronized void incr() throws InterruptedException {        if (number != 0) {            this.wait();        }        number++;        System.out.println(Thread.currentThread().getName() + " : " + number);        this.notifyAll();    }    public synchronized void decr() throws InterruptedException {        if (number == 0) {            this.wait();        }        number--;        System.out.println(Thread.currentThread().getName() + " : " + number);        this.notifyAll();    }}

下面的代码会呈现虚伪唤醒的状况,咱们来试着剖析一下为什么?

假如:A获取锁执行++;A再次获取锁判断number!=0,这时候阻塞;C获取锁执行--;B获取锁执行++;A获取锁,从以后地位醒来持续往下执行,又对number进行了++操作,所以失去2...

为了解决这种状况的产生,咱们应该在每次醒来时都进行判断,将if改为while即可:

while (number != 0) {    this.wait();}

Lock实现案例

Locksynchronized的区别 →Lock是接口而synchronized是关键字,Lock有着比synchronized更宽泛的锁的操作

// 创立Lockprivate Lock lock = new ReentrantLock();private Condition condition = lock.newCondition();public void incr() throws InterruptedException {    lock.lock();    try {        while (number != 0) {            condition.await();        }        number++;        System.out.println(Thread.currentThread().getName() + " : " + number);        condition.signalAll();    } finally {        lock.unlock();    }}

Condition它用来代替传统的Object的wait ()notify ()实现线程间的合作,依赖于Lock接口,需注意:传统的wait办法会主动开释锁,而应用lock需手动开释

线程汇合不平安

汇合自身的办法上并没有synchronized 关键字,所以是不平安的,看源码:

public boolean add(E var1) {    this.ensureCapacityInternal(this.size + 1);    this.elementData[this.size++] = var1;    return true;}

示例代码:

List<String> list = new ArrayList<>();for (int i = 0; i < 30; i++) {    new Thread(() -> {        list.add(UUID.randomUUID().toString().substring(0, 8));        System.out.println(list);    }, String.valueOf(i)).start();}

执行下面的代码会失去一个ConcurrentModificationException 异样,因为汇合中的办法并不是同步的,所以在多个线程同时写的时候就会抛出异样,如何解决呢?

计划一:应用Vector解决并发批改异样

List<String> list = new Vector<>();

计划二:应用Collections解决并发批改异样

List<String> list = Collections.synchronizedList(new ArrayList<>());

计划三:应用CopyOnWriteArrayList解决并发批改异样

后面两种办法其实并不罕用,个别都是通过写时复制技术来解决,那何为写时复制呢?

汇合在每次写的时候都会将元素复制一份进去,在新的汇合中写,而后再合并,这样就实现了单写多读的操作

List<String> list = new CopyOnWriteArrayList();

HashSet和HashMap线程不平安

跟汇合一样,办法也没有synchronized关键字,也会失去并发批改异样,所以要通过写时复制技术来单写多读

HashSet:

// 通过CopyOnWriteArraySet解决Set<String> set = new CopyOnWriteArraySet<>();

HashMap:

// 通过ConcurrentHashMap解决Map<String, String> map = new ConcurrentHashMap<>();

多线程锁

偏心锁和非偏心锁

偏心锁:多个线程都能失去执行

非偏心锁:谁先抢到谁就执行,其余线程不能执行

ReentrantLock 来配置偏心锁或非偏心锁:

public ReentrantLock(boolean fair) {    sync = fair ? new FairSync() : new NonfairSync();}

能够看到源码中通过truefalse来配置锁

可重入锁

synchronizedLock都是可重入锁,可重入锁即可屡次取得该锁

就比方咱们回家,用钥匙开门之后就能随便进出房间了

Object o = new Object();new Thread(() -> {    synchronized (o) {        System.out.println(Thread.currentThread().getName() + " 外层");        synchronized (o) {            System.out.println(Thread.currentThread().getName() + " 中层");            synchronized (o) {                System.out.println(Thread.currentThread().getName() + " 内层");            }        }    }}, "t1").start();
ReentrantLock lock = new ReentrantLock();new Thread(() -> {    try {        lock.lock();        System.out.println(Thread.currentThread().getName() + " 外层");        try {            lock.lock();            System.out.println(Thread.currentThread().getName() + " 内层");        } finally {            lock.unlock();        }    } finally {        lock.unlock();    }}, "t1").start();

死锁

两个或两个以上线程,因抢夺资源造成相互期待的景象,需外力干预来防止死锁

产生死锁的起因:

  • 资源零碎有余
  • 过程运行推动程序不适合
  • 资源分配不当

public static void main(String[] args) {    new Thread(() -> {        synchronized (a) {            try {                System.out.println(Thread.currentThread().getName() + " waiting...");                TimeUnit.SECONDS.sleep(2);                synchronized (b) {                    System.out.println(Thread.currentThread().getName() + " get b");                }            } catch (InterruptedException e) {                e.printStackTrace();            }        }    }, "线程A").start();    new Thread(() -> {        synchronized (b) {            try {                System.out.println(Thread.currentThread().getName() + " waiting...");                TimeUnit.SECONDS.sleep(2);                synchronized (a) {                    System.out.println(Thread.currentThread().getName() + " get a");                }            } catch (InterruptedException e) {                e.printStackTrace();            }        }    }, "线程B").start();}

两个线程都在尝试获取对方线程资源,就造成了死锁,这是通过代码输入来判断是否为死锁,JDK中有一个堆栈跟踪工具,能够通过命令查看是否为死锁

Callable

Runnable接口缺失了一项性能,当线程终止时,无奈取得线程返回的后果,为了反对此性能,Java中提供了Callable接口

这两个接口之间的区别次要是:

  1. 是否有返回值
  2. 是否抛出异样
  3. 实现办法名称不同,一个是run,一个是call
class Demo implements Callable<String> {    @Override    public String call() throws Exception {        System.out.println("test callable...");        return "hello";    }}

应用Callable 就不能间接用Thread来创立线程了,须要应用FutureTask

FutureTask<String> task = new FutureTask<>(new Demo());new Thread(task, "callable").start();System.out.println(task.get());    // 获取call()中的返回值

弱小的辅助类

CountDownLatch缩小计数:

public class CountDownLatchDemo {    public static void main(String[] args) throws InterruptedException {        CountDownLatch countDownLatch = new CountDownLatch(3);        for (int i = 1; i <= 3; i++) {            new Thread(() -> {                System.out.println(Thread.currentThread().getName() + "号同学来到");                // 计数器-1                countDownLatch.countDown();            }, String.valueOf(i)).start();        }        // 当计数器没有变为0时就会始终期待        countDownLatch.await();        System.out.println(Thread.currentThread().getName() + "班长锁门来到了");    }}

班长总是在最初一个才来到,这就是CountDownLatch 的作用

CyclicBarrier循环栅栏

public class CyclicBarrierDemo {    private static final intNUMBER= 7;    public static void main(String[] args) {        CyclicBarrier cyclicBarrier = new CyclicBarrier(NUMBER, () -> {            System.out.println("祝贺你集齐七颗龙珠");        });        for (int i = 1; i <= 7; i++) {            new Thread(() -> {                try {                    System.out.println(Thread.currentThread().getName() + "颗龙珠");                    // 期待                    cyclicBarrier.await();                } catch (Exception e) {                    e.printStackTrace();                }            }, String.valueOf(i)).start();        }    }}

只有在集齐七颗龙珠后才会执行CyclicBarrier 中的办法

Semaphore信号灯

public class SemaphoreDemo {    public static void main(String[] args) {        // 设置许可数量,只有三个车位        Semaphore semaphore = new Semaphore(3);        // 模仿六辆汽车        for (int i = 1; i <= 6; i++) {            new Thread(() -> {                try {                    // 抢占车位                    semaphore.acquire();                    System.out.println(Thread.currentThread().getName() + "号车抢到了车位");                    // 设置随机停车工夫                    TimeUnit.SECONDS.sleep(new Random().nextInt(5));                    System.out.println(Thread.currentThread().getName() + "号车来到了车位");                } catch (Exception e) {                    e.printStackTrace();                } finally {                    // 开释车位                    semaphore.release();                }            }, String.valueOf(i)).start();        }    }}

用信号灯模仿停车的场景,只有三个车位,只有当某个车位的车来到了之后,其余的车能力抢占车位

读写锁

在多线程环境下对资源进行读写操作的时候,是可能会产生死锁的,须要用Java提供的读写锁来上锁和解锁,读写锁在读的时候是不能进行写操作的。

写锁:独占锁(一次只能一个线程进行写操作),读锁:共享锁(可多个线程进行读操作)

class Resource {    private Map<String, Object> map = new HashMap<>();    private ReadWriteLock lock = new ReentrantReadWriteLock();    public void put(String key, Object value) {        // 增加写锁        lock.writeLock().lock();        System.out.println(Thread.currentThread().getName() + "正在写操作" + key);        try {            TimeUnit.MICROSECONDS.sleep(300);        } catch (InterruptedException e) {            e.printStackTrace();        }finally {            // 开释锁            lock.writeLock().unlock();        }        map.put(key, value);        System.out.println(Thread.currentThread().getName() + "写完了" + key);    }    public Object get(String key) {        // 增加读锁        lock.readLock().lock();        Object result = null;        System.out.println(Thread.currentThread().getName() + "正在读操作" + key);        try {            TimeUnit.MICROSECONDS.sleep(300);        } catch (InterruptedException e) {            e.printStackTrace();        }finally {            lock.readLock().unlock();        }        result = map.get(key);        System.out.println(Thread.currentThread().getName() + "读完了" + key);        return result;    }}

锁降级:

读写锁在读的时候是不能进行写操作的。咱们能够将写锁降为读锁,读锁不能降级为写锁

public class DowngradeDemo {    public static void main(String[] args) {        ReadWriteLock lock = new ReentrantReadWriteLock();        Lock writeLock = lock.writeLock();        Lock readLock = lock.readLock();        // 锁降级        // 1.获取写锁        writeLock.lock();        System.out.println("write");        // 2.获取读锁        readLock.lock();        System.out.println("read");        // 3.开释写锁和读锁        writeLock.unlock();        readLock.unlock();    }}

阻塞队列

当队列为空时,获取元素将阻塞,直到插入新的元素,当队列满时,增加元素将阻塞

应用阻塞队列的益处就是,咱们不须要关怀什么时候阻塞线程,什么时候唤醒线程,这些操作都交给BlockingQueue来做

// 创立阻塞队列BlockingQueue<Object> queue = new ArrayBlockingQueue<>(3);queue.add("a")queue.add("b")queue.add("c")// Queue fullqueue.add("d")

线程池

一种线程应用模式,保护着多个线程,期待着监督管理,防止了频繁创立与销毁线程的代价,不仅能保障内核的充分利用,还能避免过分调度

线程池应用形式

通过Executors 工具类来创立线程

Executors.newFixedThreadPool(): 一池N线程

Executors.newSingleThreadExecutor(): 一池一线程

Executors.newCachedThreadPool(): 依据需要创立线程,可扩容

public class ThreadPoolDemo {    public static void main(String[] args) {        // 一池N线程        ExecutorService threadPool1 = Executors.newFixedThreadPool(5);        // 一池一线程        ExecutorService threadPool2 = Executors.newSingleThreadExecutor();        // 一池可扩容线程        ExecutorService threadPool3 = Executors.newCachedThreadPool();        // 10个客户申请        for (int i = 1; i <=10 ; i++) {            // 执行            threadPool3.execute(()->{                System.out.println(Thread.currentThread().getName()+"正在办理业务");            });        }        threadPool3.shutdown();    }}

查看源码能够发现Executors调用的办法底层都应用了ThreadPoolExecutor

public ThreadPoolExecutor(int corePoolSize,                          int maximumPoolSize,                          long keepAliveTime,                          TimeUnit unit,                          BlockingQueue<Runnable> workQueue,                          ThreadFactory threadFactory,                          RejectedExecutionHandler handler) {    if (corePoolSize < 0 ||        maximumPoolSize <= 0 ||        maximumPoolSize < corePoolSize ||        keepAliveTime < 0)        throw new IllegalArgumentException();    if (workQueue == null || threadFactory == null || handler == null)        throw new NullPointerException();    this.acc = System.getSecurityManager() == null ?            null :            AccessController.getContext();    this.corePoolSize = corePoolSize;    this.maximumPoolSize = maximumPoolSize;    this.workQueue = workQueue;    this.keepAliveTime = unit.toNanos(keepAliveTime);    this.threadFactory = threadFactory;    this.handler = handler;}

构造方法中有7个参数,别离是什么意思呢?

corePoolSize :外围(常驻)的线程数量,比方一个银行有10个窗口,平时只凋谢5个窗口

maximumPoolSize:最大线程数量,就好比银行一共有10个窗口

keepAliveTime :线程存活工夫

unit :搭配keepAliveTime 设置线程存活工夫

workQueue :阻塞队列

threadFactory :用于创立线程

handler :回绝策略(多种)

线程池的工作流程和回绝策略

下面的流程图即为线程池的工作流程:首先通过execute()来创立一个池子,外围线程数为2,如果要创立第三个线程,就会放到workQueue中期待,当workQueue满时就会创立新的线程直到

maximumPoolSize满,当maximumPoolSize满时就会执行回绝策略。

JDK内置的回绝策略:

AbortPolicy :抛出 RejectedExecutionException来回绝新工作的解决。

CallerRunsPolicy :“调用者运行”一种调节机制,该策略不会摈弃工作和异样,而是将某些工作回退到调用者,升高新工作的流量。

DiscardPolicy :摈弃队列中期待最久的工作,而后把当前任务增加到队列中,尝试再次提交当前任务。

DiscardOldestPolicy :该策略默默地抛弃无奈解决的工作,不予任何解决也不抛出异样,如果容许工作失落,那这是最好的一种策略。

自定义线程

个别都是用自定义线程,在阿里巴巴开发手册中线程池不容许用Executors去创立,而是通过ThreadPoolExecutor 的形式,这样的解决形式让写的人更加明确线程池的运行规定,躲避资源耗尽的危险。

public class CustomThreadPoolDemo {    public static void main(String[] args) {        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 5, 2, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardOldestPolicy());        // 10个客户申请        for (int i = 1; i <=10 ; i++) {            // 执行            threadPoolExecutor.execute(()->{                System.out.println(Thread.currentThread().getName()+"正在办理业务");            });        }        threadPoolExecutor.shutdown();    }}

分支合并框架(Fork/Join)

能够将一个大的工作拆分成多个子工作进行并行处理,最初将子工作后果合并成最初的计算结果

public class ForkJoinDemo {    public static void main(String[] args) throws ExecutionException, InterruptedException {        MyTask task = new MyTask(1, 100);        // 创立分支合并池对象        ForkJoinPool forkJoinPool = new ForkJoinPool();        ForkJoinTask<Integer> submit = forkJoinPool.submit(task);        // 获取最终合并之后的后果        System.out.println(submit.get());        forkJoinPool.shutdown();    }}class MyTask extends RecursiveTask<Integer> {    // 拆分时差值不能大于10    private static final Integer VALUE= 10;    private int begin;    private int end;    private int result;    public MyTask(int begin, int end) {        this.begin = begin;        this.end = end;    }    // 拆分和合并的过程    @Override    protected Integer compute() {        if (end - begin <=VALUE) {            // 相加            for (int i = begin; i <= end; i++) {                result = result + i;            }        } else {            // 进一步做拆分            // 获取两头值            int middle = (begin + end) / 2;            // 拆分右边            MyTask task1 = new MyTask(begin, middle);            // 拆分左边            MyTask task2 = new MyTask(middle + 1, end);            task1.fork();            task2.fork();            // 合并后果            result = task1.join() + task2.join();        }        return result;    }}

异步回调

public class AsynchronousCallbackDemo {    public static void main(String[] args) throws ExecutionException, InterruptedException {        // 异步调用,无返回值        CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(() -> {            System.out.println(Thread.currentThread().getName() + "completableFuture1");        });        completableFuture1.get();        // 异步调用,有返回值        CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {            System.out.println(Thread.currentThread().getName() + "completableFuture2");            return 1024;        });        completableFuture2.whenComplete((result, exception) -> {            System.out.println("--t--" + result);    // 办法返回值            System.out.println("--u--" + exception);    // 异样信息        }).get();    }}