背景

开发时遇到了一个场景,一个单机程序,在我的项目运行过程中,须要将一些操作解耦,异步顺次执行,并且不须要长久化操作,程序重启后,从新基于最新的数据操作。
开发时,思考过几个计划:

  1. MQ;
  2. Spring事件或者EventBus。

因为程序单机,并不简单,再接入MQ过于宏大,减少了零碎复杂度,并且也不须要音讯长久化,所以就被pass掉了;
Spring事件和EventBus都是基于观察者模式,开发难度小,不会减少零碎复杂度,但不能满足事件异步顺次执行执行的需要。看了下EventBus的代码后,决定基于此批改。

依赖

这里应用 guava 提供的 EventBus 性能,所以须要以来 guava 包

            <dependency>                <groupId>com.google.guava</groupId>                <artifactId>guava</artifactId>                <version>${guava.version}</version>            </dependency>

EventBus 执行思路

EventBus 的事件办法办法:EventBus.post

  public void post(Object event) {    // 获取所有监听器    Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);    if (eventSubscribers.hasNext()) {    // 执行所有监听器的办法      dispatcher.dispatch(event, eventSubscribers);    } else if (!(event instanceof DeadEvent)) {      // the event had no subscribers and was not itself a DeadEvent      post(new DeadEvent(this, event));    }  }

其中 dispatcher 在申明类时必须指定,AsyncEventBus 中,默认应用 LegacyAsyncDispatcher 类,该类是 EventBus 提供的异步执行器,其中 dispatch 办法如下:

    void dispatch(Object event, Iterator<Subscriber> subscribers) {      checkNotNull(event);      while (subscribers.hasNext()) {        queue.add(new EventWithSubscriber(event, subscribers.next()));      }      EventWithSubscriber e;      while ((e = queue.poll()) != null) {        e.subscriber.dispatchEvent(e.event);      }    }

其中,次要逻辑是获取所有的监听器,顺次调用 subscriber.dispathEvent 办法。

subscriber.dispathEvent 办法:

  final void dispatchEvent(final Object event) {    executor.execute(        new Runnable() {          @Override          public void run() {            try {              invokeSubscriberMethod(event);            } catch (InvocationTargetException e) {              bus.handleSubscriberException(e.getCause(), context(event));            }          }        });  }

这里能够看到,最初都由 executor.execute 来执行。

这里的 executor 指线程池,即,异步 EventBus 最初都交由线程池来实现。

所以,想要设计成异步有序时,能够指定线程池 maxSize 为 1,从而保障顺次执行。

所以,在申明 AsyncEventBus 时,须要指定自定义的 Executor

@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)public @interface EventConsume {    /**     * 事件标识     */    String identifier();}public interface EventHandler<T> {    @Subscribe    boolean process(T data);}@Slf4jpublic class BlockingPolicy implements RejectedExecutionHandler {    @Override    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {        log.error("线程池[ {} ]期待队列已满,正在执行阻塞期待", executor.toString());        if (!executor.isShutdown()) {            try {                executor.getQueue().put(r);            } catch (Exception e) {                log.error("阻塞策略异样", e);            }        }    }}public class EventThreadPoolFactory {    private static final int DEFAULT_CORE_SIZE = 0;    private static final int DEFAULT_MAX_SIZE = 1;    private static final int DEFAULT_TIMEOUT = 1;    private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.HOURS;    private static final int DEFAULT_QUEUE_SIZE = 5000;    private static final BlockingQueue<Runnable> DEFAULT_WORK_QUEUE = new ArrayBlockingQueue<>(DEFAULT_QUEUE_SIZE);    public static Executor buildDefaultExecutor(String identifier) {        return new ThreadPoolExecutor(DEFAULT_CORE_SIZE,                DEFAULT_MAX_SIZE,                DEFAULT_TIMEOUT,                DEFAULT_TIME_UNIT,                DEFAULT_WORK_QUEUE,                ThreadFactoryBuilder.create().setNamePrefix(String.format("%s-", identifier)).build(),                new BlockingPolicy());    }}@Component@Slf4jpublic class EventHub {    private final Map<String, AsyncEventBus> eventBusMap = new ConcurrentHashMap<>();    @Autowired    private ApplicationContext applicationContext;    @PostConstruct    public void onStart() {        this.loadEventHandler();    }    public void loadEventHandler() {        Map<String, EventHandler> eventHandlerMap = applicationContext.getBeansOfType(EventHandler.class);        for (EventHandler eventHandler : eventHandlerMap.values()) {            this.register(eventHandler);        }    }    private void register(EventHandler eventHandler) {        EventConsume eventConsume = eventHandler.getClass().getAnnotation(EventConsume.class);        if (eventConsume == null || StrUtil.isBlank(eventConsume.identifier())) {            log.error("EventHandler[ {} ]没有配置 identifier ,注册失败", eventHandler.getClass().getSimpleName());            return;        }        String identifier = eventConsume.identifier();        AsyncEventBus eventBus = eventBusMap.get(identifier);        if (eventBus == null) {            AsyncEventBus asyncEventBus = new AsyncEventBus(identifier, EventThreadPoolFactory.buildDefaultExecutor(identifier));            eventBus = ObjectUtil.defaultIfNull(eventBusMap.putIfAbsent(identifier, asyncEventBus), asyncEventBus);        }        eventBus.register(eventHandler);    }    public EventBus getEventBus(String identifier) {        return eventBusMap.get(identifier);    }}@Component@Slf4jpublic class EventProducer {    @Autowired    private EventHub eventHub;    public <T> void post(String identifier, T data) {        EventBus eventBus = eventHub.getEventBus(identifier);        if (eventBus == null) {            log.error("identifier [ {} ] 没有事件监听者", identifier);            return;        }        eventBus.post(data);    }}