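Elasticsearch (ES) has gone through many versions, and the startup flow differs from one version to the next, so I don't want to analyze how any single version starts up. Instead, this post looks at how ES responds to HTTP requests and how that is implemented behind the scenes. Below is a brief analysis of the HTTP server implementation.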

HTTP Server

Elasticsearch sets up its Netty server in Netty4HttpServerTransport:

    protected void doStart() {
        boolean success = false;
        try {
            serverBootstrap = new ServerBootstrap();

            serverBootstrap.group(new NioEventLoopGroup(workerCount, daemonThreadFactory(settings,
                HTTP_SERVER_WORKER_THREAD_NAME_PREFIX)));

            // NettyAllocator will return the channel type designed to work with the configuredAllocator
            serverBootstrap.channel(NettyAllocator.getServerChannelType());

            // Set the allocators for both the server channel and the child channels created
            serverBootstrap.option(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator());
            serverBootstrap.childOption(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator());

            serverBootstrap.childHandler(configureServerChannelHandler());
            serverBootstrap.handler(new ServerChannelExceptionHandler(this));

            serverBootstrap.childOption(ChannelOption.TCP_NODELAY, SETTING_HTTP_TCP_NO_DELAY.get(settings));
            serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, SETTING_HTTP_TCP_KEEP_ALIVE.get(settings));

            // some code omitted

            final ByteSizeValue tcpSendBufferSize = SETTING_HTTP_TCP_SEND_BUFFER_SIZE.get(settings);
            if (tcpSendBufferSize.getBytes() > 0) {
                serverBootstrap.childOption(ChannelOption.SO_SNDBUF, Math.toIntExact(tcpSendBufferSize.getBytes()));
            }

            final ByteSizeValue tcpReceiveBufferSize = SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE.get(settings);
            if (tcpReceiveBufferSize.getBytes() > 0) {
                serverBootstrap.childOption(ChannelOption.SO_RCVBUF, Math.toIntExact(tcpReceiveBufferSize.getBytes()));
            }

            serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);
            serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);

            final boolean reuseAddress = SETTING_HTTP_TCP_REUSE_ADDRESS.get(settings);
            serverBootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);
            serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, reuseAddress);

            // bind the port and address
            bindServer();
            success = true;
        } finally {
            if (success == false) {
                doStop(); // otherwise we leak threads since we never moved to started
            }
        }
    }

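Anyone who has used Netty will recognize what the code above does: it configures the worker threads, the TCP options, and the pipeline handler. In Netty, the handlers for an accepted connection are generally added in the ChannelInitializer implementation passed to childHandler(). Here configureServerChannelHandler() mainly creates an HttpChannelHandler, and its initChannel() shows which handlers are added: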

    protected void initChannel(Channel ch) throws Exception {
        Netty4HttpChannel nettyHttpChannel = new Netty4HttpChannel(ch);
        ch.attr(HTTP_CHANNEL_KEY).set(nettyHttpChannel);
        ch.pipeline().addLast("read_timeout", new ReadTimeoutHandler(transport.readTimeoutMillis, TimeUnit.MILLISECONDS));
        final HttpRequestDecoder decoder = new HttpRequestDecoder(
            handlingSettings.getMaxInitialLineLength(),
            handlingSettings.getMaxHeaderSize(),
            handlingSettings.getMaxChunkSize());
        decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
        ch.pipeline().addLast("decoder", decoder);
        ch.pipeline().addLast("decoder_compress", new HttpContentDecompressor());
        ch.pipeline().addLast("encoder", new HttpResponseEncoder());
        final HttpObjectAggregator aggregator = new HttpObjectAggregator(handlingSettings.getMaxContentLength());
        aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
        ch.pipeline().addLast("aggregator", aggregator);
        if (handlingSettings.isCompression()) {
            ch.pipeline().addLast("encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
        }
        if (handlingSettings.isCorsEnabled()) {
            ch.pipeline().addLast("cors", new Netty4CorsHandler(transport.corsConfig));
        }
        ch.pipeline().addLast("pipelining", new Netty4HttpPipeliningHandler(logger, transport.pipeliningMaxEvents));
        ch.pipeline().addLast("handler", requestHandler);
        transport.serverAcceptedChannel(nettyHttpChannel);
    }
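To make the resulting handler order easier to see, here is a minimal plain-Java sketch that only models the names added by initChannel(). The class PipelineOrderSketch and its method are hypothetical and involve no Netty types; the two boolean parameters stand in for handlingSettings.isCompression() and handlingSettings.isCorsEnabled().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the handler order built in initChannel(); not Netty code.
final class PipelineOrderSketch {

    // Returns the handler names in the order initChannel() adds them with addLast().
    static List<String> handlerNames(boolean compression, boolean corsEnabled) {
        List<String> names = new ArrayList<>();
        names.add("read_timeout");
        names.add("decoder");
        names.add("decoder_compress");
        names.add("encoder");
        names.add("aggregator");
        if (compression) {
            names.add("encoder_compress"); // only present when compression is enabled
        }
        if (corsEnabled) {
            names.add("cors");             // only present when CORS is enabled
        }
        names.add("pipelining");
        names.add("handler");              // the request handler always comes last
        return names;
    }
}
```

Since Netty passes inbound events from the head of the pipeline toward the tail, the request handler at the end only sees a message after the decoder, decompressor, and aggregator in front of it have processed it.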

From the code above we can see that requests are handled by requestHandler, whose implementation class is Netty4HttpRequestHandler:

    protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest<FullHttpRequest> msg) {
        Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
        FullHttpRequest request = msg.getRequest();
        boolean success = false;
        Netty4HttpRequest httpRequest = new Netty4HttpRequest(request, msg.getSequence());
        try {
            if (request.decoderResult().isFailure()) {
                Throwable cause = request.decoderResult().cause();
                if (cause instanceof Error) {
                    ExceptionsHelper.maybeDieOnAnotherThread(cause);
                    serverTransport.incomingRequestError(httpRequest, channel, new Exception(cause));
                } else {
                    serverTransport.incomingRequestError(httpRequest, channel, (Exception) cause);
                }
            } else {
                serverTransport.incomingRequest(httpRequest, channel);
            }
            success = true;
        } finally {
            // release the request only if it was not handed off successfully
            if (success == false) {
                httpRequest.release();
            }
        }
    }
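Both doStart() and channelRead0() use the same success-flag idiom: a boolean is set only on the normal path, and the finally block cleans up when it is still false. The sketch below isolates that idiom in plain Java. Resource, Handler, and dispatch are hypothetical stand-ins, and the assumption that the downstream code becomes responsible for releasing the resource on the success path is mine, not something the code above states.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration of the "release only on failure" idiom.
final class ReleaseOnFailureSketch {

    // Stand-in for a resource that must be released exactly once by its owner.
    static final class Resource {
        private final AtomicBoolean released = new AtomicBoolean(false);

        void release() {
            released.set(true);
        }

        boolean isReleased() {
            return released.get();
        }
    }

    // Stand-in for the downstream code that takes over the resource.
    interface Handler {
        void handle(Resource resource);
    }

    static void dispatch(Resource resource, Handler handler) {
        boolean success = false;
        try {
            handler.handle(resource); // assumed: on normal return the handler owns the resource
            success = true;
        } finally {
            if (success == false) {
                resource.release();   // the hand-off failed, so clean up here
            }
        }
    }
}
```

Compared with a catch block, this form needs no rethrow and covers every Throwable, including Error, without changing which exception the caller sees.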

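As you can see, handling of the HTTP request is delegated to Netty4HttpServerTransport, the same class that set up the Netty server above. The logic there converts httpRequest and channel into Elasticsearch's own objects, hiding the underlying Netty API, and then obtains the ThreadContext from the thread pool to execute the task. It works much like an HTTP dispatcher. I won't show the code here, since it is just a long chain of method calls; see the figure below.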
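The dispatcher role described above can be sketched in plain Java in two steps: adapt the low-level request into a transport-neutral object, then route it to a registered handler. All names here (HttpDispatcherSketch, RawRequest, Request, adapt, dispatch) are hypothetical, and the sketch leaves out the ThreadContext handling entirely; it is not how Elasticsearch implements this.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of an HTTP dispatcher; none of these types exist in Elasticsearch.
final class HttpDispatcherSketch {

    // Stand-in for the library-specific request handed over by the network layer.
    static final class RawRequest {
        final String requestLine; // for example "GET / HTTP/1.1"

        RawRequest(String requestLine) {
            this.requestLine = requestLine;
        }
    }

    // Transport-neutral view that the rest of the application works with.
    static final class Request {
        final String method;
        final String path;

        Request(String method, String path) {
            this.method = method;
            this.path = path;
        }
    }

    private final Map<String, Function<Request, String>> handlers = new HashMap<>();

    void register(String method, String path, Function<Request, String> handler) {
        handlers.put(method + " " + path, handler);
    }

    // Step 1: hide the low-level representation (assumes a well-formed request line).
    static Request adapt(RawRequest raw) {
        String[] parts = raw.requestLine.split(" ");
        return new Request(parts[0], parts[1]);
    }

    // Step 2: route the neutral request to the handler registered for it.
    String dispatch(RawRequest raw) {
        Request request = adapt(raw);
        String key = request.method + " " + request.path;
        Function<Request, String> handler = handlers.get(key);
        if (handler == null) {
            return "no handler for " + key;
        }
        return handler.apply(request);
    }
}
```

Because handlers only ever see Request, the network library could be swapped without touching them, which is the point of hiding the underlying API.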

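TransportAction.doExecute is an abstract method; it is invoked on the implementation class returned by NodeClient.transportAction. Every URL has a corresponding TransportAction implementation, which differs from the MVC architecture we usually work with. NodeClient holds a built-in Map<ActionType, TransportAction> actions that contains the handlers for all HTTP requests, with more than 300 entries covering different scenarios.
Let's look at the simplest response. When I send a request to ES on port 9200, it returns basic information; here is how TransportMainAction produces that response: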

    public class TransportMainAction extends HandledTransportAction<MainRequest, MainResponse> {

        private final String nodeName;
        private final ClusterService clusterService;

        @Inject
        public TransportMainAction(Settings settings, TransportService transportService,
                                   ActionFilters actionFilters, ClusterService clusterService) {
            super(MainAction.NAME, transportService, actionFilters, MainRequest::new);
            this.nodeName = Node.NODE_NAME_SETTING.get(settings);
            this.clusterService = clusterService;
        }

        @Override
        protected void doExecute(Task task, MainRequest request, ActionListener<MainResponse> listener) {
            ClusterState clusterState = clusterService.state();
            listener.onResponse(
                new MainResponse(nodeName, Version.CURRENT, clusterState.getClusterName(),
                        clusterState.metaData().clusterUUID(), Build.CURRENT));
        }
    }
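The combination of an action registry and an abstract doExecute that answers through a listener can be reduced to a small plain-Java sketch. Everything in it is a simplified, hypothetical stand-in: Action plays the role of TransportAction, the String key plays the role of ActionType, and a Consumer<String> plays the role of ActionListener. The action name "main" and the response format are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical, heavily simplified model of an action registry; not Elasticsearch code.
final class ActionRegistrySketch {

    // Counterpart of an abstract action: subclasses decide how to answer a request.
    abstract static class Action {
        abstract void doExecute(String request, Consumer<String> listener);
    }

    // Counterpart of a trivial action that reports basic node information.
    static final class MainInfoAction extends Action {
        private final String nodeName;

        MainInfoAction(String nodeName) {
            this.nodeName = nodeName;
        }

        @Override
        void doExecute(String request, Consumer<String> listener) {
            // The result is passed to the listener instead of being returned.
            listener.accept("node=" + nodeName);
        }
    }

    private final Map<String, Action> actions = new HashMap<>();

    void register(String actionName, Action action) {
        actions.put(actionName, action);
    }

    // Looks up the action by name and lets it answer through the listener.
    void execute(String actionName, String request, Consumer<String> listener) {
        Action action = actions.get(actionName);
        if (action == null) {
            throw new IllegalStateException("failed to find action [" + actionName + "]");
        }
        action.doExecute(request, listener);
    }
}
```

A caller would register new MainInfoAction("node-1") under "main" and call execute("main", "", System.out::println). Passing a listener rather than returning a value lets an action complete later, on another thread, without changing the calling code.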

Summary

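After going through all this code, the takeaway is that although the execution path of ES's HTTP request handling chain is fairly roundabout, the actual code is relatively simple. Understanding how a given request is answered mostly comes down to reading the corresponding TransportAction.doExecute implementation.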