欢迎进入JAVA基础课程

本系列文章将主要针对JAVA一些基础知识点进行讲解,为平时归纳所总结,不管是刚接触JAVA开发菜鸟还是业界资深人士,都希望对广大同行带来一些帮助。若有问题请及时留言或加QQ:243042162。

谨记:
“天眼”之父南仁东,心无旁骛,为崇山峻岭间的中国“天眼”燃尽生命,看似一口“大锅”,天眼是世界上最大、最灵敏的单口径射电望远镜,可以接受百亿光年外的电磁信号。南仁东总工程师执着追求科学梦想的精神,将激励一代又一代科技工作者继续奋斗,勇攀世界科技高峰。作为IT界的从业者,我们需要紧跟时代的步伐,踏过平庸,一生为科技筑梦。

生产者消费者问题

1. 背景

生产者消费者问题(Producer-consumer problem),也称有限缓冲问题(Bounded-buffer problem),是一个多线程同步问题的经典案例。生产者生成一定量的数据放到缓冲区中,然后重复此过程;与此同时,消费者也在缓冲区消耗这些数据。生产者和消费者之间必须保持同步,要保证生产者不会在缓冲区满时放入数据,消费者也不会在缓冲区空时消耗数据。不够完善的解决方法容易出现死锁的情况,此时进程都在等待唤醒。

2. 条件

  • 生产者仅仅在仓储未满时候生产, 仓满则停止生产.
  • 生产者在生产出可消费产品时候, 应该通知等待的消费者去消费.
  • 消费者仅仅在仓储有产品时候才能消费, 仓空则等待.
  • 消费者发现仓储没产品可消费时候会通知生产者生产.

3.实现方式

  • wait() / notify()方法
  • await() / signal()方法
  • BlockingQueue阻塞队列方法
  • Semaphore方法
  • PipedInputStream / PipedOutputStream

下面主要针对前面三种方式做代码实现

(1) wait() / notify()方法

public class ProducerMain {    private static Integer count=0;    private final Integer full=10;    private static String LOCK="LOCK";    class  Producer implements Runnable{        @Override        public void run() {//            for(int i=0;i<10;i++){//                try {//                    Thread.sleep(3000);//                } catch (InterruptedException e) {//                    e.printStackTrace();//                }//            }            synchronized (LOCK){                while(count==full){                    try {                        LOCK.wait();                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                }                count++;                System.out.println(Thread.currentThread().getName()+"生产者生产,目前共生产了:"+count);                LOCK.notifyAll();            }        }    }    class Consumer implements Runnable{        @Override        public void run() {            synchronized (LOCK){                while (count==0){                    try {                        LOCK.wait();                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                }                count--;                System.out.println(Thread.currentThread().getName()+"生产者消费,目前共剩余:"+count);                LOCK.notifyAll();            }        }    }    public static void main(String[] args) {        ProducerMain producer = new ProducerMain();        new Thread(producer.new Producer()).start();        new Thread(producer.new Consumer()).start();        new Thread(producer.new Producer()).start();        new Thread(producer.new Consumer()).start();        new Thread(producer.new Producer()).start();        new Thread(producer.new Consumer()).start();        new Thread(producer.new Producer()).start();        new Thread(producer.new Consumer()).start();    }}

输出结果

Thread-0生产者生产,目前共生产了:1Thread-5生产者消费,目前共剩余:0Thread-2生产者生产,目前共生产了:1Thread-7生产者消费,目前共剩余:0Thread-6生产者生产,目前共生产了:1Thread-1生产者消费,目前共剩余:0Thread-4生产者生产,目前共生产了:1Thread-3生产者消费,目前共剩余:0

(2)await() / signal()方法

public class ReentrantLockDemo {    private static Integer count = 0;    private final Integer FULL = 10;    final Lock lock = new ReentrantLock();    final Condition NotFull = lock.newCondition();    final Condition NotEmpty = lock.newCondition();    class Producer implements Runnable {        @Override        public void run() {            for (int i = 0; i < 10; i++) {                try {                    Thread.sleep(3000);                } catch (Exception e) {                    e.printStackTrace();                }                lock.lock();                try {                    while (count == FULL) {                        try {                            NotFull.await();                        } catch (InterruptedException e) {                            // TODO Auto-generated catch block                            e.printStackTrace();                        }                    }                    count++;                    System.out.println(Thread.currentThread().getName()                            + "生产者生产,目前总共有" + count);                    NotEmpty.signal();                } finally {                    lock.unlock();                }            }        }    }    class Consumer implements Runnable {        @Override        public void run() {            for (int i = 0; i < 10; i++) {                try {                    Thread.sleep(3000);                } catch (InterruptedException e1) {                    e1.printStackTrace();                }                lock.lock();                try {                    while (count == 0) {                        try {                            NotEmpty.await();                        } catch (Exception e) {                            // TODO: handle exception                            e.printStackTrace();                        }                    }                    count--;                    System.out.println(Thread.currentThread().getName()                            + "消费者消费,目前总共有" + count);                    NotFull.signal();                } finally {                    lock.unlock();                }            }        }    }    public static void main(String[] args) throws Exception {        ReentrantLockDemo hosee = new ReentrantLockDemo();        new Thread(hosee.new Producer()).start();        new Thread(hosee.new Consumer()).start();        new Thread(hosee.new Producer()).start();        new Thread(hosee.new Consumer()).start();        new Thread(hosee.new Producer()).start();        new Thread(hosee.new Consumer()).start();        new Thread(hosee.new Producer()).start();        new Thread(hosee.new Consumer()).start();    }}

输出结果

Thread-0生产者生产,目前总共有1Thread-2生产者生产,目前总共有2Thread-1消费者消费,目前总共有1Thread-7消费者消费,目前总共有0Thread-6生产者生产,目前总共有1Thread-5消费者消费,目前总共有0Thread-4生产者生产,目前总共有1Thread-3消费者消费,目前总共有0Thread-0生产者生产,目前总共有1Thread-2生产者生产,目前总共有2Thread-1消费者消费,目前总共有1Thread-6生产者生产,目前总共有2Thread-7消费者消费,目前总共有1Thread-5消费者消费,目前总共有0Thread-4生产者生产,目前总共有1Thread-3消费者消费,目前总共有0Thread-0生产者生产,目前总共有1Thread-2生产者生产,目前总共有2Thread-1消费者消费,目前总共有1Thread-7消费者消费,目前总共有0Thread-6生产者生产,目前总共有1Thread-5消费者消费,目前总共有0Thread-4生产者生产,目前总共有1Thread-3消费者消费,目前总共有0Thread-0生产者生产,目前总共有1Thread-2生产者生产,目前总共有2Thread-1消费者消费,目前总共有1Thread-7消费者消费,目前总共有0Thread-6生产者生产,目前总共有1Thread-5消费者消费,目前总共有0Thread-4生产者生产,目前总共有1Thread-3消费者消费,目前总共有0Thread-0生产者生产,目前总共有1Thread-2生产者生产,目前总共有2Thread-1消费者消费,目前总共有1Thread-6生产者生产,目前总共有2Thread-7消费者消费,目前总共有1Thread-5消费者消费,目前总共有0Thread-4生产者生产,目前总共有1Thread-3消费者消费,目前总共有0Thread-2生产者生产,目前总共有1Thread-0生产者生产,目前总共有2Thread-1消费者消费,目前总共有1Thread-7消费者消费,目前总共有0Thread-6生产者生产,目前总共有1Thread-5消费者消费,目前总共有0Thread-4生产者生产,目前总共有1Thread-3消费者消费,目前总共有0Thread-2生产者生产,目前总共有1Thread-0生产者生产,目前总共有2Thread-1消费者消费,目前总共有1Thread-6生产者生产,目前总共有2Thread-7消费者消费,目前总共有1Thread-5消费者消费,目前总共有0Thread-4生产者生产,目前总共有1Thread-3消费者消费,目前总共有0Thread-2生产者生产,目前总共有1Thread-0生产者生产,目前总共有2Thread-1消费者消费,目前总共有1Thread-6生产者生产,目前总共有2Thread-7消费者消费,目前总共有1Thread-5消费者消费,目前总共有0Thread-4生产者生产,目前总共有1Thread-3消费者消费,目前总共有0Thread-2生产者生产,目前总共有1Thread-0生产者生产,目前总共有2Thread-1消费者消费,目前总共有1Thread-6生产者生产,目前总共有2Thread-7消费者消费,目前总共有1Thread-4生产者生产,目前总共有2Thread-5消费者消费,目前总共有1Thread-3消费者消费,目前总共有0Thread-0生产者生产,目前总共有1Thread-2生产者生产,目前总共有2Thread-1消费者消费,目前总共有1Thread-6生产者生产,目前总共有2Thread-7消费者消费,目前总共有1Thread-5消费者消费,目前总共有0Thread-4生产者生产,目前总共有1Thread-3消费者消费,目前总共有0

(3)BlockingQueue阻塞队列方法

public class BlockingQueueMain {    private static Integer count = 0;    final BlockingQueue<Integer> bq = new ArrayBlockingQueue<Integer>(10);    class Producer implements Runnable {        @Override        public void run() {            for (int i = 0; i < 10; i++) {                try {                    Thread.sleep(3000);                } catch (Exception e) {                    e.printStackTrace();                }                try {                    bq.put(1);                    count++;                    System.out.println(Thread.currentThread().getName()                            + "生产者生产,目前总共有" + count);                } catch (InterruptedException e) {                    // TODO Auto-generated catch block                    e.printStackTrace();                }            }        }    }    class Consumer implements Runnable {        @Override        public void run() {            for (int i = 0; i < 10; i++) {                try {                    Thread.sleep(3000);                } catch (InterruptedException e1) {                    e1.printStackTrace();                }                try {                    bq.take();                    count--;                    System.out.println(Thread.currentThread().getName()                            + "消费者消费,目前总共有" + count);                } catch (Exception e) {                    // TODO: handle exception                    e.printStackTrace();                }            }        }    }    public static void main(String[] args) throws Exception {        BlockingQueueMain hosee = new BlockingQueueMain();        new Thread(hosee.new Producer()).start();        new Thread(hosee.new Consumer()).start();        new Thread(hosee.new Producer()).start();        new Thread(hosee.new Consumer()).start();        new Thread(hosee.new Producer()).start();        new Thread(hosee.new Consumer()).start();        new Thread(hosee.new Producer()).start();        new Thread(hosee.new Consumer()).start();    }}

输出结果

Thread-1消费者消费,目前总共有0Thread-4生产者生产,目前总共有1Thread-0生产者生产,目前总共有0Thread-5消费者消费,目前总共有0Thread-3消费者消费,目前总共有0Thread-2生产者生产,目前总共有1Thread-6生产者生产,目前总共有1Thread-7消费者消费,目前总共有0Thread-4生产者生产,目前总共有1Thread-1消费者消费,目前总共有0Thread-0生产者生产,目前总共有1Thread-5消费者消费,目前总共有0Thread-2生产者生产,目前总共有1Thread-3消费者消费,目前总共有0Thread-6生产者生产,目前总共有0Thread-7消费者消费,目前总共有0Thread-4生产者生产,目前总共有0Thread-1消费者消费,目前总共有0Thread-0生产者生产,目前总共有1Thread-5消费者消费,目前总共有0Thread-2生产者生产,目前总共有1Thread-3消费者消费,目前总共有0Thread-6生产者生产,目前总共有1Thread-7消费者消费,目前总共有0Thread-4生产者生产,目前总共有1Thread-1消费者消费,目前总共有0Thread-0生产者生产,目前总共有1Thread-5消费者消费,目前总共有0Thread-2生产者生产,目前总共有1Thread-3消费者消费,目前总共有0Thread-6生产者生产,目前总共有1Thread-7消费者消费,目前总共有0Thread-4生产者生产,目前总共有2Thread-0生产者生产,目前总共有1Thread-1消费者消费,目前总共有0Thread-5消费者消费,目前总共有1Thread-2生产者生产,目前总共有1Thread-3消费者消费,目前总共有0Thread-6生产者生产,目前总共有1Thread-7消费者消费,目前总共有0