This article takes a look at Elasticsearch's MembershipAction.

MembershipAction

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java

public class MembershipAction {

    private static final Logger logger = LogManager.getLogger(MembershipAction.class);

    public static final String DISCOVERY_JOIN_ACTION_NAME = "internal:discovery/zen/join";
    public static final String DISCOVERY_JOIN_VALIDATE_ACTION_NAME = "internal:discovery/zen/join/validate";
    public static final String DISCOVERY_LEAVE_ACTION_NAME = "internal:discovery/zen/leave";

    //......

    private final TransportService transportService;

    private final MembershipListener listener;

    public MembershipAction(TransportService transportService, MembershipListener listener,
                            Collection<BiConsumer<DiscoveryNode,ClusterState>> joinValidators) {
        this.transportService = transportService;
        this.listener = listener;

        transportService.registerRequestHandler(DISCOVERY_JOIN_ACTION_NAME, JoinRequest::new,
            ThreadPool.Names.GENERIC, new JoinRequestRequestHandler());
        transportService.registerRequestHandler(DISCOVERY_JOIN_VALIDATE_ACTION_NAME,
            () -> new ValidateJoinRequest(), ThreadPool.Names.GENERIC,
            new ValidateJoinRequestRequestHandler(transportService::getLocalNode, joinValidators));
        transportService.registerRequestHandler(DISCOVERY_LEAVE_ACTION_NAME, LeaveRequest::new,
            ThreadPool.Names.GENERIC, new LeaveRequestRequestHandler());
    }

    public void sendLeaveRequest(DiscoveryNode masterNode, DiscoveryNode node) {
        transportService.sendRequest(node, DISCOVERY_LEAVE_ACTION_NAME, new LeaveRequest(masterNode),
            EmptyTransportResponseHandler.INSTANCE_SAME);
    }

    public void sendLeaveRequestBlocking(DiscoveryNode masterNode, DiscoveryNode node, TimeValue timeout) {
        transportService.submitRequest(masterNode, DISCOVERY_LEAVE_ACTION_NAME, new LeaveRequest(node),
            EmptyTransportResponseHandler.INSTANCE_SAME).txGet(timeout.millis(), TimeUnit.MILLISECONDS);
    }

    public void sendJoinRequestBlocking(DiscoveryNode masterNode, DiscoveryNode node, TimeValue timeout) {
        transportService.submitRequest(masterNode, DISCOVERY_JOIN_ACTION_NAME, new JoinRequest(node),
            EmptyTransportResponseHandler.INSTANCE_SAME).txGet(timeout.millis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Validates the join request, throwing a failure if it failed.
     */
    public void sendValidateJoinRequestBlocking(DiscoveryNode node, ClusterState state, TimeValue timeout) {
        transportService.submitRequest(node, DISCOVERY_JOIN_VALIDATE_ACTION_NAME, new ValidateJoinRequest(state),
            EmptyTransportResponseHandler.INSTANCE_SAME).txGet(timeout.millis(), TimeUnit.MILLISECONDS);
    }

    //......
}
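  • MembershipAction defines three request types: LeaveRequest, JoinRequest, and ValidateJoinRequest. It also defines a TransportRequestHandler for each of them: LeaveRequestRequestHandler, JoinRequestRequestHandler, and ValidateJoinRequestRequestHandler. The constructor registers these handlers with TransportService under the action names internal:discovery/zen/join, internal:discovery/zen/join/validate, and internal:discovery/zen/leave, all on the GENERIC thread pool.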
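The following is a minimal plain-Java sketch of name-based dispatch. MiniTransport, register, dispatch, and MembershipActionSketch are hypothetical names, not Elasticsearch APIs. Requests and responses are reduced to strings. Only the three action-name strings are copied from the source above. The real TransportService also deserializes each request and runs the handler on the named thread pool, and the sketch omits both steps.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for TransportService: maps an action name to a handler.
class MiniTransport {
    private final Map<String, Function<String, String>> handlers = new HashMap<>();

    void register(String action, Function<String, String> handler) {
        // This sketch refuses to register two handlers for the same action name.
        if (handlers.putIfAbsent(action, handler) != null) {
            throw new IllegalArgumentException("duplicate action: " + action);
        }
    }

    String dispatch(String action, String request) {
        Function<String, String> handler = handlers.get(action);
        if (handler == null) {
            throw new IllegalStateException("no handler for action: " + action);
        }
        return handler.apply(request);
    }
}

class MembershipActionSketch {
    static final String JOIN = "internal:discovery/zen/join";
    static final String JOIN_VALIDATE = "internal:discovery/zen/join/validate";
    static final String LEAVE = "internal:discovery/zen/leave";

    // Wires one handler per action, as the MembershipAction constructor does.
    static MiniTransport wire() {
        MiniTransport t = new MiniTransport();
        t.register(JOIN, node -> "joined:" + node);
        t.register(JOIN_VALIDATE, state -> "validated:" + state);
        t.register(LEAVE, node -> "left:" + node);
        return t;
    }

    public static void main(String[] args) {
        MiniTransport t = wire();
        System.out.println(t.dispatch(JOIN, "node-1")); // prints joined:node-1
    }
}
```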

TransportRequest

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/transport/TransportRequest.java

public abstract class TransportRequest extends TransportMessage implements TaskAwareRequest {
    public static class Empty extends TransportRequest {
        public static final Empty INSTANCE = new Empty();
    }

    /**
     * Parent of this request. Defaults to {@link TaskId#EMPTY_TASK_ID}, meaning "no parent".
     */
    private TaskId parentTaskId = TaskId.EMPTY_TASK_ID;

    public TransportRequest() {
    }

    public TransportRequest(StreamInput in) throws IOException {
        parentTaskId = TaskId.readFromStream(in);
    }

    /**
     * Set a reference to task that created this request.
     */
    @Override
    public void setParentTask(TaskId taskId) {
        this.parentTaskId = taskId;
    }

    /**
     * Get a reference to the task that created this request. Defaults to {@link TaskId#EMPTY_TASK_ID}, meaning "there is no parent".
     */
    @Override
    public TaskId getParentTask() {
        return parentTaskId;
    }

    @Override
    public void readFrom(StreamInput in) throws IOException {
        super.readFrom(in);
        parentTaskId = TaskId.readFromStream(in);
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        parentTaskId.writeTo(out);
    }
}
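  • TransportRequest extends TransportMessage and implements the TaskAwareRequest interface. It carries a parentTaskId, which defaults to TaskId.EMPTY_TASK_ID (no parent). writeTo serializes it after the fields written by the superclass, and readFrom reads it back in the same order.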

LeaveRequest

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java

    public static class LeaveRequest extends TransportRequest {

        private DiscoveryNode node;

        public LeaveRequest() {
        }

        private LeaveRequest(DiscoveryNode node) {
            this.node = node;
        }

        @Override
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
            node = new DiscoveryNode(in);
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            node.writeTo(out);
        }
    }
  • LeaveRequest extends TransportRequest and overrides readFrom and writeTo. Each override first calls the corresponding superclass method and then reads or writes the DiscoveryNode.
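The ordering contract matters most here. The subclass writes the superclass fields first and its own field second, so readFrom must consume them in exactly that order. The sketch below relies on several assumptions:

- java.io.DataOutputStream and DataInputStream stand in for Elasticsearch's StreamOutput and StreamInput.
- The parent task id is a String, and an empty string is a hypothetical "no parent" marker in place of TaskId.EMPTY_TASK_ID.
- A String node name stands in for DiscoveryNode.

BaseRequest and NodeRequest are illustrative names only.

```java
import java.io.*;

// Simplified stand-in for TransportRequest: serializes a parent task id first.
class BaseRequest {
    static final String NO_PARENT = ""; // hypothetical stand-in for TaskId.EMPTY_TASK_ID
    String parentTaskId = NO_PARENT;

    void writeTo(DataOutputStream out) throws IOException {
        out.writeUTF(parentTaskId);
    }

    void readFrom(DataInputStream in) throws IOException {
        parentTaskId = in.readUTF();
    }
}

// Simplified stand-in for LeaveRequest/JoinRequest: superclass fields, then the node.
class NodeRequest extends BaseRequest {
    String node; // stand-in for DiscoveryNode

    NodeRequest() {
        // empty instance, filled in by readFrom on the receiving side
    }

    NodeRequest(String node) {
        this.node = node;
    }

    @Override
    void writeTo(DataOutputStream out) throws IOException {
        super.writeTo(out);   // parent fields first...
        out.writeUTF(node);   // ...then this class's field
    }

    @Override
    void readFrom(DataInputStream in) throws IOException {
        super.readFrom(in);   // must mirror writeTo exactly
        node = in.readUTF();
    }

    // Serializes and deserializes a request, as sender and receiver would.
    static NodeRequest roundTrip(NodeRequest request) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                request.writeTo(out);
            }
            NodeRequest copy = new NodeRequest();
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                copy.readFrom(in);
            }
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

If the subclass wrote its node before calling super.writeTo, the receiver would read the node name as the parent task id. This is why both overrides call super first.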

JoinRequest

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java

    public static class JoinRequest extends TransportRequest {

        private DiscoveryNode node;

        public DiscoveryNode getNode() {
            return node;
        }

        public JoinRequest() {
        }

        private JoinRequest(DiscoveryNode node) {
            this.node = node;
        }

        @Override
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
            node = new DiscoveryNode(in);
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            node.writeTo(out);
        }
    }
  • JoinRequest extends TransportRequest and overrides readFrom and writeTo. Each override first calls the corresponding superclass method and then reads or writes the DiscoveryNode.

ValidateJoinRequest

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java

    static class ValidateJoinRequest extends TransportRequest {
        private ClusterState state;

        ValidateJoinRequest() {}

        ValidateJoinRequest(ClusterState state) {
            this.state = state;
        }

        @Override
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
            this.state = ClusterState.readFrom(in, null);
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            this.state.writeTo(out);
        }
    }
  • ValidateJoinRequest extends TransportRequest and overrides readFrom and writeTo. Each override first calls the corresponding superclass method and then reads or writes the ClusterState.

TransportRequestHandler

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/transport/TransportRequestHandler.java

public interface TransportRequestHandler<T extends TransportRequest> {

    /**
     * Override this method if access to the Task parameter is needed
     */
    default void messageReceived(final T request, final TransportChannel channel, Task task) throws Exception {
        messageReceived(request, channel);
    }

    void messageReceived(T request, TransportChannel channel) throws Exception;
}
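  • The TransportRequestHandler interface declares the abstract messageReceived(request, channel) method. It also provides a default messageReceived overload with an extra Task parameter, which simply delegates to the two-argument version.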
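The sketch below shows the default-method delegation, with checked exceptions omitted for brevity. SketchRequestHandler, AckHandler, TaskAwareAckHandler, and HandlerDemo are hypothetical. A List<String> stands in for TransportChannel and a String stands in for Task. A handler that does not need the task implements only the two-argument method, while the caller can always invoke the three-argument one.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified mirror of TransportRequestHandler (hypothetical types, no checked exceptions).
interface SketchRequestHandler<T> {
    // Entry point used by the caller; by default the task is simply dropped.
    default void messageReceived(T request, List<String> responses, String task) {
        messageReceived(request, responses);
    }

    void messageReceived(T request, List<String> responses);
}

// Does not need the task: implements only the two-argument method.
class AckHandler implements SketchRequestHandler<String> {
    @Override
    public void messageReceived(String request, List<String> responses) {
        responses.add("ack:" + request);
    }
}

// Needs the task: overrides the three-argument method instead.
class TaskAwareAckHandler implements SketchRequestHandler<String> {
    @Override
    public void messageReceived(String request, List<String> responses, String task) {
        responses.add("ack:" + request + "@" + task);
    }

    @Override
    public void messageReceived(String request, List<String> responses) {
        messageReceived(request, responses, "no-task");
    }
}

class HandlerDemo {
    public static void main(String[] args) {
        List<String> responses = new ArrayList<>();
        SketchRequestHandler<String> handler = new AckHandler();
        handler.messageReceived("join", responses, "task-1"); // default method drops the task
        System.out.println(responses); // prints [ack:join]
    }
}
```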

LeaveRequestRequestHandler

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java

    private class LeaveRequestRequestHandler implements TransportRequestHandler<LeaveRequest> {

        @Override
        public void messageReceived(LeaveRequest request, TransportChannel channel) throws Exception {
            listener.onLeave(request.node);
            channel.sendResponse(TransportResponse.Empty.INSTANCE);
        }
    }
  • LeaveRequestRequestHandler implements TransportRequestHandler. Its messageReceived calls MembershipListener.onLeave and then replies with an empty response.

JoinRequestRequestHandler

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java

    private class JoinRequestRequestHandler implements TransportRequestHandler<JoinRequest> {

        @Override
        public void messageReceived(final JoinRequest request, final TransportChannel channel) throws Exception {
            listener.onJoin(request.getNode(), new JoinCallback() {
                @Override
                public void onSuccess() {
                    try {
                        channel.sendResponse(TransportResponse.Empty.INSTANCE);
                    } catch (Exception e) {
                        onFailure(e);
                    }
                }

                @Override
                public void onFailure(Exception e) {
                    try {
                        channel.sendResponse(e);
                    } catch (Exception inner) {
                        inner.addSuppressed(e);
                        logger.warn("failed to send back failure on join request", inner);
                    }
                }
            });
        }
    }
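  • JoinRequestRequestHandler implements TransportRequestHandler. Its messageReceived calls MembershipListener.onJoin and passes a JoinCallback. On success, the callback sends an empty response. On failure, it sends the exception back.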
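The callback's error handling follows a small but useful pattern:

- If sending the success response fails, the failure path runs instead.
- If sending the failure response also fails, the original exception is attached as suppressed and the error is only logged.

The sketch below reproduces this pattern with hypothetical SketchChannel and SketchJoinCallback interfaces, which stand in for TransportChannel and MembershipAction.JoinCallback. JoinResponder is also hypothetical, and it collects the would-be log entries in a list instead of calling logger.warn.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for TransportChannel.
interface SketchChannel {
    void sendResponse(String response) throws Exception;

    void sendFailure(Exception failure) throws Exception;
}

// Hypothetical stand-in for MembershipAction.JoinCallback.
interface SketchJoinCallback {
    void onSuccess();

    void onFailure(Exception e);
}

// Mirrors the anonymous JoinCallback created in JoinRequestRequestHandler.
class JoinResponder implements SketchJoinCallback {
    private final SketchChannel channel;
    final List<Exception> warnings = new ArrayList<>(); // collected instead of logger.warn

    JoinResponder(SketchChannel channel) {
        this.channel = channel;
    }

    @Override
    public void onSuccess() {
        try {
            channel.sendResponse("empty");   // stand-in for TransportResponse.Empty.INSTANCE
        } catch (Exception e) {
            onFailure(e);                    // could not send success: report a failure instead
        }
    }

    @Override
    public void onFailure(Exception e) {
        try {
            channel.sendFailure(e);
        } catch (Exception inner) {
            inner.addSuppressed(e);          // keep the original cause attached
            warnings.add(inner);             // last resort: only log it
        }
    }
}
```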

ValidateJoinRequestRequestHandler

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java

    static class ValidateJoinRequestRequestHandler implements TransportRequestHandler<ValidateJoinRequest> {
        private final Supplier<DiscoveryNode> localNodeSupplier;
        private final Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators;

        ValidateJoinRequestRequestHandler(Supplier<DiscoveryNode> localNodeSupplier,
                                          Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators) {
            this.localNodeSupplier = localNodeSupplier;
            this.joinValidators = joinValidators;
        }

        @Override
        public void messageReceived(ValidateJoinRequest request, TransportChannel channel) throws Exception {
            DiscoveryNode node = localNodeSupplier.get();
            assert node != null : "local node is null";
            joinValidators.stream().forEach(action -> action.accept(node, request.state));
            channel.sendResponse(TransportResponse.Empty.INSTANCE);
        }
    }
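  • ValidateJoinRequestRequestHandler implements TransportRequestHandler. Its messageReceived runs each of the joinValidators in turn against the local node and the received ClusterState, then replies with an empty response. A validator that throws aborts the validation before that response is sent.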
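The validator loop can be sketched with a plain Collection<BiConsumer<...>>. As a simplifying assumption, the local DiscoveryNode is reduced to its version number and the ClusterState to the cluster's minimum node version. NOT_OLDER_THAN_MIN is a hypothetical validator, not one of Elasticsearch's built-in join validators. JoinValidationSketch is likewise an illustrative name.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.function.BiConsumer;

// Hypothetical model of ValidateJoinRequestRequestHandler.messageReceived.
class JoinValidationSketch {
    // Hypothetical validator: the local node must not be older than the cluster minimum.
    static final BiConsumer<Integer, Integer> NOT_OLDER_THAN_MIN = (localVersion, clusterMinVersion) -> {
        if (localVersion < clusterMinVersion) {
            throw new IllegalStateException(
                "local version " + localVersion + " is older than cluster minimum " + clusterMinVersion);
        }
    };

    static String handle(Integer localVersion, Integer clusterMinVersion,
                         Collection<BiConsumer<Integer, Integer>> joinValidators) {
        // Run every validator in order; the first exception aborts and reaches the caller.
        joinValidators.forEach(v -> v.accept(localVersion, clusterMinVersion));
        return "empty"; // stand-in for TransportResponse.Empty.INSTANCE
    }

    public static void main(String[] args) {
        System.out.println(handle(7, 6, Collections.singletonList(NOT_OLDER_THAN_MIN))); // prints empty
    }
}
```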

MembershipAction.MembershipListener

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java

    public interface MembershipListener {
        void onJoin(DiscoveryNode node, JoinCallback callback);

        void onLeave(DiscoveryNode node);
    }

    public interface JoinCallback {
        void onSuccess();

        void onFailure(Exception e);
    }
  • The MembershipListener interface declares onJoin and onLeave, and onJoin takes a JoinCallback. The interface is implemented by a private inner class of the same name in ZenDiscovery.

ZenDiscovery.MembershipListener

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

public class ZenDiscovery extends AbstractLifecycleComponent implements Discovery, PingContextProvider, IncomingClusterStateListener {

    //......

    private class MembershipListener implements MembershipAction.MembershipListener {
        @Override
        public void onJoin(DiscoveryNode node, MembershipAction.JoinCallback callback) {
            handleJoinRequest(node, ZenDiscovery.this.clusterState(), callback);
        }

        @Override
        public void onLeave(DiscoveryNode node) {
            handleLeaveRequest(node);
        }
    }

    void handleJoinRequest(final DiscoveryNode node, final ClusterState state, final MembershipAction.JoinCallback callback) {
        if (nodeJoinController == null) {
            throw new IllegalStateException("discovery module is not yet started");
        } else {
            // we do this in a couple of places including the cluster update thread. This one here is really just best effort
            // to ensure we fail as fast as possible.
            onJoinValidators.stream().forEach(a -> a.accept(node, state));
            if (state.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
                MembershipAction.ensureMajorVersionBarrier(node.getVersion(), state.getNodes().getMinNodeVersion());
            }
            // try and connect to the node, if it fails, we can raise an exception back to the client...
            transportService.connectToNode(node);

            // validate the join request, will throw a failure if it fails, which will get back to the
            // node calling the join request
            try {
                membership.sendValidateJoinRequestBlocking(node, state, joinTimeout);
            } catch (Exception e) {
                logger.warn(() -> new ParameterizedMessage("failed to validate incoming join request from node [{}]", node),
                    e);
                callback.onFailure(new IllegalStateException("failure when sending a validation request to node", e));
                return;
            }
            nodeJoinController.handleJoinRequest(node, callback);
        }
    }

    private void handleLeaveRequest(final DiscoveryNode node) {
        if (lifecycleState() != Lifecycle.State.STARTED) {
            // not started, ignore a node failure
            return;
        }
        if (localNodeMaster()) {
            removeNode(node, "zen-disco-node-left", "left");
        } else if (node.equals(clusterState().nodes().getMasterNode())) {
            handleMasterGone(node, null, "shut_down");
        }
    }

    private void removeNode(final DiscoveryNode node, final String source, final String reason) {
        masterService.submitStateUpdateTask(
                source + "(" + node + "), reason(" + reason + ")",
                new NodeRemovalClusterStateTaskExecutor.Task(node, reason),
                ClusterStateTaskConfig.build(Priority.IMMEDIATE),
                nodeRemovalExecutor,
                nodeRemovalExecutor);
    }

    private void handleMasterGone(final DiscoveryNode masterNode, final Throwable cause, final String reason) {
        if (lifecycleState() != Lifecycle.State.STARTED) {
            // not started, ignore a master failure
            return;
        }
        if (localNodeMaster()) {
            // we might get this on both a master telling us shutting down, and then the disconnect failure
            return;
        }

        logger.info(() -> new ParameterizedMessage("master_left [{}], reason [{}]", masterNode, reason), cause);

        synchronized (stateMutex) {
            if (localNodeMaster() == false && masterNode.equals(committedState.get().nodes().getMasterNode())) {
                // flush any pending cluster states from old master, so it will not be set as master again
                pendingStatesQueue.failAllStatesAndClear(new ElasticsearchException("master left [{}]", reason));
                rejoin("master left (reason = " + reason + ")");
            }
        }
    }

    //......
}
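  • ZenDiscovery.MembershipListener.onJoin calls handleJoinRequest. That method throws IllegalStateException if nodeJoinController has not been created yet. Otherwise it:
    • runs the onJoinValidators;
    • enforces the major-version barrier once the cluster state has been recovered;
    • connects to the joining node;
    • sends the node a blocking validate-join request, reporting any failure through callback.onFailure;
    • finally calls nodeJoinController.handleJoinRequest(node, callback).
    onLeave calls handleLeaveRequest, which does nothing unless discovery has started. It then calls removeNode if the local node is the master, or handleMasterGone if the leaving node is the current master.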
  • removeNode calls masterService.submitStateUpdateTask with IMMEDIATE priority. It passes nodeRemovalExecutor as both the ClusterStateTaskExecutor and the ClusterStateTaskListener.
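  • handleMasterGone returns early if discovery has not started or the local node is the master. Otherwise, if the departed node is still the master in the committed state, it calls pendingStatesQueue.failAllStatesAndClear(new ElasticsearchException("master left [{}]", reason)) to flush pending cluster states from the old master, and then calls rejoin.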
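The branching in handleLeaveRequest can be summarized as a small pure function. LeaveOutcome and LeaveDecision are hypothetical names. The booleans replace lifecycleState() and localNodeMaster(), and nodes are compared by name rather than as DiscoveryNode objects.

```java
// Hypothetical outcome labels for the branches of ZenDiscovery.handleLeaveRequest.
enum LeaveOutcome { IGNORED_NOT_STARTED, REMOVE_NODE, MASTER_GONE, NO_ACTION }

class LeaveDecision {
    static LeaveOutcome decide(boolean started, boolean localNodeIsMaster,
                               String leavingNode, String currentMaster) {
        if (!started) {
            return LeaveOutcome.IGNORED_NOT_STARTED;  // discovery not started: ignore
        }
        if (localNodeIsMaster) {
            return LeaveOutcome.REMOVE_NODE;          // master submits a node-removal task
        }
        if (leavingNode.equals(currentMaster)) {
            return LeaveOutcome.MASTER_GONE;          // our master left: handleMasterGone
        }
        return LeaveOutcome.NO_ACTION;                // neither branch applies
    }

    public static void main(String[] args) {
        System.out.println(decide(true, false, "m1", "m1")); // prints MASTER_GONE
    }
}
```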

NodeJoinController.handleJoinRequest

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java

public class NodeJoinController {

    //......

    /**
     * processes or queues an incoming join request.
     * <p>
     * Note: doesn't do any validation. This should have been done before.
     */
    public synchronized void handleJoinRequest(final DiscoveryNode node, final MembershipAction.JoinCallback callback) {
        if (electionContext != null) {
            electionContext.addIncomingJoin(node, callback);
            checkPendingJoinsAndElectIfNeeded();
        } else {
            masterService.submitStateUpdateTask("zen-disco-node-join",
                node, ClusterStateTaskConfig.build(Priority.URGENT),
                joinTaskExecutor, new JoinTaskListener(callback, logger));
        }
    }

    //......
}
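  • When electionContext is not null, NodeJoinController.handleJoinRequest calls electionContext.addIncomingJoin, followed by checkPendingJoinsAndElectIfNeeded. Otherwise it calls masterService.submitStateUpdateTask with URGENT priority, passing joinTaskExecutor as the ClusterStateTaskExecutor and a JoinTaskListener as the ClusterStateTaskListener.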
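Below is a simplified model of this routing. It assumes a list of buffered joins in place of the ElectionContext and a list of task descriptions in place of masterService. JoinRouter and its methods are hypothetical. The sketch also omits checkPendingJoinsAndElectIfNeeded, which in the real code may complete the election once enough joins have arrived.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of NodeJoinController.handleJoinRequest:
// while an election runs, joins are buffered; otherwise they become cluster-state tasks.
class JoinRouter {
    private List<String> electionJoins;                   // non-null while an election is running
    final List<String> submittedTasks = new ArrayList<>();

    synchronized void startElection() {
        electionJoins = new ArrayList<>();
    }

    // Ends the election and hands back the joins collected during it.
    synchronized List<String> finishElection() {
        List<String> joins = electionJoins;
        electionJoins = null;
        return joins;
    }

    synchronized void handleJoinRequest(String node) {
        if (electionJoins != null) {
            electionJoins.add(node);                              // like electionContext.addIncomingJoin
        } else {
            submittedTasks.add("zen-disco-node-join:" + node);    // like masterService.submitStateUpdateTask
        }
    }
}
```

The method is synchronized, like the original, so that a join cannot slip between the null check and the buffer or queue operation while another thread starts or ends an election.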

MasterService.submitStateUpdateTask

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java

public class MasterService extends AbstractLifecycleComponent {

    //......

    public <T> void submitStateUpdateTask(String source, T task,
                                          ClusterStateTaskConfig config,
                                          ClusterStateTaskExecutor<T> executor,
                                          ClusterStateTaskListener listener) {
        submitStateUpdateTasks(source, Collections.singletonMap(task, listener), config, executor);
    }

    public <T> void submitStateUpdateTasks(final String source,
                                           final Map<T, ClusterStateTaskListener> tasks, final ClusterStateTaskConfig config,
                                           final ClusterStateTaskExecutor<T> executor) {
        if (!lifecycle.started()) {
            return;
        }
        final ThreadContext threadContext = threadPool.getThreadContext();
        final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(true);
        try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
            threadContext.markAsSystemContext();

            List<Batcher.UpdateTask> safeTasks = tasks.entrySet().stream()
                .map(e -> taskBatcher.new UpdateTask(config.priority(), source, e.getKey(), safe(e.getValue(), supplier), executor))
                .collect(Collectors.toList());
            taskBatcher.submitTasks(safeTasks, config.timeout());
        } catch (EsRejectedExecutionException e) {
            // ignore cases where we are shutting down..., there is really nothing interesting
            // to be done here...
            if (!lifecycle.stoppedOrClosed()) {
                throw e;
            }
        }
    }

    class Batcher extends TaskBatcher {

        Batcher(Logger logger, PrioritizedEsThreadPoolExecutor threadExecutor) {
            super(logger, threadExecutor);
        }

        @Override
        protected void onTimeout(List<? extends BatchedTask> tasks, TimeValue timeout) {
            threadPool.generic().execute(
                () -> tasks.forEach(
                    task -> ((UpdateTask) task).listener.onFailure(task.source,
                        new ProcessClusterEventTimeoutException(timeout, task.source))));
        }

        @Override
        protected void run(Object batchingKey, List<? extends BatchedTask> tasks, String tasksSummary) {
            ClusterStateTaskExecutor<Object> taskExecutor = (ClusterStateTaskExecutor<Object>) batchingKey;
            List<UpdateTask> updateTasks = (List<UpdateTask>) tasks;
            runTasks(new TaskInputs(taskExecutor, updateTasks, tasksSummary));
        }

        class UpdateTask extends BatchedTask {
            final ClusterStateTaskListener listener;

            UpdateTask(Priority priority, String source, Object task, ClusterStateTaskListener listener,
                       ClusterStateTaskExecutor<?> executor) {
                super(priority, source, executor, task);
                this.listener = listener;
            }

            @Override
            public String describeTasks(List<? extends BatchedTask> tasks) {
                return ((ClusterStateTaskExecutor<Object>) batchingKey).describeTasks(
                    tasks.stream().map(BatchedTask::getTask).collect(Collectors.toList()));
            }
        }
    }

    protected class TaskInputs {
        public final String summary;
        public final List<Batcher.UpdateTask> updateTasks;
        public final ClusterStateTaskExecutor<Object> executor;

        TaskInputs(ClusterStateTaskExecutor<Object> executor, List<Batcher.UpdateTask> updateTasks, String summary) {
            this.summary = summary;
            this.executor = executor;
            this.updateTasks = updateTasks;
        }

        public boolean runOnlyWhenMaster() {
            return executor.runOnlyOnMaster();
        }

        public void onNoLongerMaster() {
            updateTasks.forEach(task -> task.listener.onNoLongerMaster(task.source()));
        }
    }

    //......
}
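  • submitStateUpdateTasks creates Batcher.UpdateTask instances and submits them via taskBatcher.submitTasks. Batcher extends TaskBatcher. Its run method casts the batchingKey back to the ClusterStateTaskExecutor and calls runTasks with a TaskInputs argument. TaskInputs.runOnlyWhenMaster delegates to executor.runOnlyOnMaster(), and TaskInputs.onNoLongerMaster calls task.listener.onNoLongerMaster(task.source()) for each task.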
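The key idea of the batcher is that the executor object itself serves as the batching key. As a result, tasks submitted with the same executor reach it together in one call. The sketch below models that grouping with an insertion-ordered map keyed by executor. The lambdas used as executors do not override equals, so they compare by identity. SketchBatcher is hypothetical. Unlike the real TaskBatcher, it runs the batches synchronously, with no priorities, timeouts, or thread pool.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical batcher: the executor is the batching key, so all tasks submitted
// with the same executor instance are handed to it together in one call.
class SketchBatcher<T> {
    // Insertion-ordered; lambda keys compare by identity.
    private final Map<Function<List<T>, String>, List<T>> pending = new LinkedHashMap<>();

    synchronized void submit(Function<List<T>, String> executor, T task) {
        pending.computeIfAbsent(executor, key -> new ArrayList<>()).add(task);
    }

    // Drains the queue and runs one batch per executor, returning each batch's result.
    List<String> runAll() {
        Map<Function<List<T>, String>, List<T>> batches;
        synchronized (this) {
            batches = new LinkedHashMap<>(pending);
            pending.clear();
        }
        List<String> results = new ArrayList<>();
        batches.forEach((executor, tasks) -> results.add(executor.apply(tasks)));
        return results;
    }
}
```

For example, submitting two join tasks and one removal task interleaved still produces exactly two executor calls: one receives both joins, and the other receives the removal.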

ClusterStateTaskExecutor

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java

public interface ClusterStateTaskExecutor<T> {

    ClusterTasksResult<T> execute(ClusterState currentState, List<T> tasks) throws Exception;

    default boolean runOnlyOnMaster() {
        return true;
    }

    default void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
    }

    default String describeTasks(List<T> tasks) {
        return String.join(", ", tasks.stream().map(t -> (CharSequence)t.toString()).filter(t -> t.length() > 0)::iterator);
    }

    //......
}
  • ClusterStateTaskExecutor declares the execute method and provides default implementations of runOnlyOnMaster, clusterStatePublished, and describeTasks. It has many implementations, including JoinTaskExecutor and NodeRemovalClusterStateTaskExecutor.
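The describeTasks default can be reproduced almost verbatim outside Elasticsearch. In the sketch below, SketchTaskExecutor and DescribeTasksDemo are hypothetical. SketchTaskExecutor is a simplified interface in which the cluster state is reduced to a String and execute returns the new state directly, rather than a ClusterTasksResult. The describeTasks body uses the same expression as the source. It treats the filtered stream as an Iterable through a method reference to iterator().

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified ClusterStateTaskExecutor: the cluster state is a String
// and execute returns the new state directly.
interface SketchTaskExecutor<T> {
    String execute(String currentState, List<T> tasks);

    default boolean runOnlyOnMaster() {
        return true;
    }

    // Same expression as the Elasticsearch default: join the non-empty toString() values.
    default String describeTasks(List<T> tasks) {
        return String.join(", ", tasks.stream()
            .map(t -> (CharSequence) t.toString())
            .filter(t -> t.length() > 0)::iterator);
    }
}

class DescribeTasksDemo {
    public static void main(String[] args) {
        SketchTaskExecutor<String> removal = (state, tasks) -> state + " minus " + tasks;
        System.out.println(removal.describeTasks(Arrays.asList("node-1 left", "", "node-2 left")));
        // prints: node-1 left, node-2 left
        System.out.println(removal.runOnlyOnMaster()); // prints: true
    }
}
```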

JoinTaskExecutor

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java

    // visible for testing
    public static class JoinTaskExecutor implements ClusterStateTaskExecutor<DiscoveryNode> {

        private final AllocationService allocationService;

        private final ElectMasterService electMasterService;

        private final Logger logger;

        private final int minimumMasterNodesOnLocalNode;

        public JoinTaskExecutor(Settings settings, AllocationService allocationService, ElectMasterService electMasterService,
                                Logger logger) {
            this.allocationService = allocationService;
            this.electMasterService = electMasterService;
            this.logger = logger;
            minimumMasterNodesOnLocalNode = ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(settings);
        }

        @Override
        public ClusterTasksResult<DiscoveryNode> execute(ClusterState currentState, List<DiscoveryNode> joiningNodes) throws Exception {
            final ClusterTasksResult.Builder<DiscoveryNode> results = ClusterTasksResult.builder();

            final DiscoveryNodes currentNodes = currentState.nodes();
            boolean nodesChanged = false;
            ClusterState.Builder newState;

            if (joiningNodes.size() == 1  && joiningNodes.get(0).equals(FINISH_ELECTION_TASK)) {
                return results.successes(joiningNodes).build(currentState);
            } else if (currentNodes.getMasterNode() == null && joiningNodes.contains(BECOME_MASTER_TASK)) {
                assert joiningNodes.contains(FINISH_ELECTION_TASK) : "becoming a master but election is not finished " + joiningNodes;
                // use these joins to try and become the master.
                // Note that we don't have to do any validation of the amount of joining nodes - the commit
                // during the cluster state publishing guarantees that we have enough
                newState = becomeMasterAndTrimConflictingNodes(currentState, joiningNodes);
                nodesChanged = true;
            } else if (currentNodes.isLocalNodeElectedMaster() == false) {
                logger.trace("processing node joins, but we are not the master. current master: {}", currentNodes.getMasterNode());
                throw new NotMasterException("Node [" + currentNodes.getLocalNode() + "] not master for join request");
            } else {
                newState = ClusterState.builder(currentState);
            }

            DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(newState.nodes());

            assert nodesBuilder.isLocalNodeElectedMaster();

            Version minClusterNodeVersion = newState.nodes().getMinNodeVersion();
            Version maxClusterNodeVersion = newState.nodes().getMaxNodeVersion();
            // we only enforce major version transitions on a fully formed clusters
            final boolean enforceMajorVersion = currentState.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false;
            // processing any joins
            for (final DiscoveryNode node : joiningNodes) {
                if (node.equals(BECOME_MASTER_TASK) || node.equals(FINISH_ELECTION_TASK)) {
                    // noop
                } else if (currentNodes.nodeExists(node)) {
                    logger.debug("received a join request for an existing node [{}]", node);
                } else {
                    try {
                        if (enforceMajorVersion) {
                            MembershipAction.ensureMajorVersionBarrier(node.getVersion(), minClusterNodeVersion);
                        }
                        MembershipAction.ensureNodesCompatibility(node.getVersion(), minClusterNodeVersion, maxClusterNodeVersion);
                        // we do this validation quite late to prevent race conditions between nodes joining and importing dangling indices
                        // we have to reject nodes that don't support all indices we have in this cluster
                        MembershipAction.ensureIndexCompatibility(node.getVersion(), currentState.getMetaData());
                        nodesBuilder.add(node);
                        nodesChanged = true;
                        minClusterNodeVersion = Version.min(minClusterNodeVersion, node.getVersion());
                        maxClusterNodeVersion = Version.max(maxClusterNodeVersion, node.getVersion());
                    } catch (IllegalArgumentException | IllegalStateException e) {
                        results.failure(node, e);
                        continue;
                    }
                }
                results.success(node);
            }
            if (nodesChanged) {
                newState.nodes(nodesBuilder);
                return results.build(allocationService.reroute(newState.build(), "node_join"));
            } else {
                // we must return a new cluster state instance to force publishing. This is important
                // for the joining node to finalize its join and set us as a master
                return results.build(newState.build());
            }
        }

        //......

        @Override
        public boolean runOnlyOnMaster() {
            // we validate that we are allowed to change the cluster state during cluster state processing
            return false;
        }

        @Override
        public void clusterStatePublished(ClusterChangedEvent event) {
            electMasterService.logMinimumMasterNodesWarningIfNecessary(event.previousState(), event.state());
        }
    }
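  • JoinTaskExecutor.execute builds the new ClusterState from joiningNodes. It works as follows:
    • It handles the election-related special tasks (FINISH_ELECTION_TASK, BECOME_MASTER_TASK).
    • It throws NotMasterException if the local node is not the elected master.
    • Otherwise, it checks each new node's version compatibility and records the node as a success or failure.
    • If the node set changed, it calls allocationService.reroute(newState.build(), "node_join") to reroute shard allocation. If not, it still returns a new ClusterState instance to force publishing.
    runOnlyOnMaster is overridden to return false, because execute itself checks whether the local node may change the cluster state.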
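The version bookkeeping in the join loop is worth isolating. Each accepted node widens the [min, max] range that later joiners in the same batch are checked against. A rejected node is recorded as a failure without aborting the batch. In the sketch below, JoinLoop is hypothetical, and nodes are name-to-major-version entries. The "stay within one major version of every member" rule is an illustrative stand-in, not the actual logic of ensureMajorVersionBarrier or ensureNodesCompatibility.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the join loop in JoinTaskExecutor.execute.
class JoinLoop {
    final Map<String, Integer> nodes = new LinkedHashMap<>();   // node name -> major version
    final List<String> failed = new ArrayList<>();

    JoinLoop(String masterName, int masterMajor) {
        nodes.put(masterName, masterMajor);                       // the elected master is always present
    }

    // Returns true if the node set changed (the real executor then reroutes).
    boolean processJoins(Map<String, Integer> joining) {
        int min = Collections.min(nodes.values());
        int max = Collections.max(nodes.values());
        boolean changed = false;
        for (Map.Entry<String, Integer> e : joining.entrySet()) {
            String name = e.getKey();
            int major = e.getValue();
            if (nodes.containsKey(name)) {
                continue;                                         // existing node: nothing to add
            }
            // Hypothetical rule: stay within one major version of every current member.
            if (major < max - 1 || major > min + 1) {
                failed.add(name);                                 // like results.failure(node, e)
                continue;
            }
            nodes.put(name, major);
            changed = true;
            min = Math.min(min, major);                           // later joiners see the updated bounds
            max = Math.max(max, major);
        }
        return changed;
    }
}
```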

NodeRemovalClusterStateTaskExecutor

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

    public static class NodeRemovalClusterStateTaskExecutor
        implements ClusterStateTaskExecutor<NodeRemovalClusterStateTaskExecutor.Task>, ClusterStateTaskListener {

        private final AllocationService allocationService;
        private final ElectMasterService electMasterService;
        private final Consumer<String> rejoin;
        private final Logger logger;

        public static class Task {

            private final DiscoveryNode node;
            private final String reason;

            public Task(final DiscoveryNode node, final String reason) {
                this.node = node;
                this.reason = reason;
            }

            public DiscoveryNode node() {
                return node;
            }

            public String reason() {
                return reason;
            }

            @Override
            public String toString() {
                return node + " " + reason;
            }
        }

        public NodeRemovalClusterStateTaskExecutor(
                final AllocationService allocationService,
                final ElectMasterService electMasterService,
                final Consumer<String> rejoin,
                final Logger logger) {
            this.allocationService = allocationService;
            this.electMasterService = electMasterService;
            this.rejoin = rejoin;
            this.logger = logger;
        }

        @Override
        public ClusterTasksResult<Task> execute(final ClusterState currentState, final List<Task> tasks) throws Exception {
            final DiscoveryNodes.Builder remainingNodesBuilder = DiscoveryNodes.builder(currentState.nodes());
            boolean removed = false;
            for (final Task task : tasks) {
                if (currentState.nodes().nodeExists(task.node())) {
                    remainingNodesBuilder.remove(task.node());
                    removed = true;
                } else {
                    logger.debug("node [{}] does not exist in cluster state, ignoring", task);
                }
            }

            if (!removed) {
                // no nodes to remove, keep the current cluster state
                return ClusterTasksResult.<Task>builder().successes(tasks).build(currentState);
            }

            final ClusterState remainingNodesClusterState = remainingNodesClusterState(currentState, remainingNodesBuilder);

            final ClusterTasksResult.Builder<Task> resultBuilder = ClusterTasksResult.<Task>builder().successes(tasks);
            if (electMasterService.hasEnoughMasterNodes(remainingNodesClusterState.nodes()) == false) {
                final int masterNodes = electMasterService.countMasterNodes(remainingNodesClusterState.nodes());
                rejoin.accept(LoggerMessageFormat.format("not enough master nodes (has [{}], but needed [{}])",
                                                         masterNodes, electMasterService.minimumMasterNodes()));
                return resultBuilder.build(currentState);
            } else {
                ClusterState ptasksDisassociatedState = PersistentTasksCustomMetaData.disassociateDeadNodes(remainingNodesClusterState);
                return resultBuilder.build(allocationService.disassociateDeadNodes(ptasksDisassociatedState, true, describeTasks(tasks)));
            }
        }

        // visible for testing
        // hook is used in testing to ensure that correct cluster state is used to test whether a
        // rejoin or reroute is needed
        ClusterState remainingNodesClusterState(final ClusterState currentState, DiscoveryNodes.Builder remainingNodesBuilder) {
            return ClusterState.builder(currentState).nodes(remainingNodesBuilder).build();
        }

        @Override
        public void onFailure(final String source, final Exception e) {
            logger.error(() -> new ParameterizedMessage("unexpected failure during [{}]", source), e);
        }

        @Override
        public void onNoLongerMaster(String source) {
            logger.debug("no longer master while processing node removal [{}]", source);
        }
    }
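  • NodeRemovalClusterStateTaskExecutor implements both ClusterStateTaskExecutor and ClusterStateTaskListener. Its execute method removes the corresponding nodes from currentState and builds remainingNodesClusterState; if no node was actually removed, it keeps the current state. If the remaining nodes still pass hasEnoughMasterNodes, it calls allocationService.disassociateDeadNodes, after first applying PersistentTasksCustomMetaData.disassociateDeadNodes. Otherwise it invokes the rejoin Consumer and keeps the current state.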
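The sketch below models the removal decision. It assumes that nodes are plain names and that a separate set marks the master-eligible nodes. The quorum check is simplified to "count of remaining master-eligible nodes >= minimumMasterNodes"; the real ElectMasterService.hasEnoughMasterNodes may apply additional conditions. NodeRemovalSketch is hypothetical. The returned set plays the role of the cluster state to publish, and the shard and persistent-task disassociation steps are omitted.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical model of NodeRemovalClusterStateTaskExecutor.execute.
class NodeRemovalSketch {
    final Set<String> nodes;            // current members (stand-in for currentState.nodes())
    final Set<String> masterEligible;   // members that count toward the master quorum
    final int minimumMasterNodes;       // stand-in for discovery.zen.minimum_master_nodes
    final Consumer<String> rejoin;

    NodeRemovalSketch(Set<String> nodes, Set<String> masterEligible, int minimumMasterNodes,
                      Consumer<String> rejoin) {
        this.nodes = nodes;
        this.masterEligible = masterEligible;
        this.minimumMasterNodes = minimumMasterNodes;
        this.rejoin = rejoin;
    }

    // Returns the node set that should be published.
    Set<String> execute(List<String> toRemove) {
        Set<String> remaining = new LinkedHashSet<>(nodes);
        boolean removed = false;
        for (String node : toRemove) {
            removed |= remaining.remove(node);     // unknown nodes are ignored
        }
        if (!removed) {
            return nodes;                          // nothing removed: keep the current state
        }
        long masters = remaining.stream().filter(masterEligible::contains).count();
        if (masters < minimumMasterNodes) {        // simplified hasEnoughMasterNodes (assumption)
            rejoin.accept("not enough master nodes (has [" + masters + "], but needed [" + minimumMasterNodes + "])");
            return nodes;                          // like resultBuilder.build(currentState)
        }
        return remaining;                          // real code also disassociates dead nodes
    }
}
```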

Summary

  • MembershipAction defines three request types: LeaveRequest, JoinRequest, and ValidateJoinRequest. It also defines a TransportRequestHandler for each of them: LeaveRequestRequestHandler, JoinRequestRequestHandler, and ValidateJoinRequestRequestHandler.
  • All three handlers implement TransportRequestHandler. LeaveRequestRequestHandler's messageReceived calls MembershipListener.onLeave, JoinRequestRequestHandler's calls MembershipListener.onJoin, and ValidateJoinRequestRequestHandler's runs the joinValidators one by one.
  • The MembershipListener interface declares onJoin and onLeave, and onJoin takes a JoinCallback. Its implementation is an inner class of the same name in ZenDiscovery.
    • ZenDiscovery.MembershipListener.onJoin calls handleJoinRequest, which validates the joining node and then calls nodeJoinController.handleJoinRequest(node, callback).
    • onLeave calls handleLeaveRequest, which calls removeNode if the local node is the master, or handleMasterGone if the leaving node is the current master.
    • removeNode calls masterService.submitStateUpdateTask, passing nodeRemovalExecutor as both the ClusterStateTaskExecutor and the ClusterStateTaskListener.
    • handleMasterGone calls pendingStatesQueue.failAllStatesAndClear(new ElasticsearchException("master left [{}]", reason)) and then rejoins.
  • When electionContext is not null, NodeJoinController.handleJoinRequest calls electionContext.addIncomingJoin. Otherwise it calls masterService.submitStateUpdateTask with joinTaskExecutor as the ClusterStateTaskExecutor and a JoinTaskListener as the ClusterStateTaskListener.
  • MasterService.submitStateUpdateTasks creates Batcher.UpdateTask instances and submits them via taskBatcher.submitTasks. Batcher extends TaskBatcher, and its run method calls runTasks with a TaskInputs argument. TaskInputs.runOnlyWhenMaster delegates to executor.runOnlyOnMaster(), and TaskInputs.onNoLongerMaster calls task.listener.onNoLongerMaster(task.source()).
  • ClusterStateTaskExecutor declares execute and provides default implementations of runOnlyOnMaster, clusterStatePublished, and describeTasks. Its implementations include JoinTaskExecutor and NodeRemovalClusterStateTaskExecutor.
    • JoinTaskExecutor.execute builds the new ClusterState from joiningNodes. If the node set changed, it calls allocationService.reroute(newState.build(), "node_join") to reroute shard allocation.
    • NodeRemovalClusterStateTaskExecutor implements both ClusterStateTaskExecutor and ClusterStateTaskListener. Its execute method removes the corresponding nodes from currentState and builds remainingNodesClusterState. If hasEnoughMasterNodes still holds, it calls allocationService.disassociateDeadNodes; otherwise it invokes the rejoin Consumer.
