共计 2954 个字符,预计需要花费 8 分钟才能阅读完成。
前言
日常工作凋谢中,咱们会有很多异步、批量、定时、提早工作要解决,go-zero 中有 go-queue
,举荐应用 go-queue
去解决,go-queue
自身也是基于 go-zero
开发的,其自身是有两种模式:
dq
:依赖于beanstalkd
,适宜延时、定时工作执行;kq
:依赖于kafka
,实用于异步、批量工作执行;
本篇就先从 dq
开始,缓缓探索 go-queue
背地执行的逻辑。
dq 简介
dq
封装底层 beanstalkd
操作,分布式存储,提早、定时设置。重启服务能够从新执行,然而音讯不会失落,因为音讯的解决都交由 beanstalkd
实现。
能够看出应用非常简单,同时 dq
中应用了 redis setnx
保障了每个音讯只被生产一次。然而在生产者端没有应用 redis
做音讯存储,这个和后面形容的统一。
对 dq
的整体架构做了简略介绍,上面就开始正式的摸索
生产者 example
func main() {producer := dq.NewProducer([]dq.Beanstalk{
{
Endpoint: "localhost:11300",
Tube: "tube",
},
{
Endpoint: "localhost:11301",
Tube: "tube",
},
})
for i := 1000; i < 1005; i++ {
// Delay:提早执行
_, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second*5)
// At:在某一个时刻执行
//_, err := producer.At([]byte(strconv.Itoa(i)), time.Now().Add(time.Second*5))
if err != nil {fmt.Println(err)
}
}
}
从应用上,简略分为两步:
NewProducer(opts)
:将本地队列的端口配置和主题配置传入生产者;producer.Delay()
:应用刚创立好的 生产者 ,调用它的Delay()
。将须要异步发送的音讯传入,Delay
还须要传入提早执行的工夫。
须要阐明的是:创立的 producer
是一个接口,Delay()
只是接口其中的一个办法。后续会其余的办法和外部设计。那咱们就持续往下摸索吧~~~
深刻生产者执行流程
上面从 example
的代码进去,看整个函数的调用链。
初始化
dq.NewProducer([]dq.Beanstalk{{opt1}, {opt2}, ...}) // 初始化生产者
|- NewProducerNode(endpoint, tube) // endpoint,tube 来自传入的配置数组
紧接着就到 producerNode.go
,这个局部就会牵涉到 beanstalk
的初始化:
NewProducerNode(endpoint, tube)
|- conn: newConnection(endpoint, tube)
|- return &connection{}
这就波及到 beanstalk
:connection.conn -> *beanstalk.Conn
。
然而在 newConnection()
中并没有对 beanstalk.Conn
进行初始化,这属于 提早初始化
Delay
首先是生产者端调用 producer.Delay(data, timesecond)
,就把音讯插入到外部队列,timesecond
就是提早执行的工夫。咱们来看看 Delay()
到底做了什么?
p.Delay(data, timesecond)
|- p.wrap(data, time) // 将 data 和 time 包装到一块
|- p.insert(nodeFn)
|- node.Delay() // for rangre p.node 每一个 node 都执行一遍 `Delay()`
而 p.insert
就是将上一步封装好的 data 传递给 p{cluster}
的每一个 node 去执行 node.Delay
。
在后面的 初始化 说过,最开始是没有对 conn
进行初始化,那当初要插入数据,总不能不初始化这个 conn
。
node.Delay() // 配置中的每个 node 都执行 `Delay()`
|- node.conn.get() // 获取 node 中的 conn【conn==nil,就初始化一个 conn】|- _, err := conn.Put(data, deplay, opts...)
|- node.conn.reset() // 呈现 err 状况下,如 OOM/Timeout 等状况 -> 敞开 conn,避免透露
所以最初 Delay
实际上是执行 tube.Put(data, delay)
:
tube.Put(data, delay)
|- tube.Conn.cmd("put", ...) // 生产者公布 job
这里就波及到 beanstalk
的 Put
操作:首先看看生产者 Put
指令参数阐明:
put <pri> <delay> <ttr> <bytes> <data>
<pri>
:优先级,值越小优先级越高,默认为 1024;<delay>
:提早ready
秒数,在这段时间 job 为delayed
状态;<ttr>
:time to run
,容许 worker 执行的最大秒数,如果 worker 在这段时间不能 delete,release,bury job,那么当 job 超时,服务器将主动 release 此 job;<bytes>
:job body
的长度,不蕴含\r\n
;<data>
:job body data;
OK。那插入 job
胜利,响应什么呢?
INSERTED <id>\r\n
返回的 id
是插入 job
的工作标识。到此 Put
剖析结束,跟着代码走一遍:
tube.Put(data, priority, daley, ttr)
|- tube.Conn.cmd("put", ...)
|- tube.Conn.readResp("INSERTED id")
|- return id, err // 将 id 返回
这样咱们在 example
中间接能够看到的 生产者 执行的操作就介绍完了。上图,图更好谈话:
producer interface
那么除了 example
中应用的 Delay()
,还有其余几个办法:
Producer interface {At(body []byte, at time.Time) (string, error)
Close() error
Delay(body []byte, delay time.Duration) (string, error)
Revoke(ids string) error
}
At
:指定某个工夫执行【本质也是执行Delay()
】Close
:敞开全副 node 的连贯Delay
:提早执行。传入提早的工夫。Revoke
:本质上是当呈现最小写入节点 <2 时,触发增加失败,将增加胜利的 job 删除掉。
当然,事实上 dq
应用上,开发者只须要应用 At/Delay
就行了。也就是你只有晓得你的工作是定时触发还是提早触发即可。剩下的,dq
外部的封装都曾经帮你做好了。
框架地址
https://github.com/tal-tech/g…
同时在 go-queue
也大量应用 go-zero
的流式解决库 fx
。
https://github.com/tal-tech/g…
欢送应用 go-queue
并 star 反对咱们!一起构建 go-zero
生态!????
go-zero 系列文章见『微服务实际』公众号