共计 9992 个字符,预计需要花费 25 分钟才能阅读完成。
序
本文主要研究一下 Spring 的 TransactionalEventListener
TransactionalEventListener
org/springframework/transaction/event/TransactionalEventListener.java
/** | |
* An {@link EventListener} that is invoked according to a {@link TransactionPhase}. | |
* | |
* <p>If the event is not published within an active transaction, the event is discarded | |
* unless the {@link #fallbackExecution} flag is explicitly set. If a transaction is | |
* running, the event is processed according to its {@code TransactionPhase}. | |
* | |
* <p>Adding {@link org.springframework.core.annotation.Order @Order} to your annotated | |
* method allows you to prioritize that listener amongst other listeners running before | |
* or after transaction completion. | |
* | |
* @author Stephane Nicoll | |
* @author Sam Brannen | |
* @since 4.2 | |
*/ | |
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE}) | |
@Retention(RetentionPolicy.RUNTIME) | |
@Documented | |
@EventListener | |
public @interface TransactionalEventListener { | |
/** | |
* Phase to bind the handling of an event to. | |
* <p>The default phase is {@link TransactionPhase#AFTER_COMMIT}. | |
* <p>If no transaction is in progress, the event is not processed at | |
* all unless {@link #fallbackExecution} has been enabled explicitly. | |
*/ | |
TransactionPhase phase() default TransactionPhase.AFTER_COMMIT; | |
/** | |
* Whether the event should be processed if no transaction is running. | |
*/ | |
boolean fallbackExecution() default false; | |
/** | |
* Alias for {@link #classes}. | |
*/ | |
@AliasFor(annotation = EventListener.class, attribute = "classes") | |
Class<?>[] value() default {}; | |
/** | |
* The event classes that this listener handles. | |
* <p>If this attribute is specified with a single value, the annotated | |
* method may optionally accept a single parameter. However, if this | |
* attribute is specified with multiple values, the annotated method | |
* must <em>not</em> declare any parameters. | |
*/ | |
@AliasFor(annotation = EventListener.class, attribute = "classes") | |
Class<?>[] classes() default {}; | |
/** | |
* Spring Expression Language (SpEL) attribute used for making the event | |
* handling conditional. | |
* <p>The default is {@code ""}, meaning the event is always handled. | |
* @see EventListener#condition | |
*/ | |
String condition() default "";} |
TransactionalEventListener 是 EventListener 的事务感知版本,默认的 TransactionPhase 是 AFTER_COMMIT
ApplicationListenerMethodTransactionalAdapter
org/springframework/transaction/event/ApplicationListenerMethodTransactionalAdapter.java
/** | |
* {@link GenericApplicationListener} adapter that delegates the processing of | |
* an event to a {@link TransactionalEventListener} annotated method. Supports | |
* the exact same features as any regular {@link EventListener} annotated method | |
* but is aware of the transactional context of the event publisher. | |
* | |
* <p>Processing of {@link TransactionalEventListener} is enabled automatically | |
* when Spring's transaction management is enabled. For other cases, registering | |
* a bean of type {@link TransactionalEventListenerFactory} is required. | |
* | |
* @author Stephane Nicoll | |
* @author Juergen Hoeller | |
* @since 4.2 | |
* @see ApplicationListenerMethodAdapter | |
* @see TransactionalEventListener | |
*/ | |
class ApplicationListenerMethodTransactionalAdapter extends ApplicationListenerMethodAdapter { | |
private final TransactionalEventListener annotation; | |
public ApplicationListenerMethodTransactionalAdapter(String beanName, Class<?> targetClass, Method method) {super(beanName, targetClass, method); | |
TransactionalEventListener ann = AnnotatedElementUtils.findMergedAnnotation(method, TransactionalEventListener.class); | |
if (ann == null) {throw new IllegalStateException("No TransactionalEventListener annotation found on method:" + method); | |
} | |
this.annotation = ann; | |
} | |
@Override | |
public void onApplicationEvent(ApplicationEvent event) {if (TransactionSynchronizationManager.isSynchronizationActive() | |
&& TransactionSynchronizationManager.isActualTransactionActive()) {TransactionSynchronization transactionSynchronization = createTransactionSynchronization(event); | |
TransactionSynchronizationManager.registerSynchronization(transactionSynchronization); | |
} | |
else if (this.annotation.fallbackExecution()) {if (this.annotation.phase() == TransactionPhase.AFTER_ROLLBACK && logger.isWarnEnabled()) {logger.warn("Processing" + event + "as a fallback execution on AFTER_ROLLBACK phase"); | |
} | |
processEvent(event); | |
} | |
else { | |
// No transactional event execution at all | |
if (logger.isDebugEnabled()) {logger.debug("No transaction is active - skipping" + event); | |
} | |
} | |
} | |
private TransactionSynchronization createTransactionSynchronization(ApplicationEvent event) {return new TransactionSynchronizationEventAdapter(this, event, this.annotation.phase()); | |
} | |
private static class TransactionSynchronizationEventAdapter extends TransactionSynchronizationAdapter { | |
private final ApplicationListenerMethodAdapter listener; | |
private final ApplicationEvent event; | |
private final TransactionPhase phase; | |
public TransactionSynchronizationEventAdapter(ApplicationListenerMethodAdapter listener, | |
ApplicationEvent event, TransactionPhase phase) { | |
this.listener = listener; | |
this.event = event; | |
this.phase = phase; | |
} | |
@Override | |
public int getOrder() {return this.listener.getOrder(); | |
} | |
@Override | |
public void beforeCommit(boolean readOnly) {if (this.phase == TransactionPhase.BEFORE_COMMIT) {processEvent(); | |
} | |
} | |
@Override | |
public void afterCompletion(int status) {if (this.phase == TransactionPhase.AFTER_COMMIT && status == STATUS_COMMITTED) {processEvent(); | |
} | |
else if (this.phase == TransactionPhase.AFTER_ROLLBACK && status == STATUS_ROLLED_BACK) {processEvent(); | |
} | |
else if (this.phase == TransactionPhase.AFTER_COMPLETION) {processEvent(); | |
} | |
} | |
protected void processEvent() {this.listener.processEvent(this.event); | |
} | |
} | |
} |
ApplicationListenerMethodTransactionalAdapter 继承了 ApplicationListenerMethodAdapter,它的构造器会查找指定方法上的 TransactionalEventListener 注解信息(找不到则抛出 IllegalStateException);其 onApplicationEvent 方法在有事务的时候会创建 transactionSynchronization 并注册到当前事务,没有事务时若开启了 fallbackExecution 则会直接执行 processEvent,否则跳过该事件
TransactionSynchronizationEventAdapter 除 getOrder 外只覆盖了 beforeCommit 及 afterCompletion 两个回调方法:beforeCommit 在 phase 为 BEFORE_COMMIT 时执行 processEvent;afterCompletion 方法则根据 status 的值与 phase 的值的匹配关系决定是否执行 processEvent
TransactionalEventListenerFactory
org/springframework/transaction/event/TransactionalEventListenerFactory.java
/** | |
* {@link EventListenerFactory} implementation that handles {@link TransactionalEventListener} | |
* annotated methods. | |
* | |
* @author Stephane Nicoll | |
* @since 4.2 | |
*/ | |
public class TransactionalEventListenerFactory implements EventListenerFactory, Ordered { | |
private int order = 50; | |
public void setOrder(int order) {this.order = order;} | |
@Override | |
public int getOrder() {return this.order;} | |
@Override | |
public boolean supportsMethod(Method method) {return AnnotatedElementUtils.hasAnnotation(method, TransactionalEventListener.class); | |
} | |
@Override | |
public ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) {return new ApplicationListenerMethodTransactionalAdapter(beanName, type, method); | |
} | |
} |
TransactionalEventListenerFactory 用于创建 ApplicationListenerMethodTransactionalAdapter,其 supportsMethod 方法通过判断方法上是否存在 TransactionalEventListener 注解来决定是否支持该方法
EventListenerMethodProcessor
org/springframework/context/event/EventListenerMethodProcessor.java
public class EventListenerMethodProcessor | |
implements SmartInitializingSingleton, ApplicationContextAware, BeanFactoryPostProcessor { | |
//...... | |
@Override | |
public void afterSingletonsInstantiated() { | |
ConfigurableListableBeanFactory beanFactory = this.beanFactory; | |
Assert.state(this.beanFactory != null, "No ConfigurableListableBeanFactory set"); | |
String[] beanNames = beanFactory.getBeanNamesForType(Object.class); | |
for (String beanName : beanNames) {if (!ScopedProxyUtils.isScopedTarget(beanName)) { | |
Class<?> type = null; | |
try {type = AutoProxyUtils.determineTargetClass(beanFactory, beanName); | |
} | |
catch (Throwable ex) { | |
// An unresolvable bean type, probably from a lazy bean - let's ignore it. | |
if (logger.isDebugEnabled()) {logger.debug("Could not resolve target class for bean with name'" + beanName + "'", ex); | |
} | |
} | |
if (type != null) {if (ScopedObject.class.isAssignableFrom(type)) { | |
try { | |
Class<?> targetClass = AutoProxyUtils.determineTargetClass(beanFactory, ScopedProxyUtils.getTargetBeanName(beanName)); | |
if (targetClass != null) {type = targetClass;} | |
} | |
catch (Throwable ex) { | |
// An invalid scoped proxy arrangement - let's ignore it. | |
if (logger.isDebugEnabled()) {logger.debug("Could not resolve target bean for scoped proxy'" + beanName + "'", ex); | |
} | |
} | |
} | |
try {processBean(beanName, type); | |
} | |
catch (Throwable ex) { | |
throw new BeanInitializationException("Failed to process @EventListener" + | |
"annotation on bean with name'" + beanName + "'", ex); | |
} | |
} | |
} | |
} | |
} | |
private void processBean(final String beanName, final Class<?> targetType) {if (!this.nonAnnotatedClasses.contains(targetType) && | |
AnnotationUtils.isCandidateClass(targetType, EventListener.class) && | |
!isSpringContainerClass(targetType)) { | |
Map<Method, EventListener> annotatedMethods = null; | |
try { | |
annotatedMethods = MethodIntrospector.selectMethods(targetType, | |
(MethodIntrospector.MetadataLookup<EventListener>) method -> | |
AnnotatedElementUtils.findMergedAnnotation(method, EventListener.class)); | |
} | |
catch (Throwable ex) { | |
// An unresolvable type in a method signature, probably from a lazy bean - let's ignore it. | |
if (logger.isDebugEnabled()) {logger.debug("Could not resolve methods for bean with name'" + beanName + "'", ex); | |
} | |
} | |
if (CollectionUtils.isEmpty(annotatedMethods)) {this.nonAnnotatedClasses.add(targetType); | |
if (logger.isTraceEnabled()) {logger.trace("No @EventListener annotations found on bean class:" + targetType.getName()); | |
} | |
} | |
else { | |
// Non-empty set of methods | |
ConfigurableApplicationContext context = this.applicationContext; | |
Assert.state(context != null, "No ApplicationContext set"); | |
List<EventListenerFactory> factories = this.eventListenerFactories; | |
Assert.state(factories != null, "EventListenerFactory List not initialized"); | |
for (Method method : annotatedMethods.keySet()) {for (EventListenerFactory factory : factories) {if (factory.supportsMethod(method)) {Method methodToUse = AopUtils.selectInvocableMethod(method, context.getType(beanName)); | |
ApplicationListener<?> applicationListener = | |
factory.createApplicationListener(beanName, targetType, methodToUse); | |
if (applicationListener instanceof ApplicationListenerMethodAdapter) {((ApplicationListenerMethodAdapter) applicationListener).init(context, this.evaluator); | |
} | |
context.addApplicationListener(applicationListener); | |
break; | |
} | |
} | |
} | |
if (logger.isDebugEnabled()) {logger.debug(annotatedMethods.size() + "@EventListener methods processed on bean'" + | |
beanName + "':" + annotatedMethods); | |
} | |
} | |
} | |
} | |
//...... | |
} |
EventListenerMethodProcessor 实现了 SmartInitializingSingleton 接口,其 afterSingletonsInstantiated 方法先确定每个 bean 的 type,然后执行 processBean;processBean 方法会先收集带有 EventListener 注解的 annotatedMethods,然后遍历这些方法,再遍历 factories,针对第一个支持该方法的 factory 执行 createApplicationListener,并将创建出的 listener 添加到 context 中
小结
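TransactionalEventListener 是 EventListener 的事务感知版本,默认的 TransactionPhase 是 AFTER_COMMIT;TransactionSynchronizationEventAdapter 主要覆盖了 beforeCommit 及 afterCompletion 两个方法,在 afterCompletion 方法中根据 status 的值与 phase 的值的匹配关系决定是否执行 processEvent。由于 afterCompletion 是由 TransactionSynchronizationUtils 的 invokeAfterCompletion 回调的,而该方法会捕获回调中抛出的异常,因此监听器在 afterCompletion 阶段(AFTER_COMMIT、AFTER_ROLLBACK、AFTER_COMPLETION)抛出的异常会被捕获并 log 下来,不会向外传播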
doc
- 聊聊 TransactionSynchronization 的 invokeAfterCompletion
- 聊聊 spring 的 TransactionSynchronizationAdapter