序列化和写盘

Tokudb数据节点写盘主要是由后台线程异步完成的:

  • checkpoint线程:把cachetable(innodb术语buffer pool)中所有脏页写回
  • evictor线程:释放内存,如果victim节点是dirty的,需要先将数据写回。

数据在磁盘上是序列化过的,序列化的过程就是把一个数据结构转换成字节流。

写数据包括两个阶段:

  • 序列化:把结构化数据转成字节流
  • 压缩:对序列化好的数据进行压缩

tokudb序列化和压缩单位是partition,对于internal节点,就是把msg buffer序列化并压缩;对于leaf节点,就是把basement node序列化并压缩。

一个节点(node)在磁盘上是如何存储的呢? 节点数据在写盘时会被写到某个offset开始的位置,这个offset是从blocktable里面分配的一个空闲的空间。我们后面会专门写一篇有关btt(Block Translation Table)和block table的文章。 一个node的数据包含:header,pivot key和partition三部分:

  • header:节点meta信息
  • pivot key:记录了每个partition的key区间
  • partition:排序数据;一个node如果包含多个partition,这些partition是依次顺序存放的

有趣的是,压缩算法的信息是存放在partition压缩buffer的第一个字节。所以,tokudb支持FT索引内部同时使用多种压缩算法。

反序列化和读盘

Tokudb读盘的过程是在cachetable里通过调用get_and_pin系列函数实现

  • 前景线程调用get_and_pin系列函数
  • cleaner线程调用bring_node_fully_into_memory,这个函数调用pf_callback把不在内存中的那些partition读到内存。

数据从磁盘读到内存之前需要进行解压缩,然后对解压缩好的buffer进行反序列化,转换成内存数据结构。反序列化是使用序列化相反的方法把数据解析出来。

前面提过序列化和压缩的单位是partition,反序列化和解压缩的单位也是partition。

酱,节点数据就可以被FT层访问了。

序列化和压缩过程详解

这里顺便提一下BTT (Block Translation Table),这个表记录了节点(blocknum)在FT文件存储位置(offset)的映射关系。

为什么要引入这个表?Tokudb刷脏时,数据被写到一个新的空闲位置,避免了in-place update,简化recovery过程。

toku_ftnode_flush_callback是调用get_and_pin系列函数提供的flush_callback回调,checkpoint线程(也包含checkpoint thread pool的线程,在checkpoint过程中帮助前景线程做节点数据的回写)或evictor线程在这个函数里面会调用toku_serialize_ftnode_to做序列化和压缩工作。

toku_serialize_ftnode_to比较简单,首先调用toku_serialize_ftnode_to_memory执行序列化和压缩,然后调用blocktable.realloc_on_disk,为blocknum分配一个新的offset,最后调用pwrite把压缩的buffer写到盘上,回写完成清node->dirty标记。

这里单独说一下toku_serialize_ftnode_to_memory的第6个参数in_parallel,true表示并行处理序列化和压缩过程,false表示串行处理。

toku_ftnode_flush_callback通常是在evictor或者checkpoint线程上下文调用的,不影响前景线程服务客户端,这个参数一般是false,只有在loader场景下是true。

  1. toku_serialize_ftnode_to (int fd, BLOCKNUM blocknum, FTNODE node, FTNODE_DISK_DATA* ndd, bool do_rebalancing, FT ft, bool for_checkpoint) {
  2. size_t n_to_write;
  3. size_t n_uncompressed_bytes;
  4. char *compressed_buf = nullptr;
  5. // because toku_serialize_ftnode_to is only called for
  6. // in toku_ftnode_flush_callback, we pass false
  7. // for in_parallel. The reasoning is that when we write
  8. // nodes to disk via toku_ftnode_flush_callback, we
  9. // assume that it is being done on a non-critical
  10. // background thread (probably for checkpointing), and therefore
  11. // should not hog CPU,
  12. //
  13. // Should the above facts change, we may want to revisit
  14. // passing false for in_parallel here
  15. //
  16. // alternatively, we could have made in_parallel a parameter
  17. // for toku_serialize_ftnode_to, but instead we did this.
  18. int r = toku_serialize_ftnode_to_memory(
  19. node,
  20. ndd,
  21. ft->h->basementnodesize,
  22. ft->h->compression_method,
  23. do_rebalancing,
  24. toku_drd_unsafe_fetch(&toku_serialize_in_parallel),
  25. &n_to_write,
  26. &n_uncompressed_bytes,
  27. &compressed_buf
  28. );
  29. if (r != 0) {
  30. return r;
  31. }
  32. // If the node has never been written, then write the whole buffer, including the zeros
  33. invariant(blocknum.b>=0);
  34. DISKOFF offset;
  35. // Dirties the ft
  36. ft->blocktable.realloc_on_disk(blocknum, n_to_write, &offset,
  37. ft, fd, for_checkpoint);
  38. tokutime_t t0 = toku_time_now();
  39. toku_os_full_pwrite(fd, compressed_buf, n_to_write, offset);
  40. tokutime_t t1 = toku_time_now();
  41. tokutime_t io_time = t1 - t0;
  42. toku_ft_status_update_flush_reason(node, n_uncompressed_bytes, n_to_write, io_time, for_checkpoint);
  43. toku_free(compressed_buf);
  44. node->dirty = 0; // See #1957. Must set the node to be clean after serializing it so that it doesn't get written again on the next checkpoint or eviction.
  45. return 0;
  46. }

序列化和压缩过程是在toku_serialize_ftnode_to_memory实现,这个函数比较长,我们分成3段来看。

  • partition序列化和压缩
  • pivot key序列化和压缩
  • header序列化

partition序列化和压缩

toku_serialize_ftnode_to_memory的第5个参数do_rebalancing表示leaf节点在写回之前是否要做rebalance,这个参数是在toku_ftnode_flush_callback指定的,如果写回的是数据节点本身,那么是需要做rebalance的。

toku_serialize_ftnode_to_memory首先确保整个数据节点都在内存中,这么做是因为节点的partition数据是依次顺序存放的;然后根据do_rebalancing决定是否要对leaf节点做rebalance;接着是一大段内存分配:

  • sb包含节点partition压缩数据的数组,每个元素包含partition的uncompressed的buffer和compressed的buffer
  • ndd是指针数组,记录了每个partition压缩后数据的offset和size

这里有个小的优化,并没有为每个partition申请compressed的buffer,而是申请了一个足够大的buffer,每个partition使用其中的一段。uncompressed的buffer也是一样处理的。

足够大的buffer是什么意思呢?

  • uncompressed的buffer:各个partition的size总和。
  • compressed的buffer:压缩后的最大可能长度加上8个字节的overhead(每个partition压缩前的size和压缩后的size)

使用不同压缩算法,压缩之后的最大可能长度是不同的。

分配好buffer之后,调用serialize_and_compress_in_parallel或者serialize_and_compress_serially进行序列化和压缩。

  1. int toku_serialize_ftnode_to_memory(FTNODE node,
  2. FTNODE_DISK_DATA* ndd,
  3. unsigned int basementnodesize,
  4. enum toku_compression_method compression_method,
  5. bool do_rebalancing,
  6. bool in_parallel, // for loader is true, for toku_ftnode_flush_callback, is false
  7. /*out*/ size_t *n_bytes_to_write,
  8. /*out*/ size_t *n_uncompressed_bytes,
  9. /*out*/ char **bytes_to_write)
  10. // Effect: Writes out each child to a separate malloc'd buffer, then compresses
  11. // all of them, and writes the uncompressed header, to bytes_to_write,
  12. // which is malloc'd.
  13. //
  14. // The resulting buffer is guaranteed to be 512-byte aligned and the total length is a multiple of 512 (so we pad with zeros at the end if needed).
  15. // 512-byte padding is for O_DIRECT to work.
  16. {
  17. toku_ftnode_assert_fully_in_memory(node);
  18. if (do_rebalancing && node->height == 0) {
  19. toku_ftnode_leaf_rebalance(node, basementnodesize);
  20. }
  21. const int npartitions = node->n_children;
  22. // Each partition represents a compressed sub block
  23. // For internal nodes, a sub block is a message buffer
  24. // For leaf nodes, a sub block is a basement node
  25. toku::scoped_calloc sb_buf(sizeof(struct sub_block) * npartitions);
  26. struct sub_block *sb = reinterpret_cast<struct sub_block *>(sb_buf.get());
  27. XREALLOC_N(npartitions, *ndd);
  28. //
  29. // First, let's serialize and compress the individual sub blocks
  30. //
  31. // determine how large our serialization and compression buffers need to be.
  32. size_t serialize_buf_size = 0, compression_buf_size = 0;
  33. for (int i = 0; i < node->n_children; i++) {
  34. sb[i].uncompressed_size = serialize_ftnode_partition_size(node, i);
  35. sb[i].compressed_size_bound = toku_compress_bound(compression_method, sb[i].uncompressed_size);
  36. serialize_buf_size += sb[i].uncompressed_size;
  37. compression_buf_size += sb[i].compressed_size_bound + 8; // add 8 extra bytes, 4 for compressed size, 4 for decompressed size
  38. }
  39. // give each sub block a base pointer to enough buffer space for serialization and compression
  40. toku::scoped_malloc serialize_buf(serialize_buf_size);
  41. toku::scoped_malloc compression_buf(compression_buf_size);
  42. for (size_t i = 0, uncompressed_offset = 0, compressed_offset = 0; i < (size_t) node->n_children; i++) {
  43. sb[i].uncompressed_ptr = reinterpret_cast<char *>(serialize_buf.get()) + uncompressed_offset;
  44. sb[i].compressed_ptr = reinterpret_cast<char *>(compression_buf.get()) + compressed_offset;
  45. uncompressed_offset += sb[i].uncompressed_size;
  46. compressed_offset += sb[i].compressed_size_bound + 8; // add 8 extra bytes, 4 for compressed size, 4 for decompressed size
  47. invariant(uncompressed_offset <= serialize_buf_size);
  48. invariant(compressed_offset <= compression_buf_size);
  49. }
  50. // do the actual serialization now that we have buffer space
  51. struct serialize_times st = { 0, 0 };
  52. if (in_parallel) {
  53. serialize_and_compress_in_parallel(node, npartitions, compression_method, sb, &st);
  54. } else {
  55. serialize_and_compress_serially(node, npartitions, compression_method, sb, &st);
  56. }

serialize_and_compress_serially就是串行调用serialize_and_compress_partition进行序列化和压缩。

  1. static void
  2. serialize_and_compress_serially(FTNODE node,
  3. int npartitions,
  4. enum toku_compression_method compression_method,
  5. struct sub_block sb[],
  6. struct serialize_times *st) {
  7. for (int i = 0; i < npartitions; i++) {
  8. serialize_and_compress_partition(node, i, compression_method, &sb[i], st);
  9. }
  10. }

serialize_and_compress_in_parallel使用了threadpool来并行执行序列化和压缩,每个partition由一个专门的线程来处理。当前上下文也可以执行序列化和压缩,所以threadpool只创建了(npartitions-1)个线程。

threadpool线程执行的函数也是serialize_and_compress_partition;threadpool线程和当前上下文之间是使用work进行同步的。

  1. static void *
  2. serialize_and_compress_worker(void *arg) {
  3. struct workset *ws = (struct workset *) arg;
  4. while (1) {
  5. struct serialize_compress_work *w = (struct serialize_compress_work *) workset_get(ws);
  6. if (w == NULL)
  7. break;
  8. int i = w->i;
  9. serialize_and_compress_partition(w->node, i, w->compression_method, &w->sb[i], &w->st);
  10. }
  11. workset_release_ref(ws);
  12. return arg;
  13. }
  14. static void
  15. serialize_and_compress_in_parallel(FTNODE node,
  16. int npartitions,
  17. enum toku_compression_method compression_method,
  18. struct sub_block sb[],
  19. struct serialize_times *st) {
  20. if (npartitions == 1) {
  21. serialize_and_compress_partition(node, 0, compression_method, &sb[0], st);
  22. } else {
  23. int T = num_cores;
  24. if (T > npartitions)
  25. T = npartitions;
  26. if (T > 0)
  27. T = T - 1;
  28. struct workset ws;
  29. ZERO_STRUCT(ws);
  30. workset_init(&ws);
  31. struct serialize_compress_work work[npartitions];
  32. workset_lock(&ws);
  33. for (int i = 0; i < npartitions; i++) {
  34. work[i] = (struct serialize_compress_work) { .base = ,
  35. .node = node,
  36. .i = i,
  37. .compression_method = compression_method,
  38. .sb = sb,
  39. .st = { .serialize_time = 0, .compress_time = 0} };
  40. workset_put_locked(&ws, &work[i].base);
  41. }
  42. workset_unlock(&ws);
  43. toku_thread_pool_run(ft_pool, 0, &T, serialize_and_compress_worker, &ws);
  44. workset_add_ref(&ws, T);
  45. serialize_and_compress_worker(&ws);
  46. workset_join(&ws);
  47. workset_destroy(&ws);
  48. // gather up the statistics from each thread's work item
  49. for (int i = 0; i < npartitions; i++) {
  50. st->serialize_time += work[i].st.serialize_time;
  51. st->compress_time += work[i].st.compress_time;
  52. }
  53. }
  54. }

pivot key序列化和压缩

回到toku_serialize_ftnode_to_memory,序列化partition之后就是序列化pivot key的过程。 sb_node_info存放pivot key压缩数据的信息:

  • uncompressed_ptr和uncompressed_size是未压缩数据的buffer和size
  • compressed_ptr和compressed_size_bound是压缩后数据的buffer和压缩后最大可能的size+8个字节的overhead(未压缩数据size和压缩后数据的size)

前面提到,压缩后的size是由压缩算法决定,不同的压缩算法压缩之后最大可能的size是不同的。

toku_serialize_ftnode_to_memory调用serialize_and_compress_sb_node_info把pivot key信息序列化并压缩。

pivot key的compressed buffer头8个字节分别存储pivot key的compressed size和uncompressed size,从第9个字节开始才是压缩的字节流;而checksum是针对整个compressed buffer做的。

  1. //
  2. // Now lets create a sub-block that has the common node information,
  3. // This does NOT include the header
  4. //
  5. // determine how large our serialization and copmression buffers need to be
  6. struct sub_block sb_node_info;
  7. sub_block_init(&sb_node_info);
  8. size_t sb_node_info_uncompressed_size = serialize_ftnode_info_size(node);
  9. size_t sb_node_info_compressed_size_bound = toku_compress_bound(compression_method, sb_node_info_uncompressed_size);
  10. toku::scoped_malloc sb_node_info_uncompressed_buf(sb_node_info_uncompressed_size);
  11. toku::scoped_malloc sb_node_info_compressed_buf(sb_node_info_compressed_size_bound + 8); // add 8 extra bytes, 4 for compressed size, 4 for decompressed size
  12. sb_node_info.uncompressed_size = sb_node_info_uncompressed_size;
  13. sb_node_info.uncompressed_ptr = sb_node_info_uncompressed_buf.get();
  14. sb_node_info.compressed_size_bound = sb_node_info_compressed_size_bound;
  15. sb_node_info.compressed_ptr = sb_node_info_compressed_buf.get();
  16. // do the actual serialization now that we have buffer space
  17. serialize_and_compress_sb_node_info(node, &sb_node_info, compression_method, &st);
  18. //
  19. // At this point, we have compressed each of our pieces into individual sub_blocks,
  20. // we can put the header and all the subblocks into a single buffer and return it.
  21. //
  22. // update the serialize times, ignore the header for simplicity. we captured all
  23. // of the partitions' serialize times so that's probably good enough.
  24. toku_ft_status_update_serialize_times(node, st.serialize_time, st.compress_time);

header序列化

序列化pivot key之后,toku_serialize_ftnode_to_memory计算节点node压缩前size和压缩后的size。 计算方法很简单:partition的size总和 + pivot key的size + header的size + 4个字节的overhead(pivot key的checksum)。

节点node压缩之后的size是为分配压缩后的数据buffer,为了支持direct I/O,分配的buffer和buffer size必须是512对齐的。

分配的buffer size记在n_bytes_to_write返回给调用函数;压缩之后的数据存储在bytes_to_write指向的buffer中。

节点node压缩之前的size,就是为了返回给调用函数,记在n_uncompressed_bytes参数中。

  1. // The total size of the node is:
  2. // size of header + disk size of the n+1 sub_block's created above
  3. uint32_t total_node_size = (serialize_node_header_size(node) // uncompressed header
  4. + sb_node_info.compressed_size // compressed nodeinfo (without its checksum)
  5. + 4); // nodeinfo's checksum
  6. uint32_t total_uncompressed_size = (serialize_node_header_size(node) // uncompressed header
  7. + sb_node_info.uncompressed_size // uncompressed nodeinfo (without its checksum)
  8. + 4); // nodeinfo's checksum
  9. // store the BP_SIZESs
  10. for (int i = 0; i < node->n_children; i++) {
  11. uint32_t len = sb[i].compressed_size + 4; // data and checksum
  12. BP_SIZE (*ndd,i) = len;
  13. BP_START(*ndd,i) = total_node_size;
  14. total_node_size += sb[i].compressed_size + 4;
  15. total_uncompressed_size += sb[i].uncompressed_size + 4;
  16. }
  17. // now create the final serialized node
  18. uint32_t total_buffer_size = roundup_to_multiple(512, total_node_size); // make the buffer be 512 bytes.
  19. char *XMALLOC_N_ALIGNED(512, total_buffer_size, data);
  20. char *curr_ptr = data;

前面提到节点node序列化的过程分为3个阶段:

  • partition序列化和压缩
  • pivot key序列化和压缩
  • header序列化

前2个阶段都讨论过了,header的部分是调用serialize_node_header实现的。

到这里其他部分的序列化和压缩工作都做好了,header的序列化直接在前面分配好的压缩后数据buffer上进行,不需要压缩,也不必分配sub_block数据结构。

header处理完,直接把pivot key的sub_block的compressed_ptr数据和checksum拷贝过来。

pivot key处理完,直接把每个partition的compressed_ptr和checksum依次拷贝过来。

pad的部分写0。

  1. // write the header
  2. struct wbuf wb;
  3. wbuf_init(&wb, curr_ptr, serialize_node_header_size(node));
  4. serialize_node_header(node, *ndd, &wb);
  5. assert(wb.ndone == wb.size);
  6. curr_ptr += serialize_node_header_size(node);
  7. // now write sb_node_info
  8. memcpy(curr_ptr, sb_node_info.compressed_ptr, sb_node_info.compressed_size);
  9. curr_ptr += sb_node_info.compressed_size;
  10. // write the checksum
  11. *(uint32_t *)curr_ptr = toku_htod32(sb_node_info.xsum);
  12. curr_ptr += sizeof(sb_node_info.xsum);
  13. for (int i = 0; i < npartitions; i++) {
  14. memcpy(curr_ptr, sb[i].compressed_ptr, sb[i].compressed_size);
  15. curr_ptr += sb[i].compressed_size;
  16. // write the checksum
  17. *(uint32_t *)curr_ptr = toku_htod32(sb[i].xsum);
  18. curr_ptr += sizeof(sb[i].xsum);
  19. }
  20. // Zero the rest of the buffer
  21. memset(data + total_node_size, 0, total_buffer_size - total_node_size);
  22. assert(curr_ptr - data == total_node_size);
  23. *bytes_to_write = data;
  24. *n_bytes_to_write = total_buffer_size;
  25. *n_uncompressed_bytes = total_uncompressed_size;
  26. invariant(*n_bytes_to_write % 512 == 0);
  27. invariant(reinterpret_cast<unsigned long long>(*bytes_to_write) % 512 == 0);
  28. return 0;
  29. }

假若一个node包含2个partition,它的序列化结构如下所示:

image.png

反序列化和解压缩过程详解

由于tokudb支持partial fetch(只读某几个partition)和partial evict(即把clean节点的部分partition释放掉),反序列化过程相比序列化过程略复杂一些。

fetch callback通过bfe这个hint告诉toku_deserialize_ftnode_from需要读那些partition。

bfe有五种类型:

  • ftnode_fetch_none:只需要读header和pivot key,不需要读任何partition。只用于optimizer计算cost
  • ftnode_fetch_keymatch:只需要读match某个key的partition,ydb层提供的一个接口,一般不用
  • ftnode_fetch_prefetch:prefetch时使用
  • ftnode_fetch_all:需要把所有partition读上来;写节点时使用(msg inject或者msg apply的子节点)
  • ftnode_fetch_subset:需要读若干个partition,FT search路径上使用。

只有在ft search高度>1以上的中间节点时,read_all_partitions会被设置成true,走老的代码路径deserialize_ftnode_from_fd,一次性把所有partition都读到内存中。

其他情况会调用read_ftnode_header_from_fd_into_rbuf_if_small_enough,把节点的header读到内存中,然后反序列化header并设置ndd(每个partition的offset和size);解压缩和反序列化pivot key设置pivot信息;根据bfe读取需要的partition。

节点的header,pivot key和partition都有自己的checksum信息,解析每个部分时都要确认checksum是匹配的。

  1. enum ftnode_fetch_type {
  2. ftnode_fetch_none = 1, // no partitions needed.
  3. ftnode_fetch_subset, // some subset of partitions needed
  4. ftnode_fetch_prefetch, // this is part of a prefetch call
  5. ftnode_fetch_all, // every partition is needed
  6. ftnode_fetch_keymatch, // one child is needed if it holds both keys
  7. };
  8. int
  9. toku_deserialize_ftnode_from (int fd,
  10. BLOCKNUM blocknum,
  11. uint32_t fullhash,
  12. FTNODE *ftnode,
  13. FTNODE_DISK_DATA* ndd,
  14. ftnode_fetch_extra *bfe
  15. )
  16. // Effect: Read a node in. If possible, read just the header.
  17. {
  18. int r = 0;
  19. struct rbuf rb = RBUF_INITIALIZER;
  20. // each function below takes the appropriate io/decompression/deserialize statistics
  21. if (!bfe->read_all_partitions) {
  22. read_ftnode_header_from_fd_into_rbuf_if_small_enough(fd, blocknum, bfe->ft, &rb, bfe);
  23. r = deserialize_ftnode_header_from_rbuf_if_small_enough(ftnode, ndd, blocknum, fullhash, bfe, &rb, fd);
  24. } else {
  25. // force us to do it the old way
  26. r = -1;
  27. }
  28. if (r != 0) {
  29. // Something went wrong, go back to doing it the old way.
  30. r = deserialize_ftnode_from_fd(fd, blocknum, fullhash, ftnode, ndd, bfe, NULL);
  31. }
  32. toku_free(rb.buf);
  33. return r;
  34. }

deserialize_ftnode_header_from_rbuf_if_small_enough比较长,基本是toku_serialize_ftnode_to_memory的相反过程。

header部分是不压缩的,直接解析,比较magic number,解析node->n_children和ndd等。

然后比较header的checksum

  1. node->n_children = rbuf_int(rb);
  2. // Guaranteed to be have been able to read up to here. If n_children
  3. // is too big, we may have a problem, so check that we won't overflow
  4. // while reading the partition locations.
  5. unsigned int nhsize;
  6. nhsize = serialize_node_header_size(node); // we can do this because n_children is filled in.
  7. unsigned int needed_size;
  8. needed_size = nhsize + 12; // we need 12 more so that we can read the compressed block size information that follows for the nodeinfo.
  9. if (needed_size > rb->size) {
  10. r = toku_db_badformat();
  11. goto cleanup;
  12. }
  13. XMALLOC_N(node->n_children, node->bp);
  14. XMALLOC_N(node->n_children, *ndd);
  15. // read the partition locations
  16. for (int i=0; i<node->n_children; i++) {
  17. BP_START(*ndd,i) = rbuf_int(rb);
  18. BP_SIZE (*ndd,i) = rbuf_int(rb);
  19. }
  20. uint32_t checksum;
  21. checksum = toku_x1764_memory(rb->buf, rb->ndone);
  22. uint32_t stored_checksum;
  23. stored_checksum = rbuf_int(rb);
  24. if (stored_checksum != checksum) {
  25. dump_bad_block(rb->buf, rb->size);
  26. r = TOKUDB_BAD_CHECKSUM;
  27. goto cleanup;
  28. }

接着处理pivot key,比较pivot key部分的checksum,解压缩,反序列化,设置pivot信息。

  1. // Finish reading compressed the sub_block
  2. const void **cp;
  3. cp = (const void **) &sb_node_info.compressed_ptr;
  4. rbuf_literal_bytes(rb, cp, sb_node_info.compressed_size);
  5. sb_node_info.xsum = rbuf_int(rb);
  6. // let's check the checksum
  7. uint32_t actual_xsum;
  8. actual_xsum = toku_x1764_memory((char *)sb_node_info.compressed_ptr-8, 8+sb_node_info.compressed_size);
  9. if (sb_node_info.xsum != actual_xsum) {
  10. r = TOKUDB_BAD_CHECKSUM;
  11. goto cleanup;
  12. }
  13. // Now decompress the subblock
  14. {
  15. toku::scoped_malloc sb_node_info_buf(sb_node_info.uncompressed_size);
  16. sb_node_info.uncompressed_ptr = sb_node_info_buf.get();
  17. tokutime_t decompress_t0 = toku_time_now();
  18. toku_decompress(
  19. (Bytef *) sb_node_info.uncompressed_ptr,
  20. sb_node_info.uncompressed_size,
  21. (Bytef *) sb_node_info.compressed_ptr,
  22. sb_node_info.compressed_size
  23. );
  24. tokutime_t decompress_t1 = toku_time_now();
  25. decompress_time = decompress_t1 - decompress_t0;
  26. // at this point sb->uncompressed_ptr stores the serialized node info.
  27. r = deserialize_ftnode_info(&sb_node_info, node);
  28. if (r != 0) {
  29. goto cleanup;
  30. }
  31. }

最后是根据bfe读取需要的partition,读partition是通过调用pf_callback实现的。

  1. // Now we have the ftnode_info. We have a bunch more stuff in the
  2. // rbuf, so we might be able to store the compressed data for some
  3. // objects.
  4. // We can proceed to deserialize the individual subblocks.
  5. // setup the memory of the partitions
  6. // for partitions being decompressed, create either message buffer or basement node
  7. // for partitions staying compressed, create sub_block
  8. setup_ftnode_partitions(node, bfe, false);
  9. // We must capture deserialize and decompression time before
  10. // the pf_callback, otherwise we would double-count.
  11. t1 = toku_time_now();
  12. deserialize_time = (t1 - t0) - decompress_time;
  13. // do partial fetch if necessary
  14. if (bfe->type != ftnode_fetch_none) {
  15. PAIR_ATTR attr;
  16. r = toku_ftnode_pf_callback(node, *ndd, bfe, fd, &attr, NULL);
  17. if (r != 0) {
  18. goto cleanup;
  19. }
  20. }

deserialize_ftnode_from_fd的部分留给读者自行分析。