关于后端:Spring事务实现原理

39次阅读

共计 15618 个字符,预计需要花费 40 分钟才能阅读完成。

1、引言

spring 的 spring-tx 模块提供了对事务管理反对,应用 spring 事务能够让咱们从简单的事务处理中失去解脱,无须要去解决取得连贯、敞开连贯、事务提交和回滚等这些操作。

spring 事务有编程式事务和申明式事务两种实现形式。编程式事务是通过编写代码来治理事务的提交、回滚、以及事务的边界。这意味着开发者须要在代码中显式地调用事务的开始、提交和回滚。申明式事务是通过配置来治理事务,您能够应用注解或 XML 配置来定义事务的边界和属性,而无需显式编写事务管理的代码。

上面咱们逐渐剖析 spring 源代码,了解 spring 事务的实现原理。

2、编程式事务

2.1 应用示例

// transactionManager 是某一个具体的 PlatformTransactionManager 实现类的对象
private PlatformTransactionManager transactionManager;


// 定义事务属性
DefaultTransactionDefinition def = new DefaultTransactionDefinition();

// 获取事务
TransactionStatus status = transactionManager.getTransaction(def);

try {
    // 执行数据库操作
    // ...
    
    // 提交事务
    transactionManager.commit(status);
} catch (Exception ex) {
    // 回滚事务
    transactionManager.rollback(status);
}

在应用编程式事务处理的过程中,利用 DefaultTransactionDefinition 对象来持有事务处理属性。同时,在创立事务的过程中失去一个 TransactionStatus 对象,而后通过间接调用 transactionManager 对象 的 commit() 和 rollback() 办法 来实现事务处理。

2.2 PlatformTransactionManager 外围接口

PlatformTransactionManager 是 Spring 事务管理的外围接口,通过 PlatformTransactionManager 接口设计了一系列与事务处理非亲非故的接口办法,如 getTransaction()、commit()、rollback() 这些和事务处理相干的对立接口。对于这些接口的实现,很大一部分是由 AbstractTransactionManager 抽象类来实现的。

AbstractPlatformManager 封装了 Spring 事务处理中通用的解决局部,比方事务的创立、提交、回滚,事务状态和信息的解决,与线程的绑定等,有了这些通用解决的反对,对于具体的事务管理器而言,它们只须要解决和具体数据源相干的组件设置就能够了,比方在 DataSourceTransactionManager 中,就只须要配置好和 DataSource 事务处理相干的接口以及相干的设置。

2.3 事务的创立

PlatformTransactionManager 的 getTransaction() 办法,封装了底层事务的创立,并生成一个 TransactionStatus 对象。AbstractPlatformTransactionManager 提供了创立事务的模板,这个模板会被具体的事务处理器所应用。从上面的代码中能够看到,AbstractPlatformTransactionManager 会依据事务属性配置和以后过程绑定的事务信息,对事务是否须要创立,怎么创立 进行一些通用的解决,而后把事务创立的底层工作交给具体的事务处理器实现,如:DataSourceTransactionManager、HibernateTransactionManager。

public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
        throws TransactionException {TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
    Object transaction = doGetTransaction();
    boolean debugEnabled = logger.isDebugEnabled();
    if (isExistingTransaction(transaction)) {return handleExistingTransaction(def, transaction, debugEnabled);
    }
    if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
    }
    if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
        throw new IllegalTransactionStateException("No existing transaction found for transaction marked with propagation'mandatory'");
    }
    else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
            def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
            def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {SuspendedResourcesHolder suspendedResources = suspend(null);
        if (debugEnabled) {logger.debug("Creating new transaction with name [" + def.getName() + "]:" + def);
        }
        try {return startTransaction(def, transaction, false, debugEnabled, suspendedResources);
        }
        catch (RuntimeException | Error ex) {resume(null, suspendedResources);
            throw ex;
        }
    }
    else {if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
            logger.warn("Custom isolation level specified but no actual transaction initiated;" +
                    "isolation level will effectively be ignored:" + def);
        }
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
    }
}

private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction, boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {boolean newSynchronization = this.getTransactionSynchronization() != SYNCHRONIZATION_NEVER;
    DefaultTransactionStatus status = this.newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
    this.doBegin(transaction, definition);
    this.prepareSynchronization(status, definition);
    return status;
}

事务创立的后果是生成一个 TransactionStatus 对象,通过这个对象来保留事务处理须要的根本信息,TransactionStatus 的创立过程如下:

protected DefaultTransactionStatus newTransactionStatus(TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction, boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {boolean actualNewSynchronization = newSynchronization && !TransactionSynchronizationManager.isSynchronizationActive();
    return new DefaultTransactionStatus(transaction, newTransaction, actualNewSynchronization, definition.isReadOnly(), debug, suspendedResources);
}

以上是创立一个全新事务的过程,还有另一种状况是:在创立以后事务时,线程中曾经有事务存在了。这种状况会波及事务流传行为的解决。spring 中七种事务流传行为如下:

| 事务流传行为类型 | 阐明 |
| PROPAGATION_REQUIRED | 如果以后没有事务,就新建一个事务,如果曾经存在一个事务中,退出到这个事务中。这是最常见的抉择。|
| PROPAGATION_SUPPORTS | 反对以后事务,如果以后没有事务,就以非事务形式执行。|
| PROPAGATION_MANDATORY | 应用以后的事务,如果以后没有事务,就抛出异样。|
| PROPAGATION\_REQUIRES\_NEW | 新建事务,如果以后存在事务,把以后事务挂起。|
| PROPAGATION\_NOT\_SUPPORTED | 以非事务形式执行操作,如果以后存在事务,就把以后事务挂起。|
| PROPAGATION_NEVER | 以非事务形式执行,如果以后存在事务,则抛出异样。|
| PROPAGATION_NESTED | 如果以后存在事务,则在嵌套事务内执行。如果以后没有事务,则执行与 PROPAGATION_REQUIRED 相似的操作。|

如果检测到已存在事务,handleExistingTransaction() 办法将依据不同的事务流传行为类型执行相应逻辑。

PROPAGATION_NEVER

即以后办法须要在非事务的环境下执行,如果有事务存在,那么抛出异样。

if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
    throw new IllegalTransactionStateException("Existing transaction found for transaction marked with propagation'never'");
}

PROPAGATION\_NOT\_SUPPORTED

与前者的区别在于,如果有事务存在,那么将事务挂起,而不是抛出异样。

if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {Object suspendedResources = suspend(transaction);
    boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
    return prepareTransactionStatus(definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}

PROPAGATION\_REQUIRES\_NEW

新建事务,如果以后存在事务,把以后事务挂起。

if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {SuspendedResourcesHolder suspendedResources = suspend(transaction);
    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
    doBegin(transaction, definition);
    prepareSynchronization(status, definition);
    return status;
}

PROPAGATION_NESTED

开始一个 “ 嵌套的 ” 事务, 它是曾经存在事务的一个真正的子事务. 嵌套事务开始执行时, 它将获得一个 savepoint. 如果这个嵌套事务失败, 咱们将回滚到此 savepoint. 嵌套事务是内部事务的一部分, 只有内部事务完结后它才会被提交。

if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {if (useSavepointForNestedTransaction()) {
        DefaultTransactionStatus status = newTransactionStatus(definition, transaction, false, false, true, debugEnabled, null);
        this.transactionExecutionListeners.forEach(listener -> listener.beforeBegin(status));
        try {status.createAndHoldSavepoint();
        }
        catch (RuntimeException | Error ex) {this.transactionExecutionListeners.forEach(listener -> listener.afterBegin(status, ex));
            throw ex;
        }
        this.transactionExecutionListeners.forEach(listener -> listener.afterBegin(status, null));
        return status;
    }
    else {return startTransaction(definition, transaction, true, debugEnabled, null);
    }
}

2.4 事务挂起

事务挂起在 AbstractTransactionManager.suspend() 中解决,该办法外部将调用具体事务管理器的 doSuspend() 办法。以 DataSourceTransactionManager 为例,将 ConnectionHolder 设为 null,因为一个 ConnectionHolder 对象就代表了一个数据库连贯,将 ConnectionHolder 设为 null 就意味着咱们下次要应用连贯时,将从新从连接池获取。

protected Object doSuspend(Object transaction) {DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    txObject.setConnectionHolder(null);
    return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}

unbindResource() 办法最终会调用 TransactionSynchronizationManager.doUnbindResource() 办法,该办法将移除以后线程与事务对象的绑定。

private static Object doUnbindResource(Object actualKey) {Map<Object, Object> map = resources.get();
    if (map == null) {return null;}
    Object value = map.remove(actualKey);
    if (map.isEmpty()) {resources.remove();
    }
    if (value instanceof ResourceHolder resourceHolder && resourceHolder.isVoid()) {value = null;}
    return value;
}

而被挂起的事务的各种状态最终会保留在 TransactionStatus 对象中。

2.5 事务提交 & 回滚

次要是对 jdbc 的封装、源码逻辑较清晰,不开展细说。

3、申明式事务

其底层建设在 AOP 的根底之上,对办法前后进行拦挡,而后在指标办法开始之前创立或者退出一个事务,在执行完指标办法之后依据执行状况提交或者回滚事务。通过申明式事物,无需在业务逻辑代码中掺杂事务管理的代码,只需在配置文件中做相干的事务规定申明(或通过等价的基于标注的形式),便能够将事务规定利用到业务逻辑中。

3.1 应用示例

配置:

<bean id="dataSource" class="com.zaxxer.hikari.HikariDataSource">
    <property name="driverClassName" value="${jdbc.driverClassName}" />
    <property name="url" value="${jdbc.url}" />
    <property name="username" value="${jdbc.username}" />
    <property name="password" value="${jdbc.password}" />
</bean>
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    <property name="dataSource" ref="dataSource"/>
</bean>
<tx:annotation-driven transaction-manager="transactionManager"/>

代码:

@Transactional
public void addOrder() {// 执行数据库操作}

3.2 自定义标签解析

先从配置文件开始动手,找到解决 annotation-driven 标签的类 TxNamespaceHandler。TxNamespaceHandler 实现了 NamespaceHandler 接口,定义了如何解析和解决自定义 XML 标签。

@Override
public void init() {registerBeanDefinitionParser("advice", new TxAdviceBeanDefinitionParser());
    registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser());
    registerBeanDefinitionParser("jta-transaction-manager", new JtaTransactionManagerBeanDefinitionParser());
}

AnnotationDrivenBeanDefinitionParser 里的 parse() 办法,对 XML 标签 annotation-driven 进行解析。

@Override
public BeanDefinition parse(Element element, ParserContext parserContext) {registerTransactionalEventListenerFactory(parserContext);
    String mode = element.getAttribute("mode");
    if ("aspectj".equals(mode)) {
        // mode="aspectj"
        registerTransactionAspect(element, parserContext);
        if (ClassUtils.isPresent("jakarta.transaction.Transactional", getClass().getClassLoader())) {registerJtaTransactionAspect(element, parserContext);
        }
    }
    else {
        // mode="proxy"
        AopAutoProxyConfigurer.configureAutoProxyCreator(element, parserContext);
    }
    return null;
}

以默认 mode 配置为例,执行 configureAutoProxyCreator() 办法,将在 Spring 容器中注册了 3 个 bean:

BeanFactoryTransactionAttributeSourceAdvisor、TransactionInterceptor、AnnotationTransactionAttributeSource。同时会将 TransactionInterceptor 的 BeanName 传入到 Advisor 中,而后将 AnnotationTransactionAttributeSource 这个 Bean 注入到 Advisor 中。之后动静代理的时候会应用这个 Advisor 去寻找每个 Bean 是否须要动静代理。

// Create the TransactionAttributeSourceAdvisor definition.
RootBeanDefinition advisorDef = new RootBeanDefinition(BeanFactoryTransactionAttributeSourceAdvisor.class);
advisorDef.setSource(eleSource);
advisorDef.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
advisorDef.getPropertyValues().add("transactionAttributeSource", new RuntimeBeanReference(sourceName));
advisorDef.getPropertyValues().add("adviceBeanName", interceptorName);
if (element.hasAttribute("order")) {advisorDef.getPropertyValues().add("order", element.getAttribute("order"));
}
parserContext.getRegistry().registerBeanDefinition(txAdvisorBeanName, advisorDef);

CompositeComponentDefinition compositeDef = new CompositeComponentDefinition(element.getTagName(), eleSource);
compositeDef.addNestedComponent(new BeanComponentDefinition(sourceDef, sourceName));
compositeDef.addNestedComponent(new BeanComponentDefinition(interceptorDef, interceptorName));
compositeDef.addNestedComponent(new BeanComponentDefinition(advisorDef, txAdvisorBeanName));
parserContext.registerComponent(compositeDef);

3.3 Advisor

回顾 AOP 用法,Advisor 可用于定义一个切面,它蕴含切点(Pointcut)和告诉(Advice),用于在特定的连接点上执行特定的操作。spring 事务实现了一个 Advisor: BeanFactoryTransactionAttributeSourceAdvisor。

public class BeanFactoryTransactionAttributeSourceAdvisor extends AbstractBeanFactoryPointcutAdvisor {private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut();

    public void setTransactionAttributeSource(TransactionAttributeSource transactionAttributeSource) {this.pointcut.setTransactionAttributeSource(transactionAttributeSource);
    }

    public void setClassFilter(ClassFilter classFilter) {this.pointcut.setClassFilter(classFilter);
    }

    @Override
    public Pointcut getPointcut() {return this.pointcut;}
}

BeanFactoryTransactionAttributeSourceAdvisor 其实是一个 PointcutAdvisor,是否匹配到切入点取决于 Pointcut。Pointcut 的外围在于其 ClassFilter 和 MethodMatcher。

ClassFilter:

TransactionAttributeSourcePointcut 外部公有类 TransactionAttributeSourceClassFilter,实现了 Spring 框架中的 ClassFilter 接口。在 matches 办法中,它首先查看传入的类 clazz 否为 TransactionalProxy、TransactionManager 或 PersistenceExceptionTranslator 的子类,如果不是,则获取以后的 TransactionAttributeSource 并查看其是否容许该类作为候选类。

private class TransactionAttributeSourceClassFilter implements ClassFilter {
    @Override
    public boolean matches(Class<?> clazz) {if (TransactionalProxy.class.isAssignableFrom(clazz) ||
                TransactionManager.class.isAssignableFrom(clazz) ||
                PersistenceExceptionTranslator.class.isAssignableFrom(clazz)) {return false;}
        return (transactionAttributeSource == null || transactionAttributeSource.isCandidateClass(clazz));
    }
}

MethodMatcher:

TransactionAttributeSourcePointcut.matches:

@Override
public boolean matches(Method method, Class<?> targetClass) {
    return (this.transactionAttributeSource == null ||
            this.transactionAttributeSource.getTransactionAttribute(method, targetClass) != null);
}

getTransactionAttribute() 办法最终会调用至 AbstractFallbackTransactionAttributeSource.computeTransactionAttribute() 办法,该办法将先去办法上查找是否有相应的事务注解 (比方 @Transactional),如果没有,那么再去类上查找。

protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
    // Don't allow non-public methods, as configured.
    if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) {return null;}

    // The method may be on an interface, but we need attributes from the target class.
    // If the target class is null, the method will be unchanged.
    Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass);

    // First try is the method in the target class.
    TransactionAttribute txAttr = findTransactionAttribute(specificMethod);
    if (txAttr != null) {return txAttr;}

    // Second try is the transaction attribute on the target class.
    txAttr = findTransactionAttribute(specificMethod.getDeclaringClass());
    if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {return txAttr;}

    if (specificMethod != method) {
        // Fallback is to look at the original method.
        txAttr = findTransactionAttribute(method);
        if (txAttr != null) {return txAttr;}
        // Last fallback is the class of the original method.
        txAttr = findTransactionAttribute(method.getDeclaringClass());
        if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {return txAttr;}
    }

    return null;
}

3.4 TransactionInterceptor

TransactionInterceptor 是 spring 事务提供的 AOP 拦截器,实现了 AOP Alliance 的 MethodInterceptor 接口,是一种告诉(advice)。其能够用于在办法调用前后进行事务管理。

@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {// Work out the target class: may be {@code null}.
    // The TransactionAttributeSource should be passed the target class
    // as well as the method, which may be from an interface.
    Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

    // Adapt to TransactionAspectSupport's invokeWithinTransaction...
    return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {
        @Override
        @Nullable
        public Object proceedWithInvocation() throws Throwable {return invocation.proceed();
        }
        @Override
        public Object getTarget() {return invocation.getThis();
        }
        @Override
        public Object[] getArguments() {return invocation.getArguments();
        }
    });
}

invokeWithinTransaction() 办法会依据指标办法上的事务配置,来决定是开启新事务、退出已有事务,还是间接执行逻辑(如果没有事务)。其代码简化如下 ( 仅保留 PlatformTransactionManager 局部):

protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, final InvocationCallback invocation) {
    // If the transaction attribute is null, the method is non-transactional.
    final TransactionAttribute txAttr = getTransactionAttributeSource()
        .getTransactionAttribute(method, targetClass);
    final PlatformTransactionManager tm = determineTransactionManager(txAttr);
    final String joinpointIdentification = methodIdentification(method, targetClass);
    if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
        // Standard transaction demarcation with getTransaction and commit/rollback calls.
        TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
        ObjectretVal = null;
        try {
            // This is an around advice: Invoke the next interceptor in the chain.
            // This will normally result in a target object being invoked.
            retVal = invocation.proceedWithInvocation();} catch (Throwableex) {
            // target invocation exception
            completeTransactionAfterThrowing(txInfo, ex);
            throwex;
        } finally {cleanupTransactionInfo(txInfo);
        }
        commitTransactionAfterReturning(txInfo);
        returnretVal;
    }
}

作者:京东批发 范锡军

起源:京东云开发者社区 转载请注明起源

正文完
 0