共计 12307 个字符,预计需要花费 31 分钟才能阅读完成。
作者:vivo 互联网服务器团队 - Jin Kai
本文从 Java NIO 网络编程的基础知识讲到了 Tars 框架应用 NIO 进行网络编程的源码剖析。
一、Tars 框架根本介绍
Tars 是腾讯开源的反对多语言的高性能 RPC 框架,起源于腾讯外部 2008 年至今始终应用的对立利用框架 TAF(Total Application Framework),目前反对 C ++、Java、PHP、Nodejs、Go 语言。
该框架为用户提供了波及到开发、运维、以及测试的一整套解决方案,帮忙一个产品或者服务疾速开发、部署、测试、上线。它集可扩大协定编解码、高性能 RPC 通信框架、名字路由与发现、公布监控、日志统计、配置管理等于一体,通过它能够疾速用微服务的形式构建本人的稳固牢靠的分布式应用,并实现残缺无效的服务治理。
官网仓库地址:
https://github.com/TarsCloud/Tars
vivo 推送平台也深度应用了该框架,部署服务节点超过一千个,通过线上每日一百多亿音讯推送量的考验。
此前已在 vivo 互联网技术公众号公布过《Tars Java 客户端源码剖析》,此篇文章为续集。
Tars-java 最新稳定版 1.7.2 以及之前的版本都应用 Java NIO 进行网络编程;本文将别离具体介绍 java NIO 的原理和 Tars 应用 NIO 进行网络编程的细节。
二、Java NIO 原理介绍
从 1.4 版本开始,Java 提供了一种新的 IO 解决形式:NIO (New IO 或 Non-blocking IO) 是一个能够代替规范 Java IO 的 API,它是面向缓冲区而不是字节流,它是非阻塞的,反对 IO 多路复用。
2.1 Channels (通道) and Buffers (缓冲区)
规范的 IO 基于字节流进行操作的,而 NIO 是基于通道(Channel)和缓冲区(Buffer)进行操作。数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中,下图是一个残缺流程。
Channel 类型:
- 反对文件读写数据的 FileChannel
- 能通过 UDP 读写网络中的数据的 DatagramChannel
- 能通过 TCP 读写网络数据的 SocketChannel
- 能够监听新进来的 TCP 连贯,对每一个新进来的连贯都会创立一个 SocketChannel 的 ServerSocketChannel。
SocketChannel:
- 关上 SocketChannel:SocketChannel socketChannel = SocketChannel.open();
- 敞开 SocketChannel:socketChannel.close();
- 从 Channel 中读取的数据放到 Buffer:int bytesRead = inChannel.read(buf);
- 将 Buffer 中的数据写到 Channel:int bytesWritten = inChannel.write(buf);
ServerSocketChannel:
通过 ServerSocketChannel.accept() 办法监听新进来的连贯,当 accept()办法返回的时候,它返回一个蕴含新进来的连贯的 SocketChannel,因而 accept()办法会始终阻塞到有新连贯达到。
通常不会仅仅只监听一个连贯, 在 while 循环中调用 accept()办法. 如上面的例子:
代码 1:
while(true){SocketChannel socketChannel = serverSocketChannel.accept();
//do something with socketChannel...
}
ServerSocketChannel 能够设置成非阻塞模式。在非阻塞模式下,accept() 办法会立即返回,如果还没有新进来的连贯, 返回的将是 null。因而,须要查看返回的 SocketChannel 是否是 null。
代码 2:
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(8888));
serverSocketChannel.configureBlocking(false);
while(true){SocketChannel socketChannel = serverSocketChannel.accept();
if(socketChannel != null){//do something with socketChannel...}
}
Buffer 类型:
- ByteBuffer
- CharBuffer
- DoubleBuffer
- FloatBuffer
- IntBuffer
- LongBuffer
- ShortBuffer
Buffer 的调配:
ByteBuffer buf = ByteBuffer.allocate(2048);
Buffer 的读写:
个别是以下四个步骤:
- 写入数据到 Buffer,最大写入量是 capacity,写模式下 limit 值即为 capacity 值,position 即为写到的地位。
- 调用 flip()办法将 Buffer 从写模式切换到读模式,此时 position 挪动到开始地位 0,limit 挪动到 position 的地位。
- 从 Buffer 中读取数据,在读模式下能够读取之前写入到 buffer 的所有数据,即为 limit 地位。
- 调用 clear()办法或者 compact()办法。clear()办法将 position 设为 0,limit 被设置成 capacity 的值。compact()办法将所有未读的数据拷贝到 Buffer 起始处,而后将 position 设到最初一个未读元素前面。
mark() 与 reset()办法 通过调用 Buffer.mark() 办法,能够标记 Buffer 中的一个特定 position,之后能够通过调用 Buffer.reset()办法复原到这个 position。
duplicate() 此办法返回承载先前字节缓冲区内容的新字节缓冲区。
remaining() limit 减去 position 的值
2.2 Selector(选择器)
Java NIO 引入了选择器的概念,选择器用于监听多个通道的事件。单个的线程能够监听多个数据通道。要应用 Selector,得向 Selector 注册 Channel,而后调用它的 select()办法。这个办法会始终阻塞到某个注册的通道有事件就绪。一旦这个办法返回,线程就能够解决这些事件。
代码 3:
channel.configureBlocking(false);
SelectionKey key = channel.register(selector,Selectionkey.OP_READ);
留神 register()办法的第二个参数,这是一个监听的汇合,即在通过 Selector 监听 Channel 时关注什么事件汇合。
SelectionKey 蕴含:
1) interest 汇合:
selectionKey.interestOps() 能够监听四种不同类型的事件:OP\_ACCEPT、OP\_CONNECT、OP\_WRITE、OP\_READ
2) ready 汇合:
selectionKey.readyOps(); ready 汇合是通道曾经准备就绪的操作的汇合,提供 4 个不便的办法:
- selectionKey.isAcceptable();
- selectionKey.isConnectable();
- selectionKey.isReadable();
- selectionKey.isWritable();
3) Channel: selectionKey.channel();
4) Selector: selectionKey.selector();
5) 可选的附加对象:
selectionKey.attachment(); 能够将一个对象或者更多信息附着到 SelectionKey 上,这样就能不便的辨认特定的通道。
提醒:
OP\_ACCEPT 和 OP\_CONNECT 的区别:简略来说,客户端建设连贯是 connect,服务器筹备接管连贯是 accept。一个典型的客户端服务器网络交互流程如下图
selectedKeys() 一旦调用了 select()办法,并且返回值表明有一个或更多个通道就绪了,而后能够通过调用 selector 的 selectedKeys()办法,拜访已选择键集(selected key set)中的就绪通道。
wakeUp() 某个线程调用 select()办法后阻塞了,即便没有通道曾经就绪,也有方法让其从 select()办法返回。只有让其它线程在阻塞线程调用 select()办法的对象上调用 Selector.wakeup()办法即可。阻塞在 select()办法上的线程会立马返回。如果有其它线程调用了 wakeup()办法,但以后没有线程阻塞在 select()办法上,下个调用 select()办法的线程会立刻 wake up。
close() 用完 Selector 后调用其 close()办法会敞开该 Selector,且使注册到该 Selector 上的所有 SelectionKey 实例有效。通道自身并不会敞开。
通过 Selector 抉择通道:
- int select() 阻塞直到至多有一个通道在你注册的事件上就绪了
- int select(long timeout) 减少最长阻塞毫秒数
- int selectNow() 不会阻塞,不论什么通道就绪都立即返回
三、Tars NIO 网络编程
理解完 Java NIO 的原理,咱们来看看 Tars 是如何应用 NIO 进行网络编程的。
Tars 的网络模型是多 reactor 多线程模型。有一点非凡的是 tars 的 reactor 线程组里随机选一个线程解决网络事件,并且该线程同时也能解决读写。
外围类之间的关系如下:
3.1 一个典型的 Java NIO 服务端开发流程
- 创立 ServerSocketChannel, 设置为非阻塞,并绑定端口
- 创立 Selector 对象
- 给 ServerSocketChannel 注册 SelectionKey.OP_ACCEPT 事件
- 启动一个线程循环,调用 Selector 的 select 办法来查看 IO 就绪事件,一旦有 IO 就绪事件,就告诉用户线程去解决 IO 事件
- 如果有 Accept 事件,就创立一个 SocketChannel,并注册 SelectionKey.OP_READ
- 如果有读事件,判断一下是否全包,如果全包,就交给后端线程解决
- 写事件比拟非凡。isWriteable 示意的是本机的写缓冲区是否可写。这个在绝大多少状况下都是为真的。在 Netty 中只有写半包的时候才须要注册写事件,如果一次写就齐全把数据写入了缓冲区就不须要注册写事件。
3.2 Tars 客户端发动申请到服务器的流程
- Communicator.stringToProxy() 依据 servantName 等配置信息创立通信器。
- ServantProxyFactory.getServantProxy() 调用工厂办法创立 servant 代理。
- ObjectProxyFactory.getObjectProxy() 调用工厂办法创立 obj 代理。
- TarsProtocolInvoker.create() 创立协定调用者。
- ServantProtocolInvoker.initClient(Url url) 依据 servantProxyConfig 中的配置信息找到 servant 的 ip 端口等进行初始化 ServantClient。
- ClientPoolManager.getSelectorManager() 如果第一次调用 selectorManager 是空的就会去初始化 selectorManager。
- reactorSet = new Reactor[selectorPoolSize]; SelectorManager 初始化结构类中的会依据 selectorPoolSize(默认是 2)的配置创立 Reactor 线程数组。线程名称的前缀是 servant-proxy- 加上 CommunicatorId,CommunicatorId 生成规定是由 locator 的地址生成的 UUID。
- 启动 reactor 线程。
3.3 Tars 服务端启动步骤
- tars 反对 TCP 和 UDP 两种协定,RPC 场景下是应用 TCP 协定。
- new SelectorManager() 依据配置信息初始化 selectorManager,线程池大小 processors > 8 ? 4 + (processors * 5 / 8) : processors + 1;线程名称前缀是 server-tcp-reactor,而后启动 reactor 线程数组中的所有线程。
- 开启服务端监听的 ServerSocketChannel,绑定服务端本地 ip 和监听的端口号,设置 TCP 连贯申请队列的最大容量为 1024;设置非阻塞模式。
- 选取 reactor 线程数组中第 0 个线程作为服务端监听连贯 OP_ACCEPT 就绪事件的线程。
代码 4:
public void bind(AppService appService) throws IOException {
// 此处略去非关键代码
if (endpoint.type().equals("tcp")) { // 1
this.selectorManager = new SelectorManager(Utils.getSelectorPoolSize(), new ServantProtocolFactory(codec), threadPool, processor, keepAlive, "server-tcp-reactor", false); // 2
this.selectorManager.setTcpNoDelay(serverCfg.isTcpNoDelay());
this.selectorManager.start();
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.socket().bind(new InetSocketAddress(endpoint.host(), endpoint.port()), 1024); // 3
serverChannel.configureBlocking(false);
selectorManager.getReactor(0).registerChannel(serverChannel, SelectionKey.OP_ACCEPT); // 4
} else if (endpoint.type().equals("udp")) {this.selectorManager = new SelectorManager(1, new ServantProtocolFactory(codec), threadPool, processor, false, "server-udp-reactor", true);
this.selectorManager.start();
// UDP 开启的是 DatagramChannel
DatagramChannel serverChannel = DatagramChannel.open();
DatagramSocket socket = serverChannel.socket();
socket.bind(new InetSocketAddress(endpoint.host(), endpoint.port()));
serverChannel.configureBlocking(false);
// UDP 协定不须要建连,监听的是 OP_READ 就绪事件
this.selectorManager.getReactor(0).registerChannel(serverChannel, SelectionKey.OP_READ);
}
}
3.4 Reactor 线程启动流程
- 多路复用器开始轮询查看 是否有就绪的事件。
- 解决 register 队列中残余的 channel 注册到以后 reactor 线程的多路复用器 selector 中。
- 获取已选键集中所有就绪的 channel。
- 更新 Session 中最近操作工夫,Tars 服务端启动时会调用 startSessionManager() , 单线程每 30s 扫描一次 session 会话列表,会查看每个 session 的 lastUpdateOperationTime 与以后工夫的时间差,如果超过 60 秒会将过期 session 对应的 channel 踢除。
- 散发 IO 事件进行解决。
- 解决 unregister 队列中残余的 channel, 从以后 reactor 线程的多路复用器 selector 中解除注册。
代码 5:
public void run() {while (!Thread.interrupted()) {selector.select(); // 1
processRegister(); // 2
Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); // 3
while (iter.hasNext()) {SelectionKey key = iter.next();
iter.remove();
if (!key.isValid()) continue;
try {if (key.attachment() != null && key.attachment() instanceof Session) {((Session) key.attachment()).updateLastOperationTime(); //4}
dispatchEvent(key); // 5
} catch (Throwable ex) {disConnectWithException(key, ex);
}
}
processUnRegister(); // 6}
}
3.5 IO 事件散发解决
每个 reactor 线程都有一个专门的 Accepter 类去解决各种 IO 事件。TCPAccepter 能够解决全副的四种事件(OP\_ACCEPT、OP\_CONNECT、OP\_WRITE、OP\_READ)、UDPAccepter 因为不须要建设连贯所以只须要解决读和写两种事件。
1. 解决 OP_ACCEPT
- 获取 channel,解决 TCP 申请。
- 为这个 TCP 申请创立 TCPSession,会话的状态是服务器已连贯
- 会话注册到 sessionManager 中,Tars 服务可配置最大连接数 maxconns,如果超过就会敞开以后会话。
- 寻找下一个 reactor 线程进行多路复用器与 channel 的绑定。
代码 6:
public void handleAcceptEvent(SelectionKey key) throws IOException {ServerSocketChannel server = (ServerSocketChannel) key.channel(); // 1
SocketChannel channel = server.accept();
channel.socket().setTcpNoDelay(selectorManager.isTcpNoDelay());
channel.configureBlocking(false);
Utils.setQosFlag(channel.socket());
TCPSession session = new TCPSession(selectorManager); // 2
session.setChannel(channel);
session.setStatus(SessionStatus.SERVER_CONNECTED);
session.setKeepAlive(selectorManager.isKeepAlive());
session.setTcpNoDelay(selectorManager.isTcpNoDelay());
SessionManager.getSessionManager().registerSession(session); // 3
selectorManager.nextReactor().registerChannel(channel, SelectionKey.OP_READ, session); // 4
}
2. 解决 OP_CONNECT
- 获取客户端连贯过去的 channel 通道
- 获取 Session
- 与服务器建设连贯,将关注的趣味 OPS 设置为 ready 就绪事件,session 中的状态批改为客户端已连贯
- 解决 OP_CONNECT
代码 7:
public void handleConnectEvent(SelectionKey key) throws IOException {SocketChannel client = (SocketChannel) key.channel(); // 1
TCPSession session = (TCPSession) key.attachment(); //2
if (session == null) throw new RuntimeException("The session is null when connecting to ...");
try { // 3
client.finishConnect();
key.interestOps(SelectionKey.OP_READ);
session.setStatus(SessionStatus.CLIENT_CONNECTED);
} finally {session.finishConnect();
}
}
3. 解决 OP_WRITE、解决 OP_READ
代码 8:
public void handleReadEvent(SelectionKey key) throws IOException {TCPSession session = (TCPSession) key.attachment();
if (session == null) throw new RuntimeException("The session is null when reading data...");
session.read();}
public void handleWriteEvent(SelectionKey key) throws IOException {TCPSession session = (TCPSession) key.attachment();
if (session == null) throw new RuntimeException("The session is null when writing data...");
session.doWrite();}
3.6 seesion 中网络读写的事件具体处理过程
1. 读事件处理
申请 2k 的 ByteBuffer 空间,读取 channel 中的数据到 readBuffer 中。依据 sessionStatus 判断是客户端读响应还是服务器读申请,别离进行解决。
代码 9:
protected void read() throws IOException {int ret = readChannel();
if (this.status == SessionStatus.CLIENT_CONNECTED) {readResponse();
} else if (this.status == SessionStatus.SERVER_CONNECTED) {readRequest();
} else {throw new IllegalStateException("The current session status is invalid. [status:" + this.status + "]");
}
if (ret < 0) {close();
return;
}
}
private int readChannel() throws IOException {
int readBytes = 0, ret = 0;
ByteBuffer data = ByteBuffer.allocate(1024 * 2); // 1
if (readBuffer == null) {readBuffer = IoBuffer.allocate(bufferSize);
}
// 2
while ((ret = ((SocketChannel) channel).read(data)) > 0) {data.flip(); // 3
readBytes += data.remaining();
readBuffer.put(data.array(), data.position(), data.remaining());
data.clear();}
return ret < 0 ? ret : readBytes;
}
① 客户端读响应
从以后 readBuffer 中的内容复制到一个新的长期 buffer 中,并且切换到读模式,应用 TarsCodec 类解析出 buffer 内的协定字段到 response,WorkThread 线程告诉 Ticket 解决 response。如果 response 为空,则重置 tempBuffer 到 mark 的地位,从新解析协定。
代码 10:
public void readResponse() {
Response response = null;
IoBuffer tempBuffer = null;
tempBuffer = readBuffer.duplicate().flip();
while (true) {tempBuffer.mark();
if (tempBuffer.remaining() > 0) {response = selectorManager.getProtocolFactory().getDecoder().decodeResponse(tempBuffer, this);
} else {response = null;}
if (response != null) {if (response.getTicketNumber() == Ticket.DEFAULT_TICKET_NUMBER) response.setTicketNumber(response.getSession().hashCode());
selectorManager.getThreadPool().execute(new WorkThread(response, selectorManager));
} else {tempBuffer.reset();
readBuffer = resetIoBuffer(tempBuffer);
break;
}
}
}
② 服务器读申请
工作放入线程池交给 WorkThread 线程,最终交给 Processor 类出构建申请的响应体,包含分布式上下文,而后通过 FilterChain 的解决,最终通过 jdk 提供的反射办法 invoke 服务端本地的办法而后返回 response。如果线程池抛出回绝异样,则返回 SERVEROVERLOAD = -9,服务端过载爱护。如果 request 为空,则重置 tempBuffer 到 mark 的地位,从新解析协定。
代码 11:
public void readRequest() {
Request request = null;
IoBuffer tempBuffer = readBuffer.duplicate().flip();
while (true) {tempBuffer.mark();
if (tempBuffer.remaining() > 0) {request = selectorManager.getProtocolFactory().getDecoder().decodeRequest(tempBuffer, this);
} else {request = null;}
if (request != null) {
try {request.resetBornTime();
selectorManager.getThreadPool().execute(new WorkThread(request, selectorManager));
} catch (RejectedExecutionException e) {selectorManager.getProcessor().overload(request, request.getIoSession());
} catch (Exception ex) {ex.printStackTrace();
}
} else {tempBuffer.reset();
readBuffer = resetIoBuffer(tempBuffer);
break;
}
}
}
2. 写事件处理
同样也包含客户端写申请和服务端写响应两种,其实这两种都是往 TCPSession 中的 LinkedBlockingQueue(有界队列最大 8K)中插入 ByteBuffer。LinkedBlockingQueue 中的 ByteBuffer 最终会由 TCPAcceptor 中的 handleWriteEvent 监听写就绪事件并生产。
代码 12:
protected void write(IoBuffer buffer) throws IOException {if (buffer == null) return;
if (channel == null || key == null) throw new IOException("Connection is closed");
if (!this.queue.offer(buffer.buf())) {throw new IOException("The session queue is full. [ queue size:" + queue.size() + "]");
}
if (key != null) {key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
key.selector().wakeup();
}
}
四、总结
本文次要介绍了 Java NIO 编程的基础知识 和 Tars-Java 1.7.2 版本的网络编程模块的源码实现。
在最新的 Tars-Java 的 master 分支中咱们能够发现网络编程曾经由 NIO 改成了 Netty,尽管 Netty 更加成熟稳固,然而作为学习者理解 NIO 的原理也是把握网络编程的必经之路。
更多对于 Tars 框架的介绍能够拜访:
https://tarscloud.org/
本文剖析源码地址(v1.7.x 分支):
https://github.com/TarsCloud/TarsJava