生产着,消费者模式是一个经典的多线程设计模式,它为多线程间的协作提供了良好的解决方案。通常有两个角色:若干个生产者线程,若个个消费者线程。生产者线程负责提交用户的请求,消费者线程负责具体处理生产者提交的任务。生产者和消费者之间则通过共享内存缓冲区进行通信。生产者 - 消费者模式中的内存缓冲区主要功能是数据在多线程间的共享,此外,通过该缓冲区,可以缓解生产者和消费者间的性能差。
public class Main {
public static void main(String[] args) throws InterruptedException {BlockingQueue<PCData> queue = new LinkedBlockingDeque<>(); // 缓冲区域
Producer producer1 = new Producer(queue);
Producer producer2 = new Producer(queue);// 生产者
Producer producer3 = new Producer(queue);
Consumer consumer1 = new Consumer(queue);
Consumer consumer2 = new Consumer(queue);// 消费者
Consumer consumer3 = new Consumer(queue);
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(producer1);
executorService.execute(producer2);
executorService.execute(producer3);
executorService.execute(consumer1);
executorService.execute(consumer2);
executorService.execute(consumer3);
Thread.sleep(10*1000);
producer1.stop();
producer2.stop();
producer3.stop();
Thread.sleep(3000);
executorService.shutdown();}
}
public class Producer implements Runnable{
private volatile boolean isRunning = true;
private BlockingQueue<PCData> queue;
private static AtomicInteger count = new AtomicInteger();
private static final int SLEEEPTIME =1000;
public Producer(BlockingQueue<PCData> queue) {this.queue = queue;}
@Override
public void run() {
PCData data = null;
Random random = new Random();
System.out.println("start producer name"+Thread.currentThread().getName());
try{while (isRunning){Thread.sleep(random.nextInt(SLEEEPTIME));
data = new PCData(count.incrementAndGet());
System.out.println(data+"is put into queue");
if(!queue.offer(data,2,TimeUnit.SECONDS)){System.err.println("failed to put data"+data);
}
}
}catch (Exception e){e.printStackTrace();
Thread.currentThread().interrupt();
}
}
public void stop(){isRunning=false;}
}
public class Consumer implements Runnable {
private BlockingQueue<PCData> queue;
private static final int SLEEPTIME = 1000;
public Consumer(BlockingQueue<PCData> queue) {this.queue = queue;}
@Override
public void run() {System.out.println("start Consumer id"+Thread.currentThread().getName());
Random random = new Random();
try{while(true){PCData pcData = queue.take();
if(pcData!=null){int re = pcData.getData()*pcData.getData();
System.out.println(MessageFormat.format("{0}*{1}={2}",pcData.getData(),pcData.getData(),re));
Thread.sleep(random.nextInt(SLEEPTIME));
}
}
}catch (Exception e){e.printStackTrace();
Thread.currentThread().interrupt();
}
}
}
public class PCData {
private final int intData;
public PCData(int intData) {this.intData = intData;}
public PCData(String data) {this.intData = Integer.valueOf(data);
}
public int getData(){return intData;}
@Override
public String toString() {
return "PCData{" +
"intData=" + intData +
'}';
}
}