关于java:线程间协作

  • 期待与告诉
  • 条件变量
  • 倒计时协调器
  • 栅栏
  • 阻塞队列
  • 流量管制与信号量
  • Exchager
  • 如何正确的进行线程

期待与告诉

  1. 在java平台能够通过应用Object.wait()/Object.wait(long)和Object.notify()/Object.notifyAll()配合来实现线程的期待与告诉。

    Object.wait()可能使以后线程暂停(状态转为WAITING),该办法能够用来实现期待,其所在的线程称为期待线程。

    Object.notify()能够唤醒一个期待线程,其所在线程被称为告诉线程。

    wait()和notify()办法都使Object类的办法,因为其是所有类的父类,所以,所有类都有这两个办法。

    synchronized(lockObject){
      while(期待条件){
        lockObject.wait();
      }
      ......
      //后续操作
    }

    下面代码中,while的判断条件,咱们暂且称为期待条件,当这个条件成立,这个线程就会进入期待条件,当其它线程从新将其唤醒,而后再次判断期待条件成不成立,若不成立示意线程能够持续往下执行做相应的操作,否则持续进入期待状态。

    上面咱们来思考一下,while的作用,期待条件为什么要和while配合应用,而不是和if配合应用。

    /**
     * @ClassName WaitIfSample
     * @description:
     * @author: yong.yuan
     * @create: 2022-04-15 16:44
     * @Version 1.0
     **/
    public class WaitIfExample {
        static AtomicInteger stock = new AtomicInteger();
        static final Object LOCKER = new Object();
    
        static class Consumer implements Runnable{
            @Override
            public void run() {
                consume();
            }
    
            void consume(){
                synchronized (LOCKER){
                    while (stock.get() == 0) {
                        try {
                            LOCKER.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    stock.getAndDecrement();
                    System.out.println("consumer " + Thread.currentThread().getName()
                     + "生产音讯后库存为:"+stock.get());
                    LOCKER.notifyAll();
                }
            }
        }
    
        static class Producer implements Runnable{
            @Override
            public void run() {
                product();
            }
            void product(){
                synchronized (LOCKER) {
                    while (stock.get() != 0) {
                        try {
                            LOCKER.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    stock.getAndAdd(2);
                    System.out.println("producer 生产音讯,生产后库存为:"+stock.get());
                    LOCKER.notifyAll();
                }
            }
        }
    
        public static void main(String[] args) {
            Consumer consumer = new Consumer();
            new Thread(consumer).start();
            new Thread(consumer).start();
            new Thread(consumer).start();
    
            new Thread(new Producer()).start();
        }
    }

    下面咱们实现一个简略的生产者-消费者的这样一个音讯队列,对于消费者而言只有在库存大于0的时候能力进行生产,假如当初有3个生产线程正在执行,然而初始化的时候库存为0,所以这3个线程就都进入到了WAITING状态,而后有一个生产者在库存中减少了2,而后对所有期待线程进行唤醒,唤醒的线程会先通过while判断期待条件到底成不成立,成立→持续期待,不成立→向后执行生产动作,代码中的3个期待线程中有2个将会生产胜利,有1个会持续进入期待状态。

    producer 生产音讯,生产后库存为:2
    consumer Thread-2生产音讯后库存为:1
    consumer Thread-1生产音讯后库存为:0

    然而,如果将while换成if呢?当期待线程被唤醒后,即便,期待条件成立,线程也会继续执行,显然这不是咱们想要的后果。

    producer 生产音讯,生产后库存为:2
    consumer Thread-2生产音讯后库存为:1
    consumer Thread-1生产音讯后库存为:0
    consumer Thread-0生产音讯后库存为:-1

    当然while配合期待条件只是一种通用场景,有的非凡场景不应用while或者应用if也是能够的。

  2. Object.wait(long)容许咱们指定一个超时工夫(单位为毫秒),如果期待线程在这个工夫内没有被其它线程唤醒,那么java虚构机会主动唤醒这个线程,不过这样既不会抛出异样也没有返回值,所以线程是否主动唤醒须要一些额定操作。
  3. wait/notify的开销及问题

    • 过早唤醒:A、B、C三个线程中都应用同一个锁对象,而且都存在线程暂停的判断,然而A和B应用的线程暂停的判断条件和C是不同的,所以当A、B、C同时处于WAITING状态时,有一个线程为了唤醒A、B会应用notifyAll(),这样同时也会把C唤醒,然而C的通过while判断还是持续进入到了暂停状态,也就是说这个notify动作是与C没有太大关联的,这就被称作过早唤醒。
    • 信号失落:如果期待线程在执行Object.wait()前没有先判断爱护条件是否未然成立,那么有可能呈现这种情景——告诉线程在该期待线程进入临界区之前就曾经更新了相干共享变量,使得相应的爱护条件成立并进行了告诉,然而此时期待线程还没有被暂停,天然也就无所谓唤醒了。这就可能造成期待线程间接执行Object.wait()而被暂停的时候,该线程因为没有其余线程进行告诉而始终处于期待状态。这种景象就相当于期待线程错过了一个原本“发送”给它的“信号”,因而被称为信号失落。
    • 欺骗性唤醒:线程可能在没有其余线程执行notify/notifyAll的状况下被唤醒,这就被称为欺骗性唤醒。在wait()办法里面加while()进行判断就能够解决这个问题。
    • 上下文切换:wait/notify对应着是线程暂停/线程唤醒,所以会导致屡次锁的申请与开释,锁的申请与开释可能会造成上下文切换。
  4. java虚拟机为每个对象保护一个被称为 期待集(wait set)的队列,该队列用于存储该对象上的期待线程,Object.wait()会让以后线程暂停并开释相应的锁,并将以后线程存入对象的期待集中。执行Object.notify()会使该对象的期待集中的任意一个线程被唤醒,唤醒的线程并不会立刻被从这个期待集中移除,而是,等到这个线程在次持有对象锁的时候才会被移除。
  5. Thread.join():某个线程执行完了,此线程能力执行

    static void main(){
      Thread t = new Thread();
      t.start();
      ......
      t.join();//A
      ......
    }

    以上为例,只有t线程执行结束后,主线程能力执行A前面的内容。

条件变量

  1. Condition能够作为wait/notify的替代品来实现期待/告诉,它的await()、signal()/signalAll()别离对应wait()、notify()/notifyAll()并且解决了过早唤醒以及wait(long)是否超时无奈判断等问题。
  2. Object.wait()/Object.notify()要求执行线程持有该对象的外部锁。

    Condition.await()/Condition.signal()要求执行线程持有该对象的显式锁。

  3. Condition.awaitUntil(Date),参数是期待的截至期限,当awaitUntil被其它线程signal时这个办法会返回true。
  4. Condition应用样例

    /**
     * @ClassName ConditionSimple
     * @description:
     * @author: yong.yuan
     * @create: 2022-04-18 10:40
     * @Version 1.0
     **/
    public class ConditionExample {
        static Lock lock = new ReentrantLock();
        static Condition conditionA = lock.newCondition();
        static Condition conditionB = lock.newCondition();
        static BlockingQueue<String> queue = new ArrayBlockingQueue<String>(3);
    
        static class Consumer implements Runnable{
            @Override
            public void run() {
                lock.lock();
                try {
                    while (queue.isEmpty()){
                        System.out.println("消费者暂停中....");
                        conditionA.await();
                    }
                    System.out.println("生产线程"+Thread.currentThread().getName()
                    +"生产音讯:"+queue.take());
                    conditionB.signalAll();
                }catch (InterruptedException e){
                    e.printStackTrace();
                }finally {
                    lock.unlock();
                }
            }
        }
    
        static class Producer implements Runnable{
            @Override
            public void run() {
                lock.lock();
                try {
                    while (queue.remainingCapacity() == 0){
                        System.out.println("生产者暂停中...");
                        conditionB.await();
                    }
                    System.out.println("生产线程"+Thread.currentThread().getName()
                    +"生产音讯...");
                    queue.add("hello");
                    conditionA.signalAll();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        }
    }
    

倒计时协调器

CountDownLatch是个别用来实现一个其余线程期待其余线程实现一组特定的操作后再继续执行,这组特定的操作被称为先决操作。

CountDownLatch会有一个保护没有实现的先决操作的数量的计数器countDownLatch.countDown() 每被执行一次,计数器就会-1,CountDownLatch.await()相当于一个受爱护办法,当它所在线程执行到这个办法后,线程就会暂停,直到它外部锁保护的计数器为0时,线程才会被唤醒,持续向下执行。

当CountDownLatch中的计数器为0时,再次执行countDown办法,计数器的值不会变,也不会抛出异样,再次执行await办法线程也不会进行,这意味着CountDownLatch的应用是一次性的。

应用CountDownLatch实现期待/告诉的时候调用await、countDown办法都毋庸加锁。

应用场景:例如启动java服务,各服务之间有调用关系,个别先启动相应数量的被调用服务。例如A服务启动5个实例后能力启动B服务,那就能够有一个初始值为5的CountDownLatch,在B服务启动前设置await,每一个A服务启动就countDown,当A服务实例启动结束后,B才开始启动。

public class CountDownLatchExample {
  private static final CountDownLatch latch = new CountDownLatch(4);
  private static int data;

  public static void main(String[] args) throws InterruptedException {
    Thread workerThread = new Thread() {
      @Override
      public void run() {
        for (int i = 1; i < 10; i++) {
          data = i;
          latch.countDown();
          // 使以后线程暂停(随机)一段时间
          Tools.randomPause(1000);
        }

      };
    };
    workerThread.start();
    latch.await();
    Debug.info("It's done. data=%d", data);
  }
}

栅栏

CyclicBarrier和CountDownLatch有肯定相似之处,CountDownLatch是一个await的线程要期待其它线程实现肯定数量的先决条件能力继续执行,CyclicBarrier会在多个线程中设置一个await点,达到这个点的线程数量达到了设置的数量要求才会继续执行。

/**
 * @ClassName ClimbMountains
 * @description:
 * @author: yong.yuan
 * @create: 2022-04-03 21:59
 * @Version 1.0
 **/
public class ClimbMountains {
    static Logger logger = Logger.getLogger(ClimbMountains.class.getName());
    static CyclicBarrier[] climbBarrier = new CyclicBarrier[2];

    public static void main(String[] args) {
        for (int i = 0; i < 2; i++) {
            Company company = new Company(i);
            company.start();
        }
    }
    static class Company{
        private final String name;
        private final List<Staff> staffList;
        public Company(int c) {
            this.name ="公司".concat(String.valueOf(c));
            climbBarrier[c] = new CyclicBarrier(5);
            staffList = new ArrayList<>();
            for (int j = 0; j < 5; j++) {
                staffList.add(new Staff(name,c,j));
            }
        }

        synchronized void start() {
            String log = String.format("%s 五位精英开始攀登....",name);
            logger.info(log);
            for (Staff staff:staffList) {
                new Thread(staff::start).start();
            }
        }
    }

    static class Staff{
        private final String name;
        private final int c;
        public Staff(String company,int c,int s) {
            this.c = c;
            this.name = company.concat("-员工").concat(String.valueOf(s));
        }

        void start() {
            System.out.println(name + ":开始攀登!");
            try {
                int time = new Random().nextInt(20000);
                Thread.sleep(time);
                System.out.println(name+":用时"+time/1000+"秒");
                climbBarrier[c].await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }finally {
                System.out.println(name + ":结束");
            }
        }
    }
}
=================输入后果=================
四月 18, 2022 4:00:49 下午 io.github.viscent.mtia.ch5.ClimbMountains$Company start
信息: 公司0 五位精英开始攀登....
公司0-员工0:开始攀登!
公司0-员工1:开始攀登!
公司0-员工2:开始攀登!
公司0-员工3:开始攀登!
公司0-员工4:开始攀登!
公司1-员工0:开始攀登!
公司1-员工1:开始攀登!
公司1-员工2:开始攀登!
公司1-员工3:开始攀登!
公司1-员工4:开始攀登!
四月 18, 2022 4:00:49 下午 io.github.viscent.mtia.ch5.ClimbMountains$Company start
信息: 公司1 五位精英开始攀登....
公司1-员工3:用时4秒
公司1-员工4:用时4秒
公司0-员工0:用时5秒
公司0-员工3:用时7秒
公司0-员工2:用时9秒
公司1-员工2:用时11秒
公司1-员工1:用时12秒
公司1-员工0:用时13秒
公司1-员工0:结束
公司1-员工1:结束
公司1-员工2:结束
公司1-员工4:结束
公司1-员工3:结束
公司0-员工4:用时13秒
公司0-员工1:用时16秒
公司0-员工0:结束
公司0-员工4:结束
公司0-员工2:结束
公司0-员工3:结束
![](https://eacape-1259159524.cos.ap-shanghai.myqcloud.com/images/clipboard.png)
公司0-员工1:结束

阻塞队列

阻塞队列能够依照其存储空间是否受限制划分为有界队列和无界队列,有界队列的队列容量是由程序设定的,无界队列的容量是Integer.MAX\_VALUE也就是2^31

罕用阻塞队列和罕用办法

  1. ArrayBlockingQueue

    其底层的数据结构是一个数组,所以它在put和take的时候不会造成垃圾回收的累赘,它在put和take的时候应用的是同一个显式锁,所以造成他在put和take的时候会随同着锁的开释与申请,如果有大量线程在一直的put和take会造成锁竞争过高,从而一直导致上下文切换。

  2. LinkedBlockingQueue

    其底层的数据结构是一个链表,所以它在put个take的时候会随同着空间的动态分配,也就是每此put或take操作都会随同着节点的创立和移除,这样就会造成垃圾回收的累赘。然而,它的put和take操作是应用的两个不一样的显式锁,这样绝对会减缓锁竞争度。

    此外其外部保护着一个Atomic变量用于保护队列长度,也可能存在被put线程和take线程一直的争用。

  3. SychronousQueue

    容量为0,次要时用来转发工作(阻塞作用),SychronousQueue.take(E)的时候没有线程执行SychronousQueue.put(E)那么生产线程就会暂停晓得有生产线程执行SychronousQueue.put(E),

    同样,SychronousQueue.put(E)的时候没有生产线程执行SychronousQueue.take(E),生产线程也会进行直到,有生产线程执行SychronousQueue.take(E)。

    SychronousQueue和ArrayBlockingQueue/LinkedBlockingQueue前者就像送快递时快递员要把快递交到你的手上能力去送下一份快递,而后者就是间接把快递放到蜂巢储物柜中。

  4. PriorityBlockingQueue

    反对优先级的无界阻塞队列,能够通过自定义的类实现compareTo办法指定元素的 排序规定,它take时如果队列为空将会阻塞,然而它会有限扩容所以,put并不会 阻塞。其实现原理是堆排序

    在最小堆[1, 5, 8, 6, 10, 11, 20]中再插入一个元素4,上面用图示剖析插入的过程:

    最大堆[20, 10, 15, 6, 9, 10, 12]中移除元素后,上面用图示剖析重排的过程:

  5. DelayWorkQueue

    DelayWorkQueue是实现了提早性能的PriorityBlockingQueue

    外部采纳的时堆构造(插入时会进行排序),特点是外部元素不会依照入队的程序来出队,

    而是会依据延时长短对外部元素进行排序。

ArrayBlockingQueue和SynchronousQueue都既反对非偏心调度也反对偏心调度,而LinkedBlockingQueue仅反对非偏心调度。

如果生产者线程和消费者线程之间的并发水平比拟大,那么这些线程对传输通道外部所应用的锁的争用可能性也随之减少。这时,有界队列的实现适宜选用LinkedBlockingQueue,否则咱们能够思考ArrayBlockingQueue。

  • LinkedBlockingQueue适宜在生产者线程和消费者线程之间的并发水平比拟大的状况下应用
  • ArrayBlockingQueue适宜在生产者线程和消费者线程之间的并发水平较低的状况下应用
  • SynchronousQueue适宜在消费者解决能力与生产者解决能力相差不大的状况下应用。

流量管制与信号量

SemaPhore通常叫做信号量,个别用来管制同时拜访特定资源的的线程数量,通过协调各个线程,以保障正当应用资源。

通常应用的场景就是限流,比如说,限度数据库连接池的连接线程数量。

罕用办法

acquire()  
获取一个令牌,在获取到令牌、或者被其余线程调用中断之前线程始终处于阻塞状态。
​
acquire(int permits)  
获取一个令牌,在获取到令牌、或者被其余线程调用中断、或超时之前线程始终处于阻塞状态。
    
acquireUninterruptibly() 
获取一个令牌,在获取到令牌之前线程始终处于阻塞状态(疏忽中断)。
    
tryAcquire()
尝试取得令牌,返回获取令牌胜利或失败,不阻塞线程。
​
tryAcquire(long timeout, TimeUnit unit)
尝试取得令牌,在超时工夫内循环尝试获取,直到尝试获取胜利或超时返回,不阻塞线程。
​
release()
开释一个令牌,唤醒一个获取令牌不胜利的阻塞线程。
​
hasQueuedThreads()
期待队列里是否还存在期待线程。
​
getQueueLength()
获取期待队列里阻塞的线程数。
​
drainPermits()
清空令牌把可用令牌数置为0,返回清空令牌的数量。
​
availablePermits()
返回可用的令牌数量。

实现一个简略的令牌获取的例子

/**
 * @ClassName SemaphoreExample
 * @description:
 * @author: yong.yuan
 * @create: 2022-04-18 18:59
 * @Version 1.0
 **/
public class SemaphoreExample {
    static final Semaphore semaphore = new Semaphore(5);
    static Lock lock = new ReentrantLock();
    static Condition condition = lock.newCondition();
    static CountDownLatch countDownLatch = new CountDownLatch(10);

    static class Worker implements Runnable{
        @Override
        public void run() {
            try {
                Thread.sleep(1000);
                semaphore.acquire();
                System.out.println(Thread.currentThread().getId()+
                "号工人,从流水线上取货物一件,现有" + semaphore.availablePermits() + "件货物");
                countDownLatch.countDown();
                lock.lock();
                try {
                    condition.signalAll();
                }finally {
                    lock.unlock();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static class Machine implements Runnable{
        @Override
        public void run() {
            try {
                Thread.sleep(1000);
                while(semaphore.availablePermits() >= 5) {
                    lock.lock();
                    try {
                        System.out.println("流水线上的货物满了");
                        condition.await();
                    } finally {
                        lock.unlock();
                    }
                }
                semaphore.release();
                System.out.println("向流水线上送货物,现有"+semaphore.availablePermits()
                +"件货物");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            new Thread(new Worker()).start();
            new Thread(new Machine()).start();
        }

        countDownLatch.await();
        System.out.println("曾经有10个工人搬走货物了");
        while (semaphore.tryAcquire()){
            System.out.println("残余货物搬走.....");
        }
    }
}
==============执行后果==============
流水线上的货物满了
流水线上的货物满了
17号工人,从流水线上取货物一件,现有4件货物
向流水线上送货物,现有4件货物
流水线上的货物满了
向流水线上送货物,现有5件货物
19号工人,从流水线上取货物一件,现有5件货物
13号工人,从流水线上取货物一件,现有4件货物
向流水线上送货物,现有5件货物
流水线上的货物满了
向流水线上送货物,现有4件货物
27号工人,从流水线上取货物一件,现有3件货物
15号工人,从流水线上取货物一件,现有4件货物
向流水线上送货物,现有5件货物
向流水线上送货物,现有5件货物
25号工人,从流水线上取货物一件,现有4件货物
21号工人,从流水线上取货物一件,现有4件货物
向流水线上送货物,现有5件货物
23号工人,从流水线上取货物一件,现有4件货物
向流水线上送货物,现有5件货物
流水线上的货物满了
向流水线上送货物,现有5件货物
29号工人,从流水线上取货物一件,现有4件货物
向流水线上送货物,现有5件货物
31号工人,从流水线上取货物一件,现有5件货物
曾经有10个工人搬走货物了
残余货物搬走.....
残余货物搬走.....
残余货物搬走.....
残余货物搬走.....
残余货物搬走.....

Exchager

Exchanger类可用于两个线程之间替换信息。可简略地将Exchanger对象了解为一个蕴含两个格子的容器,通过exchanger办法能够向两个格子中填充信息。当两个格子中的均被填充时,该对象会主动将两个格子的信息替换,而后返回给线程,从而实现两个线程的信息替换。

当消费者线程生产一个已填充的缓冲区时,另外一个缓冲区能够由生产者线程进行填充,从而实现了数据生成与生产的并发。这种缓冲技术就被称为双缓冲

/**
 * @ClassName ExchangerExample
 * @description:
 * @author: yong.yuan
 * @create: 2022-04-18 23:10
 * @Version 1.0
 **/
public class ExchangerExample {
    static Exchanger<String> STRING_EXCHANGER = new Exchanger<>();
    static class MyThread extends Thread{
        String msg;
        public MyThread(String threadName,String msg) {
            Thread.currentThread().setName(threadName);
            this.msg = msg;
        }

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName()+":"
                +STRING_EXCHANGER.exchange(msg));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) {
        Thread t1 = new MyThread("thread-1","hello thread-1");
        Thread t2 = new MyThread("thread-2","hello thread-2");
        t1.start();
        t2.start();
    }
}
===============操作后果===============
Thread-1:hello thread-1
Thread-0:hello thread-2

如何正确的进行线程

应用interrupt终止线程

  1. 应用interrupt实际上是通过interrupt状态的变动来对线程实现进行,而不会立刻终止这个线程。
  2. 当线程处于wait()或者sleep()状态时,应用interrupt能够将休眠的线程唤醒,然而会抛出异样,咱们能够在Catch(InterruptedExcetion e){}中手动用interrupt()来终止这个线程
  3. 比照其它终止形式 – stop():会间接把线程停掉,不能解决进行之前想要解决的数据
public class StopThread{
    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(() -> {
            int count = 0;
            while (!Thread.currentThread().isInterrupted() && count < 1000){
                System.out.println("count = " + count++);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    //线程在休眠期间被中断,那么会主动革除中断信号,所以须要在catch中再次中断
                    Thread.currentThread().interrupt();
                }
            }
        });
        thread.start();
        Thread.sleep(5000);
        thread.interrupt();
    }
}

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理