DelayQueue
DelayQueue是一个无界阻塞队列,只有在提早期满时能力从中提取元素。该队列的头部是提早期满后保留工夫最长的Delayed元素。
寄存到DelayDeque的元素必须继承Delayed接口。Delayed接口使对象成为提早对象,它使寄存在DelayQueue类中的对象具备了激活日期,该接口强制执行下列两个办法:
- CompareTo(Delayed o):Delayed接口继承了Comparable接口,因而有了这个办法
- getDelay(TimeUnit unit):这个办法返回到激活日期的剩余时间,工夫单位由单位参数指定
DelayQueue应用场景
- 敞开闲暇链接。服务器中,有很多客户端链接,闲暇一段时间后须要敞开。
- 缓存超过了缓存工夫,就须要从缓存中移除。
DelayQueue超时订单解决案例
package com.rumenz.learn.delayqueue;import java.util.concurrent.Delayed;import java.util.concurrent.TimeUnit;//DelayQueue外面的元素必须实现Delayedpublic class Item<T> implements Delayed { private Long expireTime; private T data; public Item(Long expireTime, T data) { this.expireTime = expireTime+System.currentTimeMillis(); this.data = data; } @Override public long getDelay(TimeUnit unit) { long d = unit.convert(this.expireTime - System.currentTimeMillis(),unit); return d; } @Override public int compareTo(Delayed o) { long d=getDelay(TimeUnit.MILLISECONDS)-o.getDelay(TimeUnit.MILLISECONDS); if(d==0){ return 0; } return d>0?1:-1; } public Long getExpireTime() { return expireTime; } public void setExpireTime(Long expireTime) { this.expireTime = expireTime; } public T getData() { return data; } public void setData(T data) { this.data = data; }}// 订单实体类package com.rumenz.learn.delayqueue;public class OrderItem { private Double orderAmount; private String orderNo; //0未领取 1领取了 private Integer orderStatus; public OrderItem(Double orderAmount, String orderNo, Integer orderStatus) { this.orderAmount = orderAmount; this.orderNo = orderNo; this.orderStatus = orderStatus; } public Double getOrderAmount() { return orderAmount; } public void setOrderAmount(Double orderAmount) { this.orderAmount = orderAmount; } public String getOrderNo() { return orderNo; } public void setOrderNo(String orderNo) { this.orderNo = orderNo; } public Integer getOrderStatus() { return orderStatus; } public void setOrderStatus(Integer orderStatus) { this.orderStatus = orderStatus; }}//package com.rumenz.learn.delayqueue;import java.text.SimpleDateFormat;import java.util.Date;import java.util.Iterator;import java.util.Map;import java.util.Random;import java.util.concurrent.*;public class DelayQueueExample { //3个线程 1个线程下单 1个线程领取 1个线程敞开超时订单 订单领取超时工夫为10s public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(3); DelayQueue<Item<OrderItem>> delayeds = new DelayQueue<>(); ConcurrentMap<String, OrderItem> map = new ConcurrentHashMap<>(); //下单线程 executorService.execute(()->{ SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Integer orderNo=100; while (true){ try{ Thread.sleep(3000); Integer amount = new Random().nextInt(1000); OrderItem orderItem=new OrderItem(amount.doubleValue(), String.valueOf(orderNo), 0); Item<OrderItem> item=new Item<>(10*1000L,orderItem); Date date=new Date(); date.setTime(item.getExpireTime()); System.out.println("=======================下单=========================="); System.out.println("生成订单工夫:"+simpleDateFormat.format(new Date())); System.out.println("订单编号:"+orderNo); System.out.println("订单金额:"+orderItem.getOrderAmount()); System.out.println("领取过期工夫:"+simpleDateFormat.format(date)); System.out.println("========================下单========================="); map.put(String.valueOf(orderNo),orderItem); orderNo++; delayeds.offer(item); }catch (Exception e){ e.printStackTrace(); } } }); //领取线程 executorService.execute(()->{ while (true){ try { //随机期待 再领取 Thread.sleep(new Random().nextInt(15)*1000); String orderNo=""; Iterator<Map.Entry<String, OrderItem>> iterator = map.entrySet().iterator(); if(iterator.hasNext()){ OrderItem orderItem = iterator.next().getValue(); orderItem.setOrderStatus(1); orderNo=orderItem.getOrderNo(); System.out.println("-----------------------领取订单-----------------------"); System.out.println("订单领取"+orderNo); System.out.println("领取金额"+orderItem.getOrderAmount()); System.out.println("-----------------------领取订单-----------------------"); } map.remove(orderNo); }catch (Exception e){ e.printStackTrace(); } } }); //关系过期的订单 executorService.execute(()->{ SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); while (true){ try{ Item<OrderItem> item = delayeds.take(); OrderItem data = item.getData(); Date date=new Date(); date.setTime(item.getExpireTime()); if(data.getOrderStatus()==0){ System.out.println("########################过期订单########################"); System.out.println("订单编号:"+data.getOrderNo()); System.out.println("订单金额:"+data.getOrderAmount()); System.out.println("订单到期领取工夫:"+simpleDateFormat.format(date)); System.out.println("########################过期订单########################"); } map.remove(data.getOrderNo()); }catch (Exception e){ e.printStackTrace(); } } }); executorService.shutdown(); }}
SynchronousQueue
它是一个非凡的队列交做同步队列,特点是当一个线程往队列里写一个元素,写入操作不会了解返回,须要期待另外一个线程来将这个元素拿走。同理,当一个读线程做读操作的时候,同样须要一个相匹配写线程的写操作。这里的Synchronous
指的就是读写线程须要同步,一个读线程匹配一个写线程,同理一个写线程匹配一个读线程。不像
ArrayBlockingQueue
、LinkedBlockingDeque
之类的阻塞队列依赖AQS实现并发操作,SynchronousQueue
间接应用CAS实现线程的平安拜访。较少应用到 SynchronousQueue 这个类,不过它在线程池的实现类 ScheduledThreadPoolExecutor 中失去了利用。
public class SynchronousQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { //外部栈 static final class TransferStack<E> extends Transferer<E> {} //外部队列 static final class TransferQueue<E> extends Transferer<E> {} public SynchronousQueue() {this(false);} public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); }}
SynchronousQueue
代码演示
package com.rumenz.learn.synchronousqueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.SynchronousQueue;import java.util.concurrent.TimeUnit;public class SynchronousQueueExample { public static void main(String[] args) { SynchronousQueue<String> queue = new SynchronousQueue<>(); ExecutorService executorService = Executors.newFixedThreadPool(2); executorService.execute(()->{ try { System.out.println(Thread.currentThread().getName()+"put 1"); queue.put("1"); System.out.println(Thread.currentThread().getName()+"put 2"); queue.put("2"); System.out.println(Thread.currentThread().getName()+"put 3"); queue.put("3"); System.out.println(Thread.currentThread().getName()+"put 4"); queue.put("4"); }catch (Exception e){ e.printStackTrace(); } }); executorService.execute(()->{ try{ TimeUnit.SECONDS.sleep(1); System.out.println("获取数据:"+queue.take()); TimeUnit.SECONDS.sleep(1); System.out.println("获取数据:"+queue.take()); TimeUnit.SECONDS.sleep(1); System.out.println("获取数据:"+queue.take()); TimeUnit.SECONDS.sleep(1); System.out.println("获取数据:"+queue.take()); }catch (Exception e){ e.printStackTrace(); } }); executorService.shutdown(); }}
关注微信公众号:【入门小站】,关注更多知识点