作者:路路
酷爱技术、乐于分享的技术人,目前次要从事数据库相干技术的钻研。
本文起源:原创投稿
*爱可生开源社区出品,原创内容未经受权不得随便应用,转载请分割小编并注明起源。
前言
在上一篇文章中,我讲了网络IO的基础知识,本篇文章将从源码角度具体解说DBLE的网络模块,包含DBLE是如何解决MySQL包的,多路复用在DBLE中是如何实现的,以及申请的异步化解决相干逻辑。
DBLE是如何解决MySQL包的?
咱们将以客户端连贯DBLE为例,从源码角度解说DBLE的相干解决流程。
客户端与DBLE建设连贯的流程如下图所示(因为DBLE实现了MySQL协定,所以与客户端连贯MySQL的流程一样):
次要包含以下四个步骤:
1、 客户端发动connect连贯;
2、服务端发送握手包;
3、客户端回复握手包;
4、服务端返回OK包,示意连贯建设实现,进入命令阶段。
咱们间接看源码:
1、DBLE解决客户端connect
DBLE解决客户端connect的代码在NIOAcceptor#run
办法中:
public void run() { //这里的selector即IO多路复用选择器,一个selector能够解决多个客户端连贯申请 final Selector tSelector = this.selector; for (; ; ) { try { tSelector.select(1000L); Set<SelectionKey> keys = tSelector.selectedKeys(); try { for (SelectionKey key : keys) { if (key.isValid() && key.isAcceptable()) { //当连贯无效且可承受时,解决客户端连贯 accept(); } else { key.cancel(); } } } catch (final Throwable e) { LOGGER.warn("caught Throwable err: ", e); } finally { keys.clear(); } } catch (Exception e) { LOGGER.info(getName(), e); } } }
能够看出上述办法调用了accept
来承受客户端发动的TCP连贯,持续看该类的accept
办法:
private void accept() { SocketChannel channel = null; try { //与客户端建设TCP连贯 channel = serverChannel.accept(); channel.configureBlocking(false); NIOSocketWR socketWR = new NIOSocketWR(); FrontendConnection c = factory.make(channel, socketWR); socketWR.initFromConnection(c); c.setId(ID_GENERATOR.getId()); IOProcessor processor = DbleServer.getInstance().nextFrontProcessor(); c.setProcessor(processor); NIOReactor reactor = reactorPool.getNextReactor(); //将已建设好的连贯注册给NIOReactor reactor.postRegister(c); } catch (Exception e) { LOGGER.info(getName(), e); closeChannel(channel); } }
能够看出上述代码将客户端发动建设的TCP连贯注册给了NIOReactor
来治理。
到这里DBLE对客户端的connect曾经解决实现了,他们之间曾经实现了TCP连贯的建设,同时DBLE将客户端申请的连贯注册给了NIOReactor
。
2、服务端发送握手包
接着下面咱们持续看NIOReactor#postRegister
办法:
//该办法将连贯放入队列,并唤醒reactorR的selector,其中reactorR为NIOReactor的外部类RWvoid postRegister(AbstractConnection c) { reactorR.registerQueue.offer(c); reactorR.selector.wakeup(); }
咱们间接看RW解决注册队列里客户端连贯的代码,在RW#register
办法中:
private void register(Selector finalSelector) { …… while ((c = registerQueue.poll()) != null) { try { //上面这行代码须要留神,该代码将连贯注册到了RW的selector多路复用选择器中,使得该选择器后续可能读取该连贯发送过去的数据 ((NIOSocketWR) c.getSocketWR()).register(finalSelector); //服务端发送握手包的逻辑在上面这个办法中 c.register(); } …… }
AbstractConnection#register
办法中又调用了AbstractService#register
办法,对于客户端连贯申请来讲,这里最终调用的是MySQLFrontAuthService#register
办法:
public void register() throws IOException { //终于看到了greeting,该办法向客户端发送了握手包 greeting(); this.connection.getSocketWR().asyncRead(); }
MySQLFrontAuthService#greeting
办法实现了拼装握手包,并将握手包发送给客户端的逻辑:
private void greeting() { // generate auth data byte[] rand1 = RandomUtil.randomBytes(8); byte[] rand2 = RandomUtil.randomBytes(12); // save auth data byte[] rand = new byte[rand1.length + rand2.length]; System.arraycopy(rand1, 0, rand, 0, rand1.length); System.arraycopy(rand2, 0, rand, rand1.length, rand2.length); this.seed = rand; HandshakeV10Packet hs = new HandshakeV10Packet(); hs.setPacketId(0); hs.setProtocolVersion(Versions.PROTOCOL_VERSION); // [0a] protocol version V10 hs.setServerVersion(Versions.getServerVersion()); hs.setThreadId(connection.getId()); hs.setSeed(rand1); hs.setServerCapabilities(getServerCapabilities()); int charsetIndex = CharsetUtil.getCharsetDefaultIndex(SystemConfig.getInstance().getCharset()); hs.setServerCharsetIndex((byte) (charsetIndex & 0xff)); hs.setServerStatus(2); hs.setRestOfScrambleBuff(rand2); hs.setAuthPluginName(pluginName.name().getBytes()); //这里的调用即发送握手包到客户端 hs.write(connection); }
到这里就实现了服务端发送握手包的逻辑。
3、DBLE解决客户端的握手回复包
服务端发送了握手包给客户端,客户端收到后须要发送握手回复包过去了,个别该握手回复包中会蕴含用户相干信息。
那么DBLE如何读取并解决客户端发送过去的握手回复包呢?
相应的代码在RW#run
办法中,那为什么在这个办法中能解决客户端发送过去的数据呢?因为之前在解决客户端连贯的时候,曾经把相应的连贯注册给了RW
的多路复用选择器,所以它当然能解决相应连贯的数据了,记不得的同学能够看后面RW#register
办法中的正文。
RW#run
办法中解决客户端发送数据的次要代码如下:
public void run() { final Selector finalSelector = this.selector; Set<SelectionKey> keys = null; for (; ; ) { …… //当连贯中有数据的时候,这里会返回相应的selection keys keys = finalSelector.selectedKeys(); if (keys.size() == 0) { continue; } //对有相应事件的连贯进行解决 executeKeys(keys); } …… }
咱们持续看RW#executeKeys
办法(对代码做了一些精简,但不影响了解):
private void executeKeys(Set<SelectionKey> keys) { for (SelectionKey key : keys) { AbstractConnection con = null; Object att = key.attachment(); con = (AbstractConnection) att; if (key.isValid() && key.isReadable()) { //这里即为读取客户端发送过去的数据 con.asyncRead(); } } }
跟着代码走,相应的解决逻辑在NIOSocketWR#asyncRead
办法中:
public void asyncRead() throws IOException { ByteBuffer theBuffer = con.findReadBuffer(); //读取客户端发送过去的数据到缓存theBuffer中 int got = channel.read(theBuffer); //解决相应的数据 con.onReadData(got); }
AbstractConnection#onReadData
办法中又进一步调用了AbstractService#handle
办法来解决数据,所以咱们间接看AbstractService#handle
办法:
public void handle(ByteBuffer dataBuffer) { this.sessionStart(); boolean hasReming = true; int offset = 0; while (hasReming) { //上面这行代码理论解决了客户端传过来的数据包,外面蕴含计算包总长度、判断读取的数据包是否残缺等逻辑 ProtoHandlerResult result = proto.handle(dataBuffer, offset, isSupportCompress); switch (result.getCode()) { //客户端首次发来的握手包,所以是残缺的数据包,进入这里的解决逻辑,这里将读取的数据封装成task工作,提交到队列中,而后通过线程异步解决 case REACH_END_BUFFER: connection.readReachEnd(); byte[] packetData = result.getPacketData(); if (packetData != null) { taskCreate(packetData); } dataBuffer.clear(); hasReming = false; break; case BUFFER_PACKET_UNCOMPLETE: connection.compactReadBuffer(dataBuffer, result.getOffset()); hasReming = false; break; case BUFFER_NOT_BIG_ENOUGH: connection.ensureFreeSpaceOfReadBuffer(dataBuffer, result.getOffset(), result.getPacketLength()); hasReming = false; break; case STLL_DATA_REMING: byte[] partData = result.getPacketData(); if (partData != null) { taskCreate(partData); } offset = result.getOffset(); continue; default: throw new RuntimeException("unknown error when read data"); } } }
到这里DBLE实现了读取客户端发送过去的握手包,并将它封装成了异步工作以备下一步解决。
4、DBLE异步解决工作并返回OK包
异步是高性能的秘诀之一。下面DBLE将读取到的数据封装成了工作,而后交由线程异步解决。
咱们间接来看工作解决的相干代码,在AbstractService#execute
办法中:
public void execute(ServiceTask task) { task.increasePriority(); handleData(task); }
对于客户端握手回复包的解决,最初调用的代码在MySQLFrontAuthService#handleAuthPacket
办法中,所以对于该场景,咱们间接看该办法的相干代码:
private void handleAuthPacket(byte[] data) { //将读取到的数据转换为AuthPacket AuthPacket auth = new AuthPacket(); auth.read(data); this.authPacket = auth; …… //查看用户名和明码 auth(); …… }
下面的auth
办法里又调用了MySQLFrontAuthService#checkForResult
办法,所以咱们间接看该办法:
private void checkForResult(AuthResultInfo info) { …… AbstractService service = BusinessServiceFactory.getBusinessService(info, connection); connection.setService(service); //验证通过后,拼装并发送OK包给客户端 MySQLPacket packet = new OkPacket(); packet.setPacketId(needAuthSwitched ? 4 : 2); packet.write(connection); …… }
到这里,DBLE曾经解决完了握手回复包,并返回OK包给客户端,整个客户端与DBLE的连贯建设过程就完结了。
该过程完结后,将进入MySQL协定的Command阶段,如果你通过命令行连贯DBLE的话,即进入了上面的界面:
mysql>
是不是没想到在进入这个命令界背后产生了这么多……
多路复用在DBLE中是如何实现的
其实这个问题的答案,如果认真看后面代码章节的话就曾经可能晓得了。DBLE的多路复用其实就是通过JAVA的多路复用选择器Selector
来实现的,通过将连贯注册给Selector
,这样只是在连贯中有数据时候才进行读取,可能实现一个线程监听多个连贯。
申请的异步化解决
DBLE在读取完数据后,并没有在以后线程中解决这些数据,而是将数据封装成工作提交到队列中去,而后通过另外的线程来进行解决,这即是申请异步化解决,可能极大的进步性能,这在下面的源码解读章节里也进行了阐明。
总结
明天从一个实例登程,从源码角度具体解读了DBLE对网络数据包的解决流程。通过Selector
实现多路复用,将接管到的数据封装成工作提交到队列以进行异步解决。这是DBLE高性能网络IO解决的机密。当然可能还有一些代码细节在文章中没有讲到,大家如果有疑难的中央能够进一步浏览源码,也能够评论区留言。