乐趣区

关于pulsar:优雅的故障处理快速创建-Pulsar-重试队列

对于 Apache Pulsar

Apache Pulsar 是 Apache 软件基金会顶级我的项目,是下一代云原生分布式音讯流平台,集音讯、存储、轻量化函数式计算为一体,采纳计算与存储拆散架构设计,反对多租户、长久化存储、多机房跨区域数据复制,具备强一致性、高吞吐以及低延时的高可扩大流数据存储个性。

本文来自社区用户投稿,作者侯盛鑫,来自伴鱼。

在很多在线的业务零碎中,因为业务逻辑解决出现异常,一条音讯没有被确认,咱们须要尽可能筹备好优雅地解决故障。重试是咱们的罕用做法,个别咱们从以下三方面动手进行重试:

  • 设置从新投递。若须要容许从新生产失败的音讯,咱们能够配置消费者同时容许生产音讯从业务主题和重试主题,并配置了容许消费者主动重试。
  • 设置重试队列。如果音讯没有被生产胜利,它将被保留到重试主题当中。并能够指定延时工夫,主动从新生产重试主题外面的生产失败音讯。
  • 重试的次数限度。默认状况下,如果消费者没有胜利生产一条音讯(也就是说消费者无奈 ack),它将重试同一条音讯。

那么,难道咱们不能简略地让这种默认行为接管所有,而后重试音讯直到胜利吗?问题是这条音讯可能永远不会胜利。至多没有某种模式的手动干涉它是不会胜利的。于是乎,消费者就永远不会持续解决后续的任何音讯,并且咱们的音讯解决将陷入困境,所以在重试肯定次数后将采取死信队列的办法存储为确认胜利音讯。

如上图,Pulsar 采纳非阻塞申请重试队列和死信队列 (DLQ) 来扩大现有事件驱动架构作用,通过这样解决咱们就能够在不中断实时流量的状况下实现解耦、可察看的错误处理。

然而 Pulsar 默认状况下,主动重试这个选项是敞开的,咱们能够设置 enableRetry 选项为 true,这样能够在这个消费者中进行重试。如下例子所示,消费者会从重试主题生产音讯:

 package main
 
 
import (
    "context"
    "fmt"
    "github.com/apache/pulsar-client-go/pulsar"
    "time"
)
 
func main() {
 
 
    cp := pulsar.ClientOptions{
        URL: "pulsar://xxx.xxx.xxx.xxx:6650",
        OperationTimeout: 30 * time.Second,
    }
 
    client, err := pulsar.NewClient(cp)
    if err != nil {return}
    defer client.Close()
 
    d := &pulsar.DLQPolicy{
        MaxDeliveries: 3,
        RetryLetterTopic: "persistent://group/server/xxx-RETRY",
        DeadLetterTopic: "persistent://group/server/xxx-DLQ",
    }
 
    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
        Topic: "persistent://group/server/xxx",
        SubscriptionName: "test",
        Type: pulsar.Failover,
        RetryEnable: true,
        DLQ: d,
        NackRedeliveryDelay: time.Second * 3,
    })
    if err != nil {return}
 
    ctx := context.Background()
    for {msg, err := consumer.Receive(ctx)
        if err != nil {return}
        if msg.Key() == 0 {
            // 确认的解决
            consumer.Ack(msg)
        } else {
            // 不确认,等 NackRedeliveryDelay 后将被从新投递到主队列进行生产
consumer.Nack(msg)
 
            // 稍后解决, 等 xx 秒后将被从新投递到重试队列
consumer.ReconsumeLater(msg, time.Second * 5)
 
            // 以上办法二选其一
        }
    }
}

重试队列

首先,如上样例主动创立了一个重试队列,产生重试音讯须要两个条件其中一个:

  • Nack() 函数,消费者的 Nack() 函数用于确认解决单个音讯失败。一旦音讯被“否定确认”时,它将被标记为在之后从新传递。投递对象是以后的主 topic,投递次数不受影响,投递工夫受 NackRedeliveryDelay 管制。
  • AckTimeout 参数,因为网络抖动,服务 Down 机等起因,未能及时 Nack,Pulsar 为了欠缺重试机制设置了 Acktimeout 默认为 0(不开启的)的参数,consumer 解决一旦超过 Acktimeout 将被投递重试。(在 golang sdk v0.6.0 以及之前并没有实现设置 Acktimeout 的相干性能,之后请继续关注)

重试行为中的重试队列的重试行为是和工夫相干的。目前次要通过 consumer.ReconsumeLater() 办法触发,一旦触发到重试队列,重试次数会相应在重试中缩小,这里的 DLQPolicy 构造中的 RetryLetterTopic 是 Pulsar 为了进行重试在本来根底上新建的 topic,默认状况下是:{TopicName}-{Subscription}-RETRY,这是为了最大水平不烦扰主 topic 的数据的做法。

Golang 的 sdk 并没有实现 java sdk 中那样丰盛多样的重试机制,然而却简略粗犷间接凋谢了 NackRedeliveryDelay 原始延迟时间的参数,这样不便了各种策略的定制化开发。

其中 DLQPolicy.MaxDeliveries 这个参数在音讯出错时,将决定最多持续尝试发送多少次,如到用户设置的最大值,音讯还没有胜利发送,此时 Pulsar 会将音讯推送到死信队列中,也就是 DLQPolicy.DeadLetterTopic。

留神:⚠️RLQ 是一个提早队列,生产用 shared 模式!

死信队列

当重试次数用完时,信息将被路由到死信队列中,留神⚠️:此时音讯状态会变成已确认。死信队列是一个不分区的长久化队列,用户能够依据本人的需要对信息音讯做相应的解决。sdk 提供 DLQPolicy.DeadLetterTopic 参数来设置“死信队列”的名字。默认状况下死信队列名称是:{TopicName}-{Subscription}-DLQ。

总结

到此为止,咱们梳理一下流程:
1、除了失常生产写入的 topic 外重试还会减少一个重试队列,sdk 中会主动订阅重试队列;
2、重试队列实际上是一个提早队列,未确认音讯将保护一个工夫相干的优先级队列;

3、当重试用完时,音讯将进入死信队列,音讯状态变为已确认,用户生产死信队列解决死信音讯。

作者简介

我叫侯盛鑫,也能够我叫大云,目前就任于伴鱼基础架构,负责音讯队列的保护与相干开发,Rust 日报小组中的菜鸡成员,喜爱钻研存储,服务治理等方向。首次接触 Pulsar 就对存储和计算拆散的构造所吸引,顺滑的生产者消费者接入和高吞吐让我好奇这个我的项目的实现,冀望之后能在 Pulsar 的相干性能中做些奉献。

举荐浏览

•  博文举荐|深刻解析 Apache Pulsar 中的事务•  Pulsar 2.8.0 新增个性概览:独占 Producer、事务等
•  博文举荐|无效治理数据安全性—— Pulsar Schema 治理

退出移动版