介绍
EventBus是一类主动事件处理器,也叫事件总线,基于观察者设计模式,使得某些工作可能失去主动解决,例如增加审批日志。
提供了公布-订阅模型,能够不便的在EventBus上注册订阅者,发布者能够简略的将事件传递给EventBus,EventBus会主动将事件传递给相关联的订阅者,只能用于线程间通信。
跟线程池的区别
- 事件总线能够有多个订阅者,线程池只会有一个执行逻辑
- 事件总线底层就是基于线程池
事件总线的长处
- 编程不便
- 反对同步/异步模式
- 与aop来比更加灵便
事件总线的毛病
- 基于内存,如果断电,就会失落事件
- 只能单过程应用
- 代码过于扩散,不不便调试
应用办法
// 事件总线的定义public class EventBusCenter { private static EventBus eventBus = new EventBus(); private EventBusCenter() { } public static EventBus getInstance() { return eventBus; } public static void register(Object obj) { eventBus.register(obj); } public static void unregister(Object obj) { eventBus.unregister(obj); } public static void post(Object obj) { eventBus.post(obj); } }
// Consumerpublic class DataObserver { /** * 只有通过@Subscribe注解的办法才会被注册进EventBus * 而且办法有且只能有1个参数 */ @Subscribe public void func(String msg) { System.out.println("String msg: " + msg); }}
// Providerpublic class Test { public static void main(String[] args) throws InterruptedException { DataObserver observer = new DataObserver1(); EventBusCenter.register(observer); // 只有注册的参数类型为String的办法会被调用 EventBusCenter.post("post string method"); EventBusCenter.post(123); }}
上述代码别离是事件总线的定义,消费者和提供者,代码逻辑都很简略,然而,理论开发中,会给代码解耦。
理论开发中,类图如下
/** * 事件治理接口,提供消费者订阅、事件投递,作为顶层的接口 */public interface IEventBus { /** * 公布异步事件 * * @param event 事件实体 */ void asyncPost(Object event); /** * 公布同步事件 * * @param event 事件实体 */ void syncPost(Object event); /** * 增加消费者 * * @param obj 消费者对象,默认以class为key */ void addConsumer(Object obj); /** * 移除消费者 * * @param obj 消费者对象,默认以class为key */ void removeConsumer(Object obj); /** * 扫描消费者 * * @param packageName 扫描包 */ void scanConsumer(String packageName);}
下面就是顶层的事件接口
/** * Guava EventBus和Spring的桥梁 */public abstract class AbstractSpringEventBus implements IEventBus, ApplicationContextAware { private ApplicationContext context; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.context = applicationContext; this.scanConsumer(null); } @Override public void scanConsumer(String packageName) { context.getBeansOfType(IEventConsumer.class).forEach((k, v) -> { this.addConsumer(v); }); }}
下面的代码很乏味,首先是抽象类实现接口, 不须要实现所有的接口办法
- 实现
ApplicationContextAware
接口,这是Bean初始化第一步的过程,Bean还没有齐全初始化,也就是在Spring启动的时候setApplicationContext
会被调用,在其中初始化了利用上下文,并且调用scanConsumer办法扫描消费者的Bean,并注册到事件总线上。 - 实现
IEventBus
接口中的scanConsumer
,为第一步提供服务。
当然抽象类还是过于形象了,理论应用可能还须要分异步和同步事件,况且AbstractSpringEventBus
还没有实现addConsumer办法,所以在不同的业务中须要定义不同的实现,如下
/** * 业务事件总线 */@Component@Slf4jpublic class InvoiceEventBus extends AbstractSpringEventBus { private final EventBus asyncEventBus; private final EventBus syncEventBus; public InvoiceEventBus() { //异步事件配置线程池 asyncEventBus = new AsyncEventBus(new ThreadPoolExecutor(4, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadFactoryBuilder().setNameFormat("invoiceAsyncEventBus-thread-%d").build()), new EventBusSubscriberExceptionHandler()); // 同步事件配置 syncEventBus = new EventBus("SyncEventBus"); } @Override public void asyncPost(Object event) { asyncEventBus.post(event); } @Override public void syncPost(Object event) { syncEventBus.post(event); } @Override public void addConsumer(Object obj) { asyncEventBus.register(obj); syncEventBus.register(obj); } @Override public void removeConsumer(Object obj) { asyncEventBus.unregister(obj); syncEventBus.unregister(obj); }}
至此,下面的代码做好了所有消费者注册的工作,所以前面扩大只须要实现IEventConsumer
,就会被注册,就会监听事件
public interface IEventConsumer<T> { /** * 消费者事件 * @param event 事件 */ void consumer(T event);}
接口如上所示,实现如下
/** * 状态更改事件执行器 */@Slf4j@Componentpublic class StatusExecutor implements IEventConsumer<StatusChangeEvent> { @Subscribe @Override public void consumer(StatusChangeEvent statusChangeEvent) { log.info("状态变更事件"); // 业务代码 } }}
留神到泛型是StatusChangeEvent
,也就是说,只有post了这个类型的对象,外部业务代码就会执行.
post逻辑就简略了
@Slf4j@Servicepublic class InvoiceRequisitionService { @Resource private InvoiceEventBus invoiceEventBus; // 更新service // 显然,这里更改了具体的Object,然而能够看到代码的档次关系是很漂亮的。 public Boolean update(Object object) { invoiceEventBus.syncPost(object); return true; }}