关于中间件:面试官问我看过shardingjdbc的源码吗我吧啦吧啦说了一通

40次阅读

共计 10081 个字符,预计需要花费 26 分钟才能阅读完成。

写在后面

在产品初期疾速迭代的过程中,往往为了疾速上线而占据市场,在后端开发的过程中往往不会过多的思考分布式和微服务,往往会将后端服务做成一个单体利用,而数据库也是一样,最后会把所有的业务数据都放到一个数据库中,即所谓的单实例数据库。随着业务的迅速倒退,将所有数据都放在一个数据库中曾经不足以撑持业务倒退的须要。此时,就会对系统进行分布式革新,而数据库业务进行分库分表的拆分。那么,问题来了,如何更好的拜访和治理拆分后的数据库呢?业界曾经有很多成熟的解决方案,其中,一个十分优良的解决方案就是:Apache ShardingSphere。明天,咱们就从源码级别来独特探讨下 sharding-jdbc 的外围源码。

sharding-jdbc 经典用法

Sharding-Jdbc 是一个轻量级的分库分表框架,应用时最要害的是配置分库分表策略,其余的和应用一般的 MySQL 驱动一样,简直不必改代码。例如上面的代码片段。

try(DataSource dataSource =  ShardingDataSourceFactory.createDataSource(createDataSourceMap(), shardingRuleConfig, new Properties()) {Connection connection = dataSource.getConnection();
    ...
}

咱们在程序中拿到 Connection 对象后,就能够像应用一般的 JDBC 一样来应用 sharding-jdbc 操作数据库了。

sharding-jdbc 包构造

sharding-jdbc  
    ├── sharding-jdbc-core      重写 DataSource/Connection/Statement/ResultSet 四大对象
    └── sharding-jdbc-orchestration        配置核心
sharding-core
    ├── sharding-core-api       接口和配置类    
    ├── sharding-core-common    通用分片策略实现...
    ├── sharding-core-entry     SQL 解析、路由、改写,外围类 BaseShardingEngine
    ├── sharding-core-route     SQL 路由,外围类 StatementRoutingEngine
    ├── sharding-core-rewrite   SQL 改写,外围类 ShardingSQLRewriteEngine
    ├── sharding-core-execute   SQL 执行,外围类 ShardingExecuteEngine
    └── sharding-core-merge     后果合并,外围类 MergeEngine
shardingsphere-sql-parser 
    ├── shardingsphere-sql-parser-spi       SQLParserEntry,用于初始化 SQLParser
    ├── shardingsphere-sql-parser-engine    SQL 解析,外围类 SQLParseEngine
    ├── shardingsphere-sql-parser-relation
    └── shardingsphere-sql-parser-mysql     MySQL 解析器,外围类 MySQLParserEntry 和 MySQLParser
shardingsphere-underlying           根底接口和 api
    ├── shardingsphere-rewrite      SQLRewriteEngine 接口
    ├── shardingsphere-execute      QueryResult 查问后果
    └── shardingsphere-merge        MergeEngine 接口
shardingsphere-spi                  SPI 加载工具类
sharding-transaction
    ├── sharding-transaction-core   接口 ShardingTransactionManager,SPI 加载        
    ├── sharding-transaction-2pc    实现类 XAShardingTransactionManager
    └── sharding-transaction-base   实现类 SeataATShardingTransactionManager

sharding-jdbc 中的四大对象

所有的所有都从 ShardingDataSourceFactory 开始的,创立了一个 ShardingDataSource 的分片数据源。除了 ShardingDataSource(分片数据源),在 Sharding-Sphere 中还有 MasterSlaveDataSourceFactory(主从数据源)、EncryptDataSourceFactory(脱敏数据源)。

public static DataSource createDataSource(
        final Map<String, DataSource> dataSourceMap,
        final ShardingRuleConfiguration shardingRuleConfig,
        final Properties props) throws SQLException {
    return new ShardingDataSource(dataSourceMap,
               new ShardingRule(shardingRuleConfig, dataSourceMap.keySet()), props);
}

阐明: 本文次要以 ShardingDataSource 为切入点剖析 Sharding-Sphere 是如何对 JDBC 四大对象 DataSource、Connection、Statement、ResultSet 进行封装的。

DataSource

这里,波及到两个比拟重要的接口,一个是 DataSource,一个是 Connection。咱们首先来看下它们的类图。

  • DataSource
  • Connection

DataSource 和 Connection 都比较简单,没有解决过多的逻辑,只是 dataSourceMap, shardingRule 进行简略的封装。

ShardingDataSource 持有对数据源和分片规定,能够通过 getConnection 办法获取 ShardingConnection 连贯。

private final ShardingRuntimeContext runtimeContext = new ShardingRuntimeContext(dataSourceMap, shardingRule, props, getDatabaseType());
@Override
public final ShardingConnection getConnection() {return new ShardingConnection(getDataSourceMap(), runtimeContext,
            TransactionTypeHolder.get());
}

Connection

ShardingConnection 能够创立 Statement 和 PrepareStatement 两种运行形式,如下代码所示。

@Override
public Statement createStatement(final int resultSetType,
        final int resultSetConcurrency, final int resultSetHoldability) {
    return new ShardingStatement(this, resultSetType,
            resultSetConcurrency, resultSetHoldability);
}

@Override
public PreparedStatement prepareStatement(final String sql, final int resultSetType,
        final int resultSetConcurrency, final int resultSetHoldability)
        throws SQLException {
    return new ShardingPreparedStatement(this, sql, resultSetType,
            resultSetConcurrency, resultSetHoldability);
}

阐明: ShardingConnection 次要是将创立 ShardingStatement 和 ShardingPreparedStatement 两个对象,次要的执行逻辑都在 Statement 对象中。另外,ShardingConnection 还有两个重要的性能,一个是获取真正的数据库连贯,一个是事务提交性能。

Statement

Statement 相对来说比较复杂,因为它都是 JDBC 的真正执行器,所有逻辑都封装在 Statement 中。咱们来看下 Statement 的类图

对于 Statement,我就不做过多的形容了,置信应用过 JDBC 的小伙伴,对 Statement 都不生疏了。

ResultSet

ResultSet 类图如下所示。

咱们从源码中能够看出:ShardingResultSet 只是对 MergedResult 的简略封装。

private final MergedResult mergeResultSet;
@Override
public boolean next() throws SQLException {return mergeResultSet.next();
}

sharding-jdbc-core 外围剖析

ShardingStatement 外部有三个外围的类,一是 SimpleQueryShardingEngine 实现 SQL 解析、路由、改写;一是 StatementExecutor 进行 SQL 执行;最初调用 MergeEngine 对后果进行合并解决。

ShardingStatement

初始化

private final ShardingConnection connection;
private final StatementExecutor statementExecutor;

public ShardingStatement(final ShardingConnection connection) {
    this(connection, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY,
            ResultSet.HOLD_CURSORS_OVER_COMMIT);
}

public ShardingStatement(final ShardingConnection connection, final int resultSetType,
        final int resultSetConcurrency, final int resultSetHoldability) {super(Statement.class);
    this.connection = connection;
    statementExecutor = new StatementExecutor(resultSetType, resultSetConcurrency,
            resultSetHoldability, connection);
}

ShardingStatement 外部执行 SQL 委托给了 statementExecutor。

执行

(1)executeQuery 执行过程

@Override
public ResultSet executeQuery(final String sql) throws SQLException {
    ResultSet result;
    try {clearPrevious();
        // 1. SQL 解析、路由、改写,最终生成 SQLRouteResult
        shard(sql);
        // 2. 生成执行打算 SQLRouteResult -> StatementExecuteUnit
        initStatementExecutor();
        // 3. statementExecutor.executeQuery() 执行工作
        MergeEngine mergeEngine = MergeEngineFactory.newInstance(connection.getRuntimeContext().getDatabaseType(),
                connection.getRuntimeContext().getRule(), sqlRouteResult,
                connection.getRuntimeContext().getMetaData().getRelationMetas(),
                statementExecutor.executeQuery());
        // 4. 后果合并
        result = getResultSet(mergeEngine);
    } finally {currentResultSet = null;}
    currentResultSet = result;
    return result;
}

(2)SQL 路由(包含 SQL 解析、路由、改写)

private SQLRouteResult sqlRouteResult;
private void shard(final String sql) {ShardingRuntimeContext runtimeContext = connection.getRuntimeContext();
    SimpleQueryShardingEngine shardingEngine = new SimpleQueryShardingEngine(runtimeContext.getRule(), runtimeContext.getProps(),
            runtimeContext.getMetaData(), runtimeContext.getParseEngine());
    sqlRouteResult = shardingEngine.shard(sql, Collections.emptyList());
}

SimpleQueryShardingEngine 进行 SQL 路由(包含 SQL 解析、路由、改写),生成 SQLRouteResult,当 ShardingStatement 实现 SQL 的路由,生成 SQLRouteResult 后,剩下的执行工作就全副交给 StatementExecutor 实现。

StatementExecutor

StatementExecutor 外部封装了 SQL 工作的执行过程,包含:SqlExecutePrepareTemplate 类生成执行打算 StatementExecuteUnit,以及 SQLExecuteTemplate 用于执行 StatementExecuteUnit。

类构造

重要属性

AbstractStatementExecutor 类中重要的属性:

// SQLExecutePrepareTemplate 用于生成执行打算 StatementExecuteUnit
private final SQLExecutePrepareTemplate sqlExecutePrepareTemplate;
// 保留生成的执行打算 StatementExecuteUnit
private final Collection<ShardingExecuteGroup<StatementExecuteUnit>> executeGroups =
            new LinkedList<>();

// SQLExecuteTemplate 用于执行 StatementExecuteUnit
private final SQLExecuteTemplate sqlExecuteTemplate;
// 保留查问后果
private final List<ResultSet> resultSets = new CopyOnWriteArrayList<>();

生成执行打算

// 执行前清理状态
private void clearPrevious() throws SQLException {statementExecutor.clear();
}
// 执行时初始化
private void initStatementExecutor() throws SQLException {statementExecutor.init(sqlRouteResult);
    replayMethodForStatements();}

这里,须要留神的是: StatementExecutor 是有状态的,每次执行前都要调用 statementExecutor.clear() 清理上一次执行的状态,并调用 statementExecutor.init() 从新初始化。

statementExecutor.init() 初始化次要是生成执行打算 StatementExecuteUnit。

public void init(final SQLRouteResult routeResult) throws SQLException {setSqlStatementContext(routeResult.getSqlStatementContext());
    getExecuteGroups().addAll(obtainExecuteGroups(routeResult.getRouteUnits()));
    cacheStatements();}

private Collection<ShardingExecuteGroup<StatementExecuteUnit>> obtainExecuteGroups(final Collection<RouteUnit> routeUnits) throws SQLException {return getSqlExecutePrepareTemplate().getExecuteUnitGroups(routeUnits, new SQLExecutePrepareCallback() {
                // 获取连贯
                @Override
                public List<Connection> getConnections(
                        final ConnectionMode connectionMode,
                        final String dataSourceName, final int connectionSize)
                        throws SQLException {return StatementExecutor.super.getConnection().getConnections(connectionMode, dataSourceName, connectionSize);
                }

                // 生成执行打算 RouteUnit -> StatementExecuteUnit
                @Override
                public StatementExecuteUnit createStatementExecuteUnit(
                        final Connection connection, final RouteUnit routeUnit,
                        final ConnectionMode connectionMode) throws SQLException {
                    return new StatementExecuteUnit(
                            routeUnit, connection.createStatement(getResultSetType(), getResultSetConcurrency(),
                            getResultSetHoldability()), connectionMode);
                }
            });
}

SqlExecutePrepareTemplate 是 sharding-core-execute 工程中提供的一个工具类,专门用于生成执行打算,将 RouteUnit 转化为 StatementExecuteUnit。同时还提供了另一个工具类 SQLExecuteTemplate 用于执行 StatementExecuteUnit,在工作执行时咱们会看到这个类。

工作执行

public List<QueryResult> executeQuery() throws SQLException {final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
    SQLExecuteCallback<QueryResult> executeCallback = 
        new SQLExecuteCallback<QueryResult>(getDatabaseType(), isExceptionThrown) {
        @Override
        protected QueryResult executeSQL(final String sql, final Statement statement,
                final ConnectionMode connectionMode) throws SQLException {return getQueryResult(sql, statement, connectionMode);
        }
    };
    // 执行 StatementExecuteUnit
    return executeCallback(executeCallback);
}

// sqlExecuteTemplate 执行 executeGroups(即 StatementExecuteUnit)
protected final <T> List<T> executeCallback(final SQLExecuteCallback<T> executeCallback) throws SQLException {
    // 执行所有的工作 StatementExecuteUnit
    List<T> result = sqlExecuteTemplate.executeGroup((Collection) executeGroups, executeCallback);
    refreshMetaDataIfNeeded(connection.getRuntimeContext(), sqlStatementContext);
    return result;
}

SqlExecuteTemplate 执行 StatementExecuteUnit 会回调 SQLExecuteCallback#executeSQL 办法,最终调用 getQueryResult 办法。

private QueryResult getQueryResult(final String sql, final Statement statement,
        final ConnectionMode connectionMode) throws SQLException {ResultSet resultSet = statement.executeQuery(sql);
    getResultSets().add(resultSet);
    return ConnectionMode.MEMORY_STRICTLY == connectionMode
            ? new StreamQueryResult(resultSet)
            : new MemoryQueryResult(resultSet);
}

ConnectionMode 有两种模式:内存限度 (MEMORY_STRICTLY) 和连贯限度(CONNECTION_STRICTLY),如果一个连贯执行多个 StatementExecuteUnit 则为内存限度(MEMORY_STRICTLY),采纳流式解决,即 StreamQueryResult,反之则为连贯限度(CONNECTION_STRICTLY),此时会将所有从 MySQL 服务器返回的数据都加载到内存中。特地是在 Sharding-Proxy 中特地有用,防止将代理服务器撑爆。

重磅福利

微信搜一搜【冰河技术】微信公众号,关注这个有深度的程序员,每天浏览超硬核技术干货,公众号内回复【PDF】有我筹备的一线大厂面试材料和我原创的超硬核 PDF 技术文档,以及我为大家精心筹备的多套简历模板(不断更新中),心愿大家都能找到心仪的工作,学习是一条时而郁郁寡欢,时而开怀大笑的路,加油。如果你通过致力胜利进入到了心仪的公司,肯定不要懈怠放松,职场成长和新技术学习一样,逆水行舟。如果有幸咱们江湖再见!

另外,我开源的各个 PDF,后续我都会继续更新和保护,感激大家长期以来对冰河的反对!!

写在最初

如果你感觉冰河写的还不错,请微信搜寻并关注「冰河技术 」微信公众号,跟冰河学习高并发、分布式、微服务、大数据、互联网和云原生技术,「 冰河技术 」微信公众号更新了大量技术专题,每一篇技术文章干货满满!不少读者曾经通过浏览「 冰河技术 」微信公众号文章,吊打面试官,胜利跳槽到大厂;也有不少读者实现了技术上的飞跃,成为公司的技术骨干!如果你也想像他们一样晋升本人的能力,实现技术能力的飞跃,进大厂,升职加薪,那就关注「 冰河技术」微信公众号吧,每天更新超硬核技术干货,让你对如何晋升技术能力不再迷茫!

正文完
 0