本文次要钻研一下machinery的Lock

Lock

type Lock interface {    //Acquire the lock with retry    //key: the name of the lock,    //value: at the nanosecond timestamp that lock needs to be released automatically    LockWithRetries(key string, value int64) error    //Acquire the lock with once    //key: the name of the lock,    //value: at the nanosecond timestamp that lock needs to be released automatically    Lock(key string, value int64) error}
Lock接口定义了LockWithRetries、Lock办法

redis/Lock

var (    ErrRedisLockFailed = errors.New("redis lock: failed to acquire lock"))type Lock struct {    rclient  redis.UniversalClient    retries  int    interval time.Duration}func New(cnf *config.Config, addrs []string, db, retries int) Lock {    if retries <= 0 {        return Lock{}    }    lock := Lock{retries: retries}    var password string    parts := strings.Split(addrs[0], "@")    if len(parts) == 2 {        password = parts[0]        addrs[0] = parts[1]    }    ropt := &redis.UniversalOptions{        Addrs:    addrs,        DB:       db,        Password: password,    }    if cnf.Redis != nil {        ropt.MasterName = cnf.Redis.MasterName    }    lock.rclient = redis.NewUniversalClient(ropt)    return lock}func (r Lock) LockWithRetries(key string, unixTsToExpireNs int64) error {    for i := 0; i <= r.retries; i++ {        err := r.Lock(key, unixTsToExpireNs)        if err == nil {            //胜利拿到锁,返回            return nil        }        time.Sleep(r.interval)    }    return ErrRedisLockFailed}func (r Lock) Lock(key string, unixTsToExpireNs int64) error {    now := time.Now().UnixNano()    expiration := time.Duration(unixTsToExpireNs + 1 - now)    ctx := r.rclient.Context()    success, err := r.rclient.SetNX(ctx, key, unixTsToExpireNs, expiration).Result()    if err != nil {        return err    }    if !success {        v, err := r.rclient.Get(ctx, key).Result()        if err != nil {            return err        }        timeout, err := strconv.Atoi(v)        if err != nil {            return err        }        if timeout != 0 && now > int64(timeout) {            newTimeout, err := r.rclient.GetSet(ctx, key, unixTsToExpireNs).Result()            if err != nil {                return err            }            curTimeout, err := strconv.Atoi(newTimeout)            if err != nil {                return err            }            if now > int64(curTimeout) {                // success to acquire lock with get set                // set the expiration of redis key                r.rclient.Expire(ctx, key, expiration)                return nil            }            return ErrRedisLockFailed        }        return ErrRedisLockFailed    }    return nil}
Lock定义了rclient、retries、interval属性;New办法依据cnf、addrs、db、retries创立lock;LockWithRetries办法依据retries次数来尝试r.Lock(key, unixTsToExpireNs),都没有胜利则返回ErrRedisLockFailed;Lock办法执行r.rclient.SetNX,如果不胜利则判断是否过期,过期的话执行执行r.rclient.GetSet,若的确过期了则执行r.rclient.Expire(ctx, key, expiration)更新新的过期工夫

小结

machinery的Lock接口定义了LockWithRetries、Lock办法;基于redis的实现则通过r.rclient.SetNX、r.rclient.GetSet、r.rclient.Expire实现。

doc

  • machinery