背景
开发时遇到了一个场景,一个单机程序,在我的项目运行过程中,须要将一些操作解耦,异步顺次执行,并且不须要长久化操作,程序重启后,从新基于最新的数据操作。
开发时,思考过几个计划:
- MQ;
- Spring事件或者EventBus。
因为程序单机,并不简单,再接入MQ过于宏大,减少了零碎复杂度,并且也不须要音讯长久化,所以就被pass掉了;
Spring事件和EventBus都是基于观察者模式,开发难度小,不会减少零碎复杂度,但不能满足事件异步顺次执行执行的需要。看了下EventBus的代码后,决定基于此批改。
依赖
这里应用 guava 提供的 EventBus 性能,所以须要以来 guava 包
<dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>${guava.version}</version> </dependency>
EventBus 执行思路
EventBus
的事件办法办法:EventBus.post
public void post(Object event) { // 获取所有监听器 Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event); if (eventSubscribers.hasNext()) { // 执行所有监听器的办法 dispatcher.dispatch(event, eventSubscribers); } else if (!(event instanceof DeadEvent)) { // the event had no subscribers and was not itself a DeadEvent post(new DeadEvent(this, event)); } }
其中 dispatcher
在申明类时必须指定,AsyncEventBus
中,默认应用 LegacyAsyncDispatcher
类,该类是 EventBus
提供的异步执行器,其中 dispatch
办法如下:
void dispatch(Object event, Iterator<Subscriber> subscribers) { checkNotNull(event); while (subscribers.hasNext()) { queue.add(new EventWithSubscriber(event, subscribers.next())); } EventWithSubscriber e; while ((e = queue.poll()) != null) { e.subscriber.dispatchEvent(e.event); } }
其中,次要逻辑是获取所有的监听器,顺次调用 subscriber.dispathEvent
办法。
subscriber.dispathEvent
办法:
final void dispatchEvent(final Object event) { executor.execute( new Runnable() { @Override public void run() { try { invokeSubscriberMethod(event); } catch (InvocationTargetException e) { bus.handleSubscriberException(e.getCause(), context(event)); } } }); }
这里能够看到,最初都由 executor.execute
来执行。
这里的 executor
指线程池,即,异步 EventBus 最初都交由线程池来实现。
所以,想要设计成异步有序时,能够指定线程池 maxSize 为 1,从而保障顺次执行。
所以,在申明 AsyncEventBus
时,须要指定自定义的 Executor
@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)public @interface EventConsume { /** * 事件标识 */ String identifier();}public interface EventHandler<T> { @Subscribe boolean process(T data);}@Slf4jpublic class BlockingPolicy implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { log.error("线程池[ {} ]期待队列已满,正在执行阻塞期待", executor.toString()); if (!executor.isShutdown()) { try { executor.getQueue().put(r); } catch (Exception e) { log.error("阻塞策略异样", e); } } }}public class EventThreadPoolFactory { private static final int DEFAULT_CORE_SIZE = 0; private static final int DEFAULT_MAX_SIZE = 1; private static final int DEFAULT_TIMEOUT = 1; private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.HOURS; private static final int DEFAULT_QUEUE_SIZE = 5000; private static final BlockingQueue<Runnable> DEFAULT_WORK_QUEUE = new ArrayBlockingQueue<>(DEFAULT_QUEUE_SIZE); public static Executor buildDefaultExecutor(String identifier) { return new ThreadPoolExecutor(DEFAULT_CORE_SIZE, DEFAULT_MAX_SIZE, DEFAULT_TIMEOUT, DEFAULT_TIME_UNIT, DEFAULT_WORK_QUEUE, ThreadFactoryBuilder.create().setNamePrefix(String.format("%s-", identifier)).build(), new BlockingPolicy()); }}@Component@Slf4jpublic class EventHub { private final Map<String, AsyncEventBus> eventBusMap = new ConcurrentHashMap<>(); @Autowired private ApplicationContext applicationContext; @PostConstruct public void onStart() { this.loadEventHandler(); } public void loadEventHandler() { Map<String, EventHandler> eventHandlerMap = applicationContext.getBeansOfType(EventHandler.class); for (EventHandler eventHandler : eventHandlerMap.values()) { this.register(eventHandler); } } private void register(EventHandler eventHandler) { EventConsume eventConsume = eventHandler.getClass().getAnnotation(EventConsume.class); if (eventConsume == null || StrUtil.isBlank(eventConsume.identifier())) { log.error("EventHandler[ {} ]没有配置 identifier ,注册失败", eventHandler.getClass().getSimpleName()); return; } String identifier = eventConsume.identifier(); AsyncEventBus eventBus = eventBusMap.get(identifier); if (eventBus == null) { AsyncEventBus asyncEventBus = new AsyncEventBus(identifier, EventThreadPoolFactory.buildDefaultExecutor(identifier)); eventBus = ObjectUtil.defaultIfNull(eventBusMap.putIfAbsent(identifier, asyncEventBus), asyncEventBus); } eventBus.register(eventHandler); } public EventBus getEventBus(String identifier) { return eventBusMap.get(identifier); }}@Component@Slf4jpublic class EventProducer { @Autowired private EventHub eventHub; public <T> void post(String identifier, T data) { EventBus eventBus = eventHub.getEventBus(identifier); if (eventBus == null) { log.error("identifier [ {} ] 没有事件监听者", identifier); return; } eventBus.post(data); }}