本文和大家一起刨析 Spring 事务的相干源码,篇幅较长,代码片段较多,倡议应用电脑浏览

本文指标

  • 了解Spring事务管理外围接口
  • 了解Spring事务管理的外围逻辑
  • 了解事务的流传类型及其实现原理

版本

SpringBoot 2.3.3.RELEASE

什么是事务的流传?

Spring 除了封装了事务管制之外,还形象出了 事务的流传 这个概念,事务的流传并不是关系型数据库所定义的,而是Spring在封装事务时做的加强扩大,能够通过@Transactional 指定事务的流传,具体类型如下

事务流传行为类型阐明
PROPAGATION_REQUIRED如果以后没有事务,就新建一个事务,如果曾经存在一个事务中,退出到这个事务中。Spring的默认事务流传类型
PROPAGATION_SUPPORTS反对以后事务,如果以后没有事务,就以非事务形式执行。
PROPAGATION_MANDATORY应用以后的事务,如果以后没有事务,就抛出异样。
PROPAGATION_REQUIRES_NEW新建事务,如果以后存在事务,把以后事务挂起(暂停)。
PROPAGATION_NOT_SUPPORTED以非事务形式执行操作,如果以后存在事务,就把以后事务挂起。
PROPAGATION_NEVER以非事务形式执行,如果以后存在事务,则抛出异样。
PROPAGATION_NESTED如果以后存在事务,则在嵌套事务内执行。如果以后没有事务,则执行与PROPAGATION_REQUIRED相似的操作。

举个栗子

以嵌套事务为例

@Servicepublic class DemoServiceImpl implements DemoService {    @Autowired    private JdbcTemplate jdbcTemplate;    @Autowired    private DemoServiceImpl self;    @Transactional    @Override    public void insertDB() {        String sql = "INSERT INTO sys_user(`id`, `username`) VALUES (?, ?)";        jdbcTemplate.update(sql, uuid(), "taven");        try {            // 内嵌事务将会回滚,而内部事务不会受到影响            self.nested();        } catch (Exception e) {            e.printStackTrace();        }    }    @Transactional(propagation = Propagation.NESTED)    @Override    public void nested() {        String sql = "INSERT INTO sys_user(`id`, `username`) VALUES (?, ?)";        jdbcTemplate.update(sql, uuid(), "nested");        throw new RuntimeException("rollback nested");    }    private String uuid() {        return UUID.randomUUID().toString();    }}

上述代码中,nested()办法标记了事务流传类型为嵌套,如果nested()中抛出异样仅会回滚nested()办法中的sql,不会影响到insertDB()办法中曾经执行的sql

留神:service 调用外部办法时,如果间接应用this调用,事务不会失效。因而应用this调用相当于跳过了内部的代理类,所以AOP不会失效,无奈应用事务

思考

家喻户晓,Spring 事务是通过AOP实现的,如果是咱们本人写一个AOP管制事务,该怎么做呢?

// 伪代码public Object invokeWithinTransaction() {    // 开启事务    connection.beginTransaction();    try {        // 反射执行办法        Object result = invoke();        // 提交事务        connection.commit();        return result;    } catch(Exception e) {        // 产生异样时回滚        connection.rollback();        throw e;    }     }

在这个根底上,咱们来思考一下如果是咱们本人做的话,事务的流传该如何实现

PROPAGATION_REQUIRED为例,这个仿佛很简略,咱们判断一下以后是否有事务(能够思考应用ThreadLocal存储已存在的事务对象),如果有事务,那么就不开启新的事务。反之,没有事务,咱们就创立新的事务

如果事务是由以后切面开启的,则提交/回滚事务,反之不做解决

那么事务流传中形容的挂起(暂停)以后事务,和内嵌事务是如何实现的?

<!--

这里提前剧透一下,内嵌事务是应用关系型数据库的savepoint实现的
-->

源码动手

要浏览事务流传相干的源码,咱们先来理解下Spring 事务管理的外围接口与类

  1. TransactionDefinition

该接口定义了事务的所有属性(隔离级别,流传类型,超时工夫等等),咱们日常开发中常常应用的 @Transactional 其实最终会被转化为 TransactionDefinition

  1. TransactionStatus

事务的状态,以最罕用的实现 DefaultTransactionStatus 为例,该类存储了以后的事务对象,savepoint,以后挂起的事务,是否实现,是否仅回滚等等

  1. TransactionManager

这是一个空接口,间接继承他的 interface 有 PlatformTransactionManager(咱们平时用的就是这个,默认的实现类DataSourceTransactionManager)以及
ReactiveTransactionManager(响应式事务管理器,因为不是本文重点,咱们不多说)

从上述两个接口来看,TransactionManager 的次要作用

  • 通过TransactionDefinition开启一个事务,返回TransactionStatus
  • 通过TransactionStatus 提交、回滚事务(理论开启事务的Connection通常存储在TransactionStatus中)
public interface PlatformTransactionManager extends TransactionManager {        TransactionStatus getTransaction(@Nullable TransactionDefinition definition)            throws TransactionException;        void commit(TransactionStatus status) throws TransactionException;        void rollback(TransactionStatus status) throws TransactionException;}
  1. TransactionInterceptor

事务拦截器,事务AOP的外围类(反对响应式事务,编程式事务,以及咱们罕用的规范事务),因为篇幅起因,本文只探讨规范事务的相干实现

上面咱们从事务逻辑的入口 TransactionInterceptor 动手,来看下Spring事务管理的外围逻辑以及事务流传的实现

TransactionInterceptor

TransactionInterceptor 实现了MethodInvocation(这是实现AOP的一种形式),其外围逻辑在父类TransactionAspectSupport 中,办法地位:TransactionInterceptor::invokeWithinTransaction

    protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,            final InvocationCallback invocation) throws Throwable {        // If the transaction attribute is null, the method is non-transactional.        TransactionAttributeSource tas = getTransactionAttributeSource();        // 以后事务的属性 TransactionAttribute extends TransactionDefinition        final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);        // 事务属性中能够定义以后应用哪个事务管理器        // 如果没有定义就去Spring上下文找到一个可用的 TransactionManager        final TransactionManager tm = determineTransactionManager(txAttr);        // 省略了响应式事务的解决 ...                PlatformTransactionManager ptm = asPlatformTransactionManager(tm);        final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);        if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {            // Standard transaction demarcation with getTransaction and commit/rollback calls.            TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);            Object retVal;            try {                // This is an around advice: Invoke the next interceptor in the chain.                // This will normally result in a target object being invoked.                // 如果有下一个拦截器则执行,最终会执行到指标办法,也就是咱们的业务代码                retVal = invocation.proceedWithInvocation();            }            catch (Throwable ex) {                // target invocation exception                // 当捕捉到异样时实现以后事务 (提交或者回滚)                completeTransactionAfterThrowing(txInfo, ex);                throw ex;            }            finally {                cleanupTransactionInfo(txInfo);            }            if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {                // Set rollback-only in case of Vavr failure matching our rollback rules...                TransactionStatus status = txInfo.getTransactionStatus();                if (status != null && txAttr != null) {                    retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);                }            }            // 依据事务的状态提交或者回滚            commitTransactionAfterReturning(txInfo);            return retVal;        }        // 省略了编程式事务的解决 ...    }

这里代码很多,依据正文的地位,咱们能够把外围逻辑梳理进去

  1. 获取以后事务属性,事务管理器(以注解事务为例,这些都能够通过@Transactional来定义)
  2. createTransactionIfNecessary,判断是否有必要创立事务
  3. invocation.proceedWithInvocation 执行拦截器链,最终会执行到指标办法
  4. completeTransactionAfterThrowing当抛出异样后,实现这个事务,提交或者回滚,并抛出这个异样
  5. commitTransactionAfterReturning 从办法命名来看,这个办法会提交事务。

然而深刻源码中会发现,该办法中也蕴含回滚逻辑,具体行为会依据以后TransactionStatus的一些状态来决定(也就是说,咱们也能够通过设置以后TransactionStatus,来管制事务回滚,并不一定只能通过抛出异样),详见AbstractPlatformTransactionManager::commit

<!--

业务代码中能够通过TransactionAspectSupport.currentTransactionStatus()获取以后TransactionStatus
-->

咱们持续,来看看createTransactionIfNecessary做了什么

TransactionAspectSupport::createTransactionIfNecessary
    protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,            @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {        // If no name specified, apply method identification as transaction name.        if (txAttr != null && txAttr.getName() == null) {            txAttr = new DelegatingTransactionAttribute(txAttr) {                @Override                public String getName() {                    return joinpointIdentification;                }            };        }        TransactionStatus status = null;        if (txAttr != null) {            if (tm != null) {                // 通过事务管理器开启事务                status = tm.getTransaction(txAttr);            }            else {                if (logger.isDebugEnabled()) {                    logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +                            "] because no transaction manager has been configured");                }            }        }                return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);    }

createTransactionIfNecessary中的外围逻辑

  1. 通过PlatformTransactionManager(事务管理器)开启事务
  2. prepareTransactionInfo 筹备事务信息,这个具体做了什么咱们稍后再讲

持续来看PlatformTransactionManager::getTransaction,该办法只有一个实现 AbstractPlatformTransactionManager::getTransaction

    public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)            throws TransactionException {        // Use defaults if no transaction definition given.        TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());        // 获取以后事务,该办法有继承 AbstractPlatformTransactionManager 的子类自行实现        Object transaction = doGetTransaction();        boolean debugEnabled = logger.isDebugEnabled();        // 如果目前存在事务        if (isExistingTransaction(transaction)) {            // Existing transaction found -> check propagation behavior to find out how to behave.            return handleExistingTransaction(def, transaction, debugEnabled);        }        // Check definition settings for new transaction.        if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {            throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());        }        // 流传类型PROPAGATION_MANDATORY, 要求以后必须有事务        // No existing transaction found -> check propagation behavior to find out how to proceed.        if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {            throw new IllegalTransactionStateException(                    "No existing transaction found for transaction marked with propagation 'mandatory'");        }        // PROPAGATION_REQUIRED, PROPAGATION_REQUIRES_NEW, PROPAGATION_NESTED 不存在事务时创立事务        else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||                def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||                def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {            SuspendedResourcesHolder suspendedResources = suspend(null);            if (debugEnabled) {                logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);            }            try {                // 开启事务                return startTransaction(def, transaction, debugEnabled, suspendedResources);            }            catch (RuntimeException | Error ex) {                resume(null, suspendedResources);                throw ex;            }        }        else {            // Create "empty" transaction: no actual transaction, but potentially synchronization.            if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {                logger.warn("Custom isolation level specified but no actual transaction initiated; " +                        "isolation level will effectively be ignored: " + def);            }            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);            return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);        }    }

代码很多,重点关注正文局部即可

  1. doGetTransaction获取以后事务
  2. 如果存在事务,则调用handleExistingTransaction解决,这个咱们稍后会讲到

接下来,会依据事务的流传决定是否开启事务

  1. 如果事务流传类型为PROPAGATION_MANDATORY,且不存在事务,则抛出异样
  2. 如果流传类型为 PROPAGATION_REQUIRED, PROPAGATION_REQUIRES_NEW, PROPAGATION_NESTED,且以后不存在事务,则调用startTransaction创立事务
  3. 当不满足 3、4时,例如 PROPAGATION_NOT_SUPPORTED,此时会执行事务同步,然而不会创立真正的事务
Spring 事务同步在之前一篇博客中有讲到,传送门????https://www.jianshu.com/p/788...
Spring 如何治理以后的事务

接下来讲讲下面提到的doGetTransactionhandleExistingTransaction,这两个办法是由不同的TransactionManager自行实现的

咱们以SpringBoot默认的TransactionManager,DataSourceTransactionManager为例

    @Override    protected Object doGetTransaction() {        DataSourceTransactionObject txObject = new DataSourceTransactionObject();        txObject.setSavepointAllowed(isNestedTransactionAllowed());        ConnectionHolder conHolder =                (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());        txObject.setConnectionHolder(conHolder, false);        return txObject;    }    @Override    protected boolean isExistingTransaction(Object transaction) {        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;        return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());    }

联合 AbstractPlatformTransactionManager::getTransaction 一起来看,doGetTransaction 其实获取的是以后的Connection。
判断以后是否存在事务,是判断DataSourceTransactionObject 对象中是否蕴含connection,以及connection是否开启了事务。

咱们持续来看下TransactionSynchronizationManager.getResource(obtainDataSource())获取以后connection的逻辑

TransactionSynchronizationManager::getResource
    private static final ThreadLocal<Map<Object, Object>> resources =            new NamedThreadLocal<>("Transactional resources");        @Nullable    // TransactionSynchronizationManager::getResource    public static Object getResource(Object key) {        // DataSourceTransactionManager 调用该办法时,以数据源作为key                // TransactionSynchronizationUtils::unwrapResourceIfNecessary 如果key为包装类,则获取被包装的对象        // 咱们能够疏忽该逻辑        Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);        Object value = doGetResource(actualKey);        if (value != null && logger.isTraceEnabled()) {            logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" +                    Thread.currentThread().getName() + "]");        }        return value;    }    /**     * Actually check the value of the resource that is bound for the given key.     */    @Nullable    private static Object doGetResource(Object actualKey) {        Map<Object, Object> map = resources.get();        if (map == null) {            return null;        }        Object value = map.get(actualKey);        // Transparently remove ResourceHolder that was marked as void...        if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {            map.remove(actualKey);            // Remove entire ThreadLocal if empty...            if (map.isEmpty()) {                resources.remove();            }            value = null;        }        return value;    }

看到这里,咱们能明确DataSourceTransactionManager是如何治理线程之间的Connection,ThreadLocal 中存储一个Map,key为数据源对象,value为该数据源在以后线程的Connection

DataSourceTransactionManager 在开启事务后,会调用TransactionSynchronizationManager::bindResource将指定数据源的Connection绑定到以后线程

AbstractPlatformTransactionManager::handleExistingTransaction

咱们持续回头看,如果存在事务的状况,如何解决

    private TransactionStatus handleExistingTransaction(            TransactionDefinition definition, Object transaction, boolean debugEnabled)            throws TransactionException {        // 如果事务的流传要求以非事务形式执行 抛出异样        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {            throw new IllegalTransactionStateException(                    "Existing transaction found for transaction marked with propagation 'never'");        }        // PROPAGATION_NOT_SUPPORTED 如果存在事务,则挂起以后事务,以非事务形式执行        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {            if (debugEnabled) {                logger.debug("Suspending current transaction");            }            // 挂起以后事务            Object suspendedResources = suspend(transaction);            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);            // 构建一个无事务的TransactionStatus            return prepareTransactionStatus(                    definition, null, false, newSynchronization, debugEnabled, suspendedResources);        }        // PROPAGATION_REQUIRES_NEW 如果存在事务,则挂起以后事务,新建一个事务        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {            if (debugEnabled) {                logger.debug("Suspending current transaction, creating new transaction with name [" +                        definition.getName() + "]");            }            SuspendedResourcesHolder suspendedResources = suspend(transaction);            try {                return startTransaction(definition, transaction, debugEnabled, suspendedResources);            }            catch (RuntimeException | Error beginEx) {                resumeAfterBeginException(transaction, suspendedResources, beginEx);                throw beginEx;            }        }        // PROPAGATION_NESTED 内嵌事务,就是咱们结尾举得例子        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {            if (!isNestedTransactionAllowed()) {                throw new NestedTransactionNotSupportedException(                        "Transaction manager does not allow nested transactions by default - " +                        "specify 'nestedTransactionAllowed' property with value 'true'");            }            if (debugEnabled) {                logger.debug("Creating nested transaction with name [" + definition.getName() + "]");            }            // 非JTA事务管理器都是通过savePoint实现的内嵌事务            // savePoint:关系型数据库中事务能够创立还原点,并且能够回滚到还原点            if (useSavepointForNestedTransaction()) {                // Create savepoint within existing Spring-managed transaction,                // through the SavepointManager API implemented by TransactionStatus.                // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.                DefaultTransactionStatus status =                        prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);                // 创立还原点                status.createAndHoldSavepoint();                return status;            }            else {                // Nested transaction through nested begin and commit/rollback calls.                // Usually only for JTA: Spring synchronization might get activated here                // in case of a pre-existing JTA transaction.                return startTransaction(definition, transaction, debugEnabled, null);            }        }        // 如果执行到这一步流传类型肯定是,PROPAGATION_SUPPORTS 或者 PROPAGATION_REQUIRED        // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.        if (debugEnabled) {            logger.debug("Participating in existing transaction");        }                // 校验目前办法中的事务定义和已存在的事务定义是否统一        if (isValidateExistingTransaction()) {            if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {                Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();                if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {                    Constants isoConstants = DefaultTransactionDefinition.constants;                    throw new IllegalTransactionStateException("Participating transaction with definition [" +                            definition + "] specifies isolation level which is incompatible with existing transaction: " +                            (currentIsolationLevel != null ?                                    isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :                                    "(unknown)"));                }            }            if (!definition.isReadOnly()) {                if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {                    throw new IllegalTransactionStateException("Participating transaction with definition [" +                            definition + "] is not marked as read-only but existing transaction is");                }            }        }        boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);        // 构建一个TransactionStatus,但不开启事务        return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);    }

这里代码很多,逻辑看上述正文即可。这里终于看到了期待已久的挂起事务和内嵌事务了,咱们还是看一下DataSourceTransactionManager的实现

  • 挂起事务:通过TransactionSynchronizationManager::unbindResource 依据数据源获取以后的Connection,并在resource中移除该Connection。之后会将该Connection存储到TransactionStatus对象中
    // DataSourceTransactionManager::doSuspend    @Override    protected Object doSuspend(Object transaction) {        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;        txObject.setConnectionHolder(null);        return TransactionSynchronizationManager.unbindResource(obtainDataSource());    }

在事务提交或者回滚后,调用 AbstractPlatformTransactionManager::cleanupAfterCompletion会将TransactionStatus 中缓存的Connection从新绑定到resource中

  • 内嵌事务:通过关系型数据库的savePoint实现,提交或回滚的时候会判断如果以后事务为savePoint则开释savePoint或者回滚到savePoint,具体逻辑参考AbstractPlatformTransactionManager::processRollbackAbstractPlatformTransactionManager::processCommit

至此,事务的流传源码剖析完结

prepareTransactionInfo

上文留下了一个问题,prepareTransactionInfo 办法做了什么,咱们先来看下TransactionInfo的构造

    protected static final class TransactionInfo {        @Nullable        private final PlatformTransactionManager transactionManager;        @Nullable        private final TransactionAttribute transactionAttribute;        private final String joinpointIdentification;        @Nullable        private TransactionStatus transactionStatus;        @Nullable        private TransactionInfo oldTransactionInfo;                // ...    }

该类在Spring中的作用,是为了外部传递对象。ThreadLocal中存储了最新的TransactionInfo,通过以后TransactionInfo能够找到他的oldTransactionInfo。每次创立事务时会新建一个TransactionInfo(无论有没有真正的事务被创立)存储到ThreadLocal中,在每次事务完结后,会将以后ThreadLocal中的TransactionInfo重置为oldTransactionInfo,这样的构造造成了一个链表,使得Spring事务在逻辑上能够有限嵌套上来

如果感觉有播种,能够关注我的公众号,你的点赞和关注就是对我最大的反对