This article takes a look at Elasticsearch's NodesFaultDetection.

NodesFaultDetection

elasticsearch-0.90.0/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java

public class NodesFaultDetection extends AbstractComponent {
    public static interface Listener {
        void onNodeFailure(DiscoveryNode node, String reason);
    }
    private final ThreadPool threadPool;
    private final TransportService transportService;
    private final boolean connectOnNetworkDisconnect;
    private final TimeValue pingInterval;
    private final TimeValue pingRetryTimeout;
    private final int pingRetryCount;
    // used mainly for testing, should always be true
    private final boolean registerConnectionListener;
    private final CopyOnWriteArrayList<Listener> listeners = new CopyOnWriteArrayList<Listener>();
    private final ConcurrentMap<DiscoveryNode, NodeFD> nodesFD = newConcurrentMap();
    private final FDConnectionListener connectionListener;
    private volatile DiscoveryNodes latestNodes = EMPTY_NODES;
    private volatile boolean running = false;
    public NodesFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService) {
        super(settings);
        this.threadPool = threadPool;
        this.transportService = transportService;
        this.connectOnNetworkDisconnect = componentSettings.getAsBoolean("connect_on_network_disconnect", true);
        this.pingInterval = componentSettings.getAsTime("ping_interval", timeValueSeconds(1));
        this.pingRetryTimeout = componentSettings.getAsTime("ping_timeout", timeValueSeconds(30));
        this.pingRetryCount = componentSettings.getAsInt("ping_retries", 3);
        this.registerConnectionListener = componentSettings.getAsBoolean("register_connection_listener", true);
        logger.debug("[node  ] uses ping_interval [{}], ping_timeout [{}], ping_retries [{}]", pingInterval, pingRetryTimeout, pingRetryCount);
        transportService.registerHandler(PingRequestHandler.ACTION, new PingRequestHandler());
        this.connectionListener = new FDConnectionListener();
        if (registerConnectionListener) {
            transportService.addConnectionListener(connectionListener);
        }
    }
    public NodesFaultDetection start() {
        if (running) {
            return this;
        }
        running = true;
        return this;
    }
    public NodesFaultDetection stop() {
        if (!running) {
            return this;
        }
        running = false;
        return this;
    }
    public void close() {
        stop();
        transportService.removeHandler(PingRequestHandler.ACTION);
        transportService.removeConnectionListener(connectionListener);
    }
    //......
}
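  • NodesFaultDetection extends AbstractComponent. Among its fields are listeners (a CopyOnWriteArrayList), nodesFD (a ConcurrentMap from node to NodeFD), connectionListener, latestNodes and running.
  • Its constructor reads the connect_on_network_disconnect (default true), ping_interval (default 1s), ping_timeout (default 30s), ping_retries (default 3) and register_connection_listener (default true) settings. It then registers a PingRequestHandler with transportService under PingRequestHandler.ACTION and, when register_connection_listener is true, adds an FDConnectionListener.
  • start sets running to true and stop sets it to false; both return immediately if running is already in the target state. close calls stop, then removes the PingRequestHandler.ACTION handler and the connectionListener from transportService.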
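To make the defaults and the idempotent start/stop concrete, here is a minimal stand-alone sketch. It assumes a plain Map<String, String> in place of Elasticsearch's Settings/componentSettings and uses a simplified time parser; the class FdSettingsSketch and its methods are hypothetical, not Elasticsearch APIs.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: mirrors the defaults read in NodesFaultDetection's constructor.
// A plain Map replaces Elasticsearch's Settings/componentSettings (assumption).
class FdSettingsSketch {
    final boolean connectOnNetworkDisconnect;
    final long pingIntervalMillis;
    final long pingTimeoutMillis;
    final int pingRetries;
    final boolean registerConnectionListener;
    private volatile boolean running = false;

    FdSettingsSketch(Map<String, String> settings) {
        connectOnNetworkDisconnect = Boolean.parseBoolean(settings.getOrDefault("connect_on_network_disconnect", "true"));
        pingIntervalMillis = parseTime(settings.getOrDefault("ping_interval", "1s"));
        pingTimeoutMillis = parseTime(settings.getOrDefault("ping_timeout", "30s"));
        pingRetries = Integer.parseInt(settings.getOrDefault("ping_retries", "3"));
        registerConnectionListener = Boolean.parseBoolean(settings.getOrDefault("register_connection_listener", "true"));
    }

    // Simplified parser: only "ms", "s" and "m" suffixes; a bare number means milliseconds.
    static long parseTime(String value) {
        if (value.endsWith("ms")) return Long.parseLong(value.substring(0, value.length() - 2));
        if (value.endsWith("s")) return Long.parseLong(value.substring(0, value.length() - 1)) * 1_000L;
        if (value.endsWith("m")) return Long.parseLong(value.substring(0, value.length() - 1)) * 60_000L;
        return Long.parseLong(value);
    }

    // Like the original start()/stop(): calling either one twice is harmless.
    FdSettingsSketch start() {
        if (running) return this;
        running = true;
        return this;
    }

    FdSettingsSketch stop() {
        if (!running) return this;
        running = false;
        return this;
    }

    boolean isRunning() { return running; }

    public static void main(String[] args) {
        Map<String, String> overrides = new HashMap<>();
        overrides.put("ping_timeout", "10s");
        FdSettingsSketch fd = new FdSettingsSketch(overrides).start();
        // prints "1000 10000 3 true"
        System.out.println(fd.pingIntervalMillis + " " + fd.pingTimeoutMillis + " " + fd.pingRetries + " " + fd.isRunning());
    }
}
```

Only the keys that are overridden change; everything else falls back to the same defaults the constructor above uses.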

PingRequestHandler

elasticsearch-0.90.0/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java

    class PingRequestHandler extends BaseTransportRequestHandler<PingRequest> {
        public static final String ACTION = "discovery/zen/fd/ping";
        @Override
        public PingRequest newInstance() {
            return new PingRequest();
        }
        @Override
        public void messageReceived(PingRequest request, TransportChannel channel) throws Exception {
            // if we are not the node we are supposed to be pinged, send an exception
            // this can happen when a kill -9 is sent, and another node is started using the same port
            if (!latestNodes.localNodeId().equals(request.nodeId)) {
                throw new ElasticSearchIllegalStateException("Got pinged as node [" + request.nodeId + "], but I am node [" + latestNodes.localNodeId() + "]");
            }
            channel.sendResponse(new PingResponse());
        }
        @Override
        public String executor() {
            return ThreadPool.Names.SAME;
        }
    }
    static class PingRequest extends TransportRequest {
        // the (assumed) node id we are pinging
        private String nodeId;
        PingRequest() {
        }
        PingRequest(String nodeId) {
            this.nodeId = nodeId;
        }
        @Override
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
            nodeId = in.readString();
        }
        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            out.writeString(nodeId);
        }
    }
    private static class PingResponse extends TransportResponse {
        private PingResponse() {
        }
        @Override
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
        }
        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
        }
    }
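  • PingRequestHandler's newInstance creates a PingRequest, whose nodeId field identifies the node the sender believes it is pinging. messageReceived handles the request: it compares request.nodeId with latestNodes.localNodeId(); if they match it replies with a PingResponse, otherwise it throws an ElasticSearchIllegalStateException. As the code comment notes, a mismatch can happen when a node is killed with kill -9 and another node is started on the same port.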
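A minimal sketch of this identity check, assuming plain String node ids and java.util.UUID in place of Elasticsearch's UUID.randomBase64UUID; PingIdentityCheck is a hypothetical class, not Elasticsearch code. It illustrates why a process restarted on the same port rejects pings addressed to the old node id: ZenDiscovery.doStart generates a new random node id every time it starts.

```java
import java.util.UUID;

// Hypothetical sketch of the check in PingRequestHandler.messageReceived.
// java.util.UUID stands in for Elasticsearch's UUID.randomBase64UUID (assumption).
class PingIdentityCheck {
    static final class PingRequest {
        final String nodeId; // the (assumed) id of the node being pinged
        PingRequest(String nodeId) { this.nodeId = nodeId; }
    }

    static final class PingResponse { }

    private final String localNodeId;

    PingIdentityCheck(String localNodeId) { this.localNodeId = localNodeId; }

    PingResponse messageReceived(PingRequest request) {
        // Reject pings addressed to a different node id, e.g. a node that was
        // killed with kill -9 and replaced by a new process on the same port.
        if (!localNodeId.equals(request.nodeId)) {
            throw new IllegalStateException("Got pinged as node [" + request.nodeId
                    + "], but I am node [" + localNodeId + "]");
        }
        return new PingResponse();
    }

    public static void main(String[] args) {
        String oldId = UUID.randomUUID().toString();
        // A restarted process generates a fresh id, as ZenDiscovery.doStart does.
        PingIdentityCheck restarted = new PingIdentityCheck(UUID.randomUUID().toString());
        try {
            restarted.messageReceived(new PingRequest(oldId));
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

On the sending side the exception surfaces as a failed ping, which counts toward the retry limit rather than being mistaken for a healthy node.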

FDConnectionListener

elasticsearch-0.90.0/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java

    private class FDConnectionListener implements TransportConnectionListener {
        @Override
        public void onNodeConnected(DiscoveryNode node) {
        }
        @Override
        public void onNodeDisconnected(DiscoveryNode node) {
            handleTransportDisconnect(node);
        }
    }
    private void handleTransportDisconnect(DiscoveryNode node) {
        if (!latestNodes.nodeExists(node.id())) {
            return;
        }
        NodeFD nodeFD = nodesFD.remove(node);
        if (nodeFD == null) {
            return;
        }
        if (!running) {
            return;
        }
        nodeFD.running = false;
        if (connectOnNetworkDisconnect) {
            try {
                transportService.connectToNode(node);
                nodesFD.put(node, new NodeFD());
                threadPool.schedule(pingInterval, ThreadPool.Names.SAME, new SendPingRequest(node));
            } catch (Exception e) {
                logger.trace("[node  ] [{}] transport disconnected (with verified connect)", node);
                notifyNodeFailure(node, "transport disconnected (with verified connect)");
            }
        } else {
            logger.trace("[node  ] [{}] transport disconnected", node);
            notifyNodeFailure(node, "transport disconnected");
        }
    }
    private void notifyNodeFailure(final DiscoveryNode node, final String reason) {
        threadPool.generic().execute(new Runnable() {
            @Override
            public void run() {
                for (Listener listener : listeners) {
                    listener.onNodeFailure(node, reason);
                }
            }
        });
    }
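  • When onNodeDisconnected fires, FDConnectionListener calls handleTransportDisconnect. That method ignores nodes that are not in latestNodes, removes the node from nodesFD (returning if it was not being monitored), returns if the detector is not running, and otherwise sets the removed nodeFD's running to false so that its ping loop stops.
  • If connectOnNetworkDisconnect is true, it tries to connect to the node again; on success it puts a new NodeFD into nodesFD and schedules a SendPingRequest for that node to run after pingInterval. If the connect throws, or connectOnNetworkDisconnect is false, it calls notifyNodeFailure.
  • notifyNodeFailure runs on the generic thread pool and invokes NodesFaultDetection.Listener.onNodeFailure on every registered listener; here that is the onNodeFailure method of ZenDiscovery's NodeFailureListener.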
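The decision logic of handleTransportDisconnect can be modeled in isolation as below. This is a simplified, single-threaded sketch under stated assumptions: nodes are String ids, the hypothetical Connector interface stands in for transportService.connectToNode, scheduled pings are recorded in a list instead of going through threadPool.schedule, and listeners are called inline rather than on the generic thread pool.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;

// Simplified, single-threaded model of handleTransportDisconnect (hypothetical names).
class DisconnectSketch {
    // Hypothetical stand-in for transportService.connectToNode(node).
    interface Connector { void connect(String node) throws Exception; }

    static final class NodeFD { volatile boolean running = true; }

    final ConcurrentMap<String, NodeFD> nodesFD = new ConcurrentHashMap<>();
    final List<BiConsumer<String, String>> listeners = new CopyOnWriteArrayList<>();
    final List<String> scheduledPings = new CopyOnWriteArrayList<>(); // instead of threadPool.schedule
    final Set<String> latestNodes;
    final boolean connectOnNetworkDisconnect;
    final Connector connector;
    volatile boolean running = true;

    DisconnectSketch(Set<String> latestNodes, boolean connectOnNetworkDisconnect, Connector connector) {
        this.latestNodes = latestNodes;
        this.connectOnNetworkDisconnect = connectOnNetworkDisconnect;
        this.connector = connector;
    }

    void onNodeDisconnected(String node) {
        if (!latestNodes.contains(node)) return;   // not part of the current cluster view
        NodeFD nodeFD = nodesFD.remove(node);
        if (nodeFD == null) return;                 // node was not being monitored
        if (!running) return;
        nodeFD.running = false;                     // stops the old ping loop for this node
        if (connectOnNetworkDisconnect) {
            try {
                connector.connect(node);            // single verified reconnect attempt
                nodesFD.put(node, new NodeFD());    // monitor again with fresh state
                scheduledPings.add(node);
            } catch (Exception e) {
                notifyNodeFailure(node, "transport disconnected (with verified connect)");
            }
        } else {
            notifyNodeFailure(node, "transport disconnected");
        }
    }

    void notifyNodeFailure(String node, String reason) {
        // The original hands this to threadPool.generic(); here listeners run inline.
        for (BiConsumer<String, String> listener : listeners) listener.accept(node, reason);
    }

    public static void main(String[] args) {
        DisconnectSketch fd = new DisconnectSketch(Set.of("n1"), true,
                node -> { throw new Exception("connection refused"); });
        fd.nodesFD.put("n1", new NodeFD());
        fd.listeners.add((node, reason) -> System.out.println(node + " failed: " + reason));
        fd.onNodeDisconnected("n1");
    }
}
```

Replacing the NodeFD rather than reusing it means any in-flight ping callback for the old loop sees running == false and stops, so only the newly scheduled loop continues.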

ZenDiscovery

elasticsearch-0.90.0/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implements Discovery, DiscoveryNodesProvider {
    //......
    @Inject
    public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threadPool,
                        TransportService transportService, ClusterService clusterService, NodeSettingsService nodeSettingsService,
                        DiscoveryNodeService discoveryNodeService, ZenPingService pingService) {
        super(settings);
        this.clusterName = clusterName;
        this.threadPool = threadPool;
        this.clusterService = clusterService;
        this.transportService = transportService;
        this.discoveryNodeService = discoveryNodeService;
        this.pingService = pingService;
        // also support direct discovery.zen settings, for cases when it gets extended
        this.pingTimeout = settings.getAsTime("discovery.zen.ping.timeout", settings.getAsTime("discovery.zen.ping_timeout", componentSettings.getAsTime("ping_timeout", componentSettings.getAsTime("initial_ping_timeout", timeValueSeconds(3)))));
        this.sendLeaveRequest = componentSettings.getAsBoolean("send_leave_request", true);
        this.masterElectionFilterClientNodes = settings.getAsBoolean("discovery.zen.master_election.filter_client", true);
        this.masterElectionFilterDataNodes = settings.getAsBoolean("discovery.zen.master_election.filter_data", false);
        logger.debug("using ping.timeout [{}], master_election.filter_client [{}], master_election.filter_data [{}]", pingTimeout, masterElectionFilterClientNodes, masterElectionFilterDataNodes);
        this.electMaster = new ElectMasterService(settings);
        nodeSettingsService.addListener(new ApplySettings());
        this.masterFD = new MasterFaultDetection(settings, threadPool, transportService, this);
        this.masterFD.addListener(new MasterNodeFailureListener());
        this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService);
        this.nodesFD.addListener(new NodeFailureListener());
        this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewClusterStateListener());
        this.pingService.setNodesProvider(this);
        this.membership = new MembershipAction(settings, transportService, this, new MembershipListener());
        transportService.registerHandler(RejoinClusterRequestHandler.ACTION, new RejoinClusterRequestHandler());
    }
    protected void doStart() throws ElasticSearchException {
        Map<String, String> nodeAttributes = discoveryNodeService.buildAttributes();
        // note, we rely on the fact that its a new id each time we start, see FD and "kill -9" handling
        String nodeId = UUID.randomBase64UUID();
        localNode = new DiscoveryNode(settings.get("name"), nodeId, transportService.boundAddress().publishAddress(), nodeAttributes);
        latestDiscoNodes = new DiscoveryNodes.Builder().put(localNode).localNodeId(localNode.id()).build();
        nodesFD.updateNodes(latestDiscoNodes);
        pingService.start();
        // do the join on a different thread, the DiscoveryService waits for 30s anyhow till it is discovered
        asyncJoinCluster();
    }
    public void publish(ClusterState clusterState) {
        if (!master) {
            throw new ElasticSearchIllegalStateException("Shouldn't publish state when not master");
        }
        latestDiscoNodes = clusterState.nodes();
        nodesFD.updateNodes(clusterState.nodes());
        publishClusterState.publish(clusterState);
    }
    private class NodeFailureListener implements NodesFaultDetection.Listener {
        @Override
        public void onNodeFailure(DiscoveryNode node, String reason) {
            handleNodeFailure(node, reason);
        }
    }
    private void handleNodeFailure(final DiscoveryNode node, String reason) {
        if (lifecycleState() != Lifecycle.State.STARTED) {
            // not started, ignore a node failure
            return;
        }
        if (!master) {
            // nothing to do here...
            return;
        }
        clusterService.submitStateUpdateTask("zen-disco-node_failed(" + node + "), reason " + reason, new ProcessedClusterStateUpdateTask() {
            @Override
            public ClusterState execute(ClusterState currentState) {
                DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder()
                        .putAll(currentState.nodes())
                        .remove(node.id());
                latestDiscoNodes = builder.build();
                currentState = newClusterStateBuilder().state(currentState).nodes(latestDiscoNodes).build();
                // check if we have enough master nodes, if not, we need to move into joining the cluster again
                if (!electMaster.hasEnoughMasterNodes(currentState.nodes())) {
                    return rejoin(currentState, "not enough master nodes");
                }
                // eagerly run reroute to remove dead nodes from routing table
                RoutingAllocation.Result routingResult = allocationService.reroute(newClusterStateBuilder().state(currentState).build());
                return newClusterStateBuilder().state(currentState).routingResult(routingResult).build();
            }
            @Override
            public void clusterStateProcessed(ClusterState clusterState) {
                sendInitialStateEventIfNeeded();
            }
        });
    }
    //......
}
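  • ZenDiscovery's constructor creates the NodesFaultDetection and adds a NodeFailureListener to it. This listener implements NodesFaultDetection.Listener, and its onNodeFailure callback calls handleNodeFailure. handleNodeFailure does nothing unless the discovery is STARTED and the local node is the master; otherwise it submits a ProcessedClusterStateUpdateTask that removes the node from currentState.nodes() and checks, via electMaster.hasEnoughMasterNodes, whether the number of master nodes still satisfies minimumMasterNodes. If not, it calls rejoin; if so, it runs allocationService.reroute to remove the dead node from the routing table.
  • Its doStart method builds localNode from the configured node name, a freshly generated random node id (UUID.randomBase64UUID), the transport publish address and the node attributes, puts it into latestDiscoNodes, calls nodesFD.updateNodes(latestDiscoNodes), and then calls pingService.start() and asyncJoinCluster().
  • Its publish method, which throws unless the local node is the master, updates the local latestDiscoNodes from clusterState.nodes(), calls nodesFD.updateNodes(clusterState.nodes()), and finally calls publishClusterState.publish(clusterState).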
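The decision made in handleNodeFailure comes down to a count of master nodes. The sketch below models it with a map from node id to a master-eligible flag. The class and enum names are hypothetical, and the hasEnoughMasterNodes logic (a minimumMasterNodes value below 1 disables the check; otherwise count the master-eligible nodes) is an assumption about ElectMasterService, not code shown above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of ZenDiscovery.handleNodeFailure's decision.
class NodeFailureDecision {
    enum Outcome { IGNORED, REJOIN, REROUTE }

    final Map<String, Boolean> nodes = new LinkedHashMap<>(); // node id -> master-eligible
    final int minimumMasterNodes;
    final boolean started;
    final boolean master;

    NodeFailureDecision(int minimumMasterNodes, boolean started, boolean master) {
        this.minimumMasterNodes = minimumMasterNodes;
        this.started = started;
        this.master = master;
    }

    // Assumed semantics of ElectMasterService.hasEnoughMasterNodes.
    boolean hasEnoughMasterNodes() {
        if (minimumMasterNodes < 1) return true; // check disabled
        int count = 0;
        for (boolean masterEligible : nodes.values()) {
            if (masterEligible) count++;
        }
        return count >= minimumMasterNodes;
    }

    Outcome handleNodeFailure(String nodeId) {
        if (!started || !master) return Outcome.IGNORED; // only a started master reacts
        nodes.remove(nodeId);
        // Not enough master-eligible nodes left: rejoin; otherwise reroute away from the dead node.
        return hasEnoughMasterNodes() ? Outcome.REROUTE : Outcome.REJOIN;
    }

    public static void main(String[] args) {
        NodeFailureDecision decision = new NodeFailureDecision(2, true, true);
        decision.nodes.put("m1", true);
        decision.nodes.put("m2", true);
        decision.nodes.put("m3", true);
        System.out.println(decision.handleNodeFailure("m3")); // prints REROUTE (2 masters left)
        System.out.println(decision.handleNodeFailure("m2")); // prints REJOIN (1 master left)
    }
}
```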

NodesFaultDetection.updateNodes

elasticsearch-0.90.0/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java

public class NodesFaultDetection extends AbstractComponent {
    //......
    public void updateNodes(DiscoveryNodes nodes) {
        DiscoveryNodes prevNodes = latestNodes;
        this.latestNodes = nodes;
        if (!running) {
            return;
        }
        DiscoveryNodes.Delta delta = nodes.delta(prevNodes);
        for (DiscoveryNode newNode : delta.addedNodes()) {
            if (newNode.id().equals(nodes.localNodeId())) {
                // no need to monitor the local node
                continue;
            }
            if (!nodesFD.containsKey(newNode)) {
                nodesFD.put(newNode, new NodeFD());
                threadPool.schedule(pingInterval, ThreadPool.Names.SAME, new SendPingRequest(newNode));
            }
        }
        for (DiscoveryNode removedNode : delta.removedNodes()) {
            nodesFD.remove(removedNode);
        }
    }
    //......
}
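  • NodesFaultDetection's updateNodes method always records the new node list as latestNodes. If the detector is not running it stops there; otherwise it calls nodes.delta(prevNodes) to compute a DiscoveryNodes.Delta, whose addedNodes() returns the newly added nodes and removedNodes() returns the removed ones.
  • Each added node other than the local node is checked against nodesFD; if it is not there yet, a NodeFD is added for it and a SendPingRequest is scheduled to run after pingInterval.
  • Each removed node is removed from nodesFD. handleTransportDisconnect also removes a disconnected node from nodesFD, and puts it back if its single reconnect attempt succeeds.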
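The delta computation in updateNodes amounts to two set differences. A minimal sketch, assuming node ids as Strings (modeling DiscoveryNodes.delta as plain set arithmetic is an assumption) and a list that records what would be handed to threadPool.schedule. It uses putIfAbsent instead of the original containsKey/put pair, a small deliberate change that makes the check-and-insert a single step.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical model of NodesFaultDetection.updateNodes using String node ids.
class UpdateNodesSketch {
    final ConcurrentMap<String, Object> nodesFD = new ConcurrentHashMap<>(); // value stands in for NodeFD
    final List<String> scheduledPings = new CopyOnWriteArrayList<>();       // instead of threadPool.schedule
    volatile Set<String> latestNodes = Set.of();
    volatile boolean running = true;
    final String localNodeId;

    UpdateNodesSketch(String localNodeId) { this.localNodeId = localNodeId; }

    void updateNodes(Set<String> nodes) {
        Set<String> prevNodes = latestNodes;
        latestNodes = nodes;                 // recorded even when not running
        if (!running) return;
        // added = nodes - prevNodes, removed = prevNodes - nodes
        Set<String> added = new LinkedHashSet<>(nodes);
        added.removeAll(prevNodes);
        Set<String> removed = new LinkedHashSet<>(prevNodes);
        removed.removeAll(nodes);
        for (String newNode : added) {
            if (newNode.equals(localNodeId)) continue; // no need to monitor the local node
            if (nodesFD.putIfAbsent(newNode, new Object()) == null) {
                scheduledPings.add(newNode);           // first ping after pingInterval
            }
        }
        for (String removedNode : removed) {
            nodesFD.remove(removedNode);
        }
    }

    public static void main(String[] args) {
        UpdateNodesSketch fd = new UpdateNodesSketch("local");
        fd.updateNodes(Set.of("local", "n1", "n2"));
        fd.updateNodes(Set.of("local", "n2", "n3"));
        System.out.println(fd.nodesFD.keySet()); // n2 and n3 are monitored; n1 was dropped
    }
}
```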

SendPingRequest

elasticsearch-0.90.0/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java

    private class SendPingRequest implements Runnable {
        private final DiscoveryNode node;
        private SendPingRequest(DiscoveryNode node) {
            this.node = node;
        }
        @Override
        public void run() {
            if (!running) {
                return;
            }
            transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()), options().withHighType().withTimeout(pingRetryTimeout),
                    new BaseTransportResponseHandler<PingResponse>() {
                        @Override
                        public PingResponse newInstance() {
                            return new PingResponse();
                        }
                        @Override
                        public void handleResponse(PingResponse response) {
                            if (!running) {
                                return;
                            }
                            NodeFD nodeFD = nodesFD.get(node);
                            if (nodeFD != null) {
                                if (!nodeFD.running) {
                                    return;
                                }
                                nodeFD.retryCount = 0;
                                threadPool.schedule(pingInterval, ThreadPool.Names.SAME, SendPingRequest.this);
                            }
                        }
                        @Override
                        public void handleException(TransportException exp) {
                            // check if the master node did not get switched on us...
                            if (!running) {
                                return;
                            }
                            if (exp instanceof ConnectTransportException) {
                                // ignore this one, we already handle it by registering a connection listener
                                return;
                            }
                            NodeFD nodeFD = nodesFD.get(node);
                            if (nodeFD != null) {
                                if (!nodeFD.running) {
                                    return;
                                }
                                int retryCount = ++nodeFD.retryCount;
                                logger.trace("[node  ] failed to ping [{}], retry [{}] out of [{}]", exp, node, retryCount, pingRetryCount);
                                if (retryCount >= pingRetryCount) {
                                    logger.debug("[node  ] failed to ping [{}], tried [{}] times, each with  maximum [{}] timeout", node, pingRetryCount, pingRetryTimeout);
                                    // not good, failure
                                    if (nodesFD.remove(node) != null) {
                                        notifyNodeFailure(node, "failed to ping, tried [" + pingRetryCount + "] times, each with maximum [" + pingRetryTimeout + "] timeout");
                                    }
                                } else {
                                    // resend the request, not reschedule, rely on send timeout
                                    transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()),
                                            options().withHighType().withTimeout(pingRetryTimeout), this);
                                }
                            }
                        }
                        @Override
                        public String executor() {
                            return ThreadPool.Names.SAME;
                        }
                    });
        }
    }
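  • SendPingRequest is a Runnable that, while the detector is running, sends a PingRequest to the target node with a timeout of pingRetryTimeout. Its handleResponse checks whether the node is still in nodesFD: if the node has been removed, or its nodeFD's running is false, the response is ignored; otherwise retryCount is reset and another SendPingRequest is scheduled to run after pingInterval.
  • If the request fails with a TransportException, handleException first checks whether it is a ConnectTransportException. If so, it is ignored, because disconnects are already handled by onNodeDisconnected of the FDConnectionListener registered with transportService.
  • For any other exception, nodeFD.retryCount is incremented. Once retryCount reaches pingRetryCount, the node is removed from nodesFD and notifyNodeFailure is called, which ends up in ZenDiscovery's handleNodeFailure. Below that limit, the PingRequest is resent immediately (not rescheduled), relying on the send timeout.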
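The retry bookkeeping of SendPingRequest can be modeled as a small state machine. In this hypothetical sketch, handleResponse and handleException return an Action describing what the real code would do next (schedule the next ping, resend immediately, or report the node as failed), and a boolean flag stands in for exp instanceof ConnectTransportException. None of these names are Elasticsearch APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical model of the retry logic in SendPingRequest's response handler.
class PingRetrySketch {
    static final class NodeFD { volatile int retryCount; volatile boolean running = true; }

    enum Action { NONE, SCHEDULE_NEXT, RESEND_NOW, NODE_FAILED }

    final ConcurrentMap<String, NodeFD> nodesFD = new ConcurrentHashMap<>();
    final List<String> failedNodes = new ArrayList<>(); // stands in for notifyNodeFailure
    final int pingRetryCount;
    volatile boolean running = true;

    PingRetrySketch(int pingRetryCount) { this.pingRetryCount = pingRetryCount; }

    Action handleResponse(String node) {
        if (!running) return Action.NONE;
        NodeFD nodeFD = nodesFD.get(node);
        if (nodeFD == null || !nodeFD.running) return Action.NONE; // no longer monitored
        nodeFD.retryCount = 0;
        return Action.SCHEDULE_NEXT; // original: threadPool.schedule(pingInterval, ...)
    }

    // connectFailure models "exp instanceof ConnectTransportException".
    Action handleException(String node, boolean connectFailure) {
        if (!running || connectFailure) return Action.NONE; // disconnects go through the connection listener
        NodeFD nodeFD = nodesFD.get(node);
        if (nodeFD == null || !nodeFD.running) return Action.NONE;
        int retryCount = ++nodeFD.retryCount;
        if (retryCount >= pingRetryCount) {
            if (nodesFD.remove(node) != null) {
                failedNodes.add(node);
                return Action.NODE_FAILED;
            }
            return Action.NONE;
        }
        return Action.RESEND_NOW; // resend immediately, relying on the request timeout
    }

    public static void main(String[] args) {
        PingRetrySketch fd = new PingRetrySketch(3);
        fd.nodesFD.put("n1", new NodeFD());
        for (int i = 0; i < 3; i++) {
            System.out.println(fd.handleException("n1", false)); // RESEND_NOW, RESEND_NOW, NODE_FAILED
        }
    }
}
```

With pingRetryCount = 3, the third consecutive non-connect failure reports the node; any successful response in between resets the counter.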

Summary

  • NodesFaultDetection registers a PingRequestHandler with transportService under PingRequestHandler.ACTION and adds an FDConnectionListener. PingRequestHandler answers PingRequests with a PingResponse; FDConnectionListener handles transport disconnects, which is why SendPingRequest ignores ConnectTransportException.
  • FDConnectionListener.onNodeDisconnected removes the node from nodesFD and sets its nodeFD's running to false. If connectOnNetworkDisconnect is true it makes one reconnect attempt (on success the node goes back into nodesFD and a SendPingRequest is scheduled to run after pingInterval); if the connect fails or connectOnNetworkDisconnect is false, it calls notifyNodeFailure, which triggers NodesFaultDetection.Listener.onNodeFailure, here the onNodeFailure method of ZenDiscovery's NodeFailureListener.
  • Both ZenDiscovery's doStart and publish methods call NodesFaultDetection.updateNodes to update latestNodes; for each new node a SendPingRequest is scheduled.
  • When a ping succeeds, retryCount is reset and the next SendPingRequest is scheduled. A failure other than ConnectTransportException is retried; once the retries reach pingRetryCount, notifyNodeFailure fires NodesFaultDetection.Listener.onNodeFailure, here the onNodeFailure method of ZenDiscovery's NodeFailureListener.
  • ZenDiscovery's NodeFailureListener implements NodesFaultDetection.Listener; its onNodeFailure calls handleNodeFailure, which submits a ProcessedClusterStateUpdateTask that removes the node from currentState.nodes() and then checks whether the number of master nodes still satisfies minimumMasterNodes. If not, it calls rejoin; if so, it runs allocationService.reroute.
