DelayQueue

DelayQueue是一个无界阻塞队列,只有在提早期满时能力从中提取元素。该队列的头部是提早期满后保留工夫最长的Delayed元素。
寄存到DelayDeque的元素必须继承Delayed接口。Delayed接口使对象成为提早对象,它使寄存在DelayQueue类中的对象具备了激活日期,该接口强制执行下列两个办法:
  • CompareTo(Delayed o):Delayed接口继承了Comparable接口,因而有了这个办法
  • getDelay(TimeUnit unit):这个办法返回到激活日期的剩余时间,工夫单位由单位参数指定

DelayQueue应用场景

  • 敞开闲暇链接。服务器中,有很多客户端链接,闲暇一段时间后须要敞开。
  • 缓存超过了缓存工夫,就须要从缓存中移除。

DelayQueue超时订单解决案例

package com.rumenz.learn.delayqueue;import java.util.concurrent.Delayed;import java.util.concurrent.TimeUnit;//DelayQueue外面的元素必须实现Delayedpublic class Item<T> implements Delayed {    private Long expireTime;    private T data;    public Item(Long expireTime, T data) {        this.expireTime = expireTime+System.currentTimeMillis();        this.data = data;    }    @Override    public long getDelay(TimeUnit unit) {        long d = unit.convert(this.expireTime - System.currentTimeMillis(),unit);        return d;    }    @Override    public int compareTo(Delayed o) {        long d=getDelay(TimeUnit.MILLISECONDS)-o.getDelay(TimeUnit.MILLISECONDS);        if(d==0){            return 0;        }        return d>0?1:-1;    }    public Long getExpireTime() {        return expireTime;    }    public void setExpireTime(Long expireTime) {        this.expireTime = expireTime;    }    public T getData() {        return data;    }    public void setData(T data) {        this.data = data;    }}// 订单实体类package com.rumenz.learn.delayqueue;public class OrderItem {    private Double orderAmount;    private String orderNo;    //0未领取 1领取了    private Integer orderStatus;    public OrderItem(Double orderAmount, String orderNo, Integer orderStatus) {        this.orderAmount = orderAmount;        this.orderNo = orderNo;        this.orderStatus = orderStatus;    }    public Double getOrderAmount() {        return orderAmount;    }    public void setOrderAmount(Double orderAmount) {        this.orderAmount = orderAmount;    }    public String getOrderNo() {        return orderNo;    }    public void setOrderNo(String orderNo) {        this.orderNo = orderNo;    }    public Integer getOrderStatus() {        return orderStatus;    }    public void setOrderStatus(Integer orderStatus) {        this.orderStatus = orderStatus;    }}//package com.rumenz.learn.delayqueue;import java.text.SimpleDateFormat;import java.util.Date;import java.util.Iterator;import java.util.Map;import java.util.Random;import java.util.concurrent.*;public class DelayQueueExample {    //3个线程 1个线程下单 1个线程领取  1个线程敞开超时订单  订单领取超时工夫为10s    public static void main(String[] args) {        ExecutorService executorService = Executors.newFixedThreadPool(3);        DelayQueue<Item<OrderItem>> delayeds = new DelayQueue<>();        ConcurrentMap<String, OrderItem> map = new ConcurrentHashMap<>();        //下单线程        executorService.execute(()->{            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");            Integer orderNo=100;            while (true){                try{                    Thread.sleep(3000);                    Integer amount = new Random().nextInt(1000);                    OrderItem orderItem=new OrderItem(amount.doubleValue(), String.valueOf(orderNo), 0);                    Item<OrderItem> item=new Item<>(10*1000L,orderItem);                    Date date=new Date();                    date.setTime(item.getExpireTime());                    System.out.println("=======================下单==========================");                    System.out.println("生成订单工夫:"+simpleDateFormat.format(new Date()));                    System.out.println("订单编号:"+orderNo);                    System.out.println("订单金额:"+orderItem.getOrderAmount());                    System.out.println("领取过期工夫:"+simpleDateFormat.format(date));                    System.out.println("========================下单=========================");                    map.put(String.valueOf(orderNo),orderItem);                    orderNo++;                    delayeds.offer(item);                }catch (Exception e){                    e.printStackTrace();                }            }        });        //领取线程        executorService.execute(()->{            while (true){                try {                    //随机期待 再领取                    Thread.sleep(new Random().nextInt(15)*1000);                    String orderNo="";                    Iterator<Map.Entry<String, OrderItem>> iterator = map.entrySet().iterator();                    if(iterator.hasNext()){                        OrderItem orderItem = iterator.next().getValue();                        orderItem.setOrderStatus(1);                        orderNo=orderItem.getOrderNo();                        System.out.println("-----------------------领取订单-----------------------");                        System.out.println("订单领取"+orderNo);                        System.out.println("领取金额"+orderItem.getOrderAmount());                        System.out.println("-----------------------领取订单-----------------------");                    }                    map.remove(orderNo);                }catch (Exception e){                    e.printStackTrace();                }            }        });        //关系过期的订单        executorService.execute(()->{            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");            while (true){                try{                    Item<OrderItem> item = delayeds.take();                    OrderItem data = item.getData();                    Date date=new Date();                    date.setTime(item.getExpireTime());                    if(data.getOrderStatus()==0){                        System.out.println("########################过期订单########################");                        System.out.println("订单编号:"+data.getOrderNo());                        System.out.println("订单金额:"+data.getOrderAmount());                        System.out.println("订单到期领取工夫:"+simpleDateFormat.format(date));                        System.out.println("########################过期订单########################");                    }                    map.remove(data.getOrderNo());                }catch (Exception e){                    e.printStackTrace();                }            }        });        executorService.shutdown();    }}

SynchronousQueue

它是一个非凡的队列交做同步队列,特点是当一个线程往队列里写一个元素,写入操作不会了解返回,须要期待另外一个线程来将这个元素拿走。同理,当一个读线程做读操作的时候,同样须要一个相匹配写线程的写操作。这里的Synchronous指的就是读写线程须要同步,一个读线程匹配一个写线程,同理一个写线程匹配一个读线程。

不像ArrayBlockingQueueLinkedBlockingDeque之类的阻塞队列依赖AQS实现并发操作,SynchronousQueue间接应用CAS实现线程的平安拜访。

较少应用到 SynchronousQueue 这个类,不过它在线程池的实现类 ScheduledThreadPoolExecutor 中失去了利用。

public class SynchronousQueue<E> extends AbstractQueue<E>    implements BlockingQueue<E>, java.io.Serializable {    //外部栈    static final class TransferStack<E> extends Transferer<E> {}    //外部队列    static final class TransferQueue<E> extends Transferer<E> {}    public SynchronousQueue() {this(false);}    public SynchronousQueue(boolean fair) {        transferer = fair ?                  new TransferQueue<E>() : new TransferStack<E>();    }}

SynchronousQueue代码演示

package com.rumenz.learn.synchronousqueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.SynchronousQueue;import java.util.concurrent.TimeUnit;public class SynchronousQueueExample {    public static void main(String[] args) {        SynchronousQueue<String> queue = new SynchronousQueue<>();        ExecutorService executorService = Executors.newFixedThreadPool(2);        executorService.execute(()->{          try {              System.out.println(Thread.currentThread().getName()+"put 1");              queue.put("1");              System.out.println(Thread.currentThread().getName()+"put 2");              queue.put("2");              System.out.println(Thread.currentThread().getName()+"put 3");              queue.put("3");              System.out.println(Thread.currentThread().getName()+"put 4");              queue.put("4");          }catch (Exception e){              e.printStackTrace();          }        });        executorService.execute(()->{            try{              TimeUnit.SECONDS.sleep(1);              System.out.println("获取数据:"+queue.take());              TimeUnit.SECONDS.sleep(1);              System.out.println("获取数据:"+queue.take());              TimeUnit.SECONDS.sleep(1);              System.out.println("获取数据:"+queue.take());              TimeUnit.SECONDS.sleep(1);              System.out.println("获取数据:"+queue.take());            }catch (Exception e){                e.printStackTrace();            }        });        executorService.shutdown();    }}

关注微信公众号:【入门小站】,关注更多知识点