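Elasticsearch (ES) already exists in many versions, and the startup flow differs from one version to the next, so I won't walk through how any single version starts up. Instead, this post looks at how ES responds to HTTP requests and how that is implemented behind the scenes. Below is a brief analysis of the HTTP server implementation.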
HTTP Server
Elasticsearch registers its Netty server in Netty4HttpServerTransport:
protected void doStart() {
    boolean success = false;
    try {
        serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(new NioEventLoopGroup(workerCount, daemonThreadFactory(settings,
            HTTP_SERVER_WORKER_THREAD_NAME_PREFIX)));
        // NettyAllocator will return the channel type designed to work with the configuredAllocator
        serverBootstrap.channel(NettyAllocator.getServerChannelType());
        // Set the allocators for both the server channel and the child channels created
        serverBootstrap.option(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator());
        serverBootstrap.childOption(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator());
        serverBootstrap.childHandler(configureServerChannelHandler());
        serverBootstrap.handler(new ServerChannelExceptionHandler(this));
        serverBootstrap.childOption(ChannelOption.TCP_NODELAY, SETTING_HTTP_TCP_NO_DELAY.get(settings));
        serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, SETTING_HTTP_TCP_KEEP_ALIVE.get(settings));
        // some code omitted
        final ByteSizeValue tcpSendBufferSize = SETTING_HTTP_TCP_SEND_BUFFER_SIZE.get(settings);
        if (tcpSendBufferSize.getBytes() > 0) {
            serverBootstrap.childOption(ChannelOption.SO_SNDBUF, Math.toIntExact(tcpSendBufferSize.getBytes()));
        }
        final ByteSizeValue tcpReceiveBufferSize = SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE.get(settings);
        if (tcpReceiveBufferSize.getBytes() > 0) {
            serverBootstrap.childOption(ChannelOption.SO_RCVBUF, Math.toIntExact(tcpReceiveBufferSize.getBytes()));
        }
        serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);
        serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);
        final boolean reuseAddress = SETTING_HTTP_TCP_REUSE_ADDRESS.get(settings);
        serverBootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);
        serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, reuseAddress);
        // bind the port and address
        bindServer();
        success = true;
    } finally {
        if (success == false) {
            doStop(); // otherwise we leak threads since we never moved to started
        }
    }
}
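Anyone who has used Netty will recognize the code above: it sets up the worker threads, the TCP options, and the pipeline handler. In Netty, the handlers for incoming connections are usually added in the ChannelInitializer implementation passed to childHandler(). Here, configureServerChannelHandler() mainly creates an HttpChannelHandler, and its initChannel() shows which handlers are added.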
protected void initChannel(Channel ch) throws Exception {
    Netty4HttpChannel nettyHttpChannel = new Netty4HttpChannel(ch);
    ch.attr(HTTP_CHANNEL_KEY).set(nettyHttpChannel);
    ch.pipeline().addLast("read_timeout", new ReadTimeoutHandler(transport.readTimeoutMillis, TimeUnit.MILLISECONDS));
    final HttpRequestDecoder decoder = new HttpRequestDecoder(handlingSettings.getMaxInitialLineLength(),
        handlingSettings.getMaxHeaderSize(),
        handlingSettings.getMaxChunkSize());
    decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
    ch.pipeline().addLast("decoder", decoder);
    ch.pipeline().addLast("decoder_compress", new HttpContentDecompressor());
    ch.pipeline().addLast("encoder", new HttpResponseEncoder());
    final HttpObjectAggregator aggregator = new HttpObjectAggregator(handlingSettings.getMaxContentLength());
    aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
    ch.pipeline().addLast("aggregator", aggregator);
    if (handlingSettings.isCompression()) {
        ch.pipeline().addLast("encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
    }
    if (handlingSettings.isCorsEnabled()) {
        ch.pipeline().addLast("cors", new Netty4CorsHandler(transport.corsConfig));
    }
    ch.pipeline().addLast("pipelining", new Netty4HttpPipeliningHandler(logger, transport.pipeliningMaxEvents));
    ch.pipeline().addLast("handler", requestHandler);
    transport.serverAcceptedChannel(nettyHttpChannel);
}
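To make the ordering idea in initChannel() concrete, here is a minimal sketch in plain Java. It does not use Netty: PipelineSketch and its string-transforming stages are hypothetical stand-ins, and it models only the inbound direction (real Netty pipelines also carry outbound handlers such as the encoder). The point is only that addLast() appends named stages, optional stages are added conditionally, and a message passes through them in insertion order.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for a channel pipeline; this is NOT Netty's ChannelPipeline API.
final class PipelineSketch {
    // LinkedHashMap preserves insertion order, mirroring addLast(...) semantics.
    private final Map<String, Function<String, String>> stages = new LinkedHashMap<>();

    PipelineSketch addLast(String name, Function<String, String> stage) {
        if (stages.containsKey(name)) {
            throw new IllegalArgumentException("duplicate handler name: " + name);
        }
        stages.put(name, stage);
        return this;
    }

    // Names of the registered stages, in the order they will run.
    List<String> names() {
        return new ArrayList<>(stages.keySet());
    }

    // Pass an inbound message through every stage in insertion order.
    String fire(String inbound) {
        String msg = inbound;
        for (Function<String, String> stage : stages.values()) {
            msg = stage.apply(msg);
        }
        return msg;
    }

    // Assumed, simplified subset of the stages seen in initChannel().
    static PipelineSketch build(boolean corsEnabled) {
        PipelineSketch p = new PipelineSketch();
        p.addLast("decoder", bytes -> "decoded(" + bytes + ")");
        p.addLast("aggregator", m -> "full(" + m + ")");
        if (corsEnabled) {
            p.addLast("cors", m -> "cors(" + m + ")"); // optional stage
        }
        p.addLast("handler", m -> "handled(" + m + ")"); // the request handler comes last
        return p;
    }

    public static void main(String[] args) {
        PipelineSketch p = build(true);
        System.out.println(p.names());
        System.out.println(p.fire("bytes"));
    }
}
```

Because the request handler is registered last, it only ever sees messages that the earlier stages have already decoded and aggregated, which is why the real handler can work directly with a FullHttpRequest.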
From the initChannel() code we can see that requests are processed by requestHandler, whose implementation class is Netty4HttpRequestHandler:
protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest<FullHttpRequest> msg) {
    Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
    FullHttpRequest request = msg.getRequest();
    boolean success = false;
    Netty4HttpRequest httpRequest = new Netty4HttpRequest(request, msg.getSequence());
    try {
        if (request.decoderResult().isFailure()) {
            Throwable cause = request.decoderResult().cause();
            if (cause instanceof Error) {
                ExceptionsHelper.maybeDieOnAnotherThread(cause);
                serverTransport.incomingRequestError(httpRequest, channel, new Exception(cause));
            } else {
                serverTransport.incomingRequestError(httpRequest, channel, (Exception) cause);
            }
        } else {
            serverTransport.incomingRequest(httpRequest, channel);
        }
        success = true;
    } finally {
        // release the request only if handing it off failed
        if (success == false) {
            httpRequest.release();
        }
    }
}
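As you can see, the method that handles HTTP requests delegates to Netty4HttpServerTransport, the same class that started the Netty server above. The logic there converts httpRequest and channel into Elasticsearch's own objects, hiding the underlying API, then obtains the ThreadContext from the thread pool and executes the task. It works much like an HTTP dispatcher. I won't show the detailed code, since it is a long chain of method calls; see the figure below.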
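TransportAction.doExecute is an abstract method; it is invoked on the implementation class returned by NodeClient.transportAction. Every URL has a corresponding TransportAction implementation class, which differs from the MVC architecture we normally work with. NodeClient holds a Map<ActionType, TransportAction> actions that contains the handlers for all HTTP requests; it has more than 300 entries, each handling a different scenario.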
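The lookup-then-execute idea can be sketched in a few lines of plain Java. Everything here is a simplified, hypothetical stand-in: ActionRegistrySketch, Action, Listener, and the string keys are assumptions for illustration and are not the Elasticsearch classes, which use ActionType keys and richer request and response types. The sketch only shows the shape: a map from action name to implementation, an abstract doExecute() that each implementation fills in, and a listener that receives the result.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Simplified, hypothetical stand-ins; these are NOT Elasticsearch classes.
final class ActionRegistrySketch {
    // Callback that receives either a response or a failure.
    interface Listener<R> {
        void onResponse(R response);
        void onFailure(Exception e);
    }

    // Base class: execute() is the entry point, doExecute() is the per-action logic.
    static abstract class Action<Req, Resp> {
        final void execute(Req request, Listener<Resp> listener) {
            try {
                doExecute(request, listener);
            } catch (Exception e) {
                listener.onFailure(e);
            }
        }

        protected abstract void doExecute(Req request, Listener<Resp> listener);
    }

    // Toy counterpart of a "main" action: replies with basic node information.
    static final class MainAction extends Action<String, String> {
        private final String nodeName;

        MainAction(String nodeName) {
            this.nodeName = nodeName;
        }

        @Override
        protected void doExecute(String request, Listener<String> listener) {
            listener.onResponse("name=" + nodeName);
        }
    }

    // One entry per action; string keys are an assumption of this sketch.
    private final Map<String, Action<?, ?>> actions = new HashMap<>();

    <Req, Resp> void register(String name, Action<Req, Resp> action) {
        actions.put(name, action);
    }

    @SuppressWarnings("unchecked")
    <Req, Resp> void execute(String name, Req request, Listener<Resp> listener) {
        Action<Req, Resp> action = (Action<Req, Resp>) actions.get(name);
        if (action == null) {
            listener.onFailure(new IllegalStateException("no action registered for [" + name + "]"));
            return;
        }
        action.execute(request, listener);
    }

    // Demo helper: runs an action and captures whatever the listener receives.
    static String call(ActionRegistrySketch registry, String name) {
        AtomicReference<String> out = new AtomicReference<>();
        registry.execute(name, "", new Listener<String>() {
            public void onResponse(String r) { out.set(r); }
            public void onFailure(Exception e) { out.set("error: " + e.getMessage()); }
        });
        return out.get();
    }

    public static void main(String[] args) {
        ActionRegistrySketch registry = new ActionRegistrySketch();
        registry.register("main", new MainAction("node-1"));
        System.out.println(call(registry, "main"));
        System.out.println(call(registry, "missing"));
    }
}
```

Compared with an MVC controller that groups several routes into one class, this layout gives each operation its own class, so adding an operation means registering one more map entry rather than extending a controller.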
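Let's look at the simplest response. When I send a request to port 9200 of ES, it returns basic information; here is how TransportMainAction produces that response: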
public class TransportMainAction extends HandledTransportAction<MainRequest, MainResponse> {
    private final String nodeName;
    private final ClusterService clusterService;

    @Inject
    public TransportMainAction(Settings settings, TransportService transportService,
                               ActionFilters actionFilters, ClusterService clusterService) {
        super(MainAction.NAME, transportService, actionFilters, MainRequest::new);
        this.nodeName = Node.NODE_NAME_SETTING.get(settings);
        this.clusterService = clusterService;
    }

    @Override
    protected void doExecute(Task task, MainRequest request, ActionListener<MainResponse> listener) {
        ClusterState clusterState = clusterService.state();
        listener.onResponse(new MainResponse(nodeName, Version.CURRENT, clusterState.getClusterName(),
            clusterState.metaData().clusterUUID(), Build.CURRENT));
    }
}
Summary
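After going through this much code, the takeaway is that although the path ES takes to handle an HTTP request is fairly roundabout, the actual code is quite simple. In the end it comes down to looking at how the relevant TransportAction.doExecute responds to the request.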