关于多线程:Java多线程学习笔记三-甚欢篇

4次阅读

共计 10224 个字符,预计需要花费 26 分钟才能阅读完成。

使人有乍交之欢, 不若使其无久处之厌《小窗幽记》
很多时候,咱们须要的都不是再多一个线程,咱们须要的线程是许多个, 咱们须要让他们配合。同时咱们还有一个欲望就是复用线程,就是将线程当做一个工人来看,咱们委托线程执行工作,执行实现之后,并不沦亡,而是在存活一段时间,因为咱们可能还须要向线程委托工作,这也就是线程池的思维。本篇讲线程合作和线程池。
忽然发现初遇、相识、甚欢,这几个题目,对于多线程来说,有些不够用了,多线程的体系有些宏大。

wait 和 notify、notifyAll

Java 多线程学习笔记 (一) 初遇篇, 咱们曾经介绍了多线程罕用的 API(废除的接口不做介绍),然而咱们还有两个比拟重要的没有介绍,即 wait(期待)、notify(告诉),这是事实世界中比拟常见的动作,比方你的女朋友说周末想跟你去看电影,而后你灰溜溜的去你女朋友家等她,而后你女朋友说让你等下(wait),她见心上人须要画个妆,画完妆之后,跟你说我画好了(notify),咱们出门吧。
像上面这样:

public class RomaticDateThreadDemo implements Runnable {
    // 画妆标记位
    private  boolean flag;

    private static final String THREAD_GIRLFRIEND = "女朋友";

    public RomaticDateThreadDemo(boolean flag) {this.flag = flag;}

    @Override
    public void run() {synchronized (this) {
            // RomaticDateThreadDemo 的构造函数给的是 false
        
            if (!flag) {
                flag = true; 
                String currentThreadName = Thread.currentThread().getName();
                if (THREAD_GIRLFRIEND.equals(currentThreadName)) {
                    // 输入这句话阐明女朋友线程先进来
                    System.out.println("你死定了, 敢让你女朋友等");
                } else {
                    // 假如男孩子线程先进来
                    System.out.println("........... 女朋友正在化妆中................., 请等一会儿");
                    try {// 期待女朋友唤醒,wait 代表开释对象锁(开释许可证)
                        this.wait();} catch (InterruptedException e) {e.printStackTrace();
                    }
                }
            } else {String currentThreadName = Thread.currentThread().getName();
                if (THREAD_GIRLFRIEND.equals(currentThreadName)) {
                    // 走到这里, 阐明男孩子线程率先被线程调度器选中
                    System.out.println(".......... 要画十秒的妆.............");
                    try {TimeUnit.SECONDS.sleep(5);
                        // 唤醒
                        this.notify();
                        System.out.println(currentThreadName+"说: 咱们走吧");
                    } catch (InterruptedException e) {e.printStackTrace();
                    }
                } else {
                    // 走到这里, 阐明女朋友线程率先被线程调度器选中
                    System.out.println(currentThreadName+"说: 咱们走吧");
                    this.notify();}
            }
        }
    }
}
public class WaitDemo {public static void main(String[] args) {RomaticDateThreadDemo romaticDateThreadDemo = new RomaticDateThreadDemo(false);
        Thread you = new Thread(romaticDateThreadDemo);
        you.setName("你");
        Thread girlFriend = new Thread(romaticDateThreadDemo);
        girlFriend.setName("女朋友");
        you.start();
        girlFriend.start();}
}

然而 notify、wait、notifyAll(唤醒所有处于期待中的线程)没有位于 Thread 类下,所有的类都具备这三个办法,那么请问 Java 的设计者是怎么做到呢? 让所有的类都具备了这三个办法呢?当然是在 Object 类中做了,而且做成 public,咱们晓得所有的类都继承自 Object。因为 wait 办法在调用的时候是开释了以后线程持有的锁,那么咱们能够大抵得出一个论断,wait、notify、notifyAll 只能配合 synchronized 应用,显式锁是通过调用 unlock 办法来实现开释锁的,而咱们在 Object 看到,wait、notify、notifyAll 是一个 native(java 和其余语言通信的一种伎俩,因为操作系统大多都采纳 C、C++ 编写而成,而一些文件的操作只能通过调用操作系统提供的接口实现,所以 Java 调用 C、C++ 就通过 native 这种机制来实现调用操作系统的接口)办法。
那么为什么呢? 为什么要讲这三个原属于线程的办法放在所有类中呢?

那这就跟锁的地位有关系了, 咱们晓得锁是放在对象中的,JVM(个别不加阐明说的虚拟机都是 HotSpot,Oracle 发行的虚拟机)会为每个对象保护一个入口集 (Entry Set) 用于存储申请该对象外部锁的线程。除此之外,JVM 还会为每一个对象保护一个期待集 (Wait Set) 的队列,该队列用于存储该对象上的期待线程。

首先 synchronized 是一个对象锁,从面向对象的角度来讲,获取锁是一种行为,开释锁也是一种行为,那么 wait、notify、notifyAll 放在 Obejct 中就是正当的,所有对象都能够是锁,所以锁放在 Object 中,所有的类都间接继承 Object,这个设计正当。

那么当初咱们假如咱们就将 wait、notify、notifyAll 放入 Thread 类中,这个时候线程调用 wait 办法开释对应锁的时候,就要拿到对应的对象,因为所有对象都能够当锁来用,那么这个办法可能还须要一个泛型参数接管存储锁的对象 (一个线程可能持有多把锁,假如如果调用者不阐明要开释哪吧锁,JVM 无从得悉以后线程心愿开释哪吧锁),用来开释锁。除此之外,notify 办法的作用是唤醒相应对象上的期待线程,下面咱们也提到 JVM 会为每一个对象保护一个入口集(Entry Set) 用于存储申请该对象外部锁的线程,JVM 还会为每一个对象保护一个期待集的队列,该对象用于存储该对象上的期待线程。如果这三个办法都放在 Thread 类内,那么这两个队列是间接跟每一个线程挂钩吗?然而线程能够持有的锁可不止一种,那有同学可能这里会讲,还是跟对象挂钩比拟正当,这三个办法都接管一个对象,用于保护这两个队列和做开释锁操作,那如果是这样的话,你为什么不间接放入 Object 中。

应用 wait、notify、notifyAll 来实现生产者消费者模式

生产者消费者模式是 web 中比拟常见的一种模式,比方音讯队列,一方负责生产信息,一方负责生产信息,这是咱们实现解耦的一种形式。本次咱们做的是一个卖包子的案例,生产者负责卖包子,消费者负责卖包子。

public class GoodsStock {
    private int goodsNum;

    public GoodsStock(int goodsNum) {this.goodsNum = goodsNum;}

    public void produceGoods() throws InterruptedException {synchronized (this) {if (goodsNum < 100) {
                goodsNum++;
                System.out.println("正在生产第" + goodsNum + "个包子");
                // 休眠, 避免执行的太快, 影响咱们剖析问题
                TimeUnit.MILLISECONDS.sleep(50);
                // 生产之后, 马上唤醒全副的线程
                notifyAll();} else {System.out.println("生产达到峰值, 打工人能够劳动");
                wait();}
        }
    }
    public void consumerGoods() throws InterruptedException {synchronized (this) {if (goodsNum == 100) {
                goodsNum--;
                System.out.println("正在生产第" + goodsNum + "个包子");
                // 休眠, 避免执行的太快, 影响咱们剖析问题
                TimeUnit.MILLISECONDS.sleep(50);
                // 生产一个, 马上唤醒全副的线程
                notifyAll();}
            if (goodsNum == 0){System.out.println("还未开始生产, 或已将包子全副售出");
                wait();}
        }
    }
}
public class ProduceGoods implements Runnable {
    private GoodsStock goodsStock;

    public ProduceGoods(GoodsStock goodsStock) {this.goodsStock = goodsStock;}

    @Override
    public void run() {while (true) {
            try {goodsStock.produceGoods();
            } catch (InterruptedException e) {e.printStackTrace();
            }
        }
    }
}
public class ConsumerGoods implements Runnable {
    private GoodsStock goodsStock;

    public ConsumerGoods(GoodsStock goodsStock) {this.goodsStock = goodsStock;}

    @Override
    public void run() {while (true) {
            try {goodsStock.consumerGoods();
            } catch (InterruptedException e) {e.printStackTrace();
            }
        }
    }
}
public static void main(String[] args) {GoodsStock goodsStock = new GoodsStock(0);
        ProduceGoods produceGoods = new ProduceGoods(goodsStock);
        ConsumerGoods consumerGoods = new ConsumerGoods(goodsStock);
        Thread p1 = new Thread(produceGoods);
        Thread p2 = new Thread(produceGoods);
        Thread p3 = new Thread(consumerGoods);
        Thread p4 = new Thread(consumerGoods);
        p1.start();
        p2.start();
        p3.start();
        p4.start();}

后果:

wait、notify、notifyAll 存在的问题

下面模仿的生产者消费者模式,在刚开始生产者会率先开始生产,即便消费者先被 CPU 选中,也会陷入期待状态,在生产实现 100 个包子之后,消费者开始生产包子。这个时候就是一种动态平衡了,因为依照咱们的代码,在包子的数量达到 100 的时候,生产者唤醒所有的线程,于是咱们大略就能看到一个动态平衡了,包子的数量总是维持在一百个。然而如果说咱们心愿的场景是消费者线程卖完了仅仅告诉生产者线程呢?生产者生产够 100 个包子后仅唤醒消费者线程呢? 那么 notify 和 notifyAll 就不满足咱们的要求了,notify 仅仅唤醒任意一个处于对象锁下的线程,notifyAll 告诉所有。除此之外 notify 和 notifyAll 只能配合 synchronzed 应用,咱们心愿显式锁也能有相似的操作,实现告诉和唤醒,这也就是 condition 类呈现的起因。

那么 Condition 是如何做到的呢?

咱们再来大抵的看一下 Lock 和 Condition 的源码:

每个 Condition 实例外部都保护了一个用于存储期待线程的队列:

那么生产者线程和消费者线程就放在了两个队列中, 就可能防止误唤醒的景象了。因为 signalAll 只唤醒所属的 Condition 实例上的期待线程。
这样说的可能有点形象,咱们这里举一个例子吧: 假如咱们有两个 Condition 变量,一个叫 con1,一个叫 con2,执行 con1.await()办法的线程,其生命周期的状态就变为期待,并被存入 con1 对应的期待队列中,con1.signal()会随机唤醒处于 cond1 期待队列中的任意一个线程。

condition 版本的生产者消费者模式

基于下面的阐述,咱们就能够实现,生产者生产 100 个包子劳动,消费者把包子卖完再告诉生产者了。

public class GoodsStock {
    private int goodsNum;
    private static final ReentrantLock lock = new ReentrantLock();
    private final Condition produceCondition = lock.newCondition();
    private final Condition consumerCondition = lock.newCondition();

    public GoodsStock(int goodsNum) {this.goodsNum = goodsNum;}

    public void produceGoods() throws InterruptedException {lock.lock();
        if (goodsNum < 100) {
            goodsNum++;
            System.out.println("正在生产第" + goodsNum + "个包子");
            // 休眠, 避免执行的太快, 影响咱们剖析问题
            TimeUnit.MILLISECONDS.sleep(50);
        }
        if (goodsNum == 100) {System.out.println("生产达到峰值, 打工人能够劳动");
            // 告诉所有的消费者线程
            consumerCondition.signalAll();
            produceCondition.await();}
        lock.unlock();}

    public void consumerGoods() throws InterruptedException {lock.lock();
        if (goodsNum > 0) {System.out.println("正在生产第" + goodsNum-- + "个包子");
            // 休眠, 避免执行的太快, 影响咱们剖析问题
            TimeUnit.MILLISECONDS.sleep(50);
            // 唤醒所有的生产者
        }
        if (goodsNum == 0) {System.out.println("还未开始生产, 或已将包子全副售出");
             // 获取该 produceCondition 变量对应的期待线程队列的长度
            int waitQueueLength = lock.getWaitQueueLength(produceCondition);
            if (waitQueueLength > 0){produceCondition.signalAll();
            }
            // 消费者线程先暂停,放入消费者对应的队列中
            consumerCondition.await();}
        lock.unlock();}
}

CountDownLatch 简介

Thread.join()实现的是一个线程期待另外一个线程完结,有的时候一个线程可能只须要期待其余线程执行特定的操作完结即可,而不用期待这些线程齐全执行结束。咱们能够用条件变量来实现,也能够用更加间接的工具类—JUC 下的 CountDownLatch。
那么问题来了,那么该线程是怎么晓得其余线程特定操作完结的呢?这就须要一个计数器,CountDownLatch 外部保护了一个示意未实现操作的计数器,线程调用 CountDownLatch.countDown(), 计数器减一,代表该线程曾经执行了特定操作。
CountDownLatch.await()相当于一个未爱护的办法,当计数器为 0 时,也就是其余线程都执行了特定操作时,调用该办法的线程才会复原执行。
一个典型的利用场景是: 咱们须要做一个统计,由五段 SQL 形成,咱们须要取出这五段 SQL 的数据,在代码中造成前端对应的数据结构返回给前端,那这五段 SQL 每个都执行一秒,那么这个接口响应的工夫就须要五秒,一种优化的思路就是启动五个线程去取数据,取完数据放在一个汇合中,五个线程都取完数据就告诉主线程解决数据,这样接口的相应工夫就取决于哪段 SQL 的最长执行工夫。其实这个咱们也能够通过 Thread.join 办法来做,然而如果这五个线程取完数据还有别的动作呢?然而解决数据的线程只须要晓得数据取好了。

CyclicBarrier 栅栏 简介

有的时候多个线程须要互相期待对方执行到代码中的某个中央(集合点),这时这些线程能力往下执行,相似于你和你女朋友去看电影,你女朋友让你九点在楼下等她,而后只有你们两个都到齐了才会去电影院。又像寻找龙珠,七队人都去寻找龙珠,龙珠到了,能力许诺。

 public static void main(String[] args) {CyclicBarrier cyclicBarrier = new CyclicBarrier(7);
        for(int i = 1;i <= 7; i++){
            int finalI = i;
            new Thread(() -> {System.out.println(Thread.currentThread().getName() + "\t 收集到第"+ finalI +"颗龙珠");
                try {
                    // 用于期待
                    cyclicBarrier.await();
                    System.out.println("**** 号召神龙");
                } catch (InterruptedException e) {e.printStackTrace();
                } catch (BrokenBarrierException e) {e.printStackTrace();
                }
            },String.valueOf(i)).start();}
    }

CyclicBarrier 外部也保护了一个计数器,用于标识多个线程是否曾经执行到了某个中央,告诉这些线程往下执行。与 CountDownLatch 不同的在于,栅栏能够被应用屡次,所有参与方处于唤醒状态,任何一个参与方再度调用 CyclicBarrier.await()又会被暂停,直到最初一个参与方执行了 CyclicBarrier.await()。

Semaphore 信号量与限购简介

个别到了节假日,风景区就会很火爆,然而一个风景区可能包容的人就那么些,资源是给定的,然而有些景区分外紧俏,为了防黄牛,咱们创造了限购。java.util.concurrent.Semaphore 能够用来做流量管制,线程在拜访资源之前必须申请许可证,如果以后许可证被申请完了,那么申请的线程会被暂停,许可证复原了,这些被暂停的线程就会被唤醒。

线程池

线程池简介

后面几篇咱们讲示例,都是间接 new Thread()来一个一个的创立线程,那么在理论中,个别咱们不倡议显式的创立线程。绝对于一般的对象,一个 Thread 的实例还占用了额定的存储空间,大概是 1M。除此之外线程的销毁也有其开销。一个零碎可能创立的线程总是受限于该零碎所领有的处理器数目。无论是 CPU 密集型还是 I / O 密集型:

  • CPU 密集型

CPU 密集型工作执行过程中耗费的资源就是 CPU,一个典型的 CPU 密集型工作就是加密和解密,CPU 的使用率会比拟高。

  • IO 密集型工作

执行过程中耗费的资源就是 (I/ O 资源), 比方文件读写,CPU 的使用率不高。
咱们的欲望是不那么频繁的创立线程,就像数据库连接池一样,一开始连接池里寄存了若干连贯,咱们须要和数据库交互的时候从连接池重获取连贯,提交咱们的 SQL 即可。

线程池和数据库连接池有些不同,数据库连接池算是对象池外部保护了肯定数量的对象,客户端代码须要一个对象时就向对象池借用一个对象,用完之后再将该对象返回给对象池,于是数据库连接池就能够先后被多个客户端线程所应用。线程池也是一个对象,咱们并不是从线程池像数据库连接池一样借用线程,而是将咱们心愿让线程执行的工作提交给线程池,线程池将这些工作放在工作队列中,由线程池的线程将这些工作取走执行。因而,线程池能够被看做是一种基于生产者 - 消费者模式的一种服务,该服务外部保护的工作者线程相当于消费者线程,向线程池提交工作的线程相当于生产者线程。

如何创立线程池?

通过 java.util.concurrent.ThreadPoolExecutor 来创立线程池,ThreadPoolExecutor 蕴含参数最多的一个构造函数如下:

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

ThreadPoolExecutor 的线程数量参数有三个:

  • 以后线程池大小(示意线程池中理论工作者线程的数量)
  • 最大线程池大小(示意线程池最大可能领有多少工作者线程,maximumPoolSize)
  • 外围线程池大小(corePoolSize)(小于最大线程池,个别状况下以后线程池大小超过外围线程池时,新来的人工作会被寄存于工作队列(workQueue))。

在刚开始的时候,线程池并不会先创立线程,客户端没提交一个工作,线程池就会启用有一个线程执行工作,随着提交的工作越来越多,在超过线程池外围数时,就会被放入阻塞队列中,那么阻塞队列也放不下了怎么办? 这也就是 handler 参数的作用,回绝策略。线程池是通过 threadFactory.newThread()办法来创立线程的。如果咱们在创立线程池的时候没有指定线程工厂,那么 ThreadPoolExecutor 就会应用 Executors.defaultThreadFactory()所返回的线程工厂。
RejectedExecutionHandler 是一个接口,JDK 有默认的实现:

在以后线程池的大小超过外围线程池的大小且阻塞队列中没有工作时,线程中的线程的闲暇工夫达到 keepAliveTime 所制订的工夫时, 线程池就会销毁掉这类不干活的线程,以达到节俭资源的目标。

线程池罕用的办法

ThreadPoolExecutor 的 prestartCoreThread 办法能够让线程池在没有接到任何工作的状况下创立所有的外围线程,这样能够缩小工作被线程池解决时所需的等待时间。

通数据库连接池不一样的是,数据库连接池随同着 web 程序始终开启,随着咱们 web 的程序敞开而敞开,已经我也是这么了解线程池的。然而线程池和数据库连接池并不是雷同的思路,线程池须要敞开,假如客户端提交的工作都执行结束的话。毕竟线程的开销还是比拟大的。咱们应用线程池的初衷也是复用线程,一个线程执行工作结束之后,还能接着执行工作,这就达到了复用的目标。那么问题来了,为什么线程池不设计成相似于数据库连接池这样的呢?咱们能够这么想,基本上对于一个服务端的程序基本上外面都须要执行 SQL,都须要获取连贯,然而并不是都须要线程池。咱们下面曾经强调过,线程绝对于一般的对象更耗费资源,基于这种思维线程池设计了敞开的办法。咱们再次强调一下线程池的价值,复用线程,通过 new Thread 创立进去的线程,执行结束就完结了,期待 GC 回收其占用的资源。然而通过线程池创立的线程在工作执行结束之后还能够去执行工作。
ThreadPoolExecutor.shutdown()/shutdownNow()办法可用来敞开线程池。应用 shutdown 敞开线程池的时候,已提交的工作会被继续执行,而新提交的工作会像线程池饱和那样被回绝掉。即便 ThreadPoolExecutor.shutdown()执行,线程池也不会马上销毁,因为线程池可能还有线程在执行工作。可通过 awaitTermination(long timeout, TimeUnit unit)办法来期待线程池敞开完结,线程池敞开完结,该办法返回 true。
ThreadPoolExecutor.submit(Runnable task)和 execute(Runnable command)被用于提交工作,不同的是 submit()返回线程的执行后果,execute 不返回。

参考资料:

  • 对象池和线程池
  • 并发编程之 CyclicBarrier 原理与应用
  • 《Java 多线程编程实战指南》黄文海著
正文完
 0