This article takes a look at Spring's TransactionalEventListener.

TransactionalEventListener

org/springframework/transaction/event/TransactionalEventListener.java

/**
 * An {@link EventListener} that is invoked according to a {@link TransactionPhase}.
 *
 * <p>If the event is not published within an active transaction, the event is discarded
 * unless the {@link #fallbackExecution} flag is explicitly set. If a transaction is
 * running, the event is processed according to its {@code TransactionPhase}.
 *
 * <p>Adding {@link org.springframework.core.annotation.Order @Order} to your annotated
 * method allows you to prioritize that listener amongst other listeners running before
 * or after transaction completion.
 *
 * @author Stephane Nicoll
 * @author Sam Brannen
 * @since 4.2
 */
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@EventListener
public @interface TransactionalEventListener {

    /**
     * Phase to bind the handling of an event to.
     * <p>The default phase is {@link TransactionPhase#AFTER_COMMIT}.
     * <p>If no transaction is in progress, the event is not processed at
     * all unless {@link #fallbackExecution} has been enabled explicitly.
     */
    TransactionPhase phase() default TransactionPhase.AFTER_COMMIT;

    /**
     * Whether the event should be processed if no transaction is running.
     */
    boolean fallbackExecution() default false;

    /**
     * Alias for {@link #classes}.
     */
    @AliasFor(annotation = EventListener.class, attribute = "classes")
    Class<?>[] value() default {};

    /**
     * The event classes that this listener handles.
     * <p>If this attribute is specified with a single value, the annotated
     * method may optionally accept a single parameter. However, if this
     * attribute is specified with multiple values, the annotated method
     * must <em>not</em> declare any parameters.
     */
    @AliasFor(annotation = EventListener.class, attribute = "classes")
    Class<?>[] classes() default {};

    /**
     * Spring Expression Language (SpEL) attribute used for making the event
     * handling conditional.
     * <p>The default is {@code ""}, meaning the event is always handled.
     * @see EventListener#condition
     */
    String condition() default "";

}
TransactionalEventListener is the transaction-aware version of EventListener; its default TransactionPhase is AFTER_COMMIT.

ApplicationListenerMethodTransactionalAdapter

org/springframework/transaction/event/ApplicationListenerMethodTransactionalAdapter.java

/**
 * {@link GenericApplicationListener} adapter that delegates the processing of
 * an event to a {@link TransactionalEventListener} annotated method. Supports
 * the exact same features as any regular {@link EventListener} annotated method
 * but is aware of the transactional context of the event publisher.
 *
 * <p>Processing of {@link TransactionalEventListener} is enabled automatically
 * when Spring's transaction management is enabled. For other cases, registering
 * a bean of type {@link TransactionalEventListenerFactory} is required.
 *
 * @author Stephane Nicoll
 * @author Juergen Hoeller
 * @since 4.2
 * @see ApplicationListenerMethodAdapter
 * @see TransactionalEventListener
 */
class ApplicationListenerMethodTransactionalAdapter extends ApplicationListenerMethodAdapter {

    private final TransactionalEventListener annotation;

    public ApplicationListenerMethodTransactionalAdapter(String beanName, Class<?> targetClass, Method method) {
        super(beanName, targetClass, method);
        TransactionalEventListener ann = AnnotatedElementUtils.findMergedAnnotation(method, TransactionalEventListener.class);
        if (ann == null) {
            throw new IllegalStateException("No TransactionalEventListener annotation found on method: " + method);
        }
        this.annotation = ann;
    }

    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        if (TransactionSynchronizationManager.isSynchronizationActive()
                && TransactionSynchronizationManager.isActualTransactionActive()) {
            TransactionSynchronization transactionSynchronization = createTransactionSynchronization(event);
            TransactionSynchronizationManager.registerSynchronization(transactionSynchronization);
        }
        else if (this.annotation.fallbackExecution()) {
            if (this.annotation.phase() == TransactionPhase.AFTER_ROLLBACK && logger.isWarnEnabled()) {
                logger.warn("Processing " + event + " as a fallback execution on AFTER_ROLLBACK phase");
            }
            processEvent(event);
        }
        else {
            // No transactional event execution at all
            if (logger.isDebugEnabled()) {
                logger.debug("No transaction is active - skipping " + event);
            }
        }
    }

    private TransactionSynchronization createTransactionSynchronization(ApplicationEvent event) {
        return new TransactionSynchronizationEventAdapter(this, event, this.annotation.phase());
    }

    private static class TransactionSynchronizationEventAdapter extends TransactionSynchronizationAdapter {

        private final ApplicationListenerMethodAdapter listener;

        private final ApplicationEvent event;

        private final TransactionPhase phase;

        public TransactionSynchronizationEventAdapter(ApplicationListenerMethodAdapter listener,
                ApplicationEvent event, TransactionPhase phase) {
            this.listener = listener;
            this.event = event;
            this.phase = phase;
        }

        @Override
        public int getOrder() {
            return this.listener.getOrder();
        }

        @Override
        public void beforeCommit(boolean readOnly) {
            if (this.phase == TransactionPhase.BEFORE_COMMIT) {
                processEvent();
            }
        }

        @Override
        public void afterCompletion(int status) {
            if (this.phase == TransactionPhase.AFTER_COMMIT && status == STATUS_COMMITTED) {
                processEvent();
            }
            else if (this.phase == TransactionPhase.AFTER_ROLLBACK && status == STATUS_ROLLED_BACK) {
                processEvent();
            }
            else if (this.phase == TransactionPhase.AFTER_COMPLETION) {
                processEvent();
            }
        }

        protected void processEvent() {
            this.listener.processEvent(this.event);
        }
    }

}
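ApplicationListenerMethodTransactionalAdapter extends ApplicationListenerMethodAdapter. Its constructor looks up the TransactionalEventListener annotation on the given method and fails with an IllegalStateException if none is found. When a transaction is active, onApplicationEvent creates a TransactionSynchronization and registers it with the current transaction; when no transaction is active but fallbackExecution is enabled, it calls processEvent directly; otherwise the event is skipped.
Apart from getOrder, TransactionSynchronizationEventAdapter overrides only two callbacks, beforeCommit and afterCompletion. beforeCommit processes the event only when the phase is BEFORE_COMMIT, while afterCompletion decides whether to call processEvent by matching the status value against the phase.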
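The two decisions described above can be modeled without Spring on the classpath. The following is a minimal sketch, not the framework's code: the class PhaseDispatchSketch, its enums, and both method names are hypothetical stand-ins that only mirror the branch structure of onApplicationEvent and afterCompletion quoted earlier.

```java
// Hypothetical, Spring-free model of the dispatch rules; all names here are illustrative.
class PhaseDispatchSketch {

    enum Phase { BEFORE_COMMIT, AFTER_COMMIT, AFTER_ROLLBACK, AFTER_COMPLETION }

    // Stand-in for the int status constants passed to afterCompletion.
    enum Status { COMMITTED, ROLLED_BACK, UNKNOWN }

    /** Mirrors onApplicationEvent: register, fall back to immediate processing, or skip. */
    static String onEvent(boolean transactionActive, boolean fallbackExecution) {
        if (transactionActive) {
            return "registered";
        }
        else if (fallbackExecution) {
            return "processed";
        }
        return "skipped";
    }

    /** Mirrors afterCompletion: does a listener bound to this phase fire for this status? */
    static boolean firesAfterCompletion(Phase phase, Status status) {
        if (phase == Phase.AFTER_COMMIT && status == Status.COMMITTED) {
            return true;
        }
        else if (phase == Phase.AFTER_ROLLBACK && status == Status.ROLLED_BACK) {
            return true;
        }
        // AFTER_COMPLETION fires regardless of the outcome; BEFORE_COMMIT never fires here.
        return phase == Phase.AFTER_COMPLETION;
    }

    public static void main(String[] args) {
        for (Phase phase : Phase.values()) {
            for (Status status : Status.values()) {
                System.out.println(phase + " / " + status + " -> " + firesAfterCompletion(phase, status));
            }
        }
        System.out.println("no tx, no fallback -> " + onEvent(false, false));
    }
}
```

Running main prints the full phase/status matrix, which makes it easy to see that an AFTER_COMMIT listener stays silent on rollback and that a BEFORE_COMMIT listener is never triggered from afterCompletion.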

TransactionalEventListenerFactory

org/springframework/transaction/event/TransactionalEventListenerFactory.java

/**
 * {@link EventListenerFactory} implementation that handles {@link TransactionalEventListener}
 * annotated methods.
 *
 * @author Stephane Nicoll
 * @since 4.2
 */
public class TransactionalEventListenerFactory implements EventListenerFactory, Ordered {

    private int order = 50;

    public void setOrder(int order) {
        this.order = order;
    }

    @Override
    public int getOrder() {
        return this.order;
    }

    @Override
    public boolean supportsMethod(Method method) {
        return AnnotatedElementUtils.hasAnnotation(method, TransactionalEventListener.class);
    }

    @Override
    public ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) {
        return new ApplicationListenerMethodTransactionalAdapter(beanName, type, method);
    }

}
TransactionalEventListenerFactory creates ApplicationListenerMethodTransactionalAdapter instances for methods annotated with TransactionalEventListener.

EventListenerMethodProcessor

org/springframework/context/event/EventListenerMethodProcessor.java

public class EventListenerMethodProcessor
        implements SmartInitializingSingleton, ApplicationContextAware, BeanFactoryPostProcessor {

    //......

    @Override
    public void afterSingletonsInstantiated() {
        ConfigurableListableBeanFactory beanFactory = this.beanFactory;
        Assert.state(this.beanFactory != null, "No ConfigurableListableBeanFactory set");
        String[] beanNames = beanFactory.getBeanNamesForType(Object.class);
        for (String beanName : beanNames) {
            if (!ScopedProxyUtils.isScopedTarget(beanName)) {
                Class<?> type = null;
                try {
                    type = AutoProxyUtils.determineTargetClass(beanFactory, beanName);
                }
                catch (Throwable ex) {
                    // An unresolvable bean type, probably from a lazy bean - let's ignore it.
                    if (logger.isDebugEnabled()) {
                        logger.debug("Could not resolve target class for bean with name '" + beanName + "'", ex);
                    }
                }
                if (type != null) {
                    if (ScopedObject.class.isAssignableFrom(type)) {
                        try {
                            Class<?> targetClass = AutoProxyUtils.determineTargetClass(
                                    beanFactory, ScopedProxyUtils.getTargetBeanName(beanName));
                            if (targetClass != null) {
                                type = targetClass;
                            }
                        }
                        catch (Throwable ex) {
                            // An invalid scoped proxy arrangement - let's ignore it.
                            if (logger.isDebugEnabled()) {
                                logger.debug("Could not resolve target bean for scoped proxy '" + beanName + "'", ex);
                            }
                        }
                    }
                    try {
                        processBean(beanName, type);
                    }
                    catch (Throwable ex) {
                        throw new BeanInitializationException("Failed to process @EventListener " +
                                "annotation on bean with name '" + beanName + "'", ex);
                    }
                }
            }
        }
    }

    private void processBean(final String beanName, final Class<?> targetType) {
        if (!this.nonAnnotatedClasses.contains(targetType) &&
                AnnotationUtils.isCandidateClass(targetType, EventListener.class) &&
                !isSpringContainerClass(targetType)) {

            Map<Method, EventListener> annotatedMethods = null;
            try {
                annotatedMethods = MethodIntrospector.selectMethods(targetType,
                        (MethodIntrospector.MetadataLookup<EventListener>) method ->
                                AnnotatedElementUtils.findMergedAnnotation(method, EventListener.class));
            }
            catch (Throwable ex) {
                // An unresolvable type in a method signature, probably from a lazy bean - let's ignore it.
                if (logger.isDebugEnabled()) {
                    logger.debug("Could not resolve methods for bean with name '" + beanName + "'", ex);
                }
            }

            if (CollectionUtils.isEmpty(annotatedMethods)) {
                this.nonAnnotatedClasses.add(targetType);
                if (logger.isTraceEnabled()) {
                    logger.trace("No @EventListener annotations found on bean class: " + targetType.getName());
                }
            }
            else {
                // Non-empty set of methods
                ConfigurableApplicationContext context = this.applicationContext;
                Assert.state(context != null, "No ApplicationContext set");
                List<EventListenerFactory> factories = this.eventListenerFactories;
                Assert.state(factories != null, "EventListenerFactory List not initialized");
                for (Method method : annotatedMethods.keySet()) {
                    for (EventListenerFactory factory : factories) {
                        if (factory.supportsMethod(method)) {
                            Method methodToUse = AopUtils.selectInvocableMethod(method, context.getType(beanName));
                            ApplicationListener<?> applicationListener =
                                    factory.createApplicationListener(beanName, targetType, methodToUse);
                            if (applicationListener instanceof ApplicationListenerMethodAdapter) {
                                ((ApplicationListenerMethodAdapter) applicationListener).init(context, this.evaluator);
                            }
                            context.addApplicationListener(applicationListener);
                            break;
                        }
                    }
                }
                if (logger.isDebugEnabled()) {
                    logger.debug(annotatedMethods.size() + " @EventListener methods processed on bean '" +
                            beanName + "': " + annotatedMethods);
                }
            }
        }
    }

    //......
}
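EventListenerMethodProcessor implements the SmartInitializingSingleton interface. Its afterSingletonsInstantiated method first determines the target type of each bean and then calls processBean. processBean collects the annotatedMethods, iterates over them, and for each method iterates over the factories: the first factory whose supportsMethod returns true creates an ApplicationListener via createApplicationListener, which is then added to the context.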
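The "first supporting factory wins" loop is what gives the order value of 50 on TransactionalEventListenerFactory its meaning. Below is a minimal sketch of that selection, written without Spring. The ListenerFactory interface and both factory constants are hypothetical simplifications, and the sketch assumes that the factories are consulted in ascending order value and that a catch-all default factory with the lowest precedence is also registered.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical, Spring-free model of the factory lookup in processBean.
class FactorySelectionSketch {

    /** Simplified stand-in for EventListenerFactory; a boolean flag replaces the Method argument. */
    interface ListenerFactory {
        int order();
        boolean supports(boolean transactionalMethod);
        String create(String methodName);
    }

    // Models TransactionalEventListenerFactory: order 50, supports only transactional listener methods.
    static final ListenerFactory TRANSACTIONAL = new ListenerFactory() {
        public int order() { return 50; }
        public boolean supports(boolean transactionalMethod) { return transactionalMethod; }
        public String create(String methodName) { return "transactional adapter for " + methodName; }
    };

    // Assumption: a default factory that accepts every method and sorts last.
    static final ListenerFactory DEFAULT = new ListenerFactory() {
        public int order() { return Integer.MAX_VALUE; }
        public boolean supports(boolean transactionalMethod) { return true; }
        public String create(String methodName) { return "plain adapter for " + methodName; }
    };

    /** Returns the listener created by the first factory (by ascending order) that supports the method. */
    static String createListener(List<ListenerFactory> factories, String methodName, boolean transactional) {
        List<ListenerFactory> sorted = new ArrayList<>(factories);
        sorted.sort(Comparator.comparingInt(ListenerFactory::order));
        for (ListenerFactory factory : sorted) {
            if (factory.supports(transactional)) {
                return factory.create(methodName); // plays the role of the break in processBean
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<ListenerFactory> factories = Arrays.asList(DEFAULT, TRANSACTIONAL);
        System.out.println(createListener(factories, "onOrderPaid", true));
        System.out.println(createListener(factories, "onStartup", false));
    }
}
```

Under these assumptions, a transactional listener method is picked up by the transactional factory before the catch-all factory ever sees it, while a plain listener method falls through to the default one.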

Summary

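TransactionalEventListener is the transaction-aware version of EventListener; its default TransactionPhase is AFTER_COMMIT. Apart from getOrder, TransactionSynchronizationEventAdapter overrides only beforeCommit and afterCompletion, and in afterCompletion it decides whether to call processEvent by matching the status value against the phase. Because AFTER_COMMIT, AFTER_ROLLBACK and AFTER_COMPLETION listeners all run inside the afterCompletion callback, an exception thrown by such a listener is caught and logged by the transaction infrastructure instead of propagating to the caller.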
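To illustrate the last point, here is a minimal sketch of a caller that invokes each afterCompletion callback inside its own try/catch. It is modeled on how Spring's invokeAfterCompletion is understood to behave, not copied from it: the class AfterCompletionErrorSketch is hypothetical, IntConsumer stands in for TransactionSynchronization, and a returned list of messages stands in for the logger.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

// Hypothetical, Spring-free model of how afterCompletion failures are contained.
class AfterCompletionErrorSketch {

    // Illustrative status value; only used as an opaque argument in this sketch.
    static final int STATUS_COMMITTED = 0;

    /** Invokes every callback; a failure is recorded (instead of logged) and never rethrown. */
    static List<String> invokeAfterCompletion(List<IntConsumer> synchronizations, int status) {
        List<String> logged = new ArrayList<>();
        for (IntConsumer synchronization : synchronizations) {
            try {
                synchronization.accept(status);
            }
            catch (Throwable ex) {
                // The exception stops here, so later callbacks still run.
                logged.add("afterCompletion threw exception: " + ex.getMessage());
            }
        }
        return logged;
    }

    public static void main(String[] args) {
        List<IntConsumer> synchronizations = new ArrayList<>();
        synchronizations.add(status -> { throw new IllegalStateException("listener failed"); });
        synchronizations.add(status -> System.out.println("second listener ran, status=" + status));
        System.out.println(invokeAfterCompletion(synchronizations, STATUS_COMMITTED));
    }
}
```

The practical consequence is that a failing AFTER_COMMIT listener cannot undo the commit and does not surface as an exception to the code that published the event, so such failures are only visible in the logs.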

doc

  • A look at TransactionSynchronization's invokeAfterCompletion
  • A look at Spring's TransactionSynchronizationAdapter