关于spark:Livy探究六-RPC的实现

27次阅读

共计 3523 个字符,预计需要花费 9 分钟才能阅读完成。

Livy 基于 netty 构建了一个 RPC 通信层。本篇咱们来探索一下 Livy 的 RPC 层的实现细节。读者该当具备 netty 编程的基础知识。

RPC 相干的代码次要在 rsc 目录和 org.apache.livy.rsc 包中。

KryoMessageCodec

Kryo是一种对象序列化和反序列化工具。通信单方须要相互发送音讯,livy 抉择了 Kryo 作为音讯的编解码器,并在 netty 框架中实现编码和解码接口:

class KryoMessageCodec extends ByteToMessageCodec<Object> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {...}
    
    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf buf) {...}
}

当申请音讯到来时,netty 首先会调用 decode 对音讯进行解码;当音讯要发送到对端的最初关头,netty 会调用 encode 对音讯进行编码。

SaslHandler

livy 的 rpc 通信反对基于 sasl 的认证。所以在 livy 的 rpc 实现中,有一个叫 SaslHandlerSimpleChannelInboundHandler。在正式通信前,客户端和服务端须要通过一次认证的过程。这里不列举代码,然而将认证的过程做一个剖析。回顾一下第三篇中外围架构细节局部的时序图,一个 session 的创立过程为:livyServer 启动一个 RpcServer1 和一个 SparkSubmit(提交 driver)。这时有个细节是,livyServer 会生成一个 clientId,记录在内存中,并把 clientId 通过配置文件传给 driver。driver 启动后要连贯RpcServer1,就要带上这个 clientId。livy 通过SaslMessage 音讯来封装clientId

static class SaslMessage {
  final String clientId;
  final byte[] payload;
  SaslMessage() {this(null, null);
  }
  SaslMessage(byte[] payload) {this(null, payload);
  }
  SaslMessage(String clientId, byte[] payload) {
    this.clientId = clientId;
    this.payload = payload;
  }
}

driver 会先发送 SaslMessage 给RpcServer1,livyServer 收到后,从本人内存中寻找是否存在SaslMessage.clientId,如果存在就算认证通过了。driver 接下来才得以进一步发送其余音讯。

所以,一个 rpc 信道的建设分为未认证阶段和认证实现阶段。livy 是基于 netty 实现的通信层,咱们晓得 netty 是通过增加 pipeline 的形式增加解决环节的。在服务端实现 bind,或者客户端实现 connect 后的 pipeline 是这样的:

客户端通过发送 hello 发动 ” 认证 ”(认证的逻辑下面提到了)。认证实现后,SaslHandler会从 pipeline 中 移除 ,并增加新的 业务 handler,称为 RpcDispatcherRpcDispatcher 依据性能不同有不同的实现。上面的代码片段中,SaslHandler将本身从 netty 的 pipeline 中移除:

abstract class SaslHandler extends SimpleChannelInboundHandler<Rpc.SaslMessage> {
...
    @Override
    protected final void channelRead0(ChannelHandlerContext ctx, Rpc.SaslMessage msg)
    throws Exception {LOG.debug("Handling SASL challenge message...");
        ...
        // If negotiation is complete, remove this handler from the pipeline, and register it with
        // the Kryo instance to handle encryption if needed. 
        ctx.channel().pipeline().remove(this);
        ...
    }
...
}

上面的代码片段,在 netty 中增加须要的RpcDispatcher

void setDispatcher(RpcDispatcher dispatcher) {Utils.checkNotNull(dispatcher);
  Utils.checkState(this.dispatcher == null, "Dispatcher already set.");
  this.dispatcher = dispatcher;
  channel.pipeline().addLast("dispatcher", dispatcher);
  dispatcher.registerRpc(channel, this);
}

RpcDispatcher

RpcDispatcher顾名思义是一种解决申请的散发器,负责把申请分发给适合的处理函数解决。在 livy 中只有是从链路中收到的音讯都由 RpcDispatcher 散发和解决。

音讯分为 CALLREPLYERROR 三类,从源码的 MessageHeader 看得出来:

static enum MessageType {
  CALL,
  REPLY,
  ERROR;
}
static class MessageHeader {
  final long id;
 final MessageType type;
 MessageHeader() {this(-1, null);
 }
  MessageHeader(long id, MessageType type) {
    this.id = id;
 this.type = type;
 }
}

MessageHeader 中蕴含申请 id 和申请 type。

发动 RPC 申请一方,会将申请暂存在 rpcCalls 缓存中,应答方会返回 REPLY 或者 ERROR。申请方的RpcDispatcher 此时解决 REPLYERROR 的时候,从 rpcCalls 中找到匹配的Promise,并激活。上面的流程展现了这个过程:

上述利用 Promise 实现了一种典型的异步申请框架

对于 CALL 类型的音讯,RpcDispatcher采纳反射的形式,实现真正的散发动作,与许多 web 框架的做法十分相似。

在第五篇 ” 解释器的实现 ” 中,提到的 ReplDriver 就是一种RpcDispatcher,回顾一下其中的 handle 办法:

def handle(ctx: ChannelHandlerContext, msg: BaseProtocol.ReplJobRequest): Int = {...}

def handle(ctx: ChannelHandlerContext, msg: BaseProtocol.CancelReplJobRequest): Unit = {...}

def handle(ctx: ChannelHandlerContext, msg: BaseProtocol.ReplCompleteRequest): Array[String] = {...}

def handle(ctx: ChannelHandlerContext, msg: BaseProtocol.GetReplJobResults): ReplJobResults = {...}

综上所述,通过反射,RpcDispatcher将音讯分发给对应的 handle 办法解决。

在 livy 中蕴含如下几种RpcDispatcher

  • RSCDriver,解决 通用 Job类音讯。在 driver 侧应用
  • ReplDriver,继承自 RSCDriver,解决ReplJob 类音讯,在 driver 侧应用
  • RegistrationHandler,只解决 RemoteDriverAddress 音讯。是 livyServer 在启动 driver 后,为了可能接管 driver 反向发送过去的RemoteDriverAddress

总结

本篇从源码角度,分析了 livy 中 rpc 通信的要害局部。livy 采纳 kryo 做编解码;在通信初期采纳 sasl 进行认证和握手;实现认证后,采纳反射实现了一套申请散发机制。此外,livy 大量采纳 netty 框架提供的 Promise,提供了一种异步 RPC 机制,也值得学习和借鉴。

正文完
 0