This article walks through ZenPing in Elasticsearch 7.0.0.

ZenPing

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java

public interface ZenPing extends Releasable {

    void start();

    void ping(Consumer<PingCollection> resultsConsumer, TimeValue timeout);

    class PingResponse implements Writeable {
        //......
    }

    class PingCollection {
        //......
    }
}

  • The ZenPing interface extends Releasable and declares the start and ping methods; it also defines two nested classes, PingResponse and PingCollection.
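To make the callback style of ping concrete, here is a minimal sketch of the same shape using only JDK types. SimplePing and FixedPing are hypothetical names invented for this illustration, and node names are plain strings; none of this is Elasticsearch code.

```java
import java.time.Duration;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified analogue of ZenPing: results are delivered to a callback.
interface SimplePing extends AutoCloseable {
    void start();

    void ping(Consumer<List<String>> resultsConsumer, Duration timeout);

    @Override
    void close();
}

// Hypothetical implementation that "discovers" a fixed list of nodes right away.
final class FixedPing implements SimplePing {
    private final List<String> nodes;

    FixedPing(List<String> nodes) {
        this.nodes = nodes;
    }

    @Override
    public void start() {
        // nothing to start in this sketch
    }

    @Override
    public void ping(Consumer<List<String>> resultsConsumer, Duration timeout) {
        // the timeout is ignored here; a real implementation would wait for replies
        resultsConsumer.accept(nodes);
    }

    @Override
    public void close() {
        // nothing to release in this sketch
    }
}
```

The caller never receives a return value; it hands in a consumer and is notified once the pinging is considered complete.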

PingResponse

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java

    class PingResponse implements Writeable {

        /**
         * An ID of a ping response that was generated on behalf of another node. Needs to be less than all other ping IDs so that fake ping
         * responses don't override real ones.
         */
        public static long FAKE_PING_ID = -1;

        private static final AtomicLong idGenerator = new AtomicLong();

        // an always increasing unique identifier for this ping response.
        // lower values means older pings.
        private final long id;

        private final ClusterName clusterName;

        private final DiscoveryNode node;

        private final DiscoveryNode master;

        private final long clusterStateVersion;

        /**
         * @param node                the node which this ping describes
         * @param master              the current master of the node
         * @param clusterName         the cluster name of the node
         * @param clusterStateVersion the current cluster state version of that node
         *                            ({@link ElectMasterService.MasterCandidate#UNRECOVERED_CLUSTER_VERSION} for not recovered)
         */
        public PingResponse(DiscoveryNode node, DiscoveryNode master, ClusterName clusterName, long clusterStateVersion) {
            this(idGenerator.incrementAndGet(), node, master, clusterName, clusterStateVersion);
        }

        /**
         * @param id                  the ping's ID
         * @param node                the node which this ping describes
         * @param master              the current master of the node
         * @param clusterName         the cluster name of the node
         * @param clusterStateVersion the current cluster state version of that node
         *                            ({@link ElectMasterService.MasterCandidate#UNRECOVERED_CLUSTER_VERSION} for not recovered)
         */
        public PingResponse(long id, DiscoveryNode node, DiscoveryNode master, ClusterName clusterName, long clusterStateVersion) {
            this.id = id;
            this.node = node;
            this.master = master;
            this.clusterName = clusterName;
            this.clusterStateVersion = clusterStateVersion;
        }

        public PingResponse(DiscoveryNode node, DiscoveryNode master, ClusterState state) {
            this(node, master, state.getClusterName(),
                state.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) ?
                    ElectMasterService.MasterCandidate.UNRECOVERED_CLUSTER_VERSION : state.version());
        }

        PingResponse(StreamInput in) throws IOException {
            this.clusterName = new ClusterName(in);
            this.node = new DiscoveryNode(in);
            this.master = in.readOptionalWriteable(DiscoveryNode::new);
            this.clusterStateVersion = in.readLong();
            this.id = in.readLong();
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            clusterName.writeTo(out);
            node.writeTo(out);
            out.writeOptionalWriteable(master);
            out.writeLong(clusterStateVersion);
            out.writeLong(id);
        }

        /**
         * an always increasing unique identifier for this ping response.
         * lower values means older pings.
         */
        public long id() {
            return this.id;
        }

        /**
         * the name of the cluster this node belongs to
         */
        public ClusterName clusterName() {
            return this.clusterName;
        }

        /** the node which this ping describes */
        public DiscoveryNode node() {
            return node;
        }

        /** the current master of the node */
        public DiscoveryNode master() {
            return master;
        }

        /**
         * the current cluster state version of that node ({@link ElectMasterService.MasterCandidate#UNRECOVERED_CLUSTER_VERSION}
         * for not recovered) */
        public long getClusterStateVersion() {
            return clusterStateVersion;
        }

        @Override
        public String toString() {
            return "ping_response{node [" + node + "], id[" + id + "], master [" + master + "]," +
                   "cluster_state_version [" + clusterStateVersion + "], cluster_name[" + clusterName.value() + "]}";
        }
    }

  • PingResponse implements Writeable; its writeTo method writes clusterName, node, master, clusterStateVersion, and id, in that order, and the StreamInput constructor reads the fields back in the same order.
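The write order and the read order have to mirror each other. The following is a minimal sketch of that symmetry using the JDK's DataOutputStream and DataInputStream rather than Elasticsearch's StreamOutput and StreamInput. PingRecord is a hypothetical stand-in in which strings replace ClusterName and DiscoveryNode, and the boolean presence flag for the optional master is an assumption of this sketch, not a statement about the actual wire format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for PingResponse; strings replace ClusterName and DiscoveryNode.
final class PingRecord {
    final String clusterName;
    final String node;
    final String master; // null when no master is known
    final long clusterStateVersion;
    final long id;

    PingRecord(String clusterName, String node, String master, long clusterStateVersion, long id) {
        this.clusterName = clusterName;
        this.node = node;
        this.master = master;
        this.clusterStateVersion = clusterStateVersion;
        this.id = id;
    }

    // Write order: clusterName, node, optional master, clusterStateVersion, id.
    void writeTo(DataOutputStream out) throws IOException {
        out.writeUTF(clusterName);
        out.writeUTF(node);
        out.writeBoolean(master != null); // presence flag (assumption of this sketch)
        if (master != null) {
            out.writeUTF(master);
        }
        out.writeLong(clusterStateVersion);
        out.writeLong(id);
    }

    // Reads the fields in exactly the order writeTo produced them.
    static PingRecord readFrom(DataInputStream in) throws IOException {
        String clusterName = in.readUTF();
        String node = in.readUTF();
        String master = in.readBoolean() ? in.readUTF() : null;
        long clusterStateVersion = in.readLong();
        long id = in.readLong();
        return new PingRecord(clusterName, node, master, clusterStateVersion, id);
    }

    // Serializes to a byte array and reads the record back.
    static PingRecord roundTrip(PingRecord record) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            record.writeTo(out);
            out.flush();
            return readFrom(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Swapping any two reads in readFrom would silently corrupt the decoded record, which is why the two methods are best read side by side.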

PingCollection

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java

    class PingCollection {

        Map<DiscoveryNode, PingResponse> pings;

        public PingCollection() {
            pings = new HashMap<>();
        }

        /**
         * adds a ping if newer than previous pings from the same node
         *
         * @return true if added, false o.w.
         */
        public synchronized boolean addPing(PingResponse ping) {
            PingResponse existingResponse = pings.get(ping.node());
            // in case both existing and new ping have the same id (probably because they come
            // from nodes from version <1.4.0) we prefer to use the last added one.
            if (existingResponse == null || existingResponse.id() <= ping.id()) {
                pings.put(ping.node(), ping);
                return true;
            }
            return false;
        }

        /** serialize current pings to a list. It is guaranteed that the list contains one ping response per node */
        public synchronized List<PingResponse> toList() {
            return new ArrayList<>(pings.values());
        }

        /** the number of nodes for which there are known pings */
        public synchronized int size() {
            return pings.size();
        }
    }

  • PingCollection holds a map from DiscoveryNode to PingResponse. Its addPing method keeps at most one response per node and replaces the existing entry only when the new ping's id is greater than or equal to the existing one.
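The "keep the newest ping per node" rule can be sketched in isolation. NewestPingMap below is a hypothetical reduction in which a node is a string and a ping is just its id; the comparison mirrors the `existingResponse.id() <= ping.id()` check in addPing.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical reduction of PingCollection: node name -> newest ping id seen so far.
final class NewestPingMap {
    private final Map<String, Long> newestIdByNode = new HashMap<>();

    // Returns true if the ping was stored, false if an entry with a larger id already exists.
    synchronized boolean add(String node, long pingId) {
        Long existing = newestIdByNode.get(node);
        // equal ids are accepted, so the last added ping wins on a tie
        if (existing == null || existing <= pingId) {
            newestIdByNode.put(node, pingId);
            return true;
        }
        return false;
    }

    synchronized Long idFor(String node) {
        return newestIdByNode.get(node);
    }

    synchronized int size() {
        return newestIdByNode.size();
    }
}
```

Because FAKE_PING_ID is -1 and generated ids start above zero, a fake ping added after a real one for the same node is rejected by this rule.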

UnicastZenPing

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java

public class UnicastZenPing implements ZenPing {

    private static final Logger logger = LogManager.getLogger(UnicastZenPing.class);

    public static final String ACTION_NAME = "internal:discovery/zen/unicast";

    private final ThreadPool threadPool;
    private final TransportService transportService;
    private final ClusterName clusterName;

    private final PingContextProvider contextProvider;

    private final AtomicInteger pingingRoundIdGenerator = new AtomicInteger();

    private final Map<Integer, PingingRound> activePingingRounds = newConcurrentMap();

    // a list of temporal responses a node will return for a request (holds responses from other nodes)
    private final Queue<PingResponse> temporalResponses = ConcurrentCollections.newQueue();

    private final SeedHostsProvider hostsProvider;

    protected final EsThreadPoolExecutor unicastZenPingExecutorService;

    private final TimeValue resolveTimeout;

    private final String nodeName;

    private volatile boolean closed = false;

    public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService,
                          SeedHostsProvider seedHostsProvider, PingContextProvider contextProvider) {
        this.threadPool = threadPool;
        this.transportService = transportService;
        this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
        this.hostsProvider = seedHostsProvider;
        this.contextProvider = contextProvider;

        final int concurrentConnects = SeedHostsResolver.getMaxConcurrentResolvers(settings);
        resolveTimeout = SeedHostsResolver.getResolveTimeout(settings);
        nodeName = Node.NODE_NAME_SETTING.get(settings);
        logger.debug(
            "using max_concurrent_resolvers [{}], resolver timeout [{}]",
            concurrentConnects,
            resolveTimeout);

        transportService.registerRequestHandler(ACTION_NAME, ThreadPool.Names.SAME, UnicastPingRequest::new,
            new UnicastPingRequestHandler());

        final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_connect]");
        unicastZenPingExecutorService = EsExecutors.newScaling(
                nodeName + "/" + "unicast_connect",
                0,
                concurrentConnects,
                60,
                TimeUnit.SECONDS,
                threadFactory,
                threadPool.getThreadContext());
    }

    private SeedHostsProvider.HostsResolver createHostsResolver() {
        return (hosts, limitPortCounts) -> SeedHostsResolver.resolveHostsLists(unicastZenPingExecutorService, logger, hosts,
            limitPortCounts, transportService, resolveTimeout);
    }

    @Override
    public void close() {
        ThreadPool.terminate(unicastZenPingExecutorService, 10, TimeUnit.SECONDS);
        Releasables.close(activePingingRounds.values());
        closed = true;
    }

    @Override
    public void start() {
    }

    /**
     * Clears the list of cached ping responses.
     */
    public void clearTemporalResponses() {
        temporalResponses.clear();
    }

    /**
     * Sends three rounds of pings notifying the specified {@link Consumer} when pinging is complete. Pings are sent after resolving
     * configured unicast hosts to their IP address (subject to DNS caching within the JVM). A batch of pings is sent, then another batch
     * of pings is sent at half the specified {@link TimeValue}, and then another batch of pings is sent at the specified {@link TimeValue}.
     * The pings that are sent carry a timeout of 1.25 times the specified {@link TimeValue}. When pinging each node, a connection and
     * handshake is performed, with a connection timeout of the specified {@link TimeValue}.
     *
     * @param resultsConsumer the callback when pinging is complete
     * @param duration        the timeout for various components of the pings
     */
    @Override
    public void ping(final Consumer<PingCollection> resultsConsumer, final TimeValue duration) {
        ping(resultsConsumer, duration, duration);
    }

    protected void ping(final Consumer<PingCollection> resultsConsumer,
                        final TimeValue scheduleDuration,
                        final TimeValue requestDuration) {
        final List<TransportAddress> seedAddresses = new ArrayList<>();
        seedAddresses.addAll(hostsProvider.getSeedAddresses(createHostsResolver()));
        final DiscoveryNodes nodes = contextProvider.clusterState().nodes();
        // add all possible master nodes that were active in the last known cluster configuration
        for (ObjectCursor<DiscoveryNode> masterNode : nodes.getMasterNodes().values()) {
            seedAddresses.add(masterNode.value.getAddress());
        }

        final ConnectionProfile connectionProfile =
            ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, requestDuration, requestDuration);
        final PingingRound pingingRound = new PingingRound(pingingRoundIdGenerator.incrementAndGet(), seedAddresses, resultsConsumer,
            nodes.getLocalNode(), connectionProfile);
        activePingingRounds.put(pingingRound.id(), pingingRound);
        final AbstractRunnable pingSender = new AbstractRunnable() {
            @Override
            public void onFailure(Exception e) {
                if (e instanceof AlreadyClosedException == false) {
                    logger.warn("unexpected error while pinging", e);
                }
            }

            @Override
            protected void doRun() throws Exception {
                sendPings(requestDuration, pingingRound);
            }
        };
        threadPool.generic().execute(pingSender);
        threadPool.schedule(pingSender, TimeValue.timeValueMillis(scheduleDuration.millis() / 3), ThreadPool.Names.GENERIC);
        threadPool.schedule(pingSender, TimeValue.timeValueMillis(scheduleDuration.millis() / 3 * 2), ThreadPool.Names.GENERIC);
        threadPool.schedule(new AbstractRunnable() {
            @Override
            protected void doRun() throws Exception {
                finishPingingRound(pingingRound);
            }

            @Override
            public void onFailure(Exception e) {
                logger.warn("unexpected error while finishing pinging round", e);
            }
        }, scheduleDuration, ThreadPool.Names.GENERIC);
    }

    protected void sendPings(final TimeValue timeout, final PingingRound pingingRound) {
        final ClusterState lastState = contextProvider.clusterState();
        final UnicastPingRequest pingRequest = new UnicastPingRequest(pingingRound.id(), timeout, createPingResponse(lastState));

        List<TransportAddress> temporalAddresses = temporalResponses.stream().map(pingResponse -> {
            assert clusterName.equals(pingResponse.clusterName()) :
                "got a ping request from a different cluster. expected " + clusterName + " got " + pingResponse.clusterName();
            return pingResponse.node().getAddress();
        }).collect(Collectors.toList());

        final Stream<TransportAddress> uniqueAddresses = Stream.concat(pingingRound.getSeedAddresses().stream(),
            temporalAddresses.stream()).distinct();

        // resolve what we can via the latest cluster state
        final Set<DiscoveryNode> nodesToPing = uniqueAddresses
            .map(address -> {
                DiscoveryNode foundNode = lastState.nodes().findByAddress(address);
                if (foundNode != null && transportService.nodeConnected(foundNode)) {
                    return foundNode;
                } else {
                    return new DiscoveryNode(
                        address.toString(),
                        address,
                        emptyMap(),
                        emptySet(),
                        Version.CURRENT.minimumCompatibilityVersion());
                }
            }).collect(Collectors.toSet());

        nodesToPing.forEach(node -> sendPingRequestToNode(node, timeout, pingingRound, pingRequest));
    }

    private void sendPingRequestToNode(final DiscoveryNode node, TimeValue timeout, final PingingRound pingingRound,
                                       final UnicastPingRequest pingRequest) {
        submitToExecutor(new AbstractRunnable() {
            @Override
            protected void doRun() throws Exception {
                Connection connection = null;
                if (transportService.nodeConnected(node)) {
                    try {
                        // concurrency can still cause disconnects
                        connection = transportService.getConnection(node);
                    } catch (NodeNotConnectedException e) {
                        logger.trace("[{}] node [{}] just disconnected, will create a temp connection", pingingRound.id(), node);
                    }
                }

                if (connection == null) {
                    connection = pingingRound.getOrConnect(node);
                }

                logger.trace("[{}] sending to {}", pingingRound.id(), node);
                transportService.sendRequest(connection, ACTION_NAME, pingRequest,
                    TransportRequestOptions.builder().withTimeout((long) (timeout.millis() * 1.25)).build(),
                    getPingResponseHandler(pingingRound, node));
            }

            @Override
            public void onFailure(Exception e) {
                if (e instanceof ConnectTransportException || e instanceof AlreadyClosedException) {
                    // can't connect to the node - this is more common path!
                    logger.trace(() -> new ParameterizedMessage("[{}] failed to ping {}", pingingRound.id(), node), e);
                } else if (e instanceof RemoteTransportException) {
                    // something went wrong on the other side
                    logger.debug(() -> new ParameterizedMessage(
                            "[{}] received a remote error as a response to ping {}", pingingRound.id(), node), e);
                } else {
                    logger.warn(() -> new ParameterizedMessage("[{}] failed send ping to {}", pingingRound.id(), node), e);
                }
            }

            @Override
            public void onRejection(Exception e) {
                // The RejectedExecutionException can come from the fact unicastZenPingExecutorService is at its max down in sendPings
                // But don't bail here, we can retry later on after the send ping has been scheduled.
                logger.debug("Ping execution rejected", e);
            }
        });
    }

    //......
}

  • UnicastZenPing implements ZenPing. Its constructor creates the unicastZenPingExecutorService thread pool, and the class maintains a queue of PingResponse objects (temporalResponses) together with the activePingingRounds map.
  • The ping method runs pingSender once immediately and schedules it again at one third and two thirds of scheduleDuration, then schedules finishPingingRound at scheduleDuration. pingSender's doRun calls sendPings, which builds a UnicastPingRequest, merges the round's seedAddresses with the addresses taken from temporalResponses, deduplicates them, and calls sendPingRequestToNode for each resulting node.
  • sendPingRequestToNode submits an AbstractRunnable to the thread pool; its doRun reuses an existing connection or calls pingingRound.getOrConnect, then sends the pingRequest via transportService.sendRequest with a timeout of 1.25 times the given timeout.
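The timing arithmetic in ping and sendPingRequestToNode is easy to misread, so here is a minimal sketch that reproduces only the offsets. PingSchedule and its method names are hypothetical; the expressions are copied from the excerpt above (`millis() / 3`, `millis() / 3 * 2`, and `(long) (timeout.millis() * 1.25)`).

```java
// Hypothetical helper that mirrors the delay arithmetic used by UnicastZenPing.ping.
final class PingSchedule {

    // Offsets (in ms from the start of the round) at which the three ping batches are sent.
    static long[] sendOffsetsMillis(long scheduleMillis) {
        return new long[] {0L, scheduleMillis / 3, scheduleMillis / 3 * 2};
    }

    // Offset at which the round is finished and the results consumer is notified.
    static long finishOffsetMillis(long scheduleMillis) {
        return scheduleMillis;
    }

    // Timeout attached to each individual ping request.
    static long requestTimeoutMillis(long requestMillis) {
        return (long) (requestMillis * 1.25);
    }
}
```

For a 3000 ms duration this gives batches at 0, 1000, and 2000 ms, completion at 3000 ms, and a per-request timeout of 3750 ms. Note that integer division applies first, so a 100 ms duration yields offsets of 33 and 66 ms rather than 33 and 67.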

PingingRound

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java

    protected class PingingRound implements Releasable {

        private final int id;
        private final Map<TransportAddress, Connection> tempConnections = new HashMap<>();
        private final KeyedLock<TransportAddress> connectionLock = new KeyedLock<>(true);
        private final PingCollection pingCollection;
        private final List<TransportAddress> seedAddresses;
        private final Consumer<PingCollection> pingListener;
        private final DiscoveryNode localNode;
        private final ConnectionProfile connectionProfile;

        private AtomicBoolean closed = new AtomicBoolean(false);

        PingingRound(int id, List<TransportAddress> seedAddresses, Consumer<PingCollection> resultsConsumer, DiscoveryNode localNode,
                     ConnectionProfile connectionProfile) {
            this.id = id;
            this.seedAddresses = Collections.unmodifiableList(seedAddresses.stream().distinct().collect(Collectors.toList()));
            this.pingListener = resultsConsumer;
            this.localNode = localNode;
            this.connectionProfile = connectionProfile;
            this.pingCollection = new PingCollection();
        }

        public int id() {
            return this.id;
        }

        public boolean isClosed() {
            return this.closed.get();
        }

        public List<TransportAddress> getSeedAddresses() {
            ensureOpen();
            return seedAddresses;
        }

        public Connection getOrConnect(DiscoveryNode node) throws IOException {
            Connection result;
            try (Releasable ignore = connectionLock.acquire(node.getAddress())) {
                result = tempConnections.get(node.getAddress());
                if (result == null) {
                    ensureOpen();
                    boolean success = false;
                    logger.trace("[{}] opening connection to [{}]", id(), node);
                    result = transportService.openConnection(node, connectionProfile);
                    try {
                        transportService.handshake(result, connectionProfile.getHandshakeTimeout().millis());
                        synchronized (this) {
                            // acquire lock and check if closed, to prevent leaving an open connection after closing
                            ensureOpen();
                            Connection existing = tempConnections.put(node.getAddress(), result);
                            assert existing == null;
                            success = true;
                        }
                    } finally {
                        if (success == false) {
                            logger.trace("[{}] closing connection to [{}] due to failure", id(), node);
                            IOUtils.closeWhileHandlingException(result);
                        }
                    }
                }
            }
            return result;
        }

        private void ensureOpen() {
            if (isClosed()) {
                throw new AlreadyClosedException("pinging round [" + id + "] is finished");
            }
        }

        public void addPingResponseToCollection(PingResponse pingResponse) {
            if (localNode.equals(pingResponse.node()) == false) {
                pingCollection.addPing(pingResponse);
            }
        }

        @Override
        public void close() {
            List<Connection> toClose = null;
            synchronized (this) {
                if (closed.compareAndSet(false, true)) {
                    activePingingRounds.remove(id);
                    toClose = new ArrayList<>(tempConnections.values());
                    tempConnections.clear();
                }
            }
            if (toClose != null) {
                // we actually closed
                try {
                    pingListener.accept(pingCollection);
                } finally {
                    IOUtils.closeWhileHandlingException(toClose);
                }
            }
        }

        public ConnectionProfile getConnectionProfile() {
            return connectionProfile;
        }
    }

  • PingingRound provides getOrConnect, which returns the cached temporary Connection for a DiscoveryNode's address or creates one by calling transportService.openConnection followed by transportService.handshake. Its close method marks the round as closed exactly once, passes the pingCollection to the pingListener, and closes the temporary connections.
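The combination of "connect at most once per address" and "close at most once" can be sketched without any transport code. TempConnectionCache is a hypothetical class in which a connection is just a string. Unlike PingingRound, which takes a KeyedLock per address so that different addresses can connect concurrently, this sketch uses a single monitor for brevity.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of a per-round cache of temporary connections.
final class TempConnectionCache {
    private final Map<String, String> connections = new HashMap<>();
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private int opened = 0;

    // Returns the cached connection for the address, or "opens" a new one.
    synchronized String getOrConnect(String address) {
        String existing = connections.get(address);
        if (existing != null) {
            return existing;
        }
        ensureOpen(); // refuse to open new connections once the round is finished
        opened++;
        String connection = "connection-" + opened + "@" + address;
        connections.put(address, connection);
        return connection;
    }

    synchronized int openedCount() {
        return opened;
    }

    // Returns true only for the call that actually closed the cache.
    boolean close() {
        synchronized (this) {
            if (closed.compareAndSet(false, true) == false) {
                return false;
            }
            connections.clear();
        }
        return true;
    }

    private void ensureOpen() {
        if (closed.get()) {
            throw new IllegalStateException("round is finished");
        }
    }
}
```

The compareAndSet guard is what lets close be called from several places (the scheduled finish task and the owner's own close) while the listener is still notified only once.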

UnicastPingRequest

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java

    public static class UnicastPingRequest extends TransportRequest {

        public final int id;
        public final TimeValue timeout;
        public final PingResponse pingResponse;

        public UnicastPingRequest(int id, TimeValue timeout, PingResponse pingResponse) {
            this.id = id;
            this.timeout = timeout;
            this.pingResponse = pingResponse;
        }

        public UnicastPingRequest(StreamInput in) throws IOException {
            super(in);
            id = in.readInt();
            timeout = in.readTimeValue();
            pingResponse = new PingResponse(in);
        }

        @Override
        public void readFrom(StreamInput in) throws IOException {
            throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            out.writeInt(id);
            out.writeTimeValue(timeout);
            pingResponse.writeTo(out);
        }
    }

  • UnicastPingRequest extends TransportRequest; its writeTo method writes id, timeout, and pingResponse.

UnicastPingResponse

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java

    public static class UnicastPingResponse extends TransportResponse {

        final int id;

        public final PingResponse[] pingResponses;

        public UnicastPingResponse(int id, PingResponse[] pingResponses) {
            this.id = id;
            this.pingResponses = pingResponses;
        }

        public UnicastPingResponse(StreamInput in) throws IOException {
            id = in.readInt();
            pingResponses = new PingResponse[in.readVInt()];
            for (int i = 0; i < pingResponses.length; i++) {
                pingResponses[i] = new PingResponse(in);
            }
        }

        @Override
        public void readFrom(StreamInput in) throws IOException {
            throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            out.writeInt(id);
            out.writeVInt(pingResponses.length);
            for (PingResponse pingResponse : pingResponses) {
                pingResponse.writeTo(out);
            }
        }
    }

  • UnicastPingResponse extends TransportResponse; its writeTo method writes id, pingResponses.length, and then each element of pingResponses.
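Writing the array length before the elements is what allows the reader to allocate the array and loop the right number of times. Below is a minimal sketch of that length-prefixed layout with JDK streams. PingIdList is a hypothetical name, each ping is reduced to a long id, and the length is written as a fixed four-byte int here, whereas the excerpt uses writeVInt.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical sketch of a length-prefixed array: round id, element count, then the elements.
final class PingIdList {

    static byte[] write(int roundId, long[] pingIds) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(roundId);
            out.writeInt(pingIds.length); // fixed-width length prefix (the excerpt uses a variable-length int)
            for (long pingId : pingIds) {
                out.writeLong(pingId);
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static long[] read(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            in.readInt(); // round id, not needed by this sketch
            long[] pingIds = new long[in.readInt()];
            for (int i = 0; i < pingIds.length; i++) {
                pingIds[i] = in.readLong();
            }
            return pingIds;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```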

UnicastPingRequestHandler

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java

    class UnicastPingRequestHandler implements TransportRequestHandler<UnicastPingRequest> {

        @Override
        public void messageReceived(UnicastPingRequest request, TransportChannel channel, Task task) throws Exception {
            if (closed) {
                throw new AlreadyClosedException("node is shutting down");
            }
            if (request.pingResponse.clusterName().equals(clusterName)) {
                channel.sendResponse(handlePingRequest(request));
            } else {
                throw new IllegalStateException(
                    String.format(
                        Locale.ROOT,
                        "mismatched cluster names; request: [%s], local: [%s]",
                        request.pingResponse.clusterName().value(),
                        clusterName.value()));
            }
        }
    }

    private UnicastPingResponse handlePingRequest(final UnicastPingRequest request) {
        assert clusterName.equals(request.pingResponse.clusterName()) :
            "got a ping request from a different cluster. expected " + clusterName + " got " + request.pingResponse.clusterName();
        temporalResponses.add(request.pingResponse);
        // add to any ongoing pinging
        activePingingRounds.values().forEach(p -> p.addPingResponseToCollection(request.pingResponse));
        threadPool.schedule(() -> temporalResponses.remove(request.pingResponse),
            TimeValue.timeValueMillis(request.timeout.millis() * 2), ThreadPool.Names.SAME);

        List<PingResponse> pingResponses = CollectionUtils.iterableAsArrayList(temporalResponses);
        pingResponses.add(createPingResponse(contextProvider.clusterState()));

        return new UnicastPingResponse(request.id, pingResponses.toArray(new PingResponse[pingResponses.size()]));
    }

  • UnicastPingRequestHandler implements TransportRequestHandler. Its messageReceived method rejects requests whose cluster name differs from the local one; otherwise it calls handlePingRequest and sends the result back. handlePingRequest adds request.pingResponse to temporalResponses and to every active PingingRound via addPingResponseToCollection, schedules a task that removes it from temporalResponses after twice the request timeout, and finally returns a UnicastPingResponse containing the temporal responses plus the local node's own ping response.
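The effect of temporalResponses is that a node answering a ping also passes on the pingers it has heard from recently. The sketch below isolates that behaviour. PingRequestHandlerSketch is a hypothetical class, nodes and cluster names are strings, and the scheduled removal is replaced by an explicit expire call so the example stays deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch of the receiving side of a unicast ping.
final class PingRequestHandlerSketch {
    private final String localCluster;
    private final String localNode;
    // recently seen pingers, handed back to later pingers
    private final Queue<String> temporalResponses = new ConcurrentLinkedQueue<>();

    PingRequestHandlerSketch(String localCluster, String localNode) {
        this.localCluster = localCluster;
        this.localNode = localNode;
    }

    // Returns every recently seen pinger (including this one) followed by the local node.
    List<String> handle(String requestCluster, String requestNode) {
        if (localCluster.equals(requestCluster) == false) {
            throw new IllegalStateException(
                "mismatched cluster names; request: [" + requestCluster + "], local: [" + localCluster + "]");
        }
        temporalResponses.add(requestNode);
        List<String> reply = new ArrayList<>(temporalResponses);
        reply.add(localNode);
        return reply;
    }

    // Stand-in for the scheduled task that forgets a pinger after twice its timeout.
    void expire(String requestNode) {
        temporalResponses.remove(requestNode);
    }
}
```

With this in place, two nodes that both ping a common third node can learn about each other even if neither lists the other as a seed address.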

getPingResponseHandler

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java

    protected TransportResponseHandler<UnicastPingResponse> getPingResponseHandler(final PingingRound pingingRound,
                                                                                   final DiscoveryNode node) {
        return new TransportResponseHandler<UnicastPingResponse>() {

            @Override
            public UnicastPingResponse read(StreamInput in) throws IOException {
                return new UnicastPingResponse(in);
            }

            @Override
            public String executor() {
                return ThreadPool.Names.SAME;
            }

            @Override
            public void handleResponse(UnicastPingResponse response) {
                logger.trace("[{}] received response from {}: {}", pingingRound.id(), node, Arrays.toString(response.pingResponses));
                if (pingingRound.isClosed()) {
                    if (logger.isTraceEnabled()) {
                        logger.trace("[{}] skipping received response from {}. already closed", pingingRound.id(), node);
                    }
                } else {
                    Stream.of(response.pingResponses).forEach(pingingRound::addPingResponseToCollection);
                }
            }

            @Override
            public void handleException(TransportException exp) {
                if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException ||
                    exp.getCause() instanceof AlreadyClosedException) {
                    // ok, not connected...
                    logger.trace(() -> new ParameterizedMessage("failed to connect to {}", node), exp);
                } else if (closed == false) {
                    logger.warn(() -> new ParameterizedMessage("failed to send ping to [{}]", node), exp);
                }
            }
        };
    }

  • getPingResponseHandler creates an anonymous TransportResponseHandler for UnicastPingResponse; unless the round is already closed, its handleResponse method passes each returned PingResponse to pingingRound.addPingResponseToCollection.
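Two filters apply on the way into the collection: responses that arrive after the round is closed are skipped, and the local node's own entry is dropped by addPingResponseToCollection. A minimal sketch of both follows; RoundCollector is a hypothetical class and nodes are plain strings.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of how one pinging round accumulates responses.
final class RoundCollector {
    private final String localNode;
    private final Set<String> collected = new LinkedHashSet<>();
    private boolean closed = false;

    RoundCollector(String localNode) {
        this.localNode = localNode;
    }

    // Mirrors handleResponse: late responses are ignored, the local node is never recorded.
    synchronized void handleResponse(List<String> respondedNodes) {
        if (closed) {
            return;
        }
        for (String node : respondedNodes) {
            if (localNode.equals(node) == false) {
                collected.add(node);
            }
        }
    }

    synchronized void close() {
        closed = true;
    }

    synchronized List<String> results() {
        return new ArrayList<>(collected);
    }
}
```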

Summary

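  • The ZenPing interface extends Releasable and declares the start and ping methods; it also defines two nested classes, PingResponse and PingCollection.
  • UnicastZenPing implements ZenPing. Its constructor creates the unicastZenPingExecutorService thread pool, and the class maintains a queue of PingResponse objects (temporalResponses) together with the activePingingRounds map.
  • The ping method runs pingSender once immediately and schedules it again at one third and two thirds of scheduleDuration; pingSender's doRun calls sendPings. sendPings builds a UnicastPingRequest and calls sendPingRequestToNode for each node resolved from the round's seedAddresses and the addresses in temporalResponses. sendPingRequestToNode submits an AbstractRunnable to the thread pool, whose doRun sends the pingRequest via transportService.sendRequest.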
