乐趣区

关于.net:MASA-Framework-事件总线-跨进程事件总线

概述

跨过程事件总线容许公布和订阅跨服务传输的音讯, 服务的公布与订阅不在同一个过程中

在 Masa Framework 中, 跨过程总线事件提供了一个能够被开箱即用的程序

  • IntegrationEvents: 提供了发件箱模式

    • IntegrationEvents.Dapr: 借助 Dapr 实现了音讯的公布
    • EventLogs.EFCore: 基于 EFCore 实现的集成事件日志的提供者, 提供音讯的记录与状态更新、失败日志重试、删除过期的日志记录等

入门

跨过程事件与 Dapr 并不是强绑定的, Masa Framework 应用了 Dapr 提供的 pub/sub 的能力, 如果你不想应用它, 你也能够更换为其它实现, 但目前 Masa Framwork 中仅提供了 Dapr 的实现

  • 装置 .NET 6.0
  • 装置 Dapr
  1. 新建 ASP.NET Core 空我的项目Assignment.IntegrationEventBus,并装置Masa.Contrib.Dispatcher.IntegrationEvents.DaprMasa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCoreMasa.Contrib.Data.EFCore.SqliteMasa.Contrib.Data.UoW.EFCoreMasa.Contrib.Development.DaprStarter.AspNetCoreMicrosoft.EntityFrameworkCore.Design
dotnet new web -o Assignment.IntegrationEventBus
cd Assignment.IntegrationEventBus

dotnet add package Masa.Contrib.Dispatcher.IntegrationEvents.Dapr --version 0.7.0-preview.8 // 应用 dapr 提供的 pubsub 能力
dotnet add package Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore --version 0.7.0-preview.8 // 本地音讯表
dotnet add package Masa.Contrib.Data.EFCore.Sqlite --version 0.7.0-preview.8 // 应用 EfCore.Sqlite
dotnet add package Masa.Contrib.Data.UoW.EFCore --version 0.7.0-preview.8 // 应用工作单元
dotnet add package Masa.Contrib.Development.DaprStarter.AspNetCore --version 0.7.0-preview.8 // 开发环境应用 DaprStarter 帮助治理 Dapr Sidecar
dotnet add package Microsoft.EntityFrameworkCore.Design --version 6.0.6 // 不便后续通过 CodeFirst 迁徙数据库
  1. 新建用户上下文类UserDbContext,并继承MasaDbContext
public class UserDbContext : MasaDbContext
{public UserDbContext(MasaDbContextOptions<UserDbContext> options) : base(options)
    {}}
  1. 注册DaprStarter, 帮助治理Dapr Sidecar, 批改Program.cs
if (builder.Environment.IsDevelopment())
{builder.Services.AddDaprStarter();
}

通过 Dapr 公布集成事件须要运行 Dapr, 线上环境可通过Kubernetes 来运行, 开发环境可借助 Dapr Starter 运行Dapr, 因而仅须要在开发环境应用它

  1. 注册跨过程事件总线,批改类Program
builder.Services.AddIntegrationEventBus(option =>
{option.UseDapr()
        .UseEventLog<UserDbContext>()
        .UseUoW<UserDbContext>(optionBuilder => optionBuilder.UseSqlite($"Data Source=./Db/{Guid.NewGuid():N}.db;"));
});
var app = builder.Build();

#region dapr 订阅集成事件应用
app.UseRouting();

app.UseCloudEvents();
app.UseEndpoints(endpoints =>
{endpoints.MapSubscribeHandler();
});
#endregion
  1. 新增用户注册事件的集成事件 RegisterUserEvent
public record RegisterUserEvent : IntegrationEvent
{public override string Topic { get; set;} = nameof(RegisterUserEvent);

    public string Account {get; set;}

    public string Mobile {get; set;}
}
  1. 关上 Assignment.IntegrationEventBus 所在文件夹,关上 cmd 或 Powershell 执行
dotnet ef migrations add init // 创立迁徙
dotnet ef database update // 更新数据库
  1. 发送跨过程事件,批改Program
app.MapPost("/register", async (IIntegrationEventBus eventBus) =>
{
    //todo: 模仿注册用户并公布注册用户事件
    await eventBus.PublishAsync(new RegisterUserEvent()
    {
        Account = "Tom",
        Mobile = "19999999999"
    });
});
  1. 订阅事件,批改Program
app.MapPost("/IntegrationEvent/RegisterUser", [Topic("pubsub", nameof(RegisterUserEvent))](RegisterUserEvent @event) =>
{Console.WriteLine($"注册用户胜利: {@event.Account}");
});

订阅事件临时未形象,目前应用的是 Dapr 原生的订阅形式,后续咱们会反对 Bind,届时不会因为更换 pubsub 的实现而导致订阅形式的扭转

只管跨过程事件目前仅反对了 Dapr,但这不代表你与RabbitMqKafka 等无缘,公布 / 订阅是 Dapr 形象出的能力,实现公布订阅的组件有很多种,RabbitMqKafka是其中一种实现,如果你想深刻理解他们之间的关系,能够参考:

  1. 手把手教你学 Dapr
  2. PubSub 代理

源码解读

首先咱们先要晓得的根底知识点:

  • IIntegrationEvent: 集成事件接口, 继承 IEvent (本地事件接口)、ITopic (订阅接口, 公布订阅的主题)、ITransaction (事务接口)
  • IIntegrationEventBus: 集成事件总线接口、用于提供发送集成事件的性能
  • IIntegrationEventLogService: 集成事件日志服务的接口 (提供保留本地日志、批改状态为进行中、胜利、失败、删除过期日志、获取期待重试日志列表的性能)
  • IntegrationEventLog: 集成事件日志, 提供本地音讯表的模型
  • IHasConcurrencyStamp: 并发标记接口 (实现此接口的类会主动为 RowVersion 赋值)

Masa.Contrib.Dispatcher.IntegrationEvents

提供了集成事件接口的实现类, 并反对了发件箱模式, 其中:

  • IPublisher: 集成事件的发送者
  • IProcessingServer: 后盾服务接口
  • IProcessor: 解决程序接口 (后盾处理程序中会获取所有的程序程序)

    • DeleteLocalQueueExpiresProcessor: 删除过期程序 (从本地队列删除)
    • DeletePublishedExpireEventProcessor: 删除已过期的公布胜利的本地音讯程序 (从 Db 删除)
    • RetryByLocalQueueProcessor: 重试本地音讯记录 (从本地队列中获取, 条件: 发送状态为失败或进行中且重试次数小于最大重试次数且重试距离大于最小重试距离)
    • RetryByDataProcessor: 重试本地音讯记录 (从 Db 获取, 条件: 发送状态为失败或进行中且重试次数小于最大重试次数且重试距离大于最小重试距离, 且不在本地重试队列中)
  • IntegrationEventBus: IIntegrationEvent 的实现

Masa.Contrib.Dispatcher.IntegrationEvents 中仅提供了发件箱的性能, 但集成事件的公布是由 IPublisher的实现类来提供, 由 Db 获取本地音讯表的性能是由 IIntegrationEventLogService 的实现类来提供, 它们别离属于 Masa.Contrib.Dispatcher.IntegrationEvents.DaprMasa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore 的性能, 这也是为什么应用集成事件须要援用包

  • Masa.Contrib.Dispatcher.IntegrationEvents
  • Masa.Contrib.Dispatcher.IntegrationEvents.Dapr
  • Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore

如何疾速接入其它实现

那会有小伙伴问了, 我当初没有应用Dapr, 将来一段时间临时也还不心愿接入Dapr, 我想本人接入, 以实现集成事件的公布能够吗?

当然是能够的, 如果你心愿自行实现集成事件, 那么这个时候你会遇到两种状况

接入方反对发件箱模式

以社区用的较多的库 CAP 为例, 因为它自身曾经实现了发件箱模式, 咱们不须要再解决本地音讯表, 也无需思考本地音讯记录的治理, 那咱们能够这样做

  1. 新建类库 Masa.Contrib.Dispatcher.IntegrationEvents.Cap, 增加Masa.BuildingBlocks.Dispatcher.IntegrationEvents 的援用, 并装置DotNetCore.CAP
dotnet add package DotNetCore.CAP
  1. 新增类IntegrationEventBus, 并实现IIntegrationEventBus
public class IntegrationEventBus : IIntegrationEventBus
{
    private readonly ICapPublisher _publisher;
    private readonly ICapTransaction _capTransaction;
    private readonly IUnitOfWork? _unitOfWork;
    public IntegrationEventBus(ICapPublisher publisher, ICapTransaction capTransaction, IUnitOfWork? unitOfWork = null)
    {
        _publisher = publisher;
        _capTransaction = capTransaction;
        _unitOfWork = unitOfWork;
    }
    
    public Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent
    {
        // 如果应用事务
        // _publisher.Transaction.Value.DbTransaction = unitOfWork.Transaction;
        // _publisher.Publish(@event.Topic, @event);
        throw new NotImplementedException();}

    public IEnumerable<Type> GetAllEventTypes()
    {throw new NotImplementedException();
    }

    public Task CommitAsync(CancellationToken cancellationToken = default)
    {throw new NotImplementedException();
    }
}

CAP 已反对本地事务, 应用以后 IUnitOfWork 提供的事务, 确保数据的原子性

  1. 新建类 ServiceCollectionExtensions, 将自定义Publisher 注册到服务汇合
public static class ServiceCollectionExtensions
{public static DispatcherOptions UseRabbitMq(this IServiceCollection services)
    {
         //todo: 注册 RabbitMq 信息
         services.TryAddScoped<IIntegrationEventBus, IntegrationEventBus>();
         return dispatcherOptions;
    }
}

曾经实现发件箱模式的能够间接应用, 而不须要援用

  • Masa.Contrib.Dispatcher.IntegrationEvents
  • Masa.Contrib.Dispatcher.IntegrationEvents.Dapr
  • Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore

以上未通过理论验证, 感兴趣的能够尝试下, 欢送随时提pr

接入方不反对发件箱模式

我心愿间接接入RabbitMq, 但我本人没有做发件箱模式, 那我能够怎么做呢?

因为 Masa.Contrib.Dispatcher.IntegrationEvents 已提供发件箱模式, 如果仅仅心愿更换一个公布事件的实现者, 那咱们仅须要实现 IPublisher 即可

  1. 新建类库 Masa.Contrib.Dispatcher.IntegrationEvents.RabbitMq, 增加Masa.Contrib.Dispatcher.IntegrationEvents 我的项目援用, 并装置RabbitMQ.Client
dotnet add package RabbitMQ.Client // 应用 RabbitMq
  1. 新增类Publisher,并实现IPublisher
public class Publisher : IPublisher
{public async Task PublishAsync<T>(string topicName, T @event, CancellationToken stoppingToken = default) where T : IIntegrationEvent
    {
        //todo: 通过 RabbitMQ.Client 发送音讯到 RabbitMq
        throw new NotImplementedException();}
}
  1. 新建类 DispatcherOptionsExtensions, 将自定义Publisher 注册到服务汇合
public static class DispatcherOptionsExtensions
{public static DispatcherOptions UseRabbitMq(this Masa.Contrib.Dispatcher.IntegrationEvents.Options.DispatcherOptions options)
    {
         //todo: 注册 RabbitMq 信息
         dispatcherOptions.Services.TryAddSingleton<IPublisher, Publisher>();
         return dispatcherOptions;
    }
}
  1. 如何应用自定义实现RabbitMq
builder.Services.AddIntegrationEventBus(option =>
{option.UseRabbitMq();// 批改为应用 RabbitMq
    option.UseUoW<UserDbContext>(optionBuilder => optionBuilder.UseSqlite($"Data Source=./Db/{Guid.NewGuid():N}.db;"));
    option.UseEventLog<UserDbContext>();});

本章源码

Assignment12

https://github.com/zhenlei520…

开源地址

MASA.Framework:https://github.com/masastack/…


如果你对咱们的 MASA Framework 感兴趣,无论是代码奉献、应用、提 Issue,欢送分割咱们

  • WeChat:MasaStackTechOps
  • QQ:7424099
退出移动版