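Producer Thread Model Analysis
Thread handoff: the network eventloop thread receives the request -> hands it to a group (business) thread for processing -> once the group thread finishes -> the result is handed back to the network thread, which sends the response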
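The handoff described above can be imitated with plain JDK executors. This is only a minimal sketch, not ServiceComb or Vert.x code: the class name HandoffSketch, the thread names "eventloop-0" and "group-worker", and the trace labels are all made up for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class HandoffSketch {
    /** Runs one simulated request and returns the thread name seen at each stage. */
    public static List<String> run() throws Exception {
        // Hypothetical stand-ins: one thread plays the eventloop, a small pool plays the group executor.
        ExecutorService eventloop = Executors.newSingleThreadExecutor(r -> new Thread(r, "eventloop-0"));
        ExecutorService group = Executors.newFixedThreadPool(2, r -> new Thread(r, "group-worker"));
        List<String> trace = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> done = new CompletableFuture<>();
        try {
            // Stage 1: the eventloop thread receives the request.
            eventloop.execute(() -> {
                trace.add("receive:" + Thread.currentThread().getName());
                // Stage 2: the request is handed to a business thread.
                group.execute(() -> {
                    trace.add("process:" + Thread.currentThread().getName());
                    // Stage 3: the result goes back to the eventloop thread to be sent.
                    eventloop.execute(() -> {
                        trace.add("respond:" + Thread.currentThread().getName());
                        done.complete(null);
                    });
                });
            });
            done.get(5, TimeUnit.SECONDS);
        } finally {
            eventloop.shutdown();
            group.shutdown();
        }
        return trace;
    }

    public static void main(String[] args) throws Exception {
        run().forEach(System.out::println);
    }
}
```

Running it prints the three stages in order, with the first and last on eventloop-0 and the middle one on group-worker.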
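How the eventloop thread finds the business thread: it gets the executor from operationMeta, which stores the metadata for the business logic of an operation. The code in AbstractRestInvocation shows this step: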
try {
  operationMeta.getExecutor().execute(() -> {
    synchronized (this.requestEx) {
      try {
        if (isInQueueTimeout()) {
          throw new InvocationException(Status.INTERNAL_SERVER_ERROR, "Timeout when processing the request.");
        }
        if (requestEx.getAttribute(RestConst.REST_REQUEST) != requestEx) {
          // already timeout
          // in this time, request maybe recycled and reused by web container, do not use requestEx
          LOGGER.error("Rest request already timeout, abandon execute, method {}, operation {}.",
              operationMeta.getHttpMethod(),
              operationMeta.getMicroserviceQualifiedName());
          return;
        }
        runOnExecutor();
      } catch (InvocationException e) {
        LOGGER.error("Invocation failed, cause={}", e.getMessage());
        sendFailResponse(e);
      } catch (Throwable e) {
        LOGGER.error("Processing rest server request error", e);
        sendFailResponse(e);
      }
    }
  });
} catch (Throwable e) {
  LOGGER.error("failed to schedule invocation, message={}, executor={}.", e.getMessage(), e.getClass().getName());
  sendFailResponse(e);
}
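The isInQueueTimeout() check above runs on the business thread, before any business logic. Its implementation is not shown here, so the following is only a rough sketch of the general idea: record when the task was queued and refuse to run it if it waited too long. TimedTask, the injected clock and the onTimeout callback are hypothetical names, not ServiceComb APIs.

```java
import java.util.function.LongSupplier;

public class QueueTimeoutSketch {
    /** Wraps a task and skips its body if it waited in the queue longer than timeoutNanos. */
    public static final class TimedTask implements Runnable {
        private final LongSupplier clock;   // injected so the check can be tested deterministically
        private final long enqueuedAt;      // clock value when the task was created (queued)
        private final long timeoutNanos;
        private final Runnable body;
        private final Runnable onTimeout;

        public TimedTask(LongSupplier clock, long timeoutNanos, Runnable body, Runnable onTimeout) {
            this.clock = clock;
            this.enqueuedAt = clock.getAsLong();
            this.timeoutNanos = timeoutNanos;
            this.body = body;
            this.onTimeout = onTimeout;
        }

        /** Assumed semantics: a non-positive timeout disables the check. */
        boolean isInQueueTimeout() {
            return timeoutNanos > 0 && clock.getAsLong() - enqueuedAt > timeoutNanos;
        }

        @Override
        public void run() {
            if (isInQueueTimeout()) {
                onTimeout.run();   // e.g. send a failure response instead of running business logic
                return;
            }
            body.run();
        }
    }

    public static void main(String[] args) {
        TimedTask task = new TimedTask(System::nanoTime, 1_000_000_000L,
            () -> System.out.println("business logic runs"),
            () -> System.out.println("timed out while queued"));
        task.run();
    }
}
```

Checking at the start of execution, rather than with a separate timer, means a request that has already waited too long never consumes business-thread time.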
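How the business thread finds the eventloop thread: the eventloop (network) thread's context is stored in responseEx. VertxRestDispatcher shows that responseEx is created with this context already set: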
HttpServletResponseEx responseEx = new VertxServerResponseToHttpServletResponse(context.response());

// the vertx context is set here
public VertxServerResponseToHttpServletResponse(HttpServerResponse serverResponse) {
  this.context = Vertx.currentContext();
  this.serverResponse = serverResponse;
  Objects.requireNonNull(context, "must run in vertx context.");
}
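Thread switching in detail
1. The vertx eventloop thread receives the request
2. Processing moves from the vertx eventloop thread to a business thread
3. Execution enters the handle method of ProducerOperationHandler, still on the business (group) thread
4. Execution enters syncInvoke of ProducerOperationHandler and finally reaches sendResponseQuietly, still on the group thread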
// asyncResp in ProducerOperationHandler
public void syncInvoke(Invocation invocation, SwaggerProducerOperation producerOperation, AsyncResponse asyncResp) {
  ....
  asyncResp.handle(response);
}

// the asyncResp above is exactly the sendResponseQuietly passed in by AbstractRestInvocation
protected void doInvoke() throws Throwable {
  invocation.onStartHandlersRequest();
  invocation.next(resp -> sendResponseQuietly(resp));
}
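5. Responding: the group thread calls responseEx to send the response, and responseEx hands the actual sending over to the eventloop thread. The eventloop thread is reached through the context held by responseEx, i.e. context.runOnContext(V -> internalFlushBuffer());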
// responseEx
protected void onExecuteHttpServerFiltersFinish(Response response, Throwable e) {
  ...
  responseEx.flushBuffer();
  ...
}

// enters flushBuffer of VertxServerResponseToHttpServletResponse
public void flushBuffer() {
  if (context == Vertx.currentContext()) {
    internalFlushBuffer();
    return;
  }
  context.runOnContext(V -> internalFlushBuffer());
}

// the response is sent through serverResponse.end(bodyBuffer); the rest is done by the vertx framework
public void internalFlushBuffer() {
  ...
  serverResponse.end(bodyBuffer);
}
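The pattern in flushBuffer (remember the owning context at construction time, then either flush directly or dispatch back to that context) can be reproduced without Vert.x. The sketch below is an assumption-laden imitation: LoopContext is a hypothetical stand-in for a Vert.x context built on a single-thread executor and a ThreadLocal, and Response is a hypothetical stand-in for VertxServerResponseToHttpServletResponse.

```java
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class FlushSketch {
    /** Hypothetical stand-in for a Vert.x context: one thread that knows which context it belongs to. */
    public static final class LoopContext {
        private static final ThreadLocal<LoopContext> CURRENT = new ThreadLocal<>();
        private final ExecutorService loop;

        public LoopContext(String name) {
            // The loop thread registers this context before running any task.
            loop = Executors.newSingleThreadExecutor(r -> new Thread(() -> {
                CURRENT.set(this);
                r.run();
            }, name));
        }

        /** Returns the context of the calling thread, or null when called off-loop. */
        public static LoopContext current() { return CURRENT.get(); }
        public void runOnContext(Runnable task) { loop.execute(task); }
        public void close() { loop.shutdown(); }
    }

    /** Hypothetical response wrapper that captures its owning context when it is created. */
    public static final class Response {
        private final LoopContext context;
        private final CompletableFuture<String> flushedOn = new CompletableFuture<>();

        public Response() {
            this.context = Objects.requireNonNull(LoopContext.current(), "must run in loop context.");
        }

        public void flushBuffer() {
            if (context == LoopContext.current()) {
                internalFlushBuffer();   // already on the owning thread
                return;
            }
            context.runOnContext(this::internalFlushBuffer);   // switch back to the owning thread
        }

        private void internalFlushBuffer() {
            flushedOn.complete(Thread.currentThread().getName());
        }

        public CompletableFuture<String> flushedOn() { return flushedOn; }
    }

    /** Creates the response on the loop thread, flushes from a worker, returns the flushing thread's name. */
    public static String run() throws Exception {
        LoopContext ctx = new LoopContext("eventloop-0");
        ExecutorService group = Executors.newSingleThreadExecutor(r -> new Thread(r, "group-worker"));
        try {
            CompletableFuture<Response> created = new CompletableFuture<>();
            ctx.runOnContext(() -> created.complete(new Response()));
            Response resp = created.get(5, TimeUnit.SECONDS);
            group.execute(resp::flushBuffer);
            return resp.flushedOn().get(5, TimeUnit.SECONDS);
        } finally {
            ctx.close();
            group.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("flushed on " + run());
    }
}
```

Although flushBuffer is called from group-worker, the flush itself runs on eventloop-0, which mirrors step 5 above.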