关于开发:分布式-DBLE-网络模块源码解析二

4次阅读

共计 6579 个字符,预计需要花费 17 分钟才能阅读完成。

作者:路路
酷爱技术、乐于分享的技术人,目前次要从事数据库相干技术的钻研。
本文起源:原创投稿
* 爱可生开源社区出品,原创内容未经受权不得随便应用,转载请分割小编并注明起源。


前言

在上一篇文章中,我讲了网络 IO 的基础知识,本篇文章将从源码角度具体解说 DBLE 的网络模块,包含 DBLE 是如何解决 MySQL 包的,多路复用在 DBLE 中是如何实现的,以及申请的异步化解决相干逻辑。

DBLE 是如何解决 MySQL 包的?

咱们将以客户端连贯 DBLE 为例,从源码角度解说 DBLE 的相干解决流程。

客户端与 DBLE 建设连贯的流程如下图所示(因为 DBLE 实现了 MySQL 协定,所以与客户端连贯 MySQL 的流程一样):

次要包含以下四个步骤:

1、客户端发动 connect 连贯;

2、服务端发送握手包;

3、客户端回复握手包;

4、服务端返回 OK 包,示意连贯建设实现,进入命令阶段。

咱们间接看源码:

1、DBLE 解决客户端 connect

DBLE 解决客户端 connect 的代码在 NIOAcceptor#run 办法中:

public void run() {
        // 这里的 selector 即 IO 多路复用选择器,一个 selector 能够解决多个客户端连贯申请
        final Selector tSelector = this.selector;
        for (; ;) {
            try {tSelector.select(1000L);
                Set<SelectionKey> keys = tSelector.selectedKeys();
                try {for (SelectionKey key : keys) {if (key.isValid() && key.isAcceptable()) {
                            // 当连贯无效且可承受时,解决客户端连贯
                            accept();} else {key.cancel();
                        }
                    }
                } catch (final Throwable e) {LOGGER.warn("caught Throwable err:", e);
                } finally {keys.clear();
                }
            } catch (Exception e) {LOGGER.info(getName(), e);
            }
        }
    }

能够看出上述办法调用了 accept 来承受客户端发动的 TCP 连贯,持续看该类的 accept 办法:

private void accept() {
        SocketChannel channel = null;
        try {
            // 与客户端建设 TCP 连贯
            channel = serverChannel.accept();
            channel.configureBlocking(false);
            NIOSocketWR socketWR = new NIOSocketWR();
            FrontendConnection c = factory.make(channel, socketWR);
            socketWR.initFromConnection(c);
            c.setId(ID_GENERATOR.getId());
            IOProcessor processor = DbleServer.getInstance().nextFrontProcessor();
            c.setProcessor(processor);
            NIOReactor reactor = reactorPool.getNextReactor();
            // 将已建设好的连贯注册给 NIOReactor
            reactor.postRegister(c);
        } catch (Exception e) {LOGGER.info(getName(), e);
            closeChannel(channel);
        }
    }

能够看出上述代码将客户端发动建设的 TCP 连贯注册给了 NIOReactor 来治理。

到这里 DBLE 对客户端的 connect 曾经解决实现了,他们之间曾经实现了 TCP 连贯的建设,同时 DBLE 将客户端申请的连贯注册给了NIOReactor

2、服务端发送握手包

接着下面咱们持续看 NIOReactor#postRegister 办法:

// 该办法将连贯放入队列,并唤醒 reactorR 的 selector,其中 reactorR 为 NIOReactor 的外部类 RW
void postRegister(AbstractConnection c) {reactorR.registerQueue.offer(c);
        reactorR.selector.wakeup();}

咱们间接看 RW 解决注册队列里客户端连贯的代码,在 RW#register 办法中:

private void register(Selector finalSelector) {
            ……
            while ((c = registerQueue.poll()) != null) {
                try {
                     // 上面这行代码须要留神,该代码将连贯注册到了 RW 的 selector 多路复用选择器中,使得该选择器后续可能读取该连贯发送过去的数据
                    ((NIOSocketWR) c.getSocketWR()).register(finalSelector);
                    // 服务端发送握手包的逻辑在上面这个办法中
                    c.register();} 
            ……
        }

AbstractConnection#register办法中又调用了 AbstractService#register 办法,对于客户端连贯申请来讲,这里最终调用的是 MySQLFrontAuthService#register 办法:

public void register() throws IOException {
        // 终于看到了 greeting,该办法向客户端发送了握手包
        greeting();
        this.connection.getSocketWR().asyncRead();
    }

MySQLFrontAuthService#greeting办法实现了拼装握手包,并将握手包发送给客户端的逻辑:

private void greeting() {
        // generate auth data
        byte[] rand1 = RandomUtil.randomBytes(8);
        byte[] rand2 = RandomUtil.randomBytes(12);

        // save auth data
        byte[] rand = new byte[rand1.length + rand2.length];
        System.arraycopy(rand1, 0, rand, 0, rand1.length);
        System.arraycopy(rand2, 0, rand, rand1.length, rand2.length);
        this.seed = rand;

        HandshakeV10Packet hs = new HandshakeV10Packet();
        hs.setPacketId(0);
        hs.setProtocolVersion(Versions.PROTOCOL_VERSION);  // [0a] protocol version   V10
        hs.setServerVersion(Versions.getServerVersion());
        hs.setThreadId(connection.getId());
        hs.setSeed(rand1);
        hs.setServerCapabilities(getServerCapabilities());
        int charsetIndex = CharsetUtil.getCharsetDefaultIndex(SystemConfig.getInstance().getCharset());
        hs.setServerCharsetIndex((byte) (charsetIndex & 0xff));
        hs.setServerStatus(2);
        hs.setRestOfScrambleBuff(rand2);
        hs.setAuthPluginName(pluginName.name().getBytes());

        // 这里的调用即发送握手包到客户端
        hs.write(connection);
    }

到这里就实现了服务端发送握手包的逻辑。

3、DBLE 解决客户端的握手回复包

服务端发送了握手包给客户端,客户端收到后须要发送握手回复包过去了,个别该握手回复包中会蕴含用户相干信息。

那么 DBLE 如何读取并解决客户端发送过去的握手回复包呢?

相应的代码在 RW#run 办法中,那为什么在这个办法中能解决客户端发送过去的数据呢?因为之前在解决客户端连贯的时候,曾经把相应的连贯注册给了 RW 的多路复用选择器,所以它当然能解决相应连贯的数据了,记不得的同学能够看后面 RW#register 办法中的正文。

RW#run办法中解决客户端发送数据的次要代码如下:

public void run() {
            final Selector finalSelector = this.selector;
            Set<SelectionKey> keys = null;
            for (; ;) {
                    ……                
                    // 当连贯中有数据的时候,这里会返回相应的 selection keys
                    keys = finalSelector.selectedKeys();
                    if (keys.size() == 0) {continue;}
                    // 对有相应事件的连贯进行解决
                    executeKeys(keys);
             }
                    ……
     }

咱们持续看 RW#executeKeys 办法(对代码做了一些精简,但不影响了解):

private void executeKeys(Set<SelectionKey> keys) {for (SelectionKey key : keys) {
                AbstractConnection con = null;
                Object att = key.attachment();
                con = (AbstractConnection) att;
                if (key.isValid() && key.isReadable()) {
                    // 这里即为读取客户端发送过去的数据
                    con.asyncRead();}
            }
        }

跟着代码走,相应的解决逻辑在 NIOSocketWR#asyncRead 办法中:

public void asyncRead() throws IOException {ByteBuffer theBuffer = con.findReadBuffer();
        // 读取客户端发送过去的数据到缓存 theBuffer 中
        int got = channel.read(theBuffer);
        // 解决相应的数据
        con.onReadData(got);
    }

AbstractConnection#onReadData办法中又进一步调用了 AbstractService#handle 办法来解决数据,所以咱们间接看 AbstractService#handle 办法:

public void handle(ByteBuffer dataBuffer) {this.sessionStart();
        boolean hasReming = true;
        int offset = 0;
        while (hasReming) {
            // 上面这行代码理论解决了客户端传过来的数据包,外面蕴含计算包总长度、判断读取的数据包是否残缺等逻辑
            ProtoHandlerResult result = proto.handle(dataBuffer, offset, isSupportCompress);
            switch (result.getCode()) {
                // 客户端首次发来的握手包,所以是残缺的数据包,进入这里的解决逻辑,这里将读取的数据封装成 task 工作,提交到队列中,而后通过线程异步解决
                case REACH_END_BUFFER:
                    connection.readReachEnd();
                    byte[] packetData = result.getPacketData();
                    if (packetData != null) {taskCreate(packetData);
                    }
                    dataBuffer.clear();
                    hasReming = false;
                    break;
                case BUFFER_PACKET_UNCOMPLETE:
                    connection.compactReadBuffer(dataBuffer, result.getOffset());
                    hasReming = false;
                    break;
                case BUFFER_NOT_BIG_ENOUGH:
                    connection.ensureFreeSpaceOfReadBuffer(dataBuffer, result.getOffset(), result.getPacketLength());
                    hasReming = false;
                    break;
                case STLL_DATA_REMING:
                    byte[] partData = result.getPacketData();
                    if (partData != null) {taskCreate(partData);
                    }
                    offset = result.getOffset();
                    continue;
                default:
                    throw new RuntimeException("unknown error when read data");
            }
        }
    }

到这里 DBLE 实现了读取客户端发送过去的握手包,并将它封装成了异步工作以备下一步解决。

4、DBLE 异步解决工作并返回 OK 包

异步是高性能的秘诀之一 。下面 DBLE 将读取到的数据封装成了工作,而后交由线程异步解决。
咱们间接来看工作解决的相干代码,在 AbstractService#execute 办法中:

public void execute(ServiceTask task) {task.increasePriority();
        handleData(task);
    }

对于客户端握手回复包的解决,最初调用的代码在 MySQLFrontAuthService#handleAuthPacket 办法中,所以对于该场景,咱们间接看该办法的相干代码:

private void handleAuthPacket(byte[] data) {
        // 将读取到的数据转换为 AuthPacket
        AuthPacket auth = new AuthPacket();
        auth.read(data);
        this.authPacket = auth;
        ……
        // 查看用户名和明码
        auth();
        ……
    }

下面的 auth 办法里又调用了 MySQLFrontAuthService#checkForResult 办法,所以咱们间接看该办法:

private void checkForResult(AuthResultInfo info) {
            ……
            AbstractService service = BusinessServiceFactory.getBusinessService(info, connection);
            connection.setService(service);
            // 验证通过后,拼装并发送 OK 包给客户端
            MySQLPacket packet = new OkPacket();
            packet.setPacketId(needAuthSwitched ? 4 : 2);
            packet.write(connection);
            ……
    }

到这里,DBLE 曾经解决完了握手回复包,并返回 OK 包给客户端,整个客户端与 DBLE 的连贯建设过程就完结了。

该过程完结后,将进入 MySQL 协定的 Command 阶段,如果你通过命令行连贯 DBLE 的话,即进入了上面的界面:

mysql>

是不是没想到在进入这个命令界背后产生了这么多……

多路复用在 DBLE 中是如何实现的

其实这个问题的答案,如果认真看后面代码章节的话就曾经可能晓得了。DBLE 的多路复用其实就是通过 JAVA 的多路复用选择器 Selector 来实现的,通过将连贯注册给Selector,这样只是在连贯中有数据时候才进行读取,可能实现一个线程监听多个连贯。

申请的异步化解决

DBLE 在读取完数据后,并没有在以后线程中解决这些数据,而是将数据封装成工作提交到队列中去,而后通过另外的线程来进行解决,这即是申请异步化解决,可能极大的进步性能,这在下面的源码解读章节里也进行了阐明。

总结

明天从一个实例登程,从源码角度具体解读了 DBLE 对网络数据包的解决流程。通过 Selector 实现多路复用,将接管到的数据封装成工作提交到队列以进行异步解决。这是 DBLE 高性能网络 IO 解决的机密。当然可能还有一些代码细节在文章中没有讲到,大家如果有疑难的中央能够进一步浏览源码,也能够评论区留言。

正文完
 0