写在后面
在产品初期疾速迭代的过程中,往往为了疾速上线而占据市场,在后端开发的过程中往往不会过多的思考分布式和微服务,往往会将后端服务做成一个单体利用,而数据库也是一样,最后会把所有的业务数据都放到一个数据库中,即所谓的单实例数据库。随着业务的迅速倒退,将所有数据都放在一个数据库中曾经不足以撑持业务倒退的须要。此时,就会对系统进行分布式革新,而数据库业务进行分库分表的拆分。那么,问题来了,如何更好的拜访和治理拆分后的数据库呢?业界曾经有很多成熟的解决方案,其中,一个十分优良的解决方案就是:Apache ShardingSphere。明天,咱们就从源码级别来独特探讨下sharding-jdbc的外围源码。
sharding-jdbc经典用法
Sharding-Jdbc 是一个轻量级的分库分表框架,应用时最要害的是配置分库分表策略,其余的和应用一般的 MySQL 驱动一样,简直不必改代码。例如上面的代码片段。
try(DataSource dataSource = ShardingDataSourceFactory.createDataSource( createDataSourceMap(), shardingRuleConfig, new Properties()) { Connection connection = dataSource.getConnection(); ...}
咱们在程序中拿到Connection对象后,就能够像应用一般的JDBC一样来应用sharding-jdbc操作数据库了。
sharding-jdbc包构造
sharding-jdbc ├── sharding-jdbc-core 重写DataSource/Connection/Statement/ResultSet四大对象 └── sharding-jdbc-orchestration 配置核心sharding-core ├── sharding-core-api 接口和配置类 ├── sharding-core-common 通用分片策略实现... ├── sharding-core-entry SQL解析、路由、改写,外围类BaseShardingEngine ├── sharding-core-route SQL路由,外围类StatementRoutingEngine ├── sharding-core-rewrite SQL改写,外围类ShardingSQLRewriteEngine ├── sharding-core-execute SQL执行,外围类ShardingExecuteEngine └── sharding-core-merge 后果合并,外围类MergeEngineshardingsphere-sql-parser ├── shardingsphere-sql-parser-spi SQLParserEntry,用于初始化SQLParser ├── shardingsphere-sql-parser-engine SQL解析,外围类SQLParseEngine ├── shardingsphere-sql-parser-relation └── shardingsphere-sql-parser-mysql MySQL解析器,外围类MySQLParserEntry和MySQLParsershardingsphere-underlying 根底接口和api ├── shardingsphere-rewrite SQLRewriteEngine接口 ├── shardingsphere-execute QueryResult查问后果 └── shardingsphere-merge MergeEngine接口shardingsphere-spi SPI加载工具类sharding-transaction ├── sharding-transaction-core 接口ShardingTransactionManager,SPI加载 ├── sharding-transaction-2pc 实现类XAShardingTransactionManager └── sharding-transaction-base 实现类SeataATShardingTransactionManager
sharding-jdbc中的四大对象
所有的所有都从 ShardingDataSourceFactory 开始的,创立了一个 ShardingDataSource 的分片数据源。除了 ShardingDataSource(分片数据源),在 Sharding-Sphere 中还有 MasterSlaveDataSourceFactory(主从数据源)、EncryptDataSourceFactory(脱敏数据源)。
public static DataSource createDataSource( final Map<String, DataSource> dataSourceMap, final ShardingRuleConfiguration shardingRuleConfig, final Properties props) throws SQLException { return new ShardingDataSource(dataSourceMap, new ShardingRule(shardingRuleConfig, dataSourceMap.keySet()), props);}
阐明: 本文次要以 ShardingDataSource 为切入点剖析 Sharding-Sphere 是如何对 JDBC 四大对象 DataSource、Connection、Statement、ResultSet 进行封装的。
DataSource
这里,波及到两个比拟重要的接口,一个是DataSource,一个是Connection。咱们首先来看下它们的类图。
- DataSource
- Connection
DataSource 和 Connection 都比较简单,没有解决过多的逻辑,只是 dataSourceMap, shardingRule 进行简略的封装。
ShardingDataSource 持有对数据源和分片规定,能够通过 getConnection 办法获取 ShardingConnection 连贯。
private final ShardingRuntimeContext runtimeContext = new ShardingRuntimeContext( dataSourceMap, shardingRule, props, getDatabaseType());@Overridepublic final ShardingConnection getConnection() { return new ShardingConnection(getDataSourceMap(), runtimeContext, TransactionTypeHolder.get());}
Connection
ShardingConnection 能够创立 Statement 和 PrepareStatement 两种运行形式,如下代码所示。
@Overridepublic Statement createStatement(final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) { return new ShardingStatement(this, resultSetType, resultSetConcurrency, resultSetHoldability);}@Overridepublic PreparedStatement prepareStatement(final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException { return new ShardingPreparedStatement(this, sql, resultSetType, resultSetConcurrency, resultSetHoldability);}
阐明: ShardingConnection 次要是将创立 ShardingStatement 和 ShardingPreparedStatement 两个对象,次要的执行逻辑都在 Statement 对象中。另外,ShardingConnection 还有两个重要的性能,一个是获取真正的数据库连贯,一个是事务提交性能。
Statement
Statement 相对来说比较复杂,因为它都是 JDBC 的真正执行器,所有逻辑都封装在 Statement 中。咱们来看下Statement的类图
对于Statement,我就不做过多的形容了,置信应用过JDBC的小伙伴,对Statement都不生疏了。
ResultSet
ResultSet类图如下所示。
咱们从源码中能够看出:ShardingResultSet 只是对 MergedResult 的简略封装。
private final MergedResult mergeResultSet;@Overridepublic boolean next() throws SQLException { return mergeResultSet.next();}
sharding-jdbc-core外围剖析
ShardingStatement 外部有三个外围的类,一是 SimpleQueryShardingEngine 实现 SQL 解析、路由、改写;一是 StatementExecutor 进行 SQL 执行;最初调用 MergeEngine 对后果进行合并解决。
ShardingStatement
初始化
private final ShardingConnection connection;private final StatementExecutor statementExecutor;public ShardingStatement(final ShardingConnection connection) { this(connection, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT);}public ShardingStatement(final ShardingConnection connection, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) { super(Statement.class); this.connection = connection; statementExecutor = new StatementExecutor(resultSetType, resultSetConcurrency, resultSetHoldability, connection);}
ShardingStatement 外部执行 SQL 委托给了 statementExecutor。
执行
(1)executeQuery 执行过程
@Overridepublic ResultSet executeQuery(final String sql) throws SQLException { ResultSet result; try { clearPrevious(); // 1. SQL 解析、路由、改写,最终生成 SQLRouteResult shard(sql); // 2. 生成执行打算 SQLRouteResult -> StatementExecuteUnit initStatementExecutor(); // 3. statementExecutor.executeQuery() 执行工作 MergeEngine mergeEngine = MergeEngineFactory.newInstance( connection.getRuntimeContext().getDatabaseType(), connection.getRuntimeContext().getRule(), sqlRouteResult, connection.getRuntimeContext().getMetaData().getRelationMetas(), statementExecutor.executeQuery()); // 4. 后果合并 result = getResultSet(mergeEngine); } finally { currentResultSet = null; } currentResultSet = result; return result;}
(2)SQL 路由(包含 SQL 解析、路由、改写)
private SQLRouteResult sqlRouteResult;private void shard(final String sql) { ShardingRuntimeContext runtimeContext = connection.getRuntimeContext(); SimpleQueryShardingEngine shardingEngine = new SimpleQueryShardingEngine( runtimeContext.getRule(), runtimeContext.getProps(), runtimeContext.getMetaData(), runtimeContext.getParseEngine()); sqlRouteResult = shardingEngine.shard(sql, Collections.emptyList());}
SimpleQueryShardingEngine 进行 SQL 路由(包含 SQL 解析、路由、改写),生成 SQLRouteResult,当 ShardingStatement 实现 SQL 的路由,生成 SQLRouteResult 后,剩下的执行工作就全副交给 StatementExecutor 实现。
StatementExecutor
StatementExecutor 外部封装了 SQL 工作的执行过程,包含:SqlExecutePrepareTemplate 类生成执行打算 StatementExecuteUnit,以及 SQLExecuteTemplate 用于执行 StatementExecuteUnit。
类构造
重要属性
AbstractStatementExecutor 类中重要的属性:
// SQLExecutePrepareTemplate用于生成执行打算StatementExecuteUnitprivate final SQLExecutePrepareTemplate sqlExecutePrepareTemplate;// 保留生成的执行打算StatementExecuteUnitprivate final Collection<ShardingExecuteGroup<StatementExecuteUnit>> executeGroups = new LinkedList<>();// SQLExecuteTemplate用于执行StatementExecuteUnitprivate final SQLExecuteTemplate sqlExecuteTemplate;// 保留查问后果private final List<ResultSet> resultSets = new CopyOnWriteArrayList<>();
生成执行打算
// 执行前清理状态private void clearPrevious() throws SQLException { statementExecutor.clear();}// 执行时初始化private void initStatementExecutor() throws SQLException { statementExecutor.init(sqlRouteResult); replayMethodForStatements();}
这里,须要留神的是: StatementExecutor 是有状态的,每次执行前都要调用 statementExecutor.clear() 清理上一次执行的状态,并调用 statementExecutor.init() 从新初始化。
statementExecutor.init() 初始化次要是生成执行打算 StatementExecuteUnit。
public void init(final SQLRouteResult routeResult) throws SQLException { setSqlStatementContext(routeResult.getSqlStatementContext()); getExecuteGroups().addAll(obtainExecuteGroups(routeResult.getRouteUnits())); cacheStatements();}private Collection<ShardingExecuteGroup<StatementExecuteUnit>> obtainExecuteGroups( final Collection<RouteUnit> routeUnits) throws SQLException { return getSqlExecutePrepareTemplate().getExecuteUnitGroups( routeUnits, new SQLExecutePrepareCallback() { // 获取连贯 @Override public List<Connection> getConnections( final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException { return StatementExecutor.super.getConnection().getConnections( connectionMode, dataSourceName, connectionSize); } // 生成执行打算RouteUnit -> StatementExecuteUnit @Override public StatementExecuteUnit createStatementExecuteUnit( final Connection connection, final RouteUnit routeUnit, final ConnectionMode connectionMode) throws SQLException { return new StatementExecuteUnit( routeUnit, connection.createStatement( getResultSetType(), getResultSetConcurrency(), getResultSetHoldability()), connectionMode); } });}
SqlExecutePrepareTemplate 是 sharding-core-execute 工程中提供的一个工具类,专门用于生成执行打算,将 RouteUnit 转化为 StatementExecuteUnit。同时还提供了另一个工具类 SQLExecuteTemplate 用于执行 StatementExecuteUnit,在工作执行时咱们会看到这个类。
工作执行
public List<QueryResult> executeQuery() throws SQLException { final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown(); SQLExecuteCallback<QueryResult> executeCallback = new SQLExecuteCallback<QueryResult>(getDatabaseType(), isExceptionThrown) { @Override protected QueryResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException { return getQueryResult(sql, statement, connectionMode); } }; // 执行StatementExecuteUnit return executeCallback(executeCallback);}// sqlExecuteTemplate 执行 executeGroups(即StatementExecuteUnit)protected final <T> List<T> executeCallback( final SQLExecuteCallback<T> executeCallback) throws SQLException { // 执行所有的工作 StatementExecuteUnit List<T> result = sqlExecuteTemplate.executeGroup( (Collection) executeGroups, executeCallback); refreshMetaDataIfNeeded(connection.getRuntimeContext(), sqlStatementContext); return result;}
SqlExecuteTemplate 执行 StatementExecuteUnit 会回调 SQLExecuteCallback#executeSQL 办法,最终调用 getQueryResult 办法。
private QueryResult getQueryResult(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException { ResultSet resultSet = statement.executeQuery(sql); getResultSets().add(resultSet); return ConnectionMode.MEMORY_STRICTLY == connectionMode ? new StreamQueryResult(resultSet) : new MemoryQueryResult(resultSet);}
ConnectionMode 有两种模式:内存限度(MEMORY_STRICTLY)和连贯限度(CONNECTION_STRICTLY),如果一个连贯执行多个 StatementExecuteUnit 则为内存限度(MEMORY_STRICTLY),采纳流式解决,即 StreamQueryResult ,反之则为连贯限度(CONNECTION_STRICTLY),此时会将所有从 MySQL 服务器返回的数据都加载到内存中。特地是在 Sharding-Proxy 中特地有用,防止将代理服务器撑爆。
重磅福利
微信搜一搜【冰河技术】微信公众号,关注这个有深度的程序员,每天浏览超硬核技术干货,公众号内回复【PDF】有我筹备的一线大厂面试材料和我原创的超硬核PDF技术文档,以及我为大家精心筹备的多套简历模板(不断更新中),心愿大家都能找到心仪的工作,学习是一条时而郁郁寡欢,时而开怀大笑的路,加油。如果你通过致力胜利进入到了心仪的公司,肯定不要懈怠放松,职场成长和新技术学习一样,逆水行舟。如果有幸咱们江湖再见!
另外,我开源的各个PDF,后续我都会继续更新和保护,感激大家长期以来对冰河的反对!!
写在最初
如果你感觉冰河写的还不错,请微信搜寻并关注「 冰河技术 」微信公众号,跟冰河学习高并发、分布式、微服务、大数据、互联网和云原生技术,「 冰河技术 」微信公众号更新了大量技术专题,每一篇技术文章干货满满!不少读者曾经通过浏览「 冰河技术 」微信公众号文章,吊打面试官,胜利跳槽到大厂;也有不少读者实现了技术上的飞跃,成为公司的技术骨干!如果你也想像他们一样晋升本人的能力,实现技术能力的飞跃,进大厂,升职加薪,那就关注「 冰河技术 」微信公众号吧,每天更新超硬核技术干货,让你对如何晋升技术能力不再迷茫!