共计 40921 个字符,预计需要花费 103 分钟才能阅读完成。
继上一篇《spring 事务的这 10 种坑,你稍不留神可能就会踩中!!!》之后,我打算对 spring 的事务做详细分析,带大家一起探讨一下 spring 事务的设计原理和底层实现,心愿这篇文章可能让你有所播种。
一、开启事务性能
1、spring 开启事务
有些敌人的公司可能还没有应用 springboot,这里我介绍一下 spring 的事务开启。当然,我次要介绍的是基于注解的形式配置 spring 事务,因为基于 xml 的形式相对来说有些繁琐,并且比拟古老,我在这里就不做赘述了。
基于注解的办法应用起来非常简单,使@EnableTransactionManagement
注解就能够开启事务性能。
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(TransactionManagementConfigurationSelector.class)
public @interface EnableTransactionManagement {
/**
* true:cglib 代理 false:jdk 动静代理
*/
boolean proxyTargetClass() default false;
/**
* PROXY:代理的告诉模式 ASPECTJ:ASPECTJ 的告诉模式
*/
AdviceMode mode() default AdviceMode.PROXY;
/**
* 如果存在多个切换时能够指定执行程序
*/
int order() default Ordered.LOWEST_PRECEDENCE;}
接下来重点看一下TransactionManagementConfigurationSelector
类
public class TransactionManagementConfigurationSelector extends AdviceModeImportSelector<EnableTransactionManagement> {
@Override
protected String[] selectImports(AdviceMode adviceMode) {switch (adviceMode) {
case PROXY:
return new String[] {AutoProxyRegistrar.class.getName(),
ProxyTransactionManagementConfiguration.class.getName()};
case ASPECTJ:
return new String[] {TransactionManagementConfigUtils.TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME};
default:
return null;
}
}
}
该办法逻辑比较简单,就是判断告诉模式是 PROXY,还是 ASPECTJ。咱们重点介绍的是 PROXY 模式,如果对 ASPECTJ 模式感兴趣的敌人,能够深入研究一下。
PROXY 模式次要是要加载两个类:AutoProxyRegistrar 和 ProxyTransactionManagementConfiguration
先看一下 AutoProxyRegistrar 类
public class AutoProxyRegistrar implements ImportBeanDefinitionRegistrar {
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
boolean candidateFound = false;
// 获取 @EnableTransactionManagement 上的所有注解属性类型
Set<String> annoTypes = importingClassMetadata.getAnnotationTypes();
for (String annoType : annoTypes) {
// 获取用户配置的注解属性
AnnotationAttributes candidate = AnnotationConfigUtils.attributesFor(importingClassMetadata, annoType);
if (candidate == null) {continue;}
// 获取告诉模式
Object mode = candidate.get("mode");
// 获取代理类型
Object proxyTargetClass = candidate.get("proxyTargetClass");
if (mode != null && proxyTargetClass != null && AdviceMode.class == mode.getClass() &&
Boolean.class == proxyTargetClass.getClass()) {
candidateFound = true;
// 如果是 PROXY 代理模式
if (mode == AdviceMode.PROXY) {
// 注册 aop 入口类
AopConfigUtils.registerAutoProxyCreatorIfNecessary(registry);
if ((Boolean) proxyTargetClass) {AopConfigUtils.forceAutoProxyCreatorToUseClassProxying(registry);
return;
}
}
}
}
if (!candidateFound && logger.isWarnEnabled()) {String name = getClass().getSimpleName();}
}
}
看一下 registerAutoProxyCreatorIfNecessary 办法
@Nullable
public static BeanDefinition registerAutoProxyCreatorIfNecessary(BeanDefinitionRegistry registry) {return registerAutoProxyCreatorIfNecessary(registry, null);
}
@Nullable
public static BeanDefinition registerAutoProxyCreatorIfNecessary(BeanDefinitionRegistry registry, @Nullable Object source) {return registerOrEscalateApcAsRequired(InfrastructureAdvisorAutoProxyCreator.class, registry, source);
}
特地揭示一下,这里须要注册的类是:InfrastructureAdvisorAutoProxyCreator
@Nullable
private static BeanDefinition registerOrEscalateApcAsRequired(Class<?> cls, BeanDefinitionRegistry registry, @Nullable Object source) {Assert.notNull(registry, "BeanDefinitionRegistry must not be null");
// 判断是否曾经注册过 aop 的入口类了,如果曾经注册过了则须要判断类的优先级
if (registry.containsBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME)) {
// 获取曾经注册的 aop 入口类 BeanDefinition
BeanDefinition apcDefinition = registry.getBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME);
// 如果以后的入口类和已注册的入口类不一样
if (!cls.getName().equals(apcDefinition.getBeanClassName())) {
// 获取已注册的入口类优先级
int currentPriority = findPriorityForClass(apcDefinition.getBeanClassName());
// 获取以后入口类优先级
int requiredPriority = findPriorityForClass(cls);
// 如果已注册的入口类优先级小于以后入口类优先级
if (currentPriority < requiredPriority) {
// 用以后入口类替换已注册的入口类
apcDefinition.setBeanClassName(cls.getName());
}
}
return null;
}
// 实例化 beanDefinition
RootBeanDefinition beanDefinition = new RootBeanDefinition(cls);
beanDefinition.setSource(source);
beanDefinition.getPropertyValues().add("order", Ordered.HIGHEST_PRECEDENCE);
// 设置 role 为 ROLE_INFRASTRUCTURE
beanDefinition.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
// 将 beanDefinition 注册到 spring 容器
registry.registerBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME, beanDefinition);
return beanDefinition;
}
这个办法也比较简单,就是注册 aop 入口类。先判断如果 spring 容器中已有 aop 入口类,则跟以后的 aop 入口类比拟优先级,如果以后 aop 入口类的优先级大于已有的,则注册以后类(即InfrastructureAdvisorAutoProxyCreator
)作为新的 aop 入口类。
❝
这里引出一个问题:
应用 @EnableAspectJAutoProxy 注解开启 AOP 时,有本人的入口类:
AnnotationAwareAspectJAutoProxyCreator
,这个入口类和 @EnableTransactionManagement 注解引入的入口类:
InfrastructureAdvisorAutoProxyCreator
哪个的优先级更高?❞
接下来看一下这个 aop 入口类:InfrastructureAdvisorAutoProxyCreator
public class InfrastructureAdvisorAutoProxyCreator extends AbstractAdvisorAutoProxyCreator {
@Nullable
private ConfigurableListableBeanFactory beanFactory;
@Override
protected void initBeanFactory(ConfigurableListableBeanFactory beanFactory) {super.initBeanFactory(beanFactory);
this.beanFactory = beanFactory;
}
@Override
protected boolean isEligibleAdvisorBean(String beanName) {return (this.beanFactory != null && this.beanFactory.containsBeanDefinition(beanName) &&
this.beanFactory.getBeanDefinition(beanName).getRole() == BeanDefinition.ROLE_INFRASTRUCTURE);
}
}
isEligibleAdvisorBean办法在 aop 中是用来判断 切面是否合格的,这里判断了 beanDefinition 的 role 属性是否为 BeanDefinition.ROLE_INFRASTRUCTURE,还 记得下面咱们介绍的 registerOrEscalateApcAsRequired 办法中
// 设置 role 为 ROLE_INFRASTRUCTURE
beanDefinition.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
有这一行代码,设置 beanDefinition 的 role 就是 BeanDefinition.ROLE_INFRASTRUCTURE,所以会触发 AOP 的性能。
因为 AOP 的性能不是本文介绍的重点,有些细节在这里就不做过多赘述了。
接下来,看一下须要注册的另外一个类:ProxyTransactionManagementConfiguration
@Configuration
public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {@Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor() {BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
advisor.setTransactionAttributeSource(transactionAttributeSource());
advisor.setAdvice(transactionInterceptor());
if (this.enableTx != null) {advisor.setOrder(this.enableTx.<Integer>getNumber("order"));
}
return advisor;
}
@Bean
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public TransactionAttributeSource transactionAttributeSource() {return new AnnotationTransactionAttributeSource();
}
@Bean
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public TransactionInterceptor transactionInterceptor() {TransactionInterceptor interceptor = new TransactionInterceptor();
interceptor.setTransactionAttributeSource(transactionAttributeSource());
if (this.txManager != null) {interceptor.setTransactionManager(this.txManager);
}
return interceptor;
}
}
从下面能够看进去,ProxyTransactionManagementConfiguration 是一个配置类,它次要定义了:BeanFactoryTransactionAttributeSourceAdvisor、TransactionAttributeSource 和 TransactionInterceptor 三个 bean。其中 BeanFactoryTransactionAttributeSourceAdvisor 蕴含了 TransactionAttributeSource 和 TransactionInterceptor。
如果大家相熟 spring 的 aop 的话,必定晓得个别切面 advisor 由 pointcut 和 advice 组成。
而 TransactionInterceptor 实现了 Advice 接口。
Advice 有了,那么 pointcut 从哪里来呢?
咱们看一下 BeanFactoryTransactionAttributeSourceAdvisor 类自身。
能够看到这个类外面就定义了 TransactionAttributeSourcePointcut 对象。
有了这些性能,事务切面能够失常运行了。
回头再看看定义 advisor bean 实例的时候,应用了 @Role(BeanDefinition.ROLE_INFRASTRUCTURE)注解,这样能够确保在应用 AOP 性能时可能正确的匹配到事务的 advisor
AnnotationTransactionAttributeSource 类有什么用?
它外面蕴含了事务注解解析器,说简略点它是用于解析事务注解属性的。
构造方法中会默认初始化 SpringTransactionAnnotationParser 类,
public class SpringTransactionAnnotationParser implements TransactionAnnotationParser, Serializable {
@Override
@Nullable
public TransactionAttribute parseTransactionAnnotation(AnnotatedElement element) {
AnnotationAttributes attributes = AnnotatedElementUtils.findMergedAnnotationAttributes(element, Transactional.class, false, false);
if (attributes != null) {return parseTransactionAnnotation(attributes);
}
else {return null;}
}
public TransactionAttribute parseTransactionAnnotation(Transactional ann) {return parseTransactionAnnotation(AnnotationUtils.getAnnotationAttributes(ann, false, false));
}
protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) {RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute();
Propagation propagation = attributes.getEnum("propagation");
rbta.setPropagationBehavior(propagation.value());
Isolation isolation = attributes.getEnum("isolation");
rbta.setIsolationLevel(isolation.value());
rbta.setTimeout(attributes.getNumber("timeout").intValue());
rbta.setReadOnly(attributes.getBoolean("readOnly"));
rbta.setQualifier(attributes.getString("value"));
List<RollbackRuleAttribute> rollbackRules = new ArrayList<>();
for (Class<?> rbRule : attributes.getClassArray("rollbackFor")) {rollbackRules.add(new RollbackRuleAttribute(rbRule));
}
for (String rbRule : attributes.getStringArray("rollbackForClassName")) {rollbackRules.add(new RollbackRuleAttribute(rbRule));
}
for (Class<?> rbRule : attributes.getClassArray("noRollbackFor")) {rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
}
for (String rbRule : attributes.getStringArray("noRollbackForClassName")) {rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
}
rbta.setRollbackRules(rollbackRules);
return rbta;
}
@Override
public boolean equals(Object other) {return (this == other || other instanceof SpringTransactionAnnotationParser);
}
@Override
public int hashCode() {return SpringTransactionAnnotationParser.class.hashCode();
}
}
咱们能够看到它会解析 @Transactional 注解上的属性,并且封装到 RuleBasedTransactionAttribute 对象中。
TransactionInterceptor 类有什么用?
它是真正执行事务性能的中央,前面我会用重点介绍。
spring 基于注解事务开启的性能,先介绍到这里,上面咱们一起看看 springboot 是如何开启事务的。
2、springboot 主动开启事务
如上图,找到 spring-boot-autoconfigure jar 包 META-INF 目录下的 spring.factories 文件。
关上该文件
# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=
org.springframework.boot.autoconfigure.admin.SpringApplicationAdminJmxAutoConfiguration,
org.springframework.boot.autoconfigure.aop.AopAutoConfiguration,
org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration,
org.springframework.boot.autoconfigure.batch.BatchAutoConfiguration,
org.springframework.boot.autoconfigure.cache.CacheAutoConfiguration,
org.springframework.boot.autoconfigure.cassandra.CassandraAutoConfiguration,
......
org.springframework.boot.autoconfigure.transaction.TransactionAutoConfiguration,
......
能够看到这个文件中 key 为
org.springframework.boot.autoconfigure.EnableAutoConfiguration=
的配置上面有一行
org.springframework.boot.autoconfigure.transaction.TransactionAutoConfiguration,
该类就是事务主动配置的外围代码,让咱们一起看看这个类。
@Configuration
@ConditionalOnClass(PlatformTransactionManager.class)
@AutoConfigureAfter({ JtaAutoConfiguration.class, HibernateJpaAutoConfiguration.class,
DataSourceTransactionManagerAutoConfiguration.class,
Neo4jDataAutoConfiguration.class })
@EnableConfigurationProperties(TransactionProperties.class)
public class TransactionAutoConfiguration {
// 定义用户自定义的事务管理器
@Bean
@ConditionalOnMissingBean
public TransactionManagerCustomizers platformTransactionManagerCustomizers(ObjectProvider<List<PlatformTransactionManagerCustomizer<?>>> customizers) {return new TransactionManagerCustomizers(customizers.getIfAvailable());
}
@Configuration
@ConditionalOnSingleCandidate(PlatformTransactionManager.class)
public static class TransactionTemplateConfiguration {
private final PlatformTransactionManager transactionManager;
public TransactionTemplateConfiguration(PlatformTransactionManager transactionManager) {this.transactionManager = transactionManager;}
// 定义 TransactionTemplate,用它做编程式事务开发
@Bean
@ConditionalOnMissingBean
public TransactionTemplate transactionTemplate() {return new TransactionTemplate(this.transactionManager);
}
}
@Configuration
@ConditionalOnBean(PlatformTransactionManager.class)
@ConditionalOnMissingBean(AbstractTransactionManagementConfiguration.class)
public static class EnableTransactionManagementConfiguration {
//proxy-target-class=false 示意配置了 jdk 动静代理
// 则应用注解 EnableTransactionManagement 的时候 proxyTargetClass = false
@Configuration
@EnableTransactionManagement(proxyTargetClass = false)
@ConditionalOnProperty(prefix = "spring.aop", name = "proxy-target-class", havingValue = "false", matchIfMissing = false)
public static class JdkDynamicAutoProxyConfiguration { }
//proxy-target-class=true 示意配置了 cglib 生成代理
// 则应用注解 EnableTransactionManagement 的时候 proxyTargetClass = true
@Configuration
@EnableTransactionManagement(proxyTargetClass = true)
@ConditionalOnProperty(prefix = "spring.aop", name = "proxy-target-class", havingValue = "true", matchIfMissing = true)
public static class CglibAutoProxyConfiguration {}}
}
so easy
springboot 开启事务性能的底层其实就是应用了 @EnableTransactionManagement 注解,又回到之前的逻辑。
这里就能够解释,为什么咱们在 springboot 程序外面没有应用过 @EnableTransactionManagement 注解开启事务,它却主动领有事务的性能。因为它在事务的主动配置中曾经应用了 @EnableTransactionManagement 注解开启事务。
至此,spring 如何开启事务曾经介绍完了。
接下来,咱们的重点是 TransactionInterceptor,因为在它外面实现了事务的逻辑。
二、外围拦截器
废话不不多说,间接上 TransactionInterceptor 的代码。
public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {public TransactionInterceptor() { }
public TransactionInterceptor(PlatformTransactionManager ptm, Properties attributes) {setTransactionManager(ptm);
setTransactionAttributes(attributes);
}
public TransactionInterceptor(PlatformTransactionManager ptm, TransactionAttributeSource tas) {setTransactionManager(ptm);
setTransactionAttributeSource(tas);
}
@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
}
代码其实比较简单,要害代码是 invokeWithinTransaction 办法。
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// 获取事务属性解析器
TransactionAttributeSource tas = getTransactionAttributeSource();
// 获取事务属性
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
// 获取事务管理器
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
// 获取事务惟一标识
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
// 如果是非事务执行 或者 非 JTA 事务
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
// 创立事务信息
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
// 执行指标办法
retVal = invocation.proceedWithInvocation();}
catch (Throwable ex) {
// 回滚事务
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
// 清理事务信息
cleanupTransactionInfo(txInfo);
}
// 提交事务
commitTransactionAfterReturning(txInfo);
return retVal;
}
// 如果是 JTA 事务
else {final ThrowableHolder throwableHolder = new ThrowableHolder();
try {Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, status -> {TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
try {return invocation.proceedWithInvocation();
}
catch (Throwable ex) {if (txAttr.rollbackOn(ex)) {
// A RuntimeException: will lead to a rollback.
if (ex instanceof RuntimeException) {throw (RuntimeException) ex;
}
else {throw new ThrowableHolderException(ex);
}
}
else {
// A normal return value: will lead to a commit.
throwableHolder.throwable = ex;
return null;
}
}
finally {cleanupTransactionInfo(txInfo);
}
});
// Check result state: It might indicate a Throwable to rethrow.
if (throwableHolder.throwable != null) {throw throwableHolder.throwable;}
return result;
}
catch (ThrowableHolderException ex) {throw ex.getCause();
}
catch (TransactionSystemException ex2) {if (throwableHolder.throwable != null) {logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
ex2.initApplicationException(throwableHolder.throwable);
}
throw ex2;
}
catch (Throwable ex2) {if (throwableHolder.throwable != null) {logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
}
throw ex2;
}
}
}
因为是外围代码,咱们用流程图梳理一下流程
咱们再看看 getTransactionAttribute 办法,这个办法在 AbstractFallbackTransactionAttributeSource 类外面。
@Override
@Nullable
public TransactionAttribute getTransactionAttribute(Method method, @Nullable Class<?> targetClass) {if (method.getDeclaringClass() == Object.class) {return null;}
// 获取缓存的 key
Object cacheKey = getCacheKey(method, targetClass);
// 依据 key 从缓存中获取 TransactionAttribute 对象,缓存值
TransactionAttribute cached = this.attributeCache.get(cacheKey);
// 如果缓存值不为空,则间接返回缓存值
if (cached != null) {
// 如果缓存值为本人定义的空值,则返回 null
if (cached == NULL_TRANSACTION_ATTRIBUTE) {return null;}
else {return cached;}
}
// 如果缓存值为空
else {
// 计算 TransactionAttribute
TransactionAttribute txAttr = computeTransactionAttribute(method, targetClass);
// 如果 TransactionAttribute 为空则缓存一个空值
if (txAttr == null) {this.attributeCache.put(cacheKey, NULL_TRANSACTION_ATTRIBUTE);
}
// 如果 TransactionAttribute 不为空,则作为缓存值增加到缓存中
else {String methodIdentification = ClassUtils.getQualifiedMethodName(method, targetClass);
if (txAttr instanceof DefaultTransactionAttribute) {((DefaultTransactionAttribute) txAttr).setDescriptor(methodIdentification);
}
if (logger.isDebugEnabled()) {logger.debug("Adding transactional method'" + methodIdentification + "'with attribute:" + txAttr);
}
// 增加到缓存
this.attributeCache.put(cacheKey, txAttr);
}
return txAttr;
}
}
看一下 computeTransactionAttribute 办法
@Nullable
protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
// 如果指标办法不是 public,则间接返回空,示意非事务执行
if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) {return null;}
Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass);
// 第一步从指标办法上找事务属性。如果办法和类上都配置了事务,则以办法上的为准
TransactionAttribute txAttr = findTransactionAttribute(specificMethod);
// 如果存在则间接返回
if (txAttr != null) {return txAttr;}
// 第二步从指标类上找事务属性
txAttr = findTransactionAttribute(specificMethod.getDeclaringClass());
// 如果存在则间接返回
if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {return txAttr;}
if (specificMethod != method) {txAttr = findTransactionAttribute(method);
if (txAttr != null) {return txAttr;}
txAttr = findTransactionAttribute(method.getDeclaringClass());
if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
return txAttr;,}
}
return null;
}
❝if (allowPublicMethodsOnly()
&& !Modifier.isPublic(method.getModifiers())) {
return null;
}
这个判断条件就是非 public 办法为什么不反对事务的起因
说白了,下面的办法就是一系列的查找 TransactionAttribute(事务属性)的过程。
咱们重点看看 findTransactionAttribute 办法
@Override
@Nullable
protected TransactionAttribute findTransactionAttribute(Method method) {return determineTransactionAttribute(method);
}
@Nullable
protected TransactionAttribute determineTransactionAttribute(AnnotatedElement element) {for (TransactionAnnotationParser annotationParser : this.annotationParsers) {TransactionAttribute attr = annotationParser.parseTransactionAnnotation(element);
if (attr != null) {return attr;}
}
return null;
}
private final Set<TransactionAnnotationParser> annotationParsers;
其中 annotationParsers 就是咱们之前说过 AnnotationTransactionAttributeSource 类的构造方法中对应的事务属性解析器。
咱们一探到底,一起看一下 parseTransactionAnnotation 办法,是如何解析的
@Override
@Nullable
public TransactionAttribute parseTransactionAnnotation(AnnotatedElement element) {
AnnotationAttributes attributes = AnnotatedElementUtils.findMergedAnnotationAttributes(element, Transactional.class, false, false);
if (attributes != null) {return parseTransactionAnnotation(attributes);
}
else {return null;}
}
能够看到这个办法会去解析 @Transactional 注解上的属性,如果注解属性不为空,则会调用 parseTransactionAnnotation 办法。
protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) {RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute();
Propagation propagation = attributes.getEnum("propagation");
rbta.setPropagationBehavior(propagation.value());
Isolation isolation = attributes.getEnum("isolation");
rbta.setIsolationLevel(isolation.value());
rbta.setTimeout(attributes.getNumber("timeout").intValue());
rbta.setReadOnly(attributes.getBoolean("readOnly"));
rbta.setQualifier(attributes.getString("value"));
List<RollbackRuleAttribute> rollbackRules = new ArrayList<>();
for (Class<?> rbRule : attributes.getClassArray("rollbackFor")) {rollbackRules.add(new RollbackRuleAttribute(rbRule));
}
for (String rbRule : attributes.getStringArray("rollbackForClassName")) {rollbackRules.add(new RollbackRuleAttribute(rbRule));
}
for (Class<?> rbRule : attributes.getClassArray("noRollbackFor")) {rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
}
for (String rbRule : attributes.getStringArray("noRollbackForClassName")) {rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
}
rbta.setRollbackRules(rollbackRules);
return rbta;
}
哈哈哈,这些不就是咱们相熟的事务属性吗?
这个办法从注解属性上获取到具体的属性值,封装到RuleBasedTransactionAttribute 类中返回。
接下来看一下 createTransactionIfNecessary 办法。
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
// 如果事务属性不为空,并且没有名称,则生成一个名称
if (txAttr != null && txAttr.getName() == null) {txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {return joinpointIdentification;}
};
}
TransactionStatus status = null;
if (txAttr != null) {if (tm != null) {
// 获取事务状态
status = tm.getTransaction(txAttr);
}
else {if (logger.isDebugEnabled()) {
logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
"] because no transaction manager has been configured");
}
}
}
// 封装事务信息对象,并且将事务信息对象绑定到以后线程
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
三、获取事务
这里先看一下 getTransaction 办法
@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
// 获取事务对象
Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();
// 如果事务属性 definition 为空,则创立一个默认的事务属性 definition
if (definition == null) {definition = new DefaultTransactionDefinition();
}
// 如果存在事务
if (isExistingTransaction(transaction)) {
// 解决已有事务
return handleExistingTransaction(definition, transaction, debugEnabled);
}
// 如果事务不存在,并且超时工夫小于默认值 -1,则报异样
if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
}
// 如果事务不存在,并且流传属性为 PROPAGATION_MANDATORY,则报异样
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException("No existing transaction found for transaction marked with propagation'mandatory'");
}
// 如果事务不存在,并且事务流传属性为空:PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW、PROPAGATION_NESTED
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
// 空挂起
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {logger.debug("Creating new transaction with name [" + definition.getName() + "]:" + definition);
}
try {boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 创立事务状态对象,留神这里的 newTransaction=true
DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// 获取数据库连贯,敞开数据主动提交,如果是只读事务,则设置为只读
// 同时获取数据库隔离根本和超时工夫,最初将数据库连贯绑定到以后线程
doBegin(transaction, definition);
// 将事务名称、激活状态、只读状态、隔离级别等绑定到以后线程
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException | Error ex) {resume(null, suspendedResources);
throw ex;
}
}
else {
// 如果事务不存在,并且事务流传属性是:PROPAGATION_NEVER、PROPAGATION_SUPPORTS、PROPAGATION_NOT_SUPPORTED
// 如果定义了隔离级别然而事务又不存在,则打印一个正告
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated;" +
"isolation level will effectively be ignored:" + definition);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
// 返回非事务的事务状态
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
}
}
再看看事务存在时调用的办法 handleExistingTransaction
private TransactionStatus handleExistingTransaction(TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
// 如果事务存在,并且事务流传属性是 PROPAGATION_NEVER,则报异样
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException("Existing transaction found for transaction marked with propagation'never'");
}
// 如果事务存在,并且事务流传属性是 PROPAGATION_NOT_SUPPORTED
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {if (debugEnabled) {logger.debug("Suspending current transaction");
}
// 挂起以后事务
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
// 返回非事务的事务状态
return prepareTransactionStatus(definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
// 如果事务存在,并且事务流传属性是 PROPAGATION_REQUIRES_NEW
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {if (debugEnabled) {
logger.debug("Suspending current transaction, creating new transaction with name [" +
definition.getName() + "]");
}
// 挂起以后事务
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 创立新事务状态
DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// 获取数据库连贯,敞开数据主动提交,如果是只读事务,则设置为只读
// 同时获取数据库隔离根本和超时工夫,最初将数据库连贯绑定到以后线程
doBegin(transaction, definition);
// 将事务名称、激活状态、只读状态、隔离级别等绑定到以后线程
prepareSynchronization(status, definition);
// 返回新的事务状态
return status;
}
catch (RuntimeException | Error beginEx) {resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}
// 如果事务存在,并且事务流传属性是 PROPAGATION_NESTED
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
// 如果不容许嵌套事务,则报异样
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default -" +
"specify'nestedTransactionAllowed'property with value'true'");
}
if (debugEnabled) {logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
// 如果容许应用回滚点
if (useSavepointForNestedTransaction()) {
// 创立事务状态对象将 newTransaction 设置为 false
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
// 创立回滚点
status.createAndHoldSavepoint();
return status;
}
else {
// 如果不容许应用回滚点
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 创立新的事务状态
DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, null);
// 获取数据库连贯,敞开数据主动提交,如果是只读事务,则设置为只读
// 同时获取数据库隔离根本和超时工夫,最初将数据库连贯绑定到以后线程
doBegin(transaction, definition);
// 将事务名称、激活状态、只读状态、隔离级别等绑定到以后线程
prepareSynchronization(status, definition);
return status;
}
}
// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
if (debugEnabled) {logger.debug("Participating in existing transaction");
}
// 对事务的属性做一下校验
if (isValidateExistingTransaction()) {if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] specifies isolation level which is incompatible with existing transaction:" +
(currentIsolationLevel != null ?
isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
"(unknown)"));
}
}
if (!definition.isReadOnly()) {if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] is not marked as read-only but existing transaction is");
}
}
}
// 如果事务存在,并且流传属性是:PROPAGATION_REQUIRED、PROPAGATION_MANDATORY、PROPAGATION_SUPPORTS
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 将 newTransaction 设置为 false
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
这两段代码尽管我写了很多正文,有些敌人看起来可能还是一脸懵逼。让咱们一起先看看事务 7 钟流传属性的怎么回事。
- Propagation.REQUIRED(required):反对以后事务,如果以后有事务,那么退出事务,如果以后没有事务则新建一个(默认状况)
- Propagation.NOT_SUPPORTED(not_supported):以非事务形式执行操作,如果以后存在事务就把以后事务挂起,执行完后复原事务(疏忽以后事务);
- Propagation.SUPPORTS (supports):如果以后有事务则退出,如果没有则不必事务。
- Propagation.MANDATORY (mandatory):反对以后事务,如果以后没有事务,则抛出异样。(以后必须有事务)
- PROPAGATION_NEVER (never):以非事务形式执行,如果以后存在事务,则抛出异样。(以后必须不能有事务)
- Propagation.REQUIRES_NEW (requires_new):反对以后事务,如果以后有事务,则挂起以后事务,而后新创建一个事务,如果以后没有事务,则本人创立一个事务。
- Propagation.NESTED (nested 嵌套事务):如果以后存在事务,则嵌套在以后事务中。如果以后没有事务,则新建一个事务本人执行(和 required 一样)。嵌套的事务应用保留点作为回滚点,当内部事务回滚时不会影响内部事物的提交;然而内部回滚会把内部事务一起回滚回去。(这个和新建一个事务的区别)
而后用一张图给大家再形容一下事务的这 7 种流传属性是如何调用的。
再回头看看 doGetTransaction 办法,它是一个钩子办法,次要看 DataSourceTransactionManager 的实现
@Override
protected Object doGetTransaction() {
// 创立一个事务实体对象
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
// 设置是否容许设置回滚点
txObject.setSavepointAllowed(isNestedTransactionAllowed());
// 获取数据库连贯的 holder
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
// 赋值
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
重点看看 TransactionSynchronizationManager.getResource 办法
@Nullable
public static Object getResource(Object key) {
// 获取 key,其实就是 dataSource
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
// 依据 key 获取值,其实就是 connectionHolder
Object value = doGetResource(actualKey);
if (value != null && logger.isTraceEnabled()) {logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" +
Thread.currentThread().getName() + "]");
}
return value;
}
进入 doGetResource 办法
//threadLocal 保留 connectionHolder
//map 中的 key 是 dataSource,value 是 connectionHolder
private static final ThreadLocal<Map<Object, Object>> resources =
new NamedThreadLocal<>("Transactional resources");
@Nullable
private static Object doGetResource(Object actualKey) {
// 从以后线程中获取数据连贯 map
Map<Object, Object> map = resources.get();
if (map == null) {return null;}
// 依据 dataSource 获取 connectionHolder
Object value = map.get(actualKey);
if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {map.remove(actualKey);
if (map.isEmpty()) {resources.remove();
}
value = null;
}
// 返回 connectionHolder
return value;
}
这里就是嵌套事务和外层事务为什么能够共用同一个数据库连贯的起因,因为数据库连贯放在 threadLocal 中了,嵌套事务和外层事务共用同一个线程,就能够通过 threadLocal 获取同一个连贯。
❝
这里又引出一个问题:
内嵌事务 和 外层事务共用同一个数据库连贯,它们共用同一个事务对象吗?
再看看一下 doBegin 办法
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
// 如果没有数据库连贯或连贯不是同步事务状态
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
// 从 dataSource 中获取一个数据库连贯
Connection newCon = obtainDataSource().getConnection();
if (logger.isDebugEnabled()) {logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
// 将数据库连贯封装到 ConnectionHolder 对象中
// 并且设置到事务对象中
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
// 设置事务同步状态为 true
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
// 从 ConnectionHolder 中获取数据库连贯
con = txObject.getConnectionHolder().getConnection();
// 获取数据库隔离级别
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
// 将数据库隔离级别设置到以后事务对象中
txObject.setPreviousIsolationLevel(previousIsolationLevel);
// 如果以后数据库连贯是主动提交
if (con.getAutoCommit()) {txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
// 敞开主动提交
con.setAutoCommit(false);
}
// 如果事务属性配置了只读事务,则将事务设置成只读
prepareTransactionalConnection(con, definition);
// 设置数据库连贯 holder 事务状态为激活状态
txObject.getConnectionHolder().setTransactionActive(true);
// 获取事务属性中配置的超时工夫
int timeout = determineTimeout(definition);
// 如果配置的超时工夫不等于 -1
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
// 则给数据库连贯 holder 设置新的超时工夫
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// 如果是新事务
if (txObject.isNewConnectionHolder()) {
// 将数据库连贯绑定到以后线程
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
}
catch (Throwable ex) {
// 如果是新连贯
if (txObject.isNewConnectionHolder()) {
// 开释数据库连贯
DataSourceUtils.releaseConnection(con, obtainDataSource());
// 将事务对象中的 ConnectionHolder 设置为空
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
咱们只看一下 bindResource 办法,其余的办法比较简单,我就不一一介绍了。
public static void bindResource(Object key, Object value) throws IllegalStateException {
// 获取 key,即 dataSource
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
Assert.notNull(value, "Value must not be null");
// 从 threadLocal 中获取数据库连贯
Map<Object, Object> map = resources.get();
// 如果 map 为空,则创立一个 map,设置到 threadLocal 中
if (map == null) {map = new HashMap<>();
resources.set(map);
}
// 将新的数据库连贯放到 map 中,返回旧连贯对象
Object oldValue = map.put(actualKey, value);
if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) {oldValue = null;}
if (oldValue != null) {throw new IllegalStateException("Already value [" + oldValue + "] for key [" +
actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
}
if (logger.isTraceEnabled()) {logger.trace("Bound value [" + value + "] for key [" + actualKey + "] to thread [" +
Thread.currentThread().getName() + "]");
}
}
❝
这里又引出一个问题:
threadLocal 中为什么要放一个 map 对象,key 是 dataSource,而 value 是 connectionHolder?
再看看挂起办法 suspend
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
// 如果事务事务激活状态
if (TransactionSynchronizationManager.isSynchronizationActive()) {
// 挂起事务同步资源,解绑线程绑定的连贯,连接池回收连贯
List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
try {
Object suspendedResources = null;
if (transaction != null) {
// 挂起事务
suspendedResources = doSuspend(transaction);
}
String name = TransactionSynchronizationManager.getCurrentTransactionName();
// 解绑线程中的事务名称
TransactionSynchronizationManager.setCurrentTransactionName(null);
boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
// 解绑线程中的只读状态
TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
// 解绑线程中的只隔离级别
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
// 解绑线程中的激活状态
TransactionSynchronizationManager.setActualTransactionActive(false);
// 封装挂起资源对象
return new SuspendedResourcesHolder(suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
}
catch (RuntimeException | Error ex) {
// doSuspend failed - original transaction is still active...
doResumeSynchronization(suspendedSynchronizations);
throw ex;
}
}
else if (transaction != null) {
// Transaction active but no synchronization active.
Object suspendedResources = doSuspend(transaction);
return new SuspendedResourcesHolder(suspendedResources);
}
else {
// Neither transaction nor synchronization active.
return null;
}
}
其实还有很多办法想介绍,然而因为篇幅的起因,在这里就不一一列出了,有趣味的敌人能够关注我的公众账号给我留言。
四、提交事务
提交事务的代码要从 commitTransactionAfterReturning 开始说起。
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
// 判断以后有事务信息,并且事务状态不为空,才容许提交事务
if (txInfo != null && txInfo.getTransactionStatus() != null) {if (logger.isTraceEnabled()) {logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
// 提交事务
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}
@Override
public final void commit(TransactionStatus status) throws TransactionException {
// 如果事务状态是实现状态,则抛出异样,这种状况阐明曾经提交过了
if (status.isCompleted()) {
throw new IllegalTransactionStateException("Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
// 如果事务状态事务是本地回滚
if (defStatus.isLocalRollbackOnly()) {if (defStatus.isDebug()) {logger.debug("Transactional code has requested rollback");
}
// 回滚事务
processRollback(defStatus, false);
return;
}
// 如果没有配置全局回滚,并且以后状态须要回滚
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {if (defStatus.isDebug()) {logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
}
// 回滚事务
processRollback(defStatus, true);
return;
}
// 提交事务
processCommit(defStatus);
}
执行事务提交之前,先判断一下事务状态是否须要提交,如果有须要回滚的则回滚,最初提交事务交给 processCommit 办法。
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
boolean unexpectedRollback = false;
// 空实现
prepareForCommit(status);
// 提交会话
triggerBeforeCommit(status);
// 解绑线程和会话绑定关系,敞开会话
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
// 如果配置了回滚点,阐明是嵌套事务
if (status.hasSavepoint()) {if (status.isDebug()) {logger.debug("Releasing transaction savepoint");
}
unexpectedRollback = status.isGlobalRollbackOnly();
// 革除回滚点
status.releaseHeldSavepoint();}
// 如果是新事务
else if (status.isNewTransaction()) {if (status.isDebug()) {logger.debug("Initiating transaction commit");
}
unexpectedRollback = status.isGlobalRollbackOnly();
// 提交事务
doCommit(status);
}
else if (isFailEarlyOnGlobalRollbackOnly()) {unexpectedRollback = status.isGlobalRollbackOnly();
}
if (unexpectedRollback) {
throw new UnexpectedRollbackException("Transaction silently rolled back because it has been marked as rollback-only");
}
}
catch (UnexpectedRollbackException ex) {
// 敞开会话工厂,敞开会话,重置属性
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
}
catch (TransactionException ex) {
// can only be caused by doCommit
if (isRollbackOnCommitFailure()) {doRollbackOnCommitException(status, ex);
}
else {
// 敞开会话工厂,敞开会话,重置属性
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
}
catch (RuntimeException | Error ex) {if (!beforeCompletionInvoked) {triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, ex);
throw ex;
}
try {
// 空实现
triggerAfterCommit(status);
}
finally {
// 敞开会话工厂,敞开会话,重置属性
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
}
finally {
// 解绑数据库连贯,重置并敞开数据库连贯
// 复原挂起的资源
cleanupAfterCompletion(status);
}
}
我再用一张流程图梳理一下整个流程
doCommit 办法代码很简略
@Override
protected void doCommit(DefaultTransactionStatus status) {DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
if (status.isDebug()) {logger.debug("Committing JDBC transaction on Connection [" + con + "]");
}
try {con.commit();
}
catch (SQLException ex) {throw new TransactionSystemException("Could not commit JDBC transaction", ex);
}
}
就是调用数据库连贯的 commit 办法,清空回滚点也是调用数据库连贯的 releaseSavepoint 办法,比较简单在这里就不深刻介绍了。
五、回滚事务
回滚事务入口在 completeTransactionAfterThrowing 办法
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
// 如果存在事务信息,并且事务状态不为空
if (txInfo != null && txInfo.getTransactionStatus() != null) {if (logger.isTraceEnabled()) {logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
"] after exception:" + ex);
}
// 如果属性属性不为空,并且须要回滚
if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
try {
// 回滚事务
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
}
catch (TransactionSystemException ex2) {logger.error("Application exception overridden by rollback exception", ex);
ex2.initApplicationException(ex);
throw ex2;
}
catch (RuntimeException | Error ex2) {logger.error("Application exception overridden by rollback exception", ex);
throw ex2;
}
}
// 如果不须要回滚事务
else {
try {
// 持续提交事务
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
catch (TransactionSystemException ex2) {logger.error("Application exception overridden by commit exception", ex);
ex2.initApplicationException(ex);
throw ex2;
}
catch (RuntimeException | Error ex2) {logger.error("Application exception overridden by commit exception", ex);
throw ex2;
}
}
}
}
咱们重点看一下是如何判断要回滚事务的,看一下 txInfo.transactionAttribute.rollbackOn 办法
@Override
public boolean rollbackOn(Throwable ex) {return (ex instanceof RuntimeException || ex instanceof Error);
}
❝
这里又引出一个问题:
默认状况下,事务是否须要回滚是依据异样类型判断的,如果异样类型为 RuntimeException 或者 Error,则须要回滚。那么问题来了,如果异样类型是 Exception 须要回滚不?如果不须要回滚,怎样才能让异样类型是 Exception 的事务回滚?
重点看一下事务回滚办法 rollback
@Override
public final void rollback(TransactionStatus status) throws TransactionException {if (status.isCompleted()) {
throw new IllegalTransactionStateException("Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
processRollback(defStatus, false);
}
再进入 processRollback 办法
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
try {
boolean unexpectedRollback = unexpected;
try {
// 解绑线程和会话绑定关系,敞开会话
triggerBeforeCompletion(status);
// 如果设置了回滚点,阐明是嵌套事务
if (status.hasSavepoint()) {if (status.isDebug()) {logger.debug("Rolling back transaction to savepoint");
}
// 回滚到回滚点,清空回滚点
status.rollbackToHeldSavepoint();}
// 如果是新事务
else if (status.isNewTransaction()) {if (status.isDebug()) {logger.debug("Initiating transaction rollback");
}
// 回滚
doRollback(status);
}
else {
// jta 事务
if (status.hasTransaction()) {
// 如果是本地回滚或者全局回滚
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {if (status.isDebug()) {logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
}
// 设置回滚状态为须要回滚
doSetRollbackOnly(status);
}
else {if (status.isDebug()) {logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
}
}
}
else {logger.debug("Should roll back transaction but cannot - no transaction available");
}
// Unexpected rollback only matters here if we're asked to fail early
if (!isFailEarlyOnGlobalRollbackOnly()) {unexpectedRollback = false;}
}
}
catch (RuntimeException | Error ex) {
// 敞开会话工厂,敞开会话,重置属性
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}
// 敞开会话工厂,敞开会话,重置属性
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
// Raise UnexpectedRollbackException if we had a global rollback-only marker
if (unexpectedRollback) {
throw new UnexpectedRollbackException("Transaction rolled back because it has been marked as rollback-only");
}
}
finally {
// 解绑数据库连贯,重置并敞开数据库连贯
// 复原挂起的资源
cleanupAfterCompletion(status);
}
}
再用一张流程图梳理一下流程
事务还有很多有意思的代码,因为篇幅的起因临时先介绍到这里。
文章两头,我提出了几个小问题敌人们有空能够思考一下,这些问题我会在下一篇文章中进行解读。欢送敌人们关注我的公众账号:苏三说技术,如果大家有一些问题能够给我留言。谢谢大家。
举荐浏览
- spring 事务的这 10 种坑,你稍不留神可能就会踩中!!!
- spring 解决循环依赖为什么要用三级缓存?
- java8 stream 的这些开发技巧,你值得好好珍藏
- mq 要如何解决音讯失落、反复生产?
- 什么是缓存击穿、雪崩、穿透
- 揭开 ThreadLocal 的神秘面纱
- 面试官:spring 的 BeanFatory 和 FactoryBean 区别