乐趣区

一文搞懂同步并发套路

看完上一个章节,相信你已经充分的理解 java 代码的执行套路了,猿人工厂君也知道,内容对于新手而言,理解起来还是很吃力的,不过上一章节涉及编译原理、类加载机制和一点点 jvm 的知识,很重要,请务必掌握其中的过程和概念。另外真的很感谢大家的支持,和巨兽的斗争暂时进入僵持阶段,猿人工厂君已经说了,虽千万人,吾往矣。中间细节,猿人工厂君,会在方便的时候公开,程序猿鸭,且行且珍惜。

猿思考是一个原创系列文章,帮助你从一个小白快速掌握基础知识,很多基础知识,在于思考的变通,更多精彩内容,敬请大家关注公主号 猿人工厂 ,点击 猿人养成获取

在讨论多线程这个话题之前,猿人君先强调一点,多线程的魅力在于减少阻塞,提升系统的性能,多个线程操作同一个资源(比如同一个对象),才可能出现线程安全问题,没有竞争的结局是很好的结局,这也是为什么业务代码中返回数据,比如返回列表数据的对象,每次都是新的对象。

在谈论线程问题之前,考虑到一些基础薄弱的同学,我们可以先了解下什么是线程,什么是进程?

进程是一个具有一定独立功能的程序在一个数据集上的一次动态执行的过程,是操作系统进行资源分配和调度的一个独立单位,是应用程序运行的载体。进程是一种抽象的概念,从来没有统一的标准定义。进程一般由程序,数据集合和进程控制块三部分组成。程序用于描述进程要完成的功能,是控制进程执行的指令集;数据集合是程序在执行时所需要的数据和工作区;程序控制块包含进程的描述信息和控制信息是进程存在的唯一标志。

线程是程序执行中一个单一的顺序控制流程,是程序执行流的最小单元,是处理器调度和分派的基本单位。一个进程可以有一个或多个线程,各个线程之间共享程序的内存空间 (也就是所在进程的内存空间)。一个标准的线程由线程 ID,当前指令指针 PC,寄存器和堆栈组成。而进程由内存空间(代码,数据,进程空间,打开的文件) 和一个或多个线程组成。

看上去比较绕,但是二者的区别是很明显的:

1. 线程是程序执行的最小单位,而进程是操作系统分配资源的最小单位;

2. 一个进程由一个或多个线程组成,线程是一个进程中代码的不同执行路线。

3. 进程之间相互独立,但同一进程下的各个线程之间共享程序的内存空间 (包括代码段,数据集,堆等) 及一些进程级的资源(如打开文件和信号等),某进程内的线程在其他进程是不可见的。

4. 调度和切换:线程上下文切换比进程上下文切换要快得多。

多线程:一个程序进程中产生了多个线程。

并行:多个处理器或者多台机器同时一时刻执行代码片段,是真正的同时。

并发:多个代码片段在同一个处理器上交替执行,同一时刻,只有一个代码片段在执行。多线程,指的是并发而非并行。

线程安全:是指在并发的情况下,一个代码片段被多个线程使用,线程的调度顺序不影响任何程序的执行结果。

同步:通过对线程进行人为的控制和调度,保证共享资源在多个线程的访问下,保证执行结果的准确性。在 java 中,比如加入 synchronized 关键字等手段来保证同一个代码片段在同一时刻只有一个线程在执行。 程序结果的准确性远大于程序的性能,只有在保证准确性的前提下,才能去谈性能,要不没有意义。

我们先来了解下 java 线程的生命周期。在线程的生命周期中,一个线程要经过新建 (New)、就绪(Runnable)、运行(Running)、阻塞(Blocked) 和死亡(Dead)5 种状态。特别注意噢,run 方法执行完毕,线程就结束了。

新建状态:当程序使用 new 关键字创建了一个线程之后,该线程就处于新建状态,此时仅由 JVM 为其分配内存,并初始化其成员变量的值

就绪状态:当线程对象调用了 start()方法之后,该线程处于就绪状态。Java 虚拟机会为其创建方法调用栈和程序计数器,等待调度运行

运行状态:如果处于就绪状态的线程获得了 CPU,开始执行 run()方法的线程执行体,则该线程处于运行状态

阻塞状态:当处于运行状态的线程失去所占用资源之后,便进入阻塞状态。

死亡状态:线程在 run()方法执行结束后进入死亡状态。此外,如果线程执行了 interrupt()或 stop()方法,那么它也会以异常退出的方式进入死亡状态。

synchronized, wait, notify 是每个对象都有的同步工具。Java 中的每个对象都有一个监视器,来监测并发代码的重入。在非多线程编码时该监视器不发挥作用,反之如果在 synchronized 范围内,监视器发挥作用。

wait/notify 必须存在于 synchronized 块中。并且,这三个关键字针对的是同一个监视器(某对象的监视器)。这意味着 wait 之后,其他线程可以进入同步块执行。

当某代码并不持有监视器的使用权时,比如去调用 wait 或 notify,会抛出 java.lang.IllegalMonitorStateException。也包括在 synchronized 块中去调用另一个对象的 wait/notify,因为不同对象的监视器不同,同样会抛出此异常。

我们看看 synchronized 的用法:

如下,在多线程环境下,synchronized 块中的方法获取了 lock 实例的 monitor,如果实例相同,那么只有一个线程能执行该代码块的内容。

public class Thread1 implements Runnable {

    Object lock;

    public void run() {synchronized(lock){// 你要做什么}

    }

}

如下相当于上面代码中用 lock 来锁定的效果,实际获取的是 Thread1 类的 monitor。注意,如果 synchronized 修饰的是静态方法,那么,这个锁,锁住的就是该类所有的实例。

public classThread2implementsRunnable {public synchronized void run() {// 你要做什么}

}

我们来思考一个场景,金馆长开了个包子铺,包子铺的档口最多只能放下 50 个包子,当档口满了,金馆长可以休息。蘑菇头专门吃档口上的包子,档口上没有包子了,蘑菇头就只有等待。我们可以考虑使用非线程安全的 ArrayList 类来做档口,那么需要同步的,自然是这个档口了,在同一时刻,档口要么可以放包子,要么只能被拿走包子。我们来实现下这一经典的生产者 - 消费者场景。

package com.pz.thread.demo;



importjava.util.ArrayList;

importjava.util.List;

importjava.util.concurrent.atomic.AtomicInteger;



public classProducerAndConsumerDemo {



    // 包子个数

    public static AtomicInteger number = new AtomicInteger();

    public volatile boolean flag = true;

    public static final int MAX_COUNT = 50;

    // 档口,使用非线程安全类,需要同步

    public static final List<Integer> pool = new ArrayList<>();



    public void produce() {

        // 判断,干活,通知

        while (flag) {

            // 每隔 100 毫秒生产一个包子

            try {Thread.sleep(100);

            } catch (InterruptedException e) { }

            synchronized (pool) {

                // 档口上满了,生产者停止生产

                if (pool.size() == MAX_COUNT) {

                    try {System.out.println("档口上放不下了, 歇会儿...");

                        pool.wait();} catch (InterruptedException e) {e.printStackTrace();

                    }

                }

                // 包子好了,放上档口

                pool.add(number.incrementAndGet());

                System.out.println("金馆长生产了第:" +number.get() + "个包子," + "有:" + pool.size()+"个包子等待售卖");

                // 通知

                pool.notifyAll();}

        }

    }



    public void consumue() {

        // 判断,干活,通知

        while (flag) {

            try {Thread.sleep(200);

            } catch (InterruptedException e) { }

            synchronized (pool) {

                // 档口里包子没有了,顾客只有等了

                if (pool.size() == 0) {

                    try {System.out.println("包子没有了, 只有等了...");

                        pool.wait();} catch (InterruptedException e) {e.printStackTrace();

                    }

                }

                // 吃包子

                int temp = pool.get(0);

                pool.remove(0);

                System.out.println("蘑菇头吃了:" + temp + "号包子,\t" + "档口上还有:" + pool.size());

                // 通知

                pool.notifyAll();}

        }

    }



    public void stop() {flag = false;}



    public static void main(String args[]){final ProducerAndConsumerDemo demo = new ProducerAndConsumerDemo();



        Thread t1 = new Thread(new Runnable() {

            @Override

            public void run() {demo.produce();

            }

        });

        t1.start();



        Thread t2 = new Thread(new Runnable() {

            @Override

            public void run() {demo.consumue();

            }

        });

        t2.start();

        try {Thread.sleep(10000);

        } catch (InterruptedException e) {e.printStackTrace();

        }



        demo.stop();}

}

你需要先搞明白一个道理,多线程的本质,是去抢占 cpu 的时间分片,来执行代码片段。CPU 从一个线程的切换到另一个线程,CPU 需要使用内存来存存储的堆栈,也会消耗操作系统的资源去管理这个线程。这种切换称之为 ” 上下文切换 ”,切换并不容易,是需要慎重考虑的噢。

线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性。我们不妨直接建立一批固定大小的线程,来执行多线程的任务,这样的话,资源的调度就可控了。

下面就是一个简单的线程池列子:

packagecom.pz.thread.demo;



import java.util.List;

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.LinkedBlockingQueue;



public class ThreadPool {



    // 默认线程个数 10

    private static int workerNum =10;

    // 真正执行任务的线程

    private ExcuteThread[] excuteThrads;

    // 已处理的任务

    private static volatile int finishedTask= 0;

    // 任务队列,LinkedBlockingQueue 线程安全

    private BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>();

    private static ThreadPool threadPool;



    // 默认线程个数的线程池

    private ThreadPool() {this(workerNum);

    }



    // 创建线程池,workerNum 为线程池中执行任务的线程个数

    private ThreadPool(int workerNum) {

        ThreadPool.workerNum = workerNum;

        excuteThrads = new ExcuteThread[workerNum];

        for (int i = 0; i < workerNum; i++) {excuteThrads[i] = new ExcuteThread();

            excuteThrads[i].start();// 就绪线程池中的线程}

    }



    // 获得一个线程池

    public static ThreadPool getThreadPool() {return getThreadPool(ThreadPool.workerNum);

    }



        // 获得一个线程池,指定执行任务的线程个数,即线程池大小

    public static ThreadPool getThreadPool(int workerNum) {if (workerNum <= 0){workerNum = ThreadPool.workerNum;}

        if (threadPool == null){threadPool = new ThreadPool(workerNum);
            }

        return threadPool;

    }



    // 将任务加入队列,交由线程池去处理

    public void execute(Runnable task) {synchronized (taskQueue) {taskQueue.add(task);

            taskQueue.notify();}

    }



    // 批量加入任务

    public void execute(Runnable[] task) {synchronized (taskQueue) {for (Runnable t : task)

                taskQueue.add(t);

            taskQueue.notify();}

    }



    // 批量加入任务

    public void execute(List<Runnable> task) {synchronized (taskQueue) {for (Runnable t : task)

                taskQueue.add(t);

            taskQueue.notify();}

    }



    // 销毁线程池

    public void destroy() {while (!taskQueue.isEmpty()) {
        // 如果有任务没完成,等待任务完成

            try {Thread.sleep(10);

            } catch (InterruptedException e) {e.printStackTrace();

            }

        }

        // 停止线程池工作

        for (int i = 0; i < workerNum; i++) {excuteThrads[i].stopWorker();

            excuteThrads[i] = null;

        }

        threadPool=null;

        taskQueue.clear();// 清空任务队列}



    // 返回线程池大小

    public int getExcuteThreadNumber() {return workerNum;}



    // 返回已完成任务的个数
    public int getFinishedTaskNumber() {return finishedTask;}



    // 返回未处理任务大小

    public int getUnDoTasknumber() {return taskQueue.size();

    }





    /**

     * 内部类,工作线程

     */

    private class ExcuteThread extends Thread {

        // 线程状态位

        private boolean isRunning = true;



        /*

         * 真正的执行,如果任务队列不空,则取出任务执行,若任务队列为空,则等待

         */

        @Override

        public void run() {

            Runnable r = null;

            while (isRunning) {
            // 注意,若状态位改变,run 完就不执行了

                synchronized (taskQueue) {while (isRunning && taskQueue.isEmpty()) {
                    // 队列为空等待 5 毫秒

                        try {taskQueue.wait(5);

                        } catch (InterruptedException e) {e.printStackTrace();

                        }

                    }

                    if (!taskQueue.isEmpty()) {

                        try {r = taskQueue.take();// 取出任务

                        }catch (java.lang.InterruptedException e){e.printStackTrace();

                        }

                    }

                }

                if (r != null) {r.run();// 执行任务

                }

                finishedTask++;

                r = null;

            }

        }



        // 停止线程

        public void stopExcute() {isRunning = false;}

    }

}

当然,java 还提供了 ThreadPoolExecutor 来,实现线程池,很多时候我们都会用到它。

其实当我们在使用它时,你必须面对一个不想等待,或者很多同时等待的场景。拿一个简单的例子来说,你要做订单下单这个操作。你要查询用户、校验商品、校验价格、预占库存……这种场景为了提高你的程序性能,比如你最多能仍受多长时间,有这种要求的时候,你就可以考虑使用 ThreadPoolExecutor 来处理了。你可以让程序同时去校验商品,校验加个,查询用户,预占库存,最后只要都成功了,才进行下一步。

我们先看下ThreadPoolExecutor 的几个和核心参数:

corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了 prestartAllCoreThreads()或者 prestartCoreThread()方法,从这 2 个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建 corePoolSize 个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为 0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到 corePoolSize 后,就会把到达的任务放到缓存队列当中;

maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;

keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于 corePoolSize 时,keepAliveTime 才会起作用,直到线程池中的线程数不大于 corePoolSize,即当线程池中的线程数大于 corePoolSize 时,如果一个线程空闲的时间达到 keepAliveTime,则会终止,直到线程池中的线程数不超过 corePoolSize。但是如果调用了 allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于 corePoolSize 时,keepAliveTime 参数也会起作用,直到线程池中的线程数为 0;

unit:参数 keepAliveTime 的时间单位,有 7 种取值,在 TimeUnit 类中有提供单位的实现。

packagecom.pz.thread.demo;



import java.util.concurrent.*;



public class ThreadPoolExecutorDemo {public static void main(String args[]){ThreadPoolExecutor threadPoolTaskExecutor = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2));



        Future<Void> userFuture = threadPoolTaskExecutor.submit(new Callable<Void>() {

            @Override

            public Void call()  {

                // 查询用户代码

                return null;

            }

        });



        Future<Void> productFuture = threadPoolTaskExecutor.submit(new Callable<Void>() {

            @Override

            public Void call()  {

                // 商品代码

                return null;

            }

        });



        Future<Void> stockFuture = threadPoolTaskExecutor.submit(new Callable<Void>() {

            @Override

            public Void call()  {

                // 库存代码

                return null;

            }

        });



        try {userFuture.get(1000, TimeUnit.MILLISECONDS);

            productFuture.get(1000, TimeUnit.MILLISECONDS);

            stockFuture.get(1000, TimeUnit.MILLISECONDS);

        }catch (Exception e){}}

}
退出移动版