This article takes a look at DeadHostState in the Elasticsearch RestClient.

DeadHostState

elasticsearch-7.0.1/client/rest/src/main/java/org/elasticsearch/client/DeadHostState.java

/**
 * Holds the state of a dead connection to a host. Keeps track of how many failed attempts were performed and
 * when the host should be retried (based on number of previous failed attempts).
 * Class is immutable, a new copy of it should be created each time the state has to be changed.
 */
final class DeadHostState implements Comparable<DeadHostState> {

    private static final long MIN_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(1);
    static final long MAX_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(30);

    private final int failedAttempts;
    private final long deadUntilNanos;
    private final TimeSupplier timeSupplier;

    /**
     * Build the initial dead state of a host. Useful when a working host stops functioning
     * and needs to be marked dead after its first failure. In such case the host will be retried after a minute or so.
     *
     * @param timeSupplier a way to supply the current time and allow for unit testing
     */
    DeadHostState(TimeSupplier timeSupplier) {
        this.failedAttempts = 1;
        this.deadUntilNanos = timeSupplier.nanoTime() + MIN_CONNECTION_TIMEOUT_NANOS;
        this.timeSupplier = timeSupplier;
    }

    /**
     * Build the dead state of a host given its previous dead state. Useful when a host has been failing before, hence
     * it already failed for one or more consecutive times. The more failed attempts we register the longer we wait
     * to retry that same host again. Minimum is 1 minute (for a node the only failed once created
     * through {@link #DeadHostState(TimeSupplier)}), maximum is 30 minutes (for a node that failed more than 10 consecutive times)
     *
     * @param previousDeadHostState the previous state of the host which allows us to increase the wait till the next retry attempt
     */
    DeadHostState(DeadHostState previousDeadHostState) {
        long timeoutNanos = (long)Math.min(MIN_CONNECTION_TIMEOUT_NANOS * 2 * Math.pow(2, previousDeadHostState.failedAttempts * 0.5 - 1),
                MAX_CONNECTION_TIMEOUT_NANOS);
        this.deadUntilNanos = previousDeadHostState.timeSupplier.nanoTime() + timeoutNanos;
        this.failedAttempts = previousDeadHostState.failedAttempts + 1;
        this.timeSupplier = previousDeadHostState.timeSupplier;
    }

    /**
     * Indicates whether it's time to retry to failed host or not.
     *
     * @return true if the host should be retried, false otherwise
     */
    boolean shallBeRetried() {
        return timeSupplier.nanoTime() - deadUntilNanos > 0;
    }

    /**
     * Returns the timestamp (nanos) till the host is supposed to stay dead without being retried.
     * After that the host should be retried.
     */
    long getDeadUntilNanos() {
        return deadUntilNanos;
    }

    int getFailedAttempts() {
        return failedAttempts;
    }

    @Override
    public int compareTo(DeadHostState other) {
        if (timeSupplier != other.timeSupplier) {
            throw new IllegalArgumentException("can't compare DeadHostStates with different clocks ["
                    + timeSupplier + " != " + other.timeSupplier + "]");
        }
        return Long.compare(deadUntilNanos, other.deadUntilNanos);
    }

    @Override
    public String toString() {
        return "DeadHostState{" +
                "failedAttempts=" + failedAttempts +
                ", deadUntilNanos=" + deadUntilNanos +
                ", timeSupplier=" + timeSupplier +
                '}';
    }

    /**
     * Time supplier that makes timing aspects pluggable to ease testing
     */
    interface TimeSupplier {
        TimeSupplier DEFAULT = new TimeSupplier() {
            @Override
            public long nanoTime() {
                return System.nanoTime();
            }

            @Override
            public String toString() {
                return "nanoTime";
            }
        };

        long nanoTime();
    }
}

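  • DeadHostState has three fields: failedAttempts, deadUntilNanos, and timeSupplier. It provides two constructors: one takes a TimeSupplier (first failure: failedAttempts is 1 and the host stays dead for 1 minute), the other takes a previousDeadHostState
  • Constructing from previousDeadHostState recomputes deadUntilNanos and increments failedAttempts. deadUntilNanos is timeSupplier.nanoTime() + timeoutNanos, where timeoutNanos is (long)Math.min(MIN_CONNECTION_TIMEOUT_NANOS * 2 * Math.pow(2, previousDeadHostState.failedAttempts * 0.5 - 1), MAX_CONNECTION_TIMEOUT_NANOS); the wait therefore grows by a factor of √2 with each consecutive failure and is capped at 30 minutes
  • DeadHostState provides a shallBeRetried method whose check is timeSupplier.nanoTime() - deadUntilNanos > 0. It also implements Comparable: compareTo compares by deadUntilNanos and throws IllegalArgumentException if the two states do not share the same timeSupplier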
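
To make the backoff formula concrete, here is a minimal sketch that evaluates it outside the client. The class DeadHostBackoffDemo and the helper timeoutNanos are hypothetical names introduced for illustration only; the two constants and the expression are copied from the source above.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the Elasticsearch client.
final class DeadHostBackoffDemo {

    // Same values as the constants in DeadHostState (elasticsearch-7.0.1).
    static final long MIN_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(1);
    static final long MAX_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(30);

    /**
     * Hypothetical helper: the timeout applied when a host that has already
     * failed previousFailedAttempts times fails once more. The expression is
     * the one used by the DeadHostState(DeadHostState) constructor.
     */
    static long timeoutNanos(int previousFailedAttempts) {
        return (long) Math.min(
                MIN_CONNECTION_TIMEOUT_NANOS * 2 * Math.pow(2, previousFailedAttempts * 0.5 - 1),
                MAX_CONNECTION_TIMEOUT_NANOS);
    }

    public static void main(String[] args) {
        // The first failure goes through the TimeSupplier constructor: a fixed 1 minute.
        System.out.println("failure #1 -> "
                + TimeUnit.NANOSECONDS.toSeconds(MIN_CONNECTION_TIMEOUT_NANOS) + " s");
        // Every later failure is derived from the previous state.
        for (int previous = 1; previous <= 11; previous++) {
            long seconds = TimeUnit.NANOSECONDS.toSeconds(timeoutNanos(previous));
            System.out.println("failure #" + (previous + 1) + " -> " + seconds + " s");
        }
    }
}
```

Under these assumptions the second failure waits roughly 85 seconds, the third exactly 2 minutes, and the 30-minute cap is first hit when a host that has already failed 10 times fails again, which matches the "more than 10 consecutive times" remark in the Javadoc.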

RestClient

elasticsearch-7.0.1/client/rest/src/main/java/org/elasticsearch/client/RestClient.java

public class RestClient implements Closeable {

    //......

    /**
     * Select nodes to try and sorts them so that the first one will be tried initially, then the following ones
     * if the previous attempt failed and so on. Package private for testing.
     */
    static Iterable<Node> selectNodes(NodeTuple<List<Node>> nodeTuple, Map<HttpHost, DeadHostState> blacklist,
                                      AtomicInteger lastNodeIndex, NodeSelector nodeSelector) throws IOException {
        /*
         * Sort the nodes into living and dead lists.
         */
        List<Node> livingNodes = new ArrayList<>(Math.max(0, nodeTuple.nodes.size() - blacklist.size()));
        List<DeadNode> deadNodes = new ArrayList<>(blacklist.size());
        for (Node node : nodeTuple.nodes) {
            DeadHostState deadness = blacklist.get(node.getHost());
            if (deadness == null) {
                livingNodes.add(node);
                continue;
            }
            if (deadness.shallBeRetried()) {
                livingNodes.add(node);
                continue;
            }
            deadNodes.add(new DeadNode(node, deadness));
        }

        if (false == livingNodes.isEmpty()) {
            /*
             * Normal state: there is at least one living node. If the
             * selector is ok with any over the living nodes then use them
             * for the request.
             */
            List<Node> selectedLivingNodes = new ArrayList<>(livingNodes);
            nodeSelector.select(selectedLivingNodes);
            if (false == selectedLivingNodes.isEmpty()) {
                /*
                 * Rotate the list using a global counter as the distance so subsequent
                 * requests will try the nodes in a different order.
                 */
                Collections.rotate(selectedLivingNodes, lastNodeIndex.getAndIncrement());
                return selectedLivingNodes;
            }
        }

        /*
         * Last resort: there are no good nodes to use, either because
         * the selector rejected all the living nodes or because there aren't
         * any living ones. Either way, we want to revive a single dead node
         * that the NodeSelectors are OK with. We do this by passing the dead
         * nodes through the NodeSelector so it can have its say in which nodes
         * are ok. If the selector is ok with any of the nodes then we will take
         * the one in the list that has the lowest revival time and try it.
         */
        if (false == deadNodes.isEmpty()) {
            final List<DeadNode> selectedDeadNodes = new ArrayList<>(deadNodes);
            /*
             * We'd like NodeSelectors to remove items directly from deadNodes
             * so we can find the minimum after it is filtered without having
             * to compare many things. This saves us a sort on the unfiltered
             * list.
             */
            nodeSelector.select(new Iterable<Node>() {
                @Override
                public Iterator<Node> iterator() {
                    return new DeadNodeIteratorAdapter(selectedDeadNodes.iterator());
                }
            });
            if (false == selectedDeadNodes.isEmpty()) {
                return singletonList(Collections.min(selectedDeadNodes).node);
            }
        }
        throw new IOException("NodeSelector [" + nodeSelector + "] rejected all nodes, "
                + "living " + livingNodes + " and dead " + deadNodes);
    }

    /**
     * Called after each successful request call.
     * Receives as an argument the host that was used for the successful request.
     */
    private void onResponse(Node node) {
        DeadHostState removedHost = this.blacklist.remove(node.getHost());
        if (logger.isDebugEnabled() && removedHost != null) {
            logger.debug("removed [" + node + "] from blacklist");
        }
    }

    /**
     * Called after each failed attempt.
     * Receives as an argument the host that was used for the failed attempt.
     */
    private void onFailure(Node node) {
        while(true) {
            DeadHostState previousDeadHostState =
                blacklist.putIfAbsent(node.getHost(), new DeadHostState(TimeSupplier.DEFAULT));
            if (previousDeadHostState == null) {
                if (logger.isDebugEnabled()) {
                    logger.debug("added [" + node + "] to blacklist");
                }
                break;
            }
            if (blacklist.replace(node.getHost(), previousDeadHostState,
                    new DeadHostState(previousDeadHostState))) {
                if (logger.isDebugEnabled()) {
                    logger.debug("updated [" + node + "] already in blacklist");
                }
                break;
            }
        }
        failureListener.onFailure(node);
    }

    //......
}

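  • RestClient's selectNodes method first sorts the nodes into livingNodes (nodes not in the blacklist, or in the blacklist but with shallBeRetried returning true) and deadNodes. The living nodes are filtered through nodeSelector; if any remain, the list is rotated by lastNodeIndex and returned, so the nodes are tried in that order. If none remain, the dead nodes are filtered through nodeSelector and only the one with the lowest revival time (Collections.min) is returned; if the selector rejects those too, an IOException is thrown
  • RestClient's onResponse method removes the node's host from the blacklist
  • RestClient's onFailure method creates or updates the DeadHostState for the host in the blacklist. If the host has no DeadHostState yet, a new one is created with TimeSupplier.DEFAULT and put into the blacklist; if it already has one, a new DeadHostState is built from the existing one and swapped in with replace, retrying the loop if another thread changed the entry in the meantime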
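
The putIfAbsent/replace loop in onFailure is a lock-free compare-and-swap pattern. Below is a minimal sketch of that pattern in isolation. It assumes the blacklist is a ConcurrentMap (the calls used in the source suggest this, but the field declaration is not shown here), uses plain String keys instead of HttpHost, and replaces DeadHostState with a hypothetical State class that only counts failures.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch, not the actual RestClient code.
final class BlacklistSketch {

    /** Simplified, immutable stand-in for DeadHostState: it only counts failures. */
    static final class State {
        final int failedAttempts;

        State() {
            this.failedAttempts = 1;
        }

        State(State previous) {
            this.failedAttempts = previous.failedAttempts + 1;
        }
    }

    // Assumption: a concurrent map keyed by host name.
    final ConcurrentMap<String, State> blacklist = new ConcurrentHashMap<>();

    /** Mirrors the shape of RestClient.onFailure: insert on first failure, otherwise swap atomically. */
    void onFailure(String host) {
        while (true) {
            State previous = blacklist.putIfAbsent(host, new State());
            if (previous == null) {
                return; // first failure: the fresh state was inserted
            }
            // replace succeeds only if the entry is still mapped to 'previous'
            if (blacklist.replace(host, previous, new State(previous))) {
                return;
            }
            // another thread updated or removed the entry in between: try again
        }
    }

    /** Mirrors RestClient.onResponse: a success clears the dead state. */
    void onResponse(String host) {
        blacklist.remove(host);
    }

    public static void main(String[] args) {
        BlacklistSketch sketch = new BlacklistSketch();
        sketch.onFailure("es-node-1");
        sketch.onFailure("es-node-1");
        System.out.println("failedAttempts=" + sketch.blacklist.get("es-node-1").failedAttempts);
        sketch.onResponse("es-node-1");
        System.out.println("blacklisted=" + sketch.blacklist.containsKey("es-node-1"));
    }
}
```

Because each state object is immutable and every update goes through replace with the expected old value, concurrent failures on the same host cannot lose an increment; at worst a thread loops once more.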

Summary

  • DeadHostState has three fields: failedAttempts, deadUntilNanos, and timeSupplier. It provides two constructors: one takes a TimeSupplier, the other takes a previousDeadHostState. Its shallBeRetried method checks timeSupplier.nanoTime() - deadUntilNanos > 0
  • Constructing a DeadHostState from previousDeadHostState recomputes deadUntilNanos and increments failedAttempts. deadUntilNanos is timeSupplier.nanoTime() + timeoutNanos, where timeoutNanos is (long)Math.min(MIN_CONNECTION_TIMEOUT_NANOS * 2 * Math.pow(2, previousDeadHostState.failedAttempts * 0.5 - 1), MAX_CONNECTION_TIMEOUT_NANOS)
  • RestClient's selectNodes method treats nodes that are not in the blacklist, or that are in it but whose shallBeRetried returns true, as livingNodes, filters them through nodeSelector, and returns the rotated list; only when no living node is accepted does it fall back to the single dead node with the lowest revival time. onResponse removes the node's host from the blacklist. onFailure creates or updates the host's DeadHostState in the blacklist: with TimeSupplier.DEFAULT if the host has none yet, otherwise from the existing DeadHostState
