共计 10081 个字符,预计需要花费 26 分钟才能阅读完成。
写在后面
在产品初期疾速迭代的过程中,往往为了疾速上线而占据市场,在后端开发的过程中往往不会过多的思考分布式和微服务,往往会将后端服务做成一个单体利用,而数据库也是一样,最后会把所有的业务数据都放到一个数据库中,即所谓的单实例数据库。随着业务的迅速倒退,将所有数据都放在一个数据库中曾经不足以撑持业务倒退的须要。此时,就会对系统进行分布式革新,而数据库业务进行分库分表的拆分。那么,问题来了,如何更好的拜访和治理拆分后的数据库呢?业界曾经有很多成熟的解决方案,其中,一个十分优良的解决方案就是:Apache ShardingSphere。明天,咱们就从源码级别来独特探讨下 sharding-jdbc 的外围源码。
sharding-jdbc 经典用法
Sharding-Jdbc 是一个轻量级的分库分表框架,应用时最要害的是配置分库分表策略,其余的和应用一般的 MySQL 驱动一样,简直不必改代码。例如上面的代码片段。
try(DataSource dataSource = ShardingDataSourceFactory.createDataSource(createDataSourceMap(), shardingRuleConfig, new Properties()) {Connection connection = dataSource.getConnection();
...
}
咱们在程序中拿到 Connection 对象后,就能够像应用一般的 JDBC 一样来应用 sharding-jdbc 操作数据库了。
sharding-jdbc 包构造
sharding-jdbc
├── sharding-jdbc-core 重写 DataSource/Connection/Statement/ResultSet 四大对象
└── sharding-jdbc-orchestration 配置核心
sharding-core
├── sharding-core-api 接口和配置类
├── sharding-core-common 通用分片策略实现...
├── sharding-core-entry SQL 解析、路由、改写,外围类 BaseShardingEngine
├── sharding-core-route SQL 路由,外围类 StatementRoutingEngine
├── sharding-core-rewrite SQL 改写,外围类 ShardingSQLRewriteEngine
├── sharding-core-execute SQL 执行,外围类 ShardingExecuteEngine
└── sharding-core-merge 后果合并,外围类 MergeEngine
shardingsphere-sql-parser
├── shardingsphere-sql-parser-spi SQLParserEntry,用于初始化 SQLParser
├── shardingsphere-sql-parser-engine SQL 解析,外围类 SQLParseEngine
├── shardingsphere-sql-parser-relation
└── shardingsphere-sql-parser-mysql MySQL 解析器,外围类 MySQLParserEntry 和 MySQLParser
shardingsphere-underlying 根底接口和 api
├── shardingsphere-rewrite SQLRewriteEngine 接口
├── shardingsphere-execute QueryResult 查问后果
└── shardingsphere-merge MergeEngine 接口
shardingsphere-spi SPI 加载工具类
sharding-transaction
├── sharding-transaction-core 接口 ShardingTransactionManager,SPI 加载
├── sharding-transaction-2pc 实现类 XAShardingTransactionManager
└── sharding-transaction-base 实现类 SeataATShardingTransactionManager
sharding-jdbc 中的四大对象
所有的所有都从 ShardingDataSourceFactory 开始的,创立了一个 ShardingDataSource 的分片数据源。除了 ShardingDataSource(分片数据源),在 Sharding-Sphere 中还有 MasterSlaveDataSourceFactory(主从数据源)、EncryptDataSourceFactory(脱敏数据源)。
public static DataSource createDataSource(
final Map<String, DataSource> dataSourceMap,
final ShardingRuleConfiguration shardingRuleConfig,
final Properties props) throws SQLException {
return new ShardingDataSource(dataSourceMap,
new ShardingRule(shardingRuleConfig, dataSourceMap.keySet()), props);
}
阐明: 本文次要以 ShardingDataSource 为切入点剖析 Sharding-Sphere 是如何对 JDBC 四大对象 DataSource、Connection、Statement、ResultSet 进行封装的。
DataSource
这里,波及到两个比拟重要的接口,一个是 DataSource,一个是 Connection。咱们首先来看下它们的类图。
- DataSource
- Connection
DataSource 和 Connection 都比较简单,没有解决过多的逻辑,只是 dataSourceMap, shardingRule 进行简略的封装。
ShardingDataSource 持有对数据源和分片规定,能够通过 getConnection 办法获取 ShardingConnection 连贯。
private final ShardingRuntimeContext runtimeContext = new ShardingRuntimeContext(dataSourceMap, shardingRule, props, getDatabaseType());
@Override
public final ShardingConnection getConnection() {return new ShardingConnection(getDataSourceMap(), runtimeContext,
TransactionTypeHolder.get());
}
Connection
ShardingConnection 能够创立 Statement 和 PrepareStatement 两种运行形式,如下代码所示。
@Override
public Statement createStatement(final int resultSetType,
final int resultSetConcurrency, final int resultSetHoldability) {
return new ShardingStatement(this, resultSetType,
resultSetConcurrency, resultSetHoldability);
}
@Override
public PreparedStatement prepareStatement(final String sql, final int resultSetType,
final int resultSetConcurrency, final int resultSetHoldability)
throws SQLException {
return new ShardingPreparedStatement(this, sql, resultSetType,
resultSetConcurrency, resultSetHoldability);
}
阐明: ShardingConnection 次要是将创立 ShardingStatement 和 ShardingPreparedStatement 两个对象,次要的执行逻辑都在 Statement 对象中。另外,ShardingConnection 还有两个重要的性能,一个是获取真正的数据库连贯,一个是事务提交性能。
Statement
Statement 相对来说比较复杂,因为它都是 JDBC 的真正执行器,所有逻辑都封装在 Statement 中。咱们来看下 Statement 的类图
对于 Statement,我就不做过多的形容了,置信应用过 JDBC 的小伙伴,对 Statement 都不生疏了。
ResultSet
ResultSet 类图如下所示。
咱们从源码中能够看出:ShardingResultSet 只是对 MergedResult 的简略封装。
private final MergedResult mergeResultSet;
@Override
public boolean next() throws SQLException {return mergeResultSet.next();
}
sharding-jdbc-core 外围剖析
ShardingStatement 外部有三个外围的类,一是 SimpleQueryShardingEngine 实现 SQL 解析、路由、改写;一是 StatementExecutor 进行 SQL 执行;最初调用 MergeEngine 对后果进行合并解决。
ShardingStatement
初始化
private final ShardingConnection connection;
private final StatementExecutor statementExecutor;
public ShardingStatement(final ShardingConnection connection) {
this(connection, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY,
ResultSet.HOLD_CURSORS_OVER_COMMIT);
}
public ShardingStatement(final ShardingConnection connection, final int resultSetType,
final int resultSetConcurrency, final int resultSetHoldability) {super(Statement.class);
this.connection = connection;
statementExecutor = new StatementExecutor(resultSetType, resultSetConcurrency,
resultSetHoldability, connection);
}
ShardingStatement 外部执行 SQL 委托给了 statementExecutor。
执行
(1)executeQuery 执行过程
@Override
public ResultSet executeQuery(final String sql) throws SQLException {
ResultSet result;
try {clearPrevious();
// 1. SQL 解析、路由、改写,最终生成 SQLRouteResult
shard(sql);
// 2. 生成执行打算 SQLRouteResult -> StatementExecuteUnit
initStatementExecutor();
// 3. statementExecutor.executeQuery() 执行工作
MergeEngine mergeEngine = MergeEngineFactory.newInstance(connection.getRuntimeContext().getDatabaseType(),
connection.getRuntimeContext().getRule(), sqlRouteResult,
connection.getRuntimeContext().getMetaData().getRelationMetas(),
statementExecutor.executeQuery());
// 4. 后果合并
result = getResultSet(mergeEngine);
} finally {currentResultSet = null;}
currentResultSet = result;
return result;
}
(2)SQL 路由(包含 SQL 解析、路由、改写)
private SQLRouteResult sqlRouteResult;
private void shard(final String sql) {ShardingRuntimeContext runtimeContext = connection.getRuntimeContext();
SimpleQueryShardingEngine shardingEngine = new SimpleQueryShardingEngine(runtimeContext.getRule(), runtimeContext.getProps(),
runtimeContext.getMetaData(), runtimeContext.getParseEngine());
sqlRouteResult = shardingEngine.shard(sql, Collections.emptyList());
}
SimpleQueryShardingEngine 进行 SQL 路由(包含 SQL 解析、路由、改写),生成 SQLRouteResult,当 ShardingStatement 实现 SQL 的路由,生成 SQLRouteResult 后,剩下的执行工作就全副交给 StatementExecutor 实现。
StatementExecutor
StatementExecutor 外部封装了 SQL 工作的执行过程,包含:SqlExecutePrepareTemplate 类生成执行打算 StatementExecuteUnit,以及 SQLExecuteTemplate 用于执行 StatementExecuteUnit。
类构造
重要属性
AbstractStatementExecutor 类中重要的属性:
// SQLExecutePrepareTemplate 用于生成执行打算 StatementExecuteUnit
private final SQLExecutePrepareTemplate sqlExecutePrepareTemplate;
// 保留生成的执行打算 StatementExecuteUnit
private final Collection<ShardingExecuteGroup<StatementExecuteUnit>> executeGroups =
new LinkedList<>();
// SQLExecuteTemplate 用于执行 StatementExecuteUnit
private final SQLExecuteTemplate sqlExecuteTemplate;
// 保留查问后果
private final List<ResultSet> resultSets = new CopyOnWriteArrayList<>();
生成执行打算
// 执行前清理状态
private void clearPrevious() throws SQLException {statementExecutor.clear();
}
// 执行时初始化
private void initStatementExecutor() throws SQLException {statementExecutor.init(sqlRouteResult);
replayMethodForStatements();}
这里,须要留神的是: StatementExecutor 是有状态的,每次执行前都要调用 statementExecutor.clear() 清理上一次执行的状态,并调用 statementExecutor.init() 从新初始化。
statementExecutor.init() 初始化次要是生成执行打算 StatementExecuteUnit。
public void init(final SQLRouteResult routeResult) throws SQLException {setSqlStatementContext(routeResult.getSqlStatementContext());
getExecuteGroups().addAll(obtainExecuteGroups(routeResult.getRouteUnits()));
cacheStatements();}
private Collection<ShardingExecuteGroup<StatementExecuteUnit>> obtainExecuteGroups(final Collection<RouteUnit> routeUnits) throws SQLException {return getSqlExecutePrepareTemplate().getExecuteUnitGroups(routeUnits, new SQLExecutePrepareCallback() {
// 获取连贯
@Override
public List<Connection> getConnections(
final ConnectionMode connectionMode,
final String dataSourceName, final int connectionSize)
throws SQLException {return StatementExecutor.super.getConnection().getConnections(connectionMode, dataSourceName, connectionSize);
}
// 生成执行打算 RouteUnit -> StatementExecuteUnit
@Override
public StatementExecuteUnit createStatementExecuteUnit(
final Connection connection, final RouteUnit routeUnit,
final ConnectionMode connectionMode) throws SQLException {
return new StatementExecuteUnit(
routeUnit, connection.createStatement(getResultSetType(), getResultSetConcurrency(),
getResultSetHoldability()), connectionMode);
}
});
}
SqlExecutePrepareTemplate 是 sharding-core-execute 工程中提供的一个工具类,专门用于生成执行打算,将 RouteUnit 转化为 StatementExecuteUnit。同时还提供了另一个工具类 SQLExecuteTemplate 用于执行 StatementExecuteUnit,在工作执行时咱们会看到这个类。
工作执行
public List<QueryResult> executeQuery() throws SQLException {final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
SQLExecuteCallback<QueryResult> executeCallback =
new SQLExecuteCallback<QueryResult>(getDatabaseType(), isExceptionThrown) {
@Override
protected QueryResult executeSQL(final String sql, final Statement statement,
final ConnectionMode connectionMode) throws SQLException {return getQueryResult(sql, statement, connectionMode);
}
};
// 执行 StatementExecuteUnit
return executeCallback(executeCallback);
}
// sqlExecuteTemplate 执行 executeGroups(即 StatementExecuteUnit)
protected final <T> List<T> executeCallback(final SQLExecuteCallback<T> executeCallback) throws SQLException {
// 执行所有的工作 StatementExecuteUnit
List<T> result = sqlExecuteTemplate.executeGroup((Collection) executeGroups, executeCallback);
refreshMetaDataIfNeeded(connection.getRuntimeContext(), sqlStatementContext);
return result;
}
SqlExecuteTemplate 执行 StatementExecuteUnit 会回调 SQLExecuteCallback#executeSQL 办法,最终调用 getQueryResult 办法。
private QueryResult getQueryResult(final String sql, final Statement statement,
final ConnectionMode connectionMode) throws SQLException {ResultSet resultSet = statement.executeQuery(sql);
getResultSets().add(resultSet);
return ConnectionMode.MEMORY_STRICTLY == connectionMode
? new StreamQueryResult(resultSet)
: new MemoryQueryResult(resultSet);
}
ConnectionMode 有两种模式:内存限度 (MEMORY_STRICTLY) 和连贯限度(CONNECTION_STRICTLY),如果一个连贯执行多个 StatementExecuteUnit 则为内存限度(MEMORY_STRICTLY),采纳流式解决,即 StreamQueryResult,反之则为连贯限度(CONNECTION_STRICTLY),此时会将所有从 MySQL 服务器返回的数据都加载到内存中。特地是在 Sharding-Proxy 中特地有用,防止将代理服务器撑爆。
重磅福利
微信搜一搜【冰河技术】微信公众号,关注这个有深度的程序员,每天浏览超硬核技术干货,公众号内回复【PDF】有我筹备的一线大厂面试材料和我原创的超硬核 PDF 技术文档,以及我为大家精心筹备的多套简历模板(不断更新中),心愿大家都能找到心仪的工作,学习是一条时而郁郁寡欢,时而开怀大笑的路,加油。如果你通过致力胜利进入到了心仪的公司,肯定不要懈怠放松,职场成长和新技术学习一样,逆水行舟。如果有幸咱们江湖再见!
另外,我开源的各个 PDF,后续我都会继续更新和保护,感激大家长期以来对冰河的反对!!
写在最初
如果你感觉冰河写的还不错,请微信搜寻并关注「冰河技术 」微信公众号,跟冰河学习高并发、分布式、微服务、大数据、互联网和云原生技术,「 冰河技术 」微信公众号更新了大量技术专题,每一篇技术文章干货满满!不少读者曾经通过浏览「 冰河技术 」微信公众号文章,吊打面试官,胜利跳槽到大厂;也有不少读者实现了技术上的飞跃,成为公司的技术骨干!如果你也想像他们一样晋升本人的能力,实现技术能力的飞跃,进大厂,升职加薪,那就关注「 冰河技术」微信公众号吧,每天更新超硬核技术干货,让你对如何晋升技术能力不再迷茫!