一、根本RPC框架简介

在分布式计算中,近程过程调用(Remote Procedure Call,缩写 RPC)容许运行于一台计算机的程序调用另一个地址空间计算机的程序,就像调用本地程序一样,无需额定地为这个交互作用波及到的代理对象构建、网络协议等进行编程。

个别RPC架构,有至多三种构造,别离为注册核心,服务提供者和服务消费者。如图1.1所示,注册核心提供注册服务和注册信息变更的告诉服务,服务提供者运行在服务器来提供服务,服务消费者应用服务提供者的服务。

服务提供者(RPC Server),运行在服务端,提供服务接口定义与服务实现类,并对外裸露服务接口。注册核心(Registry),运行在服务端,负责记录服务提供者的服务对象,并提供近程服务信息的查问服务和变更告诉服务。服务消费者(RPC Client),运行在客户端,通过近程代理对象调用近程服务。

1.1 RPC调用流程

如下图所示,形容了RPC的调用流程,其中IDL(Interface Description Language)为接口描述语言,使得在不同平台上运行的程序和用不同语言编写的程序能够互相通信交换。

1)客户端调用客户端桩模块。该调用是本地过程调用,其中参数以失常形式推入堆栈。

2)客户端桩模块将参数打包到音讯中,并进行零碎调用以发送音讯。打包参数称为编组。

3)客户端的本地操作系统将音讯从客户端计算机发送到服务器计算机。

4)服务器计算机上的本地操作系统将传入的数据包传递到服务器桩模块。

5)服务器桩模块从音讯中解包出参数。解包参数称为解组。

6)最初,服务器桩模块执行服务器程序流程。回复是沿相同的方向执行雷同的步骤。

二、Tars Java客户端设计介绍

Tars Java客户端整体设计与支流的RPC框架基本一致。咱们先介绍Tars Java客户端初始化过程。

2.1 Tars Java客户端初始化过程

如图2.1所示,形容了Tars Java的初始化过程。

1)先出创立一个CommunicatorConfig配置项,命名为communicatorConfig,其中按需设置locator, moduleName, connections等参数。

2)通过上述的CommunicatorConfig配置项,命名为config,那么调用CommunicatorFactory.getInstance().getCommunicator(config),创立一个Communicator对象,命名为communicator。

3)假如objectName="MESSAGE.ControlCenter.Dispatcher",须要生成的代理接口为Dispatcher.class,调用communicator.stringToProxy(objectName, Dispatcher.class)办法来生成代理对象的实现类。

4)在stringToProxy()办法里,首先通过初始化QueryHelper代理对象,调用getServerNodes()办法获取近程服务对象列表,并设置该返回值到communicatorConfig的objectName字段里。具体的代理对象的代码剖析,见下文中的“2.3 代理生成”章节。

5)判断在之前调用stringToProxy是否有设置LoadBalance参数,如果没有的话,就生成默认的采纳RR轮训算法的DefaultLoadBalance对象。

6)创立TarsProtocolInvoker协定调用对象,其中过程有通过解析communicatorConfig中的objectName和simpleObjectName来获取URL列表,其中一个URL对应一个近程服务对象,TarsProtocolInvoker初始化各个URL对应的ServantClient对象,其中一个URL依据communicatorConfig的connections配置项确认生成多少个ServantClient对象。而后应用ServantClients等参数初始化TarsInvoker对象,并将这些TarsInvoker对象汇合设置到TarsProtocolInvoker的allInvokers成员变量中,其中每个URL对应一个TarsInvoker对象。上述分析表明,一个近程服务节点对应一个TarsInvoker对象,一个TarsInvoker对象蕴含connections个ServantClient对象,对于TCP协定,那么就是一个ServantClient对象对应一个TCP连贯。

7)应用api, objName, servantProxyConfig,loadBalance,protocolInvoker, this.communicator参数生成一个实现JDK代理接口InvocationHandler的ObjectProxy对象。

8)生成ObjectProxy对象的同时进行初始化操作,首先会执行loadBalancer.refresh()办法刷新近程服务节点到负载均衡器中便于后续tars近程调用进行路由。

9)而后注册统计信息上报器,其中是上报办法采纳JDK的ScheduledThreadPoolExecutor进行定时轮训上报。

10)注册服务列表刷新器,采纳的技术办法和上述统计信息上报器基本一致。

2.2 应用范例

以下代码为最简化示例,其中CommunicatorConfig里的配置采纳默认值,communicator通过CommunicatorConfig配置生成后,间接指定近程服务对象的具体服务对象名、IP和端口生成一个近程服务代理对象。

Tars Java代码应用范例// 先初始化根本Tars配置CommunicatorConfig cfg = new CommunicatorConfig();// 通过上述的CommunicatorConfig配置生成一个Communicator对象。Communicator communicator = CommunicatorFactory.getInstance().getCommunicator(cfg);// 指定Tars近程服务的服务对象名、IP和端口生成一个近程服务代理对象。

// 先初始化根本Tars配置    CommunicatorConfig cfg = new CommunicatorConfig();    // 通过上述的CommunicatorConfig配置生成一个Communicator对象。    Communicator communicator = CommunicatorFactory.getInstance().getCommunicator(cfg);    // 指定Tars近程服务的服务对象名、IP和端口生成一个近程服务代理对象。    HelloPrx proxy = communicator.stringToProxy(HelloPrx.class, "TestApp.HelloServer.HelloObj@tcp -h 127.0.0.1 -p 18601 -t 60000");    //同步调用,阻塞直到近程服务对象的办法返回后果    String ret = proxy.hello(3000, "Hello World");    System.out.println(ret);    //异步调用,不关注异步调用最终的状况    proxy.async_hello(null, 3000, "Hello World");      //异步调用,注册一个实现TarsAbstractCallback接口的回执解决对象,该实现类别离解决调用胜利,调用超时和调用异样的状况。    proxy.async_hello(new HelloPrxCallback() {        @Override        public void callback_expired() { //超时事件处理        }        @Override        public void callback_exception(Throwable ex) { //异样事件处理        }        @Override        public void callback_hello(String ret) { //调用胜利事件处理            Main.logger.info("invoke async method successfully {}", ret);       }    }, 1000, "Hello World");

在上述例子中,演示了常见的两种调用形式,别离为同步调用和异步调用。其中异步调用,如果调用方想捕获异步调用的最终后果,能够注册一个实现TarsAbstractCallback接口的实现类,对tars调用的异样,超时和胜利事件进行解决。

2.3 代理生成

Tars Java的客户端桩模块的近程代理对象是采纳JDK原生Proxy办法。如下文的源码所示,ObjectProxy实现了java.lang.reflect.InvocationHandler的接口办法,该接口是JDK自带的代理接口。

代理实现

public final class ObjectProxy<T> implements ServantProxy, InvocationHandler {    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {        String methodName = method.getName();        Class<?>[] parameterTypes = method.getParameterTypes();        InvokeContext context = this.protocolInvoker.createContext(proxy, method, args);        try {            if ("toString".equals(methodName) && parameterTypes.length == 0) {                return this.toString();            } else if                //***** 省略代码 *****            } else {                // 在负载均衡器选取一个近程调用类,进行应用层协定的封装,最初调用TCP传输层进行发送。                Invoker invoker = this.loadBalancer.select(context);                return invoker.invoke(context);            }        } catch (Throwable var8) {            // ***** 省略代码 *****        }    }}

当然生成上述近程服务代理类,波及到辅助类,Tars Java采纳ServantProxyFactory来生成上述的ObjectProxy,并存储ObjectProxy对象到Map构造,便于调用方二次应用时间接复用已存在的近程服务代理对象。

具体相干逻辑如源码所示,ObjectProxyFactory是生成ObjectProxy的辅助工厂类,和ServantProxyFactory不同,其自身不缓存生成的代理对象。

class ServantProxyFactory {    private final ConcurrentHashMap<String, Object> cache = new ConcurrentHashMap();    // ***** 省略代码 *****    public <T> Object getServantProxy(Class<T> clazz, String objName, ServantProxyConfig servantProxyConfig, LoadBalance loadBalance, ProtocolInvoker<T> protocolInvoker) {        Object proxy = this.cache.get(objName);        if (proxy == null) {            this.lock.lock(); // 加锁,保障只生成一个近程服务代理对象。            try {                proxy = this.cache.get(objName);                if (proxy == null) {                    // 创立实现JDK的java.lang.reflect.InvocationHandler接口的对象                    ObjectProxy<T> objectProxy = this.communicator.getObjectProxyFactory().getObjectProxy(clazz, objName, servantProxyConfig, loadBalance, protocolInvoker);                    // 应用JDK的java.lang.reflect.Proxy来生成理论的代理对象                    this.cache.putIfAbsent(objName, this.createProxy(clazz, objectProxy));                    proxy = this.cache.get(objName);                }            } finally {                this.lock.unlock();            }        }        return proxy;    }    /** 应用JDK自带的Proxy.newProxyInstance生成代理对象 */    private <T> Object createProxy(Class<T> clazz, ObjectProxy<T> objectProxy) {        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{clazz, ServantProxy.class}, objectProxy);    }    // ***** 省略代码 *****}

从以上的源码中,能够看到createProxy应用了JDK的Proxy.newProxyInstance办法来生成近程服务代理对象。

2.4 近程服务寻址办法

作为一个RPC近程框架,在分布式系统中,调用近程服务,波及到如何路由的问题,也就是如何从多个近程服务节点中抉择一个服务节点进行调用,当然Tars Java反对直连特定节点的形式调用近程服务,如上文的2.2 应用范例所介绍。

如图下图所示,ClientA某个时刻的一次调用应用了Service3节点进行近程服务调用,而ClientB某个时刻的一次调用采纳Service2节点。Tars Java提供多种负载平衡算法实现类,其中有采纳RR轮训算法的RoundRobinLoadBalance,一致性哈希算法的ConsistentHashLoadBalance和一般哈希算法的HashLoadBalance。

如下述源码所示,如果要自定义负载均衡器来定义近程调用的路由规定,那么须要实现com.qq.tars.rpc.common.LoadBalance接口,其中LoadBalance.select()办法负责依照路由规定,选取对应的Invoker对象,而后进行近程调用,具体逻辑见源码代理实现。因为近程服务节点可能产生变更,比方高低线近程服务节点,须要刷新本地负载均衡器的路由信息,那么此信息更新的逻辑在LoadBalance.refresh()办法里实现。

负载平衡接口

public interface LoadBalance<T> {    /** 依据负载平衡策略,筛选invoker */    Invoker<T> select(InvokeContext invokeContext) throws NoInvokerException;    /** 告诉invoker列表的更新 */    void refresh(Collection<Invoker<T>> invokers);}

2.5 网络模型

Tars Java的IO模式采纳的JDK的NIO的Selector模式。这里以TCP协定来形容网络解决,如下述源码所示,Reactor是一个线程,其中的run()办法中,调用了selector.select()办法,意思是如果除非此时网络产生一个事件,否则将始终线程阻塞上来。

如果此时呈现一个网络事件,那么此时线程将会被唤醒,执行后续代码,其中一个代码是dispatcheEvent(key),也就是将进行事件的散发。

其中将依据对应条件,调用acceptor.handleConnectEvent(key)办法来解决客户端连贯胜利事件,或acceptor.handleAcceptEvent(key)办法来解决服务器承受连贯胜利事件,或调用acceptor.handleReadEvent(key)办法从Socket里读取数据,或acceptor.handleWriteEvent(key)办法来写数据到Socket 。

Reactor事件处理

public final class Reactor extends Thread {    protected volatile Selector selector = null;    private Acceptor acceptor = null;    //***** 省略代码 *****    public void run() {        try {            while (!Thread.interrupted()) {                // 阻塞直到有网络事件产生。                selector.select();                //***** 省略代码 *****                while (iter.hasNext()) {                    SelectionKey key = iter.next();                    iter.remove();                    if (!key.isValid()) continue;                    try {                        //***** 省略代码 *****                        // 散发传输层协定TCP或UDP网络事件                        dispatchEvent(key);                //***** 省略代码 *****            }        }        //***** 省略代码 *****    }        //***** 省略代码 *****    private void dispatchEvent(final SelectionKey key) throws IOException {        if (key.isConnectable()) {            acceptor.handleConnectEvent(key);        } else if (key.isAcceptable()) {            acceptor.handleAcceptEvent(key);        } else if (key.isReadable()) {            acceptor.handleReadEvent(key);        } else if (key.isValid() && key.isWritable()) {            acceptor.handleWriteEvent(key);        }    }}

网络解决采纳Reactor事件驱动模式,Tars定义一个Reactor对象对应一个Selector对象,针对每个近程服务(整体服务集群,非单个节点程序)默认创立2个Reactor对象进行解决,通过批改com.qq.tars.net.client.selectorPoolSize这个JVM启动参数值来决定一个近程服务具体创立几个Reactor对象。

上图中的解决读IO事件(Read Event)实现和写IO事件(Write Event)的线程池是在Communicator初始化的时候配置的。具体逻辑如源码所示,其中线程池参数配置由CommunicatorConfig的corePoolSize, maxPoolSize, keepAliveTime等参数决定。

读写事件线程池初始化

private void initCommunicator(CommunicatorConfig config) throws CommunicatorConfigException {    //***** 省略代码 *****    this.threadPoolExecutor = ClientPoolManager.getClientThreadPoolExecutor(config);    //***** 省略代码 *****}public class ClientPoolManager {    public static ThreadPoolExecutor getClientThreadPoolExecutor(CommunicatorConfig communicatorConfig) {        //***** 省略代码 *****        clientThreadPoolMap.put(communicatorConfig, createThreadPool(communicatorConfig));        //***** 省略代码 *****        return clientPoolExecutor;    }             private static ThreadPoolExecutor createThreadPool(CommunicatorConfig communicatorConfig) {        int corePoolSize = communicatorConfig.getCorePoolSize();        int maxPoolSize = communicatorConfig.getMaxPoolSize();        int keepAliveTime = communicatorConfig.getKeepAliveTime();        int queueSize = communicatorConfig.getQueueSize();        TaskQueue taskqueue = new TaskQueue(queueSize);        String namePrefix = "tars-client-executor-";        TaskThreadPoolExecutor executor = new TaskThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, taskqueue, new TaskThreadFactory(namePrefix));        taskqueue.setParent(executor);        return executor;    }}

2.6 近程调用交互模型

调用代理类的办法,那么会进入实现InvocationHandler接口的ObjectProxy中的invoke办法。

下图形容了近程服务调用的流程状况。这里着重讲几个点,一个是如何写数据到网络IO。第二个是Tars Java通过什么形式进行同步或者异步调用,底层采纳了什么技术。

2.6.1 写 IO 流程

如图(底层代码写IO过程)所示,ServantClient将调用底层网络写操作,在invokeWithSync办法中,获得ServantClient本身成员变量TCPSession,调用TCPSession.write()办法,如图(底层代码写IO过程)和以下源码( 读写事件线程池初始化)所示,先获取Encode进行申请内容编码成IoBuffer对象,最初将IoBuffer的java.nio.ByteBuffer内容放入TCPSession的queue成员变量中,而后调用key.selector().wakeup(),唤醒Reactor中run()办法中的Selector.select(),执行后续的写操作。

具体Reactor逻辑见上文2.5 网络模型内容,如果Reactor查看条件发现能够写IO的话也就是key.isWritable()为true,那么最终会循环从TCPSession.queue中取出ByteBuffer对象,调用SocketChannel.write(byteBuffer)执行理论的写网络Socket操作,代码逻辑见源码中的doWrite()办法。

读写事件线程池初始化

public class TCPSession extends Session {    public void write(Request request) throws IOException {        try {            IoBuffer buffer = selectorManager.getProtocolFactory().getEncoder().encodeRequest(request, this);            write(buffer);        //***** 省略代码 *****    }    protected void write(IoBuffer buffer) throws IOException {        //***** 省略代码 *****        if (!this.queue.offer(buffer.buf())) {            throw new IOException("The session queue is full. [ queue size:" + queue.size() + " ]");        }        if (key != null) {            key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);            key.selector().wakeup();        }    }    protected synchronized int doWrite() throws IOException {        int writeBytes = 0;        while (true) {            ByteBuffer wBuf = queue.peek();            //***** 省略代码 *****            int bytesWritten = ((SocketChannel) channel).write(wBuf);            //***** 省略代码 *****        return writeBytes;    }}

2.6.2 同步和异步调用的底层技术实现

对于同步办法调用,如图(近程调用流程)和源码(ServantClient的同步调用)所示,ServantClient调用底层网络写操作,在invokeWithSync办法中创立一个Ticket对象,Ticket顾名思义就是票的意思,这张票惟一标识本次网络调用状况。

ServantClient的同步调用

public class ServantClient {    public <T extends ServantResponse> T invokeWithSync(ServantRequest request) throws IOException {            //***** 省略代码 *****            ticket = TicketManager.createTicket(request, session, this.syncTimeout);            Session current = session;            current.write(request);            if (!ticket.await(this.syncTimeout, TimeUnit.MILLISECONDS)) {            //***** 省略代码 *****            response = ticket.response();            //***** 省略代码 *****            return response;            //***** 省略代码 *****        return response;    }}

如代码所示,在执行完session.write()操作后,紧接着执行ticket.await()办法,该办法线程期待直到近程服务回复返回后果到客户端,ticket.await()被唤醒后,将执行后续操作,最终invokeWithSync办法返回response对象。其中Ticket的期待唤醒性能外部采纳java.util.concurrent.CountDownLatch来实现。

对于异步办法调用,将会执行ServantClient.invokeWithAsync办法,也会创立一个Ticket,并且执行Session.write()操作,尽管不会调用ticket.await(),然而在Reactor接管到近程回复时,首先会先解析Tars协定头失去Response对象,而后将Response对象放入如图(Tars-Java的网络事件处理模型)所示的IO读写线程池中进行进一步解决,如下述源码(异步回调事件处理)所示,最终会调用WorkThread.run()办法,在run()办法里执行ticket.notifyResponse(resp),该办法外面会执行相似上述代码2.1中的实现TarsAbstractCallback接口的调用胜利回调的办法。

异步回调事件处理

public final class WorkThread implements Runnable {    public void run() {        try {            //***** 省略代码 *****                Ticket<Response> ticket = TicketManager.getTicket(resp.getTicketNumber());            //***** 省略代码 *****                ticket.notifyResponse(resp);                ticket.countDown();                TicketManager.removeTicket(ticket.getTicketNumber());            }            //***** 省略代码 *****    }}

如下述源码所示,TicketManager会有一个定时工作轮训查看所有的调用是否超时,如果(currentTime - t.startTime) > t.timeout条件成立,那么会调用t.expired()告知回调对象,本次调用超时。

调用超时事件处理

public class TicketManager {            //***** 省略代码 *****    static {        executor.scheduleAtFixedRate(new Runnable() {            long currentTime = -1;            public void run() {                Collection<Ticket<?>> values = tickets.values();                currentTime = System.currentTimeMillis();                for (Ticket<?> t : values) {                    if ((currentTime - t.startTime) > t.timeout) {                        removeTicket(t.getTicketNumber());                        t.expired();                    }                }            }        }, 500, 500, TimeUnit.MILLISECONDS);    }}

三、总结

代码的调用个别都是层层递归调用,代码的调用深度和广度都很大,通过调试代码的形式一步步学习源码的形式,更加容易了解源码的含意和设计理念。

Tars与其余RPC框架,并没有什么本质区别,通过类比其余框架的设计理念,能够更加深刻了解Tars Java设计理念。

四、参考文献

1.Remote procedure call

2.Tars Java源码Github仓库

3.RPC框架简介与原理

作者:vivo 互联网服务器团队-Ke Shengkai