乐趣区

关于golang:聊聊dbsync的jobs

本文主要研究一下 dbsync 的 jobs

Job

// Job represents a db sync job together with the state tracked for it.
// Jobs are expected to be created with NewJob, which initializes mutex,
// Items and StartTime.
type Job struct {
    // ID is the identifier passed to NewJob.
    ID        string
    // Error is presumably the failure message of the job; it is not written
    // by the methods in this file — confirm against callers.
    Error     string
    // Status is compared against shared.StatusError and shared.StatusRunning,
    // and set to shared.StatusDone by Done.
    Status    string
    // Progress holds the aggregate counters recomputed by Update.
    Progress  Progress
    // Items are the transferables appended by Add and summed over by Update.
    Items     []*Transferable
    // Chunked is not referenced by the methods in this file.
    // NOTE(review): presumably marks a chunked transfer — confirm.
    Chunked   bool
    // mutex is held by Add while appending to Items; it is nil on a Job that
    // was not built with NewJob.
    mutex     *sync.Mutex
    // StartTime is set to time.Now() by NewJob.
    StartTime time.Time
    // EndTime is set by Done; while it is nil IsRunning reports true.
    EndTime   *time.Time
}

// NewJob creates a job with the supplied id.
//
// The returned job has StartTime set to the current time, a fresh mutex, and
// an empty (non-nil) Items slice; every other field keeps its zero value.
func NewJob(id string) *Job {
	job := &Job{ID: id}
	job.Items = make([]*Transferable, 0)
	job.mutex = new(sync.Mutex)
	job.StartTime = time.Now()
	return job
}

Job 定义了 ID、Error、Status、Progress、Items、Chunked、mutex、StartTime、EndTime 属性;NewJob 方法初始化 ID、StartTime、mutex 和 Items

Update

// Update recomputes the job's aggregate progress from its items.
//
// It returns immediately, leaving Progress untouched, when the job has no
// items. Otherwise it sums Source.Count(), Dest.Count() and the atomically
// loaded Transferred counter over every item whose Status is non-nil, and
// stores the totals in Progress. Pct is only assigned when the summed source
// count is positive.
func (j *Job) Update() {
	if len(j.Items) == 0 {
		return
	}
	var srcTotal, dstTotal, moved int
	for _, item := range j.Items {
		if item.Status == nil {
			// Items without a status do not contribute to the totals.
			continue
		}
		srcTotal += item.Source.Count()
		dstTotal += item.Dest.Count()
		moved += int(atomic.LoadUint32(&item.Transferred))
	}
	progress := &j.Progress
	progress.Transferred = moved
	progress.SourceCount = srcTotal
	progress.DestCount = dstTotal
	if srcTotal <= 0 {
		return
	}
	// NOTE(review): both operands are ints, so this is integer division and
	// the result is truncated — confirm that is the intended scale for Pct.
	progress.Pct = moved / srcTotal
}

Update 方法遍历 Items,统计 transferred、sourceCount、destCount,并据此更新 Progress

Done

// Done flags the job as finished at the supplied time.
//
// EndTime is always set to point at now. A Status of shared.StatusError is
// preserved; any other status is replaced with shared.StatusDone.
func (j *Job) Done(now time.Time) {
	j.EndTime = &now
	if j.Status == shared.StatusError {
		return
	}
	j.Status = shared.StatusDone
}

Done 方法在 Status 不为 StatusError 时将其更新为 StatusDone,并设置 EndTime

Add

// Add appends transferable to the job's Items while holding the job mutex.
func (j *Job) Add(transferable *Transferable) {
	j.mutex.Lock()
	j.Items = append(j.Items, transferable)
	j.mutex.Unlock()
}

Add 方法在加锁后往 Items 添加 transferable

IsRunning

// IsRunning reports whether the job is still running: it returns true when
// EndTime has not been set or when Status equals shared.StatusRunning.
func (j *Job) IsRunning() bool {
	if j.EndTime == nil {
		return true
	}
	return j.Status == shared.StatusRunning
}

IsRunning 方法通过 Status 和 EndTime 来判断是否是 running:Status 为 StatusRunning 或 EndTime 为 nil 时返回 true

Service

// Service represents a job service: it creates jobs and looks them up.
type Service interface {
    // List lists all active or recently active jobs; when the request
    // carries IDs, only the jobs matching those IDs are returned.
    List(request *ListRequest) *ListResponse
    // Create creates a new job with the supplied ID and registers it.
    Create(ID string) *core.Job
    // Get returns the job for the supplied ID, or nil when there is none.
    Get(ID string) *core.Job
}

// service is the Service implementation returned by New; it keeps its jobs
// in a registry.
type service struct {
	registry *registry
}

// New creates a job service backed by a freshly created registry.
func New() Service {
	srv := &service{}
	srv.registry = newRegistry()
	return srv
}

Service 接口定义了 List、Create、Get 方法;New 方法创建基于 registry 的 service 实现

Get

// Get returns the job with the supplied ID, or nil when none matches.
//
// It scans the registry's job list in order; the first job whose ID matches
// has its progress refreshed with Update before being returned.
func (s *service) Get(ID string) *core.Job {
	for _, candidate := range s.registry.list() {
		if candidate.ID != ID {
			continue
		}
		candidate.Update()
		return candidate
	}
	return nil
}

Get 方法先执行 registry.list(),然后遍历 list 找到 ID 对应的 job,对其执行 Update 后返回;找不到则返回 nil

List

// List returns the jobs held by the registry, optionally filtered by ID.
//
// When request is nil or carries no IDs, every job is returned. Otherwise
// only the jobs whose ID appears in request.IDs are returned, in registry
// order, and the result is an empty (non-nil) slice when nothing matches.
// Every returned job has its progress refreshed with Update first, so both
// paths report progress consistently with Get.
func (s *service) List(request *ListRequest) *ListResponse {
	jobs := s.registry.list()
	// A nil request is treated like a request without IDs rather than
	// dereferenced.
	if request == nil || len(request.IDs) == 0 {
		for _, job := range jobs {
			job.Update()
		}
		return &ListResponse{Jobs: jobs}
	}

	requested := make(map[string]struct{}, len(request.IDs))
	for _, id := range request.IDs {
		requested[id] = struct{}{}
	}

	filtered := make([]*core.Job, 0, len(requested))
	for _, job := range jobs {
		if _, ok := requested[job.ID]; !ok {
			continue
		}
		job.Update()
		filtered = append(filtered, job)
	}
	return &ListResponse{Jobs: filtered}
}

List 方法先执行 registry.list();若请求未指定 IDs 则返回全部 job,否则根据 requestedIDs 找出对应的 job,执行 Update,最后返回

Create

// Create builds a new job for the supplied ID with core.NewJob, adds it to
// the registry, and returns it.
func (s *service) Create(ID string) *core.Job {
	created := core.NewJob(ID)
	s.registry.add(created)
	return created
}

Create 方法通过 core.NewJob(ID) 创建 job,然后执行 registry.add(job),最后返回该 job

小结

dbsync 的 Job 定义了 ID、Error、Status、Progress、Items、Chunked、mutex、StartTime、EndTime 属性,它提供了 Update、Done、Add、IsRunning 方法;Service 接口定义了 List、Create、Get 方法,其实现基于 registry 来保存和查找 job。

doc

  • dbsync
退出移动版