聊聊shardingjdbc的RootInvokeHook

32次阅读

共计 5811 个字符,预计需要花费 15 分钟才能阅读完成。

本文主要研究一下 sharding-jdbc 的 RootInvokeHook

RootInvokeHook

incubator-shardingsphere-4.0.0-RC1/sharding-core/sharding-core-execute/src/main/java/org/apache/shardingsphere/core/execute/hook/RootInvokeHook.java

public interface RootInvokeHook {
    
    /**
     * Handle when root invoke started.
     */
    void start();
    
    /**
     * Handle when root invoke finished.
     * 
     * @param connectionCount connection count
     */
    void finish(int connectionCount);
}
  • RootInvokeHook 定义了 start、finish 接口

SPIRootInvokeHook

incubator-shardingsphere-4.0.0-RC1/sharding-core/sharding-core-execute/src/main/java/org/apache/shardingsphere/core/execute/hook/SPIRootInvokeHook.java

public final class SPIRootInvokeHook implements RootInvokeHook {private final Collection<RootInvokeHook> rootInvokeHooks = NewInstanceServiceLoader.newServiceInstances(RootInvokeHook.class);
    
    static {NewInstanceServiceLoader.register(RootInvokeHook.class);
    }
    
    @Override
    public void start() {for (RootInvokeHook each : rootInvokeHooks) {each.start();
        }
    }
    
    @Override
    public void finish(final int connectionCount) {for (RootInvokeHook each : rootInvokeHooks) {each.finish(connectionCount);
        }
    }
}
  • SPIRootInvokeHook 实现了 RootInvokeHook 接口,它使用 NewInstanceServiceLoader 注册了 RootInvokeHook;rootInvokeHooks 集合由 NewInstanceServiceLoader.newServiceInstances 创建;start 方法遍历 rootInvokeHooks,执行其 start 方法;finish 方法则遍历 rootInvokeHooks,执行器 finish 方法

NewInstanceServiceLoader

incubator-shardingsphere-4.0.0-RC1/sharding-core/sharding-core-common/src/main/java/org/apache/shardingsphere/core/spi/NewInstanceServiceLoader.java

@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class NewInstanceServiceLoader {private static final Map<Class, Collection<Class<?>>> SERVICE_MAP = new HashMap<>();
    
    /**
     * Register SPI service into map for new instance.
     *
     * @param service service type
     * @param <T> type of service
     */
    public static <T> void register(final Class<T> service) {for (T each : ServiceLoader.load(service)) {registerServiceClass(service, each);
        }
    }
    
    @SuppressWarnings("unchecked")
    private static <T> void registerServiceClass(final Class<T> service, final T instance) {Collection<Class<?>> serviceClasses = SERVICE_MAP.get(service);
        if (null == serviceClasses) {serviceClasses = new LinkedHashSet<>();
        }
        serviceClasses.add(instance.getClass());
        SERVICE_MAP.put(service, serviceClasses);
    }
    
    /**
     * New service instances.
     *
     * @param service service class
     * @param <T> type of service
     * @return service instances
     */
    @SneakyThrows
    @SuppressWarnings("unchecked")
    public static <T> Collection<T> newServiceInstances(final Class<T> service) {Collection<T> result = new LinkedList<>();
        if (null == SERVICE_MAP.get(service)) {return result;}
        for (Class<?> each : SERVICE_MAP.get(service)) {result.add((T) each.newInstance());
        }
        return result;
    }
}
  • NewInstanceServiceLoader 的 register 方法会使用 ServiceLoader.load 加载指定 service 的实现类,然后调用 registerServiceClass 注册到 SERVICE_MAP;newServiceInstances 则找到指定 service 的实现类,然后挨个创建实例

OpenTracingRootInvokeHook

incubator-shardingsphere-4.0.0-RC1/sharding-opentracing/src/main/java/org/apache/shardingsphere/opentracing/hook/OpenTracingRootInvokeHook.java

public final class OpenTracingRootInvokeHook implements RootInvokeHook {
    
    public static final String ACTIVE_SPAN_CONTINUATION = "ACTIVE_SPAN_CONTINUATION";
    
    private static final String OPERATION_NAME = "/" + ShardingTags.COMPONENT_NAME + "/rootInvoke/";
    
    private ActiveSpan activeSpan;
    
    @Override
    public void start() {activeSpan = ShardingTracer.get().buildSpan(OPERATION_NAME).withTag(Tags.COMPONENT.getKey(), ShardingTags.COMPONENT_NAME).startActive();
        ShardingExecuteDataMap.getDataMap().put(ACTIVE_SPAN_CONTINUATION, activeSpan.capture());
    }
    
    @Override
    public void finish(final int connectionCount) {activeSpan.setTag(ShardingTags.CONNECTION_COUNT.getKey(), connectionCount).deactivate();}
}
  • OpenTracingRootInvokeHook 实现了 RootInvokeHook 接口,其 start 方法创建并启动 activeSpan;finish 方法则设置 CONNECTION_COUNT,然后标记 activeSpan 为 deactivate

AbstractConnectionAdapter

incubator-shardingsphere-4.0.0-RC1/sharding-jdbc/sharding-jdbc-core/src/main/java/org/apache/shardingsphere/shardingjdbc/jdbc/adapter/AbstractConnectionAdapter.java

@Getter
public abstract class AbstractConnectionAdapter extends AbstractUnsupportedOperationConnection {private final Multimap<String, Connection> cachedConnections = LinkedHashMultimap.create();
    
    private boolean autoCommit = true;
    
    private boolean readOnly = true;
    
    private volatile boolean closed;
    
    private int transactionIsolation = TRANSACTION_READ_UNCOMMITTED;
    
    private final ForceExecuteTemplate<Connection> forceExecuteTemplate = new ForceExecuteTemplate<>();
    
    private final ForceExecuteTemplate<Entry<String, Connection>> forceExecuteTemplateForClose = new ForceExecuteTemplate<>();
    
    private final RootInvokeHook rootInvokeHook = new SPIRootInvokeHook();
    
    private final ShardingTransactionManager shardingTransactionManager;
    
    private ShardingTransactionManagerEngine shardingTransactionManagerEngine;
    
    private TransactionType transactionType;
    
    protected AbstractConnectionAdapter(final ShardingTransactionManagerEngine shardingTransactionManagerEngine, final TransactionType transactionType) {rootInvokeHook.start();
        this.transactionType = transactionType;
        this.shardingTransactionManagerEngine = shardingTransactionManagerEngine;
        shardingTransactionManager = shardingTransactionManagerEngine.getTransactionManager(transactionType);
    }

    //......

    public final void close() throws SQLException {
        closed = true;
        MasterVisitedManager.clear();
        TransactionTypeHolder.clear();
        int connectionSize = cachedConnections.size();
        try {forceExecuteTemplateForClose.execute(cachedConnections.entries(), new ForceExecuteCallback<Entry<String, Connection>>() {
        
                @Override
                public void execute(final Entry<String, Connection> cachedConnections) throws SQLException {cachedConnections.getValue().close();}
            });
        } finally {cachedConnections.clear();
            rootInvokeHook.finish(connectionSize);
        }
    }

    //......

}
  • AbstractConnectionAdapter 的构造器会执行 rootInvokeHook.start();其 close 方法会执行 rootInvokeHook.finish(connectionSize)

小结

RootInvokeHook 定义了 start、finish 接口;SPIRootInvokeHook 实现了 RootInvokeHook 接口,它使用 NewInstanceServiceLoader 注册了 RootInvokeHook;rootInvokeHooks 集合由 NewInstanceServiceLoader.newServiceInstances 创建;start 方法遍历 rootInvokeHooks,执行其 start 方法;finish 方法则遍历 rootInvokeHooks,执行器 finish 方法

doc

  • RootInvokeHook

正文完
 0