This article takes a look at Elasticsearch's SeedHostsResolver.

ConfiguredHostsResolver

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java

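    public interface ConfiguredHostsResolver {
        /**
         * Attempt to resolve the configured unicast hosts list to a list of transport addresses.
         *
         * @param consumer Consumer for the resolved list. May not be called if an error occurs or if another resolution attempt is in
         *                 progress.
         */
        void resolveConfiguredHosts(Consumer<List<TransportAddress>> consumer);
    }

  • The ConfiguredHostsResolver interface (nested in PeerFinder) defines a single method, resolveConfiguredHosts, which resolves the configured hosts into a list of transport addresses and hands the result to the supplied consumer; the consumer may never be called if an error occurs or another resolution is already in progress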
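
The callback contract can be easy to miss, so here is a minimal sketch of it. It does not use Elasticsearch classes: `HostsResolver` is a hypothetical stand-in for ConfiguredHostsResolver, addresses are plain strings instead of TransportAddress, and `fixed`, `failing`, and `collect` are names invented for illustration. The point it tries to show is that a caller must tolerate the consumer never being invoked.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ResolverCallbackSketch {

    /** Hypothetical stand-in for ConfiguredHostsResolver; addresses are plain strings here. */
    interface HostsResolver {
        void resolveConfiguredHosts(Consumer<List<String>> consumer);
    }

    /** A resolver that always succeeds and delivers a fixed list. */
    static HostsResolver fixed(List<String> addresses) {
        return consumer -> consumer.accept(addresses);
    }

    /** A resolver that fails silently: the consumer is simply never called. */
    static HostsResolver failing() {
        return consumer -> { };
    }

    /** Collects whatever the resolver delivers; stays empty if the callback never fires. */
    static List<String> collect(HostsResolver resolver) {
        List<String> seen = new ArrayList<>();
        resolver.resolveConfiguredHosts(seen::addAll);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(collect(fixed(List.of("10.0.0.1:9300", "10.0.0.2:9300"))));
        System.out.println(collect(failing()));
    }
}
```

Because there is no error callback, a caller that needs the result has to treat "no callback yet" as a normal state and simply try again later.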

SeedHostsResolver

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/SeedHostsResolver.java

    public class SeedHostsResolver extends AbstractLifecycleComponent implements ConfiguredHostsResolver {
        public static final Setting<Integer> LEGACY_DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING =
            Setting.intSetting("discovery.zen.ping.unicast.concurrent_connects", 10, 0, Setting.Property.NodeScope,
                Setting.Property.Deprecated);
        public static final Setting<TimeValue> LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT =
            Setting.positiveTimeSetting("discovery.zen.ping.unicast.hosts.resolve_timeout", TimeValue.timeValueSeconds(5),
                Setting.Property.NodeScope, Setting.Property.Deprecated);
        public static final Setting<Integer> DISCOVERY_SEED_RESOLVER_MAX_CONCURRENT_RESOLVERS_SETTING =
            Setting.intSetting("discovery.seed_resolver.max_concurrent_resolvers", 10, 0, Setting.Property.NodeScope);
        public static final Setting<TimeValue> DISCOVERY_SEED_RESOLVER_TIMEOUT_SETTING =
            Setting.positiveTimeSetting("discovery.seed_resolver.timeout", TimeValue.timeValueSeconds(5), Setting.Property.NodeScope);

        private static final Logger logger = LogManager.getLogger(SeedHostsResolver.class);

        private final Settings settings;
        private final AtomicBoolean resolveInProgress = new AtomicBoolean();
        private final TransportService transportService;
        private final SeedHostsProvider hostsProvider;
        private final SetOnce<ExecutorService> executorService = new SetOnce<>();
        private final TimeValue resolveTimeout;
        private final String nodeName;
        private final int concurrentConnects;

        public SeedHostsResolver(String nodeName, Settings settings, TransportService transportService,
                                 SeedHostsProvider seedProvider) {
            this.settings = settings;
            this.nodeName = nodeName;
            this.transportService = transportService;
            this.hostsProvider = seedProvider;
            resolveTimeout = getResolveTimeout(settings);
            concurrentConnects = getMaxConcurrentResolvers(settings);
        }

        public static int getMaxConcurrentResolvers(Settings settings) {
            if (LEGACY_DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.exists(settings)) {
                if (DISCOVERY_SEED_RESOLVER_MAX_CONCURRENT_RESOLVERS_SETTING.exists(settings)) {
                    throw new IllegalArgumentException("it is forbidden to set both ["
                        + DISCOVERY_SEED_RESOLVER_MAX_CONCURRENT_RESOLVERS_SETTING.getKey() + "] and ["
                        + LEGACY_DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.getKey() + "]");
                }
                return LEGACY_DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.get(settings);
            }
            return DISCOVERY_SEED_RESOLVER_MAX_CONCURRENT_RESOLVERS_SETTING.get(settings);
        }

        public static TimeValue getResolveTimeout(Settings settings) {
            if (LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.exists(settings)) {
                if (DISCOVERY_SEED_RESOLVER_TIMEOUT_SETTING.exists(settings)) {
                    throw new IllegalArgumentException("it is forbidden to set both ["
                        + DISCOVERY_SEED_RESOLVER_TIMEOUT_SETTING.getKey() + "] and ["
                        + LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.getKey() + "]");
                }
                return LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.get(settings);
            }
            return DISCOVERY_SEED_RESOLVER_TIMEOUT_SETTING.get(settings);
        }

        /**
         * Resolves a list of hosts to a list of transport addresses. Each host is resolved into a transport address (or a collection of
         * addresses if the number of ports is greater than one). Host lookups are done in parallel using specified executor service up
         * to the specified resolve timeout.
         *
         * @param executorService  the executor service used to parallelize hostname lookups
         * @param logger           logger used for logging messages regarding hostname lookups
         * @param hosts            the hosts to resolve
         * @param limitPortCounts  the number of ports to resolve (should be 1 for non-local transport)
         * @param transportService the transport service
         * @param resolveTimeout   the timeout before returning from hostname lookups
         * @return a list of resolved transport addresses
         */
        public static List<TransportAddress> resolveHostsLists(
            final ExecutorService executorService,
            final Logger logger,
            final List<String> hosts,
            final int limitPortCounts,
            final TransportService transportService,
            final TimeValue resolveTimeout) {
            Objects.requireNonNull(executorService);
            Objects.requireNonNull(logger);
            Objects.requireNonNull(hosts);
            Objects.requireNonNull(transportService);
            Objects.requireNonNull(resolveTimeout);
            if (resolveTimeout.nanos() < 0) {
                throw new IllegalArgumentException("resolve timeout must be non-negative but was [" + resolveTimeout + "]");
            }
            // create tasks to submit to the executor service; we will wait up to resolveTimeout for these tasks to complete
            final List<Callable<TransportAddress[]>> callables =
                hosts
                    .stream()
                    .map(hn -> (Callable<TransportAddress[]>) () -> transportService.addressesFromString(hn, limitPortCounts))
                    .collect(Collectors.toList());
            final List<Future<TransportAddress[]>> futures;
            try {
                futures = executorService.invokeAll(callables, resolveTimeout.nanos(), TimeUnit.NANOSECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return Collections.emptyList();
            }
            final List<TransportAddress> transportAddresses = new ArrayList<>();
            final Set<TransportAddress> localAddresses = new HashSet<>();
            localAddresses.add(transportService.boundAddress().publishAddress());
            localAddresses.addAll(Arrays.asList(transportService.boundAddress().boundAddresses()));
            // ExecutorService#invokeAll guarantees that the futures are returned in the iteration order of the tasks so we can associate the
            // hostname with the corresponding task by iterating together
            final Iterator<String> it = hosts.iterator();
            for (final Future<TransportAddress[]> future : futures) {
                final String hostname = it.next();
                if (!future.isCancelled()) {
                    assert future.isDone();
                    try {
                        final TransportAddress[] addresses = future.get();
                        logger.trace("resolved host [{}] to {}", hostname, addresses);
                        for (int addressId = 0; addressId < addresses.length; addressId++) {
                            final TransportAddress address = addresses[addressId];
                            // no point in pinging ourselves
                            if (localAddresses.contains(address) == false) {
                                transportAddresses.add(address);
                            }
                        }
                    } catch (final ExecutionException e) {
                        assert e.getCause() != null;
                        final String message = "failed to resolve host [" + hostname + "]";
                        logger.warn(message, e.getCause());
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        // ignore
                    }
                } else {
                    logger.warn("timed out after [{}] resolving host [{}]", resolveTimeout, hostname);
                }
            }
            return Collections.unmodifiableList(transportAddresses);
        }

        @Override
        protected void doStart() {
            logger.debug("using max_concurrent_resolvers [{}], resolver timeout [{}]", concurrentConnects, resolveTimeout);
            final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_configured_hosts_resolver]");
            executorService.set(EsExecutors.newScaling(nodeName + "/" + "unicast_configured_hosts_resolver",
                0, concurrentConnects, 60, TimeUnit.SECONDS, threadFactory, transportService.getThreadPool().getThreadContext()));
        }

        @Override
        protected void doStop() {
            ThreadPool.terminate(executorService.get(), 10, TimeUnit.SECONDS);
        }

        @Override
        protected void doClose() {
        }

        @Override
        public void resolveConfiguredHosts(Consumer<List<TransportAddress>> consumer) {
            if (lifecycle.started() == false) {
                logger.debug("resolveConfiguredHosts: lifecycle is {}, not proceeding", lifecycle);
                return;
            }

            if (resolveInProgress.compareAndSet(false, true)) {
                transportService.getThreadPool().generic().execute(new AbstractRunnable() {
                    @Override
                    public void onFailure(Exception e) {
                        logger.debug("failure when resolving unicast hosts list", e);
                    }

                    @Override
                    protected void doRun() {
                        if (lifecycle.started() == false) {
                            logger.debug("resolveConfiguredHosts.doRun: lifecycle is {}, not proceeding", lifecycle);
                            return;
                        }

                        List<TransportAddress> providedAddresses
                            = hostsProvider.getSeedAddresses((hosts, limitPortCounts)
                            -> resolveHostsLists(executorService.get(), logger, hosts, limitPortCounts,
                            transportService, resolveTimeout));

                        consumer.accept(providedAddresses);
                    }

                    @Override
                    public void onAfter() {
                        resolveInProgress.set(false);
                    }

                    @Override
                    public String toString() {
                        return "SeedHostsResolver resolving unicast hosts list";
                    }
                });
            }
        }
    }
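
The core of resolveHostsLists is the timeout-bounded `ExecutorService.invokeAll` call and the way futures are paired back with hostnames. The sketch below isolates that pattern using only the JDK. It is not the Elasticsearch implementation: `lookup` is a hypothetical stand-in for transportService.addressesFromString, addresses are plain strings, and `resolveAll`, `demo`, and `sleepQuietly` are names made up for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

class ParallelResolveSketch {

    /** Resolves all hosts in parallel, waiting at most timeoutMillis overall. */
    static List<String> resolveAll(ExecutorService executor, List<String> hosts,
                                   Function<String, String> lookup, Set<String> localAddresses,
                                   long timeoutMillis) {
        List<Callable<String>> tasks = new ArrayList<>();
        for (String host : hosts) {
            tasks.add(() -> lookup.apply(host));
        }
        List<Future<String>> futures;
        try {
            // futures come back in task order; tasks unfinished at the deadline are cancelled
            futures = executor.invokeAll(tasks, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Collections.emptyList();
        }
        List<String> resolved = new ArrayList<>();
        Iterator<String> it = hosts.iterator();
        for (Future<String> future : futures) {
            String host = it.next(); // same iteration order as the futures
            if (future.isCancelled()) {
                System.err.println("timed out resolving " + host);
                continue;
            }
            try {
                String address = future.get(); // already done, so this does not block
                if (!localAddresses.contains(address)) { // skip our own address
                    resolved.add(address);
                }
            } catch (ExecutionException e) {
                System.err.println("failed to resolve " + host + ": " + e.getCause());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return Collections.unmodifiableList(resolved);
    }

    /** Demo with one slow host, one failing host, and one host that maps to the local address. */
    static List<String> demo() {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            Function<String, String> lookup = host -> {
                if (host.equals("bad")) {
                    throw new IllegalArgumentException("unknown host");
                }
                if (host.equals("slow")) {
                    sleepQuietly(5_000);
                }
                return host + ":9300";
            };
            return resolveAll(executor, List.of("a", "slow", "bad", "self", "b"),
                lookup, Set.of("self:9300"), 500);
        } finally {
            executor.shutdownNow();
        }
    }

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this demo only the hosts `a` and `b` survive: `slow` is cancelled at the deadline, `bad` fails with an ExecutionException, and `self` is filtered out as a local address. A single slow or broken host therefore costs at most the timeout and never fails the whole batch.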
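  • SeedHostsResolver extends AbstractLifecycleComponent and implements the ConfiguredHostsResolver interface. It provides three static methods: getMaxConcurrentResolvers and getResolveTimeout, which read the new discovery.seed_resolver.* settings or their deprecated discovery.zen.ping.unicast.* equivalents (setting both throws IllegalArgumentException), and resolveHostsLists, which runs transportService.addressesFromString for every host concurrently on the given executor, waits up to resolveTimeout, and filters out the node's own addresses
  • doStart uses EsExecutors.newScaling to create an EsThreadPoolExecutor that scales from 0 to concurrentConnects threads with a 60-second keep-alive; doStop shuts the pool down with ThreadPool.terminate, waiting up to 10 seconds
  • resolveConfiguredHosts returns immediately if the component has not been started. Otherwise it tries to flip resolveInProgress from false to true with compareAndSet; if another resolution is already running, the call does nothing and the consumer is not invoked. On success it submits an AbstractRunnable to the generic thread pool (transportService.getThreadPool().generic()), which calls hostsProvider.getSeedAddresses with resolveHostsLists as the resolver and passes the result to the consumer; onAfter then resets resolveInProgress to false, whether or not the run succeeded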
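
The resolveInProgress flag is a "single flight" guard: at most one resolution runs at a time and overlapping requests are dropped rather than queued. Below is a minimal JDK-only sketch of that idea. `SingleFlightSketch`, `tryRun`, and `demo` are hypothetical names, and unlike SeedHostsResolver (which dispatches the work to the generic thread pool) this sketch runs the task synchronously so that the behaviour is deterministic.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class SingleFlightSketch {
    private final AtomicBoolean inProgress = new AtomicBoolean();

    /** Runs the task unless another run is in flight; returns whether it ran. */
    boolean tryRun(Runnable task) {
        if (!inProgress.compareAndSet(false, true)) {
            return false; // another run is active: drop this request instead of queueing it
        }
        try {
            task.run();
            return true;
        } finally {
            inProgress.set(false); // plays the role of onAfter(): reset even if the task throws
        }
    }

    /** Returns {outer ran, nested attempt ran, later attempt ran}. */
    static boolean[] demo() {
        SingleFlightSketch guard = new SingleFlightSketch();
        boolean[] results = new boolean[3];
        // a nested attempt made while the outer run is still in flight is rejected
        results[0] = guard.tryRun(() -> results[1] = guard.tryRun(() -> { }));
        // once the outer run has finished, the flag is clear and a new run proceeds
        results[2] = guard.tryRun(() -> { });
        return results;
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println(r[0] + " " + r[1] + " " + r[2]);
    }
}
```

Dropping overlapping requests fits this use case because a caller that was turned away can simply ask again on its next discovery round.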

Summary

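  • The ConfiguredHostsResolver interface defines resolveConfiguredHosts, which resolves the configured hosts into a list of transport addresses and delivers it to a consumer
  • SeedHostsResolver extends AbstractLifecycleComponent and implements ConfiguredHostsResolver; its static methods getMaxConcurrentResolvers, getResolveTimeout, and resolveHostsLists read the resolver settings and resolve hosts concurrently by running transportService.addressesFromString on a thread pool
  • doStart creates an EsThreadPoolExecutor with EsExecutors.newScaling and doStop terminates it with ThreadPool.terminate; resolveConfiguredHosts sets resolveInProgress from false to true via compareAndSet, runs hostsProvider.getSeedAddresses on the generic thread pool, and resets resolveInProgress to false once the run has finished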
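
getMaxConcurrentResolvers and getResolveTimeout share one precedence rule: the deprecated key wins if it is the only one present, the new key (or its default) is used otherwise, and setting both is an error. The sketch below restates that rule over a plain `Map<String, String>` instead of the Elasticsearch Settings class, which is an assumption made to keep it self-contained; the two keys and the default of 10 are taken from the listing above, while the class and method names are invented. It does not replicate the minimum-value validation done by Setting.intSetting.

```java
import java.util.Map;

class SettingPrecedenceSketch {
    static final String LEGACY_KEY = "discovery.zen.ping.unicast.concurrent_connects";
    static final String CURRENT_KEY = "discovery.seed_resolver.max_concurrent_resolvers";
    static final int DEFAULT_VALUE = 10;

    /** Applies the legacy-vs-current precedence rule to a plain key/value map. */
    static int maxConcurrentResolvers(Map<String, String> settings) {
        if (settings.containsKey(LEGACY_KEY)) {
            if (settings.containsKey(CURRENT_KEY)) {
                throw new IllegalArgumentException(
                    "it is forbidden to set both [" + CURRENT_KEY + "] and [" + LEGACY_KEY + "]");
            }
            return Integer.parseInt(settings.get(LEGACY_KEY));
        }
        String value = settings.get(CURRENT_KEY);
        return value == null ? DEFAULT_VALUE : Integer.parseInt(value);
    }

    /** Returns true if configuring both keys at once is rejected. */
    static boolean rejectsBoth() {
        try {
            maxConcurrentResolvers(Map.of(LEGACY_KEY, "4", CURRENT_KEY, "8"));
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(maxConcurrentResolvers(Map.of()));
        System.out.println(maxConcurrentResolvers(Map.of(CURRENT_KEY, "8")));
        System.out.println(maxConcurrentResolvers(Map.of(LEGACY_KEY, "4")));
        System.out.println(rejectsBoth());
    }
}
```

Failing fast when both keys are set avoids silently picking one value during a migration from the old setting names.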

doc

  • SeedHostsResolver