概述

在RocksDB中每一次数据的更新都会涉及到两个结构,一个是内存中的memtable(后续会刷新到磁盘成为SST),第二个是WAL(WriteAheadLog)。 本篇文章主要就是来介绍WAL.

WAL主要的功能是当RocksDB异常退出后,能够恢复出错前的内存中(memtable)数据,因此RocksDB默认是每次用户写都会刷新数据到WAL. 每次当当前WAL对应的内存数据(memtable)刷新到磁盘之后,都会新建一个WAL.

所有的WAL文件都是保存在WAL目录(options.wal_dir),为了保证数据的状态,所有的WAL文件的名字都是按照顺序的(log_number).

WAL文件格式

WAL文件由一堆变长的record组成,而每个record是由kBlockSize(32k)来分组,比如某一个record大于kBlockSize的话,他就会被切分为多个record(通过type来判断).

  1. +-----+-------------+--+----+----------+------+-- ... ----+
  2. File | r0 | r1 |P | r2 | r3 | r4 | |
  3. +-----+-------------+--+----+----------+------+-- ... ----+
  4. <--- kBlockSize ------>|<-- kBlockSize ------>|
  5. rn = variable size records
  6. P = Padding

record的格式如下:

  1. +---------+-----------+-----------+--- ... ---+
  2. |CRC (4B) | Size (2B) | Type (1B) | Payload |
  3. +---------+-----------+-----------+--- ... ---+
  4. CRC = 32bit hash computed over the payload using CRC
  5. Size = Length of the payload data
  6. Type = Type of record
  7. (kZeroType, kFullType, kFirstType, kLastType, kMiddleType )
  8. The type is used to group a bunch of records together to represent
  9. blocks that are larger than kBlockSize
  10. Payload = Byte stream as long as specified by the payload size

最后是WAL的payload的格式.

  1. // WriteBatch::rep_ :=
  2. // sequence: fixed64
  3. // count: fixed32
  4. // data: record[count]
  5. // record :=
  6. // kTypeValue varstring varstring
  7. // kTypeDeletion varstring
  8. // kTypeSingleDeletion varstring
  9. // kTypeMerge varstring varstring
  10. // kTypeColumnFamilyValue varint32 varstring varstring
  11. // kTypeColumnFamilyDeletion varint32 varstring varstring
  12. // kTypeColumnFamilySingleDeletion varint32 varstring varstring
  13. // kTypeColumnFamilyMerge varint32 varstring varstring
  14. // kTypeBeginPrepareXID varstring
  15. // kTypeEndPrepareXID
  16. // kTypeCommitXID varstring
  17. // kTypeRollbackXID varstring
  18. // kTypeNoop
  19. // varstring :=
  20. // len: varint32
  21. // data: uint8[len]

上面的格式中可以看到有一个sequence的值,这个值主要用来表示WAL中操作的时序,这里要注意每次sequence的更新是按照WriteBatch来更新的.

  1. Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
  2. log::Writer* log_writer, uint64_t* log_used,
  3. bool need_log_sync, bool need_log_dir_sync,
  4. SequenceNumber sequence) {
  5. Status status;
  6. .........................................
  7. WriteBatchInternal::SetSequence(merged_batch, sequence);

创建WAL

首先是一个新的DB被打开的时候会创建一个WAL;

  1. Status DB::Open(const DBOptions& db_options, const std::string& dbname,
  2. const std::vector<ColumnFamilyDescriptor>& column_families,
  3. std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) {
  4. ......................................................................
  5. s = impl->Recover(column_families);
  6. if (s.ok()) {
  7. uint64_t new_log_number = impl->versions_->NewFileNumber();
  8. .............................................
  9. s = NewWritableFile(
  10. impl->immutable_db_options_.env,
  11. LogFileName(impl->immutable_db_options_.wal_dir, new_log_number),
  12. &lfile, opt_env_options);
  13. ................................................

第二个情况是当一个CF(column family)被刷新到磁盘之后,也会创建新的WAL,这种情况下创建WAL是用过SwitchMemtable函数. 这个函数主要是用来切换memtable,也就是做flush之前的切换(生成新的memtable,然后把老的刷新到磁盘)

  1. Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
  2. ..................................................
  3. {
  4. if (creating_new_log) {
  5. ...............................................
  6. } else {
  7. s = NewWritableFile(
  8. env_, LogFileName(immutable_db_options_.wal_dir, new_log_number),
  9. &lfile, opt_env_opt);
  10. }
  11. .................................
  12. }
  13. ...............................................
  14. return s;
  15. }

通过上面的两个函数我们可以看到每次新建WAL都会有一个new_log_number,这个值就是对应的WAL的文件名前缀,可以看到每次生成新的log_number, 基本都会调用NewFileNumber函数.这里注意如果option设置了recycle_log_file_num的话,是有可能重用老的log_number的。我们先来看下NewFileNumber函数:

  1. uint64_t NewFileNumber() { return next_file_number_.fetch_add(1); }

可以看到函数实现很简单,就是每次log_number加一,因此一般来说WAL的文件格式都是类似0000001.LOG这样子.

WAL的清理

WAL的删除只有当包含在此WAL中的所有的数据都已经被持久化为SST之后(也有可能会延迟删除,因为有时候需要master发送transcation Log到slave来回放). 先来看DBImpl::FIndObsoleteFiles函数,这个函数很长,我们只关注对应的WAL部分,这里逻辑很简单,就是遍历所有的WAL,然后找出log_number小于当前min_log_number的文件然后加入到对应的结构(log_delete_files).

  1. if (!alive_log_files_.empty() && !logs_.empty()) {
  2. uint64_t min_log_number = job_context->log_number;
  3. size_t num_alive_log_files = alive_log_files_.size();
  4. // find newly obsoleted log files
  5. while (alive_log_files_.begin()->number < min_log_number) {
  6. auto& earliest = *alive_log_files_.begin();
  7. if (immutable_db_options_.recycle_log_file_num >
  8. log_recycle_files.size()) {
  9. ROCKS_LOG_INFO(immutable_db_options_.info_log,
  10. "adding log %" PRIu64 " to recycle list\n",
  11. earliest.number);
  12. log_recycle_files.push_back(earliest.number);
  13. } else {
  14. job_context->log_delete_files.push_back(earliest.number);
  15. }
  16. .....................................................................
  17. }
  18. while (!logs_.empty() && logs_.front().number < min_log_number) {
  19. auto& log = logs_.front();
  20. if (log.getting_synced) {
  21. log_sync_cv_.Wait();
  22. // logs_ could have changed while we were waiting.
  23. continue;
  24. }
  25. logs_to_free_.push_back(log.ReleaseWriter());
  26. {
  27. InstrumentedMutexLock wl(&log_write_mutex_);
  28. logs_.pop_front();
  29. }
  30. }
  31. // Current log cannot be obsolete.
  32. assert(!logs_.empty());
  33. }

这里可以看到有两个核心的数据结构alive_log_files和logs_,他们的区别就是前一个表示有写入的WAL,而后一个则是包括了所有的WAL(比如open一个DB,而没有写入数据,此时也会生成WAL).

最终删除WAL的操作是在DBImpl::DeleteObsoleteFileImpl这个函数,而WAL删除不会单独触发,而是和temp/sst这类文件一起被删除的(PurgeObsoleteFiles).

查看WAL的工具

我们可以使用RocksDB自带的ldb工具来查看对应的WAL内容

  1. pagefault@god ~/tools/rocksdb/data/.rocksdb $ ../../bin/ldb dump_wal --walfile=./000285.log --header
  2. Sequence,Count,ByteSize,Physical Offset,Key(s)
  3. 1255,1,110,0,PUT(1) : 0x00000006000000000000013C