作者:vivo 互联网服务器团队- Jin Kai

本文从Java NIO网络编程的基础知识讲到了Tars框架应用NIO进行网络编程的源码剖析。

一、Tars框架根本介绍

Tars是腾讯开源的反对多语言的高性能RPC框架,起源于腾讯外部2008年至今始终应用的对立利用框架TAF(Total Application Framework),目前反对C++、Java、PHP、Nodejs、Go语言。

该框架为用户提供了波及到开发、运维、以及测试的一整套解决方案,帮忙一个产品或者服务疾速开发、部署、测试、上线。它集可扩大协定编解码、高性能RPC通信框架、名字路由与发现、公布监控、日志统计、配置管理等于一体,通过它能够疾速用微服务的形式构建本人的稳固牢靠的分布式应用,并实现残缺无效的服务治理。

官网仓库地址:

https://github.com/TarsCloud/Tars

vivo推送平台也深度应用了该框架,部署服务节点超过一千个,通过线上每日一百多亿音讯推送量的考验。

此前已在vivo互联网技术公众号公布过《Tars Java 客户端源码剖析》,此篇文章为续集。

Tars-java 最新稳定版1.7.2以及之前的版本都应用Java NIO进行网络编程;本文将别离具体介绍java NIO的原理和Tars 应用NIO进行网络编程的细节。

二、Java NIO原理介绍

从1.4版本开始,Java提供了一种新的IO解决形式:NIO (New IO 或 Non-blocking IO) 是一个能够代替规范Java IO 的API,它是面向缓冲区而不是字节流,它是非阻塞的,反对IO多路复用。

2.1 Channels (通道) and Buffers (缓冲区)

规范的IO基于字节流进行操作的,而NIO是基于通道(Channel)和缓冲区(Buffer)进行操作。数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中,下图是一个残缺流程。

Channel类型:

  1. 反对文件读写数据的FileChannel
  2. 能通过UDP读写网络中的数据的DatagramChannel
  3. 能通过TCP读写网络数据的SocketChannel
  4. 能够监听新进来的TCP连贯,对每一个新进来的连贯都会创立一个SocketChannel的ServerSocketChannel 。

SocketChannel:

  • 关上 SocketChannel:SocketChannel socketChannel = SocketChannel.open();
  • 敞开 SocketChannel:socketChannel.close();
  • 从Channel中读取的数据放到Buffer: int bytesRead = inChannel.read(buf);
  • 将Buffer中的数据写到Channel: int bytesWritten = inChannel.write(buf);

ServerSocketChannel:

通过 ServerSocketChannel.accept() 办法监听新进来的连贯,当accept()办法返回的时候,它返回一个蕴含新进来的连贯的SocketChannel,因而accept()办法会始终阻塞到有新连贯达到。

通常不会仅仅只监听一个连贯,在while循环中调用 accept()办法. 如上面的例子:

代码1:
while(true){    SocketChannel socketChannel = serverSocketChannel.accept();     //do something with socketChannel...}

ServerSocketChannel能够设置成非阻塞模式。在非阻塞模式下,accept() 办法会立即返回,如果还没有新进来的连贯,返回的将是null。因而,须要查看返回的SocketChannel是否是null。

代码2:
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.socket().bind(new InetSocketAddress(8888));serverSocketChannel.configureBlocking(false);while(true){    SocketChannel socketChannel = serverSocketChannel.accept();    if(socketChannel != null){        //do something with socketChannel...    }}

Buffer类型:

  • ByteBuffer
  • CharBuffer
  • DoubleBuffer
  • FloatBuffer
  • IntBuffer
  • LongBuffer
  • ShortBuffer

Buffer的调配:

ByteBuffer buf = ByteBuffer.allocate(2048);

Buffer的读写:

个别是以下四个步骤:

  1. 写入数据到Buffer,最大写入量是capacity,写模式下limit值即为capacity值,position即为写到的地位。
  2. 调用flip()办法将Buffer从写模式切换到读模式,此时position挪动到开始地位0,limit挪动到position的地位。
  3. 从Buffer中读取数据,在读模式下能够读取之前写入到buffer的所有数据,即为limit地位。
  4. 调用clear()办法或者compact()办法。clear()办法将position设为0,limit被设置成capacity的值。compact()办法将所有未读的数据拷贝到Buffer起始处,而后将position设到最初一个未读元素前面。

mark() 与 reset()办法 通过调用Buffer.mark()办法,能够标记Buffer中的一个特定position,之后能够通过调用Buffer.reset()办法复原到这个position。

duplicate() 此办法返回承载先前字节缓冲区内容的新字节缓冲区。

remaining() limit 减去 position的值

2.2 Selector(选择器)

Java NIO引入了选择器的概念,选择器用于监听多个通道的事件。单个的线程能够监听多个数据通道。要应用Selector,得向Selector注册Channel,而后调用它的select()办法。这个办法会始终阻塞到某个注册的通道有事件就绪。一旦这个办法返回,线程就能够解决这些事件。

代码3:
channel.configureBlocking(false);SelectionKey key = channel.register(selector,Selectionkey.OP_READ);

留神register()办法的第二个参数,这是一个监听的汇合,即在通过Selector监听Channel时关注什么事件汇合。

SelectionKey蕴含:

1) interest汇合:
selectionKey.interestOps()  能够监听四种不同类型的事件:OP\_ACCEPT、OP\_CONNECT、OP\_WRITE、OP\_READ

2) ready汇合:
selectionKey.readyOps();  ready 汇合是通道曾经准备就绪的操作的汇合,提供4个不便的办法:

  • selectionKey.isAcceptable();
  • selectionKey.isConnectable();
  • selectionKey.isReadable();
  • selectionKey.isWritable();

3) Channel: selectionKey.channel();

4) Selector: selectionKey.selector();

5) 可选的附加对象:

selectionKey.attachment();  能够将一个对象或者更多信息附着到SelectionKey上,这样就能不便的辨认特定的通道。

提醒:

OP\_ACCEPT和OP\_CONNECT的区别:简略来说,客户端建设连贯是connect,服务器筹备接管连贯是accept。一个典型的客户端服务器网络交互流程如下图

selectedKeys()  一旦调用了select()办法,并且返回值表明有一个或更多个通道就绪了,而后能够通过调用selector的selectedKeys()办法,拜访已选择键集(selected key set)中的就绪通道。

wakeUp() 某个线程调用select()办法后阻塞了,即便没有通道曾经就绪,也有方法让其从select()办法返回。只有让其它线程在阻塞线程调用select()办法的对象上调用Selector.wakeup()办法即可。阻塞在select()办法上的线程会立马返回。如果有其它线程调用了wakeup()办法,但以后没有线程阻塞在select()办法上,下个调用select()办法的线程会立刻wake up。

close() 用完Selector后调用其close()办法会敞开该Selector,且使注册到该Selector上的所有SelectionKey实例有效。通道自身并不会敞开。

通过Selector抉择通道:

  • int select() 阻塞直到至多有一个通道在你注册的事件上就绪了
  • int select(long timeout) 减少最长阻塞毫秒数
  • int selectNow() 不会阻塞,不论什么通道就绪都立即返回

三、 Tars NIO网络编程

理解完 Java NIO的原理,咱们来看看Tars是如何应用NIO进行网络编程的。

Tars的网络模型是多reactor多线程模型。有一点非凡的是tars的reactor线程组里随机选一个线程解决网络事件,并且该线程同时也能解决读写。

外围类之间的关系如下:

3.1 一个典型的Java NIO服务端开发流程

  1. 创立ServerSocketChannel,设置为非阻塞,并绑定端口
  2. 创立Selector对象
  3. 给ServerSocketChannel注册SelectionKey.OP_ACCEPT事件
  4. 启动一个线程循环,调用Selector的select办法来查看IO就绪事件,一旦有IO就绪事件,就告诉用户线程去解决IO事件
  5. 如果有Accept事件,就创立一个SocketChannel,并注册SelectionKey.OP_READ
  6. 如果有读事件,判断一下是否全包,如果全包,就交给后端线程解决
  7. 写事件比拟非凡。isWriteable示意的是本机的写缓冲区是否可写。这个在绝大多少状况下都是为真的。在Netty中只有写半包的时候才须要注册写事件,如果一次写就齐全把数据写入了缓冲区就不须要注册写事件。

3.2 Tars客户端发动申请到服务器的流程

  1. Communicator.stringToProxy()  依据servantName等配置信息创立通信器。
  2. ServantProxyFactory.getServantProxy() 调用工厂办法创立servant代理。
  3. ObjectProxyFactory.getObjectProxy()  调用工厂办法创立obj代理。
  4. TarsProtocolInvoker.create() 创立协定调用者。
  5. ServantProtocolInvoker.initClient(Url url)  依据servantProxyConfig中的配置信息找到servant的ip端口等进行初始化ServantClient。
  6. ClientPoolManager.getSelectorManager() 如果第一次调用selectorManager是空的就会去初始化selectorManager。
  7. reactorSet = new Reactor[selectorPoolSize];     SelectorManager初始化结构类中的会依据selectorPoolSize(默认是2)的配置创立Reactor线程数组。线程名称的前缀是servant-proxy-加上CommunicatorId,CommunicatorId生成规定是由locator的地址生成的UUID。
  8. 启动reactor线程。

3.3 Tars服务端启动步骤

  1. tars反对TCP和UDP两种协定,RPC场景下是应用TCP协定。
  2. new SelectorManager() 依据配置信息初始化selectorManager,线程池大小 processors > 8 ? 4 + (processors * 5 / 8) : processors + 1;线程名称前缀是server-tcp-reactor,而后启动reactor线程数组中的所有线程。
  3. 开启服务端监听的ServerSocketChannel,绑定服务端本地ip和监听的端口号,设置TCP连贯申请队列的最大容量为1024;设置非阻塞模式。
  4. 选取reactor线程数组中第0个线程作为服务端监听连贯OP_ACCEPT就绪事件的线程。
代码4:
public void bind(AppService appService) throws IOException {     // 此处略去非关键代码     if (endpoint.type().equals("tcp")) {  // 1        this.selectorManager = new SelectorManager(Utils.getSelectorPoolSize(), new ServantProtocolFactory(codec), threadPool, processor, keepAlive, "server-tcp-reactor", false);     // 2        this.selectorManager.setTcpNoDelay(serverCfg.isTcpNoDelay());        this.selectorManager.start();        ServerSocketChannel serverChannel = ServerSocketChannel.open();        serverChannel.socket().bind(new InetSocketAddress(endpoint.host(), endpoint.port()), 1024);   // 3        serverChannel.configureBlocking(false);              selectorManager.getReactor(0).registerChannel(serverChannel, SelectionKey.OP_ACCEPT);  // 4    } else if (endpoint.type().equals("udp")) {        this.selectorManager = new SelectorManager(1, new ServantProtocolFactory(codec), threadPool, processor, false, "server-udp-reactor", true);        this.selectorManager.start();        // UDP开启的是DatagramChannel        DatagramChannel serverChannel = DatagramChannel.open();        DatagramSocket socket = serverChannel.socket();        socket.bind(new InetSocketAddress(endpoint.host(), endpoint.port()));        serverChannel.configureBlocking(false);        // UDP协定不须要建连,监听的是OP_READ就绪事件        this.selectorManager.getReactor(0).registerChannel(serverChannel, SelectionKey.OP_READ);    }}

3.4 Reactor线程启动流程

  1. 多路复用器开始轮询查看 是否有就绪的事件。
  2. 解决register队列中残余的channel注册到以后reactor线程的多路复用器selector中。
  3. 获取已选键集中所有就绪的channel。
  4. 更新Session中最近操作工夫,Tars服务端启动时会调用 startSessionManager() , 单线程每30s扫描一次session会话列表,会查看每个session的 lastUpdateOperationTime 与以后工夫的时间差,如果超过60秒会将过期session对应的channel踢除。
  5. 散发IO事件进行解决。
  6. 解决unregister队列中残余的channel,从以后reactor线程的多路复用器selector中解除注册。
代码5:
public void run() {        while (!Thread.interrupted()) {            selector.select();  // 1            processRegister();  // 2            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();   //  3            while (iter.hasNext()) {                SelectionKey key = iter.next();                iter.remove();                if (!key.isValid()) continue;                try {                    if (key.attachment() != null && key.attachment() instanceof Session) {                      ((Session) key.attachment()).updateLastOperationTime(); //4                    }                 dispatchEvent(key);    // 5                } catch (Throwable ex) {                 disConnectWithException(key, ex);                }            }            processUnRegister();  // 6        }}

3.5 IO事件散发解决

每个reactor线程都有一个专门的Accepter类去解决各种IO事件。TCPAccepter能够解决全副的四种事件(OP\_ACCEPT、OP\_CONNECT、OP\_WRITE、OP\_READ)、UDPAccepter因为不须要建设连贯所以只须要解决读和写两种事件。

1. 解决OP_ACCEPT

  1. 获取channel,解决TCP申请。
  2. 为这个TCP申请创立TCPSession,会话的状态是服务器已连贯
  3. 会话注册到sessionManager中,Tars服务可配置最大连接数maxconns,如果超过就会敞开以后会话。
  4. 寻找下一个reactor线程进行多路复用器与channel的绑定。
代码6:
public void handleAcceptEvent(SelectionKey key) throws IOException {    ServerSocketChannel server = (ServerSocketChannel) key.channel(); // 1    SocketChannel channel = server.accept();       channel.socket().setTcpNoDelay(selectorManager.isTcpNoDelay());    channel.configureBlocking(false);    Utils.setQosFlag(channel.socket());    TCPSession session = new TCPSession(selectorManager);    // 2    session.setChannel(channel);    session.setStatus(SessionStatus.SERVER_CONNECTED);    session.setKeepAlive(selectorManager.isKeepAlive());    session.setTcpNoDelay(selectorManager.isTcpNoDelay());    SessionManager.getSessionManager().registerSession(session);   // 3      selectorManager.nextReactor().registerChannel(channel, SelectionKey.OP_READ, session); // 4}

2. 解决OP_CONNECT

  1. 获取客户端连贯过去的channel通道
  2. 获取Session
  3. 与服务器建设连贯,将关注的趣味OPS设置为ready就绪事件,session中的状态批改为客户端已连贯
  4. 解决OP_CONNECT
代码7:
public void handleConnectEvent(SelectionKey key) throws IOException {    SocketChannel client = (SocketChannel) key.channel();  // 1    TCPSession session = (TCPSession) key.attachment();   //2    if (session == null) throw new RuntimeException("The session is null when connecting to ...");    try {  // 3        client.finishConnect();        key.interestOps(SelectionKey.OP_READ);        session.setStatus(SessionStatus.CLIENT_CONNECTED);    } finally {        session.finishConnect();    }}

3.解决OP_WRITE、 解决OP_READ

代码8:
public void handleReadEvent(SelectionKey key) throws IOException {    TCPSession session = (TCPSession) key.attachment();    if (session == null) throw new RuntimeException("The session is null when reading data...");    session.read();}public void handleWriteEvent(SelectionKey key) throws IOException {    TCPSession session = (TCPSession) key.attachment();    if (session == null) throw new RuntimeException("The session is null when writing data...");    session.doWrite();}

3.6 seesion中网络读写的事件具体处理过程

1. 读事件处理

申请2k的ByteBuffer空间,读取channel中的数据到readBuffer中。依据sessionStatus判断是客户端读响应还是服务器读申请,别离进行解决。

代码9:
protected void read() throws IOException {    int ret = readChannel();    if (this.status == SessionStatus.CLIENT_CONNECTED) {        readResponse();    } else if (this.status == SessionStatus.SERVER_CONNECTED) {        readRequest();    } else {        throw new IllegalStateException("The current session status is invalid. [status:" + this.status + "]");    }    if (ret < 0) {        close();        return;    }}private int readChannel() throws IOException {    int readBytes = 0, ret = 0;    ByteBuffer data = ByteBuffer.allocate(1024 * 2);  // 1    if (readBuffer == null) {        readBuffer = IoBuffer.allocate(bufferSize);    }       // 2    while ((ret = ((SocketChannel) channel).read(data)) > 0) {        data.flip();  // 3        readBytes += data.remaining();        readBuffer.put(data.array(), data.position(), data.remaining());        data.clear();    }    return ret < 0 ? ret : readBytes;}

① 客户端读响应

从以后readBuffer中的内容复制到一个新的长期buffer中,并且切换到读模式,应用TarsCodec类解析出buffer内的协定字段到response,WorkThread线程告诉Ticket解决response。如果response为空,则重置tempBuffer到mark的地位,从新解析协定。

代码10:
public void readResponse() {    Response response = null;    IoBuffer tempBuffer = null;        tempBuffer = readBuffer.duplicate().flip();        while (true) {            tempBuffer.mark();            if (tempBuffer.remaining() > 0) {                response = selectorManager.getProtocolFactory().getDecoder().decodeResponse(tempBuffer, this);            } else {                response = null;            }            if (response != null) {                if (response.getTicketNumber() == Ticket.DEFAULT_TICKET_NUMBER) response.setTicketNumber(response.getSession().hashCode());                selectorManager.getThreadPool().execute(new WorkThread(response, selectorManager));            } else {                tempBuffer.reset();                readBuffer = resetIoBuffer(tempBuffer);                break;            }        }}

② 服务器读申请

工作放入线程池交给 WorkThread线程,最终交给Processor类出构建申请的响应体,包含分布式上下文,而后通过FilterChain的解决,最终通过jdk提供的反射办法invoke服务端本地的办法而后返回response。如果线程池抛出回绝异样,则返回SERVEROVERLOAD = -9,服务端过载爱护。如果request为空,则重置tempBuffer到mark的地位,从新解析协定。

代码11:
public void readRequest() {    Request request = null;    IoBuffer tempBuffer = readBuffer.duplicate().flip();        while (true) {            tempBuffer.mark();            if (tempBuffer.remaining() > 0) {                request = selectorManager.getProtocolFactory().getDecoder().decodeRequest(tempBuffer, this);            } else {                request = null;            }            if (request != null) {                try {                    request.resetBornTime();                    selectorManager.getThreadPool().execute(new WorkThread(request, selectorManager));                } catch (RejectedExecutionException e) {                  selectorManager.getProcessor().overload(request, request.getIoSession());                } catch (Exception ex) {                  ex.printStackTrace();                }            } else {                    tempBuffer.reset();                readBuffer = resetIoBuffer(tempBuffer);                break;            }        }}

2. 写事件处理

同样也包含客户端写申请和服务端写响应两种,其实这两种都是往TCPSession中的LinkedBlockingQueue(有界队列最大8K)中插入ByteBuffer。LinkedBlockingQueue中的ByteBuffer最终会由TCPAcceptor中的handleWriteEvent监听写就绪事件并生产。

代码12:
protected void write(IoBuffer buffer) throws IOException {    if (buffer == null) return;    if (channel == null || key == null) throw new IOException("Connection is closed");    if (!this.queue.offer(buffer.buf())) {        throw new IOException("The session queue is full. [ queue size:" + queue.size() + " ]");    }    if (key != null) {        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);        key.selector().wakeup();    }}

四、总结

本文次要介绍了Java NIO编程的基础知识 和 Tars-Java 1.7.2版本的网络编程模块的源码实现。

在最新的Tars-Java的master分支中咱们能够发现网络编程曾经由NIO改成了Netty,尽管Netty更加成熟稳固,然而作为学习者理解NIO的原理也是把握网络编程的必经之路。

更多对于Tars框架的介绍能够拜访:

https://tarscloud.org/

本文剖析源码地址(v1.7.x分支):

https://github.com/TarsCloud/TarsJava