基于Netty自定义RPC

RPC又称近程过程调用,咱们所知的近程调用分为两种,当初在服务间通信的形式也根本以这两种为主

1.是基于HTTP的restful模式的狭义近程调用,以spring could的feign和restTemplate为代表,采纳的协定是HTTP的7层 调用协定,并且协定的参数和响应序列化根本以JSON格局和XML格局为主。

2.是基于TCP的广义的RPC近程调用,以阿里的Dubbo为代表,次要通过netty来实现4层网络协议,NIO来异步传输, 序列化也能够是JSON或者hessian2以及java自带的序列化等,能够配置。

接下来咱们次要以第二种的RPC近程调用来本人实现

需要:

模拟 dubbo,消费者和提供者约定接口和协定,消费者近程调用提供者,提供者返回一个字符串,消费者打印提供者返回的数据。底层网络通信应用 Netty

步骤

  1. 创立一个公共的接口我的项目以及创立接口及办法,用于消费者和提供者之间的约定。
  2. 创立一个提供者,该类须要监听消费者的申请,并依照约定返回数据。
  3. 创立一个消费者,该类须要通明的调用本人不存在的办法,外部须要应用 Netty 申请提供者返回数据

1.公共模块

包目录构造如下:

首先,在公共模块中增加netty的maven依赖

<dependency>    <groupId>io.netty</groupId>    <artifactId>netty-all</artifactId>    <version>4.1.16.Final</version></dependency>

提供者(服务端)及消费者(客户端)工程都需依赖公共模块,这样提供者来实现接口并且提供网络调用,消费者间接通过接口来进行TCP通信及肯定的协定定制获取提供者的实现返回值

接口的定义

/** * @author 振帅 * @create 2021/1/13 2:10 * @description: IUserService  * 一个一般的接口,参数是反对序列化的String类型,返回值同理 */public interface IUserService {    public String sayHello(String msg);}

2.提供者的实现(服务端)

包目录构造如下:

ServerBoot :启动类,启动服务

UserServiceHandler:自定义的业务处理器

UserServiceImpl:公共模块接口的实现

pom.xml文件须要引入公共模块

<dependencies>    <dependency>        <groupId>com.lagou</groupId>        <artifactId>rpc_common</artifactId>        <version>1.0-SNAPSHOT</version>        </dependency></dependencies>

2.1接口的实现

/** * @author 振帅 * @create 2021/1/13 2:12 * @description: UserServiceImpl */public class UserServiceImpl implements IUserService {    //未来客户端要近程调用的办法    public String sayHello(String msg) {        System.out.println("==>" + msg);        return "服务器返回数据 :" + msg;    }    //创立一个办法启动服务器    public static void startServer(String ip, int port) throws InterruptedException {        //1.创立两个线程池对象        NioEventLoopGroup bossGroup = new NioEventLoopGroup();        NioEventLoopGroup workGroup = new NioEventLoopGroup();        //2.创立服务端的启动疏导对象        ServerBootstrap serverBootstrap = new ServerBootstrap();        //3.配置启动疏导对象        serverBootstrap.group(bossGroup,workGroup)                //设置通道为NIO                .channel(NioServerSocketChannel.class)                //创立监听channel                .childHandler(new ChannelInitializer<NioSocketChannel>() {                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {                        //获取管道对象                        ChannelPipeline pipeline = nioSocketChannel.pipeline();                        //给管道对象pipLine 设置编码                        pipeline.addLast(new StringEncoder());                        pipeline.addLast(new StringDecoder());                        //把咱们自定义ChannelHandler增加到通道中                        pipeline.addLast(new UserServiceHandler());                    }                });        //4.绑定端口        serverBootstrap.bind(port).sync();    }}

netty服务端启动步骤

  1. 创立两个线程池对象(NioEventLoopGroup)

    1)bossGroup:负责接管用户连贯,监听客户端申请

    2)workGroup:负责解决用户的io读写操作,解决与客户端的数据通讯

  2. 创立启动疏导类
  3. 设置启动疏导类

    1)两个线程池增加到组中

    2)设置一个通道类型 NIO

    3)绑定一个初始化监听

  4. 绑定端口
  5. 敞开通道

NioEventLoopGroup

 **一个Netty服务端启动时,通常会有两个NioEventLoopGroup:**一个是监听线程组,次要是监听客户端申请,另一个是工作线程组,次要是解决与客户端的数据通讯。 

Netty客户端只有一个NioEventLoopGroup,就是用来解决与服务端通信的线程组。

NioEventLoopGroup能够了解为一个线程池,外部保护了一组线程,每个线程负责解决多个Channel上的事件,而一个Channel只对应于一个线程,这样能够回避多线程下的数据同步问题。

Channel

  1. Channel,示意一个连贯,能够了解为每一个申请,就是一个Channel。
  2. ChannelHandler,外围解决业务就在这里,用于解决业务申请。
  3. ChannelHandlerContext,用于传输业务数据。
  4. ChannelPipeline,用于保留处理过程须要用到的ChannelHandler和ChannelHandlerContext。

ServerBootstrap

服务端的启动疏导对象

客户端的启动疏导对象为Bootstrap

ChannelHandler

起源:https://www.cnblogs.com/qdhxh...

ChannelHandler:外围解决业务

ChannelHandler下次要是两个子接口

ChannelInboundHandler(入栈): 解决输出数据和Channel状态类型扭转。

适配器: ChannelInboundHandlerAdapter(适配器设计模式)

罕用的: SimpleChannelInboundHandler

ChannelOutboundHandler(出栈): 解决输入数据

适配器: ChannelOutboundHandlerAdapter

每一个Handler都肯定会解决出栈或者入栈(可能两者都解决数据),例如对于入栈的Handler可能会继承SimpleChannelInboundHandler或者ChannelInboundHandlerAdapter,

而SimpleChannelInboundHandler又是继承于ChannelInboundHandlerAdapter,最大的区别在于SimpleChannelInboundHandler会对没有外界援用的资源进行肯定的清理,

并且入栈的音讯能够通过泛型来规定。

这里为什么有设配器模式呢?

咱们在写自定义Handel时候,很少会间接实现下面两个接口,因为接口中有很多默认办法须要实现,所以这里就采纳了设配器模式,ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter就是设配器模式的产物,让它去实现下面接口,实现它所有办法。那么你本人写自定义Handel时,只有继承它,就毋庸重写下面接口的所有办法了。

ChannelInitializer

实现了ChannelHandler创立serverbootstrap的时候会常常用到,它用于对刚刚接管的channel进行初始化

ChannelPipeline

管道对象

ChannelPipeline类是ChannelHandler实例对象的链表,用于解决或截获通道的接管和发送数据。它提供了一种高级的截取过滤模式(相似serverlet中的filter性能),让用户能够在ChannelPipeline中齐全管制一个事件以及如何解决ChannelHandler与ChannelPipeline的交互。

对于每个新的通道Channel,都会创立一个新的ChannelPipeline,并将器pipeline附加到channel中。

//在ChannelPipeline的第一个地位增加ChannelHandleraddFirst(...)   //在ChannelPipeline的开端增加ChannelHandleraddLast(...)   

网络只能传输字节数据,

netty发送或接管音讯后,必须将音讯数据从一种模式转化为另一种。

接管音讯后,须要将音讯从字节码转成java对象(由某种解码器解码);

发送音讯前,须要将java对象转成字节(由某种类型的编码器进行编码)。

//给管道对象pipLine 设置编码解码pipeline.addLast(new StringEncoder());pipeline.addLast(new StringDecoder());
//把咱们自定义ChannelHandler增加到通道中pipeline.addLast(new UserServiceHandler());

2.2自定义ChannelHandler

继承ChannelInboundHandlerAdapter 重写channelRead办法,当客户端读取数据时,该办法会被调用

/** * @author 振帅 * @create 2021/1/13 2:27 * @description: UserServiceHandler 自定义的业务处理器 */public class UserServiceHandler extends ChannelInboundHandlerAdapter {    /**     * 当客户端读取数据时,该办法会被调用     * @param ctx     * @param msg     * @throws Exception     */    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        //留神 客户端未来发送申请的时候会传递一个参数:UserService#sayHello#are you ok        //1.判断以后的申请是否合乎规定        if (msg.toString().startsWith("UserService")) {            //2.如果合乎规定,调用实现类获取一个result            UserServiceImpl userService = new UserServiceImpl();            String result = userService.sayHello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));            //3.把调用实现类的办法获取的后果写到客户端            ctx.writeAndFlush(result);        }    }}

ChannelHandlerContext

上下文对象 存储handler信息 写操作

咱们先约定客户端传递参数的格局为:UserService#sayHello# + msg;

所以这里须要 msg.toString().substring(msg.toString().lastIndexOf("#") + 1)来获取msg的信息。

2.3服务端的启动

/** *启动类 */public class ServerBoot {    public static void main(String[] args) throws InterruptedException {       //启动服务器        UserServiceImpl.startServer("127.0.0.1",8998);    }}

3.消费者的实现(客户端)

包目录构造如下:

ConsumerBoot :启动类,启动服务

RPCConsumer:消费者

UserClientHandler:自定义Handler

3.1RPCConsumer的实现

次要有以下几个步骤

  1. 创立一个线程池对象 -- 它要解决咱们自定义事件
  2. 申明一个自定义事件处理器 UserClientHandler
  3. 编写办法,初始化客户端(创立连接池 bootStrap 设置bootStrap 连贯服务器)
  4. 编写一个办法,应用jdk动静代理创建对象
/** * 消费者 */public class RPCConsumer {    /**     * 1.创立一个线程池对象 -- 它要解决咱们自定义事件     */    private static ExecutorService executorService =            //线程池线程数以以后CPU核数为准            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());    /**     * 2.申明一个自定义事件处理器 UserClientHandler     */    private static UserClientHandler userClientHandler;    /**     * 3.编写办法,初始化客户端(创立连接池 bootStrap 设置bootStrap 连贯服务器)     */    public static void initClient() throws InterruptedException {        // 1).初始化UserClientHandler        userClientHandler = new UserClientHandler();        // 2).创立连接池对象        NioEventLoopGroup group = new NioEventLoopGroup();        // 3).创立客户端的疏导对象        Bootstrap bootstrap = new Bootstrap();        // 4).配置疏导对象        bootstrap.group(group)                //设置通道为NIO                .channel(NioSocketChannel.class)                //设置申请协定为TCP                .option(ChannelOption.TCP_NODELAY,true)                //监听channel 并初始化                .handler(new ChannelInitializer<SocketChannel>() {                    protected void initChannel(SocketChannel socketChannel) throws Exception {                        //获取管道                        ChannelPipeline pipeline = socketChannel.pipeline();                        //设置编码                        pipeline.addLast(new StringEncoder());                        pipeline.addLast(new StringDecoder());                        //增加自定义事件处理器                        pipeline.addLast(userClientHandler);                    }                });        // 5).连贯服务器        bootstrap.connect("127.0.0.1",8998).sync();    }    /**     * 4.编写一个办法,应用jdk动静代理创建对象     */    public static Object createProxy(Class<?> serviceClass, final String providerParam) {        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),                new Class[]{serviceClass},                new InvocationHandler() {                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {                        //1.初始化客户端client                        if (userClientHandler ==null) {                            initClient();                        }                        //2.给UserClientHandler 设置param参数                        userClientHandler.setParam(providerParam + args[0]);                        //3.应用线程池,开启一个线程解决call() 写操作 并返回后果                        Object result = executorService.submit(userClientHandler).get();                        //4.return 后果                        return result;                    }                });    }}

Executors

次要用于提供线程池相干的操作,提供了一系列工厂办法用于创立线程池,返回的线程池都实现了ExecutorService接口。

//创立固定数目线程的线程池。public static ExecutorService newFiexedThreadPool(int Threads) 

Java通过Executors提供四种线程池,别离为:
newCachedThreadPool创立一个可缓存线程池,如果线程池长度超过解决须要,可灵便回收闲暇线程,若无可回收,则新建线程。
newFixedThreadPool 创立一个定长线程池,可控制线程最大并发数,超出的线程会在队列中期待。
newScheduledThreadPool 创立一个定长线程池,反对定时及周期性工作执行。
newSingleThreadExecutor 创立一个单线程化的线程池,它只会用惟一的工作线程来执行工作,保障所有工作依照指定程序(FIFO, LIFO, 优先级)执行。

ExecutorService

Runtime.getRuntime().availableProcessors()

线程池线程数以以后CPU核数为准

Runtime.getRuntime().availableProcessors()办法询问jvm,jvm去问操作系统,操作系统去问硬件 。

3.2初始化客户端

和服务端的相似,不过客户端只须要创立一个连接池对象

客户端的疏导对象是BootStrap,而服务端是ServerBootstrap

option次要是针对boss线程组,child次要是针对worker线程组

option / handler / attr 办法

option: 设置通道的选项参数, 对于服务端而言就是ServerSocketChannel, 客户端而言就是SocketChannel;

handler: 设置主通道的处理器, 对于服务端而言就是ServerSocketChannel,也就是用来解决Acceptor的操作;对于客户端的SocketChannel,次要是用来解决 业务操作;

attr: 设置通道的属性;

option / handler / attr办法都定义在AbstractBootstrap中, 所以服务端和客户端的疏导类办法调用都是调用的父类的对应办法。

childHandler / childOption / childAttr 办法(只有服务端ServerBootstrap才有child类型的办法)

  对于服务端而言,有两种通道须要解决, 一种是ServerSocketChannel:用于解决用户连贯的accept操作, 另一种是SocketChannel,示意对应客户端连贯。而对于客户端,个别都只有一种channel,也就是SocketChannel。

  因而以child结尾的办法,都定义在ServerBootstrap中,示意解决或配置服务端接管到的对应客户端连贯的SocketChannel通道。

3.3UserClientHandler自定义事件处理器

public class UserClientHandler extends ChannelInboundHandlerAdapter implements Callable {    //1.定义成员变量    private ChannelHandlerContext context;//事件处理器上下文对象(存储handler信息 写操作)    private String result;//记录服务器返回的数据    private String param;//记录将要返回给服务器的数据    //2.实现channelActive 客户端和服务器连贯时,该办法就主动执行    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        //初始化ChannelHandlerContext        this.context = ctx;    }    //3.实现channelRead 当咱们读到服务器数据时,该办法主动执行    //synchronized 同步    @Override    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        //将读到的服务器的数据msg设置为成员变量的值        this.result = msg.toString();        //唤醒写操作        notify();    }    //4.将客户端的数据写到服务器    public synchronized Object call() throws Exception {        //context给服务器写数据        context.writeAndFlush(param);        wait();        return result;    }    //5.设置参数的办法    public void setParam(String  param){        this.param = param;    }}

3.4客户端的启动

public class ConsumerBoot {    //定义参数    private static final String PROVIDER_NAME = "UserService#sayHello#";    public static void main(String[] args) throws InterruptedException {        //1.创立代理对象       IUserService service  = (IUserService) RPCConsumer.createProxy(IUserService.class, PROVIDER_NAME);       //2循环给服务器写数据        while (true) {            String result = service.sayHello("are you ok !!");            System.out.println(result);            Thread.sleep(2000);        }    }}