乐趣区

关于java:线程间协作

  • 期待与告诉
  • 条件变量
  • 倒计时协调器
  • 栅栏
  • 阻塞队列
  • 流量管制与信号量
  • Exchager
  • 如何正确的进行线程

期待与告诉

  1. 在 java 平台能够通过应用 Object.wait()/Object.wait(long)和 Object.notify()/Object.notifyAll()配合来实现线程的期待与告诉。

    Object.wait()可能使以后线程暂停(状态转为 WAITING), 该办法能够用来实现期待,其所在的线程称为期待线程。

    Object.notify()能够唤醒一个期待线程,其所在线程被称为告诉线程。

    wait()和 notify()办法都使 Object 类的办法,因为其是所有类的父类,所以,所有类都有这两个办法。

    synchronized(lockObject){while(期待条件){lockObject.wait();
      }
      ......
      // 后续操作
    }

    下面代码中,while 的判断条件,咱们暂且称为期待条件,当这个条件成立,这个线程就会进入期待条件, 当其它线程从新将其唤醒,而后再次判断期待条件成不成立,若不成立示意线程能够持续往下执行做相应的操作,否则持续进入期待状态。

    上面咱们来思考一下,while 的作用,期待条件为什么要和 while 配合应用,而不是和 if 配合应用。

    /**
     * @ClassName WaitIfSample
     * @description:
     * @author: yong.yuan
     * @create: 2022-04-15 16:44
     * @Version 1.0
     **/
    public class WaitIfExample {static AtomicInteger stock = new AtomicInteger();
        static final Object LOCKER = new Object();
    
        static class Consumer implements Runnable{
            @Override
            public void run() {consume();
            }
    
            void consume(){synchronized (LOCKER){while (stock.get() == 0) {
                        try {LOCKER.wait();
                        } catch (InterruptedException e) {e.printStackTrace();
                        }
                    }
                    stock.getAndDecrement();
                    System.out.println("consumer" + Thread.currentThread().getName()
                     + "生产音讯后库存为:"+stock.get());
                    LOCKER.notifyAll();}
            }
        }
    
        static class Producer implements Runnable{
            @Override
            public void run() {product();
            }
            void product(){synchronized (LOCKER) {while (stock.get() != 0) {
                        try {LOCKER.wait();
                        } catch (InterruptedException e) {e.printStackTrace();
                        }
                    }
                    stock.getAndAdd(2);
                    System.out.println("producer 生产音讯, 生产后库存为:"+stock.get());
                    LOCKER.notifyAll();}
            }
        }
    
        public static void main(String[] args) {Consumer consumer = new Consumer();
            new Thread(consumer).start();
            new Thread(consumer).start();
            new Thread(consumer).start();
    
            new Thread(new Producer()).start();}
    }

    下面咱们实现一个简略的生产者 - 消费者的这样一个音讯队列,对于消费者而言只有在库存大于 0 的时候能力进行生产,假如当初有 3 个生产线程正在执行,然而初始化的时候库存为 0,所以这 3 个线程就都进入到了 WAITING 状态,而后有一个生产者在库存中减少了 2,而后对所有期待线程进行唤醒,唤醒的线程会先通过 while 判断期待条件到底成不成立,成立→持续期待,不成立→向后执行生产动作,代码中的 3 个期待线程中有 2 个将会生产胜利,有 1 个会持续进入期待状态。

    producer 生产音讯, 生产后库存为:2
    consumer Thread- 2 生产音讯后库存为:1
    consumer Thread- 1 生产音讯后库存为:0

    然而,如果将 while 换成 if 呢?当期待线程被唤醒后,即便,期待条件成立,线程也会继续执行,显然这不是咱们想要的后果。

    producer 生产音讯, 生产后库存为:2
    consumer Thread- 2 生产音讯后库存为:1
    consumer Thread- 1 生产音讯后库存为:0
    consumer Thread- 0 生产音讯后库存为:-1

    当然 while 配合期待条件只是一种通用场景,有的非凡场景不应用 while 或者应用 if 也是能够的。

  2. Object.wait(long)容许咱们指定一个超时工夫(单位为毫秒),如果期待线程在这个工夫内没有被其它线程唤醒,那么 java 虚构机会主动唤醒这个线程,不过这样既不会抛出异样也没有返回值,所以线程是否主动唤醒须要一些额定操作。
  3. wait/notify 的开销及问题

    • 过早唤醒:A、B、C 三个线程中都应用同一个锁对象,而且都存在线程暂停的判断,然而 A 和 B 应用的线程暂停的判断条件和 C 是不同的,所以当 A、B、C 同时处于 WAITING 状态时,有一个线程为了唤醒 A、B 会应用 notifyAll(),这样同时也会把 C 唤醒,然而 C 的通过 while 判断还是持续进入到了暂停状态,也就是说这个 notify 动作是与 C 没有太大关联的,这就被称作过早唤醒。
    • 信号失落:如果期待线程在执行 Object.wait()前没有先判断爱护条件是否未然成立,那么有可能呈现这种情景——告诉线程在该期待线程进入临界区之前就曾经更新了相干共享变量,使得相应的爱护条件成立并进行了告诉,然而此时期待线程还没有被暂停,天然也就无所谓唤醒了。这就可能造成期待线程间接执行 Object.wait()而被暂停的时候,该线程因为没有其余线程进行告诉而始终处于期待状态。这种景象就相当于期待线程错过了一个原本“发送”给它的“信号”,因而被称为信号失落。
    • 欺骗性唤醒:线程可能在没有其余线程执行 notify/notifyAll 的状况下被唤醒,这就被称为欺骗性唤醒。在 wait()办法里面加 while()进行判断就能够解决这个问题。
    • 上下文切换:wait/notify 对应着是 线程暂停 / 线程唤醒,所以会导致屡次锁的申请与开释,锁的申请与开释可能会造成上下文切换。
  4. java 虚拟机为每个对象保护一个被称为 期待集 (wait set) 的队列,该队列用于存储该对象上的期待线程,Object.wait()会让以后线程暂停并开释相应的锁,并将以后线程存入对象的期待集中。执行 Object.notify()会使该对象的期待集中的任意一个线程被唤醒,唤醒的线程并不会立刻被从这个期待集中移除,而是,等到这个线程在次持有对象锁的时候才会被移除。
  5. Thread.join(): 某个线程执行完了,此线程能力执行

    static void main(){Thread t = new Thread();
      t.start();
      ......
      t.join();//A
      ......
    }

    以上为例,只有 t 线程执行结束后,主线程能力执行 A 前面的内容。

条件变量

  1. Condition 能够作为 wait/notify 的替代品来实现期待 / 告诉,它的 await()、signal()/signalAll()别离对应 wait()、notify()/notifyAll()并且解决了过早唤醒以及 wait(long)是否超时无奈判断等问题。
  2. Object.wait()/Object.notify()要求执行线程持有该对象的外部锁。

    Condition.await()/Condition.signal()要求执行线程持有该对象的显式锁。

  3. Condition.awaitUntil(Date),参数是期待的截至期限,当 awaitUntil 被其它线程 signal 时这个办法会返回 true。
  4. Condition 应用样例

    /**
     * @ClassName ConditionSimple
     * @description:
     * @author: yong.yuan
     * @create: 2022-04-18 10:40
     * @Version 1.0
     **/
    public class ConditionExample {static Lock lock = new ReentrantLock();
        static Condition conditionA = lock.newCondition();
        static Condition conditionB = lock.newCondition();
        static BlockingQueue<String> queue = new ArrayBlockingQueue<String>(3);
    
        static class Consumer implements Runnable{
            @Override
            public void run() {lock.lock();
                try {while (queue.isEmpty()){System.out.println("消费者暂停中....");
                        conditionA.await();}
                    System.out.println("生产线程"+Thread.currentThread().getName()
                    +"生产音讯:"+queue.take());
                    conditionB.signalAll();}catch (InterruptedException e){e.printStackTrace();
                }finally {lock.unlock();
                }
            }
        }
    
        static class Producer implements Runnable{
            @Override
            public void run() {lock.lock();
                try {while (queue.remainingCapacity() == 0){System.out.println("生产者暂停中...");
                        conditionB.await();}
                    System.out.println("生产线程"+Thread.currentThread().getName()
                    +"生产音讯...");
                    queue.add("hello");
                    conditionA.signalAll();} catch (InterruptedException e) {e.printStackTrace();
                } finally {lock.unlock();
                }
            }
        }
    }
    

倒计时协调器

CountDownLatch 是个别用来实现一个其余线程期待其余线程实现一组特定的操作后再继续执行,这组特定的操作被称为先决操作。

CountDownLatch 会有一个保护没有实现的先决操作的数量的计数器 countDownLatch.countDown() 每被执行一次,计数器就会 -1,CountDownLatch.await()相当于一个受爱护办法,当它所在线程执行到这个办法后,线程就会暂停,直到它外部锁保护的计数器为 0 时,线程才会被唤醒,持续向下执行。

当 CountDownLatch 中的计数器为 0 时,再次执行 countDown 办法,计数器的值不会变,也不会抛出异样,再次执行 await 办法线程也不会进行,这意味着 CountDownLatch 的应用是一次性的。

应用 CountDownLatch 实现期待 / 告诉的时候调用 await、countDown 办法都毋庸加锁。

应用场景:例如启动 java 服务,各服务之间有调用关系,个别先启动相应数量的被调用服务。例如 A 服务启动 5 个实例后能力启动 B 服务,那就能够有一个初始值为 5 的 CountDownLatch,在 B 服务启动前设置 await,每一个 A 服务启动就 countDown,当 A 服务实例启动结束后,B 才开始启动。

public class CountDownLatchExample {private static final CountDownLatch latch = new CountDownLatch(4);
  private static int data;

  public static void main(String[] args) throws InterruptedException {Thread workerThread = new Thread() {
      @Override
      public void run() {for (int i = 1; i < 10; i++) {
          data = i;
          latch.countDown();
          // 使以后线程暂停(随机)一段时间
          Tools.randomPause(1000);
        }

      };
    };
    workerThread.start();
    latch.await();
    Debug.info("It's done. data=%d", data);
  }
}

栅栏

CyclicBarrier 和 CountDownLatch 有肯定相似之处,CountDownLatch 是一个 await 的线程要期待其它线程实现肯定数量的先决条件能力继续执行,CyclicBarrier 会在多个线程中设置一个 await 点,达到这个点的线程数量达到了设置的数量要求才会继续执行。

/**
 * @ClassName ClimbMountains
 * @description:
 * @author: yong.yuan
 * @create: 2022-04-03 21:59
 * @Version 1.0
 **/
public class ClimbMountains {static Logger logger = Logger.getLogger(ClimbMountains.class.getName());
    static CyclicBarrier[] climbBarrier = new CyclicBarrier[2];

    public static void main(String[] args) {for (int i = 0; i < 2; i++) {Company company = new Company(i);
            company.start();}
    }
    static class Company{
        private final String name;
        private final List<Staff> staffList;
        public Company(int c) {this.name ="公司".concat(String.valueOf(c));
            climbBarrier = new CyclicBarrier(5);
            staffList = new ArrayList<>();
            for (int j = 0; j < 5; j++) {staffList.add(new Staff(name,c,j));
            }
        }

        synchronized void start() {String log = String.format("%s 五位精英开始攀登....",name);
            logger.info(log);
            for (Staff staff:staffList) {new Thread(staff::start).start();}
        }
    }

    static class Staff{
        private final String name;
        private final int c;
        public Staff(String company,int c,int s) {
            this.c = c;
            this.name = company.concat("- 员工").concat(String.valueOf(s));
        }

        void start() {System.out.println(name + ": 开始攀登!");
            try {int time = new Random().nextInt(20000);
                Thread.sleep(time);
                System.out.println(name+": 用时"+time/1000+"秒");
                climbBarrier.await();} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();
            }finally {System.out.println(name + ": 结束");
            }
        }
    }
}
================= 输入后果 =================
四月 18, 2022 4:00:49 下午 io.github.viscent.mtia.ch5.ClimbMountains$Company start
信息: 公司 0 五位精英开始攀登....
公司 0 - 员工 0: 开始攀登!
公司 0 - 员工 1: 开始攀登!
公司 0 - 员工 2: 开始攀登!
公司 0 - 员工 3: 开始攀登!
公司 0 - 员工 4: 开始攀登!
公司 1 - 员工 0: 开始攀登!
公司 1 - 员工 1: 开始攀登!
公司 1 - 员工 2: 开始攀登!
公司 1 - 员工 3: 开始攀登!
公司 1 - 员工 4: 开始攀登!
四月 18, 2022 4:00:49 下午 io.github.viscent.mtia.ch5.ClimbMountains$Company start
信息: 公司 1 五位精英开始攀登....
公司 1 - 员工 3: 用时 4 秒
公司 1 - 员工 4: 用时 4 秒
公司 0 - 员工 0: 用时 5 秒
公司 0 - 员工 3: 用时 7 秒
公司 0 - 员工 2: 用时 9 秒
公司 1 - 员工 2: 用时 11 秒
公司 1 - 员工 1: 用时 12 秒
公司 1 - 员工 0: 用时 13 秒
公司 1 - 员工 0: 结束
公司 1 - 员工 1: 结束
公司 1 - 员工 2: 结束
公司 1 - 员工 4: 结束
公司 1 - 员工 3: 结束
公司 0 - 员工 4: 用时 13 秒
公司 0 - 员工 1: 用时 16 秒
公司 0 - 员工 0: 结束
公司 0 - 员工 4: 结束
公司 0 - 员工 2: 结束
公司 0 - 员工 3: 结束
![](https://eacape-1259159524.cos.ap-shanghai.myqcloud.com/images/clipboard.png)
公司 0 - 员工 1: 结束

阻塞队列

阻塞队列能够依照其存储空间是否受限制划分为有界队列和无界队列,有界队列的队列容量是由程序设定的,无界队列的容量是 Integer.MAX\_VALUE 也就是 2^31

罕用阻塞队列和罕用办法

  1. ArrayBlockingQueue

    其底层的数据结构是一个数组,所以它在 put 和 take 的时候不会造成垃圾回收的累赘,它在 put 和 take 的时候应用的是同一个显式锁,所以造成他在 put 和 take 的时候会随同着锁的开释与申请,如果有大量线程在一直的 put 和 take 会造成锁竞争过高,从而一直导致上下文切换。

  2. LinkedBlockingQueue

    其底层的数据结构是一个链表,所以它在 put 个 take 的时候会随同着空间的动态分配,也就是每此 put 或 take 操作都会随同着节点的创立和移除,这样就会造成垃圾回收的累赘。然而,它的 put 和 take 操作是应用的两个不一样的显式锁,这样绝对会减缓锁竞争度。

    此外其外部保护着一个 Atomic 变量用于保护队列长度,也可能存在被 put 线程和 take 线程一直的争用。

  3. SychronousQueue

    容量为 0,次要时用来转发工作(阻塞作用),SychronousQueue.take(E)的时候没有线程执行 SychronousQueue.put(E)那么生产线程就会暂停晓得有生产线程执行 SychronousQueue.put(E),

    同样,SychronousQueue.put(E)的时候没有生产线程执行 SychronousQueue.take(E),生产线程也会进行直到,有生产线程执行 SychronousQueue.take(E)。

    SychronousQueue 和 ArrayBlockingQueue/LinkedBlockingQueue 前者就像送快递时快递员要把快递交到你的手上能力去送下一份快递, 而后者就是间接把快递放到蜂巢储物柜中。

  4. PriorityBlockingQueue

    反对优先级的无界阻塞队列,能够通过自定义的类实现 compareTo 办法指定元素的 排序规定,它 take 时如果队列为空将会阻塞,然而它会有限扩容所以,put 并不会 阻塞。其实现原理是堆排序

    在最小堆 [1, 5, 8, 6, 10, 11, 20] 中再插入一个元素 4,上面用图示剖析插入的过程:

    最大堆 [20, 10, 15, 6, 9, 10, 12] 中移除元素后,上面用图示剖析重排的过程:

  5. DelayWorkQueue

    DelayWorkQueue 是实现了提早性能的 PriorityBlockingQueue

    外部采纳的时堆构造(插入时会进行排序),特点是外部元素不会依照入队的程序来出队,

    而是会依据延时长短对外部元素进行排序。

ArrayBlockingQueue 和 SynchronousQueue 都既反对非偏心调度也反对偏心调度,而 LinkedBlockingQueue 仅反对非偏心调度。

如果生产者线程和消费者线程之间的并发水平比拟大,那么这些线程对传输通道外部所应用的锁的争用可能性也随之减少。这时,有界队列的实现适宜选用 LinkedBlockingQueue,否则咱们能够思考 ArrayBlockingQueue。

  • LinkedBlockingQueue 适宜在生产者线程和消费者线程之间的并发水平比拟大的状况下应用
  • ArrayBlockingQueue 适宜在生产者线程和消费者线程之间的并发水平较低的状况下应用
  • SynchronousQueue 适宜在消费者解决能力与生产者解决能力相差不大的状况下应用。

流量管制与信号量

SemaPhore 通常叫做信号量,个别用来管制同时拜访特定资源的的线程数量,通过协调各个线程,以保障正当应用资源。

通常应用的场景就是限流,比如说,限度数据库连接池的连接线程数量。

罕用办法

acquire()  
获取一个令牌,在获取到令牌、或者被其余线程调用中断之前线程始终处于阻塞状态。​
acquire(int permits)  
获取一个令牌,在获取到令牌、或者被其余线程调用中断、或超时之前线程始终处于阻塞状态。acquireUninterruptibly() 
获取一个令牌,在获取到令牌之前线程始终处于阻塞状态(疏忽中断)。tryAcquire()
尝试取得令牌,返回获取令牌胜利或失败,不阻塞线程。​
tryAcquire(long timeout, TimeUnit unit)
尝试取得令牌,在超时工夫内循环尝试获取,直到尝试获取胜利或超时返回,不阻塞线程。​
release()
开释一个令牌,唤醒一个获取令牌不胜利的阻塞线程。​
hasQueuedThreads()
期待队列里是否还存在期待线程。​
getQueueLength()
获取期待队列里阻塞的线程数。​
drainPermits()
清空令牌把可用令牌数置为 0,返回清空令牌的数量。​
availablePermits()
返回可用的令牌数量。

实现一个简略的令牌获取的例子

/**
 * @ClassName SemaphoreExample
 * @description:
 * @author: yong.yuan
 * @create: 2022-04-18 18:59
 * @Version 1.0
 **/
public class SemaphoreExample {static final Semaphore semaphore = new Semaphore(5);
    static Lock lock = new ReentrantLock();
    static Condition condition = lock.newCondition();
    static CountDownLatch countDownLatch = new CountDownLatch(10);

    static class Worker implements Runnable{
        @Override
        public void run() {
            try {Thread.sleep(1000);
                semaphore.acquire();
                System.out.println(Thread.currentThread().getId()+
                "号工人, 从流水线上取货物一件, 现有" + semaphore.availablePermits() + "件货物");
                countDownLatch.countDown();
                lock.lock();
                try {condition.signalAll();
                }finally {lock.unlock();
                }
            } catch (InterruptedException e) {e.printStackTrace();
            }
        }
    }

    static class Machine implements Runnable{
        @Override
        public void run() {
            try {Thread.sleep(1000);
                while(semaphore.availablePermits() >= 5) {lock.lock();
                    try {System.out.println("流水线上的货物满了");
                        condition.await();} finally {lock.unlock();
                    }
                }
                semaphore.release();
                System.out.println("向流水线上送货物,现有"+semaphore.availablePermits()
                +"件货物");
            } catch (InterruptedException e) {e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {for (int i = 0; i < 10; i++) {new Thread(new Worker()).start();
            new Thread(new Machine()).start();}

        countDownLatch.await();
        System.out.println("曾经有 10 个工人搬走货物了");
        while (semaphore.tryAcquire()){System.out.println("残余货物搬走.....");
        }
    }
}
============== 执行后果 ==============
流水线上的货物满了
流水线上的货物满了
17 号工人, 从流水线上取货物一件, 现有 4 件货物
向流水线上送货物,现有 4 件货物
流水线上的货物满了
向流水线上送货物,现有 5 件货物
19 号工人, 从流水线上取货物一件, 现有 5 件货物
13 号工人, 从流水线上取货物一件, 现有 4 件货物
向流水线上送货物,现有 5 件货物
流水线上的货物满了
向流水线上送货物,现有 4 件货物
27 号工人, 从流水线上取货物一件, 现有 3 件货物
15 号工人, 从流水线上取货物一件, 现有 4 件货物
向流水线上送货物,现有 5 件货物
向流水线上送货物,现有 5 件货物
25 号工人, 从流水线上取货物一件, 现有 4 件货物
21 号工人, 从流水线上取货物一件, 现有 4 件货物
向流水线上送货物,现有 5 件货物
23 号工人, 从流水线上取货物一件, 现有 4 件货物
向流水线上送货物,现有 5 件货物
流水线上的货物满了
向流水线上送货物,现有 5 件货物
29 号工人, 从流水线上取货物一件, 现有 4 件货物
向流水线上送货物,现有 5 件货物
31 号工人, 从流水线上取货物一件, 现有 5 件货物
曾经有 10 个工人搬走货物了
残余货物搬走.....
残余货物搬走.....
残余货物搬走.....
残余货物搬走.....
残余货物搬走.....

Exchager

Exchanger 类可用于两个线程之间替换信息。可简略地将 Exchanger 对象了解为一个蕴含两个格子的容器,通过 exchanger 办法能够向两个格子中填充信息。当两个格子中的均被填充时,该对象会主动将两个格子的信息替换,而后返回给线程,从而实现两个线程的信息替换。

当消费者线程生产一个已填充的缓冲区时,另外一个缓冲区能够由生产者线程进行填充,从而实现了数据生成与生产的并发。这种缓冲技术就被称为双缓冲

/**
 * @ClassName ExchangerExample
 * @description:
 * @author: yong.yuan
 * @create: 2022-04-18 23:10
 * @Version 1.0
 **/
public class ExchangerExample {static Exchanger<String> STRING_EXCHANGER = new Exchanger<>();
    static class MyThread extends Thread{
        String msg;
        public MyThread(String threadName,String msg) {Thread.currentThread().setName(threadName);
            this.msg = msg;
        }

        @Override
        public void run() {
            try {System.out.println(Thread.currentThread().getName()+":"
                +STRING_EXCHANGER.exchange(msg));
            } catch (InterruptedException e) {e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) {Thread t1 = new MyThread("thread-1","hello thread-1");
        Thread t2 = new MyThread("thread-2","hello thread-2");
        t1.start();
        t2.start();}
}
=============== 操作后果 ===============
Thread-1:hello thread-1
Thread-0:hello thread-2

如何正确的进行线程

应用 interrupt 终止线程

  1. 应用 interrupt 实际上是通过 interrupt 状态的变动来对线程实现进行,而不会立刻终止这个线程。
  2. 当线程处于 wait()或者 sleep()状态时,应用 interrupt 能够将休眠的线程唤醒,然而会抛出异样,咱们能够在 Catch(InterruptedExcetion e){}中手动用 interrupt()来终止这个线程
  3. 比照其它终止形式 – stop(): 会间接把线程停掉,不能解决进行之前想要解决的数据
public class StopThread{public static void main(String[] args) throws InterruptedException {Thread thread = new Thread(() -> {
            int count = 0;
            while (!Thread.currentThread().isInterrupted() && count < 1000){System.out.println("count =" + count++);
                try {Thread.sleep(1000);
                } catch (InterruptedException e) {e.printStackTrace();
                    // 线程在休眠期间被中断,那么会主动革除中断信号, 所以须要在 catch 中再次中断
                    Thread.currentThread().interrupt();
                }
            }
        });
        thread.start();
        Thread.sleep(5000);
        thread.interrupt();}
}
退出移动版