关于程序员:第09篇Spring声明式事务的实现方式

40次阅读

共计 14283 个字符,预计需要花费 36 分钟才能阅读完成。

本篇文章是对 Mybatis 知识点的一个扩大, 次要一起来钻研下 Spring 是如何来治理事务的。顺便再多聊一点其余的知识点, 在学习的过程中次要带着以下问题来进行有目标的学习
而后最终来答复上面这些问题。

  1. Mybatis 是如何整合进 Spring 中的

    • Spring 如何晓得哪些接口是 Mapper 接口的?
    • Mapper 接口是如何变成 Spring Bean 的?
  2. Spring 在哪里申明的 SqlSession 的实现逻辑?
  3. Spring 中申明式事务的实现形式是怎么的?
  4. Spring 中如何解决嵌套事务的?
  5. Spring 中事务的传播方式是如何实现的?

https://cloud.tencent.com/dev…

一、如何整合进 Spring 中的

默认大家对 Spring 都比拟理解了, 这里只说后果。都晓得接口是不能被实例化的, 那么接口是如何成为 Bean 的呢?

1.1 如何晓得哪些是 Mybatis 的接口呢?

  • @MapperScan Spring 中在配置类上加上这个注解。依据源码能看到还导入了MapperScannerRegistrar
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Import(MapperScannerRegistrar.class)
@Repeatable(MapperScans.class)
public @interface MapperScan {}

public class MapperScannerRegistrar implements ImportBeanDefinitionRegistrar, ResourceLoaderAware {}

MapperScannerRegistrar 会在配置类解析时候拿到 MapperScan 注解信息, 并解析外面的参数。生成一个 MapperScannerConfigurer 信息。
从源码中能看到 Mybatis 的很多配置信息, 都会被注入到 MapperScannerConfigurer 中。

public class MapperScannerConfigurer
    implements BeanDefinitionRegistryPostProcessor, InitializingBean, ApplicationContextAware, BeanNameAware {}

实现自 BeanDefinitionRegistryPostProcessor 会前置, 拿到 MapperScan 中的 basePackage, 最终通过 ClassPathMapperScanner 扫描并增加到
BeanDefinitionRegistry 中。

到这里这种形式就能晓得哪些是 Mybatis 中的 Mapper 接口了。

还有第二种形式当发现 Spring 容器中没有MapperScannerConfigurer。会主动注入一个

会间接指定哪些类被 Mapper 润饰, 就将他生成 Bean。

好了,到这里就晓得如何来确定那些接口是要生成 Mybatis 接口的了。上面看下个问题。

1.2 Mapper 接口是如何变成 Spring Bean 的?

接口是不能被实例化的,然而在 Spring 中如何想让接口实例化就能够应用 FactoryBean + 动静代理的形式,实现接口类的实例化。

  • 首先利用 ClassPathBeanDefinitionScanner 找到合乎规定的类生成 BeanDefinition。
  • 给 BeanDefinition 指定 BeanClass, 执行 FactoryBean 是 MapperFactoryBean

二、Spring 在哪里申明的 SqlSession 的实现逻辑?

通过 Mybatis 的学习晓得 SqlSession 一共有 2 个包装类。SqlSessionManager 和 SqlSessionTemplate。那么 SqlSession 是在哪里指定用哪个的呢?
答案就在 MapperFactoryBean

public class MapperFactoryBean<T> extends SqlSessionDaoSupport implements FactoryBean<T> {
  private SqlSessionTemplate sqlSessionTemplate;
   
     public void setSqlSessionFactory(SqlSessionFactory sqlSessionFactory) {if (this.sqlSessionTemplate == null || sqlSessionFactory != this.sqlSessionTemplate.getSqlSessionFactory()) {this.sqlSessionTemplate = createSqlSessionTemplate(sqlSessionFactory);
       }
     }
   
     @SuppressWarnings("WeakerAccess")
     protected SqlSessionTemplate createSqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {return new SqlSessionTemplate(sqlSessionFactory);
     }
}

三、Spring 中申明式事务的实现形式是怎么的

看了 Mybatis 中事务这一章节, 晓得如果应用了 SqlSessionTemplate, 那么事务的权限就外包给了 Spring。那么 Spring 中事务怎么解决的呢?
终于进入正题了。Spring 中提供两种事务的能力。

  • 申明式事务
  • 编程式事务

3.1 申明式事务

应用 Transactional 润饰办法,其次要实现是应用切面实现。

  • TransactionAspectSupport#invokeWithinTransaction。拦挡办法。获取事务管理器。

这里咱们先来思考下, 通过后面的学习晓得事务的最底层实现是 jdbc 驱动来实现的。

那么切面中要想实现,就必须保障切面中的线程执行的数据库操作,肯定是同一个 SqlSession 这样能力在办法失常执行时候做 commit,异样时候做 rollback 操作。

那咱们看下他是如何保障切面中的数据库操作肯定是同一个 SqlSession 的吧。这部分逻辑就在 SqlSessionTemplate 中。

  • 获取以后线程是否曾经有 SqlSession 了,如果有就间接应用,这样就保障在切面中的事务用的是同一个事务了。

3.2 编程式事务

  • TransactionTemplate#execute

编程是事务须要实现者本人来治理事务的,Spring 提供的扩大接口类是 CallbackPreferringPlatformTransactionManager。如果发现容器中默认的事务管理类是这个
就间接调动全局的这个事务管理办法。如果不是就本人来解决。这种设计的益处是, 事务管理器既能够做关系型数据库的事务管理, 也能够满足一些特定场景的事务管制(eg: 给 Kafka 的逻辑做一个事务管理)。

四、Spring 中如何解决嵌套事务的?

什么是嵌套事务, 举一个伪代码的例子。上面 saveUser 代码中有 2 个 Mapper。然而有几个 SqlSession 呢?

UserMapper userMapper;

RegistroyMapper registoryMapper;

@Transactional(rollbackFor = {Throwable.class, RuntimeException.class, ExecutionException.class})
public void save(User user){userMapper.save(user);
}

@Transactional(rollbackFor = {Throwable.class, RuntimeException.class, ExecutionException.class})
public void saveUser(String userName,Strign password){User user = registoryMapper.regis(userName,password);
   save(user);
}

通过下面的学习咱们理解到如果是 Spring 来治理的事务是一个线程对应一个 SqlSession。所以说下面伪代码中的两个 Mapper
其实是用的同一个 SqlSession, 这样能力保障是在同一个事务中。外围代码逻辑就在这里 SqlSessionUtils#getSqlSession
从 Spring 中的事务管理器中获取 SqlSession。是否应用同一个事务,外包给 Spring 容器去托管。这就给 Spring 提供了很多能够施展的空间。
比如说流传机制等。

public static SqlSession getSqlSession(SqlSessionFactory sessionFactory, ExecutorType executorType,
      PersistenceExceptionTranslator exceptionTranslator) {notNull(sessionFactory, NO_SQL_SESSION_FACTORY_SPECIFIED);
    notNull(executorType, NO_EXECUTOR_TYPE_SPECIFIED);

    SqlSessionHolder holder = (SqlSessionHolder) TransactionSynchronizationManager.getResource(sessionFactory);

    SqlSession session = sessionHolder(executorType, holder);
    if (session != null) {return session;}

    LOGGER.debug(() -> "Creating a new SqlSession");
    session = sessionFactory.openSession(executorType);

    registerSessionHolder(sessionFactory, executorType, exceptionTranslator, session);

    return session;
  }

五、Spring 中事务的传播方式是如何实现的?

传播方式 阐明 罕用
TransactionDefinition.PROPAGATION_REQUIRED 如果存在一个事务,则反对以后事务。如果没有事务则开启
TransactionDefinition.PROPAGATION_SUPPORTS 如果存在一个事务,反对以后事务。如果没有事务,则非事务的执行
TransactionDefinition.PROPAGATION_MANDATORY 如果曾经存在一个事务,反对以后事务。如果没有一个流动的事务,则抛出异样
TransactionDefinition.PROPAGATION_NEVER 总是非事务地执行,如果存在一个流动事务,则抛出异样
TransactionDefinition.PROPAGATION_NOT_SUPPORTED 总是非事务地执行,并挂起任何存在的事务
TransactionDefinition.PROPAGATION_REQUIRES_NEW 总是开启一个新的事务。如果一个事务曾经存在,则将这个存在的事务挂起。
TransactionDefinition.PROPAGATION_NESTED 如果一个流动的事务存在,则运行在一个嵌套的事务中. 如果没有流动事务则按 TransactionDefinition.PROPAGATION_REQUIRED 属性执行

思考流传机制如何实现

首先咱们先思考下流传机制是如何实现的, 因为咱们晓得 要保障是同一个事务, 那么肯定是同一个 SqlSession, 这样能力保障是同一个事务
而如果要新开事务, 就要先将以后线程绑定的 SqlSession 等事务信息, 给挂起,那么是如何进行挂起的呢? SqlSession 又是如何跟线程绑定的呢?

5.1 SqlSession 是如何跟线程绑定的呢?

通过 TransactionSynchronizationManager 中的 ThreadLocal 跟线程绑定(new NamedThreadLocal<>("Transactional resources"))。留神: 如果主线程下创立子线程是不能绑定上的。

private static void registerSessionHolder(SqlSessionFactory sessionFactory, ExecutorType executorType,
      PersistenceExceptionTranslator exceptionTranslator, SqlSession session) {SqlSessionHolder holder = new SqlSessionHolder(session, executorType, exceptionTranslator);
        TransactionSynchronizationManager.bindResource(sessionFactory, holder);
        TransactionSynchronizationManager
            .registerSynchronization(new SqlSessionSynchronization(holder, sessionFactory));
        holder.setSynchronizedWithTransaction(true);
        holder.requested();}

5.2 事务是如何嵌套的?

答案就在 TransactionAspectSupport#TransactionInfo 中。一个事务注解对应一个 TransactionInfo, 如果呈现嵌套
就会生成一个事务链。如下图一样。

当里层的事务处理实现后会执行清理动作, 同时在将第一个的事务在进行复原跟线程绑定。

        private void restoreThreadLocalStatus() {
            // Use stack to restore old transaction TransactionInfo.
            // Will be null if none was set.
            transactionInfoHolder.set(this.oldTransactionInfo);
        }

5.3 事务是如何挂起的?

后面晓得每一个 @Transaction 注解会对应一个 TransactionAspectSupport#TransactionInfo。而事务挂起后, 会先跟线程进行解绑。
而后挂起的事务 SuspendedResourcesHolder 会被增加在 TransactionStatus 中。

挂起的数据保留在哪里

protected final class TransactionInfo {
        // 事务管理器
        @Nullable
        private final PlatformTransactionManager transactionManager;
        // 事务信息
        @Nullable
        private final TransactionAttribute transactionAttribute;
        // 切面点
        private final String joinpointIdentification;
        // DefaultTransactionStatus
        @Nullable
        private TransactionStatus transactionStatus; 
        @Nullable
        private TransactionInfo oldTransactionInfo;
}

public class DefaultTransactionStatus extends AbstractTransactionStatus {
    @Nullable
    private final Object transaction;
    private final boolean newTransaction;
    private final boolean newSynchronization;
    private final boolean readOnly;
    private final boolean debug;
    @Nullable
    private final Object suspendedResources;
}            

如何进行挂起的

TransactionSynchronization 事务同步器,为了解决事务的传播方式

  • suspend 暂定事务, 将事务从以后线程上解绑
  • resume 复原事务, 将事务从新复原到以后线程上
  • beforeCommit 触发提交事务,执行 commit
  • beforeCompletion 事务提交后
  • afterCommit 提交后
  • afterCompletion 实现后调用

SqlSessionSynchronization 也是跟以后线程绑定的

  • 地位 TransactionSynchronizationManager#ThreadLocal<Set<TransactionSynchronization>> synchronizations
 // 挂起时候, 将 SqlSessionHolder 与以后线程进行解绑
 @Override
 public void suspend() {if (this.holderActive) {LOGGER.debug(() -> "Transaction synchronization suspending SqlSession [" + this.holder.getSqlSession() + "]");
     TransactionSynchronizationManager.unbindResource(this.sessionFactory);
   }
 }

 /**
  * 复原时候从新跟以后线程绑定
  */
 @Override
 public void resume() {if (this.holderActive) {LOGGER.debug(() -> "Transaction synchronization resuming SqlSession [" + this.holder.getSqlSession() + "]");
     TransactionSynchronizationManager.bindResource(this.sessionFactory, this.holder);
   }
 }

5.4 传播方式具体实现

上面这段代码就是事务注解的切面解决类,Spring 事务的所有逻辑和扩大反对都在这里。

  • TransactionAspectSupport#invokeWithinTransaction

首先咱们先看整体的逻辑

  1. 获取当切面上的 @Transaction 注解信息
  2. 依据注解信息找到指定的事务管理器, 如果没有执行就应用默认的
  3. 生成事务信息 TransactionInfo 流传机制, 事务挂起都在这个类上
  4. 失败执行回滚 & 胜利提交 & 如果是嵌套事务, 从TransactionInfo 中将挂起的事务从新跟线程进行绑定
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
            final InvocationCallback invocation) throws Throwable {

        // If the transaction attribute is null, the method is non-transactional.
        TransactionAttributeSource tas = getTransactionAttributeSource();
        // 获取被事务注解标记的事务信息
        final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
        // 依据事务注解上指定的事务管理器名称, 去零碎中获取,如果没有就拿零碎中默认的事务管理器
        final PlatformTransactionManager tm = determineTransactionManager(txAttr);
        // 切面拦挡点: com.alibaba.purchase.domain.replenish.impl.ReplenishDomainWriteServiceImpl.mockSave
        final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
        // 这里只看关系型数据的的事务逻辑。CallbackPreferringPlatformTransactionManager 是具备回调性质的事务管理器, 多用于解决自定的事务
        if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
            // Standard transaction demarcation with getTransaction and commit/rollback calls.
            // 获取事务的信息, 蕴含传播方式
            TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
            Object retVal = null;
            try {
                // This is an around advice: Invoke the next interceptor in the chain.
                // This will normally result in a target object being invoked.
                retVal = invocation.proceedWithInvocation();}
            catch (Throwable ex) {
                // target invocation exception
                completeTransactionAfterThrowing(txInfo, ex);
                throw ex;
            }
            finally {cleanupTransactionInfo(txInfo);
            }
            commitTransactionAfterReturning(txInfo);
            return retVal;
        }
}    

这里只看流传机制吧。AbstractPlatformTransactionManager#handleExistingTransaction

  • TransactionDefinition.PROPAGATION_NEVER 如果存在事务就报错
  • TransactionDefinition.PROPAGATION_NOT_SUPPORTED 如果有事务, 就挂起(以后事务跟线程解绑)。不应用事务进行执行。
  • TransactionDefinition.PROPAGATION_REQUIRES_NEW 以后事务挂起, 新开个事务。
     /**
     * Create a TransactionStatus for an existing transaction.
     */
    private TransactionStatus handleExistingTransaction(TransactionDefinition definition, Object transaction, boolean debugEnabled)
            throws TransactionException {
        // TransactionDefinition.PROPAGATION_NEVER(总是非事务地执行,如果存在一个流动事务,则抛出异样)就间接阻断报错
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
            throw new IllegalTransactionStateException("Existing transaction found for transaction marked with propagation'never'");
        }
        // TransactionDefinition.PROPAGATION_NOT_SUPPORTED 总是非事务地执行,并挂起任何存在的事务
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {if (debugEnabled) {logger.debug("Suspending current transaction");
            }
            Object suspendedResources = suspend(transaction);
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
            // 数据暂存在 TransactionSynchronizationManager#synchronizations 同步器中
            return prepareTransactionStatus(definition, null, false, newSynchronization, debugEnabled, suspendedResources);
        }
        // 总是开启一个新的事务。如果一个事务曾经存在,则将这个存在的事务挂起。if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {if (debugEnabled) {
                logger.debug("Suspending current transaction, creating new transaction with name [" +
                        definition.getName() + "]");
            }
            SuspendedResourcesHolder suspendedResources = suspend(transaction);
            try {boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                doBegin(transaction, definition);
                prepareSynchronization(status, definition);
                return status;
            }
            catch (RuntimeException | Error beginEx) {resumeAfterBeginException(transaction, suspendedResources, beginEx);
                throw beginEx;
            }
        }
        // 如果有事务存在,则运行在一个嵌套的事务中. 如果没有流动事务则按 TransactionDefinition.PROPAGATION_REQUIRED 属性执行
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {if (!isNestedTransactionAllowed()) {
                throw new NestedTransactionNotSupportedException(
                        "Transaction manager does not allow nested transactions by default -" +
                        "specify'nestedTransactionAllowed'property with value'true'");
            }
            if (debugEnabled) {logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
            }
            if (useSavepointForNestedTransaction()) {
                // Create savepoint within existing Spring-managed transaction,
                // through the SavepointManager API implemented by TransactionStatus.
                // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
                DefaultTransactionStatus status =
                        prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
                // 应用以后事务, 并减少以后事务的一次援用。status.createAndHoldSavepoint();
                return status;
            }
            else {
                // Nested transaction through nested begin and commit/rollback calls.
                // Usually only for JTA: Spring synchronization might get activated here
                // in case of a pre-existing JTA transaction.
                // 没有新建一个事务
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, null);
                doBegin(transaction, definition);
                prepareSynchronization(status, definition);
                return status;
            }
        }

        // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
        if (debugEnabled) {logger.debug("Participating in existing transaction");
        }
        if (isValidateExistingTransaction()) {if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
                if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                    Constants isoConstants = DefaultTransactionDefinition.constants;
                    throw new IllegalTransactionStateException("Participating transaction with definition [" +
                            definition + "] specifies isolation level which is incompatible with existing transaction:" +
                            (currentIsolationLevel != null ?
                                    isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                    "(unknown)"));
                }
            }
            if (!definition.isReadOnly()) {if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                    throw new IllegalTransactionStateException("Participating transaction with definition [" +
                            definition + "] is not marked as read-only but existing transaction is");
                }
            }
        }
        // 
        boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
        return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
    }

5.5 嵌套事务如何晓得是否要提交

当两个 Mapper 中应用的是同一个 SqlSession, 那么会不会第二个事务在执行后, 就间接 commit 了呢, 此时第一个事务有一次 commit。导致异样呢?

解决方案在这里 DefaultTransactionStatus

第二个事务状态中

  • newTransaction = false
  • newSynchronization = false

而上面代码中会做校验, 只须要同步时候才会提交事务。

protected final void triggerBeforeCommit(DefaultTransactionStatus status) {if (status.isNewSynchronization()) {if (status.isDebug()) {logger.trace("Triggering beforeCommit synchronization");
            }
            TransactionSynchronizationUtils.triggerBeforeCommit(status.isReadOnly());
        }
}

第一个事务状态中

  • newTransaction = true
  • newSynchronization = true
    才会真正的去执行。

5.6 这样设计是否线程平安

线程平安只有在多线程环境下才会呈现。那么这里肯定会有多线程问题。而事务是跟线程进行绑定的, 所以这里尽管有多线程然而不会有线程平安问题。

然而这里咱们看源码线程绑定时候应用的 ThreadLocal, 所以你在线程中创立子线程或者是线程中应用线程池, 这里的事务都不会共享的。

本文由 mdnice 多平台公布

正文完
 0