写在后面

在产品初期疾速迭代的过程中,往往为了疾速上线而占据市场,在后端开发的过程中往往不会过多的思考分布式和微服务,往往会将后端服务做成一个单体利用,而数据库也是一样,最后会把所有的业务数据都放到一个数据库中,即所谓的单实例数据库。随着业务的迅速倒退,将所有数据都放在一个数据库中曾经不足以撑持业务倒退的须要。此时,就会对系统进行分布式革新,而数据库业务进行分库分表的拆分。那么,问题来了,如何更好的拜访和治理拆分后的数据库呢?业界曾经有很多成熟的解决方案,其中,一个十分优良的解决方案就是:Apache ShardingSphere。明天,咱们就从源码级别来独特探讨下sharding-jdbc的外围源码。

sharding-jdbc经典用法

Sharding-Jdbc 是一个轻量级的分库分表框架,应用时最要害的是配置分库分表策略,其余的和应用一般的 MySQL 驱动一样,简直不必改代码。例如上面的代码片段。

try(DataSource dataSource =  ShardingDataSourceFactory.createDataSource(    createDataSourceMap(), shardingRuleConfig, new Properties()) {    Connection connection = dataSource.getConnection();    ...}

咱们在程序中拿到Connection对象后,就能够像应用一般的JDBC一样来应用sharding-jdbc操作数据库了。

sharding-jdbc包构造

sharding-jdbc      ├── sharding-jdbc-core      重写DataSource/Connection/Statement/ResultSet四大对象    └── sharding-jdbc-orchestration        配置核心sharding-core    ├── sharding-core-api       接口和配置类        ├── sharding-core-common    通用分片策略实现...    ├── sharding-core-entry     SQL解析、路由、改写,外围类BaseShardingEngine    ├── sharding-core-route     SQL路由,外围类StatementRoutingEngine    ├── sharding-core-rewrite   SQL改写,外围类ShardingSQLRewriteEngine    ├── sharding-core-execute   SQL执行,外围类ShardingExecuteEngine    └── sharding-core-merge     后果合并,外围类MergeEngineshardingsphere-sql-parser     ├── shardingsphere-sql-parser-spi       SQLParserEntry,用于初始化SQLParser    ├── shardingsphere-sql-parser-engine    SQL解析,外围类SQLParseEngine    ├── shardingsphere-sql-parser-relation    └── shardingsphere-sql-parser-mysql     MySQL解析器,外围类MySQLParserEntry和MySQLParsershardingsphere-underlying           根底接口和api    ├── shardingsphere-rewrite      SQLRewriteEngine接口    ├── shardingsphere-execute      QueryResult查问后果    └── shardingsphere-merge        MergeEngine接口shardingsphere-spi                  SPI加载工具类sharding-transaction    ├── sharding-transaction-core   接口ShardingTransactionManager,SPI加载            ├── sharding-transaction-2pc    实现类XAShardingTransactionManager    └── sharding-transaction-base   实现类SeataATShardingTransactionManager

sharding-jdbc中的四大对象

所有的所有都从 ShardingDataSourceFactory 开始的,创立了一个 ShardingDataSource 的分片数据源。除了 ShardingDataSource(分片数据源),在 Sharding-Sphere 中还有 MasterSlaveDataSourceFactory(主从数据源)、EncryptDataSourceFactory(脱敏数据源)。

public static DataSource createDataSource(        final Map<String, DataSource> dataSourceMap,        final ShardingRuleConfiguration shardingRuleConfig,        final Properties props) throws SQLException {    return new ShardingDataSource(dataSourceMap,               new ShardingRule(shardingRuleConfig, dataSourceMap.keySet()), props);}

阐明: 本文次要以 ShardingDataSource 为切入点剖析 Sharding-Sphere 是如何对 JDBC 四大对象 DataSource、Connection、Statement、ResultSet 进行封装的。

DataSource

这里,波及到两个比拟重要的接口,一个是DataSource,一个是Connection。咱们首先来看下它们的类图。

  • DataSource
  • Connection

DataSource 和 Connection 都比较简单,没有解决过多的逻辑,只是 dataSourceMap, shardingRule 进行简略的封装。

ShardingDataSource 持有对数据源和分片规定,能够通过 getConnection 办法获取 ShardingConnection 连贯。

private final ShardingRuntimeContext runtimeContext = new ShardingRuntimeContext(                dataSourceMap, shardingRule, props, getDatabaseType());@Overridepublic final ShardingConnection getConnection() {    return new ShardingConnection(getDataSourceMap(), runtimeContext,            TransactionTypeHolder.get());}

Connection

ShardingConnection 能够创立 Statement 和 PrepareStatement 两种运行形式,如下代码所示。

@Overridepublic Statement createStatement(final int resultSetType,        final int resultSetConcurrency, final int resultSetHoldability) {    return new ShardingStatement(this, resultSetType,            resultSetConcurrency, resultSetHoldability);}@Overridepublic PreparedStatement prepareStatement(final String sql, final int resultSetType,        final int resultSetConcurrency, final int resultSetHoldability)        throws SQLException {    return new ShardingPreparedStatement(this, sql, resultSetType,            resultSetConcurrency, resultSetHoldability);}

阐明: ShardingConnection 次要是将创立 ShardingStatement 和 ShardingPreparedStatement 两个对象,次要的执行逻辑都在 Statement 对象中。另外,ShardingConnection 还有两个重要的性能,一个是获取真正的数据库连贯,一个是事务提交性能。

Statement

Statement 相对来说比较复杂,因为它都是 JDBC 的真正执行器,所有逻辑都封装在 Statement 中。咱们来看下Statement的类图

对于Statement,我就不做过多的形容了,置信应用过JDBC的小伙伴,对Statement都不生疏了。

ResultSet

ResultSet类图如下所示。

咱们从源码中能够看出:ShardingResultSet 只是对 MergedResult 的简略封装。

private final MergedResult mergeResultSet;@Overridepublic boolean next() throws SQLException {    return mergeResultSet.next();}

sharding-jdbc-core外围剖析

ShardingStatement 外部有三个外围的类,一是 SimpleQueryShardingEngine 实现 SQL 解析、路由、改写;一是 StatementExecutor 进行 SQL 执行;最初调用 MergeEngine 对后果进行合并解决。

ShardingStatement

初始化

private final ShardingConnection connection;private final StatementExecutor statementExecutor;public ShardingStatement(final ShardingConnection connection) {    this(connection, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY,            ResultSet.HOLD_CURSORS_OVER_COMMIT);}public ShardingStatement(final ShardingConnection connection, final int resultSetType,        final int resultSetConcurrency, final int resultSetHoldability) {    super(Statement.class);    this.connection = connection;    statementExecutor = new StatementExecutor(resultSetType, resultSetConcurrency,            resultSetHoldability, connection);}

ShardingStatement 外部执行 SQL 委托给了 statementExecutor。

执行

(1)executeQuery 执行过程

@Overridepublic ResultSet executeQuery(final String sql) throws SQLException {    ResultSet result;    try {        clearPrevious();        // 1. SQL 解析、路由、改写,最终生成 SQLRouteResult        shard(sql);        // 2. 生成执行打算 SQLRouteResult -> StatementExecuteUnit        initStatementExecutor();        // 3. statementExecutor.executeQuery() 执行工作        MergeEngine mergeEngine = MergeEngineFactory.newInstance(                connection.getRuntimeContext().getDatabaseType(),                connection.getRuntimeContext().getRule(), sqlRouteResult,                connection.getRuntimeContext().getMetaData().getRelationMetas(),                statementExecutor.executeQuery());        // 4. 后果合并        result = getResultSet(mergeEngine);    } finally {        currentResultSet = null;    }    currentResultSet = result;    return result;}

(2)SQL 路由(包含 SQL 解析、路由、改写)

private SQLRouteResult sqlRouteResult;private void shard(final String sql) {    ShardingRuntimeContext runtimeContext = connection.getRuntimeContext();    SimpleQueryShardingEngine shardingEngine = new SimpleQueryShardingEngine(            runtimeContext.getRule(), runtimeContext.getProps(),            runtimeContext.getMetaData(), runtimeContext.getParseEngine());    sqlRouteResult = shardingEngine.shard(sql, Collections.emptyList());}

SimpleQueryShardingEngine 进行 SQL 路由(包含 SQL 解析、路由、改写),生成 SQLRouteResult,当 ShardingStatement 实现 SQL 的路由,生成 SQLRouteResult 后,剩下的执行工作就全副交给 StatementExecutor 实现。

StatementExecutor

StatementExecutor 外部封装了 SQL 工作的执行过程,包含:SqlExecutePrepareTemplate 类生成执行打算 StatementExecuteUnit,以及 SQLExecuteTemplate 用于执行 StatementExecuteUnit。

类构造

重要属性

AbstractStatementExecutor 类中重要的属性:

// SQLExecutePrepareTemplate用于生成执行打算StatementExecuteUnitprivate final SQLExecutePrepareTemplate sqlExecutePrepareTemplate;// 保留生成的执行打算StatementExecuteUnitprivate final Collection<ShardingExecuteGroup<StatementExecuteUnit>> executeGroups =            new LinkedList<>();// SQLExecuteTemplate用于执行StatementExecuteUnitprivate final SQLExecuteTemplate sqlExecuteTemplate;// 保留查问后果private final List<ResultSet> resultSets = new CopyOnWriteArrayList<>();

生成执行打算

// 执行前清理状态private void clearPrevious() throws SQLException {    statementExecutor.clear();}// 执行时初始化private void initStatementExecutor() throws SQLException {    statementExecutor.init(sqlRouteResult);    replayMethodForStatements();}

这里,须要留神的是: StatementExecutor 是有状态的,每次执行前都要调用 statementExecutor.clear() 清理上一次执行的状态,并调用 statementExecutor.init() 从新初始化。

statementExecutor.init() 初始化次要是生成执行打算 StatementExecuteUnit。

public void init(final SQLRouteResult routeResult) throws SQLException {    setSqlStatementContext(routeResult.getSqlStatementContext());    getExecuteGroups().addAll(obtainExecuteGroups(routeResult.getRouteUnits()));    cacheStatements();}private Collection<ShardingExecuteGroup<StatementExecuteUnit>> obtainExecuteGroups(        final Collection<RouteUnit> routeUnits) throws SQLException {    return getSqlExecutePrepareTemplate().getExecuteUnitGroups(            routeUnits, new SQLExecutePrepareCallback() {                // 获取连贯                @Override                public List<Connection> getConnections(                        final ConnectionMode connectionMode,                        final String dataSourceName, final int connectionSize)                        throws SQLException {                    return StatementExecutor.super.getConnection().getConnections(                            connectionMode, dataSourceName, connectionSize);                }                // 生成执行打算RouteUnit -> StatementExecuteUnit                @Override                public StatementExecuteUnit createStatementExecuteUnit(                        final Connection connection, final RouteUnit routeUnit,                        final ConnectionMode connectionMode) throws SQLException {                    return new StatementExecuteUnit(                            routeUnit, connection.createStatement(                            getResultSetType(), getResultSetConcurrency(),                            getResultSetHoldability()), connectionMode);                }            });}

SqlExecutePrepareTemplate 是 sharding-core-execute 工程中提供的一个工具类,专门用于生成执行打算,将 RouteUnit 转化为 StatementExecuteUnit。同时还提供了另一个工具类 SQLExecuteTemplate 用于执行 StatementExecuteUnit,在工作执行时咱们会看到这个类。

工作执行

public List<QueryResult> executeQuery() throws SQLException {    final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();    SQLExecuteCallback<QueryResult> executeCallback =         new SQLExecuteCallback<QueryResult>(getDatabaseType(), isExceptionThrown) {        @Override        protected QueryResult executeSQL(final String sql, final Statement statement,                final ConnectionMode connectionMode) throws SQLException {            return getQueryResult(sql, statement, connectionMode);        }    };    // 执行StatementExecuteUnit    return executeCallback(executeCallback);}// sqlExecuteTemplate 执行 executeGroups(即StatementExecuteUnit)protected final <T> List<T> executeCallback(        final SQLExecuteCallback<T> executeCallback) throws SQLException {    // 执行所有的工作 StatementExecuteUnit    List<T> result = sqlExecuteTemplate.executeGroup(            (Collection) executeGroups, executeCallback);    refreshMetaDataIfNeeded(connection.getRuntimeContext(), sqlStatementContext);    return result;}

SqlExecuteTemplate 执行 StatementExecuteUnit 会回调 SQLExecuteCallback#executeSQL 办法,最终调用 getQueryResult 办法。

private QueryResult getQueryResult(final String sql, final Statement statement,        final ConnectionMode connectionMode) throws SQLException {    ResultSet resultSet = statement.executeQuery(sql);    getResultSets().add(resultSet);    return ConnectionMode.MEMORY_STRICTLY == connectionMode            ? new StreamQueryResult(resultSet)            : new MemoryQueryResult(resultSet);}

ConnectionMode 有两种模式:内存限度(MEMORY_STRICTLY)和连贯限度(CONNECTION_STRICTLY),如果一个连贯执行多个 StatementExecuteUnit 则为内存限度(MEMORY_STRICTLY),采纳流式解决,即 StreamQueryResult ,反之则为连贯限度(CONNECTION_STRICTLY),此时会将所有从 MySQL 服务器返回的数据都加载到内存中。特地是在 Sharding-Proxy 中特地有用,防止将代理服务器撑爆。

重磅福利

微信搜一搜【冰河技术】微信公众号,关注这个有深度的程序员,每天浏览超硬核技术干货,公众号内回复【PDF】有我筹备的一线大厂面试材料和我原创的超硬核PDF技术文档,以及我为大家精心筹备的多套简历模板(不断更新中),心愿大家都能找到心仪的工作,学习是一条时而郁郁寡欢,时而开怀大笑的路,加油。如果你通过致力胜利进入到了心仪的公司,肯定不要懈怠放松,职场成长和新技术学习一样,逆水行舟。如果有幸咱们江湖再见!

另外,我开源的各个PDF,后续我都会继续更新和保护,感激大家长期以来对冰河的反对!!

写在最初

如果你感觉冰河写的还不错,请微信搜寻并关注「 冰河技术 」微信公众号,跟冰河学习高并发、分布式、微服务、大数据、互联网和云原生技术,「 冰河技术 」微信公众号更新了大量技术专题,每一篇技术文章干货满满!不少读者曾经通过浏览「 冰河技术 」微信公众号文章,吊打面试官,胜利跳槽到大厂;也有不少读者实现了技术上的飞跃,成为公司的技术骨干!如果你也想像他们一样晋升本人的能力,实现技术能力的飞跃,进大厂,升职加薪,那就关注「 冰河技术 」微信公众号吧,每天更新超硬核技术干货,让你对如何晋升技术能力不再迷茫!