概述

事件总线是一种事件公布/订阅构造,通过公布订阅模式能够解耦不同架构层级,同样它也能够来解决业务之间的耦合,它有以下长处

  • 松耦合
  • 横切关注点
  • 可测试性
  • 事件驱动

公布订阅模式

通过下图咱们能够疾速理解公布订阅模式的实质

  1. 订阅者将本人关怀的事件在调度核心进行注册
  2. 事件的发布者通过调度核心把事件公布进来
  3. 订阅者收到本人关怀的事件变更并执行绝对应业务

其中发布者无需晓得订阅者是谁,订阅者彼此之间也互不意识,彼此之间互不烦扰

事件总线类型

在Masa Framework中,将事件划分为

  • 过程内事件 (Event)

本地事件,它的公布与订阅须要在同一个过程中,订阅方与公布方须要在同一个我的项目中

  • 跨过程事件 (IntegrationEvent)

集成事件,它的公布与订阅肯定不在同一个过程中,订阅方与公布方能够在同一个我的项目中,也能够在不同的我的项目中

上面咱们会用一个注册用户的例子来阐明如何应用本地事件

入门

  • 装置.NET 6.0
  1. 新建ASP.NET Core 空我的项目Assignment.InProcessEventBus,并装置Masa.Contrib.Dispatcher.Events
dotnet new web -o Assignment.InProcessEventBuscd Assignment.InProcessEventBusdotnet add package Masa.Contrib.Dispatcher.Events --version 0.7.0-preview.7
  1. 注册EventBus (用于公布本地事件), 批改Program.cs
builder.Services.AddEventBus();
  1. 新增RegisterUserEvent类并继承Event,用于公布注册用户事件
public record RegisterEvent : Event{    public string Account { get; set; }    public string Email { get; set; }    public string Password { get; set; }}
  1. 新增注册用户处理程序

在指定事件处理程序办法上减少个性 EventHandler,并在办法中减少参数 RegisterUserEvent

public class UserHandler{    private readonly ILogger<UserHandler>? _logger;    public UserHandler(ILogger<UserHandler>? logger = null)    {        //todo: 依据须要可在构造函数中注入其它服务 (需反对从DI获取)        _logger = logger;    }    [EventHandler]    public void RegisterUser(RegisterUserEvent @event)    {        //todo: 1. 编写注册用户业务        _logger?.LogDebug("-----------{Message}-----------", "检测用户是否存在并注册用户");                //todo: 2. 编写发送注册告诉等        _logger?.LogDebug("-----------{Account} 注册胜利 {Message}-----------", @event.Account, "发送邮件提醒注册胜利");    }}
注册用户的处理程序能够放到任意一个类中,但其结构函数参数必须反对从DI获取,且处理程序的办法仅反对 TaskVoid 两种, 不反对其它类型
  1. 发送注册用户事件,批改Program.cs
app.MapPost("/register", async (RegisterUserEvent @event, IEventBus eventBus) =>{    await eventBus.PublishAsync(@event);});

进阶

解决流程

EventBus的 申请管道蕴含一系列申请委托,顺次调用。 它们与ASP.NET Core中间件有殊途同归之妙,区别点在于中间件的执行程序与注册程序相同,最先注册的最初执行

每个委托均可在下一个委托前后执行操作,其中TransactionMiddleware是EventBus公布后第一个要进入的中间件 (默认提供),并且它是不反对屡次嵌套的。

EventBus 反对嵌套,这意味着咱们能够在Handler中从新公布一个新的Event,但TransactionMiddleware仅会在最外层进入时被触发一次

自定义中间件

依据须要咱们能够自定义中间件,并注册到EventBus的申请管道中,比方通过减少FluentValidation, 将参数验证从业务代码中剥离开来,从而使得处理程序更专一于业务

  1. 注册FluentValidation, 批改Program.cs
builder.Services.AddValidatorsFromAssembly(Assembly.GetEntryAssembly());
  1. 自定义验证中间件ValidatorMiddleware.cs,用于验证参数
public class ValidatorMiddleware<TEvent> : Middleware<TEvent>    where TEvent : IEvent{    private readonly ILogger<ValidatorMiddleware<TEvent>>? _logger;    private readonly IEnumerable<IValidator<TEvent>> _validators;    public ValidatorMiddleware(IEnumerable<IValidator<TEvent>> validators, ILogger<ValidatorMiddleware<TEvent>>? logger = null)    {        _validators = validators;        _logger = logger;    }    public override async Task HandleAsync(TEvent @event, EventHandlerDelegate next)    {        var typeName = @event.GetType().FullName;        _logger?.LogDebug("----- Validating command {CommandType}", typeName);        var failures = _validators            .Select(v => v.Validate(@event))            .SelectMany(result => result.Errors)            .Where(error => error != null)            .ToList();        if (failures.Any())        {            _logger?.LogError("Validation errors - {CommandType} - Event: {@Command} - Errors: {@ValidationErrors}",                typeName,                @event,                failures);            throw new ValidationException("Validation exception", failures);        }        await next();    }}
  1. 注册EventBus并应用验证中间件ValidatorMiddleware
builder.Services.AddEventBus(eventBusBuilder=>eventBusBuilder.UseMiddleware(typeof(ValidatorMiddleware<>)));
  1. 增加注册用户验证类RegisterUserEventValidator.cs
public class RegisterUserEventValidator : AbstractValidator<RegisterUserEvent>{    public RegisterUserEventValidator()    {        RuleFor(e => e.Account).NotNull().WithMessage("用户名不能为空");        RuleFor(e => e.Email).NotNull().WithMessage("邮箱不能为空");        RuleFor(e => e.Password)            .NotNull().WithMessage("明码不能为空")            .MinimumLength(6)            .WithMessage("明码必须大于6位")            .MaximumLength(20)            .WithMessage("明码必须小于20位");    }}

编排

EventBus 反对事件编排,它们能够用来解决一些对执行程序有要求的业务,比方: 注册用户必须胜利之后才能够发送注册邮件告诉,发送处分等等,那咱们能够这样做

将注册用户业务拆分为三个Handler,并通过指定Order的值来对执行事件排序

public class UserHandler{    private readonly ILogger<UserHandler>? _logger;    public UserHandler(ILogger<UserHandler>? logger = null)    {        _logger = logger;    }    [EventHandler(1)]    public void RegisterUser(RegisterUserEvent @event)    {        _logger?.LogDebug("-----------{Message}-----------", "检测用户是否存在并注册用户");        //todo: 编写注册用户业务    }    [EventHandler(2)]    public void SendAwardByRegister(RegisterUserEvent @event)    {        _logger?.LogDebug("-----------{Account} 注册胜利 {Message}-----------", @event.Account, "发送注册处分");        //todo: 编写发送处分等    }    [EventHandler(3)]    public void SendNoticeByRegister(RegisterUserEvent @event)    {        _logger?.LogDebug("-----------{Account} 注册胜利 {Message}-----------", @event.Account, "发送注册胜利邮件");        //todo: 编写发送注册告诉等    }}

Saga

EventBus反对Saga模式

具体是怎么做呢?

[EventHandler(1, IsCancel = true)]public void CancelSendAwardByRegister(RegisterUserEvent @event){    _logger?.LogDebug("-----------{Account} 注册胜利,发放处分失败 {Message}-----------", @event.Account, "发放处分弥补");}
当发送处分出现异常时,则执行弥补机制,执行程序为 (2 - 1) > 0,因为目前仅存在一个Order为1的Handler,则执行处分弥补后退出

但对于局部不须要执行失败但不须要执行回退的办法,咱们能够批改 FailureLevels 确保不会因为以后办法的异样而导致执行弥补机制

[EventHandler(3, FailureLevels = FailureLevels.Ignore)]public void SendNoticeByRegister(RegisterUserEvent @event){    _logger?.LogDebug("-----------{Account} 注册胜利 {Message}-----------", @event.Account, "发送邮件提醒注册胜利");    //todo: 编写发送注册告诉等}

源码解读

EventHandler

  • FailureLevels: 失败级别, 默认: Throw

    • Throw:产生异样后,顺次执行Order小于以后Handler的Order的勾销动作,比方:Handler程序为 1、2、3,CancelHandler为 1、2、3,如果执行 Handler3 异样,则顺次执行 2、1
    • ThrowAndCancel:产生异样后,顺次执行Order小于等于以后Handler的Order的勾销动作,比方:Handler程序为 1、2、3,CancelHandler为 1、2、3,如果执行 Handler3 异样,则顺次执行 3、2、1
    • Ignore:产生异样后,疏忽以后异样(不执行勾销动作),继续执行其余Handler
  • Order: 执行程序,默认: int.MaxValue,用于管制以后办法的执行程序
  • EnableRetry: 当Handler异样后是否启用重试, 默认: false
  • RetryTimes: 重试次数,当出现异常后执行多少次重试, 需开启重试配置
  • IsCancel: 是否是弥补机制,默认: false

Middleware<TEvent>

  • SupportRecursive: 是否反对递归 (嵌套), 默认: true

    • 局部中间件仅在最外层被触发一次,像TransactionMiddleware 就是如此,但也有很多中间件是须要被屡次执行的,比方ValidatorMiddleware,每次公布事件时都须要验证参数是否正确
  • HandleAsync(TEvent @event, EventHandlerDelegate next): 处理程序,通过调用 next() 使得申请进入下一个Handler

IEventHandler<TEvent> 与 ISagaEventHandler<TEvent>

  • HandleAsync(TEvent @event): 提供事件的Handler
  • CancelAsync(TEvent @event): 提供事件的弥补Handler
EventHandler性能相似,提供根本的Handler以及弥补Handler,举荐应用EventHandler的形式应用

TransactionMiddleware

提供事务中间件,当EventBusUoW以及Masa提供的Repository来应用时,当存在待提交的数据时,会主动执行保留并提交,当出现异常后,会执行事务回滚,无需放心脏数据入库

性能测试

与市面上应用较多的MeidatR作了比照,后果如下图所示:

BenchmarkDotNet=v0.13.1, OS=Windows 10.0.19043.1023 (21H1/May2021Update)
11th Gen Intel Core i7-11700 2.50GHz, 1 CPU, 16 logical and 8 physical cores
.NET SDK=7.0.100-preview.4.22252.9
[Host] : .NET 6.0.6 (6.0.622.26707), X64 RyuJIT DEBUG
Job-MHJZJL : .NET 6.0.6 (6.0.622.26707), X64 RyuJIT

Runtime=.NET 6.0 IterationCount=100 RunStrategy=ColdStart

MethodMeanErrorStdDevMedianMinMax
AddShoppingCartByEventBusAsync124.80 us346.93 us1,022.94 us8.650 us6.500 us10,202.4 us
AddShoppingCartByMediatRAsync110.57 us306.47 us903.64 us7.500 us5.300 us9,000.1 us

依据性能测试咱们发现,EventBus与MediatR性能差距很小,但EventBus提供的性能却要弱小的多

常见问题

  1. 依照文档操作,通过EventBus公布事件后,对应的Handler并没有执行,也没有发现错误?

①. EventBus.PublishAsync(@event) 是异步办法,确保期待办法调用胜利,查看是否呈现同步办法调用异步办法的状况
②. 注册EventBus时指定程序集汇合, Assembly被用于注册时获取并保留事件与Handler的对应关系

var assemblies = new[]{    typeof(UserHandler).Assembly};builder.Services.AddEventBus(assemblies);

程序集: 手动指定Assembly汇合 -> MasaApp.GetAssemblies() -> AppDomain.CurrentDomain.GetAssemblies()

但因为NetCore按需加载,未应用的程序集在以后域中不存在,因而可能会导致局部事件以及Handler的对应关系未正确保留,因而可通过手动指定Assembly汇合或者批改全局配置中的Assembly汇合来修复这个问题

  1. 通过EventBus公布事件,Handler出错,但数据仍然保留到数据库中

①. 查看是否禁用事务

  1. DisableRollbackOnFailure是否为true (是否失败时禁止回滚)
  2. UseTransaction是否为false (禁止应用事务)

②. 查看以后数据库是否反对回滚。例如: 应用的是Mysql数据库,但回滚数据失败,请查看

本章源码

Assignment11

https://github.com/zhenlei520...

开源地址

MASA.Framework:https://github.com/masastack/...

如果你对咱们的 MASA Framework 感兴趣,无论是代码奉献、应用、提 Issue,欢送分割咱们

  • WeChat:MasaStackTechOps
  • QQ:7424099