RPC(Remote Procedure Call)是 Hadoop 服务通信的要害库,撑持下层分布式环境下简单的过程间(Inter-Process Communication, IPC)通信逻辑,是分布式系统的根底。容许运行于一台计算机上的程序像调用本地办法一样,调用另一台计算机的子程序。
因为 RPC 服务整体常识较多,本节仅针对对 Yarn RPC 进行简略介绍,具体内容会后续开专栏介绍。
一、RPC 通信模型介绍
为什么会有 RPC 框架?
在分布式或微服务情境下,会有大量的服务间交互,如果用传统的 HTTP 协定端口来通信,须要消耗大量工夫解决网络数据交换上,还要思考编解码等问题。如下图所示。
- 客户端通过 RPC 框架的动静代理失去一个代理类实例,称为 Stub(桩)
- 客户端调用接口办法(理论是 Stub 对应的办法),Stub 会结构一个申请,包含函数名和参数
- 服务端收到这个申请后,先将服务名(函数)解析进去,查找是否有对应的服务提供者
- 服务端找到对应的实现类后,会传入参数调用
- 服务端 RPC 框架失去返回后果后,再进行封装返回给客户端
-
客户端的 Stub 收到返回值后,进行解析,返回给调用者,实现 RPC 调用。
二、Hadoop RPC 介绍
一)简介
Hadoop RPC 是 Hadoop 本人实现的一个 RPC 框架,次要有以下几个特点:
- 透明性:像调用本地办法一样调用近程办法。
- 高性能:Hadoop 各个系统均采纳 Master/Slave 构造,Master 是一个 RPC Server 用于解决各个 Slave 节点发送的申请,须要有高性能。
- 可控性:因为 JDK 中的 RPC 框架 RMI 重量级过大,且封装度太高,不不便管制和批改。因而实现了本人的 RPC 框架,以保障轻量级、高性能、可控性。
框架原理和整体执行流程与第一节介绍的 RPC 框架统一,感兴趣可深刻源码进行理解。
二)总体架构
Hadoop RPC 架构底层依附 Java 的 nio、反射、动静代理等性能实现「客户端 – 服务器(C/S)」通信模型。
下层封装供程序调用的 RPC 接口。
三、案例 demo
上面两个案例的 demo 已上传至 github。有帮忙的话点个⭐️。
https://github.com/Simon-Ace/hadoop_rpc_demo
一)RPC Writable 案例实现
1、新建一个 maven 工程,增加依赖
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.8.5</version>
</dependency>
2、定义 RPC 协定
public interface BusinessProtocol {void mkdir(String path);
String getName(String name);
long versionID = 345043000L;
}
3、定义协定实现
public class BusinessIMPL implements BusinessProtocol {
@Override
public void mkdir(String path) {System.out.println("胜利创立了文件夹:" + path);
}
@Override
public String getName(String name) {System.out.println("胜利打了招呼:hello:" + name);
return "bigdata";
}
}
4、通过 Hadoop RPC 构建一个 RPC 服务端
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import java.io.IOException;
public class MyServer {public static void main(String[] args) {
try {
// 构建一个 RPC server 端,提供了一个 BussinessProtocol 协定的 BusinessIMPL 服务实现
RPC.Server server = new RPC.Builder(new Configuration())
.setProtocol(BusinessProtocol.class)
.setInstance(new BusinessIMPL())
.setBindAddress("localhost")
.setPort(6789)
.build();
server.start();} catch (IOException e) {e.printStackTrace();
}
}
}
5、构建一个 RPC 客户端
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.conf.Configuration;
import java.io.IOException;
import java.net.InetSocketAddress;
public class MyClient {public static void main(String[] args) {
try {
// 获取代理类实例,也就是 Stub
BusinessProtocol proxy = RPC.getProxy(BusinessProtocol.class, BusinessProtocol.versionID,
new InetSocketAddress("localhost", 6789), new Configuration());
// 通过 Stub 发送申请,理论应用就像调用本地办法一样
proxy.mkdir("/tmp/ABC");
String res = proxy.getName("Simon");
System.out.println("从 RPC 服务端接管到的返回值:" + res);
} catch (IOException e) {e.printStackTrace();
}
}
}
6、测试,先启动服务端,再启动客户端
服务端输入
胜利创立了文件夹:/tmp/ABC
胜利打了招呼:hello:Simon
客户端输入
从 RPC 服务端接管到的返回值:bigdata
二)RPC Protobuf 案例实现
我的项目构造如下
对 proto 文件格式不相熟的同学,参考上一篇文章《2-1 Yarn 根底库概述》
MyResourceTrackerMessage.proto
定义数据格式
syntax = "proto3";
option java_package = "com.shuofxz.protobuf_rpc.proto";
option java_outer_classname = "MyResourceTrackerMessageProto";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
message MyRegisterNodeManagerRequestProto {
string hostname = 1;
int32 cpu = 2;
int32 memory = 3;
}
message MyRegisterNodeManagerResponseProto {string flag = 1;}
MyResourceTracker.proto
定义 rpc 接口
syntax = "proto3";
import "com/shuofxz/protobuf_rpc/proto/MyResourceTrackerMessage.proto";
option java_package = "com.shuofxz.protobuf_rpc.proto";
option java_outer_classname = "MyResourceTrackerProto";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
service MyResourceTrackerService {rpc registerNodeManager(MyRegisterNodeManagerRequestProto) returns (MyRegisterNodeManagerResponseProto);
}
2、对 proto 文件编译,生成 java 类
# 在我的项目根目录执行,门路依照本人的进行批改
protoc -I=src/main/java --java_out=src/main/java src/main/java/com/shuofxz/protobuf_rpc/proto/MyResource.proto
protoc -I=src/main/java --java_out=src/main/java src/main/java/com/shuofxz/protobuf_rpc/proto/MyResourceTracker.proto
3、定义调用办法接口 MyResourceTracker
import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto;
import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto;
public interface MyResourceTracker {MyRegisterNodeManagerResponseProto registerNodeManager(MyRegisterNodeManagerRequestProto request) throws Exception;
}
4、对调用办法接口的实现(服务端)
import com.shuofxz.protobuf_rpc.interf.MyResourceTracker;
import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerMessageProto;
public class MyResourceTrackerImpl implements MyResourceTracker {
@Override
public MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto registerNodeManager(MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto request) {
// 输入注册的音讯
String hostname = request.getHostname();
int cpu = request.getCpu();
int memory = request.getMemory();
System.out.println("NodeManager 的注册音讯:hostname =" + hostname + ", cpu =" + cpu + ", memory =" + memory);
// 省略解决逻辑
// 构建一个响应对象,用于返回
MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto.Builder builder =
MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto.newBuilder();
// 间接返回 True
builder.setFlag("true");
MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto response = builder.build();
return response;
}
}
5、编写 proto 的协定接口
import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerProto;
import org.apache.hadoop.ipc.ProtocolInfo;
@ProtocolInfo(protocolName = "com.shuofxz.blablabla", protocolVersion = 1)
public interface MyResourceTrackerPB extends MyResourceTrackerProto.MyResourceTrackerService.BlockingInterface {}
6、编写 proto 的协定接口实现(服务端)
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import com.shuofxz.protobuf_rpc.interf.MyResourceTracker;
import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerMessageProto;
import com.shuofxz.protobuf_rpc.interf.MyResourceTrackerPB;
public class MyResourceTrackerServerSidePB implements MyResourceTrackerPB {
final private MyResourceTracker server;
public MyResourceTrackerServerSidePB(MyResourceTracker server) {this.server = server;}
@Override
public MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto registerNodeManager(RpcController controller, MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto request) throws ServiceException {
try {return server.registerNodeManager(request);
} catch (Exception e) {e.printStackTrace();
}
return null;
}
}
7、RPC Server 的实现
import com.shuofxz.protobuf_rpc.interf.MyResourceTrackerPB;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerProto;
import java.io.IOException;
public class ProtobufRpcServer {public static void main(String[] args) throws IOException {Configuration conf = new Configuration();
RPC.setProtocolEngine(conf, MyResourceTrackerPB.class, ProtobufRpcEngine.class);
// 构建 Rpc Server
RPC.Server server = new RPC.Builder(conf)
.setProtocol(MyResourceTrackerPB.class)
.setInstance(MyResourceTrackerProto.MyResourceTrackerService
.newReflectiveBlockingService(new MyResourceTrackerServerSidePB(new MyResourceTrackerImpl())))
.setBindAddress("localhost")
.setPort(9998)
.setNumHandlers(1)
.setVerbose(true)
.build();
// Rpc Server 启动
server.start();}
}
8、RPC Client 的实现
import com.google.protobuf.ServiceException;
import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerMessageProto;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import com.shuofxz.protobuf_rpc.interf.MyResourceTrackerPB;
import java.io.IOException;
import java.net.InetSocketAddress;
public class ProtobufRpcClient {public static void main(String[] args) throws IOException {
// 设置 RPC 引擎为 ProtobufRpcEngine
Configuration conf = new Configuration();
String hostname = "localhost";
int port = 9998;
RPC.setProtocolEngine(conf, MyResourceTrackerPB.class, ProtobufRpcEngine.class);
// 获取代理
MyResourceTrackerPB protocolProxy = RPC
.getProxy(MyResourceTrackerPB.class, 1, new InetSocketAddress(hostname, port), conf);
// 构建申请对象
MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto.Builder builder =
MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto.newBuilder();
MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto bigdata02 =
builder.setHostname("bigdata02").setCpu(64).setMemory(128).build();
// 发送 RPC 申请,获取响应
MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto response = null;
try {response = protocolProxy.registerNodeManager(null, bigdata02);
} catch (ServiceException e) {e.printStackTrace();
}
// 解决响应
String flag = response.getFlag();
System.out.println("最终注册后果:flag =" + flag);
}
}
9、测试
先启动服务端,在启动客户端。
四、总结
本节介绍了 Hadoop 底层通信库 RPC。首先介绍了 RPC 的框架和原理,之后对 Hadoop 本人实现的 RPC 进行了介绍,并给出了两个 demo 实际。
强烈建议理解基础知识后,跟着 demo 实现一个案例进去,能够更好的帮忙你了解。
文中 Demo:https://github.com/Simon-Ace/hadoop_rpc_demo
参考文章:
YARN-RPC 网络通信架构设计
YARN- 高并发 RPC 源码实现
[Hadoop3.2.1【HDFS】源码剖析 : RPC 原理 [八] Client 端实现 & 源码](https://blog.csdn.net/zhanglo…)
Hadoop RPC 机制详解
Hadoop2 源码剖析-RPC 摸索实战
《Hadoop 技术底细 – 深刻解析 Yarn 结构设计与实现原理》3.3 节