关于java:延迟队列的几种实现方案

43次阅读

共计 3549 个字符,预计需要花费 9 分钟才能阅读完成。

提早队列是这样一种队列:元素有一个过期工夫,元素在队列中程序是依照过期工夫排序的,只有达到过期工夫的元素能力出队,最先达到过期工夫的元素最先出队。

提早队列能够用到这样的场景:比方有一个工作须要 10 分钟后执行,那么就能够把这个工作存入提早队列,并且设置过期工夫为 10 分钟,队列的另一头有一个消费者去生产,没达到过期工夫时消费者是取不到这个工作的,达到过期工夫后消费者能力取到这个工作。

像这种多少分钟之后执行的工作特地适宜应用提早队列实现,如果不应用提早队列,把工作存入数据库定期扫描也能实现,然而会有一个问题:如果扫描时间距离长了,执行工夫不准,如果扫描间隔时间短了,那么对数据库压力比拟大。

DelayQueue

DelayQueue 是 JDK 中自带的提早队列实现,向该队列中提交的工作须要实现 Delayed 接口,这个接口只有一个办法 long getDelay(TimeUnit unit) 这个接口如果返回 0 或正数示意工作曾经达到了过期工夫,能够执行;否则示意还没有达到过期工夫,不能执行。

DelayQueue 应用示例

public class DelayedTask implements Delayed {
    private String taskName;
    private long avaibleTime;

    public String getTaskName() {return taskName;}

    public DelayedTask(String taskName, long delayTime) {
        this.taskName = taskName;
        this.avaibleTime = System.currentTimeMillis() + delayTime;}

    @Override
    public long getDelay(TimeUnit unit) {return unit.convert(avaibleTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {return (int)(this.avaibleTime - ((DelayedTask)o).avaibleTime);
    }
}

下面代码中咱们定义了 DelayedTask 工作并且实现了 Delayed 接口,在 DelayedTask 中保护了一个成员变量 avaibleTime 寄存工作到期工夫,在 getDelay 办法实现中用 avaibleTime 减去以后工夫戳,如果是一个正数或 0,示意工作曾经达到了到期工夫,能够执行了。

测试

DelayQueue<DelayedTask> queue = new DelayQueue<>();
new Thread(()->{
    try {
        //take 办法会阻塞,直到有工作到期
        DelayedTask task = queue.take();
        System.out.println(task.getTaskName());
    } catch (InterruptedException e) {e.printStackTrace();
    }
}).start();

queue.put(new DelayedTask("工作", 3000));

三秒种后输入 工作

实现原理

次要属性

// 优先队列
private final PriorityQueue q = new PriorityQueue();
//leader 线程示意正在期待出队的线程
private Thread leader = null;
// 可用条件
private final Condition available = lock.newCondition();

DelayQueue 的底层实现是优先队列

take 办法

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();       ①
        try {for (;;) {E first = q.peek();
                if (first == null)      ②
                    available.await();
                else {long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)     ③
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)  ④
                        available.await();
                    else {Thread thisThread = Thread.currentThread();
                        leader = thisThread;   ⑤
                        try {available.awaitNanos(delay);
                        } finally {if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {if (leader == null && q.peek() != null)
                available.signal();    ⑥
            lock.unlock();}
    }

在 take 办法中,首先获取锁,而后查看队首元素

  • 如果队首元素为 null,阐明队列是空的,调用 available.await() 期待 ②
  • 如果队首元素不为 null,调用 getDelay 判断该工作是否到期

    • 如果返回正数或 0,阐明工作曾经到期,间接获取队首元素返回 ③
    • 如果不是正数或 0,阐明工作还没到期。这时判断 leader 是否为 null

      • 如果 leader 不为 null,阐明以后有一个 leader 在期待队列头元素到期,调用 available.await() 将以后线程挂起 ④
      • 如果 leader 为 null,将以后线程设置为 leader,并调用 available.await(delay) 挂起期待残余到期工夫。当线程被零碎唤醒后,将 leader 设置为 null,从新去获取元素,这时队首元素就曾经到期了,这个线程会在 ③ 处返回。
  • 最初,办法执行返回前,会判断如果 leader 为 null,并且队列中有元素,会调用 avilable.signal 唤醒一个期待的线程去竞争 leader。⑥
  • 如图,当初有三个线程去生产这个队列,队列中的 TaskA 没到执行工夫,Thread1 是以后的 Leader 线程,它会期待提早纳秒数,而后拉取队列头节点生产并且调用 signal() 唤醒 Follower 节点。如下图,Thread3 节点被唤醒,它会先看一下头节点是否到期,如果没到期就期待到它到期。

    工夫轮算法

    如上图,在工夫轮算法中,有一个轮,这个轮由 n 个槽组成,有一个指针向钟表一样每隔一个工夫间走到下个槽位上,而后这个槽位上的工作就能够执行。假如当初有 8 个槽组成工夫轮,指针每秒挪动一个槽位,当初指针指向 1 这个槽,假如当初有一个工作要在 5 秒钟之后执行,那么把这个工作就放在 5 + 1 = 6 这个槽上,这样在 5 秒钟后指针就会挪动到这个槽,而后这个槽对应的工作就会被执行。

    如果有工作要在 20 秒后执行,那么就须要指针多转两圈,将工作放在(20%8 + 1)= 5 这个槽上。

    这个圈数须要记录在槽中的数据结构外面。这个数据结构最重要的是两个指针,一个是触发工作的函数指针,另外一个是触发的总第几圈数。工夫轮能够用简略的数组或者是环形链表来实现。

    netty 的 HashedWheelTimer 就采纳了工夫轮算法。

    相比 DelayQueue 的数据结构,工夫轮在算法复杂度上有肯定劣势。DelayQueue 因为波及到排序,须要调整数据的地位,插入和移除的复杂度是 O(lgn),而工夫轮在插入和移除的复杂度都是 O(1)。

    应用 zset 实现提早队列

    音讯的生产者应用 zadd quque [以后工夫戳 + 延迟时间] [序列化成字符串的音讯] 将音讯插入音讯队列。

    开启多个线程一直执行 zrangebyscore quque [0] [以后工夫戳] limit 0 1 命令,ZRANGEBYSCORE 这个命令示意获取队列中的成员,并按分数从小到大排列,limit 0 1 偏移量为 0,获取一条数据,所以返回的就是工夫戳最小的那条。

    判断这条数据是否达到了执行工夫,如果达到了执行工夫,执行 zrem member,ZREM 执行这个命令有两个目标:

    1. 从提早队列中删除这条音讯避免其它线程再次获取;

    2. 可能存在多个线程同时获取到这条音讯,所以这里靠 zrem 只有返回 > 0,才阐明以后线程胜利获取到音讯,能够生产音讯,如果返回 = 0,阐明已有其它线程领先获取到音讯了,以后线程不能够生产音讯。

    优化:

    因为咱们是用多个线程去同时 zrangebyscore 而后再通过 zrem 是否胜利来确定是否争抢到音讯,没有抢到音讯的线程实际上就白白执行了一次 zrangebyscore,能够应用 Lua 脚本将 zrangebyscore 和 zrem 放到一起在服务端执行。

    正文完
     0