综述

在RocksDB中,将MemTable刷新到磁盘之后,将会有很多sstable,而这些sstable则是可能包含了相同的key的不同时间的值,这样子就会导致两个问题:

  1. 浪费磁盘空间
  2. 读取内容将会非常慢.

而compact就是用来解决上面两个问题的,简单来说compact就是读取几个sstable然后合并为一个(或者多个)sstable. 而什么时候合并,合并的时候如何来挑选sstable,这个就是compcation strategy.一般来说compact strategy的目的都是为了更低的amplification:

  • 避免一次读请求读取太多的sstables.
    • 读放大
  • 避免一些临时数据(deleted/overwritten/expired)在磁盘上停留时间过长
  • 避免磁盘上临时空间过大
    • 空间放大
  • 避免compact相同的数据太多次
    • 写放大

而在RockDB中实现了多种compact strategy,不同的strategy有不同的侧重,这里我们只分析默认的strategy, 那就是leveled-N compaction.

在Leveled compaction中,所有的SSTables被分为很多levels(level0/1/2/3…).

  • 最新的SSTable(从memtable中刷新下来的)是属于Level0
    • 每一个SSTable都是有序的
    • 只有Level0的SSTable允许overlap
  • 除了level0之外其他的level的总的SSTable大小有一个最大的限制
    • 通过level_compaction_dynamic_level_bytes来计算
  • 在Level0,如果积攒够了足够的(level0_file_num_compaction_trigger)SSTable,则就会进行compact.
    • 一般来说会把全部的SSTables compact到下一个level(Level1).
    • 不会写一个很大的SSTable,
  • 一般来说百分之90的空间都是给最后一级level的.

源码

Compact运行的条件

先来看在RocksDB中是什么时候会引起compact.在RocksDB中所有的compact都是在后台线程中进行的,这个线程就是BGWorkCompaction.这个线程只有在两种情况下被调用,一个是 手动compact(RunManualCompaction),一个就是自动(MaybeScheduleFlushOrCompaction),我们主要来看自动的compact,而MaybeScheduleFlushOrCompaction这个函数我们在之前介绍flush的时候已经介绍过了,简单来说就是会在切换WAL(SwitchWAL)或者writebuffer满的时候(HandleWriteBufferFull)被调用.

我们来看在MaybeScheduleFlushOrCompaction中compact的调用.这里可以看到RocksDB中后台运行的compact会有一个限制(max_compactions).而我们可以看到这里还有一个变量 unscheduled_compactions_,这个变量表示需要被compact的columnfamily的队列长度.

  1. while (bg_compaction_scheduled_ < bg_job_limits.max_compactions &&
  2. unscheduled_compactions_ > 0) {
  3. CompactionArg* ca = new CompactionArg;
  4. ca->db = this;
  5. ca->prepicked_compaction = nullptr;
  6. bg_compaction_scheduled_++;
  7. unscheduled_compactions_--;
  8. env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
  9. &DBImpl::UnscheduleCallback);
  10. }

类似flush的逻辑,compact的时候RocksDB也有一个队列叫做DBImpl::compaction_queue_.

  1. std::deque<ColumnFamilyData*> compaction_queue_;

然后我们来看这个队列何时被更新,其中unscheduled_compactions_和队列的更新是同步的,因此只有compaction_queue_更新之后,调用compact后台线程才会进入compact处理.

  1. void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) {
  2. if (!cfd->queued_for_compaction() && cfd->NeedsCompaction()) {
  3. AddToCompactionQueue(cfd);
  4. ++unscheduled_compactions_;
  5. }
  6. }

上面的核心函数是NeedsCompaction,通过这个函数来判断是否有sst需要被compact,因此接下来我们就来详细分析这个函数.当满足下列几个条件之一就将会更新compact队列

  • 有超时的sst(ExpiredTtlFiles)
  • files_marked_for_compaction_或者bottommost_files_marked_for_compaction_都不为空
    • 后面会介绍这两个队列
  • 遍历所有的level的sst,然后判断是否需要compact
    • 最核心的条件(上面两个队列都是在这里更新的).
  1. bool LevelCompactionPicker::NeedsCompaction(
  2. const VersionStorageInfo* vstorage) const {
  3. if (!vstorage->ExpiredTtlFiles().empty()) {
  4. return true;
  5. }
  6. if (!vstorage->BottommostFilesMarkedForCompaction().empty()) {
  7. return true;
  8. }
  9. if (!vstorage->FilesMarkedForCompaction().empty()) {
  10. return true;
  11. }
  12. for (int i = 0; i <= vstorage->MaxInputLevel(); i++) {
  13. if (vstorage->CompactionScore(i) >= 1) {
  14. return true;
  15. }
  16. }
  17. return false;
  18. }

因此接下来我们来分析最核心的CompactionScore,这里将会涉及到两个变量,这两个变量分别保存了level以及每个level所对应的score(这里score越高表示compact优先级越高),而score小于1则表示不需要compact.

  1. std::vector<double> compaction_score_;
  2. std::vector<int> compaction_level_;

这两个vector是在VersionStorageInfo::ComputeCompactionScore中被更新,因此我们来看这个函数,这个函数中会对level-0和其他的level区别处理。 首先来看level-0的处理:

  1. 首先会计算level-0下所有文件的大小(total_size)以及文件个数(num_sorted_runs).
  2. 用文件个数除以level0_file_num_compaction_trigger来得到对应的score
  3. 如果当前不止一层level,那么将会从上面的score和(total_size/max_bytes_for_level_base)取最大值.

之所以要做第三步,主要还是为了防止level-0的文件size过大,那么当它需要compact的时候有可能会需要和level-1 compact,那么此时就有可能会有一个很大的compact.

  1. if (level == 0) {
  2. int num_sorted_runs = 0;
  3. uint64_t total_size = 0;
  4. for (auto* f : files_[level]) {
  5. if (!f->being_compacted) {
  6. total_size += f->compensated_file_size;
  7. num_sorted_runs++;
  8. }
  9. }
  10. .........................
  11. score = static_cast<double>(num_sorted_runs) /
  12. mutable_cf_options.level0_file_num_compaction_trigger;
  13. if (compaction_style_ == kCompactionStyleLevel && num_levels() > 1) {
  14. score = std::max(
  15. score, static_cast<double>(total_size) /
  16. mutable_cf_options.max_bytes_for_level_base);
  17. }
  18. }

然后是非level-0的处理,这里也是计算level的文件大小然后再除以MaxBytesForLevel,然后得到当前level的score.

  1. uint64_t level_bytes_no_compacting = 0;
  2. for (auto f : files_[level]) {
  3. if (!f->being_compacted) {
  4. level_bytes_no_compacting += f->compensated_file_size;
  5. }
  6. }
  7. score = static_cast<double>(level_bytes_no_compacting) /
  8. MaxBytesForLevel(level);

上面我们看到有一个MaxBytesForLevel,这个函数的作用就是得到当前level的最大的文件大小.而这个函数实现也很简单.

  1. uint64_t VersionStorageInfo::MaxBytesForLevel(int level) const {
  2. // Note: the result for level zero is not really used since we set
  3. // the level-0 compaction threshold based on number of files.
  4. assert(level >= 0);
  5. assert(level < static_cast<int>(level_max_bytes_.size()));
  6. return level_max_bytes_[level];
  7. }

可以看到核心就是level_max_bytes_这个数组,接下来我们就来看这个数组是在哪里被初始化的。level_max_bytes这个数组是在VersionStorageInfo::CalculateBaseBytes 这个函数中被初始化,这里RocksDB有一个option叫做level_compaction_dynamic_level_bytes,这个配置如果被设置,那么level_max_bytes将会这样 设置(这里我们只关注level):

  • 如果是level-1那么level-1的的文件大小限制为options.max_bytes_for_level_base.
  • 如果level大于1那么当前level-i的大小限制为(其中max_bytes这两个变量都是options中设置的)

    1. Target_Size(Ln+1) = Target_Size(Ln) * max_bytes_for_level_multiplier * max_bytes_for_level_multiplier_additional[n].

    举个例子,如果max_bytes_for_level_base=1024,max_bytes_for_level_multiplier=10,然后max_bytes_for_level_multiplier_additional未设置,那么L1, L2,L3的大小限制分别为1024,10240,102400.

下面是对应代码.

  1. if (!ioptions.level_compaction_dynamic_level_bytes) {
  2. base_level_ = (ioptions.compaction_style == kCompactionStyleLevel) ? 1 : -1;
  3. // Calculate for static bytes base case
  4. for (int i = 0; i < ioptions.num_levels; ++i) {
  5. if (i == 0 && ioptions.compaction_style == kCompactionStyleUniversal) {
  6. level_max_bytes_[i] = options.max_bytes_for_level_base;
  7. } else if (i > 1) {
  8. level_max_bytes_[i] = MultiplyCheckOverflow(
  9. MultiplyCheckOverflow(level_max_bytes_[i - 1],
  10. options.max_bytes_for_level_multiplier),
  11. options.MaxBytesMultiplerAdditional(i - 1));
  12. } else {
  13. level_max_bytes_[i] = options.max_bytes_for_level_base;
  14. }
  15. }
  16. }

然后我们来看如果设置了level_compaction_dynamic_level_bytes会如何来计算.如果设置了dynamic,那么就说明每次计算出来的每个level的最大值都是不一样的, 首先我们要知道调用CalculateBaseBytes是在每次创建version的时候。因此他是这样计算的.最大的level(num_levels -1 )的大小限制是不计入计算的,然后就是这样计算.

  1. Target_Size(Ln-1) = Target_Size(Ln) / max_bytes_for_level_multiplier

举个例子,假设调用CalculateBaseBytes的时候,max_bytes_for_level_base是1G,然后num_levels = 6,然后当前最大的level的大小为256G,那么从L1-L6的大小是 0, 0, 0.276GB, 2.76GB, 27.6GB 和 276GB.

首先计算第一个非空的level.

  1. for (int i = 1; i < num_levels_; i++) {
  2. uint64_t total_size = 0;
  3. for (const auto& f : files_[i]) {
  4. total_size += f->fd.GetFileSize();
  5. }
  6. if (total_size > 0 && first_non_empty_level == -1) {
  7. first_non_empty_level = i;
  8. }
  9. if (total_size > max_level_size) {
  10. max_level_size = total_size;
  11. }
  12. }

得到最小的那个非0的level的size.

  1. uint64_t base_bytes_max = options.max_bytes_for_level_base;
  2. uint64_t base_bytes_min = static_cast<uint64_t>(
  3. base_bytes_max / options.max_bytes_for_level_multiplier);
  4. // Try whether we can make last level's target size to be max_level_size
  5. uint64_t cur_level_size = max_level_size;
  6. for (int i = num_levels_ - 2; i >= first_non_empty_level; i--) {
  7. // Round up after dividing
  8. cur_level_size = static_cast<uint64_t>(
  9. cur_level_size / options.max_bytes_for_level_multiplier);
  10. }

找到base_level_size,一般来说也就是cur_level_size.

  1. // Find base level (where L0 data is compacted to).
  2. base_level_ = first_non_empty_level;
  3. while (base_level_ > 1 && cur_level_size > base_bytes_max) {
  4. --base_level_;
  5. cur_level_size = static_cast<uint64_t>(
  6. cur_level_size / options.max_bytes_for_level_multiplier);
  7. }
  8. if (cur_level_size > base_bytes_max) {
  9. // Even L1 will be too large
  10. assert(base_level_ == 1);
  11. base_level_size = base_bytes_max;
  12. } else {
  13. base_level_size = cur_level_size;
  14. }

然后给level_max_bytes_ 赋值

  1. uint64_t level_size = base_level_size;
  2. for (int i = base_level_; i < num_levels_; i++) {
  3. if (i > base_level_) {
  4. level_size = MultiplyCheckOverflow(
  5. level_size, options.max_bytes_for_level_multiplier);
  6. }
  7. // Don't set any level below base_bytes_max. Otherwise, the LSM can
  8. // assume an hourglass shape where L1+ sizes are smaller than L0. This
  9. // causes compaction scoring, which depends on level sizes, to favor L1+
  10. // at the expense of L0, which may fill up and stall.
  11. level_max_bytes_[i] = std::max(level_size, base_bytes_max);
  12. }
  13. }

Compact实现细节

分析完毕何时会触发Compact,那么我们接下来来分析如何Compact.其中Compact的所有操作都在DBImpl::BackgroundCompaction中进行,因此接下来我们来分析 这个函数. 首先是从compaction_queue_队列中读取第一个需要compact的column family.

  1. // cfd is referenced here
  2. auto cfd = PopFirstFromCompactionQueue();
  3. // We unreference here because the following code will take a Ref() on
  4. // this cfd if it is going to use it (Compaction class holds a
  5. // reference).
  6. // This will all happen under a mutex so we don't have to be afraid of
  7. // somebody else deleting it.
  8. if (cfd->Unref()) {
  9. delete cfd;
  10. // This was the last reference of the column family, so no need to
  11. // compact.
  12. return Status::OK();
  13. }

然后就是选取当前CF中所需要compact的内容.

  1. c.reset(cfd->PickCompaction(*mutable_cf_options, log_buffer));

从上面可以看到PickCompaction这个函数,而这个函数会根据设置的不同的Compact策略调用不同的方法,这里我们只看默认的LevelCompact的对应函数.

  1. Compaction* LevelCompactionBuilder::PickCompaction() {
  2. // Pick up the first file to start compaction. It may have been extended
  3. // to a clean cut.
  4. SetupInitialFiles();
  5. if (start_level_inputs_.empty()) {
  6. return nullptr;
  7. }
  8. assert(start_level_ >= 0 && output_level_ >= 0);
  9. // If it is a L0 -> base level compaction, we need to set up other L0
  10. // files if needed.
  11. if (!SetupOtherL0FilesIfNeeded()) {
  12. return nullptr;
  13. }
  14. // Pick files in the output level and expand more files in the start level
  15. // if needed.
  16. if (!SetupOtherInputsIfNeeded()) {
  17. return nullptr;
  18. }
  19. // Form a compaction object containing the files we picked.
  20. Compaction* c = GetCompaction();
  21. TEST_SYNC_POINT_CALLBACK("LevelCompactionPicker::PickCompaction:Return", c);
  22. return c;
  23. }

这里PickCompaction分别调用了三个主要的函数.

  • SetupInitialFiles 这个函数主要用来初始化需要Compact的文件.
  • SetupOtherL0FilesIfNeeded 如果需要compact的话,那么还需要再设置对应的L0文件
  • SetupOtherInputsIfNeeded 选择对应的输出文件

先来看SetupInitialFiles,这个函数他会遍历所有的level,然后来选择对应需要compact的input和output.

这里可看到,他会从之前计算好的的compact信息中得到对应的score.

  1. void LevelCompactionBuilder::SetupInitialFiles() {
  2. // Find the compactions by size on all levels.
  3. bool skipped_l0_to_base = false;
  4. for (int i = 0; i < compaction_picker_->NumberLevels() - 1; i++) {
  5. start_level_score_ = vstorage_->CompactionScore(i);
  6. start_level_ = vstorage_->CompactionScoreLevel(i);
  7. assert(i == 0 || start_level_score_ <= vstorage_->CompactionScore(i - 1));
  8. ................................................................
  9. }

只有当score大于一才有必要进行compact的处理(所有操作都在上面的循环中).这里可以看到如果是level0的话,那么output_level 则是vstorage_->base_level(),否则就是level+1. 这里base_level()可以认为就是level1或者是最小的非空的level(之前CalculateBaseBytes中计算).

  1. if (start_level_score_ >= 1) {
  2. if (skipped_l0_to_base && start_level_ == vstorage_->base_level()) {
  3. // If L0->base_level compaction is pending, don't schedule further
  4. // compaction from base level. Otherwise L0->base_level compaction
  5. // may starve.
  6. continue;
  7. }
  8. output_level_ =
  9. (start_level_ == 0) ? vstorage_->base_level() : start_level_ + 1;
  10. if (PickFileToCompact()) {
  11. // found the compaction!
  12. if (start_level_ == 0) {
  13. // L0 score = `num L0 files` / `level0_file_num_compaction_trigger`
  14. compaction_reason_ = CompactionReason::kLevelL0FilesNum;
  15. } else {
  16. // L1+ score = `Level files size` / `MaxBytesForLevel`
  17. compaction_reason_ = CompactionReason::kLevelMaxLevelSize;
  18. }
  19. break;
  20. } else {
  21. // didn't find the compaction, clear the inputs
  22. ......................................................
  23. }
  24. }
  25. }

上面的代码中我们可以看到最终是通过PickFileToCompact来选择input以及output文件.因此我们接下来就来分这个函数.

首先是得到当前level(start_level_)的未compacted的最大大小的文件

  1. // Pick the largest file in this level that is not already
  2. // being compacted
  3. const std::vector<int>& file_size =
  4. vstorage_->FilesByCompactionPri(start_level_);
  5. const std::vector<FileMetaData*>& level_files =
  6. vstorage_->LevelFiles(start_level_);

紧接着就是这个函数最核心的功能了,它会开始遍历当前的输入level的所有待compact的文件,然后选择一些合适的文件然后compact到下一个level.

  1. unsigned int cmp_idx;
  2. for (cmp_idx = vstorage_->NextCompactionIndex(start_level_);
  3. cmp_idx < file_size.size(); cmp_idx++) {
  4. ..........................................
  5. }

然后我们来详细分析上面循环中所做的事情 首先选择好文件之后,将会扩展当前文件的key的范围,得到一个”clean cut”的范围, 这里”clean cut”是这个意思,假设我们有五个文件他们的key range分别为:

  1. f1[a1 a2] f2[a3 a4] f3[a4 a6] f4[a6 a7] f5[a8 a9]

如果我们第一次选择了f3,那么我们通过clean cut,则将还会选择f2,f4,因为他们都是连续的. 选择好之后,会再做一次判断,这次是判断是否正在compact的out_level的文件范围是否和我们选择好的文件的key有重合,如果有,则跳过这个文件. 这里之所以会有这个判断,主要原因还是因为compact是会并行的执行的.

  1. int index = file_size[cmp_idx];
  2. auto* f = level_files[index];
  3. // do not pick a file to compact if it is being compacted
  4. // from n-1 level.
  5. if (f->being_compacted) {
  6. continue;
  7. }
  8. start_level_inputs_.files.push_back(f);
  9. start_level_inputs_.level = start_level_;
  10. if (!compaction_picker_->ExpandInputsToCleanCut(cf_name_, vstorage_,
  11. &start_level_inputs_) ||
  12. compaction_picker_->FilesRangeOverlapWithCompaction(
  13. {start_level_inputs_}, output_level_)) {
  14. // A locked (pending compaction) input-level file was pulled in due to
  15. // user-key overlap.
  16. start_level_inputs_.clear();
  17. continue;
  18. }

选择好输入文件之后,接下来就是选择输出level中需要一起被compact的文件(output_level_inputs). 实现也是比较简单,就是从输出level的所有文件中找到是否有和上面选择好的input中有重合的文件,如果有,那么则需要一起进行compact.

  1. InternalKey smallest, largest;
  2. compaction_picker_->GetRange(start_level_inputs_, &smallest, &largest);
  3. CompactionInputFiles output_level_inputs;
  4. output_level_inputs.level = output_level_;
  5. vstorage_->GetOverlappingInputs(output_level_, &smallest, &largest,
  6. &output_level_inputs.files);
  7. if (!output_level_inputs.empty() &&
  8. !compaction_picker_->ExpandInputsToCleanCut(cf_name_, vstorage_,
  9. &output_level_inputs)) {
  10. start_level_inputs_.clear();
  11. continue;
  12. }
  13. base_index_ = index;
  14. break;

继续分析PickCompaction,我们知道在RocksDB中level-0会比较特殊,那是因为只有level-0中的文件是无序的,而在上面的操作中, 我们是假设在非level-0,因此接下来我们需要处理level-0的情况,这个函数就是SetupOtherL0FilesIfNeeded.

这里如果start_level_为0,也就是level-0的话,才会进行下面的处理,就是从level-0中得到所有的重合key的文件,然后加入到start_level_inputs中.

  1. if (start_level_ == 0 && output_level_ != 0) {
  2. // Two level 0 compaction won't run at the same time, so don't need to worry
  3. // about files on level 0 being compacted.
  4. assert(compaction_picker_->level0_compactions_in_progress()->empty());
  5. InternalKey smallest, largest;
  6. compaction_picker_->GetRange(start_level_inputs_, &smallest, &largest);
  7. // Note that the next call will discard the file we placed in
  8. // c->inputs_[0] earlier and replace it with an overlapping set
  9. // which will include the picked file.
  10. start_level_inputs_.files.clear();
  11. vstorage_->GetOverlappingInputs(0, &smallest, &largest,
  12. &start_level_inputs_.files);
  13. // If we include more L0 files in the same compaction run it can
  14. // cause the 'smallest' and 'largest' key to get extended to a
  15. // larger range. So, re-invoke GetRange to get the new key range
  16. compaction_picker_->GetRange(start_level_inputs_, &smallest, &largest);
  17. if (compaction_picker_->IsRangeInCompaction(
  18. vstorage_, &smallest, &largest, output_level_, &parent_index_)) {
  19. return false;
  20. }
  21. }

假设start_level_inputs被扩展了,那么对应的output也需要被扩展,因为非level0的其他的level的文件key都是不会overlap的. 那么此时就是会调用SetupOtherInputsIfNeeded.

  1. if (output_level_ != 0) {
  2. output_level_inputs_.level = output_level_;
  3. if (!compaction_picker_->SetupOtherInputs(
  4. cf_name_, mutable_cf_options_, vstorage_, &start_level_inputs_,
  5. &output_level_inputs_, &parent_index_, base_index_)) {
  6. return false;
  7. }
  8. compaction_inputs_.push_back(start_level_inputs_);
  9. if (!output_level_inputs_.empty()) {
  10. compaction_inputs_.push_back(output_level_inputs_);
  11. }
  12. // In some edge cases we could pick a compaction that will be compacting
  13. // a key range that overlap with another running compaction, and both
  14. // of them have the same output level. This could happen if
  15. // (1) we are running a non-exclusive manual compaction
  16. // (2) AddFile ingest a new file into the LSM tree
  17. // We need to disallow this from happening.
  18. if (compaction_picker_->FilesRangeOverlapWithCompaction(compaction_inputs_,
  19. output_level_)) {
  20. // This compaction output could potentially conflict with the output
  21. // of a currently running compaction, we cannot run it.
  22. return false;
  23. }
  24. compaction_picker_->GetGrandparents(vstorage_, start_level_inputs_,
  25. output_level_inputs_, &grandparents_);
  26. }

最后就是构造一个compact然后返回.

  1. // Form a compaction object containing the files we picked.
  2. Compaction* c = GetCompaction();
  3. TEST_SYNC_POINT_CALLBACK("LevelCompactionPicker::PickCompaction:Return", c);
  4. return c;

最后再回到BackgroundCompaction中,这里就是在得到需要compact的文件之后,进行具体的compact. 这里我们可以看到核心的数据结构就是CompactionJob,每一次的compact都是一个job,最终对于文件的compact都是在 CompactionJob::run中实现.

  1. CompactionJob compaction_job(
  2. job_context->job_id, c.get(), immutable_db_options_,
  3. env_options_for_compaction_, versions_.get(), &shutting_down_,
  4. preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(),
  5. GetDataDir(c->column_family_data(), c->output_path_id()), stats_,
  6. &mutex_, &bg_error_, snapshot_seqs, earliest_write_conflict_snapshot,
  7. snapshot_checker, table_cache_, &event_logger_,
  8. c->mutable_cf_options()->paranoid_file_checks,
  9. c->mutable_cf_options()->report_bg_io_stats, dbname_,
  10. &compaction_job_stats);
  11. compaction_job.Prepare();
  12. mutex_.Unlock();
  13. compaction_job.Run();
  14. TEST_SYNC_POINT("DBImpl::BackgroundCompaction:NonTrivial:AfterRun");
  15. mutex_.Lock();
  16. status = compaction_job.Install(*c->mutable_cf_options());
  17. if (status.ok()) {
  18. InstallSuperVersionAndScheduleWork(
  19. c->column_family_data(), &job_context->superversion_context,
  20. *c->mutable_cf_options(), FlushReason::kAutoCompaction);
  21. }
  22. *made_progress = true;

在RocksDB中,Compact是会多线程并发的执行,而这里怎样并发,并发多少线程都是在CompactionJob中实现的,简单来说,当你的compact的文件range不重合的话,那么都是可以并发执行的。

我们先来看CompactionJob::Prepare函数,在这个函数中主要是做一些执行前的准备工作,首先是取得对应的compact的边界,这里每一个需要并发的compact都被抽象为一个sub compaction.因此在GenSubcompactionBoundaries会解析到对应的sub compaction以及边界.解析完毕之后,则将会把对应的信息全部加入sub_compact_states中。

  1. void CompactionJob::Prepare() {
  2. ..........................
  3. if (c->ShouldFormSubcompactions()) {
  4. const uint64_t start_micros = env_->NowMicros();
  5. GenSubcompactionBoundaries();
  6. MeasureTime(stats_, SUBCOMPACTION_SETUP_TIME,
  7. env_->NowMicros() - start_micros);
  8. assert(sizes_.size() == boundaries_.size() + 1);
  9. for (size_t i = 0; i <= boundaries_.size(); i++) {
  10. Slice* start = i == 0 ? nullptr : &boundaries_[i - 1];
  11. Slice* end = i == boundaries_.size() ? nullptr : &boundaries_[i];
  12. compact_->sub_compact_states.emplace_back(c, start, end, sizes_[i]);
  13. }
  14. MeasureTime(stats_, NUM_SUBCOMPACTIONS_SCHEDULED,
  15. compact_->sub_compact_states.size());
  16. }
  17. ......................................
  18. }

因此我们来详细分析GenSubcompactionBoundaries,这个函数比较长,我们来分开分析,首先是遍历所有的需要compact的level,然后取得每一个level的边界(也就是最大最小key)。

  1. void CompactionJob::GenSubcompactionBoundaries() {
  2. ...........................
  3. // Add the starting and/or ending key of certain input files as a potential
  4. // boundary
  5. for (size_t lvl_idx = 0; lvl_idx < c->num_input_levels(); lvl_idx++) {
  6. int lvl = c->level(lvl_idx);
  7. if (lvl >= start_lvl && lvl <= out_lvl) {
  8. const LevelFilesBrief* flevel = c->input_levels(lvl_idx);
  9. size_t num_files = flevel->num_files;
  10. .....................
  11. if (lvl == 0) {
  12. // For level 0 add the starting and ending key of each file since the
  13. // files may have greatly differing key ranges (not range-partitioned)
  14. for (size_t i = 0; i < num_files; i++) {
  15. bounds.emplace_back(flevel->files[i].smallest_key);
  16. bounds.emplace_back(flevel->files[i].largest_key);
  17. }
  18. } else {
  19. // For all other levels add the smallest/largest key in the level to
  20. // encompass the range covered by that level
  21. bounds.emplace_back(flevel->files[0].smallest_key);
  22. bounds.emplace_back(flevel->files[num_files - 1].largest_key);
  23. if (lvl == out_lvl) {
  24. // For the last level include the starting keys of all files since
  25. // the last level is the largest and probably has the widest key
  26. // range. Since it's range partitioned, the ending key of one file
  27. // and the starting key of the next are very close (or identical).
  28. for (size_t i = 1; i < num_files; i++) {
  29. bounds.emplace_back(flevel->files[i].smallest_key);
  30. }
  31. }
  32. }
  33. }
  34. }
  35. ......................

然后则是对取得的bounds进行排序以及去重.

  1. std::sort(bounds.begin(), bounds.end(),
  2. [cfd_comparator](const Slice& a, const Slice& b) -> bool {
  3. return cfd_comparator->Compare(ExtractUserKey(a),
  4. ExtractUserKey(b)) < 0;
  5. });
  6. // Remove duplicated entries from bounds
  7. bounds.erase(
  8. std::unique(bounds.begin(), bounds.end(),
  9. [cfd_comparator](const Slice& a, const Slice& b) -> bool {
  10. return cfd_comparator->Compare(ExtractUserKey(a),
  11. ExtractUserKey(b)) == 0;
  12. }),
  13. bounds.end());

接近着就来计算理想情况下所需要的subcompactions的个数以及输出文件的个数.

  1. // Group the ranges into subcompactions
  2. const double min_file_fill_percent = 4.0 / 5;
  3. int base_level = v->storage_info()->base_level();
  4. uint64_t max_output_files = static_cast<uint64_t>(std::ceil(
  5. sum / min_file_fill_percent /
  6. MaxFileSizeForLevel(*(c->mutable_cf_options()), out_lvl,
  7. c->immutable_cf_options()->compaction_style, base_level,
  8. c->immutable_cf_options()->level_compaction_dynamic_level_bytes)));
  9. uint64_t subcompactions =
  10. std::min({static_cast<uint64_t>(ranges.size()),
  11. static_cast<uint64_t>(c->max_subcompactions()),
  12. max_output_files});

最后更新boundaries_,这里会根据根据文件的大小,通过平均的size,来吧所有的range分为几份,最终这些都会保存在boundaries_中.

  1. if (subcompactions > 1) {
  2. double mean = sum * 1.0 / subcompactions;
  3. // Greedily add ranges to the subcompaction until the sum of the ranges'
  4. // sizes becomes >= the expected mean size of a subcompaction
  5. sum = 0;
  6. for (size_t i = 0; i < ranges.size() - 1; i++) {
  7. sum += ranges[i].size;
  8. if (subcompactions == 1) {
  9. // If there's only one left to schedule then it goes to the end so no
  10. // need to put an end boundary
  11. continue;
  12. }
  13. if (sum >= mean) {
  14. boundaries_.emplace_back(ExtractUserKey(ranges[i].range.limit));
  15. sizes_.emplace_back(sum);
  16. subcompactions--;
  17. sum = 0;
  18. }
  19. }
  20. sizes_.emplace_back(sum + ranges.back().size);
  21. }

然后我们来看CompactJob::Run的实现,在这个函数中,就是会遍历所有的sub_compact,然后启动线程来进行对应的compact工作,最后等到所有的线程完成,然后退出.

  1. // Launch a thread for each of subcompactions 1...num_threads-1
  2. std::vector<port::Thread> thread_pool;
  3. thread_pool.reserve(num_threads - 1);
  4. for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) {
  5. thread_pool.emplace_back(&CompactionJob::ProcessKeyValueCompaction, this,
  6. &compact_->sub_compact_states[i]);
  7. }
  8. // Always schedule the first subcompaction (whether or not there are also
  9. // others) in the current thread to be efficient with resources
  10. ProcessKeyValueCompaction(&compact_->sub_compact_states[0]);
  11. // Wait for all other threads (if there are any) to finish execution
  12. for (auto& thread : thread_pool) {
  13. thread.join();
  14. }
  15. if (output_directory_) {
  16. output_directory_->Fsync();
  17. }

最后我们可以看到最终compact工作是在CompactionJob::ProcessKeyValueCompaction是实现的,这个函数我们暂时就不分析了,我们只需要知道所有的compact工作都是在这个函数中执行的.