关于c++:pika-hash-结构解读

2次阅读

共计 8301 个字符,预计需要花费 21 分钟才能阅读完成。


title: pika hash 表 常识总结
date: 2021-01-29 13:32:22
author: 李朋飞
tags:

  • pika
  • cache

本文次要对 pika 中 hash 数据结构的应用做一个小结。

<!–more–>

pika 是 360 开源的一个非关系型数据库,能够兼容 Redis 零碎的大部分命令。反对主从同步。次要的区别是 pika 反对的数据量不受内存的限度,仅和硬盘大小无关。底层应用了 RocksDB 做 KV 数据的存储。

本文次要对 Pika 的 hash 数据结构做个小结。

命令反对

接口 状态
HDEL 反对
HEXISTS 反对
HGET 反对
HGETALL 反对
HINCRBY 反对
HINCRBYFLOAT 反对
HKEYS 反对
HLEN 反对
HMGET 反对
HMSET 反对
HSET 暂不反对单条命令设置多个 field value,如有需要请用 HMSET
HSETNX 反对
HVALS 反对
HSCAN 反对
HSTRLEN 反对

存储引擎

在做数据存储时,pika 会把 hash 数据转成一层 kv 构造做存储。例如,如果执行如下命令:

HSET key field value

会首先创立一个 hash 的 meta kv 值,数据格式如下:

为了保留 field 和 value 值,将会再创立一个 kv,格局如下:

从上图能够看出,每个 field 的存储,会存储 整个 hash 的 key 以及 key 的版本信息。

CRUD 操作

CU 操作

例如 Hset 操作:

Status RedisHashes::HSet(const Slice& key, const Slice& field,
                         const Slice& value, int32_t* res) {
  rocksdb::WriteBatch batch;
  // 此操作须要加锁
  // 函数完结,锁解除
  ScopeRecordLock l(lock_mgr_, key);

  int32_t version = 0;
  uint32_t statistic = 0;
  std::string meta_value;
  // 获取 meta 数据
  Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
  if (s.ok()) {ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);

    if (parsed_hashes_meta_value.IsStale()
      || parsed_hashes_meta_value.count() == 0) {
      // 如果 meta 存在,然而没有用到
      // 则间接更新 meta & field & value
      version = parsed_hashes_meta_value.InitialMetaValue();
      parsed_hashes_meta_value.set_count(1);
      batch.Put(handles_[0], key, meta_value);
      HashesDataKey data_key(key, version, field);
      batch.Put(handles_[1], data_key.Encode(), value);
      *res = 1;
    } else {
      // 如果存在,且工夫未过期, 版本正确
      version = parsed_hashes_meta_value.version();
      std::string data_value;
      HashesDataKey hashes_data_key(key, version, field);

      // 获取 field 数据
      s = db_->Get(default_read_options_,
          handles_[1], hashes_data_key.Encode(), &data_value);
      if (s.ok()) {
        // 如果以后存的 field 数据正确
        *res = 0;
        if (data_value == value.ToString()) { // 值也相等,则不操作
          return Status::OK();} else {
          // 批改 kv
          batch.Put(handles_[1], hashes_data_key.Encode(), value);
          statistic++;
        }
      } else if (s.IsNotFound()) {
          // 如果没有存在 kv, 则增加,并更新 meta
        parsed_hashes_meta_value.ModifyCount(1);
        batch.Put(handles_[0], key, meta_value);
        batch.Put(handles_[1], hashes_data_key.Encode(), value);
        *res = 1;
      } else {
        // 获取失败
        return s;
      }
    }
  } else if (s.IsNotFound()) {
    // 若 meta 未找到, 编码,写入
    char str[4];
    EncodeFixed32(str, 1);
    HashesMetaValue meta_value(std::string(str, sizeof(int32_t)));
    version = meta_value.UpdateVersion();
    batch.Put(handles_[0], key, meta_value.Encode());
    HashesDataKey data_key(key, version, field);
    batch.Put(handles_[1], data_key.Encode(), value);
    *res = 1;
  } else {return s;}

  // 最初批量写
  s = db_->Write(default_write_options_, &batch);
  // 更新总的统计信息
  UpdateSpecificKeyStatistics(key.ToString(), statistic);
  return s;
}

能够看出,在做 HSet 操作时,和料想中一样,会对 metadata 和 field value 同时操作,并须要同时更新,而且须要做锁操作。
因为 pika 是多线程的,在频繁拜访同一个 hash 中的数据时,会有大量的锁抵触呈现。

R 操作

Status RedisHashes::HGet(const Slice& key, const Slice& field,
                         std::string* value) {
  std::string meta_value;
  int32_t version = 0;
  rocksdb::ReadOptions read_options;
  const rocksdb::Snapshot* snapshot;
  ScopeSnapshot ss(db_, &snapshot);
  read_options.snapshot = snapshot;

  // 获取 meta 数据
  Status s = db_->Get(read_options, handles_[0], key, &meta_value);
  if (s.ok()) {ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
    if (parsed_hashes_meta_value.IsStale()) {
    // 如果存在 meta,且失效
      return Status::NotFound("Stale");
    } else if (parsed_hashes_meta_value.count() == 0) {return Status::NotFound();
    } else {
      // 获取 key 值
      version = parsed_hashes_meta_value.version();
      HashesDataKey data_key(key, version, field);
      s = db_->Get(read_options, handles_[1], data_key.Encode(), value);
    }
  }
  return s;
}

读操作比较简单,不须要加锁。先获取 metadata 值,再通过 metadata 计算出 field 存储 key,再返回即可。

D 操作

删除操作有两种,一种是删除整个 hash 表,一种是删除一个 field。首先看下删除整个 hash 表的操作。

Status RedisHashes::Del(const Slice& key) {
  std::string meta_value;
  ScopeRecordLock l(lock_mgr_, key); // 删除操作须要加锁
  Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
  // 获取 metadata 值
  if (s.ok()) {ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);

    if (parsed_hashes_meta_value.IsStale()) {
      // 如果生效了
      return Status::NotFound("Stale");
    } else if (parsed_hashes_meta_value.count() == 0) {
      // 无值
      return Status::NotFound();} else {
      // 更新统计值,更新 meta_value 即可。uint32_t statistic = parsed_hashes_meta_value.count();
      parsed_hashes_meta_value.InitialMetaValue();
      s = db_->Put(default_write_options_, handles_[0], key, meta_value);
      UpdateSpecificKeyStatistics(key.ToString(), statistic);
    }
  }
  return s;
}

这里有个须要留神的中央,hash 删表,并不是删除所有数据,只是把 meta_value 值更新即可。(批改 count 值,工夫戳,以及 version 值)
因为 field 的 key 是通过 metadata 中的版本值计算出来的,因为 meta_value 版本更新,所有 field value 均生效。
这个是 pika 的一个个性,叫秒删性能。

上面是删除一个 field 的办法:

// 从参数能够看出 HDEL 是反对同时删除多个 field 的
Status RedisHashes::HDel(const Slice& key,
                         const std::vector<std::string>& fields,
                         int32_t* ret) {
  uint32_t statistic = 0;
  std::vector<std::string> filtered_fields;
  std::unordered_set<std::string> field_set;

  // field 去重
  for (auto iter = fields.begin(); iter != fields.end(); ++iter) {
    std::string field = *iter;
    if (field_set.find(field) == field_set.end()) {field_set.insert(field);
      filtered_fields.push_back(*iter);
    }
  }

  rocksdb::WriteBatch batch;
  rocksdb::ReadOptions read_options;
  const rocksdb::Snapshot* snapshot;

  std::string meta_value;
  int32_t del_cnt = 0;
  int32_t version = 0;

  // 加锁
  ScopeRecordLock l(lock_mgr_, key);
  ScopeSnapshot ss(db_, &snapshot);
  read_options.snapshot = snapshot;
  Status s = db_->Get(read_options, handles_[0], key, &meta_value);
  if (s.ok()) {ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
    if (parsed_hashes_meta_value.IsStale()
      || parsed_hashes_meta_value.count() == 0) {
      *ret = 0;
      return Status::OK();} else {
      std::string data_value;
      version = parsed_hashes_meta_value.version();
      // 遍历所有数据,并删除
      for (const auto& field : filtered_fields) {HashesDataKey hashes_data_key(key, version, field);
        s = db_->Get(read_options, handles_[1],
                hashes_data_key.Encode(), &data_value);
        if (s.ok()) {
          del_cnt++;
          statistic++;
          batch.Delete(handles_[1], hashes_data_key.Encode());
        } else if (s.IsNotFound()) {continue;} else {return s;}
      }
      *ret = del_cnt;
      parsed_hashes_meta_value.ModifyCount(-del_cnt);
      batch.Put(handles_[0], key, meta_value);
    }
  } else if (s.IsNotFound()) {
    *ret = 0;
    return Status::OK();} else {return s;}
  s = db_->Write(default_write_options_, &batch);
  UpdateSpecificKeyStatistics(key.ToString(), statistic);
  return s;
}

HDEL 操作反对批量操作。

数据的清理

下面提到了,pika 对于 hash 做了秒删的性能,那秒删之后 field 中的数据,如何做清理工作呢?
通过钻研,发现其实 pika 没有做被动删除的逻辑,只是通过 rocksDB 的 compaction(数据压缩)来实现的被动删除。

rocksDB 的 compaction, 次要是为了压缩内存和硬盘的应用空间,晋升查找速度。在做数据压缩时,提供了可定制的 filter 接口。在 pika 中,就是通过实现该接口来做秒删性能的。

// 针对存储数据的 k -v 构造的过滤(还有一种 meta 数据过滤的办法)class BaseDataFilter : public rocksdb::CompactionFilter {
 public:
  BaseDataFilter(rocksdb::DB* db,
                 std::vector<rocksdb::ColumnFamilyHandle*>* cf_handles_ptr) :
    db_(db),
    cf_handles_ptr_(cf_handles_ptr),
    cur_key_(""),
    meta_not_found_(false),
    cur_meta_version_(0),
    cur_meta_timestamp_(0) {}

  bool Filter(int level, const Slice& key,
              const rocksdb::Slice& value,
              std::string* new_value, bool* value_changed) const override {ParsedBaseDataKey parsed_base_data_key(key);
    Trace("==========================START==========================");
    Trace("[DataFilter], key: %s, data = %s, version = %d",
          parsed_base_data_key.key().ToString().c_str(),
          parsed_base_data_key.data().ToString().c_str(),
          parsed_base_data_key.version());

    // 如果是简单数据结构的 key, 两个值不相等,须要取 meta 中的版本和工夫戳
    if (parsed_base_data_key.key().ToString() != cur_key_) {cur_key_ = parsed_base_data_key.key().ToString();
      std::string meta_value;
      // destroyed when close the database, Reserve Current key value
      if (cf_handles_ptr_->size() == 0) {return false;}
      // 基于 datakey,算出 metakey
      // 查看 meta 的状态
      Status s = db_->Get(default_read_options_,
              (*cf_handles_ptr_)[0], cur_key_, &meta_value);
      if (s.ok()) {
        meta_not_found_ = false;
        ParsedBaseMetaValue parsed_base_meta_value(&meta_value);
        cur_meta_version_ = parsed_base_meta_value.version();
        cur_meta_timestamp_ = parsed_base_meta_value.timestamp();} else if (s.IsNotFound()) {meta_not_found_ = true;} else {
        cur_key_ = "";
        Trace("Reserve[Get meta_key faild]");
        return false;
      }
    }

    if (meta_not_found_) {Trace("Drop[Meta key not exist]");
      return true;
    }

    // 判断版本和过期工夫
    int64_t unix_time;
    rocksdb::Env::Default()->GetCurrentTime(&unix_time);
    if (cur_meta_timestamp_ != 0
      && cur_meta_timestamp_ < static_cast<int32_t>(unix_time)) {Trace("Drop[Timeout]");
      return true;
    }

    if (cur_meta_version_ > parsed_base_data_key.version()) {Trace("Drop[data_key_version < cur_meta_version]");
      return true;
    } else {Trace("Reserve[data_key_version == cur_meta_version]");
      return false;
    }
  }

  const char* Name() const override { return "BaseDataFilter";}

 private:
  rocksdb::DB* db_;
  std::vector<rocksdb::ColumnFamilyHandle*>* cf_handles_ptr_;
  rocksdb::ReadOptions default_read_options_;
  mutable std::string cur_key_;
  mutable bool meta_not_found_;
  mutable int32_t cur_meta_version_;
  mutable int32_t cur_meta_timestamp_;
};

从上述代码中能够看出,其实在 pika 中,对于大批量的数据 (比方 list,hash,set 等数据结构) 均是具备秒删性能的,应用秒删性能比间接删除一方面能够节俭执行工夫, 另一方面能够缩小内存碎片,算是以空间换工夫的典型例子了。

学习小结

  1. pika 中,能够存在雷同的 key 不同存储类型的数据。
  2. hash 存储值不超过 2^32 , 因为 hash size 存储在 4bytes 的空间中。
  3. 从 Hset 中,能够看到,在设计数据结构时,尽量减小 hash 中 key 值的数量,缩小锁 meta 的工夫。
  4. pika hash 构造具备秒删性能,对于大批量数据的 hash 后果,删除操作和失常命令一样会疾速执行。(这个和 redis 有肯定区别)

备注:本文源码来自 github.com/Qihoo360/blackwidow 中。

正文完
 0