共计 16604 个字符,预计需要花费 42 分钟才能阅读完成。
本文和大家一起刨析 Spring 事务的相干源码,篇幅较长,代码片段较多,倡议应用电脑浏览
本文指标
- 了解 Spring 事务管理外围接口
- 了解 Spring 事务管理的外围逻辑
- 了解事务的流传类型及其实现原理
版本
SpringBoot 2.3.3.RELEASE
什么是事务的流传?
Spring 除了封装了事务管制之外,还形象出了 事务的流传 这个概念,事务的流传并不是关系型数据库所定义的,而是 Spring 在封装事务时做的加强扩大,能够通过@Transactional
指定事务的流传,具体类型如下
事务流传行为类型 | 阐明 |
---|---|
PROPAGATION_REQUIRED |
如果以后没有事务,就新建一个事务,如果曾经存在一个事务中,退出到这个事务中。Spring 的默认事务流传类型 |
PROPAGATION_SUPPORTS |
反对以后事务,如果以后没有事务,就以非事务形式执行。 |
PROPAGATION_MANDATORY |
应用以后的事务,如果以后没有事务,就抛出异样。 |
PROPAGATION_REQUIRES_NEW |
新建事务,如果以后存在事务,把以后事务挂起(暂停)。 |
PROPAGATION_NOT_SUPPORTED |
以非事务形式执行操作,如果以后存在事务,就把以后事务挂起。 |
PROPAGATION_NEVER |
以非事务形式执行,如果以后存在事务,则抛出异样。 |
PROPAGATION_NESTED |
如果以后存在事务,则在嵌套事务内执行。如果以后没有事务,则执行与 PROPAGATION_REQUIRED 相似的操作。 |
举个栗子
以嵌套事务为例
@Service
public class DemoServiceImpl implements DemoService {
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private DemoServiceImpl self;
@Transactional
@Override
public void insertDB() {String sql = "INSERT INTO sys_user(`id`, `username`) VALUES (?, ?)";
jdbcTemplate.update(sql, uuid(), "taven");
try {
// 内嵌事务将会回滚,而内部事务不会受到影响
self.nested();} catch (Exception e) {e.printStackTrace();
}
}
@Transactional(propagation = Propagation.NESTED)
@Override
public void nested() {String sql = "INSERT INTO sys_user(`id`, `username`) VALUES (?, ?)";
jdbcTemplate.update(sql, uuid(), "nested");
throw new RuntimeException("rollback nested");
}
private String uuid() {return UUID.randomUUID().toString();}
}
上述代码中,nested()办法标记了事务流传类型为嵌套,如果 nested()
中抛出异样仅会回滚 nested()
办法中的 sql,不会影响到 insertDB()
办法中曾经执行的 sql
留神:service 调用外部办法时,如果间接应用 this 调用,事务不会失效。因而应用 this 调用相当于跳过了内部的代理类,所以 AOP 不会失效,无奈应用事务
思考
家喻户晓,Spring 事务是通过 AOP 实现的,如果是咱们本人写一个 AOP 管制事务,该怎么做呢?
// 伪代码
public Object invokeWithinTransaction() {
// 开启事务
connection.beginTransaction();
try {
// 反射执行办法
Object result = invoke();
// 提交事务
connection.commit();
return result;
} catch(Exception e) {
// 产生异样时回滚
connection.rollback();
throw e;
}
}
在这个根底上,咱们来思考一下如果是咱们本人做的话,事务的流传该如何实现
以 PROPAGATION_REQUIRED
为例,这个仿佛很简略,咱们判断一下以后是否有事务(能够思考应用 ThreadLocal 存储已存在的事务对象),如果有事务,那么就不开启新的事务。反之,没有事务,咱们就创立新的事务
如果事务是由以后切面开启的,则提交 / 回滚事务,反之不做解决
那么事务流传中形容的挂起(暂停)以后事务,和内嵌事务是如何实现的?
<!–
这里提前剧透一下,内嵌事务是应用关系型数据库的 savepoint 实现的
–>
源码动手
要浏览事务流传相干的源码,咱们先来理解下 Spring 事务管理的外围接口与类
- TransactionDefinition
该接口定义了事务的所有属性(隔离级别,流传类型,超时工夫等等),咱们日常开发中常常应用的 @Transactional
其实最终会被转化为 TransactionDefinition
- TransactionStatus
事务的状态,以最罕用的实现 DefaultTransactionStatus 为例,该类存储了以后的事务对象,savepoint,以后挂起的事务,是否实现,是否仅回滚等等
- TransactionManager
这是一个空接口,间接继承他的 interface 有 PlatformTransactionManager(咱们平时用的就是这个,默认的实现类 DataSourceTransactionManager)以及
ReactiveTransactionManager(响应式事务管理器,因为不是本文重点,咱们不多说)
从上述两个接口来看,TransactionManager 的次要作用
- 通过 TransactionDefinition 开启一个事务,返回 TransactionStatus
- 通过 TransactionStatus 提交、回滚事务(理论开启事务的 Connection 通常存储在 TransactionStatus 中)
public interface PlatformTransactionManager extends TransactionManager {TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException;
void commit(TransactionStatus status) throws TransactionException;
void rollback(TransactionStatus status) throws TransactionException;
}
- TransactionInterceptor
事务拦截器,事务 AOP 的外围类(反对响应式事务,编程式事务,以及咱们罕用的规范事务),因为篇幅起因,本文只探讨规范事务的相干实现
上面咱们从事务逻辑的入口 TransactionInterceptor 动手,来看下 Spring 事务管理的外围逻辑以及事务流传的实现
TransactionInterceptor
TransactionInterceptor 实现了 MethodInvocation(这是实现 AOP 的一种形式),其外围逻辑在父类TransactionAspectSupport 中,办法地位:TransactionInterceptor::invokeWithinTransaction
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// If the transaction attribute is null, the method is non-transactional.
TransactionAttributeSource tas = getTransactionAttributeSource();
// 以后事务的属性 TransactionAttribute extends TransactionDefinition
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
// 事务属性中能够定义以后应用哪个事务管理器
// 如果没有定义就去 Spring 上下文找到一个可用的 TransactionManager
final TransactionManager tm = determineTransactionManager(txAttr);
// 省略了响应式事务的解决 ...
PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
// Standard transaction demarcation with getTransaction and commit/rollback calls.
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
Object retVal;
try {
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
// 如果有下一个拦截器则执行,最终会执行到指标办法,也就是咱们的业务代码
retVal = invocation.proceedWithInvocation();}
catch (Throwable ex) {
// target invocation exception
// 当捕捉到异样时实现以后事务(提交或者回滚)completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {cleanupTransactionInfo(txInfo);
}
if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
// Set rollback-only in case of Vavr failure matching our rollback rules...
TransactionStatus status = txInfo.getTransactionStatus();
if (status != null && txAttr != null) {retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
}
// 依据事务的状态提交或者回滚
commitTransactionAfterReturning(txInfo);
return retVal;
}
// 省略了编程式事务的解决 ...
}
这里代码很多,依据正文的地位,咱们能够把外围逻辑梳理进去
- 获取以后事务属性,事务管理器(以注解事务为例,这些都能够通过
@Transactional
来定义) createTransactionIfNecessary
,判断是否有必要创立事务invocation.proceedWithInvocation
执行拦截器链,最终会执行到指标办法completeTransactionAfterThrowing
当抛出异样后,实现这个事务,提交或者回滚,并抛出这个异样commitTransactionAfterReturning
从办法命名来看,这个办法会提交事务。
然而深刻源码中会发现,该办法中也蕴含回滚逻辑,具体行为会依据以后 TransactionStatus 的一些状态来决定(也就是说,咱们也能够通过设置以后 TransactionStatus,来管制事务回滚,并不一定只能通过抛出异样),详见AbstractPlatformTransactionManager::commit
<!–
业务代码中能够通过
TransactionAspectSupport.currentTransactionStatus()
获取以后 TransactionStatus
–>
咱们持续,来看看 createTransactionIfNecessary 做了什么
TransactionAspectSupport::createTransactionIfNecessary
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
// If no name specified, apply method identification as transaction name.
if (txAttr != null && txAttr.getName() == null) {txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {return joinpointIdentification;}
};
}
TransactionStatus status = null;
if (txAttr != null) {if (tm != null) {
// 通过事务管理器开启事务
status = tm.getTransaction(txAttr);
}
else {if (logger.isDebugEnabled()) {
logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
"] because no transaction manager has been configured");
}
}
}
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
createTransactionIfNecessary 中的外围逻辑
- 通过 PlatformTransactionManager(事务管理器)开启事务
prepareTransactionInfo
筹备事务信息,这个具体做了什么咱们稍后再讲
持续来看PlatformTransactionManager::getTransaction
,该办法只有一个实现 AbstractPlatformTransactionManager::getTransaction
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {
// Use defaults if no transaction definition given.
TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
// 获取以后事务,该办法有继承 AbstractPlatformTransactionManager 的子类自行实现
Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();
// 如果目前存在事务
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
return handleExistingTransaction(def, transaction, debugEnabled);
}
// Check definition settings for new transaction.
if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
}
// 流传类型 PROPAGATION_MANDATORY, 要求以后必须有事务
// No existing transaction found -> check propagation behavior to find out how to proceed.
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException("No existing transaction found for transaction marked with propagation'mandatory'");
}
// PROPAGATION_REQUIRED, PROPAGATION_REQUIRES_NEW, PROPAGATION_NESTED 不存在事务时创立事务
else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {logger.debug("Creating new transaction with name [" + def.getName() + "]:" + def);
}
try {
// 开启事务
return startTransaction(def, transaction, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error ex) {resume(null, suspendedResources);
throw ex;
}
}
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated;" +
"isolation level will effectively be ignored:" + def);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
}
}
代码很多,重点关注正文局部即可
doGetTransaction
获取以后事务- 如果存在事务,则调用
handleExistingTransaction
解决,这个咱们稍后会讲到
接下来,会依据事务的流传决定是否开启事务
- 如果事务流传类型为
PROPAGATION_MANDATORY
,且不存在事务,则抛出异样 - 如果流传类型为
PROPAGATION_REQUIRED, PROPAGATION_REQUIRES_NEW, PROPAGATION_NESTED
,且以后不存在事务,则调用startTransaction
创立事务 - 当不满足 3、4 时,例如
PROPAGATION_NOT_SUPPORTED
,此时会执行 事务同步,然而不会创立真正的事务
Spring 事务同步 在之前一篇博客中有讲到,传送门????https://www.jianshu.com/p/788…
Spring 如何治理以后的事务
接下来讲讲下面提到的doGetTransaction
、handleExistingTransaction
,这两个办法是由不同的 TransactionManager 自行实现的
咱们以 SpringBoot 默认的 TransactionManager,DataSourceTransactionManager 为例
@Override
protected Object doGetTransaction() {DataSourceTransactionObject txObject = new DataSourceTransactionObject();
txObject.setSavepointAllowed(isNestedTransactionAllowed());
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
@Override
protected boolean isExistingTransaction(Object transaction) {DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
}
联合 AbstractPlatformTransactionManager::getTransaction
一起来看,doGetTransaction
其实获取的是以后的 Connection。
判断以后是否存在事务,是判断 DataSourceTransactionObject 对象中是否蕴含 connection,以及 connection 是否开启了事务。
咱们持续来看下 TransactionSynchronizationManager.getResource(obtainDataSource())
获取以后 connection 的逻辑
TransactionSynchronizationManager::getResource
private static final ThreadLocal<Map<Object, Object>> resources =
new NamedThreadLocal<>("Transactional resources");
@Nullable
// TransactionSynchronizationManager::getResource
public static Object getResource(Object key) {
// DataSourceTransactionManager 调用该办法时,以数据源作为 key
// TransactionSynchronizationUtils::unwrapResourceIfNecessary 如果 key 为包装类,则获取被包装的对象
// 咱们能够疏忽该逻辑
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
Object value = doGetResource(actualKey);
if (value != null && logger.isTraceEnabled()) {logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" +
Thread.currentThread().getName() + "]");
}
return value;
}
/**
* Actually check the value of the resource that is bound for the given key.
*/
@Nullable
private static Object doGetResource(Object actualKey) {Map<Object, Object> map = resources.get();
if (map == null) {return null;}
Object value = map.get(actualKey);
// Transparently remove ResourceHolder that was marked as void...
if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {map.remove(actualKey);
// Remove entire ThreadLocal if empty...
if (map.isEmpty()) {resources.remove();
}
value = null;
}
return value;
}
看到这里,咱们能明确 DataSourceTransactionManager 是如何治理线程之间的 Connection,ThreadLocal 中存储一个 Map,key 为数据源对象,value 为该数据源在以后线程的 Connection
DataSourceTransactionManager 在开启事务后,会调用 TransactionSynchronizationManager::bindResource
将指定数据源的 Connection 绑定到以后线程
AbstractPlatformTransactionManager::handleExistingTransaction
咱们持续回头看,如果存在事务的状况,如何解决
private TransactionStatus handleExistingTransaction(TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
// 如果事务的流传要求以非事务形式执行 抛出异样
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException("Existing transaction found for transaction marked with propagation'never'");
}
// PROPAGATION_NOT_SUPPORTED 如果存在事务,则挂起以后事务,以非事务形式执行
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {if (debugEnabled) {logger.debug("Suspending current transaction");
}
// 挂起以后事务
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
// 构建一个无事务的 TransactionStatus
return prepareTransactionStatus(definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
// PROPAGATION_REQUIRES_NEW 如果存在事务,则挂起以后事务,新建一个事务
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {if (debugEnabled) {
logger.debug("Suspending current transaction, creating new transaction with name [" +
definition.getName() + "]");
}
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {return startTransaction(definition, transaction, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error beginEx) {resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}
// PROPAGATION_NESTED 内嵌事务,就是咱们结尾举得例子
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default -" +
"specify'nestedTransactionAllowed'property with value'true'");
}
if (debugEnabled) {logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
// 非 JTA 事务管理器都是通过 savePoint 实现的内嵌事务
// savePoint:关系型数据库中事务能够创立还原点,并且能够回滚到还原点
if (useSavepointForNestedTransaction()) {
// Create savepoint within existing Spring-managed transaction,
// through the SavepointManager API implemented by TransactionStatus.
// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
// 创立还原点
status.createAndHoldSavepoint();
return status;
}
else {
// Nested transaction through nested begin and commit/rollback calls.
// Usually only for JTA: Spring synchronization might get activated here
// in case of a pre-existing JTA transaction.
return startTransaction(definition, transaction, debugEnabled, null);
}
}
// 如果执行到这一步流传类型肯定是,PROPAGATION_SUPPORTS 或者 PROPAGATION_REQUIRED
// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
if (debugEnabled) {logger.debug("Participating in existing transaction");
}
// 校验目前办法中的事务定义和已存在的事务定义是否统一
if (isValidateExistingTransaction()) {if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] specifies isolation level which is incompatible with existing transaction:" +
(currentIsolationLevel != null ?
isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
"(unknown)"));
}
}
if (!definition.isReadOnly()) {if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] is not marked as read-only but existing transaction is");
}
}
}
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 构建一个 TransactionStatus,但不开启事务
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
这里代码很多,逻辑看上述正文即可。这里终于看到了期待已久的 挂起事务和内嵌事务 了,咱们还是看一下 DataSourceTransactionManager 的实现
- 挂起事务:通过
TransactionSynchronizationManager::unbindResource
依据数据源获取以后的 Connection,并在 resource 中移除该 Connection。之后会将该 Connection 存储到 TransactionStatus 对象中
// DataSourceTransactionManager::doSuspend
@Override
protected Object doSuspend(Object transaction) {DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
txObject.setConnectionHolder(null);
return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}
在事务提交或者回滚后,调用 AbstractPlatformTransactionManager::cleanupAfterCompletion
会将 TransactionStatus 中缓存的 Connection 从新绑定到 resource 中
- 内嵌事务:通过关系型数据库的 savePoint 实现,提交或回滚的时候会判断如果以后事务为 savePoint 则开释 savePoint 或者回滚到 savePoint,具体逻辑参考
AbstractPlatformTransactionManager::processRollback
和AbstractPlatformTransactionManager::processCommit
至此,事务的流传源码剖析完结
prepareTransactionInfo
上文留下了一个问题,prepareTransactionInfo 办法做了什么,咱们先来看下 TransactionInfo
的构造
protected static final class TransactionInfo {
@Nullable
private final PlatformTransactionManager transactionManager;
@Nullable
private final TransactionAttribute transactionAttribute;
private final String joinpointIdentification;
@Nullable
private TransactionStatus transactionStatus;
@Nullable
private TransactionInfo oldTransactionInfo;
// ...
}
该类在 Spring 中的作用,是为了外部传递对象。ThreadLocal 中存储了最新的 TransactionInfo,通过以后 TransactionInfo 能够找到他的 oldTransactionInfo。每次创立事务时会新建一个 TransactionInfo(无论有没有真正的事务被创立)存储到 ThreadLocal 中,在每次事务完结后,会将以后 ThreadLocal 中的 TransactionInfo 重置为 oldTransactionInfo,这样的构造造成了一个链表,使得 Spring 事务在逻辑上能够有限嵌套上来
如果感觉有播种,能够关注我的公众号,你的点赞和关注就是对我最大的反对