乐趣区

数据库内核RocksDB事务锁设计与实现

本文主要介绍 RocksDB 锁结构设计、加锁解锁过程,并与 InnoDB 锁实现做一个简单对比。

本文由作者授权发布,未经许可,请勿转载。

作者:王刚,网易杭研数据库内核开发工程师

MyRocks 引擎目前是支持行锁的,包括共享锁和排它锁,主要是在 RocksDB 层面实现的,与 InnoDB 引擎的锁系统相比,简单很多。

本文主要介绍 RocksDB 锁结构设计、加锁解锁过程,并与 InnoDB 锁实现做一个简单对比。

事务锁的实现类是:TransactionLockMgr,它的主要数据成员包括:

private:
  PessimisticTransactionDB* txn_db_impl_;
  // 默认 16 个 lock map 分片
  const size_t default_num_stripes_;
  // 每个 column family 最大行锁数
  const int64_t max_num_locks_;
  // lock map 互斥锁
  InstrumentedMutex lock_map_mutex_;

  // Map of ColumnFamilyId to locked key info
  using LockMaps = std::unordered_map<uint32_t, std::shared_ptr<LockMap>>;
  LockMaps lock_maps_;
  std::unique_ptr<ThreadLocalPtr> lock_maps_cache_;
  // Must be held when modifying wait_txn_map_ and rev_wait_txn_map_.
  std::mutex wait_txn_map_mutex_;
  // Maps from waitee -> number of waiters.
  HashMap<TransactionID, int> rev_wait_txn_map_;
  // Maps from waiter -> waitee.
  HashMap<TransactionID, TrackedTrxInfo> wait_txn_map_;
  DeadlockInfoBuffer dlock_buffer_;
  // Used to allocate mutexes/condvars to use when locking keys
  std::shared_ptr<TransactionDBMutexFactory> mutex_factory_;

加锁的入口函数是:TransactionLockMgr::TryLock

Status TransactionLockMgr::TryLock(PessimisticTransaction* txn, // 加锁的事务
                                   uint32_t column_family_id, // 所属的 CF
                                   const std::string& key, // 加锁的健 Env* env,
                                   bool exclusive // 是否排它锁 ) {
  // Lookup lock map for this column family id
  std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id); //1. 根据 cf id 查找其 LockMap
  LockMap* lock_map = lock_map_ptr.get();
  if (lock_map == nullptr) {char msg[255];
    snprintf(msg, sizeof(msg), "Column family id not found: %" PRIu32,
             column_family_id);

    return Status::InvalidArgument(msg);
  }

  // Need to lock the mutex for the stripe that this key hashes to
  size_t stripe_num = lock_map->GetStripe(key);// 2. 根据 key 的哈希获取 stripe_num, 默认 16 个 stripe
  assert(lock_map->lock_map_stripes_.size() > stripe_num);
  LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);

  LockInfo lock_info(txn->GetID(), txn->GetExpirationTime(), exclusive);
  int64_t timeout = txn->GetLockTimeout();

  return AcquireWithTimeout(txn, lock_map, stripe, column_family_id, key, env,
                            timeout, lock_info); // 实际加锁函数 

GetLockMap(column_family_id) 函数根据 cf id 查找其 LockMap , 其逻辑包括两步:

  1. 在 thread 本地缓存 lock_maps_cache 中查找;
  2. 第 1 步没有查找到,则去全局的 lock_map_ 中查找。
std::shared_ptr<LockMap> TransactionLockMgr::GetLockMap(uint32_t column_family_id) {
  // First check thread-local cache
  if (lock_maps_cache_->Get() == nullptr) {lock_maps_cache_->Reset(new LockMaps());
  }
  auto lock_maps_cache = static_cast<LockMaps*>(lock_maps_cache_->Get());
  auto lock_map_iter = lock_maps_cache->find(column_family_id);
  if (lock_map_iter != lock_maps_cache->end()) {
    // Found lock map for this column family.
    return lock_map_iter->second;
  }
  // Not found in local cache, grab mutex and check shared LockMaps
  InstrumentedMutexLock l(&lock_map_mutex_);
  lock_map_iter = lock_maps_.find(column_family_id);
  if (lock_map_iter == lock_maps_.end()) {return std::shared_ptr<LockMap>(nullptr);
  } else {
    // Found lock map.  Store in thread-local cache and return.
    std::shared_ptr<LockMap>& lock_map = lock_map_iter->second;
    lock_maps_cache->insert({column_family_id, lock_map});
    return lock_map;
  }
}

lock_maps_ 是全局锁结构:

 // Map of ColumnFamilyId to locked key info
  using LockMaps = std::unordered_map<uint32_t, std::shared_ptr<LockMap>>;
  LockMaps lock_maps_;

LockMap 是每个 CF 的锁结构:

struct LockMap {

  // Number of sepearate LockMapStripes to create, each with their own Mutex
  const size_t num_stripes_;

  // Count of keys that are currently locked in this column family.
  // (Only maintained if TransactionLockMgr::max_num_locks_ is positive.)
  std::atomic<int64_t> lock_cnt{0};

  std::vector<LockMapStripe*> lock_map_stripes_;

};

为了减少加锁时 mutex 的争用,LockMap 内部又进行了分片,num_stripes_ = 16(默认值),
LockMapStripe 是每个分片的锁结构:

struct LockMapStripe {

  // Mutex must be held before modifying keys map
  std::shared_ptr<TransactionDBMutex> stripe_mutex;

  // Condition Variable per stripe for waiting on a lock
  std::shared_ptr<TransactionDBCondVar> stripe_cv;

  // Locked keys mapped to the info about the transactions that locked them.
  // TODO(agiardullo): Explore performance of other data structures.
  std::unordered_map<std::string, LockInfo> keys;
};

LockMapStripe 内部还是一个 unordered_map, 还包括 stripe_mutex、stripe_cv。这样设计避免了一把大锁的尴尬,减小锁的粒度常用的方法,LockInfo 包含事务 id:

struct LockInfo {
  bool exclusive;
  autovector<TransactionID> txn_ids;
  // Transaction locks are not valid after this time in us
  uint64_t expiration_time;
};

LockMaps、LockMap、LockMapStripe、LockInfo 就是 RocksDB 事务锁用到的数据结构了,可以看到并不复杂,代码实现起来简单,代价当然也有,后文在介绍再介绍。

AcquireWithTimeout 函数内部先获取 stripe mutex,获取到了在进入 AcquireLocked 函数:

if (timeout < 0) {
    // If timeout is negative, we wait indefinitely to acquire the lock
    result = stripe->stripe_mutex->Lock();} else {result = stripe->stripe_mutex->TryLockFor(timeout);
  }

获取了 stripe_mutex 之后,准备获取锁:

// Acquire lock if we are able to
  uint64_t expire_time_hint = 0;
  autovector<TransactionID> wait_ids;
  result = AcquireLocked(lock_map, stripe, key, env, lock_info,
                         &expire_time_hint, &wait_ids);

AcquireLocked 函数实现获取锁逻辑,它的实现逻辑是:

  1. 在 stripe 的 map 中查找该 key 是否已经被锁住。
  2. 如果 key 没有被锁住,判断是否超过了 max_num_locks_,没超过则在 stripe 的 map 中插入 {key, txn_lock_info},超过了 max_num_locks_,加锁失败,返回状态信息。
  3. 如果 key 已经被锁住了,要判断加在 key 上的锁是排它锁还是共享锁,如果是共享锁,那事务的加锁请求可以满足;
  4. 如果是排它锁,如果是同一事务,加锁请求可以满足,如果不是同一事务,如果锁没有超时,则加锁请求失败,否则抢占过来。

以上是加锁过程,解锁过程类似,也是需要根据 cf_id 和 key 计算出落到哪个 stripe 上,然后就是从 map 中把数据清理掉,同时还要唤醒该 stripe 上的等待线程,这个唤醒的粒度有点大。

void TransactionLockMgr::UnLock(PessimisticTransaction* txn,
                                uint32_t column_family_id,
                                const std::string& key, Env* env) {std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id);// 通过 cf_id 获取 Lock_Map
  LockMap* lock_map = lock_map_ptr.get();
  if (lock_map == nullptr) {
    // Column Family must have been dropped.
    return;
  }
  // Lock the mutex for the stripe that this key hashes to
  size_t stripe_num = lock_map->GetStripe(key);// 根据 key 计算落到哪个 stripe 上
  assert(lock_map->lock_map_stripes_.size() > stripe_num);
  LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);
  stripe->stripe_mutex->Lock();
  UnLockKey(txn, key, stripe, lock_map, env); // 从锁的 map 中清理掉该 key
  stripe->stripe_mutex->UnLock();
  // Signal waiting threads to retry locking
  stripe->stripe_cv->NotifyAll();}

rocksdb lock 整体结构如下:

如果一个事务需要锁住大量记录,rocksdb 锁的实现方式可能要比 innodb 消耗更多的内存,innodb 的锁结构如下图所示:

由于锁信息是常驻内存,我们简单分析下 RocksDB 锁占用的内存。每个锁实际上是 unordered_map 中的一个元素,则锁占用的内存为 key_length+8+8+1,假设 key 为 bigint,占 8 个字节,则 100w 行记录,需要消耗大约 22M 内存。但是由于内存与 key_length 正相关,导致 RocksDB 的内存消耗不可控。我们可以简单算算 RocksDB 作为 MySQL 存储引擎时,key_length 的范围。对于单列索引,最大值为 2048 个字节,具体可以参考 max_supported_key_part_length 实现;对于复合索引,索引最大长度为 3072 个字节,具体可以参考 max_supported_key_length 实现。

假设最坏的情况,key_length=3072,则 100w 行记录,需要消耗 3G 内存,如果是锁 1 亿行记录,则需要消耗 300G 内存,这种情况下内存会有撑爆的风险。因此 RocksDB 提供参数配置 rocksdb_max_row_locks,确保内存可控,默认 rocksdb_max_row_locks 设置为 1048576,对于大部分 key 为 bigint 场景,极端情况下,也需要消耗 22G 内存。而在这方面,InnoDB 则比较友好,hash 表的 key 是 (space_id, page_no),所以无论 key 有多大,key 部分的内存消耗都是恒定的。InnoDB 在一个事务需要锁大量记录场景下是有优化的,多个记录可以公用一把锁,这样也间接可以减少内存。

总结

RocksDB 事务锁的实现整体来说不复杂,只支持行锁,还不支持 gap lock,锁占的资源也比较大,可通过 rocksdb_max_row_locks 限制事务施加行锁的数量。

利益相关:

网易轻舟微服务,提供分布式事务框架 GTXS,支持跨服务事务、跨数据源事务、混合事务、事务状态监控、异常事务处理等能力,应用案例 工商银行。

退出移动版