关于java:基于Netty自定义RPC

基于Netty自定义RPC

RPC又称近程过程调用,咱们所知的近程调用分为两种,当初在服务间通信的形式也根本以这两种为主

1.是基于HTTP的restful模式的狭义近程调用,以spring could的feign和restTemplate为代表,采纳的协定是HTTP的7层 调用协定,并且协定的参数和响应序列化根本以JSON格局和XML格局为主。

2.是基于TCP的广义的RPC近程调用,以阿里的Dubbo为代表,次要通过netty来实现4层网络协议,NIO来异步传输, 序列化也能够是JSON或者hessian2以及java自带的序列化等,能够配置。

接下来咱们次要以第二种的RPC近程调用来本人实现

需要:

​ 模拟 dubbo,消费者和提供者约定接口和协定,消费者近程调用提供者,提供者返回一个字符串,消费者打印提供者返回的数据。底层网络通信应用 Netty

步骤

  1. 创立一个公共的接口我的项目以及创立接口及办法,用于消费者和提供者之间的约定。
  2. 创立一个提供者,该类须要监听消费者的申请,并依照约定返回数据。
  3. 创立一个消费者,该类须要通明的调用本人不存在的办法,外部须要应用 Netty 申请提供者返回数据

1.公共模块

包目录构造如下:

首先,在公共模块中增加netty的maven依赖

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.16.Final</version>
</dependency>

提供者(服务端)及消费者(客户端)工程都需依赖公共模块,这样提供者来实现接口并且提供网络调用,消费者间接通过接口来进行TCP通信及肯定的协定定制获取提供者的实现返回值

接口的定义

/**
 * @author 振帅
 * @create 2021/1/13 2:10
 * @description: IUserService 
 * 一个一般的接口,参数是反对序列化的String类型,返回值同理
 */
public interface IUserService {
    public String sayHello(String msg);
}

2.提供者的实现(服务端)

包目录构造如下:

ServerBoot :启动类,启动服务

UserServiceHandler:自定义的业务处理器

UserServiceImpl:公共模块接口的实现

pom.xml文件须要引入公共模块

<dependencies>
    <dependency>
        <groupId>com.lagou</groupId>
        <artifactId>rpc_common</artifactId>
        <version>1.0-SNAPSHOT</version>    
    </dependency>
</dependencies>

2.1接口的实现

/**
 * @author 振帅
 * @create 2021/1/13 2:12
 * @description: UserServiceImpl
 */
public class UserServiceImpl implements IUserService {

    //未来客户端要近程调用的办法
    public String sayHello(String msg) {
        System.out.println("==>" + msg);
        return "服务器返回数据 :" + msg;
    }

    //创立一个办法启动服务器
    public static void startServer(String ip, int port) throws InterruptedException {

        //1.创立两个线程池对象
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workGroup = new NioEventLoopGroup();

        //2.创立服务端的启动疏导对象
        ServerBootstrap serverBootstrap = new ServerBootstrap();

        //3.配置启动疏导对象
        serverBootstrap.group(bossGroup,workGroup)
                //设置通道为NIO
                .channel(NioServerSocketChannel.class)
                //创立监听channel
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        //获取管道对象
                        ChannelPipeline pipeline = nioSocketChannel.pipeline();
                        //给管道对象pipLine 设置编码
                        pipeline.addLast(new StringEncoder());
                        pipeline.addLast(new StringDecoder());
                        //把咱们自定义ChannelHandler增加到通道中
                        pipeline.addLast(new UserServiceHandler());
                    }
                });

        //4.绑定端口
        serverBootstrap.bind(port).sync();
    }
}

netty服务端启动步骤

  1. 创立两个线程池对象(NioEventLoopGroup)

    1)bossGroup:负责接管用户连贯,监听客户端申请

    2)workGroup:负责解决用户的io读写操作,解决与客户端的数据通讯

  2. 创立启动疏导类
  3. 设置启动疏导类

    1)两个线程池增加到组中

    2)设置一个通道类型 NIO

    3)绑定一个初始化监听

  4. 绑定端口
  5. 敞开通道

NioEventLoopGroup

 **一个Netty服务端启动时,通常会有两个NioEventLoopGroup:**一个是监听线程组,次要是监听客户端申请,另一个是工作线程组,次要是解决与客户端的数据通讯。 

Netty客户端只有一个NioEventLoopGroup,就是用来解决与服务端通信的线程组。

NioEventLoopGroup能够了解为一个线程池,外部保护了一组线程,每个线程负责解决多个Channel上的事件,而一个Channel只对应于一个线程,这样能够回避多线程下的数据同步问题。

Channel

  1. Channel,示意一个连贯,能够了解为每一个申请,就是一个Channel。
  2. ChannelHandler,外围解决业务就在这里,用于解决业务申请。
  3. ChannelHandlerContext,用于传输业务数据。
  4. ChannelPipeline,用于保留处理过程须要用到的ChannelHandler和ChannelHandlerContext。

ServerBootstrap

服务端的启动疏导对象

客户端的启动疏导对象为Bootstrap

ChannelHandler

起源:https://www.cnblogs.com/qdhxh…

ChannelHandler:外围解决业务

ChannelHandler下次要是两个子接口

ChannelInboundHandler(入栈): 解决输出数据和Channel状态类型扭转。

​ 适配器: ChannelInboundHandlerAdapter(适配器设计模式)

​ 罕用的: SimpleChannelInboundHandler

ChannelOutboundHandler(出栈): 解决输入数据

​ 适配器: ChannelOutboundHandlerAdapter

每一个Handler都肯定会解决出栈或者入栈(可能两者都解决数据),例如对于入栈的Handler可能会继承SimpleChannelInboundHandler或者ChannelInboundHandlerAdapter,

而SimpleChannelInboundHandler又是继承于ChannelInboundHandlerAdapter,最大的区别在于SimpleChannelInboundHandler会对没有外界援用的资源进行肯定的清理,

并且入栈的音讯能够通过泛型来规定。

这里为什么有设配器模式呢?

咱们在写自定义Handel时候,很少会间接实现下面两个接口,因为接口中有很多默认办法须要实现,所以这里就采纳了设配器模式,ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter就是设配器模式的产物,让它去实现下面接口,实现它所有办法。那么你本人写自定义Handel时,只有继承它,就毋庸重写下面接口的所有办法了。

ChannelInitializer

实现了ChannelHandler创立serverbootstrap的时候会常常用到,它用于对刚刚接管的channel进行初始化

ChannelPipeline

管道对象

ChannelPipeline类是ChannelHandler实例对象的链表,用于解决或截获通道的接管和发送数据。它提供了一种高级的截取过滤模式(相似serverlet中的filter性能),让用户能够在ChannelPipeline中齐全管制一个事件以及如何解决ChannelHandler与ChannelPipeline的交互。

​ 对于每个新的通道Channel,都会创立一个新的ChannelPipeline,并将器pipeline附加到channel中。

//在ChannelPipeline的第一个地位增加ChannelHandler
addFirst(...)   
//在ChannelPipeline的开端增加ChannelHandler
addLast(...)   

网络只能传输字节数据,

netty发送或接管音讯后,必须将音讯数据从一种模式转化为另一种。

接管音讯后,须要将音讯从字节码转成java对象(由某种解码器解码);

发送音讯前,须要将java对象转成字节(由某种类型的编码器进行编码)。

//给管道对象pipLine 设置编码解码
pipeline.addLast(new StringEncoder());
pipeline.addLast(new StringDecoder());
//把咱们自定义ChannelHandler增加到通道中
pipeline.addLast(new UserServiceHandler());

2.2自定义ChannelHandler

继承ChannelInboundHandlerAdapter 重写channelRead办法,当客户端读取数据时,该办法会被调用

/**
 * @author 振帅
 * @create 2021/1/13 2:27
 * @description: UserServiceHandler 自定义的业务处理器
 */
public class UserServiceHandler extends ChannelInboundHandlerAdapter {

    /**
     * 当客户端读取数据时,该办法会被调用
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        //留神 客户端未来发送申请的时候会传递一个参数:UserService#sayHello#are you ok
        //1.判断以后的申请是否合乎规定
        if (msg.toString().startsWith("UserService")) {
            //2.如果合乎规定,调用实现类获取一个result
            UserServiceImpl userService = new UserServiceImpl();
            String result = userService.sayHello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
            //3.把调用实现类的办法获取的后果写到客户端
            ctx.writeAndFlush(result);
        }

    }
}

ChannelHandlerContext

上下文对象 存储handler信息 写操作

咱们先约定客户端传递参数的格局为:UserService#sayHello# + msg;

所以这里须要 msg.toString().substring(msg.toString().lastIndexOf(“#”) + 1)来获取msg的信息。

2.3服务端的启动

/**
 *启动类
 */
public class ServerBoot {
    public static void main(String[] args) throws InterruptedException {
       //启动服务器
        UserServiceImpl.startServer("127.0.0.1",8998);
    }
}

3.消费者的实现(客户端)

包目录构造如下:

ConsumerBoot :启动类,启动服务

RPCConsumer:消费者

UserClientHandler:自定义Handler

3.1RPCConsumer的实现

次要有以下几个步骤

  1. 创立一个线程池对象 — 它要解决咱们自定义事件
  2. 申明一个自定义事件处理器 UserClientHandler
  3. 编写办法,初始化客户端(创立连接池 bootStrap 设置bootStrap 连贯服务器)
  4. 编写一个办法,应用jdk动静代理创建对象
/**
 * 消费者
 */
public class RPCConsumer {

    /**
     * 1.创立一个线程池对象 -- 它要解决咱们自定义事件
     */
    private static ExecutorService executorService =
            //线程池线程数以以后CPU核数为准
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());


    /**
     * 2.申明一个自定义事件处理器 UserClientHandler
     */
    private static UserClientHandler userClientHandler;


    /**
     * 3.编写办法,初始化客户端(创立连接池 bootStrap 设置bootStrap 连贯服务器)
     */
    public static void initClient() throws InterruptedException {
        // 1).初始化UserClientHandler
        userClientHandler = new UserClientHandler();
        // 2).创立连接池对象
        NioEventLoopGroup group = new NioEventLoopGroup();
        // 3).创立客户端的疏导对象
        Bootstrap bootstrap = new Bootstrap();
        // 4).配置疏导对象
        bootstrap.group(group)
                //设置通道为NIO
                .channel(NioSocketChannel.class)
                //设置申请协定为TCP
                .option(ChannelOption.TCP_NODELAY,true)
                //监听channel 并初始化
                .handler(new ChannelInitializer<SocketChannel>() {
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        //获取管道
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        //设置编码
                        pipeline.addLast(new StringEncoder());
                        pipeline.addLast(new StringDecoder());
                        //增加自定义事件处理器
                        pipeline.addLast(userClientHandler);
                    }
                });
        // 5).连贯服务器
        bootstrap.connect("127.0.0.1",8998).sync();
    }

    /**
     * 4.编写一个办法,应用jdk动静代理创建对象
     */
    public static Object createProxy(Class<?> serviceClass, final String providerParam) {
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class[]{serviceClass},
                new InvocationHandler() {
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        //1.初始化客户端client
                        if (userClientHandler ==null) {
                            initClient();
                        }

                        //2.给UserClientHandler 设置param参数
                        userClientHandler.setParam(providerParam + args[0]);

                        //3.应用线程池,开启一个线程解决call() 写操作 并返回后果
                        Object result = executorService.submit(userClientHandler).get();
                        //4.return 后果
                        return result;
                    }
                });
    }

}

Executors

次要用于提供线程池相干的操作,提供了一系列工厂办法用于创立线程池,返回的线程池都实现了ExecutorService接口。

//创立固定数目线程的线程池。
public static ExecutorService newFiexedThreadPool(int Threads) 

Java通过Executors提供四种线程池,别离为:
newCachedThreadPool创立一个可缓存线程池,如果线程池长度超过解决须要,可灵便回收闲暇线程,若无可回收,则新建线程。
newFixedThreadPool 创立一个定长线程池,可控制线程最大并发数,超出的线程会在队列中期待。
newScheduledThreadPool 创立一个定长线程池,反对定时及周期性工作执行。
newSingleThreadExecutor 创立一个单线程化的线程池,它只会用惟一的工作线程来执行工作,保障所有工作依照指定程序(FIFO, LIFO, 优先级)执行。

ExecutorService

Runtime.getRuntime().availableProcessors()

线程池线程数以以后CPU核数为准

Runtime.getRuntime().availableProcessors()办法询问jvm,jvm去问操作系统,操作系统去问硬件 。

3.2初始化客户端

和服务端的相似,不过客户端只须要创立一个连接池对象

客户端的疏导对象是BootStrap,而服务端是ServerBootstrap

option次要是针对boss线程组,child次要是针对worker线程组

option / handler / attr 办法

option: 设置通道的选项参数, 对于服务端而言就是ServerSocketChannel, 客户端而言就是SocketChannel;

handler: 设置主通道的处理器, 对于服务端而言就是ServerSocketChannel,也就是用来解决Acceptor的操作;对于客户端的SocketChannel,次要是用来解决 业务操作;

attr: 设置通道的属性;

option / handler / attr办法都定义在AbstractBootstrap中, 所以服务端和客户端的疏导类办法调用都是调用的父类的对应办法。

childHandler / childOption / childAttr 办法(只有服务端ServerBootstrap才有child类型的办法)

  对于服务端而言,有两种通道须要解决, 一种是ServerSocketChannel:用于解决用户连贯的accept操作, 另一种是SocketChannel,示意对应客户端连贯。而对于客户端,个别都只有一种channel,也就是SocketChannel。

  因而以child结尾的办法,都定义在ServerBootstrap中,示意解决或配置服务端接管到的对应客户端连贯的SocketChannel通道。

3.3UserClientHandler自定义事件处理器

public class UserClientHandler extends ChannelInboundHandlerAdapter implements Callable {

    //1.定义成员变量
    private ChannelHandlerContext context;//事件处理器上下文对象(存储handler信息 写操作)
    private String result;//记录服务器返回的数据
    private String param;//记录将要返回给服务器的数据

    //2.实现channelActive 客户端和服务器连贯时,该办法就主动执行
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //初始化ChannelHandlerContext
        this.context = ctx;
    }

    //3.实现channelRead 当咱们读到服务器数据时,该办法主动执行
    //synchronized 同步
    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //将读到的服务器的数据msg设置为成员变量的值
        this.result = msg.toString();
        //唤醒写操作
        notify();
    }

    //4.将客户端的数据写到服务器
    public synchronized Object call() throws Exception {
        //context给服务器写数据
        context.writeAndFlush(param);
        wait();
        return result;
    }

    //5.设置参数的办法
    public void setParam(String  param){
        this.param = param;
    }

}

3.4客户端的启动

public class ConsumerBoot {

    //定义参数
    private static final String PROVIDER_NAME = "UserService#sayHello#";

    public static void main(String[] args) throws InterruptedException {
        //1.创立代理对象
       IUserService service  = (IUserService) RPCConsumer.createProxy(IUserService.class, PROVIDER_NAME);

       //2循环给服务器写数据
        while (true) {
            String result = service.sayHello("are you ok !!");
            System.out.println(result);
            Thread.sleep(2000);
        }
    }
}

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理