关于微服务:Producer处理客户端请求线程模型分析

5次阅读

共计 2557 个字符,预计需要花费 7 分钟才能阅读完成。

Producer 线程模型剖析

线程传递过程:网络 eventloop 线程接管申请 -> 传给 group 线程解决 ->group 解决完后 -> 返还给网络线程去响应

eventloop 线程找业务线程: 通过查找 operationMeta 获取,operationMeta 是存储解决业务逻辑元数据信息,代码 AbstractRestInvocation 中体现这一过程


    try {operationMeta.getExecutor().execute(() -> {synchronized (this.requestEx) {
          try {if (isInQueueTimeout()) {throw new InvocationException(Status.INTERNAL_SERVER_ERROR, "Timeout when processing the request.");
            }
            if (requestEx.getAttribute(RestConst.REST_REQUEST) != requestEx) {
              // already timeout
              // in this time, request maybe recycled and reused by web container, do not use requestEx
              LOGGER.error("Rest request already timeout, abandon execute, method {}, operation {}.",
                  operationMeta.getHttpMethod(),
                  operationMeta.getMicroserviceQualifiedName());
              return;
            }

            runOnExecutor();} catch (InvocationException e) {LOGGER.error("Invocation failed, cause={}", e.getMessage());
            sendFailResponse(e);
          } catch (Throwable e) {LOGGER.error("Processing rest server request error", e);
            sendFailResponse(e);
          }
        }
      });
    } catch (Throwable e) {LOGGER.error("failed to schedule invocation, message={}, executor={}.", e.getMessage(), e.getClass().getName());
      sendFailResponse(e);
    }

业务线程找 eventloop 线程:eventloop 网络线程在放到 responseEx 上下文中,这个在 VertxRestDispatcher 能够看到 responseEx 预置了网络线程

    HttpServletResponseEx responseEx = new VertxServerResponseToHttpServletResponse(context.response());

//vertx context 设置
  public VertxServerResponseToHttpServletResponse(HttpServerResponse serverResponse) {this.context = Vertx.currentContext();
    this.serverResponse = serverResponse;

    Objects.requireNonNull(context, "must run in vertx context.");
  }

线程切换过程具体

1. 接管申请的 vertx eventloop 线程

2. 从 vertx eventloop 线程转入业务线程解决

3. 进入 ProducerOperationHandler 的 handler,依然是业务线程 group

4. 进入 ProducerOperationHandler 的 syncInvoke 的, 最终进入 sendResponseQuietly, 线程依然是 group

// ProducerOperationHandler 的 asyncResp
  public void syncInvoke(Invocation invocation, SwaggerProducerOperation producerOperation, AsyncResponse asyncResp) {
....
    asyncResp.handle(response);
  }

// 上述的 asyncResp 正是 AbstractRestInvocation 传进去的 sendResponseQuietly

  protected void doInvoke() throws Throwable {invocation.onStartHandlersRequest();
    invocation.next(resp -> sendResponseQuietly(resp));
  }


5. 响应 group 线程,通过驱动 responseEx 去响应, 把响应交由 eventloop 线程去解决,eventloop 线程在 responseEx 的 context,即 context.runOnContext(V -> internalFlushBuffer());

// responseEx
  protected void onExecuteHttpServerFiltersFinish(Response response, Throwable e) {
...
      responseEx.flushBuffer();
...
}

// 进入 VertxServerResponseToHttpServletResponse 的 flushBuffer
  public void flushBuffer() {if (context == Vertx.currentContext()) {internalFlushBuffer();
      return;
    }
    context.runOnContext(V -> internalFlushBuffer());
  }

// 通过 serverResponse.end(bodyBuffer) 发送响应,vertx 框架做的事件

  public void internalFlushBuffer() {
...
    serverResponse.end(bodyBuffer);
  }

正文完
 0