本文主要研究一下Elasticsearch的SizeBlockingQueue

SizeBlockingQueue

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/SizeBlockingQueue.java

public class SizeBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {    private final BlockingQueue<E> queue;    private final int capacity;    private final AtomicInteger size = new AtomicInteger();    public SizeBlockingQueue(BlockingQueue<E> queue, int capacity) {        assert capacity >= 0;        this.queue = queue;        this.capacity = capacity;    }    @Override    public int size() {        return size.get();    }    public int capacity() {        return this.capacity;    }    @Override    public Iterator<E> iterator() {        final Iterator<E> it = queue.iterator();        return new Iterator<E>() {            E current;            @Override            public boolean hasNext() {                return it.hasNext();            }            @Override            public E next() {                current = it.next();                return current;            }            @Override            public void remove() {                // note, we can't call #remove on the iterator because we need to know                // if it was removed or not                if (queue.remove(current)) {                    size.decrementAndGet();                }            }        };    }    @Override    public E peek() {        return queue.peek();    }    @Override    public E poll() {        E e = queue.poll();        if (e != null) {            size.decrementAndGet();        }        return e;    }    @Override    public E poll(long timeout, TimeUnit unit) throws InterruptedException {        E e = queue.poll(timeout, unit);        if (e != null) {            size.decrementAndGet();        }        return e;    }    @Override    public boolean remove(Object o) {        boolean v = queue.remove(o);        if (v) {            size.decrementAndGet();        }        return v;    }    /**     * Forces adding an element to the queue, without doing size checks.     */    public void forcePut(E e) throws InterruptedException {        size.incrementAndGet();        try {            queue.put(e);        } catch (InterruptedException ie) {            size.decrementAndGet();            throw ie;        }    }    @Override    public boolean offer(E e) {        while (true) {            final int current = size.get();            if (current >= capacity()) {                return false;            }            if (size.compareAndSet(current, 1 + current)) {                break;            }        }        boolean offered = queue.offer(e);        if (!offered) {            size.decrementAndGet();        }        return offered;    }    @Override    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {        // note, not used in ThreadPoolExecutor        throw new IllegalStateException("offer with timeout not allowed on size queue");    }    @Override    public void put(E e) throws InterruptedException {        // note, not used in ThreadPoolExecutor        throw new IllegalStateException("put not allowed on size queue");    }    @Override    public E take() throws InterruptedException {        E e;        try {            e = queue.take();            size.decrementAndGet();        } catch (InterruptedException ie) {            throw ie;        }        return e;    }    @Override    public int remainingCapacity() {        return capacity() - size.get();    }    @Override    public int drainTo(Collection<? super E> c) {        int v = queue.drainTo(c);        size.addAndGet(-v);        return v;    }    @Override    public int drainTo(Collection<? super E> c, int maxElements) {        int v = queue.drainTo(c, maxElements);        size.addAndGet(-v);        return v;    }    @Override    public Object[] toArray() {        return queue.toArray();    }    @Override    public <T> T[] toArray(T[] a) {        return (T[]) queue.toArray(a);    }    @Override    public boolean contains(Object o) {        return queue.contains(o);    }    @Override    public boolean containsAll(Collection<?> c) {        return queue.containsAll(c);    }}
  • SizeBlockingQueue继承了AbstractQueue,同时实现了BlockingQueue接口;它的构造器要求输入blockingQueue及capacity参数
  • SizeBlockingQueue有个AtomicInteger类型的size参数用于记录queue的大小,它在poll、remove、offer、take等方法都会维护这个size参数
  • 其中offer方法会判断当前size是否大于等于capacity,如果大于等于则直接返回false;而put方法则直接抛出IllegalStateException

ResizableBlockingQueue

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/ResizableBlockingQueue.java

final class ResizableBlockingQueue<E> extends SizeBlockingQueue<E> {    private volatile int capacity;    ResizableBlockingQueue(BlockingQueue<E> queue, int initialCapacity) {        super(queue, initialCapacity);        this.capacity = initialCapacity;    }    @Override    public int capacity() {        return this.capacity;    }    @Override    public int remainingCapacity() {        return Math.max(0, this.capacity());    }    /** Resize the limit for the queue, returning the new size limit */    public synchronized int adjustCapacity(int optimalCapacity, int adjustmentAmount, int minCapacity, int maxCapacity) {        assert adjustmentAmount > 0 : "adjustment amount should be a positive value";        assert optimalCapacity >= 0 : "desired capacity cannot be negative";        assert minCapacity >= 0 : "cannot have min capacity smaller than 0";        assert maxCapacity >= minCapacity : "cannot have max capacity smaller than min capacity";        if (optimalCapacity == capacity) {            // Yahtzee!            return this.capacity;        }        if (optimalCapacity > capacity + adjustmentAmount) {            // adjust up            final int newCapacity = Math.min(maxCapacity, capacity + adjustmentAmount);            this.capacity = newCapacity;            return newCapacity;        } else if (optimalCapacity < capacity - adjustmentAmount) {            // adjust down            final int newCapacity = Math.max(minCapacity, capacity - adjustmentAmount);            this.capacity = newCapacity;            return newCapacity;        } else {            return this.capacity;        }    }}
  • ResizableBlockingQueue继承了SizeBlockingQueue,它提供了一个线程安全的adjustCapacity方法,用于resize队列的capacity

EsExecutors

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java

public class EsExecutors {    //......    public static EsThreadPoolExecutor newFixed(String name, int size, int queueCapacity,                                                ThreadFactory threadFactory, ThreadContext contextHolder) {        BlockingQueue<Runnable> queue;        if (queueCapacity < 0) {            queue = ConcurrentCollections.newBlockingQueue();        } else {            queue = new SizeBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), queueCapacity);        }        return new EsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS,            queue, threadFactory, new EsAbortPolicy(), contextHolder);    }    public static EsThreadPoolExecutor newAutoQueueFixed(String name, int size, int initialQueueCapacity, int minQueueSize,                                                         int maxQueueSize, int frameSize, TimeValue targetedResponseTime,                                                         ThreadFactory threadFactory, ThreadContext contextHolder) {        if (initialQueueCapacity <= 0) {            throw new IllegalArgumentException("initial queue capacity for [" + name + "] executor must be positive, got: " +                            initialQueueCapacity);        }        ResizableBlockingQueue<Runnable> queue =                new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), initialQueueCapacity);        return new QueueResizingEsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS,                queue, minQueueSize, maxQueueSize, TimedRunnable::new, frameSize, targetedResponseTime, threadFactory,                new EsAbortPolicy(), contextHolder);    }    //......}
  • EsExecutors的newFixed创建的是使用SizeBlockingQueue的EsThreadPoolExecutor,而newAutoQueueFixed创建的是使用ResizableBlockingQueuede的QueueResizingEsThreadPoolExecutor

小结

  • SizeBlockingQueue继承了AbstractQueue,同时实现了BlockingQueue接口;它的构造器要求输入blockingQueue及capacity参数;它有个AtomicInteger类型的size参数用于记录queue的大小,它在poll、remove、offer、take等方法都会维护这个size参数;其中offer方法会判断当前size是否大于等于capacity,如果大于等于则直接返回false;而put方法则直接抛出IllegalStateException
  • ResizableBlockingQueue继承了SizeBlockingQueue,它提供了一个线程安全的adjustCapacity方法,用于resize队列的capacity
  • EsExecutors的newFixed创建的是使用SizeBlockingQueue的EsThreadPoolExecutor,而newAutoQueueFixed创建的是使用ResizableBlockingQueuede的QueueResizingEsThreadPoolExecutor

doc

  • EsExecutors
  • SizeBlockingQueue
  • ResizableBlockingQueue