共计 3523 个字符,预计需要花费 9 分钟才能阅读完成。
Livy 基于 netty 构建了一个 RPC 通信层。本篇咱们来探索一下 Livy 的 RPC 层的实现细节。读者该当具备 netty 编程的基础知识。
RPC 相干的代码次要在 rsc 目录和 org.apache.livy.rsc 包中。
KryoMessageCodec
Kryo
是一种对象序列化和反序列化工具。通信单方须要相互发送音讯,livy 抉择了 Kryo 作为音讯的编解码器,并在 netty 框架中实现编码和解码接口:
class KryoMessageCodec extends ByteToMessageCodec<Object> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {...}
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf buf) {...}
}
当申请音讯到来时,netty 首先会调用 decode 对音讯进行解码;当音讯要发送到对端的最初关头,netty 会调用 encode 对音讯进行编码。
SaslHandler
livy 的 rpc 通信反对基于 sasl 的认证。所以在 livy 的 rpc 实现中,有一个叫 SaslHandler
的SimpleChannelInboundHandler
。在正式通信前,客户端和服务端须要通过一次认证的过程。这里不列举代码,然而将认证的过程做一个剖析。回顾一下第三篇中外围架构细节局部的时序图,一个 session 的创立过程为:livyServer 启动一个 RpcServer1
和一个 SparkSubmit(提交 driver)。这时有个细节是,livyServer 会生成一个 clientId
,记录在内存中,并把 clientId 通过配置文件传给 driver。driver 启动后要连贯RpcServer1
,就要带上这个 clientId。livy 通过SaslMessage
音讯来封装clientId
:
static class SaslMessage {
final String clientId;
final byte[] payload;
SaslMessage() {this(null, null);
}
SaslMessage(byte[] payload) {this(null, payload);
}
SaslMessage(String clientId, byte[] payload) {
this.clientId = clientId;
this.payload = payload;
}
}
driver 会先发送 SaslMessage 给RpcServer1
,livyServer 收到后,从本人内存中寻找是否存在SaslMessage.clientId
,如果存在就算认证通过了。driver 接下来才得以进一步发送其余音讯。
所以,一个 rpc 信道的建设分为未认证阶段和认证实现阶段。livy 是基于 netty 实现的通信层,咱们晓得 netty 是通过增加 pipeline 的形式增加解决环节的。在服务端实现 bind,或者客户端实现 connect 后的 pipeline 是这样的:
客户端通过发送 hello
发动 ” 认证 ”(认证的逻辑下面提到了)。认证实现后,SaslHandler
会从 pipeline 中 移除
,并增加新的 业务 handler
,称为 RpcDispatcher
。RpcDispatcher
依据性能不同有不同的实现。上面的代码片段中,SaslHandler
将本身从 netty 的 pipeline 中移除:
abstract class SaslHandler extends SimpleChannelInboundHandler<Rpc.SaslMessage> {
...
@Override
protected final void channelRead0(ChannelHandlerContext ctx, Rpc.SaslMessage msg)
throws Exception {LOG.debug("Handling SASL challenge message...");
...
// If negotiation is complete, remove this handler from the pipeline, and register it with
// the Kryo instance to handle encryption if needed.
ctx.channel().pipeline().remove(this);
...
}
...
}
上面的代码片段,在 netty 中增加须要的RpcDispatcher
:
void setDispatcher(RpcDispatcher dispatcher) {Utils.checkNotNull(dispatcher);
Utils.checkState(this.dispatcher == null, "Dispatcher already set.");
this.dispatcher = dispatcher;
channel.pipeline().addLast("dispatcher", dispatcher);
dispatcher.registerRpc(channel, this);
}
RpcDispatcher
RpcDispatcher
顾名思义是一种解决申请的散发器,负责把申请分发给适合的处理函数解决。在 livy 中只有是从链路中收到的音讯都由 RpcDispatcher
散发和解决。
音讯分为 CALL
,REPLY
,ERROR
三类,从源码的 MessageHeader
看得出来:
static enum MessageType {
CALL,
REPLY,
ERROR;
}
static class MessageHeader {
final long id;
final MessageType type;
MessageHeader() {this(-1, null);
}
MessageHeader(long id, MessageType type) {
this.id = id;
this.type = type;
}
}
MessageHeader 中蕴含申请 id 和申请 type。
发动 RPC 申请一方,会将申请暂存在 rpcCalls
缓存中,应答方会返回 REPLY
或者 ERROR
。申请方的RpcDispatcher
此时解决 REPLY
,ERROR
的时候,从 rpcCalls
中找到匹配的Promise
,并激活。上面的流程展现了这个过程:
上述利用 Promise 实现了一种典型的异步申请框架
对于 CALL
类型的音讯,RpcDispatcher
采纳反射的形式,实现真正的散发动作,与许多 web 框架的做法十分相似。
在第五篇 ” 解释器的实现 ” 中,提到的 ReplDriver
就是一种RpcDispatcher
,回顾一下其中的 handle 办法:
def handle(ctx: ChannelHandlerContext, msg: BaseProtocol.ReplJobRequest): Int = {...}
def handle(ctx: ChannelHandlerContext, msg: BaseProtocol.CancelReplJobRequest): Unit = {...}
def handle(ctx: ChannelHandlerContext, msg: BaseProtocol.ReplCompleteRequest): Array[String] = {...}
def handle(ctx: ChannelHandlerContext, msg: BaseProtocol.GetReplJobResults): ReplJobResults = {...}
综上所述,通过反射,RpcDispatcher
将音讯分发给对应的 handle
办法解决。
在 livy 中蕴含如下几种RpcDispatcher
:
RSCDriver
,解决通用 Job
类音讯。在 driver 侧应用ReplDriver
,继承自RSCDriver
,解决ReplJob
类音讯,在 driver 侧应用RegistrationHandler
,只解决RemoteDriverAddress
音讯。是 livyServer 在启动 driver 后,为了可能接管 driver 反向发送过去的RemoteDriverAddress
。
总结
本篇从源码角度,分析了 livy 中 rpc 通信的要害局部。livy 采纳 kryo 做编解码;在通信初期采纳 sasl 进行认证和握手;实现认证后,采纳反射实现了一套申请散发机制。此外,livy 大量采纳 netty 框架提供的 Promise,提供了一种异步 RPC 机制,也值得学习和借鉴。