引言

最近做了一个需要,是定时工作相干的。以前定时工作都是通过 linux crontab 去实现的,当初服务上云(tkex)了,尝试了 tkex 的 CronJob,因为公司提供的是界面化工具,应用、查看起来很不不便。于是有了本文,通过一个单 pod 去实现一个常驻服务,去跑定时工作。

通过筛选,选用了 cron 这个库,它反对 linux cronjob 语法取配置定时工作,还反对@every 10s@hourly 等描述符去配置定时工作,齐全满足咱们要求,比方上面的例子:

package mainimport (    "fmt"    "github.com/natefinch/lumberjack"    "github.com/robfig/cron/v3"    "github.com/sirupsen/logrus")type CronLogger struct {    clog *logrus.Logger}func (l *CronLogger) Info(msg string, keysAndValues ...interface{}) {    l.clog.WithFields(logrus.Fields{        "data": keysAndValues,    }).Info(msg)}func (l *CronLogger) Error(err error, msg string, keysAndValues ...interface{}) {    l.clog.WithFields(logrus.Fields{        "msg":  msg,        "data": keysAndValues,    }).Warn(err.Error())}func main() {    logger := logrus.New()    _logger := &lumberjack.Logger{        Filename:   "./test.log",        MaxSize:    50,        MaxAge:     15,        MaxBackups: 5,    }    logger.SetOutput(_logger)    logger.SetFormatter(&logrus.JSONFormatter{        DisableHTMLEscape: true,    })    c := cron.New(cron.WithLogger(&CronLogger{        clog: logger,    }))    c.AddFunc("*/5 * * * *", func() {        fmt.Println("你的流量包行将过期了")    })    c.AddFunc("*/2 * * * *", func() {        fmt.Println("你的转码包行将过期了")    })    c.Start()    for {        select {}    }}

应用了 cronjob、并联合了 golang 的 log 组建,输入日志到文件,应用很不便。

然而,在应用过程中,发现还有些有余,短少某些性能,比方我很想应用的查看工作列表。

类库介绍

扩展性强

此类库扩展性挺强,通过 JobWrapper 去包装一个工作,NewChain(w1, w2, w3).Then(job),相干实现如下:

type JobWrapper func(Job) Jobtype Chain struct {    wrappers []JobWrapper}func NewChain(c ...JobWrapper) Chain {    return Chain{c}}func (c Chain) Then(j Job) Job {    for i := range c.wrappers {        j = c.wrappers[len(c.wrappers)-i-1](j)    }    return j}

比方以后脚本如果还没有执行完,下次工作工夫又到了,就能够通过如下默认提供的 wrapper 去防止继续执行。能够看到最初执行的工作 j.Run() 被包装在了一个函数闭包中,并且依据闭包中的 channel 去判断是否执行,防止反复执行。首次执行的时候,容量为 1 的 channel 中曾经有数据了,反复执行时,channel 无数据,默认跳过,等上次工作执行实现后,又像 channel 中写入一条数据,下次 channel 能够读出数据,又能够执行工作了:

func SkipIfStillRunning(j Job) Job {    var ch = make(chan struct{}, 1)    ch <- struct{}{}    return FuncJob(func() {        select {        case v := <-ch:            defer func() { ch <- v }()            j.Run()        default:            // "skip"        }    })}

主流程

cron 主流程是启动一个协程,外面有双重 for 循环,上面咱们来一起剖析一下。

定时器

第一层循环,首先计算下次最早执行工作的工夫跟以后工夫距离 gap,而后设置定时器为 gap,这里很奇妙,定时器距离不是 1s/次,而是跟下次工作的工夫相干,这样就防止了无用的定时器循环,也让执行工夫更精准,不存在设置小了浪费资源,设置大了误差大的状况。接下来进入第二层循环。

sort.Sort(byTime(c.entries))timer = time.NewTimer(c.entries[0].Next.Sub(now))

事件循环

事件循环中,蕴含了很多事件,比方 增加工作进行移除工作,当 cron 启动之后,这些工作都是异步的。比方增加工作,不会间接将工作信息写入内存中,而是进入事件循环,退出之后,从新计算第一二层循环,防止了正在批改工作信息,又执行工作信息,而后出错的状况。

有人可能会问了,为何不在事件中加锁,这样也能防止内存竞争。我想说,咱们执行的是脚本工作,有的事件可能很长,可能会阻塞有些事件,所以这些事件都放在循环中,防止了加锁,也满足了要求。

for {    select {    case now = <-timer.C:        // 执行工作    case newEntry := <-c.add:        // 增加工作    case replyChan := <-c.snapshot:        // 获取工作信息    case <-c.stop:        //  进行工作    case id := <-c.remove:        // 移除工作    }    break}

类库革新

在理解了我的项目的根本状况之后,对我的项目做了局部革新,方便使用。

打印工作列表信息

在主循环汇总退出了信号量监听,当触发信号量 SIGUSR1,将工作信息输入到日志:

usrSig := make(chan os.Signal, 1)signal.Notify(usrSig, syscall.SIGUSR1)for {    select {    case <-usrSig:        // 启动独自的协程去打印定时工作执行信息        continue    }    break}

依据名称移除脚本

目前脚本只能依据脚本 id 去移除要执行的工作,执行过程中,也不能通过命令去移除工作,不是太不便。比方有个脚本马上要执行了,然而该脚本发现问题了,这时候生产环境的话,就须要更新代码,而后重启服务去下线脚本工作,这时候,黄花菜可能都凉了。

所以我也是通过信号量,来解决运行之后,运行中移除工作的问题,收到信号量之后,读取文件中的内容,依据命令去解决 runing 中的内存:

usrSig2 := make(chan os.Signal, 1)signal.Notify(usrSig2, syscall.SIGUSR2)......case <-usrSig2:    actionByte, err := os.ReadFile("/tmp/cron.action")    ...... //校验命令正确性    action := strings.Fields(string(actionByte))    switch action[0] {    case "removeTag":        timer.Stop()        now = c.now()        c.removeEntryByTag(action[1])        c.logger.Info("removedByTag", "tag", action[1])    }......

革新成果

因为原我的项目曾经 2 年多没有个更新过了,就算发动 pr 预计也不会被解决,所以 fork 一份放在了这里 aizuyan/cron 进行革新,上面是改良之后的代码:

package mainimport (    // 加载配置文件    "fmt"    "github.com/aizuyan/cron/v3")func main() {    c := cron.New(cron.WithLogger(cron.DefaultLogger))    c.AddFuncWithTag("流量包过期", "*/5 * * * *", func() {        fmt.Println("你的流量包行将过期了")    })    c.AddFuncWithTag("转码包过期", "*/2 * * * *", func() {        fmt.Println("你的转码包行将过期了")    })    c.Start()    for {        select {}    }}

对每个定时工作减少了一个名称标识,当工作启动后,当咱们执行 kill -SIGUSR1 <pid> 的时候,会看到 stdout 输入了运行的工作列表信息:

+----+------------+-------------+---------------------+---------------------+| ID |    TAG     |    SPEC     |        PREV         |        NEXT         |+----+------------+-------------+---------------------+---------------------+|  2 | 转码包过期 | */2 * * * * | 0001-01-01 00:00:00 | 2023-04-02 17:22:00 ||  1 | 流量包过期 | */5 * * * * | 0001-01-01 00:00:00 | 2023-04-02 17:25:00 |+----+------------+-------------+---------------------+---------------------+

执行 kill -SIGUSR2 <pid>,移除转码包过期工作,防止了应用 ID 容易出错的问题。

cat /tmp/cron.action removeTag 转码包过期// {"data":["tag","转码包过期"],"level":"info","msg":"removedByTag","time":"2023-04-02T18:32:56+08:00"}

放目前为止,是不是更好用了,根本能满足咱们的需要了,也能够本人去再做各种扩大。