- 为什么写这个库
- 利用场景有哪些
- 如何应用
- 总结
为什么要写这个库?
在开始自研 go-queue
之前,针对以下咱们调研目前的开源队列计划:
beanstalkd
beanstalkd
有一些非凡好用性能:反对工作 priority、延时 (delay)、超时重发(time-to-run) 和预留(buried),可能很好的反对分布式的后台任务和定时工作解决。如下是 beanstalkd
根本局部:
job
:工作单元;tube
:工作队列,存储对立类型job
。producer 和 consumer 操作对象;producer
:job
生产者,通过 put 将 job 退出一个 tube;consumer
:job
消费者,通过 reserve/release/bury/delete 来获取 job 或扭转 job 的状态;
很侥幸的是官网提供了 go client:https://github.com/beanstalkd…。
然而这对不相熟 beanstalkd
操作的 go 开发者而言,须要学习老本。
kafka
相似基于 kafka
音讯队列作为存储的计划,存储单元是音讯,如果要实现延时执行,能够想到的计划是以延时执行的工夫作为 topic
,这样在大型的音讯零碎中,充斥大量一次性的 topic
(dq_1616324404788, dq_1616324417622
),当工夫扩散,会容易造成磁盘随机写的状况。
而且在 go 生态中,
同时思考以下因素:
- 反对延时工作
- 高可用,保证数据不失落
- 可扩大资源和性能
所以咱们本人基于以上两个根底组件开发了 go-queue
:
- 基于
beanstalkd
开发了dq
,反对定时和延时操作。同时退出redis
保障生产唯一性。 - 基于
kafka
开发了kq
,简化生产者和消费者的开发 API,同时在写入 kafka 应用批量写,节俭 IO。
整体设计如下:
利用场景
首先在生产场景来说,一个是针对工作队列,一个是音讯队列。而两者最大的区别:
- 工作是没有程序束缚;音讯须要;
- 工作在退出中,或者是期待中,可能存在状态更新(或是勾销);音讯则是繁多的存储即可;
所以在背地的基础设施选型上,也是基于这种生产场景。
dq
:依赖于beanstalkd
,适宜延时、定时工作执行;kq
:依赖于kafka
,实用于异步、批量工作执行;
而从其中 dq
的 API 中也能够看出:
// 提早工作执行
- dq.Delay(msg, delayTime);
// 定时工作执行
- dq.At(msg, atTime);
而在咱们外部:
- 如果是 异步音讯生产 / 推送,则会抉择应用
kq
:kq.Push(msg)
; - 如果是 15 分钟揭示 / 今天中午发送短信 等,则应用
dq
;
如何应用
别离介绍 dq
和 kq
的应用形式:
dq
// [Producer]
producer := dq.NewProducer([]dq.Beanstalk{
{
Endpoint: "localhost:11300",
Tube: "tube",
},
{
Endpoint: "localhost:11301",
Tube: "tube",
},
})
for i := 1000; i < 1005; i++ {_, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second*5)
if err != nil {fmt.Println(err)
}
}
// [Consumer]
consumer := dq.NewConsumer(dq.DqConf{Beanstalks: []dq.Beanstalk{
{
Endpoint: "localhost:11300",
Tube: "tube",
},
{
Endpoint: "localhost:11301",
Tube: "tube",
},
},
Redis: redis.RedisConf{
Host: "localhost:6379",
Type: redis.NodeType,
},
})
consumer.Consume(func(body []byte) {
// your consume logic
fmt.Println(string(body))
})
和一般的 生产者 - 消费者 模型相似,开发者也只须要关注以下:
- 开发者只须要关注本人的工作类型「延时 / 定时」
- 生产端的生产逻辑
kq
producer.go
:
// message structure
type message struct {
Key string `json:"key"`
Value string `json:"value"`
Payload string `json:"message"`
}
pusher := kq.NewPusher([]string{
"127.0.0.1:19092",
"127.0.0.1:19093",
"127.0.0.1:19094",
}, "kq")
ticker := time.NewTicker(time.Millisecond)
for round := 0; round < 3; round++ {
select {
case <-ticker.C:
count := rand.Intn(100)
// 筹备音讯
m := message{Key: strconv.FormatInt(time.Now().UnixNano(), 10),
Value: fmt.Sprintf("%d,%d", round, count),
Payload: fmt.Sprintf("%d,%d", round, count),
}
body, err := json.Marshal(m)
if err != nil {log.Fatal(err)
}
fmt.Println(string(body))
// push to kafka broker
if err := pusher.Push(string(body)); err != nil {log.Fatal(err)
}
}
}
config.yaml
:
Name: kq
Brokers:
- 127.0.0.1:19092
- 127.0.0.1:19092
- 127.0.0.1:19092
Group: adhoc
Topic: kq
Offset: first
Consumers: 1
consumer.go
:
var c kq.KqConf
conf.MustLoad("config.yaml", &c)
// WithHandle: 具体的解决 msg 的 logic
// 这也是开发者须要依据本人的业务定制化
q := kq.MustNewQueue(c, kq.WithHandle(func(k, v string) error {fmt.Printf("=> %s\n", v)
return nil
}))
defer q.Stop()
q.Start()
和 dq
不同的是:开发者不须要关注工作类型(在这里也没有工作的概念,传递的都是 message data
)。
其余操作和 dq
相似,只是将 业务处理函数 当成配置间接传入消费者中。
总结
在咱们目前的场景中,kq
大量应用在咱们的异步音讯服务;而延时工作,咱们除了 dq
,还能够应用内存版的 TimingWheel「go-zero
生态组件」。
对于 go-queue
更多的设计和实现文章,能够继续关注咱们。欢送大家去关注和应用。
https://github.com/tal-tech/go-queue
https://github.com/tal-tech/go-zero
欢送应用 go-zero 并 star 反对咱们!
go-zero 系列文章见『微服务实际』公众号