概述

跨过程事件总线容许公布和订阅跨服务传输的音讯, 服务的公布与订阅不在同一个过程中

在Masa Framework中, 跨过程总线事件提供了一个能够被开箱即用的程序

  • IntegrationEvents: 提供了发件箱模式

    • IntegrationEvents.Dapr: 借助Dapr实现了音讯的公布
    • EventLogs.EFCore: 基于EFCore实现的集成事件日志的提供者, 提供音讯的记录与状态更新、失败日志重试、删除过期的日志记录等

入门

跨过程事件与Dapr并不是强绑定的, Masa Framework应用了Dapr提供的pub/sub的能力, 如果你不想应用它, 你也能够更换为其它实现, 但目前Masa Framwork中仅提供了Dapr的实现

  • 装置 .NET 6.0
  • 装置 Dapr
  1. 新建ASP.NET Core 空我的项目Assignment.IntegrationEventBus,并装置Masa.Contrib.Dispatcher.IntegrationEvents.DaprMasa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCoreMasa.Contrib.Data.EFCore.SqliteMasa.Contrib.Data.UoW.EFCoreMasa.Contrib.Development.DaprStarter.AspNetCoreMicrosoft.EntityFrameworkCore.Design
dotnet new web -o Assignment.IntegrationEventBuscd Assignment.IntegrationEventBusdotnet add package Masa.Contrib.Dispatcher.IntegrationEvents.Dapr --version 0.7.0-preview.8 // 应用dapr提供的pubsub能力dotnet add package Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore --version 0.7.0-preview.8 //本地音讯表dotnet add package Masa.Contrib.Data.EFCore.Sqlite --version 0.7.0-preview.8 //应用EfCore.Sqlitedotnet add package Masa.Contrib.Data.UoW.EFCore --version 0.7.0-preview.8 //应用工作单元dotnet add package Masa.Contrib.Development.DaprStarter.AspNetCore --version 0.7.0-preview.8 //开发环境应用DaprStarter帮助治理Dapr Sidecardotnet add package Microsoft.EntityFrameworkCore.Design --version 6.0.6 //不便后续通过CodeFirst迁徙数据库
  1. 新建用户上下文类UserDbContext,并继承MasaDbContext
public class UserDbContext : MasaDbContext{    public UserDbContext(MasaDbContextOptions<UserDbContext> options) : base(options)    {    }}
  1. 注册DaprStarter, 帮助治理Dapr Sidecar, 批改Program.cs
if (builder.Environment.IsDevelopment()){    builder.Services.AddDaprStarter();}
通过Dapr公布集成事件须要运行Dapr, 线上环境可通过Kubernetes来运行, 开发环境可借助Dapr Starter运行Dapr, 因而仅须要在开发环境应用它
  1. 注册跨过程事件总线,批改类Program
builder.Services.AddIntegrationEventBus(option =>{    option.UseDapr()        .UseEventLog<UserDbContext>()        .UseUoW<UserDbContext>(optionBuilder => optionBuilder.UseSqlite($"Data Source=./Db/{Guid.NewGuid():N}.db;"));});var app = builder.Build();#region dapr 订阅集成事件应用app.UseRouting();app.UseCloudEvents();app.UseEndpoints(endpoints =>{    endpoints.MapSubscribeHandler();});#endregion
  1. 新增用户注册事件的集成事件 RegisterUserEvent
public record RegisterUserEvent : IntegrationEvent{    public override string Topic { get; set; } = nameof(RegisterUserEvent);    public string Account { get; set; }    public string Mobile { get; set; }}
  1. 关上Assignment.IntegrationEventBus所在文件夹,关上cmd或Powershell执行
dotnet ef migrations add init //创立迁徙dotnet ef database update //更新数据库
  1. 发送跨过程事件,批改Program
app.MapPost("/register", async (IIntegrationEventBus eventBus) =>{    //todo: 模仿注册用户并公布注册用户事件    await eventBus.PublishAsync(new RegisterUserEvent()    {        Account = "Tom",        Mobile = "19999999999"    });});
  1. 订阅事件,批改Program
app.MapPost("/IntegrationEvent/RegisterUser", [Topic("pubsub", nameof(RegisterUserEvent))](RegisterUserEvent @event) =>{    Console.WriteLine($"注册用户胜利: {@event.Account}");});
订阅事件临时未形象,目前应用的是Dapr原生的订阅形式,后续咱们会反对Bind,届时不会因为更换pubsub的实现而导致订阅形式的扭转

只管跨过程事件目前仅反对了Dapr,但这不代表你与RabbitMqKafka等无缘,公布/订阅是Dapr形象出的能力,实现公布订阅的组件有很多种,RabbitMqKafka是其中一种实现,如果你想深刻理解他们之间的关系,能够参考:

  1. 手把手教你学Dapr
  2. PubSub代理

源码解读

首先咱们先要晓得的根底知识点:

  • IIntegrationEvent: 集成事件接口, 继承 IEvent (本地事件接口)、ITopic (订阅接口, 公布订阅的主题)、ITransaction (事务接口)
  • IIntegrationEventBus: 集成事件总线接口、用于提供发送集成事件的性能
  • IIntegrationEventLogService: 集成事件日志服务的接口 (提供保留本地日志、批改状态为进行中、胜利、失败、删除过期日志、获取期待重试日志列表的性能)
  • IntegrationEventLog: 集成事件日志, 提供本地音讯表的模型
  • IHasConcurrencyStamp: 并发标记接口 (实现此接口的类会主动为RowVersion赋值)

Masa.Contrib.Dispatcher.IntegrationEvents

提供了集成事件接口的实现类, 并反对了发件箱模式, 其中:

  • IPublisher: 集成事件的发送者
  • IProcessingServer: 后盾服务接口
  • IProcessor: 解决程序接口 (后盾处理程序中会获取所有的程序程序)

    • DeleteLocalQueueExpiresProcessor: 删除过期程序 (从本地队列删除)
    • DeletePublishedExpireEventProcessor: 删除已过期的公布胜利的本地音讯程序 (从Db删除)
    • RetryByLocalQueueProcessor: 重试本地音讯记录 (从本地队列中获取, 条件: 发送状态为失败或进行中且重试次数小于最大重试次数且重试距离大于最小重试距离)
    • RetryByDataProcessor: 重试本地音讯记录 (从Db获取, 条件: 发送状态为失败或进行中且重试次数小于最大重试次数且重试距离大于最小重试距离, 且不在本地重试队列中)
  • IntegrationEventBus: IIntegrationEvent的实现

Masa.Contrib.Dispatcher.IntegrationEvents中仅提供了发件箱的性能, 但集成事件的公布是由 IPublisher的实现类来提供, 由Db获取本地音讯表的性能是由IIntegrationEventLogService的实现类来提供, 它们别离属于Masa.Contrib.Dispatcher.IntegrationEvents.DaprMasa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore的性能, 这也是为什么应用集成事件须要援用包

  • Masa.Contrib.Dispatcher.IntegrationEvents
  • Masa.Contrib.Dispatcher.IntegrationEvents.Dapr
  • Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore

如何疾速接入其它实现

那会有小伙伴问了, 我当初没有应用Dapr, 将来一段时间临时也还不心愿接入Dapr, 我想本人接入, 以实现集成事件的公布能够吗?

当然是能够的, 如果你心愿自行实现集成事件, 那么这个时候你会遇到两种状况

接入方反对发件箱模式

以社区用的较多的库CAP为例, 因为它自身曾经实现了发件箱模式, 咱们不须要再解决本地音讯表, 也无需思考本地音讯记录的治理, 那咱们能够这样做

  1. 新建类库Masa.Contrib.Dispatcher.IntegrationEvents.Cap, 增加Masa.BuildingBlocks.Dispatcher.IntegrationEvents的援用, 并装置DotNetCore.CAP
dotnet add package DotNetCore.CAP
  1. 新增类IntegrationEventBus, 并实现IIntegrationEventBus
public class IntegrationEventBus : IIntegrationEventBus{    private readonly ICapPublisher _publisher;    private readonly ICapTransaction _capTransaction;    private readonly IUnitOfWork? _unitOfWork;    public IntegrationEventBus(ICapPublisher publisher, ICapTransaction capTransaction, IUnitOfWork? unitOfWork = null)    {        _publisher = publisher;        _capTransaction = capTransaction;        _unitOfWork = unitOfWork;    }        public Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent    {        // 如果应用事务        // _publisher.Transaction.Value.DbTransaction = unitOfWork.Transaction;        // _publisher.Publish(@event.Topic, @event);        throw new NotImplementedException();    }    public IEnumerable<Type> GetAllEventTypes()    {        throw new NotImplementedException();    }    public Task CommitAsync(CancellationToken cancellationToken = default)    {        throw new NotImplementedException();    }}
CAP已反对本地事务, 应用以后IUnitOfWork提供的事务, 确保数据的原子性
  1. 新建类ServiceCollectionExtensions, 将自定义Publisher注册到服务汇合
public static class ServiceCollectionExtensions{    public static DispatcherOptions UseRabbitMq(this IServiceCollection services)    {         //todo: 注册RabbitMq信息         services.TryAddScoped<IIntegrationEventBus, IntegrationEventBus>();         return dispatcherOptions;    }}

曾经实现发件箱模式的能够间接应用, 而不须要援用

  • Masa.Contrib.Dispatcher.IntegrationEvents
  • Masa.Contrib.Dispatcher.IntegrationEvents.Dapr
  • Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore
以上未通过理论验证, 感兴趣的能够尝试下, 欢送随时提pr

接入方不反对发件箱模式

我心愿间接接入RabbitMq, 但我本人没有做发件箱模式, 那我能够怎么做呢?

因为Masa.Contrib.Dispatcher.IntegrationEvents已提供发件箱模式, 如果仅仅心愿更换一个公布事件的实现者, 那咱们仅须要实现IPublisher即可

  1. 新建类库Masa.Contrib.Dispatcher.IntegrationEvents.RabbitMq, 增加Masa.Contrib.Dispatcher.IntegrationEvents我的项目援用, 并装置RabbitMQ.Client
dotnet add package RabbitMQ.Client //应用RabbitMq
  1. 新增类Publisher,并实现IPublisher
public class Publisher : IPublisher{    public async Task PublishAsync<T>(string topicName, T @event, CancellationToken stoppingToken = default) where T : IIntegrationEvent    {        //todo: 通过 RabbitMQ.Client 发送音讯到RabbitMq        throw new NotImplementedException();    }}
  1. 新建类DispatcherOptionsExtensions, 将自定义Publisher注册到服务汇合
public static class DispatcherOptionsExtensions{    public static DispatcherOptions UseRabbitMq(this Masa.Contrib.Dispatcher.IntegrationEvents.Options.DispatcherOptions options)    {         //todo: 注册RabbitMq信息         dispatcherOptions.Services.TryAddSingleton<IPublisher, Publisher>();         return dispatcherOptions;    }}
  1. 如何应用自定义实现RabbitMq
builder.Services.AddIntegrationEventBus(option =>{    option.UseRabbitMq();//批改为应用RabbitMq    option.UseUoW<UserDbContext>(optionBuilder => optionBuilder.UseSqlite($"Data Source=./Db/{Guid.NewGuid():N}.db;"));    option.UseEventLog<UserDbContext>();});

本章源码

Assignment12

https://github.com/zhenlei520...

开源地址

MASA.Framework:https://github.com/masastack/...


如果你对咱们的 MASA Framework 感兴趣,无论是代码奉献、应用、提 Issue,欢送分割咱们

  • WeChat:MasaStackTechOps
  • QQ:7424099