乐趣区

关于后端:nats-简介和使用

nats 简介和应用

nats 有 3 个产品

  • core-nats: 不做长久化的及时信息传输零碎
  • nats-streaming: 基于 nats 的长久化音讯队列 (已弃用)
  • nats-jetstream: 基于 nats 的长久化音讯队列

这里次要探讨 core-nats 和 nats-jetstream

nats

nats 疾速开始

  • 启动 nats
# 启动 nats
docker run --network host -p 4222:4222 nats
  • Connect 连贯
nc, err := nats.Connect("nats://localhost:4222")
if err != nil {log.Fatal("NATS 连贯失败")
}
defer nc.Close()
  • Publish 公布 / 生产音讯
// 生产音讯
err := nc.Publish("foo", []byte("Hello World"))
if err != nil {log.Fatal("NATS 公布失败")
}
// Flush 公布缓冲区
err = nc.Flush()
if err != nil {log.Fatal("NATS Flush 失败")
}

出于性能思考, 公布的音讯先写入到相似 Buffer 缓存的中央, 而后再一次性发送到 nats 服务器

参考官网文档: https://docs.nats.io/using-na…

  • Subscribe 订阅 / 生产音讯
// 生产音讯
_, err = nc.Subscribe("foo", func(msg *nats.Msg) {fmt.Printf("收到音讯: %s\n", msg.Data)
})
if err != nil {log.Fatal("NATS 订阅失败")
}

nats 提供 公布订阅, 申请响应, 和队列模型 3 种 API. 别离是公布订阅模型, 申请响应模型, 和队列模型, 上面开展介绍

公布订阅

公布订阅模型, 一个发布者, 多个订阅者, 多个订阅者都能够收到同一个音讯

// 生产音讯
_, err = nc.Subscribe("foo", func(msg *nats.Msg) {fmt.Printf("收到音讯: %s\n", msg.Data)
})
if err != nil {log.Fatal("NATS 订阅失败")
}

队列模型

队列模型, 一个发布者, 多个订阅者, 音讯在多个音讯中负载平衡调配, 调配给 A 消费者, 这个音讯就不会再调配给其余消费者了

// 生产音讯
// queue 是队列组的名称, 同一组队列最多只有一个接收者能胜利接管
_, _ = nc.QueueSubscribe("foo", "queue", func(msg *nats.Msg) {fmt.Printf("收到音讯: %s\n", string(msg.Data))
})

申请响应

生产者能收到消费者的回复

// 生产音讯
nc.Subscribe("help", func(m *nats.Msg) {fmt.Printf("收到音讯: %s\n", string(m.Data))
    nc.Publish(m.Reply, []byte("I can help!"))
})

// 生产音讯
go func() {msg, _ := nc.Request("help", []byte("help me"), 100*time.Millisecond)
    fmt.Printf("收到回复: %s\n", string(msg.Data))
}()

select {}
利用 1 - 保障音讯可靠性

nats 自身不做任何的音讯的长久化, 是 “ 最多一次 ” 交付模型

举个例子, 如果生产的音讯没有消费者接, 音讯就丢掉了

然而申请响应机制能够通过业务代码保障音讯的可靠性, 在业务层面实现常见音讯队列的 ACK 机制

举个例子, 生产者发送音讯, 消费者承受音讯后处理, 胜利返回 OK, 失败返回 error, 生产者如果收到 error 或者超时就能够补发音讯

利用 2 - 解耦 PRC 调用

申请响应模型和 RPC 调用是统一的, 咱们能够用这个实现一个基于事件驱动的 RPC 总线

nats-jetstream

架构

jetstream 提供长久化 nats 服务, 客户端反对实时推送的 push 模式和自定义拉取的 pull 模式, 架构图如下

  • subject: 和 nats 一样, 用来辨别不同的音讯
  • stream: 定义了音讯的贮存形式, 保留规定, 抛弃规定 (stream 和 subject 是 1:n 的关系)
  • consumer: 定义了音讯承受形式并记录承受到的地位, 有 2 种生产形式及时推送 push 和自定义拉取 pull (consumer 和 stream 是 1:n 的关系)

用反对 jetstream 的形式启动 nats

# 启动 nats jetStream (同时反对 nats API 和 jetStream API)
docker run --network host -p 4222:4222 nats -js

架构示例

贴一个来自于官网文档的 push pull 混合应用的架构示例图

基于 pull 的 worker 消费者和基于 push 的 monitor 消费者同时存在

Stream

JetStream 中 Stream 定义了音讯的贮存形式, 保留规定, 抛弃规定.

一个 Stream 能够对应多个 Subject, 如果一条音讯合乎 Stream 的保留规定, 就会被保留下来

留神 JetStream 所有生产和生产的音讯的 Subject 都须要有 Stream 对应, 不然报错

贴一个 Stream 的外围配置 (v1.15.0)

// jsm.go

// StreamConfig 用于定义一个流, 大多数参数都有正当的默认值
// 如果 subject 没写, 就会调配一个随机的 subject
type StreamConfig struct {
    // 名称
    Name        string   
    // 形容
    Description string
    // 对应的多个 Subject
    Subjects    []string 
    // 音讯 3 种保留策略
    // RetentionPolicy 最大音讯数, 最大存储空间或者最大存活工夫达到限度, 就能够删除音讯
    // InterestPolicy 须要所有 consumer 确认能够删除音讯
    // WorkQueuePolicy 只须要一个 consumer 确认能够删除音讯
    Retention RetentionPolicy 
    // 最大 Consumer 数量
    MaxConsumers int 
    // 最大存储 Mgs 数量
    MaxMsgs int64 
    // 最大贮存占用
    MaxBytes int64 
    // 音讯 2 种淘汰策略
    // DiscardOld 音讯达到限度后, 抛弃最早的音讯
    // DiscardNew 音讯达到限度后, 信息音讯新推送会失败
    Discard DiscardPolicy 
    // 音讯存活工夫
    MaxAge time.Duration 
    // 每个 subject 最大音讯数量
    MaxMsgsPerSubject int64 
    // 每个音讯最大大小
    MaxMsgSize int32 
    // 反对文件贮存和内存贮存 2 种类型
    Storage StorageType 
    // 音讯分片数量
    Replicas int 
    // 不须要 ack
    NoAck    bool   
    // ...         
}

Consumer

Consumer 定义了音讯承受形式并记录承受到的地位

举个例子如果消费者在 Sub 音讯的时候指定了 Consumer, 就会从记录的地位开始推送音讯, 而不是从头开始

贴一个 Consumer 的外围配置 (v1.15.0)

// jsm.go

type ConsumerConfig struct {
    // 名称
    Durable string `json:"durable_name,omitempty"`
    // 形容
    Description string `json:"description,omitempty"`
    // 交付 Subject
    DeliverSubject string `json:"deliver_subject,omitempty"`
    // 交付 Group
    DeliverGroup string `json:"deliver_group,omitempty"`
    // 交付策略
    // 交付所有 (默认), 交付最初一个, 交付最新, 自定义开始序号, 自定义开始工夫
    DeliverPolicy DeliverPolicy `json:"deliver_policy"`
    // 开始序号
    OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
    // 开始工夫
    OptStartTime *time.Time `json:"opt_start_time,omitempty"`
    // ack 策略
    // 不须要 ack (默认), 隐式 ack All , 每个都须要显示 ack
    AckPolicy AckPolicy `json:"ack_policy"`
    // ack 等待时间
    AckWait    time.Duration   `json:"ack_wait,omitempty"`
    MaxDeliver int             `json:"max_deliver,omitempty"`
    BackOff    []time.Duration `json:"backoff,omitempty"`
    // 过滤的 Subject
    FilterSubject string `json:"filter_subject,omitempty"`
    // 重试策略
    // 尽快重试, ReplayOriginalPolicy 雷同工夫重试
    ReplayPolicy ReplayPolicy `json:"replay_policy"`
    // 限速
    RateLimit uint64 `json:"rate_limit_bps,omitempty"` // Bits per sec
    // 采样频率
    SampleFrequency string `json:"sample_freq,omitempty"`
    // 最大期待数量
    MaxWaiting int `json:"max_waiting,omitempty"`
    // 最大 Pending ack 数量
    MaxAckPending int `json:"max_ack_pending,omitempty"`
    // flow 管制
    FlowControl bool `json:"flow_control,omitempty"`
    // 心跳工夫
    Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
    // ...
}

代码示例

Core Publish-Subcribe

package main


import (
    "fmt"
    "os"
    "time"


    "github.com/nats-io/nats.go"
)


func main() {
    // 环境变量中获取 NATS 服务器地址
    url := os.Getenv("NATS_URL")
    if url == "" {url = nats.DefaultURL}

    // 连贯 NATS 服务器
    nc, _ := nats.Connect(url)


    defer nc.Drain()

    // 生产音讯 1, 因为没有消费者, 这个音讯会失落
    nc.Publish("greet.1", []byte("hello"))

    // 订阅音讯, 异步承受, 这个时候有消费者了
    sub, _ := nc.SubscribeSync("greet.*")

    // 第一个音讯因为没有消费者所以会失落
    msg, _ := sub.NextMsg(10 * time.Millisecond)
    fmt.Println("subscribed after a publish...")
    fmt.Printf("msg is nil? %v\n", msg == nil)

    // 生产音讯 2, 3
    nc.Publish("greet.2", []byte("hello"))
    nc.Publish("greet.3", []byte("hello"))


    msg, _ = sub.NextMsg(10 * time.Millisecond)
    fmt.Printf("msg data: %q on subject %q\n", string(msg.Data), msg.Subject)


    msg, _ = sub.NextMsg(10 * time.Millisecond)
    fmt.Printf("msg data: %q on subject %q\n", string(msg.Data), msg.Subject)


    nc.Publish("greet.4", []byte("hello"))


    msg, _ = sub.NextMsg(10 * time.Millisecond)
    fmt.Printf("msg data: %q on subject %q\n", string(msg.Data), msg.Subject)
}

output:

subscribed after a publish...
msg is nil? true
msg data: "hello" on subject "greet.2"
msg data: "hello" on subject "greet.3"
msg data: "hello" on subject "greet.4"

Request-Reply

package main


import (
    "fmt"
    "os"
    "time"


    "github.com/nats-io/nats.go"
)


func main() {url := os.Getenv("NATS_URL")
    if url == "" {url = nats.DefaultURL}


    nc, _ := nats.Connect(url)
    defer nc.Drain()


    sub, _ := nc.Subscribe("greet.*", func(msg *nats.Msg) {name := msg.Subject[6:]
        msg.Respond([]byte("hello," + name))
    })


    rep, _ := nc.Request("greet.joe", nil, time.Second)
    fmt.Println(string(rep.Data))


    rep, _ = nc.Request("greet.sue", nil, time.Second)
    fmt.Println(string(rep.Data))


    rep, _ = nc.Request("greet.bob", nil, time.Second)
    fmt.Println(string(rep.Data))


    sub.Unsubscribe()


    _, err := nc.Request("greet.joe", nil, time.Second)
    fmt.Println(err)
}

output

hello, joe
hello, sue
hello, bob
nats: no responders available for request

Limits-based Stream

package main


import (
    "encoding/json"
    "fmt"
    "log"
    "os"
    "time"


    "github.com/nats-io/nats.go"
)


func main() {url := os.Getenv("NATS_URL")
    if url == "" {url = nats.DefaultURL}


    nc, _ := nats.Connect(url)


    defer nc.Drain()


    js, _ := nc.JetStream()


    cfg := nats.StreamConfig{
        Name:     "EVENTS",
        Subjects: []string{"events.>"},
    }


    cfg.Storage = nats.FileStorage


    js.AddStream(&cfg)
    fmt.Println("created the stream")


    js.Publish("events.page_loaded", nil)
    js.Publish("events.mouse_clicked", nil)
    js.Publish("events.mouse_clicked", nil)
    js.Publish("events.page_loaded", nil)
    js.Publish("events.mouse_clicked", nil)
    js.Publish("events.input_focused", nil)
    fmt.Println("published 6 messages")


    js.PublishAsync("events.input_changed", nil)
    js.PublishAsync("events.input_blurred", nil)
    js.PublishAsync("events.key_pressed", nil)
    js.PublishAsync("events.input_focused", nil)
    js.PublishAsync("events.input_changed", nil)
    js.PublishAsync("events.input_blurred", nil)


    select {case <-js.PublishAsyncComplete():
        fmt.Println("published 6 messages")
    case <-time.After(time.Second):
        log.Fatal("publish took too long")
    }


    printStreamState(js, cfg.Name)


    // 限度音讯数量
    cfg.MaxMsgs = 10
    js.UpdateStream(&cfg)
    fmt.Println("set max messages to 10")


    printStreamState(js, cfg.Name)

    // 限度音讯大小
    cfg.MaxBytes = 300
    js.UpdateStream(&cfg)
    fmt.Println("set max bytes to 300")


    printStreamState(js, cfg.Name)

    // 限度音讯最大存活工夫
    cfg.MaxAge = time.Second
    js.UpdateStream(&cfg)
    fmt.Println("set max age to one second")


    printStreamState(js, cfg.Name)


    fmt.Println("sleeping one second...")
    time.Sleep(time.Second)


    printStreamState(js, cfg.Name)
}


func printStreamState(js nats.JetStreamContext, name string) {info, _ := js.StreamInfo(name)
    b, _ := json.MarshalIndent(info.State, ""," ")
    fmt.Println("inspecting stream info")
    fmt.Println(string(b))
}

output

created the stream
published 6 messages
published 6 messages
inspecting stream info
{
 "messages": 12,
 "bytes": 594,
 "first_seq": 1,
 "first_ts": "2022-07-22T13:04:47.814798969Z",
 "last_seq": 12,
 "last_ts": "2022-07-22T13:04:47.817297637Z",
 "consumer_count": 0
}
set max messages to 10
inspecting stream info
{
 "messages": 10,
 "bytes": 496,
 "first_seq": 3,
 "first_ts": "2022-07-22T13:04:47.815772395Z",
 "last_seq": 12,
 "last_ts": "2022-07-22T13:04:47.817297637Z",
 "consumer_count": 0
}
set max bytes to 300
inspecting stream info
{
 "messages": 6,
 "bytes": 298,
 "first_seq": 7,
 "first_ts": "2022-07-22T13:04:47.817220635Z",
 "last_seq": 12,
 "last_ts": "2022-07-22T13:04:47.817297637Z",
 "consumer_count": 0
}
set max age to one second
inspecting stream info
{
 "messages": 6,
 "bytes": 298,
 "first_seq": 7,
 "first_ts": "2022-07-22T13:04:47.817220635Z",
 "last_seq": 12,
 "last_ts": "2022-07-22T13:04:47.817297637Z",
 "consumer_count": 0
}
sleeping one second...
inspecting stream info
{
 "messages": 0,
 "bytes": 0,
 "first_seq": 13,
 "first_ts": "1970-01-01T00:00:00Z",
 "last_seq": 12,
 "last_ts": "2022-07-22T13:04:47.817297637Z",
 "consumer_count": 0
}

更多示例参考: https://natsbyexample.com/

reference

官网文档: https://docs.nats.io/

官网 GitHub: https://github.com/nats-io/na…

代码示例: https://natsbyexample.com/

https://marco79423.net/articl…

本文由 mdnice 多平台公布

退出移动版