This article takes a look at Elasticsearch's RoundRobinSupplier.

RoundRobinSupplier

elasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/RoundRobinSupplier.java

final class RoundRobinSupplier<S> implements Supplier<S> {

    private final AtomicBoolean selectorsSet = new AtomicBoolean(false);
    private volatile S[] selectors;
    private AtomicInteger counter = new AtomicInteger(0);

    RoundRobinSupplier() {
        this.selectors = null;
    }

    RoundRobinSupplier(S[] selectors) {
        this.selectors = selectors;
        this.selectorsSet.set(true);
    }

    @Override
    public S get() {
        S[] selectors = this.selectors;
        return selectors[counter.getAndIncrement() % selectors.length];
    }

    void setSelectors(S[] selectors) {
        if (selectorsSet.compareAndSet(false, true)) {
            this.selectors = selectors;
        } else {
            throw new AssertionError("Selectors already set. Should only be set once.");
        }
    }

    int count() {
        return selectors.length;
    }
}
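  • RoundRobinSupplier implements the Supplier interface. Its get method computes counter.getAndIncrement() % selectors.length to pick an index into the selectors array and returns the element at that index. The selectors can be passed to the constructor or set later through setSelectors, which may only be called once.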
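The index computation can be tried out in isolation. The following is a minimal standalone sketch, not the Elasticsearch class: RoundRobinDemo and its start parameter are hypothetical names introduced for illustration. It also departs from the 7.0.1 source in one respect, labeled here as an assumption about how one might harden the idea: an AtomicInteger wraps to Integer.MIN_VALUE after Integer.MAX_VALUE, and Java's % yields a negative result for a negative left operand, so the sketch uses Math.floorMod to keep the index inside the array.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical standalone sketch; not the Elasticsearch class.
final class RoundRobinDemo<S> implements Supplier<S> {
    private final S[] items;
    private final AtomicInteger counter;

    // 'start' exists only so the wrap-around case can be demonstrated.
    RoundRobinDemo(S[] items, int start) {
        this.items = items;
        this.counter = new AtomicInteger(start);
    }

    @Override
    public S get() {
        // floorMod keeps the index in [0, length) even after the counter wraps negative.
        return items[Math.floorMod(counter.getAndIncrement(), items.length)];
    }

    public static void main(String[] args) {
        RoundRobinDemo<String> rr = new RoundRobinDemo<>(new String[] {"s0", "s1", "s2"}, 0);
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 5; i++) {
            sb.append(rr.get()).append(' ');
        }
        System.out.println(sb.toString().trim());                 // s0 s1 s2 s0 s1
        System.out.println(Integer.MIN_VALUE % 3);                // -2
        System.out.println(Math.floorMod(Integer.MIN_VALUE, 3));  // 1
    }
}
```

With a plain % the wrapped counter would produce a negative index; whether that matters in practice depends on how many channels a node opens over its lifetime.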

NioSelectorGroup

elasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/NioSelectorGroup.java

public class NioSelectorGroup implements NioGroup {

    private final List<NioSelector> dedicatedAcceptors;
    private final RoundRobinSupplier<NioSelector> acceptorSupplier;

    private final List<NioSelector> selectors;
    private final RoundRobinSupplier<NioSelector> selectorSupplier;

    private final AtomicBoolean isOpen = new AtomicBoolean(true);

    //......

    public NioSelectorGroup(ThreadFactory acceptorThreadFactory, int dedicatedAcceptorCount, ThreadFactory selectorThreadFactory,
                            int selectorCount, Function<Supplier<NioSelector>, EventHandler> eventHandlerFunction) throws IOException {
        dedicatedAcceptors = new ArrayList<>(dedicatedAcceptorCount);
        selectors = new ArrayList<>(selectorCount);

        try {
            List<RoundRobinSupplier<NioSelector>> suppliersToSet = new ArrayList<>(selectorCount);
            for (int i = 0; i < selectorCount; ++i) {
                RoundRobinSupplier<NioSelector> supplier = new RoundRobinSupplier<>();
                suppliersToSet.add(supplier);
                NioSelector selector = new NioSelector(eventHandlerFunction.apply(supplier));
                selectors.add(selector);
            }
            for (RoundRobinSupplier<NioSelector> supplierToSet : suppliersToSet) {
                supplierToSet.setSelectors(selectors.toArray(new NioSelector[0]));
                assert supplierToSet.count() == selectors.size() : "Supplier should have same count as selector list.";
            }

            for (int i = 0; i < dedicatedAcceptorCount; ++i) {
                RoundRobinSupplier<NioSelector> supplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0]));
                NioSelector acceptor = new NioSelector(eventHandlerFunction.apply(supplier));
                dedicatedAcceptors.add(acceptor);
            }

            if (dedicatedAcceptorCount != 0) {
                acceptorSupplier = new RoundRobinSupplier<>(dedicatedAcceptors.toArray(new NioSelector[0]));
            } else {
                acceptorSupplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0]));
            }
            selectorSupplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0]));
            assert selectorCount == selectors.size() : "We need to have created all the selectors at this point.";
            assert dedicatedAcceptorCount == dedicatedAcceptors.size() : "We need to have created all the acceptors at this point.";

            startSelectors(selectors, selectorThreadFactory);
            startSelectors(dedicatedAcceptors, acceptorThreadFactory);
        } catch (Exception e) {
            try {
                close();
            } catch (Exception e1) {
                e.addSuppressed(e1);
            }
            throw e;
        }
    }

    public <S extends NioServerSocketChannel> S bindServerChannel(InetSocketAddress address, ChannelFactory<S, ?> factory)
        throws IOException {
        ensureOpen();
        return factory.openNioServerSocketChannel(address, acceptorSupplier);
    }

    @Override
    public <S extends NioSocketChannel> S openChannel(InetSocketAddress address, ChannelFactory<?, S> factory) throws IOException {
        ensureOpen();
        return factory.openNioChannel(address, selectorSupplier);
    }

    //......
}
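  • The NioSelectorGroup constructor initializes two RoundRobinSupplier fields, acceptorSupplier and selectorSupplier. acceptorSupplier rotates over the dedicated acceptors, or over the regular selectors when dedicatedAcceptorCount is 0; selectorSupplier always rotates over the regular selectors. bindServerChannel calls factory.openNioServerSocketChannel(address, acceptorSupplier), and openChannel calls factory.openNioChannel(address, selectorSupplier).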
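The first loop of the constructor hands each selector's EventHandler an empty RoundRobinSupplier and only calls setSelectors once every selector exists, since a supplier over all selectors cannot be built before the selectors themselves. That two-phase wiring can be sketched as follows. All names here (LateBoundSupplier, Worker, TwoPhaseInitDemo) are hypothetical stand-ins, not Elasticsearch types, and the sketch uses a List and Math.floorMod where the original uses an array and %.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical supplier whose targets are assigned exactly once, after construction.
final class LateBoundSupplier<S> implements Supplier<S> {
    private final AtomicBoolean set = new AtomicBoolean(false);
    private final AtomicInteger counter = new AtomicInteger(0);
    private volatile List<S> targets;

    void setTargets(List<S> targets) {
        if (!set.compareAndSet(false, true)) {
            throw new IllegalStateException("targets already set");
        }
        this.targets = targets;
    }

    @Override
    public S get() {
        List<S> current = targets;
        return current.get(Math.floorMod(counter.getAndIncrement(), current.size()));
    }
}

// Hypothetical worker that needs a supplier of its peers at construction time.
final class Worker {
    final String name;
    final Supplier<Worker> peers;

    Worker(String name, Supplier<Worker> peers) {
        this.name = name;
        this.peers = peers;
    }
}

final class TwoPhaseInitDemo {
    static List<Worker> build(int n) {
        List<Worker> workers = new ArrayList<>(n);
        List<LateBoundSupplier<Worker>> pending = new ArrayList<>(n);
        // Phase 1: create every worker with a supplier that has no targets yet.
        for (int i = 0; i < n; i++) {
            LateBoundSupplier<Worker> supplier = new LateBoundSupplier<>();
            pending.add(supplier);
            workers.add(new Worker("w" + i, supplier));
        }
        // Phase 2: now that all workers exist, give each supplier the full list.
        List<Worker> snapshot = new ArrayList<>(workers);
        for (LateBoundSupplier<Worker> supplier : pending) {
            supplier.setTargets(snapshot);
        }
        return workers;
    }

    public static void main(String[] args) {
        Worker w0 = build(3).get(0);
        System.out.println(w0.peers.get().name + " " + w0.peers.get().name); // w0 w1
    }
}
```

Each worker owns its own supplier and counter, so every worker starts its rotation at the first peer independently of the others.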

ChannelFactory

elasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/ChannelFactory.java

public abstract class ChannelFactory<ServerSocket extends NioServerSocketChannel, Socket extends NioSocketChannel> {

    //......

    public ServerSocket openNioServerSocketChannel(InetSocketAddress address, Supplier<NioSelector> supplier) throws IOException {
        ServerSocketChannel rawChannel = rawChannelFactory.openNioServerSocketChannel(address);
        NioSelector selector = supplier.get();
        ServerSocket serverChannel = internalCreateServerChannel(selector, rawChannel);
        scheduleServerChannel(serverChannel, selector);
        return serverChannel;
    }

    public Socket openNioChannel(InetSocketAddress remoteAddress, Supplier<NioSelector> supplier) throws IOException {
        SocketChannel rawChannel = rawChannelFactory.openNioChannel(remoteAddress);
        NioSelector selector = supplier.get();
        Socket channel = internalCreateChannel(selector, rawChannel);
        scheduleChannel(channel, selector);
        return channel;
    }

    //......
}
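  • ChannelFactory's openNioServerSocketChannel and openNioChannel methods both take a Supplier<NioSelector> parameter and call supplier.get() to choose the NioSelector that the new channel is created with and scheduled on.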
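Because the factory only depends on Supplier, the selection policy stays outside it. The sketch below mirrors that shape with plain strings in place of selectors; DemoChannel and DemoChannelFactory are hypothetical names for illustration and do not model the real channel creation or scheduling.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical channel that remembers which selector it was assigned to.
final class DemoChannel {
    final String address;
    final String selector;

    DemoChannel(String address, String selector) {
        this.address = address;
        this.selector = selector;
    }
}

final class DemoChannelFactory {
    // Same shape as openNioChannel: ask the supplier once, then bind the channel to the result.
    static DemoChannel open(String address, Supplier<String> supplier) {
        String selector = supplier.get();
        return new DemoChannel(address, selector);
    }

    public static void main(String[] args) {
        String[] selectors = {"selector-0", "selector-1"};
        AtomicInteger counter = new AtomicInteger();
        // Round-robin policy supplied from outside the factory.
        Supplier<String> roundRobin =
            () -> selectors[Math.floorMod(counter.getAndIncrement(), selectors.length)];

        for (String address : new String[] {"10.0.0.1", "10.0.0.2", "10.0.0.3"}) {
            DemoChannel channel = open(address, roundRobin);
            System.out.println(channel.address + " -> " + channel.selector);
        }
        // 10.0.0.1 -> selector-0
        // 10.0.0.2 -> selector-1
        // 10.0.0.3 -> selector-0
    }
}
```

Successive calls spread channels evenly across the selectors, and swapping in a different Supplier changes the policy without touching the factory.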

Summary

  • RoundRobinSupplier implements the Supplier interface. Its get method computes counter.getAndIncrement() % selectors.length to pick an index into the selectors array and returns the element at that index.
  • The NioSelectorGroup constructor initializes two RoundRobinSupplier fields, acceptorSupplier and selectorSupplier. bindServerChannel calls factory.openNioServerSocketChannel(address, acceptorSupplier), and openChannel calls factory.openNioChannel(address, selectorSupplier).
  • ChannelFactory's openNioServerSocketChannel and openNioChannel methods both take a Supplier<NioSelector> parameter and use it to choose the NioSelector.
