上半部分主要是 EventBus3.0 架构分析,接下来开始 EventBus3.0 的源码分析了。
我们从 EventBus3.0 使用方式开始源码分析,先来分析注册事件~
1. 注册源码分析
注册事件方式:
EventBus.getDefault().register(this);
EventBus 的 getDefault() 是一个单例,确保只有一个 EventBus 对象实例
public static EventBus getDefault() {
EventBus instance = defaultInstance;
if (instance == null) {synchronized (EventBus.class) {
instance = EventBus.defaultInstance;
if (instance == null) {instance = EventBus.defaultInstance = new EventBus();
}
}
}
return instance;
}
EventBus 构造方法做什么呢?
private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();
public EventBus() {this(DEFAULT_BUILDER);
}
在 EventBus 无参构造方法调用了有一个参数的构造方法,参数传入的是 EventBusBuilder 对象,该对象配置 EventBus 相关默认的属性。
EventBus(EventBusBuilder builder) {logger = builder.getLogger();
subscriptionsByEventType = new HashMap<>();
typesBySubscriber = new HashMap<>();
stickyEvents = new ConcurrentHashMap<>();
mainThreadSupport = builder.getMainThreadSupport();
mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
backgroundPoster = new BackgroundPoster(this);
asyncPoster = new AsyncPoster(this);
indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
builder.strictMethodVerification, builder.ignoreGeneratedIndex);
logSubscriberExceptions = builder.logSubscriberExceptions;
logNoSubscriberMessages = builder.logNoSubscriberMessages;
sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
throwSubscriberException = builder.throwSubscriberException;
eventInheritance = builder.eventInheritance;
executorService = builder.executorService;
}
这些属性可以通过配置 EventBusBuilder 来更改
EventBus.builder()
.logNoSubscriberMessages(false)
.sendNoSubscriberEvent(false)
.installDefaultEventBus();
接下来分析 EventBus 注册重要的一个方法 register,先看下它的源码
public void register(Object subscriber) {
// 获取当前注册类的 Class 对象
Class<?> subscriberClass = subscriber.getClass();
// 查找当前注册类中被 @Subscribe 注解标记的所有方法
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
// 遍历所有的订阅方法,完成注册
for (SubscriberMethod subscriberMethod : subscriberMethods) {subscribe(subscriber, subscriberMethod);
}
}
}
从 register 方法源码可以看出,主要完成订阅方法查找和注册,订阅方法查找由 findSubscriberMethods 完成,注册由 subscribe 完成。
首先看下 findSubscriberMethods 方法源码实现
// 缓存订阅方法,key:当前注册类的 Class value:订阅方法集合
private static final Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
// 先从缓存中获取当前注册类的订阅方法,如果找到,直接返回
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {return subscriberMethods;}
// ignoreGeneratedIndex 默认值为 false
// 主要作用:是否忽略由 APT(注解处理器) 生成的订阅索引
if (ignoreGeneratedIndex) {
// 如果忽略,则使用反射技术查找所有被 @Subscribe 注解标记的所有订阅方法
subscriberMethods = findUsingReflection(subscriberClass);
} else {// 默认不忽略,但是如果没有使用 APT( 注解处理器) 生成的订阅索引,则还是通过反射技术查找
subscriberMethods = findUsingInfo(subscriberClass);
}
// 如果当前注册类中没有订阅方法,则抛出异常
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber" + subscriberClass
+ "and its super classes have no public methods with the @Subscribe annotation");
} else {
// 当前注册类中有订阅方法,添加到缓存中,以便再次注册直接使用
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
findSubscriberMethods 方法主要干的事情很清晰,首先从缓存中获取当前注册类的订阅方法,如果找到,直接返回;如果没有找到,通过 APT(注解处理器) 或者反射技术查找当前注册类中的所有订阅方法,如果没有找到,抛出异常,否则缓存到内存中,并返回当前注册类中所有的订阅方法。
其中查找当前注册类中的所有订阅方法通过 findUsingInfo 方法实现
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
// FindState
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {// 如果没有使用 APT( 注解处理器) 生成订阅方法索引,返回 null,则进入 else 语句中
findState.subscriberInfo = getSubscriberInfo(findState);
if (findState.subscriberInfo != null) {SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
// 使用反射技术查找当前注册类中所有的订阅方法
findUsingReflectionInSingleClass(findState);
}
// 从父类中继续查找,直到父类为 null
findState.moveToSuperclass();}
// 返回注册类中所有的订阅方法,并释放 findState 中状态,同时把 findState 对象放回缓存池中
return getMethodsAndRelease(findState);
}
findUsingInfo 方法主要是从当前注册类及父类中查找所有的订阅方法,首先从通过 APT(注解处理器) 生成订阅方法索引中查找,如果没有使用 APT(注解处理器) 生成,则通过反射技术查找。
先来看下通过反射技术怎么去查找?关键方法 findUsingReflectionInSingleClass 源码如下:
private void findUsingReflectionInSingleClass(FindState findState) {Method[] methods;
try {
// This is faster than getMethods, especially when subscribers are fat classes like Activities
methods = findState.clazz.getDeclaredMethods();} catch (Throwable th) {
// Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
methods = findState.clazz.getMethods();
findState.skipSuperClasses = true;
}
for (Method method : methods) {int modifiers = method.getModifiers();
// 方法必须是 public 的,且不能是 ABSTRACT 或者 STATIC
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {Class<?>[] parameterTypes = method.getParameterTypes();
// 方法的参数必须是有一个
if (parameterTypes.length == 1) {Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
// 方法必须被 @Subscribe 注解标记
if (subscribeAnnotation != null) {
// 方法参数类型
Class<?> eventType = parameterTypes[0];
// 用来判断该方法是否已经添加过
if (findState.checkAdd(method, eventType)) {ThreadMode threadMode = subscribeAnnotation.threadMode();
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method" + methodName +
"must have exactly 1 parameter but has" + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
"is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}
订阅方法的查找已经分析完了,然后就是 subscribe 方法源码分析了
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
// 订阅事件类型,也就是订阅方法的参数类型
Class<?> eventType = subscriberMethod.eventType;
// Subscription 封装当前注册类的对象和订阅方法
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
// subscriptionsByEventType 是一个 Map,key 为订阅事件类型,value 为 Subscription 集合
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {if (subscriptions.contains(newSubscription)) {throw new EventBusException("Subscriber" + subscriber.getClass() + "already registered to event"
+ eventType);
}
}
// 将 newSubscription 添加到 subscriptions 集合中
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {subscriptions.add(i, newSubscription);
break;
}
}
// typesBySubscriber 也是一个 Map,key 为注册类的对象,value 为当前注册类中所有订阅方法中的参数类型集合
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
// 默认 sticky 为 false,代表不是粘性事件,下面代码先不看,后面说到粘性事件再来分析它
if (subscriberMethod.sticky) {if (eventInheritance) {
// Existing sticky events of all subclasses of eventType have to be considered.
// Note: Iterating over all events may be inefficient with lots of sticky events,
// thus data structure should be changed to allow a more efficient lookup
// (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
subscribe 方法中的 subscriptionsByEventType 和 typesBySubscriber 两个 Map。在事件发布时用到
subscriptionsByEventType 来完成事件的处理,在取消注册时用到 subscriptionsByEventType 和 typesBySubscriber 这两个 Map,后面会具体分析到~~
2. 取消注册源码分析
取消注册方式:
EventBus.getDefault().unregister(this);
取消注册调用 EventBus 的 unregister 方法,下面是它的源码
public synchronized void unregister(Object subscriber) {
// typesBySubscriber 是一个 Map,注册的时候已经将 key 为当前注册类的对象,value 为当前注册类所有订阅方法的参数类型放入当前 Map 中
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
// 如果 subscribedTypes 不为 null,说明当前类有注册
if (subscribedTypes != null) {for (Class<?> eventType : subscribedTypes) {
// 根据事件类型 eventType,取消订阅
unsubscribeByEventType(subscriber, eventType);
}
// 将注册的对象从 Map 中移除
typesBySubscriber.remove(subscriber);
} else {logger.log(Level.WARNING, "Subscriber to unregister was not registered before:" + subscriber.getClass());
}
}
从 unregister 方法源码可知,先根据当前取消注册类的对象从 typesBySubscriber 缓存中找到所有订阅方法的事件类型,然后根据事件类型,取消订阅。接下来看下 unsubscribeByEventType 源码:
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
// subscriptionsByEventType 也是一个 Map,注册的时候已经将 key 为订阅事件类型,value 为 Subscription 对象集合放入当前 Map 中
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {int size = subscriptions.size();
for (int i = 0; i < size; i++) {Subscription subscription = subscriptions.get(i);
if (subscription.subscriber == subscriber) {
subscription.active = false;
// 移除 Subscription
subscriptions.remove(i);
i--;
size--;
}
}
}
}
从取消注册源码分析可知,主要是从 typesBySubscriber 和 subscriptionsByEventType 这两个 Map 中移除注册类对象和移除订阅方法。
3. 发布普通事件
普通事件的发布,可以通过下面方式:
EventBus.getDefault().post("hello, eventbus!");
可以看到发布事件通过 post 方法完成
public void post(Object event) {
// currentPostingThreadState 是一个 PostingThreadState 类型的 ThreadLocal
// PostingThreadState 维护者事件队列和线程模型
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
// 将要发送的事件先加入事件队列中
eventQueue.add(event);
// isPosting 默认值为 false
if (!postingState.isPosting) {
// 是否是主线程
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {throw new EventBusException("Internal error. Abort state was not reset");
}
try {
// 循环遍历事件队列,从队列的头部开始发布事件
while (!eventQueue.isEmpty()) {postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
post 方法先将要事件加入到事件队列中,然后循环事件队列,交给 postSingleEvent 方法处理
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
// eventInheritance 默认为 true,表示查找事件的继承类
if (eventInheritance) {
// 查找该事件和该事件继承的事件集合
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
// 如果没有注册,subscriptionFound 为 false
if (!subscriptionFound) {if (logNoSubscriberMessages) { // logNoSubscriberMessages 默认值为 true
logger.log(Level.FINE, "No subscribers registered for event" + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {post(new NoSubscriberEvent(this, event));
}
}
}
postSingleEvent 方法会查找该事件和它的继承事件,然后交给 postSingleEventForEventType 方法处理
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
// subscriptionsByEventType 在注册时,会将事件和订阅方法加入
subscriptions = subscriptionsByEventType.get(eventClass);
}
// 如果没有注册,subscriptions 就会为 null
if (subscriptions != null && !subscriptions.isEmpty()) {for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
// 事件处理
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {break;}
}
return true;
}
return false;
}
postSingleEventForEventType 中会从注册表 subscriptionsByEventType 中找出该事件的所有订阅方法,交给 postToSubscription 方法处理
4. 线程切换
最后是事件的处理了,会根据 ThreadMode 线程模式切换线程处理事件
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
// 根据订阅方法的线程模式判断,事件需要在哪个线程处理
switch (subscription.subscriberMethod.threadMode) {
// 默认线程模式,在哪个线程发布事件就在哪个线程处理事件
case POSTING:
invokeSubscriber(subscription, event);
break;
// 要求在主线程处理事件
case MAIN:
// 如果发布事件在主线程,直接执行
if (isMainThread) {invokeSubscriber(subscription, event);
} else {
// 如果发布事件在子线程,先将事件入队列,然后通过 Handler 切换到主线程执行
mainThreadPoster.enqueue(subscription, event);
}
break;
// 要求在主线程处理事件
case MAIN_ORDERED:
// 无论发布事件在哪个线程,都会把事件入队列,然后通过 Handler 切换到主线程执行
if (mainThreadPoster != null) {mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
// 要求在后台线程处理事件
case BACKGROUND:
// 如果发布事件在主线程,先将事件入队列,然后丢到线程池处理,串行处理
if (isMainThread) {backgroundPoster.enqueue(subscription, event);
} else {
// 如果发布事件在子线程,直接执行
invokeSubscriber(subscription, event);
}
break;
// 要求在异步线程处理事件
case ASYNC:
// 无论发布事件在哪个线程,都会先入队列,然后丢到线程池处理,并行处理
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode:" + subscription.subscriberMethod.threadMode);
}
}
线程切换其实就是根据订阅事件方法的线程模型及发布事件的线程来决定如何处理,处理方式分为两种:一种在相应的线程直接通过 invokeSubscriber 方法,使用反射技术来执行订阅事件方法
void invokeSubscriber(Subscription subscription, Object event) {
try {subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {throw new IllegalStateException("Unexpected exception", e);
}
}
还有一种就是先将事件入队列(底层是双向链表结构实现),然后交给 Handler 或者线程池处理。以 ASYNC 线程模型为例, asyncPoster 是 AsyncPoster 类的一个实例
class AsyncPoster implements Runnable, Poster {
private final PendingPostQueue queue;
private final EventBus eventBus;
AsyncPoster(EventBus eventBus) {
this.eventBus = eventBus;
// PendingPostQueue 是一个双向链表
queue = new PendingPostQueue();}
public void enqueue(Subscription subscription, Object event) {PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
// 加入链表的尾部
queue.enqueue(pendingPost);
// 直接丢给线程池处理
eventBus.getExecutorService().execute(this);
}
@Override
public void run() {PendingPost pendingPost = queue.poll();
if(pendingPost == null) {throw new IllegalStateException("No pending post available");
}
eventBus.invokeSubscriber(pendingPost);
}
}
先分析到这,接下来分析粘性事件,订阅索引,AsyncExecutor 等~~~