作者:王子源
1 观察者模式简介
1.1 定义
指多个对象间存在一对多的依赖关系,当一个对象的状态产生扭转时,所有依赖于它的对象都失去告诉并被自动更新。这种模式有时又称作公布 - 订阅模式、模型 - 视图模式,它是对象行为型模式。
1.2 角色介绍
在观察者模式中,有以下几个角色。
主题也叫被观察者(Subject):
- 定义被观察者必须实现的职责,
- 它能动静的减少勾销观察者,它个别是抽象类或者是实现类,仅仅实现作为被观察者必须实现的职责:
- 治理观察者并告诉观察者。
观察者(Observer):
观察者承受到音讯后,即进行更新操作,对接管到的信息进行解决。
具体的被观察者(ConcreteSubject):
定义被观察者本人的业务逻辑,同时定义对哪些事件进行告诉。
具体的观察者(ConcreteObserver):
具体的观察者,每个观察者接管到音讯后的解决反馈是不同的,每个观察者都有本人的解决逻辑。
1.3 观察者模式的实用场景
- 对象间存在一对多关系,一个对象的状态产生扭转会影响其余对象。
- 当一个形象模型有两个方面,其中一个方面依赖于另一方面时,可将这二者封装在独立的对象中以使它们能够各自独立地扭转和复用。
- 实现相似播送机制的性能,不须要晓得具体收听者,只需散发播送,零碎中感兴趣的对象会主动接管该播送。
- 多层级嵌套应用,造成一种链式触发机制,使得事件具备跨域(逾越两种观察者类型)告诉。
2 观察者模式在 Spring 中的利用
2.1 spring 的事件监听机制
Spring 事件机制是观察者模式的实现。ApplicationContext 中事件处理是由 ApplicationEvent 类和 ApplicationListener 接口来提供的。如果一个 Bean 实现了 ApplicationListener 接口,并且曾经公布到容器中去,每次 ApplicationContext 公布一个 ApplicationEvent 事件,这个 Bean 就会接到告诉。ApplicationEvent 事件的公布须要显示触发,要么 Spring 触发,要么咱们编码触发。spring 内置事件由 spring 触发。咱们先来看一下,如何自定义 spring 事件,并使其被监听和公布。
2.1.1 事件
事件,ApplicationEvent,该抽象类继承了 EventObject,EventObject 是 JDK 中的类,并倡议所有的事件都应该继承自 EventObject。
public abstract class ApplicationEvent extends EventObject {
private static final long serialVersionUID = 7099057708183571937L;
private final long timestamp = System.currentTimeMillis();
public ApplicationEvent(Object source) {super(source);
}
public final long getTimestamp() {return this.timestamp;}
}
2.1.2 监听器
ApplicationListener,是一个接口,该接口继承了 EventListener 接口。EventListener 接口是 JDK 中的,倡议所有的事件监听器都应该继承 EventListener。
@FunctionalInterface
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {void onApplicationEvent(E var1);
}
2.1.3 事件公布器
ApplicationEventPublisher,ApplicationContext 继承了该接口,在 ApplicationContext 的形象实现类 AbstractApplicationContext 中做了实现上面咱们来看一下
org.springframework.context.support.AbstractApplicationContext#publishEvent(java.lang.Object, org.springframework.core.ResolvableType)
protected void publishEvent(Object event, @Nullable ResolvableType eventType) {Assert.notNull(event, "Event must not be null");
ApplicationEvent applicationEvent;
if (event instanceof ApplicationEvent) {applicationEvent = (ApplicationEvent) event;
}
else {applicationEvent = new PayloadApplicationEvent<>(this, event);
if (eventType == null) {eventType = ((PayloadApplicationEvent<?>) applicationEvent).getResolvableType();}
}
if (this.earlyApplicationEvents != null) {this.earlyApplicationEvents.add(applicationEvent);
}
else {
// 获取以后注入的公布器, 执行公布办法
getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
}
if (this.parent != null) {if (this.parent instanceof AbstractApplicationContext) {((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
}
else {this.parent.publishEvent(event);
}
}
}
咱们能够看到,AbstractApplicationContext 中 publishEvent 办法最终执行公布事件的是 ApplicationEventMulticaster#multicastEvent 办法,上面咱们再来一起看一下 multicastEvent 办法
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
Executor executor = getTaskExecutor();
// 拿到所有的监听器, 如果异步执行器不为空, 异步执行
for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {if (executor != null) {executor.execute(() -> invokeListener(listener, event));
}
else {
// 执行监听办法
invokeListener(listener, event);
}
}
}
protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {ErrorHandler errorHandler = getErrorHandler();
if (errorHandler != null) {
try {doInvokeListener(listener, event);
}
catch (Throwable err) {errorHandler.handleError(err);
}
}
else {doInvokeListener(listener, event);
}
}
private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
try {listener.onApplicationEvent(event);
}
catch (ClassCastException ex) {String msg = ex.getMessage();
if (msg == null || matchesClassCastMessage(msg, event.getClass())) {Log logger = LogFactory.getLog(getClass());
if (logger.isTraceEnabled()) {logger.trace("Non-matching event type for listener:" + listener, ex);
}
}
else {throw ex;}
}
}
下面介绍了非 spring 内置的事件公布和监听执行流程。总结一下
- 定义一个事件, 该事件继承 ApplicationEvent
- 定义一个监听器, 实现 ApplicationListener 接口
- 定义一个事件公布器, 实现 ApplicationEventPublisherAware 接口
- 调用 ApplicationEventPublisher#publishEvent 办法.
2.2 spring 事件实现原理
下面咱们讲到了 spring 事件的公布,那么 spring 事件公布之后,spring 是如何依据事件找到事件对应的监听器呢?咱们一起来探索一下。
spring 的容器初始化过程想必大家都已非常理解,这里就不过多赘述,咱们间接看 refresh 办法在 refresh 办法中,有这样两个办法,initApplicationEventMulticaster()和 registerListeners()
咱们先来看 initApplicationEventMulticaster()办法
2.2.1 initApplicationEventMulticaster()
org.springframework.context.support.AbstractApplicationContext#initApplicationEventMulticaster
public static final String APPLICATION_EVENT_MULTICASTER_BEAN_NAME = "applicationEventMulticaster";
protected void initApplicationEventMulticaster() {
// 取得 beanFactory
ConfigurableListableBeanFactory beanFactory = getBeanFactory();
//BeanFactory 中是否有 ApplicationEventMulticaster
if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
this.applicationEventMulticaster =
beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
if (logger.isTraceEnabled()) {logger.trace("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");
}
}
else {
// 如果 BeanFactory 中不存在,就创立一个 SimpleApplicationEventMulticaster
this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
if (logger.isTraceEnabled()) {
logger.trace("No'" + APPLICATION_EVENT_MULTICASTER_BEAN_NAME + "'bean, using" +
"[" + this.applicationEventMulticaster.getClass().getSimpleName() + "]");
}
}
}
上述代码咱们能够看出,spring 先从 BeanFactory 中获取 applicationEventMulticaster 如果为空,则间接创立 SimpleApplicationEventMulticaster
2.2.2 registerListeners()
org.springframework.context.support.AbstractApplicationContext#registerListeners
registerListeners 是将各种实现了 ApplicationListener 的监听器注册到 ApplicationEventMulticaster 事件播送器中
protected void registerListeners() {for (ApplicationListener<?> listener : getApplicationListeners()) {getApplicationEventMulticaster().addApplicationListener(listener);
}
String[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false);
for (String listenerBeanName : listenerBeanNames) {
// 把监听器注册到事件公布器上
getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);
}
Set<ApplicationEvent> earlyEventsToProcess = this.earlyApplicationEvents;
this.earlyApplicationEvents = null;
if (!CollectionUtils.isEmpty(earlyEventsToProcess)) {
// 如果内置监听事件汇合不为空
for (ApplicationEvent earlyEvent : earlyEventsToProcess) {
// 执行 spring 内置的监听办法
getApplicationEventMulticaster().multicastEvent(earlyEvent);
}
}
}
这里解释一下 earlyApplicationListeners
earlyApplicationListeners 的实质还是 ApplicationListener。Spring 单例 Ban 的实例化是在 Refresh 阶段实例化的,那么用户自定义的一些 ApplicationListener 组件天然也是在这个阶段才初始化,然而 Spring 容器启动过程中,在 Refresh 实现之前还有很多事件:如 Spring 上下文环境筹备等事件,这些事件又是 Spring 容器启动必须要监听的。所以 Spring 定义了一个 earlyApplicationListeners 汇合,这个汇合中的 Listener 在 factories 文件中定义好,在容器 Refresh 之前事后实例化好,而后就能够监听 Spring 容器启动过程中的所有事件。
当 registerListeners 办法执行实现, 咱们的监听器曾经增加到多播器 SimpleApplicationEventMulticaster 中了,并且 earlyEvent 晚期事件也曾经执行结束。然而咱们发现, 如果自定义了一个监听器去监听 spring 内置的事件, 此时并没有被执行,那咱们注册的监听器是如何被执行的呢?答案在 finishRefresh 办法中。
2.2.3 finishRefresh
org.springframework.context.support.AbstractApplicationContext#finishRefresh
protected void finishRefresh() {clearResourceCaches();
initLifecycleProcessor();
getLifecycleProcessor().onRefresh();
// 容器中的类全副初始化结束后,触发刷新事件
publishEvent(new ContextRefreshedEvent(this));
LiveBeansView.registerApplicationContext(this);
}
如果咱们想要实现在 spring 容器中所有 bean 创立实现后做一些扩大性能,咱们就能够实现 ApplicationListener 这样咱们就能够实现其性能了。至此,Spring 中同步的事件监听公布模式咱们就解说完了,当然 Spring 还反对异步的音讯监听执行机制。
2.2.4 spring 中异步的监听执行机制
咱们回过头来看一下 ApplicationEventMulticaster#pushEvent 办法
protected void publishEvent(Object event, @Nullable ResolvableType eventType) {Assert.notNull(event, "Event must not be null");
ApplicationEvent applicationEvent;
if (event instanceof ApplicationEvent) {applicationEvent = (ApplicationEvent) event;
}
else {applicationEvent = new PayloadApplicationEvent<>(this, event);
if (eventType == null) {eventType = ((PayloadApplicationEvent<?>) applicationEvent).getResolvableType();}
}
if (this.earlyApplicationEvents != null) {this.earlyApplicationEvents.add(applicationEvent);
}
else {
// 获取以后注入的公布器, 执行公布办法
getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
}
if (this.parent != null) {if (this.parent instanceof AbstractApplicationContext) {((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
}
else {this.parent.publishEvent(event);
}
}
最终执行公布事件的是 ApplicationEventMulticaster#multicastEvent 办法,上面咱们再来一起看一下 multicastEvent 办法
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
Executor executor = getTaskExecutor();
// 拿到所有的监听器, 如果异步执行器不为空, 异步执行
for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {if (executor != null) {executor.execute(() -> invokeListener(listener, event));
}
else {
// 执行监听办法
invokeListener(listener, event);
}
}
}
能够看到,异步事件告诉次要依附 SimpleApplicationEventMulticaster 类中的 Executor 去实现的,如果这个变量不配置的话默认事件告诉是同步的,否则就是异步告诉了,要实现同时反对同步告诉和异步告诉就得从这里下手;咱们上文曾经剖析过了在 initApplicationEventMulticaster 办法中有这样一段代码
if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
this.applicationEventMulticaster =
beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
if (logger.isTraceEnabled()) {logger.trace("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");
}
}
如果 BeanFactory 中曾经有了 SimpleApplicationEventMulticaster 则不会从新创立,那么咱们能够再 spring 中注册一个 SimpleApplicationEventMulticaster 并且向其中注入对应的 Executor 这样咱们就能够失去一个异步执行监听的 SimpleApplicationEventMulticaster 了,咱们的告诉就会通过 Executor 异步执行。这样能够大大提高事件公布的效率。
在 springboot 我的项目中咱们能够减少一个配置类来实现
@Configuration
@EnableAsync
public class Config {@Bean(AbstractApplicationContext.APPLICATION_EVENT_MULTICASTER_BEAN_NAME)
public SimpleApplicationEventMulticaster simpleApplicationEventMulticaster(){SimpleApplicationEventMulticaster simpleApplicationEventMulticaster = new SimpleApplicationEventMulticaster();
simpleApplicationEventMulticaster.setTaskExecutor(taskExecutor());
return simpleApplicationEventMulticaster;
}
@Bean
public TaskExecutor taskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(100);
executor.setKeepAliveSeconds(300);
executor.setThreadNamePrefix("thread-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
return executor;
}
}
spring 我的项目中咱们也能够减少如下 xml 配置
<!-- 定义事件异步解决 -->
<bean id="commonTaskExecutor"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<!-- 线程池维持处于 Keep-alive 状态的线程数量。如果设置了 allowCoreThreadTimeOut 为 true,该值可能为 0。并发线程数, 想达到真正的并发成果, 最好对应 CPU 的线程数及外围数 -->
<property name="corePoolSize" value="5" />
<!-- 最大线程池容量 -->
<property name="maxPoolSize" value="20" />
<!-- 超过最大线程池容量后, 容许的线程队列数 -->
<property name="queueCapacity" value="100" />
<!-- 线程池保护线程所容许的闲暇工夫 . 单位毫秒,默认为 60s,超过这个工夫后会将大于 corePoolSize 的线程敞开,放弃 corePoolSize 的个数 -->
<property name="keepAliveSeconds" value="300" />
<!-- 容许外围线程超时: false(默认值)不容许超时,哪怕闲暇;true 则应用 keepAliveSeconds 来管制期待超时工夫,最终 corePoolSize 的个数可能为 0 -->
<property name="allowCoreThreadTimeOut" value="true" />
<!-- 线程池对回绝工作 (无线程可用) 的解决策略 -->
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
<!-- java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy: 主线程间接执行该工作,执行完之后尝试增加下一个工作到线程池中 -->
<!-- java.util.concurrent.ThreadPoolExecutor$AbortPolicy: 间接抛出 java.util.concurrent.RejectedExecutionException 异样 -->
</property>
</bean>
<!-- 名字必须是 applicationEventMulticaster,因为 AbstractApplicationContext 默认找个 -->
<bean id="applicationEventMulticaster" class="org.springframework.context.event.SimpleApplicationEventMulticaster">
<!-- 注入工作执行器 这样就实现了异步调用 -->
<property name="taskExecutor" ref="commonTaskExecutor"></property>
</bean>
3 小结
本文次要解说了观察者模式在 spring 中的利用及事件监听机制,JDK 也有实现提供事件监听机制 Spring 的事件机制也是基于 JDK 来扩大的。Spring 的事件机制默认是同步阻塞的,想要晋升对应的效率要思考异步事件。