This article takes a look at Lock in storagetapper.

Lock

storagetapper/lock/lock.go

    /*Lock is general distributed lock interface*/
    type Lock interface {
        // Try to acquire a lock. Returns false if failed.
        TryLock(s string) bool
        // Try to acquire a lock. Returns false if failed.
        // Allows n simultaneous locks to be held
        TryLockShared(s string, n int) bool

        // Try to acquire a lock and wait for specified period of time for the lock
        // to become available. Returns false if failed.
        Lock(s string, waitDuration time.Duration) bool

        // Check if we still have the lock. Try to reacquire if necessary.
        // Returns false in the case of failure
        Refresh() bool

        // Unlock the lock. Returns false if there was failure
        Unlock() bool

        //Close releases resources associated with the lock
        Close() bool
    }

The Lock interface defines six methods: TryLock, TryLockShared, Lock, Refresh, Unlock, and Close.
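To make the contract of the interface concrete, here is a minimal sketch of how a caller might use it. The memLock type and the package-level registry are hypothetical in-process stand-ins invented for this example; they are not part of storagetapper and only imitate the semantics described in the interface comments.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Lock mirrors the interface quoted above.
type Lock interface {
	TryLock(s string) bool
	TryLockShared(s string, n int) bool
	Lock(s string, waitDuration time.Duration) bool
	Refresh() bool
	Unlock() bool
	Close() bool
}

// registry counts the current holders per lock name (hypothetical, in-process only).
var (
	regMu    sync.Mutex
	registry = map[string]int{}
)

// memLock is a hypothetical in-memory implementation used for illustration.
type memLock struct {
	name string
	held bool
}

// TryLockShared succeeds while fewer than n holders exist for the name.
func (l *memLock) TryLockShared(s string, n int) bool {
	regMu.Lock()
	defer regMu.Unlock()
	if l.held {
		return l.name == s // simplification: already holding this name is a no-op
	}
	if registry[s] >= n {
		return false
	}
	registry[s]++
	l.name, l.held = s, true
	return true
}

func (l *memLock) TryLock(s string) bool { return l.TryLockShared(s, 1) }

// Lock polls TryLock until it succeeds or the wait duration has elapsed.
func (l *memLock) Lock(s string, wait time.Duration) bool {
	deadline := time.Now().Add(wait)
	for {
		if l.TryLock(s) {
			return true
		}
		if !time.Now().Before(deadline) {
			return false
		}
		time.Sleep(time.Millisecond)
	}
}

func (l *memLock) Refresh() bool {
	regMu.Lock()
	defer regMu.Unlock()
	return l.held
}

func (l *memLock) Unlock() bool {
	regMu.Lock()
	defer regMu.Unlock()
	if !l.held {
		return false
	}
	registry[l.name]--
	l.held = false
	return true
}

func (l *memLock) Close() bool { l.Unlock(); return true }

func main() {
	var a, b Lock = &memLock{}, &memLock{}
	fmt.Println(a.TryLock("table1"))                   // true: a takes the lock
	fmt.Println(b.TryLock("table1"))                   // false: already held by a
	fmt.Println(a.Refresh())                           // true: a still holds it
	fmt.Println(a.Unlock())                            // true: released
	fmt.Println(b.Lock("table1", 10*time.Millisecond)) // true: now available
	b.Close()
}
```

The pattern to note is that every method reports failure through a bool rather than an error, so callers branch on the return value instead of inspecting an error.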

myLock

storagetapper/lock/lock.go

    type myLock struct {
        conn     *sql.DB
        connID   int64
        name     string
        ci       db.Addr
        n        int
        mu       sync.Mutex
        isLocked bool
    }

    // Lock waits for the duration specified for the lock to be available
    // Also TryLock will reuse the connection if it already exists otherwise
    // it will create a new one.
    func (m *myLock) Lock(s string, timeout time.Duration) bool {
        return m.lock(s, timeout, 1)
    }

    // TryLock tries to take a lock and returns an error if it cannot. If the lock
    // is already held then TryLock is noop.
    func (m *myLock) TryLock(s string) bool {
        return m.Lock(s, 0)
    }

    // TryLockShared tries to take a lock and returns an error if it cannot. If the lock
    // is already held then TryLock is noop.
    func (m *myLock) TryLockShared(s string, n int) bool {
        return m.lock(s, 0, n)
    }

    // Refresh tries to keep the lock fresh.
    func (m *myLock) Refresh() bool {
        if !m.isLocked {
            return true
        }
        if m.IsLockedByMe() {
            return true
        }
        return m.TryLock(m.name)
    }

    // Unlock releases locks associated with the connection
    func (m *myLock) Unlock() bool {
        m.mu.Lock()
        defer m.mu.Unlock()
        if !m.isLocked {
            return false
        }
        m.isLocked = false
        if m.conn == nil {
            return false
        }
        var res sql.NullBool
        err := m.conn.QueryRow("SELECT RELEASE_LOCK(?)", m.lockName()).Scan(&res)
        if log.EL(m.log(), err) {
            return false
        }
        if !res.Valid {
            m.log().Errorf("Lock did not exists on release")
            return false
        }
        if !res.Bool {
            m.log().Errorf("Lock was not hold by me")
            return false
        }
        return true
    }

    func (m *myLock) Close() bool {
        return m.closeConn()
    }

myLock has the fields conn, connID, name, ci (of type db.Addr), n, mu, and isLocked, and implements the Lock interface on top of a database connection. Lock and TryLockShared delegate to the internal lock method, and TryLock is simply Lock with a zero timeout. Refresh returns true right away if isLocked is false; otherwise it checks IsLockedByMe and, if the lock is no longer held by this connection, tries to reacquire it with TryLock. Unlock sets isLocked to false and then releases the lock with RELEASE_LOCK; it returns false if the lock was not marked as held, there is no connection, the query fails, the lock did not exist, or the lock was not held by this connection. Close releases conn through closeConn.

lock

storagetapper/lock/lock.go

    // Lock waits for the duration specified for the lock to be available
    // Also TryLock will reuse the connection if it already exists otherwise
    // it will create a new one.
    func (m *myLock) lock(s string, timeout time.Duration, ntickets int) bool {
        m.mu.Lock()
        defer m.mu.Unlock()

        var err error
        var res sql.NullBool
        m.name = s
        for m.n = 0; m.n < ntickets; m.n++ {
            if !m.createConn() {
                return false
            }
            err = m.conn.QueryRow("SELECT GET_LOCK(?,?)", m.lockName(), timeout/time.Second).Scan(&res)
            //true - success, false - timeout, NULL - error
            if err == nil && res.Valid && res.Bool {
                m.isLocked = true
                return true
            }
            if log.EL(m.log(), err) {
                m.closeConn()
            }
        }
        return false
    }

The lock method runs GET_LOCK and, on success, sets isLocked to true and returns true. Otherwise it moves on to the next iteration, making at most ntickets attempts with m.n incremented each time; if the query returned an error, the connection is closed, so the next attempt goes through createConn again. Note that timeout/time.Second is an integer division, so a sub-second timeout reaches GET_LOCK as 0.

IsLockedByMe

storagetapper/lock/lock.go

    // IsLockedByMe checks whether the lock is held by the same connection as
    // the current lock object.
    func (m *myLock) IsLockedByMe() bool {
        m.mu.Lock()
        defer m.mu.Unlock()

        var lockedBy int64

        if m.conn == nil {
            return false
        }

        err := m.conn.QueryRow("SELECT IFNULL(IS_USED_LOCK(?), 0)", m.lockName()).Scan(&lockedBy)
        log.Debugf("lock: lockedBy: %v, myConnID: %v", lockedBy, m.connID)

        if err != nil || m.connID == 0 || lockedBy != m.connID {
            if err != nil {
                m.log().Errorf("IsLockedByMe: error: " + err.Error())
                m.closeConn()
            } else {
                m.log().Debugf("IsLockedByMe: lockedBy: %v, != myConnID: %v", lockedBy, m.connID)
            }
            return false
        }
        return true
    }

IsLockedByMe uses IS_USED_LOCK to look up the ID of the connection that holds the lock (IFNULL turns NULL, meaning the lock is not in use, into 0) and compares it with connID. It returns false if the query fails, connID is 0, or the two IDs differ; on a query error it also closes the connection.

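Summary

storagetapper's Lock interface defines the TryLock, TryLockShared, Lock, Refresh, Unlock, and Close methods. myLock, with its fields conn, connID, name, ci (of type db.Addr), n, mu, and isLocked, implements that interface on top of a database connection, relying on MySQL's GET_LOCK, RELEASE_LOCK, and IS_USED_LOCK functions.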

doc

  • storagetapper
  • locking-functions