介绍

EventBus是一类主动事件处理器,也叫事件总线,基于观察者设计模式,使得某些工作可能失去主动解决,例如增加审批日志。

提供了公布-订阅模型,能够不便的在EventBus上注册订阅者,发布者能够简略的将事件传递给EventBus,EventBus会主动将事件传递给相关联的订阅者,只能用于线程间通信。

跟线程池的区别

  • 事件总线能够有多个订阅者,线程池只会有一个执行逻辑
  • 事件总线底层就是基于线程池

事件总线的长处

  • 编程不便
  • 反对同步/异步模式
  • 与aop来比更加灵便

事件总线的毛病

  • 基于内存,如果断电,就会失落事件
  • 只能单过程应用
  • 代码过于扩散,不不便调试

应用办法

// 事件总线的定义public class EventBusCenter {     private static EventBus eventBus = new EventBus();     private EventBusCenter() {     }     public static EventBus getInstance() {        return eventBus;    }     public static void register(Object obj) {        eventBus.register(obj);    }     public static void unregister(Object obj) {        eventBus.unregister(obj);    }     public static void post(Object obj) {        eventBus.post(obj);    }  }
// Consumerpublic class DataObserver {     /**     * 只有通过@Subscribe注解的办法才会被注册进EventBus     * 而且办法有且只能有1个参数     */    @Subscribe    public void func(String msg) {        System.out.println("String msg: " + msg);    }}
 // Providerpublic class Test {     public static void main(String[] args) throws InterruptedException {         DataObserver observer = new DataObserver1();              EventBusCenter.register(observer);         // 只有注册的参数类型为String的办法会被调用        EventBusCenter.post("post string method");        EventBusCenter.post(123);    }}

上述代码别离是事件总线的定义,消费者和提供者,代码逻辑都很简略,然而,理论开发中,会给代码解耦。

理论开发中,类图如下

/** * 事件治理接口,提供消费者订阅、事件投递,作为顶层的接口 */public interface IEventBus {    /**     * 公布异步事件     *     * @param event 事件实体     */    void asyncPost(Object event);    /**     * 公布同步事件     *     * @param event 事件实体     */    void syncPost(Object event);    /**     * 增加消费者     *     * @param obj 消费者对象,默认以class为key     */    void addConsumer(Object obj);    /**     * 移除消费者     *     * @param obj 消费者对象,默认以class为key     */    void removeConsumer(Object obj);    /**     * 扫描消费者     *     * @param packageName 扫描包     */    void scanConsumer(String packageName);}

下面就是顶层的事件接口

/** * Guava EventBus和Spring的桥梁 */public abstract class AbstractSpringEventBus implements IEventBus, ApplicationContextAware {    private ApplicationContext context;    @Override    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {        this.context = applicationContext;        this.scanConsumer(null);    }    @Override    public void scanConsumer(String packageName) {        context.getBeansOfType(IEventConsumer.class).forEach((k, v) -> {            this.addConsumer(v);        });    }}

下面的代码很乏味,首先是抽象类实现接口, 不须要实现所有的接口办法

  1. 实现ApplicationContextAware接口,这是Bean初始化第一步的过程,Bean还没有齐全初始化,也就是在Spring启动的时候setApplicationContext会被调用,在其中初始化了利用上下文,并且调用scanConsumer办法扫描消费者的Bean,并注册到事件总线上。
  2. 实现IEventBus接口中的scanConsumer,为第一步提供服务。

当然抽象类还是过于形象了,理论应用可能还须要分异步和同步事件,况且AbstractSpringEventBus还没有实现addConsumer办法,所以在不同的业务中须要定义不同的实现,如下

/** * 业务事件总线 */@Component@Slf4jpublic class InvoiceEventBus extends AbstractSpringEventBus {    private final EventBus asyncEventBus;    private final EventBus syncEventBus;    public InvoiceEventBus() {        //异步事件配置线程池        asyncEventBus = new AsyncEventBus(new ThreadPoolExecutor(4, Integer.MAX_VALUE,                60L, TimeUnit.SECONDS,                new SynchronousQueue<>(),                new ThreadFactoryBuilder().setNameFormat("invoiceAsyncEventBus-thread-%d").build()),                new EventBusSubscriberExceptionHandler());        // 同步事件配置        syncEventBus = new EventBus("SyncEventBus");    }    @Override    public void asyncPost(Object event) {        asyncEventBus.post(event);    }    @Override    public void syncPost(Object event) {        syncEventBus.post(event);    }    @Override    public void addConsumer(Object obj) {        asyncEventBus.register(obj);        syncEventBus.register(obj);    }    @Override    public void removeConsumer(Object obj) {        asyncEventBus.unregister(obj);        syncEventBus.unregister(obj);    }}

至此,下面的代码做好了所有消费者注册的工作,所以前面扩大只须要实现IEventConsumer,就会被注册,就会监听事件

public interface IEventConsumer<T> {    /**     * 消费者事件     * @param event 事件     */    void consumer(T event);}

接口如上所示,实现如下

/** * 状态更改事件执行器 */@Slf4j@Componentpublic class StatusExecutor implements IEventConsumer<StatusChangeEvent> {    @Subscribe    @Override    public void consumer(StatusChangeEvent statusChangeEvent) {      log.info("状态变更事件");            // 业务代码       }    }}

留神到泛型是StatusChangeEvent,也就是说,只有post了这个类型的对象,外部业务代码就会执行.

post逻辑就简略了

@Slf4j@Servicepublic class InvoiceRequisitionService {  @Resource  private InvoiceEventBus invoiceEventBus;    // 更新service  // 显然,这里更改了具体的Object,然而能够看到代码的档次关系是很漂亮的。  public Boolean update(Object object) {        invoiceEventBus.syncPost(object);    return true;  }}