Preface
This article takes a look at sharding-jdbc's SQLExecutionHook.
SQLExecutionHook
incubator-shardingsphere-4.0.0-RC1/sharding-core/sharding-core-execute/src/main/java/org/apache/shardingsphere/core/execute/hook/SQLExecutionHook.java
public interface SQLExecutionHook {

    /**
     * Handle when SQL execution started.
     *
     * @param routeUnit route unit to be executed
     * @param dataSourceMetaData data source meta data
     * @param isTrunkThread is execution in trunk thread
     * @param shardingExecuteDataMap sharding execute data map
     */
    void start(RouteUnit routeUnit, DataSourceMetaData dataSourceMetaData, boolean isTrunkThread, Map<String, Object> shardingExecuteDataMap);

    /**
     * Handle when SQL execution finished success.
     */
    void finishSuccess();

    /**
     * Handle when SQL execution finished failure.
     *
     * @param cause failure cause
     */
    void finishFailure(Exception cause);
}
- The SQLExecutionHook interface defines three methods: start, finishSuccess, and finishFailure.
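To make the three callbacks concrete, here is a minimal sketch of what a custom hook could look like. It does not use the real ShardingSphere types: `SimpleExecutionHook` is a hypothetical stand-in for SQLExecutionHook that replaces RouteUnit and DataSourceMetaData with plain strings, and `TimingExecutionHook` is an illustrative implementation, not something shipped with the library.

```java
import java.util.Map;

// Hypothetical stand-in for SQLExecutionHook, reduced to JDK types so the
// sketch compiles without ShardingSphere on the classpath.
interface SimpleExecutionHook {

    void start(String dataSourceName, String sql, boolean isTrunkThread, Map<String, Object> dataMap);

    void finishSuccess();

    void finishFailure(Exception cause);
}

// Illustrative hook: remembers what was executed, how long it took and how it ended.
final class TimingExecutionHook implements SimpleExecutionHook {

    private String description;
    private long startNanos;
    private long elapsedNanos = -1L;
    private String outcome = "NOT_STARTED";

    @Override
    public void start(String dataSourceName, String sql, boolean isTrunkThread, Map<String, Object> dataMap) {
        description = dataSourceName + ": " + sql;
        startNanos = System.nanoTime();
        outcome = "RUNNING";
    }

    @Override
    public void finishSuccess() {
        elapsedNanos = System.nanoTime() - startNanos;
        outcome = "SUCCESS";
    }

    @Override
    public void finishFailure(Exception cause) {
        elapsedNanos = System.nanoTime() - startNanos;
        outcome = "FAILURE: " + cause.getMessage();
    }

    String getDescription() {
        return description;
    }

    long getElapsedNanos() {
        return elapsedNanos;
    }

    String getOutcome() {
        return outcome;
    }
}
```

The hook keeps its state in instance fields, which only works if each execution gets its own hook instance. That matches how the quoted source stores span and activeSpan in fields of OpenTracingSQLExecutionHook.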
SPISQLExecutionHook
incubator-shardingsphere-4.0.0-RC1/sharding-core/sharding-core-execute/src/main/java/org/apache/shardingsphere/core/execute/hook/SPISQLExecutionHook.java
public final class SPISQLExecutionHook implements SQLExecutionHook {

    private final Collection<SQLExecutionHook> sqlExecutionHooks = NewInstanceServiceLoader.newServiceInstances(SQLExecutionHook.class);

    static {
        NewInstanceServiceLoader.register(SQLExecutionHook.class);
    }

    @Override
    public void start(final RouteUnit routeUnit, final DataSourceMetaData dataSourceMetaData, final boolean isTrunkThread, final Map<String, Object> shardingExecuteDataMap) {
        for (SQLExecutionHook each : sqlExecutionHooks) {
            each.start(routeUnit, dataSourceMetaData, isTrunkThread, shardingExecuteDataMap);
        }
    }

    @Override
    public void finishSuccess() {
        for (SQLExecutionHook each : sqlExecutionHooks) {
            each.finishSuccess();
        }
    }

    @Override
    public void finishFailure(final Exception cause) {
        for (SQLExecutionHook each : sqlExecutionHooks) {
            each.finishFailure(cause);
        }
    }
}
- SPISQLExecutionHook implements the SQLExecutionHook interface. Its static initializer registers SQLExecutionHook with NewInstanceServiceLoader, and its sqlExecutionHooks collection is created by NewInstanceServiceLoader.newServiceInstances. The start, finishSuccess, and finishFailure methods each iterate over sqlExecutionHooks and call the method of the same name on every element.
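The delegation described above is a composite over SPI-discovered implementations. The sketch below reproduces that shape using only the JDK's `java.util.ServiceLoader`. It assumes, without having verified it here, that NewInstanceServiceLoader is a thin wrapper around that mechanism. `AuditHook`, `CompositeAuditHook`, and `RecordingAuditHook` are hypothetical names invented for the illustration.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.ServiceLoader;

// Hypothetical, simplified hook contract (stand-in for SQLExecutionHook).
interface AuditHook {

    void start(String sql);

    void finishSuccess();

    void finishFailure(Exception cause);
}

// Composite that forwards every callback to all of its delegates, in order.
final class CompositeAuditHook implements AuditHook {

    private final Collection<AuditHook> delegates;

    CompositeAuditHook(Collection<AuditHook> delegates) {
        this.delegates = new ArrayList<>(delegates);
    }

    // Collects the implementations the JDK ServiceLoader can find on the classpath.
    // With no provider-configuration file present, the result is an empty composite.
    static CompositeAuditHook fromServiceLoader() {
        List<AuditHook> found = new ArrayList<>();
        for (AuditHook each : ServiceLoader.load(AuditHook.class)) {
            found.add(each);
        }
        return new CompositeAuditHook(found);
    }

    int size() {
        return delegates.size();
    }

    @Override
    public void start(String sql) {
        for (AuditHook each : delegates) {
            each.start(sql);
        }
    }

    @Override
    public void finishSuccess() {
        for (AuditHook each : delegates) {
            each.finishSuccess();
        }
    }

    @Override
    public void finishFailure(Exception cause) {
        for (AuditHook each : delegates) {
            each.finishFailure(cause);
        }
    }
}

// Test double that appends one line per callback to a shared event list.
final class RecordingAuditHook implements AuditHook {

    private final String name;
    private final List<String> events;

    RecordingAuditHook(String name, List<String> events) {
        this.name = name;
        this.events = events;
    }

    @Override
    public void start(String sql) {
        events.add(name + ":start:" + sql);
    }

    @Override
    public void finishSuccess() {
        events.add(name + ":finishSuccess");
    }

    @Override
    public void finishFailure(Exception cause) {
        events.add(name + ":finishFailure:" + cause.getMessage());
    }
}
```

With the JDK mechanism, an implementation is registered by listing its fully qualified class name in a classpath file named `META-INF/services/<fully qualified interface name>`. If NewInstanceServiceLoader follows the same lookup, a custom SQLExecutionHook would be listed in `META-INF/services/org.apache.shardingsphere.core.execute.hook.SQLExecutionHook`.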
OpenTracingSQLExecutionHook
incubator-shardingsphere-4.0.0-RC1/sharding-opentracing/src/main/java/org/apache/shardingsphere/opentracing/hook/OpenTracingSQLExecutionHook.java
public final class OpenTracingSQLExecutionHook implements SQLExecutionHook {

    private static final String OPERATION_NAME = "/" + ShardingTags.COMPONENT_NAME + "/executeSQL/";

    private ActiveSpan activeSpan;

    private Span span;

    @Override
    public void start(final RouteUnit routeUnit, final DataSourceMetaData dataSourceMetaData, final boolean isTrunkThread, final Map<String, Object> shardingExecuteDataMap) {
        if (!isTrunkThread) {
            activeSpan = ((ActiveSpan.Continuation) shardingExecuteDataMap.get(OpenTracingRootInvokeHook.ACTIVE_SPAN_CONTINUATION)).activate();
        }
        span = ShardingTracer.get().buildSpan(OPERATION_NAME)
                .withTag(Tags.COMPONENT.getKey(), ShardingTags.COMPONENT_NAME)
                .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT)
                .withTag(Tags.PEER_HOSTNAME.getKey(), dataSourceMetaData.getHostName())
                .withTag(Tags.PEER_PORT.getKey(), dataSourceMetaData.getPort())
                .withTag(Tags.DB_TYPE.getKey(), "sql")
                .withTag(Tags.DB_INSTANCE.getKey(), routeUnit.getDataSourceName())
                .withTag(Tags.DB_STATEMENT.getKey(), routeUnit.getSqlUnit().getSql())
                .withTag(ShardingTags.DB_BIND_VARIABLES.getKey(), toString(routeUnit.getSqlUnit().getParameters())).startManual();
    }

    private String toString(final List<Object> parameterSets) {
        return parameterSets.isEmpty() ? "" : String.format("[%s]", Joiner.on(", ").join(parameterSets));
    }

    @Override
    public void finishSuccess() {
        span.finish();
        if (null != activeSpan) {
            activeSpan.deactivate();
        }
    }

    @Override
    public void finishFailure(final Exception cause) {
        ShardingErrorSpan.setError(span, cause);
        span.finish();
        if (null != activeSpan) {
            activeSpan.deactivate();
        }
    }
}
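- OpenTracingSQLExecutionHook implements the SQLExecutionHook interface. Its start method builds and starts a span; when the call is not on the trunk thread, it first activates an activeSpan from the continuation stored in shardingExecuteDataMap. Both finishSuccess and finishFailure call span.finish() and, if activeSpan is not null, activeSpan.deactivate(); finishFailure additionally records the exception on the span through ShardingErrorSpan.setError.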
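The isTrunkThread branch exists because a thread-bound tracing context does not follow work onto a pooled worker thread. The sketch below shows that hand-off idea with plain JDK pieces. It is not the OpenTracing API: `TraceHandOffSketch`, `PARENT_KEY`, and the string "trace id" are hypothetical stand-ins for the ActiveSpan.Continuation that the quoted code reads from shardingExecuteDataMap, and the assumption that the active span is bound to a thread is mine.

```java
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class TraceHandOffSketch {

    // Hypothetical key under which the trunk thread publishes its context.
    static final String PARENT_KEY = "parent_trace_id";

    // Thread-bound "active trace", playing the role of the active span.
    private static final ThreadLocal<String> ACTIVE_TRACE = new ThreadLocal<>();

    private TraceHandOffSketch() {
    }

    // Trunk thread: activate the context locally and publish it in the shared map.
    static void activateOnTrunkThread(String traceId, Map<String, Object> dataMap) {
        ACTIVE_TRACE.set(traceId);
        dataMap.put(PARENT_KEY, traceId);
    }

    // Hook start: a worker thread must first re-activate the published context;
    // the trunk thread already has it.
    static String start(boolean isTrunkThread, Map<String, Object> dataMap, String sql) {
        if (!isTrunkThread) {
            ACTIVE_TRACE.set((String) dataMap.get(PARENT_KEY));
        }
        return ACTIVE_TRACE.get() + " -> executeSQL[" + sql + "]";
    }

    // Hook finish: only the thread that re-activated the context deactivates it.
    static void finish(boolean isTrunkThread) {
        if (!isTrunkThread) {
            ACTIVE_TRACE.remove();
        }
    }

    // Runs start/finish on a separate thread, as a pooled execution would.
    static String runOnWorker(Map<String, Object> dataMap, String sql) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            return executor.submit(() -> {
                try {
                    return start(false, dataMap, sql);
                } finally {
                    finish(false);
                }
            }).get();
        } catch (InterruptedException | ExecutionException ex) {
            throw new IllegalStateException(ex);
        } finally {
            executor.shutdown();
        }
    }
}
```

In this sketch, calling `start(true, ...)` on the thread that ran `activateOnTrunkThread` and calling `runOnWorker(...)` both yield a child operation attached to the same parent, which is the effect the continuation lookup is meant to achieve.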
SQLExecuteCallback
incubator-shardingsphere-4.0.0-RC1/sharding-core/sharding-core-execute/src/main/java/org/apache/shardingsphere/core/execute/sql/execute/SQLExecuteCallback.java
@RequiredArgsConstructor
public abstract class SQLExecuteCallback<T> implements ShardingGroupExecuteCallback<StatementExecuteUnit, T> {

    private final DatabaseType databaseType;

    private final boolean isExceptionThrown;

    @Override
    public final Collection<T> execute(final Collection<StatementExecuteUnit> statementExecuteUnits, final boolean isTrunkThread,
                                       final Map<String, Object> shardingExecuteDataMap) throws SQLException {
        Collection<T> result = new LinkedList<>();
        for (StatementExecuteUnit each : statementExecuteUnits) {
            result.add(execute0(each, isTrunkThread, shardingExecuteDataMap));
        }
        return result;
    }

    private T execute0(final StatementExecuteUnit statementExecuteUnit, final boolean isTrunkThread, final Map<String, Object> shardingExecuteDataMap) throws SQLException {
        ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
        DataSourceMetaData dataSourceMetaData = DataSourceMetaDataFactory.newInstance(databaseType, statementExecuteUnit.getStatement().getConnection().getMetaData().getURL());
        SQLExecutionHook sqlExecutionHook = new SPISQLExecutionHook();
        try {
            sqlExecutionHook.start(statementExecuteUnit.getRouteUnit(), dataSourceMetaData, isTrunkThread, shardingExecuteDataMap);
            T result = executeSQL(statementExecuteUnit.getRouteUnit(), statementExecuteUnit.getStatement(), statementExecuteUnit.getConnectionMode());
            sqlExecutionHook.finishSuccess();
            return result;
        } catch (final SQLException ex) {
            sqlExecutionHook.finishFailure(ex);
            ExecutorExceptionHandler.handleException(ex);
            return null;
        }
    }

    protected abstract T executeSQL(RouteUnit routeUnit, Statement statement, ConnectionMode connectionMode) throws SQLException;
}
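- The execute0 method of SQLExecuteCallback creates an SPISQLExecutionHook before execution and calls sqlExecutionHook.start. After executeSQL succeeds it calls sqlExecutionHook.finishSuccess; if a SQLException is caught it calls sqlExecutionHook.finishFailure and passes the exception to ExecutorExceptionHandler.handleException.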
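The call order in execute0 can be reproduced in isolation. The following sketch wraps an arbitrary task in the same start / finishSuccess / finishFailure sequence and records which callbacks fire. `HookedExecutionSketch`, `LifecycleHook`, and `SqlTask` are hypothetical names. The `isExceptionThrown` flag only approximates ExecutorExceptionHandler, on the assumption that the handler either rethrows or swallows the exception depending on that setting.

```java
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

final class HookedExecutionSketch {

    // Hypothetical, simplified hook contract.
    interface LifecycleHook {

        void start(String sql);

        void finishSuccess();

        void finishFailure(Exception cause);
    }

    // Hypothetical unit of work that may fail with a SQLException.
    interface SqlTask<T> {

        T run() throws SQLException;
    }

    private HookedExecutionSketch() {
    }

    // Same shape as execute0: start, run, then finishSuccess or finishFailure.
    static <T> T execute(String sql, SqlTask<T> task, LifecycleHook hook, boolean isExceptionThrown) throws SQLException {
        try {
            hook.start(sql);
            T result = task.run();
            hook.finishSuccess();
            return result;
        } catch (SQLException ex) {
            hook.finishFailure(ex);
            if (isExceptionThrown) {
                throw ex;
            }
            // Swallowed failure: the caller sees null, as in the quoted code.
            return null;
        }
    }

    // Runs one execution in "swallow" mode and returns the observed callback order.
    static List<String> trace(boolean fail) {
        List<String> events = new ArrayList<>();
        LifecycleHook hook = new LifecycleHook() {

            @Override
            public void start(String sql) {
                events.add("start");
            }

            @Override
            public void finishSuccess() {
                events.add("finishSuccess");
            }

            @Override
            public void finishFailure(Exception cause) {
                events.add("finishFailure");
            }
        };
        try {
            Integer result = execute("SELECT 1", () -> {
                if (fail) {
                    throw new SQLException("boom");
                }
                return 1;
            }, hook, false);
            events.add("result=" + result);
        } catch (SQLException ex) {
            events.add("rethrown");
        }
        return events;
    }
}
```

As in the quoted execute0, only SQLException is caught here, so a RuntimeException thrown by the task would skip finishFailure. A hook that opens resources in start should take that path into account.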
Summary
The SQLExecutionHook interface defines the start, finishSuccess, and finishFailure methods. SPISQLExecutionHook implements this interface: it registers SQLExecutionHook with NewInstanceServiceLoader, creates its sqlExecutionHooks collection through NewInstanceServiceLoader.newServiceInstances, and in each of start, finishSuccess, and finishFailure iterates over sqlExecutionHooks and calls the corresponding method on every element.
doc
- SQLExecutionHook