共计 2557 个字符,预计需要花费 7 分钟才能阅读完成。
Producer 线程模型剖析
线程传递过程:网络 eventloop 线程接管申请 -> 传给 group 线程解决 ->group 解决完后 -> 返还给网络线程去响应
eventloop 线程找业务线程: 通过查找 operationMeta 获取,operationMeta 是存储解决业务逻辑元数据信息,代码 AbstractRestInvocation 中体现这一过程
try {operationMeta.getExecutor().execute(() -> {synchronized (this.requestEx) {
try {if (isInQueueTimeout()) {throw new InvocationException(Status.INTERNAL_SERVER_ERROR, "Timeout when processing the request.");
}
if (requestEx.getAttribute(RestConst.REST_REQUEST) != requestEx) {
// already timeout
// in this time, request maybe recycled and reused by web container, do not use requestEx
LOGGER.error("Rest request already timeout, abandon execute, method {}, operation {}.",
operationMeta.getHttpMethod(),
operationMeta.getMicroserviceQualifiedName());
return;
}
runOnExecutor();} catch (InvocationException e) {LOGGER.error("Invocation failed, cause={}", e.getMessage());
sendFailResponse(e);
} catch (Throwable e) {LOGGER.error("Processing rest server request error", e);
sendFailResponse(e);
}
}
});
} catch (Throwable e) {LOGGER.error("failed to schedule invocation, message={}, executor={}.", e.getMessage(), e.getClass().getName());
sendFailResponse(e);
}
业务线程找 eventloop 线程:eventloop 网络线程在放到 responseEx 上下文中,这个在 VertxRestDispatcher 能够看到 responseEx 预置了网络线程
HttpServletResponseEx responseEx = new VertxServerResponseToHttpServletResponse(context.response());
//vertx context 设置
public VertxServerResponseToHttpServletResponse(HttpServerResponse serverResponse) {this.context = Vertx.currentContext();
this.serverResponse = serverResponse;
Objects.requireNonNull(context, "must run in vertx context.");
}
线程切换过程具体
1. 接管申请的 vertx eventloop 线程
2. 从 vertx eventloop 线程转入业务线程解决
3. 进入 ProducerOperationHandler 的 handler,依然是业务线程 group
4. 进入 ProducerOperationHandler 的 syncInvoke 的, 最终进入 sendResponseQuietly, 线程依然是 group
// ProducerOperationHandler 的 asyncResp
public void syncInvoke(Invocation invocation, SwaggerProducerOperation producerOperation, AsyncResponse asyncResp) {
....
asyncResp.handle(response);
}
// 上述的 asyncResp 正是 AbstractRestInvocation 传进去的 sendResponseQuietly
protected void doInvoke() throws Throwable {invocation.onStartHandlersRequest();
invocation.next(resp -> sendResponseQuietly(resp));
}
5. 响应 group 线程,通过驱动 responseEx 去响应, 把响应交由 eventloop 线程去解决,eventloop 线程在 responseEx 的 context,即 context.runOnContext(V -> internalFlushBuffer());
// responseEx
protected void onExecuteHttpServerFiltersFinish(Response response, Throwable e) {
...
responseEx.flushBuffer();
...
}
// 进入 VertxServerResponseToHttpServletResponse 的 flushBuffer
public void flushBuffer() {if (context == Vertx.currentContext()) {internalFlushBuffer();
return;
}
context.runOnContext(V -> internalFlushBuffer());
}
// 通过 serverResponse.end(bodyBuffer) 发送响应,vertx 框架做的事件
public void internalFlushBuffer() {
...
serverResponse.end(bodyBuffer);
}
正文完