共计 6262 个字符,预计需要花费 16 分钟才能阅读完成。
DelayQueue
DelayQueue 是一个无界阻塞队列,只有在提早期满时能力从中提取元素。该队列的头部是提早期满后保留工夫最长的 Delayed 元素。
寄存到 DelayDeque 的元素必须继承 Delayed 接口。Delayed 接口使对象成为提早对象,它使寄存在 DelayQueue 类中的对象具备了激活日期,该接口强制执行下列两个办法:
- CompareTo(Delayed o):Delayed 接口继承了 Comparable 接口,因而有了这个办法
- getDelay(TimeUnit unit): 这个办法返回到激活日期的剩余时间,工夫单位由单位参数指定
DelayQueue 应用场景
- 敞开闲暇链接。服务器中, 有很多客户端链接, 闲暇一段时间后须要敞开。
- 缓存超过了缓存工夫, 就须要从缓存中移除。
DelayQueue 超时订单解决案例
package com.rumenz.learn.delayqueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
//DelayQueue 外面的元素必须实现 Delayed
public class Item<T> implements Delayed {
private Long expireTime;
private T data;
public Item(Long expireTime, T data) {this.expireTime = expireTime+System.currentTimeMillis();
this.data = data;
}
@Override
public long getDelay(TimeUnit unit) {long d = unit.convert(this.expireTime - System.currentTimeMillis(),unit);
return d;
}
@Override
public int compareTo(Delayed o) {long d=getDelay(TimeUnit.MILLISECONDS)-o.getDelay(TimeUnit.MILLISECONDS);
if(d==0){return 0;}
return d>0?1:-1;
}
public Long getExpireTime() {return expireTime;}
public void setExpireTime(Long expireTime) {this.expireTime = expireTime;}
public T getData() {return data;}
public void setData(T data) {this.data = data;}
}
// 订单实体类
package com.rumenz.learn.delayqueue;
public class OrderItem {
private Double orderAmount;
private String orderNo;
// 0 未领取 1 领取了
private Integer orderStatus;
public OrderItem(Double orderAmount, String orderNo, Integer orderStatus) {
this.orderAmount = orderAmount;
this.orderNo = orderNo;
this.orderStatus = orderStatus;
}
public Double getOrderAmount() {return orderAmount;}
public void setOrderAmount(Double orderAmount) {this.orderAmount = orderAmount;}
public String getOrderNo() {return orderNo;}
public void setOrderNo(String orderNo) {this.orderNo = orderNo;}
public Integer getOrderStatus() {return orderStatus;}
public void setOrderStatus(Integer orderStatus) {this.orderStatus = orderStatus;}
}
//
package com.rumenz.learn.delayqueue;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.*;
public class DelayQueueExample {
// 3 个线程 1 个线程下单 1 个线程领取 1 个线程敞开超时订单 订单领取超时工夫为 10s
public static void main(String[] args) {ExecutorService executorService = Executors.newFixedThreadPool(3);
DelayQueue<Item<OrderItem>> delayeds = new DelayQueue<>();
ConcurrentMap<String, OrderItem> map = new ConcurrentHashMap<>();
// 下单线程
executorService.execute(()->{SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Integer orderNo=100;
while (true){
try{Thread.sleep(3000);
Integer amount = new Random().nextInt(1000);
OrderItem orderItem=new OrderItem(amount.doubleValue(), String.valueOf(orderNo), 0);
Item<OrderItem> item=new Item<>(10*1000L,orderItem);
Date date=new Date();
date.setTime(item.getExpireTime());
System.out.println("======================= 下单 ==========================");
System.out.println("生成订单工夫:"+simpleDateFormat.format(new Date()));
System.out.println("订单编号:"+orderNo);
System.out.println("订单金额:"+orderItem.getOrderAmount());
System.out.println("领取过期工夫:"+simpleDateFormat.format(date));
System.out.println("======================== 下单 =========================");
map.put(String.valueOf(orderNo),orderItem);
orderNo++;
delayeds.offer(item);
}catch (Exception e){e.printStackTrace();
}
}
});
// 领取线程
executorService.execute(()->{while (true){
try {
// 随机期待 再领取
Thread.sleep(new Random().nextInt(15)*1000);
String orderNo="";
Iterator<Map.Entry<String, OrderItem>> iterator = map.entrySet().iterator();
if(iterator.hasNext()){OrderItem orderItem = iterator.next().getValue();
orderItem.setOrderStatus(1);
orderNo=orderItem.getOrderNo();
System.out.println("----------------------- 领取订单 -----------------------");
System.out.println("订单领取"+orderNo);
System.out.println("领取金额"+orderItem.getOrderAmount());
System.out.println("----------------------- 领取订单 -----------------------");
}
map.remove(orderNo);
}catch (Exception e){e.printStackTrace();
}
}
});
// 关系过期的订单
executorService.execute(()->{SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
while (true){
try{Item<OrderItem> item = delayeds.take();
OrderItem data = item.getData();
Date date=new Date();
date.setTime(item.getExpireTime());
if(data.getOrderStatus()==0){System.out.println("######################## 过期订单 ########################");
System.out.println("订单编号:"+data.getOrderNo());
System.out.println("订单金额:"+data.getOrderAmount());
System.out.println("订单到期领取工夫:"+simpleDateFormat.format(date));
System.out.println("######################## 过期订单 ########################");
}
map.remove(data.getOrderNo());
}catch (Exception e){e.printStackTrace();
}
}
});
executorService.shutdown();}
}
SynchronousQueue
它是一个非凡的队列交做同步队列, 特点是当一个线程往队列里写一个元素, 写入操作不会了解返回, 须要期待另外一个线程来将这个元素拿走。同理, 当一个读线程做读操作的时候, 同样须要一个相匹配写线程的写操作。这里的
Synchronous
指的就是读写线程须要同步, 一个读线程匹配一个写线程, 同理一个写线程匹配一个读线程。不像
ArrayBlockingQueue
、LinkedBlockingDeque
之类的阻塞队列依赖 AQS 实现并发操作,SynchronousQueue
间接应用 CAS 实现线程的平安拜访。较少应用到 SynchronousQueue 这个类,不过它在线程池的实现类 ScheduledThreadPoolExecutor 中失去了利用。
public class SynchronousQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
// 外部栈
static final class TransferStack<E> extends Transferer<E> {}
// 外部队列
static final class TransferQueue<E> extends Transferer<E> {}
public SynchronousQueue() {this(false);}
public SynchronousQueue(boolean fair) {
transferer = fair ?
new TransferQueue<E>() : new TransferStack<E>();
}
}
SynchronousQueue
代码演示
package com.rumenz.learn.synchronousqueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
public class SynchronousQueueExample {public static void main(String[] args) {SynchronousQueue<String> queue = new SynchronousQueue<>();
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(()->{
try {System.out.println(Thread.currentThread().getName()+"put 1");
queue.put("1");
System.out.println(Thread.currentThread().getName()+"put 2");
queue.put("2");
System.out.println(Thread.currentThread().getName()+"put 3");
queue.put("3");
System.out.println(Thread.currentThread().getName()+"put 4");
queue.put("4");
}catch (Exception e){e.printStackTrace();
}
});
executorService.execute(()->{
try{TimeUnit.SECONDS.sleep(1);
System.out.println("获取数据:"+queue.take());
TimeUnit.SECONDS.sleep(1);
System.out.println("获取数据:"+queue.take());
TimeUnit.SECONDS.sleep(1);
System.out.println("获取数据:"+queue.take());
TimeUnit.SECONDS.sleep(1);
System.out.println("获取数据:"+queue.take());
}catch (Exception e){e.printStackTrace();
}
});
executorService.shutdown();}
}
关注微信公众号:【入门小站】, 关注更多知识点
正文完