MyBatis 源码阅读之数据库连接

42次阅读

共计 13958 个字符,预计需要花费 35 分钟才能阅读完成。

MyBatis 源码阅读之数据库连接
MyBatis 的配置文件所有配置会被 org.apache.ibatis.builder.xml.XMLConfigBuilder 类读取,我们可以通过此类来了解各个配置是如何运作的。而 MyBatis 的映射文件配置会被 org.apache.ibatis.builder.xml.XMLMapperBuilder 类读取。我们可以通过此类来了解映射文件的配置时如何被解析的。
本文探讨 事务管理器 和 数据源 相关代码
配置
environment
以下是 mybatis 配置文件中 environments 节点的一般配置。
<!– mybatis-config.xml –>
<environments default=”development”>
<environment id=”development”>
<transactionManager type=”JDBC”>
<property name=”…” value=”…”/>
</transactionManager>
<dataSource type=”POOLED”>
<property name=”driver” value=”${driver}”/>
<property name=”url” value=”${url}”/>
<property name=”username” value=”${username}”/>
<property name=”password” value=”${password}”/>
</dataSource>
</environment>
</environments>
environments 节点的加载也不算复杂,它只会加载 id 为 development 属性值的 environment 节点。它的加载代码在 XMLConfigBuilder 类的 environmentsElement() 方法中,代码不多,逻辑也简单,此处不多讲。
TransactionManager
接下来我们看看 environment 节点下的子节点。transactionManager 节点的 type 值默认提供有 JDBC 和 MANAGED,dataSource 节点的 type 值默认提供有 JNDI、POOLED 和 UNPOOLED。它们对应的类都可以在 Configuration 类的构造器中找到,当然下面我们也一个一个来分析。
现在我们大概了解了配置,然后来分析这些配置与 MyBatis 类的关系。
TransactionFactory
transactionManager 节点对应 TransactionFactory 接口,使用了 抽象工厂模式。MyBatis 给我们提供了两个实现类:ManagedTransactionFactory 和 JdbcTransactionFactory,它们分别对应者 type 属性值为 MANAGED 和 JDBC。
TransactionFactory 有三个方法,我们需要注意的方法只有 newTransaction(),它用来创建一个事务对象。
void setProperties(Properties props);

Transaction newTransaction(Connection conn);

Transaction newTransaction(DataSource dataSource, TransactionIsolationLevel level, boolean autoCommit);
其中 JdbcTransactionFactory 创建的事务对象是 JdbcTransaction 的实例,它是对 JDBC 事务的简单封装;ManagedTransactionFactory 创建的事务对象是 ManagedTransaction 的实例,它本身并不控制事务,即 commit 和 rollback 都是不做任何操作,而是交由 JavaEE 容器来控制事务,以方便集成。
DataSourceFactory
DataSourceFactory 是获取数据源的接口,也使用了 抽象工厂模式,代码如下,方法极为简单:
public interface DataSourceFactory {

/**
* 可传入一些属性配置
*/
void setProperties(Properties props);

DataSource getDataSource();
}
MyBatis 默认支持三种数据源,分别是 UNPOOLED、POOLED 和 JNDI。对应三个工厂类:UnpooledDataSourceFactory、PooledDataSourceFactory 和 JNDIDataSourceFactory。
其中 JNDIDataSourceFactory 是使用 JNDI 来获取数据源。我们很少使用,并且代码不是非常复杂,此处不讨论。我们先来看看 UnpooledDataSourceFactory:
public class UnpooledDataSourceFactory implements DataSourceFactory {

private static final String DRIVER_PROPERTY_PREFIX = “driver.”;
private static final int DRIVER_PROPERTY_PREFIX_LENGTH = DRIVER_PROPERTY_PREFIX.length();

protected DataSource dataSource;

public UnpooledDataSourceFactory() {
this.dataSource = new UnpooledDataSource();
}

@Override
public void setProperties(Properties properties) {
Properties driverProperties = new Properties();
// MetaObject 用于解析实例对象的元信息,如字段的信息、方法的信息
MetaObject metaDataSource = SystemMetaObject.forObject(dataSource);
for (Object key : properties.keySet()) {
String propertyName = (String) key;
if (propertyName.startsWith(DRIVER_PROPERTY_PREFIX)) {
// 添加驱动的配置属性
String value = properties.getProperty(propertyName);
driverProperties.setProperty(propertyName.substring(DRIVER_PROPERTY_PREFIX_LENGTH), value);
} else if (metaDataSource.hasSetter(propertyName)) {
// 为数据源添加配置属性
String value = (String) properties.get(propertyName);
Object convertedValue = convertValue(metaDataSource, propertyName, value);
metaDataSource.setValue(propertyName, convertedValue);
} else {
throw new DataSourceException(“Unknown DataSource property: ” + propertyName);
}
}
if (driverProperties.size() > 0) {
metaDataSource.setValue(“driverProperties”, driverProperties);
}
}

@Override
public DataSource getDataSource() {
return dataSource;
}

/**
* 将 String 类型的值转为目标对象字段的类型的值
*/
private Object convertValue(MetaObject metaDataSource, String propertyName, String value) {
Object convertedValue = value;
Class<?> targetType = metaDataSource.getSetterType(propertyName);
if (targetType == Integer.class || targetType == int.class) {
convertedValue = Integer.valueOf(value);
} else if (targetType == Long.class || targetType == long.class) {
convertedValue = Long.valueOf(value);
} else if (targetType == Boolean.class || targetType == boolean.class) {
convertedValue = Boolean.valueOf(value);
}
return convertedValue;
}
}
虽然代码看起来复杂,实际上非常简单,在创建工厂实例时创建它对应的 UnpooledDataSource 数据源。setProperties() 方法用于给数据源添加部分属性配置,convertValue() 方式时一个私有方法,就是处理 当 DataSource 的属性为整型或布尔类型时提供对字符串类型的转换功能而已。
最后我们看看 PooledDataSourceFactory,这个类非常简单,仅仅是继承了 UnpooledDataSourceFactory,然后构造方法替换数据源为 PooledDataSource。
public class PooledDataSourceFactory extends UnpooledDataSourceFactory {

public PooledDataSourceFactory() {
this.dataSource = new PooledDataSource();
}
}
虽然它的代码极少,实际上都在 PooledDataSource 类中。
DataSource
看完了工厂类,我们来看看 MyBatis 提供的两种数据源类:UnpooledDataSource 和 PooledDataSource。
UnpooledDataSource
UnpooledDataSource 看名字就知道是没有池化的特征,相对也简单点,以下代码省略一些不重要的方法
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class UnpooledDataSource implements DataSource {

private ClassLoader driverClassLoader;
private Properties driverProperties;
private static Map<String, Driver> registeredDrivers = new ConcurrentHashMap<String, Driver>();

private String driver;
private String url;
private String username;
private String password;

private Boolean autoCommit;

// 事务隔离级别
private Integer defaultTransactionIsolationLevel;

static {
// 遍历所有可用驱动
Enumeration<Driver> drivers = DriverManager.getDrivers();
while (drivers.hasMoreElements()) {
Driver driver = drivers.nextElement();
registeredDrivers.put(driver.getClass().getName(), driver);
}
}

// ……

private Connection doGetConnection(Properties properties) throws SQLException {
// 每次获取连接都会检测驱动
initializeDriver();
Connection connection = DriverManager.getConnection(url, properties);
configureConnection(connection);
return connection;
}

/**
* 初始化驱动,这是一个 同步 方法
*/
private synchronized void initializeDriver() throws SQLException {
// 如果不包含驱动,则准备添加驱动
if (!registeredDrivers.containsKey(driver)) {
Class<?> driverType;
try {
// 加载驱动
if (driverClassLoader != null) {
driverType = Class.forName(driver, true, driverClassLoader);
} else {
driverType = Resources.classForName(driver);
}
Driver driverInstance = (Driver)driverType.newInstance();
// 注册驱动代理到 DriverManager
DriverManager.registerDriver(new DriverProxy(driverInstance));
// 缓存驱动
registeredDrivers.put(driver, driverInstance);
} catch (Exception e) {
throw new SQLException(“Error setting driver on UnpooledDataSource. Cause: ” + e);
}
}
}

private void configureConnection(Connection conn) throws SQLException {
// 设置是否自动提交事务
if (autoCommit != null && autoCommit != conn.getAutoCommit()) {
conn.setAutoCommit(autoCommit);
}
// 设置 事务隔离级别
if (defaultTransactionIsolationLevel != null) {
conn.setTransactionIsolation(defaultTransactionIsolationLevel);
}
}

private static class DriverProxy implements Driver {
private Driver driver;

DriverProxy(Driver d) {
this.driver = d;
}

/**
* Driver 仅在 JDK7 中定义了本方法,用于返回本驱动的所有日志记录器的父记录器
* 个人也不是十分明确它的用法,毕竟很少会关注驱动的日志
*/
public Logger getParentLogger() {
return Logger.getLogger(Logger.GLOBAL_LOGGER_NAME);
}

// 其他方法均为调用 driver 对应的方法,此处省略
}
}
这里 DriverProxy 仅被注册到 DriverManager 中,这是一个注意点,我也不懂这种操作,有谁明白的可以留言相互讨论。这里的方法也不是非常复杂,我都已经标有注释,应该都可以看懂,不再细说。
以上便是 UnpooledDataSource 的初始化驱动和获取连接关键代码。
PooledDataSource
接下来我们来看最后一个类 PooledDataSource,它也是直接实现 DataSource,不过因为拥有池化的特性,它的代码复杂不少,当然效率比 UnpooledDataSource 会高出不少。
PooledDataSource 通过两个辅助类 PoolState 和 PooledConnection 来完成池化功能。PoolState 是记录连接池运行时的状态,定义了两个 PooledConnection 集合用于记录空闲连接和活跃连接。PooledConnection 内部定义了两个 Connection 分别表示一个真实连接和代理连接,还有一些其他字段用于记录一个连接的运行时状态。
先来详细了解一下 PooledConnection
/**
* 此处使用默认的访问权限
* 实现了 InvocationHandler
*/
class PooledConnection implements InvocationHandler {

private static final String CLOSE = “close”;
private static final Class<?>[] IFACES = new Class<?>[] {Connection.class};

/** hashCode() 方法返回 */
private final int hashCode;

private final Connection realConnection;

private final Connection proxyConnection;

// 省略 checkoutTimestamp、createdTimestamp、lastUsedTimestamp
private boolean valid;

/*
* Constructor for SimplePooledConnection that uses the Connection and PooledDataSource passed in
*
* @param connection – the connection that is to be presented as a pooled connection
* @param dataSource – the dataSource that the connection is from
*/
public PooledConnection(Connection connection, PooledDataSource dataSource) {
this.hashCode = connection.hashCode();
this.realConnection = connection;
this.dataSource = dataSource;
this.createdTimestamp = System.currentTimeMillis();
this.lastUsedTimestamp = System.currentTimeMillis();
this.valid = true;
this.proxyConnection = (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), IFACES, this);
}

/*
* 设置连接状态为不正常,不可使用
*/
public void invalidate() {
valid = false;
}

/*
* Method to see if the connection is usable
*
* @return True if the connection is usable
*/
public boolean isValid() {
return valid && realConnection != null && dataSource.pingConnection(this);
}

/**
* 自动上一次使用后经过的时间
*/
public long getTimeElapsedSinceLastUse() {
return System.currentTimeMillis() – lastUsedTimestamp;
}

/**
* 存活时间
*/
public long getAge() {
return System.currentTimeMillis() – createdTimestamp;
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
if (CLOSE.hashCode() == methodName.hashCode() && CLOSE.equals(methodName)) {
// 对于 close() 方法,将连接放回池中
dataSource.pushConnection(this);
return null;
} else {
try {
if (!Object.class.equals(method.getDeclaringClass())) {
checkConnection();
}
return method.invoke(realConnection, args);
} catch (Throwable t) {
throw ExceptionUtil.unwrapThrowable(t);
}
}
}

private void checkConnection() throws SQLException {
if (!valid) {
throw new SQLException(“Error accessing PooledConnection. Connection is invalid.”);
}
}
}
本类实现了 InvocationHandler 接口,这个接口是用于 JDK 动态代理的,在这个类的构造器中 proxyConnection 就是创建了此代理对象。来看看 invoke() 方法,它拦截了 close() 方法,不再关闭连接,而是将其继续放入池中,然后其他已实现的方法则是每次调用都需要检测连接是否合法。
再来说说 PoolState 类,这个类没什么可说的,都是一些统计字段,没有复杂逻辑,不再讨论;注意该类是针对一个 PooledDataSource 对象统计的。也就是说 PoolState 的统计字段是关于整个数据源的,而一个 PooledConnection 则是针对单个连接的。
最后我们回过头来看 PooledDataSource 类,数据源的操作就只有两个,获取连接,释放连接,先来看看获取连接
public class PooledDataSource implements DataSource {

private final UnpooledDataSource dataSource;

@Override
public Connection getConnection() throws SQLException {
return popConnection(dataSource.getUsername(), dataSource.getPassword()).getProxyConnection();
}

@Override
public Connection getConnection(String username, String password) throws SQLException {
return popConnection(username, password).getProxyConnection();
}

/**
* 获取一个连接
*/
private PooledConnection popConnection(String username, String password) throws SQLException {
boolean countedWait = false;
PooledConnection conn = null;
long t = System.currentTimeMillis();
int localBadConnectionCount = 0;

// conn == null 也可能是没有获得连接,被通知后再次走流程
while (conn == null) {
synchronized (state) {
// 是否存在空闲连接
if (!state.idleConnections.isEmpty()) {
// 池里存在空闲连接
conn = state.idleConnections.remove(0);
} else {
// 池里不存在空闲连接
if (state.activeConnections.size() < poolMaximumActiveConnections) {
// 池里的激活连接数小于最大数,创建一个新的
conn = new PooledConnection(dataSource.getConnection(), this);
} else {
// 最坏的情况,无法获取连接

// 检测最早使用的连接是否超时
PooledConnection oldestActiveConnection = state.activeConnections.get(0);
long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
if (longestCheckoutTime > poolMaximumCheckoutTime) {
// 使用超时连接,对超时连接的操作进行回滚
state.claimedOverdueConnectionCount++;
state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;
state.accumulatedCheckoutTime += longestCheckoutTime;
state.activeConnections.remove(oldestActiveConnection);
if (!oldestActiveConnection.getRealConnection().getAutoCommit()) {
try {
oldestActiveConnection.getRealConnection().rollback();
} catch (SQLException e) {
/*
* Just log a message for debug and continue to execute the following statement
* like nothing happened. Wrap the bad connection with a new PooledConnection,
* this will help to not interrupt current executing thread and give current
* thread a chance to join the next competition for another valid/good database
* connection. At the end of this loop, bad {@link @conn} will be set as null.
*/
log.debug(“Bad connection. Could not roll back”);
}
}
conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);
conn.setCreatedTimestamp(oldestActiveConnection.getCreatedTimestamp());
conn.setLastUsedTimestamp(oldestActiveConnection.getLastUsedTimestamp());
oldestActiveConnection.invalidate();
} else {
// 等待可用连接
try {
if (!countedWait) {
state.hadToWaitCount++;
countedWait = true;
}
long wt = System.currentTimeMillis();
state.wait(poolTimeToWait);
state.accumulatedWaitTime += System.currentTimeMillis() – wt;
} catch (InterruptedException e) {
break;
}
}
}
}
// 已获取连接
if (conn != null) {
// 检测连接是否可用
if (conn.isValid()) {
// 对之前的操作回滚
if (!conn.getRealConnection().getAutoCommit()) {
conn.getRealConnection().rollback();
}
conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));
conn.setCheckoutTimestamp(System.currentTimeMillis());
conn.setLastUsedTimestamp(System.currentTimeMillis());
// 激活连接池数 +1
state.activeConnections.add(conn);
state.requestCount++;
state.accumulatedRequestTime += System.currentTimeMillis() – t;
} else {
// 连接坏掉了,超过一定阈值则抛异常提醒
state.badConnectionCount++;
localBadConnectionCount++;
conn = null;
if (localBadConnectionCount > (poolMaximumIdleConnections
+ poolMaximumLocalBadConnectionTolerance)) {
// 省略日志
throw new SQLException(
“PooledDataSource: Could not get a good connection to the database.”);
}
}
}
}

}

if (conn == null) {
// 省略日志
throw new SQLException(
“PooledDataSource: Unknown severe error condition. The connection pool returned a null connection.”);
}

return conn;
}
}
上面的代码都已经加了注释,总体流程不算复杂:

while => 连接为空

能否直接从池里拿连接 => 可以则获取连接并返回
不能,查看池里的连接是否没满 => 没满则创建一个连接并返回
满了,查看池里最早的连接是否超时 => 超时则强制该连接回滚,然后获取该连接并返回
未超时,等待连接可用

检测连接是否可用

释放连接操作,更为简单,判断更少
protected void pushConnection(PooledConnection conn) throws SQLException {
// 同步操作
synchronized (state) {
// 从活动池中移除连接
state.activeConnections.remove(conn);
if (conn.isValid()) {
// 不超过空闲连接数 并且连接是同一类型的连接
if (state.idleConnections.size() < poolMaximumIdleConnections
&& conn.getConnectionTypeCode() == expectedConnectionTypeCode) {
state.accumulatedCheckoutTime += conn.getCheckoutTime();
if (!conn.getRealConnection().getAutoCommit()) {
conn.getRealConnection().rollback();
}
// 废弃原先的对象
PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this);
state.idleConnections.add(newConn);
newConn.setCreatedTimestamp(conn.getCreatedTimestamp());
newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp());
// 该对象已经不能用于连接了
conn.invalidate();
if (log.isDebugEnabled()) {
log.debug(“Returned connection ” + newConn.getRealHashCode() + ” to pool.”);
}
state.notifyAll();
} else {
state.accumulatedCheckoutTime += conn.getCheckoutTime();
if (!conn.getRealConnection().getAutoCommit()) {
conn.getRealConnection().rollback();
}
// 关闭连接
conn.getRealConnection().close();
if (log.isDebugEnabled()) {
log.debug(“Closed connection ” + conn.getRealHashCode() + “.”);
}
conn.invalidate();
}
} else {
if (log.isDebugEnabled()) {
log.debug(“A bad connection (” + conn.getRealHashCode()
+ “) attempted to return to the pool, discarding connection.”);
}
state.badConnectionCount++;
}
}
}
部分码注释已添加,这里就说一下总体流程:

从活动池中移除连接

如果该连接可用

连接池未满,则连接放回池中
满了,回滚,关闭连接

总体流程大概就是这样
其他还有两个方法代码较多,但逻辑都很简单:

pingConnection() 执行一条 SQL 检测连接是否可用。

forceCloseAll() 回滚并关闭激活连接池和空闲连接池中的连接

正文完
 0