生产着,消费者模式是一个经典的多线程设计模式,它为多线程间的协作提供了良好的解决方案。通常有两个角色:若干个生产者线程,若个个消费者线程。生产者线程负责提交用户的请求,消费者线程负责具体处理生产者提交的任务。生产者和消费者之间则通过共享内存缓冲区进行通信。生产者-消费者模式中的内存缓冲区主要功能是数据在多线程间的共享,此外,通过该缓冲区,可以缓解生产者和消费者间的性能差。
public class Main {
public static void main(String[] args) throws InterruptedException { BlockingQueue<PCData> queue = new LinkedBlockingDeque<>(); //缓冲区域 Producer producer1 = new Producer(queue); Producer producer2 = new Producer(queue);//生产者 Producer producer3 = new Producer(queue); Consumer consumer1 = new Consumer(queue); Consumer consumer2 = new Consumer(queue);//消费者 Consumer consumer3 = new Consumer(queue); ExecutorService executorService = Executors.newCachedThreadPool(); executorService.execute(producer1); executorService.execute(producer2); executorService.execute(producer3); executorService.execute(consumer1); executorService.execute(consumer2); executorService.execute(consumer3); Thread.sleep(10*1000); producer1.stop(); producer2.stop(); producer3.stop(); Thread.sleep(3000); executorService.shutdown();}
}
public class Producer implements Runnable{
private volatile boolean isRunning = true;private BlockingQueue<PCData> queue;private static AtomicInteger count = new AtomicInteger();private static final int SLEEEPTIME =1000;public Producer(BlockingQueue<PCData> queue) { this.queue = queue;}@Overridepublic void run() { PCData data = null; Random random = new Random(); System.out.println("start producer name"+Thread.currentThread().getName()); try{ while (isRunning){ Thread.sleep(random.nextInt(SLEEEPTIME)); data = new PCData(count.incrementAndGet()); System.out.println(data+"is put into queue"); if(!queue.offer(data,2,TimeUnit.SECONDS)){ System.err.println("failed to put data"+data); } } }catch (Exception e){ e.printStackTrace(); Thread.currentThread().interrupt(); }}public void stop(){ isRunning=false;}
}
public class Consumer implements Runnable {
private BlockingQueue<PCData> queue;private static final int SLEEPTIME = 1000;public Consumer(BlockingQueue<PCData> queue) { this.queue = queue;}@Overridepublic void run() { System.out.println("start Consumer id"+Thread.currentThread().getName()); Random random = new Random(); try{ while(true){ PCData pcData = queue.take(); if(pcData!=null){ int re = pcData.getData()*pcData.getData(); System.out.println(MessageFormat.format("{0}*{1}={2}",pcData.getData(),pcData.getData(),re)); Thread.sleep(random.nextInt(SLEEPTIME)); } } }catch (Exception e){ e.printStackTrace(); Thread.currentThread().interrupt(); }}
}
public class PCData {
private final int intData;public PCData(int intData) { this.intData = intData;}public PCData(String data) { this.intData = Integer.valueOf(data);}public int getData(){ return intData;}@Overridepublic String toString() { return "PCData{" + "intData=" + intData + '}';}
}