本文主要研究一下Elasticsearch的DiscoveryPlugin

DiscoveryPlugin

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/plugins/DiscoveryPlugin.java

public interface DiscoveryPlugin {    /**     * Override to add additional {@link NetworkService.CustomNameResolver}s.     * This can be handy if you want to provide your own Network interface name like _mycard_     * and implement by yourself the logic to get an actual IP address/hostname based on this     * name.     *     * For example: you could call a third party service (an API) to resolve _mycard_.     * Then you could define in elasticsearch.yml settings like:     *     * <pre>{@code     * network.host: _mycard_     * }</pre>     */    default NetworkService.CustomNameResolver getCustomNameResolver(Settings settings) {        return null;    }    /**     * Returns providers of seed hosts for discovery.     *     * The key of the returned map is the name of the host provider     * (see {@link org.elasticsearch.discovery.DiscoveryModule#DISCOVERY_SEED_PROVIDERS_SETTING}), and     * the value is a supplier to construct the host provider when it is selected for use.     *     * @param transportService Use to form the {@link org.elasticsearch.common.transport.TransportAddress} portion     *                         of a {@link org.elasticsearch.cluster.node.DiscoveryNode}     * @param networkService Use to find the publish host address of the current node     */    default Map<String, Supplier<SeedHostsProvider>> getSeedHostProviders(TransportService transportService,                                                                          NetworkService networkService) {        return Collections.emptyMap();    }    /**     * Returns a consumer that validate the initial join cluster state. The validator, unless <code>null</code> is called exactly once per     * join attempt but might be called multiple times during the lifetime of a node. Validators are expected to throw a     * {@link IllegalStateException} if the node and the cluster-state are incompatible.     */    default BiConsumer<DiscoveryNode,ClusterState> getJoinValidator() { return null; }}
  • DiscoveryPlugin定义了getCustomNameResolver、getSeedHostProviders、getJoinValidator三个default方法

GceDiscoveryPlugin

elasticsearch-7.0.1/plugins/discovery-gce/src/main/java/org/elasticsearch/plugin/discovery/gce/GceDiscoveryPlugin.java

public class GceDiscoveryPlugin extends Plugin implements DiscoveryPlugin, Closeable {    /** Determines whether settings those reroutes GCE call should be allowed (for testing purposes only). */    private static final boolean ALLOW_REROUTE_GCE_SETTINGS =        Booleans.parseBoolean(System.getProperty("es.allow_reroute_gce_settings", "false"));    public static final String GCE = "gce";    protected final Settings settings;    private static final Logger logger = LogManager.getLogger(GceDiscoveryPlugin.class);    // stashed when created in order to properly close    private final SetOnce<GceInstancesService> gceInstancesService = new SetOnce<>();    static {        /*         * GCE's http client changes access levels because its silly and we         * can't allow that on any old stack so we pull it here, up front,         * so we can cleanly check the permissions for it. Without this changing         * the permission can fail if any part of core is on the stack because         * our plugin permissions don't allow core to "reach through" plugins to         * change the permission. Because that'd be silly.         */        Access.doPrivilegedVoid( () -> ClassInfo.of(HttpHeaders.class, true));    }    public GceDiscoveryPlugin(Settings settings) {        this.settings = settings;        logger.trace("starting gce discovery plugin...");    }    // overrideable for tests    protected GceInstancesService createGceInstancesService() {        return new GceInstancesServiceImpl(settings);    }    @Override    public Map<String, Supplier<SeedHostsProvider>> getSeedHostProviders(TransportService transportService,                                                                         NetworkService networkService) {        return Collections.singletonMap(GCE, () -> {            gceInstancesService.set(createGceInstancesService());            return new GceSeedHostsProvider(settings, gceInstancesService.get(), transportService, networkService);        });    }    @Override    public NetworkService.CustomNameResolver getCustomNameResolver(Settings settings) {        logger.debug("Register _gce_, _gce:xxx network names");        return new GceNameResolver(new GceMetadataService(settings));    }    @Override    public List<Setting<?>> getSettings() {        List<Setting<?>> settings = new ArrayList<>(            Arrays.asList(                // Register GCE settings                GceInstancesService.PROJECT_SETTING,                GceInstancesService.ZONE_SETTING,                GceSeedHostsProvider.TAGS_SETTING,                GceInstancesService.REFRESH_SETTING,                GceInstancesService.RETRY_SETTING,                GceInstancesService.MAX_WAIT_SETTING)        );        if (ALLOW_REROUTE_GCE_SETTINGS) {            settings.add(GceMetadataService.GCE_HOST);            settings.add(GceInstancesServiceImpl.GCE_ROOT_URL);        }        return Collections.unmodifiableList(settings);    }    @Override    public void close() throws IOException {        IOUtils.close(gceInstancesService.get());    }}
  • GceDiscoveryPlugin实现了DiscoveryPlugin接口,其getCustomNameResolver方法返回的是GceNameResolver,getSeedHostProviders方法返回的是GceSeedHostsProvider

DiscoveryModule

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java

public class DiscoveryModule {    private static final Logger logger = LogManager.getLogger(DiscoveryModule.class);    public static final String ZEN_DISCOVERY_TYPE = "legacy-zen";    public static final String ZEN2_DISCOVERY_TYPE = "zen";    public static final String SINGLE_NODE_DISCOVERY_TYPE = "single-node";    public static final Setting<String> DISCOVERY_TYPE_SETTING =        new Setting<>("discovery.type", ZEN2_DISCOVERY_TYPE, Function.identity(), Property.NodeScope);    public static final Setting<List<String>> LEGACY_DISCOVERY_HOSTS_PROVIDER_SETTING =        Setting.listSetting("discovery.zen.hosts_provider", Collections.emptyList(), Function.identity(),            Property.NodeScope, Property.Deprecated);    public static final Setting<List<String>> DISCOVERY_SEED_PROVIDERS_SETTING =        Setting.listSetting("discovery.seed_providers", Collections.emptyList(), Function.identity(),            Property.NodeScope);    private final Discovery discovery;    public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportService transportService,                           NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService, MasterService masterService,                           ClusterApplier clusterApplier, ClusterSettings clusterSettings, List<DiscoveryPlugin> plugins,                           AllocationService allocationService, Path configFile, GatewayMetaState gatewayMetaState) {        final Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators = new ArrayList<>();        final Map<String, Supplier<SeedHostsProvider>> hostProviders = new HashMap<>();        hostProviders.put("settings", () -> new SettingsBasedSeedHostsProvider(settings, transportService));        hostProviders.put("file", () -> new FileBasedSeedHostsProvider(configFile));        for (DiscoveryPlugin plugin : plugins) {            plugin.getSeedHostProviders(transportService, networkService).forEach((key, value) -> {                if (hostProviders.put(key, value) != null) {                    throw new IllegalArgumentException("Cannot register seed provider [" + key + "] twice");                }            });            BiConsumer<DiscoveryNode, ClusterState> joinValidator = plugin.getJoinValidator();            if (joinValidator != null) {                joinValidators.add(joinValidator);            }        }        List<String> seedProviderNames = getSeedProviderNames(settings);        // for bwc purposes, add settings provider even if not explicitly specified        if (seedProviderNames.contains("settings") == false) {            List<String> extendedSeedProviderNames = new ArrayList<>();            extendedSeedProviderNames.add("settings");            extendedSeedProviderNames.addAll(seedProviderNames);            seedProviderNames = extendedSeedProviderNames;        }        final Set<String> missingProviderNames = new HashSet<>(seedProviderNames);        missingProviderNames.removeAll(hostProviders.keySet());        if (missingProviderNames.isEmpty() == false) {            throw new IllegalArgumentException("Unknown seed providers " + missingProviderNames);        }        List<SeedHostsProvider> filteredSeedProviders = seedProviderNames.stream()            .map(hostProviders::get).map(Supplier::get).collect(Collectors.toList());        String discoveryType = DISCOVERY_TYPE_SETTING.get(settings);        final SeedHostsProvider seedHostsProvider = hostsResolver -> {            final List<TransportAddress> addresses = new ArrayList<>();            for (SeedHostsProvider provider : filteredSeedProviders) {                addresses.addAll(provider.getSeedAddresses(hostsResolver));            }            return Collections.unmodifiableList(addresses);        };        if (ZEN2_DISCOVERY_TYPE.equals(discoveryType) || SINGLE_NODE_DISCOVERY_TYPE.equals(discoveryType)) {            discovery = new Coordinator(NODE_NAME_SETTING.get(settings),                settings, clusterSettings,                transportService, namedWriteableRegistry, allocationService, masterService,                () -> gatewayMetaState.getPersistedState(settings, (ClusterApplierService) clusterApplier), seedHostsProvider,                clusterApplier, joinValidators, new Random(Randomness.get().nextLong()));        } else if (ZEN_DISCOVERY_TYPE.equals(discoveryType)) {            discovery = new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier,                clusterSettings, seedHostsProvider, allocationService, joinValidators, gatewayMetaState);        } else {            throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]");        }        logger.info("using discovery type [{}] and seed hosts providers {}", discoveryType, seedProviderNames);    }    private List<String> getSeedProviderNames(Settings settings) {        if (LEGACY_DISCOVERY_HOSTS_PROVIDER_SETTING.exists(settings)) {            if (DISCOVERY_SEED_PROVIDERS_SETTING.exists(settings)) {                throw new IllegalArgumentException("it is forbidden to set both [" + DISCOVERY_SEED_PROVIDERS_SETTING.getKey() + "] and ["                    + LEGACY_DISCOVERY_HOSTS_PROVIDER_SETTING.getKey() + "]");            }            return LEGACY_DISCOVERY_HOSTS_PROVIDER_SETTING.get(settings);        }        return DISCOVERY_SEED_PROVIDERS_SETTING.get(settings);    }    public Discovery getDiscovery() {        return discovery;    }}
  • DiscoveryModule的构造器接收DiscoveryPlugin类型的list,然后遍历该列表将其SeedHostsProvider添加到hostProviders中,将joinValidator添加到joinValidators中

小结

  • DiscoveryPlugin定义了getCustomNameResolver、getSeedHostProviders、getJoinValidator三个default方法
  • GceDiscoveryPlugin实现了DiscoveryPlugin接口,其getCustomNameResolver方法返回的是GceNameResolver,getSeedHostProviders方法返回的是GceSeedHostsProvider
  • DiscoveryModule的构造器接收DiscoveryPlugin类型的list,然后遍历该列表将其SeedHostsProvider添加到hostProviders中,将joinValidator添加到joinValidators中

doc

  • DiscoveryPlugin