之前有介绍几种提早队列的对方,如java并发编程学习之DelayQueue、ActiveMQ - 提早队列、RabbitMQ - 提早队列,对于提早队列,我还是举荐用以上几种,这边只对redis如何实现提早队列做了一个例子。
为了实现提早队列,咱们须要定义两个类型的数据:

  1. 队列,须要执行的工作,间接放入队列,线程通过队列的内容进行执行工作。
  2. 有序汇合,通过成员的分数用来判断是否能够放入队列执行。咱们往有序汇合插入数据的时候,分数就是以后工夫+提早的工夫,判断的时候,就能够通过以后工夫和分数进行比拟,如果以后工夫大于分数,阐明还没到执行的时候。如果小于等于分数,则放入队列执行。

示例

把工作退出到有序汇合,如果分数为0,阐明没有提早,间接退出到队列中。如果分数不为0,阐明有提早,把以后工夫加上延迟时间,作为分数存入有序汇合中。

private static void add(double score) {    SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");    String format = formatter.format(new Date());    // 马上执行的工作    if (score == 0) {        JedisUtils.rpush(queue, format);    } else {        double date = System.currentTimeMillis() + score;        JedisUtils.zadd(delay, date, format + "-" + score);    }}

队列解决工作。如果取到工作,间接执行,如果没有工作,则休眠10毫秒。

static String queue = "queue:";static String delay = "delay:";static class QueueThread implements Runnable {    @Override    public void run() {        while (true) {            String res = JedisUtils.lpop(queue);            if (null != res) {                SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");                System.out.println(formatter.format(new Date()) + "---" + "解决音讯:" + res);            } else {                //临时没有音讯,就休眠10毫秒                try {                    TimeUnit.MILLISECONDS.sleep(10);                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        }    }}

提早工作解决。在while循环中 ,先依据小于以后工夫的分数取有序汇合的数据,如果有数据,阐明存在马上执行的工作,把工作从有序汇合移除,并退出到队列中。

static class DelayThread implements Runnable {    @Override    public void run() {        while (true) {            long now = System.currentTimeMillis();            Set<String> set = JedisUtils.zrangeByScore(delay, Double.MIN_VALUE, now);            // 如果有能够运行的,则从有序汇合移除,并放入队列            if (set.size() > 0) {                Iterator<String> iterator = set.iterator();                while (iterator.hasNext()) {                    String next = iterator.next();                    JedisUtils.zrem(delay, next);                    JedisUtils.rpush(queue, next);                }            } else {                // 没有内容,则休眠10毫秒                try {                    TimeUnit.MILLISECONDS.sleep(10);                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        }    }}

测试例子,

@Testpublic void testDelayQueue() throws InterruptedException {    new Thread(new QueueThread()).start();    new Thread(new DelayThread()).start();    add(2000);    add(3000);    add(0);    add(4000);    add(5000);    add(6000);    TimeUnit.SECONDS.sleep(10);}

运行后果如下:

2020-09-24 22:36:59---解决音讯:2020-09-24 22:36:592020-09-24 22:37:01---解决音讯:2020-09-24 22:36:59-2000.02020-09-24 22:37:02---解决音讯:2020-09-24 22:36:59-3000.02020-09-24 22:37:03---解决音讯:2020-09-24 22:36:59-4000.02020-09-24 22:37:04---解决音讯:2020-09-24 22:36:59-5000.02020-09-24 22:37:05---解决音讯:2020-09-24 22:36:59-6000.0