介绍
公布 / 订阅模式容许微服务应用音讯互相通信。生产者或发布者在不晓得哪个应用程序将接管它们的状况下向主题发送音讯。这波及将它们写入输出通道。同样,消费者或订阅者订阅该主题并接管其音讯,而不晓得是什么服务产生了这些音讯。这波及从输入通道接管音讯。两头音讯代理负责将每条音讯从输出通道复制到所有对该音讯感兴趣的订阅者的输入通道。当您须要将微服务彼此拆散时,这种模式特地有用。
Dapr 中的公布 / 订阅 API 提供至多一次(at-least-once)的保障,并与各种音讯代理和队列系统集成。您的服务所应用的特定实现是可插入的,并被配置为运行时的 Dapr Pub/Sub 组件。这种办法打消了您服务的依赖性,从而使您的服务能够更便携,更灵便地适应更改。
Dapr 公布 / 订阅构建块提供了一个与平台无关的 API 来发送和接管音讯。您的服务将音讯公布到命名主题,并订阅主题以应用这些音讯。
下图显示了一个“shipping”服务和一个“email”服务的例子,它们都订阅了“cart”服务公布的主题。每个服务都会加载指向同一公布 / 订阅音讯总线组件的公布 / 订阅组件配置文件,例如 Redis Streams、NATS Streaming、Azure Service Bus 或 GCP Pub/Sub。
下图具备雷同的服务,然而这次显示的是 Dapr 公布 API,它发送“订单”主题和订阅服务上的订单端点,这些主题音讯由 Dapr 公布到。
个性
Cloud Events 音讯格局
为了启用音讯路由并为每条音讯提供额定的上下文,Dapr 应用 CloudEvents 1.0 标准作为其音讯格局。应用程序应用 Dapr 发送到主题的任何音讯都会主动“包装”在 Cloud Events 信封中,应用 datacontenttype
属性的 Content-Type
标头值。
Dapr 实现了以下 Cloud Events 字段:
id
source
specversion
type
datacontenttype
(Optional)
以下示例显示了 CloudEvent v1.0 中序列化为 JSON 的 XML 内容:
{
"specversion" : "1.0",
"type" : "xml.message",
"source" : "https://example.com/message",
"subject" : "Test XML Message",
"id" : "id-1234-5678-9101",
"time" : "2020-09-23T06:23:21Z",
"datacontenttype" : "text/xml",
"data" : "<note><to>User1</to><from>user2</from><message>hi</message></note>"
}
音讯订阅
Dapr 应用程序能够订阅已公布的主题。Dapr 容许您的应用程序订阅主题的两种办法:
- 申明式,其中订阅在内部文件中定义
- 程序化,在用户代码中定义订阅
消息传递
原则上,当订阅者在解决音讯后以非谬误响应进行响应时,Dapr 认为音讯已胜利传递。为了进行更精密的管制,Dapr 的公布 / 订阅 API 还提供了在响应负载中定义的显式状态,订阅者能够应用这些状态向 Dapr 批示特定的解决指令(例如 RETRY 或 DROP)。如果两个不同的应用程序(不同的 app-ID)订阅了同一个主题,Dapr 将每条音讯只传递给每个应用程序的一个实例。
主题范畴
默认状况下,所有反对 Dapr 公布 / 订阅组件(例如 Kafka、Redis Stream、RabbitMQ)的主题都可用于配置了该组件的每个应用程序。为了限度哪个应用程序能够公布或订阅主题,Dapr 提供了主题范畴。这使您可能容许应用程序公布哪些主题以及容许应用程序订阅哪些主题。
音讯生存工夫(TTL)
Dapr 能够在每条音讯的根底上设置超时音讯,这意味着如果没有从 pub/sub 组件读取音讯,则该音讯将被抛弃。这是为了避免沉积未读的音讯。在队列中比配置的 TTL 工夫长的音讯称为死音讯。
注
:也能够在组件创立时为给定队列设置音讯 TTL。
与不应用 Dapr 和 CloudEvents 的应用程序通信
对于一个应用程序应用 Dapr 而另一个应用程序不应用的场景,能够为发布者或订阅者禁用 CloudEvent 包装。这容许在不能一次全副采纳 Dapr 的应用程序中局部采纳 Dapr 公布订阅。
应用.Net 调用 Dapr 的公布订阅
以下示例创立应用程序来公布和订阅名为 deathStarStatus
的主题
先决条件
运行 dapr init
时,Redis Streams 默认装置在本地机器上。如果是 dapr init --slim
须要本人入手操作一些货色了,这里就不演示了。
通过关上您的组件文件进行验证 %UserProfile%\.dapr\components\pubsub.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""
注
:这里 redisHost 的 value 能够依据理论状况设置,比方某云的 redis 实例等。又有人问过是否能够切换默认 db,当然能够,name 设置redisDB
,value 设置为你要应用的 db 即可
如果你要更具体的 yaml 配置参数,比方并发设置、最大重试次数等等都能够看这里 https://docs.dapr.io/referenc…
订阅主题
订阅主题有两种形式:申明式、程序化
申明式订阅
您能够应用以下自定义资源定义 (CRD) 订阅主题。创立一个名为 subscription.yaml 的文件并粘贴以下内容:
apiVersion: dapr.io/v1alpha1
kind: Subscription
metadata:
name: myevent-subscription
spec:
topic: deathStarStatus
route: /dsstatus
pubsubname: pubsub
scopes:
- app1
- app2
下面的示例显示了对 pubsub 组件 pubsub
主题 deathStarStatus
的事件订阅。
将 CRD 文件放到组件目录即可,这里不持续开展说了。
程序化订阅
Dapr 实例在启动时调用您的应用程序并期待主题订阅的 JSON 响应:
- pubsubname:Dapr 应该应用哪个 pub/sub 组件
- topic:订阅哪个主题
- route:当音讯波及该主题时,Dapr 调用哪个 endpoint
注
:你可能会感觉,这是不是很麻烦?是的,所以咱们用 dapr dotnet-sdk 来帮忙咱们主动实现这些事件
.Net 订阅
以上是让 dapr sidecar 晓得这个音讯的订阅最终给谁。但咱们的程序里要怎么写呢?
如果你抉择的是申明式订阅,你做一个 route 即可,而如果是程序化订阅则不须要多写一个 yaml 文件,且通过个性即可反对,接下来看看.Net SDK 怎么做的吧。
创立 Assignment.Server(Sub)
创立 ASP.NET Core 空
我的项目,同时依据之前的文章内容增加Dapr.AspNetCore
NuGet 包和批改程序端口为 5000
批改 program.cs 代码
using Microsoft.AspNetCore.Mvc;
var builder = WebApplication.CreateBuilder(args);
var app = builder.Build();
app.UseRouting();
app.UseCloudEvents();
app.UseEndpoints(endpoints =>
{endpoints.MapSubscribeHandler();
});
app.MapPost("/dsstatus", ([FromBody] string word) => Console.WriteLine($"Hello {word}!")).WithTopic("pubsub", "deathStarStatus");
app.Run();
注
:为了通知 Dapr 音讯已胜利解决,请返回 200 OK 响应。如果 Dapr 收到除 200 之外的任何其余返回状态代码,或者如果您的应用程序解体,Dapr 将尝试依照 At-Least-Once 语义从新传递音讯。
运行 Assignment.Server
应用 Dapr CLI 来启动,先应用命令行工具跳转到目录 dapr-study-room\Assignment06\Assignment.Server
,而后执行上面命令
dapr run --app-id testpubsub --app-port 5000 --dapr-http-port 3500 --dapr-grpc-port 50001 dotnet run
创立 Assignment.Client(Publish)
创立 控制台
我的项目,并批改 program.cs
var client = new Dapr.Client.DaprClientBuilder().Build();
await client.PublishEventAsync<string>("pubsub", "deathStarStatus", "World");
运行 Assignment.Client 即可看到 Assignment.Server 中会打印Hello World!
将音讯路由到不同的事件处理程序
基于内容的路由是一种应用 DSL 而不是命令式利用程序代码的消息传递模式。PubSub 路由是此模式的一种实现,它容许开发人员应用表达式依据 CloudEvents 的内容将其路由到应用程序中的不同 URI/ 门路和事件处理程序。
注
:这是个预览性能,如果你感兴趣可自行尝试,值得一提的是 Common Expression Language (CEL) 很乏味,这里就只贴一段代码看看吧。
[Topic("pubsub", "inventory", "event.type ==\"widget\"", 1)]
[HttpPost("widgets")]
public async Task<ActionResult<Stock>> HandleWidget(Widget widget, [FromServices] DaprClient daprClient)
{
// Logic
return stock;
}
[Topic("pubsub", "inventory", "event.type ==\"gadget\"", 2)]
[HttpPost("gadgets")]
public async Task<ActionResult<Stock>> HandleGadget(Gadget gadget, [FromServices] DaprClient daprClient)
{
// Logic
return stock;
}
[Topic("pubsub", "inventory")]
[HttpPost("products")]
public async Task<ActionResult<Stock>> HandleProduct(Product product, [FromServices] DaprClient daprClient)
{
// Logic
return stock;
}
公布 / 订阅主题拜访权限
命名空间或组件范畴可用于限度对特定应用程序的组件拜访。增加到组件的这些应用程序范畴仅限度具备特定 ID 的应用程序可能应用该组件。
除了这个通用组件范畴之外,公布 / 订阅组件还能够限度以下内容:
- 能够应用哪些主题(已公布或已订阅)
- 容许哪些应用程序公布到特定主题
- 容许哪些应用程序订阅特定主题
主题拜访权限
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
namespace: default
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: "localhost:6379"
- name: redisPassword
value: ""
- name: publishingScopes
value: "app1=topic1;app2=topic2,topic3;app3="
- name: subscriptionScopes
value: "app2=;app3=topic1"
下表显示了容许哪些应用程序公布到主题中:
topic1 | topic2 | topic3 | |
---|---|---|---|
app1 | X | ||
app2 | X | X | |
app3 |
下表显示了容许哪些应用程序订阅主题:
topic1 | topic2 | topic3 | |
---|---|---|---|
app1 | X | X | X |
app2 | |||
app3 | X |
注
:如果应用程序未列出(例如 subscriptionScopes
中的 app1),则容许订阅所有主题。因为未应用 allowedTopics
且 app1 没有任何订阅范畴,因而它还能够应用下面未列出的其余主题。
限度容许的主题
如果 Dapr 应用程序向其发送音讯,则会创立一个主题。在某些状况下,应该管制这个主题的创立。例如:
- 在 Dapr 应用程序中,在生成主题名称时呈现的谬误可能导致创立有限数量的主题
- 精简主题名称和总数,避免主题有限增长
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
namespace: default
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: "localhost:6379"
- name: redisPassword
value: ""
- name: allowedTopics
value: "topic1,topic2,topic3"
联合 allowedTopics 和 scopes
有时您心愿组合这两个作用域,因而只容许一组固定的主题,并将作用域指定给特定的应用程序。
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
namespace: default
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: "localhost:6379"
- name: redisPassword
value: ""
- name: allowedTopics
value: "A,B"
- name: publishingScopes
value: "app1=A"
- name: subscriptionScopes
value: "app1=;app2=A"
注
:第三个应用程序没有列出,因为如果一个应用程序没有在范畴内指定,它是容许应用所有主题的。
下表显示了容许公布到主题的应用程序:
A | B | C | |
---|---|---|---|
app1 | X | ||
app2 | X | X | |
app3 | X | X |
下表显示了哪个应用程序能够订阅主题:
A | B | C | |
---|---|---|---|
app1 | |||
app2 | X | ||
app3 | X | X |
音讯 TTL
同状态治理,应用 metadata
的 ttlInSeconds
本章源码
Assignment06
https://github.com/doddgu/dap…
咱们正在口头,新的框架、新的生态
咱们的指标是 自在的
、 易用的
、 可塑性强的
、 功能丰富的
、 强壮的
。
所以咱们借鉴 Building blocks 的设计理念,正在做一个新的框架MASA Framework
,它有哪些特点呢?
- 原生反对 Dapr,且容许将 Dapr 替换成传统通信形式
- 架构不限,单体利用、SOA、微服务都反对
- 反对.Net 原生框架,升高学习累赘,除特定畛域必须引入的概念,保持不造新轮子
- 丰盛的生态反对,除了框架以外还有组件库、权限核心、配置核心、故障排查核心、报警核心等一系列产品
- 外围代码库的单元测试覆盖率 90%+
- 开源、收费、社区驱动
- 还有什么?咱们在等你,一起来探讨
通过几个月的生产我的项目实际,已实现 POC,目前正在把之前的积攒重构到新的开源我的项目中
目前源码已开始同步到 Github(文档站点在布局中,会缓缓欠缺起来):
MASA.BuildingBlocks
MASA.Contrib
MASA.Utils
MASA.EShop
BlazorComponent
MASA.Blazor
QQ 群:7424099
微信群:加技术经营微信(MasaStackTechOps),备注来意,邀请进群