本文主要研究一下elasticsearch的ElectMasterService

ElectMasterService

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/discovery/zen/ElectMasterService.java

public class ElectMasterService {    private static final Logger logger = LogManager.getLogger(ElectMasterService.class);    public static final Setting<Integer> DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING =        Setting.intSetting("discovery.zen.minimum_master_nodes", -1, Property.Dynamic, Property.NodeScope, Property.Deprecated);    private volatile int minimumMasterNodes;    /**     * a class to encapsulate all the information about a candidate in a master election     * that is needed to decided which of the candidates should win     */    public static class MasterCandidate {        public static final long UNRECOVERED_CLUSTER_VERSION = -1;        final DiscoveryNode node;        final long clusterStateVersion;        public MasterCandidate(DiscoveryNode node, long clusterStateVersion) {            Objects.requireNonNull(node);            assert clusterStateVersion >= -1 : "got: " + clusterStateVersion;            assert node.isMasterNode();            this.node = node;            this.clusterStateVersion = clusterStateVersion;        }        public DiscoveryNode getNode() {            return node;        }        public long getClusterStateVersion() {            return clusterStateVersion;        }        @Override        public String toString() {            return "Candidate{" +                "node=" + node +                ", clusterStateVersion=" + clusterStateVersion +                '}';        }        /**         * compares two candidates to indicate which the a better master.         * A higher cluster state version is better         *         * @return -1 if c1 is a batter candidate, 1 if c2.         */        public static int compare(MasterCandidate c1, MasterCandidate c2) {            // we explicitly swap c1 and c2 here. the code expects "better" is lower in a sorted            // list, so if c2 has a higher cluster state version, it needs to come first.            int ret = Long.compare(c2.clusterStateVersion, c1.clusterStateVersion);            if (ret == 0) {                ret = compareNodes(c1.getNode(), c2.getNode());            }            return ret;        }    }    public ElectMasterService(Settings settings) {        this.minimumMasterNodes = DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(settings);        logger.debug("using minimum_master_nodes [{}]", minimumMasterNodes);    }    public void minimumMasterNodes(int minimumMasterNodes) {        this.minimumMasterNodes = minimumMasterNodes;    }    public int minimumMasterNodes() {        return minimumMasterNodes;    }    public int countMasterNodes(Iterable<DiscoveryNode> nodes) {        int count = 0;        for (DiscoveryNode node : nodes) {            if (node.isMasterNode()) {                count++;            }        }        return count;    }    public boolean hasEnoughCandidates(Collection<MasterCandidate> candidates) {        if (candidates.isEmpty()) {            return false;        }        if (minimumMasterNodes < 1) {            return true;        }        assert candidates.stream().map(MasterCandidate::getNode).collect(Collectors.toSet()).size() == candidates.size() :            "duplicates ahead: " + candidates;        return candidates.size() >= minimumMasterNodes;    }    /**     * Elects a new master out of the possible nodes, returning it. Returns {@code null}     * if no master has been elected.     */    public MasterCandidate electMaster(Collection<MasterCandidate> candidates) {        assert hasEnoughCandidates(candidates);        List<MasterCandidate> sortedCandidates = new ArrayList<>(candidates);        sortedCandidates.sort(MasterCandidate::compare);        return sortedCandidates.get(0);    }    /** selects the best active master to join, where multiple are discovered */    public DiscoveryNode tieBreakActiveMasters(Collection<DiscoveryNode> activeMasters) {        return activeMasters.stream().min(ElectMasterService::compareNodes).get();    }    public boolean hasEnoughMasterNodes(Iterable<DiscoveryNode> nodes) {        final int count = countMasterNodes(nodes);        return count > 0 && (minimumMasterNodes < 0 || count >= minimumMasterNodes);    }    public boolean hasTooManyMasterNodes(Iterable<DiscoveryNode> nodes) {        final int count = countMasterNodes(nodes);        return count > 1 && minimumMasterNodes <= count / 2;    }    public void logMinimumMasterNodesWarningIfNecessary(ClusterState oldState, ClusterState newState) {        // check if min_master_nodes setting is too low and log warning        if (hasTooManyMasterNodes(oldState.nodes()) == false && hasTooManyMasterNodes(newState.nodes())) {            logger.warn("value for setting \"{}\" is too low. This can result in data loss! Please set it to at least a quorum of master-" +                    "eligible nodes (current value: [{}], total number of master-eligible nodes used for publishing in this round: [{}])",                ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), minimumMasterNodes(),                newState.getNodes().getMasterNodes().size());        }    }    /**     * Returns the given nodes sorted by likelihood of being elected as master, most likely first.     * Non-master nodes are not removed but are rather put in the end     */    static List<DiscoveryNode> sortByMasterLikelihood(Iterable<DiscoveryNode> nodes) {        ArrayList<DiscoveryNode> sortedNodes = CollectionUtils.iterableAsArrayList(nodes);        CollectionUtil.introSort(sortedNodes, ElectMasterService::compareNodes);        return sortedNodes;    }    /**     * Returns a list of the next possible masters.     */    public DiscoveryNode[] nextPossibleMasters(ObjectContainer<DiscoveryNode> nodes, int numberOfPossibleMasters) {        List<DiscoveryNode> sortedNodes = sortedMasterNodes(Arrays.asList(nodes.toArray(DiscoveryNode.class)));        if (sortedNodes == null) {            return new DiscoveryNode[0];        }        List<DiscoveryNode> nextPossibleMasters = new ArrayList<>(numberOfPossibleMasters);        int counter = 0;        for (DiscoveryNode nextPossibleMaster : sortedNodes) {            if (++counter >= numberOfPossibleMasters) {                break;            }            nextPossibleMasters.add(nextPossibleMaster);        }        return nextPossibleMasters.toArray(new DiscoveryNode[nextPossibleMasters.size()]);    }    private List<DiscoveryNode> sortedMasterNodes(Iterable<DiscoveryNode> nodes) {        List<DiscoveryNode> possibleNodes = CollectionUtils.iterableAsArrayList(nodes);        if (possibleNodes.isEmpty()) {            return null;        }        // clean non master nodes        possibleNodes.removeIf(node -> !node.isMasterNode());        CollectionUtil.introSort(possibleNodes, ElectMasterService::compareNodes);        return possibleNodes;    }    /** master nodes go before other nodes, with a secondary sort by id **/     private static int compareNodes(DiscoveryNode o1, DiscoveryNode o2) {        if (o1.isMasterNode() && !o2.isMasterNode()) {            return -1;        }        if (!o1.isMasterNode() && o2.isMasterNode()) {            return 1;        }        return o1.getId().compareTo(o2.getId());    }}
  • ElectMasterService的构造器读取discovery.zen.minimum_master_nodes配置到变量minimumMasterNodes
  • ElectMasterService定义了静态类MasterCandidate,它提供了compare静态方法用于比较两个MasterCandidate,它首先对比clusterStateVersion,如果该值相同再进行compareNodes,compareNodes会先判断下是否masterNode,都不是则对比他们各自的node的id
  • electMaster方法首先通过hasEnoughCandidates来确定是否有足够的candidates,足够的话则对他们进行排序,最后取第一个作为master返回

ZenDiscovery.findMaster

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

public class ZenDiscovery extends AbstractLifecycleComponent implements Discovery, PingContextProvider, IncomingClusterStateListener {    //......    private DiscoveryNode findMaster() {        logger.trace("starting to ping");        List<ZenPing.PingResponse> fullPingResponses = pingAndWait(pingTimeout).toList();        if (fullPingResponses == null) {            logger.trace("No full ping responses");            return null;        }        if (logger.isTraceEnabled()) {            StringBuilder sb = new StringBuilder();            if (fullPingResponses.size() == 0) {                sb.append(" {none}");            } else {                for (ZenPing.PingResponse pingResponse : fullPingResponses) {                    sb.append("\n\t--> ").append(pingResponse);                }            }            logger.trace("full ping responses:{}", sb);        }        final DiscoveryNode localNode = transportService.getLocalNode();        // add our selves        assert fullPingResponses.stream().map(ZenPing.PingResponse::node)            .filter(n -> n.equals(localNode)).findAny().isPresent() == false;        fullPingResponses.add(new ZenPing.PingResponse(localNode, null, this.clusterState()));        // filter responses        final List<ZenPing.PingResponse> pingResponses = filterPingResponses(fullPingResponses, masterElectionIgnoreNonMasters, logger);        List<DiscoveryNode> activeMasters = new ArrayList<>();        for (ZenPing.PingResponse pingResponse : pingResponses) {            // We can't include the local node in pingMasters list, otherwise we may up electing ourselves without            // any check / verifications from other nodes in ZenDiscover#innerJoinCluster()            if (pingResponse.master() != null && !localNode.equals(pingResponse.master())) {                activeMasters.add(pingResponse.master());            }        }        // nodes discovered during pinging        List<ElectMasterService.MasterCandidate> masterCandidates = new ArrayList<>();        for (ZenPing.PingResponse pingResponse : pingResponses) {            if (pingResponse.node().isMasterNode()) {                masterCandidates.add(new ElectMasterService.MasterCandidate(pingResponse.node(), pingResponse.getClusterStateVersion()));            }        }        if (activeMasters.isEmpty()) {            if (electMaster.hasEnoughCandidates(masterCandidates)) {                final ElectMasterService.MasterCandidate winner = electMaster.electMaster(masterCandidates);                logger.trace("candidate {} won election", winner);                return winner.getNode();            } else {                // if we don't have enough master nodes, we bail, because there are not enough master to elect from                logger.warn("not enough master nodes discovered during pinging (found [{}], but needed [{}]), pinging again",                            masterCandidates, electMaster.minimumMasterNodes());                return null;            }        } else {            assert !activeMasters.contains(localNode) :                "local node should never be elected as master when other nodes indicate an active master";            // lets tie break between discovered nodes            return electMaster.tieBreakActiveMasters(activeMasters);        }    }    //......}
  • ZenDiscovery的findMaster方法在activeMasters.isEmpty()时会通过electMaster.electMaster(masterCandidates)来选取winner作为master返回

小结

  • ElectMasterService的构造器读取discovery.zen.minimum_master_nodes配置到变量minimumMasterNodes;ElectMasterService定义了静态类MasterCandidate,它提供了compare静态方法用于比较两个MasterCandidate,它首先对比clusterStateVersion,如果该值相同再进行compareNodes,compareNodes会先判断下是否masterNode,都不是则对比他们各自的node的id
  • electMaster方法首先通过hasEnoughCandidates来确定是否有足够的candidates,足够的话则对他们进行排序,最后取第一个作为master返回
  • ZenDiscovery的findMaster方法在activeMasters.isEmpty()时会通过electMaster.electMaster(masterCandidates)来选取winner作为master返回

doc

  • Important discovery and cluster formation settings