乐趣区

关于Tars:TarsJava客户端源码分析

一、根本 RPC 框架简介

在分布式计算中,近程过程调用(Remote Procedure Call,缩写 RPC)容许运行于一台计算机的程序调用另一个地址空间计算机的程序,就像调用本地程序一样,无需额定地为这个交互作用波及到的代理对象构建、网络协议等进行编程。

个别 RPC 架构,有至多三种构造,别离为注册核心,服务提供者和服务消费者。如图 1.1 所示,注册核心提供注册服务和注册信息变更的告诉服务,服务提供者运行在服务器来提供服务,服务消费者应用服务提供者的服务。

服务提供者(RPC Server),运行在服务端,提供服务接口定义与服务实现类,并对外裸露服务接口。注册核心(Registry),运行在服务端,负责记录服务提供者的服务对象,并提供近程服务信息的查问服务和变更告诉服务。服务消费者(RPC Client),运行在客户端,通过近程代理对象调用近程服务。

1.1 RPC 调用流程

如下图所示,形容了 RPC 的调用流程,其中 IDL(Interface Description Language)为接口描述语言,使得在不同平台上运行的程序和用不同语言编写的程序能够互相通信交换。

1)客户端调用客户端桩模块。该调用是本地过程调用,其中参数以失常形式推入堆栈。

2)客户端桩模块将参数打包到音讯中,并进行零碎调用以发送音讯。打包参数称为编组。

3)客户端的本地操作系统将音讯从客户端计算机发送到服务器计算机。

4)服务器计算机上的本地操作系统将传入的数据包传递到服务器桩模块。

5)服务器桩模块从音讯中解包出参数。解包参数称为解组。

6)最初,服务器桩模块执行服务器程序流程。回复是沿相同的方向执行雷同的步骤。

二、Tars Java 客户端设计介绍

Tars Java 客户端整体设计与支流的 RPC 框架基本一致。咱们先介绍 Tars Java 客户端初始化过程。

2.1 Tars Java 客户端初始化过程

如图 2.1 所示,形容了 Tars Java 的初始化过程。

1)先出创立一个 CommunicatorConfig 配置项,命名为 communicatorConfig,其中按需设置 locator, moduleName, connections 等参数。

2)通过上述的 CommunicatorConfig 配置项,命名为 config,那么调用 CommunicatorFactory.getInstance().getCommunicator(config),创立一个 Communicator 对象,命名为 communicator。

3)假如 objectName=”MESSAGE.ControlCenter.Dispatcher”,须要生成的代理接口为 Dispatcher.class,调用 communicator.stringToProxy(objectName, Dispatcher.class)办法来生成代理对象的实现类。

4)在 stringToProxy()办法里,首先通过初始化 QueryHelper 代理对象,调用 getServerNodes()办法获取近程服务对象列表,并设置该返回值到 communicatorConfig 的 objectName 字段里。具体的代理对象的代码剖析,见下文中的“2.3 代理生成”章节。

5)判断在之前调用 stringToProxy 是否有设置 LoadBalance 参数,如果没有的话,就生成默认的采纳 RR 轮训算法的 DefaultLoadBalance 对象。

6)创立 TarsProtocolInvoker 协定调用对象,其中过程有通过解析 communicatorConfig 中的 objectName 和 simpleObjectName 来获取 URL 列表,其中一个 URL 对应一个近程服务对象,TarsProtocolInvoker 初始化各个 URL 对应的 ServantClient 对象,其中一个 URL 依据 communicatorConfig 的 connections 配置项确认生成多少个 ServantClient 对象。而后应用 ServantClients 等参数初始化 TarsInvoker 对象,并将这些 TarsInvoker 对象汇合设置到 TarsProtocolInvoker 的 allInvokers 成员变量中,其中每个 URL 对应一个 TarsInvoker 对象。上述分析表明,一个近程服务节点对应一个 TarsInvoker 对象,一个 TarsInvoker 对象蕴含 connections 个 ServantClient 对象,对于 TCP 协定,那么就是一个 ServantClient 对象对应一个 TCP 连贯。

7)应用 api, objName, servantProxyConfig,loadBalance,protocolInvoker, this.communicator 参数生成一个实现 JDK 代理接口 InvocationHandler 的 ObjectProxy 对象。

8)生成 ObjectProxy 对象的同时进行初始化操作,首先会执行 loadBalancer.refresh()办法刷新近程服务节点到负载均衡器中便于后续 tars 近程调用进行路由。

9)而后注册统计信息上报器,其中是上报办法采纳 JDK 的 ScheduledThreadPoolExecutor 进行定时轮训上报。

10)注册服务列表刷新器,采纳的技术办法和上述统计信息上报器基本一致。

2.2 应用范例

以下代码为最简化示例,其中 CommunicatorConfig 里的配置采纳默认值,communicator 通过 CommunicatorConfig 配置生成后,间接指定近程服务对象的具体服务对象名、IP 和端口生成一个近程服务代理对象。

Tars Java 代码应用范例 // 先初始化根本 Tars 配置 CommunicatorConfig cfg = new CommunicatorConfig();// 通过上述的 CommunicatorConfig 配置生成一个 Communicator 对象。Communicator communicator = CommunicatorFactory.getInstance().getCommunicator(cfg);// 指定 Tars 近程服务的服务对象名、IP 和端口生成一个近程服务代理对象。

// 先初始化根本 Tars 配置
    CommunicatorConfig cfg = new CommunicatorConfig();
    // 通过上述的 CommunicatorConfig 配置生成一个 Communicator 对象。Communicator communicator = CommunicatorFactory.getInstance().getCommunicator(cfg);
    // 指定 Tars 近程服务的服务对象名、IP 和端口生成一个近程服务代理对象。HelloPrx proxy = communicator.stringToProxy(HelloPrx.class, "TestApp.HelloServer.HelloObj@tcp -h 127.0.0.1 -p 18601 -t 60000");
    // 同步调用,阻塞直到近程服务对象的办法返回后果
    String ret = proxy.hello(3000, "Hello World");
    System.out.println(ret);
    // 异步调用,不关注异步调用最终的状况
    proxy.async_hello(null, 3000, "Hello World");
      // 异步调用, 注册一个实现 TarsAbstractCallback 接口的回执解决对象,该实现类别离解决调用胜利,调用超时和调用异样的状况。proxy.async_hello(new HelloPrxCallback() {
        @Override
        public void callback_expired() { // 超时事件处理}
        @Override
        public void callback_exception(Throwable ex) {// 异样事件处理}
        @Override
        public void callback_hello(String ret) { // 调用胜利事件处理
            Main.logger.info("invoke async method successfully {}", ret);
       }
    }, 1000, "Hello World");

在上述例子中,演示了常见的两种调用形式,别离为同步调用和异步调用。其中异步调用,如果调用方想捕获异步调用的最终后果,能够注册一个实现 TarsAbstractCallback 接口的实现类,对 tars 调用的异样,超时和胜利事件进行解决。

2.3 代理生成

Tars Java 的客户端桩模块的近程代理对象是采纳 JDK 原生 Proxy 办法。如下文的源码所示,ObjectProxy 实现了 java.lang.reflect.InvocationHandler 的接口办法,该接口是 JDK 自带的代理接口。

代理实现

public final class ObjectProxy<T> implements ServantProxy, InvocationHandler {public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        InvokeContext context = this.protocolInvoker.createContext(proxy, method, args);
        try {if ("toString".equals(methodName) && parameterTypes.length == 0) {return this.toString();
            } else if
                //***** 省略代码 *****
            } else {
                // 在负载均衡器选取一个近程调用类,进行应用层协定的封装,最初调用 TCP 传输层进行发送。Invoker invoker = this.loadBalancer.select(context);
                return invoker.invoke(context);
            }
        } catch (Throwable var8) {// ***** 省略代码 *****}
    }
}

当然生成上述近程服务代理类,波及到辅助类,Tars Java 采纳 ServantProxyFactory 来生成上述的 ObjectProxy,并存储 ObjectProxy 对象到 Map 构造,便于调用方二次应用时间接复用已存在的近程服务代理对象。

具体相干逻辑如源码所示,ObjectProxyFactory 是生成 ObjectProxy 的辅助工厂类,和 ServantProxyFactory 不同,其自身不缓存生成的代理对象。

class ServantProxyFactory {private final ConcurrentHashMap<String, Object> cache = new ConcurrentHashMap();
    // ***** 省略代码 *****
    public <T> Object getServantProxy(Class<T> clazz, String objName, ServantProxyConfig servantProxyConfig, LoadBalance loadBalance, ProtocolInvoker<T> protocolInvoker) {Object proxy = this.cache.get(objName);
        if (proxy == null) {this.lock.lock(); // 加锁,保障只生成一个近程服务代理对象。try {proxy = this.cache.get(objName);
                if (proxy == null) {
                    // 创立实现 JDK 的 java.lang.reflect.InvocationHandler 接口的对象
                    ObjectProxy<T> objectProxy = this.communicator.getObjectProxyFactory().getObjectProxy(clazz, objName, servantProxyConfig, loadBalance, protocolInvoker);
                    // 应用 JDK 的 java.lang.reflect.Proxy 来生成理论的代理对象
                    this.cache.putIfAbsent(objName, this.createProxy(clazz, objectProxy));
                    proxy = this.cache.get(objName);
                }
            } finally {this.lock.unlock();
            }
        }
        return proxy;
    }
    /** 应用 JDK 自带的 Proxy.newProxyInstance 生成代理对象 */
    private <T> Object createProxy(Class<T> clazz, ObjectProxy<T> objectProxy) {return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{clazz, ServantProxy.class}, objectProxy);
    }
    // ***** 省略代码 *****
}

从以上的源码中,能够看到 createProxy 应用了 JDK 的 Proxy.newProxyInstance 办法来生成近程服务代理对象。

2.4 近程服务寻址办法

作为一个 RPC 近程框架,在分布式系统中,调用近程服务,波及到如何路由的问题,也就是如何从多个近程服务节点中抉择一个服务节点进行调用,当然 Tars Java 反对直连特定节点的形式调用近程服务,如上文的 2.2 应用范例所介绍。

如图下图所示,ClientA 某个时刻的一次调用应用了 Service3 节点进行近程服务调用,而 ClientB 某个时刻的一次调用采纳 Service2 节点。Tars Java 提供多种负载平衡算法实现类,其中有采纳 RR 轮训算法的 RoundRobinLoadBalance,一致性哈希算法的 ConsistentHashLoadBalance 和一般哈希算法的 HashLoadBalance。

如下述源码所示,如果要自定义负载均衡器来定义近程调用的路由规定,那么须要实现 com.qq.tars.rpc.common.LoadBalance 接口,其中 LoadBalance.select()办法负责依照路由规定,选取对应的 Invoker 对象,而后进行近程调用,具体逻辑见源码代理实现。因为近程服务节点可能产生变更,比方高低线近程服务节点,须要刷新本地负载均衡器的路由信息,那么此信息更新的逻辑在 LoadBalance.refresh()办法里实现。

负载平衡接口

public interface LoadBalance<T> {
    /** 依据负载平衡策略,筛选 invoker */
    Invoker<T> select(InvokeContext invokeContext) throws NoInvokerException;
    /** 告诉 invoker 列表的更新 */
    void refresh(Collection<Invoker<T>> invokers);
}

2.5 网络模型

Tars Java 的 IO 模式采纳的 JDK 的 NIO 的 Selector 模式。这里以 TCP 协定来形容网络解决,如下述源码所示,Reactor 是一个线程,其中的 run()办法中,调用了 selector.select()办法,意思是如果除非此时网络产生一个事件,否则将始终线程阻塞上来。

如果此时呈现一个网络事件,那么此时线程将会被唤醒,执行后续代码,其中一个代码是 dispatcheEvent(key),也就是将进行事件的散发。

其中将依据对应条件,调用 acceptor.handleConnectEvent(key)办法来解决客户端连贯胜利事件,或 acceptor.handleAcceptEvent(key)办法来解决服务器承受连贯胜利事件,或调用 acceptor.handleReadEvent(key)办法从 Socket 里读取数据,或 acceptor.handleWriteEvent(key)办法来写数据到 Socket。

Reactor 事件处理

public final class Reactor extends Thread {
    protected volatile Selector selector = null;
    private Acceptor acceptor = null;
    //***** 省略代码 *****
    public void run() {
        try {while (!Thread.interrupted()) {
                // 阻塞直到有网络事件产生。selector.select();
                //***** 省略代码 *****
                while (iter.hasNext()) {SelectionKey key = iter.next();
                    iter.remove();
                    if (!key.isValid()) continue;
                    try {
                        //***** 省略代码 *****
                        // 散发传输层协定 TCP 或 UDP 网络事件
                        dispatchEvent(key);
                //***** 省略代码 *****
            }
        }
        //***** 省略代码 *****
    }
        //***** 省略代码 *****
    private void dispatchEvent(final SelectionKey key) throws IOException {if (key.isConnectable()) {acceptor.handleConnectEvent(key);
        } else if (key.isAcceptable()) {acceptor.handleAcceptEvent(key);
        } else if (key.isReadable()) {acceptor.handleReadEvent(key);
        } else if (key.isValid() && key.isWritable()) {acceptor.handleWriteEvent(key);
        }
    }
}

网络解决采纳 Reactor 事件驱动模式,Tars 定义一个 Reactor 对象对应一个 Selector 对象,针对每个近程服务(整体服务集群,非单个节点程序)默认创立 2 个 Reactor 对象进行解决,通过批改 com.qq.tars.net.client.selectorPoolSize 这个 JVM 启动参数值来决定一个近程服务具体创立几个 Reactor 对象。

上图中的解决读 IO 事件(Read Event)实现和写 IO 事件(Write Event)的线程池是在 Communicator 初始化的时候配置的。具体逻辑如源码所示,其中线程池参数配置由 CommunicatorConfig 的 corePoolSize, maxPoolSize, keepAliveTime 等参数决定。

读写事件线程池初始化

private void initCommunicator(CommunicatorConfig config) throws CommunicatorConfigException {
    //***** 省略代码 *****
    this.threadPoolExecutor = ClientPoolManager.getClientThreadPoolExecutor(config);
    //***** 省略代码 *****
}
​
public class ClientPoolManager {public static ThreadPoolExecutor getClientThreadPoolExecutor(CommunicatorConfig communicatorConfig) {
        //***** 省略代码 *****
        clientThreadPoolMap.put(communicatorConfig, createThreadPool(communicatorConfig));
        //***** 省略代码 *****
        return clientPoolExecutor;
    }    
     
    private static ThreadPoolExecutor createThreadPool(CommunicatorConfig communicatorConfig) {int corePoolSize = communicatorConfig.getCorePoolSize();
        int maxPoolSize = communicatorConfig.getMaxPoolSize();
        int keepAliveTime = communicatorConfig.getKeepAliveTime();
        int queueSize = communicatorConfig.getQueueSize();
        TaskQueue taskqueue = new TaskQueue(queueSize);
​
        String namePrefix = "tars-client-executor-";
        TaskThreadPoolExecutor executor = new TaskThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, taskqueue, new TaskThreadFactory(namePrefix));
        taskqueue.setParent(executor);
        return executor;
    }
}

2.6 近程调用交互模型

调用代理类的办法,那么会进入实现 InvocationHandler 接口的 ObjectProxy 中的 invoke 办法。

下图形容了近程服务调用的流程状况。这里着重讲几个点,一个是如何写数据到网络 IO。第二个是 Tars Java 通过什么形式进行同步或者异步调用,底层采纳了什么技术。

2.6.1 写 IO 流程

如图(底层代码写 IO 过程)所示,ServantClient 将调用底层网络写操作,在 invokeWithSync 办法中,获得 ServantClient 本身成员变量 TCPSession,调用 TCPSession.write()办法,如图(底层代码写 IO 过程)和以下源码(读写事件线程池初始化)所示,先获取 Encode 进行申请内容编码成 IoBuffer 对象,最初将 IoBuffer 的 java.nio.ByteBuffer 内容放入 TCPSession 的 queue 成员变量中,而后调用 key.selector().wakeup(),唤醒 Reactor 中 run()办法中的 Selector.select(),执行后续的写操作。

具体 Reactor 逻辑见上文 2.5 网络模型 内容,如果 Reactor 查看条件发现能够写 IO 的话也就是 key.isWritable()为 true,那么最终会循环从 TCPSession.queue 中取出 ByteBuffer 对象,调用 SocketChannel.write(byteBuffer)执行理论的写网络 Socket 操作,代码逻辑见源码中的 doWrite()办法。

读写事件线程池初始化

public class TCPSession extends Session {public void write(Request request) throws IOException {
        try {IoBuffer buffer = selectorManager.getProtocolFactory().getEncoder().encodeRequest(request, this);
            write(buffer);
        //***** 省略代码 *****
    }
    protected void write(IoBuffer buffer) throws IOException {
        //***** 省略代码 *****
        if (!this.queue.offer(buffer.buf())) {throw new IOException("The session queue is full. [ queue size:" + queue.size() + "]");
        }
        if (key != null) {key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
            key.selector().wakeup();
        }
    }
    protected synchronized int doWrite() throws IOException {
        int writeBytes = 0;
        while (true) {ByteBuffer wBuf = queue.peek();
            //***** 省略代码 *****
            int bytesWritten = ((SocketChannel) channel).write(wBuf);
            //***** 省略代码 *****
        return writeBytes;
    }
}

2.6.2 同步和异步调用的底层技术实现

对于同步办法调用,如图(近程调用流程)和源码(ServantClient 的同步调用)所示,ServantClient 调用底层网络写操作,在 invokeWithSync 办法中创立一个 Ticket 对象,Ticket 顾名思义就是票的意思,这张票惟一标识本次网络调用状况。

ServantClient 的同步调用

public class ServantClient {public <T extends ServantResponse> T invokeWithSync(ServantRequest request) throws IOException {
            //***** 省略代码 *****
            ticket = TicketManager.createTicket(request, session, this.syncTimeout);
            Session current = session;
            current.write(request);
            if (!ticket.await(this.syncTimeout, TimeUnit.MILLISECONDS)) {
            //***** 省略代码 *****
            response = ticket.response();
            //***** 省略代码 *****
            return response;
            //***** 省略代码 *****
        return response;
    }
}

如代码所示,在执行完 session.write()操作后,紧接着执行 ticket.await()办法,该办法线程期待直到近程服务回复返回后果到客户端,ticket.await()被唤醒后,将执行后续操作,最终 invokeWithSync 办法返回 response 对象。其中 Ticket 的期待唤醒性能外部采纳 java.util.concurrent.CountDownLatch 来实现。

对于异步办法调用,将会执行 ServantClient.invokeWithAsync 办法,也会创立一个 Ticket,并且执行 Session.write()操作,尽管不会调用 ticket.await(),然而在 Reactor 接管到近程回复时,首先会先解析 Tars 协定头失去 Response 对象,而后将 Response 对象放入如图(Tars-Java 的网络事件处理模型)所示的 IO 读写线程池中进行进一步解决,如下述源码(异步回调事件处理)所示,最终会调用 WorkThread.run()办法,在 run()办法里执行 ticket.notifyResponse(resp),该办法外面会执行相似上述代码 2.1 中的实现 TarsAbstractCallback 接口的调用胜利回调的办法。

异步回调事件处理

public final class WorkThread implements Runnable {public void run() {
        try {
            //***** 省略代码 *****
                Ticket<Response> ticket = TicketManager.getTicket(resp.getTicketNumber());
            //***** 省略代码 *****
                ticket.notifyResponse(resp);
                ticket.countDown();
                TicketManager.removeTicket(ticket.getTicketNumber());
            }
            //***** 省略代码 *****
    }
}

如下述源码所示,TicketManager 会有一个定时工作轮训查看所有的调用是否超时,如果 (currentTime – t.startTime) > t.timeout 条件成立,那么会调用 t.expired() 告知回调对象,本次调用超时。

调用超时事件处理

public class TicketManager {
            //***** 省略代码 *****
    static {executor.scheduleAtFixedRate(new Runnable() {
            long currentTime = -1;
            public void run() {Collection<Ticket<?>> values = tickets.values();
                currentTime = System.currentTimeMillis();
                for (Ticket<?> t : values) {if ((currentTime - t.startTime) > t.timeout) {removeTicket(t.getTicketNumber());
                        t.expired();}
                }
            }
        }, 500, 500, TimeUnit.MILLISECONDS);
    }
}

三、总结

代码的调用个别都是层层递归调用,代码的调用深度和广度都很大,通过调试代码的形式一步步学习源码的形式,更加容易了解源码的含意和设计理念。

Tars 与其余 RPC 框架,并没有什么本质区别,通过类比其余框架的设计理念,能够更加深刻了解 Tars Java 设计理念。

四、参考文献

1.Remote procedure call

2.Tars Java 源码 Github 仓库

3.RPC 框架简介与原理

作者:vivo 互联网服务器团队 -Ke Shengkai

退出移动版