nats 简介和应用
nats 有 3 个产品
- core-nats: 不做长久化的及时信息传输零碎
- nats-streaming: 基于 nats 的长久化音讯队列(已弃用)
- nats-jetstream: 基于 nats 的长久化音讯队列
这里次要探讨 core-nats 和 nats-jetstream
nats
nats 疾速开始
- 启动 nats
# 启动 natsdocker run --network host -p 4222:4222 nats
- Connect 连贯
nc, err := nats.Connect("nats://localhost:4222")if err != nil { log.Fatal("NATS 连贯失败")}defer nc.Close()
- Publish 公布/生产音讯
// 生产音讯err := nc.Publish("foo", []byte("Hello World"))if err != nil { log.Fatal("NATS 公布失败")}// Flush 公布缓冲区err = nc.Flush()if err != nil { log.Fatal("NATS Flush 失败")}
出于性能思考, 公布的音讯先写入到相似 Buffer 缓存的中央, 而后再一次性发送到 nats 服务器
参考官网文档: https://docs.nats.io/using-na...
- Subscribe 订阅/生产音讯
// 生产音讯_, err = nc.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("收到音讯: %s\n", msg.Data)})if err != nil { log.Fatal("NATS 订阅失败")}
nats 提供 公布订阅, 申请响应, 和队列模型 3 种 API. 别离是公布订阅模型, 申请响应模型, 和队列模型, 上面开展介绍
公布订阅
公布订阅模型, 一个发布者, 多个订阅者, 多个订阅者都能够收到同一个音讯
// 生产音讯_, err = nc.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("收到音讯: %s\n", msg.Data)})if err != nil { log.Fatal("NATS 订阅失败")}
队列模型
队列模型, 一个发布者, 多个订阅者, 音讯在多个音讯中负载平衡调配, 调配给 A 消费者, 这个音讯就不会再调配给其余消费者了
// 生产音讯// queue 是队列组的名称, 同一组队列最多只有一个接收者能胜利接管_, _ = nc.QueueSubscribe("foo", "queue", func(msg *nats.Msg) { fmt.Printf("收到音讯: %s\n", string(msg.Data))})
申请响应
生产者能收到消费者的回复
// 生产音讯nc.Subscribe("help", func(m *nats.Msg) { fmt.Printf("收到音讯: %s\n", string(m.Data)) nc.Publish(m.Reply, []byte("I can help!"))})// 生产音讯go func() { msg, _ := nc.Request("help", []byte("help me"), 100*time.Millisecond) fmt.Printf("收到回复: %s\n", string(msg.Data))}()select {}
利用1-保障音讯可靠性
nats 自身不做任何的音讯的长久化, 是 "最多一次" 交付模型
举个例子, 如果生产的音讯没有消费者接, 音讯就丢掉了
然而申请响应机制能够通过业务代码保障音讯的可靠性, 在业务层面实现常见音讯队列的 ACK 机制
举个例子, 生产者发送音讯, 消费者承受音讯后处理, 胜利返回 OK, 失败返回 error, 生产者如果收到 error 或者超时就能够补发音讯
利用2-解耦 PRC 调用
申请响应模型和 RPC 调用是统一的, 咱们能够用这个实现一个基于事件驱动的 RPC 总线
nats-jetstream
架构
jetstream 提供长久化 nats 服务, 客户端反对实时推送的 push 模式和自定义拉取的 pull 模式, 架构图如下
- subject: 和 nats 一样, 用来辨别不同的音讯
- stream: 定义了音讯的贮存形式, 保留规定, 抛弃规定 (stream 和 subject 是 1:n 的关系)
- consumer: 定义了音讯承受形式并记录承受到的地位, 有 2 种生产形式及时推送 push 和自定义拉取 pull (consumer 和 stream 是 1:n 的关系)
用反对 jetstream 的形式启动 nats
# 启动 nats jetStream (同时反对 nats API 和 jetStream API)docker run --network host -p 4222:4222 nats -js
架构示例
贴一个来自于官网文档的 push pull 混合应用的架构示例图
基于 pull 的 worker 消费者和基于 push 的 monitor 消费者同时存在
Stream
JetStream 中 Stream 定义了音讯的贮存形式, 保留规定, 抛弃规定.
一个 Stream 能够对应多个 Subject, 如果一条音讯合乎 Stream 的保留规定, 就会被保留下来
留神 JetStream 所有生产和生产的音讯的 Subject 都须要有 Stream 对应, 不然报错
贴一个 Stream 的外围配置 (v1.15.0)
// jsm.go// StreamConfig 用于定义一个流, 大多数参数都有正当的默认值// 如果 subject 没写, 就会调配一个随机的 subjecttype StreamConfig struct { // 名称 Name string // 形容 Description string // 对应的多个 Subject Subjects []string // 音讯 3 种保留策略 // RetentionPolicy 最大音讯数, 最大存储空间或者最大存活工夫达到限度, 就能够删除音讯 // InterestPolicy 须要所有 consumer 确认能够删除音讯 // WorkQueuePolicy 只须要一个 consumer 确认能够删除音讯 Retention RetentionPolicy // 最大 Consumer 数量 MaxConsumers int // 最大存储 Mgs 数量 MaxMsgs int64 // 最大贮存占用 MaxBytes int64 // 音讯 2 种淘汰策略 // DiscardOld 音讯达到限度后, 抛弃最早的音讯 // DiscardNew 音讯达到限度后, 信息音讯新推送会失败 Discard DiscardPolicy // 音讯存活工夫 MaxAge time.Duration // 每个 subject 最大音讯数量 MaxMsgsPerSubject int64 // 每个音讯最大大小 MaxMsgSize int32 // 反对文件贮存和内存贮存 2 种类型 Storage StorageType // 音讯分片数量 Replicas int // 不须要 ack NoAck bool // ... }
Consumer
Consumer 定义了音讯承受形式并记录承受到的地位
举个例子如果消费者在 Sub 音讯的时候指定了 Consumer, 就会从记录的地位开始推送音讯, 而不是从头开始
贴一个 Consumer 的外围配置 (v1.15.0)
// jsm.gotype ConsumerConfig struct { // 名称 Durable string `json:"durable_name,omitempty"` // 形容 Description string `json:"description,omitempty"` // 交付 Subject DeliverSubject string `json:"deliver_subject,omitempty"` // 交付 Group DeliverGroup string `json:"deliver_group,omitempty"` // 交付策略 // 交付所有 (默认), 交付最初一个, 交付最新, 自定义开始序号, 自定义开始工夫 DeliverPolicy DeliverPolicy `json:"deliver_policy"` // 开始序号 OptStartSeq uint64 `json:"opt_start_seq,omitempty"` // 开始工夫 OptStartTime *time.Time `json:"opt_start_time,omitempty"` // ack 策略 // 不须要ack (默认), 隐式ack All , 每个都须要显示ack AckPolicy AckPolicy `json:"ack_policy"` // ack等待时间 AckWait time.Duration `json:"ack_wait,omitempty"` MaxDeliver int `json:"max_deliver,omitempty"` BackOff []time.Duration `json:"backoff,omitempty"` // 过滤的Subject FilterSubject string `json:"filter_subject,omitempty"` // 重试策略 // 尽快重试, ReplayOriginalPolicy 雷同工夫重试 ReplayPolicy ReplayPolicy `json:"replay_policy"` // 限速 RateLimit uint64 `json:"rate_limit_bps,omitempty"` // Bits per sec // 采样频率 SampleFrequency string `json:"sample_freq,omitempty"` // 最大期待数量 MaxWaiting int `json:"max_waiting,omitempty"` //最大Pending ack数量 MaxAckPending int `json:"max_ack_pending,omitempty"` // flow 管制 FlowControl bool `json:"flow_control,omitempty"` // 心跳工夫 Heartbeat time.Duration `json:"idle_heartbeat,omitempty"` // ...}
代码示例
Core Publish-Subcribe
package mainimport ( "fmt" "os" "time" "github.com/nats-io/nats.go")func main() { // 环境变量中获取 NATS 服务器地址 url := os.Getenv("NATS_URL") if url == "" { url = nats.DefaultURL } // 连贯 NATS 服务器 nc, _ := nats.Connect(url) defer nc.Drain() // 生产音讯 1, 因为没有消费者, 这个音讯会失落 nc.Publish("greet.1", []byte("hello")) // 订阅音讯, 异步承受, 这个时候有消费者了 sub, _ := nc.SubscribeSync("greet.*") // 第一个音讯因为没有消费者所以会失落 msg, _ := sub.NextMsg(10 * time.Millisecond) fmt.Println("subscribed after a publish...") fmt.Printf("msg is nil? %v\n", msg == nil) // 生产音讯 2, 3 nc.Publish("greet.2", []byte("hello")) nc.Publish("greet.3", []byte("hello")) msg, _ = sub.NextMsg(10 * time.Millisecond) fmt.Printf("msg data: %q on subject %q\n", string(msg.Data), msg.Subject) msg, _ = sub.NextMsg(10 * time.Millisecond) fmt.Printf("msg data: %q on subject %q\n", string(msg.Data), msg.Subject) nc.Publish("greet.4", []byte("hello")) msg, _ = sub.NextMsg(10 * time.Millisecond) fmt.Printf("msg data: %q on subject %q\n", string(msg.Data), msg.Subject)}
output:
subscribed after a publish...msg is nil? truemsg data: "hello" on subject "greet.2"msg data: "hello" on subject "greet.3"msg data: "hello" on subject "greet.4"
Request-Reply
package mainimport ( "fmt" "os" "time" "github.com/nats-io/nats.go")func main() { url := os.Getenv("NATS_URL") if url == "" { url = nats.DefaultURL } nc, _ := nats.Connect(url) defer nc.Drain() sub, _ := nc.Subscribe("greet.*", func(msg *nats.Msg) { name := msg.Subject[6:] msg.Respond([]byte("hello, " + name)) }) rep, _ := nc.Request("greet.joe", nil, time.Second) fmt.Println(string(rep.Data)) rep, _ = nc.Request("greet.sue", nil, time.Second) fmt.Println(string(rep.Data)) rep, _ = nc.Request("greet.bob", nil, time.Second) fmt.Println(string(rep.Data)) sub.Unsubscribe() _, err := nc.Request("greet.joe", nil, time.Second) fmt.Println(err)}
output
hello, joehello, suehello, bobnats: no responders available for request
Limits-based Stream
package mainimport ( "encoding/json" "fmt" "log" "os" "time" "github.com/nats-io/nats.go")func main() { url := os.Getenv("NATS_URL") if url == "" { url = nats.DefaultURL } nc, _ := nats.Connect(url) defer nc.Drain() js, _ := nc.JetStream() cfg := nats.StreamConfig{ Name: "EVENTS", Subjects: []string{"events.>"}, } cfg.Storage = nats.FileStorage js.AddStream(&cfg) fmt.Println("created the stream") js.Publish("events.page_loaded", nil) js.Publish("events.mouse_clicked", nil) js.Publish("events.mouse_clicked", nil) js.Publish("events.page_loaded", nil) js.Publish("events.mouse_clicked", nil) js.Publish("events.input_focused", nil) fmt.Println("published 6 messages") js.PublishAsync("events.input_changed", nil) js.PublishAsync("events.input_blurred", nil) js.PublishAsync("events.key_pressed", nil) js.PublishAsync("events.input_focused", nil) js.PublishAsync("events.input_changed", nil) js.PublishAsync("events.input_blurred", nil) select { case <-js.PublishAsyncComplete(): fmt.Println("published 6 messages") case <-time.After(time.Second): log.Fatal("publish took too long") } printStreamState(js, cfg.Name) // 限度音讯数量 cfg.MaxMsgs = 10 js.UpdateStream(&cfg) fmt.Println("set max messages to 10") printStreamState(js, cfg.Name) // 限度音讯大小 cfg.MaxBytes = 300 js.UpdateStream(&cfg) fmt.Println("set max bytes to 300") printStreamState(js, cfg.Name) // 限度音讯最大存活工夫 cfg.MaxAge = time.Second js.UpdateStream(&cfg) fmt.Println("set max age to one second") printStreamState(js, cfg.Name) fmt.Println("sleeping one second...") time.Sleep(time.Second) printStreamState(js, cfg.Name)}func printStreamState(js nats.JetStreamContext, name string) { info, _ := js.StreamInfo(name) b, _ := json.MarshalIndent(info.State, "", " ") fmt.Println("inspecting stream info") fmt.Println(string(b))}
output
created the streampublished 6 messagespublished 6 messagesinspecting stream info{ "messages": 12, "bytes": 594, "first_seq": 1, "first_ts": "2022-07-22T13:04:47.814798969Z", "last_seq": 12, "last_ts": "2022-07-22T13:04:47.817297637Z", "consumer_count": 0}set max messages to 10inspecting stream info{ "messages": 10, "bytes": 496, "first_seq": 3, "first_ts": "2022-07-22T13:04:47.815772395Z", "last_seq": 12, "last_ts": "2022-07-22T13:04:47.817297637Z", "consumer_count": 0}set max bytes to 300inspecting stream info{ "messages": 6, "bytes": 298, "first_seq": 7, "first_ts": "2022-07-22T13:04:47.817220635Z", "last_seq": 12, "last_ts": "2022-07-22T13:04:47.817297637Z", "consumer_count": 0}set max age to one secondinspecting stream info{ "messages": 6, "bytes": 298, "first_seq": 7, "first_ts": "2022-07-22T13:04:47.817220635Z", "last_seq": 12, "last_ts": "2022-07-22T13:04:47.817297637Z", "consumer_count": 0}sleeping one second...inspecting stream info{ "messages": 0, "bytes": 0, "first_seq": 13, "first_ts": "1970-01-01T00:00:00Z", "last_seq": 12, "last_ts": "2022-07-22T13:04:47.817297637Z", "consumer_count": 0}
更多示例参考: https://natsbyexample.com/
reference
官网文档: https://docs.nats.io/
官网GitHub: https://github.com/nats-io/na...
代码示例: https://natsbyexample.com/
https://marco79423.net/articl...
本文由mdnice多平台公布