关于java:JDBC-基本流程源码分析

11次阅读

共计 20175 个字符,预计需要花费 51 分钟才能阅读完成。

最近为了浏览 MyBatis 源码,大略看了下 JDBC 的代码,理解在不应用 ORM 框架的状况下 Java 实现 SQL 查问的原理。

JDBC 定义如下:

JDBC(Java DataBase Connectivity,java 数据库连贯)是一种用于执行 SQL 语句的 Java API,能够为多种关系数据库提供对立拜访,它由一组用 Java 语言编写的类和接口组成。JDBC 提供了一种基准,据此能够构建更高级的工具和接口,使数据库开发人员可能编写数据库应用程序。

也就是说 JDBC 是 SUN 公司提出来的一系列标准,但它只定义了接口标准,具体的实现则交给各个数据库的厂商去做。

1. 应用流程

JDBC 流程:

  1. 通过 SPI 加载 Driver 驱动类。
  2. 建设数据库连贯,获取 Connection 连贯对象。
  3. 通过连贯创立 Statement 对象,执行 SQL 语句,获取返回后果。
  4. 开释资源。

在 Maven 中增加对 mysql 驱动的依赖:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.23</version>
</dependency>

数据库表:

1.1 一般查问

@Test
public void query() throws Exception {
    // 打印至控制台
    DriverManager.setLogWriter(new PrintWriter(System.out));
    // 建设连贯
    Connection conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/testdb", "test", "test");
    // 执行 SQL 查问,获取返回后果
    Statement statement = conn.createStatement();
    statement.setQueryTimeout(60);
    ResultSet resultSet = statement.executeQuery("select * from t_student");
    while (resultSet.next()) {System.out.println("id:" + resultSet.getInt(1) + "address:" + resultSet.getString(2) + "name:" + resultSet.getString(4));
    }
}

执行后果:

DriverManager.getConnection("jdbc:mysql://localhost:3306/testdb")
    trying com.mysql.cj.jdbc.Driver
getConnection returning com.mysql.cj.jdbc.Driver
id:1 address:hunan name:zhangsan
id:2 address:guangzhou name:lisi
id:3 address: 四川 name: 大大 

1.2 参数查问

@Test
public void queryByParam() throws SQLException {DriverManager.setLogWriter(new PrintWriter(System.out));
    // 建设连贯
    Connection conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/testdb", "test", "test");
    // 执行 SQL 查问,获取返回后果
    PreparedStatement preparedStatement = conn.prepareStatement("select * from t_student where id = ? and age = ?");
    preparedStatement.setString(1, "1");
    preparedStatement.setInt(2, 11);
    ResultSet resultSet = preparedStatement.executeQuery();
    while (resultSet.next()) {System.out.println("id:" + resultSet.getInt(1) + "address:" + resultSet.getString(2) + "name:" + resultSet.getString(4));
    }
}

1.3 批量更新

@Test
public void updateBatch() throws SQLException {DriverManager.setLogWriter(new PrintWriter(System.out));
    Connection conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/testdb", "test", "test");
    PreparedStatement preparedStatement = conn.prepareStatement("update t_student set age = ? where id = ?");

    preparedStatement.setInt(1, 10);
    preparedStatement.setString(2, "1");
    preparedStatement.addBatch();

    preparedStatement.setInt(1, 10);
    preparedStatement.setString(2, "2");
    preparedStatement.addBatch();

    int[] result = preparedStatement.executeBatch();
    System.out.println("result =" + result.length);
}

2. 源码解析

2.1 加载驱动

次要流程:

  1. 通过 SPI 加载 MySQL 驱动包中的 Driver 类。
  2. 将 Driver 类注册到 JDBC 的 DriverManager 之中。

2.1.1 SPI 加载驱动类

DriverManager 中定义了动态代码块,会通过 SPI 来加载数据库驱动类。

java.sql.DriverManager

/**
 * Load the initial JDBC drivers by checking the System property
 * jdbc.properties and then use the {@code ServiceLoader} mechanism
 */
static {loadInitialDrivers();
    println("JDBC DriverManager initialized");
}

java.sql.DriverManager#loadInitialDrivers

private static void loadInitialDrivers() {
    // ...
    AccessController.doPrivileged(new PrivilegedAction<Void>() {public Void run() {

            // 采纳 SPI 机制加载数据库驱动
            ServiceLoader<Driver> loadedDrivers = ServiceLoader.load(Driver.class); 
            Iterator<Driver> driversIterator = loadedDrivers.iterator();

            /* Load these drivers, so that they can be instantiated.
             * It may be the case that the driver class may not be there
             * i.e. there may be a packaged driver with the service class
             * as implementation of java.sql.Driver but the actual class
             * may be missing. In that case a java.util.ServiceConfigurationError
             * will be thrown at runtime by the VM trying to locate
             * and load the service.
             *
             * Adding a try catch block to catch those runtime errors
             * if driver not available in classpath but it's
             * packaged as service and that service is there in classpath.
             */
            try{while(driversIterator.hasNext()) {driversIterator.next();
                }
            } catch(Throwable t) {// Do nothing}
            return null;
        }
    });
    // ...
}

通过 SPI,加载配置文件 META-INF/services/java.sql.Driver,因为我的项目中具备 MySql 驱动的 jar 包,这里是读取到配置文件:
mysql-connector-java-8.0.23.jar!\META-INF\services\java.sql.Driver
文件中的内容为 com.mysql.cj.jdbc.Driver,反射获取失去该类,并进行实例化。

2.1.2 注册驱动类

实例化 MySQL 驱动的时候,会通过 DriverManager#registerDriver 办法来注册驱动。

com.mysql.cj.jdbc.Driver

public class Driver extends NonRegisteringDriver implements java.sql.Driver {
    //
    // Register ourselves with the DriverManager
    //
    static {
        try {java.sql.DriverManager.registerDriver(new Driver()); // 注册驱动
        } catch (SQLException E) {throw new RuntimeException("Can't register driver!");
        }
    }

    /**
     * Construct a new driver and register it with DriverManager
     * 
     * @throws SQLException
     *             if a database error occurs.
     */
    public Driver() throws SQLException {// Required for Class.forName().newInstance()}
}

其底层实现是,将 MySQL 驱动 Driver 对象包装为 DriverInfo,存储在 DriverManager#registeredDrivers 汇合中。

java.sql.DriverManager#registerDriver

// List of registered JDBC drivers
private final static CopyOnWriteArrayList<DriverInfo> registeredDrivers = new CopyOnWriteArrayList<>();

public static synchronized void registerDriver(java.sql.Driver driver)
    throws SQLException {registerDriver(driver, null);
}

public static synchronized void registerDriver(java.sql.Driver driver,
        DriverAction da)
    throws SQLException {

    /* Register the driver if it has not already been added to our list */
    if(driver != null) {registeredDrivers.addIfAbsent(new DriverInfo(driver, da)); // 注册驱动
    } else {
        // This is for compatibility with the original DriverManager
        throw new NullPointerException();}

    println("registerDriver:" + driver);

}

2.2 建设连贯

应用 DriverManager#getConnection 办法来建设与 MySQL 服务器的连贯。

Connection conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/testdb", "test", "test");

其中遍历 CopyOnWriteArrayList<DriverInfo> registeredDrivers 中已注册的数据库驱动,这里是 com.mysql.cj.jdbc.Driver 实例。

java.sql.DriverManager#getConnection

for(DriverInfo aDriver : registeredDrivers) { // 遍历已注册的数据库驱动
    // If the caller does not have permission to load the driver then
    // skip it.
    if(isDriverAllowed(aDriver.driver, callerCL)) {
        try {println("trying" + aDriver.driver.getClass().getName());
            Connection con = aDriver.driver.connect(url, info); // 建设数据库连贯,返回连贯对象
            if (con != null) {
                // Success!
                println("getConnection returning" + aDriver.driver.getClass().getName());
                return (con);
            }
        } catch (SQLException ex) {if (reason == null) {reason = ex;}
        }

    } else {println("skipping:" + aDriver.getClass().getName());
    }

}

通过 MySQL 数据库驱动来建设连贯:

  1. 依据数据库地址、用户名明码等参数,构建 ConnectionUrl 对象,该对象默认是 SINGLE_CONNECTION 类型。
  2. 依据 ConnectionUrl 对象,创立数据库连贯实例 ConnectionImpl,建设 Socket 连贯。

2.2.1 构建 ConnectionUrl 对象

com.mysql.cj.jdbc.NonRegisteringDriver#connect

public java.sql.Connection connect(String url, Properties info) throws SQLException {
    // ...
    ConnectionUrl conStr = ConnectionUrl.getConnectionUrlInstance(url, info);
    switch (conStr.getType()) {
        // 单个
        case SINGLE_CONNECTION:
            return com.mysql.cj.jdbc.ConnectionImpl.getInstance(conStr.getMainHost());
        // 生效转移 
        case FAILOVER_CONNECTION:
        case FAILOVER_DNS_SRV_CONNECTION:
            return FailoverConnectionProxy.createProxyInstance(conStr);
        // 负载平衡        
        case LOADBALANCE_CONNECTION:
        case LOADBALANCE_DNS_SRV_CONNECTION:
            return LoadBalancedConnectionProxy.createProxyInstance(conStr);
        // 复制
        case REPLICATION_CONNECTION:
        case REPLICATION_DNS_SRV_CONNECTION:
            return ReplicationConnectionProxy.createProxyInstance(conStr);
        default:
            return null;
    }
    // ...
}

2.2.2 MySQL 驱动协定阐明

1. Failover

jdbc:mysql://[primary-host]:[port],[secondary-host]:[port],.../[database]?[property=<value>]&[property=<value>]

即 Client 链接生效时,将会尝试与其余 host 建设链接,这个过程对 application 是通明的。读(写)操作总是只产生在一个 host 上。

2. Load Balancing

jdbc:mysql:loadbalance://[host]:[port],[host]:[port],...[/database]?[property=<value>]&[property=<value>]

格局同 failover\replication 相似,所有 host 没有主次之分都是平级,反对参数管制。

负载平衡策略定义了 BalanceStrategy 接口,mysql 反对曾经实现该接口的策略有:

  • BestResponseTimeBalanceStrategy:选中事务响应最快的 host
  • RandomBalanceStrategy:(默认)随机选中一个 host
  • SequentialBalanceStrategy:第一次随机之后程序选后一个至周而复始

3. Replication

jdbc:mysql:replication://[master-host]:[port],[slave-host]:[port],.../database?[property=<value>]

具体格局相似 failover,比拟大的变动是第一个 host 为 master 库是 write/read 模式,前面都是 slave 库是 read 模式,也是反对参数进行配置。

replication 协定是建设在 failover 和 loadbalance 根底上,适应 Replication 架构须要为解决读写拆散、负载平衡场景的。
在事务 read only 模式下申请会被转向到 slave host,若多个 slave 状况下采纳 round-robin(轮询) 策略。
对于非 read only 申请(write/read)都将转向到 master host。

6.5.1.27 后版本反对多个 master,多个 master 下采纳 load balance 策略,具体参考 loadbalance 协定介绍。
7.5.1.28 版本后又反对动静增加节点,也就是程序运行是动静增加新的 host 到 URL 中而不须要重启服务器,咱们常常会聊的动静数据源场景。

本节参考:
mysql 驱动协定之 loadbalance 和 replication
Chapter 8 Multi-Host Connections

2.3 构建 ConnectionImpl 实例

构建 ConnectionImpl 实例,其中会创立 Socket 连贯。

com.mysql.cj.jdbc.ConnectionImpl#getInstance
com.mysql.cj.jdbc.ConnectionImpl#ConnectionImpl(com.mysql.cj.conf.HostInfo)

public ConnectionImpl(HostInfo hostInfo) throws SQLException {
    // ...
    createNewIO(false); // 要害地位
    // ...
}

com.mysql.cj.jdbc.ConnectionImpl#createNewIO
com.mysql.cj.jdbc.ConnectionImpl#connectOneTryOnly

建设会话,这里用的是 BIO。

com.mysql.cj.NativeSession#connect
com.mysql.cj.protocol.a.NativeSocketConnection#connect
com.mysql.cj.protocol.StandardSocketFactory#connect

@SuppressWarnings("unchecked")
public <T extends Closeable> T connect(String hostname, int portNumber, PropertySet pset, int loginTimeout) throws IOException {

    this.loginTimeoutCountdown = loginTimeout;

    if (pset != null) {
        this.host = hostname;

        this.port = portNumber;

        String localSocketHostname = pset.getStringProperty(PropertyKey.localSocketAddress).getValue();
        InetSocketAddress localSockAddr = null;
        if (localSocketHostname != null && localSocketHostname.length() > 0) {localSockAddr = new InetSocketAddress(InetAddress.getByName(localSocketHostname), 0);
        }

        int connectTimeout = pset.getIntegerProperty(PropertyKey.connectTimeout).getValue();

        if (this.host != null) {InetAddress[] possibleAddresses = InetAddress.getAllByName(this.host);

            if (possibleAddresses.length == 0) {throw new SocketException("No addresses for host");
            }

            // save last exception to propagate to caller if connection fails
            SocketException lastException = null;

            // Need to loop through all possible addresses. Name lookup may return multiple addresses including IPv4 and IPv6 addresses. Some versions of
            // MySQL don't listen on the IPv6 address so we try all addresses.
            for (int i = 0; i < possibleAddresses.length; i++) {
                try {this.rawSocket = createSocket(pset);

                    configureSocket(this.rawSocket, pset);

                    InetSocketAddress sockAddr = new InetSocketAddress(possibleAddresses[i], this.port);
                    // bind to the local port if not using the ephemeral port
                    if (localSockAddr != null) {this.rawSocket.bind(localSockAddr);
                    }

                    this.rawSocket.connect(sockAddr, getRealTimeout(connectTimeout)); // 建设 Socket 连贯

                    break;
                } catch (SocketException ex) {
                    lastException = ex;
                    resetLoginTimeCountdown();
                    this.rawSocket = null;
                }
            }

            if (this.rawSocket == null && lastException != null) {throw lastException;}

            resetLoginTimeCountdown();

            this.sslSocket = this.rawSocket;
            return (T) this.rawSocket;
        }
    }

    throw new SocketException("Unable to create socket");
}

2.3 执行语句

2.3.1 Statement

Statement statement = conn.createStatement();
ResultSet resultSet = statement.executeQuery("select * from t_student");

发动 SQL 查问,获取返回后果:

com.mysql.cj.jdbc.StatementImpl#executeQuery

public java.sql.ResultSet executeQuery(String sql) throws SQLException {
    // ...
    this.results = ((NativeSession) locallyScopedConn.getSession()).execSQL(this, sql, this.maxRows, null, 
        createStreamingResultSet(), getResultSetFactory(), cachedMetaData, false);
}

发动 SQL 查问,分为两种形式:1. 字符串;2. 二进制数据包。

com.mysql.cj.NativeSession#execSQL

public <T extends Resultset> T execSQL(Query callingQuery, String query, int maxRows, NativePacketPayload packet, boolean streamResults, ProtocolEntityFactory<T, NativePacketPayload> resultSetFactory, ColumnDefinition cachedMetadata, boolean isBatch) {
    // ...
    return packet == null
            ? ((NativeProtocol) this.protocol).sendQueryString(callingQuery, query, this.characterEncoding.getValue(), maxRows, streamResults, cachedMetadata, resultSetFactory)
            : ((NativeProtocol) this.protocol).sendQueryPacket(callingQuery, packet, maxRows, streamResults, cachedMetadata, resultSetFactory);
}

因为 SQL 语句为 select * from t_student,先执行 sendQueryString() 办法将字符串转换为二级制包(NativePacketPayload 类实例),再调用 sendQueryPacket() 办法发送数据。

com.mysql.cj.protocol.a.NativeProtocol#sendQueryString
com.mysql.cj.protocol.a.NativeProtocol#sendQueryPacket

public final <T extends Resultset> T sendQueryPacket(Query callingQuery, NativePacketPayload queryPacket, int maxRows, boolean streamResults, ColumnDefinition cachedMetadata, ProtocolEntityFactory<T, NativePacketPayload> resultSetFactory) throws IOException {
    // ...
    // Send query command and sql query string
    NativePacketPayload resultPacket = sendCommand(queryPacket, false, 0);
    T rs = readAllResults(maxRows, streamResults, resultPacket, false, cachedMetadata, resultSetFactory);
    return rs;
}

向 mysql 服务器发送数据包。

com.mysql.cj.protocol.a.NativeProtocol#sendCommand(com.mysql.cj.protocol.Message, boolean, int)
com.mysql.cj.protocol.a.NativeProtocol#send(com.mysql.cj.protocol.Message, int)
com.mysql.cj.protocol.a.TimeTrackingPacketSender#send(byte[], int, byte)
com.mysql.cj.protocol.a.SimplePacketSender#send(byte[], int, byte)

public void send(byte[] packet, int packetLen, byte packetSequence) throws IOException {PacketSplitter packetSplitter = new PacketSplitter(packetLen);
    while (packetSplitter.nextPacket()) {this.outputStream.write(NativeUtils.encodeMysqlThreeByteInteger(packetSplitter.getPacketLen()));
        this.outputStream.write(packetSequence++);
        this.outputStream.write(packet, packetSplitter.getOffset(), packetSplitter.getPacketLen());
    }
    this.outputStream.flush();}

2.3.2 PreparedStatement

PreparedStatement 的性能相似 Statement,但不同的是 PreparedStatement 能够应用占位符,它是由占位符标识须要输出数据的地位,而后再逐个填入数据。当然,PreparedStatement 也能够执行没有占位符的 sql 语句。

PreparedStatement preparedStatement = conn.prepareStatement("select * from t_student where id = ? and age = ?");
preparedStatement.setString(1, "1");
preparedStatement.setInt(2, 11);
ResultSet resultSet = preparedStatement.executeQuery();

接口 java.sql.PreparedStatement 在 MySQL 驱动中对应的实现类为 com.mysql.cj.jdbc.ClientPreparedStatement

应用 PreparedStatement 进行查问:

com.mysql.cj.jdbc.ClientPreparedStatement#executeQuery

public java.sql.ResultSet executeQuery() throws SQLException {
    // ...
    Message sendPacket = ((PreparedQuery<?>) this.query).fillSendPacket();
    this.results = executeInternal(this.maxRows, sendPacket, createStreamingResultSet(), true, cachedMetadata, false);
    return this.results;
}

其中,首先对 SQL 中的占位符进行填补,再打成二进制包进行发送:

com.mysql.cj.AbstractPreparedQuery#fillSendPacket

sendPacket.writeBytes(StringLengthDataType.STRING_FIXED, bindValues[i].getByteValue());

对于原始的 SQL:select * from t_student where id = ? and age = ?
依据占位符拆分成三个字符串,再遍历各个字符串,绑定上参数。

  1. select * from t_student where id =
  2. and age =
  3. 空字符串

补充完参数之后,再打成一个二进制包,发送给数据库服务器。
后续流程与应用 Statement 统一:

com.mysql.cj.jdbc.ClientPreparedStatement#executeInternal
com.mysql.cj.NativeSession#execSQL
com.mysql.cj.protocol.a.NativeProtocol#sendQueryPacket

2.4 获取响应

向 MySQL 发送完数据之后,读取响应。

  1. 先读取响应的列数(即有多少个字段)。

com.mysql.cj.protocol.a.NativeProtocol#sendQueryPacket

public final <T extends Resultset> T sendQueryPacket(Query callingQuery, NativePacketPayload queryPacket, int maxRows, boolean streamResults, ColumnDefinition cachedMetadata, ProtocolEntityFactory<T, NativePacketPayload> resultSetFactory) throws IOException {
    // ...
    // Send query command and sql query string
    NativePacketPayload resultPacket = sendCommand(queryPacket, false, 0);
    T rs = readAllResults(maxRows, streamResults, resultPacket, false, cachedMetadata, resultSetFactory); // 读取列数
    return rs;
}
  1. 读取具体每一行的数据(字段格局和字段内容)。

com.mysql.cj.protocol.a.NativeProtocol#readAllResults
com.mysql.cj.protocol.a.NativeProtocol#read
com.mysql.cj.protocol.a.TextResultsetReader#read

public Resultset read(int maxRows, boolean streamResults, NativePacketPayload resultPacket, ColumnDefinition metadata, ProtocolEntityFactory<Resultset, NativePacketPayload> resultSetFactory) throws IOException {
    Resultset rs = null;
    long columnCount = resultPacket.readInteger(IntegerDataType.INT_LENENC); // 获取字段的个数
    if (columnCount > 0) {
        // Build a result set with rows.

        // Read in the column information // 读取全副字段的类型信息
        ColumnDefinition cdef = this.protocol.read(ColumnDefinition.class, new ColumnDefinitionFactory(columnCount, metadata));

        // There is no EOF packet after fields when CLIENT_DEPRECATE_EOF is set
        if (!this.protocol.getServerSession().isEOFDeprecated()) {this.protocol.skipPacket();
            //this.protocol.readServerStatusForResultSets(this.protocol.readPacket(this.protocol.getReusablePacket()), true);
        }

        ResultsetRows rows = null;
        if (!streamResults) {TextRowFactory trf = new TextRowFactory(this.protocol, cdef, resultSetFactory.getResultSetConcurrency(), false);
            ArrayList<ResultsetRow> rowList = new ArrayList<>();

            // 顺次读取每一行数据,获取全副字段的值信息(二进制格局)ResultsetRow row = this.protocol.read(ResultsetRow.class, trf);
            while (row != null) {if ((maxRows == -1) || (rowList.size() < maxRows)) {rowList.add(row);
                }
                row = this.protocol.read(ResultsetRow.class, trf);
            }
            rows = new ResultsetRowsStatic(rowList, cdef); // 对象封装:ResultsetRow -> ResultsetRows

        } else {rows = new ResultsetRowsStreaming<>(this.protocol, cdef, false, resultSetFactory);
            this.protocol.setStreamingData(rows);
        }
        /*
         * Build ResultSet from ResultsetRows
         */
        rs = resultSetFactory.createFromProtocolEntity(rows); // 对象封装:ResultsetRows -> Resultset
    }
    return rs;
}        

2.4.1 读取字段信息

循环遍历每一列,对字段的类型信息进行读取和解析。

com.mysql.cj.protocol.a.NativeProtocol#read
com.mysql.cj.protocol.a.ColumnDefinitionReader#read

@Override
public ColumnDefinition read(ProtocolEntityFactory<ColumnDefinition, NativePacketPayload> sf) {ColumnDefinitionFactory cdf = (ColumnDefinitionFactory) sf;

    long columnCount = cdf.getColumnCount();
    ColumnDefinition cdef = cdf.getColumnDefinitionFromCache();

    if (cdef != null && !cdf.mergeColumnDefinitions()) {for (int i = 0; i < columnCount; i++) {this.protocol.skipPacket();
        }
        return cdef;
    }

    /* read the metadata from the server */
    Field[] fields = null;
    boolean checkEOF = !this.protocol.getServerSession().isEOFDeprecated();

    // Read in the column information

    fields = new Field[(int) columnCount];

    for (int i = 0; i < columnCount; i++) { // 循环遍历每一列,对字段的类型信息进行读取和解析
        NativePacketPayload fieldPacket = this.protocol.readMessage(null); // 读取二进制包
        // next check is needed for SSPS
        if (checkEOF && fieldPacket.isEOFPacket()) {break;}
        fields[i] = unpackField(fieldPacket, this.protocol.getServerSession().getCharacterSetMetadata()); // 解析二进制包
    }

    return cdf.createFromFields(fields);
}

例如,读取到 testdb 库的 t_student 表的 id 字段的信息,封装成 Field 类如下:

2.4.2 读取字段的值

从二进制数据包中,解析失去多个字段的值。

com.mysql.cj.protocol.a.ResultsetRowReader#read

@Override
public ResultsetRow read(ProtocolEntityFactory<ResultsetRow, NativePacketPayload> sf) throws IOException {AbstractRowFactory rf = (AbstractRowFactory) sf;
    NativePacketPayload rowPacket = null;
    NativePacketHeader hdr = this.protocol.getPacketReader().readHeader();

    // read the entire packet(s) // 读到一整行残缺的数据(二进制格局)rowPacket = this.protocol.getPacketReader()
            .readMessage(rf.canReuseRowPacketForBufferRow() ? Optional.ofNullable(this.protocol.getReusablePacket()) : Optional.empty(), hdr);
    this.protocol.checkErrorMessage(rowPacket);
    // Didn't read an error, so re-position to beginning of packet in order to read result set data
    rowPacket.setPosition(rowPacket.getPosition() - 1);

    // exit early with null if there's an EOF packet
    if (!this.protocol.getServerSession().isEOFDeprecated() && rowPacket.isEOFPacket()
            || this.protocol.getServerSession().isEOFDeprecated() && rowPacket.isResultSetOKPacket()) {this.protocol.readServerStatusForResultSets(rowPacket, true);
        return null;
    }

    return sf.createFromMessage(rowPacket); // 解析数据
}

例如 id 为 1 的表数据如下:


这里读取到的二进制内容如下右边所示:

01 31 05 68 75 6e 61 6e     . 1 . h u n a n
02 31 31 08 7a 68 61 6e     . 1 1 . z h a n
67 73 61 6e                 g s a n

解析二进制数据包:

com.mysql.cj.protocol.a.TextRowFactory#createFromMessage

@Override
public ResultsetRow createFromMessage(NativePacketPayload rowPacket) {// use a buffer row for reusable packets (streaming results), blobs and long strings
    // or if we're over the threshold
    boolean useBufferRow = this.canReuseRowPacketForBufferRow || this.columnDefinition.hasLargeFields()
            || rowPacket.getPayloadLength() >= this.useBufferRowSizeThreshold.getValue();

    if (this.resultSetConcurrency == Concurrency.UPDATABLE || !useBufferRow) {byte[][] rowBytes = new byte[this.columnDefinition.getFields().length][];

        for (int i = 0; i < this.columnDefinition.getFields().length; i++) { // 依据字段的个数,拆分二进制包
            rowBytes[i] = rowPacket.readBytes(StringSelfDataType.STRING_LENENC);
        }

        return new ByteArrayRow(rowBytes, this.exceptionInterceptor); // 将拆分后的二进制数据,存储在 ByteArrayRow 对象中。}

    return new TextBufferRow(rowPacket, this.columnDefinition, this.exceptionInterceptor, this.valueDecoder);
}

2.4.3 结构 Resultset

最初,回到 com.mysql.cj.protocol.a.TextResultsetReader#read 办法中。
将每一行数据解析后失去的 ResultsetRow 对象,存入汇合,寄存在 ResultsetRows 对象中。再依据 ResultsetRows 对象来结构 Resultset 对象。

即:ResultsetRow -> ResultsetRows -> Resultset

com.mysql.cj.jdbc.result.ResultSetFactory#createFromProtocolEntity
com.mysql.cj.jdbc.result.ResultSetFactory#createFromResultsetRows

public ResultSetImpl createFromResultsetRows(int resultSetConcurrency, int resultSetType, ResultsetRows rows) throws SQLException {

    ResultSetImpl rs;

    StatementImpl st = this.stmt;

    if (rows.getOwner() != null) {st = ((ResultSetImpl) rows.getOwner()).getOwningStatement();}

    switch (resultSetConcurrency) {
        case java.sql.ResultSet.CONCUR_UPDATABLE:
            rs = new UpdatableResultSet(rows, this.conn, st);
            break;

        default:
            // CONCUR_READ_ONLY
            rs = new ResultSetImpl(rows, this.conn, st); // 对象封装:ResultsetRows -> ResultSet
            break;
    }

    rs.setResultSetType(resultSetType);
    rs.setResultSetConcurrency(resultSetConcurrency);

    if (rows instanceof ResultsetRowsCursor && st != null) {rs.setFetchSize(st.getFetchSize());
    }
    return rs;
}

3. 总结

  1. JDBC 应用 SPI 机制加载数据库驱动,这是为了解决 BootstrapClassloader 无奈加载第三方的类的问题,将第三方的类委托给线程上下文类加载器来加载。
  2. 获取数据库连贯对象 Connection,实际上是对数据库建设 Socket 连贯。能够应用数据库连接池,以反复利用 Connection。
  3. 通过 Statement 来查问数据,底层是向 Socket 写入二进制数据,再从 Socket 读取二进制数据,封装在 Resultset 对象之中。

作者:Sumkor
链接:https://segmentfault.com/a/11…

正文完
 0