乐趣区

关于leveldb:leveldb源代码分析系列-recover流程major-compaction

理清 leveldb 的 recover 流程对于了解 leveldb 如何保证数据正确性和一致性(即便在节点解体的状况下)是十分有帮忙的。
首先从 Open 函数开始,结构一个 DBImpl 实例,而后调用了其 Recover 办法。

Status DB::Open(const Options& options, const std::string& dbname,
                DB** dbptr) {
  *dbptr = NULL;

  DBImpl* impl = new DBImpl(options, dbname);
  impl->mutex_.Lock();
  VersionEdit edit;
  // Recover handles create_if_missing, error_if_exists
  bool save_manifest = false;
  Status s = impl->Recover(&edit, &save_manifest);
  if (s.ok() && impl->mem_ == NULL) {
    // Create new log and a corresponding memtable.
    uint64_t new_log_number = impl->versions_->NewFileNumber();
    WritableFile* lfile;
    s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
                                     &lfile);
    if (s.ok()) {edit.SetLogNumber(new_log_number);
      impl->logfile_ = lfile;
      impl->logfile_number_ = new_log_number;
      impl->log_ = new log::Writer(lfile);
      impl->mem_ = new MemTable(impl->internal_comparator_);
      impl->mem_->Ref();}
  }
  if (s.ok() && save_manifest) {edit.SetPrevLogNumber(0);  // No older logs needed after recovery.
    edit.SetLogNumber(impl->logfile_number_);
    s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
  }
  if (s.ok()) {impl->DeleteObsoleteFiles();
    impl->MaybeScheduleCompaction();}
  impl->mutex_.Unlock();
  if (s.ok()) {assert(impl->mem_ != NULL);
    *dbptr = impl;
  } else {delete impl;}
  return s;
}
Status DBImpl::Recover(VersionEdit* edit, bool *save_manifest) {mutex_.AssertHeld();

  // Ignore error from CreateDir since the creation of the DB is
  // committed only when the descriptor is created, and this directory
  // may already exist from a previous failed creation attempt.
  env_->CreateDir(dbname_);
  assert(db_lock_ == NULL);
  Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);
  if (!s.ok()) {return s;}

  if (!env_->FileExists(CurrentFileName(dbname_))) {if (options_.create_if_missing) {s = NewDB();
      if (!s.ok()) {return s;}
    } else {
      return Status::InvalidArgument(dbname_, "does not exist (create_if_missing is false)");
    }
  } else {if (options_.error_if_exists) {
      return Status::InvalidArgument(dbname_, "exists (error_if_exists is true)");
    }
  }
  // 这里调用了 versions_->Recover 函数
  s = versions_->Recover(save_manifest);
  if (!s.ok()) {return s;}
  SequenceNumber max_sequence(0);

  // Recover from all newer log files than the ones named in the
  // descriptor (new log files may have been added by the previous
  // incarnation without registering them in the descriptor).
  //
  // Note that PrevLogNumber() is no longer used, but we pay
  // attention to it in case we are recovering a database
  // produced by an older version of leveldb.
  // 以下是复原在 descriptor 中记录的最初一个 log file 之后的所有日志文件
  const uint64_t min_log = versions_->LogNumber();
  const uint64_t prev_log = versions_->PrevLogNumber();
  std::vector<std::string> filenames;
  s = env_->GetChildren(dbname_, &filenames);
  if (!s.ok()) {return s;}
  std::set<uint64_t> expected;
  versions_->AddLiveFiles(&expected);
  uint64_t number;
  FileType type;
  std::vector<uint64_t> logs;
  for (size_t i = 0; i < filenames.size(); i++) {if (ParseFileName(filenames[i], &number, &type)) {expected.erase(number);
      // 对于日志文件而言,所有文件编号大于等于 versions_->LogNumber() 的日志文件都没有来得及被写入以后版本,此时须要回放。if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
        logs.push_back(number);
    }
  }
  if (!expected.empty()) {char buf[50];
    snprintf(buf, sizeof(buf), "%d missing files; e.g.",
             static_cast<int>(expected.size()));
    return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
  }

  // Recover in the order in which the logs were generated
  std::sort(logs.begin(), logs.end());
  for (size_t i = 0; i < logs.size(); i++) {s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
                       &max_sequence);
    if (!s.ok()) {return s;}

    // The previous incarnation may not have written any MANIFEST
    // records after allocating this log number.  So we manually
    // update the file number allocation counter in VersionSet.
    versions_->MarkFileNumberUsed(logs[i]);
  }

  if (versions_->LastSequence() < max_sequence) {versions_->SetLastSequence(max_sequence);
  }

  return Status::OK();}

上面关注下 versions_->Recover() 函数

Status VersionSet::Recover(bool *save_manifest) {
  struct LogReporter : public log::Reader::Reporter {
    Status* status;
    virtual void Corruption(size_t bytes, const Status& s) {if (this->status->ok()) *this->status = s;
    }
  };

  // Read "CURRENT" file, which contains a pointer to the current manifest file
  std::string current;
  Status s = ReadFileToString(env_, CurrentFileName(dbname_), &current);
  if (!s.ok()) {return s;}
  if (current.empty() || current[current.size()-1] != '\n') {return Status::Corruption("CURRENT file does not end with newline");
  }
  current.resize(current.size() - 1);

  std::string dscname = dbname_ + "/" + current;
  // manifast file
  SequentialFile* file;
  s = env_->NewSequentialFile(dscname, &file);
  if (!s.ok()) {return s;}

  bool have_log_number = false;
  bool have_prev_log_number = false;
  bool have_next_file = false;
  bool have_last_sequence = false;
  uint64_t next_file = 0;
  uint64_t last_sequence = 0;
  uint64_t log_number = 0;
  uint64_t prev_log_number = 0;
  // Builder 能够看作是 version edit 的汇合中转站
  Builder builder(this, current_);

  {
    LogReporter reporter;
    reporter.status = &s;
    log::Reader reader(file, &reporter, true/*checksum*/, 0/*initial_offset*/);
    Slice record;
    std::string scratch;
    // 这里能够看到,每个 manifast 文件是依照日志格局存储的(有校验码,保证数据的正确性)while (reader.ReadRecord(&record, &scratch) && s.ok()) {
      VersionEdit edit;
      // 每条日志都记录了一个 VersionEdit 的信息,这里解码到 edit 中
      s = edit.DecodeFrom(record);
      if (s.ok()) {
        // 首先判断 comparator 是不是统一的,不统一间接返回
        if (edit.has_comparator_ &&
            edit.comparator_ != icmp_.user_comparator()->Name()) {
          s = Status::InvalidArgument(
              edit.comparator_ + "does not match existing comparator",
              icmp_.user_comparator()->Name());
        }
      }

      if (s.ok()) {
        //edit 解码日志胜利,写入 builder 中
        builder.Apply(&edit);
      }
      // 这里阐明一下:每个 version edit 中记录的 log_number_的含意:对于以后版本而言,所有日志文件编号小于这个值的日志都是能够被删除的(因为曾经平安的被写入 sstable)if (edit.has_log_number_) {
        log_number = edit.log_number_;
        have_log_number = true;
      }
      // 旧版本,不思考
      if (edit.has_prev_log_number_) {
        prev_log_number = edit.prev_log_number_;
        have_prev_log_number = true;
      }
      // 下一个能够应用的文件编号
      if (edit.has_next_file_number_) {
        next_file = edit.next_file_number_;
        have_next_file = true;
      }
      // 下一个能够应用的递增的 sequence
      if (edit.has_last_sequence_) {
        last_sequence = edit.last_sequence_;
        have_last_sequence = true;
      }
    }
  }
  delete file;
  file = NULL;

  if (s.ok()) {if (!have_next_file) {s = Status::Corruption("no meta-nextfile entry in descriptor");
    } else if (!have_log_number) {s = Status::Corruption("no meta-lognumber entry in descriptor");
    } else if (!have_last_sequence) {s = Status::Corruption("no last-sequence-number entry in descriptor");
    }
    // 为了兼容旧版本,这一项不再应用
    if (!have_prev_log_number) {prev_log_number = 0;}

    MarkFileNumberUsed(prev_log_number);
    MarkFileNumberUsed(log_number);
  }

  if (s.ok()) {Version* v = new Version(this);
    // 
    builder.SaveTo(v);
    // Install recovered version
    // 计算 v 的 best compaction
    Finalize(v);
    AppendVersion(v);
    // 为 manifast 文件调配一个文件编号
    manifest_file_number_ = next_file;
    next_file_number_ = next_file + 1;
    last_sequence_ = last_sequence;
    log_number_ = log_number;
    prev_log_number_ = prev_log_number;

    // See if we can reuse the existing MANIFEST file.
    // 这个函数查看了是否复用以后的 manifast 文件
    if (ReuseManifest(dscname, current)) {// No need to save new manifest} else {*save_manifest = true;}
  }

  return s;
}
退出移动版