生产者模式消费者模式

31次阅读

共计 2546 个字符,预计需要花费 7 分钟才能阅读完成。

 生产着,消费者模式是一个经典的多线程设计模式,它为多线程间的协作提供了良好的解决方案。通常有两个角色:若干个生产者线程,若个个消费者线程。生产者线程负责提交用户的请求,消费者线程负责具体处理生产者提交的任务。生产者和消费者之间则通过共享内存缓冲区进行通信。生产者 - 消费者模式中的内存缓冲区主要功能是数据在多线程间的共享,此外,通过该缓冲区,可以缓解生产者和消费者间的性能差。

public class Main {

public static void main(String[] args) throws InterruptedException {BlockingQueue<PCData> queue = new LinkedBlockingDeque<>();  // 缓冲区域
    Producer producer1 = new Producer(queue);
    Producer producer2 = new Producer(queue);// 生产者
    Producer producer3 = new Producer(queue);
    Consumer consumer1 = new Consumer(queue);
    Consumer consumer2 = new Consumer(queue);// 消费者
    Consumer consumer3 = new Consumer(queue);
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.execute(producer1);
    executorService.execute(producer2);
    executorService.execute(producer3);
    executorService.execute(consumer1);
    executorService.execute(consumer2);
    executorService.execute(consumer3);

    Thread.sleep(10*1000);
    producer1.stop();
    producer2.stop();
    producer3.stop();
    Thread.sleep(3000);
    executorService.shutdown();}

}
public class Producer implements Runnable{

private volatile boolean isRunning = true;
private BlockingQueue<PCData> queue;
private static AtomicInteger count = new AtomicInteger();
private static final int SLEEEPTIME =1000;

public Producer(BlockingQueue<PCData> queue) {this.queue = queue;}

@Override
public void run() {
    PCData data = null;
    Random random = new Random();
    System.out.println("start producer name"+Thread.currentThread().getName());
    try{while (isRunning){Thread.sleep(random.nextInt(SLEEEPTIME));
            data = new PCData(count.incrementAndGet());
            System.out.println(data+"is put into queue");
            if(!queue.offer(data,2,TimeUnit.SECONDS)){System.err.println("failed to put data"+data);
            }
        }
    }catch (Exception e){e.printStackTrace();
        Thread.currentThread().interrupt();
    }
}

public void stop(){isRunning=false;}

}
public class Consumer implements Runnable {

private BlockingQueue<PCData> queue;
private static final int SLEEPTIME = 1000;
public Consumer(BlockingQueue<PCData> queue) {this.queue = queue;}

@Override
public void run() {System.out.println("start Consumer id"+Thread.currentThread().getName());
    Random random = new Random();
    try{while(true){PCData pcData = queue.take();
            if(pcData!=null){int re = pcData.getData()*pcData.getData();
                System.out.println(MessageFormat.format("{0}*{1}={2}",pcData.getData(),pcData.getData(),re));
                Thread.sleep(random.nextInt(SLEEPTIME));
            }
        }
    }catch (Exception e){e.printStackTrace();
        Thread.currentThread().interrupt();
    }
}

}
public class PCData {

private final int intData;

public PCData(int intData) {this.intData = intData;}
public PCData(String data) {this.intData = Integer.valueOf(data);
}
public int getData(){return intData;}

@Override
public String toString() {
    return "PCData{" +
            "intData=" + intData +
            '}';
}

}

正文完
 0