Livy基于netty构建了一个RPC通信层。本篇咱们来探索一下Livy的RPC层的实现细节。读者该当具备netty编程的基础知识。

RPC相干的代码次要在rsc目录和org.apache.livy.rsc包中。

KryoMessageCodec

Kryo是一种对象序列化和反序列化工具。通信单方须要相互发送音讯,livy抉择了Kryo作为音讯的编解码器,并在netty框架中实现编码和解码接口:

class KryoMessageCodec extends ByteToMessageCodec<Object> {    @Override    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {...}        @Override    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf buf) {...}}

当申请音讯到来时,netty首先会调用decode对音讯进行解码;当音讯要发送到对端的最初关头,netty会调用encode对音讯进行编码。

SaslHandler

livy的rpc通信反对基于sasl的认证。所以在livy的rpc实现中,有一个叫SaslHandlerSimpleChannelInboundHandler。在正式通信前,客户端和服务端须要通过一次认证的过程。这里不列举代码,然而将认证的过程做一个剖析。回顾一下第三篇中外围架构细节局部的时序图,一个session的创立过程为:livyServer启动一个RpcServer1和一个SparkSubmit(提交driver)。这时有个细节是,livyServer会生成一个clientId,记录在内存中,并把clientId通过配置文件传给driver。driver启动后要连贯RpcServer1,就要带上这个clientId。livy通过SaslMessage音讯来封装clientId

static class SaslMessage {  final String clientId;  final byte[] payload;  SaslMessage() {    this(null, null);  }  SaslMessage(byte[] payload) {    this(null, payload);  }  SaslMessage(String clientId, byte[] payload) {    this.clientId = clientId;    this.payload = payload;  }}

driver会先发送SaslMessage给RpcServer1,livyServer收到后,从本人内存中寻找是否存在SaslMessage.clientId,如果存在就算认证通过了。driver接下来才得以进一步发送其余音讯。

所以,一个rpc信道的建设分为未认证阶段和认证实现阶段。livy是基于netty实现的通信层,咱们晓得netty是通过增加pipeline的形式增加解决环节的。在服务端实现bind,或者客户端实现connect后的pipeline是这样的:

客户端通过发送hello发动"认证"(认证的逻辑下面提到了)。认证实现后,SaslHandler会从pipeline中移除,并增加新的业务handler,称为RpcDispatcherRpcDispatcher依据性能不同有不同的实现。上面的代码片段中,SaslHandler将本身从netty的pipeline中移除:

abstract class SaslHandler extends SimpleChannelInboundHandler<Rpc.SaslMessage> {...    @Override    protected final void channelRead0(ChannelHandlerContext ctx, Rpc.SaslMessage msg)    throws Exception {        LOG.debug("Handling SASL challenge message...");        ...        // If negotiation is complete, remove this handler from the pipeline, and register it with        // the Kryo instance to handle encryption if needed.         ctx.channel().pipeline().remove(this);        ...    }...}

上面的代码片段,在netty中增加须要的RpcDispatcher

void setDispatcher(RpcDispatcher dispatcher) {  Utils.checkNotNull(dispatcher);  Utils.checkState(this.dispatcher == null, "Dispatcher already set.");  this.dispatcher = dispatcher;  channel.pipeline().addLast("dispatcher", dispatcher);  dispatcher.registerRpc(channel, this);}

RpcDispatcher

RpcDispatcher顾名思义是一种解决申请的散发器,负责把申请分发给适合的处理函数解决。在livy中只有是从链路中收到的音讯都由RpcDispatcher散发和解决。

音讯分为CALLREPLYERROR三类,从源码的MessageHeader看得出来:

static enum MessageType {  CALL,  REPLY,  ERROR;}static class MessageHeader {  final long id; final MessageType type; MessageHeader() {    this(-1, null); }  MessageHeader(long id, MessageType type) {    this.id = id; this.type = type; }}

MessageHeader中蕴含申请id和申请type。

发动RPC申请一方,会将申请暂存在rpcCalls缓存中,应答方会返回REPLY或者ERROR。申请方的RpcDispatcher此时解决REPLYERROR的时候,从rpcCalls中找到匹配的Promise,并激活。上面的流程展现了这个过程:

上述利用Promise实现了一种典型的异步申请框架

对于CALL类型的音讯,RpcDispatcher采纳反射的形式,实现真正的散发动作,与许多web框架的做法十分相似。

在第五篇"解释器的实现"中,提到的ReplDriver就是一种RpcDispatcher,回顾一下其中的handle办法:

def handle(ctx: ChannelHandlerContext, msg: BaseProtocol.ReplJobRequest): Int = {    ...}def handle(ctx: ChannelHandlerContext, msg: BaseProtocol.CancelReplJobRequest): Unit = {    ...}def handle(ctx: ChannelHandlerContext, msg: BaseProtocol.ReplCompleteRequest): Array[String] = {    ...}def handle(ctx: ChannelHandlerContext, msg: BaseProtocol.GetReplJobResults): ReplJobResults = {    ...}

综上所述,通过反射,RpcDispatcher将音讯分发给对应的handle办法解决。

在livy中蕴含如下几种RpcDispatcher

  • RSCDriver,解决通用Job类音讯。在driver侧应用
  • ReplDriver,继承自RSCDriver,解决ReplJob类音讯,在driver侧应用
  • RegistrationHandler,只解决RemoteDriverAddress音讯。是livyServer在启动driver后,为了可能接管driver反向发送过去的RemoteDriverAddress

总结

本篇从源码角度,分析了livy中rpc通信的要害局部。livy采纳kryo做编解码;在通信初期采纳sasl进行认证和握手;实现认证后,采纳反射实现了一套申请散发机制。此外,livy大量采纳netty框架提供的Promise,提供了一种异步RPC机制,也值得学习和借鉴。