本文主要研究一下dbsync的jobs

Job

//Job represents db sync jobtype Job struct {    ID        string    Error     string    Status    string    Progress  Progress    Items     []*Transferable    Chunked   bool    mutex     *sync.Mutex    StartTime time.Time    EndTime   *time.Time}//NewJob creates a new jobfunc NewJob(id string) *Job {    return &Job{        ID:        id,        StartTime: time.Now(),        mutex:     &sync.Mutex{},        Items:     make([]*Transferable, 0),    }}
Job结构体定义了ID、Error、Status、Progress、Items、Chunked、mutex、StartTime、EndTime属性;NewJob方法根据id创建Job,并初始化StartTime、mutex和Items

Update

//Update updates job progressfunc (j *Job) Update() {    if len(j.Items) == 0 {        return    }    sourceCount := 0    destCount := 0    transferred := 0    for i := range j.Items {        if j.Items[i].Status == nil {            continue        }        sourceCount += j.Items[i].Source.Count()        destCount += j.Items[i].Dest.Count()        transferred += int(atomic.LoadUint32(&j.Items[i].Transferred))    }    j.Progress.Transferred = transferred    j.Progress.SourceCount = sourceCount    j.Progress.DestCount = destCount    if sourceCount > 0 {        j.Progress.Pct = transferred / sourceCount    }}
Update方法遍历Items(跳过Status为nil的项),统计transferred、sourceCount、destCount,并据此更新Progress

Done

//Done flag job as donefunc (j *Job) Done(now time.Time) {    if j.Status != shared.StatusError {        j.Status = shared.StatusDone    }    j.EndTime = &now}
Done方法更新Status(Status为StatusError时保持不变)和EndTime

Add

//Add add transferablefunc (j *Job) Add(transferable *Transferable) {    j.mutex.Lock()    defer j.mutex.Unlock()    j.Items = append(j.Items, transferable)}
Add方法在加锁后往Items中添加transferable

IsRunning

//IsRunning returns true if jos has running statusfunc (j *Job) IsRunning() bool {    return j.Status == shared.StatusRunning || j.EndTime == nil}
IsRunning方法通过Status和EndTime来判断是否是running:Status为StatusRunning或者EndTime为nil时返回true

Service

//Service represents a job servicetype Service interface {    //List lists all active or recently active jobs    List(request *ListRequest) *ListResponse    //Create creates a new job    Create(ID string) *core.Job    //Get returns a job for supplied ID or nil    Get(ID string) *core.Job}type service struct {    registry *registry}//New create a job servicefunc New() Service {    return &service{        registry: newRegistry(),    }}
Service接口定义了List、Create、Get方法;New方法创建基于registry的service实现

Get

//Get returns job by ID or nilfunc (s *service) Get(ID string) *core.Job {    jobs := s.registry.list()    for i := range jobs {        if jobs[i].ID == ID {            jobs[i].Update()            return jobs[i]        }    }    return nil}
Get方法先执行registry.list(),然后遍历list找到ID对应的job,执行Update后返回;找不到则返回nil

List

//List lists all jobsfunc (s *service) List(request *ListRequest) *ListResponse {    jobs := s.registry.list()    if len(request.IDs) == 0 {        return &ListResponse{            Jobs: jobs,        }    }    var requestedIDs = make(map[string]bool)    for i := range request.IDs {        requestedIDs[request.IDs[i]] = true    }    var filtered = make([]*core.Job, 0)    for i := range jobs {        if _, has := requestedIDs[jobs[i].ID]; !has {            continue        }        jobs[i].Update()        filtered = append(filtered, jobs[i])    }    return &ListResponse{        Jobs: filtered,    }}
List方法先执行registry.list();若request.IDs为空则返回全部job,否则根据requestedIDs找出对应的job,执行Update,最后返回

Create

//Create creates a new jobfunc (s *service) Create(ID string) *core.Job {    job := core.NewJob(ID)    s.registry.add(job)    return job}
Create方法通过core.NewJob(ID)创建job,然后执行registry.add(job),最后返回该job

小结

dbsync的Job定义了ID、Error、Status、Progress、Items、Chunked、mutex、StartTime、EndTime属性,它提供了Update、Done、Add、IsRunning方法;Service接口定义了List、Create、Get方法,其实现service通过registry来保存和查找job。

doc

  • dbsync