乐趣区

关于java:HadoopRPC在client端的源码解析

@Override

public Object invoke(Object proxy, Method method, Object[] args)
        throws ServiceException {
    long startTime = 0;
    if (LOG.isDebugEnabled()) {startTime = Time.now();
    }
    // 判断传入的两个参数,实际上调用的 rename 是 ClientNamenodeProtocolTranslatorPB 的 rename
    // 传入两个参数如下一个是 null
    // 另一个是将申请的参数从字符串转换为 Protocolbuf 的申请
    // RenameRequestProto req = RenameRequestProto.newBuilder().setSrc(src).setDst(dst).build();
    // rpcProxy.rename(null, req).getResult();
    if (args.length != 2) { // RpcController + Message
        throw new ServiceException("Too many parameters for request. Method: ["
                + method.getName() + "]" + ", Expected: 2, Actual:"
                + args.length);
    }
    if (args[1] == null) {
        throw new ServiceException("null param while calling Method: ["
                + method.getName() + "]");
    }
    TraceScope traceScope = null;
    // if Tracing is on then start a new span for this rpc.
    // guard it in the if statement to make sure there isn't
    // any extra string manipulation.
    if (Trace.isTracing()) {
        traceScope = Trace.startSpan(method.getDeclaringClass().getCanonicalName() +
                        "." + method.getName());
    }
    // 结构申请头域,表明在什么借口上调用什么办法
    RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);
    
    if (LOG.isTraceEnabled()) {LOG.trace(Thread.currentThread().getId() + ": Call ->" +
                remoteId + ":" + method.getName() +
                "{" + TextFormat.shortDebugString((Message) args[1]) + "}");
    }
    // 获取理论的申请参数,Message theRequest = (Message) args[1];
    final RpcResponseWrapper val;
    try {
        // 真正将数据发送给远端服务!!!val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
                new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId,
                fallbackToSimpleAuth);
    } catch (Throwable e) {if (LOG.isTraceEnabled()) {LOG.trace(Thread.currentThread().[PerfectMoney 下载](https://www.gendan5.com/wallet/PerfectMoney.html)getId() + ": Exception <-" +
                    remoteId + ":" + method.getName() +
                    "{" + e + "}");
        }
        if (Trace.isTracing()) {traceScope.getSpan().addTimelineAnnotation("Call got exception:" + e.getMessage());
        }
        throw new ServiceException(e);
    } finally {if (traceScope != null) traceScope.close();}
    if (LOG.isDebugEnabled()) {long callTime = Time.now() - startTime;
        LOG.debug("Call:" + method.getName() + "took" + callTime + "ms");
    }
    Message prototype = null;
    try {
        // 取得返回参数
        prototype = getReturnProtoType(method);
    } catch (Exception e) {throw new ServiceException(e);
    }
    Message returnMessage;
    try {
        // 序列化相应信息并返回
        returnMessage = prototype.newBuilderForType()
                .mergeFrom(val.theResponseRead).build(); 
        if (LOG.isTraceEnabled()) {LOG.trace(Thread.currentThread().getId() + ": Response <-" +
                    remoteId + ":" + method.getName() +
                    "{" + TextFormat.shortDebugString(returnMessage) + "}");
        } 
    } catch (Throwable e) {throw new ServiceException(e);
    }
    // 返回后果
    return returnMessage;
}
退出移动版