序
本文主要研究一下 dkron 的 Scheduler
Scheduler
// Scheduler is a dkron scheduler instance. It bundles the cron engine with
// the bookkeeping needed to manage the jobs registered on it.
type Scheduler struct {
	// Cron is the underlying cron engine; it is nil until Start is called
	// and again after ClearCron.
	Cron *cron.Cron
	// Started reports whether Start has been called without a later Stop.
	Started bool
	// EntryJobMap maps a job name to the cron.EntryID returned when the job
	// was added to Cron.
	EntryJobMap sync.Map
}
// NewScheduler resets schedulerStarted to 0 and returns a Scheduler in its
// zero state: no cron engine, not started and an empty EntryJobMap.
func NewScheduler() *Scheduler {
	schedulerStarted.Set(0)

	// The zero value already has Cron == nil, Started == false and an empty
	// sync.Map, so no field needs to be spelled out.
	return &Scheduler{}
}
// Start creates a fresh cron engine that uses the extcron parser, registers
// the given jobs on it and starts it.
//
// Every job has its Agent field set to agent before it is added. A job for
// which AddJob returns an error is logged and skipped, so it cannot stop the
// remaining jobs from being scheduled. The returned error is always nil.
func (s *Scheduler) Start(jobs []*Job, agent *Agent) error {
	s.Cron = cron.New(cron.WithParser(extcron.NewParser()))

	metrics.IncrCounter([]string{"scheduler", "start"}, 1)
	for _, job := range jobs {
		job.Agent = agent
		// Do not drop the error silently: surface it in the log and carry on
		// with the other jobs.
		if err := s.AddJob(job); err != nil {
			log.WithFields(logrus.Fields{"job": job.Name}).WithError(err).Error("scheduler: Error adding job to cron")
		}
	}

	s.Cron.Start()
	s.Started = true
	schedulerStarted.Set(1)
	return nil
}
// Stop halts the cron engine so that no further jobs are triggered. When the
// scheduler is not started only schedulerStarted is reset to 0.
func (s *Scheduler) Stop() {
	if s.Started {
		log.Debug("scheduler: Stopping scheduler")
		s.Cron.Stop()
		s.Started = false
		// s.Cron is deliberately kept so that jobs which were already
		// scheduled can continue to finish, even when this node's leadership
		// is being revoked. Setting s.Cron to nil while jobs are running may
		// crash the whole process; use ClearCron for that.

		// expvars: remove every published entry. The former
		// cronInspect.Do callback assigned nil to kv.Value, but kv is passed
		// by value, so that assignment never reached the map.
		cronInspect.Init()
	}
	schedulerStarted.Set(0)
}
// Restart stops the scheduler, discards its cron engine and starts a new one
// with the given jobs and agent. Restart has no return value, so an error
// reported by Start is logged instead of being dropped.
func (s *Scheduler) Restart(jobs []*Job, agent *Agent) {
	s.Stop()
	s.ClearCron()
	if err := s.Start(jobs, agent); err != nil {
		log.WithError(err).Error("scheduler: Error restarting scheduler")
	}
}
// ClearCron discards the cron engine and forgets every entry ID recorded for
// it. It is meant to be called only when the agent is about to stop or the
// scheduler is being restarted.
//
// The entry IDs in EntryJobMap belong to the engine being discarded. If they
// were kept, a later AddJob would call Remove with a stale ID on the new
// engine and could delete an unrelated job's entry.
func (s *Scheduler) ClearCron() {
	s.Cron = nil
	s.EntryJobMap.Range(func(name, _ interface{}) bool {
		s.EntryJobMap.Delete(name)
		return true
	})
}
Scheduler 定义了 Cron、Started、EntryJobMap 属性;NewScheduler 方法创建默认的 Scheduler;Start 方法遍历 jobs,逐个设置 job.Agent,然后添加到 Scheduler 中,之后执行 Scheduler.Cron.Start();Stop 方法执行 Scheduler.Cron.Stop();Restart 方法依次执行 Stop、ClearCron、Start 方法;ClearCron 设置 Cron 为 nil
AddJob
// AddJob registers job on the cron engine under job.Name, replacing any entry
// previously recorded for that name. Disabled jobs and jobs with a ParentJob
// are not added and yield a nil error. An error from the cron engine is
// returned unchanged.
func (s *Scheduler) AddJob(job *Job) error {
	// Drop a previous registration of the same job name, if there is one.
	if _, exists := s.EntryJobMap.Load(job.Name); exists {
		s.RemoveJob(job)
	}

	if job.Disabled || job.ParentJob != "" {
		return nil
	}

	log.WithFields(logrus.Fields{"job": job.Name}).Debug("scheduler: Adding job to cron")

	// Prepend the job's timezone to the schedule so robfig/cron knows about
	// it, unless there is nothing to prepend or the schedule must be left
	// untouched.
	spec := job.Schedule
	switch {
	case job.Timezone == "":
		// No timezone configured on the job.
	case strings.HasPrefix(spec, "@"):
		// Descriptors do not support timezones.
	case strings.HasPrefix(spec, "TZ="), strings.HasPrefix(spec, "CRON_TZ="):
		// The schedule already names its timezone explicitly.
	default:
		spec = "CRON_TZ=" + job.Timezone + " " + spec
	}

	entryID, err := s.Cron.AddJob(spec, job)
	if err != nil {
		return err
	}

	s.EntryJobMap.Store(job.Name, entryID)
	cronInspect.Set(job.Name, job)
	metrics.IncrCounterWithLabels([]string{"scheduler", "job_add"}, 1, []metrics.Label{{Name: "job", Value: job.Name}})
	return nil
}
AddJob 方法先移除 EntryJobMap 中的同名 job,然后执行 Cron.AddJob(schedule, job),最后将返回的 id 存储到 EntryJobMap
RemoveJob
// RemoveJob removes the entry recorded under job.Name from the cron engine
// and from the scheduler's bookkeeping. It does nothing beyond logging when
// no entry is recorded for that name.
func (s *Scheduler) RemoveJob(job *Job) {
	log.WithFields(logrus.Fields{"job": job.Name}).Debug("scheduler: Removing job from cron")

	v, ok := s.EntryJobMap.Load(job.Name)
	if !ok {
		return
	}

	// s.Cron is nil before Start and after ClearCron. Skip the engine in that
	// case instead of dereferencing a nil pointer, but still clean up the
	// bookkeeping below.
	if s.Cron != nil {
		s.Cron.Remove(v.(cron.EntryID))
	}
	s.EntryJobMap.Delete(job.Name)
	cronInspect.Delete(job.Name)
	metrics.IncrCounterWithLabels([]string{"scheduler", "job_delete"}, 1, []metrics.Label{{Name: "job", Value: job.Name}})
}
RemoveJob 方法先从 EntryJobMap 中查找同名 job 的 EntryID,若存在则执行 Cron.Remove 将其从 cron 中移除,然后从 EntryJobMap 中删除该 job,最后执行 cronInspect.Delete(job.Name)
小结
dkron 的 Scheduler 定义了 Cron、Started、EntryJobMap 属性;NewScheduler 方法创建默认的 Scheduler;它提供了 Start、Stop、Restart、ClearCron、AddJob、RemoveJob 方法。
doc
- dkron