This article takes a look at Elasticsearch's PeerFinder.

PeersRequest

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/PeersRequest.java

public class PeersRequest extends TransportRequest {
    private final DiscoveryNode sourceNode;
    private final List<DiscoveryNode> knownPeers;

    public PeersRequest(DiscoveryNode sourceNode, List<DiscoveryNode> knownPeers) {
        assert knownPeers.contains(sourceNode) == false : "local node is not a peer";
        this.sourceNode = sourceNode;
        this.knownPeers = knownPeers;
    }

    public PeersRequest(StreamInput in) throws IOException {
        super(in);
        sourceNode = new DiscoveryNode(in);
        knownPeers = in.readList(DiscoveryNode::new);
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        sourceNode.writeTo(out);
        out.writeList(knownPeers);
    }

    public List<DiscoveryNode> getKnownPeers() {
        return knownPeers;
    }

    public DiscoveryNode getSourceNode() {
        return sourceNode;
    }

    @Override
    public String toString() {
        return "PeersRequest{" +
            "sourceNode=" + sourceNode +
            ", knownPeers=" + knownPeers +
            '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        PeersRequest that = (PeersRequest) o;
        return Objects.equals(sourceNode, that.sourceNode) &&
            Objects.equals(knownPeers, that.knownPeers);
    }

    @Override
    public int hashCode() {
        return Objects.hash(sourceNode, knownPeers);
    }
}
  • PeersRequest has two fields: sourceNode (the node sending the request) and knownPeers (the peers that node has already found)
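The StreamInput constructor and writeTo must handle the fields in the same order: sourceNode first, then the list. The following is a minimal sketch of that symmetry under stated assumptions. PeersRequestSketch is a hypothetical class, node ids are plain Strings, and java.io's DataOutputStream/DataInputStream stand in for Elasticsearch's StreamOutput/StreamInput. The real writeList uses its own length encoding, not writeInt as here.

```java
import java.io.*;
import java.util.*;

// Hypothetical stand-in for PeersRequest: String node ids, java.io streams instead of
// StreamInput/StreamOutput. Fields are read back in exactly the order writeTo writes them.
public class PeersRequestSketch {
    private final String sourceNode;
    private final List<String> knownPeers;

    public PeersRequestSketch(String sourceNode, List<String> knownPeers) {
        // same rule as the real constructor's assert: the sender is not its own peer
        if (knownPeers.contains(sourceNode)) {
            throw new IllegalArgumentException("local node is not a peer");
        }
        this.sourceNode = sourceNode;
        this.knownPeers = List.copyOf(knownPeers);
    }

    // Deserializing constructor: sourceNode first, then the size-prefixed list.
    public PeersRequestSketch(DataInputStream in) throws IOException {
        this.sourceNode = in.readUTF();
        int size = in.readInt();
        List<String> peers = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            peers.add(in.readUTF());
        }
        this.knownPeers = Collections.unmodifiableList(peers);
    }

    public void writeTo(DataOutputStream out) throws IOException {
        out.writeUTF(sourceNode);
        out.writeInt(knownPeers.size());
        for (String peer : knownPeers) {
            out.writeUTF(peer);
        }
    }

    public String getSourceNode() {
        return sourceNode;
    }

    public List<String> getKnownPeers() {
        return knownPeers;
    }

    // Serialize to a byte array and read it back, as a transport round trip would.
    static PeersRequestSketch roundTrip(PeersRequestSketch request) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                request.writeTo(out);
            }
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return new PeersRequestSketch(in);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        PeersRequestSketch copy = roundTrip(new PeersRequestSketch("node-1", List.of("node-2", "node-3")));
        System.out.println(copy.getSourceNode() + " " + copy.getKnownPeers()); // prints node-1 [node-2, node-3]
    }
}
```

If the constructor read the list before sourceNode, the bytes would be misinterpreted. That is why the real class keeps writeTo and the StreamInput constructor next to each other.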

PeersResponse

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/cluster/coordination/PeersResponse.java

public class PeersResponse extends TransportResponse {
    private final Optional<DiscoveryNode> masterNode;
    private final List<DiscoveryNode> knownPeers;
    private final long term;

    public PeersResponse(Optional<DiscoveryNode> masterNode, List<DiscoveryNode> knownPeers, long term) {
        assert masterNode.isPresent() == false || knownPeers.isEmpty();
        this.masterNode = masterNode;
        this.knownPeers = knownPeers;
        this.term = term;
    }

    public PeersResponse(StreamInput in) throws IOException {
        masterNode = Optional.ofNullable(in.readOptionalWriteable(DiscoveryNode::new));
        knownPeers = in.readList(DiscoveryNode::new);
        term = in.readLong();
        assert masterNode.isPresent() == false || knownPeers.isEmpty();
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        out.writeOptionalWriteable(masterNode.orElse(null));
        out.writeList(knownPeers);
        out.writeLong(term);
    }

    /**
     * @return the node that is currently leading, according to the responding node.
     */
    public Optional<DiscoveryNode> getMasterNode() {
        return masterNode;
    }

    /**
     * @return the collection of known peers of the responding node, or an empty collection if the responding node believes there
     * is currently a leader.
     */
    public List<DiscoveryNode> getKnownPeers() {
        return knownPeers;
    }

    /**
     * @return the current term of the responding node. If the responding node is the leader then this is the term in which it is
     * currently leading.
     */
    public long getTerm() {
        return term;
    }

    @Override
    public String toString() {
        return "PeersResponse{" +
            "masterNode=" + masterNode +
            ", knownPeers=" + knownPeers +
            ", term=" + term +
            '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        PeersResponse that = (PeersResponse) o;
        return term == that.term &&
            Objects.equals(masterNode, that.masterNode) &&
            Objects.equals(knownPeers, that.knownPeers);
    }

    @Override
    public int hashCode() {
        return Objects.hash(masterNode, knownPeers, term);
    }
}
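  • PeersResponse has three fields: masterNode, knownPeers and term. The constructor asserts that knownPeers is empty whenever masterNode is present: a responder that believes there is a leader reports only the leader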
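The invariant "a reported master means no reported peers" can be illustrated with a small sketch. PeersResponseSketch, withLeader and withoutLeader are hypothetical names, and node ids are Strings. The sketch throws instead of using assert so the check also runs when the JVM is started without -ea. The real class relies on assert.

```java
import java.util.*;

// Hypothetical, simplified PeersResponse with String node ids.
public final class PeersResponseSketch {
    private final Optional<String> masterNode;
    private final List<String> knownPeers;
    private final long term;

    public PeersResponseSketch(Optional<String> masterNode, List<String> knownPeers, long term) {
        // same rule as the real constructor's assert
        if (masterNode.isPresent() && !knownPeers.isEmpty()) {
            throw new IllegalArgumentException("knownPeers must be empty when a master is reported");
        }
        this.masterNode = masterNode;
        this.knownPeers = List.copyOf(knownPeers);
        this.term = term;
    }

    // A responder that believes there is a leader reports only that leader.
    static PeersResponseSketch withLeader(String leader, long term) {
        return new PeersResponseSketch(Optional.of(leader), List.of(), term);
    }

    // A responder that is still discovering reports the peers it has found.
    static PeersResponseSketch withoutLeader(List<String> peers, long term) {
        return new PeersResponseSketch(Optional.empty(), peers, term);
    }

    public Optional<String> getMasterNode() {
        return masterNode;
    }

    public List<String> getKnownPeers() {
        return knownPeers;
    }

    public long getTerm() {
        return term;
    }

    public static void main(String[] args) {
        System.out.println(withLeader("node-1", 5).getMasterNode().orElse("none")); // prints node-1
        System.out.println(withoutLeader(List.of("node-2"), 3).getKnownPeers());    // prints [node-2]
    }
}
```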

PeerFinder

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java

public abstract class PeerFinder {

    private static final Logger logger = LogManager.getLogger(PeerFinder.class);

    public static final String REQUEST_PEERS_ACTION_NAME = "internal:discovery/request_peers";

    // the time between attempts to find all peers
    public static final Setting<TimeValue> DISCOVERY_FIND_PEERS_INTERVAL_SETTING =
        Setting.timeSetting("discovery.find_peers_interval",
            TimeValue.timeValueMillis(1000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);

    public static final Setting<TimeValue> DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING =
        Setting.timeSetting("discovery.request_peers_timeout",
            TimeValue.timeValueMillis(3000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);

    private final Settings settings;

    private final TimeValue findPeersInterval;
    private final TimeValue requestPeersTimeout;

    private final Object mutex = new Object();
    private final TransportService transportService;
    private final TransportAddressConnector transportAddressConnector;
    private final ConfiguredHostsResolver configuredHostsResolver;

    private volatile long currentTerm;
    private boolean active;
    private DiscoveryNodes lastAcceptedNodes;
    private final Map<TransportAddress, Peer> peersByAddress = new LinkedHashMap<>();
    private Optional<DiscoveryNode> leader = Optional.empty();
    private volatile List<TransportAddress> lastResolvedAddresses = emptyList();

    public PeerFinder(Settings settings, TransportService transportService, TransportAddressConnector transportAddressConnector,
                      ConfiguredHostsResolver configuredHostsResolver) {
        this.settings = settings;
        findPeersInterval = DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings);
        requestPeersTimeout = DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING.get(settings);
        this.transportService = transportService;
        this.transportAddressConnector = transportAddressConnector;
        this.configuredHostsResolver = configuredHostsResolver;

        transportService.registerRequestHandler(REQUEST_PEERS_ACTION_NAME, Names.GENERIC, false, false,
            PeersRequest::new,
            (request, channel, task) -> channel.sendResponse(handlePeersRequest(request)));

        transportService.registerRequestHandler(UnicastZenPing.ACTION_NAME, Names.GENERIC, false, false,
            UnicastZenPing.UnicastPingRequest::new, new Zen1UnicastPingRequestHandler());
    }

    public void activate(final DiscoveryNodes lastAcceptedNodes) {
        logger.trace("activating with {}", lastAcceptedNodes);

        synchronized (mutex) {
            assert assertInactiveWithNoKnownPeers();
            active = true;
            this.lastAcceptedNodes = lastAcceptedNodes;
            leader = Optional.empty();
            handleWakeUp(); // return value discarded: there are no known peers, so none can be disconnected
        }

        onFoundPeersUpdated(); // trigger a check for a quorum already
    }

    public void deactivate(DiscoveryNode leader) {
        final boolean peersRemoved;
        synchronized (mutex) {
            logger.trace("deactivating and setting leader to {}", leader);
            active = false;
            peersRemoved = handleWakeUp();
            this.leader = Optional.of(leader);
            assert assertInactiveWithNoKnownPeers();
        }
        if (peersRemoved) {
            onFoundPeersUpdated();
        }
    }

    // exposed to subclasses for testing
    protected final boolean holdsLock() {
        return Thread.holdsLock(mutex);
    }

    private boolean assertInactiveWithNoKnownPeers() {
        assert holdsLock() : "PeerFinder mutex not held";
        assert active == false;
        assert peersByAddress.isEmpty() : peersByAddress.keySet();
        return true;
    }

    PeersResponse handlePeersRequest(PeersRequest peersRequest) {
        synchronized (mutex) {
            assert peersRequest.getSourceNode().equals(getLocalNode()) == false;
            final List<DiscoveryNode> knownPeers;
            if (active) {
                assert leader.isPresent() == false : leader;
                if (peersRequest.getSourceNode().isMasterNode()) {
                    startProbe(peersRequest.getSourceNode().getAddress());
                }
                peersRequest.getKnownPeers().stream().map(DiscoveryNode::getAddress).forEach(this::startProbe);
                knownPeers = getFoundPeersUnderLock();
            } else {
                assert leader.isPresent() || lastAcceptedNodes == null;
                knownPeers = emptyList();
            }
            return new PeersResponse(leader, knownPeers, currentTerm);
        }
    }

    // exposed for checking invariant in o.e.c.c.Coordinator (public since this is a different package)
    public Optional<DiscoveryNode> getLeader() {
        synchronized (mutex) {
            return leader;
        }
    }

    // exposed for checking invariant in o.e.c.c.Coordinator (public since this is a different package)
    public long getCurrentTerm() {
        return currentTerm;
    }

    public void setCurrentTerm(long currentTerm) {
        this.currentTerm = currentTerm;
    }

    private DiscoveryNode getLocalNode() {
        final DiscoveryNode localNode = transportService.getLocalNode();
        assert localNode != null;
        return localNode;
    }

    protected abstract void onActiveMasterFound(DiscoveryNode masterNode, long term);

    protected abstract void onFoundPeersUpdated();

    public List<TransportAddress> getLastResolvedAddresses() {
        return lastResolvedAddresses;
    }

    public Iterable<DiscoveryNode> getFoundPeers() {
        synchronized (mutex) {
            return getFoundPeersUnderLock();
        }
    }

    private List<DiscoveryNode> getFoundPeersUnderLock() {
        assert holdsLock() : "PeerFinder mutex not held";
        return peersByAddress.values().stream()
            .map(Peer::getDiscoveryNode).filter(Objects::nonNull).distinct().collect(Collectors.toList());
    }

    private Peer createConnectingPeer(TransportAddress transportAddress) {
        Peer peer = new Peer(transportAddress);
        peer.establishConnection();
        return peer;
    }

    /**
     * @return whether any peers were removed due to disconnection
     */
    private boolean handleWakeUp() {
        assert holdsLock() : "PeerFinder mutex not held";

        final boolean peersRemoved = peersByAddress.values().removeIf(Peer::handleWakeUp);

        if (active == false) {
            logger.trace("not active");
            return peersRemoved;
        }

        logger.trace("probing master nodes from cluster state: {}", lastAcceptedNodes);
        for (ObjectCursor<DiscoveryNode> discoveryNodeObjectCursor : lastAcceptedNodes.getMasterNodes().values()) {
            startProbe(discoveryNodeObjectCursor.value.getAddress());
        }

        configuredHostsResolver.resolveConfiguredHosts(providedAddresses -> {
            synchronized (mutex) {
                lastResolvedAddresses = providedAddresses;
                logger.trace("probing resolved transport addresses {}", providedAddresses);
                providedAddresses.forEach(this::startProbe);
            }
        });

        transportService.getThreadPool().scheduleUnlessShuttingDown(findPeersInterval, Names.GENERIC, new AbstractRunnable() {
            @Override
            public boolean isForceExecution() {
                return true;
            }

            @Override
            public void onFailure(Exception e) {
                assert false : e;
                logger.debug("unexpected exception in wakeup", e);
            }

            @Override
            protected void doRun() {
                synchronized (mutex) {
                    if (handleWakeUp() == false) {
                        return;
                    }
                }
                onFoundPeersUpdated();
            }

            @Override
            public String toString() {
                return "PeerFinder handling wakeup";
            }
        });

        return peersRemoved;
    }

    protected void startProbe(TransportAddress transportAddress) {
        assert holdsLock() : "PeerFinder mutex not held";
        if (active == false) {
            logger.trace("startProbe({}) not running", transportAddress);
            return;
        }

        if (transportAddress.equals(getLocalNode().getAddress())) {
            logger.trace("startProbe({}) not probing local node", transportAddress);
            return;
        }

        peersByAddress.computeIfAbsent(transportAddress, this::createConnectingPeer);
    }

    private class Zen1UnicastPingRequestHandler implements TransportRequestHandler<UnicastZenPing.UnicastPingRequest> {
        @Override
        public void messageReceived(UnicastZenPing.UnicastPingRequest request, TransportChannel channel, Task task) throws Exception {
            final PeersRequest peersRequest = new PeersRequest(request.pingResponse.node(),
                Optional.ofNullable(request.pingResponse.master()).map(Collections::singletonList).orElse(emptyList()));
            final PeersResponse peersResponse = handlePeersRequest(peersRequest);
            final List<ZenPing.PingResponse> pingResponses = new ArrayList<>();
            final ClusterName clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
            pingResponses.add(new ZenPing.PingResponse(createDiscoveryNodeWithImpossiblyHighId(transportService.getLocalNode()),
                peersResponse.getMasterNode().orElse(null),
                clusterName, ClusterState.UNKNOWN_VERSION));
            peersResponse.getKnownPeers().forEach(dn -> pingResponses.add(
                new ZenPing.PingResponse(ZenPing.PingResponse.FAKE_PING_ID,
                    isZen1Node(dn) ? dn : createDiscoveryNodeWithImpossiblyHighId(dn), null, clusterName, ClusterState.UNKNOWN_VERSION)));
            channel.sendResponse(new UnicastZenPing.UnicastPingResponse(request.id, pingResponses.toArray(new ZenPing.PingResponse[0])));
        }
    }

    //......
}
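  • PeerFinder's constructor registers two request handlers. The first handles REQUEST_PEERS_ACTION_NAME and calls handlePeersRequest directly. The second handles UnicastZenPing.ACTION_NAME; its handler is Zen1UnicastPingRequestHandler, which converts the Zen1 ping into a PeersRequest and likewise delegates to handlePeersRequest
  • handlePeersRequest depends on whether the finder is active. While active, it calls startProbe on the source node's address (only if the source node is master-eligible) and on the address of every node in peersRequest.getKnownPeers(), then fills knownPeers from getFoundPeersUnderLock. While inactive, knownPeers is empty. Either way it returns a PeersResponse built from leader, knownPeers and currentTerm
  • startProbe does nothing if the finder is inactive or the address belongs to the local node. Otherwise it calls peersByAddress.computeIfAbsent(transportAddress, this::createConnectingPeer), so each transport address gets at most one Peer. createConnectingPeer creates the Peer and calls its establishConnection method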
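The interplay of handlePeersRequest, startProbe and createConnectingPeer can be modelled in a single process. This is a simplified sketch, not the Elasticsearch implementation:
- MiniPeerFinder, markConnected and connectionAttempts are hypothetical.
- Addresses and node ids are Strings.
- synchronized methods replace the explicit mutex.
- "Connecting" only records the address instead of calling establishConnection.

```java
import java.util.*;
import java.util.stream.Collectors;

// Hypothetical single-process model of PeerFinder's handlePeersRequest/startProbe logic.
// Addresses and node ids are Strings; "connecting" only records the attempt.
public class MiniPeerFinder {
    static final class Peer {
        String discoveredNode; // null until connected, like Peer.getDiscoveryNode()
    }

    static final class Response {
        final Optional<String> leader;
        final List<String> knownPeers;
        final long term;

        Response(Optional<String> leader, List<String> knownPeers, long term) {
            this.leader = leader;
            this.knownPeers = knownPeers;
            this.term = term;
        }
    }

    private final String localAddress;
    private final Map<String, Peer> peersByAddress = new LinkedHashMap<>();
    final List<String> connectionAttempts = new ArrayList<>(); // hypothetical: replaces establishConnection()
    private boolean active;
    private Optional<String> leader = Optional.empty();
    private long currentTerm; // never advanced in this sketch

    MiniPeerFinder(String localAddress) {
        this.localAddress = localAddress;
    }

    synchronized void activate() {
        active = true;
        leader = Optional.empty();
    }

    synchronized void deactivate(String newLeader) {
        active = false;
        peersByAddress.clear(); // in ES every Peer.handleWakeUp() returns true once inactive
        leader = Optional.of(newLeader);
    }

    // Like startProbe: skip when inactive or when the address is our own; one Peer per address.
    synchronized void startProbe(String address) {
        if (!active || address.equals(localAddress)) {
            return;
        }
        peersByAddress.computeIfAbsent(address, this::createConnectingPeer);
    }

    private Peer createConnectingPeer(String address) {
        connectionAttempts.add(address);
        return new Peer();
    }

    // Hypothetical hook standing in for a successful connectToRemoteMasterNode callback.
    synchronized void markConnected(String address, String nodeId) {
        Peer peer = peersByAddress.get(address);
        if (peer != null) {
            peer.discoveredNode = nodeId;
        }
    }

    // Like handlePeersRequest: probe the sender (if master-eligible) and its peers, then reply.
    synchronized Response handlePeersRequest(String sourceAddress, boolean sourceMasterEligible,
                                             List<String> requestKnownPeers) {
        List<String> knownPeers;
        if (active) {
            if (sourceMasterEligible) {
                startProbe(sourceAddress);
            }
            requestKnownPeers.forEach(this::startProbe);
            knownPeers = peersByAddress.values().stream()
                .map(p -> p.discoveredNode).filter(Objects::nonNull).distinct()
                .collect(Collectors.toList());
        } else {
            knownPeers = List.of();
        }
        return new Response(leader, knownPeers, currentTerm);
    }

    public static void main(String[] args) {
        MiniPeerFinder finder = new MiniPeerFinder("10.0.0.1:9300");
        finder.activate();
        finder.handlePeersRequest("10.0.0.2:9300", true, List.of("10.0.0.3:9300", "10.0.0.1:9300"));
        System.out.println(finder.connectionAttempts); // prints [10.0.0.2:9300, 10.0.0.3:9300]
    }
}
```

computeIfAbsent only invokes the mapping function for a missing key. As a result, nodes can keep re-announcing the same addresses without triggering a second connection attempt for an address that is already tracked.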

Peer

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java

    private class Peer {
        private final TransportAddress transportAddress;
        private SetOnce<DiscoveryNode> discoveryNode = new SetOnce<>();
        private volatile boolean peersRequestInFlight;

        Peer(TransportAddress transportAddress) {
            this.transportAddress = transportAddress;
        }

        @Nullable
        DiscoveryNode getDiscoveryNode() {
            return discoveryNode.get();
        }

        boolean handleWakeUp() {
            assert holdsLock() : "PeerFinder mutex not held";

            if (active == false) {
                return true;
            }

            final DiscoveryNode discoveryNode = getDiscoveryNode();
            // may be null if connection not yet established

            if (discoveryNode != null) {
                if (transportService.nodeConnected(discoveryNode)) {
                    if (peersRequestInFlight == false) {
                        requestPeers();
                    }
                } else {
                    logger.trace("{} no longer connected", this);
                    return true;
                }
            }

            return false;
        }

        void establishConnection() {
            assert holdsLock() : "PeerFinder mutex not held";
            assert getDiscoveryNode() == null : "unexpectedly connected to " + getDiscoveryNode();
            assert active;

            logger.trace("{} attempting connection", this);
            transportAddressConnector.connectToRemoteMasterNode(transportAddress, new ActionListener<DiscoveryNode>() {
                @Override
                public void onResponse(DiscoveryNode remoteNode) {
                    assert remoteNode.isMasterNode() : remoteNode + " is not master-eligible";
                    assert remoteNode.equals(getLocalNode()) == false : remoteNode + " is the local node";
                    synchronized (mutex) {
                        if (active == false) {
                            return;
                        }

                        assert discoveryNode.get() == null : "discoveryNode unexpectedly already set to " + discoveryNode.get();
                        discoveryNode.set(remoteNode);
                        requestPeers();
                    }

                    assert holdsLock() == false : "PeerFinder mutex is held in error";
                    onFoundPeersUpdated();
                }

                @Override
                public void onFailure(Exception e) {
                    logger.debug(() -> new ParameterizedMessage("{} connection failed", Peer.this), e);
                    synchronized (mutex) {
                        peersByAddress.remove(transportAddress);
                    }
                }
            });
        }

        private void requestPeers() {
            assert holdsLock() : "PeerFinder mutex not held";
            assert peersRequestInFlight == false : "PeersRequest already in flight";
            assert active;

            final DiscoveryNode discoveryNode = getDiscoveryNode();
            assert discoveryNode != null : "cannot request peers without first connecting";

            if (discoveryNode.equals(getLocalNode())) {
                logger.trace("{} not requesting peers from local node", this);
                return;
            }

            logger.trace("{} requesting peers", this);
            peersRequestInFlight = true;

            final List<DiscoveryNode> knownNodes = getFoundPeersUnderLock();

            final TransportResponseHandler<PeersResponse> peersResponseHandler = new TransportResponseHandler<PeersResponse>() {

                @Override
                public PeersResponse read(StreamInput in) throws IOException {
                    return new PeersResponse(in);
                }

                @Override
                public void handleResponse(PeersResponse response) {
                    logger.trace("{} received {}", Peer.this, response);
                    synchronized (mutex) {
                        if (active == false) {
                            return;
                        }

                        peersRequestInFlight = false;

                        response.getMasterNode().map(DiscoveryNode::getAddress).ifPresent(PeerFinder.this::startProbe);
                        response.getKnownPeers().stream().map(DiscoveryNode::getAddress).forEach(PeerFinder.this::startProbe);
                    }

                    if (response.getMasterNode().equals(Optional.of(discoveryNode))) {
                        // Must not hold lock here to avoid deadlock
                        assert holdsLock() == false : "PeerFinder mutex is held in error";
                        onActiveMasterFound(discoveryNode, response.getTerm());
                    }
                }

                @Override
                public void handleException(TransportException exp) {
                    peersRequestInFlight = false;
                    logger.debug(new ParameterizedMessage("{} peers request failed", Peer.this), exp);
                }

                @Override
                public String executor() {
                    return Names.GENERIC;
                }
            };
            final String actionName;
            final TransportRequest transportRequest;
            final TransportResponseHandler<?> transportResponseHandler;
            if (isZen1Node(discoveryNode)) {
                actionName = UnicastZenPing.ACTION_NAME;
                transportRequest = new UnicastZenPing.UnicastPingRequest(1, ZenDiscovery.PING_TIMEOUT_SETTING.get(settings),
                    new ZenPing.PingResponse(createDiscoveryNodeWithImpossiblyHighId(getLocalNode()), null,
                        ClusterName.CLUSTER_NAME_SETTING.get(settings), ClusterState.UNKNOWN_VERSION));
                transportResponseHandler = peersResponseHandler.wrap(ucResponse -> {
                    Optional<DiscoveryNode> optionalMasterNode = Arrays.stream(ucResponse.pingResponses)
                        .filter(pr -> discoveryNode.equals(pr.node()) && discoveryNode.equals(pr.master()))
                        .map(ZenPing.PingResponse::node)
                        .findFirst();
                    List<DiscoveryNode> discoveredNodes = new ArrayList<>();
                    if (optionalMasterNode.isPresent() == false) {
                        Arrays.stream(ucResponse.pingResponses).map(PingResponse::master).filter(Objects::nonNull)
                            .forEach(discoveredNodes::add);
                        Arrays.stream(ucResponse.pingResponses).map(PingResponse::node).forEach(discoveredNodes::add);
                    }
                    return new PeersResponse(optionalMasterNode, discoveredNodes, 0L);
                }, UnicastZenPing.UnicastPingResponse::new);
            } else {
                actionName = REQUEST_PEERS_ACTION_NAME;
                transportRequest = new PeersRequest(getLocalNode(), knownNodes);
                transportResponseHandler = peersResponseHandler;
            }
            transportService.sendRequest(discoveryNode, actionName,
                transportRequest,
                TransportRequestOptions.builder().withTimeout(requestPeersTimeout).build(),
                transportResponseHandler);
        }

        @Override
        public String toString() {
            return "Peer{" +
                "transportAddress=" + transportAddress +
                ", discoveryNode=" + discoveryNode.get() +
                ", peersRequestInFlight=" + peersRequestInFlight +
                '}';
        }
    }
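  • Peer's establishConnection method calls transportAddressConnector.connectToRemoteMasterNode to connect to the master-eligible node at transportAddress.
    On success, it does two things in order. First, under the mutex and only if the finder is still active, it records the remote node and calls requestPeers. Then, after releasing the lock, it calls onFoundPeersUpdated.
    On failure, it removes the Peer for that transportAddress from peersByAddress
  • requestPeers first sets peersRequestInFlight. It then builds a PeersRequest from the knownNodes returned by getFoundPeersUnderLock and sends it to discoveryNode via transportService.sendRequest, with requestPeersTimeout as the timeout.
    When the response arrives, it calls startProbe on the address of the response's masterNode (if any) and on each of its knownPeers. If the response names this very discoveryNode as the master, it then calls onActiveMasterFound outside the lock
  • The 7.0 code also stays compatible with older Zen1 nodes. If isZen1Node(discoveryNode) is true, requestPeers sends a UnicastZenPing.UnicastPingRequest on UnicastZenPing.ACTION_NAME instead of a PeersRequest. It then wraps the returned UnicastPingResponse into a PeersResponse with term 0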
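What requestPeers does with a reply, including the Zen1 conversion done in the wrap lambda, can be sketched as two plain functions. Assumptions:
- PeerResponseHandling, Reply and PingResponse are hypothetical stand-ins for PeersResponse and ZenPing.PingResponse.
- A node id doubles as its address.
- startProbe and onActiveMasterFound are passed in as callbacks.
- Locking and the peersRequestInFlight flag are left out. In the real handleResponse, the startProbe calls run under the mutex and onActiveMasterFound runs after the lock is released.

```java
import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical sketch of how Peer.requestPeers() treats a reply; node ids double as addresses.
public class PeerResponseHandling {
    // Stand-in for PeersResponse.
    static final class Reply {
        final Optional<String> masterNode;
        final List<String> knownPeers;
        final long term;

        Reply(Optional<String> masterNode, List<String> knownPeers, long term) {
            this.masterNode = masterNode;
            this.knownPeers = knownPeers;
            this.term = term;
        }
    }

    // Stand-in for ZenPing.PingResponse: the responding node and the master it reports (may be null).
    static final class PingResponse {
        final String node;
        final String master;

        PingResponse(String node, String master) {
            this.node = node;
            this.master = master;
        }
    }

    // Like handleResponse: probe everything the reply mentions; report an active master
    // only when the peer we asked names itself as master.
    static void handleReply(String peerNode, Reply reply, Consumer<String> startProbe,
                            BiConsumer<String, Long> onActiveMasterFound) {
        reply.masterNode.ifPresent(startProbe);
        reply.knownPeers.forEach(startProbe);
        if (reply.masterNode.equals(Optional.of(peerNode))) {
            onActiveMasterFound.accept(peerNode, reply.term);
        }
    }

    // Like the Zen1 wrap lambda: the peer is the master only if some entry has node == master == peerNode;
    // otherwise every reported master and node becomes a known peer. Zen1 has no term, so 0 is used.
    static Reply fromZen1(String peerNode, List<PingResponse> pingResponses) {
        Optional<String> master = pingResponses.stream()
            .filter(pr -> peerNode.equals(pr.node) && peerNode.equals(pr.master))
            .map(pr -> pr.node)
            .findFirst();
        List<String> discovered = new ArrayList<>();
        if (master.isEmpty()) {
            pingResponses.stream().map(pr -> pr.master).filter(Objects::nonNull).forEach(discovered::add);
            pingResponses.stream().map(pr -> pr.node).forEach(discovered::add);
        }
        return new Reply(master, discovered, 0L);
    }

    public static void main(String[] args) {
        Reply reply = fromZen1("node-1",
            List.of(new PingResponse("node-1", null), new PingResponse("node-2", "node-9")));
        System.out.println(reply.masterNode + " " + reply.knownPeers); // prints Optional.empty [node-9, node-1, node-2]
    }
}
```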

Summary

  • PeerFinder's constructor registers two handlers: one for REQUEST_PEERS_ACTION_NAME that runs handlePeersRequest, and one for UnicastZenPing.ACTION_NAME (Zen1UnicastPingRequestHandler) that also ends up calling handlePeersRequest
  • While active, handlePeersRequest calls startProbe on the source node (if it is master-eligible) and on each node in peersRequest.getKnownPeers(). It then fills knownPeers from getFoundPeersUnderLock and returns a PeersResponse built from leader, knownPeers and currentTerm.
    startProbe calls createConnectingPeer and stores the result in the peersByAddress map, keyed by transportAddress. createConnectingPeer creates a Peer and calls its establishConnection method
  • Peer's establishConnection connects to the remote master-eligible node via transportAddressConnector.connectToRemoteMasterNode. On success it runs requestPeers and then onFoundPeersUpdated; on failure it removes the Peer for that transportAddress from peersByAddress.
    requestPeers builds a PeersRequest from the knownNodes returned by getFoundPeersUnderLock and sends it to discoveryNode via transportService.sendRequest. On a successful response it calls startProbe for the masterNode and for each entry in knownPeers. If discoveryNode itself is the master, it also triggers onActiveMasterFound
