Loader简介

Loader设计思路是把二元组缓存在内存中,对每个索引在后台计算索引key。等所有的数据插入操作完成后,对每个索引进行排序,最后用排好序的索引key数组建立FT文件。

创建FT的过程:依次创建每个叶子节点(leaf node)和中间节点(internal node)。最后维护元数据,这里包括index descriptor,block allocation table和FT header。

对于数据量很大的索引,因为loader预留的空间有限无法保存所有的key,所以排序的过程可能会有多轮。

每轮把内存的索引key进行merge sort,之后写入临时文件。最后,把所有经过部分排序的临时文件进行多路归并,得到一个有序的索引key数组。

Tokudb loader的主要应用场景:1)外部导入数据,2)同步创建索引。

使用loader有个限制:每个索引必须是空的。所以,loader不适合增量数据插入场景。

以导入数据场景为例:

  • 创建表:(pk和二级索引)
  • 调用db_env->create_loader创建loader
  • 不断调用loader->put()插入数据
  • 中途出错,调用loader->abort()
  • 全部成功,最后调用loader->close()
  • 若loader处理出错,loader使用者需显式drop table删除第一步中创建的表

酱,新的表就可以使用了。

主要数据结构

Note:由于篇幅有限,本文只列出了数据结构重要字段和代码片段。

DB_LOADER

描述了loader提供给Tokudb handler的接口。

  1. typedef struct __toku_loader DB_LOADER;
  2. struct __toku_loader {
  3. struct __toku_loader_internal *i;
  4. int (*set_error_callback)(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *error_extra), void *error_extra);
  5. int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress), void *poll_extra);
  6. int (*put)(DB_LOADER *loader, DBT *key, DBT* val);
  7. int (*close)(DB_LOADER *loader);
  8. int (*abort)(DB_LOADER *loader);
  9. };

__toku_loader_internal

描述了一些全局信息(如env和事务txn),还有一些callback函数(poll,error)

除此之外,__toku_loader_internal还包含索引的信息:

  • env:全局的db_env
  • txn:创建loader上下文指定的事务
  • N指的是新建索引个数
  • src_db是源索引,一般是NULL
  • dbs是新建索引数组,共N个
  • db_flags是数组,表示每个新建索引的put_flags,pk的put_flags是在set_main_dict_put_flags生成;非pk索引的一般是0
  • dbt_flags也是数组,一般是DB_DBT_REALLOC
  • loader_flags:可能是LOADER_COMPRESS_INTERMEDIATES表示临时文件保存的中间结果需要压缩,一般是0
  • temp_file_template:临时文件的文件名模板
  • err_key:记录loader->put失败的key
  • err_val:记录loader->put失败的value
  • err_i:未使用,本意是希望记录loader->put在写哪个dictionary失败的。loader是pipleline方式实现的,而在loader->put阶段无法确定是在哪个dictionary失败的
  • err_errno:记录loader->put失败的errno
  • inames_in_env:每个新建索引对应的文件名
  1. struct __toku_loader_internal {
  2. DB_ENV *env;
  3. DB_TXN *txn;
  4. FTLOADER ft_loader;
  5. int N;
  6. DB **dbs; /* [N] */
  7. DB *src_db;
  8. uint32_t *db_flags;
  9. uint32_t *dbt_flags;
  10. uint32_t loader_flags;
  11. void (*error_callback)(DB *db, int i, int err, DBT *key, DBT *val, void *error_extra);
  12. void *error_extra;
  13. int (*poll_func)(void *poll_extra, float progress);
  14. void *poll_extra;
  15. char *temp_file_template;
  16. DBT err_key; /* error key */
  17. DBT err_val; /* error val */
  18. int err_i; /* error i */
  19. int err_errno;
  20. char **inames_in_env; /* [N] inames of new files to be created */
  21. };

ft_loader

Loader内部主要数据结构的定义在ft_loader,用于生成FT文件。这个数据结构比较重要,也比较大,我们分成几个部分来介绍。

  1. struct ft_loader_s {
  2. ...
  3. generate_row_for_put_func generate_row_for_put;
  4. CACHETABLE cachetable;
  5. bool did_reserve_memory;
  6. bool compress_intermediates;
  7. bool allow_puts;
  8. uint64_t reserved_memory;
  9. ...
  10. }
  • generate_row_for_put:为每个索引生成索引key的callback函数
  • cachetable:全局buffer pool指针
  • did_reserve_memory:是否从cachetable里reserve内存
  • compress_intermediates:中间结果是否需要压缩
  • allow_puts:是否接收数据输入;设置成false,表示把directory重定向到空的FT
  • reserved_memory:did_reserve_memory为TRUE,表示从cachetable中reserve了多少内存;did_reserve_memory为FALSE时,使用是512M内存
  1. struct ft_loader_s {
  2. ...
  3. DB *src_db;
  4. int N;
  5. DB **dbs; // N of these
  6. DESCRIPTOR *descriptors; // N of these.
  7. TXNID *root_xids_that_created; // N of these.
  8. const char **new_fnames_in_env; // N of these.
  9. ft_compare_func *bt_compare_funs; // N of these
  10. ...
  11. }
  • N,src_db,dbs跟__toku_loader_internal里相应字段的是一样的
  • descriptors:descriptors[which_db]表示索引的descriptor
  • root_xids_that_created:root_xids_that_created[which_db]表示索引创建时的root txnid
  • new_fnames_in_env:new_fnames_in_env[which_db]表示索引的文件名
  • bt_compare_funs:bt_compare_funs[which_db]表示索引的比较函数
  1. struct ft_loader_s {
  2. ...
  3. uint64_t n_rows;
  4. struct rowset primary_rowset;
  5. struct rowset primary_rowset_temp;
  6. QUEUE primary_rowset_queue;
  7. toku_pthread_t extractor_thread;
  8. bool extractor_live;
  9. struct rowset *rows;// N of these.
  10. uint64_t *extracted_datasizes; // N of these.
  11. DBT *last_key;// N of these.
  12. struct file_infos file_infos;
  13. ...
  14. }
  • n_rows:一共有多少行数据
  • primary_rowset:缓存二元组的数组,loader->put的数据先被缓存到这里
  • primary_rowset_temp:没有被使用,怀疑是legacy code
  • primary_rowset_queue:当primary_rowset占用一定量的内存时,loader->put所在线程会调用函数enqueue_for_extraction,把当前primary_rowset克隆一份,然后挂到primary_rowset_queue队尾。后台extractor线程在不停取这个队列上的rowset,依次进行处理
  • extractor_thread:extractor线程
  • extractor_live:extractor线程是否正在工作
  • rows:rows[which_db]保存索引的key
  • extracted_datasizes:extracted_datasizes[which_db]表示索引缓存了多少数据
  • last_key:没有被使用,怀疑是legacy code
  • file_infos:记录打开文件的信息
  1. struct ft_loader_s {
  2. ...
  3. LSN load_lsn;
  4. TXNID load_root_xid;
  5. struct merge_fileset *fs;// N of these.
  6. QUEUE *fractal_queues; // N of these.
  7. toku_pthread_t *fractal_threads;// N of these.
  8. bool *fractal_threads_live; // N of these.
  9. unsigned fractal_workers;
  10. ...
  11. };
  • load_lsn:第一个loader->put之前的lsn位置
  • load_root_xid:创建ft_loader时,局部事务loader_txn的txnid
  • fs:fs[which_db]表示索引的merge信息
  • fractal_queues:fractal_queues[which_db]表示索引的fractal线程使用的队列
  • fractal_threads:fractal_threads[which_db]表示索引的fractal线程
  • fractal_threads_live:fractal_threads_live[which_db]索引的fractal线程是否正在运行
  • fractal_workers:是否有fractal线程正在构建索引FT,因为merge和构建FT的过程可能会并发,这个字段主要用来计算merge阶段可用的内存大小

Loader主要流程

Loader处理过程大致分为两个阶段:extractor阶段和merge阶段。

Extractor阶段创建extractor后台线程生成索引的key,并存储到rows[which_db]中;merge阶段创建fractal后台线程生成FT文件。

Extractor阶段,loader使用者不停地调用loader->put插入数据,当primary_rowset里面缓存的数据超过一定量后,它会把primary_rowset克隆一份挂到primary_rowset_queue尾部;后台的extractor线程不停地从primary_rowset_queue读取rowset,并对rowset中的每行row计算索引key,然后把生成的索引key缓存到其对应的rowset(rows[which_db])中。如果rows[which_db]中缓存的数据超过一定量后,需要对属于当前primary_rowset的索引key排序并写到临时文件中。每个临时文件中存储的是部分排序的key的有序序列。

Loader插入数据过程结束,每个索引需要对所有的key进行排序。rows[which_db]的空间有限,前面排好序的key可能已经写入临时文件中。

此时,primary_rowset可能还有数据需要extractor线程处理,那么需要把primary_rowset挂到primary_rowset_queue队尾,最后把primary_rowset_queue队列设置成EOF,等待evictor线程处理完退出。

当evictor线程结束后,loader会为每个索引创建fractal_threads[which_db]线程,在loader当前上下文把索引的所有临时文件中的数据按照key升序的方式排序,然后把排好序的key发送给fractal_queues[which_db], 由fractal线程创建FT文件。

创建loader

Loader使用者调用db_env->create_loader创建loader数据结构并进行初始化,也就是初始化前面我们介绍的这些字段。

真正执行创建loader的函数是toku_loader_create_loader。

在外部数据导入和同步创建索引的场景下,loader_flags中并没有设置DB_PRELOCKED_WRITE标记,需要调用toku_db_pre_acquire_table_lock获取区间整个表区间的range锁。

前面提过,loader只适用空表/空索引的情况,因为在loader->close的阶段会创建一个新的FT文件,并在tokudb.directory里修改索引和索引文件的映射关系。整个过程都执行成功的情况下,老的FT文件在关闭时被删除。所以,在创建loader的过程一般都需要确认db是空的。

在locked_load_inames对每个索引生成新的FT文件名,并修改tokudb.directory里面的映射关系,最后对每个FT索引生成loader相关的undo和redo log。这些操作是在loader_txn保护下进行的。

  1. int
  2. toku_loader_create_loader(DB_ENV *env,
  3. DB_TXN *txn,
  4. DB_LOADER **blp,
  5. DB *src_db,
  6. int N,
  7. DB *dbs[],
  8. uint32_t db_flags[/*N*/],
  9. uint32_t dbt_flags[/*N*/],
  10. uint32_t loader_flags,
  11. bool check_empty) {
  12. // lock tables and check empty
  13. for(int i=0;i<N;i++) {
  14. if (!(loader_flags&DB_PRELOCKED_WRITE)) {
  15. rval = toku_db_pre_acquire_table_lock(dbs[i], txn);
  16. if (rval!=0) {
  17. goto create_exit;
  18. }
  19. }
  20. if (check_empty) {
  21. bool empty = toku_ft_is_empty_fast(dbs[i]->i->ft_handle);
  22. if (!empty) {
  23. rval = ENOTEMPTY;
  24. goto create_exit;
  25. }
  26. }
  27. }
  28. {
  29. if (env->i->open_flags & DB_INIT_TXN) {
  30. rval = env->txn_begin(env, txn, &loader_txn, 0);
  31. if (rval) {
  32. goto create_exit;
  33. }
  34. }
  35. ft_compare_func compare_functions[N];
  36. for (int i=0; i<N; i++) {
  37. compare_functions[i] = env->i->bt_compare;
  38. }
  39. // time to open the big kahuna
  40. char **XMALLOC_N(N, new_inames_in_env);
  41. for (int i = 0; i < N; i++) {
  42. new_inames_in_env[i] = nullptr;
  43. }
  44. FT_HANDLE *XMALLOC_N(N, fts);
  45. for (int i=0; i<N; i++) {
  46. fts[i] = dbs[i]->i->ft_handle;
  47. }
  48. LSN load_lsn;
  49. rval = locked_load_inames(env, loader_txn, N, dbs, new_inames_in_env, &load_lsn, puts_allowed);
  50. if ( rval!=0 ) {
  51. }
  52. TOKUTXN ttxn = loader_txn ? db_txn_struct_i(loader_txn)->tokutxn : NULL;
  53. rval = toku_ft_loader_open(&loader->i->ft_loader,
  54. env->i->cachetable,
  55. env->i->generate_row_for_put,
  56. src_db,
  57. N,
  58. fts,
  59. dbs,
  60. (const char **)new_inames_in_env,
  61. compare_functions,
  62. loader->i->temp_file_template,
  63. load_lsn,
  64. ttxn,
  65. puts_allowed,
  66. env->get_loader_memory_size(env),
  67. compress_intermediates,
  68. puts_allowed);
  69. if ( rval!=0 ) {
  70. }
  71. loader->i->inames_in_env = new_inames_in_env;
  72. toku_free(fts);
  73. if (!puts_allowed) {
  74. rval = ft_loader_close_and_redirect(loader);
  75. assert_zero(rval);
  76. loader->i->ft_loader = NULL;
  77. // close the ft_loader and skip to the redirection
  78. rval = 0;
  79. }
  80. rval = loader_txn->commit(loader_txn, 0);
  81. assert_zero(rval);
  82. loader_txn = nullptr;
  83. rval = 0;
  84. }
  85. *blp = loader;
  86. create_exit:
  87. if (loader_txn) {
  88. int r = loader_txn->abort(loader_txn);
  89. assert_zero(r);
  90. loader_txn = nullptr;
  91. }
  92. if (rval == 0) {
  93. }
  94. else {
  95. free_loader(loader);
  96. }
  97. return rval;
  98. }

一般使用loader的场景都需要往loader里面插入数据,所以allow_puts一般是TRUE。初始化结束后,需要创建extractor线程。

Extractor线程前面有提到过,是从primary_rowset_queue不断地拉rowset,对rowset里面的每行数据生成索引的二元组,并把这些二元组缓存到rows[which_db]里面。当rows[which_db]缓存的数据量积累到一定程度会进行排序,然后把排序好的写到临时文件中。

顺便说下,除了pk和clustering index以外,二级索引的value都是NULL。

toku_ft_loader_internal_init负责初始化数据,这里主要是初始化rowset(索引的rowset:rows[which_db]和接收输入的primary_rowset),创建primary_rowset_queue队列。这部分代码比较直观,读者可以自行分析。

函数toku_ft_loader_internal_init注册了3个重要的callback函数loader->put,loader->close和loader->abort分别处理数据插入,loader关闭和loader异常退出。

  1. int toku_ft_loader_open (FTLOADER *blp, /* out */
  2. CACHETABLE cachetable,
  3. generate_row_for_put_func g,
  4. DB *src_db,
  5. int N, FT_HANDLE fts[/*N*/],
  6. DB* dbs[/*N*/],
  7. const char *new_fnames_in_env[/*N*/],
  8. ft_compare_func bt_compare_functions[/*N*/],
  9. const char *temp_file_template,
  10. LSN load_lsn,
  11. TOKUTXN txn,
  12. bool reserve_memory,
  13. uint64_t reserve_memory_size,
  14. bool compress_intermediates,
  15. bool allow_puts) {
  16. int result = 0;
  17. {
  18. int r = toku_ft_loader_internal_init(blp, cachetable, g,
  19. src_db,
  20. N, fts, dbs,
  21. new_fnames_in_env,
  22. bt_compare_functions,
  23. temp_file_template,
  24. load_lsn,
  25. txn,
  26. reserve_memory,
  27. reserve_memory_size,
  28. compress_intermediates,
  29. allow_puts);
  30. if (r!=0) result = r;
  31. }
  32. if (result==0 && allow_puts) {
  33. FTLOADER bl = *blp;
  34. int r = toku_pthread_create(&bl->extractor_thread, NULL, extractor_thread, (void*)bl);
  35. if (r==0) {
  36. bl->extractor_live = true;
  37. } else {
  38. result = r;
  39. (void) toku_ft_loader_internal_destroy(bl, true);
  40. }
  41. }
  42. return result;
  43. }

值得一提的是memory_per_rowset_during_extract函数。在extractor阶段rowset的大小是函数memory_per_rowset_during_extract计算出来的。

这个阶段分配的内存主要包括两个部分:

  • rowset:primary row + rows(N个) + primary_rowset_queue队列的长度 + 挂到primary_rowset_queue分配的rowset + 存放索引排序结果的rowset(N个)
  • 临时文件写缓冲区(N个):大小16M
  1. static uint64_t memory_per_rowset_during_extract (FTLOADER bl)
  2. // Return how much memory can be allocated for each rowset.
  3. {
  4. if (size_factor==1) {
  5. return 16*1024;
  6. } else {
  7. // There is a primary rowset being maintained by the foreground thread.
  8. // There could be two more in the queue.
  9. // There is one rowset for each index (bl->N) being filled in.
  10. // Later we may have sort_and_write operations spawning in parallel, and will need to account for that.
  11. int n_copies = (1 // primary rowset
  12. +EXTRACTOR_QUEUE_DEPTH // the number of primaries in the queue
  13. +bl->N // the N rowsets being constructed by the extractor thread.
  14. +bl->N // the N sort buffers
  15. +1 // Give the extractor thread one more so that it can have temporary space for sorting. This is overkill.
  16. );
  17. int64_t extra_reserved_memory = bl->N * FILE_BUFFER_SIZE; // for each index we are writing to a file at any given time.
  18. int64_t tentative_rowset_size = ((int64_t)(bl->reserved_memory - extra_reserved_memory))/(n_copies);
  19. return MAX(tentative_rowset_size, (int64_t)MIN_ROWSET_MEMORY);
  20. }
  21. }

loader->put

Loader的处理过程分为两个阶段:extract阶段和merge阶段。

Extract阶段是从extractor线程命名而来,extractor线程不断地从primary_rowset_queue拉rowset,对rowset里面的row计算索引key并保存到loader->rows[which_db]里面。

Merge阶段是数据插入结束,显式调用loader->close方法。Loader对每个db,把所有部分排序的数据进行归并得到一个按索引key升序的二元组序列,最后根据这个序列生成FT文件。

Loader->put的处理过程就是extract阶段。

  • 前景线程(创建loader上下文所在线程)不断的接收数据,并把数据缓存到primary_rowset,当其缓存的数据量大于primary_rowset->memory_budget时,会把primary_rowset克隆一份挂到primary_rowset_queue队尾并重新初始化primary_rowset,前景线程就可以继续接收数据了
  • 背景线程(extractor)不断从primary_rowset_queue拉rowset,并计算索引key和value。当其缓存的数据量大于primary_rowset->memory_budget时,对索引的二元组按照key升序方式进行merge-sort并把排好序的序列转存到临时文件中

在extractor阶段,只有一个背景线程(extractor线程),连接前景线程和extractor线程的是primary_rowset_queue。

screenshot.png

Loader的使用者调用loader->put,其实是调用函数toku_loader_put,这个函数是在loader创建阶段注册的put方法的callback函数。

这个函数首先需要检查LOADER_DISALLOW_PUTS是否设置,这个标记设置表示只使用loader来重定向director,而不是真正插入数据。所以,检查到这个标记被设置就直接退出了。

一般使用loader的用法都没有设置LOADER_DISALLOW_PUTS标记。

插入数据是在函数toku_ft_loader_put处理的。

  1. int toku_loader_put(DB_LOADER *loader, DBT *key, DBT *val)
  2. {
  3. int r = 0;
  4. int i = 0;
  5. if (loader->i->loader_flags & LOADER_DISALLOW_PUTS) {
  6. r = EINVAL;
  7. goto cleanup;
  8. }
  9. else {
  10. r = toku_ft_loader_put(loader->i->ft_loader, key, val);
  11. }
  12. if ( r != 0 ) {
  13. }
  14. cleanup:
  15. return r;
  16. }

在toku_ft_loader_put中,重复检查是否设置了LOADER_DISALLOW_PUTS,如果设置了返回EINVAL并退出。

n_rows表示loader一共接收了多少行数据,这是个全局的counter。

toku_ft_loader_put是loader_do_put简单封装,真正处理put的在loader_do_put。

  1. int toku_ft_loader_put (FTLOADER bl, DBT *key, DBT *val)
  2. /* Effect: Put a key-value pair into the ft loader. Called by DB_LOADER->put().
  3. * Return value: 0 on success, an error number otherwise.
  4. */
  5. {
  6. if (!bl->allow_puts || ft_loader_get_error(&bl->error_callback))
  7. return EINVAL; // previous panic
  8. bl->n_rows++;
  9. return loader_do_put(bl, key, val);
  10. }

在loader_do_put,首先把插入到primary_rowset。

rowset包括两部分:

  • rows:是行数据的描述符
  • data:是存储数据的缓冲区

每个二元组是按照到达的顺序存储到primary_rowset->data指向的缓冲区中,先key后value连续存储;row.off记录了这行数据在rows.data中的偏移位置;row.klen和row.vlen分别表示key/value的长度。

rows和data都是动态数组,随着数据量增大可以动态扩展内存。

  1. struct row {
  2. size_t off; // the offset in the data array.
  3. int klen,vlen;
  4. };
  5. struct rowset {
  6. uint64_t memory_budget;
  7. size_t n_rows, n_rows_limit;
  8. struct row *rows;
  9. size_t n_bytes, n_bytes_limit;
  10. char *data;
  11. };

数据插入到primary_rowset,如果primary_rowset中缓存的数据量(rows+data)大于memory_budget,意味着primary_rowset满了,loader前景线程会把primary_rowset克隆一份挂到primary_rowset_queue尾部,并重新初始化primary_rowset。

Extract阶段,rowset的memory_budget是memory_per_rowset_during_extract计算的。前面有介绍过,这里不再赘述。

  1. static int row_wont_fit (struct rowset *rows, size_t size)
  2. /* Effect: Return nonzero if adding a row of size SIZE would be too big (bigger than the buffer limit) */
  3. {
  4. // Account for the memory used by the data and also the row structures.
  5. size_t memory_in_use = (rows->n_rows*sizeof(struct row)
  6. + rows->n_bytes);
  7. return (rows->memory_budget < memory_in_use + size);
  8. }
  9. static int loader_do_put(FTLOADER bl,
  10. DBT *pkey,
  11. DBT *pval)
  12. {
  13. int result;
  14. result = add_row(&bl->primary_rowset, pkey, pval);
  15. if (result == 0 && row_wont_fit(&bl->primary_rowset, 0)) {
  16. // queue the rows for further processing by the extractor thread
  17. enqueue_for_extraction(bl);
  18. {
  19. int r = init_rowset(&bl->primary_rowset, memory_per_rowset_during_extract(bl));
  20. // bl->primary_rowset will get destroyed by toku_ft_loader_abort
  21. if (r != 0)
  22. result = r;
  23. }
  24. }
  25. return result;
  26. }
  27. static void enqueue_for_extraction (FTLOADER bl) {
  28. struct rowset *XMALLOC(enqueue_me);
  29. *enqueue_me = bl->primary_rowset;
  30. zero_rowset(&bl->primary_rowset);
  31. int r = toku_queue_enq(bl->primary_rowset_queue, (void*)enqueue_me, 1, NULL);
  32. resource_assert_zero(r);
  33. }

呵呵,到这里extract的前景线程就处理完了,可以等待新的数据到来。

下面,我们一起看下extract阶段的背景线程。这个线程是在函数toku_ft_loader_open创建的。Extractor线程不断从primary_rowset_queue拉rowset,对每个rowset调用process_primary_rows进行处理。如果插入数据结束,用户会调用loader->close往primary_rowset_queue挂一个EOF,让extractor线程结束。

  1. static void* extractor_thread (void *blv) {
  2. FTLOADER bl = (FTLOADER)blv;
  3. int r = 0;
  4. while (1) {
  5. void *item;
  6. {
  7. int rq = toku_queue_deq(bl->primary_rowset_queue, &item, NULL, NULL);
  8. if (rq==EOF) break;
  9. invariant(rq==0); // other errors are arbitrarily bad.
  10. }
  11. struct rowset *primary_rowset = (struct rowset *)item;
  12. {
  13. r = process_primary_rows(bl, primary_rowset);
  14. if (r)
  15. ft_loader_set_panic(bl, r, false, 0, nullptr, nullptr);
  16. }
  17. }
  18. if (r == 0) {
  19. r = finish_primary_rows(bl);
  20. if (r)
  21. ft_loader_set_panic(bl, r, false, 0, nullptr, nullptr);
  22. }
  23. return NULL;
  24. }

函数process_primary_rows是process_primary_rows_internal的简单封装。

函数process_primary_rows_internal有两个循环:外层循环是依次处理每个索引的db,里层循环是为extractor线程当前的primary_rowset的每行row生成索引的key和value。

bl->generate_row_for_put是为索引生成key和value的函数,基于二元组为索引bl->dbs[which_db]生成索引key和value。

生成的索引key和value会被缓存到索引的rowset里面,即bl->rows[which_db];bl->rows[which_db]满时,会对这个索引的rowset按照key升序的顺序排序并转存到临时文件中。

  1. static int process_primary_rows_internal (FTLOADER bl, struct rowset *primary_rowset)
  2. {
  3. int error_count = 0;
  4. int *XMALLOC_N(bl->N, error_codes);
  5. for (int i = 0; i < bl->N; i++) {
  6. error_codes[i] = 0;
  7. struct rowset *rows = &(bl->rows[i]);
  8. struct merge_fileset *fs = &(bl->fs[i]);
  9. ft_compare_func compare = bl->bt_compare_funs[i];
  10. for (size_t prownum=0; prownum<primary_rowset->n_rows; prownum++) {
  11. if (error_count) break;
  12. struct row *prow = &primary_rowset->rows[prownum];
  13. DBT pkeypval
  14. pkey.data = primary_rowset->data + prow->off;
  15. pkey.size = prow->klen;
  16. pval.data = primary_rowset->data + prow->off + prow->klen;
  17. pval.size = prow->vlen;
  18. DBT_ARRAY key_array;
  19. DBT_ARRAY val_array;
  20. if (bl->dbs[i] != bl->src_db) {
  21. int r = bl->generate_row_for_put(bl->dbs[i], bl->src_db, &dest_keys, &dest_vals, &pkey, &pval);
  22. if (r != 0) {
  23. error_codes[i] = r;
  24. inc_error_count();
  25. break;
  26. }
  27. paranoid_invariant(dest_keys.size <= dest_keys.capacity);
  28. paranoid_invariant(dest_vals.size <= dest_vals.capacity);
  29. paranoid_invariant(dest_keys.size == dest_vals.size);
  30. key_array = dest_keys;
  31. val_array = dest_vals;
  32. } else {
  33. key_array.size = key_array.capacity = 1;
  34. key_array.dbts = &pkey;
  35. val_array.size = val_array.capacity = 1;
  36. val_array.dbts = &pval;
  37. }
  38. for (uint32_t row = 0; row < key_array.size; row++) {
  39. DBT *dest_key = &key_array.dbts[row];
  40. DBT *dest_val = &val_array.dbts[row];
  41. bl->extracted_datasizes[i] += ft_loader_leafentry_size(dest_key->size, dest_val->size, leafentry_xid(bl, i));
  42. if (row_wont_fit(rows, dest_key->size + dest_val->size)) {
  43. int r = sort_and_write_rows(*rows, fs, bl, i, bl->dbs[i], compare);
  44. init_rowset(rows, memory_per_rowset_during_extract(bl));
  45. if (r != 0) {
  46. error_codes[i] = r;
  47. inc_error_count();
  48. break;
  49. }
  50. }
  51. int r = add_row(rows, dest_key, dest_val);
  52. if (r != 0) {
  53. break;
  54. }
  55. }
  56. }
  57. }
  58. toku_dbt_array_destroy(&dest_keys);
  59. toku_dbt_array_destroy(&dest_vals);
  60. destroy_rowset(primary_rowset);
  61. toku_free(primary_rowset);
  62. int r = 0;
  63. if (error_count > 0) {
  64. }
  65. toku_free(error_codes);
  66. return r;
  67. }

排序结束,就把索引在当前rowset里面的二元组写入到临时文件中,每个临时文件里的数据都是按照key有序的。为了尽量合并这些部分排序的中间结果,loader为每个临时文件维护了prev_key,里面存储了当前打开临时文件的最大key。

如果当前rowset的所有key都比prev_key大,那么就可以把当前结果合并到正在打开的临时文件中,不必另起一个新文件,此时需要更新prev_key。

如果不能合并,就需要关闭当前临时文件,另起一个新的临时文件写入,并重新维护prev_key。

每个索引都为临时文件配置了16M的缓冲区。开启一个新的临时文件是在函数extend_fileset里面实现的,把一个预先分配好的16M缓冲区attach到那个文件。

  1. int sort_and_write_rows (struct rowset rows, struct merge_fileset *fs, FTLOADER bl, int which_db, DB *dest_db, ft_compare_func compare)
  2. {
  3. int result;
  4. if (rows.n_rows == 0) {
  5. result = 0;
  6. } else {
  7. result = sort_rows(&rows, which_db, dest_db, compare, bl);
  8. if (result == 0) {
  9. DBT min_rowset_key = make_dbt(rows.data+rows.rows[0].off, rows.rows[0].klen);
  10. if (fs->have_sorted_output && compare(dest_db, &fs->prev_key, &min_rowset_key) < 0) {
  11. // write everything to the same output if the max key in the temp file (prev_key) is < min of the sorted rowset
  12. result = write_rowset_to_file(bl, fs->sorted_output, rows);
  13. if (result == 0) {
  14. // set the max key in the temp file to the max key in the sorted rowset
  15. result = toku_dbt_set(rows.rows[rows.n_rows-1].klen, rows.data + rows.rows[rows.n_rows-1].off, &fs->prev_key, NULL);
  16. }
  17. } else {
  18. // write the sorted rowset into a new temp file
  19. if (fs->have_sorted_output) {
  20. fs->have_sorted_output = false;
  21. result = ft_loader_fi_close(&bl->file_infos, fs->sorted_output, true);
  22. }
  23. if (result == 0) {
  24. FIDX sfile = FIDX_NULL;
  25. result = extend_fileset(bl, fs, &sfile);
  26. if (result == 0) {
  27. result = write_rowset_to_file(bl, sfile, rows);
  28. if (result == 0) {
  29. fs->have_sorted_output = true;
  30. fs->sorted_output = sfile;
  31. // set the max key in the temp file to the max key in the sorted rowset
  32. result = toku_dbt_set(rows.rows[rows.n_rows-1].klen, rows.data + rows.rows[rows.n_rows-1].off, &fs->prev_key, NULL);
  33. }
  34. }
  35. }
  36. }
  37. }
  38. }
  39. destroy_rowset(&rows);
  40. return result;
  41. }

呵呵,数据插入结束,loader会调用toku_queue_eof往primary_rowset_queue挂一个EOF通知evictor线程结束。

loader->close

我们一起回顾一下批量数据导入的应用场景,数据插入结束后,load的使用者调用loader->close()。

函数toku_loader_close是toku_ft_loader_internal_init注册的loader->close方法。

如果loader->put阶段处理出错,toku_loader_close会根据LOADER_DISALLOW_PUTS标记是否被设置做不同的处理;如果LOADER_DISALLOW_PUTS没有被设置,它会直接调用redirect_loader_to_empty_dictionaries做隐式abort,把每个索引redirect到空的FT上。如果LOADER_DISALLOW_PUTS被设置直接返回错误。

如果loader->put阶段处理成功,但是toku_loader_close内部执行出错,它会直接调用函数redirect_loader_to_empty_dictionaries做隐式abort,把每个索引redirect到空的FT上。

如果之前的处理都正常,toku_loader_close只处理LOADER_DISALLOW_PUTS没有被设置的情况,它会调用ft_loader_close_and_redirect进入merge阶段的处理。

  1. int toku_loader_close(DB_LOADER *loader)
  2. {
  3. if ( loader->i->err_errno != 0 ) {
  4. if ( loader->i->error_callback != NULL ) {
  5. loader->i->error_callback(loader->i->dbs[loader->i->err_i], loader->i->err_i, loader->i->err_errno, &loader->i->err_key, &loader->i->err_val, loader->i->error_extra);
  6. }
  7. if (!(loader->i->loader_flags & LOADER_DISALLOW_PUTS ) ) {
  8. r = toku_ft_loader_abort(loader->i->ft_loader, true);
  9. redirect_loader_to_empty_dictionaries(loader);
  10. }
  11. else {
  12. r = loader->i->err_errno;
  13. }
  14. }
  15. else { // no error outstanding
  16. if (!(loader->i->loader_flags & LOADER_DISALLOW_PUTS ) ) {
  17. r = ft_loader_close_and_redirect(loader);
  18. if (r) {
  19. redirect_loader_to_empty_dictionaries(loader);
  20. }
  21. }
  22. }
  23. free_loader(loader);
  24. return r;
  25. }

函数ft_loader_close_and_redirect调用toku_ft_loader_close把每个索引的二元组按照key升序的顺序排序并生成FT文件。然后,调用toku_dictionary_redirect把每个索引redirect到刚刚生成的FT上。

  1. static int ft_loader_close_and_redirect(DB_LOADER *loader) {
  2. int r;
  3. // use the bulk loader
  4. // in case you've been looking - here is where the real work is done!
  5. r = toku_ft_loader_close(loader->i->ft_loader, loader->i->error_callback,
  6. loader->i->error_extra, loader->i->poll_func, loader->i->poll_extra);
  7. if ( r==0 ) {
  8. for (int i=0; i<loader->i->N; i++) {
  9. toku_multi_operation_client_lock(); //Must hold MO lock for dictionary_redirect.
  10. r = toku_dictionary_redirect(loader->i->inames_in_env[i],
  11. loader->i->dbs[i]->i->ft_handle,
  12. db_txn_struct_i(loader->i->txn)->tokutxn);
  13. toku_multi_operation_client_unlock();
  14. if ( r!=0 ) break;
  15. }
  16. }
  17. return r;
  18. }

函数toku_ft_loader_close负责结束loader的extractor阶段,把之前缓存的rowset挂到primary_rowset_queue,并在最后往primary_rowset_queue挂一个EOF,通知extractor线程停止。

如果设置了LOADER_DISALLOW_PUTS,这个loader不接收任何数据输入,一般是用作特殊作用,即把每个索引redirect到空的FT上。

bl->extractor_live为FALSE表示extractor线程没有被启动,回顾一下函数toku_ft_loader_open的处理,只有在toku_ft_loader_internal_init返回成功并且没有设置LOADER_DISALLOW_PUTS的情况下,才会启动extractor线程,并把extractor_live设置为TRUE。

所以,bl->extractor_live为FALSE的条件在大部分情况下都是不成立的,笔者猜测这是为了防止loader的使用者在创建loader出错的情况下错误地使用loader->close方式释放loader。

处理merge的重头戏在函数toku_ft_loader_close_internal里,为每个索引创建fractal线程和相应的queue。

  1. int toku_ft_loader_close (FTLOADER bl, ft_loader_error_func error_function, void *error_extra,
  2. ft_loader_poll_func poll_function, void *poll_extra)
  3. {
  4. int result = 0;
  5. int r;
  6. ft_loader_set_error_function(&bl->error_callback, error_function, error_extra);
  7. ft_loader_set_poll_function(&bl->poll_callback, poll_function, poll_extra);
  8. if (bl->extractor_live) {
  9. r = finish_extractor(bl);
  10. if (r)
  11. result = r;
  12. invariant(!bl->extractor_live);
  13. } else {
  14. r = finish_primary_rows(bl);
  15. if (r)
  16. result = r;
  17. }
  18. // check for an error during extraction
  19. if (result == 0) {
  20. r = ft_loader_call_error_function(&bl->error_callback);
  21. if (r)
  22. result = r;
  23. }
  24. if (result == 0) {
  25. r = toku_ft_loader_close_internal(bl);
  26. if (r && result == 0)
  27. result = r;
  28. } else
  29. toku_ft_loader_internal_destroy(bl, true);
  30. return result;
  31. }

toku_ft_loader_close_internal为每个索引调用loader_do_i进行处理。这个函数还为每个索引创建fractal_queues[which_db]接收排好序的索引二元组序列。

在extractor阶段部分排序的结果存储在临时文件中,bl->fs[which_db]中记录临时文件的数据结构。

Extractor阶段结束后,所有索引的二元组应该都被处理过了,bl->rows[which_db].rows一定是NULL。

函数loader_do_i会根据dest_db原有的设置来配置新的FT,包括:node size,basement size,compression method和fanout。

一般情况下都没有设置LOADER_DISALLOW_PUTS标记,loader会为当前索引创建fractal线程,并调用merge_files函数把所有部分排序的结果进行归并产生按照索引key有序的序列并转发到对应的fractal_queues[which_db]上,fractal线程根据这些二元组产生新的FT文件。

若设置LOADER_DISALLOW_PUTS,loader会调用toku_loader_write_ft_from_q产生一个空的FT。

如果fractal线程在生成新FT过程中出错,fractal线程退出后bl->rows[which_db]可能还有未被处理过的索引二元组。所以,loader_do_i在退出前需要释放rows->data和rows->rows。

在merge阶段中:

  • 前景线程:等待extractor线程把部分排序并转储临时文件的工作结束。循环处理每个索引,对这个索引的所有临时文件(部分排序结果)进行归并,生成一个全局按key有序的二元组序列。并把这个二元组序列转发给fractal_queues[which_db]
  • 背景线程(fractal):fractal[which_db]不断从fractal_queues[which_db]去拉排好序的二元组序列,根据这个序列生成最终的FT文件。

在merge阶段有个N个背景线程,连接前景线程(创建loader的上下文)和背景线程 fractal[which_db]的是fractal_queues[which_db]。

screenshot.png

  1. static int loader_do_i (FTLOADER bl,
  2. int which_db,
  3. DB *dest_db,
  4. ft_compare_func compare,
  5. const DESCRIPTOR descriptor,
  6. const char *new_fname,
  7. int progress_allocation // how much progress do I need to add into bl->progress by the end..
  8. )
  9. /* Effect: Handle the file creating for one particular DB in the bulk loader. */
  10. /* Requires: The data is fully extracted, so we can do merges out of files and write the ft file. */
  11. {
  12. struct merge_fileset *fs = &(bl->fs[which_db]);
  13. struct rowset *rows = &(bl->rows[which_db]);
  14. invariant(rows->data==NULL); // the rows should be all cleaned up already
  15. int r = toku_queue_create(&bl->fractal_queues[which_db], FRACTAL_WRITER_QUEUE_DEPTH);
  16. if (r) goto error;
  17. {
  18. mode_t mode = S_IRUSR+S_IWUSR + S_IRGRP+S_IWGRP;
  19. int fd = toku_os_open(new_fname, O_RDWR| O_CREAT | O_BINARY, mode); // #2621
  20. if (fd < 0) {
  21. r = get_error_errno(); goto error;
  22. }
  23. uint32_t target_nodesize, target_basementnodesize, target_fanout;
  24. enum toku_compression_method target_compression_method;
  25. r = dest_db->get_pagesize(dest_db, &target_nodesize);
  26. invariant_zero(r);
  27. r = dest_db->get_readpagesize(dest_db, &target_basementnodesize);
  28. invariant_zero(r);
  29. r = dest_db->get_compression_method(dest_db, &target_compression_method);
  30. invariant_zero(r);
  31. r = dest_db->get_fanout(dest_db, &target_fanout);
  32. invariant_zero(r);
  33. if (bl->allow_puts) {
  34. // a better allocation would be to figure out roughly how many merge passes we'll need.
  35. int allocation_for_merge = (2*progress_allocation)/3;
  36. progress_allocation -= allocation_for_merge;
  37. // This structure must stay live until the join below.
  38. struct fractal_thread_args fta = {
  39. bl,
  40. descriptor,
  41. fd,
  42. progress_allocation,
  43. bl->fractal_queues[which_db],
  44. bl->extracted_datasizes[which_db],
  45. 0,
  46. which_db,
  47. target_nodesize,
  48. target_basementnodesize,
  49. target_compression_method,
  50. target_fanout
  51. };
  52. r = toku_pthread_create(bl->fractal_threads+which_db, NULL, fractal_thread, (void*)&fta);
  53. if (r) {
  54. int r2 __attribute__((__unused__)) = toku_queue_destroy(bl->fractal_queues[which_db]);
  55. // ignore r2, since we already have an error
  56. bl->fractal_queues[which_db] = nullptr;
  57. goto error;
  58. }
  59. invariant(bl->fractal_threads_live[which_db]==false);
  60. bl->fractal_threads_live[which_db] = true;
  61. r = merge_files(fs, bl, which_db, dest_db, compare, allocation_for_merge, bl->fractal_queues[which_db]);
  62. {
  63. void *toku_pthread_retval;
  64. int r2 = toku_pthread_join(bl->fractal_threads[which_db], &toku_pthread_retval);
  65. invariant(fta.bl==bl); // this is a gratuitous assertion to make sure that the fta struct is still live here. A previous bug put that struct into a C block statement.
  66. resource_assert_zero(r2);
  67. invariant(toku_pthread_retval==NULL);
  68. invariant(bl->fractal_threads_live[which_db]);
  69. bl->fractal_threads_live[which_db] = false;
  70. if (r == 0) r = fta.errno_result;
  71. }
  72. } else {
  73. toku_queue_eof(bl->fractal_queues[which_db]);
  74. r = toku_loader_write_ft_from_q(bl, descriptor, fd, progress_allocation,
  75. bl->fractal_queues[which_db], bl->extracted_datasizes[which_db], which_db,
  76. target_nodesize, target_basementnodesize, target_compression_method, target_fanout);
  77. }
  78. }
  79. error: // this is the cleanup code. Even if r==0 (no error) we fall through to here.
  80. if (bl->fractal_queues[which_db]) {
  81. int r2 = toku_queue_destroy(bl->fractal_queues[which_db]);
  82. invariant(r2==0);
  83. bl->fractal_queues[which_db] = nullptr;
  84. }
  85. // if we get here we need to free up the merge_fileset and the rowset, as well as the keys
  86. toku_free(rows->data); rows->data = NULL;
  87. toku_free(rows->rows); rows->rows = NULL;
  88. toku_free(fs->data_fidxs); fs->data_fidxs = NULL;
  89. return r;
  90. }

函数merge_files进行多轮归并,每轮把前一轮的临时文件集合归并成数目更少的临时文件集合。每次归并时,顺序选择几个临时文件(输入集)进行归并,生成新的临时文件(输出文件)。每轮的所有输出文件构成输出集,被下一轮作为输入集使用。

新产生的临时文件(输出文件)包含所有输入集的二元组,文件中的二元组是按照key有序排列的。每轮归并旨在对输入集中的二元组按照key进行排序并减少下一轮的输入集的文件个数,直到输出集只包含一个输出文件。

最后一轮产生的索引二元组序列,是按照key有序的全序序列。

loader把他们分成几批存储到rows[which_db]的rowset里面,并以rows[which_db]作为媒介转发给fractal线程。

函数merge_files在读取临时文件中部分排序的结果时,使用dbufio来异步读取临时文件中的key和value。dbufio是double buffering io的意思,维护两个buffer和一个异步I/O线程,前景线程永远从buf[0]读取数据,异步的I/O线程负责把buf[1]填满。

当buf[0]的数据被消耗掉后,会交换buf[0]和buf[1],并且告诉I/O线程把后面的数据读到buf[1](老的buf[0])里面。如果前景线程消耗数据太快,异步I/O线程的读操作还没有完成,前景线程会阻塞在dbufio内部的条件变量上。这是个典型的生产者消费者模型。

Merge阶段,需要使用内存的buffer主要包含三部分:

  • 多轮归并时的dbufio:每轮输入集大小 * DBUFIO_DEPTH(2)
  • fractol线程使用的内存:fractal_queue深度 + 每次挂到fractal_queue时分配的rowset + attach到pivot_file的buffer
  • fractal线程把node写入磁盘时存放序列化后数据的buffer(压缩和未压缩的)
  1. // To compute a merge, we have a certain amount of memory to work with.
  2. // We perform only one fanin at a time.
  3. // If the fanout is F then we are using
  4. // F merges. Each merge uses
  5. // DBUFIO_DEPTH buffers for double buffering. Each buffer is of size at least MERGE_BUF_SIZE
  6. // so the memory is
  7. // F*MERGE_BUF_SIZE*DBUFIO_DEPTH storage.
  8. // We use some additional space to buffer the outputs.
  9. // That's FILE_BUFFER_SIZE for writing to a merge file if we are writing to a mergefile.
  10. // And we have FRACTAL_WRITER_ROWSETS*MERGE_BUF_SIZE per queue
  11. // And if we are doing a fractal, each worker could have have a fractal tree that it's working on.
  12. //
  13. // DBUFIO_DEPTH*F*MERGE_BUF_SIZE + FRACTAL_WRITER_ROWSETS*MERGE_BUF_SIZE + WORKERS*NODESIZE*2 <= RESERVED_MEMORY
  14. static int64_t memory_avail_during_merge(FTLOADER bl, bool is_fractal_node) {
  15. // avail memory = reserved memory - WORKERS*NODESIZE*2 for the last merge stage only
  16. int64_t avail_memory = bl->reserved_memory;
  17. if (is_fractal_node) {
  18. // reserve space for the fractal writer thread buffers
  19. avail_memory -= (int64_t)ft_loader_get_fractal_workers_count(bl) * (int64_t)default_loader_nodesize * 2; // compressed and uncompressed buffers
  20. }
  21. return avail_memory;
  22. }
  23. static uint64_t memory_per_rowset_during_merge (FTLOADER bl, int merge_factor, bool is_fractal_node // if it is being sent to a q
  24. ) {
  25. int64_t memory_avail = memory_avail_during_merge(bl, is_fractal_node);
  26. int64_t nbuffers = DBUFIO_DEPTH * merge_factor;
  27. if (is_fractal_node)
  28. nbuffers += FRACTAL_WRITER_ROWSETS;
  29. return MAX(memory_avail / nbuffers, (int64_t)MIN_MERGE_BUF_SIZE);
  30. }

以上是归并的过程。

生成新FT的过程是在函数toku_loader_write_ft_from_q处理的。这个函数太长,就不在这贴代码了。

大致过程是这样的:

  1. 从fractal_queues[which_db]依次读取二元组并建立leaf node,每个leaf上存储的数据量一定小于7/8 nodesize,后续数据保存在下一个leaf上。
  2. 为了建立internal subtree,在创建leaf node的过程中,维护了一个pivot文件保存每个leaf node中存储的最大key,pivot文件中最后一个key是没有用的,后面的处理会丢弃掉最后一个key;除了pivot文件,loader还为internal node维护一个subtree_info数组记录每个leaf node的blocknum。
  3. 建立internal subtree过程分多轮进行,每轮建立某一高度的internal node,从高度1开始。
  4. 写descriptor
  5. 写BTT
  6. 写FT header

internal node的处理过程,每轮处理: 依次读取上一轮产生的pivot文件中key,每次读取15个pivot key,最后一个key用作建立下一轮的pivot文件。用剩下的14个pivot key和上一轮得到的subtree_info数组中的blocknum建立本轮的internal node,并把新创建internal node的blocknum存储到另一个subtree_info数组,提供给下轮处理使用。

每轮处理完成时,会生成下轮处理需要的pivot文件和subtree_info数组。height 1时读取的是创建leaf node时生成的pivot文件和subtree_info数组。

如果一切顺利,在这之后就调用toku_dictionary_redirect把原来db[which_db]上打开的txn和ft_handle重定向到新的FT上。

loader->abort

异常退出的时候,loader首先释放ft_loader中的内部数据结构,然后调用redirect_loader_to_empty_dictionaries创建带有LOADER_DISALLOW_PUTS标记的temporary loader。

这个temporary loader首先在tokudb.directory里,把原来的loader处理的那些映射关系改成。然后在创建ft_loader之后,直接调用ft_loader_close_and_redirect把db[which_db]上打开的txn和ft_handle重定向到空的FT上。

  1. static void redirect_loader_to_empty_dictionaries(DB_LOADER *loader) {
  2. DB_LOADER* tmp_loader = NULL;
  3. int r = toku_loader_create_loader(
  4. loader->i->env,
  5. loader->i->txn,
  6. &tmp_loader,
  7. loader->i->src_db,
  8. loader->i->N,
  9. loader->i->dbs,
  10. loader->i->db_flags,
  11. loader->i->dbt_flags,
  12. LOADER_DISALLOW_PUTS,
  13. false
  14. );
  15. lazy_assert_zero(r);
  16. r = toku_loader_close(tmp_loader);
  17. }
  18. int
  19. toku_loader_create_loader(DB_ENV *env,
  20. DB_TXN *txn,
  21. DB_LOADER **blp,
  22. DB *src_db,
  23. int N,
  24. DB *dbs[],
  25. uint32_t db_flags[/*N*/],
  26. uint32_t dbt_flags[/*N*/],
  27. uint32_t loader_flags,
  28. bool check_empty) {
  29. rval = locked_load_inames(env, loader_txn, N, dbs, new_inames_in_env, &load_lsn, puts_allowed);
  30. if ( rval!=0 ) {
  31. free_inames(new_inames_in_env, N);
  32. toku_free(fts);
  33. goto create_exit;
  34. }
  35. TOKUTXN ttxn = loader_txn ? db_txn_struct_i(loader_txn)->tokutxn : NULL;
  36. rval = toku_ft_loader_open(&loader->i->ft_loader,
  37. env->i->cachetable,
  38. env->i->generate_row_for_put,
  39. src_db,
  40. N,
  41. fts, dbs,
  42. (const char **)new_inames_in_env,
  43. compare_functions,
  44. loader->i->temp_file_template,
  45. load_lsn,
  46. ttxn,
  47. puts_allowed,
  48. env->get_loader_memory_size(env),
  49. compress_intermediates,
  50. puts_allowed);
  51. if ( rval!=0 ) {
  52. free_inames(new_inames_in_env, N);
  53. toku_free(fts);
  54. goto create_exit;
  55. }
  56. loader->i->inames_in_env = new_inames_in_env;
  57. toku_free(fts);
  58. if (!puts_allowed) {
  59. rval = ft_loader_close_and_redirect(loader);
  60. assert_zero(rval);
  61. loader->i->ft_loader = NULL;
  62. // close the ft_loader and skip to the redirection
  63. rval = 0;
  64. }

酱,loader的处理过程大致就是这样~