
Tomcat: a source-code walkthrough of how a request flows through Tomcat

Connector initialization

Catalina parses server.xml with the Digester. When the Digester reaches a <Connector> element, it runs the rule shown in the code below.

ConnectorCreateRule

@Override
    public void begin(String namespace, String name, Attributes attributes)
            throws Exception {
        Service svc = (Service)digester.peek();
        Executor ex = null;
        if (attributes.getValue("executor")!=null ) {
            ex = svc.getExecutor(attributes.getValue("executor"));
        }
        Connector con = new Connector(attributes.getValue("protocol"));
        if (ex != null) {
            setExecutor(con, ex);
        }
        String sslImplementationName = attributes.getValue("sslImplementationName");
        if (sslImplementationName != null) {
            setSSLImplementationName(con, sslImplementationName);
        }
        digester.push(con);
    }

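From the element's attributes, the rule reads the protocol and looks up the thread pool (Executor) configured inside the <Service> element. The protocol name is then turned into the Connector's ProtocolHandler member variable. The rest of this article uses Http11NioProtocol as the example.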

public Connector(String protocol) {
        setProtocol(protocol);
        // Instantiate protocol handler
        ProtocolHandler p = null;
        try {
            Class<?> clazz = Class.forName(protocolHandlerClassName);
            // The ProtocolHandler constructor, invoked reflectively here, performs further initialization
            p = (ProtocolHandler) clazz.getConstructor().newInstance();
        } catch (Exception e) {
            log.error(sm.getString("coyoteConnector.protocolHandlerInstantiationFailed"), e);
        } finally {
            this.protocolHandler = p;
        }
        ........
    }
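The pattern in the constructor above (load a class by name, call its no-argument constructor, fall back to null on failure) can be tried in isolation. The following is a minimal sketch, not Tomcat code: Handler, NioHandler and create are hypothetical names standing in for ProtocolHandler and its implementation.

```java
final class HandlerFactorySketch {

    /** Hypothetical stand-in for Tomcat's ProtocolHandler interface. */
    interface Handler {
        String name();
    }

    /** Hypothetical handler whose no-arg constructor does its own setup. */
    public static final class NioHandler implements Handler {
        private final String endpoint;

        public NioHandler() {
            // Anything done here runs as part of the reflective newInstance() call.
            this.endpoint = "nio-endpoint";
        }

        @Override
        public String name() {
            return "NioHandler/" + endpoint;
        }
    }

    /** Returns a new handler, or null when the class cannot be loaded or instantiated. */
    static Handler create(String className) {
        try {
            Class<?> clazz = Class.forName(className);
            return (Handler) clazz.getConstructor().newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            // Mirrors the "log and leave the field null" behaviour, minus the logging.
            return null;
        }
    }

    public static void main(String[] args) {
        Handler h = create(NioHandler.class.getName());
        System.out.println(h == null ? "not created" : h.name());
    }
}
```

Because a failure only yields null here, a caller has to check the result before using it.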

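Constructing the ProtocolHandler

ProtocolHandler declares abstract methods that each protocol implements. In the Http11NioProtocol constructor, the TCP layer is delegated to NioEndpoint, so by the time the Connector has been constructed, the ProtocolHandler and its endpoint are already wired together.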

public AbstractProtocol(AbstractEndpoint<S, ?> endpoint) {
        this.endpoint = endpoint;
        ConnectionHandler<S> cHandler = new ConnectionHandler<>(this);
        setHandler(cHandler);
        getEndpoint().setHandler(cHandler);
        setSoLinger(Constants.DEFAULT_CONNECTION_LINGER);
        setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY);
    }

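The init method after construction

Once the basic hierarchy of server.xml has been parsed and the member variables are fully populated, the lifecycle init method is called.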

org/apache/catalina/startup/Catalina.load
    -> getServer().init()
    -> connector.init()
    -> protocolHandler.init()
    -> endpoint.init()
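The call chain above is a plain top-down cascade: each component initializes itself and then its child. A minimal sketch of that shape follows, assuming hypothetical classes (Server, Connector, Protocol, Endpoint here are toy types, not Tomcat's, and the real chain has more levels than the article lists).

```java
import java.util.ArrayList;
import java.util.List;

final class InitChainSketch {

    /** Toy endpoint: the leaf of the cascade. */
    static final class Endpoint {
        void init(List<String> trace) {
            trace.add("endpoint.init");
        }
    }

    /** Toy protocol handler that owns an endpoint. */
    static final class Protocol {
        private final Endpoint endpoint = new Endpoint();

        void init(List<String> trace) {
            trace.add("protocolHandler.init");
            endpoint.init(trace);
        }
    }

    /** Toy connector that owns a protocol handler. */
    static final class Connector {
        private final Protocol protocolHandler = new Protocol();

        void init(List<String> trace) {
            trace.add("connector.init");
            protocolHandler.init(trace);
        }
    }

    /** Toy server that owns a connector. */
    static final class Server {
        private final Connector connector = new Connector();

        void init(List<String> trace) {
            trace.add("server.init");
            connector.init(trace);
        }
    }

    /** Runs the cascade and returns the order in which init methods were reached. */
    static List<String> run() {
        List<String> trace = new ArrayList<>();
        new Server().init(trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```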

Let's look at what NioEndpoint does during init. The server socket is opened in initServerSocket:

 protected void initServerSocket() throws Exception {
        if (getUseInheritedChannel()) {
            // Retrieve the channel provided by the OS
            Channel ic = System.inheritedChannel();
            if (ic instanceof ServerSocketChannel) {
                serverSock = (ServerSocketChannel) ic;
            }
            if (serverSock == null) {
                throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
            }
        } else {
            // Bind the server's transport-layer port
            serverSock = ServerSocketChannel.open();
            socketProperties.setProperties(serverSock.socket());
            InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset());
            serverSock.socket().bind(addr,getAcceptCount());
        }
        serverSock.configureBlocking(true); //mimic APR behavior
    }
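The else branch above uses only JDK calls, so it can be reproduced outside Tomcat. This is a minimal sketch under assumptions of my own: the class and method names are hypothetical, the address is fixed to loopback, and port 0 is used so the OS picks a free port.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

final class BindSketch {

    /** Opens a server channel, binds it with the given backlog, and leaves it in blocking mode. */
    static ServerSocketChannel openServer(int port, int backlog) throws IOException {
        ServerSocketChannel serverSock = ServerSocketChannel.open();
        serverSock.socket().bind(new InetSocketAddress("127.0.0.1", port), backlog);
        // accept() on this channel will block, as in the excerpt above
        serverSock.configureBlocking(true);
        return serverSock;
    }

    /** Binds to an OS-chosen port, returns that port, and closes the channel again. */
    static int bindOnceAndReportPort() {
        try (ServerSocketChannel ch = openServer(0, 100)) {
            return ch.socket().getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("bound to port " + bindOnceAndReportPort());
    }
}
```

The backlog argument plays the role of getAcceptCount() in the excerpt: it is the queue length the OS is asked to keep for connections that have not been accepted yet.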

Creating the NIO Acceptor thread and the Selector event thread

org.apache.tomcat.util.net.NioEndpoint#startInternal

public void startInternal() throws Exception {
        if (!running) {
            running = true;
            paused = false;

            if (socketProperties.getProcessorCache() != 0) {
                processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                        socketProperties.getProcessorCache());
            }
            if (socketProperties.getEventCache() != 0) {
                eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                        socketProperties.getEventCache());
            }
            if (socketProperties.getBufferPool() != 0) {
                nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                        socketProperties.getBufferPool());
            }

            // Create worker collection
            if (getExecutor() == null) {
                // No executor configured manually: create Tomcat's default thread pool (10 to 200 threads)
                createExecutor();
            }

            // Tomcat's limiter for the number of connections
            initializeConnectionLatch();

            // Start poller thread
            poller = new Poller();
            Thread pollerThread = new Thread(poller, getName() + "-Poller");
            pollerThread.setPriority(threadPriority);
            pollerThread.setDaemon(true);
            pollerThread.start();

            startAcceptorThread();
        }
    }
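To make the role of the connection limiter concrete, here is a minimal sketch of the count-up / count-down idea. It swaps in a plain java.util.concurrent.Semaphore rather than Tomcat's own latch class, and every name in it is hypothetical.

```java
import java.util.concurrent.Semaphore;

final class ConnectionLimiterSketch {

    private final Semaphore permits;

    ConnectionLimiterSketch(int maxConnections) {
        this.permits = new Semaphore(maxConnections);
    }

    /** Blocks until a slot is free; an accepting thread would call this before accept(). */
    void countUpOrAwait() throws InterruptedException {
        permits.acquire();
    }

    /** Non-blocking variant, convenient for demonstrations. */
    boolean tryCountUp() {
        return permits.tryAcquire();
    }

    /** Frees a slot when a connection closes or an accept attempt fails. */
    void countDown() {
        permits.release();
    }

    public static void main(String[] args) {
        ConnectionLimiterSketch limiter = new ConnectionLimiterSketch(2);
        System.out.println(limiter.tryCountUp());
        System.out.println(limiter.tryCountUp());
        System.out.println(limiter.tryCountUp());
    }
}
```

With a limit of 2, the third attempt is refused until countDown() releases a slot.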

Let's look at how the Poller and the Acceptor are implemented.

The Poller implementation

public class Poller implements Runnable {

        private Selector selector;
        // A PollerEvent carries: NioSocketWrapper socketWrapper; int interestOps
        // Queue of events that register a socket, and the socket events of interest, with the multiplexer
        private final SynchronizedQueue<PollerEvent> events =
                new SynchronizedQueue<>();
            .........

        public Poller() throws IOException {
            this.selector = Selector.open();
        }

        @Override
        public void run() {
            // Loop until destroy() is called
            while (true) {

                boolean hasEvents = false;

                try {
                    if (!close) {
                        hasEvents = events();
                        if (wakeupCounter.getAndSet(-1) > 0) {
                            // If we are here, means we have other stuff to do
                            // Do a non blocking select
                            keyCount = selector.selectNow();
                        } else {
                            // Block for up to 1 second by default, waiting for work to hand to the thread pool
                            keyCount = selector.select(selectorTimeout);
                        }
                        wakeupCounter.set(0);
                    }
                    ...
                } catch (Throwable x) {
                    ExceptionUtils.handleThrowable(x);
                    log.error(sm.getString("endpoint.nio.selectorLoopError"), x);
                    continue;
                }

                Iterator<SelectionKey> iterator =
                    keyCount > 0 ? selector.selectedKeys().iterator() : null;
                // Walk through the collection of ready keys and dispatch
                // any active event.
                while (iterator != null && iterator.hasNext()) {
                    SelectionKey sk = iterator.next();
                    iterator.remove();
                    NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
                    // Attachment may be null if another thread has called
                    // cancelledKey()
                    if (socketWrapper != null) {
                        // The key step: all task handling happens in here
                        processKey(sk, socketWrapper);
                    }
                }

                // Process timeouts
                timeout(keyCount,hasEvents);
            }
.............
        }

The Acceptor implementation

public class Acceptor<U> implements Runnable {

 @Override
    public void run() {
        int errorDelay = 0;
        long pauseStart = 0;

        try {
            // Loop until we receive a shutdown command
            while (!stopCalled) {
            ............

                if (stopCalled) {break;}
                state = AcceptorState.RUNNING;

                try {
                    //if we have reached max connections, wait
                    endpoint.countUpOrAwaitConnection();

                    // Endpoint might have been paused while waiting for latch
                    // If that is the case, don't accept new connections
                    if (endpoint.isPaused()) {continue;}

                    U socket = null;
                    try {
                        // Wait for a new connection
                        socket = endpoint.serverSocketAccept();
                    } catch (Exception ioe) {
                        // We didn't get a socket
                        endpoint.countDownConnection();
                        if (endpoint.isRunning()) {
                            // Introduce delay if necessary
                            errorDelay = handleExceptionWithDelay(errorDelay);
                            // re-throw
                            throw ioe;
                        } else {
                            break;
                        }
                    }
                    // Successful accept, reset the error delay
                    errorDelay = 0;

                    // Configure the socket
                    if (!stopCalled && !endpoint.isPaused()) {
                        // The endpoint handles the new connection: it stores it in a map and registers a PollerEvent with the poller.
                        // From then on, the Poller hands read/write events to the thread pool.
                        if (!endpoint.setSocketOptions(socket)) {
                            endpoint.closeSocket(socket);
                        }
                    } else {
                        endpoint.destroySocket(socket);
                    }
                }
                ........
        } finally {
            stopLatch.countDown();
        }
        state = AcceptorState.ENDED;
    }

}

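When a new connection arrives, it is registered with the multiplexer, which is backed by a Selector. With that in place, Tomcat's Connector can both accept new connections and handle socket events.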
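The division of labour described above (a blocking accept, then a non-blocking channel watched by a Selector) can be shown with JDK classes alone. The sketch below is an illustration under simplifying assumptions: both roles run on one thread, a local client is created just to have something to accept, and all names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

final class AcceptThenPollSketch {

    /** Sends message over loopback and returns what the selector-driven side read. */
    static String roundTrip(String message) {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            server.configureBlocking(true);
            int port = server.socket().getLocalPort();

            try (SocketChannel client = SocketChannel.open(new InetSocketAddress("127.0.0.1", port));
                 SocketChannel accepted = server.accept()) {   // "Acceptor" role: blocking accept
                client.write(ByteBuffer.wrap(payload));

                // Hand the connection to the "Poller" role: non-blocking, interested in reads
                accepted.configureBlocking(false);
                accepted.register(selector, SelectionKey.OP_READ);

                ByteBuffer buf = ByteBuffer.allocate(payload.length);
                for (int attempt = 0; attempt < 10 && buf.hasRemaining(); attempt++) {
                    if (selector.select(1000) == 0) {
                        continue; // nothing ready within one second, try again
                    }
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove();
                        if (key.isReadable()) {
                            ((SocketChannel) key.channel()).read(buf);
                        }
                    }
                }
                return new String(buf.array(), 0, buf.position(), StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("GET /mytomcat/servlet1"));
    }
}
```

In Tomcat the two roles run on separate threads and the read is handed to a worker pool. The sketch keeps everything on one thread only to stay short.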

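Hands-on walkthrough

We again use mytomcat.war, which contains one servlet, FirstServlet, and is started together with Tomcat. Container startup and application deployment are not discussed here.
Next, let's see how a request for a simple Servlet flows through Tomcat.

Once Tomcat has started, the Acceptor blocks on the line socket = endpoint.serverSocketAccept();
Entering http://localhost:8080/mytomcat/servlet1 in a browser unblocks the Acceptor, which now holds the client socket.

Let's see what endpoint.setSocketOptions(socket) does.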


@Override
    protected boolean setSocketOptions(SocketChannel socket) {
       .........
            NioSocketWrapper newWrapper = new NioSocketWrapper(channel, this);
            channel.reset(socket, newWrapper);
            connections.put(socket, newWrapper);
            socketWrapper = newWrapper;

            // Set socket properties
            // Disable blocking, polling will be used
            socket.configureBlocking(false);
            socketProperties.setProperties(socket.socket());
            // Register the new connection
            poller.register(socketWrapper);
            return true;
        ...

Now look at the Poller code again: it registers interest in the OP_READ event for the new connection.

public void register(final NioSocketWrapper socketWrapper) {
            socketWrapper.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
            PollerEvent pollerEvent = createPollerEvent(socketWrapper, OP_REGISTER);
            addEvent(pollerEvent);
        }
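register does not touch the Selector directly. It enqueues an event that the poller loop later drains through events(). A minimal sketch of that hand-off follows, using a ConcurrentLinkedQueue in place of Tomcat's SynchronizedQueue and hypothetical names throughout.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

final class PollerEventQueueSketch {

    /** Hypothetical event: which connection, and which operation to watch for. */
    static final class Event {
        final String connectionId;
        final int interestOps;

        Event(String connectionId, int interestOps) {
            this.connectionId = connectionId;
            this.interestOps = interestOps;
        }
    }

    private final Queue<Event> events = new ConcurrentLinkedQueue<>();
    // Only the poller side touches this list, so it needs no synchronization.
    private final List<String> registered = new ArrayList<>();

    /** Called by the accepting side: enqueue the request and return immediately. */
    void register(String connectionId) {
        events.offer(new Event(connectionId, SelectionKey.OP_READ));
    }

    /** Called by the poller loop: drain the queue; true if at least one event was processed. */
    boolean events() {
        boolean processed = false;
        Event e;
        while ((e = events.poll()) != null) {
            processed = true;
            // A real poller would register the channel with its Selector here.
            registered.add(e.connectionId + ":" + e.interestOps);
        }
        return processed;
    }

    List<String> registered() {
        return registered;
    }

    public static void main(String[] args) {
        PollerEventQueueSketch poller = new PollerEventQueueSketch();
        poller.register("conn-1");
        System.out.println(poller.events() + " " + poller.registered());
    }
}
```

Queueing keeps all changes to the selector on the poller's own thread, which is one common reason for this design.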

This way, the request for http://localhost:8080/mytomcat/servlet1 is handed to the Poller. The relevant code is the Poller's run method shown earlier: once the selector reports the key as ready, the loop passes it to processKey(sk, socketWrapper).

Inside processKey, a SocketProcessor task is submitted to the thread pool.
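Handing a ready socket to the pool comes down to wrapping the connection and the event in a task and submitting it. The sketch below shows that shape with a standard ExecutorService. SocketTask and dispatch are hypothetical names, and the event is a plain string for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class DispatchSketch {

    /** Hypothetical stand-in for a socket-processing task bound to one connection and one event. */
    static final class SocketTask implements Callable<String> {
        private final String connectionId;
        private final String event;

        SocketTask(String connectionId, String event) {
            this.connectionId = connectionId;
            this.event = event;
        }

        @Override
        public String call() {
            // Protocol parsing and container invocation would start here.
            return connectionId + " handled " + event;
        }
    }

    /** Submits one task to a small pool and waits for its result. */
    static String dispatch(String connectionId, String event) {
        ExecutorService workers = Executors.newFixedThreadPool(2);
        try {
            Future<String> result = workers.submit(new SocketTask(connectionId, event));
            return result.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            workers.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(dispatch("conn-1", "OPEN_READ"));
    }
}
```

The poller thread itself would not wait on the Future. The sketch does so only to make the result visible.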

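Executing the task on a worker thread
After a series of intermediate calls, the task reaches the connector's handler, Http11NioProtocol,
which hands the protocol-level processing to Http11Processor.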

CoyoteAdapter

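Tomcat does not wrap a request directly into HttpServletRequest and HttpServletResponse objects, because the connector supports several protocols, not only HTTP. Tomcat therefore has a lower-level org.apache.coyote.Request as well as org.apache.catalina.connector.Request, which implements HttpServletRequest. CoyoteAdapter converts between the two and hands the resulting HttpServletRequest and HttpServletResponse objects to the Pipeline of the Tomcat container.
Let's look at CoyoteAdapter's service method.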

 public void service(org.apache.coyote.Request req, org.apache.coyote.Response res) throws Exception {
        Request request = (Request) req.getNote(ADAPTER_NOTES);
        Response response = (Response) res.getNote(ADAPTER_NOTES);

        if (request == null) {
            // Create objects
            request = connector.createRequest();
            request.setCoyoteRequest(req);
            response = connector.createResponse();
            response.setCoyoteResponse(res);

            // Link the two kinds of request and response objects
            request.setResponse(response);
            response.setRequest(request);
        }


        try {
            // Parse and set Catalina and configuration specific
            // (postParseRequest is explained below)
            postParseSuccess = postParseRequest(req, request, res, response);
            if (postParseSuccess) {
                // check valves if we support async
                request.setAsyncSupported(connector.getService().getContainer().getPipeline().isAsyncSupported());
                // Calling the container
                // Hand the HttpServletRequest and HttpServletResponse objects to Tomcat's chain of responsibility, the Pipeline
                connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);
            }
            ........
    }
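The first lines of service follow a "look up the wrapper in a note slot, create it only if missing" pattern. Here is a minimal sketch of that idea with hypothetical types (LowLevelRequest and ServletFacingRequest are toy classes, not Tomcat's). Storing the wrapper back into the slot is my assumption about how the cache gets filled, since the excerpt above does not show that step.

```java
final class AdapterSketch {

    static final int ADAPTER_NOTES = 1;

    /** Hypothetical low-level request with a small array of attachment slots. */
    static final class LowLevelRequest {
        private final Object[] notes = new Object[4];
        final String uri;

        LowLevelRequest(String uri) {
            this.uri = uri;
        }

        Object getNote(int pos) {
            return notes[pos];
        }

        void setNote(int pos, Object value) {
            notes[pos] = value;
        }
    }

    /** Hypothetical servlet-facing wrapper that delegates to the low-level request. */
    static final class ServletFacingRequest {
        private final LowLevelRequest coyote;

        ServletFacingRequest(LowLevelRequest coyote) {
            this.coyote = coyote;
        }

        String getRequestURI() {
            return coyote.uri;
        }
    }

    /** Returns the wrapper attached to req, creating and attaching it on first use. */
    static ServletFacingRequest adapt(LowLevelRequest req) {
        ServletFacingRequest request = (ServletFacingRequest) req.getNote(ADAPTER_NOTES);
        if (request == null) {
            request = new ServletFacingRequest(req);
            req.setNote(ADAPTER_NOTES, request); // assumption: cache the wrapper for reuse
        }
        return request;
    }

    public static void main(String[] args) {
        LowLevelRequest raw = new LowLevelRequest("/mytomcat/servlet1");
        System.out.println(adapt(raw) == adapt(raw));
    }
}
```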

Let's look at the postParseRequest(req, request, res, response) method.

protected boolean postParseRequest(org.apache.coyote.Request req, Request request, org.apache.coyote.Response res,
            Response response) throws IOException, ServletException {

        // Work out whether the request is HTTPS
        if (req.scheme().isNull()) {
            // Use connector scheme and secure configuration, (defaults to
            // "http" and false respectively)
            req.scheme().setString(connector.getScheme());
            request.setSecure(connector.getSecure());
        } else {
            // Use processor specified scheme to determine secure state
            request.setSecure(req.scheme().equals("https"));
        }
        // Proxy-related settings

        String proxyName = connector.getProxyName();
        int proxyPort = connector.getProxyPort();
        if (proxyPort != 0) {
            req.setServerPort(proxyPort);
        } else if (req.getServerPort() == -1) {
            // Not explicitly set. Use default ports based on the scheme
            if (req.scheme().equals("https")) {
                req.setServerPort(443);
            } else {
                req.setServerPort(80);
            }
        }
        if (proxyName != null) {
            req.serverName().setString(proxyName);
        }
        // Server-wide "OPTIONS *" request
        // Check for ping OPTIONS * request
        if (undecodedURI.equals("*")) {
            if (req.method().equalsIgnoreCase("OPTIONS")) {
                StringBuilder allow = new StringBuilder();
                allow.append("GET, HEAD, POST, PUT, DELETE, OPTIONS");
                // Trace if allowed
                if (connector.getAllowTrace()) {
                    allow.append(", TRACE");
                }
                res.setHeader("Allow", allow.toString());
                // Access log entry as processing won't reach AccessLogValve
                connector.getService().getContainer().logAccess(request, response, 0, true);
                return false;
            } else {
                response.sendError(400, sm.getString("coyoteAdapter.invalidURI"));
            }
        }
        ........
        while (mapRequired) {
            // This will map the latest version by default
            // Tomcat can host several applications: find the matching Host and Context and store them in the request's mapping data
            connector.getService().getMapper().map(serverName, decodedURI, version, request.getMappingData());

        }
.......
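The server-port rule in the proxy section above is a small pure function: a configured proxy port wins, an unset port (-1) falls back to the scheme's default, and anything else is kept. A minimal sketch, with a hypothetical method name:

```java
final class ServerPortSketch {

    /**
     * Resolves the port reported to the application.
     * proxyPort of 0 means "no proxy port configured"; requestPort of -1 means "not set".
     */
    static int resolveServerPort(int proxyPort, int requestPort, String scheme) {
        if (proxyPort != 0) {
            return proxyPort;
        }
        if (requestPort == -1) {
            return "https".equals(scheme) ? 443 : 80;
        }
        return requestPort;
    }

    public static void main(String[] args) {
        System.out.println(resolveServerPort(0, -1, "https"));
        System.out.println(resolveServerPort(0, 8080, "http"));
        System.out.println(resolveServerPort(8443, 8080, "https"));
    }
}
```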

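Exploring Tomcat's chain of responsibility

We now enter Tomcat's chain of responsibility proper. Let's see what the pipeline contains.
Since an environment is already set up, this article simply lists the default chain structure:
StandardEngineValve -> AccessLogValve -> ErrorReportValve -> StandardHostValve -> StandardContextValve -> StandardWrapperValve -> ApplicationFilterChain -> the target Servlet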
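A chain like the one listed above can be modelled as a linked list in which each node does its work and then calls the next. The following is a minimal sketch with a hypothetical Valve class. It is not Tomcat's Valve interface and only records the visiting order.

```java
import java.util.ArrayList;
import java.util.List;

final class ValveChainSketch {

    /** Hypothetical valve: records its name, then passes control to the next valve. */
    static final class Valve {
        private final String name;
        private Valve next;

        Valve(String name) {
            this.name = name;
        }

        void invoke(List<String> trace) {
            trace.add(name);
            if (next != null) {
                next.invoke(trace);
            }
        }
    }

    /** Links the named valves in order, invokes the first, and returns the visiting order. */
    static List<String> run(String... names) {
        Valve first = null;
        Valve last = null;
        for (String n : names) {
            Valve v = new Valve(n);
            if (first == null) {
                first = v;
            } else {
                last.next = v;
            }
            last = v;
        }
        List<String> trace = new ArrayList<>();
        if (first != null) {
            first.invoke(trace);
        }
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run("StandardEngineValve", "AccessLogValve", "ErrorReportValve"));
    }
}
```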

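Setting aside the three valves that belong to the containers themselves (StandardEngineValve, StandardHostValve and StandardContextValve), the ones worth our attention are AccessLogValve, ErrorReportValve and StandardWrapperValve.
AccessLogValve: records basic information about each request, for example in access.log
ErrorReportValve: handles errors raised further down the chain, for example by sending an error page
StandardWrapperValve: explained in detail below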

@Override
    public void invoke(Request request, Response response) throws IOException, ServletException {
        ...........
        // Get the Wrapper for the target servlet; the servlet itself may not be loaded yet
        StandardWrapper wrapper = (StandardWrapper) getContainer();
        Servlet servlet = null;
        Context context = (Context) wrapper.getParent();

        ........
        try {
            if (!unavailable) {
                // Get the actual servlet; if it has not been loaded and initialized yet, that happens first
                servlet = wrapper.allocate();
            }
        } catch (UnavailableException e) {
            container.getLogger().error(sm.getString("standardWrapper.allocateException", wrapper.getName()), e);
            checkWrapperAvailable(response, wrapper);
        }
    
        .................

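        // ApplicationFilterChain implements the filter chain of the Servlet specification and holds the servlet object.
        // Once every filter in the chain has been called, it calls the servlet.
        // Tomcat's part of the request flow ends here; the application takes over from this point.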
        ApplicationFilterChain filterChain = ApplicationFilterFactory.createFilterChain(request, wrapper, servlet);

        .........

         filterChain.doFilter(request.getRequest(), response.getResponse());
 

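At this point Tomcat has finished its part and handed the request to the application, whose result is then written back through the same layers in reverse order.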
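The "filters first, servlet last, then unwind" behaviour can be sketched with an index-based chain. The interfaces below are hypothetical and deliberately smaller than the Servlet API's Filter, FilterChain and Servlet. They only record the call order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class FilterChainSketch {

    /** Hypothetical filter: may act before and after passing control down the chain. */
    interface Filter {
        void doFilter(List<String> trace, Chain chain);
    }

    /** Hypothetical servlet: the end of the chain. */
    interface Servlet {
        void service(List<String> trace);
    }

    /** Calls each filter in turn; when none are left, calls the servlet. */
    static final class Chain {
        private final List<Filter> filters;
        private final Servlet servlet;
        private int pos = 0;

        Chain(List<Filter> filters, Servlet servlet) {
            this.filters = filters;
            this.servlet = servlet;
        }

        void doFilter(List<String> trace) {
            if (pos < filters.size()) {
                filters.get(pos++).doFilter(trace, this);
            } else {
                servlet.service(trace);
            }
        }
    }

    /** Runs one filter around one servlet and returns the recorded call order. */
    static List<String> run() {
        List<String> trace = new ArrayList<>();
        Filter logging = (t, chain) -> {
            t.add("filter:before");
            chain.doFilter(t);
            t.add("filter:after");
        };
        Servlet servlet = t -> t.add("servlet");
        new Chain(Collections.singletonList(logging), servlet).doFilter(trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The "after" entry shows how the response travels back out through the same filter once the servlet returns.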
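That concludes this article. There is much more to Tomcat's source code, such as the relationships between containers, class-loading safety across multiple class loaders, and dynamic deployment. Interested readers can keep exploring.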
