Preface
This article takes a look at Spring's TransactionSynchronizationAdapter.
Sample code
public void insert(TechBook techBook) {
    bookMapper.insert(techBook);
    // send after tx commit but is async
    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
        @Override
        public void afterCommit() {
            System.out.println("send email after transaction commit...");
        }
    });
    System.out.println("service end");
}
This uses TransactionSynchronizationManager.registerSynchronization to register a TransactionSynchronizationAdapter whose afterCommit method runs some extra logic once the transaction has committed successfully.
TransactionSynchronizationAdapter
org/springframework/transaction/support/TransactionSynchronizationAdapter.java
public abstract class TransactionSynchronizationAdapter implements TransactionSynchronization, Ordered {

    @Override
    public int getOrder() {
        return Ordered.LOWEST_PRECEDENCE;
    }

    @Override
    public void suspend() {
    }

    @Override
    public void resume() {
    }

    @Override
    public void flush() {
    }

    @Override
    public void beforeCommit(boolean readOnly) {
    }

    @Override
    public void beforeCompletion() {
    }

    @Override
    public void afterCommit() {
    }

    @Override
    public void afterCompletion(int status) {
    }
}
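TransactionSynchronizationAdapter is an abstract class that declares the TransactionSynchronization and Ordered interfaces and gives every callback an empty implementation, so a subclass only overrides the callbacks it cares about.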
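To make the adapter idea concrete, here is a minimal plain-Java sketch. MiniSynchronization and MiniSynchronizationAdapter are hypothetical stand-ins that only mimic the shape of the Spring types above; they are not part of Spring.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a callback interface that has no default methods.
interface MiniSynchronization {
    void beforeCommit(boolean readOnly);
    void afterCommit();
    void afterCompletion(int status);
}

// Adapter with empty implementations, mirroring the shape of TransactionSynchronizationAdapter.
abstract class MiniSynchronizationAdapter implements MiniSynchronization {
    @Override public void beforeCommit(boolean readOnly) {}
    @Override public void afterCommit() {}
    @Override public void afterCompletion(int status) {}
}

class AdapterDemo {
    // Calls every callback but records only the one that was overridden.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniSynchronization sync = new MiniSynchronizationAdapter() {
            @Override
            public void afterCommit() {
                log.add("afterCommit");
            }
        };
        sync.beforeCommit(false);   // no-op inherited from the adapter
        sync.afterCommit();         // the single overridden callback
        sync.afterCompletion(0);    // no-op inherited from the adapter
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In Spring itself, TransactionSynchronization declares its callbacks as default methods (see afterCommit in the next section), so an anonymous class can also implement the interface directly; the adapter has been deprecated since Spring 5.3 for that reason.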
TransactionSynchronization
org/springframework/transaction/support/TransactionSynchronization.java
/**
 * Invoked after transaction commit. Can perform further operations right
 * <i>after</i> the main transaction has <i>successfully</i> committed.
 * <p>Can e.g. commit further operations that are supposed to follow on a successful
 * commit of the main transaction, like confirmation messages or emails.
 * <p><b>NOTE:</b> The transaction will have been committed already, but the
 * transactional resources might still be active and accessible. As a consequence,
 * any data access code triggered at this point will still "participate" in the
 * original transaction, allowing to perform some cleanup (with no commit following
 * anymore!), unless it explicitly declares that it needs to run in a separate
 * transaction. Hence: <b>Use {@code PROPAGATION_REQUIRES_NEW} for any
 * transactional operation that is called from here.</b>
 * @throws RuntimeException in case of errors; will be <b>propagated to the caller</b>
 * (note: do not throw TransactionException subclasses here!)
 */
default void afterCommit() {
}
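Note that the Javadoc says exceptions thrown here are not caught but propagated to the caller, and it advises against throwing TransactionException subclasses. It also recommends the PROPAGATION_REQUIRES_NEW propagation level for any database work done in afterCommit: such work would otherwise join the original transaction, which has already committed and will not commit again, so the changes may never take effect.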
registerSynchronization
org/springframework/transaction/support/TransactionSynchronizationManager.java
private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
        new NamedThreadLocal<>("Transaction synchronizations");

/**
 * Register a new transaction synchronization for the current thread.
 * Typically called by resource management code.
 * <p>Note that synchronizations can implement the
 * {@link org.springframework.core.Ordered} interface.
 * They will be executed in an order according to their order value (if any).
 * @param synchronization the synchronization object to register
 * @throws IllegalStateException if transaction synchronization is not active
 * @see org.springframework.core.Ordered
 */
public static void registerSynchronization(TransactionSynchronization synchronization)
        throws IllegalStateException {
    Assert.notNull(synchronization, "TransactionSynchronization must not be null");
    Set<TransactionSynchronization> synchs = synchronizations.get();
    if (synchs == null) {
        throw new IllegalStateException("Transaction synchronization is not active");
    }
    synchs.add(synchronization);
}
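The registerSynchronization method of TransactionSynchronizationManager adds the TransactionSynchronization to the synchronizations set bound to the current thread, and throws IllegalStateException when no synchronization is active on that thread.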
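The thread-bound behaviour can be sketched with a small plain-Java model. MiniSyncRegistry and RegistryDemo are hypothetical names invented for this illustration; the model only imitates the ThreadLocal lookup shown above and is not Spring's implementation.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Simplified model of a thread-bound callback registry; not Spring's actual class.
class MiniSyncRegistry {
    private static final ThreadLocal<Set<Runnable>> CALLBACKS = new ThreadLocal<>();

    // Models the transaction manager activating synchronization for the current thread.
    static void init() {
        CALLBACKS.set(new LinkedHashSet<>());
    }

    static boolean isActive() {
        return CALLBACKS.get() != null;
    }

    // Fails fast when nothing has been activated on the calling thread.
    static void register(Runnable callback) {
        Set<Runnable> set = CALLBACKS.get();
        if (set == null) {
            throw new IllegalStateException("Transaction synchronization is not active");
        }
        set.add(callback);
    }

    static int size() {
        Set<Runnable> set = CALLBACKS.get();
        return set == null ? 0 : set.size();
    }

    static void clear() {
        CALLBACKS.remove();
    }
}

class RegistryDemo {
    // Returns true if registering from a fresh thread (no init() there) throws.
    static boolean registerFailsOnOtherThread() {
        boolean[] failed = {false};
        Thread t = new Thread(() -> {
            try {
                MiniSyncRegistry.register(() -> {});
            } catch (IllegalStateException e) {
                failed[0] = true;
            }
        });
        t.start();
        try {
            t.join();   // join() makes the write to failed[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return failed[0];
    }

    public static void main(String[] args) {
        MiniSyncRegistry.init();
        MiniSyncRegistry.register(() -> {});
        System.out.println("registered on this thread: " + MiniSyncRegistry.size());
        System.out.println("other thread rejected: " + registerFailsOnOtherThread());
        MiniSyncRegistry.clear();
    }
}
```

The practical consequence in Spring is that a callback registered from a thread with no active transaction synchronization is rejected; code that may run outside a transaction can check TransactionSynchronizationManager.isSynchronizationActive() before registering.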
processCommit
org/springframework/transaction/support/AbstractPlatformTransactionManager.java
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
    try {
        boolean beforeCompletionInvoked = false;
        try {
            boolean unexpectedRollback = false;
            prepareForCommit(status);
            triggerBeforeCommit(status);
            triggerBeforeCompletion(status);
            beforeCompletionInvoked = true;
            if (status.hasSavepoint()) {
                if (status.isDebug()) {
                    logger.debug("Releasing transaction savepoint");
                }
                unexpectedRollback = status.isGlobalRollbackOnly();
                status.releaseHeldSavepoint();
            }
            else if (status.isNewTransaction()) {
                if (status.isDebug()) {
                    logger.debug("Initiating transaction commit");
                }
                unexpectedRollback = status.isGlobalRollbackOnly();
                doCommit(status);
            }
            else if (isFailEarlyOnGlobalRollbackOnly()) {
                unexpectedRollback = status.isGlobalRollbackOnly();
            }
            // Throw UnexpectedRollbackException if we have a global rollback-only
            // marker but still didn't get a corresponding exception from commit.
            if (unexpectedRollback) {
                throw new UnexpectedRollbackException(
                        "Transaction silently rolled back because it has been marked as rollback-only");
            }
        }
        catch (UnexpectedRollbackException ex) {
            // can only be caused by doCommit
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
            throw ex;
        }
        catch (TransactionException ex) {
            // can only be caused by doCommit
            if (isRollbackOnCommitFailure()) {
                doRollbackOnCommitException(status, ex);
            }
            else {
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
            }
            throw ex;
        }
        catch (RuntimeException | Error ex) {
            if (!beforeCompletionInvoked) {
                triggerBeforeCompletion(status);
            }
            doRollbackOnCommitException(status, ex);
            throw ex;
        }
        // Trigger afterCommit callbacks, with an exception thrown there
        // propagated to callers but the transaction still considered as committed.
        try {
            triggerAfterCommit(status);
        }
        finally {
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
        }
    }
    finally {
        cleanupAfterCompletion(status);
    }
}

private void triggerAfterCommit(DefaultTransactionStatus status) {
    if (status.isNewSynchronization()) {
        if (status.isDebug()) {
            logger.trace("Triggering afterCommit synchronization");
        }
        TransactionSynchronizationUtils.triggerAfterCommit();
    }
}
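The processCommit method of AbstractPlatformTransactionManager calls triggerAfterCommit once the commit has succeeded, which in turn calls TransactionSynchronizationUtils.triggerAfterCommit(). Note that this call is wrapped in try/finally with no catch, so an exception thrown from triggerAfterCommit ultimately reaches the caller, while afterCompletion is still triggered with STATUS_COMMITTED.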
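The effect of that try/finally can be traced with a reduced plain-Java model. MiniCommitFlow and CommitFlowDemo are hypothetical names; the model keeps only the tail of processCommit and leaves out savepoints, rollback handling, and everything else Spring does.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the tail of processCommit; not Spring's actual code.
class MiniCommitFlow {
    final List<String> log = new ArrayList<>();

    void commit(Runnable afterCommit) {
        try {
            log.add("doCommit");   // the commit has already succeeded at this point
            try {
                afterCommit.run(); // no catch: an exception here propagates to the caller
            } finally {
                log.add("afterCompletion(COMMITTED)"); // runs even when afterCommit throws
            }
        } finally {
            log.add("cleanup");
        }
    }
}

class CommitFlowDemo {
    static List<String> run() {
        MiniCommitFlow flow = new MiniCommitFlow();
        try {
            flow.commit(() -> {
                throw new IllegalStateException("mail server down");
            });
            flow.log.add("caller: no exception");
        } catch (IllegalStateException e) {
            flow.log.add("caller caught: " + e.getMessage());
        }
        return flow.log;
    }

    public static void main(String[] args) {
        // prints [doCommit, afterCompletion(COMMITTED), cleanup, caller caught: mail server down]
        System.out.println(run());
    }
}
```

The caller sees an exception even though the data is committed, so a failure in afterCommit must not be interpreted as a rollback.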
triggerAfterCommit
org/springframework/transaction/support/TransactionSynchronizationUtils.java
public static void triggerAfterCommit() {
    invokeAfterCommit(TransactionSynchronizationManager.getSynchronizations());
}

public static void invokeAfterCommit(@Nullable List<TransactionSynchronization> synchronizations) {
    if (synchronizations != null) {
        for (TransactionSynchronization synchronization : synchronizations) {
            synchronization.afterCommit();
        }
    }
}
This iterates over synchronizations and calls afterCommit on each one; if any of them throws, the loop is aborted and the remaining callbacks are skipped.
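A short plain-Java sketch shows what aborting the loop means in practice. AfterCommitLoopDemo is a hypothetical class, and Runnable stands in for TransactionSynchronization; only the loop shape is taken from invokeAfterCommit.

```java
import java.util.ArrayList;
import java.util.List;

class AfterCommitLoopDemo {
    // Same shape as invokeAfterCommit: a plain loop with no try/catch per callback.
    static void invokeAll(List<Runnable> callbacks) {
        for (Runnable callback : callbacks) {
            callback.run();
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        List<Runnable> callbacks = new ArrayList<>();
        callbacks.add(() -> log.add("first"));
        callbacks.add(() -> {
            throw new IllegalStateException("second failed");
        });
        callbacks.add(() -> log.add("third")); // never reached
        try {
            invokeAll(callbacks);
        } catch (IllegalStateException e) {
            log.add("aborted: " + e.getMessage());
        }
        return log;
    }

    public static void main(String[] args) {
        // prints [first, aborted: second failed]
        System.out.println(run());
    }
}
```

If several synchronizations are registered for one transaction, a callback that must not block the others should therefore catch and handle its own exceptions.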
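Summary
With TransactionSynchronizationManager.registerSynchronization you can register a TransactionSynchronizationAdapter for the current thread's transaction and run extra logic in its afterCommit method, that is, after the transaction has committed successfully. Note that an exception thrown there does not affect the commit, but it is not caught either and has to be handled by the caller. For database work in afterCommit, use the PROPAGATION_REQUIRES_NEW propagation level, otherwise the database operations in afterCommit may not take effect.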
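Doing something after a commit does not strictly require TransactionSynchronizationManager.registerSynchronization. An extra outer layer can call the transactional operation: if it throws, the exception propagates, and if it does not, the outer layer carries out the post-commit work. The precondition is that the rollback exception is not swallowed, otherwise the outer caller may assume the transaction succeeded.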
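The outer-layer approach can be sketched in plain Java as follows. AfterTxWrapperDemo and runThenNotify are hypothetical names, and the first Runnable merely stands in for a call to a transactional service method; no real transaction is involved.

```java
import java.util.ArrayList;
import java.util.List;

class AfterTxWrapperDemo {
    // Hypothetical outer layer: run the transactional work, then the follow-up
    // only if no exception escaped from it.
    static void runThenNotify(Runnable transactionalWork, Runnable afterSuccess) {
        transactionalWork.run(); // stands in for a call to a transactional service method
        afterSuccess.run();      // reached only when the call returned normally
    }

    static List<String> run(boolean failTx) {
        List<String> log = new ArrayList<>();
        try {
            runThenNotify(
                    () -> {
                        if (failTx) {
                            throw new IllegalStateException("rolled back");
                        }
                        log.add("committed");
                    },
                    () -> log.add("email sent"));
        } catch (IllegalStateException e) {
            log.add("skipped follow-up: " + e.getMessage());
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints [committed, email sent]
        System.out.println(run(true));  // prints [skipped follow-up: rolled back]
    }
}
```

The sketch works only because the failure surfaces as an exception; if the transactional method caught the exception and returned normally, the follow-up would run after a rollback.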
Another option is TransactionalEventListener, which is somewhat more elegant than TransactionSynchronizationManager.registerSynchronization.