共计 4129 个字符,预计需要花费 11 分钟才能阅读完成。
背景
开发时遇到了一个场景,一个单机程序,在我的项目运行过程中,须要将一些操作解耦,异步顺次执行,并且不须要长久化操作,程序重启后,从新基于最新的数据操作。
开发时,思考过几个计划:
- MQ;
- Spring 事件或者 EventBus。
因为程序单机,并不简单,再接入 MQ 过于宏大,减少了零碎复杂度,并且也不须要音讯长久化,所以就被 pass 掉了;
Spring 事件和 EventBus 都是基于观察者模式,开发难度小,不会减少零碎复杂度,但不能满足事件异步顺次执行执行的需要。看了下 EventBus 的代码后,决定基于此批改。
依赖
这里应用 guava 提供的 EventBus 性能,所以须要以来 guava 包
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
EventBus 执行思路
EventBus
的事件办法办法:EventBus.post
public void post(Object event) {
// 获取所有监听器
Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
if (eventSubscribers.hasNext()) {
// 执行所有监听器的办法
dispatcher.dispatch(event, eventSubscribers);
} else if (!(event instanceof DeadEvent)) {
// the event had no subscribers and was not itself a DeadEvent
post(new DeadEvent(this, event));
}
}
其中 dispatcher
在申明类时必须指定,AsyncEventBus
中,默认应用 LegacyAsyncDispatcher
类,该类是 EventBus
提供的异步执行器,其中 dispatch
办法如下:
void dispatch(Object event, Iterator<Subscriber> subscribers) {checkNotNull(event);
while (subscribers.hasNext()) {queue.add(new EventWithSubscriber(event, subscribers.next()));
}
EventWithSubscriber e;
while ((e = queue.poll()) != null) {e.subscriber.dispatchEvent(e.event);
}
}
其中,次要逻辑是获取所有的监听器,顺次调用 subscriber.dispathEvent
办法。
subscriber.dispathEvent
办法:
final void dispatchEvent(final Object event) {
executor.execute(new Runnable() {
@Override
public void run() {
try {invokeSubscriberMethod(event);
} catch (InvocationTargetException e) {bus.handleSubscriberException(e.getCause(), context(event));
}
}
});
}
这里能够看到,最初都由 executor.execute
来执行。
这里的 executor
指线程池,即,异步 EventBus 最初都交由线程池来实现。
所以,想要设计成异步有序时,能够指定线程池 maxSize 为 1,从而保障顺次执行。
所以,在申明 AsyncEventBus
时,须要指定自定义的 Executor
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface EventConsume {
/**
* 事件标识
*/
String identifier();}
public interface EventHandler<T> {
@Subscribe
boolean process(T data);
}
@Slf4j
public class BlockingPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {log.error("线程池 [ {} ] 期待队列已满,正在执行阻塞期待", executor.toString());
if (!executor.isShutdown()) {
try {executor.getQueue().put(r);
} catch (Exception e) {log.error("阻塞策略异样", e);
}
}
}
}
public class EventThreadPoolFactory {
private static final int DEFAULT_CORE_SIZE = 0;
private static final int DEFAULT_MAX_SIZE = 1;
private static final int DEFAULT_TIMEOUT = 1;
private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.HOURS;
private static final int DEFAULT_QUEUE_SIZE = 5000;
private static final BlockingQueue<Runnable> DEFAULT_WORK_QUEUE = new ArrayBlockingQueue<>(DEFAULT_QUEUE_SIZE);
public static Executor buildDefaultExecutor(String identifier) {
return new ThreadPoolExecutor(DEFAULT_CORE_SIZE,
DEFAULT_MAX_SIZE,
DEFAULT_TIMEOUT,
DEFAULT_TIME_UNIT,
DEFAULT_WORK_QUEUE,
ThreadFactoryBuilder.create().setNamePrefix(String.format("%s-", identifier)).build(),
new BlockingPolicy());
}
}
@Component
@Slf4j
public class EventHub {private final Map<String, AsyncEventBus> eventBusMap = new ConcurrentHashMap<>();
@Autowired
private ApplicationContext applicationContext;
@PostConstruct
public void onStart() {this.loadEventHandler();
}
public void loadEventHandler() {Map<String, EventHandler> eventHandlerMap = applicationContext.getBeansOfType(EventHandler.class);
for (EventHandler eventHandler : eventHandlerMap.values()) {this.register(eventHandler);
}
}
private void register(EventHandler eventHandler) {EventConsume eventConsume = eventHandler.getClass().getAnnotation(EventConsume.class);
if (eventConsume == null || StrUtil.isBlank(eventConsume.identifier())) {log.error("EventHandler[ {} ]没有配置 identifier,注册失败", eventHandler.getClass().getSimpleName());
return;
}
String identifier = eventConsume.identifier();
AsyncEventBus eventBus = eventBusMap.get(identifier);
if (eventBus == null) {AsyncEventBus asyncEventBus = new AsyncEventBus(identifier, EventThreadPoolFactory.buildDefaultExecutor(identifier));
eventBus = ObjectUtil.defaultIfNull(eventBusMap.putIfAbsent(identifier, asyncEventBus), asyncEventBus);
}
eventBus.register(eventHandler);
}
public EventBus getEventBus(String identifier) {return eventBusMap.get(identifier);
}
}
@Component
@Slf4j
public class EventProducer {
@Autowired
private EventHub eventHub;
public <T> void post(String identifier, T data) {EventBus eventBus = eventHub.getEventBus(identifier);
if (eventBus == null) {log.error("identifier [ {} ] 没有事件监听者", identifier);
return;
}
eventBus.post(data);
}
}
正文完