关于netty入门:使用Netty手写实现RPC的功能

概要RPC(Remote Procedure Call)是指近程过程调用,也就是说两台服务器A,B,一个利用部署在A服务器上,想要调用B服务器上利用提供的函数/办法。在分布式系统中的零碎环境建设和利用程序设计中有着宽泛的利用。 常见的RPC框架有 Apache DubboGoogle gRPCApache ThriftSpring Cloud的Http实现优良的开源框架有高性能,能够像调用本地办法一样调用近程服务,本文着重探讨以下流程的实现 低侵入利用Netty自定义网络协议实现近程调用本文的代码可在github上自取,链接:https://github.com/tangbu/myrpc RPC的流程以上,咱们能够看到在实现RPC的过程中,咱们须要着重解决一下几点 低侵入(咱们应用动静代理来实现办法级别间接调用)实现RpcRequest和RpcResponse的序列化和反序列化基于TCP自定义报文,承载RpcRequest和RpcResponse解决网络连接,网络传输代码实现动静代理实现低侵入(咱们应用jdk动静代理)假如应用层存在这样一个接口 public interface HelloWorldService { String helloWorld(String name);}咱们在调用HelloWorldService#helloWorld的时候,心愿自定义外面的逻辑,应用RPC来调用,为此咱们就应用动静代理来实现 public class DynamicProxy implements InvocationHandler { @Override public Object invoke(Object proxy, Method method, Object[] args) throws ExecutionException, InterruptedException, JsonProcessingException { System.out.println("在调用办法时走到了动静代理外面"); return null; }}在真正调用helloWorld办法的时候应用如下代码 Class<?> helloWorldServiceClass = HelloWorldService.class; //创立代理类对象 HelloWorldService so = (HelloWorldService) Proxy.newProxyInstance(helloWorldServiceClass.getClassLoader(), new Class[]{HelloWorldService.class}, new DynamicProxy()); String result1 = so.helloWorld("zhangsan");此时,原有接口的逻辑就调到了动静代理办法外面。之后,咱们会将RPC的实现封装在DynamicProxy这个办法里的实现中。 封装RpcRequest对象和RpcResponse对象在执行近程调用的时候,必须通知近程服务,我须要调用那个类,那个办法,办法参数是哪些,入参是什么才能够让他返回后果给我,所以须要封装一下RpcRequest对象和RpcResponse对象 public class RpcRequest { private String requestId; private String className; private String methodName; private Class<?>[] parameterTypes; private Object[] parameters; private int version;// getter setter...}public class RpcResponse { private String requestId; private boolean success; private String message; private Object result;// getter setter...}定义网络协议,将RpcRequest和RpcResponse写成字节放在网络报文中传输自定义报文构造0----7----15---23---31| 1 | 2 | ---------------------| 2 | 3 | 4 |..... 4前面的是音讯体 ---------------------......5.......--------------------- 序号1 0-7 version 1byte序号2 7-39 总报文长度 4byte序号3 39-47 type音讯类型 1byte序号4 47-77 priority音讯优先级 1byte序号5 依据报文总长度减掉1-4的长度就是5的长度相应的依据这个报文构造,能够形象出咱们的TCP的报文Java类 ...

September 6, 2023 · 5 min · jiezi

关于netty入门:Netty对象引用计数

引言Netty 4.0 之后,netty中某些对象的生命周期由它们的援用计数治理,如ByteBuf是利用援用计数来进步调配和开释性能的最驰名类型. 援用计数—1—援用计数对象的初始援用计数为 1。—2—当开释援用计数对象时,它的援用计数减 1。 个别法令是,最初拜访援用计数对象的一方也负责销毁该援用计数对象。具体来说:如果 [发送] 组件应该将援用计数对象传递给另一个 [接管] 组件,发送组件通常不须要销毁它,而是将决定推延到接管组件。如果一个组件应用了一个援用计数的对象并且晓得没有其他人会再拜访它(即,不传递对另一个组件的援用),则该组件应该销毁它。—3—如果援用计数达到 0,则援用计数对象被开释或返回到它来自的对象池,尝试拜访援用计数为 0 的援用计数对象将触发IllegalReferenceCountException。—4—当对象还没有被销毁之前,援用计数也能够通过retain()操作减少。 Derived buffers—1—ByteBuf.duplicate()、ByteBuf.slice()及ByteBuf.order(ByteOrder)创立一个共享父缓冲区内存区域的派生缓冲区。派生缓冲区没有本人的援用计数,但共享父缓冲区的援用计数。—2—相同,ByteBuf.copy()和ByteBuf.readBytes(int)不是派生缓冲区。返回ByteBuf的已调配,须要开释。须要留神的是,父缓冲区及其派生缓冲区共享雷同的援用计数,并且创立派生缓冲区时援用计数不会减少。因而,如果您要将派生缓冲区传递给应用程序的其余组件,则必须先调用retain()它。 ByteBufHolder interface可应用ByteBufHolder 持有 ByteBuf,例如DatagramPacket、HttpContent和WebSocketframe .这些类都继承自ByteBufHolder。和Derived buffers一样,ByteBufHolder持有者共享其蕴含的缓冲区的援用计数。 ChannelHandler 的援用计数Inbound messages(入站音讯)—1—当事件循环将数据读入ByteBuf并触发channelRead()事件时,ChannelHandler相应管道中的 负责开释缓冲区。因而,应用接管到的数据的处理程序应该release()在其channelRead()处理程序办法中调用数据。—2—如果处理程序将缓冲区(或任何援用计数对象)传递给下一个处理程序,则无需开释它。—3—能够间接应用ReferenceCountUtil.release(),或思考扩大SimpleChannelHandlerwhich 对所有音讯调用ReferenceCountUtil.release(msg)。 Outbound messages(出站音讯)与入站音讯不同,出站音讯是由本人的应用程序创立的,Netty 负责在将它们写入网络后开释这些音讯。 特地留神encoders之后正确开释。

November 20, 2022 · 1 min · jiezi

关于netty入门:Netty网络编程Netty入门

1.原生NIO存在的问题2.Netty介绍3.Netty的工作模型4.用Netty编写TCP服务5.工作队列的三种经典应用场景 1.原生NIO存在的问题 后面咱们通过NIO的原生API实现了服务端与客户端的交互:Netty网络编程——NIO编程介绍。咱们在编程的过程中,发现了如下的问题: 1)咱们会发现NIO的类库和API品种繁多,须要把握Selector,ServerSocketChannel,SocketChannel,ByteBuffer等能力顺利编程。 2)咱们还须要对多线程编程,网络编程等十分相熟,咱们能力编写高质量的NIO程序。 3)开发的难度十分大,如果说有:断线重连,网络闪断,半包读写,加密解密等等 4)NIO还会有epoll bug,导致selector空轮训,使cpu空轮训。 2.Netty介绍 为了针对下面这些问题,Netty对JDK自带的NIO进行了封装,解决了上述一系列问题。 1)操作简略,对于各种类型传输有对立的api,而且扩大不便,清晰地把变动的代码和不变的代码拆散开来。 2)使用方便,有具体的文档,而且没有其它依赖项。 3)高性能,吞吐量更高,提早升高。 4)平安。 5)社区沉闷,被发现的bug能够被及时修复。 3.Netty的工作模型 后面咱们介绍了Netty网络编程——Reactor模式高性能架构设计原理,Netty也次要基于Reactor的多线程模型做了肯定的批改。 1)netty线程模型形象出了两组线程池,BossGroup专门负责解决客户端连贯,WorkerGroup专门负责网络的读写。 2)两组线程池的类型都是NioEventLoopGroup,示意一个一直循环解决工作的线程组。 3)每个NioEventLoop都有一个selector,用于监听绑定在其上的socket网络通讯。 4)每个BossNioEventLoop的循环分3步:4.1)轮询accept事件 4.2)解决accept事件,与client建设连贯,生成NioSocketChannel,并将其注册到某个workerNioEventLoop上。 4.3)解决工作队列的工作(可能会有很耗时的操作,放在工作队列中异步执行) 5)workerNioEventLoop的工作也分三步:5.1)轮询read,write事件5.2)解决IO事件5.3)解决工作队列的工作可能会有很耗时的操作,放在工作队列中异步执行) 6)每个worker解决工作的时候,会应用pipeline(管道),pipeline中蕴含了channel,通过pipeline能够取得对应的通道。 4.用Netty编写TCP服务 咱们初步编写一个程序: Netty服务在6668端口进行监听,客户端能发送音讯给服务器,服务器也能发送音讯给客户端。 NettyServer: package com.example.demo.netty.nettyDemo;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import lombok.extern.slf4j.Slf4j;/** * @author sulingfeng * @title: NettyServer * @projectName netty-learn * @description: TODO * @date 2022/7/7 17:45 */@Slf4jpublic class NettyServer { public static void main(String[] args) throws Exception { //创立两个线程组 //bossGroup只解决连贯申请 //workerGroup负责解决业务逻辑 //bossGroup和workerGroup含有的子线程个数默认理论为cpu核数*2 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try{ //服务器启动对象 ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup,workerGroup)//设置两个线程组 .channel(NioServerSocketChannel.class)//应用NioSocketChannel作为通道的实现 .option(ChannelOption.SO_BACKLOG,128)//设置线程队列的连贯数量下限 .childOption(ChannelOption.SO_KEEPALIVE,true)//放弃流动连贯状态 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //设置处理器,解决的业务逻辑在这里 log.info("客户端socketChannel hashCode = " + ch.hashCode()); ch.pipeline().addLast(new NettyServerHandler()); } }); log.info("服务器筹备好了"); //监听6668端口 ChannelFuture cf = bootstrap.bind(6668).sync(); cf.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if(cf.isSuccess()){ log.info("监听端口 6668 胜利"); }else{ log.info("监听端口 6668 失败"); } } }); //对敞开通道事件 进行监听,如果敞开了就返回执行finally cf.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }}NettyServerHandler: ...

July 23, 2022 · 3 min · jiezi