概述

在RocksDB中MANIFEST保存了存储引擎的内部的一些状态元数据,简单来说当系统异常重启,或者程序异常被退出之后,RocksDB需要有一种机制能够恢复到一个一致性的状态, 而这个一致性的状态就是靠MANIFEST来保证的.

MANIFEST在RocksDB中是一个单独的文件,而这个文件所保存的数据基本是来自于VersionEdit这个结构.

MANIFEST包含了两个文件,一个log文件一个包含最新MANIFEST文件名的文件,Manifest的log文件名是这样 MANIFEST-(seqnumber),这个seq会一直增长.只有当 超过了指定的大小之后,MANIFEST会刷新一个新的文件,当新的文件刷新到磁盘(并且文件名更新)之后,老的文件会被删除掉.这里可以认为每一次MANIFEST的更新都代表一次snapshot.

下面就是MANIFEST的基本文件组成:

  1. MANIFEST = { CURRENT, MANIFEST-<seq-no>* }
  2. CURRENT = File pointer to the latest manifest log
  3. MANIFEST-<seq no> = Contains snapshot of RocksDB state and subsequent modifications

在RocksDB中任意时间存储引擎的状态都会保存为一个Version(也就是SST的集合),而每次对Version的修改都是一个VersionEdit,而最终这些VersionEdit就是 组成manifest-log文件的内容.

下面就是MANIFEST的log文件的基本构成:

  1. version-edit = Any RocksDB state change
  2. version = { version-edit* }
  3. manifest-log-file = { version, version-edit* }
  4. = { version-edit* }

实现

整个MANIFEST涉及到三个数据结构分别是VersionEdit/Version/VersionSet,其中前两个上面已经有介绍,而最后一个VersionSet顾名思义表示一堆Version的集合,其实就是 记录了各个版本的信息用来管理整个Version.

  1. class VersionSet {
  2. public:
  3. VersionSet(const std::string& dbname, const ImmutableDBOptions* db_options,
  4. const EnvOptions& env_options, Cache* table_cache,
  5. WriteBufferManager* write_buffer_manager,
  6. WriteController* write_controller);
  7. ~VersionSet();
  8. .......................
  9. private:
  10. struct ManifestWriter;
  11. friend class Version;
  12. .................................
  13. // Opened lazily
  14. unique_ptr<log::Writer> descriptor_log_;
  15. // generates a increasing version number for every new version
  16. uint64_t current_version_number_;
  17. // Queue of writers to the manifest file
  18. std::deque<ManifestWriter*> manifest_writers_;
  19. ..........................................

这里最关键的两个数据结构是descriptor_log_和manifest_writers_,前一个表示了当前manifest-log文件,后一个表示需要写入到manifest-log文件中的内容.

下面就是ManifestWriter的结构,可以看到其中包含了一个VersionEdit的数组,这个数组就是即将要写入到manifest-log文件中的内容.

  1. // this is used to batch writes to the manifest file
  2. struct VersionSet::ManifestWriter {
  3. Status status;
  4. bool done;
  5. InstrumentedCondVar cv;
  6. ColumnFamilyData* cfd;
  7. const autovector<VersionEdit*>& edit_list;
  8. explicit ManifestWriter(InstrumentedMutex* mu, ColumnFamilyData* _cfd,
  9. const autovector<VersionEdit*>& e)
  10. : done(false), cv(mu), cfd(_cfd), edit_list(e) {}
  11. };

然后我们来看RocksDB如何来创建以及写入文件,下面所有的代码都是包含在VersionSet::LogAndApply这个函数中.

首先在每次LogAndApply的时候都会创建一个新的ManifesWriter加入到manifest_writers_队列中.这里只有当之前保存在队列中 的所有Writer都写入完毕之后才会加入到队列,否则就会等待.

  1. // queue our request
  2. ManifestWriter w(mu, column_family_data, edit_list);
  3. manifest_writers_.push_back(&w);
  4. while (!w.done && &w != manifest_writers_.front()) {
  5. w.cv.Wait();
  6. }
  7. if (w.done) {
  8. return w.status;
  9. }

接下来就是保存对应的数据到batch_edits中(manifest_writers_).

  1. autovector<VersionEdit*> batch_edits;
  2. ....................................
  3. if (w.edit_list.front()->IsColumnFamilyManipulation()) {
  4. // no group commits for column family add or drop
  5. LogAndApplyCFHelper(w.edit_list.front());
  6. batch_edits.push_back(w.edit_list.front());
  7. } else {
  8. v = new Version(column_family_data, this, current_version_number_++);
  9. ........................................................
  10. for (const auto& writer : manifest_writers_) {
  11. if (writer->edit_list.front()->IsColumnFamilyManipulation() ||
  12. writer->cfd->GetID() != column_family_data->GetID()) {
  13. break;
  14. }
  15. last_writer = writer;
  16. for (const auto& edit : writer->edit_list) {
  17. ...........................................
  18. batch_edits.push_back(edit);
  19. }
  20. }
  21. builder->SaveTo(v->storage_info());
  22. }

然后就是创建新的manifest-log文件的逻辑.这里可以看到要么是第一次进入,要么文件大小大于option对应的值才会创建新的文件

  1. if (!descriptor_log_ ||
  2. manifest_file_size_ > db_options_->max_manifest_file_size) {
  3. pending_manifest_file_number_ = NewFileNumber();
  4. batch_edits.back()->SetNextFile(next_file_number_.load());
  5. new_descriptor_log = true;
  6. } else {
  7. pending_manifest_file_number_ = manifest_file_number_;
  8. }
  9. if (new_descriptor_log) {
  10. // if we're writing out new snapshot make sure to persist max column family
  11. if (column_family_set_->GetMaxColumnFamily() > 0) {
  12. w.edit_list.front()->SetMaxColumnFamily(
  13. column_family_set_->GetMaxColumnFamily());
  14. }
  15. }

如果需要创建新的manifest-log文件,则开始构造对应的文件信息并创建文件.

  1. if (new_descriptor_log) {
  2. // create manifest file
  3. ROCKS_LOG_INFO(db_options_->info_log, "Creating manifest %" PRIu64 "\n",
  4. pending_manifest_file_number_);
  5. unique_ptr<WritableFile> descriptor_file;
  6. EnvOptions opt_env_opts = env_->OptimizeForManifestWrite(env_options_);
  7. s = NewWritableFile(
  8. env_, DescriptorFileName(dbname_, pending_manifest_file_number_),
  9. &descriptor_file, opt_env_opts);
  10. if (s.ok()) {
  11. descriptor_file->SetPreallocationBlockSize(
  12. db_options_->manifest_preallocation_size);
  13. unique_ptr<WritableFileWriter> file_writer(
  14. new WritableFileWriter(std::move(descriptor_file), opt_env_opts));
  15. descriptor_log_.reset(
  16. new log::Writer(std::move(file_writer), 0, false));
  17. s = WriteSnapshot(descriptor_log_.get());
  18. }
  19. }

开始写入对应的VersionEdit的record到文件(最后我们会来看这个record的构成),这里看到写入完成后会调用Sync来刷新内容到磁盘,等这些操作都做完之后,则会更新Current文件也就是更新最新的manifest-log文件名到CURRENT文件中.

  1. for (auto& e : batch_edits) {
  2. std::string record;
  3. if (!e->EncodeTo(&record)) {
  4. s = Status::Corruption(
  5. "Unable to Encode VersionEdit:" + e->DebugString(true));
  6. break;
  7. }
  8. TEST_KILL_RANDOM("VersionSet::LogAndApply:BeforeAddRecord",
  9. rocksdb_kill_odds * REDUCE_ODDS2);
  10. s = descriptor_log_->AddRecord(record);
  11. if (!s.ok()) {
  12. break;
  13. }
  14. }
  15. if (s.ok()) {
  16. s = SyncManifest(env_, db_options_, descriptor_log_->file());
  17. }
  18. .............................
  19. // If we just created a new descriptor file, install it by writing a
  20. // new CURRENT file that points to it.
  21. if (s.ok() && new_descriptor_log) {
  22. s = SetCurrentFile(env_, dbname_, pending_manifest_file_number_,
  23. db_directory);
  24. }

CURRENT文件更新完毕之后,就可以删除老的mainfest文件了.

  1. // Append the old mainfest file to the obsolete_manifests_ list to be deleted
  2. // by PurgeObsoleteFiles later.
  3. if (s.ok() && new_descriptor_log) {
  4. obsolete_manifests_.emplace_back(
  5. DescriptorFileName("", manifest_file_number_));
  6. }

最后则是更新manifest_writers_队列,唤醒之前阻塞的内容.

  1. // wake up all the waiting writers
  2. while (true) {
  3. ManifestWriter* ready = manifest_writers_.front();
  4. manifest_writers_.pop_front();
  5. if (ready != &w) {
  6. ready->status = s;
  7. ready->done = true;
  8. ready->cv.Signal();
  9. }
  10. if (ready == last_writer) break;
  11. }
  12. // Notify new head of write queue
  13. if (!manifest_writers_.empty()) {
  14. manifest_writers_.front()->cv.Signal();
  15. }

文件内容

具体的文件格式可以看RocksDB的wiki,这里我来介绍下对应的源码. 通过上面的分析我们可以看到最终是通过VersionEdit::EncodeTo来序列化数据,而VersionEdit主要包含了比如log_number/last_sequence_这些字段,这里还有一个比较关键的信息被序列化了,那就是FileMetaData,也就是SST文件的元信息.

  1. struct FileMetaData {
  2. FileDescriptor fd;
  3. InternalKey smallest; // Smallest internal key served by table
  4. InternalKey largest; // Largest internal key served by table
  5. SequenceNumber smallest_seqno; // The smallest seqno in this file
  6. SequenceNumber largest_seqno; // The largest seqno in this file
  7. .........................................
  8. // File size compensated by deletion entry.
  9. // This is updated in Version::UpdateAccumulatedStats() first time when the
  10. // file is created or loaded. After it is updated (!= 0), it is immutable.
  11. uint64_t compensated_file_size;
  12. // These values can mutate, but they can only be read or written from
  13. // single-threaded LogAndApply thread
  14. uint64_t num_entries; // the number of entries.
  15. uint64_t num_deletions; // the number of deletion entries.
  16. uint64_t raw_key_size; // total uncompressed key size.
  17. uint64_t raw_value_size; // total uncompressed value size.
  18. int refs; // Reference count
  19. bool being_compacted; // Is this file undergoing compaction?
  20. bool init_stats_from_file; // true if the data-entry stats of this file
  21. // has initialized from file.
  22. bool marked_for_compaction; // True if client asked us nicely to compact this
  23. // file.
  24. };

工具

这里查看MANIFEST文件依旧是使用ldb工具:

  1. pagefault@god ~/tools/polarbase/data/.rocksdb $ ../../bin/ldb manifest_dump --path=./MANIFEST-000001
  2. --------------- Column family "default" (ID 0) --------------
  3. log number: 13
  4. comparator: <NO COMPARATOR>
  5. --- level 0 --- version# 0 ---
  6. 11:80860['
  7. --------------- Column family "__system__" (ID 1) --------------
  8. log number: 24
  9. comparator: RocksDB_SE_v3.10
  10. --- level 0 --- version# 1 ---
  11. 25:1094['
  12. next_file_number 27 last_sequence 190 prev_log_number 0 max_column_family 1