<h1> JAVA | Guava EventBus 应用 </h1>
系列文章目录
Table of Contents
[TOC]
前言
EventBus 是 Guava 的事件处理机制,是观察者模式(生产 / 生产模型)的一种实现。
观察者模式在咱们日常开发中应用十分宽泛,例如在订单零碎中,订单状态或者物流信息的变更会向用户发送 APP 推送、短信、告诉卖家、买家等等;审批零碎中,审批单的流程流转会告诉发动审批用户、审批的领导等等。
Observer 模式也是 JDK 中自带就反对的,其在 1.0 版本就曾经存在 Observer,不过随着 Java 版本的飞速降级,其应用形式始终没有变动,许多程序库提供了更加简略的实现,例如 Guava EventBus、RxJava、EventBus 等
一、为什么要用 Observer 模式以及 EventBus 长处?
EventBus 长处
- 相比 Observer 编程简略不便
- 通过自定义参数可实现同步、异步操作以及异样解决
- 单过程应用,无网络影响
毛病
- 只能单过程应用
- 我的项目异样重启或者退出不保障音讯长久化
如果须要分布式应用还是须要应用 MQ
二、EventBus 应用步骤
1. 引入库
Gradle
compile group: 'com.google.guava', name: 'guava', version: '29.0-jre'
Maven
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>29.0-jre</version>
</dependency>
引入依赖后,这里咱们次要应用 com.google.common.eventbus.EventBus
类进行操作,其提供了 register
、unregister
、post
来进行注册订阅、勾销订阅和公布音讯
public void register(Object object);
public void unregister(Object object);
public void post(Object event);
2. 同步应用
1. 首先创立一个 EventBus
EventBus eventBus = new EventBus();
2. 创立一个订阅者
在 Guava EventBus 中,是依据参数类型进行订阅,每个订阅的办法只能由一个参数,同时须要应用 @Subscribe
标识
class EventListener {
/**
* 监听 Integer 类型的音讯
*/
@Subscribe
public void listenInteger(Integer param) {System.out.println("EventListener#listenInteger ->" + param);
}
/**
* 监听 String 类型的音讯
*/
@Subscribe
public void listenString(String param) {System.out.println("EventListener#listenString ->" + param);
}
}
3. 注册到 EventBus 上并公布音讯
EventBus eventBus = new EventBus();
eventBus.register(new EventListener());
eventBus.post(1);
eventBus.post(2);
eventBus.post("3");
运行后果为
EventListener#listenInteger ->1
EventListener#listenInteger ->2
EventListener#listenString ->3
依据须要咱们能够创立多个订阅者实现订阅信息,同时如果一个类型存在多个订阅者,则所有订阅办法都会执行
为什么说这么做是同步的呢?
Guava Event 实际上是应用线程池来解决订阅音讯的,通过源码能够看出,当咱们应用默认的构造方法创立 EventBus
的时候,其中 executor
为 MoreExecutors.directExecutor()
,其具体实现中间接调用的 Runnable#run
办法,使其依然在同一个线程中执行,所以默认操作依然是同步的,这种解决办法也有实用的中央,这样既能够解耦也能够让办法在同一个线程中执行获取同线程中的便当,比方事务的解决
EventBus 局部源码
public class EventBus {private static final Logger logger = Logger.getLogger(EventBus.class.getName());
private final String identifier;
private final Executor executor;
private final SubscriberExceptionHandler exceptionHandler;
private final SubscriberRegistry subscribers;
private final Dispatcher dispatcher;
public EventBus() {this("default");
}
public EventBus(String identifier) {this(identifier, MoreExecutors.directExecutor(), Dispatcher.perThreadDispatchQueue(), EventBus.LoggingHandler.INSTANCE);
}
public EventBus(SubscriberExceptionHandler exceptionHandler) {this("default", MoreExecutors.directExecutor(), Dispatcher.perThreadDispatchQueue(), exceptionHandler);
}
EventBus(String identifier, Executor executor, Dispatcher dispatcher, SubscriberExceptionHandler exceptionHandler) {this.subscribers = new SubscriberRegistry(this);
this.identifier = (String)Preconditions.checkNotNull(identifier);
this.executor = (Executor)Preconditions.checkNotNull(executor);
this.dispatcher = (Dispatcher)Preconditions.checkNotNull(dispatcher);
this.exceptionHandler = (SubscriberExceptionHandler)Preconditions.checkNotNull(exceptionHandler);
}
}
DirectExecutor 局部源码
enum DirectExecutor implements Executor {
INSTANCE;
private DirectExecutor() {}
public void execute(Runnable command) {command.run();
}
public String toString() {return "MoreExecutors.directExecutor()";
}
}
3. 异步应用
通过下面的源码,能够看出只有将构造方法中的 executor 换成一个线程池实现即可, 同时 Guava EventBus 为了简化操作,提供了一个简化的计划即 AsyncEventBus
EventBus eventBus = new AsyncEventBus(Executors.newCachedThreadPool());
这样即可实现异步应用
AsyncEventBus 源码
public class AsyncEventBus extends EventBus {public AsyncEventBus(String identifier, Executor executor) {super(identifier, executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
}
public AsyncEventBus(Executor executor, SubscriberExceptionHandler subscriberExceptionHandler) {super("default", executor, Dispatcher.legacyAsync(), subscriberExceptionHandler);
}
public AsyncEventBus(Executor executor) {super("default", executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
}
}
4. 异样解决
如果解决时产生异样应该如何解决? 在看源码中,无论是 EventBus
还是 AsyncEventBus
都可传入自定义的 SubscriberExceptionHandler
该 handler 当出现异常时会被调用,我可能够从参数 exception
获取异样信息,从 context
中获取音讯信息进行特定的解决
其接口申明为
public interface SubscriberExceptionHandler {
/** Handles exceptions thrown by subscribers. */
void handleException(Throwable exception, SubscriberExceptionContext context);
}
总结
在下面的根底上,咱们能够定义一些音讯类型来实现不同音讯的监听和解决,通过实现 SubscriberExceptionHandler
来解决异样的状况,无论时同步还是异步都能熟能生巧
参考
- https://github.com/google/guava
- https://github.com/greenrobot…
- https://github.com/ReactiveX/…