一、协调生产/生产的需要
本文内容次要想向大家介绍一下Lock联合Condition的应用办法,为了更好的了解Lock锁与Condition锁信号,咱们来手写一个ArrayBlockingQueue。 JDK实际上曾经有这个类,基于Lock锁与Condition锁信号实现的,当然JDK实现代码很简单蕴含了更谨严的逻辑校验,以及从性能优化的角度做了更多的工作。本文中咱们只是来简略实现一下其外围逻辑:
- ArrayBlockingQueue初始化结构时指定容量下限最大值
- 提供put办法,当达到Queue队列容量下限最大值,阻塞生产数据的线程。
- put办法生产数据之后,队列必定是不为空,告诉消费者线程进行生产。
- 提供take办法,当Queue队列容量为0时候,阻塞生产数据的线程。
- take办法执行之后,队列必定不是满的,告诉生产者线程进行生产。
- 一条数据只能被take一次,take之后数据从queue中删除
置信实现实现下面的逻辑之后,java并发编程之Lock锁与Condition锁信号,你必定是把握了!其实这个逻辑基本上就是kafka生产者客户端缓冲队列,批量进行数据发送的实现逻辑。区别是take办法一次取出缓冲区所有数据,本文take办法一次取出一条数据。
二、构造方法
结构队列的办法很简略,应用一个List作为数据存储队列,并指定其容量。到此咱们还没有实现容量判断,以及阻塞线程的性能。
//类成员变量-存储数据的队列private List<Object> queue;//类成员变量-存储队列的容量下限private int queueSize;public MyBlockingQueue(int queueSize) { this.queueSize = queueSize; queue = new ArrayList<>(queueSize);//存储音讯的汇合}
三、Lock& Condition逻辑设计
首先咱们要有一把锁,保证数据put与take操作的同步性,即:一条数据只能被take一次,take之后数据从queue中删除;以及创立Condition逻辑都须要Lock锁。学过java根底并发编程的同学,能够把Lock锁了解为Synchronized 同步代码块性能是一样的。我写过一个专栏《java并发编程》中介绍了二者的区别,欢送关注。
private Lock lock = new ReentrantLock();//锁
Condition逻辑大家能够了解为传统JDK多线程编程中的wait与notify,然而Condition的语义更容易被了解。如下文代码所示:
private Condition notFull = lock.newCondition(); //队列不为满notFull.signal(); //告诉生产者队列不为满,能够持续生产数据(通常在消费者拿走数据之后,调用)notFull.await(); //队列已满,阻塞生产线程(await对condition逻辑取反)
private Condition notEmpty = lock.newCondition(); //队列不为空notEmpty.signal(); //告诉消费者线程队列不为空,能够持续生产数据(通常在生产者生产数据之后,调用)notEmpty.await(); //队列曾经空了,阻塞生产线程(await对condition逻辑取反)
大家在应用Lock& Condition进行线程同步协调的时候,肯定像我一样先把condition的逻辑语义设计好
- 将当xxxx时候的表白,设计为condition。
- 当状况满足condition的时候发出信号signal()告诉其余线程;
- 当状况与condtion正好相同的的时候,应用await阻塞以后线程。
四、put放入数据
其实最重要的就是实现Lock& Condition逻辑设计,剩下的就是填空了,模板如下
通过while循环判断队列以后容量是否达到容量下限,如果达到下限就示意队列满了。队列满了(notFull取反应用await),await阻塞生产线程向队列中持续放入数据。在这里,有小伙伴已经问过我一个奇葩的问题:多线程持有同一个lock锁,你怎么晓得阻塞的是生产线程,而不是生产线程呢? 答:一个线程是生产线程还是生产线程,取决于它的动作(调用什么办法),并没有一个标签给它定义死,调用put办法放入数据的就是生产数据的线程。while/await组合是规范写法,请不要随便翻新改成if,否则你会遇到很多诡异的bug。
//队列满了,await阻塞生产线程while (queue.size() >= queueSize) { System.out.println(Thread.currentThread().getName() + "期待,因为队列满了" ); notFull.await();}
向队列中增加一条数据,此时咱们能够确定队列是notEmpty,所以应用notEmpty.signal()向生产者发送信号。这里问题又来了:多线程持有同一个lock锁,你怎么晓得告诉的是消费者线程,而不是生产者线程呢? 答案是我的确不晓得,所以在上文中的while (queue.size() >= queueSize)
采纳的是while,而不是if。即便生产者线程被唤醒了,while判断也会把它await拦住。
//向队列增加一条音讯,同时告诉消费者有新音讯了queue.add(message);System.out.println(Thread.currentThread().getName() + "生产" + message );notEmpty.signal();//告诉消费者线程
五、take生产数据
take从队列中取出数据,取出数据之后,队列必定是notFull ,所以收回notFull.signal信号。当队列空了(notEmpty应用await取反),await同时阻塞消费者线程。
public Object take() throws InterruptedException { Object retVal = null; lock.lock();//操作队列先加锁 try { //队列空了,告诉生产线程,生产线程阻塞 while (queue.size() == 0) { System.out.println("队列曾经空了,进行生产!"); notEmpty.await(); } //队列删除一条音讯,同时告诉生产者队列有地位了 retVal = queue.get(0); queue.remove(0); notFull.signal(); //同时告诉生产者队列 } finally { lock.unlock(); } return retVal;}
我置信有了下面put办法的根底,了解take办法中的代码,就非常容易了,这里我就不做过多的阐明了。
六、生产生产测试
public static void main(String[] args) { //为了不便查看测试后果,咱们的队列容量设置小一些 MyBlockingQueue queue = new MyBlockingQueue(2); //生产者线程 new Thread(()->{ for(int i = 0;i < 5;i++){ try { queue.put("msg" + i); //放入5条数据 } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); //消费者线程 new Thread(()->{ while(true){ //始终生产 try { System.out.println(Thread.currentThread().getName() + "生产数据" + queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } } }).start();}
输入后果如下,满足咱们的需要。队列满了,生产者线程Thread-0期待;生产生产相互协调告诉,最终数据生产实现,队列空了,消费者线程阻塞。
Thread-0生产msg0Thread-0生产msg1Thread-0期待,因为队列满了Thread-1生产数据msg0Thread-0生产msg2Thread-0期待,因为队列满了Thread-1生产数据msg1Thread-0生产msg3Thread-0期待,因为队列满了Thread-1生产数据msg2Thread-0生产msg4Thread-1生产数据msg3Thread-1生产数据msg4队列曾经空了,进行生产!
欢送关注我的博客,更多精品常识合集
本文转载注明出处(必须带连贯,不能只转文字):字母哥博客 - zimug.com
感觉对您有帮忙的话,帮我点赞、分享!您的反对是我不竭的创作能源!。另外,笔者最近一段时间输入了如下的精品内容,期待您的关注。
- 《kafka修炼之道》
- 《手摸手教你学Spring Boot2.0》
- 《Spring Security-JWT-OAuth2一本通》
- 《实战前后端拆散RBAC权限管理系统》
- 《实战SpringCloud微服务从青铜到王者》