乐趣区

关于golang:还在用crontab-分布式定时任务了解一下

前言

日常工作凋谢中,咱们会有很多异步、批量、定时、提早工作要解决,go-zero 中有 go-queue,举荐应用 go-queue 去解决,go-queue 自身也是基于 go-zero 开发的,其自身是有两种模式:

  • dq:依赖于 beanstalkd,适宜延时、定时工作执行;
  • kq:依赖于 kafka,实用于异步、批量工作执行;

本篇就先从 dq 开始,缓缓探索 go-queue 背地执行的逻辑。

dq 简介

dq 封装底层 beanstalkd 操作,分布式存储,提早、定时设置。重启服务能够从新执行,然而音讯不会失落,因为音讯的解决都交由 beanstalkd 实现。

能够看出应用非常简单,同时 dq 中应用了 redis setnx 保障了每个音讯只被生产一次。然而在生产者端没有应用 redis 做音讯存储,这个和后面形容的统一。

dq 的整体架构做了简略介绍,上面就开始正式的摸索

生产者 example

func main() {producer := dq.NewProducer([]dq.Beanstalk{
        {
            Endpoint: "localhost:11300",
            Tube:     "tube",
        },
        {
            Endpoint: "localhost:11301",
            Tube:     "tube",
        },
    })
    for i := 1000; i < 1005; i++ {
    // Delay:提早执行
        _, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second*5)
    // At:在某一个时刻执行
        //_, err := producer.At([]byte(strconv.Itoa(i)), time.Now().Add(time.Second*5))
        if err != nil {fmt.Println(err)
        }
    }
}

从应用上,简略分为两步:

  1. NewProducer(opts):将本地队列的端口配置和主题配置传入生产者;
  2. producer.Delay():应用刚创立好的 生产者 ,调用它的 Delay()。将须要异步发送的音讯传入,Delay 还须要传入提早执行的工夫。

须要阐明的是:创立的 producer 是一个接口,Delay() 只是接口其中的一个办法。后续会其余的办法和外部设计。那咱们就持续往下摸索吧~~~

深刻生产者执行流程

上面从 example 的代码进去,看整个函数的调用链。

初始化

dq.NewProducer([]dq.Beanstalk{{opt1}, {opt2}, ...})    // 初始化生产者
    |- NewProducerNode(endpoint, tube)                                // endpoint,tube 来自传入的配置数组 

紧接着就到 producerNode.go,这个局部就会牵涉到 beanstalk 的初始化:

NewProducerNode(endpoint, tube)
    |- conn: newConnection(endpoint, tube)
        |- return &connection{}    

这就波及到 beanstalkconnection.conn -> *beanstalk.Conn

然而在 newConnection() 中并没有对 beanstalk.Conn 进行初始化,这属于 提早初始化

Delay

首先是生产者端调用 producer.Delay(data, timesecond),就把音讯插入到外部队列,timesecond 就是提早执行的工夫。咱们来看看 Delay() 到底做了什么?

p.Delay(data, timesecond)
    |- p.wrap(data, time)            // 将 data 和 time 包装到一块
        |- p.insert(nodeFn)
            |- node.Delay()             // for rangre p.node 每一个 node 都执行一遍 `Delay()`

p.insert 就是将上一步封装好的 data 传递给 p{cluster} 的每一个 node 去执行 node.Delay

在后面的 初始化 说过,最开始是没有对 conn 进行初始化,那当初要插入数据,总不能不初始化这个 conn

node.Delay()                                    // 配置中的每个 node 都执行 `Delay()`
    |- node.conn.get()                    // 获取 node 中的 conn【conn==nil,就初始化一个 conn】|- _, err := conn.Put(data, deplay, opts...)
        |- node.conn.reset()             // 呈现 err 状况下,如 OOM/Timeout 等状况 -> 敞开 conn,避免透露 

所以最初 Delay 实际上是执行 tube.Put(data, delay)

tube.Put(data, delay)
    |- tube.Conn.cmd("put", ...)        // 生产者公布 job

这里就波及到 beanstalkPut 操作:首先看看生产者 Put 指令参数阐明:

put <pri> <delay> <ttr> <bytes> <data>
  • <pri>:优先级,值越小优先级越高,默认为 1024;
  • <delay>:提早 ready 秒数,在这段时间 job 为 delayed 状态;
  • <ttr>time to run,容许 worker 执行的最大秒数,如果 worker 在这段时间不能 delete,release,bury job,那么当 job 超时,服务器将主动 release 此 job;
  • <bytes>job body 的长度,不蕴含 \r\n
  • <data>:job body data;

OK。那插入 job 胜利,响应什么呢?

INSERTED <id>\r\n

返回的 id 是插入 job 的工作标识。到此 Put 剖析结束,跟着代码走一遍:

tube.Put(data, priority, daley, ttr)
    |- tube.Conn.cmd("put", ...)
    |- tube.Conn.readResp("INSERTED id")
|- return id, err            // 将 id 返回 

这样咱们在 example 中间接能够看到的 生产者 执行的操作就介绍完了。上图,图更好谈话:

producer interface

那么除了 example 中应用的 Delay(),还有其余几个办法:

Producer interface {At(body []byte, at time.Time) (string, error)
  Close() error
  Delay(body []byte, delay time.Duration) (string, error)
  Revoke(ids string) error
}
  • At:指定某个工夫执行【本质也是执行 Delay()
  • Close:敞开全副 node 的连贯
  • Delay:提早执行。传入提早的工夫。
  • Revoke:本质上是当呈现最小写入节点 <2 时,触发增加失败,将增加胜利的 job 删除掉。

当然,事实上 dq 应用上,开发者只须要应用 At/Delay 就行了。也就是你只有晓得你的工作是定时触发还是提早触发即可。剩下的,dq 外部的封装都曾经帮你做好了。

框架地址

https://github.com/tal-tech/g…

同时在 go-queue 也大量应用 go-zero 的流式解决库 fx

https://github.com/tal-tech/g…

欢送应用 go-queuestar 反对咱们!一起构建 go-zero 生态!????

go-zero 系列文章见『微服务实际』公众号

退出移动版