关于c#:手把手教你学Dapr-7-Actors

3次阅读

共计 10057 个字符,预计需要花费 26 分钟才能阅读完成。

介绍

Actor 模式将 Actor 形容为最低级别的“计算单元”。换句话说,您在一个独立的单元(称为 actor)中编写代码,该单元接管音讯并一次解决一个音讯,没有任何并发或线程。

再换句话说,依据 ActorId 划分独立计算单元后,雷同的 ActorId 重入要排队,能够了解为lock(ActorId)

:这里有个反例,就是 重入性 的引入,这个概念目前还是 Preview,它容许同一个链内能够反复进入,判断的规范不止是 ActorId 这么简略,即本人调本人是被容许的。这个 默认是敞开的 ,须要手动开启,即 默认不容许本人调本人

当您的代码解决一条音讯时,它能够向其余参与者发送一条或多条音讯,或者创立新的参与者。底层运行时治理每个参与者运行的形式、工夫和地点,并在参与者之间路由音讯。

大量的 Actor 能够同时执行,Actor 彼此独立执行。

Dapr 蕴含一个运行时,它专门实现了 Virtual Actor 模式。通过 Dapr 的实现,您能够依据 Actor 模型编写 Dapr Actor,而 Dapr 利用底层平台提供的可扩展性和可靠性保障。

什么时候用 Actors

Actor 设计模式非常适合许多分布式系统问题和场景,但您首先应该思考的是该模式的束缚。一般来说,如果呈现以下状况,请思考应用 Actors 模式来为您的问题或场景建模:

  • 您的问题空间波及大量(数千个或更多)小的、独立且孤立的状态和逻辑单元
  • 您心愿应用 须要 与内部组件 进行 大量交互 的单线程对象,包含跨一组 Actors 查问状态。
  • 您的 Actor 实例 会通过收回 I/O 操作来 阻塞 具备不可预测提早的调用者。

Dapr Actor

每个 Actor 都被定义为 Actor 类型的实例,就像对象是类的实例一样。例如,可能有一个执行计算器性能的 Actor 类型,并且可能有许多该类型的 Actor 散布在集群的各个节点上。每个这样的 Actor 都由一个 Acotr ID 惟一标识。

生命周期

Dapr Actors 是虚构的,这意味着他们的生命周期与他们的内存体现无关。因而,它们不须要显式创立或销毁。Dapr Actors 运行时在第一次收到对该 Actor ID 的申请时会主动激活该 Actor。如果一个 Actor 在一段时间内没有被应用,Dapr Actors 运行时就会对内存中的对象进行垃圾回收。如果稍后须要从新激活,它还将放弃对参与者存在的理解。如果稍后须要从新激活,它还将放弃对 Actor 的所有原有数据。

调用 Actor 办法和揭示会重置闲暇工夫,例如揭示触发将使 Actor 放弃沉闷。无论 Actor 是沉闷还是不沉闷,Actor 揭示都会触发,如果为不沉闷的 Actor 触发,它将首先激活演员。Actor 计时器不会重置闲暇工夫,因而计时器触发不会使 Actor 放弃活动状态。计时器仅在 Actor 处于活动状态时触发。

Reminders 和 Timers 最大的区别就是 Reminders 会放弃 Actor 的活动状态,而 Timers 不会

Dapr 运行时用来查看 Actor 是否能够被垃圾回收的闲暇超时和扫描距离是可配置的。当 Dapr 运行时调用 Actor 服务以获取反对的 Actor 类型时,能够传递此信息。

因为 Virtual Actor 模型的存在,这种 Virtual Actor 生命周期形象带来了一些注意事项,事实上,Dapr Actors 实现有时会偏离这个模型。

第一次将音讯发送到 Actor ID 时,Actor 被主动激活(导致构建 Actor 对象)。通过一段时间后,Actor 对象将被垃圾回收。被回收后再次应用 Actor ID 将导致结构一个新的 Actor 对象。Actor 的状态比对象的生命周期长,因为状态存储在 Dapr 运行时配置的状态治理组件中。

:Actor 被垃圾回收之前,Actor 对象是会复用的。这里会导致一个问题,在.Net Actor 类中,构造函数在 Actor 存活期间只会被调用一次。

散发和故障转移

为了提供可扩展性和可靠性,Actor 实例散布在整个集群中,Dapr 依据须要主动将它们从故障节点迁徙到衰弱节点。

Actors 散布在 Actor 服务的实例中,而这些实例散布在集群中的节点之间。对于给定的 Actor 类型,每个服务实例都蕴含一组 Actor。

Dapr 安置服务(Placement Service)

Dapr Actor 运行时为您治理散发计划和密钥范畴设置。这是由 Actor Placement 服务实现的。创立服务的新实例时,相应的 Dapr 运行时会注册它能够创立的 Actor 类型,并且安置服务会计算给定 Actor 类型的所有实例的分区。每个 Actor 类型的分区信息表被更新并存储在环境中运行的每个 Dapr 实例中,并且能够随着 Actor 服务的新实例的创立和销毁而动态变化。这如下图所示:

当客户端调用具备特定 ID 的 Actor(例如,Actor ID 123)时,客户端的 Dapr 实例会 Hash Actor 类型和 ID,并应用该信息调用能够为特定 Actor ID 的申请提供服务的相应 Dapr 实例。因而,始终为任何给定的 Actor ID 调用雷同的分区(或服务实例)。这如下图所示:

这简化了一些抉择,但也带来了一些思考:

  • 默认状况下,Actor 随机搁置到 pod 中,从而实现均匀分布。
  • 因为 Actor 是随机搁置的,应该能够预料到 Actor 操作总是须要网络通信,包含办法调用数据的序列化和反序列化,产生提早和开销。

:Dapr Actor 搁置服务仅用于 Actor 搁置,因而如果您的服务不应用 Dapr Actors,则不须要。搁置服务能够在所有托管环境中运行,包含自托管和 Kubernetes。

Actor 通信

您能够通过 HTTP/gRPC 调用 Actor,当然也能够用 SDK。

POST/GET/PUT/DELETE http://localhost:3500/v1.0/actors/<actorType>/<actorId>/<method/state/timers/reminders>

并发

Dapr Actor 运行时为拜访 Actor 办法提供了一个简略的回合制(turn-basesd)的拜访模型。这意味着在任何时候,Actor 对象的代码中都不能有超过一个线程处于活动状态。

单个 Actor 实例一次不能解决多个申请。如果预期要解决并发申请,Actor 实例可能会导致吞吐量瓶颈。

单个 Actor 实例指每个 Actor ID 对应的 Actor 对象。单个 Actor 不并发就没有问题

如果两个 Actor 之间存在循环申请,而同时向其中一个 Actor 收回内部申请,则 Actor 之间可能会陷入僵局。Dapr Actor 运行时主动超时 Actor 调用并向调用者抛出异样以中断可能的死锁状况。

重入性(Preview)

作为对 dapr 中根底 Actor 的加强。当初重入性为预览性能,感兴趣的小伙伴能够到看官网文档。

回合制拜访(Turn-based access)

一个回合包含一个 Actor 办法的残缺执行以响应来自其余 Actor 或客户端的申请,或者一个计时器 / 揭示回调的残缺执行。即便这些办法和回调是异步的,Dapr Actor 运行时也不会将它们穿插。一个回合必须齐全实现后,才容许进行新的回合。换句话说,以后正在执行的 Actor 办法或计时器 / 揭示回调必须齐全实现,能力容许对办法或回调的新调用。

Dapr Actor 运行时通过在回合开始时获取每个 Actor 的锁并在回合完结时开释锁来实现基于回合的并发性。因而,基于回合的并发是在每个 Actor 的根底上执行的,而不是跨 Actor。Actor 办法和计时器 / 揭示回调能够代表不同的 Actor 同时执行。

以下示例阐明了上述概念。思考实现两个异步办法(例如 Method1 和 Method2)、计时器和揭示的 Actor 类型。下图显示了代表属于此 Actor 类型的两个 Actors(ActorId1 和 ActorId2)执行这些办法和回调的工夫线示例。

Actor 状态治理

Actor 能够应用状态治理性能牢靠地保留状态。您能够通过 HTTP/gRPC 端点与 Dapr 交互以进行状态治理。

要应用 actor,您的状态存储必须反对事务。这意味着您的状态存储组件必须实现 TransactionalStore 接口。只有一个状态存储组件能够用作所有参与者的状态存储。

事务反对列表:https://docs.dapr.io/referenc…

:倡议学习的时候都用Redis,官网所有的示例也都是基于 Redis,比拟容易上手,且Dapr init 默认集成

Actor 计时器和揭示

Actor 能够通过注册计时器或揭示来安顿本人的定期工作。

计时器和揭示的性能十分类似。次要区别在于,Dapr Actor 运行时在停用后 不保留 无关 计时器 的任何信息,而应用 Dapr Actor 状态提供程序 保留 无关 揭示 的信息。

定时器和揭示的调度配置是雷同的,总结如下:


DueTime 是一个可选参数,用于设置第一次调用回调之前的工夫或工夫距离。如果省略 DueTime,则在定时器 / 揭示注册后立刻调用回调。

反对的格局:

  • RFC3339 日期格局,例如2020-10-02T15:00:00Z
  • time.Duration 格局,例如2h30m
  • ISO 8601 持续时间格局,例如PT2H30M

period 是一个可选参数,用于设置两次间断回调调用之间的工夫距离。当以 ISO 8601-1 持续时间格局指定时,您还能够配置反复次数以限度回调调用的总数。如果省略 period,则回调将仅被调用一次。

反对的格局:

  • time.Duration 格局,例如2h30m
  • ISO 8601 持续时间格局,例如PT2H30M, R5/PT1M30S

ttl 是一个可选参数,用于设置计时器 / 揭示到期和删除的工夫或工夫距离。如果省略 ttl,则不利用任何限度。

反对的格局:

  • RFC3339 日期格局,例如2020-10-02T15:00:00Z
  • time.Duration 格局,例如2h30m
  • ISO 8601 持续时间格局,例如PT2H30M

当您同时指定周期内的反复次数和 ttl 时,计时器 / 揭示将在满足任一条件时进行。

Actor 运行时配置

  • actorIdleTimeout – 停用闲暇 actor 之前的超时工夫。每个 actorScanInterval 距离都会查看超时。默认值:60 分钟
  • actorScanInterval – 指定扫描演员以停用闲暇 Actor 的频率的持续时间。闲置工夫超过 actor_idle_timeout 的 Actor 将被停用。默认值:30 秒
  • drainOngoingCallTimeout – 在耗尽 Rebalanced 的 Actor 的过程中的持续时间。这指定了以后流动 Actor 办法实现的超时工夫。如果以后没有 Actor 办法调用,则疏忽此项。默认值:60 秒
  • drainRebalancedActors – 如果为 true,Dapr 将期待 drainOngoingCallTimeout 持续时间以容许以后角色调用实现,而后再尝试停用角色。默认值:true

    drainRebalancedActors与下面的 drainOngoingCallTimeout 需搭配应用

  • reentrancy – (ActorReentrancyConfig) – 配置角色的重入行为。如果未提供,则禁用可重入。默认值:disabled, 0
  • remindersStoragePartitions – 配置 Actor 揭示的分区数。如果未提供,则所有揭示都将保留为 Actor 状态存储中的单个记录。默认值:0
// In Startup.cs
public void ConfigureServices(IServiceCollection services)
{
    // Register actor runtime with DI
    services.AddActors(options =>
    {
        // Register actor types and configure actor settings
        options.Actors.RegisterActor<MyActor>();

        // Configure default settings
        options.ActorIdleTimeout = TimeSpan.FromMinutes(60);
        options.ActorScanInterval = TimeSpan.FromSeconds(30);
        options.DrainOngoingCallTimeout = TimeSpan.FromSeconds(60);
        options.DrainRebalancedActors = true;
        options.RemindersStoragePartitions = 7;
        // reentrancy not implemented in the .NET SDK at this time
    });

    // Register additional services for use with actors
    services.AddSingleton<BankService>();}

分区揭示(Preview)

在 sidecar 重新启动后,Actor 揭示会保留并持续触发。在 Dapr 运行时版本 1.3 之前,揭示被保留在 actor 状态存储中的单个记录上。

此为 Preview 性能,感兴趣能够看官网文档

.Net 调用 Dapr 的 Actor

与以往不同,Actor 示例会多创立一个共享类库用于寄存 Server 和 Client 共用的局部

创立 Assignment.Shared

创立 类库 我的项目,并增加Dapr.ActorsNuGet 包援用,最初增加以下几个类:

AccountBalance.cs

namespace Assignment.Shared;
public class AccountBalance
{public string AccountId { get; set;} = default!;

    public decimal Balance {get; set;}
}

IBankActor.cs

:这个是 Actor 接口,IActor 是 Dapr SDK 提供的

using Dapr.Actors;

namespace Assignment.Shared;
public interface IBankActor : IActor
{Task<AccountBalance> GetAccountBalance();

    Task Withdraw(WithdrawRequest withdraw);
}

OverdraftException.cs

namespace Assignment.Shared;
public class OverdraftException : Exception
{public OverdraftException(decimal balance, decimal amount)
        : base($"Your current balance is {balance:c} - that's not enough to withdraw {amount:c}.")
    {}}

WithdrawRequest.cs

namespace Assignment.Shared;
public class WithdrawRequest
{public decimal Amount { get; set;}
}

创立 Assignment.Server

创立 类库 我的项目,并增加 Dapr.Actors.AspNetCoreNuGet 包援用和Assignment.Shared 我的项目援用,最初批改程序端口为 5000。

:Server 与 Shared 和 Client 的 NuGet 包不一样,Server 是集成了服务端的一些性能

批改 program.cs

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddSingleton<BankService>();
builder.Services.AddActors(options =>
{options.Actors.RegisterActor<DemoActor>();
});

var app = builder.Build();

app.UseRouting();

app.UseEndpoints(endpoints =>
{endpoints.MapActorsHandlers();
});

app.Run();

增加 BankService.cs

using Assignment.Shared;

namespace Assignment.Server;
public class BankService
{// Allow overdraft of up to 50 (of whatever currency).
    private readonly decimal OverdraftThreshold = -50m;

    public decimal Withdraw(decimal balance, decimal amount)
    {
        // Imagine putting some complex auditing logic here in addition to the basics.

        var updated = balance - amount;
        if (updated < OverdraftThreshold)
        {throw new OverdraftException(balance, amount);
        }

        return updated;
    }
}

增加 BankActor.cs

using Assignment.Shared;
using Dapr.Actors.Runtime;
using System;

namespace Assignment.Server;
public class BankActor : Actor, IBankActor, IRemindable // IRemindable is not required
{
    private readonly BankService bank;

    public BankActor(ActorHost host, BankService bank)
        : base(host)
    {
        // BankService is provided by dependency injection.
        // See Program.cs
        this.bank = bank;
    }

    public async Task<AccountBalance> GetAccountBalance()
    {var starting = new AccountBalance()
        {AccountId = this.Id.GetId(),
            Balance = 10m, // Start new accounts with 100, we're pretty generous.
        };

        var balance = await StateManager.GetOrAddStateAsync("balance", starting);
        return balance;
    }

    public async Task Withdraw(WithdrawRequest withdraw)
    {var starting = new AccountBalance()
        {AccountId = this.Id.GetId(),
            Balance = 10m, // Start new accounts with 100, we're pretty generous.
        };

        var balance = await StateManager.GetOrAddStateAsync("balance", starting)!;

        if (balance.Balance <= 0)
        {
            // Simulated reminder deposit
            if (Random.Shared.Next(100) > 90)
            {await RegisterReminderAsync("Deposit", null, TimeSpan.FromSeconds(5), TimeSpan.FromMilliseconds(-1));
            }
        }

        // Throws Overdraft exception if the account doesn't have enough money.
        var updated = this.bank.Withdraw(balance.Balance, withdraw.Amount);

        balance.Balance = updated;
        await StateManager.SetStateAsync("balance", balance);
    }

    public async Task ReceiveReminderAsync(string reminderName, byte[] state, TimeSpan dueTime, TimeSpan period)
    {if (reminderName == "Deposit")
        {var balance = await StateManager.GetStateAsync<AccountBalance>("balance")!;

            if (balance.Balance <= 0)
            {balance.Balance += 60; // 50(Overdraft Threshold) + 10 = 60
                Console.WriteLine("Deposit: 10");
            }
            else
            {Console.WriteLine("Deposit: ignore");
            }
        }
    }
}

运行 Assignment.Server

应用 Dapr CLI 来启动,先应用命令行工具跳转到目录 dapr-study-room\Assignment07\Assignment.Server,而后执行上面命令

dapr run --app-id testactor --app-port 5000 --dapr-http-port 3500 --dapr-grpc-port 50001 dotnet run

创立 Assignment.Client

创立 控制台 我的项目,并增加 Dapr.ActorsNuGet 包援用和Assignment.Shared 我的项目援用。

批改 Program.cs

using Assignment.Shared;
using Dapr.Actors;
using Dapr.Actors.Client;

Console.WriteLine("Creating a Bank Actor");
var bank = ActorProxy.Create<IBankActor>(ActorId.CreateRandom(), "BankActor");
Parallel.ForEach(Enumerable.Range(1, 10), async i =>
{while (true)
    {var balance = await bank.GetAccountBalance();
        Console.WriteLine($"[Worker-{i}] Balance for account'{balance.AccountId}'is'{balance.Balance:c}'.");

        Console.WriteLine($"[Worker-{i}] Withdrawing'{1m:c}'...");
        try
        {await bank.Withdraw(new WithdrawRequest() {Amount = 1m});
        }
        catch (ActorMethodInvocationException ex)
        {Console.WriteLine("[Worker-{i}] Overdraft:" + ex.Message);
        }

        Task.Delay(1000).Wait();}
});

Console.ReadKey();

运行 Assignment.Client

应用 Dapr CLI 来启动,先应用命令行工具跳转到目录 dapr-study-room\Assignment07\Assignment.Client,而后执行上面命令

dotnet run

本章源码

Assignment07

https://github.com/doddgu/dap…

咱们正在口头,新的框架、新的生态

咱们的指标是 自在的 易用的 可塑性强的 功能丰富的 强壮的

所以咱们借鉴 Building blocks 的设计理念,正在做一个新的框架MASA Framework,它有哪些特点呢?

  • 原生反对 Dapr,且容许将 Dapr 替换成传统通信形式
  • 架构不限,单体利用、SOA、微服务都反对
  • 反对.Net 原生框架,升高学习累赘,除特定畛域必须引入的概念,保持不造新轮子
  • 丰盛的生态反对,除了框架以外还有组件库、权限核心、配置核心、故障排查核心、报警核心等一系列产品
  • 外围代码库的单元测试覆盖率 90%+
  • 开源、收费、社区驱动
  • 还有什么?咱们在等你,一起来探讨

通过几个月的生产我的项目实际,已实现 POC,目前正在把之前的积攒重构到新的开源我的项目中

目前源码已开始同步到 Github(文档站点在布局中,会缓缓欠缺起来):

MASA.BuildingBlocks

MASA.Contrib

MASA.Utils

MASA.EShop

BlazorComponent

MASA.Blazor

QQ 群:7424099

微信群:加技术经营微信(MasaStackTechOps),备注来意,邀请进群

正文完
 0