Livy基于netty构建了一个RPC通信层。本篇咱们来探索一下Livy的RPC层的实现细节。读者该当具备netty编程的基础知识。
RPC相干的代码次要在rsc目录和org.apache.livy.rsc包中。
KryoMessageCodec
Kryo
是一种对象序列化和反序列化工具。通信单方须要相互发送音讯,livy抉择了Kryo作为音讯的编解码器,并在netty框架中实现编码和解码接口:
class KryoMessageCodec extends ByteToMessageCodec<Object> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {...} @Override protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf buf) {...}}
当申请音讯到来时,netty首先会调用decode对音讯进行解码;当音讯要发送到对端的最初关头,netty会调用encode对音讯进行编码。
SaslHandler
livy的rpc通信反对基于sasl的认证。所以在livy的rpc实现中,有一个叫SaslHandler
的SimpleChannelInboundHandler
。在正式通信前,客户端和服务端须要通过一次认证的过程。这里不列举代码,然而将认证的过程做一个剖析。回顾一下第三篇中外围架构细节局部的时序图,一个session的创立过程为:livyServer启动一个RpcServer1
和一个SparkSubmit(提交driver)。这时有个细节是,livyServer会生成一个clientId
,记录在内存中,并把clientId通过配置文件传给driver。driver启动后要连贯RpcServer1
,就要带上这个clientId。livy通过SaslMessage
音讯来封装clientId
:
static class SaslMessage { final String clientId; final byte[] payload; SaslMessage() { this(null, null); } SaslMessage(byte[] payload) { this(null, payload); } SaslMessage(String clientId, byte[] payload) { this.clientId = clientId; this.payload = payload; }}
driver会先发送SaslMessage给RpcServer1
,livyServer收到后,从本人内存中寻找是否存在SaslMessage.clientId
,如果存在就算认证通过了。driver接下来才得以进一步发送其余音讯。
所以,一个rpc信道的建设分为未认证阶段和认证实现阶段。livy是基于netty实现的通信层,咱们晓得netty是通过增加pipeline的形式增加解决环节的。在服务端实现bind,或者客户端实现connect后的pipeline是这样的:
客户端通过发送hello
发动"认证"(认证的逻辑下面提到了)。认证实现后,SaslHandler
会从pipeline中移除
,并增加新的业务handler
,称为RpcDispatcher
。RpcDispatcher
依据性能不同有不同的实现。上面的代码片段中,SaslHandler
将本身从netty的pipeline中移除:
abstract class SaslHandler extends SimpleChannelInboundHandler<Rpc.SaslMessage> {... @Override protected final void channelRead0(ChannelHandlerContext ctx, Rpc.SaslMessage msg) throws Exception { LOG.debug("Handling SASL challenge message..."); ... // If negotiation is complete, remove this handler from the pipeline, and register it with // the Kryo instance to handle encryption if needed. ctx.channel().pipeline().remove(this); ... }...}
上面的代码片段,在netty中增加须要的RpcDispatcher
:
void setDispatcher(RpcDispatcher dispatcher) { Utils.checkNotNull(dispatcher); Utils.checkState(this.dispatcher == null, "Dispatcher already set."); this.dispatcher = dispatcher; channel.pipeline().addLast("dispatcher", dispatcher); dispatcher.registerRpc(channel, this);}
RpcDispatcher
RpcDispatcher
顾名思义是一种解决申请的散发器,负责把申请分发给适合的处理函数解决。在livy中只有是从链路中收到的音讯都由RpcDispatcher
散发和解决。
音讯分为CALL
,REPLY
,ERROR
三类,从源码的MessageHeader
看得出来:
static enum MessageType { CALL, REPLY, ERROR;}static class MessageHeader { final long id; final MessageType type; MessageHeader() { this(-1, null); } MessageHeader(long id, MessageType type) { this.id = id; this.type = type; }}
MessageHeader中蕴含申请id和申请type。
发动RPC申请一方,会将申请暂存在rpcCalls
缓存中,应答方会返回REPLY
或者ERROR
。申请方的RpcDispatcher
此时解决REPLY
,ERROR
的时候,从rpcCalls
中找到匹配的Promise
,并激活。上面的流程展现了这个过程:
上述利用Promise实现了一种典型的异步申请框架
对于CALL
类型的音讯,RpcDispatcher
采纳反射的形式,实现真正的散发动作,与许多web框架的做法十分相似。
在第五篇"解释器的实现"中,提到的ReplDriver
就是一种RpcDispatcher
,回顾一下其中的handle办法:
def handle(ctx: ChannelHandlerContext, msg: BaseProtocol.ReplJobRequest): Int = { ...}def handle(ctx: ChannelHandlerContext, msg: BaseProtocol.CancelReplJobRequest): Unit = { ...}def handle(ctx: ChannelHandlerContext, msg: BaseProtocol.ReplCompleteRequest): Array[String] = { ...}def handle(ctx: ChannelHandlerContext, msg: BaseProtocol.GetReplJobResults): ReplJobResults = { ...}
综上所述,通过反射,RpcDispatcher
将音讯分发给对应的handle
办法解决。
在livy中蕴含如下几种RpcDispatcher
:
RSCDriver
,解决通用Job
类音讯。在driver侧应用ReplDriver
,继承自RSCDriver
,解决ReplJob
类音讯,在driver侧应用RegistrationHandler
,只解决RemoteDriverAddress
音讯。是livyServer在启动driver后,为了可能接管driver反向发送过去的RemoteDriverAddress
。
总结
本篇从源码角度,分析了livy中rpc通信的要害局部。livy采纳kryo做编解码;在通信初期采纳sasl进行认证和握手;实现认证后,采纳反射实现了一套申请散发机制。此外,livy大量采纳netty框架提供的Promise,提供了一种异步RPC机制,也值得学习和借鉴。