关于java:JAVA并发编程生产者与消费者模式传统版阻塞队列版

1.前言

2.生产者与消费者模型传统版代码示例

3.生产者与消费者模型阻塞队列版代码示例

1.前言
明天咱们要用两种形式来实现生产者和消费者模式,咱们要先介绍一个概念,生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不间接通信,而是通过音讯队列进行通信,所以生产者生产完数据后不必期待消费者解决,而是间接扔给队列,消费者不找生产者要数据,而是间接从队列里取数据,队列相当于一个缓冲区,均衡了生产者和消费者的解决能力。这个队列就是用来给生产者和消费者解耦的。咱们明天要用两种形式来实现这个模型,一种是阻塞队列版本,一种是传统版本。

2.生产者与消费者模型传统版代码示例
咱们先来对传统消费者和生产者模型的一个JAVA代码的实现:
假如咱们当初有这么一个需要,一个生产者和一个消费者,他们独特管制一个变量number,一个线程生产数据,将number变为1,一个线程生产数据,将number从1变为0,从此往返循环。

咱们应用lock和Condition版本的锁,对于Lock和Condition,能够依据我写的另外一篇文章:
JAVA并发编程——Synchronized与Lock的区别以及Lock的应用
咱们先设定三个变量:

    //生产者和消费者线程操作的共享对象
    private volatile int number = 0;
    //一把锁
    private Lock lock = new ReentrantLock();
    //condition用以准确唤醒线程,用来代替wait()和notify()办法
    private Condition condition = lock.newCondition();

生产办法:

   public void increment() throws Exception {
        try {
            //加锁
            lock.lock();
            //如果线程不是0,则认定为还未生产,先唤醒生产线程
            while (number != 0) {
                condition.await();
            }
            //生产数据
            number++;
            System.out.println(Thread.currentThread().getName() + "\t" + number);
            //唤醒所有线程
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

生产办法:

     public void decrement() throws Exception {
        try {
            //加锁
            lock.lock();
            //如果线程是0,则认定为还未生产,先唤醒生产线程
            while (number == 0) {
                condition.await();
            }
            //生产
            number--;
            System.out.println(Thread.currentThread().getName() + "\t" + number);
            //唤醒所有线程
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

接下来咱们各开启1000个线程进行生产和生产:

   public static void main(String[] args) {

        ShareData shareData = new ShareData();

        for (int i = 1; i <= 500; i++) {
            new Thread(() -> {
                try {
                    shareData.increment();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, "线程AA生产:").start();
        }

        for (int i = 1; i <= 500; i++) {
            new Thread(() -> {
                try {
                    shareData.decrement();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, "线程BB生产:").start();
        }

        for (int i = 1; i <= 500; i++) {
            new Thread(() -> {
                try {
                    shareData.increment();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, "线程CC生产:").start();
        }

        for (int i = 1; i <= 500; i++) {
            new Thread(() -> {
                try {
                    shareData.decrement();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, "线程DD生产:").start();
        }

    }

}

运行后果为:

胜利地实现了传统版本生产者和消费者模式。

3.生产者与消费者模型阻塞队列版代码示例
接下来咱们应用阻塞队列版本来实现以下生产者和消费者模式,至于阻塞队列,在之前的文章讲过:
JAVA并发编程——阻塞队列实践及其API介绍

咱们设定三个变量和一个构造方法

    private volatile boolean flag = true;//默认开启,true生产+f生产  false 退出生产+生产
    private AtomicInteger atomicInteger = new AtomicInteger(); //代表生产的数据
    private BlockingQueue<String> blockingQueue = null;//阻塞队列
    public MyResources(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;//阻塞队列的类型通过构造方法确定
    }

生产办法:

    public void myProd() throws InterruptedException {
        String data = null; //生产的数据内容
        boolean retValue; //生产是否胜利
        while (flag) { //true代表生产着消费者模式开启,false代表完结
            data = atomicInteger.getAndIncrement() + "";
            //阻塞队列超时期待办法
            retValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
            if (retValue) System.out.println(Thread.currentThread().getName() + "\t 插入队列" + data + "胜利");
            else System.out.println(Thread.currentThread().getName() + "\t 插入队列" + data + "失败");
            Thread.sleep(1000);
        }
        System.out.println("falg = false 生产叫停!");
    }

生产办法:

    public void myCustomer() throws InterruptedException {
        String result = null;
        while (flag) { //true代表生产着消费者模式开启,false代表完结
            //阻塞队列超时期待办法
            result = blockingQueue.poll(2L, TimeUnit.SECONDS);
            if (result == null || result.equalsIgnoreCase("")) {
                flag = false;
                System.out.println("超过两秒钟没有数据生产,退出");
                System.out.println();
                System.out.println();
                return;
            }
            System.out.println(Thread.currentThread().getName() + "\t 生产队列" + result + "胜利");
            Thread.sleep(1000);
        }
    }

进行生产办法:

    public void stop() {
        this.flag = false;//当flag等于0的时候,生产和生产就进行了
    }

接下来咱们创立两个线程进行生产和生产,生产和生产超过10秒后,进行两个线程的运行:

 public static void main(String[] args) throws InterruptedException {
        MyResources myResources = new MyResources(new ArrayBlockingQueue<String>(3));

        new Thread(() -> {
            try {
                myResources.myProd();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "生产线程").start();

        new Thread(() -> {
            try {
                myResources.myCustomer();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "生产线程").start();


        Thread.sleep(10000);
        System.out.println("生产叫停!");
        myResources.stop();

    }

运行后果:

能够看出,利用阻塞队列,生产到工夫了当前,就会主动进行了!

总结:
这章咱们次要利用两种形式实现了生产者和消费者模型:
传统模式:利用lock类和condition
阻塞队列模式,利用了阻塞队列。

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理