序
本文主要研究一下 storagetapper 的 Lock
Lock
storagetapper/lock/lock.go
// Lock is a general distributed lock interface.
type Lock interface {
	// TryLock tries to acquire the lock named s.
	// It returns false if the attempt failed.
	TryLock(s string) bool

	// TryLockShared tries to acquire the lock named s while allowing up to
	// n simultaneous locks to be held. It returns false if the attempt failed.
	TryLockShared(s string, n int) bool

	// Lock tries to acquire the lock named s, waiting up to waitDuration for
	// it to become available. It returns false if the attempt failed.
	Lock(s string, waitDuration time.Duration) bool

	// Refresh checks whether the lock is still held and tries to reacquire
	// it if necessary. It returns false in the case of failure.
	Refresh() bool

	// Unlock releases the lock. It returns false if there was a failure.
	Unlock() bool

	// Close releases the resources associated with the lock.
	Close() bool
}
Lock 接口定义了 TryLock、TryLockShared、Lock、Refresh、Unlock、Close 方法
myLock
storagetapper/lock/lock.go
// myLock is the database-backed implementation of the lock methods shown in
// this file. Its lock, Unlock and IsLockedByMe methods hold mu while they
// work with the fields below.
type myLock struct {
	// conn is the database handle used to run the GET_LOCK, RELEASE_LOCK and
	// IS_USED_LOCK queries.
	conn *sql.DB
	// connID is compared against the result of IS_USED_LOCK to decide whether
	// this object's connection is the current holder.
	connID int64
	// name is the lock name passed to the most recent lock attempt.
	name string
	// ci is a database address.
	// NOTE(review): presumably the address used when opening conn - confirm
	// against createConn.
	ci db.Addr
	// n is the loop counter of the acquisition loop in lock; it is left at the
	// index of the attempt that succeeded.
	n int
	// mu serializes lock, Unlock and IsLockedByMe.
	mu sync.Mutex
	// isLocked is set after a successful GET_LOCK and cleared by Unlock.
	isLocked bool
}
// Lock tries to take the lock named s, allowing the database to wait up to
// timeout for it to become available. It is the single-ticket form of lock
// and reports whether the lock was acquired.
func (m *myLock) Lock(s string, timeout time.Duration) bool {
	// An exclusive lock is a shared lock with exactly one ticket.
	const tickets = 1
	return m.lock(s, timeout, tickets)
}
// TryLock tries to take the lock named s without waiting: it calls Lock with
// a zero timeout. It reports whether the lock was acquired.
func (m *myLock) TryLock(s string) bool {
	const noWait = 0
	return m.Lock(s, noWait)
}
// TryLockShared tries to take the lock named s without waiting, passing n to
// lock as the number of tickets to try. It reports whether the lock was
// acquired.
func (m *myLock) TryLockShared(s string, n int) bool {
	const noWait = 0
	return m.lock(s, noWait, n)
}
// Refresh checks that the lock is still held and tries to take it again if it
// is not.
//
// It returns true when no lock is currently recorded as held (there is
// nothing to refresh), when IsLockedByMe confirms that this object still
// holds the lock, or when re-acquiring it through TryLock succeeds. It
// returns false otherwise.
func (m *myLock) Refresh() bool {
	// Snapshot the shared fields under the mutex: lock and Unlock modify them
	// while holding m.mu, so reading them unguarded would be a data race.
	// The mutex is released before calling IsLockedByMe and TryLock because
	// both acquire m.mu themselves and sync.Mutex is not reentrant.
	m.mu.Lock()
	held, name := m.isLocked, m.name
	m.mu.Unlock()

	if !held {
		return true
	}
	if m.IsLockedByMe() {
		return true
	}
	return m.TryLock(name)
}
// Unlock releases the lock by running RELEASE_LOCK on the lock's connection.
//
// It returns false when the lock is not recorded as held, when there is no
// connection, when the query fails, or when the query result is NULL or
// false. The isLocked flag is cleared before the query runs, so it is reset
// even if the release itself fails. It returns true only when RELEASE_LOCK
// reports success.
func (m *myLock) Unlock() bool {
	m.mu.Lock()
	defer m.mu.Unlock()

	if !m.isLocked {
		return false
	}
	m.isLocked = false

	if m.conn == nil {
		return false
	}

	var released sql.NullBool
	row := m.conn.QueryRow("SELECT RELEASE_LOCK(?)", m.lockName())
	if err := row.Scan(&released); log.EL(m.log(), err) {
		return false
	}

	switch {
	case !released.Valid:
		// NULL result.
		m.log().Errorf("Lock did not exists on release")
		return false
	case !released.Bool:
		// False result.
		m.log().Errorf("Lock was not hold by me")
		return false
	}
	return true
}
// Close delegates to closeConn and returns its result.
func (m *myLock) Close() bool {
	closed := m.closeConn()
	return closed
}
myLock 定义了 conn、connID、name、ci、n、mu、isLocked 属性,它使用 db 实现了 Lock 接口;其 Lock、TryLock、TryLockShared 内部(直接或间接)调用的是 lock 方法;Refresh 方法先判断 isLocked,如果当前并未加锁则直接返回 true,否则通过 IsLockedByMe 判断锁是否仍由自己持有,如果不是则执行 TryLock 重新加锁;Unlock 通过 RELEASE_LOCK 来实现,另外还会更新 isLocked 为 false;Close 则会释放 conn
lock
storagetapper/lock/lock.go
// lock is the shared implementation behind Lock, TryLock and TryLockShared.
//
// It records s as the lock name and then makes up to ntickets attempts,
// using the m.n field as the attempt counter. Each attempt makes sure a
// connection exists through createConn and runs GET_LOCK with the timeout
// expressed in whole seconds. The first successful attempt sets isLocked and
// returns true. A query error is logged and the connection is closed before
// the next attempt. It returns false if createConn fails or if no attempt
// succeeds.
func (m *myLock) lock(s string, timeout time.Duration, ntickets int) bool {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.name = s

	var acquired sql.NullBool
	m.n = 0
	for m.n < ntickets {
		if !m.createConn() {
			return false
		}

		row := m.conn.QueryRow("SELECT GET_LOCK(?,?)", m.lockName(), timeout/time.Second)
		err := row.Scan(&acquired)

		// GET_LOCK result: true - success, false - timeout, NULL - error.
		if err == nil && acquired.Valid && acquired.Bool {
			m.isLocked = true
			return true
		}

		if log.EL(m.log(), err) {
			m.closeConn()
		}
		m.n++
	}

	return false
}
lock 方法执行 GET_LOCK,如果加锁成功则设置 isLocked 为 true 并返回 true,否则继续循环,最多尝试 ntickets 次;如果查询出错还会关闭连接,全部尝试失败则返回 false
IsLockedByMe
storagetapper/lock/lock.go
// IsLockedByMe checks whether the lock is held by the same connection as the
// current lock object.
//
// It queries IS_USED_LOCK for the lock name (a NULL result is mapped to 0)
// and compares the returned value with m.connID. It returns false when there
// is no connection, when the query fails (the connection is closed in that
// case), when m.connID is zero, or when the two values differ. It returns
// true only when they match.
func (m *myLock) IsLockedByMe() bool {
	m.mu.Lock()
	defer m.mu.Unlock()

	if m.conn == nil {
		return false
	}

	var lockedBy int64
	err := m.conn.QueryRow("SELECT IFNULL(IS_USED_LOCK(?), 0)", m.lockName()).Scan(&lockedBy)
	log.Debugf("lock: lockedBy: %v, myConnID: %v", lockedBy, m.connID)

	if err != nil {
		// Pass the error text as an argument rather than as part of the
		// format string, so a '%' in a driver message is logged verbatim
		// instead of being interpreted as a formatting verb.
		m.log().Errorf("IsLockedByMe: error:%s", err.Error())
		m.closeConn()
		return false
	}

	if m.connID == 0 || lockedBy != m.connID {
		m.log().Debugf("IsLockedByMe: lockedBy: %v, != myConnID: %v", lockedBy, m.connID)
		return false
	}

	return true
}
IsLockedByMe 通过 IS_USED_LOCK 来查询 lockedBy,之后判断 lockedBy 是否与 connID 相等,不等则返回 false
小结
storagetapper 的 Lock 接口定义了 TryLock、TryLockShared、Lock、Refresh、Unlock、Close 方法;myLock 定义了 conn、connID、name、ci、n、mu、isLocked 属性,它使用 db 实现了 Lock 接口,它借助了 mysql 的 GET_LOCK、RELEASE_LOCK、IS_USED_LOCK 函数来实现。
doc
- storagetapper
- locking-functions