本文次要钻研一下xxl-job-executor-go

Executor

//执行器type Executor interface {    //初始化    Init(...Option)    //日志查问    LogHandler(handler LogHandler)    //注册工作    RegTask(pattern string, task TaskFunc)    //运行工作    RunTask(writer http.ResponseWriter, request *http.Request)    //杀死工作    KillTask(writer http.ResponseWriter, request *http.Request)    //工作日志    TaskLog(writer http.ResponseWriter, request *http.Request)    //运行服务    Run() error}
Executor定义了Init、LogHandler、RegTask、RunTask、KillTask、TaskLog、Run办法

executor

type executor struct {    opts    Options    address string    regList *taskList //注册工作列表    runList *taskList //正在执行工作列表    mu      sync.RWMutex    log     Logger    logHandler LogHandler //日志查问handler}
executor定义了opts、address、regList、runList、mu、log、logHandler属性

Init

func (e *executor) Init(opts ...Option) {    for _, o := range opts {        o(&e.opts)    }    e.log = e.opts.l    e.regList = &taskList{        data: make(map[string]*Task),    }    e.runList = &taskList{        data: make(map[string]*Task),    }    e.address = e.opts.ExecutorIp + ":" + e.opts.ExecutorPort    go e.registry()}
Init办法遍历opts利用opt,而后初始化regList、runList、address,最初异步e.registry()

RegTask

//注册工作func (e *executor) RegTask(pattern string, task TaskFunc) {    var t = &Task{}    t.fn = task    e.regList.Set(pattern, t)    return}
RegTask办法往regList增加指定pattern的task

runTask

func (e *executor) runTask(writer http.ResponseWriter, request *http.Request) {    e.mu.Lock()    defer e.mu.Unlock()    req, _ := ioutil.ReadAll(request.Body)    param := &RunReq{}    err := json.Unmarshal(req, &param)    if err != nil {        _, _ = writer.Write(returnCall(param, 500, "params err"))        e.log.Error("参数解析谬误:" + string(req))        return    }    e.log.Info("工作参数:%v", param)    if !e.regList.Exists(param.ExecutorHandler) {        _, _ = writer.Write(returnCall(param, 500, "Task not registered"))        e.log.Error("工作[" + Int64ToStr(param.JobID) + "]没有注册:" + param.ExecutorHandler)        return    }    //阻塞策略解决    if e.runList.Exists(Int64ToStr(param.JobID)) {        if param.ExecutorBlockStrategy == coverEarly { //笼罩之前调度            oldTask := e.runList.Get(Int64ToStr(param.JobID))            if oldTask != nil {                oldTask.Cancel()                e.runList.Del(Int64ToStr(oldTask.Id))            }        } else { //单机串行,抛弃后续调度 都进行阻塞            _, _ = writer.Write(returnCall(param, 500, "There are tasks running"))            e.log.Error("工作[" + Int64ToStr(param.JobID) + "]曾经在运行了:" + param.ExecutorHandler)            return        }    }    cxt := context.Background()    task := e.regList.Get(param.ExecutorHandler)    if param.ExecutorTimeout > 0 {        task.Ext, task.Cancel = context.WithTimeout(cxt, time.Duration(param.ExecutorTimeout)*time.Second)    } else {        task.Ext, task.Cancel = context.WithCancel(cxt)    }    task.Id = param.JobID    task.Name = param.ExecutorHandler    task.Param = param    task.log = e.log    e.runList.Set(Int64ToStr(task.Id), task)    go task.Run(func(code int64, msg string) {        e.callback(task, code, msg)    })    e.log.Info("工作[" + Int64ToStr(param.JobID) + "]开始执行:" + param.ExecutorHandler)    _, _ = writer.Write(returnGeneral())}
runTask办法先判断task是否曾经注册了,则依据ExecutorBlockStrategy做不同解决,若是coverEarly则cancel掉已有的task;最初通过task.Run来异步执行工作

killTask

func (e *executor) killTask(writer http.ResponseWriter, request *http.Request) {    e.mu.Lock()    defer e.mu.Unlock()    req, _ := ioutil.ReadAll(request.Body)    param := &killReq{}    _ = json.Unmarshal(req, &param)    if !e.runList.Exists(Int64ToStr(param.JobID)) {        _, _ = writer.Write(returnKill(param, 500))        e.log.Error("工作[" + Int64ToStr(param.JobID) + "]没有运行")        return    }    task := e.runList.Get(Int64ToStr(param.JobID))    task.Cancel()    e.runList.Del(Int64ToStr(param.JobID))    _, _ = writer.Write(returnGeneral())}
killTask办法则执行task.Cancel(),同时将其从runList移除

taskLog

func (e *executor) taskLog(writer http.ResponseWriter, request *http.Request) {    var res *LogRes    data, err := ioutil.ReadAll(request.Body)    req := &LogReq{}    if err != nil {        e.log.Error("日志申请失败:" + err.Error())        reqErrLogHandler(writer, req, err)        return    }    err = json.Unmarshal(data, &req)    if err != nil {        e.log.Error("日志申请解析失败:" + err.Error())        reqErrLogHandler(writer, req, err)        return    }    e.log.Info("日志申请参数:%+v", req)    if e.logHandler != nil {        res = e.logHandler(req)    } else {        res = defaultLogHandler(req)    }    str, _ := json.Marshal(res)    _, _ = writer.Write(str)}
taskLog办法通过e.logHandler(req)或者defaultLogHandler(req)来获取日志

小结

xxl-job-executor-go的Executor定义了Init、LogHandler、RegTask、RunTask、KillTask、TaskLog、Run办法;executor实现了Executor接口,并提供了http的api接口。

doc

  • xxl-job-executor-go