基于 Netty 自定义 RPC
RPC 又称近程过程调用,咱们所知的近程调用分为两种,当初在服务间通信的形式也根本以这两种为主
1. 是基于 HTTP 的 restful 模式的狭义近程调用,以 spring could 的 feign 和 restTemplate 为代表,采纳的协定是 HTTP 的 7 层 调用协定,并且协定的参数和响应序列化根本以 JSON 格局和 XML 格局为主。
2. 是基于 TCP 的广义的 RPC 近程调用,以阿里的 Dubbo 为代表,次要通过 netty 来实现 4 层网络协议,NIO 来异步传输,序列化也能够是 JSON 或者 hessian2 以及 java 自带的序列化等,能够配置。
接下来咱们次要以第二种的 RPC 近程调用来本人实现
需要:
模拟 dubbo,消费者和提供者约定接口和协定,消费者近程调用提供者,提供者返回一个字符串,消费者打印提供者返回的数据。底层网络通信应用 Netty
步骤
- 创立一个公共的接口我的项目以及创立接口及办法,用于消费者和提供者之间的约定。
- 创立一个提供者,该类须要监听消费者的申请,并依照约定返回数据。
- 创立一个消费者,该类须要通明的调用本人不存在的办法,外部须要应用 Netty 申请提供者返回数据
1. 公共模块
包目录构造如下:
首先,在公共模块中增加 netty 的 maven 依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.16.Final</version>
</dependency>
提供者 (服务端) 及消费者(客户端)工程都需依赖公共模块,这样提供者来实现接口并且提供网络调用,消费者间接通过接口来进行 TCP 通信及肯定的协定定制获取提供者的实现返回值
接口的定义
/**
* @author 振帅
* @create 2021/1/13 2:10
* @description: IUserService
* 一个一般的接口,参数是反对序列化的 String 类型,返回值同理
*/
public interface IUserService {public String sayHello(String msg);
}
2. 提供者的实现(服务端)
包目录构造如下:
ServerBoot :启动类,启动服务
UserServiceHandler:自定义的业务处理器
UserServiceImpl:公共模块接口的实现
pom.xml 文件须要引入公共模块
<dependencies>
<dependency>
<groupId>com.lagou</groupId>
<artifactId>rpc_common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
2.1 接口的实现
/**
* @author 振帅
* @create 2021/1/13 2:12
* @description: UserServiceImpl
*/
public class UserServiceImpl implements IUserService {
// 未来客户端要近程调用的办法
public String sayHello(String msg) {System.out.println("==>" + msg);
return "服务器返回数据 :" + msg;
}
// 创立一个办法启动服务器
public static void startServer(String ip, int port) throws InterruptedException {
//1. 创立两个线程池对象
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workGroup = new NioEventLoopGroup();
//2. 创立服务端的启动疏导对象
ServerBootstrap serverBootstrap = new ServerBootstrap();
//3. 配置启动疏导对象
serverBootstrap.group(bossGroup,workGroup)
// 设置通道为 NIO
.channel(NioServerSocketChannel.class)
// 创立监听 channel
.childHandler(new ChannelInitializer<NioSocketChannel>() {protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// 获取管道对象
ChannelPipeline pipeline = nioSocketChannel.pipeline();
// 给管道对象 pipLine 设置编码
pipeline.addLast(new StringEncoder());
pipeline.addLast(new StringDecoder());
// 把咱们自定义 ChannelHandler 增加到通道中
pipeline.addLast(new UserServiceHandler());
}
});
//4. 绑定端口
serverBootstrap.bind(port).sync();}
}
netty 服务端启动步骤
- 创立两个线程池对象(NioEventLoopGroup)
1)bossGroup: 负责接管用户连贯, 监听客户端申请
2)workGroup: 负责解决用户的 io 读写操作, 解决与客户端的数据通讯
- 创立启动疏导类
- 设置启动疏导类
1)两个线程池增加到组中
2)设置一个通道类型 NIO
3)绑定一个初始化监听
- 绑定端口
- 敞开通道
NioEventLoopGroup
** 一个 Netty 服务端启动时,通常会有两个 NioEventLoopGroup:** 一个是监听线程组,次要是监听客户端申请,另一个是工作线程组,次要是解决与客户端的数据通讯。
Netty 客户端只有一个 NioEventLoopGroup,就是用来解决与服务端通信的线程组。
NioEventLoopGroup 能够了解为一个线程池,外部保护了一组线程,每个线程负责解决多个 Channel 上的事件,而一个 Channel 只对应于一个线程,这样能够回避多线程下的数据同步问题。
Channel
- Channel,示意一个连贯,能够了解为每一个申请,就是一个 Channel。
- ChannelHandler,外围解决业务就在这里,用于解决业务申请。
- ChannelHandlerContext,用于传输业务数据。
- ChannelPipeline,用于保留处理过程须要用到的 ChannelHandler 和 ChannelHandlerContext。
ServerBootstrap
服务端的启动疏导对象
客户端的启动疏导对象为Bootstrap
ChannelHandler
起源:https://www.cnblogs.com/qdhxh…
ChannelHandler:外围解决业务
ChannelHandler 下次要是两个子接口
ChannelInboundHandler(入栈): 解决输出数据和 Channel 状态类型扭转。
适配器: ChannelInboundHandlerAdapter(适配器设计模式)
罕用的: SimpleChannelInboundHandler
ChannelOutboundHandler(出栈): 解决输入数据
适配器: ChannelOutboundHandlerAdapter
每一个 Handler 都肯定会解决出栈或者入栈(可能两者都解决数据), 例如对于入栈的 Handler 可能会继承 SimpleChannelInboundHandler 或者 ChannelInboundHandlerAdapter,
而 SimpleChannelInboundHandler 又是继承于 ChannelInboundHandlerAdapter,最大的区别在于 SimpleChannelInboundHandler 会对没有外界援用的资源进行肯定的清理,
并且入栈的音讯能够通过泛型来规定。
这里为什么有设配器模式呢?
咱们在写自定义 Handel 时候, 很少会间接实现下面两个接口, 因为接口中有很多默认办法须要实现, 所以这里就采纳了设配器模式,ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter 就是设配器模式的产物, 让它去实现下面接口, 实现它所有办法。那么你本人写自定义 Handel 时, 只有 继承 它, 就毋庸重写下面接口的所有办法了。
ChannelInitializer
实现了 ChannelHandler 创立 serverbootstrap 的时候会常常用到,它用于对刚刚接管的 channel 进行初始化
ChannelPipeline
管道对象
ChannelPipeline 类是 ChannelHandler 实例对象的链表,用于解决或截获通道的接管和发送数据。它提供了一种高级的截取过滤模式(相似 serverlet 中的 filter 性能),让用户能够在 ChannelPipeline 中齐全管制一个事件以及如何解决 ChannelHandler 与 ChannelPipeline 的交互。
对于每个新的通道 Channel,都会创立一个新的 ChannelPipeline,并将器 pipeline 附加到 channel 中。
// 在 ChannelPipeline 的第一个地位增加 ChannelHandler
addFirst(...)
// 在 ChannelPipeline 的开端增加 ChannelHandler
addLast(...)
网络只能传输字节数据,
netty 发送或接管音讯后,必须将音讯数据从一种模式转化为另一种。
接管音讯后,须要将音讯从字节码转成 java 对象(由某种解码器解码);
发送音讯前,须要将 java 对象转成字节(由某种类型的编码器进行编码)。
// 给管道对象 pipLine 设置编码解码
pipeline.addLast(new StringEncoder());
pipeline.addLast(new StringDecoder());
// 把咱们自定义 ChannelHandler 增加到通道中
pipeline.addLast(new UserServiceHandler());
2.2 自定义 ChannelHandler
继承 ChannelInboundHandlerAdapter 重写 channelRead 办法,当客户端读取数据时,该办法会被调用
/**
* @author 振帅
* @create 2021/1/13 2:27
* @description: UserServiceHandler 自定义的业务处理器
*/
public class UserServiceHandler extends ChannelInboundHandlerAdapter {
/**
* 当客户端读取数据时,该办法会被调用
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 留神 客户端未来发送申请的时候会传递一个参数:UserService#sayHello#are you ok
//1. 判断以后的申请是否合乎规定
if (msg.toString().startsWith("UserService")) {
//2. 如果合乎规定,调用实现类获取一个 result
UserServiceImpl userService = new UserServiceImpl();
String result = userService.sayHello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
//3. 把调用实现类的办法获取的后果写到客户端
ctx.writeAndFlush(result);
}
}
}
ChannelHandlerContext
上下文对象 存储 handler 信息 写操作
咱们先约定客户端传递参数的格局为:UserService#sayHello# + msg;
所以这里须要 msg.toString().substring(msg.toString().lastIndexOf(“#”) + 1)来获取 msg 的信息。
2.3 服务端的启动
/**
* 启动类
*/
public class ServerBoot {public static void main(String[] args) throws InterruptedException {
// 启动服务器
UserServiceImpl.startServer("127.0.0.1",8998);
}
}
3. 消费者的实现(客户端)
包目录构造如下:
ConsumerBoot :启动类,启动服务
RPCConsumer:消费者
UserClientHandler:自定义 Handler
3.1RPCConsumer 的实现
次要有以下几个步骤
- 创立一个线程池对象 — 它要解决咱们自定义事件
- 申明一个自定义事件处理器 UserClientHandler
- 编写办法,初始化客户端(创立连接池 bootStrap 设置 bootStrap 连贯服务器)
- 编写一个办法,应用 jdk 动静代理创建对象
/**
* 消费者
*/
public class RPCConsumer {
/**
* 1. 创立一个线程池对象 -- 它要解决咱们自定义事件
*/
private static ExecutorService executorService =
// 线程池线程数以以后 CPU 核数为准
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
/**
* 2. 申明一个自定义事件处理器 UserClientHandler
*/
private static UserClientHandler userClientHandler;
/**
* 3. 编写办法,初始化客户端(创立连接池 bootStrap 设置 bootStrap 连贯服务器)*/
public static void initClient() throws InterruptedException {// 1). 初始化 UserClientHandler
userClientHandler = new UserClientHandler();
// 2). 创立连接池对象
NioEventLoopGroup group = new NioEventLoopGroup();
// 3). 创立客户端的疏导对象
Bootstrap bootstrap = new Bootstrap();
// 4). 配置疏导对象
bootstrap.group(group)
// 设置通道为 NIO
.channel(NioSocketChannel.class)
// 设置申请协定为 TCP
.option(ChannelOption.TCP_NODELAY,true)
// 监听 channel 并初始化
.handler(new ChannelInitializer<SocketChannel>() {protected void initChannel(SocketChannel socketChannel) throws Exception {
// 获取管道
ChannelPipeline pipeline = socketChannel.pipeline();
// 设置编码
pipeline.addLast(new StringEncoder());
pipeline.addLast(new StringDecoder());
// 增加自定义事件处理器
pipeline.addLast(userClientHandler);
}
});
// 5). 连贯服务器
bootstrap.connect("127.0.0.1",8998).sync();}
/**
* 4. 编写一个办法,应用 jdk 动静代理创建对象
*/
public static Object createProxy(Class<?> serviceClass, final String providerParam) {return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
new Class[]{serviceClass},
new InvocationHandler() {public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//1. 初始化客户端 client
if (userClientHandler ==null) {initClient();
}
//2. 给 UserClientHandler 设置 param 参数
userClientHandler.setParam(providerParam + args[0]);
//3. 应用线程池,开启一个线程解决 call() 写操作 并返回后果
Object result = executorService.submit(userClientHandler).get();
//4.return 后果
return result;
}
});
}
}
Executors
次要用于提供线程池相干的操作,提供了一系列工厂办法用于创立线程池,返回的线程池都实现了 ExecutorService 接口。
// 创立固定数目线程的线程池。public static ExecutorService newFiexedThreadPool(int Threads)
Java 通过 Executors 提供四种线程池,别离为:
newCachedThreadPool创立一个可缓存线程池,如果线程池长度超过解决须要,可灵便回收闲暇线程,若无可回收,则新建线程。
newFixedThreadPool 创立一个定长线程池,可控制线程最大并发数,超出的线程会在队列中期待。
newScheduledThreadPool 创立一个定长线程池,反对定时及周期性工作执行。
newSingleThreadExecutor 创立一个单线程化的线程池,它只会用惟一的工作线程来执行工作,保障所有工作依照指定程序 (FIFO, LIFO, 优先级) 执行。
ExecutorService
Runtime.getRuntime().availableProcessors()
线程池线程数以以后 CPU 核数为准
Runtime.getRuntime().availableProcessors()办法询问 jvm,jvm 去问操作系统,操作系统去问硬件。
3.2 初始化客户端
和服务端的相似,不过客户端只须要创立一个连接池对象
客户端的疏导对象是BootStrap, 而服务端是ServerBootstrap
option 次要是针对 boss 线程组,child 次要是针对 worker 线程组
option / handler / attr 办法
option: 设置通道的选项参数,对于服务端而言就是 ServerSocketChannel,客户端而言就是 SocketChannel;
handler: 设置主通道的处理器,对于服务端而言就是 ServerSocketChannel,也就是用来解决 Acceptor 的操作;对于客户端的 SocketChannel,次要是用来解决 业务操作;
attr: 设置通道的属性;
option / handler / attr办法都定义在 AbstractBootstrap 中,所以服务端和客户端的疏导类办法调用都是调用的父类的对应办法。
childHandler / childOption / childAttr 办法(只有服务端 ServerBootstrap 才有 child 类型的办法)
对于服务端而言,有两种通道须要解决,一种是 ServerSocketChannel:用于解决用户连贯的 accept 操作,另一种是 SocketChannel,示意对应客户端连贯。而对于客户端,个别都只有一种 channel,也就是 SocketChannel。
因而以 child 结尾的办法,都定义在 ServerBootstrap 中,示意解决或配置服务端接管到的对应客户端连贯的 SocketChannel 通道。
3.3UserClientHandler 自定义事件处理器
public class UserClientHandler extends ChannelInboundHandlerAdapter implements Callable {
//1. 定义成员变量
private ChannelHandlerContext context;// 事件处理器上下文对象(存储 handler 信息 写操作)private String result;// 记录服务器返回的数据
private String param;// 记录将要返回给服务器的数据
//2. 实现 channelActive 客户端和服务器连贯时,该办法就主动执行
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 初始化 ChannelHandlerContext
this.context = ctx;
}
//3. 实现 channelRead 当咱们读到服务器数据时,该办法主动执行
//synchronized 同步
@Override
public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 将读到的服务器的数据 msg 设置为成员变量的值
this.result = msg.toString();
// 唤醒写操作
notify();}
//4. 将客户端的数据写到服务器
public synchronized Object call() throws Exception {
//context 给服务器写数据
context.writeAndFlush(param);
wait();
return result;
}
//5. 设置参数的办法
public void setParam(String param){this.param = param;}
}
3.4 客户端的启动
public class ConsumerBoot {
// 定义参数
private static final String PROVIDER_NAME = "UserService#sayHello#";
public static void main(String[] args) throws InterruptedException {
//1. 创立代理对象
IUserService service = (IUserService) RPCConsumer.createProxy(IUserService.class, PROVIDER_NAME);
// 2 循环给服务器写数据
while (true) {String result = service.sayHello("are you ok!!");
System.out.println(result);
Thread.sleep(2000);
}
}
}