

简单来说在RocksDB中,每一个ColumnFamily都有自己的Memtable,当Memtable超过固定大小之后(或者WAL文件超过限制),它将会被设置为immutable,然后会有后台的线程启动来刷新这个immutable memtable到磁盘(SST).


  1. write_buffer_size 表示每个columnfamily的memtable的大小限制
  2. db_write_buffer_size 总的memtable的大小限制(所有的ColumnFamily).
  3. max_write_buffer_number 最大的memtable的个数
  4. min_write_buffer_number_to_merge 表示触发flush之前至少需要积累的(尚未开始flush的)immutable memtable的个数,这些memtable会被合并后一起flush

Flush Memtable的触发条件

在下面这几种条件下RocksDB会flush memtable到磁盘.

  1. 当某一个memtable的大小超过write_buffer_size.
  2. 当总的memtable的大小超过db_write_buffer_size.
  3. 当WAL文件的大小超过max_total_wal_size之后. 最后一个条件的原因是,当WAL文件大小太大之后,我们需要清理WAL,因此此时我们需要将此WAL对应的数据都刷新到磁盘,也就是刷新对应的Memtable.



  1. class DBImpl {
  2. ................................
  3. std::deque<ColumnFamilyData*> flush_queue_;
  4. ...................
  5. };

然后我们来看IsFlushPending的实现.这个函数的意思就是至少有一个memtable需要被flush.而MemTableList这个类则是保存了所有的immutable memtables.

  1. bool MemTableList::IsFlushPending() const {
  2. if ((flush_requested_ && num_flush_not_started_ >= 1) ||
  3. (num_flush_not_started_ >= min_write_buffer_number_to_merge_)) {
  4. assert(imm_flush_needed.load(std::memory_order_relaxed));
  5. return true;
  6. }
  7. return false;
  8. }

上面这几个变量的含义在下面的注释中比较清楚, 而min_write_buffer_number_to_merge_就是配置项min_write_buffer_number_to_merge的值.

  1. // the number of elements that still need flushing
  2. int num_flush_not_started_;
  3. // committing in progress
  4. bool commit_in_progress_;
  5. // Requested a flush of all memtables to storage
  6. bool flush_requested_;

可以看到在SchedulePendingFlush函数中,最终会将对应的ColumnFamily加入到flush queue中.

  1. void DBImpl::SchedulePendingFlush(ColumnFamilyData* cfd,
  2. FlushReason flush_reason) {
  3. if (!cfd->queued_for_flush() && cfd->imm()->IsFlushPending()) {
  4. AddToFlushQueue(cfd, flush_reason);
  5. ++unscheduled_flushes_;
  6. }
  7. }


  1. Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
  2. LogBuffer* log_buffer) {
  3. ................................
  4. while (!flush_queue_.empty()) {
  5. // This cfd is already referenced
  6. auto first_cfd = PopFirstFromFlushQueue();
  7. if (first_cfd->IsDropped() || !first_cfd->imm()->IsFlushPending()) {
  8. // can't flush this CF, try next one
  9. if (first_cfd->Unref()) {
  10. delete first_cfd;
  11. }
  12. continue;
  13. }
  14. // found a flush!
  15. cfd = first_cfd;
  16. break;
  17. }
  18. if (cfd != nullptr) {
  19. ....................................
  20. status = FlushMemTableToOutputFile(cfd, mutable_cf_options, made_progress,
  21. job_context, log_buffer);
  22. if (cfd->Unref()) {
  23. delete cfd;
  24. }
  25. }
  26. return status;
  27. }



  1. void DBImpl::MaybeScheduleFlushOrCompaction() {
  2. ..........................................
  3. auto bg_job_limits = GetBGJobLimits();
  4. bool is_flush_pool_empty =
  5. env_->GetBackgroundThreads(Env::Priority::HIGH) == 0;
  6. while (!is_flush_pool_empty && unscheduled_flushes_ > 0 &&
  7. bg_flush_scheduled_ < bg_job_limits.max_flushes) {
  8. unscheduled_flushes_--;
  9. bg_flush_scheduled_++;
  10. env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH, this);
  11. }
  12. ...........................................
  13. }

在RocksDB中,有一个SwitchMemtable函数,这个函数用来将现在的memtable改变为immutable,然后再新建一个memtable,也就是说理论上来说每一次内存的memtable被刷新到磁盘之前肯定会调用这个函数.而在实现中,每一次调用SwitchMemtable之后,都会调用对应immutable memtable的FlushRequested函数来设置对应memtable的flush_requested_, 并且会调用上面的SchedulePendingFlush来将对应的ColumnFamily加入到flush_queue_队列中.因此这里我们就通过这几个函数的调用栈来分析RocksDB中何时会触发flush操作.


  1. DBImpl::HandleWriteBufferFull
  2. DBImpl::SwitchWAL
  3. DBImpl::FlushMemTable
  4. DBImpl::ScheduleFlushes



  1. Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) {
  2. ...................................
  3. for (auto cfd : *versions_->GetColumnFamilySet()) {
  4. ...............................
  5. if (cfd_picked != nullptr) {
  6. status = SwitchMemtable(cfd_picked, write_context,
  7. FlushReason::kWriteBufferFull);
  8. if (status.ok()) {
  9. cfd_picked->imm()->FlushRequested();
  10. SchedulePendingFlush(cfd_picked, FlushReason::kWriteBufferFull);
  11. MaybeScheduleFlushOrCompaction();
  12. }
  13. }
  14. return status;
  15. }


  1. Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
  2. bool* need_log_sync,
  3. WriteContext* write_context) {
  4. ..........................................
  5. if (UNLIKELY(status.ok() && write_buffer_manager_->ShouldFlush())) {
  6. // Before a new memtable is added in SwitchMemtable(),
  7. // write_buffer_manager_->ShouldFlush() will keep returning true. If another
  8. // thread is writing to another DB with the same write buffer, they may also
  9. // be flushed. We may end up with flushing much more DBs than needed. It's
  10. // suboptimal but still correct.
  11. status = HandleWriteBufferFull(write_context);
  12. }
  13. ........................................
  14. }


  1. // Should only be called from write thread
  2. bool ShouldFlush() const {
  3. if (enabled()) {
  4. if (mutable_memtable_memory_usage() > mutable_limit_) {
  5. return true;
  6. }
  7. if (memory_usage() >= buffer_size_ &&
  8. mutable_memtable_memory_usage() >= buffer_size_ / 2) {
  9. // If the memory exceeds the buffer size, we trigger more aggressive
  10. // flush. But if already more than half memory is being flushed,
  11. // triggering more flush may not help. We will hold it instead.
  12. return true;
  13. }
  14. }
  15. return false;
  16. }


  1. WriteBufferManager::WriteBufferManager(size_t _buffer_size,
  2. std::shared_ptr<Cache> cache)
  3. : buffer_size_(_buffer_size),
  4. mutable_limit_(buffer_size_ * 7 / 8),

然后我们来看memory_usage和mutable_memtable_memory_usage,这两个函数分别用来返回整体的write_buffer所使用的内存(memory_used_)以及其中仍属于mutable memtable(尚未被标记为将要释放)的内存(memory_active_).比如一个memory table被标记为immutable,则表示这块内存将要被释放,它就不再计入memory_active_,但在真正释放之前仍然计入memory_used_.

  1. // Only valid if enabled()
  2. size_t memory_usage() const {
  3. return memory_used_.load(std::memory_order_relaxed);
  4. }
  5. size_t mutable_memtable_memory_usage() const {
  6. return memory_active_.load(std::memory_order_relaxed);
  7. }


  1. Status DBImpl::SwitchWAL(WriteContext* write_context) {
  2. ...............................................
  3. for (auto cfd : *versions_->GetColumnFamilySet()) {
  4. if (cfd->IsDropped()) {
  5. continue;
  6. }
  7. if (cfd->OldestLogToKeep() <= oldest_alive_log) {
  8. status = SwitchMemtable(cfd, write_context);
  9. if (!status.ok()) {
  10. break;
  11. }
  12. cfd->imm()->FlushRequested();
  13. SchedulePendingFlush(cfd, FlushReason::kWriteBufferManager);
  14. }
  15. }
  16. MaybeScheduleFlushOrCompaction();
  17. return status;
  18. }


  1. Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
  2. bool* need_log_sync,
  3. WriteContext* write_context) {
  4. .................................................
  5. if (UNLIKELY(status.ok() && !single_column_family_mode_ &&
  6. total_log_size_ > GetMaxTotalWalSize())) {
  7. status = SwitchWAL(write_context);
  8. }


  1. Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
  2. const FlushOptions& flush_options,
  3. FlushReason flush_reason, bool writes_stopped) {
  4. Status s;
  5. uint64_t flush_memtable_id = 0;
  6. {
  7. .........................................
  8. // SwitchMemtable() will release and reacquire mutex during execution
  9. s = SwitchMemtable(cfd, &context);
  10. flush_memtable_id = cfd->imm()->GetLatestMemTableID();
  11. if (!writes_stopped) {
  12. write_thread_.ExitUnbatched(&w);
  13. }
  14. cfd->imm()->FlushRequested();
  15. // schedule flush
  16. SchedulePendingFlush(cfd, flush_reason);
  17. MaybeScheduleFlushOrCompaction();
  18. }
  19. ...........................
  20. return s;
  21. }



  2. void MemTable::UpdateFlushState() {
  3. auto state = flush_state_.load(std::memory_order_relaxed);
  4. if (state == FLUSH_NOT_REQUESTED && ShouldFlushNow()) {
  5. // ignore CAS failure, because that means somebody else requested
  6. // a flush
  7. flush_state_.compare_exchange_strong(state, FLUSH_REQUESTED,
  8. std::memory_order_relaxed,
  9. std::memory_order_relaxed);
  10. }
  11. }




  1. bool MemTable::ShouldFlushNow() const {
  2. size_t write_buffer_size = write_buffer_size_.load(std::memory_order_relaxed);
  3. const double kAllowOverAllocationRatio = 0.6;
  4. // If arena still have room for new block allocation, we can safely say it
  5. // shouldn't flush.
  6. auto allocated_memory = table_->ApproximateMemoryUsage() +
  7. range_del_table_->ApproximateMemoryUsage() +
  8. arena_.MemoryAllocatedBytes();
  9. // if we can still allocate one more block without exceeding the
  10. // over-allocation ratio, then we should not flush.
  11. if (allocated_memory + kArenaBlockSize <
  12. write_buffer_size + kArenaBlockSize * kAllowOverAllocationRatio) {
  13. return false;
  14. }
  15. // if user keeps adding entries that exceeds write_buffer_size, we need to
  16. // flush earlier even though we still have much available memory left.
  17. if (allocated_memory >
  18. write_buffer_size + kArenaBlockSize * kAllowOverAllocationRatio) {
  19. return true;
  20. }
  21. return arena_.AllocatedAndUnused() < kArenaBlockSize / 4;
  22. }


  1. bool ShouldScheduleFlush() const {
  2. return flush_state_.load(std::memory_order_relaxed) == FLUSH_REQUESTED;
  3. }


  1. void CheckMemtableFull() {
  2. if (flush_scheduler_ != nullptr) {
  3. auto* cfd = cf_mems_->current();
  4. assert(cfd != nullptr);
  5. if (cfd->mem()->ShouldScheduleFlush() &&
  6. cfd->mem()->MarkFlushScheduled()) {
  7. // MarkFlushScheduled only returns true if we are the one that
  8. // should take action, so no need to dedup further
  9. flush_scheduler_->ScheduleFlush(cfd);
  10. }
  11. }
  12. }


  1. bool MarkFlushScheduled() {
  2. auto before = FLUSH_REQUESTED;
  3. return flush_state_.compare_exchange_strong(before, FLUSH_SCHEDULED,
  4. std::memory_order_relaxed,
  5. std::memory_order_relaxed);
  6. }


  1. void FlushScheduler::ScheduleFlush(ColumnFamilyData* cfd) {
  2. #ifndef NDEBUG
  3. std::lock_guard<std::mutex> lock(checking_mutex_);
  4. assert(checking_set_.count(cfd) == 0);
  5. checking_set_.insert(cfd);
  6. #endif // NDEBUG
  7. cfd->Ref();
  8. // Suppress false positive clang analyzer warnings.
  9. #ifndef __clang_analyzer__
  10. Node* node = new Node{cfd, head_.load(std::memory_order_relaxed)};
  11. while (!head_.compare_exchange_strong(
  12. node->next, node, std::memory_order_relaxed, std::memory_order_relaxed)) {
  13. // failing CAS updates the first param, so we are already set for
  14. // retry. TakeNextColumnFamily won't happen until after another
  15. // inter-thread synchronization, so we don't even need release
  16. // semantics for this CAS
  17. }
  18. #endif // __clang_analyzer__
  19. }


  1. delete操作
  2. put操作
  3. merge操作.


  1. Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
  2. bool* need_log_sync,
  3. WriteContext* write_context) {
  4. ..................................................................
  5. if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) {
  6. status = ScheduleFlushes(write_context);
  7. }


  1. Status DBImpl::ScheduleFlushes(WriteContext* context) {
  2. ColumnFamilyData* cfd;
  3. while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
  4. auto status = SwitchMemtable(cfd, context, FlushReason::kWriteBufferFull);
  5. if (cfd->Unref()) {
  6. delete cfd;
  7. }
  8. if (!status.ok()) {
  9. return status;
  10. }
  11. }
  12. return Status::OK();
  13. }



  1. Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
  2. FileMetaData* file_meta) {
  3. ...........................................
  4. // This will release and re-acquire the mutex.
  5. Status s = WriteLevel0Table();
  6. if (s.ok() &&
  7. (shutting_down_->load(std::memory_order_acquire) || cfd_->IsDropped())) {
  8. s = Status::ShutdownInProgress(
  9. "Database shutdown or Column family drop during flush");
  10. }
  11. if (!s.ok()) {
  12. cfd_->imm()->RollbackMemtableFlush(mems_, meta_.fd.GetNumber());
  13. } else {
  14. TEST_SYNC_POINT("FlushJob::InstallResults");
  15. // Replace immutable memtable with the generated Table
  16. s = cfd_->imm()->InstallMemtableFlushResults(
  17. cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_,
  18. meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
  19. log_buffer_);
  20. }
  21. ........................................................
  22. }