Connector initialization

Catalina parses server.xml with Digester. When Digester reaches a <Connector> element, it runs the rule shown in the following code.

ConnectorCreateRule

    @Override
    public void begin(String namespace, String name, Attributes attributes)
            throws Exception {
        Service svc = (Service) digester.peek();
        Executor ex = null;
        if (attributes.getValue("executor") != null) {
            ex = svc.getExecutor(attributes.getValue("executor"));
        }
        Connector con = new Connector(attributes.getValue("protocol"));
        if (ex != null) {
            setExecutor(con, ex);
        }
        String sslImplementationName = attributes.getValue("sslImplementationName");
        if (sslImplementationName != null) {
            setSSLImplementationName(con, sslImplementationName);
        }
        digester.push(con);
    }

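From the element's attributes, the Connector obtains its protocol and, when the executor attribute is set, the thread pool configured inside the enclosing <Service> element. The protocol name is turned into the Connector's member variable of type ProtocolHandler. The rest of this article uses Http11NioProtocol as the example.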

    public Connector(String protocol) {
        setProtocol(protocol);
        // Instantiate protocol handler
        ProtocolHandler p = null;
        try {
            Class<?> clazz = Class.forName(protocolHandlerClassName);
            // Invoking the ProtocolHandler constructor reflectively also performs its follow-up initialization
            p = (ProtocolHandler) clazz.getConstructor().newInstance();
        } catch (Exception e) {
            log.error(sm.getString("coyoteConnector.protocolHandlerInstantiationFailed"), e);
        } finally {
            this.protocolHandler = p;
        }
        ...
    }
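The name-to-class step can be tried outside Tomcat. The following is a minimal sketch, not Tomcat's code: Handler, NioHttpHandler, HandlerFactory and the one-entry alias table are all hypothetical names made up for illustration. It only shows the general technique of resolving a protocol string to a class name and instantiating it through its no-argument constructor, returning null when that fails.

```java
import java.util.Map;

class HandlerFactory {
    // Assumed alias table: a short protocol name maps to a class name.
    // Any other string is treated as a fully qualified class name.
    private static final Map<String, String> ALIASES =
            Map.of("HTTP/1.1", NioHttpHandler.class.getName());

    // Resolve the name, then instantiate reflectively; returns null on failure,
    // similar in spirit to the try/finally in the constructor above.
    static Handler create(String protocol) {
        String className = ALIASES.getOrDefault(protocol, protocol);
        Handler handler = null;
        try {
            Class<?> clazz = Class.forName(className);
            handler = (Handler) clazz.getConstructor().newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            System.err.println("could not instantiate " + className + ": " + e);
        }
        return handler;
    }

    public static void main(String[] args) {
        System.out.println(create("HTTP/1.1").describe()); // http-nio
        System.out.println(create("no.such.Handler"));     // null
    }
}

// Hypothetical stand-in for the ProtocolHandler role.
interface Handler {
    String describe();
}

// Hypothetical handler; any setup done in this constructor runs during newInstance().
class NioHttpHandler implements Handler {
    public NioHttpHandler() {
    }

    @Override
    public String describe() {
        return "http-nio";
    }
}
```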

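Constructing the ProtocolHandler

ProtocolHandler declares abstract methods that the concrete protocols implement. In the Http11NioProtocol constructor, the TCP layer is delegated to a NioEndpoint, so by the time the Connector has been constructed, the ProtocolHandler and its endpoint are already wired together.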

    public AbstractProtocol(AbstractEndpoint<S, ?> endpoint) {
        this.endpoint = endpoint;
        ConnectionHandler<S> cHandler = new ConnectionHandler<>(this);
        setHandler(cHandler);
        getEndpoint().setHandler(cHandler);
        setSoLinger(Constants.DEFAULT_CONNECTION_LINGER);
        setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY);
    }

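The init method after construction

Once server.xml has been parsed into the basic component hierarchy and the member variables are fully populated, the lifecycle init method is called.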

    org/apache/catalina/startup/Catalina.load
        getServer().init() -> connector.init() -> protocolHandler.init() -> endpoint.init() -> ...

Here is what NioEndpoint does during init:

    protected void initServerSocket() throws Exception {
        if (getUseInheritedChannel()) {
            // Retrieve the channel provided by the OS
            Channel ic = System.inheritedChannel();
            if (ic instanceof ServerSocketChannel) {
                serverSock = (ServerSocketChannel) ic;
            }
            if (serverSock == null) {
                throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
            }
        } else {
            // Bind the server's transport-layer port
            serverSock = ServerSocketChannel.open();
            socketProperties.setProperties(serverSock.socket());
            InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset());
            serverSock.socket().bind(addr, getAcceptCount());
        }
        serverSock.configureBlocking(true); // mimic APR behavior
    }
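The else-branch uses only JDK calls, so it can be reproduced in isolation. The sketch below assumes a loopback address and port 0 (so the operating system picks a free port); BindSketch and its open method are illustrative names, not part of Tomcat.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

class BindSketch {
    // Opens a listening channel the way the else-branch above does.
    // The backlog argument plays the role of getAcceptCount().
    static ServerSocketChannel open(int port, int backlog) throws Exception {
        ServerSocketChannel serverSock = ServerSocketChannel.open();
        InetSocketAddress addr =
                new InetSocketAddress(InetAddress.getLoopbackAddress(), port);
        serverSock.socket().bind(addr, backlog);
        serverSock.configureBlocking(true); // accept() on this channel will block
        return serverSock;
    }

    public static void main(String[] args) throws Exception {
        // Port 0 is an assumption for the demo: the OS chooses a free port.
        try (ServerSocketChannel channel = open(0, 100)) {
            System.out.println("listening on " + channel.getLocalAddress()
                    + ", blocking=" + channel.isBlocking());
        }
    }
}
```

Keeping the listening channel in blocking mode is what lets a dedicated acceptor thread simply sit in accept() until a client arrives.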

Creating the NIO Acceptor thread and the Selector event thread

org.apache.tomcat.util.net.NioEndpoint#startInternal

    public void startInternal() throws Exception {
        if (!running) {
            running = true;
            paused = false;
            if (socketProperties.getProcessorCache() != 0) {
                processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                        socketProperties.getProcessorCache());
            }
            if (socketProperties.getEventCache() != 0) {
                eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                        socketProperties.getEventCache());
            }
            if (socketProperties.getBufferPool() != 0) {
                nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                        socketProperties.getBufferPool());
            }
            // Create worker collection
            if (getExecutor() == null) {
                // No executor was configured manually: create Tomcat's default thread pool (10 to 200 threads)
                createExecutor();
            }
            // This is Tomcat's connection limiter
            initializeConnectionLatch();
            // Start poller thread
            poller = new Poller();
            Thread pollerThread = new Thread(poller, getName() + "-Poller");
            pollerThread.setPriority(threadPriority);
            pollerThread.setDaemon(true);
            pollerThread.start();
            startAcceptorThread();
        }
    }
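The idea behind a connection limiter can be sketched with a counting semaphore. This is an assumption-laden illustration: java.util.concurrent.Semaphore is swapped in as a stand-in for Tomcat's own latch, and ConnectionLimiterSketch with its method names is hypothetical. The point is only that the accepting thread takes a permit before each accept and gives it back when the connection ends.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class ConnectionLimiterSketch {
    private final Semaphore permits;

    ConnectionLimiterSketch(int maxConnections) {
        this.permits = new Semaphore(maxConnections);
    }

    // Called before accept(): blocks while the limit is reached.
    void countUpOrAwait() throws InterruptedException {
        permits.acquire();
    }

    // Timed variant, used by the demo to show that the limit holds.
    boolean tryCountUp(long millis) throws InterruptedException {
        return permits.tryAcquire(millis, TimeUnit.MILLISECONDS);
    }

    // Called when a connection closes or an accept attempt fails.
    void countDown() {
        permits.release();
    }

    public static void main(String[] args) throws InterruptedException {
        ConnectionLimiterSketch limiter = new ConnectionLimiterSketch(2);
        limiter.countUpOrAwait();
        limiter.countUpOrAwait();
        System.out.println(limiter.tryCountUp(50)); // false: the limit of 2 is reached
        limiter.countDown();
        System.out.println(limiter.tryCountUp(50)); // true: one slot was freed
    }
}
```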

Let's look at the implementations of Poller and Acceptor.

Poller implementation

    public class Poller implements Runnable {
        private Selector selector;
        // Fields of PollerEvent:
        //     private NioSocketWrapper socketWrapper;
        //     private int interestOps;
        // Queue used to register sockets, and the socket events to watch, with the multiplexer
        private final SynchronizedQueue<PollerEvent> events =
                new SynchronizedQueue<>();
        ...
        public Poller() throws IOException {
            this.selector = Selector.open();
        }

        @Override
        public void run() {
            // Loop until destroy() is called
            while (true) {
                boolean hasEvents = false;
                try {
                    if (!close) {
                        hasEvents = events();
                        if (wakeupCounter.getAndSet(-1) > 0) {
                            // If we are here, means we have other stuff to do
                            // Do a non blocking select
                            keyCount = selector.selectNow();
                        } else {
                            // Blocks for 1 second by default, waiting for work to hand to the thread pool
                            keyCount = selector.select(selectorTimeout);
                        }
                        wakeupCounter.set(0);
                    }
                    ...
                } catch (Throwable x) {
                    ExceptionUtils.handleThrowable(x);
                    log.error(sm.getString("endpoint.nio.selectorLoopError"), x);
                    continue;
                }
                Iterator<SelectionKey> iterator =
                        keyCount > 0 ? selector.selectedKeys().iterator() : null;
                // Walk through the collection of ready keys and dispatch
                // any active event.
                while (iterator != null && iterator.hasNext()) {
                    SelectionKey sk = iterator.next();
                    iterator.remove();
                    NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
                    // Attachment may be null if another thread has called
                    // cancelledKey()
                    if (socketWrapper != null) {
                        // The main actor: all task processing happens in here
                        processKey(sk, socketWrapper);
                    }
                }
                // Process timeouts
                timeout(keyCount, hasEvents);
            }
            ...
        }

Acceptor implementation

    public class Acceptor<U> implements Runnable {

        @Override
        public void run() {
            int errorDelay = 0;
            long pauseStart = 0;
            try {
                // Loop until we receive a shutdown command
                while (!stopCalled) {
                    ...
                    if (stopCalled) {
                        break;
                    }
                    state = AcceptorState.RUNNING;
                    try {
                        // if we have reached max connections, wait
                        endpoint.countUpOrAwaitConnection();
                        // Endpoint might have been paused while waiting for latch
                        // If that is the case, don't accept new connections
                        if (endpoint.isPaused()) {
                            continue;
                        }
                        U socket = null;
                        try {
                            // Wait for a new connection
                            socket = endpoint.serverSocketAccept();
                        } catch (Exception ioe) {
                            // We didn't get a socket
                            endpoint.countDownConnection();
                            if (endpoint.isRunning()) {
                                // Introduce delay if necessary
                                errorDelay = handleExceptionWithDelay(errorDelay);
                                // re-throw
                                throw ioe;
                            } else {
                                break;
                            }
                        }
                        // Successful accept, reset the error delay
                        errorDelay = 0;
                        // Configure the socket
                        if (!stopCalled && !endpoint.isPaused()) {
                            // The endpoint handles the new connection: it stores it in a map
                            // and registers a PollerEvent with the poller.
                            // From then on, the Poller hands read/write events to the thread pool.
                            if (!endpoint.setSocketOptions(socket)) {
                                endpoint.closeSocket(socket);
                            }
                        } else {
                            endpoint.destroySocket(socket);
                        }
                    }
                    ...
            } finally {
                stopLatch.countDown();
            }
            state = AcceptorState.ENDED;
        }
    }

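When a new connection arrives, it is registered with the multiplexer, which is backed by a Selector. At this point the Tomcat Connector is able to accept new connections and to handle socket events.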
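The accept-then-poll hand-off described here can be reproduced with plain JDK NIO. The sketch below is a single-threaded illustration under stated assumptions: it plays the client, the acceptor and the poller roles in one method, uses the loopback address with an OS-chosen port, and attaches a plain string where Tomcat attaches a socket wrapper. AcceptThenPollSketch and roundTrip are hypothetical names.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

class AcceptThenPollSketch {
    // Runs one accept -> register -> select -> read round trip and returns
    // the text that the "poller" side read from the client.
    static String roundTrip(String message) throws Exception {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.configureBlocking(true);
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                // Acceptor role: blocking accept, then switch the socket to non-blocking.
                SocketChannel accepted = server.accept();
                accepted.configureBlocking(false);
                // Poller role: register interest in OP_READ with an attachment.
                accepted.register(selector, SelectionKey.OP_READ, "wrapper");

                client.write(ByteBuffer.wrap(payload));

                ByteBuffer buf = ByteBuffer.allocate(payload.length);
                long deadline = System.nanoTime() + 5_000_000_000L; // give up after 5 s
                while (buf.hasRemaining() && System.nanoTime() < deadline) {
                    if (selector.select(1000) == 0) {
                        continue; // timed out with nothing ready, loop again
                    }
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove();
                        if (key.isReadable()) {
                            ((SocketChannel) key.channel()).read(buf);
                        }
                    }
                }
                accepted.close();
                return new String(buf.array(), 0, buf.position(), StandardCharsets.UTF_8);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip("ping")); // ping
    }
}
```

In a real server the two roles run on separate threads, which is why the registration is passed through a queue rather than done directly by the accepting thread.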

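Hands-on walkthrough

We again use mytomcat.war, which contains a servlet named FirstServlet and is started together with Tomcat. Container startup and application deployment are not discussed here.
Next, let's follow how a request for a simple servlet flows through Tomcat.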

Once Tomcat has started, the Acceptor blocks on the line socket = endpoint.serverSocketAccept();
Entering http://localhost:8080/mytomcat/servlet1 in a browser unblocks the Acceptor, which obtains the client socket.

Let's see what endpoint.setSocketOptions(socket) does:

    @Override
    protected boolean setSocketOptions(SocketChannel socket) {
        ...
            NioSocketWrapper newWrapper = new NioSocketWrapper(channel, this);
            channel.reset(socket, newWrapper);
            connections.put(socket, newWrapper);
            socketWrapper = newWrapper;

            // Set socket properties
            // Disable blocking, polling will be used
            socket.configureBlocking(false);
            socketProperties.setProperties(socket.socket());

            // Register the new connection
            poller.register(socketWrapper);
            return true;
        ...

Back in the Poller code, register subscribes to the OP_READ event of the new connection:

    public void register(final NioSocketWrapper socketWrapper) {
        socketWrapper.interestOps(SelectionKey.OP_READ); // this is what OP_REGISTER turns into.
        PollerEvent pollerEvent = createPollerEvent(socketWrapper, OP_REGISTER);
        addEvent(pollerEvent);
    }
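Note that register does not touch the Selector itself; it only queues an event. A minimal sketch of that hand-off pattern follows, assuming that the polling thread drains the queue at the top of each loop iteration (the hasEvents = events() call in run). ConcurrentLinkedQueue stands in for Tomcat's SynchronizedQueue, the wakeup counter is left out, and EventQueueSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class EventQueueSketch {
    private final Queue<Runnable> events = new ConcurrentLinkedQueue<>();

    // Any thread (the acceptor, for instance) may call this.
    void addEvent(Runnable event) {
        events.offer(event);
    }

    // Only the polling thread calls this. It runs every queued event and
    // returns true if at least one event was processed.
    boolean drain() {
        boolean processed = false;
        Runnable event;
        while ((event = events.poll()) != null) {
            processed = true;
            event.run();
        }
        return processed;
    }

    public static void main(String[] args) {
        EventQueueSketch queue = new EventQueueSketch();
        List<String> log = new ArrayList<>();
        queue.addEvent(() -> log.add("register socket-1 for OP_READ"));
        System.out.println(log);           // [] : nothing has happened yet
        System.out.println(queue.drain()); // true
        System.out.println(log);           // [register socket-1 for OP_READ]
    }
}
```

Deferring the work this way keeps all changes to the selector on the one thread that also calls select().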

As a result, the request for http://localhost:8080/mytomcat/servlet1 is handled by the Poller; see the Poller's run method:

    public void run() {
        // Loop until destroy() is called
        while (true) {
            boolean hasEvents = false;
            try {
                if (!close) {
                    hasEvents = events();
                    if (wakeupCounter.getAndSet(-1) > 0) {
                        // If we are here, means we have other stuff to do
                        // Do a non blocking select
                        keyCount = selector.selectNow();
                    } else {
                        // Blocks for 1 second by default, waiting for work to hand to the thread pool
                        keyCount = selector.select(selectorTimeout);
                    }
                    wakeupCounter.set(0);
                }
                ...
            } catch (Throwable x) {
                ExceptionUtils.handleThrowable(x);
                log.error(sm.getString("endpoint.nio.selectorLoopError"), x);
                continue;
            }
            Iterator<SelectionKey> iterator =
                    keyCount > 0 ? selector.selectedKeys().iterator() : null;
            // Walk through the collection of ready keys and dispatch
            // any active event.
            while (iterator != null && iterator.hasNext()) {
                SelectionKey sk = iterator.next();
                iterator.remove();
                NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
                // Attachment may be null if another thread has called
                // cancelledKey()
                if (socketWrapper != null) {
                    // The main actor: all task processing happens in here
                    processKey(sk, socketWrapper);
                }
            }
            // Process timeouts
            timeout(keyCount, hasEvents);
        }
        ...
    }

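Inside processKey, a SocketProcessor task is submitted to the thread pool.

Executing the task on a worker thread
After a series of delegations, the handler behind the connector is Http11NioProtocol,
which passes the protocol-level processing on to Http11Processor.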
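The hand-off from the polling thread to a worker pool can be sketched with the JDK executor API. Everything named here is an assumption for illustration: SocketTask plays the role of SocketProcessor, it is a Callable only so the demo can read a result back, and the fixed pool of 4 threads is arbitrary.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class DispatchSketch {
    // Hypothetical task type playing the role of SocketProcessor.
    static final class SocketTask implements Callable<String> {
        private final String socketId;

        SocketTask(String socketId) {
            this.socketId = socketId;
        }

        @Override
        public String call() {
            // Protocol parsing and servlet invocation would happen here.
            return socketId + " handled";
        }
    }

    // Poller role: hand the ready socket to the pool and return immediately,
    // so the polling loop is never blocked by request processing.
    static Future<String> processKey(ExecutorService pool, String socketId) {
        return pool.submit(new SocketTask(socketId));
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            Future<String> result = processKey(pool, "socket-1");
            System.out.println(result.get()); // socket-1 handled
        } finally {
            pool.shutdown();
        }
    }
}
```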

CoyoteAdapter

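Tomcat does not wrap the request directly into HttpServletRequest and HttpServletResponse objects, because the connector supports several protocols, not only HTTP. Tomcat therefore has a lower-level org.apache.coyote.Request as well as org.apache.catalina.connector.Request, which implements HttpServletRequest. CoyoteAdapter converts between the two and passes the resulting HttpServletRequest and HttpServletResponse objects to the Pipeline of the Tomcat container.
Let's look at CoyoteAdapter's service method: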

    public void service(org.apache.coyote.Request req, org.apache.coyote.Response res) throws Exception {
        Request request = (Request) req.getNote(ADAPTER_NOTES);
        Response response = (Response) res.getNote(ADAPTER_NOTES);
        if (request == null) {
            // Create objects
            request = connector.createRequest();
            request.setCoyoteRequest(req);
            response = connector.createResponse();
            response.setCoyoteResponse(res);
            // Convert between the two kinds of request objects
            request.setResponse(response);
            response.setRequest(request);
        }
        try {
            // Parse and set Catalina and configuration specific
            // (this method is explained further down)
            postParseSuccess = postParseRequest(req, request, res, response);
            if (postParseSuccess) {
                // check valves if we support async
                request.setAsyncSupported(connector.getService().getContainer().getPipeline().isAsyncSupported());
                // Calling the container
                // Hand the HttpServletRequest and HttpServletResponse objects to Tomcat's chain of responsibility, the Pipeline
                connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);
            }
            ...
    }
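The shape of this adapter, a low-level request that carries a cached higher-level wrapper in a note slot, can be sketched independently of Tomcat. RawRequest, ContainerRequest and adapt are hypothetical names, and the step that stores the new wrapper back into the note slot is an assumption of this sketch, since the excerpt above does not show it.

```java
import java.util.HashMap;
import java.util.Map;

class AdapterSketch {
    static final int ADAPTER_NOTES = 1;

    // Hypothetical low-level request: raw fields plus numbered note slots.
    static final class RawRequest {
        final String uri;
        private final Map<Integer, Object> notes = new HashMap<>();

        RawRequest(String uri) {
            this.uri = uri;
        }

        Object getNote(int pos) {
            return notes.get(pos);
        }

        void setNote(int pos, Object value) {
            notes.put(pos, value);
        }
    }

    // Hypothetical container-facing request that wraps the raw one.
    static final class ContainerRequest {
        private RawRequest raw;

        void setRawRequest(RawRequest raw) {
            this.raw = raw;
        }

        String getRequestURI() {
            return raw.uri;
        }
    }

    // Look up the wrapper in the note slot; create and cache it on first use.
    static ContainerRequest adapt(RawRequest req) {
        ContainerRequest request = (ContainerRequest) req.getNote(ADAPTER_NOTES);
        if (request == null) {
            request = new ContainerRequest();
            request.setRawRequest(req);
            req.setNote(ADAPTER_NOTES, request); // assumed caching step
        }
        return request;
    }

    public static void main(String[] args) {
        RawRequest raw = new RawRequest("/mytomcat/servlet1");
        ContainerRequest first = adapt(raw);
        ContainerRequest second = adapt(raw);
        System.out.println(first.getRequestURI()); // /mytomcat/servlet1
        System.out.println(first == second);       // true: the wrapper is reused
    }
}
```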

Let's look at the postParseRequest method:

    protected boolean postParseRequest(org.apache.coyote.Request req, Request request,
            org.apache.coyote.Response res, Response response) throws IOException, ServletException {
        // Determine whether the request uses HTTPS
        if (req.scheme().isNull()) {
            // Use connector scheme and secure configuration, (defaults to
            // "http" and false respectively)
            req.scheme().setString(connector.getScheme());
            request.setSecure(connector.getSecure());
        } else {
            // Use processor specified scheme to determine secure state
            request.setSecure(req.scheme().equals("https"));
        }

        // Proxy-related settings
        String proxyName = connector.getProxyName();
        int proxyPort = connector.getProxyPort();
        if (proxyPort != 0) {
            req.setServerPort(proxyPort);
        } else if (req.getServerPort() == -1) {
            // Not explicitly set. Use default ports based on the scheme
            if (req.scheme().equals("https")) {
                req.setServerPort(443);
            } else {
                req.setServerPort(80);
            }
        }
        if (proxyName != null) {
            req.serverName().setString(proxyName);
        }

        // Preflight-style request
        // Check for ping OPTIONS * request
        if (undecodedURI.equals("*")) {
            if (req.method().equalsIgnoreCase("OPTIONS")) {
                StringBuilder allow = new StringBuilder();
                allow.append("GET, HEAD, POST, PUT, DELETE, OPTIONS");
                // Trace if allowed
                if (connector.getAllowTrace()) {
                    allow.append(", TRACE");
                }
                res.setHeader("Allow", allow.toString());
                // Access log entry as processing won't reach AccessLogValve
                connector.getService().getContainer().logAccess(request, response, 0, true);
                return false;
            } else {
                response.sendError(400, sm.getString("coyoteAdapter.invalidURI"));
            }
        }
        ...
        while (mapRequired) {
            // This will map the latest version by default
            // Tomcat can host several applications: find the matching Host and Context
            // and fill them into the request object.
            connector.getService().getMapper().map(serverName, decodedURI, version, request.getMappingData());
        }
        ...
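To make the mapping step concrete, here is a deliberately simplified sketch of choosing a context by longest matching path prefix. It is not Tomcat's Mapper: host matching, versions and servlet URL patterns are ignored, and MapperSketch, Mapping and map are hypothetical names.

```java
import java.util.List;

class MapperSketch {
    // Hypothetical result type: which context matched and what path remains.
    static final class Mapping {
        final String contextPath;
        final String servletPath;

        Mapping(String contextPath, String servletPath) {
            this.contextPath = contextPath;
            this.servletPath = servletPath;
        }
    }

    // Picks the longest context path that is a whole-segment prefix of the URI.
    // The empty string plays the role of a root context that matches everything.
    static Mapping map(List<String> contextPaths, String uri) {
        String best = "";
        for (String ctx : contextPaths) {
            boolean matches = uri.equals(ctx) || uri.startsWith(ctx + "/");
            if (matches && ctx.length() > best.length()) {
                best = ctx;
            }
        }
        return new Mapping(best, uri.substring(best.length()));
    }

    public static void main(String[] args) {
        List<String> contexts = List.of("/my", "/mytomcat");
        Mapping m = map(contexts, "/mytomcat/servlet1");
        System.out.println(m.contextPath); // /mytomcat
        System.out.println(m.servletPath); // /servlet1
    }
}
```

Matching on whole path segments is what keeps "/my" from capturing a request meant for "/mytomcat".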

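Exploring Tomcat's chain of responsibility

The request now formally enters Tomcat's chain of responsibility. Let's see what the pipeline contains.
Since the environment is already set up, this article gives the default chain structure directly:
StandardEngineValve -> AccessLogValve -> ErrorReportValve -> StandardHostValve -> StandardContextValve -> StandardWrapperValve -> ApplicationFilterChain -> the target Servlet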
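The chain can be modelled as a linked list of valves in which each valve does its own work and then calls the next one. The sketch below flattens everything into a single list, as the diagram does; in Tomcat each container owns its own pipeline and its last valve calls into the next container. Valve, Pipeline and the trace list are simplified, hypothetical shapes rather than Tomcat's interfaces.

```java
import java.util.ArrayList;
import java.util.List;

class PipelineSketch {
    // Hypothetical valve: records its name, then delegates to the next valve.
    static class Valve {
        private final String name;
        private Valve next;

        Valve(String name) {
            this.name = name;
        }

        void invoke(List<String> trace) {
            trace.add(name);
            if (next != null) {
                next.invoke(trace);
            }
        }
    }

    // Hypothetical pipeline: keeps valves in insertion order.
    static class Pipeline {
        private Valve first;
        private Valve last;

        void addValve(Valve valve) {
            if (first == null) {
                first = valve;
            } else {
                last.next = valve;
            }
            last = valve;
        }

        Valve getFirst() {
            return first;
        }
    }

    public static void main(String[] args) {
        Pipeline pipeline = new Pipeline();
        pipeline.addValve(new Valve("StandardEngineValve"));
        pipeline.addValve(new Valve("AccessLogValve"));
        pipeline.addValve(new Valve("StandardWrapperValve"));

        List<String> trace = new ArrayList<>();
        pipeline.getFirst().invoke(trace);
        System.out.println(trace); // [StandardEngineValve, AccessLogValve, StandardWrapperValve]
    }
}
```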

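Setting aside the three valves that belong to the containers themselves (StandardEngineValve, StandardHostValve and StandardContextValve), the ones worth our attention are AccessLogValve, ErrorReportValve and StandardWrapperValve.
AccessLogValve: records basic information about each request, for example into access.log
ErrorReportValve: handles errors raised further down the chain, for example by rendering an error page
StandardWrapperValve: explained in detail as follows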

    @Override
    public void invoke(Request request, Response response) throws IOException, ServletException {
        ...
        // Get the Wrapper for the target servlet; the servlet itself may not be loaded yet
        StandardWrapper wrapper = (StandardWrapper) getContainer();
        Servlet servlet = null;
        Context context = (Context) wrapper.getParent();
        ...
        try {
            if (!unavailable) {
                // Get the actual servlet; if it has not been loaded and initialized yet,
                // that happens here before it is returned
                servlet = wrapper.allocate();
            }
        } catch (UnavailableException e) {
            container.getLogger().error(sm.getString("standardWrapper.allocateException", wrapper.getName()), e);
            checkWrapperAvailable(response, wrapper);
        }
        ...
        // ApplicationFilterChain is the filter chain from the Servlet specification and holds the
        // servlet object. Once the filters in the chain have run, it invokes the servlet. This is
        // where Tomcat's own request flow ends and the application layer takes over.
        ApplicationFilterChain filterChain = ApplicationFilterFactory.createFilterChain(request, wrapper, servlet);
        ...
        filterChain.doFilter(request.getRequest(), response.getResponse());
        ...
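How a filter chain ends in a servlet call can be shown with a few trimmed-down interfaces. These are hypothetical shapes, not the Servlet API: a string list stands in for the request and response so that the order of calls is visible.

```java
import java.util.ArrayList;
import java.util.List;

class FilterChainSketch {
    // Hypothetical, simplified counterparts of FilterChain, Filter and Servlet.
    interface Chain {
        void doFilter(List<String> trace);
    }

    interface Filter {
        void doFilter(List<String> trace, Chain chain);
    }

    interface Servlet {
        void service(List<String> trace);
    }

    // Holds the filters and the servlet; each doFilter call advances one step,
    // and the servlet runs once all filters have been entered.
    static final class SimpleChain implements Chain {
        private final List<Filter> filters;
        private final Servlet servlet;
        private int pos = 0;

        SimpleChain(List<Filter> filters, Servlet servlet) {
            this.filters = filters;
            this.servlet = servlet;
        }

        @Override
        public void doFilter(List<String> trace) {
            if (pos < filters.size()) {
                filters.get(pos++).doFilter(trace, this);
            } else {
                servlet.service(trace);
            }
        }
    }

    // Builds a filter that logs before and after passing the request on.
    static Filter named(String name) {
        return (trace, chain) -> {
            trace.add(name + ":before");
            chain.doFilter(trace);
            trace.add(name + ":after");
        };
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        Chain chain = new SimpleChain(List.of(named("a"), named("b")),
                t -> t.add("servlet"));
        chain.doFilter(trace);
        System.out.println(trace); // [a:before, b:before, servlet, b:after, a:after]
    }
}
```

The nested call structure is why code placed after chain.doFilter in a filter runs on the way back, once the servlet has produced its response.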

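At this point Tomcat has finished its own part of the processing and hands the request to the application layer, whose result is then written back through the same layers in turn.
That concludes this article. There is much more in the Tomcat source, such as the relationships between containers, the isolation and safety of class loading, and dynamic deployment; interested readers can keep exploring.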