Preface
This post takes a look at Elasticsearch's MembershipAction.
MembershipAction
elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java
public class MembershipAction {
    private static final Logger logger = LogManager.getLogger(MembershipAction.class);
    public static final String DISCOVERY_JOIN_ACTION_NAME = "internal:discovery/zen/join";
    public static final String DISCOVERY_JOIN_VALIDATE_ACTION_NAME = "internal:discovery/zen/join/validate";
    public static final String DISCOVERY_LEAVE_ACTION_NAME = "internal:discovery/zen/leave";
    //......
    private final TransportService transportService;
    private final MembershipListener listener;
    public MembershipAction(TransportService transportService, MembershipListener listener,
                            Collection<BiConsumer<DiscoveryNode,ClusterState>> joinValidators) {
        this.transportService = transportService;
        this.listener = listener;
        transportService.registerRequestHandler(DISCOVERY_JOIN_ACTION_NAME, JoinRequest::new,
            ThreadPool.Names.GENERIC, new JoinRequestRequestHandler());
        transportService.registerRequestHandler(DISCOVERY_JOIN_VALIDATE_ACTION_NAME,
            () -> new ValidateJoinRequest(), ThreadPool.Names.GENERIC,
            new ValidateJoinRequestRequestHandler(transportService::getLocalNode, joinValidators));
        transportService.registerRequestHandler(DISCOVERY_LEAVE_ACTION_NAME, LeaveRequest::new,
            ThreadPool.Names.GENERIC, new LeaveRequestRequestHandler());
    }
    public void sendLeaveRequest(DiscoveryNode masterNode, DiscoveryNode node) {
        transportService.sendRequest(node, DISCOVERY_LEAVE_ACTION_NAME, new LeaveRequest(masterNode),
            EmptyTransportResponseHandler.INSTANCE_SAME);
    }
    public void sendLeaveRequestBlocking(DiscoveryNode masterNode, DiscoveryNode node, TimeValue timeout) {
        transportService.submitRequest(masterNode, DISCOVERY_LEAVE_ACTION_NAME, new LeaveRequest(node),
            EmptyTransportResponseHandler.INSTANCE_SAME).txGet(timeout.millis(), TimeUnit.MILLISECONDS);
    }
    public void sendJoinRequestBlocking(DiscoveryNode masterNode, DiscoveryNode node, TimeValue timeout) {
        transportService.submitRequest(masterNode, DISCOVERY_JOIN_ACTION_NAME, new JoinRequest(node),
            EmptyTransportResponseHandler.INSTANCE_SAME).txGet(timeout.millis(), TimeUnit.MILLISECONDS);
    }
    /**
     * Validates the join request, throwing a failure if it failed.
     */
    public void sendValidateJoinRequestBlocking(DiscoveryNode node, ClusterState state, TimeValue timeout) {
        transportService.submitRequest(node, DISCOVERY_JOIN_VALIDATE_ACTION_NAME, new ValidateJoinRequest(state),
            EmptyTransportResponseHandler.INSTANCE_SAME).txGet(timeout.millis(), TimeUnit.MILLISECONDS);
    }
    //......
}
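- MembershipAction defines three request types: LeaveRequest, JoinRequest, and ValidateJoinRequest. It also defines a TransportRequestHandler for each of them (LeaveRequestRequestHandler, JoinRequestRequestHandler, and ValidateJoinRequestRequestHandler) and registers them with the TransportService in its constructor.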
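To make the registration step concrete, here is a minimal sketch of a handler registry keyed by action name. It is not the TransportService implementation: HandlerRegistrySketch, register, dispatch, and the String payloads are hypothetical stand-ins, and only the three action names come from the source above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, simplified stand-in for a registry of transport request handlers.
class HandlerRegistrySketch {
    private final Map<String, Function<String, String>> handlers = new HashMap<>();

    // Mirrors the idea of registerRequestHandler(actionName, ..., handler):
    // one handler per action name, duplicates are rejected.
    void register(String action, Function<String, String> handler) {
        if (handlers.putIfAbsent(action, handler) != null) {
            throw new IllegalArgumentException("handler already registered for " + action);
        }
    }

    // Looks up the handler by action name and invokes it with the payload.
    String dispatch(String action, String payload) {
        Function<String, String> handler = handlers.get(action);
        if (handler == null) {
            throw new IllegalStateException("no handler for " + action);
        }
        return handler.apply(payload);
    }

    // Registers one handler per MembershipAction action name.
    static HandlerRegistrySketch withMembershipActions() {
        HandlerRegistrySketch registry = new HandlerRegistrySketch();
        registry.register("internal:discovery/zen/join", node -> "join:" + node);
        registry.register("internal:discovery/zen/join/validate", state -> "validate:" + state);
        registry.register("internal:discovery/zen/leave", node -> "leave:" + node);
        return registry;
    }
}
```

Calling dispatch("internal:discovery/zen/leave", "node-1") on the registry returned by withMembershipActions() routes the payload to the leave handler.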
TransportRequest
elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/transport/TransportRequest.java
public abstract class TransportRequest extends TransportMessage implements TaskAwareRequest {
    public static class Empty extends TransportRequest {
        public static final Empty INSTANCE = new Empty();
    }
    /**
     * Parent of this request. Defaults to {@link TaskId#EMPTY_TASK_ID}, meaning "no parent".
     */
    private TaskId parentTaskId = TaskId.EMPTY_TASK_ID;
    public TransportRequest() {}
    public TransportRequest(StreamInput in) throws IOException {
        parentTaskId = TaskId.readFromStream(in);
    }
    /**
     * Set a reference to task that created this request.
     */
    @Override
    public void setParentTask(TaskId taskId) {
        this.parentTaskId = taskId;
    }
    /**
     * Get a reference to the task that created this request. Defaults to {@link TaskId#EMPTY_TASK_ID}, meaning "there is no parent".
     */
    @Override
    public TaskId getParentTask() {
        return parentTaskId;
    }
    @Override
    public void readFrom(StreamInput in) throws IOException {
        super.readFrom(in);
        parentTaskId = TaskId.readFromStream(in);
    }
    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        parentTaskId.writeTo(out);
    }
}
- TransportRequest extends TransportMessage and implements the TaskAwareRequest interface; the only state it serializes itself is parentTaskId.
LeaveRequest
elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java
public static class LeaveRequest extends TransportRequest {
    private DiscoveryNode node;
    public LeaveRequest() {}
    private LeaveRequest(DiscoveryNode node) {
        this.node = node;
    }
    @Override
    public void readFrom(StreamInput in) throws IOException {
        super.readFrom(in);
        node = new DiscoveryNode(in);
    }
    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        node.writeTo(out);
    }
}
- LeaveRequest extends TransportRequest and overrides readFrom and writeTo: each first calls the parent implementation and then reads or writes the DiscoveryNode.
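The "parent fields first, then my own" serialization pattern can be sketched with plain java.io streams. This is only an illustration: WireSketch, BaseRequest, NodeRequest, and roundTrip are hypothetical names, and DataOutputStream/DataInputStream stand in for Elasticsearch's StreamOutput/StreamInput.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class WireSketch {
    // Stand-in for TransportRequest: serializes only the parent task id.
    static class BaseRequest {
        String parentTaskId = "unset";

        void writeTo(DataOutputStream out) throws IOException {
            out.writeUTF(parentTaskId);
        }

        void readFrom(DataInputStream in) throws IOException {
            parentTaskId = in.readUTF();
        }
    }

    // Stand-in for LeaveRequest/JoinRequest: parent fields first, then its own field.
    static class NodeRequest extends BaseRequest {
        String nodeId;

        @Override
        void writeTo(DataOutputStream out) throws IOException {
            super.writeTo(out);
            out.writeUTF(nodeId);
        }

        @Override
        void readFrom(DataInputStream in) throws IOException {
            super.readFrom(in);
            nodeId = in.readUTF();
        }
    }

    // Serializes a request to bytes and reads it back into a fresh instance.
    static NodeRequest roundTrip(NodeRequest request) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                request.writeTo(out);
            }
            NodeRequest copy = new NodeRequest();
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                copy.readFrom(in);
            }
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because reader and writer both call the parent first, the field order on the wire stays identical on both sides, which is what makes the round trip work.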
JoinRequest
elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java
public static class JoinRequest extends TransportRequest {
    private DiscoveryNode node;
    public DiscoveryNode getNode() {
        return node;
    }
    public JoinRequest() {}
    private JoinRequest(DiscoveryNode node) {
        this.node = node;
    }
    @Override
    public void readFrom(StreamInput in) throws IOException {
        super.readFrom(in);
        node = new DiscoveryNode(in);
    }
    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        node.writeTo(out);
    }
}
- JoinRequest extends TransportRequest and overrides readFrom and writeTo: each first calls the parent implementation and then reads or writes the DiscoveryNode.
ValidateJoinRequest
elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java
static class ValidateJoinRequest extends TransportRequest {
    private ClusterState state;
    ValidateJoinRequest() {}
    ValidateJoinRequest(ClusterState state) {
        this.state = state;
    }
    @Override
    public void readFrom(StreamInput in) throws IOException {
        super.readFrom(in);
        this.state = ClusterState.readFrom(in, null);
    }
    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        this.state.writeTo(out);
    }
}
- ValidateJoinRequest extends TransportRequest and overrides readFrom and writeTo: each first calls the parent implementation and then reads or writes the ClusterState.
TransportRequestHandler
elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/transport/TransportRequestHandler.java
public interface TransportRequestHandler<T extends TransportRequest> {
    /**
     * Override this method if access to the Task parameter is needed
     */
    default void messageReceived(final T request, final TransportChannel channel, Task task) throws Exception {
        messageReceived(request, channel);
    }
    void messageReceived(T request, TransportChannel channel) throws Exception;
}
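- The TransportRequestHandler interface declares the abstract two-argument messageReceived method and also provides a default three-argument overload that accepts a Task and simply delegates to the two-argument version.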
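A small sketch of this default-method delegation follows. DefaultMethodSketch and Handler are hypothetical names, and String and List<String> stand in for the request, task, and channel types; only the shape of the two overloads follows the interface shown above.

```java
import java.util.ArrayList;
import java.util.List;

class DefaultMethodSketch {
    // Same shape as TransportRequestHandler: a default overload with an extra
    // task argument that delegates to the abstract two-argument method.
    interface Handler {
        default void messageReceived(String request, List<String> channel, String task) {
            messageReceived(request, channel);
        }

        void messageReceived(String request, List<String> channel);
    }

    static List<String> demo() {
        List<String> channel = new ArrayList<>();

        // Implements only the abstract method; the task argument is dropped by the default.
        Handler ignoresTask = (request, ch) -> ch.add("handled " + request);
        ignoresTask.messageReceived("join", channel, "task-1");

        // Overrides the default overload because it needs access to the task.
        Handler usesTask = new Handler() {
            @Override
            public void messageReceived(String request, List<String> ch, String task) {
                ch.add("handled " + request + " in " + task);
            }

            @Override
            public void messageReceived(String request, List<String> ch) {
                throw new UnsupportedOperationException("task required");
            }
        };
        usesTask.messageReceived("leave", channel, "task-2");
        return channel;
    }
}
```

The caller always invokes the three-argument overload, so handlers that do not care about the task only need to implement the two-argument method.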
LeaveRequestRequestHandler
elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java
private class LeaveRequestRequestHandler implements TransportRequestHandler<LeaveRequest> {
    @Override
    public void messageReceived(LeaveRequest request, TransportChannel channel) throws Exception {
        listener.onLeave(request.node);
        channel.sendResponse(TransportResponse.Empty.INSTANCE);
    }
}
- LeaveRequestRequestHandler implements TransportRequestHandler; its messageReceived calls MembershipListener.onLeave and then sends back an empty response.
JoinRequestRequestHandler
elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java
private class JoinRequestRequestHandler implements TransportRequestHandler<JoinRequest> {
    @Override
    public void messageReceived(final JoinRequest request, final TransportChannel channel) throws Exception {
        listener.onJoin(request.getNode(), new JoinCallback() {
            @Override
            public void onSuccess() {
                try {
                    channel.sendResponse(TransportResponse.Empty.INSTANCE);
                } catch (Exception e) {
                    onFailure(e);
                }
            }
            @Override
            public void onFailure(Exception e) {
                try {
                    channel.sendResponse(e);
                } catch (Exception inner) {
                    inner.addSuppressed(e);
                    logger.warn("failed to send back failure on join request", inner);
                }
            }
        });
    }
}
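- JoinRequestRequestHandler implements TransportRequestHandler; its messageReceived calls MembershipListener.onJoin with a JoinCallback that sends an empty response on success and sends the exception back on failure.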
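The callback's error handling (a failed success response falls back to onFailure, and a failed failure response is only recorded) can be sketched as follows. JoinCallbackSketch, Channel, failNextSend, and callbackFor are hypothetical; the real handler uses TransportChannel and logs a warning instead of silently dropping the inner exception.

```java
import java.util.ArrayList;
import java.util.List;

class JoinCallbackSketch {
    interface JoinCallback {
        void onSuccess();

        void onFailure(Exception e);
    }

    // Hypothetical channel: records what was sent and can fail exactly one send.
    static class Channel {
        final List<String> sent = new ArrayList<>();
        boolean failNextSend;

        void send(String message) throws Exception {
            if (failNextSend) {
                failNextSend = false;
                throw new Exception("channel closed");
            }
            sent.add(message);
        }
    }

    static JoinCallback callbackFor(Channel channel) {
        return new JoinCallback() {
            @Override
            public void onSuccess() {
                try {
                    channel.send("OK");
                } catch (Exception e) {
                    // Could not acknowledge the join: report that as a failure instead.
                    onFailure(e);
                }
            }

            @Override
            public void onFailure(Exception e) {
                try {
                    channel.send("ERROR: " + e.getMessage());
                } catch (Exception inner) {
                    // Nothing more can be sent; keep the original cause attached.
                    inner.addSuppressed(e);
                }
            }
        };
    }
}
```

With a channel whose first send fails, calling onSuccess() ends up sending "ERROR: channel closed" through the onFailure path.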
ValidateJoinRequestRequestHandler
elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java
static class ValidateJoinRequestRequestHandler implements TransportRequestHandler<ValidateJoinRequest> {
    private final Supplier<DiscoveryNode> localNodeSupplier;
    private final Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators;
    ValidateJoinRequestRequestHandler(Supplier<DiscoveryNode> localNodeSupplier,
                                      Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators) {
        this.localNodeSupplier = localNodeSupplier;
        this.joinValidators = joinValidators;
    }
    @Override
    public void messageReceived(ValidateJoinRequest request, TransportChannel channel) throws Exception {
        DiscoveryNode node = localNodeSupplier.get();
        assert node != null : "local node is null";
        joinValidators.stream().forEach(action -> action.accept(node, request.state));
        channel.sendResponse(TransportResponse.Empty.INSTANCE);
    }
}
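- ValidateJoinRequestRequestHandler implements TransportRequestHandler; its messageReceived runs each of the joinValidators in turn against the local node and the ClusterState carried by the request, and sends an empty response if none of them throws.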
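Here is a rough sketch of the validator chain, assuming validators signal rejection by throwing. JoinValidatorSketch, validate, and exampleValidators are hypothetical, and String and Integer stand in for DiscoveryNode and ClusterState; the two example checks are invented for illustration.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.BiConsumer;

class JoinValidatorSketch {
    // Runs every validator against (node, state); the first exception aborts validation.
    static String validate(Collection<BiConsumer<String, Integer>> validators, String node, int stateVersion) {
        try {
            validators.forEach(validator -> validator.accept(node, stateVersion));
            return "OK";
        } catch (RuntimeException e) {
            return "REJECTED: " + e.getMessage();
        }
    }

    // Two invented checks, only to show how validators are composed.
    static Collection<BiConsumer<String, Integer>> exampleValidators() {
        List<BiConsumer<String, Integer>> validators = new ArrayList<>();
        validators.add((node, version) -> {
            if (node.isEmpty()) {
                throw new IllegalArgumentException("node name is empty");
            }
        });
        validators.add((node, version) -> {
            if (version < 0) {
                throw new IllegalStateException("negative state version");
            }
        });
        return validators;
    }
}
```

A BiConsumer returns nothing, so throwing is the only way a validator can reject a join, which matches how the handler above relies on exceptions propagating out of messageReceived.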
MembershipAction.MembershipListener
elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java
public interface MembershipListener {
    void onJoin(DiscoveryNode node, JoinCallback callback);
    void onLeave(DiscoveryNode node);
}
public interface JoinCallback {
    void onSuccess();
    void onFailure(Exception e);
}
- The MembershipListener interface defines the onJoin and onLeave methods; onJoin takes a JoinCallback. ZenDiscovery contains an implementation class with the same name.
ZenDiscovery.MembershipListener
elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
public class ZenDiscovery extends AbstractLifecycleComponent implements Discovery, PingContextProvider, IncomingClusterStateListener {
    //......
    private class MembershipListener implements MembershipAction.MembershipListener {
        @Override
        public void onJoin(DiscoveryNode node, MembershipAction.JoinCallback callback) {
            handleJoinRequest(node, ZenDiscovery.this.clusterState(), callback);
        }
        @Override
        public void onLeave(DiscoveryNode node) {
            handleLeaveRequest(node);
        }
    }
    void handleJoinRequest(final DiscoveryNode node, final ClusterState state, final MembershipAction.JoinCallback callback) {
        if (nodeJoinController == null) {
            throw new IllegalStateException("discovery module is not yet started");
        } else {
            // we do this in a couple of places including the cluster update thread. This one here is really just best effort
            // to ensure we fail as fast as possible.
            onJoinValidators.stream().forEach(a -> a.accept(node, state));
            if (state.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
                MembershipAction.ensureMajorVersionBarrier(node.getVersion(), state.getNodes().getMinNodeVersion());
            }
            // try and connect to the node, if it fails, we can raise an exception back to the client...
            transportService.connectToNode(node);
            // validate the join request, will throw a failure if it fails, which will get back to the
            // node calling the join request
            try {
                membership.sendValidateJoinRequestBlocking(node, state, joinTimeout);
            } catch (Exception e) {
                logger.warn(() -> new ParameterizedMessage("failed to validate incoming join request from node [{}]", node),
                    e);
                callback.onFailure(new IllegalStateException("failure when sending a validation request to node", e));
                return;
            }
            nodeJoinController.handleJoinRequest(node, callback);
        }
    }
    private void handleLeaveRequest(final DiscoveryNode node) {
        if (lifecycleState() != Lifecycle.State.STARTED) {
            // not started, ignore a node failure
            return;
        }
        if (localNodeMaster()) {
            removeNode(node, "zen-disco-node-left", "left");
        } else if (node.equals(clusterState().nodes().getMasterNode())) {
            handleMasterGone(node, null, "shut_down");
        }
    }
    private void removeNode(final DiscoveryNode node, final String source, final String reason) {
        masterService.submitStateUpdateTask(source + "(" + node + "), reason(" + reason + ")",
            new NodeRemovalClusterStateTaskExecutor.Task(node, reason),
            ClusterStateTaskConfig.build(Priority.IMMEDIATE),
            nodeRemovalExecutor,
            nodeRemovalExecutor);
    }
    private void handleMasterGone(final DiscoveryNode masterNode, final Throwable cause, final String reason) {
        if (lifecycleState() != Lifecycle.State.STARTED) {
            // not started, ignore a master failure
            return;
        }
        if (localNodeMaster()) {
            // we might get this on both a master telling us shutting down, and then the disconnect failure
            return;
        }
        logger.info(() -> new ParameterizedMessage("master_left [{}], reason [{}]", masterNode, reason), cause);
        synchronized (stateMutex) {
            if (localNodeMaster() == false && masterNode.equals(committedState.get().nodes().getMasterNode())) {
                // flush any pending cluster states from old master, so it will not be set as master again
                pendingStatesQueue.failAllStatesAndClear(new ElasticsearchException("master left [{}]", reason));
                rejoin("master left (reason =" + reason + ")");
            }
        }
    }
    //......
}
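- ZenDiscovery.MembershipListener's onJoin method calls handleJoinRequest. That method runs the onJoinValidators, enforces the major version barrier (only when the cluster state has been recovered), connects to the joining node, sends it a blocking validate-join request, and finally calls nodeJoinController.handleJoinRequest(node, callback). The onLeave method calls handleLeaveRequest, which calls removeNode when the local node is the master, and calls handleMasterGone when the node that left is the current master.
- removeNode calls masterService.submitStateUpdateTask, passing nodeRemovalExecutor as both the ClusterStateTaskExecutor and the ClusterStateTaskListener.
- handleMasterGone, when the departed node is still the committed master, calls pendingStatesQueue.failAllStatesAndClear(new ElasticsearchException("master left [{}]", reason)) and then triggers rejoin.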
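The branching in handleLeaveRequest can be condensed into a small decision function. This is a sketch only: LeaveDecisionSketch, Action, and onLeave are hypothetical names, and plain booleans and Strings stand in for the lifecycle state and DiscoveryNode.

```java
class LeaveDecisionSketch {
    enum Action { IGNORE, REMOVE_NODE, MASTER_GONE }

    // Mirrors the order of checks in handleLeaveRequest as shown above.
    static Action onLeave(boolean started, boolean localNodeIsMaster, String leavingNode, String currentMaster) {
        if (!started) {
            return Action.IGNORE;       // discovery not started: ignore the leave
        }
        if (localNodeIsMaster) {
            return Action.REMOVE_NODE;  // the master removes the node from the cluster state
        }
        if (leavingNode.equals(currentMaster)) {
            return Action.MASTER_GONE;  // a non-master node learns that its master left
        }
        return Action.IGNORE;           // some other node left; only the master acts on it
    }
}
```

Note the fourth case: a non-master node that hears about a non-master leaving does nothing, which is easy to miss when reading the if/else-if in the original.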
NodeJoinController.handleJoinRequest
elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java
public class NodeJoinController {
    //......
    /**
     * processes or queues an incoming join request.
     * <p>
     * Note: doesn't do any validation. This should have been done before.
     */
    public synchronized void handleJoinRequest(final DiscoveryNode node, final MembershipAction.JoinCallback callback) {
        if (electionContext != null) {
            electionContext.addIncomingJoin(node, callback);
            checkPendingJoinsAndElectIfNeeded();
        } else {
            masterService.submitStateUpdateTask("zen-disco-node-join",
                node, ClusterStateTaskConfig.build(Priority.URGENT),
                joinTaskExecutor, new JoinTaskListener(callback, logger));
        }
    }
    //......
}
- NodeJoinController's handleJoinRequest method, when electionContext is not null, calls electionContext.addIncomingJoin and then checkPendingJoinsAndElectIfNeeded; otherwise it calls masterService.submitStateUpdateTask with joinTaskExecutor as the ClusterStateTaskExecutor and a JoinTaskListener as the ClusterStateTaskListener.
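A stripped-down sketch of this queue-or-submit routing is shown below. JoinRoutingSketch and its fields are hypothetical: a boolean stands in for "electionContext != null", two lists stand in for the election context and the master service, and the election check that follows addIncomingJoin is left out.

```java
import java.util.ArrayList;
import java.util.List;

class JoinRoutingSketch {
    // Joins collected while an election is in progress (stand-in for electionContext).
    final List<String> pendingJoins = new ArrayList<>();
    // Tasks handed to the master service (stand-in for submitStateUpdateTask).
    final List<String> submittedTasks = new ArrayList<>();
    boolean electionInProgress;

    // Either queues the join for the running election or submits it as a cluster state update.
    synchronized void handleJoinRequest(String node) {
        if (electionInProgress) {
            pendingJoins.add(node);
        } else {
            submittedTasks.add("zen-disco-node-join:" + node);
        }
    }
}
```

Joins received during an election are counted as votes and processed together once the election finishes, while joins received by an established master go straight to the cluster state update queue.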
MasterService.submitStateUpdateTask
elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java
public class MasterService extends AbstractLifecycleComponent {
    //......
    public <T> void submitStateUpdateTask(String source, T task,
                                          ClusterStateTaskConfig config,
                                          ClusterStateTaskExecutor<T> executor,
                                          ClusterStateTaskListener listener) {
        submitStateUpdateTasks(source, Collections.singletonMap(task, listener), config, executor);
    }
    public <T> void submitStateUpdateTasks(final String source,
                                           final Map<T, ClusterStateTaskListener> tasks, final ClusterStateTaskConfig config,
                                           final ClusterStateTaskExecutor<T> executor) {
        if (!lifecycle.started()) {
            return;
        }
        final ThreadContext threadContext = threadPool.getThreadContext();
        final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(true);
        try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
            threadContext.markAsSystemContext();
            List<Batcher.UpdateTask> safeTasks = tasks.entrySet().stream()
                .map(e -> taskBatcher.new UpdateTask(config.priority(), source, e.getKey(), safe(e.getValue(), supplier), executor))
                .collect(Collectors.toList());
            taskBatcher.submitTasks(safeTasks, config.timeout());
        } catch (EsRejectedExecutionException e) {
            // ignore cases where we are shutting down..., there is really nothing interesting
            // to be done here...
            if (!lifecycle.stoppedOrClosed()) {
                throw e;
            }
        }
    }
    class Batcher extends TaskBatcher {
        Batcher(Logger logger, PrioritizedEsThreadPoolExecutor threadExecutor) {
            super(logger, threadExecutor);
        }
        @Override
        protected void onTimeout(List<? extends BatchedTask> tasks, TimeValue timeout) {
            threadPool.generic().execute(() -> tasks.forEach(task -> ((UpdateTask) task).listener.onFailure(task.source,
                new ProcessClusterEventTimeoutException(timeout, task.source))));
        }
        @Override
        protected void run(Object batchingKey, List<? extends BatchedTask> tasks, String tasksSummary) {
            ClusterStateTaskExecutor<Object> taskExecutor = (ClusterStateTaskExecutor<Object>) batchingKey;
            List<UpdateTask> updateTasks = (List<UpdateTask>) tasks;
            runTasks(new TaskInputs(taskExecutor, updateTasks, tasksSummary));
        }
        class UpdateTask extends BatchedTask {
            final ClusterStateTaskListener listener;
            UpdateTask(Priority priority, String source, Object task, ClusterStateTaskListener listener,
                       ClusterStateTaskExecutor<?> executor) {
                super(priority, source, executor, task);
                this.listener = listener;
            }
            @Override
            public String describeTasks(List<? extends BatchedTask> tasks) {
                return ((ClusterStateTaskExecutor<Object>) batchingKey).describeTasks(
                    tasks.stream().map(BatchedTask::getTask).collect(Collectors.toList()));
            }
        }
    }
    protected class TaskInputs {
        public final String summary;
        public final List<Batcher.UpdateTask> updateTasks;
        public final ClusterStateTaskExecutor<Object> executor;
        TaskInputs(ClusterStateTaskExecutor<Object> executor, List<Batcher.UpdateTask> updateTasks, String summary) {
            this.summary = summary;
            this.executor = executor;
            this.updateTasks = updateTasks;
        }
        public boolean runOnlyWhenMaster() {
            return executor.runOnlyOnMaster();
        }
        public void onNoLongerMaster() {
            updateTasks.forEach(task -> task.listener.onNoLongerMaster(task.source()));
        }
    }
    //......
}
- The submitStateUpdateTasks method wraps each task in a Batcher.UpdateTask (using the executor as the batching key) and submits them through taskBatcher.submitTasks. Batcher extends TaskBatcher; its run method calls runTasks with a TaskInputs argument. TaskInputs.runOnlyWhenMaster delegates to executor.runOnlyOnMaster(), and onNoLongerMaster calls task.listener.onNoLongerMaster(task.source()) for every task.
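Since UpdateTask passes the executor to BatchedTask as its batching key and run receives one key with a list of tasks, tasks that share an executor can be processed together. The grouping itself lives in TaskBatcher, which is not shown in this post, so the sketch below is an assumption about the general idea rather than its actual code; BatchingSketch, UpdateTask, and groupByKey are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class BatchingSketch {
    // Hypothetical task: the batching key plays the role of the executor instance.
    static class UpdateTask {
        final Object batchingKey;
        final String source;

        UpdateTask(Object batchingKey, String source) {
            this.batchingKey = batchingKey;
            this.source = source;
        }
    }

    // Groups task sources by batching key, preserving submission order within each batch.
    static Map<Object, List<String>> groupByKey(List<UpdateTask> tasks) {
        Map<Object, List<String>> batches = new LinkedHashMap<>();
        for (UpdateTask task : tasks) {
            batches.computeIfAbsent(task.batchingKey, key -> new ArrayList<>()).add(task.source);
        }
        return batches;
    }
}
```

Under this model, two join tasks submitted with the same joinTaskExecutor end up in one batch and reach execute as a single List, which is why execute takes a list of tasks rather than one task.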
ClusterStateTaskExecutor
elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java
public interface ClusterStateTaskExecutor<T> {
    ClusterTasksResult<T> execute(ClusterState currentState, List<T> tasks) throws Exception;
    default boolean runOnlyOnMaster() {
        return true;
    }
    default void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
    }
    default String describeTasks(List<T> tasks) {
        return String.join(",", tasks.stream().map(t -> (CharSequence)t.toString()).filter(t -> t.length() > 0)::iterator);
    }
    //......
}
- ClusterStateTaskExecutor declares the execute method and provides the default methods runOnlyOnMaster, clusterStatePublished, and describeTasks. It has many implementations, for example JoinTaskExecutor and NodeRemovalClusterStateTaskExecutor.
JoinTaskExecutor
elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java
// visible for testing
public static class JoinTaskExecutor implements ClusterStateTaskExecutor<DiscoveryNode> {
    private final AllocationService allocationService;
    private final ElectMasterService electMasterService;
    private final Logger logger;
    private final int minimumMasterNodesOnLocalNode;
    public JoinTaskExecutor(Settings settings, AllocationService allocationService, ElectMasterService electMasterService,
                            Logger logger) {
        this.allocationService = allocationService;
        this.electMasterService = electMasterService;
        this.logger = logger;
        minimumMasterNodesOnLocalNode = ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(settings);
    }
    @Override
    public ClusterTasksResult<DiscoveryNode> execute(ClusterState currentState, List<DiscoveryNode> joiningNodes) throws Exception {
        final ClusterTasksResult.Builder<DiscoveryNode> results = ClusterTasksResult.builder();
        final DiscoveryNodes currentNodes = currentState.nodes();
        boolean nodesChanged = false;
        ClusterState.Builder newState;
        if (joiningNodes.size() == 1 && joiningNodes.get(0).equals(FINISH_ELECTION_TASK)) {
            return results.successes(joiningNodes).build(currentState);
        } else if (currentNodes.getMasterNode() == null && joiningNodes.contains(BECOME_MASTER_TASK)) {
            assert joiningNodes.contains(FINISH_ELECTION_TASK) : "becoming a master but election is not finished" + joiningNodes;
            // use these joins to try and become the master.
            // Note that we don't have to do any validation of the amount of joining nodes - the commit
            // during the cluster state publishing guarantees that we have enough
            newState = becomeMasterAndTrimConflictingNodes(currentState, joiningNodes);
            nodesChanged = true;
        } else if (currentNodes.isLocalNodeElectedMaster() == false) {
            logger.trace("processing node joins, but we are not the master. current master: {}", currentNodes.getMasterNode());
            throw new NotMasterException("Node [" + currentNodes.getLocalNode() + "] not master for join request");
        } else {
            newState = ClusterState.builder(currentState);
        }
        DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(newState.nodes());
        assert nodesBuilder.isLocalNodeElectedMaster();
        Version minClusterNodeVersion = newState.nodes().getMinNodeVersion();
        Version maxClusterNodeVersion = newState.nodes().getMaxNodeVersion();
        // we only enforce major version transitions on a fully formed clusters
        final boolean enforceMajorVersion = currentState.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false;
        // processing any joins
        for (final DiscoveryNode node : joiningNodes) {
            if (node.equals(BECOME_MASTER_TASK) || node.equals(FINISH_ELECTION_TASK)) {
                // noop
            } else if (currentNodes.nodeExists(node)) {
                logger.debug("received a join request for an existing node [{}]", node);
            } else {
                try {
                    if (enforceMajorVersion) {
                        MembershipAction.ensureMajorVersionBarrier(node.getVersion(), minClusterNodeVersion);
                    }
                    MembershipAction.ensureNodesCompatibility(node.getVersion(), minClusterNodeVersion, maxClusterNodeVersion);
                    // we do this validation quite late to prevent race conditions between nodes joining and importing dangling indices
                    // we have to reject nodes that don't support all indices we have in this cluster
                    MembershipAction.ensureIndexCompatibility(node.getVersion(), currentState.getMetaData());
                    nodesBuilder.add(node);
                    nodesChanged = true;
                    minClusterNodeVersion = Version.min(minClusterNodeVersion, node.getVersion());
                    maxClusterNodeVersion = Version.max(maxClusterNodeVersion, node.getVersion());
                } catch (IllegalArgumentException | IllegalStateException e) {
                    results.failure(node, e);
                    continue;
                }
            }
            results.success(node);
        }
        if (nodesChanged) {
            newState.nodes(nodesBuilder);
            return results.build(allocationService.reroute(newState.build(), "node_join"));
        } else {
            // we must return a new cluster state instance to force publishing. This is important
            // for the joining node to finalize its join and set us as a master
            return results.build(newState.build());
        }
    }
    //......
    @Override
    public boolean runOnlyOnMaster() {
        // we validate that we are allowed to change the cluster state during cluster state processing
        return false;
    }
    @Override
    public void clusterStatePublished(ClusterChangedEvent event) {
        electMasterService.logMinimumMasterNodesWarningIfNecessary(event.previousState(), event.state());
    }
}
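- JoinTaskExecutor's execute method builds a new ClusterState from the joiningNodes: nodes that already exist are skipped, and new nodes are added only after the version and index compatibility checks pass. If the set of nodes changed, it calls allocationService.reroute(newState.build(), "node_join") to reroute shard allocation; otherwise it still returns a new ClusterState instance to force publishing.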
NodeRemovalClusterStateTaskExecutor
elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
public static class NodeRemovalClusterStateTaskExecutor
    implements ClusterStateTaskExecutor<NodeRemovalClusterStateTaskExecutor.Task>, ClusterStateTaskListener {
    private final AllocationService allocationService;
    private final ElectMasterService electMasterService;
    private final Consumer<String> rejoin;
    private final Logger logger;
    public static class Task {
        private final DiscoveryNode node;
        private final String reason;
        public Task(final DiscoveryNode node, final String reason) {
            this.node = node;
            this.reason = reason;
        }
        public DiscoveryNode node() {
            return node;
        }
        public String reason() {
            return reason;
        }
        @Override
        public String toString() {
            return node + " " + reason;
        }
    }
    public NodeRemovalClusterStateTaskExecutor(
        final AllocationService allocationService,
        final ElectMasterService electMasterService,
        final Consumer<String> rejoin,
        final Logger logger) {
        this.allocationService = allocationService;
        this.electMasterService = electMasterService;
        this.rejoin = rejoin;
        this.logger = logger;
    }
    @Override
    public ClusterTasksResult<Task> execute(final ClusterState currentState, final List<Task> tasks) throws Exception {
        final DiscoveryNodes.Builder remainingNodesBuilder = DiscoveryNodes.builder(currentState.nodes());
        boolean removed = false;
        for (final Task task : tasks) {
            if (currentState.nodes().nodeExists(task.node())) {
                remainingNodesBuilder.remove(task.node());
                removed = true;
            } else {
                logger.debug("node [{}] does not exist in cluster state, ignoring", task);
            }
        }
        if (!removed) {
            // no nodes to remove, keep the current cluster state
            return ClusterTasksResult.<Task>builder().successes(tasks).build(currentState);
        }
        final ClusterState remainingNodesClusterState = remainingNodesClusterState(currentState, remainingNodesBuilder);
        final ClusterTasksResult.Builder<Task> resultBuilder = ClusterTasksResult.<Task>builder().successes(tasks);
        if (electMasterService.hasEnoughMasterNodes(remainingNodesClusterState.nodes()) == false) {
            final int masterNodes = electMasterService.countMasterNodes(remainingNodesClusterState.nodes());
            rejoin.accept(LoggerMessageFormat.format("not enough master nodes (has [{}], but needed [{}])",
                masterNodes, electMasterService.minimumMasterNodes()));
            return resultBuilder.build(currentState);
        } else {
            ClusterState ptasksDisassociatedState = PersistentTasksCustomMetaData.disassociateDeadNodes(remainingNodesClusterState);
            return resultBuilder.build(allocationService.disassociateDeadNodes(ptasksDisassociatedState, true, describeTasks(tasks)));
        }
    }
    // visible for testing
    // hook is used in testing to ensure that correct cluster state is used to test whether a
    // rejoin or reroute is needed
    ClusterState remainingNodesClusterState(final ClusterState currentState, DiscoveryNodes.Builder remainingNodesBuilder) {
        return ClusterState.builder(currentState).nodes(remainingNodesBuilder).build();
    }
    @Override
    public void onFailure(final String source, final Exception e) {
        logger.error(() -> new ParameterizedMessage("unexpected failure during [{}]", source), e);
    }
    @Override
    public void onNoLongerMaster(String source) {
        logger.debug("no longer master while processing node removal [{}]", source);
    }
}
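- NodeRemovalClusterStateTaskExecutor implements both the ClusterStateTaskExecutor and ClusterStateTaskListener interfaces. Its execute method removes the given nodes from currentState to build remainingNodesClusterState. If the remaining nodes still satisfy hasEnoughMasterNodes, it calls allocationService.disassociateDeadNodes; otherwise it invokes the Consumer named rejoin and returns the current state unchanged.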
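The three outcomes of execute (nothing removed, too few master nodes left, normal removal) can be sketched over plain sets. NodeRemovalSketch, Result, and the "master-" name prefix used to mark master-eligible nodes are assumptions made for illustration; the real code works on DiscoveryNodes and ElectMasterService.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class NodeRemovalSketch {
    static class Result {
        final Set<String> nodes;
        final boolean rejoinRequested;

        Result(Set<String> nodes, boolean rejoinRequested) {
            this.nodes = nodes;
            this.rejoinRequested = rejoinRequested;
        }
    }

    // Assumption for this sketch: master-eligible nodes have names starting with "master-".
    static Result execute(Set<String> currentNodes, List<String> toRemove, int minimumMasterNodes) {
        Set<String> remaining = new LinkedHashSet<>(currentNodes);
        boolean removed = remaining.removeAll(toRemove);
        if (!removed) {
            // No known node was removed: keep the current state.
            return new Result(currentNodes, false);
        }
        long masterNodes = remaining.stream().filter(node -> node.startsWith("master-")).count();
        if (masterNodes < minimumMasterNodes) {
            // Not enough master-eligible nodes left: keep the current state and ask for a rejoin.
            return new Result(currentNodes, true);
        }
        // Enough masters remain: publish the state without the removed nodes.
        return new Result(remaining, false);
    }
}
```

With nodes master-1, master-2, and data-1 and a minimum of 2 master nodes, removing data-1 yields a smaller node set, while removing master-2 leaves the node set untouched and requests a rejoin.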
Summary
- MembershipAction defines three request types: LeaveRequest, JoinRequest, and ValidateJoinRequest. It also defines a TransportRequestHandler for each of them: LeaveRequestRequestHandler, JoinRequestRequestHandler, and ValidateJoinRequestRequestHandler.
- LeaveRequestRequestHandler's messageReceived calls MembershipListener.onLeave; JoinRequestRequestHandler's messageReceived calls MembershipListener.onJoin; ValidateJoinRequestRequestHandler's messageReceived runs each of the joinValidators in turn.
- The MembershipListener interface defines onJoin and onLeave, where onJoin takes a JoinCallback; ZenDiscovery contains an implementation with the same name. Its onJoin calls handleJoinRequest, which validates the join and then calls nodeJoinController.handleJoinRequest(node, callback). Its onLeave calls handleLeaveRequest, which calls removeNode when the local node is the master and handleMasterGone when the node that left is the current master. removeNode calls masterService.submitStateUpdateTask with nodeRemovalExecutor as both the ClusterStateTaskExecutor and the ClusterStateTaskListener. handleMasterGone calls pendingStatesQueue.failAllStatesAndClear(new ElasticsearchException("master left [{}]", reason)) and then triggers rejoin.
- NodeJoinController's handleJoinRequest, when electionContext is not null, calls electionContext.addIncomingJoin and then checkPendingJoinsAndElectIfNeeded; otherwise it calls masterService.submitStateUpdateTask with joinTaskExecutor as the ClusterStateTaskExecutor and a JoinTaskListener as the ClusterStateTaskListener.
- MasterService's submitStateUpdateTasks wraps each task in a Batcher.UpdateTask and submits them through taskBatcher.submitTasks. Batcher extends TaskBatcher; its run method calls runTasks with a TaskInputs argument. TaskInputs.runOnlyWhenMaster delegates to executor.runOnlyOnMaster(), and onNoLongerMaster calls task.listener.onNoLongerMaster(task.source()).
- ClusterStateTaskExecutor declares the execute method and provides the default methods runOnlyOnMaster, clusterStatePublished, and describeTasks; its implementations include JoinTaskExecutor and NodeRemovalClusterStateTaskExecutor. JoinTaskExecutor's execute builds a new ClusterState from the joiningNodes and, if the nodes changed, calls allocationService.reroute(newState.build(), "node_join") to reroute shard allocation. NodeRemovalClusterStateTaskExecutor implements both ClusterStateTaskExecutor and ClusterStateTaskListener; its execute removes the given nodes from currentState to build remainingNodesClusterState, then calls allocationService.disassociateDeadNodes if hasEnoughMasterNodes holds, and otherwise invokes the Consumer named rejoin.