阐明

上一篇代码基于 socket 的实现非常简单,然而对于理论生产,个别应用 netty。

至于 netty 的长处能够参考:

为什么抉择 netty?

http://houbb.github.io/2019/05/10/netty-definitive-gudie-04-why-netty

代码实现

maven 引入

<dependency>    <groupId>io.netty</groupId>    <artifactId>netty-all</artifactId>    <version>${netty.version}</version></dependency>

引入 netty 对应的 maven 包,此处为 4.1.17.Final。

服务端代码实现

netty 的服务端启动代码是比拟固定的。

package com.github.houbb.rpc.server.core;import com.github.houbb.log.integration.core.Log;import com.github.houbb.log.integration.core.LogFactory;import com.github.houbb.rpc.server.constant.RpcServerConst;import com.github.houbb.rpc.server.handler.RpcServerHandler;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;/** * rpc 服务端 * @author binbin.hou * @since 0.0.1 */public class RpcServer extends Thread {    private static final Log log = LogFactory.getLog(RpcServer.class);    /**     * 端口号     */    private final int port;    public RpcServer() {        this.port = RpcServerConst.DEFAULT_PORT;    }    public RpcServer(int port) {        this.port = port;    }    @Override    public void run() {        // 启动服务端        log.info("RPC 服务开始启动服务端");        EventLoopGroup bossGroup = new NioEventLoopGroup();        EventLoopGroup workerGroup = new NioEventLoopGroup();        try {            ServerBootstrap serverBootstrap = new ServerBootstrap();            serverBootstrap.group(workerGroup, bossGroup)                    .channel(NioServerSocketChannel.class)                    .childHandler(new ChannelInitializer<Channel>() {                        @Override                        protected void initChannel(Channel ch) throws Exception {                            ch.pipeline().addLast(new RpcServerHandler());                        }                    })                    // 这个参数影响的是还没有被accept 取出的连贯                    .option(ChannelOption.SO_BACKLOG, 128)                    // 这个参数只是过一段时间内客户端没有响应,服务端会发送一个 ack 包,以判断客户端是否还活着。                    .childOption(ChannelOption.SO_KEEPALIVE, true);            // 绑定端口,开始接管进来的链接            ChannelFuture channelFuture = serverBootstrap.bind(port).syncUninterruptibly();            log.info("RPC 服务端启动实现,监听【" + port + "】端口");            channelFuture.channel().closeFuture().syncUninterruptibly();            log.info("RPC 服务端敞开实现");        } catch (Exception e) {            log.error("RPC 服务异样", e);        } finally {            workerGroup.shutdownGracefully();            bossGroup.shutdownGracefully();        }    }}

为了简略,服务端启动端口号固定,RpcServerConst 常量类内容如下:

public final class RpcServerConst {    private RpcServerConst(){}    /**     * 默认端口     * @since 0.0.1     */    public static final int DEFAULT_PORT = 9627;}

RpcServerHandler

当然,还有一个比拟外围的类就是 RpcServerHandler

public class RpcServerHandler extends SimpleChannelInboundHandler {    @Override    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {        // do nothing now    }}

目前是空实现,后续能够增加对应的日志输入及逻辑解决。

测试

启动测试的代码非常简单:

/** * 服务启动代码测试 * @param args 参数 */public static void main(String[] args) {    new RpcServer().start();}

阐明

下面咱们实现了服务端的实现,这一节来一起看一下 client 客户端代码实现。

代码实现

RpcClient

/* * Copyright (c)  2019. houbinbin Inc. * rpc All rights reserved. */package com.github.houbb.rpc.client.core;import com.github.houbb.log.integration.core.Log;import com.github.houbb.log.integration.core.LogFactory;import com.github.houbb.rpc.client.handler.RpcClientHandler;import io.netty.bootstrap.Bootstrap;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.logging.LogLevel;import io.netty.handler.logging.LoggingHandler;/** * <p> rpc 客户端 </p> * * <pre> Created: 2019/10/16 11:21 下午  </pre> * <pre> Project: rpc  </pre> * * @author houbinbin * @since 0.0.2 */public class RpcClient extends Thread {    private static final Log log = LogFactory.getLog(RpcClient.class);    /**     * 监听端口号     */    private final int port;    public RpcClient(int port) {        this.port = port;    }    public RpcClient() {        this(9527);    }    @Override    public void run() {        // 启动服务端        log.info("RPC 服务开始启动客户端");        EventLoopGroup workerGroup = new NioEventLoopGroup();        try {            Bootstrap bootstrap = new Bootstrap();            ChannelFuture channelFuture = bootstrap.group(workerGroup)                    .channel(NioSocketChannel.class)                    .option(ChannelOption.SO_KEEPALIVE, true)                    .handler(new ChannelInitializer<Channel>(){                        @Override                        protected void initChannel(Channel ch) throws Exception {                            ch.pipeline()                                    .addLast(new LoggingHandler(LogLevel.INFO))                                    .addLast(new RpcClientHandler());                        }                    })                    .connect("localhost", port)                    .syncUninterruptibly();            log.info("RPC 服务启动客户端实现,监听端口:" + port);            channelFuture.channel().closeFuture().syncUninterruptibly();            log.info("RPC 服务开始客户端已敞开");        } catch (Exception e) {            log.error("RPC 客户端遇到异样", e);        } finally {            workerGroup.shutdownGracefully();        }    }}

.connect("localhost", port) 申明了客户端须要连贯的服务端,此处和服务端的端口保持一致。

RpcClientHandler

客户端解决类也比较简单,临时留空。

/* * Copyright (c)  2019. houbinbin Inc. * rpc All rights reserved. */package com.github.houbb.rpc.client.handler;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;/** * <p> 客户端解决类 </p> * * <pre> Created: 2019/10/16 11:30 下午  </pre> * <pre> Project: rpc  </pre> * * @author houbinbin * @since 0.0.2 */public class RpcClientHandler extends SimpleChannelInboundHandler {    @Override    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {        // do nothing.    }}

启动测试

服务端

首先启动服务端。

客户端

而后启动客户端连贯服务端,实现如下:

/** * 服务启动代码测试 * @param args 参数 */public static void main(String[] args) {    new RpcClient().start();}

小结

为了便于大家学习,以上源码曾经开源:

https://github.com/houbb/rpc

我是老马,期待与你的下次重逢。