This article takes a look at Elasticsearch's MasterFaultDetection.

FaultDetection

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/discovery/zen/FaultDetection.java

/**
 * A base class for {@link MasterFaultDetection} & {@link NodesFaultDetection},
 * making sure both use the same setting.
 */
public abstract class FaultDetection implements Closeable {

    private static final Logger logger = LogManager.getLogger(FaultDetection.class);

    public static final Setting<Boolean> CONNECT_ON_NETWORK_DISCONNECT_SETTING =
        Setting.boolSetting("discovery.zen.fd.connect_on_network_disconnect", false, Property.NodeScope, Property.Deprecated);
    public static final Setting<TimeValue> PING_INTERVAL_SETTING =
        Setting.positiveTimeSetting("discovery.zen.fd.ping_interval", timeValueSeconds(1), Property.NodeScope, Property.Deprecated);
    public static final Setting<TimeValue> PING_TIMEOUT_SETTING =
        Setting.timeSetting("discovery.zen.fd.ping_timeout", timeValueSeconds(30), Property.NodeScope, Property.Deprecated);
    public static final Setting<Integer> PING_RETRIES_SETTING =
        Setting.intSetting("discovery.zen.fd.ping_retries", 3, Property.NodeScope, Property.Deprecated);
    public static final Setting<Boolean> REGISTER_CONNECTION_LISTENER_SETTING =
        Setting.boolSetting("discovery.zen.fd.register_connection_listener", true, Property.NodeScope, Property.Deprecated);

    protected final ThreadPool threadPool;
    protected final ClusterName clusterName;
    protected final TransportService transportService;

    // used mainly for testing, should always be true
    protected final boolean registerConnectionListener;
    protected final FDConnectionListener connectionListener;
    protected final boolean connectOnNetworkDisconnect;

    protected final TimeValue pingInterval;
    protected final TimeValue pingRetryTimeout;
    protected final int pingRetryCount;

    public FaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName) {
        this.threadPool = threadPool;
        this.transportService = transportService;
        this.clusterName = clusterName;

        this.connectOnNetworkDisconnect = CONNECT_ON_NETWORK_DISCONNECT_SETTING.get(settings);
        this.pingInterval = PING_INTERVAL_SETTING.get(settings);
        this.pingRetryTimeout = PING_TIMEOUT_SETTING.get(settings);
        this.pingRetryCount = PING_RETRIES_SETTING.get(settings);
        this.registerConnectionListener = REGISTER_CONNECTION_LISTENER_SETTING.get(settings);

        this.connectionListener = new FDConnectionListener();
        if (registerConnectionListener) {
            transportService.addConnectionListener(connectionListener);
        }
    }

    @Override
    public void close() {
        transportService.removeConnectionListener(connectionListener);
    }

    /**
     * This method will be called when the {@link org.elasticsearch.transport.TransportService} raised a node disconnected event
     */
    abstract void handleTransportDisconnect(DiscoveryNode node);

    private class FDConnectionListener implements TransportConnectionListener {
        @Override
        public void onNodeDisconnected(DiscoveryNode node) {
            AbstractRunnable runnable = new AbstractRunnable() {
                @Override
                public void onFailure(Exception e) {
                    logger.warn("failed to handle transport disconnect for node: {}", node);
                }

                @Override
                protected void doRun() {
                    handleTransportDisconnect(node);
                }
            };
            threadPool.generic().execute(runnable);
        }
    }
}
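  • FaultDetection implements the Closeable interface and defines FDConnectionListener. When registerConnectionListener is true, its constructor adds the FDConnectionListener to transportService; the close method removes it from transportService again. The listener hands each node-disconnected event to the generic thread pool, which then calls handleTransportDisconnect, an abstract method that subclasses must implement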
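The register-on-construct, remove-on-close pattern described above can be sketched in isolation. This is only a minimal illustration, not Elasticsearch code: SketchTransport and SketchFaultDetection are hypothetical stand-ins, nodes are plain strings, and the callback runs synchronously, whereas the real listener hands the work to threadPool.generic().

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical stand-in for TransportService: it only tracks disconnect listeners. */
class SketchTransport {
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

    void addListener(Consumer<String> listener) { listeners.add(listener); }

    void removeListener(Consumer<String> listener) { listeners.remove(listener); }

    int listenerCount() { return listeners.size(); }

    /** Simulates the transport layer reporting that a node disconnected. */
    void fireDisconnected(String node) {
        for (Consumer<String> listener : listeners) {
            listener.accept(node);
        }
    }
}

/** Hypothetical, simplified counterpart of FaultDetection. */
abstract class SketchFaultDetection implements AutoCloseable {
    private final SketchTransport transport;
    // One listener instance is kept so that close() removes exactly what was added.
    private final Consumer<String> connectionListener = this::handleTransportDisconnect;

    SketchFaultDetection(SketchTransport transport, boolean registerConnectionListener) {
        this.transport = transport;
        if (registerConnectionListener) {
            transport.addListener(connectionListener);
        }
    }

    @Override
    public void close() {
        transport.removeListener(connectionListener);
    }

    /** Subclasses decide what a disconnect means (master failure, node failure, ...). */
    abstract void handleTransportDisconnect(String node);
}
```

Keeping the listener in a field is what makes close() symmetric with the constructor: removal relies on passing the same instance that was registered.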

MasterFaultDetection

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java

public class MasterFaultDetection extends FaultDetection {

    private static final Logger logger = LogManager.getLogger(MasterFaultDetection.class);

    public static final String MASTER_PING_ACTION_NAME = "internal:discovery/zen/fd/master_ping";

    public interface Listener {

        /** called when pinging the master failed, like a timeout, transport disconnects etc */
        void onMasterFailure(DiscoveryNode masterNode, Throwable cause, String reason);
    }

    private final MasterService masterService;
    private final java.util.function.Supplier<ClusterState> clusterStateSupplier;
    private final CopyOnWriteArrayList<Listener> listeners = new CopyOnWriteArrayList<>();

    private volatile MasterPinger masterPinger;

    private final Object masterNodeMutex = new Object();

    private volatile DiscoveryNode masterNode;

    private volatile int retryCount;

    private final AtomicBoolean notifiedMasterFailure = new AtomicBoolean();

    public MasterFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService,
                                java.util.function.Supplier<ClusterState> clusterStateSupplier, MasterService masterService,
                                ClusterName clusterName) {
        super(settings, threadPool, transportService, clusterName);
        this.clusterStateSupplier = clusterStateSupplier;
        this.masterService = masterService;

        logger.debug("[master] uses ping_interval [{}], ping_timeout [{}], ping_retries [{}]", pingInterval, pingRetryTimeout,
            pingRetryCount);

        transportService.registerRequestHandler(
            MASTER_PING_ACTION_NAME, MasterPingRequest::new, ThreadPool.Names.SAME, false, false, new MasterPingRequestHandler());
    }

    @Override
    public void close() {
        super.close();
        stop("closing");
        this.listeners.clear();
    }

    @Override
    protected void handleTransportDisconnect(DiscoveryNode node) {
        synchronized (masterNodeMutex) {
            if (!node.equals(this.masterNode)) {
                return;
            }
            if (connectOnNetworkDisconnect) {
                try {
                    transportService.connectToNode(node);
                    // if all is well, make sure we restart the pinger
                    if (masterPinger != null) {
                        masterPinger.stop();
                    }
                    this.masterPinger = new MasterPinger();
                    // we use schedule with a 0 time value to run the pinger on the pool as it will run on later
                    threadPool.schedule(masterPinger, TimeValue.timeValueMillis(0), ThreadPool.Names.SAME);
                } catch (Exception e) {
                    logger.trace("[master] [{}] transport disconnected (with verified connect)", masterNode);
                    notifyMasterFailure(masterNode, null, "transport disconnected (with verified connect)");
                }
            } else {
                logger.trace("[master] [{}] transport disconnected", node);
                notifyMasterFailure(node, null, "transport disconnected");
            }
        }
    }

    private void notifyMasterFailure(final DiscoveryNode masterNode, final Throwable cause, final String reason) {
        if (notifiedMasterFailure.compareAndSet(false, true)) {
            try {
                threadPool.generic().execute(() -> {
                    for (Listener listener : listeners) {
                        listener.onMasterFailure(masterNode, cause, reason);
                    }
                });
            } catch (EsRejectedExecutionException e) {
                logger.error("master failure notification was rejected, it's highly likely the node is shutting down", e);
            }
            stop("master failure, " + reason);
        }
    }

    //......
}
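  • MasterFaultDetection extends FaultDetection; its constructor registers MasterPingRequestHandler with transportService under the action name internal:discovery/zen/fd/master_ping
  • Its handleTransportDisconnect method ignores any node other than the current masterNode. When connectOnNetworkDisconnect is true it tries to reconnect to the node; if the reconnect succeeds it replaces the MasterPinger and schedules the new one with a delay of 0, and if the reconnect fails, or connectOnNetworkDisconnect is false, it calls notifyMasterFailure
  • notifyMasterFailure uses the notifiedMasterFailure flag so that only the first call takes effect; it invokes onMasterFailure on every registered MasterFaultDetection.Listener from the generic thread pool and then calls stop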
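The once-only behaviour of notifyMasterFailure comes from AtomicBoolean.compareAndSet(false, true). The sketch below isolates that guard; OnceOnlyNotifier and its simplified Listener are hypothetical names, nodes are strings, and listeners are called inline rather than on a thread pool as in the real class.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch of the "notify listeners at most once" guard. */
class OnceOnlyNotifier {

    /** Simplified listener; the real one also receives a Throwable cause. */
    interface Listener {
        void onMasterFailure(String masterNode, String reason);
    }

    private final List<Listener> listeners = new CopyOnWriteArrayList<>();
    private final AtomicBoolean notified = new AtomicBoolean();

    void addListener(Listener listener) {
        listeners.add(listener);
    }

    /** Returns true only for the single call that flips the flag from false to true. */
    boolean notifyMasterFailure(String masterNode, String reason) {
        if (!notified.compareAndSet(false, true)) {
            return false; // a failure was already reported for this monitoring round
        }
        for (Listener listener : listeners) {
            listener.onMasterFailure(masterNode, reason);
        }
        return true;
    }

    /** Mirrors innerStart, which resets the flag when monitoring starts again. */
    void reset() {
        notified.set(false);
    }
}
```

Because several code paths can detect the same failure (a transport disconnect and a ping exception, for instance), the flag keeps listeners from seeing duplicates until monitoring is restarted.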

MasterPingRequestHandler

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java

    private class MasterPingRequestHandler implements TransportRequestHandler<MasterPingRequest> {

        @Override
        public void messageReceived(final MasterPingRequest request, final TransportChannel channel, Task task) throws Exception {
            final DiscoveryNodes nodes = clusterStateSupplier.get().nodes();
            // check if we are really the same master as the one we seemed to be think we are
            // this can happen if the master got "kill -9" and then another node started using the same port
            if (!request.masterNode.equals(nodes.getLocalNode())) {
                throw new ThisIsNotTheMasterYouAreLookingForException();
            }

            // ping from nodes of version < 1.4.0 will have the clustername set to null
            if (request.clusterName != null && !request.clusterName.equals(clusterName)) {
                logger.trace("master fault detection ping request is targeted for a different [{}] cluster then us [{}]",
                    request.clusterName, clusterName);
                throw new ThisIsNotTheMasterYouAreLookingForException("master fault detection ping request is targeted for a different ["
                    + request.clusterName + "] cluster then us [" + clusterName + "]");
            }

            // when we are elected as master or when a node joins, we use a cluster state update thread
            // to incorporate that information in the cluster state. That cluster state is published
            // before we make it available locally. This means that a master ping can come from a node
            // that has already processed the new CS but it is not known locally.
            // Therefore, if we fail we have to check again under a cluster state thread to make sure
            // all processing is finished.
            //

            if (!nodes.isLocalNodeElectedMaster() || !nodes.nodeExists(request.sourceNode)) {
                logger.trace("checking ping from {} under a cluster state thread", request.sourceNode);
                masterService.submitStateUpdateTask("master ping (from: " + request.sourceNode + ")", new ClusterStateUpdateTask() {

                    @Override
                    public ClusterState execute(ClusterState currentState) throws Exception {
                        // if we are no longer master, fail...
                        DiscoveryNodes nodes = currentState.nodes();
                        if (!nodes.nodeExists(request.sourceNode)) {
                            throw new NodeDoesNotExistOnMasterException();
                        }
                        return currentState;
                    }

                    @Override
                    public void onNoLongerMaster(String source) {
                        onFailure(source, new NotMasterException("local node is not master"));
                    }

                    @Override
                    public void onFailure(String source, @Nullable Exception e) {
                        if (e == null) {
                            e = new ElasticsearchException("unknown error while processing ping");
                        }
                        try {
                            channel.sendResponse(e);
                        } catch (IOException inner) {
                            inner.addSuppressed(e);
                            logger.warn("error while sending ping response", inner);
                        }
                    }

                    @Override
                    public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
                        try {
                            channel.sendResponse(new MasterPingResponseResponse());
                        } catch (IOException e) {
                            logger.warn("error while sending ping response", e);
                        }
                    }
                });
            } else {
                // send a response, and note if we are connected to the master or not
                channel.sendResponse(new MasterPingResponseResponse());
            }
        }
    }

    public static class MasterPingRequest extends TransportRequest {

        public DiscoveryNode sourceNode;

        private DiscoveryNode masterNode;
        private ClusterName clusterName;

        public MasterPingRequest() {
        }

        public MasterPingRequest(DiscoveryNode sourceNode, DiscoveryNode masterNode, ClusterName clusterName) {
            this.sourceNode = sourceNode;
            this.masterNode = masterNode;
            this.clusterName = clusterName;
        }

        @Override
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
            sourceNode = new DiscoveryNode(in);
            masterNode = new DiscoveryNode(in);
            clusterName = new ClusterName(in);
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            sourceNode.writeTo(out);
            masterNode.writeTo(out);
            clusterName.writeTo(out);
        }
    }

    public static class MasterPingResponseResponse extends TransportResponse {

        public MasterPingResponseResponse() {
        }

        public MasterPingResponseResponse(StreamInput in) throws IOException {
            super(in);
        }
    }
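  • MasterPingRequestHandler answers MasterPingRequest requests. It first throws ThisIsNotTheMasterYouAreLookingForException if the request's masterNode is not the local node, or if the request's clusterName is non-null and differs from the local one. Then, if the local node is not the elected master or the sourceNode does not exist in the local cluster state, it submits a ClusterStateUpdateTask to check again; otherwise it returns a MasterPingResponseResponse directly
  • The execute method of the ClusterStateUpdateTask checks whether the request's sourceNode exists and throws NodeDoesNotExistOnMasterException if it does not
  • The onNoLongerMaster method of the ClusterStateUpdateTask calls onFailure with a NotMasterException; onFailure substitutes an ElasticsearchException when the exception is null and then sends the exception back as the response; clusterStateProcessed returns a MasterPingResponseResponse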
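The order of checks in messageReceived can be summarised as a small decision function. This is a hedged sketch for illustration only: PingDecision, its Outcome enum, and the use of strings and booleans in place of DiscoveryNode, ClusterName, and DiscoveryNodes are all assumptions, and in the real handler the first two outcomes both surface as ThisIsNotTheMasterYouAreLookingForException.

```java
/** Hypothetical summary of the checks MasterPingRequestHandler performs, in order. */
class PingDecision {

    enum Outcome {
        NOT_THE_TARGET_MASTER,    // request names a different node as master
        WRONG_CLUSTER,            // request carries a different cluster name
        RECHECK_ON_STATE_THREAD,  // local view may be stale, verify via a state update task
        RESPOND_OK                // answer the ping immediately
    }

    static Outcome decide(String requestedMaster, String localNode,
                          String requestCluster, String localCluster,
                          boolean localIsElectedMaster, boolean sourceNodeKnown) {
        if (!requestedMaster.equals(localNode)) {
            return Outcome.NOT_THE_TARGET_MASTER;
        }
        // A null cluster name is tolerated (pings from very old nodes).
        if (requestCluster != null && !requestCluster.equals(localCluster)) {
            return Outcome.WRONG_CLUSTER;
        }
        // Either condition alone is enough to force the slower re-check.
        if (!localIsElectedMaster || !sourceNodeKnown) {
            return Outcome.RECHECK_ON_STATE_THREAD;
        }
        return Outcome.RESPOND_OK;
    }
}
```

The re-check exists because a pinging node may already have applied a newer cluster state than the one visible locally, so a negative local answer is not treated as final.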

ZenDiscovery.processNextCommittedClusterState

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

public class ZenDiscovery extends AbstractLifecycleComponent implements Discovery, PingContextProvider, IncomingClusterStateListener {

    //......

    // return true if state has been sent to applier
    boolean processNextCommittedClusterState(String reason) {
        assert Thread.holdsLock(stateMutex);

        final ClusterState newClusterState = pendingStatesQueue.getNextClusterStateToProcess();
        final ClusterState currentState = committedState.get();
        // all pending states have been processed
        if (newClusterState == null) {
            return false;
        }

        assert newClusterState.nodes().getMasterNode() != null : "received a cluster state without a master";
        assert !newClusterState.blocks().hasGlobalBlock(noMasterBlockService.getNoMasterBlock()) :
            "received a cluster state with a master block";

        if (currentState.nodes().isLocalNodeElectedMaster() && newClusterState.nodes().isLocalNodeElectedMaster() == false) {
            handleAnotherMaster(currentState, newClusterState.nodes().getMasterNode(), newClusterState.version(),
                "via a new cluster state");
            return false;
        }

        try {
            if (shouldIgnoreOrRejectNewClusterState(logger, currentState, newClusterState)) {
                String message = String.format(
                    Locale.ROOT,
                    "rejecting cluster state version [%d] uuid [%s] received from [%s]",
                    newClusterState.version(),
                    newClusterState.stateUUID(),
                    newClusterState.nodes().getMasterNodeId()
                );
                throw new IllegalStateException(message);
            }
        } catch (Exception e) {
            try {
                pendingStatesQueue.markAsFailed(newClusterState, e);
            } catch (Exception inner) {
                inner.addSuppressed(e);
                logger.error(() -> new ParameterizedMessage("unexpected exception while failing [{}]", reason), inner);
            }
            return false;
        }

        if (currentState.blocks().hasGlobalBlock(noMasterBlockService.getNoMasterBlock())) {
            // its a fresh update from the master as we transition from a start of not having a master to having one
            logger.debug("got first state from fresh master [{}]", newClusterState.nodes().getMasterNodeId());
        }

        if (currentState == newClusterState) {
            return false;
        }

        committedState.set(newClusterState);

        // update failure detection only after the state has been updated to prevent race condition with handleLeaveRequest
        // and handleNodeFailure as those check the current state to determine whether the failure is to be handled by this node
        if (newClusterState.nodes().isLocalNodeElectedMaster()) {
            // update the set of nodes to ping
            nodesFD.updateNodesAndPing(newClusterState);
        } else {
            // check to see that we monitor the correct master of the cluster
            if (masterFD.masterNode() == null || !masterFD.masterNode().equals(newClusterState.nodes().getMasterNode())) {
                masterFD.restart(newClusterState.nodes().getMasterNode(),
                    "new cluster state received and we are monitoring the wrong master [" + masterFD.masterNode() + "]");
            }
        }

        //......

        return true;
    }

    //......
}
  • When the local node is not the master, ZenDiscovery's processNextCommittedClusterState method calls masterFD.restart if masterFD.masterNode() is null or differs from newClusterState.nodes().getMasterNode()

MasterFaultDetection.restart

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java

public class MasterFaultDetection extends FaultDetection {

    //......

    public void restart(DiscoveryNode masterNode, String reason) {
        synchronized (masterNodeMutex) {
            if (logger.isDebugEnabled()) {
                logger.debug("[master] restarting fault detection against master [{}], reason [{}]", masterNode, reason);
            }
            innerStop();
            innerStart(masterNode);
        }
    }

    private void innerStart(final DiscoveryNode masterNode) {
        this.masterNode = masterNode;
        this.retryCount = 0;
        this.notifiedMasterFailure.set(false);
        if (masterPinger != null) {
            masterPinger.stop();
        }
        this.masterPinger = new MasterPinger();

        // we start pinging slightly later to allow the chosen master to complete it's own master election
        threadPool.schedule(masterPinger, pingInterval, ThreadPool.Names.SAME);
    }

    private void innerStop() {
        // also will stop the next ping schedule
        this.retryCount = 0;
        if (masterPinger != null) {
            masterPinger.stop();
            masterPinger = null;
        }
        this.masterNode = null;
    }

    //......
}
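  • MasterFaultDetection's restart method holds masterNodeMutex while it calls innerStop and then innerStart. innerStop resets retryCount, calls masterPinger.stop(), and sets both masterPinger and masterNode to null. innerStart records the new masterNode, resets retryCount and notifiedMasterFailure, then creates a MasterPinger and schedules it to run after pingInterval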
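The stop-then-start sequence works because a stopped pinger that is still sitting in the scheduler simply returns when it eventually runs. The sketch below demonstrates this with a hand-driven queue; RestartSketch, its scheduled queue, and runNext are hypothetical test scaffolding standing in for threadPool.schedule, and the pinger reschedules itself immediately instead of waiting for a ping response.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical sketch of restart(): stop the old pinger, then start a fresh one. */
class RestartSketch {
    /** Deterministic stand-in for the thread pool: tasks are queued and run by hand. */
    final Deque<Runnable> scheduled = new ArrayDeque<>();
    int pingsSent = 0;

    private Pinger pinger;
    private String masterNode;

    class Pinger implements Runnable {
        private volatile boolean running = true;

        void stop() { running = false; }

        @Override
        public void run() {
            if (!running) {
                return; // a stopped pinger neither pings nor reschedules itself
            }
            pingsSent++;
            scheduled.add(this); // simplified: reschedule right away
        }
    }

    void restart(String newMaster) {
        // innerStop: silence the old pinger and forget the old master
        if (pinger != null) {
            pinger.stop();
            pinger = null;
        }
        masterNode = null;
        // innerStart: remember the new master and schedule a fresh pinger
        masterNode = newMaster;
        pinger = new Pinger();
        scheduled.add(pinger);
    }

    /** Runs the next queued task, if any. */
    void runNext() {
        Runnable next = scheduled.poll();
        if (next != null) {
            next.run();
        }
    }

    String masterNode() { return masterNode; }
}
```

Creating a new Pinger per restart, rather than flipping the flag back on the old one, means a stale task left in the queue can never come back to life.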

MasterPinger

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java

    private class MasterPinger implements Runnable {

        private volatile boolean running = true;

        public void stop() {
            this.running = false;
        }

        @Override
        public void run() {
            if (!running) {
                // return and don't spawn...
                return;
            }
            final DiscoveryNode masterToPing = masterNode;
            if (masterToPing == null) {
                // master is null, should not happen, but we are still running, so reschedule
                threadPool.schedule(MasterPinger.this, pingInterval, ThreadPool.Names.SAME);
                return;
            }

            final MasterPingRequest request = new MasterPingRequest(
                clusterStateSupplier.get().nodes().getLocalNode(), masterToPing, clusterName);
            final TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.PING)
                .withTimeout(pingRetryTimeout).build();
            transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options,
                new TransportResponseHandler<MasterPingResponseResponse>() {
                    @Override
                    public MasterPingResponseResponse read(StreamInput in) throws IOException {
                        return new MasterPingResponseResponse(in);
                    }

                    @Override
                    public void handleResponse(MasterPingResponseResponse response) {
                        if (!running) {
                            return;
                        }
                        // reset the counter, we got a good result
                        MasterFaultDetection.this.retryCount = 0;
                        // check if the master node did not get switched on us..., if it did, we simply return with no reschedule
                        if (masterToPing.equals(MasterFaultDetection.this.masterNode())) {
                            // we don't stop on disconnection from master, we keep pinging it
                            threadPool.schedule(MasterPinger.this, pingInterval, ThreadPool.Names.SAME);
                        }
                    }

                    @Override
                    public void handleException(TransportException exp) {
                        if (!running) {
                            return;
                        }
                        synchronized (masterNodeMutex) {
                            // check if the master node did not get switched on us...
                            if (masterToPing.equals(MasterFaultDetection.this.masterNode())) {
                                if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
                                    handleTransportDisconnect(masterToPing);
                                    return;
                                } else if (exp.getCause() instanceof NotMasterException) {
                                    logger.debug("[master] pinging a master {} that is no longer a master", masterNode);
                                    notifyMasterFailure(masterToPing, exp, "no longer master");
                                    return;
                                } else if (exp.getCause() instanceof ThisIsNotTheMasterYouAreLookingForException) {
                                    logger.debug("[master] pinging a master {} that is not the master", masterNode);
                                    notifyMasterFailure(masterToPing, exp,"not master");
                                    return;
                                } else if (exp.getCause() instanceof NodeDoesNotExistOnMasterException) {
                                    logger.debug("[master] pinging a master {} but we do not exists on it, act as if its master failure"
                                        , masterNode);
                                    notifyMasterFailure(masterToPing, exp,"do not exists on master, act as master failure");
                                    return;
                                }

                                int retryCount = ++MasterFaultDetection.this.retryCount;
                                logger.trace(() -> new ParameterizedMessage(
                                        "[master] failed to ping [{}], retry [{}] out of [{}]",
                                        masterNode, retryCount, pingRetryCount), exp);
                                if (retryCount >= pingRetryCount) {
                                    logger.debug("[master] failed to ping [{}], tried [{}] times, each with maximum [{}] timeout",
                                        masterNode, pingRetryCount, pingRetryTimeout);
                                    // not good, failure
                                    notifyMasterFailure(masterToPing, null, "failed to ping, tried [" + pingRetryCount
                                        + "] times, each with  maximum [" + pingRetryTimeout + "] timeout");
                                } else {
                                    // resend the request, not reschedule, rely on send timeout
                                    transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, this);
                                }
                            }
                        }
                    }

                    @Override
                    public String executor() {
                        return ThreadPool.Names.SAME;
                    }
                }
            );
        }
    }

    private void notifyMasterFailure(final DiscoveryNode masterNode, final Throwable cause, final String reason) {
        if (notifiedMasterFailure.compareAndSet(false, true)) {
            try {
                threadPool.generic().execute(() -> {
                    for (Listener listener : listeners) {
                        listener.onMasterFailure(masterNode, cause, reason);
                    }
                });
            } catch (EsRejectedExecutionException e) {
                logger.error("master failure notification was rejected, it's highly likely the node is shutting down", e);
            }
            stop("master failure, " + reason);
        }
    }
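  • MasterPinger's run method returns immediately if the pinger has been stopped. Otherwise it checks whether masterToPing is null: if it is, the pinger reschedules itself after pingInterval; if not, it sends a MasterPingRequest to masterToPing with a timeout of pingRetryTimeout
  • The handleResponse method of the TransportResponseHandler resets MasterFaultDetection.this.retryCount to 0 and then checks whether masterNode has changed; if it has not, the MasterPinger is scheduled again after pingInterval
  • The handleException method of the TransportResponseHandler only acts if masterNode has not changed. A ConnectTransportException leads to handleTransportDisconnect; a NotMasterException, ThisIsNotTheMasterYouAreLookingForException, or NodeDoesNotExistOnMasterException leads to notifyMasterFailure; any other exception is retried
  • A retry first increments MasterFaultDetection.this.retryCount. If the count is greater than or equal to pingRetryCount, notifyMasterFailure is called; otherwise the MasterPingRequest is sent again immediately, without waiting for pingInterval
  • notifyMasterFailure in turn invokes the onMasterFailure method of each MasterFaultDetection.Listener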
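The retry bookkeeping for "other" exceptions is easy to get off by one, so here is a small sketch of it. RetrySketch and its Action enum are hypothetical; only the increment-then-compare logic and the default values (ping_retries 3, ping_timeout 30s) are taken from the code shown earlier. The worst-case figure assumes every attempt runs into the full timeout.

```java
/** Hypothetical sketch of the retry counter used for non-fatal ping failures. */
class RetrySketch {

    enum Action { RESEND, NOTIFY_FAILURE }

    private final int pingRetryCount;
    private int retryCount;

    RetrySketch(int pingRetryCount) {
        this.pingRetryCount = pingRetryCount;
    }

    /** Called for a failure that is not a disconnect or a "not master" style error. */
    Action onGenericFailure() {
        int current = ++retryCount; // increment first, then compare
        return current >= pingRetryCount ? Action.NOTIFY_FAILURE : Action.RESEND;
    }

    /** A successful ping clears the counter, so only consecutive failures count. */
    void onSuccess() {
        retryCount = 0;
    }

    /** Rough upper bound on detection time when every attempt times out. */
    static long worstCaseMillis(int retries, long timeoutMillis) {
        return retries * timeoutMillis;
    }
}
```

With the defaults, the third consecutive failure triggers the notification, so a master that silently stops answering is reported after roughly 3 × 30s = 90s.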

ZenDiscovery.MasterNodeFailureListener

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

    private class MasterNodeFailureListener implements MasterFaultDetection.Listener {

        @Override
        public void onMasterFailure(DiscoveryNode masterNode, Throwable cause, String reason) {
            handleMasterGone(masterNode, cause, reason);
        }
    }

    private void handleMasterGone(final DiscoveryNode masterNode, final Throwable cause, final String reason) {
        if (lifecycleState() != Lifecycle.State.STARTED) {
            // not started, ignore a master failure
            return;
        }
        if (localNodeMaster()) {
            // we might get this on both a master telling us shutting down, and then the disconnect failure
            return;
        }

        logger.info(() -> new ParameterizedMessage("master_left [{}], reason [{}]", masterNode, reason), cause);

        synchronized (stateMutex) {
            if (localNodeMaster() == false && masterNode.equals(committedState.get().nodes().getMasterNode())) {
                // flush any pending cluster states from old master, so it will not be set as master again
                pendingStatesQueue.failAllStatesAndClear(new ElasticsearchException("master left [{}]", reason));
                rejoin("master left (reason = " + reason + ")");
            }
        }
    }

    protected void rejoin(String reason) {
        assert Thread.holdsLock(stateMutex);
        ClusterState clusterState = committedState.get();

        logger.warn("{}, current nodes: {}", reason, clusterState.nodes());
        nodesFD.stop();
        masterFD.stop(reason);

        // TODO: do we want to force a new thread if we actively removed the master? this is to give a full pinging cycle
        // before a decision is made.
        joinThreadControl.startNewThreadIfNotRunning();

        if (clusterState.nodes().getMasterNodeId() != null) {
            // remove block if it already exists before adding new one
            assert clusterState.blocks().hasGlobalBlockWithId(noMasterBlockService.getNoMasterBlock().id()) == false :
                "NO_MASTER_BLOCK should only be added by ZenDiscovery";
            ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(clusterState.blocks())
                .addGlobalBlock(noMasterBlockService.getNoMasterBlock())
                .build();

            DiscoveryNodes discoveryNodes = new DiscoveryNodes.Builder(clusterState.nodes()).masterNodeId(null).build();
            clusterState = ClusterState.builder(clusterState)
                .blocks(clusterBlocks)
                .nodes(discoveryNodes)
                .build();

            committedState.set(clusterState);
            clusterApplier.onNewClusterState(reason, this::clusterState, (source, e) -> {}); // don't wait for state to be applied
        }
    }

    private class JoinThreadControl {

        private final AtomicBoolean running = new AtomicBoolean(false);
        private final AtomicReference<Thread> currentJoinThread = new AtomicReference<>();

        /** returns true if join thread control is started and there is currently an active join thread */
        public boolean joinThreadActive() {
            Thread currentThread = currentJoinThread.get();
            return running.get() && currentThread != null && currentThread.isAlive();
        }

        /** returns true if join thread control is started and the supplied thread is the currently active joinThread */
        public boolean joinThreadActive(Thread joinThread) {
            return running.get() && joinThread.equals(currentJoinThread.get());
        }

        /** cleans any running joining thread and calls {@link #rejoin} */
        public void stopRunningThreadAndRejoin(String reason) {
            assert Thread.holdsLock(stateMutex);
            currentJoinThread.set(null);
            rejoin(reason);
        }

        /** starts a new joining thread if there is no currently active one and join thread controlling is started */
        public void startNewThreadIfNotRunning() {
            assert Thread.holdsLock(stateMutex);
            if (joinThreadActive()) {
                return;
            }
            threadPool.generic().execute(new Runnable() {
                @Override
                public void run() {
                    Thread currentThread = Thread.currentThread();
                    if (!currentJoinThread.compareAndSet(null, currentThread)) {
                        return;
                    }
                    while (running.get() && joinThreadActive(currentThread)) {
                        try {
                            innerJoinCluster();
                            return;
                        } catch (Exception e) {
                            logger.error("unexpected error while joining cluster, trying again", e);
                            // Because we catch any exception here, we want to know in
                            // tests if an uncaught exception got to this point and the test infra uncaught exception
                            // leak detection can catch this. In practise no uncaught exception should leak
                            assert ExceptionsHelper.reThrowIfNotNull(e);
                        }
                    }
                    // cleaning the current thread from currentJoinThread is done by explicit calls.
                }
            });
        }

        /**
         * marks the given joinThread as completed and makes sure another thread is running (starting one if needed)
         * If the given thread is not the currently running join thread, the command is ignored.
         */
        public void markThreadAsDoneAndStartNew(Thread joinThread) {
            assert Thread.holdsLock(stateMutex);
            if (!markThreadAsDone(joinThread)) {
                return;
            }
            startNewThreadIfNotRunning();
        }

        /** marks the given joinThread as completed. Returns false if the supplied thread is not the currently active join thread */
        public boolean markThreadAsDone(Thread joinThread) {
            assert Thread.holdsLock(stateMutex);
            return currentJoinThread.compareAndSet(joinThread, null);
        }

        public void stop() {
            running.set(false);
            Thread joinThread = currentJoinThread.getAndSet(null);
            if (joinThread != null) {
                joinThread.interrupt();
            }
        }

        public void start() {
            running.set(true);
        }
    }
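  • ZenDiscovery's MasterNodeFailureListener implements the MasterFaultDetection.Listener interface; its onMasterFailure method delegates to handleMasterGone. handleMasterGone does nothing if the discovery is not started or the local node is the master. Otherwise, if the failed node is still the master in the committed state, it calls pendingStatesQueue.failAllStatesAndClear and then rejoin
  • The rejoin method first calls nodesFD.stop() and masterFD.stop(reason), then triggers joinThreadControl.startNewThreadIfNotRunning(). If the current cluster state still has a master, it finally builds a new clusterState with the no-master block added and the master node id cleared, and passes it to clusterApplier.onNewClusterState
  • joinThreadControl.startNewThreadIfNotRunning() submits a task to the generic thread pool whose main job is to call innerJoinCluster, retrying while the join thread control is running and the thread is still the active join thread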
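JoinThreadControl guarantees at most one active join thread by treating an AtomicReference<Thread> as a slot that can only be claimed when empty. The following sketch isolates that idea; JoinSlot and its method names are hypothetical, and it leaves out the running flag and the stateMutex assertions of the real class.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch of the single-slot claim used for the join thread. */
class JoinSlot {
    private final AtomicReference<Thread> current = new AtomicReference<>();

    /** Succeeds only if no thread currently owns the slot. */
    boolean tryClaim(Thread candidate) {
        return current.compareAndSet(null, candidate);
    }

    /** Succeeds only if the given thread is the current owner. */
    boolean markDone(Thread owner) {
        return current.compareAndSet(owner, null);
    }

    /** Clears the slot unconditionally and returns the previous owner, if any. */
    Thread clear() {
        return current.getAndSet(null);
    }
}
```

The compare-and-set in markDone is what makes a late call from a superseded thread harmless: it cannot release a slot it no longer owns.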

Summary

  • FaultDetection implements the Closeable interface and defines FDConnectionListener. When registerConnectionListener is true, its constructor adds the FDConnectionListener to transportService, and the close method removes it again. FaultDetection also declares the abstract method handleTransportDisconnect
  • MasterFaultDetection extends FaultDetection; its constructor registers MasterPingRequestHandler with transportService. When connectOnNetworkDisconnect is true, its handleTransportDisconnect method tries to reconnect to the node; if the reconnect succeeds it schedules a new MasterPinger, and if the reconnect fails, or connectOnNetworkDisconnect is false, it calls notifyMasterFailure, which in turn invokes the onMasterFailure method of each MasterFaultDetection.Listener
  • ZenDiscovery's MasterNodeFailureListener implements the MasterFaultDetection.Listener interface; its onMasterFailure method delegates to handleMasterGone, which mainly calls pendingStatesQueue.failAllStatesAndClear and then rejoin
  • When the local node is not the master, ZenDiscovery's processNextCommittedClusterState method calls masterFD.restart if masterFD.masterNode() is null or differs from newClusterState.nodes().getMasterNode()
  • MasterFaultDetection's restart method calls innerStop and then innerStart. innerStop calls masterPinger.stop() and sets masterPinger and masterNode to null; innerStart creates a MasterPinger and schedules it to run after pingInterval
  • MasterPinger's run method checks whether masterToPing is null. If it is, the pinger reschedules itself; if not, it sends a MasterPingRequest to masterToPing. On success it resets MasterFaultDetection.this.retryCount and, if masterNode has not changed, schedules the MasterPinger again. On failure it acts according to the exception: it calls handleTransportDisconnect, calls notifyMasterFailure, or retries the request

doc

  • Cluster fault detection