本篇文章是对Mybatis知识点的一个扩大,次要一起来钻研下Spring是如何来治理事务的。顺便再多聊一点其余的知识点,在学习的过程中次要带着以下问题来进行有目标的学习
而后最终来答复上面这些问题。
Mybatis是如何整合进Spring中的
- Spring如何晓得哪些接口是Mapper接口的?
- Mapper接口是如何变成Spring Bean的?
- Spring在哪里申明的SqlSession的实现逻辑?
- Spring中申明式事务的实现形式是怎么的?
- Spring中如何解决嵌套事务的?
- Spring中事务的传播方式是如何实现的?
https://cloud.tencent.com/dev...
一、如何整合进Spring中的
默认大家对Spring都比拟理解了,这里只说后果。都晓得接口是不能被实例化的,那么接口是如何成为Bean的呢?
1.1 如何晓得哪些是Mybatis的接口呢?
@MapperScan
Spring中在配置类上加上这个注解。依据源码能看到还导入了MapperScannerRegistrar
@Retention(RetentionPolicy.RUNTIME)@Target(ElementType.TYPE)@Documented@Import(MapperScannerRegistrar.class)@Repeatable(MapperScans.class)public @interface MapperScan {}public class MapperScannerRegistrar implements ImportBeanDefinitionRegistrar, ResourceLoaderAware {}
MapperScannerRegistrar
会在配置类解析时候拿到MapperScan
注解信息,并解析外面的参数。生成一个 MapperScannerConfigurer
信息。
从源码中能看到Mybatis的很多配置信息,都会被注入到MapperScannerConfigurer
中。
public class MapperScannerConfigurer implements BeanDefinitionRegistryPostProcessor, InitializingBean, ApplicationContextAware, BeanNameAware {}
实现自BeanDefinitionRegistryPostProcessor会前置,拿到MapperScan中的basePackage,最终通过ClassPathMapperScanner
扫描并增加到BeanDefinitionRegistry
中。
到这里这种形式就能晓得哪些是Mybatis中的Mapper接口了。
还有第二种形式当发现Spring容器中没有MapperScannerConfigurer
。会主动注入一个
会间接指定哪些类被Mapper润饰,就将他生成Bean。
好了,到这里就晓得如何来确定那些接口是要生成Mybatis接口的了。上面看下个问题。
1.2 Mapper接口是如何变成Spring Bean的?
接口是不能被实例化的,然而在Spring中如何想让接口实例化就能够应用 FactoryBean + 动静代理的形式,实现接口类的实例化。
- 首先利用 ClassPathBeanDefinitionScanner 找到合乎规定的类生成 BeanDefinition。
- 给 BeanDefinition 指定BeanClass,执行 FactoryBean 是
MapperFactoryBean
二、Spring在哪里申明的SqlSession的实现逻辑?
通过Mybatis的学习晓得SqlSession一共有2个包装类。SqlSessionManager和SqlSessionTemplate。那么SqlSession是在哪里指定用哪个的呢?
答案就在 MapperFactoryBean
public class MapperFactoryBean<T> extends SqlSessionDaoSupport implements FactoryBean<T> { private SqlSessionTemplate sqlSessionTemplate; public void setSqlSessionFactory(SqlSessionFactory sqlSessionFactory) { if (this.sqlSessionTemplate == null || sqlSessionFactory != this.sqlSessionTemplate.getSqlSessionFactory()) { this.sqlSessionTemplate = createSqlSessionTemplate(sqlSessionFactory); } } @SuppressWarnings("WeakerAccess") protected SqlSessionTemplate createSqlSessionTemplate(SqlSessionFactory sqlSessionFactory) { return new SqlSessionTemplate(sqlSessionFactory); }}
三、Spring中申明式事务的实现形式是怎么的
看了Mybatis中事务这一章节,晓得如果应用了SqlSessionTemplate,那么事务的权限就外包给了Spring。那么Spring中事务怎么解决的呢?
终于进入正题了。Spring中提供两种事务的能力。
- 申明式事务
- 编程式事务
3.1 申明式事务
应用 Transactional
润饰办法,其次要实现是应用切面实现。
TransactionAspectSupport#invokeWithinTransaction
。拦挡办法。获取事务管理器。
这里咱们先来思考下,通过后面的学习晓得事务的最底层实现是jdbc驱动来实现的。
那么切面中要想实现,就必须保障切面中的线程执行的数据库操作,肯定是同一个SqlSession
这样能力在办法失常执行时候做commit,异样时候做rollback操作。
那咱们看下他是如何保障切面中的数据库操作肯定是同一个SqlSession的吧。这部分逻辑就在 SqlSessionTemplate
中。
- 获取以后线程是否曾经有SqlSession了,如果有就间接应用,这样就保障在切面中的事务用的是同一个事务了。
3.2 编程式事务
TransactionTemplate#execute
编程是事务须要实现者本人来治理事务的,Spring提供的扩大接口类是 CallbackPreferringPlatformTransactionManager
。如果发现容器中默认的事务管理类是这个
就间接调动全局的这个事务管理办法。如果不是就本人来解决。这种设计的益处是,事务管理器既能够做关系型数据库的事务管理,也能够满足一些特定场景的事务管制(eg: 给Kafka的逻辑做一个事务管理)。
四、Spring中如何解决嵌套事务的?
什么是嵌套事务,举一个伪代码的例子。上面 saveUser
代码中有2个Mapper。然而有几个SqlSession呢?
UserMapper userMapper;RegistroyMapper registoryMapper;@Transactional(rollbackFor = {Throwable.class, RuntimeException.class, ExecutionException.class})public void save(User user){ userMapper.save(user);}@Transactional(rollbackFor = {Throwable.class, RuntimeException.class, ExecutionException.class})public void saveUser(String userName,Strign password){ User user = registoryMapper.regis(userName,password); save(user);}
通过下面的学习咱们理解到如果是Spring来治理的事务是一个线程对应一个SqlSession。所以说下面伪代码中的两个Mapper
其实是用的同一个SqlSession,这样能力保障是在同一个事务中。外围代码逻辑就在这里 SqlSessionUtils#getSqlSession
。
从Spring中的事务管理器中获取 SqlSession
。是否应用同一个事务,外包给Spring容器去托管。这就给Spring提供了很多能够施展的空间。
比如说流传机制等。
public static SqlSession getSqlSession(SqlSessionFactory sessionFactory, ExecutorType executorType, PersistenceExceptionTranslator exceptionTranslator) { notNull(sessionFactory, NO_SQL_SESSION_FACTORY_SPECIFIED); notNull(executorType, NO_EXECUTOR_TYPE_SPECIFIED); SqlSessionHolder holder = (SqlSessionHolder) TransactionSynchronizationManager.getResource(sessionFactory); SqlSession session = sessionHolder(executorType, holder); if (session != null) { return session; } LOGGER.debug(() -> "Creating a new SqlSession"); session = sessionFactory.openSession(executorType); registerSessionHolder(sessionFactory, executorType, exceptionTranslator, session); return session; }
五、Spring中事务的传播方式是如何实现的?
传播方式 | 阐明 | 罕用 |
---|---|---|
TransactionDefinition.PROPAGATION_REQUIRED | 如果存在一个事务,则反对以后事务。如果没有事务则开启 | ✅ |
TransactionDefinition.PROPAGATION_SUPPORTS | 如果存在一个事务,反对以后事务。如果没有事务,则非事务的执行 | |
TransactionDefinition.PROPAGATION_MANDATORY | 如果曾经存在一个事务,反对以后事务。如果没有一个流动的事务,则抛出异样 | |
TransactionDefinition.PROPAGATION_NEVER | 总是非事务地执行,如果存在一个流动事务,则抛出异样 | |
TransactionDefinition.PROPAGATION_NOT_SUPPORTED | 总是非事务地执行,并挂起任何存在的事务 | |
TransactionDefinition.PROPAGATION_REQUIRES_NEW | 总是开启一个新的事务。如果一个事务曾经存在,则将这个存在的事务挂起。 | |
TransactionDefinition.PROPAGATION_NESTED | 如果一个流动的事务存在,则运行在一个嵌套的事务中. 如果没有流动事务则按TransactionDefinition.PROPAGATION_REQUIRED 属性执行 |
思考流传机制如何实现
首先咱们先思考下流传机制是如何实现的,因为咱们晓得 要保障是同一个事务,那么肯定是同一个SqlSession,这样能力保障是同一个事务
。
而如果要新开事务,就要先将以后线程绑定的SqlSession等事务信息,给挂起,那么是如何进行挂起的呢? SqlSession又是如何跟线程绑定的呢?
5.1 SqlSession是如何跟线程绑定的呢?
通过TransactionSynchronizationManager中的ThreadLocal跟线程绑定(new NamedThreadLocal<>("Transactional resources"))
。留神: 如果主线程下创立子线程是不能绑定上的。
private static void registerSessionHolder(SqlSessionFactory sessionFactory, ExecutorType executorType, PersistenceExceptionTranslator exceptionTranslator, SqlSession session) { SqlSessionHolder holder = new SqlSessionHolder(session, executorType, exceptionTranslator); TransactionSynchronizationManager.bindResource(sessionFactory, holder); TransactionSynchronizationManager .registerSynchronization(new SqlSessionSynchronization(holder, sessionFactory)); holder.setSynchronizedWithTransaction(true); holder.requested(); }
5.2 事务是如何嵌套的?
答案就在 TransactionAspectSupport#TransactionInfo
中。一个事务注解对应一个TransactionInfo,如果呈现嵌套
就会生成一个事务链。如下图一样。
当里层的事务处理实现后会执行清理动作,同时在将第一个的事务在进行复原跟线程绑定。
private void restoreThreadLocalStatus() { // Use stack to restore old transaction TransactionInfo. // Will be null if none was set. transactionInfoHolder.set(this.oldTransactionInfo); }
5.3 事务是如何挂起的?
后面晓得每一个 @Transaction
注解会对应一个 TransactionAspectSupport#TransactionInfo
。而事务挂起后,会先跟线程进行解绑。
而后挂起的事务 SuspendedResourcesHolder
会被增加在 TransactionStatus
中。
挂起的数据保留在哪里
protected final class TransactionInfo { // 事务管理器 @Nullable private final PlatformTransactionManager transactionManager; // 事务信息 @Nullable private final TransactionAttribute transactionAttribute; // 切面点 private final String joinpointIdentification; // DefaultTransactionStatus @Nullable private TransactionStatus transactionStatus; @Nullable private TransactionInfo oldTransactionInfo;}public class DefaultTransactionStatus extends AbstractTransactionStatus { @Nullable private final Object transaction; private final boolean newTransaction; private final boolean newSynchronization; private final boolean readOnly; private final boolean debug; @Nullable private final Object suspendedResources;}
如何进行挂起的
TransactionSynchronization 事务同步器,为了解决事务的传播方式
- suspend 暂定事务,将事务从以后线程上解绑
- resume 复原事务,将事务从新复原到以后线程上
- beforeCommit 触发提交事务,执行commit
- beforeCompletion 事务提交后
- afterCommit 提交后
- afterCompletion 实现后调用
SqlSessionSynchronization 也是跟以后线程绑定的
- 地位
TransactionSynchronizationManager#ThreadLocal<Set<TransactionSynchronization>> synchronizations
// 挂起时候,将SqlSessionHolder与以后线程进行解绑 @Override public void suspend() { if (this.holderActive) { LOGGER.debug(() -> "Transaction synchronization suspending SqlSession [" + this.holder.getSqlSession() + "]"); TransactionSynchronizationManager.unbindResource(this.sessionFactory); } } /** * 复原时候从新跟以后线程绑定 */ @Override public void resume() { if (this.holderActive) { LOGGER.debug(() -> "Transaction synchronization resuming SqlSession [" + this.holder.getSqlSession() + "]"); TransactionSynchronizationManager.bindResource(this.sessionFactory, this.holder); } }
5.4 传播方式具体实现
上面这段代码就是事务注解的切面解决类,Spring事务的所有逻辑和扩大反对都在这里。
TransactionAspectSupport#invokeWithinTransaction
首先咱们先看整体的逻辑
- 获取当切面上的
@Transaction
注解信息 - 依据注解信息找到指定的事务管理器,如果没有执行就应用默认的
- 生成事务信息
TransactionInfo
流传机制,事务挂起都在这个类上 - 失败执行回滚&胜利提交&如果是嵌套事务,从
TransactionInfo
中将挂起的事务从新跟线程进行绑定
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, final InvocationCallback invocation) throws Throwable { // If the transaction attribute is null, the method is non-transactional. TransactionAttributeSource tas = getTransactionAttributeSource(); // 获取被事务注解标记的事务信息 final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null); // 依据事务注解上指定的事务管理器名称,去零碎中获取,如果没有就拿零碎中默认的事务管理器 final PlatformTransactionManager tm = determineTransactionManager(txAttr); // 切面拦挡点: com.alibaba.purchase.domain.replenish.impl.ReplenishDomainWriteServiceImpl.mockSave final String joinpointIdentification = methodIdentification(method, targetClass, txAttr); // 这里只看关系型数据的的事务逻辑。CallbackPreferringPlatformTransactionManager是具备回调性质的事务管理器,多用于解决自定的事务 if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) { // Standard transaction demarcation with getTransaction and commit/rollback calls. // 获取事务的信息,蕴含传播方式 TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification); Object retVal = null; try { // This is an around advice: Invoke the next interceptor in the chain. // This will normally result in a target object being invoked. retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { // target invocation exception completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { cleanupTransactionInfo(txInfo); } commitTransactionAfterReturning(txInfo); return retVal; }}
这里只看流传机制吧。AbstractPlatformTransactionManager#handleExistingTransaction
- TransactionDefinition.PROPAGATION_NEVER 如果存在事务就报错
- TransactionDefinition.PROPAGATION_NOT_SUPPORTED 如果有事务,就挂起(以后事务跟线程解绑)。不应用事务进行执行。
- TransactionDefinition.PROPAGATION_REQUIRES_NEW 以后事务挂起,新开个事务。
/** * Create a TransactionStatus for an existing transaction. */ private TransactionStatus handleExistingTransaction( TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException { // TransactionDefinition.PROPAGATION_NEVER(总是非事务地执行,如果存在一个流动事务,则抛出异样)就间接阻断报错 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) { throw new IllegalTransactionStateException( "Existing transaction found for transaction marked with propagation 'never'"); } // TransactionDefinition.PROPAGATION_NOT_SUPPORTED 总是非事务地执行,并挂起任何存在的事务 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) { if (debugEnabled) { logger.debug("Suspending current transaction"); } Object suspendedResources = suspend(transaction); boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); // 数据暂存在TransactionSynchronizationManager#synchronizations同步器中 return prepareTransactionStatus( definition, null, false, newSynchronization, debugEnabled, suspendedResources); } // 总是开启一个新的事务。如果一个事务曾经存在,则将这个存在的事务挂起。 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) { if (debugEnabled) { logger.debug("Suspending current transaction, creating new transaction with name [" + definition.getName() + "]"); } SuspendedResourcesHolder suspendedResources = suspend(transaction); try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } catch (RuntimeException | Error beginEx) { resumeAfterBeginException(transaction, suspendedResources, beginEx); throw beginEx; } } // 如果有事务存在,则运行在一个嵌套的事务中. 如果没有流动事务则按TransactionDefinition.PROPAGATION_REQUIRED 属性执行 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { if (!isNestedTransactionAllowed()) { throw new NestedTransactionNotSupportedException( "Transaction manager does not allow nested transactions by default - " + "specify 'nestedTransactionAllowed' property with value 'true'"); } if (debugEnabled) { logger.debug("Creating nested transaction with name [" + definition.getName() + "]"); } if (useSavepointForNestedTransaction()) { // Create savepoint within existing Spring-managed transaction, // through the SavepointManager API implemented by TransactionStatus. // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization. DefaultTransactionStatus status = prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null); // 应用以后事务,并减少以后事务的一次援用。 status.createAndHoldSavepoint(); return status; } else { // Nested transaction through nested begin and commit/rollback calls. // Usually only for JTA: Spring synchronization might get activated here // in case of a pre-existing JTA transaction. // 没有新建一个事务 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, null); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } } // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED. if (debugEnabled) { logger.debug("Participating in existing transaction"); } if (isValidateExistingTransaction()) { if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) { Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) { Constants isoConstants = DefaultTransactionDefinition.constants; throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] specifies isolation level which is incompatible with existing transaction: " + (currentIsolationLevel != null ? isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) : "(unknown)")); } } if (!definition.isReadOnly()) { if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) { throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] is not marked as read-only but existing transaction is"); } } } // boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null); }
5.5 嵌套事务如何晓得是否要提交
当两个Mapper中应用的是同一个SqlSession,那么会不会第二个事务在执行后,就间接commit了呢,此时第一个事务有一次commit。导致异样呢?
解决方案在这里 DefaultTransactionStatus
第二个事务状态中
- newTransaction = false
- newSynchronization = false
而上面代码中会做校验,只须要同步时候才会提交事务。
protected final void triggerBeforeCommit(DefaultTransactionStatus status) { if (status.isNewSynchronization()) { if (status.isDebug()) { logger.trace("Triggering beforeCommit synchronization"); } TransactionSynchronizationUtils.triggerBeforeCommit(status.isReadOnly()); }}
第一个事务状态中
- newTransaction = true
- newSynchronization = true
才会真正的去执行。
5.6 这样设计是否线程平安
线程平安只有在多线程环境下才会呈现。那么这里肯定会有多线程问题。而事务是跟线程进行绑定的,所以这里尽管有多线程然而不会有线程平安问题。
然而这里咱们看源码线程绑定时候应用的ThreadLocal,所以你在线程中创立子线程或者是线程中应用线程池,这里的事务都不会共享的。
本文由mdnice多平台公布