乐趣区

一文带你实现RPC框架

想要获取更多文章可以访问我的博客 – 代码无止境。

现在大部分的互联网公司都会采用微服务架构,但具体实现微服务架构的方式有所不同,主流上分为两种,一种是基于 Http 协议的远程调用,另外一种是基于 RPC 方式的调用。两种方式都有自己的代表框架,前者是著名的 Spring Cloud,后者则是有阿里巴巴开源的 Dubbo,二者都被广泛的采用。今天这篇文章,我们就一起来了解一下 RPC,并且和大家一起动手实现一个简单的 RPC 框架的 Demo。

什么是 RPC

RPC 是一种远程调用过程,是一种通过网络远程调用其他服务的协议。通俗的说就是,A 通过打电话的方式让 B 帮忙办一件事,B 办完事后将结果告知 A。我们下面通过一张图来大概了解一下在一个完整的 RPC 框架中存在的角色以及整个远程调用的过程。

通过上面的图可以看出来,在 RPC 框架中主要有以下 4 个角色:

  • registry – 注册中心,当服务提供者启动时会向注册中心注册,然后注册中心会告知所有的消费者有新的服务提供者。
  • provider – 服务提供者,远程调用过程中的被消费方。
  • consumer – 服务消费者,远程调用过程中的消费方。
  • monitor – 监视器,它主要负责统计服务的消费和调用情况。

启动服务提供者后,服务提供者会以异步的方式向注册中心注册。然后启动服务消费者,它会订阅注册中心中服务提供者列表,当有服务提供者的信息发生改变时,注册中心会通知所有的消费者。当消费者发起远程调用时,会通过动态代理将需要请求的参数以及方法签名等信息通过 Netty 发送给服务提供者,服务提供者收到调用的信息后调用对应的方法并将产生的结果返回给消费者,这样就完成了一个完整的远程调用。当然了这个过程中可能还会将调用信息异步发送给 monitor 用于监控和统计。

阅读过上面的内容后,你应该对 RPC 框架有了一个大概的认识。为了更好更深入的了解 RPC 框架的原理,下面我们就一起来动手实现一个简单的 RPC 框架吧。

框架核心部分

首先我们要实现的是整个 RPC 框架的核心部分,这部分的主要包含以下内容:

  1. RPC 服务的注解的实现。
  2. 服务提供者初始化、注册、以及响应远程调用的实现。
  3. 服务消费者订阅注册中心、监听服务提供者的变化的实现。
  4. 动态代理的实现。

整个核心部分将以一个 Spring Boot Starter 的形式实现,这样我们可以很方便的在 Spring Boot 项目中使用它。

注解

我们需要使用一个注解来标识服务提供者所提供服务的实现类,方便在初始化的时候将其交由 Spring 管理,也只有这样我们才可以在远程调用发生时可以找到它们。

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcService {Class<?> value();

}

value属性用来标记这个服务的实现类对应的接口,RPC 框架中服务提供者和消费者之间会共同引用一个服务接口的包,当我们需要远程调用的时候实际上只需要调用接口中定义的方法即可。
除了一个标识服务实现类的注解之外,我们还需要一个标识服务消费者注入服务实现的注解@RpcConsumer,被其修饰的属性在初始化的时候都会被我们设置上动态代理,这一点在后面会详细讲到,我们先来看下它的具体实现吧。

@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcConsumer {

    /**
     * 服务名称
     * @return
     */
    String providerName();}

服务提供者

服务提供者启动的时候,我们 RPC 框架需要做以下几件事情:

  1. 扫描服务提供者中所有提供服务的类(被 @RpcService 修饰的类),并将其交由 BeanFactory 管理。
  2. 启动 Netty 服务端,用来收到消费者的调用消息,并且返回调用结果。
  3. 向注册中心注册,本例中使用的注册中心是 Zookeeper。

这部分我们定义了一个 ProviderAutoConfiguration 类来实现这几个步骤,

@PostConstruct
public void  init() {logger.info("rpc server start scanning provider service...");
    Map<String, Object> beanMap = this.applicationContext.getBeansWithAnnotation(RpcService.class);
    if (null != beanMap && !beanMap.isEmpty()) {beanMap.entrySet().forEach(one -> {initProviderBean(one.getKey(), one.getValue());
        });
    }
    logger.info("rpc server scan over...");
    // 如果有服务的话才启动 netty server
    if (!beanMap.isEmpty()) {startNetty(rpcProperties.getPort());
    }
}

看上面的代码,首先我们获取到了所有被 @RpcService 注解修饰的实体,并且调用了 initProviderBean 方法逐一对其处理,然后我们启动了 Netty。那么我们需要在 initProviderBean 方法中做些什么呢?其实很简单,就是逐一将其交由 BeanFactory 管理。

private void initProviderBean(String beanName, Object bean) {
    RpcService rpcService = this.applicationContext
                .findAnnotationOnBean(beanName, RpcService.class);
    BeanFactory.addBean(rpcService.value(), bean);
}

将服务实现类交由 Spring 管理之后,我们还需要启动 Netty 用来接收远程调用信息,启动 Netty 的代码在这里我就不全部粘出来了,大家可以在源码中查看。在 Netty 启动成功之后,其实我们还执行了下面的代码,用来向 ZK 注册。

new RegistryServer(rpcProperties.getRegisterAddress(),
                    rpcProperties.getTimeout(), rpcProperties.getServerName(),
                    rpcProperties.getHost(), port)
                    .register();

整个注册的过程也非常容易理解,首先是创建了一个 ZK 连接,然后是判断是否有 /rpc 的根节点,如果没有的话就创建一个,最后就是在根节点下创建一个 EPHEMERAL_SEQUENTIAL 类型的节点,这种类型的节点在 ZK 重启之后会自动清除,这样可以保证注册中心重启后会自动清除服务提供者的信息。而在节点中会存储服务提供者的名称,IP 地址以及端口号的信息,这样 RPC 框架就可以根据这些信息顺利的定位到服务提供者。

public void register() throws ZkConnectException {
    try {
        // 获取 zk 连接
        ZooKeeper zooKeeper = new ZooKeeper(addr, timeout, event -> {logger.info("registry zk connect success...");
        });
        if (zooKeeper.exists(Constants.ZK_ROOT_DIR, false) == null) {zooKeeper.create(Constants.ZK_ROOT_DIR, Constants.ZK_ROOT_DIR.getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.PERSISTENT);
        }
        zooKeeper.create(Constants.ZK_ROOT_DIR + "/" + serverName,
                (serverName + ","+ host + ":" + port).getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        logger.info("provider register success {}", serverName);
    } catch (Exception e) {throw new ZkConnectException("register to zk exception," + e.getMessage(), e.getCaus());
    }
}

就这样我们 RPC 框架与服务提供者相关的内容就完成了,接下来要完成的是服务消费者部分。

服务消费者

对于服务消费者,我们框架需要对它的处理就是,为所有的 RPC 服务(被 @RpcConsumer 修饰的属性)设置上动态代理。具体的设置代码如下所示(PS: 这段代码写在 ConsumerAutoConfiguration 类中哦):

@Bean
public BeanPostProcessor beanPostProcessor() {return new BeanPostProcessor() {
        @Override
        public Object postProcessBeforeInitialization(Object bean, String beanName)
                throws BeansException {Class<?> objClz = bean.getClass();
            for (Field field : objClz.getDeclaredFields()) {RpcConsumer rpcConsumer = field.getAnnotation(RpcConsumer.class);
                if (null != rpcConsumer) {Class<?> type = field.getType();
                    field.setAccessible(true);
                    try {field.set(bean, rpcProxy.create(type, rpcConsumer.providerName()));
                    } catch (IllegalAccessException e) {e.printStackTrace();
                    } finally {field.setAccessible(false);
                    }
                }
            }
            return bean;
        }
    };
}

BeanPostProcessor也称为 Bean 后置处理器,它是 Spring 中定义的接口,在 Spring 容器的创建过程中(具体为 Bean 初始化前后)会回调 BeanPostProcessor 中定义的两个方法。上面实现的 postProcessBeforeInitialization 是在 Bean 初始化之前调用的,还有一个 postProcessAfterInitialization 方法是在 Bean 初始化之后调用的。
如上面代码所示,我们会在每一个带有 @RpcConsumer 的实例初始化之前利用反射机制为其设置一个 RpcProxy 的代理,可以看到我们在创建这个动态代理的时候还需要服务提供者的名称,这是因为在动态代理的实现里面需要使用服务提供者的名称来查询服务提供者的地址信息。那么这个动态代理的实现又是怎样的呢?这就是我们下一步需要做的事情。

动态代理

在这个 RPC 框架里面动态代理主要实现的内容就是,当服务消费者调用服务提供者提供的接口时,将调用信息通过 Netty 发送给对应的服务调用者,然后由服务提供者完成相关的处理并且将处理结果返回给服务消费者。下面我们就一起来看一下 RpcProxy 的是如何实现这部分功能的。

@Component
public class RpcProxy {

    @Autowired
    private ServiceDiscovery serviceDiscovery;

    public <T> T create(Class<?> interfaceClass, String providerName) {return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass},
                (proxy, method, args) -> {
            // 通过 netty 向 Rpc 服务发送请求。// 构建一个请求。RpcRequest request = new RpcRequest();
            request.setRequestId(UUID.randomUUID().toString())
                    .setClassName(method.getDeclaringClass().getName())
                    .setMethodName(method.getName())
                    .setParamTypes(method.getParameterTypes())
                    .setParams(args);
            // 获取一个服务提供者。ProviderInfo providerInfo = serviceDiscovery.discover(providerName);
            // 解析服务提供者的地址信息,数组第一个元素为 ip 地址,第二个元素为端口号。String[] addrInfo = providerInfo.getAddr().split(":");
            String host = addrInfo[0];
            int port = Integer.parseInt(addrInfo[1]);
            RpcClient rpcClient = new RpcClient(host, port);
            // 使用 Netty 向服务提供者发送调用消息,并接收请求结果。RpcResponse response = rpcClient.send(request);
            if (response.isError()) {throw response.getError();
            } else {return response.getResult();
            }
        });
    }
}

其实在代理里面首先我们会构造请求信息实体,然后会根据服务提供者的名称获取一个服务提供者的地址,最后再将请求信息发送给服务提供者并接收调用结果。获取服务提供者的方法会在后面消费者和提供者的通用配置里面讲解。我们在这里重点来看一下发送调用信息并接收调用结果的实现。

public class RpcClient extends SimpleChannelInboundHandler<RpcResponse> {
    
    ... 此处省略对象属性信息,可查看源码。public RpcResponse send(RpcRequest request){EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ... 此处省略 Netty 相关配置,可查看源码。// 连接服务器
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            channelFuture.channel().writeAndFlush(request).sync();
            future = new CompletableFuture<>();
            future.get();
            if (response != null) {
                // 关闭 netty 连接。channelFuture.channel().closeFuture().sync();}
            return response;
        } catch (Exception e) {logger.error("client send msg error,", e);
            return null;
        } finally {workerGroup.shutdownGracefully();
        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext,
                                RpcResponse rpcResponse) throws Exception {logger.info("client get request result,{}", rpcResponse);
        this.response = rpcResponse;
        future.complete("");
    }
}

通过上面的代码可以看出向服务提供者发送消息是异步的,我们通过 CompletableFutureget()方法阻塞当前线程,直到接收到调用结果(PS: 我们在 channelRead0 方法中收到返回结果后会将其设置成完成状态)。看到这里,你可能会问服务提供者收到调用请求信息后如何处理的呢?具体的处理逻辑我们写在了 ServerHandler 这个类中,可以看出在 channelRead0 方法收到一条调用信息之后,调用 handle 方法来处理具体的调用过程,在 handle 方法中会使用反射机制找到所调用方法的具体实现,然后执行调用过程并获取结果,最后再使用 Netty 将结果返回给消费者服务。

public class ServerHandler extends SimpleChannelInboundHandler<RpcRequest> {

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext,
                                RpcRequest request) throws Exception {logger.info("provider accept request,{}", request);
        // 返回的对象。RpcResponse rpcResponse = new RpcResponse();
        // 将请求 id 原路带回
        rpcResponse.setRequestId(request.getRequestId());
        try {Object result = handle(request);
            rpcResponse.setResult(result);
        } catch (Exception e) {rpcResponse.setError(e);
        }
        channelHandlerContext.writeAndFlush(rpcResponse).addListener(ChannelFutureListener.CLOSE);
    }

    private Object handle(RpcRequest request) throws Exception {String className = request.getClassName();
        Class<?> objClz = Class.forName(className);
        Object o = BeanFactory.getBean(objClz);
        // 获取调用的方法名称。String methodName = request.getMethodName();
        // 参数类型
        Class<?>[] paramsTypes = request.getParamTypes();
        // 具体参数。Object[] params = request.getParams();
        // 调用实现类的指定的方法并返回结果。Method method = objClz.getMethod(methodName, paramsTypes);
        Object res = method.invoke(o, params);
        return res;
    }
}

消费者和提供者的通用配置

除了 ProviderAutoConfigurationConsumerAutoConfiguration两个配置类,我们还定义了一个 RpcAutoConfiguration 类来配置一些其他的东西,如下所示。

public class RpcAutoConfiguration {
    ...

    @Bean
    @ConditionalOnMissingBean
    public ServiceDiscovery serviceDiscovery() {
        ServiceDiscovery serviceDiscovery =
                null;
        try {serviceDiscovery = new ServiceDiscovery(rpcProperties.getRegisterAddress());
        } catch (ZkConnectException e) {logger.error("zk connect failed:", e);
        }
        return serviceDiscovery;
    }

    @Bean
    @ConditionalOnMissingBean
    public RpcProxy rpcProxy() {RpcProxy rpcProxy = new RpcProxy();
        rpcProxy.setServiceDiscovery(serviceDiscovery());
        return rpcProxy;
    }
}

在这个配置类里面,主要初始化了一个 ServiceDiscovery 的对象以及一个 RpcProxy 的对象。其中 RpcProxy 是动态代理,在上面我们已经详细了解过了。那么这里就来着重了解一下 ServiceDiscovery 是干啥的吧。
大家还记得我们在文章开始的时候贴出来的那张图片吗?在服务消费者初始化的时候会去订阅服务提供者内容的变化,ServiceDiscovery的主要功能就是这个,其主要代码如下所示(如果你需要完整的代码,可以查看本文源码)。

public class ServiceDiscovery {

    // 存储服务提供者的信息。private volatile List<ProviderInfo> dataList = new ArrayList<>();

    public ServiceDiscovery(String registoryAddress) throws ZkConnectException {
        try {
            // 获取 zk 连接。ZooKeeper zooKeeper = new ZooKeeper(registoryAddress, 2000, new Watcher() {
                @Override
                public void process(WatchedEvent event) {logger.info("consumer connect zk success!");
                }
            });
            watchNode(zooKeeper);
        } catch (Exception e) {throw new ZkConnectException("connect to zk exception," + e.getMessage(), e.getCause());
        }
    }

    /**
     * 监听服务提供者的变化
     */
    public void watchNode(final ZooKeeper zk) {...}

    /**
     * 获取一个服务提供者
     */
    public ProviderInfo discover(String providerName) {....}
}

在这个类的构造方法里面,我们和 ZK 注册中心建立了一个连接,并且在 watchNode 方法中监听服务提供者节点的变化,当有服务提供者信息有变化时会去修改 dataList 里的内容,这样可以保证在服务本地维持一份可用的服务提供者的信息。而在远程调用发生的时候我们会通过 discover 方法(PS:前面有见到过哦)去 dataList 里面寻找一个可用的服务提供者来提供服务。

Starter 的配置

我们还需要在 resources 目录下新建一个 META-INF 目录,然后在该目录下新建一个 spring.factories 文件,里面的内容如下面代码所示。它主要是用来指定在 Spring Boot 项目启动的时候需要加载的其他配置。如果你有不明白的地方可以查询一下 Spring Boot 自定义 Stater 的相关内容。

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
cn.itweknow.sbrpccorestarter.config.RpcAutoConfiguration,\
cn.itweknow.sbrpccorestarter.config.ProviderAutoConfiguration,\
cn.itweknow.sbrpccorestarter.config.ConsumerAutoConfiguration

到这一步我们框架的核心部分就完成了,它将会以一个 Spring Boot Stater 的形式提供给服务提供者和服务消费者使用,接下来我们就将分别定义一个服务提供者和一个消费者来测试我们自己实现的 RPC 框架。

创建服务提供者

在创建服务提供者之前,我们需要新建一个与服务消费者之间共享的服务接口。因为前面提到过,在服务消费者眼里的远程调用实际上就是调用本地的接口方法而已。在这个项目里我们就创建了一个 HelloRpcService.java 的接口,如下所示:

public interface HelloRpcService {String sayHello();
}

在接口定义完成之后,我们就来创建我们的服务提供者,并且实现上面定义的 HelloRpcService 接口。在服务提供者服务里还需要依赖 RPC 框架的核心 Starter 以及服务接口包,我们需要在 pom.xml 中添加下面的依赖。

<dependency>
    <groupId>cn.itweknow</groupId>
    <artifactId>sb-rpc-core-starter</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>

<dependency>
    <groupId>cn.itweknow</groupId>
    <artifactId>sb-rpc-api</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>

添加完依赖后,我们就来看下 HelloRpcService 的具体实现吧:

@RpcService(HelloRpcService.class)
public class HelloRpcServiceImpl implements HelloRpcService {
    
    @Override
    public String sayHello() {return "Hello RPC!";}
}

其实现很简单,主要是要需要在实现类上加上 @RpcService 注解,这样在项目启动的时候 RPC 框架才会扫描到它,并将其交给 BeanFactory 管理。接下来还需要配置的是一些 RPC 框架需要的配置项,包括服务名称,ZK 的地址以及 Netty 启动的端口等信息。这些信息在框架是通过 RpcProperties 这个配置类来读取的,有兴趣的同学可以在源码中找到它。

spring.rpc.host=localhost
# netty 服务的端口号
spring.rpc.port=21810
# zk 地址
spring.rpc.register-address=localhost:2181
spring.rpc.server-name=provider
# 连接 zk 的超时时间
spring.rpc.timeout=2000

创建服务消费者

服务消费者同样也需要 RPC 核心框架的 Starter 以及服务接口的依赖,和 RPC 框架的一些基础配置项,和服务提供者类似,这里就不粘出来了。这里需要说明的一点是,为了方便测试,服务消费者是一个 Web 服务,所以它还添加了 spring-boot-starter-web 的依赖。下面我们就一起来看下服务消费者是如何调用远程服务的吧。

@RestController
@RequestMapping("/hello-rpc")
public class HelloRpcController {@RpcConsumer(providerName = "provider")
    private HelloRpcService helloRpcService;

    @GetMapping("/hello")
    public String hello() {return helloRpcService.sayHello();
    }
}

我们在消费者服务中写了一个 hello 的接口,在接口里面调用了 HelloRpcService 接口里的 sayHello() 方法,看过前面内容的同学应该知道,被 @RpcConsumer 修饰的 helloRpcService 属性在初始化的时候会为其设置一个动态代理,当我们调用这个接口里面的方法时,会通过 Netty 向服务提供者发送调用信息,然后由服务提供者调用相应方法并返回结果。
到这一步,我们可以说完成了一个简单的 RPC 框架以及其使用,下面我们就一起来验证一下结果吧。

测试

在测试之前我们需要在自己本地电脑上安装 Zookeeper,具体的安装方式非常简单。可以参考这篇文章。
安装好 Zookeeper 后,我们需要完成以下几个步骤:

  1. 启动 Zookeeper。
  2. 启动服务提供者。
  3. 启动服务消费者。

第一次启动服务消费者的过程中,你的控制台可以能会报一个找不到 /rpc 节点的错误,产生这个错误的原因是我们在第一次启动的时候 ZK 里面并不存在 /rpc 这个节点,但是如果你仔细研究源码的话,会发现当这个节点不存在的时候,我们会创建一个。所以直接忽略这个异常即可。完成以上几步之后,我们只需要在浏览器中访问http://127.0.0.1:8080/hello-rpc/hello,如果你看到了下面的结果,那么恭喜你,整个 RPC 框架完美的运行成功了。

结束语

本文的主要内容是和大家一起完成了一个 Demo 版的 RPC 框架,其主要目的是让大家更深刻的理解 RPC 的原理以及其调用过程。当然由于文章篇幅的原因,很多代码没有直接在文中给出,您可以在 Github 上找到完整的实现。如果您有什么问题可以在 Github 上提交 Issue 或者发送邮件到我的邮箱(gancy.programmer@gmail.com),如果您觉得这篇文章写的还行的话,希望您能给我个 Star,这是对我最好的鼓励。

PS: 学习不止,码不停蹄!如果您喜欢我的文章,就关注我吧!

退出移动版