乐趣区

聊聊elasticsearch的MasterFaultDetection

本文主要研究一下 elasticsearch 的 MasterFaultDetection

FaultDetection

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/discovery/zen/FaultDetection.java

/**
 * A base class for {@link MasterFaultDetection} & {@link NodesFaultDetection},
 * making sure both use the same setting.
 */
public abstract class FaultDetection implements Closeable {private static final Logger logger = LogManager.getLogger(FaultDetection.class);

    public static final Setting<Boolean> CONNECT_ON_NETWORK_DISCONNECT_SETTING =
        Setting.boolSetting("discovery.zen.fd.connect_on_network_disconnect", false, Property.NodeScope, Property.Deprecated);
    public static final Setting<TimeValue> PING_INTERVAL_SETTING =
        Setting.positiveTimeSetting("discovery.zen.fd.ping_interval", timeValueSeconds(1), Property.NodeScope, Property.Deprecated);
    public static final Setting<TimeValue> PING_TIMEOUT_SETTING =
        Setting.timeSetting("discovery.zen.fd.ping_timeout", timeValueSeconds(30), Property.NodeScope, Property.Deprecated);
    public static final Setting<Integer> PING_RETRIES_SETTING =
        Setting.intSetting("discovery.zen.fd.ping_retries", 3, Property.NodeScope, Property.Deprecated);
    public static final Setting<Boolean> REGISTER_CONNECTION_LISTENER_SETTING =
        Setting.boolSetting("discovery.zen.fd.register_connection_listener", true, Property.NodeScope, Property.Deprecated);

    protected final ThreadPool threadPool;
    protected final ClusterName clusterName;
    protected final TransportService transportService;

    // used mainly for testing, should always be true
    protected final boolean registerConnectionListener;
    protected final FDConnectionListener connectionListener;
    protected final boolean connectOnNetworkDisconnect;

    protected final TimeValue pingInterval;
    protected final TimeValue pingRetryTimeout;
    protected final int pingRetryCount;

    public FaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName) {
        this.threadPool = threadPool;
        this.transportService = transportService;
        this.clusterName = clusterName;

        this.connectOnNetworkDisconnect = CONNECT_ON_NETWORK_DISCONNECT_SETTING.get(settings);
        this.pingInterval = PING_INTERVAL_SETTING.get(settings);
        this.pingRetryTimeout = PING_TIMEOUT_SETTING.get(settings);
        this.pingRetryCount = PING_RETRIES_SETTING.get(settings);
        this.registerConnectionListener = REGISTER_CONNECTION_LISTENER_SETTING.get(settings);

        this.connectionListener = new FDConnectionListener();
        if (registerConnectionListener) {transportService.addConnectionListener(connectionListener);
        }
    }

    @Override
    public void close() {transportService.removeConnectionListener(connectionListener);
    }

    /**
     * This method will be called when the {@link org.elasticsearch.transport.TransportService} raised a node disconnected event
     */
    abstract void handleTransportDisconnect(DiscoveryNode node);

    private class FDConnectionListener implements TransportConnectionListener {
        @Override
        public void onNodeDisconnected(DiscoveryNode node) {AbstractRunnable runnable = new AbstractRunnable() {
                @Override
                public void onFailure(Exception e) {logger.warn("failed to handle transport disconnect for node: {}", node);
                }

                @Override
                protected void doRun() {handleTransportDisconnect(node);
                }
            };
            threadPool.generic().execute(runnable);
        }
    }

}
  • FaultDetection 实现了 Closeable 接口,它定义了 FDConnectionListener,其构造器在 registerConnectionListener 为 true 的情况下会给 transportService 添加 FDConnectionListener,而 close 方法则是将 FDConnectionListener 从 transportService 中移除;FaultDetection 还定义了抽象方法 handleTransportDisconnect

MasterFaultDetection

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java

public class MasterFaultDetection extends FaultDetection {private static final Logger logger = LogManager.getLogger(MasterFaultDetection.class);

    public static final String MASTER_PING_ACTION_NAME = "internal:discovery/zen/fd/master_ping";

    public interface Listener {

        /** called when pinging the master failed, like a timeout, transport disconnects etc */
        void onMasterFailure(DiscoveryNode masterNode, Throwable cause, String reason);

    }

    private final MasterService masterService;
    private final java.util.function.Supplier<ClusterState> clusterStateSupplier;
    private final CopyOnWriteArrayList<Listener> listeners = new CopyOnWriteArrayList<>();

    private volatile MasterPinger masterPinger;

    private final Object masterNodeMutex = new Object();

    private volatile DiscoveryNode masterNode;

    private volatile int retryCount;

    private final AtomicBoolean notifiedMasterFailure = new AtomicBoolean();

    public MasterFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService,
                                java.util.function.Supplier<ClusterState> clusterStateSupplier, MasterService masterService,
                                ClusterName clusterName) {super(settings, threadPool, transportService, clusterName);
        this.clusterStateSupplier = clusterStateSupplier;
        this.masterService = masterService;

        logger.debug("[master] uses ping_interval [{}], ping_timeout [{}], ping_retries [{}]", pingInterval, pingRetryTimeout,
            pingRetryCount);

        transportService.registerRequestHandler(MASTER_PING_ACTION_NAME, MasterPingRequest::new, ThreadPool.Names.SAME, false, false, new MasterPingRequestHandler());
    }

    @Override
    public void close() {super.close();
        stop("closing");
        this.listeners.clear();}

    @Override
    protected void handleTransportDisconnect(DiscoveryNode node) {synchronized (masterNodeMutex) {if (!node.equals(this.masterNode)) {return;}
            if (connectOnNetworkDisconnect) {
                try {transportService.connectToNode(node);
                    // if all is well, make sure we restart the pinger
                    if (masterPinger != null) {masterPinger.stop();
                    }
                    this.masterPinger = new MasterPinger();
                    // we use schedule with a 0 time value to run the pinger on the pool as it will run on later
                    threadPool.schedule(masterPinger, TimeValue.timeValueMillis(0), ThreadPool.Names.SAME);
                } catch (Exception e) {logger.trace("[master] [{}] transport disconnected (with verified connect)", masterNode);
                    notifyMasterFailure(masterNode, null, "transport disconnected (with verified connect)");
                }
            } else {logger.trace("[master] [{}] transport disconnected", node);
                notifyMasterFailure(node, null, "transport disconnected");
            }
        }
    }

    private void notifyMasterFailure(final DiscoveryNode masterNode, final Throwable cause, final String reason) {if (notifiedMasterFailure.compareAndSet(false, true)) {
            try {threadPool.generic().execute(() -> {for (Listener listener : listeners) {listener.onMasterFailure(masterNode, cause, reason);
                    }
                });
            } catch (EsRejectedExecutionException e) {logger.error("master failure notification was rejected, it's highly likely the node is shutting down", e);
            }
            stop("master failure," + reason);
        }
    }

    //......
}
  • MasterFaultDetection 继承了 FaultDetection,其构造器给 transportService 注册了 MasterPingRequestHandler
  • 其 handleTransportDisconnect 方法在 connectOnNetworkDisconnect 为 true 的情况下会对 node 进行重试,如果重试成功则重新注册 MasterPinger 的延时任务,如果重试失败或者是 connectOnNetworkDisconnect 为 false 的情况下会调用 notifyMasterFailure 方法
  • notifyMasterFailure 方法则会回调 MasterFaultDetection.Listener 的 onMasterFailure 方法

MasterPingRequestHandler

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java

    private class MasterPingRequestHandler implements TransportRequestHandler<MasterPingRequest> {

        @Override
        public void messageReceived(final MasterPingRequest request, final TransportChannel channel, Task task) throws Exception {final DiscoveryNodes nodes = clusterStateSupplier.get().nodes();
            // check if we are really the same master as the one we seemed to be think we are
            // this can happen if the master got "kill -9" and then another node started using the same port
            if (!request.masterNode.equals(nodes.getLocalNode())) {throw new ThisIsNotTheMasterYouAreLookingForException();
            }

            // ping from nodes of version < 1.4.0 will have the clustername set to null
            if (request.clusterName != null && !request.clusterName.equals(clusterName)) {logger.trace("master fault detection ping request is targeted for a different [{}] cluster then us [{}]",
                    request.clusterName, clusterName);
                throw new ThisIsNotTheMasterYouAreLookingForException("master fault detection ping request is targeted for a different ["
                    + request.clusterName + "] cluster then us [" + clusterName + "]");
            }

            // when we are elected as master or when a node joins, we use a cluster state update thread
            // to incorporate that information in the cluster state. That cluster state is published
            // before we make it available locally. This means that a master ping can come from a node
            // that has already processed the new CS but it is not known locally.
            // Therefore, if we fail we have to check again under a cluster state thread to make sure
            // all processing is finished.
            //

            if (!nodes.isLocalNodeElectedMaster() || !nodes.nodeExists(request.sourceNode)) {logger.trace("checking ping from {} under a cluster state thread", request.sourceNode);
                masterService.submitStateUpdateTask("master ping (from:" + request.sourceNode + ")", new ClusterStateUpdateTask() {

                    @Override
                    public ClusterState execute(ClusterState currentState) throws Exception {
                        // if we are no longer master, fail...
                        DiscoveryNodes nodes = currentState.nodes();
                        if (!nodes.nodeExists(request.sourceNode)) {throw new NodeDoesNotExistOnMasterException();
                        }
                        return currentState;
                    }

                    @Override
                    public void onNoLongerMaster(String source) {onFailure(source, new NotMasterException("local node is not master"));
                    }

                    @Override
                    public void onFailure(String source, @Nullable Exception e) {if (e == null) {e = new ElasticsearchException("unknown error while processing ping");
                        }
                        try {channel.sendResponse(e);
                        } catch (IOException inner) {inner.addSuppressed(e);
                            logger.warn("error while sending ping response", inner);
                        }
                    }

                    @Override
                    public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
                        try {channel.sendResponse(new MasterPingResponseResponse());
                        } catch (IOException e) {logger.warn("error while sending ping response", e);
                        }
                    }
                });
            } else {
                // send a response, and note if we are connected to the master or not
                channel.sendResponse(new MasterPingResponseResponse());
            }
        }
    }

    public static class MasterPingRequest extends TransportRequest {

        public DiscoveryNode sourceNode;

        private DiscoveryNode masterNode;
        private ClusterName clusterName;

        public MasterPingRequest() {}

        public MasterPingRequest(DiscoveryNode sourceNode, DiscoveryNode masterNode, ClusterName clusterName) {
            this.sourceNode = sourceNode;
            this.masterNode = masterNode;
            this.clusterName = clusterName;
        }

        @Override
        public void readFrom(StreamInput in) throws IOException {super.readFrom(in);
            sourceNode = new DiscoveryNode(in);
            masterNode = new DiscoveryNode(in);
            clusterName = new ClusterName(in);
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {super.writeTo(out);
            sourceNode.writeTo(out);
            masterNode.writeTo(out);
            clusterName.writeTo(out);
        }
    }

    public static class MasterPingResponseResponse extends TransportResponse {public MasterPingResponseResponse() { }

        public MasterPingResponseResponse(StreamInput in) throws IOException {super(in);
        }
    }
  • MasterPingRequestHandler 用于响应 MasterPingRequest 请求,它正在 localNode 不是 master 或者 sourceNode 存在的前提下会执行 ClusterStateUpdateTask,否则直接返回 MasterPingResponseResponse
  • ClusterStateUpdateTask 的 execute 方法会校验 request 的 sourceNode 是否存在,如果不存在则抛出 NodeDoesNotExistOnMasterException 异常
  • ClusterStateUpdateTask 的 onNoLongerMaster 方法会调用 onFailure 方法,传递的异常为 NotMasterException;onFailure 方法判断异常是否为 null,为 null 则创建 ElasticsearchException 异常,然后返回异常响应;clusterStateProcessed 方法则返回 MasterPingResponseResponse

ZenDiscovery.processNextCommittedClusterState

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

public class ZenDiscovery extends AbstractLifecycleComponent implements Discovery, PingContextProvider, IncomingClusterStateListener {
    //......

    // return true if state has been sent to applier
    boolean processNextCommittedClusterState(String reason) {assert Thread.holdsLock(stateMutex);

        final ClusterState newClusterState = pendingStatesQueue.getNextClusterStateToProcess();
        final ClusterState currentState = committedState.get();
        // all pending states have been processed
        if (newClusterState == null) {return false;}

        assert newClusterState.nodes().getMasterNode() != null : "received a cluster state without a master";
        assert !newClusterState.blocks().hasGlobalBlock(noMasterBlockService.getNoMasterBlock()) :
            "received a cluster state with a master block";

        if (currentState.nodes().isLocalNodeElectedMaster() && newClusterState.nodes().isLocalNodeElectedMaster() == false) {handleAnotherMaster(currentState, newClusterState.nodes().getMasterNode(), newClusterState.version(),
                "via a new cluster state");
            return false;
        }

        try {if (shouldIgnoreOrRejectNewClusterState(logger, currentState, newClusterState)) {
                String message = String.format(
                    Locale.ROOT,
                    "rejecting cluster state version [%d] uuid [%s] received from [%s]",
                    newClusterState.version(),
                    newClusterState.stateUUID(),
                    newClusterState.nodes().getMasterNodeId()
                );
                throw new IllegalStateException(message);
            }
        } catch (Exception e) {
            try {pendingStatesQueue.markAsFailed(newClusterState, e);
            } catch (Exception inner) {inner.addSuppressed(e);
                logger.error(() -> new ParameterizedMessage("unexpected exception while failing [{}]", reason), inner);
            }
            return false;
        }

        if (currentState.blocks().hasGlobalBlock(noMasterBlockService.getNoMasterBlock())) {
            // its a fresh update from the master as we transition from a start of not having a master to having one
            logger.debug("got first state from fresh master [{}]", newClusterState.nodes().getMasterNodeId());
        }

        if (currentState == newClusterState) {return false;}

        committedState.set(newClusterState);

        // update failure detection only after the state has been updated to prevent race condition with handleLeaveRequest
        // and handleNodeFailure as those check the current state to determine whether the failure is to be handled by this node
        if (newClusterState.nodes().isLocalNodeElectedMaster()) {
            // update the set of nodes to ping
            nodesFD.updateNodesAndPing(newClusterState);
        } else {
            // check to see that we monitor the correct master of the cluster
            if (masterFD.masterNode() == null || !masterFD.masterNode().equals(newClusterState.nodes().getMasterNode())) {masterFD.restart(newClusterState.nodes().getMasterNode(),
                    "new cluster state received and we are monitoring the wrong master [" + masterFD.masterNode() + "]");
            }
        }

        //......

        return true;
    }

    //......
}
  • ZenDiscovery 的 processNextCommittedClusterState 方法在当前 node 不是 master 的时候会在 masterFD.masterNode() 为 null 或者 masterFD.masterNode() 与 newClusterState.nodes().getMasterNode() 不同时执行 masterFD.restart 方法

MasterFaultDetection.restart

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java

public class MasterFaultDetection extends FaultDetection {

    //......

    public void restart(DiscoveryNode masterNode, String reason) {synchronized (masterNodeMutex) {if (logger.isDebugEnabled()) {logger.debug("[master] restarting fault detection against master [{}], reason [{}]", masterNode, reason);
            }
            innerStop();
            innerStart(masterNode);
        }
    }

    private void innerStart(final DiscoveryNode masterNode) {
        this.masterNode = masterNode;
        this.retryCount = 0;
        this.notifiedMasterFailure.set(false);
        if (masterPinger != null) {masterPinger.stop();
        }
        this.masterPinger = new MasterPinger();

        // we start pinging slightly later to allow the chosen master to complete it's own master election
        threadPool.schedule(masterPinger, pingInterval, ThreadPool.Names.SAME);
    }

    private void innerStop() {
        // also will stop the next ping schedule
        this.retryCount = 0;
        if (masterPinger != null) {masterPinger.stop();
            masterPinger = null;
        }
        this.masterNode = null;
    }

    //......
}
  • MasterFaultDetection 的 restart 方法内部先执行 innerStop,然后再执行 innerStart;innerStop 主要是执行 masterPinger.stop() 并设置 masterPinger 及 masterNode 为 null;innerStart 方法则创建并注册 MasterPinger 的延时任务,延时 pingInterval 执行

MasterPinger

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java

    private class MasterPinger implements Runnable {

        private volatile boolean running = true;

        public void stop() {this.running = false;}

        @Override
        public void run() {if (!running) {
                // return and don't spawn...
                return;
            }
            final DiscoveryNode masterToPing = masterNode;
            if (masterToPing == null) {
                // master is null, should not happen, but we are still running, so reschedule
                threadPool.schedule(MasterPinger.this, pingInterval, ThreadPool.Names.SAME);
                return;
            }

            final MasterPingRequest request = new MasterPingRequest(clusterStateSupplier.get().nodes().getLocalNode(), masterToPing, clusterName);
            final TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.PING)
                .withTimeout(pingRetryTimeout).build();
            transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options,
                new TransportResponseHandler<MasterPingResponseResponse>() {
                        @Override
                        public MasterPingResponseResponse read(StreamInput in) throws IOException {return new MasterPingResponseResponse(in);
                        }

                        @Override
                        public void handleResponse(MasterPingResponseResponse response) {if (!running) {return;}
                            // reset the counter, we got a good result
                            MasterFaultDetection.this.retryCount = 0;
                            // check if the master node did not get switched on us..., if it did, we simply return with no reschedule
                            if (masterToPing.equals(MasterFaultDetection.this.masterNode())) {
                                // we don't stop on disconnection from master, we keep pinging it
                                threadPool.schedule(MasterPinger.this, pingInterval, ThreadPool.Names.SAME);
                            }
                        }

                        @Override
                        public void handleException(TransportException exp) {if (!running) {return;}
                            synchronized (masterNodeMutex) {
                                // check if the master node did not get switched on us...
                                if (masterToPing.equals(MasterFaultDetection.this.masterNode())) {if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {handleTransportDisconnect(masterToPing);
                                        return;
                                    } else if (exp.getCause() instanceof NotMasterException) {logger.debug("[master] pinging a master {} that is no longer a master", masterNode);
                                        notifyMasterFailure(masterToPing, exp, "no longer master");
                                        return;
                                    } else if (exp.getCause() instanceof ThisIsNotTheMasterYouAreLookingForException) {logger.debug("[master] pinging a master {} that is not the master", masterNode);
                                        notifyMasterFailure(masterToPing, exp,"not master");
                                        return;
                                    } else if (exp.getCause() instanceof NodeDoesNotExistOnMasterException) {logger.debug("[master] pinging a master {} but we do not exists on it, act as if its master failure"
                                            , masterNode);
                                        notifyMasterFailure(masterToPing, exp,"do not exists on master, act as master failure");
                                        return;
                                    }

                                    int retryCount = ++MasterFaultDetection.this.retryCount;
                                    logger.trace(() -> new ParameterizedMessage("[master] failed to ping [{}], retry [{}] out of [{}]",
                                            masterNode, retryCount, pingRetryCount), exp);
                                    if (retryCount >= pingRetryCount) {logger.debug("[master] failed to ping [{}], tried [{}] times, each with maximum [{}] timeout",
                                            masterNode, pingRetryCount, pingRetryTimeout);
                                        // not good, failure
                                        notifyMasterFailure(masterToPing, null, "failed to ping, tried [" + pingRetryCount
                                            + "] times, each with  maximum [" + pingRetryTimeout + "] timeout");
                                    } else {
                                        // resend the request, not reschedule, rely on send timeout
                                        transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, this);
                                    }
                                }
                            }
                        }

                        @Override
                        public String executor() {return ThreadPool.Names.SAME;}
                    }
            );
        }
    }

    private void notifyMasterFailure(final DiscoveryNode masterNode, final Throwable cause, final String reason) {if (notifiedMasterFailure.compareAndSet(false, true)) {
            try {threadPool.generic().execute(() -> {for (Listener listener : listeners) {listener.onMasterFailure(masterNode, cause, reason);
                    }
                });
            } catch (EsRejectedExecutionException e) {logger.error("master failure notification was rejected, it's highly likely the node is shutting down", e);
            }
            stop("master failure," + reason);
        }
    }
  • MasterPinger 的 run 方法首先判断 masterToPing 是否为 null,如果为 null 则在注册 MasterPinger 的延时任务;如果不为 null 则发送 MasterPingRequest 请求给 masterToPing
  • TransportResponseHandler 的 handleResponse 方法会清空 MasterFaultDetection.this.retryCount,然后判断 masterNode 是否变化,没有变化则继续注册 MasterPinger 的延时任务
  • TransportResponseHandler 的 handleException 方法会在 masterNode 没有变化的前提下对异常进行处理,如果是 ConnectTransportException 则执行 handleTransportDisconnect 方法,如果是 NotMasterException、ThisIsNotTheMasterYouAreLookingForException、NodeDoesNotExistOnMasterException 则执行 notifyMasterFailure 方法,其他异常则进行重试
  • 重试时先递增 MasterFaultDetection.this.retryCount,如果重试次数大于等于 pingRetryCount 则直接执行 notifyMasterFailure 方法,否则进行重试发送 MasterPingRequest 请求
  • notifyMasterFailure 方法则回调 MasterFaultDetection.Listener 的 onMasterFailure 方法

ZenDiscovery.MasterNodeFailureListener

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

    private class MasterNodeFailureListener implements MasterFaultDetection.Listener {

        @Override
        public void onMasterFailure(DiscoveryNode masterNode, Throwable cause, String reason) {handleMasterGone(masterNode, cause, reason);
        }
    }

    private void handleMasterGone(final DiscoveryNode masterNode, final Throwable cause, final String reason) {if (lifecycleState() != Lifecycle.State.STARTED) {
            // not started, ignore a master failure
            return;
        }
        if (localNodeMaster()) {
            // we might get this on both a master telling us shutting down, and then the disconnect failure
            return;
        }

        logger.info(() -> new ParameterizedMessage("master_left [{}], reason [{}]", masterNode, reason), cause);

        synchronized (stateMutex) {if (localNodeMaster() == false && masterNode.equals(committedState.get().nodes().getMasterNode())) {
                // flush any pending cluster states from old master, so it will not be set as master again
                pendingStatesQueue.failAllStatesAndClear(new ElasticsearchException("master left [{}]", reason));
                rejoin("master left (reason =" + reason + ")");
            }
        }
    }

    protected void rejoin(String reason) {assert Thread.holdsLock(stateMutex);
        ClusterState clusterState = committedState.get();

        logger.warn("{}, current nodes: {}", reason, clusterState.nodes());
        nodesFD.stop();
        masterFD.stop(reason);

        // TODO: do we want to force a new thread if we actively removed the master? this is to give a full pinging cycle
        // before a decision is made.
        joinThreadControl.startNewThreadIfNotRunning();

        if (clusterState.nodes().getMasterNodeId() != null) {
            // remove block if it already exists before adding new one
            assert clusterState.blocks().hasGlobalBlockWithId(noMasterBlockService.getNoMasterBlock().id()) == false :
                "NO_MASTER_BLOCK should only be added by ZenDiscovery";
            ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(clusterState.blocks())
                .addGlobalBlock(noMasterBlockService.getNoMasterBlock())
                .build();

            DiscoveryNodes discoveryNodes = new DiscoveryNodes.Builder(clusterState.nodes()).masterNodeId(null).build();
            clusterState = ClusterState.builder(clusterState)
                .blocks(clusterBlocks)
                .nodes(discoveryNodes)
                .build();

            committedState.set(clusterState);
            clusterApplier.onNewClusterState(reason, this::clusterState, (source, e) -> {}); // don't wait for state to be applied
        }
    }

    private class JoinThreadControl {private final AtomicBoolean running = new AtomicBoolean(false);
        private final AtomicReference<Thread> currentJoinThread = new AtomicReference<>();

        /** returns true if join thread control is started and there is currently an active join thread */
        public boolean joinThreadActive() {Thread currentThread = currentJoinThread.get();
            return running.get() && currentThread != null && currentThread.isAlive();
        }

        /** returns true if join thread control is started and the supplied thread is the currently active joinThread */
        public boolean joinThreadActive(Thread joinThread) {return running.get() && joinThread.equals(currentJoinThread.get());
        }

        /** cleans any running joining thread and calls {@link #rejoin} */
        public void stopRunningThreadAndRejoin(String reason) {assert Thread.holdsLock(stateMutex);
            currentJoinThread.set(null);
            rejoin(reason);
        }

        /** starts a new joining thread if there is no currently active one and join thread controlling is started */
        public void startNewThreadIfNotRunning() {assert Thread.holdsLock(stateMutex);
            if (joinThreadActive()) {return;}
            threadPool.generic().execute(new Runnable() {
                @Override
                public void run() {Thread currentThread = Thread.currentThread();
                    if (!currentJoinThread.compareAndSet(null, currentThread)) {return;}
                    while (running.get() && joinThreadActive(currentThread)) {
                        try {innerJoinCluster();
                            return;
                        } catch (Exception e) {logger.error("unexpected error while joining cluster, trying again", e);
                            // Because we catch any exception here, we want to know in
                            // tests if an uncaught exception got to this point and the test infra uncaught exception
                            // leak detection can catch this. In practise no uncaught exception should leak
                            assert ExceptionsHelper.reThrowIfNotNull(e);
                        }
                    }
                    // cleaning the current thread from currentJoinThread is done by explicit calls.
                }
            });
        }

        /**
         * marks the given joinThread as completed and makes sure another thread is running (starting one if needed)
         * If the given thread is not the currently running join thread, the command is ignored.
         */
        public void markThreadAsDoneAndStartNew(Thread joinThread) {assert Thread.holdsLock(stateMutex);
            if (!markThreadAsDone(joinThread)) {return;}
            startNewThreadIfNotRunning();}

        /** marks the given joinThread as completed. Returns false if the supplied thread is not the currently active join thread */
        public boolean markThreadAsDone(Thread joinThread) {assert Thread.holdsLock(stateMutex);
            return currentJoinThread.compareAndSet(joinThread, null);
        }

        public void stop() {running.set(false);
            Thread joinThread = currentJoinThread.getAndSet(null);
            if (joinThread != null) {joinThread.interrupt();
            }
        }

        public void start() {running.set(true);
        }

    }
  • ZenDiscovery 的 MasterNodeFailureListener 实现了 MasterFaultDetection.Listener 接口,其 onMasterFailure 方法执行的是 handleMasterGone 方法;handleMasterGone 方法主要是执行 pendingStatesQueue.failAllStatesAndClear,然后进行 rejoin
  • rejoin 方法首先执行 nodesFD.stop() 及 masterFD.stop(reason),然后触发 joinThreadControl.startNewThreadIfNotRunning(),最后构造新的 clusterState,执行 clusterApplier.onNewClusterState
  • joinThreadControl.startNewThreadIfNotRunning() 方法主要是执行 innerJoinCluster 方法

小结

  • FaultDetection 实现了 Closeable 接口,它定义了 FDConnectionListener,其构造器在 registerConnectionListener 为 true 的情况下会给 transportService 添加 FDConnectionListener,而 close 方法则是将 FDConnectionListener 从 transportService 中移除;FaultDetection 还定义了抽象方法 handleTransportDisconnect
  • MasterFaultDetection 继承了 FaultDetection,其构造器给 transportService 注册了 MasterPingRequestHandler;其 handleTransportDisconnect 方法在 connectOnNetworkDisconnect 为 true 的情况下会对 node 进行重试,如果重试成功则重新注册 MasterPinger 的延时任务,如果重试失败或者是 connectOnNetworkDisconnect 为 false 的情况下会调用 notifyMasterFailure 方法;notifyMasterFailure 方法则会回调 MasterFaultDetection.Listener 的 onMasterFailure 方法
  • ZenDiscovery 的 MasterNodeFailureListener 实现了 MasterFaultDetection.Listener 接口,其 onMasterFailure 方法执行的是 handleMasterGone 方法;handleMasterGone 方法主要是执行 pendingStatesQueue.failAllStatesAndClear,然后进行 rejoin
  • ZenDiscovery 的 processNextCommittedClusterState 方法在当前 node 不是 master 的时候会在 masterFD.masterNode() 为 null 或者 masterFD.masterNode() 与 newClusterState.nodes().getMasterNode() 不同时执行 masterFD.restart 方法
  • MasterFaultDetection 的 restart 方法内部先执行 innerStop,然后再执行 innerStart;innerStop 主要是执行 masterPinger.stop() 并设置 masterPinger 及 masterNode 为 null;innerStart 方法则创建并注册 MasterPinger 的延时任务,延时 pingInterval 执行
  • MasterPinger 的 run 方法首先判断 masterToPing 是否为 null,如果为 null 则在注册 MasterPinger 的延时任务;如果不为 null 则发送 MasterPingRequest 请求给 masterToPing;请求成功时会清空 MasterFaultDetection.this.retryCount,然后判断 masterNode 是否变化,没有变化则继续注册 MasterPinger 的延时任务;请求失败则根据异常做不同处理,比如执行 handleTransportDisconnect 方法,或者执行 notifyMasterFailure 方法,或者则进行重试

doc

  • Cluster fault detection
退出移动版