关于tars:TarsJava网络编程源码分析

4次阅读

共计 12307 个字符,预计需要花费 31 分钟才能阅读完成。

作者:vivo 互联网服务器团队 - Jin Kai

本文从 Java NIO 网络编程的基础知识讲到了 Tars 框架应用 NIO 进行网络编程的源码剖析。

一、Tars 框架根本介绍

Tars 是腾讯开源的反对多语言的高性能 RPC 框架,起源于腾讯外部 2008 年至今始终应用的对立利用框架 TAF(Total Application Framework),目前反对 C ++、Java、PHP、Nodejs、Go 语言。

该框架为用户提供了波及到开发、运维、以及测试的一整套解决方案,帮忙一个产品或者服务疾速开发、部署、测试、上线。它集可扩大协定编解码、高性能 RPC 通信框架、名字路由与发现、公布监控、日志统计、配置管理等于一体,通过它能够疾速用微服务的形式构建本人的稳固牢靠的分布式应用,并实现残缺无效的服务治理。

官网仓库地址:

https://github.com/TarsCloud/Tars

vivo 推送平台也深度应用了该框架,部署服务节点超过一千个,通过线上每日一百多亿音讯推送量的考验。

此前已在 vivo 互联网技术公众号公布过《Tars Java 客户端源码剖析》,此篇文章为续集。

Tars-java 最新稳定版 1.7.2 以及之前的版本都应用 Java NIO 进行网络编程;本文将别离具体介绍 java NIO 的原理和 Tars 应用 NIO 进行网络编程的细节。

二、Java NIO 原理介绍

从 1.4 版本开始,Java 提供了一种新的 IO 解决形式:NIO (New IO 或 Non-blocking IO) 是一个能够代替规范 Java IO 的 API,它是面向缓冲区而不是字节流,它是非阻塞的,反对 IO 多路复用。

2.1 Channels (通道) and Buffers (缓冲区)

规范的 IO 基于字节流进行操作的,而 NIO 是基于通道(Channel)和缓冲区(Buffer)进行操作。数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中,下图是一个残缺流程。

Channel 类型:

  1. 反对文件读写数据的 FileChannel
  2. 能通过 UDP 读写网络中的数据的 DatagramChannel
  3. 能通过 TCP 读写网络数据的 SocketChannel
  4. 能够监听新进来的 TCP 连贯,对每一个新进来的连贯都会创立一个 SocketChannel 的 ServerSocketChannel。

SocketChannel:

  • 关上 SocketChannel:SocketChannel socketChannel = SocketChannel.open();
  • 敞开 SocketChannel:socketChannel.close();
  • 从 Channel 中读取的数据放到 Buffer:int bytesRead = inChannel.read(buf);
  • 将 Buffer 中的数据写到 Channel:int bytesWritten = inChannel.write(buf);

ServerSocketChannel:

通过 ServerSocketChannel.accept() 办法监听新进来的连贯,当 accept()办法返回的时候,它返回一个蕴含新进来的连贯的 SocketChannel,因而 accept()办法会始终阻塞到有新连贯达到。

通常不会仅仅只监听一个连贯, 在 while 循环中调用 accept()办法. 如上面的例子:

代码 1:

while(true){SocketChannel socketChannel = serverSocketChannel.accept();
     //do something with socketChannel...
}

ServerSocketChannel 能够设置成非阻塞模式。在非阻塞模式下,accept() 办法会立即返回,如果还没有新进来的连贯, 返回的将是 null。因而,须要查看返回的 SocketChannel 是否是 null。

代码 2:

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(8888));
serverSocketChannel.configureBlocking(false);
while(true){SocketChannel socketChannel = serverSocketChannel.accept();
    if(socketChannel != null){//do something with socketChannel...}
}

Buffer 类型:

  • ByteBuffer
  • CharBuffer
  • DoubleBuffer
  • FloatBuffer
  • IntBuffer
  • LongBuffer
  • ShortBuffer

Buffer 的调配:

ByteBuffer buf = ByteBuffer.allocate(2048);

Buffer 的读写:

个别是以下四个步骤:

  1. 写入数据到 Buffer,最大写入量是 capacity,写模式下 limit 值即为 capacity 值,position 即为写到的地位。
  2. 调用 flip()办法将 Buffer 从写模式切换到读模式,此时 position 挪动到开始地位 0,limit 挪动到 position 的地位。
  3. 从 Buffer 中读取数据,在读模式下能够读取之前写入到 buffer 的所有数据,即为 limit 地位。
  4. 调用 clear()办法或者 compact()办法。clear()办法将 position 设为 0,limit 被设置成 capacity 的值。compact()办法将所有未读的数据拷贝到 Buffer 起始处,而后将 position 设到最初一个未读元素前面。

mark() 与 reset()办法 通过调用 Buffer.mark() 办法,能够标记 Buffer 中的一个特定 position,之后能够通过调用 Buffer.reset()办法复原到这个 position。

duplicate() 此办法返回承载先前字节缓冲区内容的新字节缓冲区。

remaining() limit 减去 position 的值

2.2 Selector(选择器)

Java NIO 引入了选择器的概念,选择器用于监听多个通道的事件。单个的线程能够监听多个数据通道。要应用 Selector,得向 Selector 注册 Channel,而后调用它的 select()办法。这个办法会始终阻塞到某个注册的通道有事件就绪。一旦这个办法返回,线程就能够解决这些事件。

代码 3:

channel.configureBlocking(false);
SelectionKey key = channel.register(selector,Selectionkey.OP_READ);

留神 register()办法的第二个参数,这是一个监听的汇合,即在通过 Selector 监听 Channel 时关注什么事件汇合。

SelectionKey 蕴含:

1) interest 汇合:
selectionKey.interestOps()  能够监听四种不同类型的事件:OP\_ACCEPT、OP\_CONNECT、OP\_WRITE、OP\_READ

2) ready 汇合:
selectionKey.readyOps();  ready 汇合是通道曾经准备就绪的操作的汇合,提供 4 个不便的办法:

  • selectionKey.isAcceptable();
  • selectionKey.isConnectable();
  • selectionKey.isReadable();
  • selectionKey.isWritable();

3) Channel: selectionKey.channel();

4) Selector: selectionKey.selector();

5) 可选的附加对象:

selectionKey.attachment();  能够将一个对象或者更多信息附着到 SelectionKey 上,这样就能不便的辨认特定的通道。

提醒:

OP\_ACCEPT 和 OP\_CONNECT 的区别:简略来说,客户端建设连贯是 connect,服务器筹备接管连贯是 accept。一个典型的客户端服务器网络交互流程如下图

selectedKeys()  一旦调用了 select()办法,并且返回值表明有一个或更多个通道就绪了,而后能够通过调用 selector 的 selectedKeys()办法,拜访已选择键集(selected key set)中的就绪通道。

wakeUp() 某个线程调用 select()办法后阻塞了,即便没有通道曾经就绪,也有方法让其从 select()办法返回。只有让其它线程在阻塞线程调用 select()办法的对象上调用 Selector.wakeup()办法即可。阻塞在 select()办法上的线程会立马返回。如果有其它线程调用了 wakeup()办法,但以后没有线程阻塞在 select()办法上,下个调用 select()办法的线程会立刻 wake up。

close() 用完 Selector 后调用其 close()办法会敞开该 Selector,且使注册到该 Selector 上的所有 SelectionKey 实例有效。通道自身并不会敞开。

通过 Selector 抉择通道:

  • int select() 阻塞直到至多有一个通道在你注册的事件上就绪了
  • int select(long timeout) 减少最长阻塞毫秒数
  • int selectNow() 不会阻塞,不论什么通道就绪都立即返回

三、Tars NIO 网络编程

理解完 Java NIO 的原理,咱们来看看 Tars 是如何应用 NIO 进行网络编程的。

Tars 的网络模型是多 reactor 多线程模型。有一点非凡的是 tars 的 reactor 线程组里随机选一个线程解决网络事件,并且该线程同时也能解决读写。

外围类之间的关系如下:

3.1 一个典型的 Java NIO 服务端开发流程

  1. 创立 ServerSocketChannel, 设置为非阻塞,并绑定端口
  2. 创立 Selector 对象
  3. 给 ServerSocketChannel 注册 SelectionKey.OP_ACCEPT 事件
  4. 启动一个线程循环,调用 Selector 的 select 办法来查看 IO 就绪事件,一旦有 IO 就绪事件,就告诉用户线程去解决 IO 事件
  5. 如果有 Accept 事件,就创立一个 SocketChannel,并注册 SelectionKey.OP_READ
  6. 如果有读事件,判断一下是否全包,如果全包,就交给后端线程解决
  7. 写事件比拟非凡。isWriteable 示意的是本机的写缓冲区是否可写。这个在绝大多少状况下都是为真的。在 Netty 中只有写半包的时候才须要注册写事件,如果一次写就齐全把数据写入了缓冲区就不须要注册写事件。

3.2 Tars 客户端发动申请到服务器的流程

  1. Communicator.stringToProxy()  依据 servantName 等配置信息创立通信器。
  2. ServantProxyFactory.getServantProxy() 调用工厂办法创立 servant 代理。
  3. ObjectProxyFactory.getObjectProxy()  调用工厂办法创立 obj 代理。
  4. TarsProtocolInvoker.create() 创立协定调用者。
  5. ServantProtocolInvoker.initClient(Url url)  依据 servantProxyConfig 中的配置信息找到 servant 的 ip 端口等进行初始化 ServantClient。
  6. ClientPoolManager.getSelectorManager() 如果第一次调用 selectorManager 是空的就会去初始化 selectorManager。
  7. reactorSet = new Reactor[selectorPoolSize];     SelectorManager 初始化结构类中的会依据 selectorPoolSize(默认是 2)的配置创立 Reactor 线程数组。线程名称的前缀是 servant-proxy- 加上 CommunicatorId,CommunicatorId 生成规定是由 locator 的地址生成的 UUID。
  8. 启动 reactor 线程。

3.3 Tars 服务端启动步骤

  1. tars 反对 TCP 和 UDP 两种协定,RPC 场景下是应用 TCP 协定。
  2. new SelectorManager() 依据配置信息初始化 selectorManager,线程池大小 processors > 8 ? 4 + (processors * 5 / 8) : processors + 1;线程名称前缀是 server-tcp-reactor,而后启动 reactor 线程数组中的所有线程。
  3. 开启服务端监听的 ServerSocketChannel,绑定服务端本地 ip 和监听的端口号,设置 TCP 连贯申请队列的最大容量为 1024;设置非阻塞模式。
  4. 选取 reactor 线程数组中第 0 个线程作为服务端监听连贯 OP_ACCEPT 就绪事件的线程。

代码 4:

public void bind(AppService appService) throws IOException {
 
    // 此处略去非关键代码
 
    if (endpoint.type().equals("tcp")) {  // 1
        this.selectorManager = new SelectorManager(Utils.getSelectorPoolSize(), new ServantProtocolFactory(codec), threadPool, processor, keepAlive, "server-tcp-reactor", false);     // 2
        this.selectorManager.setTcpNoDelay(serverCfg.isTcpNoDelay());
        this.selectorManager.start();
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.socket().bind(new InetSocketAddress(endpoint.host(), endpoint.port()), 1024);   // 3
        serverChannel.configureBlocking(false);
              selectorManager.getReactor(0).registerChannel(serverChannel, SelectionKey.OP_ACCEPT);  // 4
    } else if (endpoint.type().equals("udp")) {this.selectorManager = new SelectorManager(1, new ServantProtocolFactory(codec), threadPool, processor, false, "server-udp-reactor", true);
        this.selectorManager.start();
        // UDP 开启的是 DatagramChannel
        DatagramChannel serverChannel = DatagramChannel.open();
        DatagramSocket socket = serverChannel.socket();
        socket.bind(new InetSocketAddress(endpoint.host(), endpoint.port()));
        serverChannel.configureBlocking(false);
        // UDP 协定不须要建连,监听的是 OP_READ 就绪事件
        this.selectorManager.getReactor(0).registerChannel(serverChannel, SelectionKey.OP_READ);
    }
}

3.4 Reactor 线程启动流程

  1. 多路复用器开始轮询查看 是否有就绪的事件。
  2. 解决 register 队列中残余的 channel 注册到以后 reactor 线程的多路复用器 selector 中。
  3. 获取已选键集中所有就绪的 channel。
  4. 更新 Session 中最近操作工夫,Tars 服务端启动时会调用 startSessionManager() , 单线程每 30s 扫描一次 session 会话列表,会查看每个 session 的 lastUpdateOperationTime 与以后工夫的时间差,如果超过 60 秒会将过期 session 对应的 channel 踢除。
  5. 散发 IO 事件进行解决。
  6. 解决 unregister 队列中残余的 channel, 从以后 reactor 线程的多路复用器 selector 中解除注册。

代码 5:

public void run() {while (!Thread.interrupted()) {selector.select();  // 1
            processRegister();  // 2
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();   //  3
            while (iter.hasNext()) {SelectionKey key = iter.next();
                iter.remove();
                if (!key.isValid()) continue;
                try {if (key.attachment() != null && key.attachment() instanceof Session) {((Session) key.attachment()).updateLastOperationTime(); //4}
                 dispatchEvent(key);    // 5
                } catch (Throwable ex) {disConnectWithException(key, ex);
                }
            }
            processUnRegister();  // 6}
}

3.5 IO 事件散发解决

每个 reactor 线程都有一个专门的 Accepter 类去解决各种 IO 事件。TCPAccepter 能够解决全副的四种事件(OP\_ACCEPT、OP\_CONNECT、OP\_WRITE、OP\_READ)、UDPAccepter 因为不须要建设连贯所以只须要解决读和写两种事件。

1. 解决 OP_ACCEPT

  1. 获取 channel,解决 TCP 申请。
  2. 为这个 TCP 申请创立 TCPSession,会话的状态是服务器已连贯
  3. 会话注册到 sessionManager 中,Tars 服务可配置最大连接数 maxconns,如果超过就会敞开以后会话。
  4. 寻找下一个 reactor 线程进行多路复用器与 channel 的绑定。

代码 6:

public void handleAcceptEvent(SelectionKey key) throws IOException {ServerSocketChannel server = (ServerSocketChannel) key.channel(); // 1
    SocketChannel channel = server.accept();
       channel.socket().setTcpNoDelay(selectorManager.isTcpNoDelay());
    channel.configureBlocking(false);
    Utils.setQosFlag(channel.socket());
    TCPSession session = new TCPSession(selectorManager);    // 2
    session.setChannel(channel);
    session.setStatus(SessionStatus.SERVER_CONNECTED);
    session.setKeepAlive(selectorManager.isKeepAlive());
    session.setTcpNoDelay(selectorManager.isTcpNoDelay());
    SessionManager.getSessionManager().registerSession(session);   // 3
      selectorManager.nextReactor().registerChannel(channel, SelectionKey.OP_READ, session); // 4
}

2. 解决 OP_CONNECT

  1. 获取客户端连贯过去的 channel 通道
  2. 获取 Session
  3. 与服务器建设连贯,将关注的趣味 OPS 设置为 ready 就绪事件,session 中的状态批改为客户端已连贯
  4. 解决 OP_CONNECT

代码 7:

public void handleConnectEvent(SelectionKey key) throws IOException {SocketChannel client = (SocketChannel) key.channel();  // 1
    TCPSession session = (TCPSession) key.attachment();   //2
    if (session == null) throw new RuntimeException("The session is null when connecting to ...");
    try {  // 3
        client.finishConnect();
        key.interestOps(SelectionKey.OP_READ);
        session.setStatus(SessionStatus.CLIENT_CONNECTED);
    } finally {session.finishConnect();
    }
}

3. 解决 OP_WRITE、解决 OP_READ

代码 8:

public void handleReadEvent(SelectionKey key) throws IOException {TCPSession session = (TCPSession) key.attachment();
    if (session == null) throw new RuntimeException("The session is null when reading data...");
    session.read();}
public void handleWriteEvent(SelectionKey key) throws IOException {TCPSession session = (TCPSession) key.attachment();
    if (session == null) throw new RuntimeException("The session is null when writing data...");
    session.doWrite();}

3.6 seesion 中网络读写的事件具体处理过程

1. 读事件处理

申请 2k 的 ByteBuffer 空间,读取 channel 中的数据到 readBuffer 中。依据 sessionStatus 判断是客户端读响应还是服务器读申请,别离进行解决。

代码 9:

protected void read() throws IOException {int ret = readChannel();
    if (this.status == SessionStatus.CLIENT_CONNECTED) {readResponse();
    } else if (this.status == SessionStatus.SERVER_CONNECTED) {readRequest();
    } else {throw new IllegalStateException("The current session status is invalid. [status:" + this.status + "]");
    }
    if (ret < 0) {close();
        return;
    }
}
private int readChannel() throws IOException {
    int readBytes = 0, ret = 0;
    ByteBuffer data = ByteBuffer.allocate(1024 * 2);  // 1
    if (readBuffer == null) {readBuffer = IoBuffer.allocate(bufferSize);
    }
       // 2
    while ((ret = ((SocketChannel) channel).read(data)) > 0) {data.flip();  // 3
        readBytes += data.remaining();
        readBuffer.put(data.array(), data.position(), data.remaining());
        data.clear();}
    return ret < 0 ? ret : readBytes;
}

① 客户端读响应

从以后 readBuffer 中的内容复制到一个新的长期 buffer 中,并且切换到读模式,应用 TarsCodec 类解析出 buffer 内的协定字段到 response,WorkThread 线程告诉 Ticket 解决 response。如果 response 为空,则重置 tempBuffer 到 mark 的地位,从新解析协定。

代码 10:

public void readResponse() {
    Response response = null;
    IoBuffer tempBuffer = null;
        tempBuffer = readBuffer.duplicate().flip();
        while (true) {tempBuffer.mark();
            if (tempBuffer.remaining() > 0) {response = selectorManager.getProtocolFactory().getDecoder().decodeResponse(tempBuffer, this);
            } else {response = null;}
            if (response != null) {if (response.getTicketNumber() == Ticket.DEFAULT_TICKET_NUMBER) response.setTicketNumber(response.getSession().hashCode());
                selectorManager.getThreadPool().execute(new WorkThread(response, selectorManager));
            } else {tempBuffer.reset();
                readBuffer = resetIoBuffer(tempBuffer);
                break;
            }
        }
}

② 服务器读申请

工作放入线程池交给 WorkThread 线程,最终交给 Processor 类出构建申请的响应体,包含分布式上下文,而后通过 FilterChain 的解决,最终通过 jdk 提供的反射办法 invoke 服务端本地的办法而后返回 response。如果线程池抛出回绝异样,则返回 SERVEROVERLOAD = -9,服务端过载爱护。如果 request 为空,则重置 tempBuffer 到 mark 的地位,从新解析协定。

代码 11:

public void readRequest() {
    Request request = null;
    IoBuffer tempBuffer = readBuffer.duplicate().flip();
        while (true) {tempBuffer.mark();
            if (tempBuffer.remaining() > 0) {request = selectorManager.getProtocolFactory().getDecoder().decodeRequest(tempBuffer, this);
            } else {request = null;}
            if (request != null) {
                try {request.resetBornTime();
                    selectorManager.getThreadPool().execute(new WorkThread(request, selectorManager));
                } catch (RejectedExecutionException e) {selectorManager.getProcessor().overload(request, request.getIoSession());
                } catch (Exception ex) {ex.printStackTrace();
                }
            } else {tempBuffer.reset();
                readBuffer = resetIoBuffer(tempBuffer);
                break;
            }
        }
}

2. 写事件处理

同样也包含客户端写申请和服务端写响应两种,其实这两种都是往 TCPSession 中的 LinkedBlockingQueue(有界队列最大 8K)中插入 ByteBuffer。LinkedBlockingQueue 中的 ByteBuffer 最终会由 TCPAcceptor 中的 handleWriteEvent 监听写就绪事件并生产。

代码 12:

protected void write(IoBuffer buffer) throws IOException {if (buffer == null) return;
    if (channel == null || key == null) throw new IOException("Connection is closed");
    if (!this.queue.offer(buffer.buf())) {throw new IOException("The session queue is full. [ queue size:" + queue.size() + "]");
    }
    if (key != null) {key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
        key.selector().wakeup();
    }
}

四、总结

本文次要介绍了 Java NIO 编程的基础知识 和 Tars-Java 1.7.2 版本的网络编程模块的源码实现。

在最新的 Tars-Java 的 master 分支中咱们能够发现网络编程曾经由 NIO 改成了 Netty,尽管 Netty 更加成熟稳固,然而作为学习者理解 NIO 的原理也是把握网络编程的必经之路。

更多对于 Tars 框架的介绍能够拜访:

https://tarscloud.org/

本文剖析源码地址(v1.7.x 分支):

https://github.com/TarsCloud/TarsJava

正文完
 0