乐趣区

关于java:xxljob惊艳的设计怎能叫人不爱

通信底层介绍

xxl-job 应用 netty http 的形式进行通信,尽管也反对 Mina,jetty,netty tcp 等形式,然而代码外面固定写死的是 netty http。

通信整体流程

我以调度器告诉执行器执行工作为例,绘制的流动图:

流动图

惊艳的设计

看完了整个解决流程代码,设计上能够说步人后尘,将 netty,多线程的常识使用得行云流水。

我当初就将这些设计上出彩的点总结如下:

| 应用动静代理模式,暗藏通信细节

xxl-job 定义了两个接口 ExecutorBiz,AdminBiz,ExecutorBiz 接口中封装了向心跳,暂停,触发执行等操作,AdminBiz 封装了回调,注册,勾销注册操作,接口的实现类中,并没有通信相干的解决。

XxlRpcReferenceBean 类的 getObject() 办法会生成一个代理类,这个代理类会进行近程通信。

| 全异步解决

执行器收到音讯进行反序列化,并没有同步执行工作代码,而是将工作信息存储在 LinkedBlockingQueue 中,异步线程从这个队列中获取工作信息,而后执行。

而工作的处理结果,也不是说解决完之后,同步返回的,也是放到回调线程的阻塞队列中,异步的将处理结果返回回去。

这样解决的益处就是缩小了 netty 工作线程的解决工夫,晋升了吞吐量。

| 对异步解决的包装

对异步解决进行了包装,代码看起来是同步调用的。

咱们看下调度器,XxlJobTrigger 类触发工作执行的代码:

public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
    ReturnT<String> runResult = null;
    try {ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
        // 这外面做了很多异步解决,最终同步失去处理结果
        runResult = executorBiz.run(triggerParam);
    } catch (Exception e) {logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
        runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
    }

    StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
    runResultSB.append("<br>address:").append(address);
    runResultSB.append("<br>code:").append(runResult.getCode());
    runResultSB.append("<br>msg:").append(runResult.getMsg());

    runResult.setMsg(runResultSB.toString());
    return runResult;
}

ExecutorBiz.run 办法咱们说过了,是走的动静代理,和执行器进行通信,执行器执行后果也是异步解决完,才返回的,而这里看到的 run 办法是同步期待处理结果返回。

咱们看下 xxl-job 是如何同步获取处理结果的:调度器向执行器收回音讯后,该线程阻塞。等到执行器处理完毕后,将处理结果返回,唤醒被阻塞的线程,调用处拿到返回值。

动静代理代码如下:

// 代理类中的触发调用
if (CallType.SYNC == callType) {
   // future-response set
   XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
   try {
      // do invoke
      client.asyncSend(finalAddress, xxlRpcRequest);

      // future get
      XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
      if (xxlRpcResponse.getErrorMsg() != null) {throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
      }
      return xxlRpcResponse.getResult();} catch (Exception e) {logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);

      throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
   } finally{
      // future-response remove
      futureResponse.removeInvokerFuture();}
} 

XxlRpcFutureResponse 类中实现了线程的期待,和线程唤醒的解决:

// 返回后果,唤醒线程
public void setResponse(XxlRpcResponse response) {
   this.response = response;
   synchronized (lock) {
      done = true;
      lock.notifyAll();}
}

@Override
    public XxlRpcResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {if (!done) {synchronized (lock) {
                try {if (timeout < 0) {
            // 线程阻塞
                        lock.wait();} else {long timeoutMillis = (TimeUnit.MILLISECONDS==unit)?timeout:TimeUnit.MILLISECONDS.convert(timeout , unit);
                        lock.wait(timeoutMillis);
                    }
                } catch (InterruptedException e) {throw e;}
            }
        }

        if (!done) {throw new XxlRpcException("xxl-rpc, request timeout at:"+ System.currentTimeMillis() +", request:" + request.toString());
        }
        return response;
    }

有的同学可能会问了,调度器接管到返回后果,怎么确定唤醒哪个线程呢?

每一次近程调用,都会生成 uuid 的申请 id,这个 id 是在整个调用过程中始终传递的,就像一把钥匙,在你回家的的时候,拿着它就带开门。

这里拿着申请 id 这把钥匙,就能找到对应的 XxlRpcFutureResponse,而后调用 setResponse 办法,设置返回值,唤醒线程。

public void notifyInvokerFuture(String requestId, final XxlRpcResponse xxlRpcResponse){

    // 通过 requestId 找到 XxlRpcFutureResponse,final XxlRpcFutureResponse futureResponse = futureResponsePool.get(requestId);
    if (futureResponse == null) {return;}
    if (futureResponse.getInvokeCallback()!=null) {

        // callback type
        try {executeResponseCallback(new Runnable() {
                @Override
                public void run() {if (xxlRpcResponse.getErrorMsg() != null) {futureResponse.getInvokeCallback().onFailure(new XxlRpcException(xxlRpcResponse.getErrorMsg()));
                    } else {futureResponse.getInvokeCallback().onSuccess(xxlRpcResponse.getResult());
                    }
                }
            });
        }catch (Exception e) {logger.error(e.getMessage(), e);
        }
    } else {
        // 外面调用 lock 的 notify 办法
        futureResponse.setResponse(xxlRpcResponse);
    }

    // do remove
    futureResponsePool.remove(requestId);

}
退出移动版