Mastering the Elasticsearch Source Code: Understanding the ES Startup Flow in One Diagram

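Introduction
Let's start with the diagram.
In the diagram, dashed lines mean entering a detailed sub-flow and solid lines mean moving on to the next step. Every step is numbered to make the explanation below easier to follow. First, a quick introduction to the main classes involved in startup: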

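org.elasticsearch.bootstrap.Elasticsearch: the startup entry point. The main method lives in this class, and its logic corresponds to the green part of the diagram
org.elasticsearch.bootstrap.Bootstrap: contains the main startup code. Its logic corresponds to the red part of the diagram
org.elasticsearch.node.Node: represents a node in the cluster. Its logic corresponds to the blue part of the diagram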

Walkthrough
1. The main method
2. Install an empty SecurityManager that grants all permissions:
// we want the JVM to think there is a security manager installed so that if internal policy decisions that would be based on the
// presence of a security manager or lack thereof act as if there is a security manager present (e.g., DNS cache policy)
System.setSecurityManager(new SecurityManager() {
    @Override
    public void checkPermission(Permission perm) {
        // grant all permissions so that we can later set the security manager to the one that we want
    }
});
Add a StatusConsoleListener to STATUS_LOGGER:
We want to detect situations where we touch logging before the configuration is loaded. If we do this, Log4j will status log an error message at the error level. With this error listener, we can capture if this happens. More broadly, we can detect any error-level status log message which likely indicates that something is broken. The listener is installed immediately on startup, and then when we get around to configuring logging we check that no error-level log messages have been logged by the status logger. If they have, we fail startup, and any such messages can be seen on the console.
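The mechanism described above can be sketched in plain Java without Log4j. This is a minimal illustration under assumptions: StatusLevel, StatusErrorListener, onStatus and checkNoErrors are hypothetical names, not Log4j or Elasticsearch APIs. The listener remembers every error-level status message, and the check that runs once logging is configured fails startup if any were recorded.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical severity levels for internal status messages (not Log4j's Level class).
enum StatusLevel { DEBUG, INFO, WARN, ERROR }

// Hypothetical stand-in for the error listener registered on the status logger at startup.
class StatusErrorListener {
    private final List<String> errors = new ArrayList<>();

    // Called for every internal status message; only error-level messages are remembered.
    void onStatus(StatusLevel level, String message) {
        if (level == StatusLevel.ERROR) {
            errors.add(message);
        }
    }

    // Called once logging has been configured: fail startup if any error-level message was seen.
    void checkNoErrors() {
        if (!errors.isEmpty()) {
            throw new IllegalStateException("status logger recorded errors: " + errors);
        }
    }

    // Read-only view of the recorded messages.
    List<String> errors() {
        return Collections.unmodifiableList(errors);
    }
}
```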
Instantiate Elasticsearch:
Elasticsearch() {
    super("starts elasticsearch", () -> {}); // () -> {} is the beforeMain callback, run before the command's main logic
    // register the version, daemonize, pidfile and quiet options with the parser
    versionOption = parser.acceptsAll(Arrays.asList("V", "version"),
        "Prints elasticsearch version information and exits");
    daemonizeOption = parser.acceptsAll(Arrays.asList("d", "daemonize"),
        "Starts Elasticsearch in the background")
        .availableUnless(versionOption);
    pidfileOption = parser.acceptsAll(Arrays.asList("p", "pidfile"),
        "Creates a pid file in the specified path on start")
        .availableUnless(versionOption)
        .withRequiredArg()
        .withValuesConvertedBy(new PathConverter());
    quietOption = parser.acceptsAll(Arrays.asList("q", "quiet"),
        "Turns off standard output/error streams logging in console")
        .availableUnless(versionOption)
        .availableUnless(daemonizeOption);
}
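availableUnless(...) makes an option legal only when none of the listed options is present, so -q is rejected together with -V or -d. The JDK-only sketch below re-creates just that rule. It is not jopt-simple; the class name StartupOptions, the alias table and the use of IllegalArgumentException are assumptions for illustration.

```java
import java.util.*;

// Hypothetical re-creation of the "availableUnless" rule using only the JDK (not jopt-simple).
class StartupOptions {
    // Canonical option name for each accepted flag, mirroring the aliases registered above.
    private static final Map<String, String> ALIASES = new HashMap<>();
    // For each option, the options whose presence makes it unavailable.
    private static final Map<String, List<String>> UNLESS = new HashMap<>();

    static {
        ALIASES.put("-V", "version");   ALIASES.put("--version", "version");
        ALIASES.put("-d", "daemonize"); ALIASES.put("--daemonize", "daemonize");
        ALIASES.put("-p", "pidfile");   ALIASES.put("--pidfile", "pidfile");
        ALIASES.put("-q", "quiet");     ALIASES.put("--quiet", "quiet");
        UNLESS.put("daemonize", Arrays.asList("version"));
        UNLESS.put("pidfile", Arrays.asList("version"));
        UNLESS.put("quiet", Arrays.asList("version", "daemonize"));
    }

    // Returns option name -> value (null for flags that take no argument).
    static Map<String, String> parse(String... args) {
        Map<String, String> seen = new LinkedHashMap<>();
        for (int i = 0; i < args.length; i++) {
            String name = ALIASES.get(args[i]);
            if (name == null) {
                throw new IllegalArgumentException("unknown option " + args[i]);
            }
            String value = null;
            if (name.equals("pidfile")) { // pidfile requires a path argument
                if (i + 1 >= args.length) {
                    throw new IllegalArgumentException("pidfile requires a path");
                }
                value = args[++i];
            }
            seen.put(name, value);
        }
        // Enforce the availableUnless constraints once all options are known.
        for (String name : seen.keySet()) {
            for (String blocker : UNLESS.getOrDefault(name, Collections.emptyList())) {
                if (seen.containsKey(blocker)) {
                    throw new IllegalArgumentException(name + " is unavailable when " + blocker + " is given");
                }
            }
        }
        return seen;
    }
}
```

For example, parse("-d", "-p", "/tmp/es.pid") succeeds, while parse("-d", "-q") is rejected because quiet is unavailable together with daemonize.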
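3. Register a shutdown hook that closes the command when the JVM shuts down; if closing throws an IOException, its stack trace is printed to the terminal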
shutdownHookThread = new Thread(() -> {
    try {
        this.close();
    } catch (final IOException e) {
        try (
            StringWriter sw = new StringWriter();
            PrintWriter pw = new PrintWriter(sw)) {
            e.printStackTrace(pw);
            terminal.println(sw.toString());
        } catch (final IOException impossible) {
            // StringWriter#close declares a checked IOException from the Closeable interface but the Javadocs for StringWriter
            // say that an exception here is impossible
            throw new AssertionError(impossible);
        }
    }
});
Runtime.getRuntime().addShutdownHook(shutdownHookThread);
Then beforeMain.run() is called. It is simply the () -> {} lambda created when the Elasticsearch object was instantiated above, so it does nothing.
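The ordering described here (register the shutdown hook, run beforeMain, then do the real work) can be illustrated with a stripped-down command skeleton. MiniCommand and its exit codes are hypothetical; the real Command class also deals with terminals, option parsing and error reporting, and the shutdown hook registration is omitted from this sketch.

```java
// Hypothetical, stripped-down command skeleton (not Elasticsearch's Command class).
abstract class MiniCommand {
    private final Runnable beforeMain;

    MiniCommand(Runnable beforeMain) {
        this.beforeMain = beforeMain;
    }

    // Same order as above: (shutdown hook registration omitted) -> beforeMain -> real work.
    final int main(String[] argv) {
        beforeMain.run();
        try {
            execute(argv);
            return 0; // hypothetical "OK" exit code
        } catch (Exception e) {
            System.err.println("command failed: " + e.getMessage());
            return 1; // hypothetical failure exit code
        }
    }

    // Subclasses implement the actual command here.
    protected abstract void execute(String[] argv) throws Exception;
}
```

Passing () -> {} as beforeMain, as the Elasticsearch constructor does, turns that first step into a no-op.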
4. Enter the mainWithoutErrorHandling method of the Command class
void mainWithoutErrorHandling(String[] args, Terminal terminal) throws Exception {
    final OptionSet options = parser.parse(args); // parse the command-line arguments against the option specs registered on the parser

    if (options.has(helpOption)) {
        printHelp(terminal);
        return;
    }

    if (options.has(silentOption)) { // the terminal prints as little as possible
        terminal.setVerbosity(Terminal.Verbosity.SILENT);
    } else if (options.has(verboseOption)) { // the terminal prints verbose output
        terminal.setVerbosity(Terminal.Verbosity.VERBOSE);
    } else {
        terminal.setVerbosity(Terminal.Verbosity.NORMAL);
    }

    execute(terminal, options);
}
5. Enter the execute method of EnvironmentAwareCommand
protected void execute(Terminal terminal, OptionSet options) throws Exception {
    final Map<String, String> settings = new HashMap<>();
    for (final KeyValuePair kvp : settingOption.values(options)) {
        if (kvp.value.isEmpty()) {
            throw new UserException(ExitCodes.USAGE, "setting [" + kvp.key + "] must not be empty");
        }
        if (settings.containsKey(kvp.key)) {
            final String message = String.format(
                Locale.ROOT,
                "setting [%s] already set, saw [%s] and [%s]",
                kvp.key,
                settings.get(kvp.key),
                kvp.value);
            throw new UserException(ExitCodes.USAGE, message);
        }
        settings.put(kvp.key, kvp.value);
    }

    // make sure the given setting is present; if it is not set, read it from the corresponding system property
    putSystemPropertyIfSettingIsMissing(settings, "path.data", "es.path.data");
    putSystemPropertyIfSettingIsMissing(settings, "path.home", "es.path.home");
    putSystemPropertyIfSettingIsMissing(settings, "path.logs", "es.path.logs");

    execute(terminal, options, createEnv(terminal, settings));
}
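The -E key=value handling above can be reproduced with the JDK alone. In this sketch SettingsCollector is hypothetical, a missing '=' is treated as an empty value so the emptiness check rejects it, IllegalArgumentException stands in for UserException, and the system properties are passed in explicitly so the fallback can be tested. The fallback follows the comment above (use the system property only when the setting is missing); the real putSystemPropertyIfSettingIsMissing may handle a conflicting value differently.

```java
import java.util.*;

// Hypothetical JDK-only version of the settings collection shown above.
class SettingsCollector {
    static Map<String, String> collect(List<String> keyValues, Properties sysProps) {
        Map<String, String> settings = new HashMap<>();
        for (String kv : keyValues) {
            int eq = kv.indexOf('=');
            // Treat a missing '=' as an empty value so the check below rejects it.
            String key = eq < 0 ? kv : kv.substring(0, eq);
            String value = eq < 0 ? "" : kv.substring(eq + 1);
            if (value.isEmpty()) {
                throw new IllegalArgumentException("setting [" + key + "] must not be empty");
            }
            if (settings.containsKey(key)) {
                throw new IllegalArgumentException(String.format(Locale.ROOT,
                        "setting [%s] already set, saw [%s] and [%s]", key, settings.get(key), value));
            }
            settings.put(key, value);
        }
        // Fall back to the es.path.* system properties only for settings that were not given explicitly.
        putIfMissing(settings, "path.data", sysProps.getProperty("es.path.data"));
        putIfMissing(settings, "path.home", sysProps.getProperty("es.path.home"));
        putIfMissing(settings, "path.logs", sysProps.getProperty("es.path.logs"));
        return settings;
    }

    private static void putIfMissing(Map<String, String> settings, String key, String sysValue) {
        if (sysValue != null && !settings.containsKey(key)) {
            settings.put(key, sysValue);
        }
    }
}
```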
6. Enter the prepareEnvironment method of InternalSettingsPreparer, which reads elasticsearch.yml and creates the Environment. There are many details here; they will be covered in a later article.

7. Check for the -V (version) option; if it is absent, get ready to enter the init flow
protected void execute(Terminal terminal, OptionSet options, Environment env) throws UserException {
    if (options.nonOptionArguments().isEmpty() == false) {
        throw new UserException(ExitCodes.USAGE, "Positional arguments not allowed, found " + options.nonOptionArguments());
    }
    if (options.has(versionOption)) { // with -V, print the version information and exit immediately
        terminal.println("Version: " + Version.displayVersion(Version.CURRENT, Build.CURRENT.isSnapshot())
            + ", Build: " + Build.CURRENT.shortHash() + "/" + Build.CURRENT.date()
            + ", JVM: " + JvmInfo.jvmInfo().version());
        return;
    }

    final boolean daemonize = options.has(daemonizeOption);
    final Path pidFile = pidfileOption.value(options);
    final boolean quiet = options.has(quietOption);

    try {
        init(daemonize, pidFile, quiet, env);
    } catch (NodeValidationException e) {
        throw new UserException(ExitCodes.CONFIG, e.getMessage());
    }
}
8. Call Bootstrap.init
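9. Instantiate Bootstrap. The constructor creates keepAliveThread, a non-daemon thread that simply waits on keepAliveLatch. Because it is not a daemon thread, it keeps the JVM process alive until a shutdown hook counts the latch down.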
Bootstrap() {
    keepAliveThread = new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                keepAliveLatch.await();
            } catch (InterruptedException e) {
                // bail out
            }
        }
    }, "elasticsearch[keepAlive/" + Version.CURRENT + "]");
    keepAliveThread.setDaemon(false);
    // keep this thread alive (non daemon thread) until we shutdown
    Runtime.getRuntime().addShutdownHook(new Thread() {
        @Override
        public void run() {
            keepAliveLatch.countDown();
        }
    });
}
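The keep-alive pattern is easy to reproduce: a non-daemon thread parked on a CountDownLatch keeps the JVM running until something counts the latch down, which in Bootstrap is the shutdown hook. The class and method names below (KeepAlive, release, awaitExit) are illustrative, not Elasticsearch's.

```java
import java.util.concurrent.CountDownLatch;

// Illustrative keep-alive helper: a non-daemon thread that waits until release() is called.
class KeepAlive {
    private final CountDownLatch latch = new CountDownLatch(1);
    private final Thread thread;

    KeepAlive(String name) {
        thread = new Thread(() -> {
            try {
                latch.await(); // park until release() counts the latch down
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // bail out, preserving the interrupt flag
            }
        }, name);
        thread.setDaemon(false); // non-daemon: the JVM will not exit normally while this thread runs
    }

    void start() {
        thread.start();
    }

    // What the shutdown hook does in Bootstrap: let the keep-alive thread finish.
    void release() {
        latch.countDown();
    }

    boolean isAlive() {
        return thread.isAlive();
    }

    // Waits up to the given time for the thread to finish; returns true if it has exited.
    boolean awaitExit(long millis) {
        try {
            thread.join(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !thread.isAlive();
    }
}
```

In a real program it would be paired with Runtime.getRuntime().addShutdownHook(new Thread(keepAlive::release)), mirroring the hook registered in the constructor above.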
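10. Load the elasticsearch.keystore file and re-create the Environment, then call the static method LogConfigurator.configure, which reads log4j2.properties from the config directory and configures Log4j accordingly
11. Create the pid file and check the Lucene version, throwing an error if it does not match: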
private static void checkLucene() {
    if (Version.CURRENT.luceneVersion.equals(org.apache.lucene.util.Version.LATEST) == false) {
        throw new AssertionError("Lucene version mismatch this version of Elasticsearch requires lucene version ["
            + Version.CURRENT.luceneVersion + "] but the current lucene version is [" + org.apache.lucene.util.Version.LATEST + "]");
    }
}
12. Install ElasticsearchUncaughtExceptionHandler, which logs fatal errors:
// install the default uncaught exception handler; must be done before security is
// initialized as we do not want to grant the runtime permission
// setDefaultUncaughtExceptionHandler
Thread.setDefaultUncaughtExceptionHandler(
    new ElasticsearchUncaughtExceptionHandler(() -> Node.NODE_NAME_SETTING.get(environment.settings())));
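A handler in the spirit of ElasticsearchUncaughtExceptionHandler can be sketched as follows. It assumes that any Error counts as fatal and, unlike the real handler, it neither uses Log4j nor halts the JVM: it only builds and prints a message tagged with the node name from a Supplier, so the behavior can be tested safely. NodeUncaughtExceptionHandler and the message texts are hypothetical.

```java
import java.util.function.Supplier;

// Hypothetical uncaught exception handler that tags messages with the node name.
class NodeUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
    private final Supplier<String> nodeName;
    private volatile String lastMessage;

    NodeUncaughtExceptionHandler(Supplier<String> nodeName) {
        this.nodeName = nodeName;
    }

    // Assumption for this sketch: any Error (e.g. OutOfMemoryError) is treated as fatal.
    static boolean isFatal(Throwable t) {
        return t instanceof Error;
    }

    @Override
    public void uncaughtException(Thread t, Throwable e) {
        String prefix = "[" + nodeName.get() + "] ";
        if (isFatal(e)) {
            // A real handler would log at error level and then halt the JVM here.
            lastMessage = prefix + "fatal error in thread [" + t.getName() + "], exiting";
        } else {
            lastMessage = prefix + "uncaught exception in thread [" + t.getName() + "]";
        }
        System.err.println(lastMessage + ": " + e);
    }

    String lastMessage() {
        return lastMessage;
    }
}
```

It would be installed the same way as above, e.g. Thread.setDefaultUncaughtExceptionHandler(new NodeUncaughtExceptionHandler(() -> "node-1")).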
13. Enter Bootstrap.setup
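14. spawner.spawnNativePluginControllers(environment) attempts to spawn the native controller daemon for the given modules. Each spawned process stays connected to this JVM through its stdin, stdout and stderr streams, but references to these streams are not available to code outside this package.
15. Initialize native resources with initializeNatives():
It checks whether the process is running as the root user and throws an exception if so; installs the system call filter and, if memory locking is enabled, tries mlockall; and attempts to set the maximum number of threads, the maximum virtual memory size, the maximum number of file descriptors and so on. After that, initializeProbes() initializes the probes used to monitor the operating system, the process and the JVM.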
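The "refuse to run as root" rule reduces to a simple guard. Elasticsearch obtains the effective user ID through native calls; this JDK-only sketch takes the user name as a parameter instead (an assumption), which keeps it portable and testable. RootCheck and its message are hypothetical.

```java
// Hypothetical root guard; the caller supplies the user name instead of querying the OS natively.
final class RootCheck {
    private RootCheck() {
    }

    // Throws if the given user is root, mirroring the "refuse to run as root" rule described above.
    static void ensureNotRoot(String userName) {
        if ("root".equals(userName)) {
            throw new IllegalStateException("refusing to run as root");
        }
    }
}
```

A caller could pass System.getProperty("user.name"), keeping in mind that this is only an approximation of the effective user ID.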
16. Add another shutdown hook, which closes the node and the spawner and shuts down the Log4j logger context:
if (addShutdownHook) {
    Runtime.getRuntime().addShutdownHook(new Thread() {
        @Override
        public void run() {
            try {
                IOUtils.close(node, spawner);
                LoggerContext context = (LoggerContext) LogManager.getContext(false);
                Configurator.shutdown(context);
            } catch (IOException ex) {
                throw new ElasticsearchException("failed to stop node", ex);
            }
        }
    });
}
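IOUtils.close(node, spawner) is expected to close every resource even if an earlier one fails. The sketch below shows that "close everything, rethrow the first failure with the rest suppressed" behavior using only the JDK; CloseAll is a hypothetical helper, not Lucene's IOUtils.

```java
import java.io.Closeable;
import java.io.IOException;

// Hypothetical helper: closes all resources, then rethrows the first failure (others suppressed).
final class CloseAll {
    private CloseAll() {
    }

    static void close(Closeable... resources) throws IOException {
        IOException first = null;
        for (Closeable c : resources) {
            if (c == null) {
                continue; // tolerate resources that were never created
            }
            try {
                c.close();
            } catch (IOException e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }
}
```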
17. This step is simple enough to read straight from the code:
try {
    // look for jar hell
    JarHell.checkJarHell();
} catch (IOException | URISyntaxException e) {
    throw new BootstrapException(e);
}

// Log ifconfig output before SecurityManager is installed
IfConfig.logIfNecessary();

// install SM after natives, shutdown hooks, etc.
try {
    Security.configure(environment, BootstrapSettings.SECURITY_FILTER_BAD_DEFAULTS_SETTING.get(settings));
} catch (IOException | NoSuchAlgorithmException e) {
    throw new BootstrapException(e);
}
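The idea behind the jar hell check can be shown with a toy version: given which classes each classpath entry contains, report any class that appears in more than one entry. The real JarHell.checkJarHell() scans the actual classpath and jar files; here the contents are supplied as a map, an assumption that keeps the logic testable. ToyJarHell is hypothetical.

```java
import java.util.*;

// Toy duplicate-class detector; classpath contents are passed in rather than scanned.
final class ToyJarHell {
    private ToyJarHell() {
    }

    static void check(Map<String, List<String>> classesByJar) {
        Map<String, String> owner = new HashMap<>(); // class name -> first jar that contained it
        for (Map.Entry<String, List<String>> jar : classesByJar.entrySet()) {
            for (String cls : jar.getValue()) {
                String previous = owner.putIfAbsent(cls, jar.getKey());
                if (previous != null) {
                    throw new IllegalStateException(
                            "duplicate class [" + cls + "] in [" + previous + "] and [" + jar.getKey() + "]");
                }
            }
        }
    }
}
```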
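18. Instantiate Node
Bootstrap instantiates Node as an anonymous subclass that overrides the validateNodeBeforeAcceptingRequests method. Node construction consists of three main parts: first, starting the plugin service (ES provides a plugin mechanism for extending its functionality, which is one of its highlights) and loading the required plugins; second, configuring the node environment; and finally, loading the various modules through Guice. Steps 22 to 32 below cover these in detail.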
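Overriding a hook through an anonymous subclass is a plain template-method pattern. The simplified MiniNode below is hypothetical: it only shows that start() calls validateNodeBeforeAcceptingRequests() before the node would begin serving requests, and that an anonymous subclass (like the one Bootstrap creates) can plug its own checks in at that point.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified node illustrating the overridable validation hook.
class MiniNode {
    final List<String> events = new ArrayList<>();

    MiniNode start() {
        events.add("services started");
        validateNodeBeforeAcceptingRequests(); // subclass hook, e.g. bootstrap checks
        events.add("accepting requests");
        return this;
    }

    // Default: no validation. A caller can supply an anonymous subclass that overrides this.
    protected void validateNodeBeforeAcceptingRequests() {
    }
}
```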
19. Enter Bootstrap.start
20. node.start() starts the node
21. keepAliveThread.start()
22. The first step of Node instantiation: create the NodeEnvironment

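23. Generate the nodeId, and log the nodeId, nodeName, JVM info and process information
24. Create the PluginsService object; during construction it reads and loads all modules and plugins
25. Create the Environment once more: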
// create the environment based on the finalized (processed) view of the settings
// this is just to make sure that people get the same settings, no matter where they ask them from
this.environment = new Environment(this.settings, environment.configFile());
26. Create the ThreadPool, then set the ThreadContext on DeprecationLogger
27. Create the NodeClient, which is used to execute actions
28. Create the various services:
ResourceWatcherService、NetworkService、ClusterService、IngestService、ClusterInfoService、UsageService、MonitorService、CircuitBreakerService、MetaStateService、IndicesService、MetaDataIndexUpgradeService、TemplateUpgradeService、TransportService、ResponseCollectorService、SearchTransportService、NodeService、SearchService、PersistentTasksClusterService
29. Create and add the modules:
ScriptModule、AnalysisModule、SettingsModule、pluginModule、ClusterModule、IndicesModule、SearchModule、GatewayModule、RepositoriesModule、ActionModule、NetworkModule、DiscoveryModule
30. Guice binds and injects the objects
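Guice itself resolves constructor dependencies by reflection; the toy injector below uses explicit factory functions instead, only to illustrate the "bind first, then get instances" idea. ToyInjector, bind and getInstance are hypothetical names that merely echo Guice's vocabulary; this is not Guice's API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Toy injector: bindings map a type to a factory; instances are created lazily and cached as singletons.
final class ToyInjector {
    private final Map<Class<?>, Function<ToyInjector, ?>> bindings = new HashMap<>();
    private final Map<Class<?>, Object> singletons = new HashMap<>();

    <T> void bind(Class<T> type, Function<ToyInjector, ? extends T> factory) {
        bindings.put(type, factory);
    }

    <T> T getInstance(Class<T> type) {
        Object existing = singletons.get(type);
        if (existing == null) {
            Function<ToyInjector, ?> factory = bindings.get(type);
            if (factory == null) {
                throw new IllegalStateException("no binding for " + type.getName());
            }
            existing = factory.apply(this); // the factory may request its own dependencies
            singletons.put(type, existing);
        }
        return type.cast(existing);
    }
}
```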
31. Initialize the NodeClient:
client.initialize(injector.getInstance(new Key<Map<GenericAction, TransportAction>>() {}),
    () -> clusterService.localNode().getId());
32. Initialize the REST handlers. This is very important and will get its own article later:
if (NetworkModule.HTTP_ENABLED.get(settings)) {
    logger.debug("initializing HTTP handlers ..."); // initialize the HTTP handlers
    actionModule.initRestHandlers(() -> clusterService.state().nodes());
}
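Initializing REST handlers essentially means registering each handler under an HTTP method and path so that incoming requests can be dispatched to it. The sketch below only supports exact paths, whereas Elasticsearch's RestController also handles path parameters such as {index}; ToyRestController and its String-based handlers are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical route registry: "METHOD path" -> handler that maps a request body to a response.
final class ToyRestController {
    private final Map<String, Function<String, String>> handlers = new HashMap<>();

    void registerHandler(String method, String path, Function<String, String> handler) {
        String key = method + " " + path;
        if (handlers.putIfAbsent(key, handler) != null) {
            throw new IllegalArgumentException("handler already registered for " + key);
        }
    }

    // Looks up the exact method/path pair; unknown routes get a 404-style response.
    String dispatch(String method, String path, String body) {
        Function<String, String> handler = handlers.get(method + " " + path);
        return handler == null ? "404 no handler for " + method + " " + path : handler.apply(body);
    }
}
```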
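33. Move the lifecycle state to State.STARTED
34. Start the pluginLifecycleComponents
35. Get the instance of each of the following classes from the injector and call its start() method (which actually runs that class's doStart method):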
LifecycleComponent、IndicesService、IndicesClusterStateService、SnapshotsService、SnapshotShardsService、RoutingService、SearchService、MonitorService、NodeConnectionsService、ResourceWatcherService、GatewayService、Discovery、TransportService
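The start() to doStart() split is a template method: start() guards the state transition and delegates the real work to the subclass. The sketch assumes simplified rules (starting twice is a no-op, starting a closed component fails); ToyLifecycleComponent is hypothetical and does not reproduce Elasticsearch's lifecycle listeners.

```java
// Hypothetical lifecycle base class showing the start() -> doStart() template method.
abstract class ToyLifecycleComponent {
    enum State { INITIALIZED, STARTED, CLOSED }

    private State state = State.INITIALIZED;

    // Public entry point: guards the state transition, then delegates to doStart().
    final void start() {
        if (state == State.STARTED) {
            return; // already started: starting again is a no-op
        }
        if (state == State.CLOSED) {
            throw new IllegalStateException("can't start a closed component");
        }
        doStart();
        state = State.STARTED;
    }

    final void close() {
        state = State.CLOSED;
    }

    final State state() {
        return state;
    }

    // Subclasses put their actual startup logic here.
    protected abstract void doStart();
}
```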
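36. Start HttpServerTransport and TransportService and bind their ports. If WRITE_PORTS_FILE_SETTING is enabled, the bound addresses are then written to ports files: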
if (WRITE_PORTS_FILE_SETTING.get(settings)) {
    if (NetworkModule.HTTP_ENABLED.get(settings)) {
        HttpServerTransport http = injector.getInstance(HttpServerTransport.class);
        writePortsFile("http", http.boundAddress());
    }
    TransportService transport = injector.getInstance(TransportService.class);
    writePortsFile("transport", transport.boundAddress());
}
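A ports file writer along these lines can be sketched with java.nio.file: write one address per line to a temporary file, then move it into place so readers never see a half-written file. PortsFileWriter, the ".ports.tmp" naming and the use of ATOMIC_MOVE are assumptions for illustration, not a copy of writePortsFile.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;

// Hypothetical ports file writer: temp file first, then an atomic rename into place.
final class PortsFileWriter {
    private PortsFileWriter() {
    }

    static Path write(Path dir, String type, List<String> boundAddresses) throws IOException {
        Path tmp = dir.resolve(type + ".ports.tmp");
        try (BufferedWriter writer = Files.newBufferedWriter(tmp, StandardCharsets.UTF_8)) {
            for (String address : boundAddresses) {
                writer.write(address); // one "host:port" entry per line
                writer.write('\n');
            }
        }
        Path target = dir.resolve(type + ".ports");
        return Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
    }
}
```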
Summary

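This article only covers the overall ES startup flow; many of the details will be explored in depth in later articles in this series.
The ES source code is fairly hard going: the flows are long, and reading it is less pleasant than reading the Spring source. This reflects one of the differences between open-source software and open-source frameworks: in the former you run into a great deal of procedural detail, because the focus is on implementing concrete features, while the latter has many extension points and puts more emphasis on extensibility.
Why read open-source code?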

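1. You understand the underlying implementation, so you can use the software better and locate and fix problems quickly.
2. You learn from other people's good code and their ways of solving problems, which improves your own system design skills.
3. You may get the chance to extend and adapt it.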
