共计 2781 个字符,预计需要花费 7 分钟才能阅读完成。
序
本文主要研究一下 dbsync 的 jobs
Job
// Job represents a db sync job.
type Job struct {
	// ID is the job identifier; NewJob sets it from its argument.
	ID string
	// Error holds an error description for the job. None of the methods
	// visible here write it.
	Error string
	// Status is the job status; Done and IsRunning compare it against the
	// shared.Status* values.
	Status string
	// Progress holds the aggregate counters recomputed by Update.
	Progress Progress
	// Items are the transferables registered with Add.
	Items []*Transferable
	// Chunked is not read or written by the methods visible here.
	// NOTE(review): presumably marks a chunked transfer — confirm.
	Chunked bool
	// mutex is locked by Add while it appends to Items.
	mutex *sync.Mutex
	// StartTime is set to time.Now() by NewJob.
	StartTime time.Time
	// EndTime is left nil by NewJob and set by Done.
	EndTime *time.Time
}
// NewJob creates a job with the supplied ID.
// The returned job has its start time set to the current time, a ready-to-use
// mutex and an empty, non-nil Items slice.
func NewJob(id string) *Job {
	job := &Job{ID: id, mutex: &sync.Mutex{}}
	job.Items = []*Transferable{}
	job.StartTime = time.Now()
	return job
}
Job 结构体定义了 ID、Error、Status、Progress、Items、Chunked、mutex、StartTime、EndTime 属性
Update
// Update recomputes the job's aggregate progress from its items.
//
// Items whose Status is nil are skipped. For every other item the method sums
// Source.Count(), Dest.Count() and the atomically loaded Transferred counter,
// then stores the totals in j.Progress. When the job has no items it returns
// without touching j.Progress. When the summed source count is zero, Pct is
// left unchanged.
//
// Pct is stored as a whole-number percentage of the source count. The earlier
// expression, transferred / sourceCount, was an integer division and so stayed
// at 0 until the transferred count reached the source count.
// NOTE(review): assumes Progress.Pct is an int field and that consumers expect
// a 0-100 scale — confirm against the Progress definition.
//
// Update does not lock j.mutex.
// NOTE(review): confirm callers never run it concurrently with Add.
func (j *Job) Update() {
	if len(j.Items) == 0 {
		return
	}
	var sourceCount, destCount, transferred int
	for _, item := range j.Items {
		if item.Status == nil {
			continue
		}
		sourceCount += item.Source.Count()
		destCount += item.Dest.Count()
		transferred += int(atomic.LoadUint32(&item.Transferred))
	}
	j.Progress.Transferred = transferred
	j.Progress.SourceCount = sourceCount
	j.Progress.DestCount = destCount
	if sourceCount > 0 {
		// Multiply before dividing so integer division keeps the percentage.
		j.Progress.Pct = transferred * 100 / sourceCount
	}
}
Update 方法遍历 Items,统计 transferred、sourceCount、destCount,并据此更新 Progress
Done
// Done flags the job as finished at the supplied time.
// EndTime is always set to now; Status becomes shared.StatusDone unless the
// job is already in shared.StatusError, which is kept.
func (j *Job) Done(now time.Time) {
	j.EndTime = &now
	if j.Status == shared.StatusError {
		return
	}
	j.Status = shared.StatusDone
}
Done 方法更新 Status 和 EndTime
Add
// Add appends a transferable to the job's items while holding the job mutex.
func (j *Job) Add(transferable *Transferable) {
	j.mutex.Lock()
	j.Items = append(j.Items, transferable)
	j.mutex.Unlock()
}
Add 方法往 Items 添加 transferable
IsRunning
// IsRunning returns true if the job has no end time yet or its status is
// shared.StatusRunning.
func (j *Job) IsRunning() bool {
	if j.EndTime == nil {
		return true
	}
	return j.Status == shared.StatusRunning
}
IsRunning 方法通过 Status 和 EndTime 来判断是否是 running
Service
// Service represents a job service.
type Service interface {
	// List lists all active or recently active jobs.
	List(request *ListRequest) *ListResponse
	// Create creates a new job for the supplied ID.
	Create(ID string) *core.Job
	// Get returns a job for the supplied ID or nil.
	Get(ID string) *core.Job
}
// service is the Service implementation returned by New; its methods read and
// add jobs through the registry.
type service struct {
	registry *registry
}
// New creates a job service backed by a fresh registry.
func New() Service {
	srv := &service{}
	srv.registry = newRegistry()
	return srv
}
Service 接口定义了 List、Create、Get 方法
Get
// Get returns the first registered job whose ID matches, after refreshing its
// progress with Update, or nil when no job matches.
func (s *service) Get(ID string) *core.Job {
	for _, candidate := range s.registry.list() {
		if candidate.ID != ID {
			continue
		}
		candidate.Update()
		return candidate
	}
	return nil
}
Get 方法先执行 registry.list(),然后遍历 list 找到 ID 对应的 job,对其执行 Update 后返回
List
// List lists registered jobs, refreshing each returned job's progress.
//
// When request is nil or carries no IDs, every job from the registry is
// returned. Otherwise only jobs whose ID appears in request.IDs are returned,
// in registry order; the filtered slice is non-nil even when nothing matches.
//
// Update is called on every job that is returned. Previously the unfiltered
// path skipped it, so the same job could report different progress depending
// on whether it was requested by ID.
func (s *service) List(request *ListRequest) *ListResponse {
	jobs := s.registry.list()
	// A nil request is treated like an empty one instead of panicking.
	if request == nil || len(request.IDs) == 0 {
		for _, job := range jobs {
			job.Update()
		}
		return &ListResponse{Jobs: jobs}
	}
	requested := make(map[string]struct{}, len(request.IDs))
	for _, id := range request.IDs {
		requested[id] = struct{}{}
	}
	filtered := make([]*core.Job, 0)
	for _, job := range jobs {
		if _, ok := requested[job.ID]; !ok {
			continue
		}
		job.Update()
		filtered = append(filtered, job)
	}
	return &ListResponse{Jobs: filtered}
}
List 方法先执行 registry.list(),之后根据 requestedIDs 找出对应的 job,执行 Update,最后返回
Create
// Create builds a new job for the supplied ID, adds it to the registry and
// returns it.
func (s *service) Create(ID string) *core.Job {
	created := core.NewJob(ID)
	s.registry.add(created)
	return created
}
Create 方法通过 core.NewJob(ID) 创建 job,然后执行 registry.add(job)
小结
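dbsync 的 Job 定义了 ID、Error、Status、Progress、Items、Chunked、mutex、StartTime、EndTime 属性,它提供了 Update、Done、Add、IsRunning 方法;Service 接口定义了 List、Create、Get 方法。dbsync 的 Schedulable 定义了 URL、ID、*contract.Sync、Schedule、Status、status 属性,它提供了 Clone、Done、IsRunning、ScheduleNexRun、Init、Validate 方法。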
doc
- dbsync
正文完