玩转Elasticsearch源码-ActionModule启动分析

24次阅读

共计 20111 个字符,预计需要花费 51 分钟才能阅读完成。

ActionModule 的用途
org.elasticsearch.action.ActionModule 主要维护了请求和响应相关组件,包括客户端请求处理器(actions,restController)和过滤器(actionFilters),它们可能来自 ES 本身或者来自 plugin。
源码分析
构造方法
先看看构造方法,每一步都简单加了解释
public ActionModule(boolean transportClient, Settings settings, IndexNameExpressionResolver indexNameExpressionResolver,
IndexScopedSettings indexScopedSettings, ClusterSettings clusterSettings, SettingsFilter settingsFilter,
ThreadPool threadPool, List<ActionPlugin> actionPlugins, NodeClient nodeClient,
CircuitBreakerService circuitBreakerService, UsageService usageService) {
this.transportClient = transportClient;// 服务端为 false,客户端为 true
this.settings = settings;
this.indexNameExpressionResolver = indexNameExpressionResolver;// 将提供的索引表达式转换为实际的具体索引
this.indexScopedSettings = indexScopedSettings;//index 级别的配置
this.clusterSettings = clusterSettings;// 集群级别的配置
this.settingsFilter = settingsFilter;// 允许通过简单正则表达式模式或完整设置键筛选设置对象的类。它用于 rest 层上的响应过滤,例如过滤出 access keys 等敏感信息。
this.actionPlugins = actionPlugins;//actionPlugins 主要实现了 ActionPlugin 的 getActions()
actions = setupActions(actionPlugins);// 安装 actions
actionFilters = setupActionFilters(actionPlugins);// 从 ActionPlugin 中获取,后面在 TransportAction 里面 requestFilterChain 会去调用,以后再细讲
autoCreateIndex = transportClient ? null : new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver);// 封装是否创建新索引的逻辑
destructiveOperations = new DestructiveOperations(settings, clusterSettings);// 帮助处理破坏性操作和通配符使用。
Set<String> headers = actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream()).collect(Collectors.toSet());
UnaryOperator<RestHandler> restWrapper = null;//RestHandler 包装器,下面从 actionPlugins 里面遍历获取
for (ActionPlugin plugin : actionPlugins) {
UnaryOperator<RestHandler> newRestWrapper = plugin.getRestHandlerWrapper(threadPool.getThreadContext());
if (newRestWrapper != null) {
logger.debug(“Using REST wrapper from plugin ” + plugin.getClass().getName());
if (restWrapper != null) {
throw new IllegalArgumentException(“Cannot have more than one plugin implementing a REST wrapper”);
}
restWrapper = newRestWrapper;
}
}
if (transportClient) {// 客户端不需要暴露 http 服务
restController = null;
} else {// 服务端创建一个 RestController, 实现了 Dispatcher 接口,系统接受到请求后从 netty 的 handler 一路调用到 dispatchRequest 方法
restController = new RestController(settings, headers, restWrapper, nodeClient, circuitBreakerService, usageService);
}
}
首先设置了 settings 等参数,然后进入
setupActions 代码比较好理解,直接贴源码:
static Map<String, ActionHandler<?, ?>> setupActions(List<ActionPlugin> actionPlugins) {
// Subclass NamedRegistry for easy registration
class ActionRegistry extends NamedRegistry<ActionHandler<?, ?>> {
ActionRegistry() {
super(“action”);
}

public void register(ActionHandler<?, ?> handler) {
register(handler.getAction().name(), handler);
}

public <Request extends ActionRequest, Response extends ActionResponse> void register(
GenericAction<Request, Response> action, Class<? extends TransportAction<Request, Response>> transportAction,
Class<?>… supportTransportActions) {
register(new ActionHandler<>(action, transportAction, supportTransportActions));
}
}
ActionRegistry actions = new ActionRegistry();

actions.register(MainAction.INSTANCE, TransportMainAction.class);
actions.register(NodesInfoAction.INSTANCE, TransportNodesInfoAction.class);
actions.register(RemoteInfoAction.INSTANCE, TransportRemoteInfoAction.class);
actions.register(NodesStatsAction.INSTANCE, TransportNodesStatsAction.class);
actions.register(NodesUsageAction.INSTANCE, TransportNodesUsageAction.class);
actions.register(NodesHotThreadsAction.INSTANCE, TransportNodesHotThreadsAction.class);
actions.register(ListTasksAction.INSTANCE, TransportListTasksAction.class);
actions.register(GetTaskAction.INSTANCE, TransportGetTaskAction.class);
actions.register(CancelTasksAction.INSTANCE, TransportCancelTasksAction.class);

actions.register(ClusterAllocationExplainAction.INSTANCE, TransportClusterAllocationExplainAction.class);
actions.register(ClusterStatsAction.INSTANCE, TransportClusterStatsAction.class);
actions.register(ClusterStateAction.INSTANCE, TransportClusterStateAction.class);
actions.register(ClusterHealthAction.INSTANCE, TransportClusterHealthAction.class);
actions.register(ClusterUpdateSettingsAction.INSTANCE, TransportClusterUpdateSettingsAction.class);
actions.register(ClusterRerouteAction.INSTANCE, TransportClusterRerouteAction.class);
actions.register(ClusterSearchShardsAction.INSTANCE, TransportClusterSearchShardsAction.class);
actions.register(PendingClusterTasksAction.INSTANCE, TransportPendingClusterTasksAction.class);
actions.register(PutRepositoryAction.INSTANCE, TransportPutRepositoryAction.class);
actions.register(GetRepositoriesAction.INSTANCE, TransportGetRepositoriesAction.class);
actions.register(DeleteRepositoryAction.INSTANCE, TransportDeleteRepositoryAction.class);
actions.register(VerifyRepositoryAction.INSTANCE, TransportVerifyRepositoryAction.class);
actions.register(GetSnapshotsAction.INSTANCE, TransportGetSnapshotsAction.class);
actions.register(DeleteSnapshotAction.INSTANCE, TransportDeleteSnapshotAction.class);
actions.register(CreateSnapshotAction.INSTANCE, TransportCreateSnapshotAction.class);
actions.register(RestoreSnapshotAction.INSTANCE, TransportRestoreSnapshotAction.class);
actions.register(SnapshotsStatusAction.INSTANCE, TransportSnapshotsStatusAction.class);

actions.register(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class);
actions.register(IndicesSegmentsAction.INSTANCE, TransportIndicesSegmentsAction.class);
actions.register(IndicesShardStoresAction.INSTANCE, TransportIndicesShardStoresAction.class);
actions.register(CreateIndexAction.INSTANCE, TransportCreateIndexAction.class);
actions.register(ShrinkAction.INSTANCE, TransportShrinkAction.class);
actions.register(ResizeAction.INSTANCE, TransportResizeAction.class);
actions.register(RolloverAction.INSTANCE, TransportRolloverAction.class);
actions.register(DeleteIndexAction.INSTANCE, TransportDeleteIndexAction.class);
actions.register(GetIndexAction.INSTANCE, TransportGetIndexAction.class);
actions.register(OpenIndexAction.INSTANCE, TransportOpenIndexAction.class);
actions.register(CloseIndexAction.INSTANCE, TransportCloseIndexAction.class);
actions.register(IndicesExistsAction.INSTANCE, TransportIndicesExistsAction.class);
actions.register(TypesExistsAction.INSTANCE, TransportTypesExistsAction.class);
actions.register(GetMappingsAction.INSTANCE, TransportGetMappingsAction.class);
actions.register(GetFieldMappingsAction.INSTANCE, TransportGetFieldMappingsAction.class,
TransportGetFieldMappingsIndexAction.class);
actions.register(PutMappingAction.INSTANCE, TransportPutMappingAction.class);
actions.register(IndicesAliasesAction.INSTANCE, TransportIndicesAliasesAction.class);
actions.register(UpdateSettingsAction.INSTANCE, TransportUpdateSettingsAction.class);
actions.register(AnalyzeAction.INSTANCE, TransportAnalyzeAction.class);
actions.register(PutIndexTemplateAction.INSTANCE, TransportPutIndexTemplateAction.class);
actions.register(GetIndexTemplatesAction.INSTANCE, TransportGetIndexTemplatesAction.class);
actions.register(DeleteIndexTemplateAction.INSTANCE, TransportDeleteIndexTemplateAction.class);
actions.register(ValidateQueryAction.INSTANCE, TransportValidateQueryAction.class);
actions.register(RefreshAction.INSTANCE, TransportRefreshAction.class);
actions.register(FlushAction.INSTANCE, TransportFlushAction.class);
actions.register(SyncedFlushAction.INSTANCE, TransportSyncedFlushAction.class);
actions.register(ForceMergeAction.INSTANCE, TransportForceMergeAction.class);
actions.register(UpgradeAction.INSTANCE, TransportUpgradeAction.class);
actions.register(UpgradeStatusAction.INSTANCE, TransportUpgradeStatusAction.class);
actions.register(UpgradeSettingsAction.INSTANCE, TransportUpgradeSettingsAction.class);
actions.register(ClearIndicesCacheAction.INSTANCE, TransportClearIndicesCacheAction.class);
actions.register(GetAliasesAction.INSTANCE, TransportGetAliasesAction.class);
actions.register(AliasesExistAction.INSTANCE, TransportAliasesExistAction.class);
actions.register(GetSettingsAction.INSTANCE, TransportGetSettingsAction.class);

actions.register(IndexAction.INSTANCE, TransportIndexAction.class);
actions.register(GetAction.INSTANCE, TransportGetAction.class);
actions.register(TermVectorsAction.INSTANCE, TransportTermVectorsAction.class);
actions.register(MultiTermVectorsAction.INSTANCE, TransportMultiTermVectorsAction.class,
TransportShardMultiTermsVectorAction.class);
actions.register(DeleteAction.INSTANCE, TransportDeleteAction.class);
actions.register(UpdateAction.INSTANCE, TransportUpdateAction.class);
actions.register(MultiGetAction.INSTANCE, TransportMultiGetAction.class,
TransportShardMultiGetAction.class);
actions.register(BulkAction.INSTANCE, TransportBulkAction.class,
TransportShardBulkAction.class);
actions.register(SearchAction.INSTANCE, TransportSearchAction.class);
actions.register(SearchScrollAction.INSTANCE, TransportSearchScrollAction.class);
actions.register(MultiSearchAction.INSTANCE, TransportMultiSearchAction.class);
actions.register(ExplainAction.INSTANCE, TransportExplainAction.class);
actions.register(ClearScrollAction.INSTANCE, TransportClearScrollAction.class);
actions.register(RecoveryAction.INSTANCE, TransportRecoveryAction.class);

//Indexed scripts
actions.register(PutStoredScriptAction.INSTANCE, TransportPutStoredScriptAction.class);
actions.register(GetStoredScriptAction.INSTANCE, TransportGetStoredScriptAction.class);
actions.register(DeleteStoredScriptAction.INSTANCE, TransportDeleteStoredScriptAction.class);

actions.register(FieldCapabilitiesAction.INSTANCE, TransportFieldCapabilitiesAction.class,
TransportFieldCapabilitiesIndexAction.class);

actions.register(PutPipelineAction.INSTANCE, PutPipelineTransportAction.class);
actions.register(GetPipelineAction.INSTANCE, GetPipelineTransportAction.class);
actions.register(DeletePipelineAction.INSTANCE, DeletePipelineTransportAction.class);
actions.register(SimulatePipelineAction.INSTANCE, SimulatePipelineTransportAction.class);

actionPlugins.stream().flatMap(p -> p.getActions().stream()).forEach(actions::register);

return unmodifiableMap(actions.getRegistry());
}
注册了一大堆 Action 然后返回 Map<String, ActionHandler<?, ?>> 对象可以注意到 ActionHandler 包装了 action 和 transportAction 的 Class 对象。action 是对各个类型请求的 request 和 response 的包装,并非是真正的功能实现者,可以看到它只是提供了两个新建 response 和 request 的方法,及一个字 NAME 字段,这个 NAME 字段会用于后面 action 调用中。每个 action 对应的功能实现是在对应的 transportAction 中。
configure 方法
configure 方法由 ES 封装的注入器 Injector 调用,看看调用栈可以知道由 ES 启动时候 Node 构造方法里面发起的
injector = modules.createInjector();
configure 里面逻辑其实就是绑定一些对象到 Injector 中:
protected void configure() {
bind(ActionFilters.class).toInstance(actionFilters);
bind(DestructiveOperations.class).toInstance(destructiveOperations);

if (false == transportClient) {
// Supporting classes only used when not a transport client
bind(AutoCreateIndex.class).toInstance(autoCreateIndex);
bind(TransportLivenessAction.class).asEagerSingleton();

// register GenericAction -> transportAction Map used by NodeClient
@SuppressWarnings(“rawtypes”)
MapBinder<GenericAction, TransportAction> transportActionsBinder
= MapBinder.newMapBinder(binder(), GenericAction.class, TransportAction.class);
for (ActionHandler<?, ?> action : actions.values()) {
// bind the action as eager singleton, so the map binder one will reuse it
bind(action.getTransportAction()).asEagerSingleton();
transportActionsBinder.addBinding(action.getAction()).to(action.getTransportAction()).asEagerSingleton();
for (Class<?> supportAction : action.getSupportTransportActions()) {
bind(supportAction).asEagerSingleton();
}
}
}
}
initRestHandlers
initRestHandlers 是在 Node 构造方法里面直接调用的,主要是注册一大堆 RestHandler 到 restController 用于处理 http 请求
public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
List<AbstractCatAction> catActions = new ArrayList<>();
Consumer<RestHandler> registerHandler = a -> {
if (a instanceof AbstractCatAction) {
catActions.add((AbstractCatAction) a);
}
};
registerHandler.accept(new RestMainAction(settings, restController));
registerHandler.accept(new RestNodesInfoAction(settings, restController, settingsFilter));
registerHandler.accept(new RestRemoteClusterInfoAction(settings, restController));
registerHandler.accept(new RestNodesStatsAction(settings, restController));
registerHandler.accept(new RestNodesUsageAction(settings, restController));
registerHandler.accept(new RestNodesHotThreadsAction(settings, restController));
registerHandler.accept(new RestClusterAllocationExplainAction(settings, restController));
registerHandler.accept(new RestClusterStatsAction(settings, restController));
registerHandler.accept(new RestClusterStateAction(settings, restController, settingsFilter));
registerHandler.accept(new RestClusterHealthAction(settings, restController));
registerHandler.accept(new RestClusterUpdateSettingsAction(settings, restController));
registerHandler.accept(new RestClusterGetSettingsAction(settings, restController, clusterSettings, settingsFilter));
registerHandler.accept(new RestClusterRerouteAction(settings, restController, settingsFilter));
registerHandler.accept(new RestClusterSearchShardsAction(settings, restController));
registerHandler.accept(new RestPendingClusterTasksAction(settings, restController));
registerHandler.accept(new RestPutRepositoryAction(settings, restController));
registerHandler.accept(new RestGetRepositoriesAction(settings, restController, settingsFilter));
registerHandler.accept(new RestDeleteRepositoryAction(settings, restController));
registerHandler.accept(new RestVerifyRepositoryAction(settings, restController));
registerHandler.accept(new RestGetSnapshotsAction(settings, restController));
registerHandler.accept(new RestCreateSnapshotAction(settings, restController));
registerHandler.accept(new RestRestoreSnapshotAction(settings, restController));
registerHandler.accept(new RestDeleteSnapshotAction(settings, restController));
registerHandler.accept(new RestSnapshotsStatusAction(settings, restController));

registerHandler.accept(new RestGetAllAliasesAction(settings, restController));
registerHandler.accept(new RestGetAllMappingsAction(settings, restController));
registerHandler.accept(new RestGetAllSettingsAction(settings, restController, indexScopedSettings, settingsFilter));
registerHandler.accept(new RestGetIndicesAction(settings, restController, indexScopedSettings, settingsFilter));
registerHandler.accept(new RestIndicesStatsAction(settings, restController));
registerHandler.accept(new RestIndicesSegmentsAction(settings, restController));
registerHandler.accept(new RestIndicesShardStoresAction(settings, restController));
registerHandler.accept(new RestGetAliasesAction(settings, restController));
registerHandler.accept(new RestIndexDeleteAliasesAction(settings, restController));
registerHandler.accept(new RestIndexPutAliasAction(settings, restController));
registerHandler.accept(new RestIndicesAliasesAction(settings, restController));
registerHandler.accept(new RestCreateIndexAction(settings, restController));
registerHandler.accept(new RestShrinkIndexAction(settings, restController));
registerHandler.accept(new RestSplitIndexAction(settings, restController));
registerHandler.accept(new RestRolloverIndexAction(settings, restController));
registerHandler.accept(new RestDeleteIndexAction(settings, restController));
registerHandler.accept(new RestCloseIndexAction(settings, restController));
registerHandler.accept(new RestOpenIndexAction(settings, restController));

registerHandler.accept(new RestUpdateSettingsAction(settings, restController));
registerHandler.accept(new RestGetSettingsAction(settings, restController, indexScopedSettings, settingsFilter));

registerHandler.accept(new RestAnalyzeAction(settings, restController));
registerHandler.accept(new RestGetIndexTemplateAction(settings, restController));
registerHandler.accept(new RestPutIndexTemplateAction(settings, restController));
registerHandler.accept(new RestDeleteIndexTemplateAction(settings, restController));

registerHandler.accept(new RestPutMappingAction(settings, restController));
registerHandler.accept(new RestGetMappingAction(settings, restController));
registerHandler.accept(new RestGetFieldMappingAction(settings, restController));

registerHandler.accept(new RestRefreshAction(settings, restController));
registerHandler.accept(new RestFlushAction(settings, restController));
registerHandler.accept(new RestSyncedFlushAction(settings, restController));
registerHandler.accept(new RestForceMergeAction(settings, restController));
registerHandler.accept(new RestUpgradeAction(settings, restController));
registerHandler.accept(new RestClearIndicesCacheAction(settings, restController));

registerHandler.accept(new RestIndexAction(settings, restController));
registerHandler.accept(new RestGetAction(settings, restController));
registerHandler.accept(new RestGetSourceAction(settings, restController));
registerHandler.accept(new RestMultiGetAction(settings, restController));
registerHandler.accept(new RestDeleteAction(settings, restController));
registerHandler.accept(new org.elasticsearch.rest.action.document.RestCountAction(settings, restController));
registerHandler.accept(new RestTermVectorsAction(settings, restController));
registerHandler.accept(new RestMultiTermVectorsAction(settings, restController));
registerHandler.accept(new RestBulkAction(settings, restController));
registerHandler.accept(new RestUpdateAction(settings, restController));

registerHandler.accept(new RestSearchAction(settings, restController));
registerHandler.accept(new RestSearchScrollAction(settings, restController));
registerHandler.accept(new RestClearScrollAction(settings, restController));
registerHandler.accept(new RestMultiSearchAction(settings, restController));

registerHandler.accept(new RestValidateQueryAction(settings, restController));

registerHandler.accept(new RestExplainAction(settings, restController));

registerHandler.accept(new RestRecoveryAction(settings, restController));

// Scripts API
registerHandler.accept(new RestGetStoredScriptAction(settings, restController));
registerHandler.accept(new RestPutStoredScriptAction(settings, restController));
registerHandler.accept(new RestDeleteStoredScriptAction(settings, restController));

registerHandler.accept(new RestFieldCapabilitiesAction(settings, restController));

// Tasks API
registerHandler.accept(new RestListTasksAction(settings, restController, nodesInCluster));
registerHandler.accept(new RestGetTaskAction(settings, restController));
registerHandler.accept(new RestCancelTasksAction(settings, restController, nodesInCluster));

// Ingest API
registerHandler.accept(new RestPutPipelineAction(settings, restController));
registerHandler.accept(new RestGetPipelineAction(settings, restController));
registerHandler.accept(new RestDeletePipelineAction(settings, restController));
registerHandler.accept(new RestSimulatePipelineAction(settings, restController));

// CAT API
registerHandler.accept(new RestAllocationAction(settings, restController));
registerHandler.accept(new RestShardsAction(settings, restController));
registerHandler.accept(new RestMasterAction(settings, restController));
registerHandler.accept(new RestNodesAction(settings, restController));
registerHandler.accept(new RestTasksAction(settings, restController, nodesInCluster));
registerHandler.accept(new RestIndicesAction(settings, restController, indexNameExpressionResolver));
registerHandler.accept(new RestSegmentsAction(settings, restController));
// Fully qualified to prevent interference with rest.action.count.RestCountAction
registerHandler.accept(new org.elasticsearch.rest.action.cat.RestCountAction(settings, restController));
// Fully qualified to prevent interference with rest.action.indices.RestRecoveryAction
registerHandler.accept(new org.elasticsearch.rest.action.cat.RestRecoveryAction(settings, restController));
registerHandler.accept(new RestHealthAction(settings, restController));
registerHandler.accept(new org.elasticsearch.rest.action.cat.RestPendingClusterTasksAction(settings, restController));
registerHandler.accept(new RestAliasAction(settings, restController));
registerHandler.accept(new RestThreadPoolAction(settings, restController));
registerHandler.accept(new RestPluginsAction(settings, restController));
registerHandler.accept(new RestFielddataAction(settings, restController));
registerHandler.accept(new RestNodeAttrsAction(settings, restController));
registerHandler.accept(new RestRepositoriesAction(settings, restController));
registerHandler.accept(new RestSnapshotAction(settings, restController));
registerHandler.accept(new RestTemplatesAction(settings, restController));
for (ActionPlugin plugin : actionPlugins) {
for (RestHandler handler : plugin.getRestHandlers(settings, restController, clusterSettings, indexScopedSettings,
settingsFilter, indexNameExpressionResolver, nodesInCluster)) {
registerHandler.accept(handler);
}
}
registerHandler.accept(new RestCatAction(settings, restController, catActions));
}
再深入去看 RestController 里面的注册逻辑可以看到是用 Trie 树来做路径匹配的,具体可以看 TrieNode 类。

正文完
 0