共计 3682 个字符,预计需要花费 10 分钟才能阅读完成。
介绍
EventBus 是一类主动事件处理器,也叫事件总线,基于观察者设计模式,使得某些工作可能失去主动解决,例如增加审批日志。
提供了公布 - 订阅模型,能够不便的在 EventBus 上注册订阅者,发布者能够简略的将事件传递给 EventBus,EventBus 会主动将事件传递给相关联的订阅者,只能用于线程间通信。
跟线程池的区别
- 事件总线能够有多个订阅者,线程池只会有一个执行逻辑
- 事件总线底层就是基于线程池
事件总线的长处
- 编程不便
- 反对同步 / 异步模式
- 与 aop 来比更加灵便
事件总线的毛病
- 基于内存,如果断电,就会失落事件
- 只能单过程应用
- 代码过于扩散,不不便调试
应用办法
// 事件总线的定义
public class EventBusCenter {private static EventBus eventBus = new EventBus();
private EventBusCenter() {}
public static EventBus getInstance() {return eventBus;}
public static void register(Object obj) {eventBus.register(obj);
}
public static void unregister(Object obj) {eventBus.unregister(obj);
}
public static void post(Object obj) {eventBus.post(obj);
}
}
// Consumer
public class DataObserver {
/**
* 只有通过 @Subscribe 注解的办法才会被注册进 EventBus
* 而且办法有且只能有 1 个参数
*/
@Subscribe
public void func(String msg) {System.out.println("String msg:" + msg);
}
}
// Provider
public class Test {public static void main(String[] args) throws InterruptedException {DataObserver observer = new DataObserver1();
EventBusCenter.register(observer);
// 只有注册的参数类型为 String 的办法会被调用
EventBusCenter.post("post string method");
EventBusCenter.post(123);
}
}
上述代码别离是事件总线的定义,消费者和提供者,代码逻辑都很简略,然而,理论开发中,会给代码解耦。
理论开发中,类图如下
/**
* 事件治理接口,提供消费者订阅、事件投递,作为顶层的接口
*/
public interface IEventBus {
/**
* 公布异步事件
*
* @param event 事件实体
*/
void asyncPost(Object event);
/**
* 公布同步事件
*
* @param event 事件实体
*/
void syncPost(Object event);
/**
* 增加消费者
*
* @param obj 消费者对象,默认以 class 为 key
*/
void addConsumer(Object obj);
/**
* 移除消费者
*
* @param obj 消费者对象,默认以 class 为 key
*/
void removeConsumer(Object obj);
/**
* 扫描消费者
*
* @param packageName 扫描包
*/
void scanConsumer(String packageName);
}
下面就是顶层的事件接口
/**
* Guava EventBus 和 Spring 的桥梁
*/
public abstract class AbstractSpringEventBus implements IEventBus, ApplicationContextAware {
private ApplicationContext context;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.context = applicationContext;
this.scanConsumer(null);
}
@Override
public void scanConsumer(String packageName) {context.getBeansOfType(IEventConsumer.class).forEach((k, v) -> {this.addConsumer(v);
});
}
}
下面的代码很乏味,首先是抽象类实现接口,不须要实现所有的接口办法
- 实现
ApplicationContextAware
接口,这是 Bean 初始化第一步的过程,Bean 还没有齐全初始化,也就是在 Spring 启动的时候setApplicationContext
会被调用,在其中初始化了利用上下文,并且调用 scanConsumer 办法扫描消费者的 Bean,并注册到事件总线上。 - 实现
IEventBus
接口中的scanConsumer
,为第一步提供服务。
当然抽象类还是过于形象了,理论应用可能还须要分异步和同步事件,况且 AbstractSpringEventBus
还没有实现 addConsumer 办法,所以在不同的业务中须要定义不同的实现,如下
/**
* 业务事件总线
*/
@Component
@Slf4j
public class InvoiceEventBus extends AbstractSpringEventBus {
private final EventBus asyncEventBus;
private final EventBus syncEventBus;
public InvoiceEventBus() {
// 异步事件配置线程池
asyncEventBus = new AsyncEventBus(new ThreadPoolExecutor(4, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<>(),
new ThreadFactoryBuilder().setNameFormat("invoiceAsyncEventBus-thread-%d").build()),
new EventBusSubscriberExceptionHandler());
// 同步事件配置
syncEventBus = new EventBus("SyncEventBus");
}
@Override
public void asyncPost(Object event) {asyncEventBus.post(event);
}
@Override
public void syncPost(Object event) {syncEventBus.post(event);
}
@Override
public void addConsumer(Object obj) {asyncEventBus.register(obj);
syncEventBus.register(obj);
}
@Override
public void removeConsumer(Object obj) {asyncEventBus.unregister(obj);
syncEventBus.unregister(obj);
}
}
至此,下面的代码做好了所有消费者注册的工作,所以前面扩大只须要实现IEventConsumer
,就会被注册,就会监听事件
public interface IEventConsumer<T> {
/**
* 消费者事件
* @param event 事件
*/
void consumer(T event);
}
接口如上所示,实现如下
/**
* 状态更改事件执行器
*/
@Slf4j
@Component
public class StatusExecutor implements IEventConsumer<StatusChangeEvent> {
@Subscribe
@Override
public void consumer(StatusChangeEvent statusChangeEvent) {log.info("状态变更事件");
// 业务代码
}
}
}
留神到泛型是StatusChangeEvent
,也就是说,只有 post 了这个类型的对象,外部业务代码就会执行.
post 逻辑就简略了
@Slf4j
@Service
public class InvoiceRequisitionService {
@Resource
private InvoiceEventBus invoiceEventBus;
// 更新 service
// 显然,这里更改了具体的 Object,然而能够看到代码的档次关系是很漂亮的。public Boolean update(Object object) {invoiceEventBus.syncPost(object);
return true;
}
}
正文完