Connector的初始化
catalina解析server.xml是通过degister来实现的,degister解析到<Connector标签后做的事件如下代码所见
ConnectorCreateRule@Override public void begin(String namespace, String name, Attributes attributes) throws Exception { Service svc = (Service)digester.peek(); Executor ex = null; if ( attributes.getValue("executor")!=null ) { ex = svc.getExecutor(attributes.getValue("executor")); } Connector con = new Connector(attributes.getValue("protocol")); if (ex != null) { setExecutor(con, ex); } String sslImplementationName = attributes.getValue("sslImplementationName"); if (sslImplementationName != null) { setSSLImplementationName(con, sslImplementationName); } digester.push(con); }
connector依据标签属性,拿到对应的protocol协定,拿到配在service标签外部的线程池,protocol的名称转化成Connector中ProtocolHandler类型的成员变量, 后续将以Http11NioProtocol来做解说
public Connector(String protocol) { setProtocol(protocol); // Instantiate protocol handler ProtocolHandler p = null; try { Class<?> clazz = Class.forName(protocolHandlerClassName); // 反射调用ProtocolHandler的构造方法的时候会做后续的初始化 p = (ProtocolHandler) clazz.getConstructor().newInstance(); } catch (Exception e) { log.error(sm.getString("coyoteConnector.protocolHandlerInstantiationFailed"), e); } finally { this.protocolHandler = p; }
ProtocolHandler的结构
ProtocolHandler有其形象办法,Http11NioProtocol构造方法中的tcp实现由NioEndpoint来做,因而Connector结构起来的时候,对应的ProtocolHanlder、endpoint的关联关系曾经关联好
public AbstractProtocol(AbstractEndpoint<S, ?> endpoint) { this.endpoint = endpoint; ConnectionHandler<S> cHandler = new ConnectionHandler<>(this); setHandler(cHandler); getEndpoint().setHandler(cHandler); setSoLinger(Constants.DEFAULT_CONNECTION_LINGER); setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY); }
结构后的init办法
在解析server.xml的根本层次结构,成员变量填充残缺后,须要调用生命周期的init办法
org/apache/catalina/startup/Catalina.load getServer().init(); -----> connector.init(); --------------> protocolHandler.init();----------> endpoint.init(); ------------->
看一下NioEndpoint的init办法
protected void initServerSocket() throws Exception { if (getUseInheritedChannel()) { // Retrieve the channel provided by the OS Channel ic = System.inheritedChannel(); if (ic instanceof ServerSocketChannel) { serverSock = (ServerSocketChannel) ic; } if (serverSock == null) { throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited")); } } else {// 绑定服务端的传输层端口 serverSock = ServerSocketChannel.open(); socketProperties.setProperties(serverSock.socket()); InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset()); serverSock.socket().bind(addr,getAcceptCount()); } serverSock.configureBlocking(true); //mimic APR behavior }
建设Nio的Acceptor线程和Selector事件线程
org.apache.tomcat.util.net.NioEndpoint#startInternalpublic void startInternal() throws Exception { if (!running) { running = true; paused = false; if (socketProperties.getProcessorCache() != 0) { processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getProcessorCache()); } if (socketProperties.getEventCache() != 0) { eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getEventCache()); } if (socketProperties.getBufferPool() != 0) { nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getBufferPool()); } // Create worker collection if (getExecutor() == null) { // 创立Tomcat默认线程池,未手动配置 线程数10 - 200 createExecutor(); } // 这个是tomcat的连接数限制器 initializeConnectionLatch(); // Start poller thread poller = new Poller(); Thread pollerThread = new Thread(poller, getName() + "-Poller"); pollerThread.setPriority(threadPriority); pollerThread.setDaemon(true); pollerThread.start(); startAcceptorThread(); } }
咱们来看一下Poller的实现和Acctptor的实现
Poller的实现
public class Poller implements Runnable { private Selector selector; // PollerEvent private NioSocketWrapper socketWrapper; private int interestOps; // 向多路复用器注册socket和须要解决的socket工夫 private final SynchronizedQueue<PollerEvent> events = new SynchronizedQueue<>(); ......... public Poller() throws IOException { this.selector = Selector.open(); }@Override public void run() { // Loop until destroy() is called while (true) { boolean hasEvents = false; try { if (!close) { hasEvents = events(); if (wakeupCounter.getAndSet(-1) > 0) { // If we are here, means we have other stuff to do // Do a non blocking select keyCount = selector.selectNow(); } else { // 默认阻塞1秒钟,监听须要交给线程池的解决工作 keyCount = selector.select(selectorTimeout); } wakeupCounter.set(0); } ... } catch (Throwable x) { ExceptionUtils.handleThrowable(x); log.error(sm.getString("endpoint.nio.selectorLoopError"), x); continue; } Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; // Walk through the collection of ready keys and dispatch // any active event. while (iterator != null && iterator.hasNext()) { SelectionKey sk = iterator.next(); iterator.remove(); NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment(); // Attachment may be null if another thread has called // cancelledKey() if (socketWrapper != null) { // 配角, 工作解决都在外面 processKey(sk, socketWrapper); } } // Process timeouts timeout(keyCount,hasEvents); }............. }
Acctptor的实现
public class Acceptor<U> implements Runnable { @Override public void run() { int errorDelay = 0; long pauseStart = 0; try { // Loop until we receive a shutdown command while (!stopCalled) { ............ if (stopCalled) { break; } state = AcceptorState.RUNNING; try { //if we have reached max connections, wait endpoint.countUpOrAwaitConnection(); // Endpoint might have been paused while waiting for latch // If that is the case, don't accept new connections if (endpoint.isPaused()) { continue; } U socket = null; try { // 期待新连贯 socket = endpoint.serverSocketAccept(); } catch (Exception ioe) { // We didn't get a socket endpoint.countDownConnection(); if (endpoint.isRunning()) { // Introduce delay if necessary errorDelay = handleExceptionWithDelay(errorDelay); // re-throw throw ioe; } else { break; } } // Successful accept, reset the error delay errorDelay = 0; // Configure the socket if (!stopCalled && !endpoint.isPaused()) { // 由endpoint来解决新连贯、把新连贯存在map里、向poller注册PollerEndpoint // 当前读写事件就由Poller交给线程池来治理 if (!endpoint.setSocketOptions(socket)) { endpoint.closeSocket(socket); } } else { endpoint.destroySocket(socket); } } ........ } finally { stopLatch.countDown(); } state = AcceptorState.ENDED; }}
当新连贯接入时,会把新连贯注册到至底层为Selector的多路复用器上,Tomcat的Connector领有了承受新连贯和解决socket事件的能力
我的项目实战
仍旧应用mytomcat.war, 外面有一个servlet FirstServlet,追随tomcat一起启动,容器启动,我的项目的部署暂且不探讨
接下来咱们看一下一个简略的Servlet如何在tomcat中来流转
tomcat启动时, Acceptor将阻塞到 socket = endpoint.serverSocketAccept();这一行
在浏览器输出http://localhost:8080/mytomcat/servlet1, Acceptor解除阻塞并且获取到客户端socket
来看一下endpoint.setSocketOptions(socket)做了啥
@Override protected boolean setSocketOptions(SocketChannel socket) { ......... NioSocketWrapper newWrapper = new NioSocketWrapper(channel, this); channel.reset(socket, newWrapper); connections.put(socket, newWrapper); socketWrapper = newWrapper; // Set socket properties // Disable blocking, polling will be used socket.configureBlocking(false); socketProperties.setProperties(socket.socket()); // 注册新连贯 poller.register(socketWrapper); return true; ...
再看一下Poller的代码 监听了新连贯的OP_READ事件
public void register(final NioSocketWrapper socketWrapper) { socketWrapper.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into. PollerEvent pollerEvent = createPollerEvent(socketWrapper, OP_REGISTER); addEvent(pollerEvent); }
这样,http://localhost:8080/mytomcat/servlet1的申请就会交给Poller解决,具体代码见Poller的run函数
public void run() { // Loop until destroy() is called while (true) { boolean hasEvents = false; try { if (!close) { hasEvents = events(); if (wakeupCounter.getAndSet(-1) > 0) { // If we are here, means we have other stuff to do // Do a non blocking select keyCount = selector.selectNow(); } else { // 默认阻塞1秒钟,监听须要交给线程池的解决工作 keyCount = selector.select(selectorTimeout); } wakeupCounter.set(0); } ... } catch (Throwable x) { ExceptionUtils.handleThrowable(x); log.error(sm.getString("endpoint.nio.selectorLoopError"), x); continue; } Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; // Walk through the collection of ready keys and dispatch // any active event. while (iterator != null && iterator.hasNext()) { SelectionKey sk = iterator.next(); iterator.remove(); NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment(); // Attachment may be null if another thread has called // cancelledKey() if (socketWrapper != null) { // 配角, 工作解决都在外面 processKey(sk, socketWrapper); } } // Process timeouts timeout(keyCount,hasEvents); }............. }
processKey的时候向线程池提交一个SocketProcessor工作
多线程工作的执行
通过一系列变换,connecotr对应的Handler为Http11Nio Protocol,
将协定局部的解决交由Http11Processor
CoyoteAdapter
tomcat并非间接把申请封装为HttpServletRequest对象和HttpServletResponse对象,自身connector反对多种协定,不只应用http协定。所以tomcat存在比拟底层的org.apache.coyote.Request, 也存在继承HttpServletRequest的org.apache.catalina.connector.Request,CoyoteAdapter负责做二者的转化并且把转化后的HttpServletRequest对象和HttpServletResponse对象交给tomcat容器的Pipeline。
来看一下CoyoteAdapter的service办法
public void service(org.apache.coyote.Request req, org.apache.coyote.Response res) throws Exception { Request request = (Request) req.getNote(ADAPTER_NOTES); Response response = (Response) res.getNote(ADAPTER_NOTES); if (request == null) { // Create objects request = connector.createRequest(); request.setCoyoteRequest(req); response = connector.createResponse(); response.setCoyoteResponse(res); // 做两种类型request的转换 request.setResponse(response); response.setRequest(request); } try { // Parse and set Catalina and configuration specific // 上面有解说这个办法 postParseSuccess = postParseRequest(req, request, res, response); if (postParseSuccess) { // check valves if we support async request.setAsyncSupported(connector.getService().getContainer().getPipeline().isAsyncSupported()); // Calling the container //把HttpServletRequest 和HttpServletRequest对象交给tomcat的职责链Pipeline。connector.getService().getContainer().getPipeline().getFirst().invoke(request, response); } ........ }
咱们来看一下postParseRequest(req, resp)办法
protected boolean postParseRequest(org.apache.coyote.Request req, Request request, org.apache.coyote.Response res, Response response) throws IOException, ServletException { // 解析出是否要Https if (req.scheme().isNull()) { // Use connector scheme and secure configuration, (defaults to // "http" and false respectively) req.scheme().setString(connector.getScheme()); request.setSecure(connector.getSecure()); } else { // Use processor specified scheme to determine secure state request.setSecure(req.scheme().equals("https")); }// 代理相干String proxyName = connector.getProxyName(); int proxyPort = connector.getProxyPort(); if (proxyPort != 0) { req.setServerPort(proxyPort); } else if (req.getServerPort() == -1) { // Not explicitly set. Use default ports based on the scheme if (req.scheme().equals("https")) { req.setServerPort(443); } else { req.setServerPort(80); } } if (proxyName != null) { req.serverName().setString(proxyName); }// 预检办法申请// Check for ping OPTIONS * request if (undecodedURI.equals("*")) { if (req.method().equalsIgnoreCase("OPTIONS")) { StringBuilder allow = new StringBuilder(); allow.append("GET, HEAD, POST, PUT, DELETE, OPTIONS"); // Trace if allowed if (connector.getAllowTrace()) { allow.append(", TRACE"); } res.setHeader("Allow", allow.toString()); // Access log entry as processing won't reach AccessLogValve connector.getService().getContainer().logAccess(request, response, 0, true); return false; } else { response.sendError(400, sm.getString("coyoteAdapter.invalidURI")); } } ........ while (mapRequired) { // This will map the the latest version by default // tomcat能够部署多个利用,找到具体在某个Context、Host、填充到request对象中。 connector.getService().getMapper().map(serverName, decodedURI, version, request.getMappingData()); }.......
摸索一下tomcat的职责链
正式进入Tomcat的职责链中,上面咱们来看一下pipeline中有什么
因为已有环境,本文间接给出默认的链装构造
StandardEngineValve----> AccessLogValve----> ErrorReportValve---- > StandardHostValve ----> StandardContextValve -----> StandardWrapperValve---> ApplicationFilterChain ---后续执行指定的Servlet
排除容器自身的节点StandardEngineValve, StandardHostValve,StandardContextValve三个节点。 AccessLogValve,ErrorReportValve, StandardWrapperValve值得咱们关注
AccessLogValve:应用来记录拜访申请的根本信息,记录到access.log中等
ErrorReportValve:来解决前面的谬误,跳到失败页面等
StandardWrapperValve: 如下具体讲解
@Override public void invoke(Request request, Response response) throws IOException, ServletException { ........... // 获取对应servlet对应的Wrapper, servlet可能还未加载 StandardWrapper wrapper = (StandardWrapper) getContainer(); Servlet servlet = null; Context context = (Context) wrapper.getParent(); ........ try { if (!unavailable) { // 获取真正的servlet, 如果未加载初始化,在加载初始化后返回 servlet = wrapper.allocate(); } } catch (UnavailableException e) { container.getLogger().error(sm.getString("standardWrapper.allocateException", wrapper.getName()), e); checkWrapperAvailable(response, wrapper); } .................// ApplicationFilterChain自身是servlet标准中的过滤器, 持有servelt对象,应用职责链调用实现后,再调用servlet, 至此,tomcat的申请流程到此,后续交给下层应用程序来解决申请 ApplicationFilterChain filterChain = ApplicationFilterFactory.createFilterChain(request, wrapper, servlet); ......... filterChain.doFilter(request.getRequest(), response.getResponse());
至此,tomcat讲申请解决完后递交给下层应用程序来解决,并返回后果,顺次写回
本文到此结束,tomcat的源码还有很多,如容器之间的关系,多类的加载安全性,动静部署等,有趣味的敌人能够持续钻研