本文主要研究一下Elasticsearch的EvictingQueue

EvictingQueue

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/collect/EvictingQueue.java

public class EvictingQueue<T> implements Queue<T> {    private final int maximumSize;    private final ArrayDeque<T> queue;    /**     * Construct a new {@code EvictingQueue} that holds {@code maximumSize} elements.     *     * @param maximumSize The maximum number of elements that the queue can hold     * @throws IllegalArgumentException if {@code maximumSize} is less than zero     */    public EvictingQueue(int maximumSize) {        if (maximumSize < 0) {            throw new IllegalArgumentException("maximumSize < 0");        }        this.maximumSize = maximumSize;        this.queue = new ArrayDeque<>(maximumSize);    }    /**     * @return the number of additional elements that the queue can accommodate before evictions occur     */    public int remainingCapacity() {        return this.maximumSize - this.size();    }    /**     * Add the given element to the queue, possibly forcing an eviction from the head if {@link #remainingCapacity()} is     * zero.     *     * @param t the element to add     * @return true if the element was added (always the case for {@code EvictingQueue}     */    @Override    public boolean add(T t) {        if (maximumSize == 0) {            return true;        }        if (queue.size() == maximumSize) {            queue.remove();        }        queue.add(t);        return true;    }    /**     * @see #add(Object)     */    @Override    public boolean offer(T t) {        return add(t);    }    @Override    public T remove() {        return queue.remove();    }    @Override    public T poll() {        return queue.poll();    }    @Override    public T element() {        return queue.element();    }    @Override    public T peek() {        return queue.peek();    }    @Override    public int size() {        return queue.size();    }    @Override    public boolean isEmpty() {        return queue.isEmpty();    }    @Override    public boolean contains(Object o) {        return queue.contains(o);    }    @Override    public Iterator<T> iterator() {        return queue.iterator();    }    @Override    public Object[] toArray() {        return queue.toArray();    }    @Override    public <T1> T1[] toArray(T1[] a) {        return queue.toArray(a);    }    @Override    public boolean remove(Object o) {        return queue.remove(o);    }    @Override    public boolean containsAll(Collection<?> c) {        return queue.containsAll(c);    }    /**     * Add the given elements to the queue, possibly forcing evictions from the head if {@link #remainingCapacity()} is     * zero or becomes zero during the execution of this method.     *     * @param c the collection of elements to add     * @return true if any elements were added to the queue     */    @Override    public boolean addAll(Collection<? extends T> c) {        boolean modified = false;        for (T e : c)            if (add(e))                modified = true;        return modified;    }    @Override    public boolean removeAll(Collection<?> c) {        return queue.removeAll(c);    }    @Override    public boolean retainAll(Collection<?> c) {        return queue.retainAll(c);    }    @Override    public void clear() {        queue.clear();    }}
  • EvictingQueue实现了Queue接口,它的构造器要求输入maximumSize,然后根据maximumSize创建ArrayDeque;其add方法会判断当前队列大小是否等于maximumSize,等于则移除队首的元素然后再添加新元素

实例

elasticsearch-7.0.1/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/MovFnWhitelistedFunctionTests.java

    public void testWindowMax() {        int numValues = randomIntBetween(1, 100);        int windowSize = randomIntBetween(1, 50);        EvictingQueue<Double> window = new EvictingQueue<>(windowSize);        for (int i = 0; i < numValues; i++) {            double randValue = randomDouble();            double expected = -Double.MAX_VALUE;            if (i == 0) {                window.offer(randValue);                continue;            }            for (double value : window) {                expected = Math.max(expected, value);            }            double actual = MovingFunctions.max(window.stream().mapToDouble(Double::doubleValue).toArray());            assertEquals(expected, actual, 0.01 * Math.abs(expected));            window.offer(randValue);        }    }
  • 这里使用EvictingQueue作为一个window的数据,不断根据numValues来offer数据,同时计算window中的最大值

小结

EvictingQueue实现了Queue接口,它的构造器要求输入maximumSize,然后根据maximumSize创建ArrayDeque;其add方法会判断当前队列大小是否等于maximumSize,等于则移除队首的元素然后再添加新元素

doc

  • EvictingQueue