乐趣区

关于java:延时队列DelayQueue探秘

由下面的 UML 图可知,DelayQueue 依赖于 PriorityQueue, 应用 PriorityQueue 存储对象, 所以要彻底搞懂 DelayQueue 必须先晓得 PriorityQueue 原理.

PriorityQueue

架构

根本属性
// 默认初始容量
private static final int DEFAULT_INITIAL_CAPACITY = 11;
/**
 * Priority queue represented as a balanced binary heap:
 * 优先级队列代表了一个均衡二叉堆
 * the two children of queue[n] are queue[2*n+1] and queue[2*(n+1)].
 * queue[n]的两个孩子别离为 queue[2*n+1]和 queue[2*(n+1)]
 * The priority queue is ordered by comparator, or by the elements' natural ordering,
 * 优先级队列应用 comparator 或者元素的天然排序
 * if comparator is null:
 * 如果 comparator 为空
 * For each node n in the heap and each descendant d of n, n <= d.
 * 对于堆中的每个节点 n 和 n 的每个后辈 d,n<=d.
 * The element with the lowest value is in queue[0], assuming the queue is nonempty.
 * 如果队列不为空, 则最小的元素在队列的第一位
 */
transient Object[] queue;
/**
 * The comparator, or null if priority queue uses elements' natural ordering.
 * 比拟器, 如果为空则应用元素的天然排序
 */
private final Comparator<? super E> comparator;
// 汇合操作次数(除去查问), 次要用于 fast-fail
transient int modCount = 0;
结构器
// 绝大部分状况下, 都是用该结构器 
public PriorityQueue() {this(DEFAULT_INITIAL_CAPACITY, null);
}
// 该结构器初始化队列
public PriorityQueue(int initialCapacity,
                         Comparator<? super E> comparator) {
    // Note: This restriction of at least one is not actually needed,
    // but continues for 1.5 compatibility
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.queue = new Object[initialCapacity];
    this.comparator = comparator;
}
罕用办法

增加元素

public boolean add(E e) {return offer(e);
}
 public boolean offer(E e) {
     // 不容许有空元素
     if (e == null)
         throw new NullPointerException();
     modCount++;
     int i = size;
     // 数组已满, 须要扩容
     if (i >= queue.length)
         grow(i + 1);
     // 新元素个数
     size = i + 1;
     // 数组还未插入元素, 直接插入到第一个地位
     if (i == 0)
         queue[0] = e;
     else
         // 数组第一个地位曾经插入了元素, 须要向堆中插入元素
         siftUp(i, e);
     return true;
 }
private void grow(int minCapacity) {
    // 旧数组容量
    int oldCapacity = queue.length;
    // Double size if small; else grow by 50%
    // 如果旧容量小于 64 则新容量为旧容量的一倍 +2. 否则扩容 50%
    int newCapacity = oldCapacity +
        ((oldCapacity < 64) ? (oldCapacity + 2) : (oldCapacity >> 1));
    // overflow-conscious code
    // 如果 minCapacity 大于 Integer.MAX_VALUE- 8 则从新计算新容量, 最大为 Integer.MAX_VALUE
    if (newCapacity - MAX_ARRAY_SIZE > 0)
        newCapacity = hugeCapacity(minCapacity);
    // 数组拷贝, 将旧数组扩容并将旧数组数据迁徙
    queue = Arrays.copyOf(queue, newCapacity);
}
private static int hugeCapacity(int minCapacity) {if (minCapacity < 0) // overflow
        throw new OutOfMemoryError();
    return (minCapacity > MAX_ARRAY_SIZE) ?
        Integer.MAX_VALUE :
    MAX_ARRAY_SIZE;
}
private void siftUp(int k, E x) {
    // 比拟器不为空, 依照比拟器的规定插入
    if (comparator != null)
        siftUpUsingComparator(k, x);
    else
        // 比拟器为空, 依照天然排序插入
        siftUpComparable(k, x);
}
private void siftUpComparable(int k, E x) {
    // 天然排序器
    Comparable<? super E> key = (Comparable<? super E>) x;
    while (k > 0) {// 寻找以后节点的父节点, 这一步详情看下面正文 Object[] queue;
        int parent = (k - 1) >>> 1;
        Object e = queue[parent];
        // 应用天然比拟器与父节点比拟, 如果以后节点大于以后遍历节点的父节点, 间接退出, 这一步次要是为了维持小顶堆的性质
        if (key.compareTo((E) e) >= 0)
            break;
        // 父节点下沉
        queue[k] = e;
        // 向上挪动
        k = parent;
    }
    queue[k] = key;
}

删除元素

public boolean remove(Object o) {int i = indexOf(o);
    if (i == -1)
        return false;
    else {removeAt(i);
        return true;
    }
}
private E removeAt(int i) {
    // assert i >= 0 && i < size;
    modCount++;
    int s = --size;
    // 要删除的为最初一个节点间接删除
    if (s == i) // removed last element
        queue[i] = null;
    else {// 获取最初一个元素(最大的元素)
        E moved = (E) queue[s];
        // 删除最初一个元素
        queue[s] = null;
        // 将 moved 插入到 i 地位, 期间会调整 i 的子堆
        siftDown(i, moved);
        // 如果 i 的地位就是 moved 则示意 moved 子树元素都小, 则须要持续调整堆
        if (queue[i] == moved) {
            // 从 i 开始向上调整
            siftUp(i, moved);
            // 以后地位就是 moved 的适合地位(moved 持续上移), 间接返回 moved
            if (queue[i] != moved)
                return moved;
        }
    }
    return null;
}
private void siftDown(int k, E x) {if (comparator != null)
        siftDownUsingComparator(k, x);
    else
        siftDownComparable(k, x);
}
private void siftDownComparable(int k, E x) {Comparable<? super E> key = (Comparable<? super E>) x;
    // 第一个叶子节点的下标
    int half = size >>> 1;        // loop while a non-leaf
    // 从以后节点向叶子节点遍历
    while (k < half) {
        // 左孩子
        int child = (k << 1) + 1;
        Object c = queue[child];
        // 右孩子
        int right = child + 1;
        // 如果右孩子不是最初一个节点并且左孩子比右孩子大, 找出较小的孩子
        if (right < size &&
            ((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)
            c = queue[child = right];
        if (key.compareTo((E) c) <= 0)
            break;
        // 子节点向上挪动
        queue[k] = c;
        // 向下挪动
        k = child;
    }
    queue[k] = key;
}
总结

由下面的剖析可知 PriorityQueue 采纳小顶堆存储数据, 并且采纳 Comparator 和 Comparable 两种形式放弃元素的优先级, 然而 PriorityQueue 不是线程平安的, 也没有提供获取元素的形式, 只能通过 contains 来获取是否存在, 通过 iterator 来遍历元素.

DelayQueue

根本属性
// 锁, 保障线程平安
private final transient ReentrantLock lock = new ReentrantLock();
// 存储元素应用的优先级队列
private final PriorityQueue<E> q = new PriorityQueue<E>();
 /**
  * Thread designated to wait for the element at the head of the queue.
  * 指定在队列头期待元素的线程.
  * This variant of the Leader-Follower pattern serves to minimize unnecessary timed waiting.
  * 这种 Leader-Follower 模式的变体用于最小化不必要的定时期待。* When a thread becomes the leader, it waits only for the next delay to elapse,
  * but other threads await indefinitely.
  * 当一个线程成为 leader 线程时, 它只期待下一个提早过来, 而其余线程则无限期地期待.
  * The leader thread must signal some other thread before returning from take() or
  * poll(...), unless some other thread becomes leader in the interim.
  * 前导线程必须在从 take()或 poll(...)返回之前告诉其余线程, 除非其余线程在此期间成为前导线程.
  * Whenever the head of the queue is replaced with an element with an earlier expiration time,
  * the leader field is invalidated by being reset to null, and some waiting thread,
  * but not necessarily the current leader, is signalled.
  * 每当队列的头被一个过期工夫较早的元素替换,leader 字段就会被重置为 null 而生效, 并且一些期待的线程 (不肯定是以后的 leader) 会收到信号.
  * So waiting threads must be prepared to acquire and lose leadership while waiting.
  * 因而, 期待线程必须筹备好在期待过程中取得和失去领导位置.
  */
private Thread leader = null;
//Condition signalled when a newer element becomes available at the head of the queue or a new thread may need to become leader.
// 当一个较新的元素在队列头变得可用或一个新线程可能须要成为领队时收回的状态信号.
private final Condition available = lock.newCondition();
结构器
public DelayQueue() {}
罕用办法

增加元素

public boolean add(E e) {return offer(e);
}
public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 插入元素, 此时插入的元素不肯定在优先级队列的头部
        q.offer(e);
        // 如果以后元素在优先级队列的头部, 须要革除掉 leader 线程, 激活 available 队列中的一个线程
        if (q.peek() == e) {
            leader = null;
            available.signal();}
        return true;
    } finally {lock.unlock();
    }
}

移除元素

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 先锁定, 如果被中断, 该线程会抛出 InterruptedException, 详情请查阅 AQS 源码
    lock.lockInterruptibly();
    try {for (;;) {
            // 获取优先级队列中的根节点, 如果为空, 将以后线程放入到 available 条件队列中
            E first = q.peek();
            if (first == null)
                available.await();
            else {
                // 如果根节点不为空, 获取以后节点的过期工夫
                long delay = first.getDelay(NANOSECONDS);
                // 如果已过期, 间接出队
                if (delay <= 0)
                    return q.poll();
                // 以后线程不是 leader 线程, 持续阻塞进入到条件队列
                if (leader != null)
                    available.await();
                else {
                    //leader 线程为 null, 设置以后线程为 leader 线程
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 以后线程持续期待 delay 工夫, 在此期间该线程会开释锁, 其余线程能够继续执行 offer 和 take 操作
                        // 工夫到后, 该线程会持续尝试获取锁, 继续执行下面的操作
                        available.awaitNanos(delay);
                    } finally {
                        // 执行结束后重置 leader 线程
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 如果优先级队列中还有元素, 并且 leader 为空, 则唤醒条件队列中的线程
        if (leader == null && q.peek() != null)
            available.signal();
        // 开释锁
        lock.unlock();}
}
总结

DelayQueue 采纳 PriorityQueue 存储元素, 采纳 ReentrantLock 实现线程同步, 采纳 Delayed 接口获取剩余时间来实现一个优先级队列.

退出移动版