关于c++:pika-hash-结构解读


title: pika hash 表 常识总结
date: 2021-01-29 13:32:22
author: 李朋飞
tags:

  • pika
  • cache

本文次要对 pika 中 hash 数据结构的应用做一个小结。

<!–more–>

pika 是 360 开源的一个非关系型数据库,能够兼容 Redis 零碎的大部分命令。反对主从同步。次要的区别是 pika 反对的数据量不受内存的限度,仅和硬盘大小无关。底层应用了RocksDB 做 KV 数据的存储。

本文次要对Pika 的hash 数据结构做个小结。

命令反对

接口 状态
HDEL 反对
HEXISTS 反对
HGET 反对
HGETALL 反对
HINCRBY 反对
HINCRBYFLOAT 反对
HKEYS 反对
HLEN 反对
HMGET 反对
HMSET 反对
HSET 暂不反对单条命令设置多个field value,如有需要请用HMSET
HSETNX 反对
HVALS 反对
HSCAN 反对
HSTRLEN 反对

存储引擎

在做数据存储时,pika 会把hash 数据转成一层kv 构造做存储。例如,如果执行如下命令:

HSET key field value

会首先创立一个 hash 的 meta kv 值,数据格式如下:

为了保留field 和 value 值,将会再创立一个kv,格局如下:

从上图能够看出,每个field 的存储,会存储 整个hash 的key以及key的版本信息。

CRUD操作

CU 操作

例如 Hset 操作:

Status RedisHashes::HSet(const Slice& key, const Slice& field,
                         const Slice& value, int32_t* res) {
  rocksdb::WriteBatch batch;
  // 此操作须要加锁
  // 函数完结,锁解除
  ScopeRecordLock l(lock_mgr_, key);

  int32_t version = 0;
  uint32_t statistic = 0;
  std::string meta_value;
  // 获取meta 数据
  Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
  if (s.ok()) {
    ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);

    if (parsed_hashes_meta_value.IsStale()
      || parsed_hashes_meta_value.count() == 0) {
      // 如果meta 存在,然而没有用到
      // 则间接更新meta & field & value
      version = parsed_hashes_meta_value.InitialMetaValue();
      parsed_hashes_meta_value.set_count(1);
      batch.Put(handles_[0], key, meta_value);
      HashesDataKey data_key(key, version, field);
      batch.Put(handles_[1], data_key.Encode(), value);
      *res = 1;
    } else {
      // 如果存在,且工夫未过期, 版本正确
      version = parsed_hashes_meta_value.version();
      std::string data_value;
      HashesDataKey hashes_data_key(key, version, field);

      // 获取field 数据
      s = db_->Get(default_read_options_,
          handles_[1], hashes_data_key.Encode(), &data_value);
      if (s.ok()) {
        // 如果以后存的field 数据正确
        *res = 0;
        if (data_value == value.ToString()) { // 值也相等,则不操作
          return Status::OK();
        } else {
          // 批改kv
          batch.Put(handles_[1], hashes_data_key.Encode(), value);
          statistic++;
        }
      } else if (s.IsNotFound()) {
          // 如果没有存在kv, 则增加,并更新meta
        parsed_hashes_meta_value.ModifyCount(1);
        batch.Put(handles_[0], key, meta_value);
        batch.Put(handles_[1], hashes_data_key.Encode(), value);
        *res = 1;
      } else {
        // 获取失败
        return s;
      }
    }
  } else if (s.IsNotFound()) {
    // 若meta 未找到, 编码,写入
    char str[4];
    EncodeFixed32(str, 1);
    HashesMetaValue meta_value(std::string(str, sizeof(int32_t)));
    version = meta_value.UpdateVersion();
    batch.Put(handles_[0], key, meta_value.Encode());
    HashesDataKey data_key(key, version, field);
    batch.Put(handles_[1], data_key.Encode(), value);
    *res = 1;
  } else {
    return s;
  }

  // 最初批量写
  s = db_->Write(default_write_options_, &batch);
  // 更新总的统计信息
  UpdateSpecificKeyStatistics(key.ToString(), statistic);
  return s;
}

能够看出,在做HSet 操作时,和料想中一样,会对 metadata 和 field value 同时操作,并须要同时更新,而且须要做锁操作。
因为pika是多线程的,在频繁拜访同一个hash中的数据时,会有大量的锁抵触呈现。

R 操作

Status RedisHashes::HGet(const Slice& key, const Slice& field,
                         std::string* value) {
  std::string meta_value;
  int32_t version = 0;
  rocksdb::ReadOptions read_options;
  const rocksdb::Snapshot* snapshot;
  ScopeSnapshot ss(db_, &snapshot);
  read_options.snapshot = snapshot;

  // 获取meta 数据
  Status s = db_->Get(read_options, handles_[0], key, &meta_value);
  if (s.ok()) {
    ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
    if (parsed_hashes_meta_value.IsStale()) {
    // 如果存在meta,且失效
      return Status::NotFound("Stale");
    } else if (parsed_hashes_meta_value.count() == 0) {
      return Status::NotFound();
    } else {
      // 获取key 值
      version = parsed_hashes_meta_value.version();
      HashesDataKey data_key(key, version, field);
      s = db_->Get(read_options, handles_[1], data_key.Encode(), value);
    }
  }
  return s;
}

读操作比较简单,不须要加锁。先获取metadata值,再通过metadata 计算出 field 存储key,再返回即可。

D 操作

删除操作有两种,一种是删除整个hash 表,一种是删除一个field。首先看下删除整个hash 表的操作。

Status RedisHashes::Del(const Slice& key) {
  std::string meta_value;
  ScopeRecordLock l(lock_mgr_, key); // 删除操作须要加锁
  Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
  // 获取 metadata 值
  if (s.ok()) {
    ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);

    if (parsed_hashes_meta_value.IsStale()) {
      // 如果生效了
      return Status::NotFound("Stale");
    } else if (parsed_hashes_meta_value.count() == 0) {
      // 无值
      return Status::NotFound();
    } else {
      // 更新统计值,更新meta_value 即可。 
      uint32_t statistic = parsed_hashes_meta_value.count();
      parsed_hashes_meta_value.InitialMetaValue();
      s = db_->Put(default_write_options_, handles_[0], key, meta_value);
      UpdateSpecificKeyStatistics(key.ToString(), statistic);
    }
  }
  return s;
}

这里有个须要留神的中央,hash 删表,并不是删除所有数据,只是把meta_value 值更新即可。(批改count值,工夫戳,以及version值)
因为field的key 是通过metadata 中的版本值计算出来的,因为meta_value 版本更新,所有 field value 均生效。
这个是pika 的一个个性,叫秒删性能。

上面是删除一个field 的办法:

// 从参数能够看出 HDEL 是反对同时删除多个field 的
Status RedisHashes::HDel(const Slice& key,
                         const std::vector<std::string>& fields,
                         int32_t* ret) {
  uint32_t statistic = 0;
  std::vector<std::string> filtered_fields;
  std::unordered_set<std::string> field_set;

  // field 去重
  for (auto iter = fields.begin(); iter != fields.end(); ++iter) {
    std::string field = *iter;
    if (field_set.find(field) == field_set.end()) {
      field_set.insert(field);
      filtered_fields.push_back(*iter);
    }
  }

  rocksdb::WriteBatch batch;
  rocksdb::ReadOptions read_options;
  const rocksdb::Snapshot* snapshot;

  std::string meta_value;
  int32_t del_cnt = 0;
  int32_t version = 0;

  // 加锁
  ScopeRecordLock l(lock_mgr_, key);
  ScopeSnapshot ss(db_, &snapshot);
  read_options.snapshot = snapshot;
  Status s = db_->Get(read_options, handles_[0], key, &meta_value);
  if (s.ok()) {
    ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
    if (parsed_hashes_meta_value.IsStale()
      || parsed_hashes_meta_value.count() == 0) {
      *ret = 0;
      return Status::OK();
    } else {
      std::string data_value;
      version = parsed_hashes_meta_value.version();
      // 遍历所有数据,并删除
      for (const auto& field : filtered_fields) {
        HashesDataKey hashes_data_key(key, version, field);
        s = db_->Get(read_options, handles_[1],
                hashes_data_key.Encode(), &data_value);
        if (s.ok()) {
          del_cnt++;
          statistic++;
          batch.Delete(handles_[1], hashes_data_key.Encode());
        } else if (s.IsNotFound()) {
          continue;
        } else {
          return s;
        }
      }
      *ret = del_cnt;
      parsed_hashes_meta_value.ModifyCount(-del_cnt);
      batch.Put(handles_[0], key, meta_value);
    }
  } else if (s.IsNotFound()) {
    *ret = 0;
    return Status::OK();
  } else {
    return s;
  }
  s = db_->Write(default_write_options_, &batch);
  UpdateSpecificKeyStatistics(key.ToString(), statistic);
  return s;
}

HDEL 操作反对批量操作。

数据的清理

下面提到了,pika 对于 hash 做了秒删的性能,那秒删之后field中的数据,如何做清理工作呢?
通过钻研,发现其实pika没有做被动删除的逻辑,只是通过rocksDB 的compaction(数据压缩)来实现的被动删除。

rocksDB 的compaction, 次要是为了压缩内存和硬盘的应用空间,晋升查找速度。在做数据压缩时,提供了可定制的filter 接口。在pika 中,就是通过实现该接口来做秒删性能的。

// 针对存储数据的k-v 构造的过滤 (还有一种meta 数据过滤的办法)
class BaseDataFilter : public rocksdb::CompactionFilter {
 public:
  BaseDataFilter(rocksdb::DB* db,
                 std::vector<rocksdb::ColumnFamilyHandle*>* cf_handles_ptr) :
    db_(db),
    cf_handles_ptr_(cf_handles_ptr),
    cur_key_(""),
    meta_not_found_(false),
    cur_meta_version_(0),
    cur_meta_timestamp_(0) {}

  bool Filter(int level, const Slice& key,
              const rocksdb::Slice& value,
              std::string* new_value, bool* value_changed) const override {
    ParsedBaseDataKey parsed_base_data_key(key);
    Trace("==========================START==========================");
    Trace("[DataFilter], key: %s, data = %s, version = %d",
          parsed_base_data_key.key().ToString().c_str(),
          parsed_base_data_key.data().ToString().c_str(),
          parsed_base_data_key.version());

    // 如果是简单数据结构的key, 两个值不相等,须要取meta中的版本和工夫戳
    if (parsed_base_data_key.key().ToString() != cur_key_) {
      cur_key_ = parsed_base_data_key.key().ToString();
      std::string meta_value;
      // destroyed when close the database, Reserve Current key value
      if (cf_handles_ptr_->size() == 0) {
        return false;
      }
      // 基于datakey,算出metakey
      // 查看meta 的状态
      Status s = db_->Get(default_read_options_,
              (*cf_handles_ptr_)[0], cur_key_, &meta_value);
      if (s.ok()) {
        meta_not_found_ = false;
        ParsedBaseMetaValue parsed_base_meta_value(&meta_value);
        cur_meta_version_ = parsed_base_meta_value.version();
        cur_meta_timestamp_ = parsed_base_meta_value.timestamp();
      } else if (s.IsNotFound()) {
        meta_not_found_ = true;
      } else {
        cur_key_ = "";
        Trace("Reserve[Get meta_key faild]");
        return false;
      }
    }

    if (meta_not_found_) {
      Trace("Drop[Meta key not exist]");
      return true;
    }

    //判断版本和过期工夫
    int64_t unix_time;
    rocksdb::Env::Default()->GetCurrentTime(&unix_time);
    if (cur_meta_timestamp_ != 0
      && cur_meta_timestamp_ < static_cast<int32_t>(unix_time)) {
      Trace("Drop[Timeout]");
      return true;
    }

    if (cur_meta_version_ > parsed_base_data_key.version()) {
      Trace("Drop[data_key_version < cur_meta_version]");
      return true;
    } else {
      Trace("Reserve[data_key_version == cur_meta_version]");
      return false;
    }
  }

  const char* Name() const override { return "BaseDataFilter"; }

 private:
  rocksdb::DB* db_;
  std::vector<rocksdb::ColumnFamilyHandle*>* cf_handles_ptr_;
  rocksdb::ReadOptions default_read_options_;
  mutable std::string cur_key_;
  mutable bool meta_not_found_;
  mutable int32_t cur_meta_version_;
  mutable int32_t cur_meta_timestamp_;
};

从上述代码中能够看出,其实在pika中,对于大批量的数据(比方list,hash,set 等数据结构)均是具备秒删性能的,应用秒删性能比间接删除一方面能够节俭执行工夫,另一方面能够缩小内存碎片,算是以空间换工夫的典型例子了。

学习小结

  1. pika 中,能够存在雷同的key 不同存储类型的数据。
  2. hash 存储值不超过 2^32 , 因为 hash size 存储在 4bytes 的空间中。
  3. 从Hset中,能够看到,在设计数据结构时,尽量减小 hash 中key 值的数量,缩小锁meta的工夫。
  4. pika hash 构造具备秒删性能,对于大批量数据的hash 后果,删除操作和失常命令一样会疾速执行。(这个和redis有肯定区别)

备注:本文源码来自 github.com/Qihoo360/blackwidow 中。

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理