概述
事件总线是一种事件公布 / 订阅构造,通过公布订阅模式能够解耦不同架构层级,同样它也能够来解决业务之间的耦合,它有以下长处
- 松耦合
- 横切关注点
- 可测试性
- 事件驱动
公布订阅模式
通过下图咱们能够疾速理解公布订阅模式的实质
- 订阅者将本人关怀的事件在调度核心进行注册
- 事件的发布者通过调度核心把事件公布进来
- 订阅者收到本人关怀的事件变更并执行绝对应业务
其中发布者无需晓得订阅者是谁,订阅者彼此之间也互不意识,彼此之间互不烦扰
事件总线类型
在 Masa Framework 中,将事件划分为
- 过程内事件 (Event)
本地事件,它的公布与订阅须要在同一个过程中,订阅方与公布方须要在同一个我的项目中
- 跨过程事件 (IntegrationEvent)
集成事件,它的公布与订阅肯定不在同一个过程中,订阅方与公布方能够在同一个我的项目中,也能够在不同的我的项目中
上面咱们会用一个注册用户的例子来阐明如何应用本地事件
入门
- 装置.NET 6.0
- 新建 ASP.NET Core 空我的项目
Assignment.InProcessEventBus
,并装置Masa.Contrib.Dispatcher.Events
dotnet new web -o Assignment.InProcessEventBus
cd Assignment.InProcessEventBus
dotnet add package Masa.Contrib.Dispatcher.Events --version 0.7.0-preview.7
- 注册 EventBus (用于公布本地事件), 批改
Program.cs
builder.Services.AddEventBus();
- 新增
RegisterUserEvent
类并继承Event
,用于公布注册用户事件
public record RegisterEvent : Event
{public string Account { get; set;}
public string Email {get; set;}
public string Password {get; set;}
}
- 新增
注册用户
处理程序
在指定事件处理程序办法上减少个性 EventHandler,并在办法中减少参数 RegisterUserEvent
public class UserHandler
{
private readonly ILogger<UserHandler>? _logger;
public UserHandler(ILogger<UserHandler>? logger = null)
{//todo: 依据须要可在构造函数中注入其它服务 (需反对从 DI 获取)
_logger = logger;
}
[EventHandler]
public void RegisterUser(RegisterUserEvent @event)
{
//todo: 1. 编写注册用户业务
_logger?.LogDebug("-----------{Message}-----------", "检测用户是否存在并注册用户");
//todo: 2. 编写发送注册告诉等
_logger?.LogDebug("-----------{Account} 注册胜利 {Message}-----------", @event.Account, "发送邮件提醒注册胜利");
}
}
注册用户的处理程序能够放到任意一个类中,但其结构函数参数必须反对从 DI 获取,且处理程序的办法仅反对
Task
或Void
两种, 不反对其它类型
- 发送注册用户事件,批改
Program.cs
app.MapPost("/register", async (RegisterUserEvent @event, IEventBus eventBus) =>
{await eventBus.PublishAsync(@event);
});
进阶
解决流程
EventBus 的 申请管道蕴含一系列申请委托,顺次调用。它们与 ASP.NET Core 中间件有殊途同归之妙,区别点在于中间件的 执行程序与注册程序相同,最先注册的最初执行
每个委托均可在下一个委托前后执行操作,其中 TransactionMiddleware
是 EventBus 公布后第一个要进入的中间件 (默认提供),并且它是不反对屡次嵌套的。
EventBus 反对嵌套,这意味着咱们能够在 Handler 中从新公布一个新的
Event
,但TransactionMiddleware
仅会在最外层进入时被触发一次
自定义中间件
依据须要咱们能够自定义中间件,并注册到 EventBus 的申请管道中,比方通过减少FluentValidation
, 将参数验证从业务代码中剥离开来,从而使得处理程序更专一于业务
- 注册
FluentValidation
, 批改Program.cs
builder.Services.AddValidatorsFromAssembly(Assembly.GetEntryAssembly());
- 自定义验证中间件
ValidatorMiddleware.cs
,用于验证参数
public class ValidatorMiddleware<TEvent> : Middleware<TEvent>
where TEvent : IEvent
{
private readonly ILogger<ValidatorMiddleware<TEvent>>? _logger;
private readonly IEnumerable<IValidator<TEvent>> _validators;
public ValidatorMiddleware(IEnumerable<IValidator<TEvent>> validators, ILogger<ValidatorMiddleware<TEvent>>? logger = null)
{
_validators = validators;
_logger = logger;
}
public override async Task HandleAsync(TEvent @event, EventHandlerDelegate next)
{var typeName = @event.GetType().FullName;
_logger?.LogDebug("----- Validating command {CommandType}", typeName);
var failures = _validators
.Select(v => v.Validate(@event))
.SelectMany(result => result.Errors)
.Where(error => error != null)
.ToList();
if (failures.Any())
{_logger?.LogError("Validation errors - {CommandType} - Event: {@Command} - Errors: {@ValidationErrors}",
typeName,
@event,
failures);
throw new ValidationException("Validation exception", failures);
}
await next();}
}
- 注册 EventBus 并应用验证中间件
ValidatorMiddleware
builder.Services.AddEventBus(eventBusBuilder=>eventBusBuilder.UseMiddleware(typeof(ValidatorMiddleware<>)));
- 增加注册用户验证类
RegisterUserEventValidator.cs
public class RegisterUserEventValidator : AbstractValidator<RegisterUserEvent>
{public RegisterUserEventValidator()
{RuleFor(e => e.Account).NotNull().WithMessage("用户名不能为空");
RuleFor(e => e.Email).NotNull().WithMessage("邮箱不能为空");
RuleFor(e => e.Password)
.NotNull().WithMessage("明码不能为空")
.MinimumLength(6)
.WithMessage("明码必须大于 6 位")
.MaximumLength(20)
.WithMessage("明码必须小于 20 位");
}
}
编排
EventBus 反对事件编排,它们能够用来解决一些对执行程序有要求的业务,比方: 注册用户必须胜利之后才能够发送注册邮件告诉,发送处分等等,那咱们能够这样做
将注册用户业务拆分为三个 Handler,并通过指定 Order 的值来对执行事件排序
public class UserHandler
{
private readonly ILogger<UserHandler>? _logger;
public UserHandler(ILogger<UserHandler>? logger = null)
{_logger = logger;}
[EventHandler(1)]
public void RegisterUser(RegisterUserEvent @event)
{_logger?.LogDebug("-----------{Message}-----------", "检测用户是否存在并注册用户");
//todo: 编写注册用户业务
}
[EventHandler(2)]
public void SendAwardByRegister(RegisterUserEvent @event)
{_logger?.LogDebug("-----------{Account} 注册胜利 {Message}-----------", @event.Account, "发送注册处分");
//todo: 编写发送处分等
}
[EventHandler(3)]
public void SendNoticeByRegister(RegisterUserEvent @event)
{_logger?.LogDebug("-----------{Account} 注册胜利 {Message}-----------", @event.Account, "发送注册胜利邮件");
//todo: 编写发送注册告诉等
}
}
Saga
EventBus 反对 Saga 模式
具体是怎么做呢?
[EventHandler(1, IsCancel = true)]
public void CancelSendAwardByRegister(RegisterUserEvent @event)
{_logger?.LogDebug("-----------{Account} 注册胜利,发放处分失败 {Message}-----------", @event.Account, "发放处分弥补");
}
当发送处分出现异常时,则执行弥补机制,执行程序为 (2 – 1) > 0,因为目前仅存在一个 Order 为 1 的 Handler,则执行处分弥补后退出
但对于局部不须要执行失败但不须要执行回退的办法,咱们能够批改 FailureLevels
确保不会因为以后办法的异样而导致执行弥补机制
[EventHandler(3, FailureLevels = FailureLevels.Ignore)]
public void SendNoticeByRegister(RegisterUserEvent @event)
{_logger?.LogDebug("-----------{Account} 注册胜利 {Message}-----------", @event.Account, "发送邮件提醒注册胜利");
//todo: 编写发送注册告诉等
}
源码解读
EventHandler
-
FailureLevels: 失败级别, 默认: Throw
- Throw:产生异样后,顺次执行 Order 小于以后 Handler 的 Order 的勾销动作,比方:Handler 程序为 1、2、3,CancelHandler 为 1、2、3,如果执行 Handler3 异样,则顺次执行 2、1
- ThrowAndCancel:产生异样后,顺次执行 Order 小于等于以后 Handler 的 Order 的勾销动作,比方:Handler 程序为 1、2、3,CancelHandler 为 1、2、3,如果执行 Handler3 异样,则顺次执行 3、2、1
- Ignore:产生异样后,疏忽以后异样(不执行勾销动作),继续执行其余 Handler
- Order: 执行程序,默认: int.MaxValue,用于管制以后办法的执行程序
- EnableRetry: 当 Handler 异样后是否启用重试, 默认: false
- RetryTimes: 重试次数,当出现异常后执行多少次重试, 需开启重试配置
- IsCancel: 是否是弥补机制,默认: false
Middleware<TEvent>
-
SupportRecursive: 是否反对递归 (嵌套), 默认: true
- 局部中间件仅在最外层被触发一次,像
TransactionMiddleware
就是如此,但也有很多中间件是须要被屡次执行的,比方ValidatorMiddleware
,每次公布事件时都须要验证参数是否正确
- 局部中间件仅在最外层被触发一次,像
- HandleAsync(TEvent @event, EventHandlerDelegate next): 处理程序,通过调用
next()
使得申请进入下一个Handler
IEventHandler<TEvent> 与 ISagaEventHandler<TEvent>
- HandleAsync(TEvent @event): 提供事件的 Handler
- CancelAsync(TEvent @event): 提供事件的弥补 Handler
与
EventHandler
性能相似,提供根本的 Handler 以及弥补 Handler,举荐应用EventHandler
的形式应用
TransactionMiddleware
提供事务中间件,当 EventBus
与UoW
以及 Masa 提供的 Repository
来应用时,当存在待提交的数据时,会主动执行保留并提交,当出现异常后,会执行事务回滚,无需放心脏数据入库
性能测试
与市面上应用较多的 MeidatR
作了比照,后果如下图所示:
BenchmarkDotNet=v0.13.1, OS=Windows 10.0.19043.1023 (21H1/May2021Update)
11th Gen Intel Core i7-11700 2.50GHz, 1 CPU, 16 logical and 8 physical cores
.NET SDK=7.0.100-preview.4.22252.9
[Host] : .NET 6.0.6 (6.0.622.26707), X64 RyuJIT DEBUG
Job-MHJZJL : .NET 6.0.6 (6.0.622.26707), X64 RyuJIT
Runtime=.NET 6.0 IterationCount=100 RunStrategy=ColdStart
Method | Mean | Error | StdDev | Median | Min | Max |
---|---|---|---|---|---|---|
AddShoppingCartByEventBusAsync | 124.80 us | 346.93 us | 1,022.94 us | 8.650 us | 6.500 us | 10,202.4 us |
AddShoppingCartByMediatRAsync | 110.57 us | 306.47 us | 903.64 us | 7.500 us | 5.300 us | 9,000.1 us |
依据性能测试咱们发现,EventBus 与 MediatR 性能差距很小,但 EventBus 提供的性能却要弱小的多
常见问题
- 依照文档操作,通过
EventBus
公布事件后,对应的 Handler 并没有执行,也没有发现错误?
①. EventBus.PublishAsync(@event) 是异步办法,确保期待办法调用胜利,查看是否呈现同步办法调用异步办法的状况
②. 注册EventBus
时指定程序集汇合, Assembly 被用于注册时获取并保留事件与 Handler 的对应关系
var assemblies = new[]
{typeof(UserHandler).Assembly
};
builder.Services.AddEventBus(assemblies);
程序集: 手动指定 Assembly 汇合 -> MasaApp.GetAssemblies() -> AppDomain.CurrentDomain.GetAssemblies()
但因为 NetCore 按需加载,未应用的程序集在以后域中不存在,因而可能会导致局部事件以及 Handler 的对应关系未正确保留,因而可通过手动指定 Assembly 汇合或者批改全局配置中的 Assembly 汇合来修复这个问题
- 通过 EventBus 公布事件,Handler 出错,但数据仍然保留到数据库中
①. 查看是否禁用事务
- DisableRollbackOnFailure 是否为 true (是否失败时禁止回滚)
- UseTransaction 是否为 false (禁止应用事务)
②. 查看以后数据库是否反对回滚。例如: 应用的是 Mysql 数据库,但回滚数据失败,请查看
本章源码
Assignment11
https://github.com/zhenlei520…
开源地址
MASA.Framework:https://github.com/masastack/…
如果你对咱们的 MASA Framework 感兴趣,无论是代码奉献、应用、提 Issue,欢送分割咱们
- WeChat:MasaStackTechOps
- QQ:7424099