关于java:结合Springboot基于EventBus实现异步有序事件执行

8次阅读

共计 4129 个字符,预计需要花费 11 分钟才能阅读完成。

背景

开发时遇到了一个场景,一个单机程序,在我的项目运行过程中,须要将一些操作解耦,异步顺次执行,并且不须要长久化操作,程序重启后,从新基于最新的数据操作。
开发时,思考过几个计划:

  1. MQ;
  2. Spring 事件或者 EventBus。

因为程序单机,并不简单,再接入 MQ 过于宏大,减少了零碎复杂度,并且也不须要音讯长久化,所以就被 pass 掉了;
Spring 事件和 EventBus 都是基于观察者模式,开发难度小,不会减少零碎复杂度,但不能满足事件异步顺次执行执行的需要。看了下 EventBus 的代码后,决定基于此批改。

依赖

这里应用 guava 提供的 EventBus 性能,所以须要以来 guava 包

            <dependency>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
                <version>${guava.version}</version>
            </dependency>

EventBus 执行思路

EventBus 的事件办法办法:EventBus.post

  public void post(Object event) {
    // 获取所有监听器
    Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
    if (eventSubscribers.hasNext()) {
    // 执行所有监听器的办法
      dispatcher.dispatch(event, eventSubscribers);
    } else if (!(event instanceof DeadEvent)) {
      // the event had no subscribers and was not itself a DeadEvent
      post(new DeadEvent(this, event));
    }
  }

其中 dispatcher 在申明类时必须指定,AsyncEventBus 中,默认应用 LegacyAsyncDispatcher 类,该类是 EventBus 提供的异步执行器,其中 dispatch 办法如下:

    void dispatch(Object event, Iterator<Subscriber> subscribers) {checkNotNull(event);
      while (subscribers.hasNext()) {queue.add(new EventWithSubscriber(event, subscribers.next()));
      }

      EventWithSubscriber e;
      while ((e = queue.poll()) != null) {e.subscriber.dispatchEvent(e.event);
      }
    }

其中,次要逻辑是获取所有的监听器,顺次调用 subscriber.dispathEvent 办法。

subscriber.dispathEvent 办法:

  final void dispatchEvent(final Object event) {
    executor.execute(new Runnable() {
          @Override
          public void run() {
            try {invokeSubscriberMethod(event);
            } catch (InvocationTargetException e) {bus.handleSubscriberException(e.getCause(), context(event));
            }
          }
        });
  }

这里能够看到,最初都由 executor.execute 来执行。

这里的 executor 指线程池,即,异步 EventBus 最初都交由线程池来实现。

所以,想要设计成异步有序时,能够指定线程池 maxSize 为 1,从而保障顺次执行。

所以,在申明 AsyncEventBus 时,须要指定自定义的 Executor

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface EventConsume {

    /**
     * 事件标识
     */
    String identifier();}

public interface EventHandler<T> {

    @Subscribe
    boolean process(T data);

}

@Slf4j
public class BlockingPolicy implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {log.error("线程池 [ {} ] 期待队列已满,正在执行阻塞期待", executor.toString());
        if (!executor.isShutdown()) {
            try {executor.getQueue().put(r);
            } catch (Exception e) {log.error("阻塞策略异样", e);
            }
        }
    }
}

public class EventThreadPoolFactory {

    private static final int DEFAULT_CORE_SIZE = 0;
    private static final int DEFAULT_MAX_SIZE = 1;
    private static final int DEFAULT_TIMEOUT = 1;
    private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.HOURS;
    private static final int DEFAULT_QUEUE_SIZE = 5000;
    private static final BlockingQueue<Runnable> DEFAULT_WORK_QUEUE = new ArrayBlockingQueue<>(DEFAULT_QUEUE_SIZE);

    public static Executor buildDefaultExecutor(String identifier) {
        return new ThreadPoolExecutor(DEFAULT_CORE_SIZE,
                DEFAULT_MAX_SIZE,
                DEFAULT_TIMEOUT,
                DEFAULT_TIME_UNIT,
                DEFAULT_WORK_QUEUE,
                ThreadFactoryBuilder.create().setNamePrefix(String.format("%s-", identifier)).build(),
                new BlockingPolicy());
    }

}

@Component
@Slf4j
public class EventHub {private final Map<String, AsyncEventBus> eventBusMap = new ConcurrentHashMap<>();

    @Autowired
    private ApplicationContext applicationContext;

    @PostConstruct
    public void onStart() {this.loadEventHandler();
    }

    public void loadEventHandler() {Map<String, EventHandler> eventHandlerMap = applicationContext.getBeansOfType(EventHandler.class);
        for (EventHandler eventHandler : eventHandlerMap.values()) {this.register(eventHandler);
        }
    }

    private void register(EventHandler eventHandler) {EventConsume eventConsume = eventHandler.getClass().getAnnotation(EventConsume.class);
        if (eventConsume == null || StrUtil.isBlank(eventConsume.identifier())) {log.error("EventHandler[ {} ]没有配置 identifier,注册失败", eventHandler.getClass().getSimpleName());
            return;
        }
        String identifier = eventConsume.identifier();
        AsyncEventBus eventBus = eventBusMap.get(identifier);
        if (eventBus == null) {AsyncEventBus asyncEventBus = new AsyncEventBus(identifier, EventThreadPoolFactory.buildDefaultExecutor(identifier));
            eventBus = ObjectUtil.defaultIfNull(eventBusMap.putIfAbsent(identifier, asyncEventBus), asyncEventBus);
        }
        eventBus.register(eventHandler);
    }

    public EventBus getEventBus(String identifier) {return eventBusMap.get(identifier);
    }
}

@Component
@Slf4j
public class EventProducer {

    @Autowired
    private EventHub eventHub;

    public <T> void post(String identifier, T data) {EventBus eventBus = eventHub.getEventBus(identifier);
        if (eventBus == null) {log.error("identifier [ {} ] 没有事件监听者", identifier);
            return;
        }
        eventBus.post(data);
    }

}
正文完
 0