共计 2955 个字符,预计需要花费 8 分钟才能阅读完成。
对于 Apache Pulsar
Apache Pulsar 是 Apache 软件基金会顶级我的项目,是下一代云原生分布式音讯流平台,集音讯、存储、轻量化函数式计算为一体,采纳计算与存储拆散架构设计,反对多租户、长久化存储、多机房跨区域数据复制,具备强一致性、高吞吐以及低延时的高可扩大流数据存储个性。
本文来自社区用户投稿,作者侯盛鑫,来自伴鱼。
在很多在线的业务零碎中,因为业务逻辑解决出现异常,一条音讯没有被确认,咱们须要尽可能筹备好优雅地解决故障。重试是咱们的罕用做法,个别咱们从以下三方面动手进行重试:
- 设置从新投递。若须要容许从新生产失败的音讯,咱们能够配置消费者同时容许生产音讯从业务主题和重试主题,并配置了容许消费者主动重试。
- 设置重试队列。如果音讯没有被生产胜利,它将被保留到重试主题当中。并能够指定延时工夫,主动从新生产重试主题外面的生产失败音讯。
- 重试的次数限度。默认状况下,如果消费者没有胜利生产一条音讯(也就是说消费者无奈 ack),它将重试同一条音讯。
那么,难道咱们不能简略地让这种默认行为接管所有,而后重试音讯直到胜利吗?问题是这条音讯可能永远不会胜利。至多没有某种模式的手动干涉它是不会胜利的。于是乎,消费者就永远不会持续解决后续的任何音讯,并且咱们的音讯解决将陷入困境,所以在重试肯定次数后将采取死信队列的办法存储为确认胜利音讯。
如上图,Pulsar 采纳非阻塞申请重试队列和死信队列 (DLQ) 来扩大现有事件驱动架构作用,通过这样解决咱们就能够在不中断实时流量的状况下实现解耦、可察看的错误处理。
然而 Pulsar 默认状况下,主动重试这个选项是敞开的,咱们能够设置 enableRetry 选项为 true,这样能够在这个消费者中进行重试。如下例子所示,消费者会从重试主题生产音讯:
package main
import (
"context"
"fmt"
"github.com/apache/pulsar-client-go/pulsar"
"time"
)
func main() {
cp := pulsar.ClientOptions{
URL: "pulsar://xxx.xxx.xxx.xxx:6650",
OperationTimeout: 30 * time.Second,
}
client, err := pulsar.NewClient(cp)
if err != nil {return}
defer client.Close()
d := &pulsar.DLQPolicy{
MaxDeliveries: 3,
RetryLetterTopic: "persistent://group/server/xxx-RETRY",
DeadLetterTopic: "persistent://group/server/xxx-DLQ",
}
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "persistent://group/server/xxx",
SubscriptionName: "test",
Type: pulsar.Failover,
RetryEnable: true,
DLQ: d,
NackRedeliveryDelay: time.Second * 3,
})
if err != nil {return}
ctx := context.Background()
for {msg, err := consumer.Receive(ctx)
if err != nil {return}
if msg.Key() == 0 {
// 确认的解决
consumer.Ack(msg)
} else {
// 不确认,等 NackRedeliveryDelay 后将被从新投递到主队列进行生产
consumer.Nack(msg)
// 稍后解决, 等 xx 秒后将被从新投递到重试队列
consumer.ReconsumeLater(msg, time.Second * 5)
// 以上办法二选其一
}
}
}
重试队列
首先,如上样例主动创立了一个重试队列,产生重试音讯须要两个条件其中一个:
- Nack() 函数,消费者的 Nack() 函数用于确认解决单个音讯失败。一旦音讯被“否定确认”时,它将被标记为在之后从新传递。投递对象是以后的主 topic,投递次数不受影响,投递工夫受 NackRedeliveryDelay 管制。
- AckTimeout 参数,因为网络抖动,服务 Down 机等起因,未能及时 Nack,Pulsar 为了欠缺重试机制设置了 Acktimeout 默认为 0(不开启的)的参数,consumer 解决一旦超过 Acktimeout 将被投递重试。(在 golang sdk v0.6.0 以及之前并没有实现设置 Acktimeout 的相干性能,之后请继续关注)
重试行为中的重试队列的重试行为是和工夫相干的。目前次要通过 consumer.ReconsumeLater() 办法触发,一旦触发到重试队列,重试次数会相应在重试中缩小,这里的 DLQPolicy 构造中的 RetryLetterTopic 是 Pulsar 为了进行重试在本来根底上新建的 topic,默认状况下是:{TopicName}-{Subscription}-RETRY,这是为了最大水平不烦扰主 topic 的数据的做法。
Golang 的 sdk 并没有实现 java sdk 中那样丰盛多样的重试机制,然而却简略粗犷间接凋谢了 NackRedeliveryDelay 原始延迟时间的参数,这样不便了各种策略的定制化开发。
其中 DLQPolicy.MaxDeliveries 这个参数在音讯出错时,将决定最多持续尝试发送多少次,如到用户设置的最大值,音讯还没有胜利发送,此时 Pulsar 会将音讯推送到死信队列中,也就是 DLQPolicy.DeadLetterTopic。
留神:⚠️RLQ 是一个提早队列,生产用 shared 模式!
死信队列
当重试次数用完时,信息将被路由到死信队列中,留神⚠️:此时音讯状态会变成已确认。死信队列是一个不分区的长久化队列,用户能够依据本人的需要对信息音讯做相应的解决。sdk 提供 DLQPolicy.DeadLetterTopic 参数来设置“死信队列”的名字。默认状况下死信队列名称是:{TopicName}-{Subscription}-DLQ。
总结
到此为止,咱们梳理一下流程:
1、除了失常生产写入的 topic 外重试还会减少一个重试队列,sdk 中会主动订阅重试队列;
2、重试队列实际上是一个提早队列,未确认音讯将保护一个工夫相干的优先级队列;
3、当重试用完时,音讯将进入死信队列,音讯状态变为已确认,用户生产死信队列解决死信音讯。
作者简介
我叫侯盛鑫,也能够我叫大云,目前就任于伴鱼基础架构,负责音讯队列的保护与相干开发,Rust 日报小组中的菜鸡成员,喜爱钻研存储,服务治理等方向。首次接触 Pulsar 就对存储和计算拆散的构造所吸引,顺滑的生产者消费者接入和高吞吐让我好奇这个我的项目的实现,冀望之后能在 Pulsar 的相干性能中做些奉献。
举荐浏览
• 博文举荐|深刻解析 Apache Pulsar 中的事务• Pulsar 2.8.0 新增个性概览:独占 Producer、事务等
• 博文举荐|无效治理数据安全性—— Pulsar Schema 治理