概述

首先我们知道在RocksDB中,最终数据的持久化都是保存在SST中,而SST则是由Memtable刷新到磁盘生成的,因此这次我们就主要来分析在RocksDB中何时以及如何来Flush内存数据(memtable)到SST.

简单来说在RocksDB中,每一个ColumnFamily都有自己的Memtable,当Memtable超过固定大小之后(或者WAL文件超过限制),它将会被设置为immutable,然后会有后台的线程启动来刷新这个immutable memtable到磁盘(SST).

相关设置

  1. write_buffer_size 表示每个columnfamily的memtable的大小限制
  2. db_write_buffer_size 总的memtable的大小限制(所有的ColumnFamily).
  3. max_write_buffer_number 最大的memtable的个数
  4. min_write_buffer_number_to_merge 表示最小的可以被flush的memtable的个数

Flush Memtable的触发条件

在下面这几种条件下RocksDB会flush memtable到磁盘.

  1. 当某一个memtable的大小超过write_buffer_size.
  2. 当总的memtable的大小超过db_write_buffer_size.
  3. 当WAL文件的大小超过max_total_wal_size之后 最后一个条件的原因是,当WAL文件大小太大之后,我们需要清理WAL,因此此时我们需要将此WAL对应的数据都刷新到磁盘,也是刷新Memtable.

源码

首先在全局的DBImpl中包含了一个flush_queue_的队列,这个队列将会保存所有的将要被flush到磁盘的ColumnFamily.只有当当前的ColumnFamily满足flush条件(cfd->imm()->IsFlushPending())才会将此CF加入到flush队列.

  1. class DBImpl {
  2. ................................
  3. std::deque<ColumnFamilyData*> flush_queue_;
  4. ...................
  5. };

然后我们来看IsFlushPending的实现.这个函数的意思就是至少有一个memtable需要被flush.而MemTableList这个类则是保存了所有的immutable memtables.

  1. bool MemTableList::IsFlushPending() const {
  2. if ((flush_requested_ && num_flush_not_started_ >= 1) ||
  3. (num_flush_not_started_ >= min_write_buffer_number_to_merge_)) {
  4. assert(imm_flush_needed.load(std::memory_order_relaxed));
  5. return true;
  6. }
  7. return false;
  8. }

上面这几个变量的含义在注释中比较清楚, 而min_write_buffer_number_to_merge_就是min_write_buffer_number_to_merge.

  1. // the number of elements that still need flushing
  2. int num_flush_not_started_;
  3. // committing in progress
  4. bool commit_in_progress_;
  5. // Requested a flush of all memtables to storage
  6. bool flush_requested_;

可以看到在SchedulePendingFlush函数中,最终会将对应的ColumnFamily加入到flush queue中.

  1. void DBImpl::SchedulePendingFlush(ColumnFamilyData* cfd,
  2. FlushReason flush_reason) {
  3. if (!cfd->queued_for_flush() && cfd->imm()->IsFlushPending()) {
  4. AddToFlushQueue(cfd, flush_reason);
  5. ++unscheduled_flushes_;
  6. }
  7. }

而刷新MemTable到磁盘是一个后台线程来做的,这个后台线程叫做BGWorkFlush,最终这个函数会调用BackgroundFlush函数,而BackgroundFlush主要功能是在flush_queue_中找到一个ColumnFamily然后刷新它的memtable到磁盘.

  1. Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
  2. LogBuffer* log_buffer) {
  3. ................................
  4. while (!flush_queue_.empty()) {
  5. // This cfd is already referenced
  6. auto first_cfd = PopFirstFromFlushQueue();
  7. if (first_cfd->IsDropped() || !first_cfd->imm()->IsFlushPending()) {
  8. // can't flush this CF, try next one
  9. if (first_cfd->Unref()) {
  10. delete first_cfd;
  11. }
  12. continue;
  13. }
  14. // found a flush!
  15. cfd = first_cfd;
  16. break;
  17. }
  18. if (cfd != nullptr) {
  19. ....................................
  20. status = FlushMemTableToOutputFile(cfd, mutable_cf_options, made_progress,
  21. job_context, log_buffer);
  22. if (cfd->Unref()) {
  23. delete cfd;
  24. }
  25. }
  26. return status;
  27. }

通过上面可以看到最终会调用FlushMemTableToOutputFile来刷新Memtable到磁盘,等到最后我们来分析这个函数.

而这个刷新线程的调用是在MaybeScheduleFlushOrCompaction函数中进行的。这里可以看到刷新县城的限制是在max_flushes中设置的.

  1. void DBImpl::MaybeScheduleFlushOrCompaction() {
  2. ..........................................
  3. auto bg_job_limits = GetBGJobLimits();
  4. bool is_flush_pool_empty =
  5. env_->GetBackgroundThreads(Env::Priority::HIGH) == 0;
  6. while (!is_flush_pool_empty && unscheduled_flushes_ > 0 &&
  7. bg_flush_scheduled_ < bg_job_limits.max_flushes) {
  8. unscheduled_flushes_--;
  9. bg_flush_scheduled_++;
  10. env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH, this);
  11. }
  12. ...........................................
  13. }

在RocksDB中,有一个SwitchMemtable函数,这个函数用来将现在的memtable改变为immutable,然后再新建一个memtable,也就是说理论上来说每一次内存的memtable被刷新到磁盘之前肯定会调用这个函数.而在实现中,每一次调用SwitchMemtable之后,都会调用对应immutable memtable的FlushRequested函数来设置对应memtable的flush_requeseted_, 并且会调用上面的SchedulePendingFlush来将对应的ColumnFamily加入到flush_queue_队列中.因此这里我们就通过这几个函数的调用栈来分析RocksDB中何时会触发flush操作.

在RocksDB中会有四个地方会调用SwitchMemtable,分别是:

  1. DbImpl::HandleWriteBufferFull
  2. DBImpl::SwitchWAL
  3. DBImpl::FlushMemTable
  4. DBImpl::ScheduleFlushes

接下来我们就来一个个分析这几个函数.

先来看HandleWriteBufferFull.这个函数主要是处理所有ColumnFamily的memtable内存超过限制的情况.可以看到它会调用SwitchMemtable然后再将对应的cfd加入到flush_queue_,最后再来调用后台刷新线程.

  1. Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) {
  2. ...................................
  3. for (auto cfd : *versions_->GetColumnFamilySet()) {
  4. ...............................
  5. if (cfd_picked != nullptr) {
  6. status = SwitchMemtable(cfd_picked, write_context,
  7. FlushReason::kWriteBufferFull);
  8. if (status.ok()) {
  9. cfd_picked->imm()->FlushRequested();
  10. SchedulePendingFlush(cfd_picked, FlushReason::kWriteBufferFull);
  11. MaybeScheduleFlushOrCompaction();
  12. }
  13. }
  14. return status;
  15. }

这个函数的调用是在是在写WAL之前,也就是每次写WAL都会进行这个判断.

  1. Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
  2. bool* need_log_sync,
  3. WriteContext* write_context) {
  4. ..........................................
  5. if (UNLIKELY(status.ok() && write_buffer_manager_->ShouldFlush())) {
  6. // Before a new memtable is added in SwitchMemtable(),
  7. // write_buffer_manager_->ShouldFlush() will keep returning true. If another
  8. // thread is writing to another DB with the same write buffer, they may also
  9. // be flushed. We may end up with flushing much more DBs than needed. It's
  10. // suboptimal but still correct.
  11. status = HandleWriteBufferFull(write_context);
  12. }
  13. ........................................
  14. }

可以看到会调用write_buffer的shouldflush来判断是否处理bufferfull.而这个函数很简单,就是判断memtable所使用的内存是否已经超过限制.

  1. // Should only be called from write thread
  2. bool ShouldFlush() const {
  3. if (enabled()) {
  4. if (mutable_memtable_memory_usage() > mutable_limit_) {
  5. return true;
  6. }
  7. if (memory_usage() >= buffer_size_ &&
  8. mutable_memtable_memory_usage() >= buffer_size_ / 2) {
  9. // If the memory exceeds the buffer size, we trigger more aggressive
  10. // flush. But if already more than half memory is being flushed,
  11. // triggering more flush may not help. We will hold it instead.
  12. return true;
  13. }
  14. }
  15. return false;
  16. }

而mutable_limit_和buffer_size_的初始化在这里,这里buffer_size_就是db_write_buffer_size这个可配置的选项.

  1. WriteBufferManager::WriteBufferManager(size_t _buffer_size,
  2. std::shared_ptr<Cache> cache)
  3. : buffer_size_(_buffer_size),
  4. mutable_limit_(buffer_size_ * 7 / 8),

然后我们来看mutable_memtable_memory_usage和memory_usage,这两个函数用来返回整体的write_buffer所使用的内存(memory_used_)以及将要被释放的内存(memory_active_),比如一个memory table被标记为immutable,则表示这块内存将要被释放.

  1. // Only valid if enabled()
  2. size_t memory_usage() const {
  3. return memory_used_.load(std::memory_order_relaxed);
  4. }
  5. size_t mutable_memtable_memory_usage() const {
  6. return memory_active_.load(std::memory_order_relaxed);
  7. }

然后我们来看SwitchWAL,流程和上面的HandleWriteBufferFull基本一致.

  1. Status DBImpl::SwitchWAL(WriteContext* write_context) {
  2. ...............................................
  3. for (auto cfd : *versions_->GetColumnFamilySet()) {
  4. if (cfd->IsDropped()) {
  5. continue;
  6. }
  7. if (cfd->OldestLogToKeep() <= oldest_alive_log) {
  8. status = SwitchMemtable(cfd, write_context);
  9. if (!status.ok()) {
  10. break;
  11. }
  12. cfd->imm()->FlushRequested();
  13. SchedulePendingFlush(cfd, FlushReason::kWriteBufferManager);
  14. }
  15. }
  16. MaybeScheduleFlushOrCompaction();
  17. return status;
  18. }

这个函数被调用比较简单,就是判断是否WAL的大小是否已经超过了设置的wal大小(max_total_wal_size).可以看到它的调用也是在每次写WAL之前.

  1. Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
  2. bool* need_log_sync,
  3. WriteContext* write_context) {
  4. .................................................
  5. if (UNLIKELY(status.ok() && !single_column_family_mode_ &&
  6. total_log_size_ > GetMaxTotalWalSize())) {
  7. status = SwitchWAL(write_context);
  8. }

然后是FlushMemTable,这个函数用来强制刷新刷新memtable到磁盘,比如用户直接调用Flush接口.可以看到和上面的集中情况基本一致,switchmemtable->flushrequested->maybescheduleflushorcompaction.

  1. Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
  2. const FlushOptions& flush_options,
  3. FlushReason flush_reason, bool writes_stopped) {
  4. Status s;
  5. uint64_t flush_memtable_id = 0;
  6. {
  7. .........................................
  8. // SwitchMemtable() will release and reacquire mutex during execution
  9. s = SwitchMemtable(cfd, &context);
  10. flush_memtable_id = cfd->imm()->GetLatestMemTableID();
  11. if (!writes_stopped) {
  12. write_thread_.ExitUnbatched(&w);
  13. }
  14. cfd->imm()->FlushRequested();
  15. // schedule flush
  16. SchedulePendingFlush(cfd, flush_reason);
  17. MaybeScheduleFlushOrCompaction();
  18. }
  19. ...........................
  20. return s;
  21. }

最后我们来看最后一种情况,这种情况和前面三种有一个最大的区别就是前面三种情况的出现都是需要立即调用flush线程来刷新memtable到磁盘,而还有一种情况则是没那么紧急的情况,也就是说可以等到后面某个时间段再调用flush线程来刷新内容到磁盘.

在这种情况下,每一个memtable都会有一个状态叫做flush_state_,而每个memtable都有可能有三种状态.而状态的更新是通过UpdateFlushState来进行的.这里可以推测的到这些都是对于单个memtable的限制.

  1. enum FlushStateEnum { FLUSH_NOT_REQUESTED, FLUSH_REQUESTED, FLUSH_SCHEDULED };
  2. void MemTable::UpdateFlushState() {
  3. auto state = flush_state_.load(std::memory_order_relaxed);
  4. if (state == FLUSH_NOT_REQUESTED && ShouldFlushNow()) {
  5. // ignore CAS failure, because that means somebody else requested
  6. // a flush
  7. flush_state_.compare_exchange_strong(state, FLUSH_REQUESTED,
  8. std::memory_order_relaxed,
  9. std::memory_order_relaxed);
  10. }
  11. }

而UpdateFlushState什么时候会被调用呢,很简单,就是当你每次操作memtable的时候,比如update/add这些操作.

可以看到当shoudflushnow之后,将会设置flush_state_状态为FLUSH_REQUESTED,也就是此memtable将会被flush.

然后来看shouldflushnow函数,这个函数主要的判断就是判断是否当前MemTable的内存使用是否超过了write_buffer_size,如果超过了,那么就返回true.

  1. bool MemTable::ShouldFlushNow() const {
  2. size_t write_buffer_size = write_buffer_size_.load(std::memory_order_relaxed);
  3. const double kAllowOverAllocationRatio = 0.6;
  4. // If arena still have room for new block allocation, we can safely say it
  5. // shouldn't flush.
  6. auto allocated_memory = table_->ApproximateMemoryUsage() +
  7. range_del_table_->ApproximateMemoryUsage() +
  8. arena_.MemoryAllocatedBytes();
  9. // if we can still allocate one more block without exceeding the
  10. // over-allocation ratio, then we should not flush.
  11. if (allocated_memory + kArenaBlockSize <
  12. write_buffer_size + kArenaBlockSize * kAllowOverAllocationRatio) {
  13. return false;
  14. }
  15. // if user keeps adding entries that exceeds write_buffer_size, we need to
  16. // flush earlier even though we still have much available memory left.
  17. if (allocated_memory >
  18. write_buffer_size + kArenaBlockSize * kAllowOverAllocationRatio) {
  19. return true;
  20. }
  21. return arena_.AllocatedAndUnused() < kArenaBlockSize / 4;
  22. }

然后我们来看当设置了flush_state_状态之后,会做什么操作.对应的MEmtable有一个ShouldScheduleFlush函数,这个函数用来返回当前的memtable是否已经被设置flush_requested状态位。

  1. bool ShouldScheduleFlush() const {
  2. return flush_state_.load(std::memory_order_relaxed) == FLUSH_REQUESTED;
  3. }

而这个函数会在checkmemtablefull中被调用,这个函数主要用来将已经设置flush_state_为flush_requested的memtable的状态改变为flush_schedule(意思就是已经进入flush的调度队列),然后将这个columnfamily加入到对应的调度队列.

  1. void CheckMemtableFull() {
  2. if (flush_scheduler_ != nullptr) {
  3. auto* cfd = cf_mems_->current();
  4. assert(cfd != nullptr);
  5. if (cfd->mem()->ShouldScheduleFlush() &&
  6. cfd->mem()->MarkFlushScheduled()) {
  7. // MarkFlushScheduled only returns true if we are the one that
  8. // should take action, so no need to dedup further
  9. flush_scheduler_->ScheduleFlush(cfd);
  10. }
  11. }
  12. }

其中MarkFlushScheduled就是用来改变状态.

  1. bool MarkFlushScheduled() {
  2. auto before = FLUSH_REQUESTED;
  3. return flush_state_.compare_exchange_strong(before, FLUSH_SCHEDULED,
  4. std::memory_order_relaxed,
  5. std::memory_order_relaxed);
  6. }

而ScheduleFlush则是比较重要的一个函数,就是用来将对应的CF加入到flush调度队列(FlushScheduler).

  1. void FlushScheduler::ScheduleFlush(ColumnFamilyData* cfd) {
  2. #ifndef NDEBUG
  3. std::lock_guard<std::mutex> lock(checking_mutex_);
  4. assert(checking_set_.count(cfd) == 0);
  5. checking_set_.insert(cfd);
  6. #endif // NDEBUG
  7. cfd->Ref();
  8. // Suppress false positive clang analyzer warnings.
  9. #ifndef __clang_analyzer__
  10. Node* node = new Node{cfd, head_.load(std::memory_order_relaxed)};
  11. while (!head_.compare_exchange_strong(
  12. node->next, node, std::memory_order_relaxed, std::memory_order_relaxed)) {
  13. // failing CAS updates the first param, so we are already set for
  14. // retry. TakeNextColumnFamily won't happen until after another
  15. // inter-thread synchronization, so we don't even need release
  16. // semantics for this CAS
  17. }
  18. #endif // __clang_analyzer__
  19. }

而checkmemtablefull会在下面三种条件下被调用

  1. delete操作
  2. put操作
  3. merge操作.

然后我们来看flushscheduler如何来调度flush线程.首先在每次写WAL之前都会调用PreprocessWrite,然后这个函数会判断flush_scheduler是否为空(也就是是否有已经满掉的memtable需要刷新到磁盘).

  1. Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
  2. bool* need_log_sync,
  3. WriteContext* write_context) {
  4. ..................................................................
  5. if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) {
  6. status = ScheduleFlushes(write_context);
  7. }

而在SscheduleFlushes中,则会遍历之前所有的需要被flush的memtable,然后调用switchMemtable来进行后续操作.这里要注意在SwitchMemtable也会触发调用flush线程.

  1. Status DBImpl::ScheduleFlushes(WriteContext* context) {
  2. ColumnFamilyData* cfd;
  3. while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
  4. auto status = SwitchMemtable(cfd, context, FlushReason::kWriteBufferFull);
  5. if (cfd->Unref()) {
  6. delete cfd;
  7. }
  8. if (!status.ok()) {
  9. return status;
  10. }
  11. }
  12. return Status::OK();
  13. }

刷新memtable到sst

在RocksDB中刷新是通过FlushJob这个类来实现的,整个实现还是比较简单.最终这里是调用WriteLevel0Table来刷新内容到磁盘。这里就不分析sst的格式了,需要了解具体格式的可以看RocksDB的wiki.

  1. Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
  2. FileMetaData* file_meta) {
  3. ...........................................
  4. // This will release and re-acquire the mutex.
  5. Status s = WriteLevel0Table();
  6. if (s.ok() &&
  7. (shutting_down_->load(std::memory_order_acquire) || cfd_->IsDropped())) {
  8. s = Status::ShutdownInProgress(
  9. "Database shutdown or Column family drop during flush");
  10. }
  11. if (!s.ok()) {
  12. cfd_->imm()->RollbackMemtableFlush(mems_, meta_.fd.GetNumber());
  13. } else {
  14. TEST_SYNC_POINT("FlushJob::InstallResults");
  15. // Replace immutable memtable with the generated Table
  16. s = cfd_->imm()->InstallMemtableFlushResults(
  17. cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_,
  18. meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
  19. log_buffer_);
  20. }
  21. ........................................................
  22. }