关于java:延时队列DelayQueue探秘

由下面的UML图可知,DelayQueue依赖于PriorityQueue,应用PriorityQueue存储对象,所以要彻底搞懂DelayQueue必须先晓得PriorityQueue原理.

PriorityQueue

架构

根本属性
//默认初始容量
private static final int DEFAULT_INITIAL_CAPACITY = 11;
/**
 * Priority queue represented as a balanced binary heap:
 * 优先级队列代表了一个均衡二叉堆
 * the two children of queue[n] are queue[2*n+1] and queue[2*(n+1)].
 * queue[n]的两个孩子别离为queue[2*n+1]和queue[2*(n+1)]
 * The priority queue is ordered by comparator, or by the elements' natural ordering,
 * 优先级队列应用comparator或者元素的天然排序
 * if comparator is null:
 * 如果comparator为空
 * For each node n in the heap and each descendant d of n, n <= d.
 * 对于堆中的每个节点n和n的每个后辈d,n<=d.
 * The element with the lowest value is in queue[0], assuming the queue is nonempty.
 * 如果队列不为空,则最小的元素在队列的第一位
 */
transient Object[] queue;
/**
 * The comparator, or null if priority queue uses elements' natural ordering.
 * 比拟器,如果为空则应用元素的天然排序
 */
private final Comparator<? super E> comparator;
//汇合操作次数(除去查问),次要用于fast-fail
transient int modCount = 0;
结构器
//绝大部分状况下,都是用该结构器 
public PriorityQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}
//该结构器初始化队列
public PriorityQueue(int initialCapacity,
                         Comparator<? super E> comparator) {
    // Note: This restriction of at least one is not actually needed,
    // but continues for 1.5 compatibility
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.queue = new Object[initialCapacity];
    this.comparator = comparator;
}
罕用办法

增加元素

public boolean add(E e) {
    return offer(e);
}
 public boolean offer(E e) {
     //不容许有空元素
     if (e == null)
         throw new NullPointerException();
     modCount++;
     int i = size;
     //数组已满,须要扩容
     if (i >= queue.length)
         grow(i + 1);
     //新元素个数
     size = i + 1;
     //数组还未插入元素,直接插入到第一个地位
     if (i == 0)
         queue[0] = e;
     else
         //数组第一个地位曾经插入了元素,须要向堆中插入元素
         siftUp(i, e);
     return true;
 }
private void grow(int minCapacity) {
    //旧数组容量
    int oldCapacity = queue.length;
    // Double size if small; else grow by 50%
    //如果旧容量小于64则新容量为旧容量的一倍+2.否则扩容50%
    int newCapacity = oldCapacity +
        ((oldCapacity < 64) ? (oldCapacity + 2) : (oldCapacity >> 1));
    // overflow-conscious code
    //如果minCapacity大于Integer.MAX_VALUE-8则从新计算新容量,最大为Integer.MAX_VALUE
    if (newCapacity - MAX_ARRAY_SIZE > 0)
        newCapacity = hugeCapacity(minCapacity);
    //数组拷贝,将旧数组扩容并将旧数组数据迁徙
    queue = Arrays.copyOf(queue, newCapacity);
}
private static int hugeCapacity(int minCapacity) {
    if (minCapacity < 0) // overflow
        throw new OutOfMemoryError();
    return (minCapacity > MAX_ARRAY_SIZE) ?
        Integer.MAX_VALUE :
    MAX_ARRAY_SIZE;
}
private void siftUp(int k, E x) {
    //比拟器不为空,依照比拟器的规定插入
    if (comparator != null)
        siftUpUsingComparator(k, x);
    else
        //比拟器为空,依照天然排序插入
        siftUpComparable(k, x);
}
private void siftUpComparable(int k, E x) {
    //天然排序器
    Comparable<? super E> key = (Comparable<? super E>) x;
    while (k > 0) {
        //寻找以后节点的父节点,这一步详情看下面正文Object[] queue;
        int parent = (k - 1) >>> 1;
        Object e = queue[parent];
        //应用天然比拟器与父节点比拟,如果以后节点大于以后遍历节点的父节点,间接退出,这一步次要是为了维持小顶堆的性质
        if (key.compareTo((E) e) >= 0)
            break;
        //父节点下沉
        queue[k] = e;
        //向上挪动
        k = parent;
    }
    queue[k] = key;
}

删除元素

public boolean remove(Object o) {
    int i = indexOf(o);
    if (i == -1)
        return false;
    else {
        removeAt(i);
        return true;
    }
}
private E removeAt(int i) {
    // assert i >= 0 && i < size;
    modCount++;
    int s = --size;
    //要删除的为最初一个节点间接删除
    if (s == i) // removed last element
        queue[i] = null;
    else {
        //获取最初一个元素(最大的元素)
        E moved = (E) queue[s];
        //删除最初一个元素
        queue[s] = null;
        //将moved插入到i地位,期间会调整i的子堆
        siftDown(i, moved);
        //如果i的地位就是moved则示意moved子树元素都小,则须要持续调整堆
        if (queue[i] == moved) {
            //从i开始向上调整
            siftUp(i, moved);
            //以后地位就是moved的适合地位(moved持续上移),间接返回moved
            if (queue[i] != moved)
                return moved;
        }
    }
    return null;
}
private void siftDown(int k, E x) {
    if (comparator != null)
        siftDownUsingComparator(k, x);
    else
        siftDownComparable(k, x);
}
private void siftDownComparable(int k, E x) {
    Comparable<? super E> key = (Comparable<? super E>) x;
    //第一个叶子节点的下标
    int half = size >>> 1;        // loop while a non-leaf
    //从以后节点向叶子节点遍历
    while (k < half) {
        //左孩子
        int child = (k << 1) + 1;
        Object c = queue[child];
        //右孩子
        int right = child + 1;
        //如果右孩子不是最初一个节点并且左孩子比右孩子大,找出较小的孩子
        if (right < size &&
            ((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)
            c = queue[child = right];
        if (key.compareTo((E) c) <= 0)
            break;
        //子节点向上挪动
        queue[k] = c;
        //向下挪动
        k = child;
    }
    queue[k] = key;
}
总结

由下面的剖析可知PriorityQueue采纳小顶堆存储数据,并且采纳Comparator和Comparable两种形式放弃元素的优先级,然而PriorityQueue不是线程平安的,也没有提供获取元素的形式,只能通过contains来获取是否存在,通过iterator来遍历元素.

DelayQueue

根本属性
//锁,保障线程平安
private final transient ReentrantLock lock = new ReentrantLock();
//存储元素应用的优先级队列
private final PriorityQueue<E> q = new PriorityQueue<E>();
 /**
  * Thread designated to wait for the element at the head of the queue.
  * 指定在队列头期待元素的线程.
  * This variant of the Leader-Follower pattern serves to minimize unnecessary timed waiting.
  * 这种Leader-Follower模式的变体用于最小化不必要的定时期待。
  * When a thread becomes the leader, it waits only for the next delay to elapse,
  * but other threads await indefinitely.
  * 当一个线程成为leader线程时,它只期待下一个提早过来,而其余线程则无限期地期待.
  * The leader thread must signal some other thread before returning from take() or
  * poll(...), unless some other thread becomes leader in the interim.
  * 前导线程必须在从take()或poll(...)返回之前告诉其余线程,除非其余线程在此期间成为前导线程.
  * Whenever the head of the queue is replaced with an element with an earlier expiration time,
  * the leader field is invalidated by being reset to null, and some waiting thread,
  * but not necessarily the current leader, is signalled.
  * 每当队列的头被一个过期工夫较早的元素替换,leader字段就会被重置为null而生效,并且一些期待的线程(不肯定是以后的leader)会收到信号.
  * So waiting threads must be prepared to acquire and lose leadership while waiting.
  * 因而,期待线程必须筹备好在期待过程中取得和失去领导位置.
  */
private Thread leader = null;
//Condition signalled when a newer element becomes available at the head of the queue or a new thread may need to become leader.
//当一个较新的元素在队列头变得可用或一个新线程可能须要成为领队时收回的状态信号.
private final Condition available = lock.newCondition();
结构器
public DelayQueue() {}
罕用办法

增加元素

public boolean add(E e) {
    return offer(e);
}
public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //插入元素,此时插入的元素不肯定在优先级队列的头部
        q.offer(e);
        //如果以后元素在优先级队列的头部,须要革除掉leader线程,激活available队列中的一个线程
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

移除元素

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    //先锁定,如果被中断,该线程会抛出InterruptedException,详情请查阅AQS源码
    lock.lockInterruptibly();
    try {
        for (;;) {
            //获取优先级队列中的根节点,如果为空,将以后线程放入到available条件队列中
            E first = q.peek();
            if (first == null)
                available.await();
            else {
                //如果根节点不为空,获取以后节点的过期工夫
                long delay = first.getDelay(NANOSECONDS);
                //如果已过期,间接出队
                if (delay <= 0)
                    return q.poll();
                //以后线程不是leader线程,持续阻塞进入到条件队列
                if (leader != null)
                    available.await();
                else {
                    //leader线程为null,设置以后线程为leader线程
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        //以后线程持续期待delay工夫,在此期间该线程会开释锁,其余线程能够继续执行offer和take操作
                        //工夫到后,该线程会持续尝试获取锁,继续执行下面的操作
                        available.awaitNanos(delay);
                    } finally {
                        //执行结束后重置leader线程
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        //如果优先级队列中还有元素,并且leader为空,则唤醒条件队列中的线程
        if (leader == null && q.peek() != null)
            available.signal();
        //开释锁
        lock.unlock();
    }
}
总结

DelayQueue采纳PriorityQueue存储元素,采纳ReentrantLock实现线程同步,采纳Delayed接口获取剩余时间来实现一个优先级队列.

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理