关于后端:说下你可能没用过的EventBus

59次阅读

共计 5835 个字符,预计需要花费 15 分钟才能阅读完成。

最近在 Code Review 的时候发现了这样一个业务场景,某个业务解决实现之后须要告诉审核人员,告诉的形式蕴含短信和邮件,所以代码大抵是这样:

// 业务校验
validate();
// 解决业务逻辑
doBusiness();
// 发送邮件或者发送其余类型音讯
sendMsg(); 

这个对不对呢?

基于这种广泛的业务场景来说,个别首先咱们会思考 同步 或者 异步 发送的问题。

同步的话对接口 RT 有影响,而且和业务逻辑耦合在一起,这样的做法必定不太好。

个别状况下,咱们会做成异步的形式,应用 MQ 本人发送本人生产,或者说一个线程池搞定,这样的话不影响主业务逻辑,能够进步性能,并且代码做到理解耦。

而后还有就是数据一致性的问题,邮件肯定要发送胜利吗?

大多数时候其实咱们并不要求邮件肯定 100% 发送胜利,失败了就失败好了,监控告警打点做好失败率不要超过阈值就好,还有就是音讯服务一旦收到申请应该本人保障音讯可能投递。

所以总的来说,应用 MQ 发送音讯本人生产解决,或者线程池异步解决,最初本人搞个弥补的逻辑就能解决好这类问题。

那么,明天要说的是这两个解决方案之外的解决形式,对于这种场景其实咱们能够用 EventBus 来解决。

EventBus 应用

看名字就晓得,EventBus 是事件总线的意思,它是 Google Guava 库的一个工具,基于观察者模式能够做到 过程内 的代码解耦作用。

就拿下面的例子来说,引入一个 MQ 太重了,其实不太须要这样做,EventBus 也能达到这个成果,和 MQ 相比他只能提供过程内的音讯事件传递,这对于咱们这种业务场景来说足够了不是吗?

咱们先看 EventBus 怎么来应用,个别先创立一个 EventBus 实例。

//1. 创立 EventBus
private static EventBus eventBus = new EventBus();

第二步,创立一个事件音讯订阅者,解决形式非常简单,只有在咱们心愿去处理事件的办法上加上 @Subscribe 注解即可。

形参只能有一个,如果定义 0 个或者多个的话运行就会报错。

public class EmailMsgHandler {

    @Subscribe
    public void handle(Long businessId) {System.out.println("send email msg" + businessId);
    }
}

第三步,注册事件。

eventBus.register(new EmailMsgHandler());

第四步,发送事件。

eventBus.post(1L);

这就是一个 EventBus 应用的最简略例子,上面咱们看看联合结尾说的例子怎么解决。

结合实际

比方下面说的案例,举例来说比方注册和用户下单的场景,都须要发送音讯和邮件给用户。

EventBus 并不强制说咱们肯定要用单例模式,因为他的创立销毁老本比拟低,所以更多是依据咱们的业务场景和上下文本人来抉择。

public class UserService {private static EventBus eventBus = new EventBus();

    public void regist(){
        Long userId = 1L;
        eventBus.register(new EmailMsgHandler());
        eventBus.register(new SmsMsgHandler());
        eventBus.post(userId);
    }
}

public class BookingService {private static EventBus eventBus = new EventBus();

    public void booking(){
        // 业务逻辑
        Long bookingId = 2L;
        eventBus.register(new EmailMsgHandler());
        eventBus.register(new SmsMsgHandler());
        eventBus.post(bookingId);
    }
}

而后在业务逻辑解决实现之后,别离去注册了邮件和短信两个事件订阅者。

public class EmailMsgHandler {

    @Subscribe
    public void handle(Long businessId) {System.out.println("send email msg" + businessId);
    }
}

public class SmsMsgHandler {

    @Subscribe
    public void handle(Long businessId) {System.out.println("send sms msg" + businessId);
    }
}

最初咱们发送事件,用户注册咱们发送了一个用户 ID,下单胜利咱们发送了一个订单 ID。

再写一个测试类去测试一下,别离创立两个service,而后别离调用办法。

public class EventBusTest {public static void main(String[] args) {UserService userService = new UserService();
        userService.regist();

        BookingService bookingService = new BookingService();
        bookingService.booking();}
}

执行测试类,咱们能够看到输入,别离去执行了咱们的事件订阅的办法。

send email msg1send sms msg1send email msg2send sms msg2

应用起来你会发现非常简单,对于心愿轻量级简略地做到解耦应用 EventBus 十分适合。

留神别踩坑

首先,留神一下例子中的参数都是 Long 类型,如果事件的参数是其余类型的话,那么音讯是无奈承受到的,比方咱们把下单中发送的订单 ID 改成 String 类型而后会发现没有生产了,因为咱们没有定义一个参数类型是 String 的办法。

public class BookingService {private static AsyncEventBus eventBus = new AsyncEventBus(Executors.newFixedThreadPool(3));    public void booking(){        // 业务逻辑        String bookingId = "2";        eventBus.register(new EmailMsgHandler());        eventBus.register(new SmsMsgHandler());        eventBus.post(bookingId);    }}// 输入 send email msg1send sms msg1

EmailMsgHandlerSmsMsgHandler都新增一个接管 String 类型的订阅办法,这样就能够接管到了。

@Subscribepublic void handle(String businessId) {System.out.println("send email msg for string" + businessId);}@Subscribepublic void handle(String businessId) {System.out.println("send sms msg for string" + businessId);}// 输入 send sms msg1send email msg1send email msg for string2send sms msg for string2

除此之外,其实咱们能够定义一个 DeadEvent 来解决这种状况,它相当于是一个默认的解决形式,当没有匹配的事件类型参数的话就会默认发送一个 DeadEvent 事件。

定义一个默认处理器。

public class DefaultEventHandler {@Subscribe    public void handle(DeadEvent event) {System.out.println("no subscriber," + event);    }}

BookingService 新增一个 pay() 领取办法,下单完了去领取,注册咱们的默认事件。

public void pay(){  // 业务逻辑  eventBus.register(new DefaultEventHandler());  eventBus.post(new Payment(UUID.randomUUID().toString()));}@ToString@Data@NoArgsConstructor@AllArgsConstructorpublic class Payment {private String paymentId;}

执行测试 bookingService.pay() 看到输入后果:

no subscriber,DeadEvent{source=AsyncEventBus{default}, event=Payment(paymentId=255da942-7128-4bd1-baca-f0a8e569ed88)}

源码剖析

OK,简略的介绍就到这里,那其实到目前为止咱们说的这个都是 同步调用 的,这不太合乎咱们的要求,咱们当然应用异步解决更好。

那就看看源码它是怎么实现的。

@Betapublic class EventBus {private static final Logger logger = Logger.getLogger(EventBus.class.getName());  private final String identifier;  private final Executor executor;  private final SubscriberExceptionHandler exceptionHandler;  private final SubscriberRegistry subscribers = new SubscriberRegistry(this);  private final Dispatcher dispatcher;    public EventBus() {    this("default");  }  public EventBus(String identifier) {this(        identifier,        MoreExecutors.directExecutor(),        Dispatcher.perThreadDispatchQueue(),        LoggingHandler.INSTANCE);  }}

identifier就是个名字,标记,默认就是default

executor执行器,默认创立一个 MoreExecutors.directExecutor(),事件订阅者依据你本人提供的executor 来决定如何执行事件订阅的解决形式。

exceptionHandler是异样处理器,默认创立的就是打点日志。

subscribers就是咱们的消费者,订阅者。

dispatcher用来做事件散发。

默认创立的 executor 是一个 MoreExecutors.directExecutor(),看到command.run() 你就会发现他这不就是同步执行嘛。

public static Executor directExecutor() {    return DirectExecutor.INSTANCE;}private enum DirectExecutor implements Executor {INSTANCE;@Overridepublic void execute(Runnable command) {command.run();}@Overridepublic String toString() {    return "MoreExecutors.directExecutor()";}

同步执行还是不太好,咱们心愿不光给咱们解耦,还要异步执行,EventBus 给咱们提供了 AsyncEventBusExecutor 咱们本人传入就好了。

public class AsyncEventBus extends EventBus {public AsyncEventBus(String identifier, Executor executor) {super(identifier, executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);  }   public AsyncEventBus(Executor executor, SubscriberExceptionHandler subscriberExceptionHandler) {super("default", executor, Dispatcher.legacyAsync(), subscriberExceptionHandler);  }  public AsyncEventBus(Executor executor) {super("default", executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);  }

下面的代码咱们改成异步的,这样不就好起来了嘛,这样的话,实际上能够联合咱们本人的线程池来解决了。

private static AsyncEventBus eventBus = new AsyncEventBus(Executors.newFixedThreadPool(3));

OK,这个说分明了,咱们能够顺便再看看事件散发的解决,看到 DeadEvent 了吧,没有以后事件的订阅者,就会发送一个 DeadEvent 事件,bingo!

public void post(Object event) {Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
      if (eventSubscribers.hasNext()) {dispatcher.dispatch(event, eventSubscribers);
  } else if (!(event instanceof DeadEvent)) {
    // the event had no subscribers and was not itself a DeadEvent
    post(new DeadEvent(this, event));
  }
}

总结

OK,这个应用和源码还是比较简单的哈,有趣味的同学能够本人去瞅瞅,花不了多少时间。

总的来说,EventBus就是提供了咱们一个更优雅的代码解耦的形式,理论工作中的业务你必定能用上它!

正文完
 0