生产者-消费者模式是一个经典的多线程设计模式,它为多线程间的协作提供了良好的解决方案。该模式中通常有两类角色:若干个生产者线程和若干个消费者线程。生产者线程负责提交用户的请求,消费者线程负责具体处理生产者提交的任务。生产者和消费者之间通过共享内存缓冲区进行通信。生产者-消费者模式中的内存缓冲区的主要功能是实现数据在多线程间的共享;此外,通过该缓冲区还可以缓解生产者和消费者之间的性能差异。

public class Main {

public static void main(String[] args) throws InterruptedException {    BlockingQueue<PCData> queue = new LinkedBlockingDeque<>();  //缓冲区域    Producer producer1 = new Producer(queue);    Producer producer2 = new Producer(queue);//生产者    Producer producer3 = new Producer(queue);    Consumer consumer1 = new Consumer(queue);    Consumer consumer2 = new Consumer(queue);//消费者    Consumer consumer3 = new Consumer(queue);    ExecutorService executorService = Executors.newCachedThreadPool();    executorService.execute(producer1);    executorService.execute(producer2);    executorService.execute(producer3);    executorService.execute(consumer1);    executorService.execute(consumer2);    executorService.execute(consumer3);    Thread.sleep(10*1000);    producer1.stop();    producer2.stop();    producer3.stop();    Thread.sleep(3000);    executorService.shutdown();}

}
/**
 * Producer task: repeatedly creates a {@code PCData} item carrying the next
 * value of a counter shared by all producers and offers it to the queue,
 * until {@link #stop()} is called or the thread is interrupted.
 */
public class Producer implements Runnable {

    /** Exclusive upper bound, in milliseconds, of the random pause before each item. */
    private static final int SLEEP_TIME_MS = 1000;

    /** Counter shared by every producer instance; supplies the item values. */
    private static final AtomicInteger count = new AtomicInteger();

    /** Queue the produced items are offered to. */
    private final BlockingQueue<PCData> queue;

    /** Loop flag; volatile so a {@link #stop()} from another thread is seen by {@link #run()}. */
    private volatile boolean isRunning = true;

    /**
     * @param queue the queue produced items are offered to
     */
    public Producer(BlockingQueue<PCData> queue) {
        this.queue = queue;
    }

    /**
     * Produces items until stopped. Each iteration sleeps for a random time
     * below {@code SLEEP_TIME_MS}, then waits up to two seconds for the queue
     * to accept the new item.
     */
    @Override
    public void run() {
        Random random = new Random();
        System.out.println("start producer name" + Thread.currentThread().getName());
        try {
            while (isRunning) {
                Thread.sleep(random.nextInt(SLEEP_TIME_MS));
                PCData data = new PCData(count.incrementAndGet());
                // Report success only after the queue has accepted the item.
                if (queue.offer(data, 2, TimeUnit.SECONDS)) {
                    System.out.println(data + "is put into queue");
                } else {
                    System.err.println("failed to put data" + data);
                }
            }
        } catch (InterruptedException e) {
            // Interruption is a cancellation request: restore the flag and
            // leave the loop. Other exceptions are not caught here, so they
            // are not mistaken for an interrupt.
            Thread.currentThread().interrupt();
        }
    }

    /** Asks the producer to finish after its current iteration. */
    public void stop() {
        isRunning = false;
    }
}
/**
 * Consumer task: takes {@code PCData} items from the queue, prints the square
 * of each item's value, and pauses for a random time before taking the next
 * one. Runs until its thread is interrupted.
 */
public class Consumer implements Runnable {

    /** Exclusive upper bound, in milliseconds, of the random pause after each item. */
    private static final int SLEEPTIME = 1000;

    /** Queue the items are taken from. */
    private final BlockingQueue<PCData> queue;

    /**
     * @param queue the queue items are taken from
     */
    public Consumer(BlockingQueue<PCData> queue) {
        this.queue = queue;
    }

    /**
     * Consumes items until the thread is interrupted. Interruption is the
     * only way to end the loop, so it is treated as a normal stop signal.
     */
    @Override
    public void run() {
        System.out.println("start Consumer id" + Thread.currentThread().getName());
        Random random = new Random();
        try {
            while (!Thread.currentThread().isInterrupted()) {
                // take() blocks until an item is available and never returns
                // null, so no null check is needed.
                PCData pcData = queue.take();
                int value = pcData.getData();
                // Widen before multiplying so large values cannot overflow int.
                long re = (long) value * value;
                System.out.println(MessageFormat.format("{0}*{1}={2}", value, value, re));
                Thread.sleep(random.nextInt(SLEEPTIME));
            }
        } catch (InterruptedException e) {
            // Restore the flag for the owning executor and exit quietly.
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Immutable item passed from producers to consumers; wraps a single int value.
 */
public class PCData {

    /** The wrapped value. */
    private final int intData;

    /**
     * @param intData the value to carry
     */
    public PCData(int intData) {
        this.intData = intData;
    }

    /**
     * @param data decimal text of the value to carry
     * @throws NumberFormatException if {@code data} is not a parsable int
     */
    public PCData(String data) {
        // parseInt yields the primitive directly; valueOf would box and unbox.
        this.intData = Integer.parseInt(data);
    }

    /**
     * @return the wrapped value
     */
    public int getData() {
        return intData;
    }

    @Override
    public String toString() {
        return "PCData{" + "intData=" + intData + '}';
    }
}