关于golang:聊聊storagetapper的Lock

3次阅读

共计 3720 个字符,预计需要花费 10 分钟才能阅读完成。

本文次要钻研一下 storagetapper 的 Lock

Lock

storagetapper/lock/lock.go

/*Lock is general distributed lock interface*/
type Lock interface {
    // Try to acquire a lock. Returns false if failed.
    TryLock(s string) bool
    // Try to acquire a lock. Returns false if failed.
    // Allows n simultaneous locks to be held
    TryLockShared(s string, n int) bool

    // Try to acquire a lock and wait for specified period of time for the lock
    // to become available. Returns false if failed.
    Lock(s string, waitDuration time.Duration) bool

    // Check if we still have the lock. Try to reacquire if necessary.
    // Returns false in the case of failure
    Refresh() bool

    // Unlock the lock. Returns false if there was failure
    Unlock() bool

    //Close releases resources associated with the lock
    Close() bool}

Lock 接口定义了 TryLock、TryLockShared、Lock、Refresh、Unlock、Close 办法

myLock

storagetapper/lock/lock.go

type myLock struct {
    conn     *sql.DB
    connID   int64
    name     string
    ci       db.Addr
    n        int
    mu       sync.Mutex
    isLocked bool
}

// Lock waits for the duration specified for the lock to be available
// Also TryLock will reuse the connection if it already exists otherwise
// it will create a new one.
func (m *myLock) Lock(s string, timeout time.Duration) bool {return m.lock(s, timeout, 1)
}

// TryLock tries to take a lock and returns an error if it cannot. If the lock
// is already held then TryLock is noop.
func (m *myLock) TryLock(s string) bool {return m.Lock(s, 0)
}

// TryLockShared tries to take a lock and returns an error if it cannot. If the lock
// is already held then TryLock is noop.
func (m *myLock) TryLockShared(s string, n int) bool {return m.lock(s, 0, n)
}

// Refresh tries to keep the lock fresh.
func (m *myLock) Refresh() bool {
    if !m.isLocked {return true}
    if m.IsLockedByMe() {return true}

    return m.TryLock(m.name)
}

// Unlock releases locks associated with the connection
func (m *myLock) Unlock() bool {m.mu.Lock()
    defer m.mu.Unlock()
    if !m.isLocked {return false}
    m.isLocked = false
    if m.conn == nil {return false}
    var res sql.NullBool
    err := m.conn.QueryRow("SELECT RELEASE_LOCK(?)", m.lockName()).Scan(&res)
    if log.EL(m.log(), err) {return false}
    if !res.Valid {m.log().Errorf("Lock did not exists on release")
        return false
    }
    if !res.Bool {m.log().Errorf("Lock was not hold by me")
        return false
    }
    return true
}

func (m *myLock) Close() bool {return m.closeConn()
}

myLock 定义了 conn、connID、name、db.Addr、n、mu、isLocked 属性,它应用 db 实现了 Lock 接口;其 Lock、TryLock、TryLockShared 外部调用的是 lock 办法;Refresh 办法先判断是否还处于加锁状态,如果不是在判断是否是本人加锁的,如果不是则执行 TryLock;Unlock 通过 RELEASE_LOCK 来实现,另外还会更新 isLocked 为 false;Close 则会开释 conn

lock

storagetapper/lock/lock.go

// Lock waits for the duration specified for the lock to be available
// Also TryLock will reuse the connection if it already exists otherwise
// it will create a new one.
func (m *myLock) lock(s string, timeout time.Duration, ntickets int) bool {m.mu.Lock()
    defer m.mu.Unlock()

    var err error
    var res sql.NullBool
    m.name = s
    for m.n = 0; m.n < ntickets; m.n++ {if !m.createConn() {return false}
        err = m.conn.QueryRow("SELECT GET_LOCK(?,?)", m.lockName(), timeout/time.Second).Scan(&res)
        //true - success, false - timeout, NULL - error
        if err == nil && res.Valid && res.Bool {
            m.isLocked = true
            return true
        }
        if log.EL(m.log(), err) {m.closeConn()
        }
    }
    return false
}

lock 办法执行 GET_LOCK,如果加锁胜利则设置 isLocked 为 true,否则持续重试,重试 ntickets 次

IsLockedByMe

storagetapper/lock/lock.go

// IsLockedByMe checks whether the lock is held by the same connection as
// the current lock object.
func (m *myLock) IsLockedByMe() bool {m.mu.Lock()
    defer m.mu.Unlock()

    var lockedBy int64
    if m.conn == nil {return false}

    err := m.conn.QueryRow("SELECT IFNULL(IS_USED_LOCK(?), 0)", m.lockName()).Scan(&lockedBy)
    log.Debugf("lock: lockedBy: %v, myConnID: %v", lockedBy, m.connID)

    if err != nil || m.connID == 0 || lockedBy != m.connID {
        if err != nil {m.log().Errorf("IsLockedByMe: error:" + err.Error())
            m.closeConn()} else {m.log().Debugf("IsLockedByMe: lockedBy: %v, != myConnID: %v", lockedBy, m.connID)
        }
        return false
    }
    return true
}

IsLockedByMe 通过 IS_USED_LOCK 来查问 lockedBy,之后判断 lockedBy 是否与 connID 相等,不等则返回 false

小结

storagetapper 的 Lock 接口定义了 TryLock、TryLockShared、Lock、Refresh、Unlock、Close 办法;myLock 定义了 conn、connID、name、db.Addr、n、mu、isLocked 属性,它应用 db 实现了 Lock 接口,它借助了 mysql 的 GET_LOCK、RELEASE_LOCK、IS_USED_LOCK 函数来实现。

doc

  • storagetapper
  • locking-functions
正文完
 0