乐趣区

关于java:EventBus的初步实践

介绍

EventBus 是一类主动事件处理器,也叫事件总线,基于观察者设计模式,使得某些工作可能失去主动解决,例如增加审批日志。

提供了公布 - 订阅模型,能够不便的在 EventBus 上注册订阅者,发布者能够简略的将事件传递给 EventBus,EventBus 会主动将事件传递给相关联的订阅者,只能用于线程间通信。

跟线程池的区别

  • 事件总线能够有多个订阅者,线程池只会有一个执行逻辑
  • 事件总线底层就是基于线程池

事件总线的长处

  • 编程不便
  • 反对同步 / 异步模式
  • 与 aop 来比更加灵便

事件总线的毛病

  • 基于内存,如果断电,就会失落事件
  • 只能单过程应用
  • 代码过于扩散,不不便调试

应用办法

// 事件总线的定义
public class EventBusCenter {private static EventBus eventBus = new EventBus();
 
    private EventBusCenter() {}
 
    public static EventBus getInstance() {return eventBus;}
 
    public static void register(Object obj) {eventBus.register(obj);
    }
 
    public static void unregister(Object obj) {eventBus.unregister(obj);
    }
 
    public static void post(Object obj) {eventBus.post(obj);
    }
 
 
}
// Consumer
public class DataObserver {
 
    /**
     * 只有通过 @Subscribe 注解的办法才会被注册进 EventBus
     * 而且办法有且只能有 1 个参数
     */
    @Subscribe
    public void func(String msg) {System.out.println("String msg:" + msg);
    }
}
 
// Provider
public class Test {public static void main(String[] args) throws InterruptedException {DataObserver observer = new DataObserver1();
      
        EventBusCenter.register(observer);
 
        // 只有注册的参数类型为 String 的办法会被调用
        EventBusCenter.post("post string method");
        EventBusCenter.post(123);
    }
}

上述代码别离是事件总线的定义,消费者和提供者,代码逻辑都很简略,然而,理论开发中,会给代码解耦。

理论开发中,类图如下

/**
 * 事件治理接口,提供消费者订阅、事件投递,作为顶层的接口
 */
public interface IEventBus {
    /**
     * 公布异步事件
     *
     * @param event 事件实体
     */
    void asyncPost(Object event);

    /**
     * 公布同步事件
     *
     * @param event 事件实体
     */
    void syncPost(Object event);

    /**
     * 增加消费者
     *
     * @param obj 消费者对象,默认以 class 为 key
     */
    void addConsumer(Object obj);

    /**
     * 移除消费者
     *
     * @param obj 消费者对象,默认以 class 为 key
     */
    void removeConsumer(Object obj);

    /**
     * 扫描消费者
     *
     * @param packageName 扫描包
     */
    void scanConsumer(String packageName);
}

下面就是顶层的事件接口

/**
 * Guava EventBus 和 Spring 的桥梁
 */
public abstract class AbstractSpringEventBus implements IEventBus, ApplicationContextAware {
    private ApplicationContext context;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.context = applicationContext;
        this.scanConsumer(null);
    }

    @Override
    public void scanConsumer(String packageName) {context.getBeansOfType(IEventConsumer.class).forEach((k, v) -> {this.addConsumer(v);
        });
    }
}

下面的代码很乏味,首先是抽象类实现接口,不须要实现所有的接口办法

  1. 实现 ApplicationContextAware 接口,这是 Bean 初始化第一步的过程,Bean 还没有齐全初始化,也就是在 Spring 启动的时候 setApplicationContext 会被调用,在其中初始化了利用上下文,并且调用 scanConsumer 办法扫描消费者的 Bean,并注册到事件总线上。
  2. 实现 IEventBus 接口中的scanConsumer,为第一步提供服务。

当然抽象类还是过于形象了,理论应用可能还须要分异步和同步事件,况且 AbstractSpringEventBus 还没有实现 addConsumer 办法,所以在不同的业务中须要定义不同的实现,如下

/**
 * 业务事件总线
 */
@Component
@Slf4j
public class InvoiceEventBus extends AbstractSpringEventBus {
    private final EventBus asyncEventBus;

    private final EventBus syncEventBus;

    public InvoiceEventBus() {
        // 异步事件配置线程池
        asyncEventBus = new AsyncEventBus(new ThreadPoolExecutor(4, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<>(),
                new ThreadFactoryBuilder().setNameFormat("invoiceAsyncEventBus-thread-%d").build()),
                new EventBusSubscriberExceptionHandler());

        // 同步事件配置
        syncEventBus = new EventBus("SyncEventBus");
    }

    @Override
    public void asyncPost(Object event) {asyncEventBus.post(event);
    }

    @Override
    public void syncPost(Object event) {syncEventBus.post(event);
    }

    @Override
    public void addConsumer(Object obj) {asyncEventBus.register(obj);
        syncEventBus.register(obj);
    }

    @Override
    public void removeConsumer(Object obj) {asyncEventBus.unregister(obj);
        syncEventBus.unregister(obj);
    }

}

至此,下面的代码做好了所有消费者注册的工作,所以前面扩大只须要实现IEventConsumer,就会被注册,就会监听事件

public interface IEventConsumer<T> {
    /**
     * 消费者事件
     * @param event 事件
     */
    void consumer(T event);
}

接口如上所示,实现如下

/**
 * 状态更改事件执行器
 */
@Slf4j
@Component
public class StatusExecutor implements IEventConsumer<StatusChangeEvent> {

    @Subscribe
    @Override
    public void consumer(StatusChangeEvent statusChangeEvent) {log.info("状态变更事件");
            // 业务代码
       }
    }
}

留神到泛型是StatusChangeEvent,也就是说,只有 post 了这个类型的对象,外部业务代码就会执行.

post 逻辑就简略了

@Slf4j
@Service
public class InvoiceRequisitionService {
  @Resource
  private InvoiceEventBus invoiceEventBus;
  
  // 更新 service
  // 显然,这里更改了具体的 Object,然而能够看到代码的档次关系是很漂亮的。public Boolean update(Object object) {invoiceEventBus.syncPost(object);
    return true;
  }
}
退出移动版