乐趣区

聊聊elasticsearch的ZenPing

本文主要研究一下 elasticsearch 的 ZenPing

ZenPing

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java

public interface ZenPing extends Releasable {void start();

    void ping(Consumer<PingCollection> resultsConsumer, TimeValue timeout);

    class PingResponse implements Writeable {//......}

    class PingCollection {//......}
}
  • ZenPing 接口继承了 Releasable 接口,另外它还定义了 start、ping 方法;除此之外还定义了 PingResponse、PingCollection 这两个类

PingResponse

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java

    class PingResponse implements Writeable {

        /**
         * An ID of a ping response that was generated on behalf of another node. Needs to be less than all other ping IDs so that fake ping
         * responses don't override real ones.
         */
        public static long FAKE_PING_ID = -1;

        private static final AtomicLong idGenerator = new AtomicLong();

        // an always increasing unique identifier for this ping response.
        // lower values means older pings.
        private final long id;

        private final ClusterName clusterName;

        private final DiscoveryNode node;

        private final DiscoveryNode master;

        private final long clusterStateVersion;

        /**
         * @param node                the node which this ping describes
         * @param master              the current master of the node
         * @param clusterName         the cluster name of the node
         * @param clusterStateVersion the current cluster state version of that node
         *                            ({@link ElectMasterService.MasterCandidate#UNRECOVERED_CLUSTER_VERSION} for not recovered)
         */
        public PingResponse(DiscoveryNode node, DiscoveryNode master, ClusterName clusterName, long clusterStateVersion) {this(idGenerator.incrementAndGet(), node, master, clusterName, clusterStateVersion);
        }

        /**
         * @param id                  the ping's ID
         * @param node                the node which this ping describes
         * @param master              the current master of the node
         * @param clusterName         the cluster name of the node
         * @param clusterStateVersion the current cluster state version of that node
*                            ({@link ElectMasterService.MasterCandidate#UNRECOVERED_CLUSTER_VERSION} for not recovered)
         */
        public PingResponse(long id, DiscoveryNode node, DiscoveryNode master, ClusterName clusterName, long clusterStateVersion) {
            this.id = id;
            this.node = node;
            this.master = master;
            this.clusterName = clusterName;
            this.clusterStateVersion = clusterStateVersion;
        }

        public PingResponse(DiscoveryNode node, DiscoveryNode master, ClusterState state) {this(node, master, state.getClusterName(),
                state.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) ?
                    ElectMasterService.MasterCandidate.UNRECOVERED_CLUSTER_VERSION : state.version());
        }

        PingResponse(StreamInput in) throws IOException {this.clusterName = new ClusterName(in);
            this.node = new DiscoveryNode(in);
            this.master = in.readOptionalWriteable(DiscoveryNode::new);
            this.clusterStateVersion = in.readLong();
            this.id = in.readLong();}

        @Override
        public void writeTo(StreamOutput out) throws IOException {clusterName.writeTo(out);
            node.writeTo(out);
            out.writeOptionalWriteable(master);
            out.writeLong(clusterStateVersion);
            out.writeLong(id);
        }

        /**
         * an always increasing unique identifier for this ping response.
         * lower values means older pings.
         */
        public long id() {return this.id;}

        /**
         * the name of the cluster this node belongs to
         */
        public ClusterName clusterName() {return this.clusterName;}

        /** the node which this ping describes */
        public DiscoveryNode node() {return node;}

        /** the current master of the node */
        public DiscoveryNode master() {return master;}

        /**
         * the current cluster state version of that node ({@link ElectMasterService.MasterCandidate#UNRECOVERED_CLUSTER_VERSION}
         * for not recovered) */
        public long getClusterStateVersion() {return clusterStateVersion;}

        @Override
        public String toString() {return "ping_response{node [" + node + "], id[" + id + "], master [" + master + "]," +
                   "cluster_state_version [" + clusterStateVersion + "], cluster_name[" + clusterName.value() + "]}";
        }
    }
  • PingResponse 实现了 Writeable 接口,其 writeTo 方法会依次写入 clusterName、node、master、clusterStateVersion、id

PingCollection

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java

    class PingCollection {

        Map<DiscoveryNode, PingResponse> pings;

        public PingCollection() {pings = new HashMap<>();
        }

        /**
         * adds a ping if newer than previous pings from the same node
         *
         * @return true if added, false o.w.
         */
        public synchronized boolean addPing(PingResponse ping) {PingResponse existingResponse = pings.get(ping.node());
            // in case both existing and new ping have the same id (probably because they come
            // from nodes from version <1.4.0) we prefer to use the last added one.
            if (existingResponse == null || existingResponse.id() <= ping.id()) {pings.put(ping.node(), ping);
                return true;
            }
            return false;
        }

        /** serialize current pings to a list. It is guaranteed that the list contains one ping response per node */
        public synchronized List<PingResponse> toList() {return new ArrayList<>(pings.values());
        }

        /** the number of nodes for which there are known pings */
        public synchronized int size() {return pings.size();
        }
    }
  • PingCollection 定义了 DiscoveryNode 与 PingResponse 的 map,并提供了 addPing 方法来添加 pingResponse

UnicastZenPing

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java

public class UnicastZenPing implements ZenPing {private static final Logger logger = LogManager.getLogger(UnicastZenPing.class);

    public static final String ACTION_NAME = "internal:discovery/zen/unicast";

    private final ThreadPool threadPool;
    private final TransportService transportService;
    private final ClusterName clusterName;

    private final PingContextProvider contextProvider;

    private final AtomicInteger pingingRoundIdGenerator = new AtomicInteger();

    private final Map<Integer, PingingRound> activePingingRounds = newConcurrentMap();

    // a list of temporal responses a node will return for a request (holds responses from other nodes)
    private final Queue<PingResponse> temporalResponses = ConcurrentCollections.newQueue();

    private final SeedHostsProvider hostsProvider;

    protected final EsThreadPoolExecutor unicastZenPingExecutorService;

    private final TimeValue resolveTimeout;

    private final String nodeName;

    private volatile boolean closed = false;

    public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService,
                          SeedHostsProvider seedHostsProvider, PingContextProvider contextProvider) {
        this.threadPool = threadPool;
        this.transportService = transportService;
        this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
        this.hostsProvider = seedHostsProvider;
        this.contextProvider = contextProvider;

        final int concurrentConnects = SeedHostsResolver.getMaxConcurrentResolvers(settings);
        resolveTimeout = SeedHostsResolver.getResolveTimeout(settings);
        nodeName = Node.NODE_NAME_SETTING.get(settings);
        logger.debug("using max_concurrent_resolvers [{}], resolver timeout [{}]",
            concurrentConnects,
            resolveTimeout);

        transportService.registerRequestHandler(ACTION_NAME, ThreadPool.Names.SAME, UnicastPingRequest::new,
            new UnicastPingRequestHandler());

        final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_connect]");
        unicastZenPingExecutorService = EsExecutors.newScaling(
                nodeName + "/" + "unicast_connect",
                0,
                concurrentConnects,
                60,
                TimeUnit.SECONDS,
                threadFactory,
                threadPool.getThreadContext());
    }

    private SeedHostsProvider.HostsResolver createHostsResolver() {return (hosts, limitPortCounts) -> SeedHostsResolver.resolveHostsLists(unicastZenPingExecutorService, logger, hosts,
            limitPortCounts, transportService, resolveTimeout);
    }

    @Override
    public void close() {ThreadPool.terminate(unicastZenPingExecutorService, 10, TimeUnit.SECONDS);
        Releasables.close(activePingingRounds.values());
        closed = true;
    }

    @Override
    public void start() {}

    /**
     * Clears the list of cached ping responses.
     */
    public void clearTemporalResponses() {temporalResponses.clear();
    }

    /**
     * Sends three rounds of pings notifying the specified {@link Consumer} when pinging is complete. Pings are sent after resolving
     * configured unicast hosts to their IP address (subject to DNS caching within the JVM). A batch of pings is sent, then another batch
     * of pings is sent at half the specified {@link TimeValue}, and then another batch of pings is sent at the specified {@link TimeValue}.
     * The pings that are sent carry a timeout of 1.25 times the specified {@link TimeValue}. When pinging each node, a connection and
     * handshake is performed, with a connection timeout of the specified {@link TimeValue}.
     *
     * @param resultsConsumer the callback when pinging is complete
     * @param duration        the timeout for various components of the pings
     */
    @Override
    public void ping(final Consumer<PingCollection> resultsConsumer, final TimeValue duration) {ping(resultsConsumer, duration, duration);
    }

    protected void ping(final Consumer<PingCollection> resultsConsumer,
                        final TimeValue scheduleDuration,
                        final TimeValue requestDuration) {final List<TransportAddress> seedAddresses = new ArrayList<>();
        seedAddresses.addAll(hostsProvider.getSeedAddresses(createHostsResolver()));
        final DiscoveryNodes nodes = contextProvider.clusterState().nodes();
        // add all possible master nodes that were active in the last known cluster configuration
        for (ObjectCursor<DiscoveryNode> masterNode : nodes.getMasterNodes().values()) {seedAddresses.add(masterNode.value.getAddress());
        }

        final ConnectionProfile connectionProfile =
            ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, requestDuration, requestDuration);
        final PingingRound pingingRound = new PingingRound(pingingRoundIdGenerator.incrementAndGet(), seedAddresses, resultsConsumer,
            nodes.getLocalNode(), connectionProfile);
        activePingingRounds.put(pingingRound.id(), pingingRound);
        final AbstractRunnable pingSender = new AbstractRunnable() {
            @Override
            public void onFailure(Exception e) {if (e instanceof AlreadyClosedException == false) {logger.warn("unexpected error while pinging", e);
                }
            }

            @Override
            protected void doRun() throws Exception {sendPings(requestDuration, pingingRound);
            }
        };
        threadPool.generic().execute(pingSender);
        threadPool.schedule(pingSender, TimeValue.timeValueMillis(scheduleDuration.millis() / 3), ThreadPool.Names.GENERIC);
        threadPool.schedule(pingSender, TimeValue.timeValueMillis(scheduleDuration.millis() / 3 * 2), ThreadPool.Names.GENERIC);
        threadPool.schedule(new AbstractRunnable() {
            @Override
            protected void doRun() throws Exception {finishPingingRound(pingingRound);
            }

            @Override
            public void onFailure(Exception e) {logger.warn("unexpected error while finishing pinging round", e);
            }
        }, scheduleDuration, ThreadPool.Names.GENERIC);
    }

    protected void sendPings(final TimeValue timeout, final PingingRound pingingRound) {final ClusterState lastState = contextProvider.clusterState();
        final UnicastPingRequest pingRequest = new UnicastPingRequest(pingingRound.id(), timeout, createPingResponse(lastState));

        List<TransportAddress> temporalAddresses = temporalResponses.stream().map(pingResponse -> {assert clusterName.equals(pingResponse.clusterName()) :
                "got a ping request from a different cluster. expected" + clusterName + "got" + pingResponse.clusterName();
            return pingResponse.node().getAddress();
        }).collect(Collectors.toList());

        final Stream<TransportAddress> uniqueAddresses = Stream.concat(pingingRound.getSeedAddresses().stream(),
            temporalAddresses.stream()).distinct();

        // resolve what we can via the latest cluster state
        final Set<DiscoveryNode> nodesToPing = uniqueAddresses
            .map(address -> {DiscoveryNode foundNode = lastState.nodes().findByAddress(address);
                if (foundNode != null && transportService.nodeConnected(foundNode)) {return foundNode;} else {
                    return new DiscoveryNode(address.toString(),
                        address,
                        emptyMap(),
                        emptySet(),
                        Version.CURRENT.minimumCompatibilityVersion());
                }
            }).collect(Collectors.toSet());

        nodesToPing.forEach(node -> sendPingRequestToNode(node, timeout, pingingRound, pingRequest));
    }

    private void sendPingRequestToNode(final DiscoveryNode node, TimeValue timeout, final PingingRound pingingRound,
                                       final UnicastPingRequest pingRequest) {submitToExecutor(new AbstractRunnable() {
            @Override
            protected void doRun() throws Exception {
                Connection connection = null;
                if (transportService.nodeConnected(node)) {
                    try {
                        // concurrency can still cause disconnects
                        connection = transportService.getConnection(node);
                    } catch (NodeNotConnectedException e) {logger.trace("[{}] node [{}] just disconnected, will create a temp connection", pingingRound.id(), node);
                    }
                }

                if (connection == null) {connection = pingingRound.getOrConnect(node);
                }

                logger.trace("[{}] sending to {}", pingingRound.id(), node);
                transportService.sendRequest(connection, ACTION_NAME, pingRequest,
                    TransportRequestOptions.builder().withTimeout((long) (timeout.millis() * 1.25)).build(),
                    getPingResponseHandler(pingingRound, node));
            }

            @Override
            public void onFailure(Exception e) {if (e instanceof ConnectTransportException || e instanceof AlreadyClosedException) {
                    // can't connect to the node - this is more common path!
                    logger.trace(() -> new ParameterizedMessage("[{}] failed to ping {}", pingingRound.id(), node), e);
                } else if (e instanceof RemoteTransportException) {
                    // something went wrong on the other side
                    logger.debug(() -> new ParameterizedMessage("[{}] received a remote error as a response to ping {}", pingingRound.id(), node), e);
                } else {logger.warn(() -> new ParameterizedMessage("[{}] failed send ping to {}", pingingRound.id(), node), e);
                }
            }

            @Override
            public void onRejection(Exception e) {
                // The RejectedExecutionException can come from the fact unicastZenPingExecutorService is at its max down in sendPings
                // But don't bail here, we can retry later on after the send ping has been scheduled.
                logger.debug("Ping execution rejected", e);
            }
        });
    }

    //......
}
  • UnicastZenPing 实现了 ZenPing 接口,它创建了 unicastZenPingExecutorService 线程池,同时维护了一个 PingResponse 的 queue 以及 activePingingRounds
  • ping 方法主要是异步及调度执行 pingSender,而其 doRun 方法执行的是 sendPings 方法;sendPings 方法构建 UnicastPingRequest,对 pingingRound 中的 seedAddresses 挨个执行 sendPingRequestToNode 方法
  • sendPingRequestToNode 向线程池提交 AbstractRunnable,其 doRun 方法主要是使用 transportService.sendRequest 发送 pingRequest

PingingRound

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java

    protected class PingingRound implements Releasable {
        private final int id;
        private final Map<TransportAddress, Connection> tempConnections = new HashMap<>();
        private final KeyedLock<TransportAddress> connectionLock = new KeyedLock<>(true);
        private final PingCollection pingCollection;
        private final List<TransportAddress> seedAddresses;
        private final Consumer<PingCollection> pingListener;
        private final DiscoveryNode localNode;
        private final ConnectionProfile connectionProfile;

        private AtomicBoolean closed = new AtomicBoolean(false);

        PingingRound(int id, List<TransportAddress> seedAddresses, Consumer<PingCollection> resultsConsumer, DiscoveryNode localNode,
                     ConnectionProfile connectionProfile) {
            this.id = id;
            this.seedAddresses = Collections.unmodifiableList(seedAddresses.stream().distinct().collect(Collectors.toList()));
            this.pingListener = resultsConsumer;
            this.localNode = localNode;
            this.connectionProfile = connectionProfile;
            this.pingCollection = new PingCollection();}

        public int id() {return this.id;}

        public boolean isClosed() {return this.closed.get();
        }

        public List<TransportAddress> getSeedAddresses() {ensureOpen();
            return seedAddresses;
        }

        public Connection getOrConnect(DiscoveryNode node) throws IOException {
            Connection result;
            try (Releasable ignore = connectionLock.acquire(node.getAddress())) {result = tempConnections.get(node.getAddress());
                if (result == null) {ensureOpen();
                    boolean success = false;
                    logger.trace("[{}] opening connection to [{}]", id(), node);
                    result = transportService.openConnection(node, connectionProfile);
                    try {transportService.handshake(result, connectionProfile.getHandshakeTimeout().millis());
                        synchronized (this) {
                            // acquire lock and check if closed, to prevent leaving an open connection after closing
                            ensureOpen();
                            Connection existing = tempConnections.put(node.getAddress(), result);
                            assert existing == null;
                            success = true;
                        }
                    } finally {if (success == false) {logger.trace("[{}] closing connection to [{}] due to failure", id(), node);
                            IOUtils.closeWhileHandlingException(result);
                        }
                    }
                }
            }
            return result;
        }

        private void ensureOpen() {if (isClosed()) {throw new AlreadyClosedException("pinging round [" + id + "] is finished");
            }
        }

        public void addPingResponseToCollection(PingResponse pingResponse) {if (localNode.equals(pingResponse.node()) == false) {pingCollection.addPing(pingResponse);
            }
        }

        @Override
        public void close() {
            List<Connection> toClose = null;
            synchronized (this) {if (closed.compareAndSet(false, true)) {activePingingRounds.remove(id);
                    toClose = new ArrayList<>(tempConnections.values());
                    tempConnections.clear();}
            }
            if (toClose != null) {
                // we actually closed
                try {pingListener.accept(pingCollection);
                } finally {IOUtils.closeWhileHandlingException(toClose);
                }
            }
        }

        public ConnectionProfile getConnectionProfile() {return connectionProfile;}
    }
  • PingingRound 提供了 getOrConnect 方法,来获取或创建一个 discoveryNode 的 Connection,主要是调用 transportService.openConnection 方法并执行 transportService.handshake

UnicastPingRequest

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java

    public static class UnicastPingRequest extends TransportRequest {

        public final int id;
        public final TimeValue timeout;
        public final PingResponse pingResponse;

        public UnicastPingRequest(int id, TimeValue timeout, PingResponse pingResponse) {
            this.id = id;
            this.timeout = timeout;
            this.pingResponse = pingResponse;
        }

        public UnicastPingRequest(StreamInput in) throws IOException {super(in);
            id = in.readInt();
            timeout = in.readTimeValue();
            pingResponse = new PingResponse(in);
        }

        @Override
        public void readFrom(StreamInput in) throws IOException {throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {super.writeTo(out);
            out.writeInt(id);
            out.writeTimeValue(timeout);
            pingResponse.writeTo(out);
        }
    }
  • UnicastPingRequest 继承了 TransportRequest,其 writeTo 方法,写入 id、timeout、pingResponse

UnicastPingResponse

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java

    public static class UnicastPingResponse extends TransportResponse {

        final int id;

        public final PingResponse[] pingResponses;

        public UnicastPingResponse(int id, PingResponse[] pingResponses) {
            this.id = id;
            this.pingResponses = pingResponses;
        }

        public UnicastPingResponse(StreamInput in) throws IOException {id = in.readInt();
            pingResponses = new PingResponse[in.readVInt()];
            for (int i = 0; i < pingResponses.length; i++) {pingResponses[i] = new PingResponse(in);
            }
        }

        @Override
        public void readFrom(StreamInput in) throws IOException {throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {super.writeTo(out);
            out.writeInt(id);
            out.writeVInt(pingResponses.length);
            for (PingResponse pingResponse : pingResponses) {pingResponse.writeTo(out);
            }
        }
    }
  • UnicastPingResponse 继承了 TransportResponse,其 writeTo 写入 id、pingResponses.length 及 pingResponses

UnicastPingRequestHandler

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java

    class UnicastPingRequestHandler implements TransportRequestHandler<UnicastPingRequest> {

        @Override
        public void messageReceived(UnicastPingRequest request, TransportChannel channel, Task task) throws Exception {if (closed) {throw new AlreadyClosedException("node is shutting down");
            }
            if (request.pingResponse.clusterName().equals(clusterName)) {channel.sendResponse(handlePingRequest(request));
            } else {
                throw new IllegalStateException(
                    String.format(
                        Locale.ROOT,
                        "mismatched cluster names; request: [%s], local: [%s]",
                        request.pingResponse.clusterName().value(),
                        clusterName.value()));
            }
        }

    }

    private UnicastPingResponse handlePingRequest(final UnicastPingRequest request) {assert clusterName.equals(request.pingResponse.clusterName()) :
            "got a ping request from a different cluster. expected" + clusterName + "got" + request.pingResponse.clusterName();
        temporalResponses.add(request.pingResponse);
        // add to any ongoing pinging
        activePingingRounds.values().forEach(p -> p.addPingResponseToCollection(request.pingResponse));
        threadPool.schedule(() -> temporalResponses.remove(request.pingResponse),
            TimeValue.timeValueMillis(request.timeout.millis() * 2), ThreadPool.Names.SAME);

        List<PingResponse> pingResponses = CollectionUtils.iterableAsArrayList(temporalResponses);
        pingResponses.add(createPingResponse(contextProvider.clusterState()));

        return new UnicastPingResponse(request.id, pingResponses.toArray(new PingResponse[pingResponses.size()]));
    }
  • UnicastPingRequestHandler 继承了 TransportRequestHandler,其 messageReceived 方法主要是调用 handlePingRequest 方法并返回结果;handlePingRequest 方法主要是将使用 pingRound 的 addPingResponseToCollection 添加 request.pingResponse,同事注册一个 timeout 任务执行 temporalResponses.remove(request.pingResponse);最后创建 UnicastPingResponse 返回

getPingResponseHandler

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java

    protected TransportResponseHandler<UnicastPingResponse> getPingResponseHandler(final PingingRound pingingRound,
                                                                                   final DiscoveryNode node) {return new TransportResponseHandler<UnicastPingResponse>() {

            @Override
            public UnicastPingResponse read(StreamInput in) throws IOException {return new UnicastPingResponse(in);
            }

            @Override
            public String executor() {return ThreadPool.Names.SAME;}

            @Override
            public void handleResponse(UnicastPingResponse response) {logger.trace("[{}] received response from {}: {}", pingingRound.id(), node, Arrays.toString(response.pingResponses));
                if (pingingRound.isClosed()) {if (logger.isTraceEnabled()) {logger.trace("[{}] skipping received response from {}. already closed", pingingRound.id(), node);
                    }
                } else {Stream.of(response.pingResponses).forEach(pingingRound::addPingResponseToCollection);
                }
            }

            @Override
            public void handleException(TransportException exp) {if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException ||
                    exp.getCause() instanceof AlreadyClosedException) {
                    // ok, not connected...
                    logger.trace(() -> new ParameterizedMessage("failed to connect to {}", node), exp);
                } else if (closed == false) {logger.warn(() -> new ParameterizedMessage("failed to send ping to [{}]", node), exp);
                }
            }
        };
    }
  • getPingResponseHandler 方法创建了匿名的 TransportResponseHandler,用于处理 UnicastPingResponse;其 handleResponse 方法执行的是 pingingRound.addPingResponseToCollection 方法

小结

  • ZenPing 接口继承了 Releasable 接口,另外它还定义了 start、ping 方法;除此之外还定义了 PingResponse、PingCollection 这两个类
  • UnicastZenPing 实现了 ZenPing 接口,它创建了 unicastZenPingExecutorService 线程池,同时维护了一个 PingResponse 的 queue 以及 activePingingRounds
  • ping 方法主要是异步及调度执行 pingSender,而其 doRun 方法执行的是 sendPings 方法;sendPings 方法构建 UnicastPingRequest,对 pingingRound 中的 seedAddresses 挨个执行 sendPingRequestToNode 方法;sendPingRequestToNode 向线程池提交 AbstractRunnable,其 doRun 方法主要是使用 transportService.sendRequest 发送 pingRequest

doc

  • ZenPing
退出移动版