关于java:JAVA并发编程生产者与消费者模式传统版阻塞队列版

9次阅读

共计 3718 个字符,预计需要花费 10 分钟才能阅读完成。

1. 前言

2. 生产者与消费者模型传统版代码示例

3. 生产者与消费者模型阻塞队列版代码示例

1. 前言
明天咱们要用两种形式来实现生产者和消费者模式,咱们要先介绍一个概念,生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不间接通信,而是通过音讯队列进行通信,所以生产者生产完数据后不必期待消费者解决,而是间接扔给队列,消费者不找生产者要数据,而是间接从队列里取数据,队列相当于一个缓冲区,均衡了生产者和消费者的解决能力。这个队列就是用来给生产者和消费者解耦的。咱们明天要用两种形式来实现这个模型,一种是阻塞队列版本,一种是传统版本。

2. 生产者与消费者模型传统版代码示例
咱们先来对传统消费者和生产者模型的一个 JAVA 代码的实现:
假如咱们当初有这么一个需要,一个生产者和一个消费者,他们独特管制一个变量 number,一个线程生产数据,将 number 变为 1,一个线程生产数据,将 number 从 1 变为 0,从此往返循环。

咱们应用 lock 和 Condition 版本的锁,对于 Lock 和 Condition,能够依据我写的另外一篇文章:
JAVA 并发编程——Synchronized 与 Lock 的区别以及 Lock 的应用
咱们先设定三个变量:

    // 生产者和消费者线程操作的共享对象
    private volatile int number = 0;
    // 一把锁
    private Lock lock = new ReentrantLock();
    //condition 用以准确唤醒线程,用来代替 wait()和 notify()办法
    private Condition condition = lock.newCondition();

生产办法:

   public void increment() throws Exception {
        try {
            // 加锁
            lock.lock();
            // 如果线程不是 0,则认定为还未生产,先唤醒生产线程
            while (number != 0) {condition.await();
            }
            // 生产数据
            number++;
            System.out.println(Thread.currentThread().getName() + "\t" + number);
            // 唤醒所有线程
            condition.signalAll();} finally {lock.unlock();
        }
    }

生产办法:

     public void decrement() throws Exception {
        try {
            // 加锁
            lock.lock();
            // 如果线程是 0,则认定为还未生产,先唤醒生产线程
            while (number == 0) {condition.await();
            }
            // 生产
            number--;
            System.out.println(Thread.currentThread().getName() + "\t" + number);
            // 唤醒所有线程
            condition.signalAll();} finally {lock.unlock();
        }
    }

接下来咱们各开启 1000 个线程进行生产和生产:

   public static void main(String[] args) {ShareData shareData = new ShareData();

        for (int i = 1; i <= 500; i++) {new Thread(() -> {
                try {shareData.increment();
                } catch (Exception e) {e.printStackTrace();
                }
            }, "线程 AA 生产:").start();}

        for (int i = 1; i <= 500; i++) {new Thread(() -> {
                try {shareData.decrement();
                } catch (Exception e) {e.printStackTrace();
                }
            }, "线程 BB 生产:").start();}

        for (int i = 1; i <= 500; i++) {new Thread(() -> {
                try {shareData.increment();
                } catch (Exception e) {e.printStackTrace();
                }
            }, "线程 CC 生产:").start();}

        for (int i = 1; i <= 500; i++) {new Thread(() -> {
                try {shareData.decrement();
                } catch (Exception e) {e.printStackTrace();
                }
            }, "线程 DD 生产:").start();}

    }

}

运行后果为:

胜利地实现了传统版本生产者和消费者模式。

3. 生产者与消费者模型阻塞队列版代码示例
接下来咱们应用阻塞队列版本来实现以下生产者和消费者模式,至于阻塞队列,在之前的文章讲过:
JAVA 并发编程——阻塞队列实践及其 API 介绍

咱们设定三个变量和一个构造方法

    private volatile boolean flag = true;// 默认开启,true 生产 + f 生产  false 退出生产 + 生产
    private AtomicInteger atomicInteger = new AtomicInteger(); // 代表生产的数据
    private BlockingQueue<String> blockingQueue = null;// 阻塞队列
    public MyResources(BlockingQueue<String> blockingQueue) {this.blockingQueue = blockingQueue;// 阻塞队列的类型通过构造方法确定}

生产办法:

    public void myProd() throws InterruptedException {
        String data = null; // 生产的数据内容
        boolean retValue; // 生产是否胜利
        while (flag) { //true 代表生产着消费者模式开启,false 代表完结
            data = atomicInteger.getAndIncrement() + "";
            // 阻塞队列超时期待办法
            retValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
            if (retValue) System.out.println(Thread.currentThread().getName() + "\t 插入队列" + data + "胜利");
            else System.out.println(Thread.currentThread().getName() + "\t 插入队列" + data + "失败");
            Thread.sleep(1000);
        }
        System.out.println("falg = false 生产叫停!");
    }

生产办法:

    public void myCustomer() throws InterruptedException {
        String result = null;
        while (flag) { //true 代表生产着消费者模式开启,false 代表完结
            // 阻塞队列超时期待办法
            result = blockingQueue.poll(2L, TimeUnit.SECONDS);
            if (result == null || result.equalsIgnoreCase("")) {
                flag = false;
                System.out.println("超过两秒钟没有数据生产,退出");
                System.out.println();
                System.out.println();
                return;
            }
            System.out.println(Thread.currentThread().getName() + "\t 生产队列" + result + "胜利");
            Thread.sleep(1000);
        }
    }

进行生产办法:

    public void stop() {this.flag = false;// 当 flag 等于 0 的时候,生产和生产就进行了}

接下来咱们创立两个线程进行生产和生产,生产和生产超过 10 秒后,进行两个线程的运行:

 public static void main(String[] args) throws InterruptedException {MyResources myResources = new MyResources(new ArrayBlockingQueue<String>(3));

        new Thread(() -> {
            try {myResources.myProd();
            } catch (InterruptedException e) {e.printStackTrace();
            }
        }, "生产线程").start();

        new Thread(() -> {
            try {myResources.myCustomer();
            } catch (InterruptedException e) {e.printStackTrace();
            }
        }, "生产线程").start();


        Thread.sleep(10000);
        System.out.println("生产叫停!");
        myResources.stop();}

运行后果:

能够看出,利用阻塞队列,生产到工夫了当前,就会主动进行了!

总结:
这章咱们次要利用两种形式实现了生产者和消费者模型:
传统模式:利用 lock 类和 condition
阻塞队列模式,利用了阻塞队列。

正文完
 0