乐趣区

关于java:手撸RPC框架SpringBootNetty4实现RPC框架

【手撸 RPC 框架】SpringBoot+Netty4 实现 RPC 框架

线程模型

Netty 高性能架构设计

简略理解 React 线程模型,参考文章【五分钟疾速了解 Reactor 模型】

举例说明:Reactor 的三种线程模型

线程模型 1:传统阻塞 I/O 服务模型

模型特点:

  • 采纳阻塞 IO 模式获取输出的数据
  • 每个链接都须要独立的线程实现数据的输出,业务解决、数据返回。

问题剖析:

  • 当并发数很大,就会创立大量的线程,占用很大系统资源
  • 连贯创立后,如果以后线程临时没有数据可读,该线程会阻塞在 read 操作,造成线程资源节约。

线程模型 2:Reactor 模式

针对传统阻塞 I / O 服务模型的 2 个毛病,解决方案如下:

  • 基于 I/O 复用模型:多个连贯共用一个阻塞对象,应用程序只须要在一个阻塞对象期待,无需阻塞期待所有连贯。当某个连贯有新的数据能够解决时,操作系统告诉应用程序,线程从阻塞状态返回,开始进行业务解决。Reactor 对应的叫法: 1. 反应器模式 2. 分发者模式(Dispatcher) 3. 告诉者模式(notifier)
  • 基于线程池复用线程资源:不用再为每个连贯创立线程,将连贯实现后的业务解决任务分配给线程进行解决,一个线程能够解决多个连贯的业务。

单 Reactor 单线程

模型剖析

  • 长处:模型简略,没有多线程、过程通信、竞争的问题,全副都在一个线程中实现
  • 毛病:性能问题,只有一个线程,无奈齐全施展多核 CPU 的性能。Handler 在解决某个连贯上的业务时,整个过程无奈解决其余连贯事件,很容易导致性能瓶颈
  • 毛病:可靠性问题,线程意外终止,或者进入死循环,会导致整个零碎通信模块不可用,不能接管和解决内部音讯,造成节点故障
  • 应用场景:客户端的数量无限,业务解决十分疾速,比方 Redis 在业务解决的工夫复杂度 O(1) 的状况

单 Reactor 多线程

模型剖析

  • 长处:能够充沛的利用多核cpu 的解决能力
  • 毛病:多线程数据共享和拜访比较复杂,reactor 解决所有的事件的监听和响应,在单线程运行,在高并发场景容易呈现性能瓶颈.

主从 Reactor 多线程

模型剖析

  • 长处:父线程与子线程的数据交互简略职责明确,父线程只须要接管新连贯,子线程实现后续的业务解决。
  • 长处:父线程与子线程的数据交互简略,Reactor 主线程只须要把新连贯传给子线程,子线程无需返回数据
  • 毛病:编程复杂度较高
  • 联合实例:这种模型在许多我的项目中宽泛应用,包含 Nginx 主从 Reactor 多过程模型,Memcached 主从多线程,Netty 主从多线程模型的反对

先实现简略的 Netty 通信

服务端示例

public static void main(String[] args) {
    // 创立连接线程组,线程数为 1。只负责解决连贯申请
    NioEventLoopGroup boss = new NioEventLoopGroup(1);
    // 创立工作线程组,线程数默认为 cpu 核数 *2。解决与客户端的业务解决
    NioEventLoopGroup worker = new NioEventLoopGroup();
    // 创立 Server 端的启动对象
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    // 配置线程组
    serverBootstrap.group(boss, worker)
        // 应用 NioServerSocketChannel 作为服务器的通道实现
        .channel(NioServerSocketChannel.class)
        // 给 worker 线程组初始化处理器
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline()
                    // 增加字符串的编解码器
                    .addLast(new StringDecoder())
                    .addLast(new StringEncoder())
                    // 增加对象的编解码器,ClassResolvers.weakCachingConcurrentResolver 设置弱援用 WeakReferenceMap 缓存类加载器,避免内存溢出
                    .addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())))
                    .addLast(new ObjectEncoder())
                    // 增加自定义的业务处理器
                    .addLast(new SimpleChannelInboundHandler<Object>() {
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {log.info("客户端连贯啦。。。客户端地址:{}", ctx.channel().remoteAddress());
                        }
                        @Override
                        protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {log.info("服务端接管到的数据:{}", o.toString());
                            // 价值 1 个亿的 AI 代码
                            String str = o.toString();
                            str = str.replace("吗", "");
                            str = str.replace("?", "!");
                            str = str.replace("?", "!");
                            channelHandlerContext.writeAndFlush(str);
                        }
                    });
            }
        });
    // 启动并且监听
    ChannelFuture channelFuture = serverBootstrap.bind(8888).syncUninterruptibly();
    // 监听敞开通道
    channelFuture.channel().closeFuture();
}

客户端示例

public static void main(String[] args) {
    // 设置客户端工作线程
    NioEventLoopGroup worker = new NioEventLoopGroup();
    // 创立客户端启动对象
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(worker)
        // 通道连贯者
        .channel(NioSocketChannel.class)
        // 给 worker 线程组初始化处理器
        .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline()
                    // 增加字符串的编解码器
                    .addLast(new StringDecoder())
                    .addLast(new StringEncoder())
                    // 增加对象的编解码器,ClassResolvers.weakCachingConcurrentResolver 设置弱援用 WeakReferenceMap 缓存类加载器,避免内存溢出
                    .addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())))
                    .addLast(new ObjectEncoder())
                    // 增加自定义的业务处理器
                    .addLast(new SimpleChannelInboundHandler<Object>() {

                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.writeAndFlush("哈哈哈");
                        }

                        @Override
                        protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {log.info("客户端接管到的数据:{}", o.toString());
                        }
                    });
            }
        });

    ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888).syncUninterruptibly();
    // 客户端须要输出信息,创立一个扫描器
    Scanner scanner = new Scanner(System.in);
    while (scanner.hasNextLine()) {String msg = scanner.nextLine();
        // 通过 channel 发送到服务器端
        channel.writeAndFlush(msg + "\r\n");
    }
    channelFuture.channel().closeFuture();
}

快启动试试看把,不过须要留神的是,得先启动服务端哦~

SpringBoot + Netty4 实现 rpc 框架

好了,接下来就让咱们进入正题,让咱们利用咱们所学的常识去实现本人一个简略的 rpc 框架吧

简略说下 RPC(Remote Procedure Call)近程过程调用,简略的了解是一个节点申请另一个节点提供的服务。让两个服务之间调用就像调用本地办法一样。

RPC 时序图:

RPC 流程:

  1. 【客户端】发动调用
  2. 【客户端】数据编码
  3. 【客户端】发送编码后的数据到服务端
  4. 【服务端】接管客户端发送的数据
  5. 【服务端】对数据进行解码
  6. 【服务端】解决音讯业务并返回后果值
  7. 【服务端】对后果值编码
  8. 【服务端】将编码后的后果值回传给客户端
  9. 【客户端】接管后果值
  10. 【客户端】解码后果值
  11. 【客户端】解决返回数据业务

引入依赖

<dependencies>
    <!-- SpringBoot 依赖 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!-- Spring 容器上下文 -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
    </dependency>
    <!-- Spring 配置 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <!-- Netty4 -->
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.58.Final</version>
    </dependency>
    <!-- 工具 -->
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-all</artifactId>
        <version>5.5.8</version>
    </dependency>
</dependencies>

编写服务端

自定义音讯协定:

/**
 * @author zc
 * @date 2021/3/1 17:43
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class RpcMessage implements Serializable {
    private static final long serialVersionUID = 430507739718447406L;
    /**
     * interface 接口名
     */
    private String name;
    /**
     * 办法名
     */
    private String methodName;
    /**
     * 参数类型
     */
    private Class<?>[] parTypes;
    /**
     * 参数
     */
    private Object[] pars;
    /**
     * 后果值
     */
    private Object result;
}

自定义 Rpc 注解:

/**
 * @author zc
 * @date 2021/3/2 15:36
 */
@Target(value = {ElementType.TYPE, ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface RpcServer {}

定义 ServerHandle 业务处理器:

/**
 * Netty Server 端 Handle 解决类,音讯体 RpcMessage
 * 实现 ApplicationContextAware 接口:该接口能够加载获取到所有的 spring bean。* 实现了这个接口的 bean,当 spring 容器初始化的时候,会主动的将 ApplicationContext 注入进来
 *
 * @author ZC
 * @date 2021/3/1 22:15
 */
@Slf4j
@ChannelHandler.Sharable
public class ServerHandle extends SimpleChannelInboundHandler<RpcMessage> implements ApplicationContextAware {
    private Map<String, Object> serviceMap;

    /**
     * 在类被 Spring 容器加载时会主动执行 setApplicationAware
     *
     * @param applicationContext Spring 上下文
     * @throws BeansException 异样信息
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 从 Spring 容器中获取到所有领有 @RpcServer 注解的 Beans 汇合,Map<Name(对象类型,对象全路径名), 实例对象 >
        Map<String, Object> beansWithAnnotation = applicationContext.getBeansWithAnnotation(RpcServer.class);
        log.info("被 @RpcServer 注解加载的 Bean: {}", beansWithAnnotation);
        if (beansWithAnnotation.size() > 0) {Map<String, Object> map = new ConcurrentHashMap<>(16);
            for (Object o : beansWithAnnotation.values()) {
                // 获取该实例对象实现的接口 Class
                Class<?> anInterface = o.getClass().getInterfaces()[0];
                // 获取该接口类名,作为 Key,实例对象作为 Value
                map.put(anInterface.getName(), o);
            }
            // 应用变量接住 map
            serviceMap = map;
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {log.info("客户端连贯了: {}", ctx.channel().remoteAddress());
        super.channelActive(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.error("异样信息");
        cause.printStackTrace();
        super.exceptionCaught(ctx, cause);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcMessage rpcMessage) throws Exception {log.info("客户端发送的音讯:{}", rpcMessage);
        // 从 Map 中获取实例对象
        Object service = serviceMap.get(rpcMessage.getName());
        // 获取调用办法
        Method method = service.getClass().getMethod(rpcMessage.getMethodName(), rpcMessage.getParTypes());
        method.setAccessible(true);
        // 反射调用实例对象办法,获取返回值
        Object result = method.invoke(service, rpcMessage.getPars());
        rpcMessage.setResult(JSONUtil.toJsonStr(result));
        log.info("回给客户端的音讯:{}", rpcMessage);
        //Netty 服务端将数据写会 Channel 并发送给客户端,同时增加一个监听器,当所有数据包发送实现后,敞开通道
        channelHandlerContext.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE);
    }
}

定义 NettyServer 端:

/**
 * Netty 服务端
 *
 * @author zc
 * @date 2021/2/24 13:23
 **/
@Slf4j
public class NettyServer {

    /**
     * server 端处理器
     */
    private final ServerHandle serverHandle;
    /**
     * 服务端通道
     */
    private Channel channel;

    /**
     * 结构器
     *
     * @param serverHandle server 处理器
     */
    public NettyServer(ServerHandle serverHandle) {this.serverHandle = serverHandle;}

    /**
     * 启动
     *
     * @param port 启动端口
     */
    public void start(int port) {EventLoopGroup boss = new NioEventLoopGroup(1);
        EventLoopGroup worker = new NioEventLoopGroup();
        try {ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline()
                                    .addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())))
                                    .addLast(new ObjectEncoder())
                                    .addLast(serverHandle);
                        }
                    });

            final ChannelFuture channelFuture = serverBootstrap.bind(port).syncUninterruptibly();
            log.info("服务端启动 - 端口: {}", port);
            channel = channelFuture.channel();
            channel.closeFuture().syncUninterruptibly();
        } catch (Exception e) {boss.shutdownGracefully();
            worker.shutdownGracefully();}
    }

    /**
     * 敞开以后通道
     */
    public void stop() {channel.close();
    }
}

自定义 rpc 配置属性类:

/**
 * @author zc
 * @date 2021/3/4 23:38
 */
@Component
@ConfigurationProperties(prefix = "netty")
@Data
public class NettyRpcProperties {private int serverPort;}`

创立 Server 端启动配置类:

/**
 * NettyServer 服务端配置类
 *
 * @author zc
 * @date 2021/3/1 18:24
 */
@Slf4j
@Configuration
@EnableConfigurationProperties(NettyRpcProperties.class)
public class ServerBeanConfig {

    private final NettyRpcProperties nettyRpcProperties;

    @Autowired
    public ServerBeanConfig(NettyRpcProperties nettyRpcProperties) {this.nettyRpcProperties = nettyRpcProperties;}

    /**
     * 配置 ServerHandle
     *
     * @return ServerHandle 解决类
     */
    @Bean
    public ServerHandle serverHandle() {return new ServerHandle();
    }

    /**
     * 配置 NettyServer
     *
     * @param handle ServerHandle 解决类
     * @return NettyServer
     */
    @Bean
    public NettyServer nettyServer(ServerHandle handle) {NettyServer nettyServer = new NettyServer(handle);
//        nettyServer.start(nettyRpcProperties.getServerPort());
        return nettyServer;
    }

    /**
     * 解决 SpringBoot 端口无奈监听问题
     */
    @Component
    static class NettyServerStart implements ApplicationRunner {
        private final NettyServer nettyServer;
        private final NettyRpcProperties properties;

        @Autowired
        NettyServerStart(NettyServer nettyServer, NettyRpcProperties properties) {
            this.nettyServer = nettyServer;
            this.properties = properties;
        }

        @Override
        public void run(ApplicationArguments args) throws Exception {log.info("===============ApplicationRunner");
            if (nettyServer != null) {nettyServer.start(properties.getServerPort());
            }
        }
    }
}

注入 Spring 容器

此时有两种形式让该配置主动注入 Spring 容器失效:

  1. 主动注入

    在 resource 目录下创立 META-INF 目录,创立 spring.factories 文件

    在该文件里写上

    org.springframework.boot.autoconfigure.EnableAutoConfiguration=${包门路:xxx.xxx.xxx}.${配置类:ServerBeanConfig}

    配置好之后,在 SpringBoot 启动时会主动加载该配置类。

  2. 通过注解注入

    /**
     * 自定义 SpringBoot 启动注解
     * 注入 ServerBeanConfig 配置类
     *
     * @author ZC
     * @date 2021/3/1 23:48
     */
    @Target({ElementType.TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Inherited
    @ImportAutoConfiguration({ServerBeanConfig.class})
    public @interface EnableNettyServer {}

编写客户端


创立客户端处理器 `ClientHandle

/**
 * @author zc
 * @date 2021/3/2 15:19
 */
@Slf4j
@ChannelHandler.Sharable
public class ClientHandle extends SimpleChannelInboundHandler<RpcMessage> {
    /**
     * 定义音讯 Map,将连贯通道 Channel 作为 key,音讯返回值作为 value
     */
    private final ConcurrentMap<Channel, RpcMessage> rpcMessageConcurrentMap;

    public ClientHandle(ConcurrentMap<Channel, RpcMessage> rpcMessageConcurrentMap) {this.rpcMessageConcurrentMap = rpcMessageConcurrentMap;}

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcMessage rpcMessage) throws Exception {log.info("客户端收到服务端音讯:{}", rpcMessage);
        rpcMessageConcurrentMap.put(channelHandlerContext.channel(), rpcMessage);
    }
}

创立客户端启动类NettyClient

/**
 * @author ZC
 * @date 2021/3/1 23:30
 */
@Slf4j
public class NettyClient {

    private Channel channel;
    /**
     * 寄存申请编号与响应对象的映射关系
     */
    private final ConcurrentMap<Channel, RpcMessage> rpcMessageConcurrentMap = new ConcurrentHashMap<>();

    public RpcMessage send(int port, final RpcMessage rpcMessage) {
        // 客户端须要一个事件循环组
        EventLoopGroup group = new NioEventLoopGroup();
        try {Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline()
                                    .addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())))
                                    .addLast(new ObjectEncoder())
                                    .addLast(new ClientHandle(rpcMessageConcurrentMap));
                        }
                    });
            final ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", port).syncUninterruptibly();
            log.info("连贯服务端胜利:" + channelFuture.channel().remoteAddress());
            channel = channelFuture.channel();
            channel.writeAndFlush(rpcMessage);
            log.info("发送数据胜利:{}", rpcMessage);
            channel.closeFuture().syncUninterruptibly();
            return rpcMessageConcurrentMap.get(channel);
        } catch (Exception e) {log.error("client exception", e);
            return null;
        } finally {group.shutdownGracefully();
            // 移除申请编号和响应对象间接的映射关系
            rpcMessageConcurrentMap.remove(channel);
        }
    }

    public void stop() {channel.close();
    }
}

定义 Netty 客户端 Bean 后置处理器

/**
 * Netty 客户端 Bean 后置处理器
 * 实现 Spring 后置处理器接口:BeanPostProcessor
 * 在 Bean 对象在实例化和依赖注入结束后,在显示调用初始化办法的前后增加咱们本人的逻辑。留神是 Bean 实例化结束后及依赖注入实现后触发的
 *
 * @author ZC
 * @date 2021/3/2 23:00
 */
@Slf4j
public class NettyClientBeanPostProcessor implements BeanPostProcessor {

    private final NettyClient nettyClient;

    public NettyClientBeanPostProcessor(NettyClient nettyClient) {this.nettyClient = nettyClient;}

    /**
     * 实例化、依赖注入结束,在调用显示的初始化之前实现一些定制的初始化工作
     * 留神:办法返回值不能为 null
     * 如果返回 null 那么在后续初始化办法将报空指针异样或者通过 getBean()办法获取不到 Bean 实例对象
     * 因为后置处理器从 Spring IoC 容器中取出 bean 实例对象没有再次放回 IoC 容器中
     */
    @Override
    public Object postProcessBeforeInitialization(Object bean, @Nullable String beanName) throws BeansException {
        // 获取实例 Class
        Class<?> beanClass = bean.getClass();
        do {
            // 获取该类所有字段
            Field[] fields = beanClass.getDeclaredFields();
            for (Field field : fields) {
                // 判断该字段是否领有 @RpcServer
                if (field.getAnnotation(RpcServer.class) != null) {field.setAccessible(true);
                    try {
                        // 通过 JDK 动静代理获取该类的代理对象
                        Object o = Proxy.newProxyInstance(field.getType().getClassLoader(), new Class[]{field.getType()}, new ClientInvocationHandle(nettyClient));
                        // 将代理类注入该字段
                        field.set(bean, o);
                        log.info("创立代理类 ===>>> {}", beanName);
                    } catch (IllegalAccessException e) {log.error(e.getMessage());
                    }
                }
            }
        } while ((beanClass = beanClass.getSuperclass()) != null);
        return bean;
    }

    /**
     * 实例化、依赖注入、初始化结束时执行
     * 留神:办法返回值不能为 null
     * 如果返回 null 那么在后续初始化办法将报空指针异样或者通过 getBean()办法获取不到 Bean 实例对象
     * 因为后置处理器从 Spring IoC 容器中取出 bean 实例对象没有再次放回 IoC 容器中
     */
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        // 能够依据 beanName 不同执行不同的解决操作
        return bean;
    }

    /**
     * JDK 动静代理处理器
     */
    static class ClientInvocationHandle implements InvocationHandler {
        private final NettyClient nettyClient;

        public ClientInvocationHandle(NettyClient nettyClient) {this.nettyClient = nettyClient;}

        /**
         * 代理办法调用
         *
         * @param proxy  代理类
         * @param method 办法
         * @param args   参数
         * @return 返回值
         */
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) {
            // 组装 Netty 参数
            RpcMessage rpcMessage = RpcMessage.builder()
                    .name(method.getDeclaringClass().getName())
                    .methodName(method.getName())
                    .parTypes(method.getParameterTypes())
                    .pars(args)
                    .build();
            // 调用 Netty,发送数据
            RpcMessage send = nettyClient.send(1111, rpcMessage);
            log.info("接管到服务端数据:{}, 返回后果值 ====》》》》{}", send, send.getResult());
            return send.getResult();}
    }
}

定义客户端配置类

/**
 * @author zc
 * @date 2021/3/1 18:24
 */
@Configuration
public class ClientBeanConfig {

    @Bean
    public NettyClient nettyClient() {return new NettyClient();
    }

    @Bean
    public NettyClientBeanPostProcessor nettyClientBeanPostProcessor(NettyClient nettyClient) {return new NettyClientBeanPostProcessor(nettyClient);
    }
}

最初和服务端一样,注入 Spring 容器

/**
 * @author ZC
 * @date 2021/3/1 23:48
 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@ImportAutoConfiguration({ClientBeanConfig.class})
public @interface EnableNettyClient {}

至此咱们的 SpringBoot + Netty4 的就曾经实现了最最简略的 rpc 框架模式了;而后咱们就能够援用咱们本人的 rpc 依赖了。

最初再执行一下 maven 命令

mvn install

netty-rpc-examples 例子

接口服务

pom 里啥也没有。。。

定义一个接口

/**
 * @author zc
 * @date 2021/3/1 17:55
 */
public interface Test1Api {void test();

    void test(int id, String name);

    String testStr(int id);

    Object testObj();}

rpc-server 服务端

失常的 SpringBoot 工程

引入 pom

<!-- 自定义 rpc 依赖 -->
<dependency>
    <groupId>cn.happyloves.rpc</groupId>
    <artifactId>netty-rpc</artifactId>
    <version>0.0.1</version>
</dependency>
<!-- 接口依赖 -->
<dependency>
    <groupId>cn.happyloves.netty.rpc.examples.api</groupId>
    <artifactId>rpc-api</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>

配置属性

# 利用名称
spring.application.name=rpc-server
# 应用服务 WEB 拜访端口
server.port=8080
netty.server-port=1111

创立一个实体类

/**
 * @author ZC
 * @date 2021/3/2 23:59
 */
@Data
public class Account implements Serializable {
    private static final long serialVersionUID = 667178018106218163L;
    private Integer id;

    private String name;
    private String username;
    private String password;
}

创立 Server 实现 Test1Api 接口

/**
 * @author ZC
 * @date 2021/3/2 23:59
 */
@Slf4j
@Service
@RpcServer
public class TestServiceImpl implements Test1Api {
    @Override
    public void test() {log.info("111111111");
    }

    @Override
    public void test(int id, String name) {log.info("222222222,{},{}", id, name);
    }

    @Override
    public String testStr(int id) {log.info("33333333333333333,{}", id);
        return "33333333333333333" + id;
    }

    @Override
    public Object testObj() {log.info("444444444444444444");
        Account account = new Account();
        account.setName("张三");
        return account;
    }
}

最初在 SpringBoot 启动类上加上 @EnableNettyServer

/**
 * @author ZC
 * @date 2021/3/2 23:55
 */
@EnableNettyServer
@SpringBootApplication
public class RpcServerApplication {public static void main(String[] args) {SpringApplication.run(RpcServerApplication.class, args);
    }
}

rpc-server 客户端

引入 pom 依赖

<dependency>
    <groupId>cn.happyloves.rpc</groupId>
    <artifactId>netty-rpc</artifactId>
    <version>0.0.1</version>
</dependency>
<dependency>
    <groupId>cn.happyloves.netty.rpc.examples.api</groupId>
    <artifactId>rpc-api</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>

创立 Controller

/**
 * @author ZC
 * @date 2021/3/3 0:04
 */
@RestController
public class ClientController {
    @RpcServer
    private Test1Api testServiceImpl;

    @GetMapping("/test1")
    public void test() {testServiceImpl.test();
    }

    @GetMapping("/test2")
    public void test(int id, String name) {testServiceImpl.test(id, name);
    }

    @GetMapping("/test3")
    public String testStr(int id) {return testServiceImpl.testStr(id);
    }

    @GetMapping("/test4")
    public Object testObj() {return testServiceImpl.testObj();
    }
}

最初在启动类上加上注解 @EnableNettyClient

@EnableNettyClient
@SpringBootApplication
public class RpcClientApplication {public static void main(String[] args) {SpringApplication.run(RpcClientApplication.class, args);
    }
}

先运行服务端,在运行客户端,而后在调用客户端接口就能够看到服务端可能接管到客户端发来的音讯,而后服务端解决并返回,客户端接管并返回。。。

至此,一个小 demo 就实现了。

当然啦,后续还有很多需要须要解决的,比方说以后 demo 中客户端每次通信都须要创立一个实例去连贯、服务的注册、客户端和服务端是同一个利用等等,这个前面再缓缓欠缺吧
赵小胖集体博客:https://zc.happyloves.cn:4443/wordpress/

退出移动版