乐趣区

关于框架:MASA-Framework-事件总线-进程内事件总线

概述

事件总线是一种事件公布 / 订阅构造,通过公布订阅模式能够解耦不同架构层级,同样它也能够来解决业务之间的耦合,它有以下长处

  • 松耦合
  • 横切关注点
  • 可测试性
  • 事件驱动

公布订阅模式

通过下图咱们能够疾速理解公布订阅模式的实质

  1. 订阅者将本人关怀的事件在调度核心进行注册
  2. 事件的发布者通过调度核心把事件公布进来
  3. 订阅者收到本人关怀的事件变更并执行绝对应业务

其中发布者无需晓得订阅者是谁,订阅者彼此之间也互不意识,彼此之间互不烦扰

事件总线类型

在 Masa Framework 中,将事件划分为

  • 过程内事件 (Event)

本地事件,它的公布与订阅须要在同一个过程中,订阅方与公布方须要在同一个我的项目中

  • 跨过程事件 (IntegrationEvent)

集成事件,它的公布与订阅肯定不在同一个过程中,订阅方与公布方能够在同一个我的项目中,也能够在不同的我的项目中

上面咱们会用一个注册用户的例子来阐明如何应用本地事件

入门

  • 装置.NET 6.0
  1. 新建 ASP.NET Core 空我的项目Assignment.InProcessEventBus,并装置Masa.Contrib.Dispatcher.Events
dotnet new web -o Assignment.InProcessEventBus
cd Assignment.InProcessEventBus
dotnet add package Masa.Contrib.Dispatcher.Events --version 0.7.0-preview.7
  1. 注册 EventBus (用于公布本地事件), 批改Program.cs
builder.Services.AddEventBus();
  1. 新增 RegisterUserEvent 类并继承Event,用于公布注册用户事件
public record RegisterEvent : Event
{public string Account { get; set;}

    public string Email {get; set;}

    public string Password {get; set;}
}
  1. 新增 注册用户 处理程序

在指定事件处理程序办法上减少个性 EventHandler,并在办法中减少参数 RegisterUserEvent

public class UserHandler
{
    private readonly ILogger<UserHandler>? _logger;

    public UserHandler(ILogger<UserHandler>? logger = null)
    {//todo: 依据须要可在构造函数中注入其它服务 (需反对从 DI 获取)
        _logger = logger;
    }

    [EventHandler]
    public void RegisterUser(RegisterUserEvent @event)
    {
        //todo: 1. 编写注册用户业务
        _logger?.LogDebug("-----------{Message}-----------", "检测用户是否存在并注册用户");
        
        //todo: 2. 编写发送注册告诉等
        _logger?.LogDebug("-----------{Account} 注册胜利 {Message}-----------", @event.Account, "发送邮件提醒注册胜利");
    }
}

注册用户的处理程序能够放到任意一个类中,但其结构函数参数必须反对从 DI 获取,且处理程序的办法仅反对 TaskVoid 两种, 不反对其它类型

  1. 发送注册用户事件,批改Program.cs
app.MapPost("/register", async (RegisterUserEvent @event, IEventBus eventBus) =>
{await eventBus.PublishAsync(@event);
});

进阶

解决流程

EventBus 的 申请管道蕴含一系列申请委托,顺次调用。它们与 ASP.NET Core 中间件有殊途同归之妙,区别点在于中间件的 执行程序与注册程序相同,最先注册的最初执行

每个委托均可在下一个委托前后执行操作,其中 TransactionMiddleware 是 EventBus 公布后第一个要进入的中间件 (默认提供),并且它是不反对屡次嵌套的。

EventBus 反对嵌套,这意味着咱们能够在 Handler 中从新公布一个新的 Event,但TransactionMiddleware 仅会在最外层进入时被触发一次

自定义中间件

依据须要咱们能够自定义中间件,并注册到 EventBus 的申请管道中,比方通过减少FluentValidation, 将参数验证从业务代码中剥离开来,从而使得处理程序更专一于业务

  1. 注册FluentValidation, 批改Program.cs
builder.Services.AddValidatorsFromAssembly(Assembly.GetEntryAssembly());
  1. 自定义验证中间件ValidatorMiddleware.cs,用于验证参数
public class ValidatorMiddleware<TEvent> : Middleware<TEvent>
    where TEvent : IEvent
{
    private readonly ILogger<ValidatorMiddleware<TEvent>>? _logger;
    private readonly IEnumerable<IValidator<TEvent>> _validators;

    public ValidatorMiddleware(IEnumerable<IValidator<TEvent>> validators, ILogger<ValidatorMiddleware<TEvent>>? logger = null)
    {
        _validators = validators;
        _logger = logger;
    }

    public override async Task HandleAsync(TEvent @event, EventHandlerDelegate next)
    {var typeName = @event.GetType().FullName;

        _logger?.LogDebug("----- Validating command {CommandType}", typeName);

        var failures = _validators
            .Select(v => v.Validate(@event))
            .SelectMany(result => result.Errors)
            .Where(error => error != null)
            .ToList();

        if (failures.Any())
        {_logger?.LogError("Validation errors - {CommandType} - Event: {@Command} - Errors: {@ValidationErrors}",
                typeName,
                @event,
                failures);

            throw new ValidationException("Validation exception", failures);
        }

        await next();}
}
  1. 注册 EventBus 并应用验证中间件ValidatorMiddleware
builder.Services.AddEventBus(eventBusBuilder=>eventBusBuilder.UseMiddleware(typeof(ValidatorMiddleware<>)));
  1. 增加注册用户验证类RegisterUserEventValidator.cs
public class RegisterUserEventValidator : AbstractValidator<RegisterUserEvent>
{public RegisterUserEventValidator()
    {RuleFor(e => e.Account).NotNull().WithMessage("用户名不能为空");
        RuleFor(e => e.Email).NotNull().WithMessage("邮箱不能为空");
        RuleFor(e => e.Password)
            .NotNull().WithMessage("明码不能为空")
            .MinimumLength(6)
            .WithMessage("明码必须大于 6 位")
            .MaximumLength(20)
            .WithMessage("明码必须小于 20 位");
    }
}

编排

EventBus 反对事件编排,它们能够用来解决一些对执行程序有要求的业务,比方: 注册用户必须胜利之后才能够发送注册邮件告诉,发送处分等等,那咱们能够这样做

将注册用户业务拆分为三个 Handler,并通过指定 Order 的值来对执行事件排序

public class UserHandler
{
    private readonly ILogger<UserHandler>? _logger;

    public UserHandler(ILogger<UserHandler>? logger = null)
    {_logger = logger;}

    [EventHandler(1)]
    public void RegisterUser(RegisterUserEvent @event)
    {_logger?.LogDebug("-----------{Message}-----------", "检测用户是否存在并注册用户");
        //todo: 编写注册用户业务
    }

    [EventHandler(2)]
    public void SendAwardByRegister(RegisterUserEvent @event)
    {_logger?.LogDebug("-----------{Account} 注册胜利 {Message}-----------", @event.Account, "发送注册处分");
        //todo: 编写发送处分等
    }

    [EventHandler(3)]
    public void SendNoticeByRegister(RegisterUserEvent @event)
    {_logger?.LogDebug("-----------{Account} 注册胜利 {Message}-----------", @event.Account, "发送注册胜利邮件");
        //todo: 编写发送注册告诉等
    }
}

Saga

EventBus 反对 Saga 模式

具体是怎么做呢?

[EventHandler(1, IsCancel = true)]
public void CancelSendAwardByRegister(RegisterUserEvent @event)
{_logger?.LogDebug("-----------{Account} 注册胜利,发放处分失败 {Message}-----------", @event.Account, "发放处分弥补");
}

当发送处分出现异常时,则执行弥补机制,执行程序为 (2 – 1) > 0,因为目前仅存在一个 Order 为 1 的 Handler,则执行处分弥补后退出

但对于局部不须要执行失败但不须要执行回退的办法,咱们能够批改 FailureLevels 确保不会因为以后办法的异样而导致执行弥补机制

[EventHandler(3, FailureLevels = FailureLevels.Ignore)]
public void SendNoticeByRegister(RegisterUserEvent @event)
{_logger?.LogDebug("-----------{Account} 注册胜利 {Message}-----------", @event.Account, "发送邮件提醒注册胜利");
    //todo: 编写发送注册告诉等
}

源码解读

EventHandler

  • FailureLevels: 失败级别, 默认: Throw

    • Throw:产生异样后,顺次执行 Order 小于以后 Handler 的 Order 的勾销动作,比方:Handler 程序为 1、2、3,CancelHandler 为 1、2、3,如果执行 Handler3 异样,则顺次执行 2、1
    • ThrowAndCancel:产生异样后,顺次执行 Order 小于等于以后 Handler 的 Order 的勾销动作,比方:Handler 程序为 1、2、3,CancelHandler 为 1、2、3,如果执行 Handler3 异样,则顺次执行 3、2、1
    • Ignore:产生异样后,疏忽以后异样(不执行勾销动作),继续执行其余 Handler
  • Order: 执行程序,默认: int.MaxValue,用于管制以后办法的执行程序
  • EnableRetry: 当 Handler 异样后是否启用重试, 默认: false
  • RetryTimes: 重试次数,当出现异常后执行多少次重试, 需开启重试配置
  • IsCancel: 是否是弥补机制,默认: false

Middleware<TEvent>

  • SupportRecursive: 是否反对递归 (嵌套), 默认: true

    • 局部中间件仅在最外层被触发一次,像TransactionMiddleware 就是如此,但也有很多中间件是须要被屡次执行的,比方ValidatorMiddleware,每次公布事件时都须要验证参数是否正确
  • HandleAsync(TEvent @event, EventHandlerDelegate next): 处理程序,通过调用 next() 使得申请进入下一个Handler

IEventHandler<TEvent> 与 ISagaEventHandler<TEvent>

  • HandleAsync(TEvent @event): 提供事件的 Handler
  • CancelAsync(TEvent @event): 提供事件的弥补 Handler

EventHandler 性能相似,提供根本的 Handler 以及弥补 Handler,举荐应用 EventHandler 的形式应用

TransactionMiddleware

提供事务中间件,当 EventBusUoW以及 Masa 提供的 Repository 来应用时,当存在待提交的数据时,会主动执行保留并提交,当出现异常后,会执行事务回滚,无需放心脏数据入库

性能测试

与市面上应用较多的 MeidatR 作了比照,后果如下图所示:

BenchmarkDotNet=v0.13.1, OS=Windows 10.0.19043.1023 (21H1/May2021Update)
11th Gen Intel Core i7-11700 2.50GHz, 1 CPU, 16 logical and 8 physical cores
.NET SDK=7.0.100-preview.4.22252.9
[Host] : .NET 6.0.6 (6.0.622.26707), X64 RyuJIT DEBUG
Job-MHJZJL : .NET 6.0.6 (6.0.622.26707), X64 RyuJIT

Runtime=.NET 6.0 IterationCount=100 RunStrategy=ColdStart

Method Mean Error StdDev Median Min Max
AddShoppingCartByEventBusAsync 124.80 us 346.93 us 1,022.94 us 8.650 us 6.500 us 10,202.4 us
AddShoppingCartByMediatRAsync 110.57 us 306.47 us 903.64 us 7.500 us 5.300 us 9,000.1 us

依据性能测试咱们发现,EventBus 与 MediatR 性能差距很小,但 EventBus 提供的性能却要弱小的多

常见问题

  1. 依照文档操作,通过 EventBus 公布事件后,对应的 Handler 并没有执行,也没有发现错误?

①. EventBus.PublishAsync(@event) 是异步办法,确保期待办法调用胜利,查看是否呈现同步办法调用异步办法的状况
②. 注册EventBus 时指定程序集汇合, Assembly 被用于注册时获取并保留事件与 Handler 的对应关系

var assemblies = new[]
{typeof(UserHandler).Assembly
};
builder.Services.AddEventBus(assemblies);

程序集: 手动指定 Assembly 汇合 -> MasaApp.GetAssemblies() -> AppDomain.CurrentDomain.GetAssemblies()

但因为 NetCore 按需加载,未应用的程序集在以后域中不存在,因而可能会导致局部事件以及 Handler 的对应关系未正确保留,因而可通过手动指定 Assembly 汇合或者批改全局配置中的 Assembly 汇合来修复这个问题

  1. 通过 EventBus 公布事件,Handler 出错,但数据仍然保留到数据库中

①. 查看是否禁用事务

  1. DisableRollbackOnFailure 是否为 true (是否失败时禁止回滚)
  2. UseTransaction 是否为 false (禁止应用事务)

②. 查看以后数据库是否反对回滚。例如: 应用的是 Mysql 数据库,但回滚数据失败,请查看

本章源码

Assignment11

https://github.com/zhenlei520…

开源地址

MASA.Framework:https://github.com/masastack/…

如果你对咱们的 MASA Framework 感兴趣,无论是代码奉献、应用、提 Issue,欢送分割咱们

  • WeChat:MasaStackTechOps
  • QQ:7424099
退出移动版