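Preface
I previously used Spring's event mechanism to refactor our system and decouple some of its modules. In practice, however, the following problem came up:
When ApplicationEventPublisher publishes a batch of ApplicationEvent instances and an ApplicationListener throws an exception while handling one of them, the remaining events are never published.
PS: the Spring version is 5.1.5.RELEASE.
The example below reproduces the problem.
Reproduction example
Custom event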
import org.springframework.context.ApplicationEvent;

/**
 * Custom event.
 * @author RJH
 * create at 2018/10/29
 */
public class SimpleEvent extends ApplicationEvent {

    private final int i;

    /**
     * Create a new ApplicationEvent.
     *
     * @param source the object on which the event initially occurred (never {@code null})
     */
    public SimpleEvent(Object source) {
        super(source);
        // parseInt yields a primitive int directly, avoiding the boxing done by Integer.valueOf
        i = Integer.parseInt(source.toString());
    }

    public int getI() {
        return i;
    }
}
Event listener
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

/**
 * Custom event listener.
 * @author RJH
 * create at 2018/10/29
 */
@Component
public class SimpleEventListener implements ApplicationListener<SimpleEvent> {

    @Override
    public void onApplicationEvent(SimpleEvent event) {
        // Simulate a failure on every tenth event
        if (event.getI() % 10 == 0) {
            throw new RuntimeException();
        }
        System.out.println("Time:" + event.getTimestamp() + " event:" + event.getSource());
    }
}
Event publishing
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

/**
 * Event publishing.
 * @author RJH
 * create at 2018/10/29
 */
public class EventApplication {

    public static void main(String[] args) {
        // Scan a specific package
        ApplicationContext context = new AnnotationConfigApplicationContext("com.rjh.event");
        // Publish events in a batch
        for (int i = 1; i <= 100; i++) {
            context.publishEvent(new SimpleEvent(i));
        }
    }
}
Output
Time:1553607971143 event:1
Time:1553607971145 event:2
Time:1553607971145 event:3
Time:1553607971145 event:4
Time:1553607971145 event:5
Time:1553607971145 event:6
Time:1553607971146 event:7
Time:1553607971146 event:8
Time:1553607971146 event:9
Exception in thread "main" java.lang.RuntimeException
at com.rjh.event.SimpleEventListener.onApplicationEvent(SimpleEventListener.java:17)
at com.rjh.event.SimpleEventListener.onApplicationEvent(SimpleEventListener.java:11)
at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172)
at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165)
at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:139)
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:393)
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:347)
at com.rjh.event.EventApplication.main(EventApplication.java:17)
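Analysis
The expected behavior was that an exception thrown by SimpleEventListener would not affect the events that EventApplication publishes afterwards. In reality, the exception propagates out of publishEvent() into the loop in EventApplication, so the remaining events are never published. This shows that, by default, event publishing and event handling run synchronously on the same thread rather than asynchronously. The Spring documentation describes this behavior: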
Notice that ApplicationListener is generically parameterized with the type of your custom event (BlackListEvent in the preceding example). This means that the onApplicationEvent() method can remain type-safe, avoiding any need for downcasting. You can register as many event listeners as you wish, but note that, by default, event listeners receive events synchronously. This means that the publishEvent() method blocks until all listeners have finished processing the event. One advantage of this synchronous and single-threaded approach is that, when a listener receives an event, it operates inside the transaction context of the publisher if a transaction context is available. If another strategy for event publication becomes necessary, See the javadoc for Spring’s ApplicationEventMulticaster interface.
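To make the mechanism concrete, here is a minimal plain-Java model of synchronous delivery. It is only a sketch and not Spring's implementation: the class SyncDispatchSketch and its methods are hypothetical names, and the "listener" is simply a lambda invoked on the caller's thread. The first method mirrors the failing loop above; the second shows that guarding each call (the equivalent of a try/catch around publishEvent()) lets the loop continue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

/**
 * Plain-Java model of synchronous event delivery (hypothetical, not Spring code).
 * The listener runs on the caller's thread, so its exception reaches the publishing loop.
 */
public class SyncDispatchSketch {

    /** Publishes events 1..count unguarded; returns the events handled before the first failure. */
    public static List<Integer> publishUnprotected(int count) {
        List<Integer> handled = new ArrayList<>();
        IntConsumer listener = failingListener(handled);
        try {
            for (int i = 1; i <= count; i++) {
                listener.accept(i); // same thread: an exception here leaves the loop
            }
        } catch (RuntimeException e) {
            // By the time we get here the loop has already been abandoned
        }
        return handled;
    }

    /** Same loop, but each call is guarded so one failure does not stop the rest. */
    public static List<Integer> publishGuarded(int count) {
        List<Integer> handled = new ArrayList<>();
        IntConsumer listener = failingListener(handled);
        for (int i = 1; i <= count; i++) {
            try {
                listener.accept(i);
            } catch (RuntimeException e) {
                // Log or record the failure, then move on to the next event
            }
        }
        return handled;
    }

    /** Mirrors SimpleEventListener: fails on every multiple of 10. */
    private static IntConsumer failingListener(List<Integer> handled) {
        return i -> {
            if (i % 10 == 0) {
                throw new RuntimeException("listener failed on event " + i);
            }
            handled.add(i);
        };
    }

    public static void main(String[] args) {
        System.out.println(publishUnprotected(100).size()); // prints 9
        System.out.println(publishGuarded(100).size());     // prints 90
    }
}
```

The unguarded run stops after event 9, matching the console output above. The guarded run handles the 90 events that are not multiples of 10.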
Solution
Make the event listener asynchronous. The approach shown here uses JavaConfig, that is, annotation-based configuration.
Enable async support
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;

/**
 * Configuration class that enables asynchronous execution.
 * @author RJH
 * create at 2019-03-26
 */
@EnableAsync
@Configuration
public class AsyncConfig {
}
Asynchronous event listener
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

/**
 * Asynchronous event listener.
 * @author RJH
 * create at 2019-03-26
 */
@Component
public class AsyncSimpleEventListener {

    @EventListener
    @Async
    public void handleEvent(SimpleEvent event) {
        // Simulate a failure on every tenth event
        if (event.getI() % 10 == 0) {
            throw new RuntimeException();
        }
        System.out.println("Time:" + event.getTimestamp() + " event:" + event.getSource());
    }
}
Output
Time:1553614469990 event:1
Time:1553614470007 event:72
Time:1553614470006 event:64
Time:1553614470006 event:67
Time:1553614470007 event:73
Time:1553614470007 event:71
Time:1553614470007 event:75
Time:1553614470006 event:68
Time:1553614470007 event:69
Time:1553614470006 event:62
Time:1553614470005 event:61
Time:1553614470006 event:63
Time:1553614470006 event:65
Time:1553614470007 event:74
Time:1553614470006 event:66
Time:1553614470005 event:59
Time:1553614470005 event:57
Time:1553614470005 event:55
Time:1553614470005 event:58
Time:1553614470004 event:51
Time:1553614470004 event:52
Time:1553614470002 event:43
Time:1553614470004 event:53
Time:1553614470002 event:38
Time:1553614470001 event:36
Time:1553614470004 event:54
Time:1553614470001 event:33
Time:1553614470000 event:29
Time:1553614470000 event:27
Time:1553614470005 event:56
Time:1553614469999 event:23
Time:1553614469999 event:22
Time:1553614469999 event:21
三月 26, 2019 11:34:30 下午 org.springframework.aop.interceptor.SimpleAsyncUncaughtExceptionHandler handleUncaughtException
严重: Unexpected error occurred invoking async method: public void com.rjh.event.AsyncSimpleEventListener.handleEvent(com.rjh.event.SimpleEvent)
Time:1553614470000 event:24java.lang.RuntimeException
at com.rjh.event.AsyncSimpleEventListener.handleEvent(AsyncSimpleEventListener.java:19)
at com.rjh.event.AsyncSimpleEventListener$$FastClassBySpringCGLIB$$61742dbf.invoke(<generated>)
Time:1553614469998 event:15 at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:736)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
at org.springframework.aop.interceptor.AsyncExecutionInterceptor$1.call(AsyncExecutionInterceptor.java:115)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
… (the rest of the output is omitted because it is too long)
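Analysis
Once the listener is asynchronous, events are handled on separate threads managed by Spring's task executor, so an exception in the listener no longer reaches the publisher. You can also supply a custom thread pool and register an exception handler to deal with uncaught exceptions.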
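The idea of a custom pool plus an exception handler can be sketched in plain java.util.concurrent terms. This is an analogue under stated assumptions, not Spring configuration: AsyncDispatchSketch, Result, and errorHandler are hypothetical names, and the pool size of 4 is arbitrary. In Spring itself, the corresponding hooks are the getAsyncExecutor() and getAsyncUncaughtExceptionHandler() methods of the AsyncConfigurer interface.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/**
 * Plain-Java analogue of an async listener with a custom pool and an
 * exception handler. All names are hypothetical; this is not Spring code.
 */
public class AsyncDispatchSketch {

    /** Outcome of one run: successful events and the failures the handler received. */
    public static final class Result {
        public final int handled;
        public final List<Throwable> failures;

        Result(int handled, List<Throwable> failures) {
            this.handled = handled;
            this.failures = failures;
        }
    }

    public static Result publishAll(int count) {
        AtomicInteger handled = new AtomicInteger();
        List<Throwable> failures = new CopyOnWriteArrayList<>();
        // Stand-in for an uncaught-exception handler: here it only collects the errors
        Consumer<Throwable> errorHandler = failures::add;

        ExecutorService pool = Executors.newFixedThreadPool(4); // custom pool, size chosen arbitrarily
        for (int i = 1; i <= count; i++) {
            final int event = i;
            pool.execute(() -> {
                try {
                    if (event % 10 == 0) {
                        throw new RuntimeException("listener failed on event " + event);
                    }
                    handled.incrementAndGet();
                } catch (RuntimeException ex) {
                    errorHandler.accept(ex); // the failure stays on the worker thread
                }
            });
        }

        pool.shutdown(); // accept no new tasks, let the queued ones finish
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new Result(handled.get(), failures);
    }

    public static void main(String[] args) {
        Result result = publishAll(100);
        // prints "handled=90 failed=10"
        System.out.println("handled=" + result.handled + " failed=" + result.failures.size());
    }
}
```

Because every failure is routed to the handler on the worker thread, the publishing loop always submits all 100 events. The handler is the single place to log, count, or retry failed events.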
References
https://docs.spring.io/spring…
https://docs.spring.io/spring…