本文主要研究一下elasticsearch的LagDetector

LagDetector

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/cluster/coordination/LagDetector.java

/** * A publication can succeed and complete before all nodes have applied the published state and acknowledged it; however we need every node * eventually either to apply the published state (or a later state) or be removed from the cluster. This component achieves this by * removing any lagging nodes from the cluster after a timeout. */public class LagDetector {    private static final Logger logger = LogManager.getLogger(LagDetector.class);    // the timeout for each node to apply a cluster state update after the leader has applied it, before being removed from the cluster    public static final Setting<TimeValue> CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING =        Setting.timeSetting("cluster.follower_lag.timeout",            TimeValue.timeValueMillis(90000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);    private final TimeValue clusterStateApplicationTimeout;    private final Consumer<DiscoveryNode> onLagDetected;    private final Supplier<DiscoveryNode> localNodeSupplier;    private final ThreadPool threadPool;    private final Map<DiscoveryNode, NodeAppliedStateTracker> appliedStateTrackersByNode = newConcurrentMap();    public LagDetector(final Settings settings, final ThreadPool threadPool, final Consumer<DiscoveryNode> onLagDetected,                       final Supplier<DiscoveryNode> localNodeSupplier) {        this.threadPool = threadPool;        this.clusterStateApplicationTimeout = CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING.get(settings);        this.onLagDetected = onLagDetected;        this.localNodeSupplier = localNodeSupplier;    }    public void setTrackedNodes(final Iterable<DiscoveryNode> discoveryNodes) {        final Set<DiscoveryNode> discoveryNodeSet = new HashSet<>();        discoveryNodes.forEach(discoveryNodeSet::add);        discoveryNodeSet.remove(localNodeSupplier.get());        appliedStateTrackersByNode.keySet().retainAll(discoveryNodeSet);        discoveryNodeSet.forEach(node -> appliedStateTrackersByNode.putIfAbsent(node, new NodeAppliedStateTracker(node)));    }    public void clearTrackedNodes() {        appliedStateTrackersByNode.clear();    }    public void setAppliedVersion(final DiscoveryNode discoveryNode, final long appliedVersion) {        final NodeAppliedStateTracker nodeAppliedStateTracker = appliedStateTrackersByNode.get(discoveryNode);        if (nodeAppliedStateTracker == null) {            // Received an ack from a node that a later publication has removed (or we are no longer master). No big deal.            logger.trace("node {} applied version {} but this node's version is not being tracked", discoveryNode, appliedVersion);        } else {            nodeAppliedStateTracker.increaseAppliedVersion(appliedVersion);        }    }    public void startLagDetector(final long version) {        final List<NodeAppliedStateTracker> laggingTrackers            = appliedStateTrackersByNode.values().stream().filter(t -> t.appliedVersionLessThan(version)).collect(Collectors.toList());        if (laggingTrackers.isEmpty()) {            logger.trace("lag detection for version {} is unnecessary: {}", version, appliedStateTrackersByNode.values());        } else {            logger.debug("starting lag detector for version {}: {}", version, laggingTrackers);            threadPool.scheduleUnlessShuttingDown(clusterStateApplicationTimeout, Names.GENERIC, new Runnable() {                @Override                public void run() {                    laggingTrackers.forEach(t -> t.checkForLag(version));                }                @Override                public String toString() {                    return "lag detector for version " + version + " on " + laggingTrackers;                }            });        }    }    @Override    public String toString() {        return "LagDetector{" +            "clusterStateApplicationTimeout=" + clusterStateApplicationTimeout +            ", appliedStateTrackersByNode=" + appliedStateTrackersByNode.values() +            '}';    }    // for assertions    Set<DiscoveryNode> getTrackedNodes() {        return Collections.unmodifiableSet(appliedStateTrackersByNode.keySet());    }    private class NodeAppliedStateTracker {        private final DiscoveryNode discoveryNode;        private final AtomicLong appliedVersion = new AtomicLong();        NodeAppliedStateTracker(final DiscoveryNode discoveryNode) {            this.discoveryNode = discoveryNode;        }        void increaseAppliedVersion(long appliedVersion) {            long maxAppliedVersion = this.appliedVersion.updateAndGet(v -> Math.max(v, appliedVersion));            logger.trace("{} applied version {}, max now {}", this, appliedVersion, maxAppliedVersion);        }        boolean appliedVersionLessThan(final long version) {            return appliedVersion.get() < version;        }        @Override        public String toString() {            return "NodeAppliedStateTracker{" +                "discoveryNode=" + discoveryNode +                ", appliedVersion=" + appliedVersion +                '}';        }        void checkForLag(final long version) {            if (appliedStateTrackersByNode.get(discoveryNode) != this) {                logger.trace("{} no longer active when checking version {}", this, version);                return;            }            long appliedVersion = this.appliedVersion.get();            if (version <= appliedVersion) {                logger.trace("{} satisfied when checking version {}, node applied version {}", this, version, appliedVersion);                return;            }            logger.debug("{}, detected lag at version {}, node has only applied version {}", this, version, appliedVersion);            onLagDetected.accept(discoveryNode);        }    }}
  • LagDetector用于检测并移除lagging nodes,其构造器读取cluster.follower_lag.timeout配置,默认为90000ms,最小值为1ms
  • startLagDetector首先获取从appliedStateTrackersByNode中过滤出appliedVersion小于指定version的laggingTrackers,之后延时clusterStateApplicationTimeout执行检测,其run方法会遍历laggingTrackers,挨个执行器NodeAppliedStateTracker.checkForLag方法
  • NodeAppliedStateTracker的checkForLag方法首先进行version判断,最后调用onLagDetected.accept(discoveryNode)

Coordinator

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

public class Coordinator extends AbstractLifecycleComponent implements Discovery {    //......    public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSettings, TransportService transportService,                       NamedWriteableRegistry namedWriteableRegistry, AllocationService allocationService, MasterService masterService,                       Supplier<CoordinationState.PersistedState> persistedStateSupplier, SeedHostsProvider seedHostsProvider,                       ClusterApplier clusterApplier, Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators, Random random) {        this.settings = settings;        this.transportService = transportService;        this.masterService = masterService;        this.allocationService = allocationService;        this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators);        this.singleNodeDiscovery = DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE.equals(DiscoveryModule.DISCOVERY_TYPE_SETTING.get(settings));        this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService,            this::getCurrentTerm, this::getStateForMasterService, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators);        this.persistedStateSupplier = persistedStateSupplier;        this.noMasterBlockService = new NoMasterBlockService(settings, clusterSettings);        this.lastKnownLeader = Optional.empty();        this.lastJoin = Optional.empty();        this.joinAccumulator = new InitialJoinAccumulator();        this.publishTimeout = PUBLISH_TIMEOUT_SETTING.get(settings);        this.random = random;        this.electionSchedulerFactory = new ElectionSchedulerFactory(settings, random, transportService.getThreadPool());        this.preVoteCollector = new PreVoteCollector(transportService, this::startElection, this::updateMaxTermSeen);        configuredHostsResolver = new SeedHostsResolver(nodeName, settings, transportService, seedHostsProvider);        this.peerFinder = new CoordinatorPeerFinder(settings, transportService,            new HandshakingTransportAddressConnector(settings, transportService), configuredHostsResolver);        this.publicationHandler = new PublicationTransportHandler(transportService, namedWriteableRegistry,            this::handlePublishRequest, this::handleApplyCommit);        this.leaderChecker = new LeaderChecker(settings, transportService, getOnLeaderFailure());        this.followersChecker = new FollowersChecker(settings, transportService, this::onFollowerCheckRequest, this::removeNode);        this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger);        this.clusterApplier = clusterApplier;        masterService.setClusterStateSupplier(this::getStateForMasterService);        this.reconfigurator = new Reconfigurator(settings, clusterSettings);        this.clusterBootstrapService = new ClusterBootstrapService(settings, transportService, this::getFoundPeers,            this::isInitialConfigurationSet, this::setInitialConfiguration);        this.discoveryUpgradeService = new DiscoveryUpgradeService(settings, transportService,            this::isInitialConfigurationSet, joinHelper, peerFinder::getFoundPeers, this::setInitialConfiguration);        this.lagDetector = new LagDetector(settings, transportService.getThreadPool(), n -> removeNode(n, "lagging"),            transportService::getLocalNode);        this.clusterFormationFailureHelper = new ClusterFormationFailureHelper(settings, this::getClusterFormationState,            transportService.getThreadPool(), joinHelper::logLastFailedJoinAttempt);    }    private void removeNode(DiscoveryNode discoveryNode, String reason) {        synchronized (mutex) {            if (mode == Mode.LEADER) {                masterService.submitStateUpdateTask("node-left",                    new NodeRemovalClusterStateTaskExecutor.Task(discoveryNode, reason),                    ClusterStateTaskConfig.build(Priority.IMMEDIATE),                    nodeRemovalExecutor,                    nodeRemovalExecutor);            }        }    }    //......}
  • Coordinator的构造器创建了LagDetector,其Consumer<DiscoveryNode>执行的是removeNode方法,该方法在当前mode为LEADER的时候会执行NodeRemovalClusterStateTaskExecutor.Task

小结

  • LagDetector用于检测并移除lagging nodes,其构造器读取cluster.follower_lag.timeout配置,默认为90000ms,最小值为1ms
  • startLagDetector首先获取从appliedStateTrackersByNode中过滤出appliedVersion小于指定version的laggingTrackers,之后延时clusterStateApplicationTimeout执行检测,其run方法会遍历laggingTrackers,挨个执行器NodeAppliedStateTracker.checkForLag方法;NodeAppliedStateTracker的checkForLag方法首先进行version判断,最后调用onLagDetected.accept(discoveryNode)
  • Coordinator的构造器创建了LagDetector,其Consumer<DiscoveryNode>执行的是removeNode方法,该方法在当前mode为LEADER的时候会执行NodeRemovalClusterStateTaskExecutor.Task

doc

  • LagDetector