共计 11316 个字符,预计需要花费 29 分钟才能阅读完成。
作者:京东科技 刘子洋
背景
最近我的项目呈现同一音讯发送屡次的景象,对上游业务方造成困扰,通过排查发现应用 EventBus 形式不正确。也借此机会学习了下 EventBus 并进行分享。以下为分享内容,本文次要分为五个局部,篇幅较长,望大家急躁浏览。
- 1、简述:简略介绍 EventBus 及其组成部分。
- 2、原理解析:次要对 listener 注册流程及 Event 公布流程进行解析。
- 3、应用领导:EventBus 简略的应用领导。
- 4、注意事项:在应用 EventBus 中须要留神的一些暗藏逻辑。
- 5、分享时发问的问题
- 6、我的项目中遇到的问题:上述问题进行详细描述并复现场景。
1、简述
1.1、概念
下文摘自 EventBus 源码正文,从正文中能够直观理解到他的 性能、个性、注意事项。
【源码正文】
Dispatches events to listeners, and provides ways for listeners to register themselves.
The EventBus allows publish-subscribe-style communication between components without requiring the components to explicitly register with one another (and thus be aware of each other). It is designed exclusively to replace traditional Java in-process event distribution using explicit registration. It is not a general-purpose publish-subscribe system, nor is it intended for interprocess communication.
Receiving Events
To receive events, an object should:
- Expose a public method, known as the event subscriber, which accepts a single argument of the type of event desired;
- Mark it with a Subscribe annotation;
- Pass itself to an EventBus instance’s register(Object) method.
Posting Events
To post an event, simply provide the event object to the post(Object) method. The EventBus instance will determine the type of event and route it to all registered listeners.
Events are routed based on their type — an event will be delivered to any subscriber for any type to which the event is assignable. This includes implemented interfaces, all superclasses, and all interfaces implemented by superclasses.
When post is called, all registered subscribers for an event are run in sequence, so subscribers should be reasonably quick. If an event may trigger an extended process (such as a database load), spawn a thread or queue it for later. (For a convenient way to do this, use an AsyncEventBus.)
Subscriber Methods
Event subscriber methods must accept only one argument: the event.
Subscribers should not, in general, throw. If they do, the EventBus will catch and log the exception. This is rarely the right solution for error handling and should not be relied upon; it is intended solely to help find problems during development.
The EventBus guarantees that it will not call a subscriber method from multiple threads simultaneously, unless the method explicitly allows it by bearing the AllowConcurrentEvents annotation. If this annotation is not present, subscriber methods need not worry about being reentrant, unless also called from outside the EventBus.
Dead Events
If an event is posted, but no registered subscribers can accept it, it is considered “dead.” To give the system a second chance to handle dead events, they are wrapped in an instance of DeadEvent and reposted.
If a subscriber for a supertype of all events (such as Object) is registered, no event will ever be considered dead, and no DeadEvents will be generated. Accordingly, while DeadEvent extends Object, a subscriber registered to receive any Object will never receive a DeadEvent.
This class is safe for concurrent use.
See the Guava User Guide article on EventBus.
Since:
10.0
Author:
Cliff Biffle
1.2、零碎流程
1.3、组成部分
1.3.1、调度器
EventBus、AsyncEventBus 都是一个调度的角色,区别是一个同步一个异步。
- EventBus
源码正文:> Dispatches events to listeners, and provides ways for listeners to register themselves.
意思是说 EventBus 散发事件(Event)给 listeners 解决,并且提供 listeners 注册本人的办法。从这里咱们能够看出 EventBus 次要是一个调度的角色。**EventBus 总结 **
- 1. 同步执行,事件发送方在收回事件之后,会期待所有的事件生产方执行结束后,才会回来继续执行本人前面的代码。- 2. 事件发送方和事件生产方会在同一个线程中执行,生产方的执行线程取决于发送方。- 3. 同一个事件的多个订阅者,在接管到事件的程序下面有不同。谁先注册到 EventBus 的,谁先执行,如果是在同一个类中的两个订阅者一起被注册到 EventBus 的状况,收到事件的程序跟办法名无关。
- AsyncEventBus
源码正文:> An {@link EventBus} that takes the Executor of your choice and uses it to dispatch events, allowing dispatch to occur asynchronously.
意思是说 AsyncEventBus 就是 EventBus,只不过 AsyncEventBus 应用你指定的线程池(不指定应用默认线程池)去散发事件(Event),并且是异步进行的。**AsyncEventBus 总结 **
- 1. 异步执行,事件发送方异步收回事件,不会期待事件生产方是否收到,间接执行本人前面的代码。- 2. 在定义 AsyncEventBus 时,构造函数中会传入一个线程池。事件生产方收到异步事件时,生产方会从线程池中获取一个新的线程来执行本人的工作。- 3. 同一个事件的多个订阅者,它们的注册程序跟接管到事件的程序上没有任何分割,都会同时收到事件,并且都是在新的线程中,** 异步并发 ** 的执行本人的工作。
1.3.2、事件承载器
- Event
事件主体,用于承载音讯。
- DeadEvent
源码正文:>Wraps an event that was posted, but which had no subscribers and thus could not be delivered, Registering a DeadEvent subscriber is useful for debugging or logging, as it can detect misconfigurations in a system's event distribution.
意思是说 DeadEvent 就是一个被包装的 event,只不过是一个没有订阅者无奈被散发的 event。咱们能够在开发时注册一个 DeadEvent,因为它能够检测零碎事件散布中的谬误配置。
1.3.3、事件注册核心
SubscriberRegistry
源码正文:> Registry of subscribers to a single event bus.
意思是说 SubscriberRegistry 是单个事件总线(EventBus)的订阅者注册表。
1.3.4、事件散发器
Dispatcher
源码正文:>Handler for dispatching events to subscribers, providing different event ordering guarantees that make sense for different situations.
>Note: The dispatcher is orthogonal to the subscriber's Executor. The dispatcher controls the order in which events are dispatched, while the executor controls how (i.e. on which thread) the subscriber is actually called when an event is dispatched to it.
意思是说 Dispatcher 次要工作是将事件散发到订阅者,并且能够不同的状况,按不同的程序散发。
Dispatcher 有三个子类,用以满足不同的散发状况
1.PerThreadQueuedDispatcher
源码正文:> Returns a dispatcher that queues events that are posted reentrantly on a thread that is already dispatching an event, guaranteeing that all events posted on a single thread are dispatched to all subscribers in the order they are posted.
> When all subscribers are dispatched to using a direct executor (which dispatches on the same thread that posts the event), this yields a breadth-first dispatch order on each thread. That is, all subscribers to a single event A will be called before any subscribers to any events B and C that are posted to the event bus by the subscribers to A.
意思是说一个线程在处理事件过程中又公布了一个事件,PerThreadQueuedDispatcher 会将前面这个事件放到最初,从而保障在单个线程上公布的所有事件都按其公布程序分发给订阅者。** 留神,每个线程都要本人存储事件的队列。**
第二段是说 PerThreadQueuedDispatcher 按 ** 广度优先 ** 散发事件。并给了一个例子:代码中公布了事件 A,订阅者收到后,在执行过程中又公布了事件 B 和事件 C,PerThreadQueuedDispatcher 会确保事件 A 分发给所有订阅者后,再散发 B、C 事件。
2.LegacyAsyncDispatcher
源码正文:> Returns a dispatcher that queues events that are posted in a single global queue. This behavior matches the original behavior of AsyncEventBus exactly, but is otherwise not especially useful. For async dispatch, an immediate dispatcher should generally be preferable.
意思是说 LegacyAsyncDispatcher 有一个全局队列用于寄存所有事件,LegacyAsyncDispatcher 个性与 AsyncEventBus 个性齐全相符,除此之外没有其余什么个性。如果异步散发的话,最好用 immediate dispatcher。
3.ImmediateDispatcher
源码正文:> Returns a dispatcher that dispatches events to subscribers immediately as they're posted without using an intermediate queue to change the dispatch order. This is effectively a depth-first dispatch order, vs. breadth-first when using a queue.
意思是说 ImmediateDispatcher 在公布事件时立刻将事件分发给订阅者,而不应用两头队列更改散发程序。这实际上是 ** 深度优先 ** 的调度程序,而不是应用队列时的 ** 广度优先 **。
1.3.4、订阅者
- Subscriber
源码正文:> A subscriber method on a specific object, plus the executor that should be used for dispatching events to it.
Two subscribers are equivalent when they refer to the same method on the same object (not class). This property is used to ensure that no subscriber method is registered more than once.
第一段意思是说,Subscriber 是特定对象(Event)的订阅办法,用于执行被散发事件。第二段说当两个订阅者在同一对象 **(不是类)** 上援用雷同的办法时,它们是等效的,此属性用于确保不会屡次注册任何订阅者办法,次要说明会对订阅者进行判重,如果是同一个对象的同一个办法,则认为是同一个订阅者,不会进行反复注册。
- SynchronizedSubscriber
源码正文:> Subscriber that synchronizes invocations of a method to ensure that only one thread may enter the method at a time.
意思是说同步办法调用以确保一次只有一个线程能够执行订阅者办法(线程平安)。
2、原理解析
2.1、主体流程
- listener 通过 EventBus 进行注册。
- SubscriberRegister 会依据 listener、listener 中含有【@Subscribe】注解的办法及各办法参数创立 Subscriber 对象,并将其保护在 Subscribers(ConcurrentMap 类型,key 为 event 类对象,value 为 subscriber 汇合)中。
- publisher 公布事件 Event。
- 公布 Event 后,EventBus 会从 SubscriberRegister 中查找出所有订阅此事件的 Subscriber,而后让 Dispatcher 散发 Event 到每一个 Subscriber。
流程如下:
2.2、listener 注册原理
2.2.1、listener 注册流程
- 缓存所有含有 @Subscribe 注解办法到 subscriberMethodsCache(LoadingCache<Class<?>, ImmutableList>,key 为 listener,value 为 method 汇合)。
- listener 注册。
2.2.2、原理剖析
- 获取含有 @Subscribe 正文的办法进行缓存
找到所有被【@Subscribe】润饰的办法,并进行缓存
留神!!!这两个办法被 static 润饰,类加载的时候就进行寻找
订阅者惟一标识是【办法名 + 入参】
- 注册订阅者
1. 注册办法创立 Subscriber 时,如果 method 含有【@AllowConcurrentEvents】正文,则创立 SynchronizedSubscriber,否则创立 Subscriber
2、获取所有订阅者
3、从缓存中获取所有订阅办法
2.3、Event 公布原理
2.3.1、公布主体流程
- publisher 公布事件 Event。
- EventBus 依据 Event 类对象从 SubscriberRegistry 中获取所有订阅者。
- 将 Event 和 eventSubscribers 交由 Dispatcher 去散发。
- Dispatcher 将 Event 分发给每个 Subscribers。
- Subscriber 利用反射执行订阅者办法。
图中画出了三个 Dispatcher 的散发原理。
2.3.2、原理剖析
- 创立缓存
缓存 EventMsg 所有超类
留神!!!此处是静态方法,因而在代码加载的时候就会缓存 Event 所有超类。 - 公布 Event 事件
此办法是公布事件时调用的办法。 - 获取所有订阅者
1、从缓存中获取所有订阅者2、获取 Event 超类
- 事件散发
1、散发入口2、散发器散发
2.1、ImmediateDispatcher
来了一个事件则告诉对这个事件感兴趣的订阅者。2.2、PerThreadQueuedDispatcher(EventBus 默认选项)
在同一个线程 post 的 Event 执行程序是有序的。用 ThreadLocal<Queue> queue 来实现每个线程的 Event 有序性,在把事件增加到 queue 后会有一个 ThreadLocal dispatching 来判断以后线程是否正在散发,如果正在散发,则这次增加的 event 不会马上进行散发而是等到 dispatching 的值为 false(散发实现)才进行。
源码如下:2.3、LegacyAsyncDispatcher(AsyncEventBus 默认选项)
会有一个全局的队列 ConcurrentLinkedQueue queue 保留 EventWithSubscriber(事件和 subscriber), 如果被不同的线程 poll,不能保障在 queue 队列中的 event 是有序公布的。源码如下: - 执行订阅者办法
办法入口是 dispatchEvent,源码如下:因为 Subscriber 有两种,因而执行办法也有两种:
1.Subscriber(非线程平安)2.SynchronizedSubscriber(线程平安)
留神!!!执行办法会加同步锁
3、应用领导
3.1、次要流程
3.2、流程详解
- 1、创立 EventBus、AsyncEventBus Bean
在我的项目中对立配置全局单例 Bean(如非凡需要,可配置多例) - 2、定义 EventMsg
设置音讯载体。 - 3、注册 Listener
注册 Listener,处理事件留神! 在应用 PostConstruct 正文进行注册时,须要留神子类会执行父类含有 PostConstruct 正文的办法。
- 3、事件公布
封装对立公布事件的 Bean,而后通过 Bean 注入到须要公布的 Bean 外面进行事件公布。
此处对 EventBus 进行了对立封装收口操作,次要思考的是如果做一些操作,间接改这一处就能够。如果不须要封装,能够在应用的中央间接注入 EventBus 即可。
4、注意事项
4.1、循环散发事件
如果业务流程较长,切记梳理好业务流程,不要让事件循环散发。
目前 EventBus 没有对循环事件进行解决。
4.2、应用 @PostConstrucrt 注册 listener
子类在执行实例化时,会执行父类 @PostConstrucrt 正文 。如果 listenerSon 继承 listenerFather,当两者都应用 @PostConstrucrt 注册订阅办法时,子类也会调用父类的注册办法进行注册订阅办法。 因为 EventBus 机制,子类注册订阅办法时,也会注册父类的监听办法
Subscriber 惟一标记是(listener+method),因而在对同一办法注册时,因为不是同一个 listener,所以对于 EventBus 是两个订阅办法。
因而,如果存在 listenerSon、listenerFather 两个 listener,且 listenerSon 继承 listenerFather。当都应用 @PostConstrucrt 注册时,会导致 listenerFather 外面的订阅办法注册两次。
4.3、含有继承关系的 listener
当注册 listener 含有继承关系时,listener 解决 Event 音讯时,listener 的父类也会解决该音讯。
4.3.1、继承关系的订阅者
4.3.2、原理
子类 listener 注册,父类 listener 也会注册
4.4、含有继承关系的 Event
如果作为参数的 Event 有继承关系,应用 EventBus 公布 Event 时,Event 父类的监听者也会对 Event 进行解决。
4.4.1、执行后果
4.4.2、原理
在散发音讯的时候,会获取所有订阅者数据(Event 订阅者和 Event 超类的订阅者),而后进行散发数据。
获取订阅者数据如下图:
缓存 Event 及其超类的类对象,key 为 Event 类对象。
5、分享发问问题
问题 1:PerThreadQueuedDispatcherd 外面的队列,是否是有界队列?
有界队列,最大值为 int 的最大值(2147483647),源码如下图:
问题 2:dispatcher 分发给订阅者是否有序?
EventBus: 同步事件总线 :
同一个事件的多个订阅者,在接管到事件的程序下面有不同。谁先注册到 EventBus 的,谁先执行(因为 base 应用的是 PostConstruct 进行注册,因而跟不同 Bean 之间的初始化程序有关系)。如果是在同一个类中的两个订阅者一起被注册到 EventBus 的状况,收到事件的程序跟办法名无关。
AsyncEventBus: 异步事件总线:同一个事件的多个订阅者,它们的注册程序跟接管到事件的程序上没有任何分割,都会同时收到事件,并且都是在新的线程中,异步并发的执行本人的工作。
问题 3:EventBus 与 SpringEvent 的比照?
- 应用形式比拟
我的项目 | 事件 | 发布者 | 公布办法 | 是否异步 | 监听者 | 注册形式 |
---|---|---|---|---|---|---|
EventBus | 任意对象 | EventBus | EventBus#post | 反对同步异步 | 注解 Subscribe 办法 | 手动注册 EventBus#register |
SpringEvent | 任意对象 | ApplicationEventPublisher | ApplicationEventPublisher#publishEvent | 反对同步异步 | 注解 EventListener 办法 | 零碎注册 |
- 应用场景比拟
我的项目 | 事件辨别 | 是否反对事件簇 | 是否反对自定义 event | 是否反对过滤 | 是否反对事件隔离 | 是否反对事务 | 是否反对设置订阅者生产程序 | 复杂程度 |
---|---|---|---|---|---|---|---|---|
EventBus | Class | 是 | 是 | 否 | 是 | 否 | 否 | 简略 |
Spring Event | Class | 是 | 是 | 是 | 否 | 是 | 是 | 简单 |
参考链接 https://www.cnblogs.com/shoren/p/eventBus_springEvent.html
问题 4:EventBus 的应用场景,联合现有应用场景思考是否适合?
EventBus 临时不实用,次要有一下几个点:
- EventBus 不反对事务,我的项目在更新、创立商品时,最好等事务提交胜利后,再发送 MQ 音讯(次要问题点)
- EventBus 不反对设置同一音讯的订阅者生产程序。
- EventBus 不反对音讯过滤。SpringEvent 反对音讯过滤
6. 我的项目中遇到的问题
6.1、问题形容
商品上架时会触发渠道散发性能,会有两步操作
- 1、创立一条散发记录,并对外发送一条 未散发状态 的商品变更音讯(通过 eventBus 事件发送音讯)。
- 2、将散发记录改为审核中(须要审核)或审核通过(不须要审核),并对外发送一条 已散发状态 的商品变更音讯(通过 eventBus 事件发送音讯)。
所以散发会触发两条散发状态不同的商品变更音讯,一条是未散发,另一条是已散发 。理论发送了两条散发状态雷同的商品变更音讯, 状态都是已散发。
6.2、起因
咱们先来回顾下 EventBus 监听者处理事件时有三种策略,这是根本原因:
- ImmediateDispatcher:来一个事件马上进行解决。
- PerThreadQueuedDispatcher(eventBus 默认选项,我的项目中应用此策略):在同一个线程 post 的 Event, 执行的程序是有序的。用 ThreadLocal<Queue> queue 来实现每个线程 post 的 Event 是有序的,在把事件增加到 queue 后会有一个 ThreadLocal dispatching 来判断以后线程是否正在散发,如果正在散发,则这次增加的 event 不会马上进行散发而是等到 dispatching 的值为 false 才进行。
- LegacyAsyncDispatcher(AsyncEventBus 默认选项):会有一个全局的队列 ConcurrentLinkedQueue queue 保留 EventWithSubscriber(事件和 subscriber), 如果被不同的线程 poll 不能保障在 queue 队列中的 event 是有序公布的。
详情可见上文中的【2.3.4、事件散发】
再看下我的项目中的逻辑:
商品主动散发在商品变更的 Listener 里操作。因为以后散发操作处于商品上架事件处理过程中,因而对于增加散发记录事件不会立马解决,而是将其放入队列。上架操作实现,散发状态变为已散发。等上架操作实现后,商品变更 Listener 解决散发事件(此时有两条 EventMsg,一个是增加散发记录另一个是批改散发状态),散发状态实时查问,对于第一个散发事件,查问到的散发记录是已散发状态。最终导致两条音讯都是已散发状态。
6.3、场景复现
在 handler 中对动态变量进行两次 +1 操作,每操作一步发送一条事件,此处假如动态变量为散发状态。
6.4、解决办法
目前 Dispatcher 包用 default 润饰,使用者无奈指定 Dispatcher 策略。并且 ImmediateDispatcher 应用 private 润饰。
因而目前暂无解决非同步问题,只能在业务逻辑上进行躲避。
其实能够批改源码并公布一个包本人应用,然而公司平安规定不容许这样做,只能通过业务逻辑上进行躲避,下图是 github 上对此问题的探讨。
7、总结
如果我的项目中须要应用异步解耦解决一些事项,应用 EventBus 还是比拟不便的。