本文主要研究一下dkron的Scheduler

Scheduler

// Scheduler represents a dkron scheduler instance, it stores the cron engine// and the related parameters.type Scheduler struct {    Cron        *cron.Cron    Started     bool    EntryJobMap sync.Map}// NewScheduler creates a new Scheduler instancefunc NewScheduler() *Scheduler {    schedulerStarted.Set(0)    return &Scheduler{        Cron:        nil,        Started:     false,        EntryJobMap: sync.Map{},    }}// Start the cron scheduler, adding its corresponding jobs and// executing them on time.func (s *Scheduler) Start(jobs []*Job, agent *Agent) error {    s.Cron = cron.New(cron.WithParser(extcron.NewParser()))    metrics.IncrCounter([]string{"scheduler", "start"}, 1)    for _, job := range jobs {        job.Agent = agent        s.AddJob(job)    }    s.Cron.Start()    s.Started = true    schedulerStarted.Set(1)    return nil}// Stop stops the scheduler effectively not running any job.func (s *Scheduler) Stop() {    if s.Started {        log.Debug("scheduler: Stopping scheduler")        s.Cron.Stop()        s.Started = false        // Keep Cron exists and let the jobs which have been scheduled can continue to finish,        // even the node's leadership will be revoked.        // Ignore the running jobs and make s.Cron to nil may cause whole process crashed.        //s.Cron = nil        // expvars        cronInspect.Do(func(kv expvar.KeyValue) {            kv.Value = nil        })    }    schedulerStarted.Set(0)}// Restart the schedulerfunc (s *Scheduler) Restart(jobs []*Job, agent *Agent) {    s.Stop()    s.ClearCron()    s.Start(jobs, agent)}// Clear cron separately, this can only be called when agent will be stop.func (s *Scheduler) ClearCron() {    s.Cron = nil}
Scheduler定义了Cron、Started、EntryJobMap属性;NewScheduler方法创建默认的Scheduler;Start方法遍历jobs,逐个设置job.Agent,然后添加到Scheduler中,之后执行Scheduler.Cron.Start();Stop方法执行Scheduler.Cron.Stop();Restart方法依次执行Stop、ClearCron、Start方法;ClearCron设置Cron为nil

AddJob

// AddJob Adds a job to the cron schedulerfunc (s *Scheduler) AddJob(job *Job) error {    // Check if the job is already set and remove it if exists    if _, ok := s.EntryJobMap.Load(job.Name); ok {        s.RemoveJob(job)    }    if job.Disabled || job.ParentJob != "" {        return nil    }    log.WithFields(logrus.Fields{        "job": job.Name,    }).Debug("scheduler: Adding job to cron")    // If Timezone is set on the job, and not explicitly in its schedule,    // AND its not a descriptor (that don't support timezones), add the    // timezone to the schedule so robfig/cron knows about it.    schedule := job.Schedule    if job.Timezone != "" &&        !strings.HasPrefix(schedule, "@") &&        !strings.HasPrefix(schedule, "TZ=") &&        !strings.HasPrefix(schedule, "CRON_TZ=") {        schedule = "CRON_TZ=" + job.Timezone + " " + schedule    }    id, err := s.Cron.AddJob(schedule, job)    if err != nil {        return err    }    s.EntryJobMap.Store(job.Name, id)    cronInspect.Set(job.Name, job)    metrics.IncrCounterWithLabels([]string{"scheduler", "job_add"}, 1, []metrics.Label{{Name: "job", Value: job.Name}})    return nil}
AddJob方法先移除EntryJobMap中的同名job,然后执行Cron.AddJob(schedule, job),最后存储到EntryJobMap

RemoveJob

// RemoveJob removes a job from the cron schedulerfunc (s *Scheduler) RemoveJob(job *Job) {    log.WithFields(logrus.Fields{        "job": job.Name,    }).Debug("scheduler: Removing job from cron")    if v, ok := s.EntryJobMap.Load(job.Name); ok {        s.Cron.Remove(v.(cron.EntryID))        s.EntryJobMap.Delete(job.Name)        cronInspect.Delete(job.Name)        metrics.IncrCounterWithLabels([]string{"scheduler", "job_delete"}, 1, []metrics.Label{{Name: "job", Value: job.Name}})    }}
RemoveJob方法先从EntryJobMap中查找同名job,找到后执行Cron.Remove将其从cron中移除,再从EntryJobMap中删除该job,然后执行cronInspect.Delete(job.Name)

小结

dkron的Scheduler定义了Cron、Started、EntryJobMap属性;NewScheduler方法创建默认的Scheduler;它提供了Start、Stop、Restart、ClearCron、AddJob、RemoveJob方法。

doc

  • dkron