MySQL chooses indexes based on cost. If a table has several indexes, the optimizer computes the access cost of each one separately and picks the cheapest to access the data; the chosen index is called the access path.

Picking an index

When executing a query, the server layer computes the cost of each candidate index and chooses the cheapest one as the access path, which the engine then uses to read data. The server-layer handler class provides a framework through which the engine reports index costs:

  1. scan_time: estimates the execution time of a full table scan
  2. records_in_range: estimates how many rows of the index fall within the search-condition range
  3. read_time: estimates the execution time of an index range query
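The interplay of these three estimates can be pictured with a toy cost comparison (hypothetical numbers and a deliberately simplified cost formula — the real server-side read_time calculation also weighs I/O characteristics and row width):

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Simplified stand-in for the server-side choice: each candidate index
// reports the row count records_in_range() estimated for the search
// condition; a crude read_time is derived from it, and the cheapest
// candidate becomes the access path. All names and the cost formula
// here are illustrative, not MySQL's actual code.
struct Candidate {
    int keynr;       // index number as the server layer sees it
    uint64_t rows;   // records_in_range() estimate for this index
};

// toy cost model: proportional to the estimated number of rows
static double read_time(const Candidate& c) {
    return 1.0 + 0.2 * static_cast<double>(c.rows);
}

// return the keynr of the cheapest candidate, or -1 if there is none
static int pick_access_path(const std::vector<Candidate>& cands) {
    int best = -1;
    double best_cost = 0.0;
    for (const Candidate& c : cands) {
        double cost = read_time(c);
        if (best < 0 || cost < best_cost) {
            best = c.keynr;
            best_cost = cost;
        }
    }
    return best;
}
```

The point is only that a bad records_in_range estimate feeds directly into read_time and can flip the optimizer's choice.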

TokuDB's records_in_range handles the search-condition interval differently depending on its shape:

  • If the search condition is NULL (both start_key and end_key are NULL), the function calls estimate_num_rows to read the FT's in-memory statistic in_memory_stats.numrows, i.e. how many pairs the index holds, which is the number of unique keys. Since MySQL appends the PK to every secondary-index key, all keys are unique at the index layer.
  • Otherwise, it packs start_key and end_key into FT keys and calls the FT's keys_range64 function to count the keys falling in the interval: less is the number of keys smaller than start_key, equal1 the number equal to start_key, middle the number greater than or equal to start_key and smaller than end_key, equal2 the number equal to end_key, and greater the number greater than or equal to end_key. The function is recursive; the code is long but not complicated.
  • If start_key and end_key fall in the same basement node, that basement node is read and the number of matching records is returned to the server layer.
  • If the search condition spans multiple basement nodes, the total key count stored in the index (in_memory_stats.numrows) is apportioned across the nodes of each level on the root-to-leaf path, giving every node on each level a weight. Summing, level by level, the weights covered by the start_key-to-end_key interval yields the estimated record count.

When start_key and end_key are not in the same basement node, keys_range64 only estimates the record count. The estimate is affected by the FT tree layout: the tree may be tall and narrow or short and wide, and different layouts can produce quite different results. Moreover, TokuDB's key count (in_memory_stats.numrows) is itself a statistic, updated each time messages are applied (MSN apply) at a leaf node, so it may also be inaccurate.
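A toy model of the apportioning idea (not the actual keys_range64 code) makes the layout sensitivity concrete: if the index holds numrows keys and we assume a uniform fanout, every subtree at depth d is credited with numrows / fanout^d keys, and a range's estimate is the number of depth-d subtrees it covers times that per-subtree weight. All names here are illustrative.

```cpp
#include <cassert>
#include <cstdint>

// Assumed per-subtree weight at a given depth: the index's total key
// count split evenly among fanout children at every level above it.
static uint64_t subtree_weight(uint64_t numrows, uint64_t fanout, unsigned depth) {
    for (unsigned i = 0; i < depth; i++)
        numrows /= fanout;
    return numrows;
}

// Estimate for a range that covers `subtrees_covered` whole subtrees
// at depth `depth`.
static uint64_t estimate_range(uint64_t numrows, uint64_t fanout,
                               unsigned depth, uint64_t subtrees_covered) {
    return subtrees_covered * subtree_weight(numrows, fanout, depth);
}
```

Covering three depth-2 subtrees in a fanout-10 tree of a million keys yields 30,000; the same key range in a taller, narrower tree is apportioned at a different granularity, which is why the layout can swing the estimate.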

    ha_rows ha_tokudb::records_in_range(uint keynr, key_range* start_key, key_range* end_key) {
        DBT *pleft_key, *pright_key;
        DBT left_key, right_key;
        ha_rows ret_val = HA_TOKUDB_RANGE_COUNT;
        DB *kfile = share->key_file[keynr];
        uint64_t rows = 0;
        int error;
        // get start_rows and end_rows values so that we can estimate range
        // when calling key_range64, the only value we can trust is the value for less
        // The reason is that the key being passed in may be a prefix of keys in the DB
        // As a result, equal may be 0 and greater may actually be equal+greater
        // So, we call key_range64 on the key, and the key that is after it.
        if (!start_key && !end_key) {
            error = estimate_num_rows(kfile, &rows, transaction);
            if (error) {
                ret_val = HA_TOKUDB_RANGE_COUNT;
                goto cleanup;
            }
            ret_val = (rows <= 1) ? 1 : rows;
            goto cleanup;
        }
        if (start_key) {
            uchar inf_byte = (start_key->flag == HA_READ_KEY_EXACT) ? COL_NEG_INF : COL_POS_INF;
            pack_key(&left_key, keynr, key_buff, start_key->key, start_key->length, inf_byte);
            pleft_key = &left_key;
        } else {
            pleft_key = NULL;
        }
        if (end_key) {
            uchar inf_byte = (end_key->flag == HA_READ_BEFORE_KEY) ? COL_NEG_INF : COL_POS_INF;
            pack_key(&right_key, keynr, key_buff2, end_key->key, end_key->length, inf_byte);
            pright_key = &right_key;
        } else {
            pright_key = NULL;
        }
        // keys_range64 can not handle a degenerate range (left_key > right_key), so we filter here
        if (pleft_key && pright_key && tokudb_cmp_dbt_key(kfile, pleft_key, pright_key) > 0) {
            rows = 0;
        } else {
            uint64_t less, equal1, middle, equal2, greater;
            bool is_exact;
            error = kfile->keys_range64(kfile, transaction, pleft_key, pright_key,
                                        &less, &equal1, &middle, &equal2, &greater, &is_exact);
            if (error) {
                ret_val = HA_TOKUDB_RANGE_COUNT;
                goto cleanup;
            }
            rows = middle;
        }
        // MySQL thinks a return value of 0 means there are exactly 0 rows
        // Therefore, always return non-zero so this assumption is not made
        ret_val = (ha_rows) (rows <= 1 ? 1 : rows);
    cleanup:
        if (tokudb_debug & TOKUDB_DEBUG_RETURN) {
            TOKUDB_HANDLER_TRACE("return %" PRIu64 " %" PRIu64, (uint64_t) ret_val, rows);
        }
        DBUG_RETURN(ret_val);
    }

A user table may have no index at all or several. The optimizer's selection process: use the search condition to find the set of usable indexes, then compute the cost of each candidate, i.e. read_time, which is derived from the record count returned by records_in_range. The optimizer picks the index with the lowest cost (called the access path at the server layer) to fetch data from the engine. The access path may be visited as an index point query, an index range query, a full index scan, or a full table scan.

Read data

Once the index is chosen, the server layer passes the index number (keynr) to the engine, which creates a cursor on that index to read the data. In general, the engine implicitly converts a full table scan into a PK index scan.

Full table scan

Let's first look at the full table scan functions:

  • rnd_init: calls index_init to implicitly convert to a PK index scan, and locks the table
  • rnd_next: calls get_next to read the next row; get_next is discussed in detail later
  • rnd_end: ends the scan
    int ha_tokudb::rnd_init(bool scan) {
        int error = 0;
        range_lock_grabbed = false;
        error = index_init(MAX_KEY, 0);
        if (error) { goto cleanup; }
        if (scan) {
            error = prelock_range(NULL, NULL);
            if (error) { goto cleanup; }
            range_lock_grabbed = true;
        }
        error = 0;
    cleanup:
        if (error) {
            index_end();
            last_cursor_error = error;
        }
        TOKUDB_HANDLER_DBUG_RETURN(error);
    }

    int ha_tokudb::rnd_next(uchar * buf) {
        ha_statistic_increment(&SSV::ha_read_rnd_next_count);
        int error = get_next(buf, 1, NULL, false);
        TOKUDB_HANDLER_DBUG_RETURN(error);
    }

    int ha_tokudb::rnd_end() {
        range_lock_grabbed = false;
        TOKUDB_HANDLER_DBUG_RETURN(index_end());
    }

Index scan

ICP

When the search condition is non-empty, the server layer may choose ICP (index condition pushdown), pushing the search condition down to the engine layer for filtering.

  • keyno_arg: the index chosen by the server layer
  • idx_cond_arg: the server-layer search condition, which may contain filter predicates
    Item* ha_tokudb::idx_cond_push(uint keyno_arg, Item* idx_cond_arg) {
        toku_pushed_idx_cond_keyno = keyno_arg;
        toku_pushed_idx_cond = idx_cond_arg;
        return idx_cond_arg;
    }

Index init

As mentioned, a full table scan is implicitly converted into a PK index scan: rnd_init calls index_init, which sets tokudb_active_index to primary_key. The handler member active_index holds the current index number; the value MAX_KEY (64) means a full table scan. The TokuDB member tokudb_active_index holds TokuDB's current index number and normally equals active_index; the full table scan is the exception, where active_index is MAX_KEY while tokudb_active_index is primary_key. The most important work in index_init is creating the cursor and resetting the bulk fetch state; bulk fetch is discussed in detail with get_next.
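The two-variable mapping reduces to a one-liner (MAX_KEY's value and the PK number are hard-coded here purely for illustration):

```cpp
#include <cassert>

// Illustrative reduction of index_init's keynr handling: a server-layer
// active_index of MAX_KEY means "full table scan", which TokuDB serves
// by scanning the primary key; any other value passes through unchanged.
static const unsigned MAX_KEY_NR = 64;  // MySQL's MAX_KEY

static unsigned tokudb_index_for(unsigned active_index, unsigned primary_key) {
    return (active_index == MAX_KEY_NR) ? primary_key : active_index;
}
```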

    int ha_tokudb::index_init(uint keynr, bool sorted) {
        int error;
        THD* thd = ha_thd();
        /*
           Under some very rare conditions (like full joins) we may already have
           an active cursor at this point
        */
        if (cursor) {
            int r = cursor->c_close(cursor);
            assert(r==0);
            remove_from_trx_handler_list();
        }
        active_index = keynr;
        if (active_index < MAX_KEY) {
            DBUG_ASSERT(keynr <= table->s->keys);
        } else {
            DBUG_ASSERT(active_index == MAX_KEY);
            keynr = primary_key;
        }
        tokudb_active_index = keynr;
    #if TOKU_CLUSTERING_IS_COVERING
        if (keynr < table->s->keys && table->key_info[keynr].option_struct->clustering)
            key_read = false;
    #endif
        last_cursor_error = 0;
        range_lock_grabbed = false;
        range_lock_grabbed_null = false;
        DBUG_ASSERT(share->key_file[keynr]);
        cursor_flags = get_cursor_isolation_flags(lock.type, thd);
        if (use_write_locks) {
            cursor_flags |= DB_RMW;
        }
        if (get_disable_prefetching(thd)) {
            cursor_flags |= DBC_DISABLE_PREFETCHING;
        }
        if ((error = share->key_file[keynr]->cursor(share->key_file[keynr], transaction, &cursor, cursor_flags))) {
            last_cursor_error = error;
            cursor = NULL; // Safety
            goto exit;
        }
        cursor->c_set_check_interrupt_callback(cursor, tokudb_killed_thd_callback, thd);
        memset((void *) &last_key, 0, sizeof(last_key));
        add_to_trx_handler_list();
        if (thd_sql_command(thd) == SQLCOM_SELECT) {
            set_query_columns(keynr);
            unpack_entire_row = false;
        } else {
            unpack_entire_row = true;
        }
        invalidate_bulk_fetch();
        doing_bulk_fetch = false;
        maybe_index_scan = false;
        error = 0;
    exit:
        TOKUDB_HANDLER_DBUG_RETURN(error);
    }

Prepare index

After the cursor is initialized, the server layer calls one of the following four functions to take a range lock on the interval:

  • prepare_index_scan
  • prepare_index_key_scan
  • prepare_range_scan
  • read_range_first

All four call prelock_range to take the range lock.

  • prepare_index_scan takes a range lock on (-infinity, +infinity), which is effectively a table lock.
  • prepare_index_key_scan takes a range lock only on the given key.
  • prepare_range_scan and read_range_first both take a range lock on the interval; the former handles reverse index range scans, the latter forward index range scans.

start_key and end_key are the endpoints of the range passed down by the server layer, expressed in server-layer data structures; prelock_range builds the corresponding index keys and takes range locks on those keys.

For a full table scan, rnd_init calls prelock_range directly to take the (-infinity, +infinity) range lock, i.e. a table lock. Because the conversion from full table scan to PK index scan is done implicitly inside the engine, the server layer is unaware of it and does not go through prepare_index_scan.

The range lock mechanism was covered in a previous monthly report. If the isolation level is not SERIALIZABLE and we are not reading on behalf of a write, prelock_range does not actually acquire the range lock; in that case the lock is taken just before the query returns successfully, to keep concurrent transactions from updating the data involved.

    static int
    c_set_bounds(DBC *dbc, const DBT *left_key, const DBT *right_key, bool pre_acquire, int out_of_range_error) {
        if (out_of_range_error != DB_NOTFOUND &&
            out_of_range_error != TOKUDB_OUT_OF_RANGE &&
            out_of_range_error != 0) {
            return toku_ydb_do_error(
                dbc->dbp->dbenv,
                EINVAL,
                "Invalid out_of_range_error [%d] for %s\n",
                out_of_range_error,
                __FUNCTION__
                );
        }
        if (left_key == toku_dbt_negative_infinity() && right_key == toku_dbt_positive_infinity()) {
            out_of_range_error = 0;
        }
        DB *db = dbc->dbp;
        DB_TXN *txn = dbc_struct_i(dbc)->txn;
        HANDLE_PANICKED_DB(db);
        toku_ft_cursor_set_range_lock(dbc_ftcursor(dbc), left_key, right_key,
                                      (left_key == toku_dbt_negative_infinity()),
                                      (right_key == toku_dbt_positive_infinity()),
                                      out_of_range_error);
        if (!db->i->lt || !txn || !pre_acquire)
            return 0;
        //READ_UNCOMMITTED and READ_COMMITTED transactions do not need read locks.
        if (!dbc_struct_i(dbc)->rmw && dbc_struct_i(dbc)->iso != TOKU_ISO_SERIALIZABLE)
            return 0;
        toku::lock_request::type lock_type = dbc_struct_i(dbc)->rmw ?
            toku::lock_request::type::WRITE : toku::lock_request::type::READ;
        int r = toku_db_get_range_lock(db, txn, left_key, right_key, lock_type);
        return r;
    }

Read index

If the server layer specifies the range's start_key and end_key, the handler framework accesses the index data in the way chosen by the execution plan.

How the first row is read:

  • Index point query: the server layer calls index_read directly
  • Index range scan with a non-empty start_key: the server layer usually calls read_range_first to read the first row; read_range_first ultimately calls index_read as well
  • Index range scan with an empty start_key: the server layer calls index_first directly
  • Index reverse range scan with a non-empty end_key: the server layer usually calls index_read
  • Index reverse range scan with an empty end_key: the server layer calls index_last directly
  • Full index scan: the server layer calls index_first directly
  • Reverse full index scan: the server layer calls index_last directly

index_read is fairly long; a few common cases illustrate it. 1) index point query:

  • HA_READ_KEY_EXACT

2) index range query:

  • HA_READ_AFTER_KEY: handles keys greater than start_key
  • HA_READ_KEY_OR_NEXT: handles keys greater than or equal to start_key

3) reverse index range query:

  • HA_READ_BEFORE_KEY: handles keys smaller than end_key
  • HA_READ_PREFIX_LAST_OR_PREV: handles keys smaller than or equal to end_key
    int ha_tokudb::index_read(
        uchar* buf,
        const uchar* key,
        uint key_len,
        enum ha_rkey_function find_flag) {
        invalidate_bulk_fetch();
        DBT row;
        DBT lookup_key;
        int error = 0;
        uint32_t flags = 0;
        THD* thd = ha_thd();
        tokudb_trx_data* trx = (tokudb_trx_data*)thd_get_ha_data(thd, tokudb_hton);
        struct smart_dbt_info info;
        struct index_read_info ir_info;
        HANDLE_INVALID_CURSOR();
        // if we locked a non-null key range and we now have a null key, then
        // remove the bounds from the cursor
        if (range_lock_grabbed &&
            !range_lock_grabbed_null &&
            index_key_is_null(table, tokudb_active_index, key, key_len)) {
            range_lock_grabbed = range_lock_grabbed_null = false;
            cursor->c_remove_restriction(cursor);
        }
        ha_statistic_increment(&SSV::ha_read_key_count);
        memset((void *) &row, 0, sizeof(row));
        info.ha = this;
        info.buf = buf;
        info.keynr = tokudb_active_index;
        ir_info.smart_dbt_info = info;
        ir_info.cmp = 0;
        flags = SET_PRELOCK_FLAG(0);
        switch (find_flag) {
        case HA_READ_KEY_EXACT: /* Find first record else error */ {
            pack_key(&lookup_key, tokudb_active_index, key_buff3, key, key_len, COL_NEG_INF);
            DBT lookup_bound;
            pack_key(&lookup_bound, tokudb_active_index, key_buff4, key, key_len, COL_POS_INF);
            ir_info.orig_key = &lookup_key;
            error = cursor->c_getf_set_range_with_bound(cursor, flags, &lookup_key, &lookup_bound, SMART_DBT_IR_CALLBACK(key_read), &ir_info);
            if (ir_info.cmp) {
                error = DB_NOTFOUND;
            }
            break;
        }
        case HA_READ_AFTER_KEY: /* Find next rec. after key-record */
            pack_key(&lookup_key, tokudb_active_index, key_buff3, key, key_len, COL_POS_INF);
            error = cursor->c_getf_set_range(cursor, flags, &lookup_key, SMART_DBT_CALLBACK(key_read), &info);
            break;
        case HA_READ_BEFORE_KEY: /* Find next rec. before key-record */
            pack_key(&lookup_key, tokudb_active_index, key_buff3, key, key_len, COL_NEG_INF);
            error = cursor->c_getf_set_range_reverse(cursor, flags, &lookup_key, SMART_DBT_CALLBACK(key_read), &info);
            break;
        case HA_READ_KEY_OR_NEXT: /* Record or next record */
            pack_key(&lookup_key, tokudb_active_index, key_buff3, key, key_len, COL_NEG_INF);
            error = cursor->c_getf_set_range(cursor, flags, &lookup_key, SMART_DBT_CALLBACK(key_read), &info);
            break;
        //
        // This case does not seem to ever be used, it is ok for it to be slow
        //
        case HA_READ_KEY_OR_PREV: /* Record or previous */
            pack_key(&lookup_key, tokudb_active_index, key_buff3, key, key_len, COL_NEG_INF);
            ir_info.orig_key = &lookup_key;
            error = cursor->c_getf_set_range(cursor, flags, &lookup_key, SMART_DBT_IR_CALLBACK(key_read), &ir_info);
            if (error == DB_NOTFOUND) {
                error = cursor->c_getf_last(cursor, flags, SMART_DBT_CALLBACK(key_read), &info);
            } else if (ir_info.cmp) {
                error = cursor->c_getf_prev(cursor, flags, SMART_DBT_CALLBACK(key_read), &info);
            }
            break;
        case HA_READ_PREFIX_LAST_OR_PREV: /* Last or prev key with the same prefix */
            pack_key(&lookup_key, tokudb_active_index, key_buff3, key, key_len, COL_POS_INF);
            error = cursor->c_getf_set_range_reverse(cursor, flags, &lookup_key, SMART_DBT_CALLBACK(key_read), &info);
            break;
        case HA_READ_PREFIX_LAST:
            pack_key(&lookup_key, tokudb_active_index, key_buff3, key, key_len, COL_POS_INF);
            ir_info.orig_key = &lookup_key;
            error = cursor->c_getf_set_range_reverse(cursor, flags, &lookup_key, SMART_DBT_IR_CALLBACK(key_read), &ir_info);
            if (ir_info.cmp) {
                error = DB_NOTFOUND;
            }
            break;
        default:
            TOKUDB_HANDLER_TRACE("unsupported:%d", find_flag);
            error = HA_ERR_UNSUPPORTED;
            break;
        }
        error = handle_cursor_error(error, HA_ERR_KEY_NOT_FOUND, tokudb_active_index);
        if (!error && !key_read && tokudb_active_index != primary_key && !key_is_clustering(&table->key_info[tokudb_active_index])) {
            error = read_full_row(buf);
        }
        trx->stmt_progress.queried++;
        track_progress(thd);
    cleanup:
        TOKUDB_HANDLER_DBUG_RETURN(error);
    }
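The switch above chooses a padding byte so that the packed lookup key sorts on the correct side of all real keys sharing the prefix: COL_NEG_INF makes the probe sort before every real key with that prefix (so a forward set_range lands on the first match or greater), COL_POS_INF makes it sort after them. A condensed sketch of that mapping — the enum and byte values here are stand-ins, not the real constants:

```cpp
#include <cassert>

// Illustrative mapping from a few ha_rkey_function cases to the
// infinity padding used when packing the lookup key, mirroring the
// pack_key calls in index_read above.
enum rkey_fn { READ_KEY_EXACT, READ_AFTER_KEY, READ_KEY_OR_NEXT,
               READ_BEFORE_KEY, READ_PREFIX_LAST_OR_PREV };
enum pad_byte { NEG_INF = 0, POS_INF = 1 };

static pad_byte pad_for(rkey_fn f) {
    switch (f) {
    case READ_KEY_EXACT:           return NEG_INF; // start at first prefix match
    case READ_AFTER_KEY:           return POS_INF; // strictly > start_key
    case READ_KEY_OR_NEXT:         return NEG_INF; // >= start_key
    case READ_BEFORE_KEY:          return NEG_INF; // strictly < end_key (reverse)
    case READ_PREFIX_LAST_OR_PREV: return POS_INF; // <= end_key (reverse)
    }
    return NEG_INF; // unreachable for the cases above
}
```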

When index_read invokes the callbacks in ydb_cursor.cc, the flags argument is initialized to 0. The callbacks registered in ydb_cursor.cc check the tokudb_cursor->rmw flag; if rmw is 0 and the isolation level is not SERIALIZABLE, query_context_with_input_init sets the context's do_locking field, telling toku_ft_cursor_set_range to take the range lock before returning successfully.

    static int
    c_getf_set_range_with_bound(DBC *c, uint32_t flag, DBT *key, DBT *key_bound, YDB_CALLBACK_FUNCTION f, void *extra) {
        HANDLE_PANICKED_DB(c->dbp);
        HANDLE_CURSOR_ILLEGAL_WORKING_PARENT_TXN(c);
        int r = 0;
        QUERY_CONTEXT_WITH_INPUT_S context; //Describes the context of this query.
        query_context_with_input_init(&context, c, flag, key, NULL, f, extra);
        while (r == 0) {
            //toku_ft_cursor_set_range will call c_getf_set_range_callback(..., context) (if query is successful)
            r = toku_ft_cursor_set_range(dbc_ftcursor(c), key, key_bound, c_getf_set_range_callback, &context);
            if (r == DB_LOCK_NOTGRANTED) {
                r = toku_db_wait_range_lock(context.base.db, context.base.txn, &context.base.request);
            } else {
                break;
            }
        }
        query_context_base_destroy(&context.base);
        return r;
    }

    static int
    c_getf_set_range_callback(uint32_t keylen, const void *key, uint32_t vallen, const void *val, void *extra, bool lock_only) {
        QUERY_CONTEXT_WITH_INPUT super_context = (QUERY_CONTEXT_WITH_INPUT) extra;
        QUERY_CONTEXT_BASE context = &super_context->base;
        int r;
        DBT found_key = { .data = (void *) key, .size = keylen };
        //Lock:
        //  left(key,val) = (input_key, -infinity)
        //  right(key) = found ? found_key : infinity
        //  right(val) = found ? found_val : infinity
        if (context->do_locking) {
            const DBT *left_key = super_context->input_key;
            const DBT *right_key = key != NULL ? &found_key : toku_dbt_positive_infinity();
            r = toku_db_start_range_lock(context->db, context->txn, left_key, right_key, query_context_determine_lock_type(context), &context->request);
        } else {
            r = 0;
        }
        //Call application-layer callback if found and locks were successfully obtained.
        if (r==0 && key!=NULL && !lock_only) {
            DBT found_val = { .data = (void *) val, .size = vallen };
            context->r_user_callback = context->f(&found_key, &found_val, context->f_extra);
            r = context->r_user_callback;
        }
        //Give ft-layer an error (if any) to return from toku_ft_cursor_set_range
        return r;
    }

Get next

After the first row is fetched, the server layer calls index_next (or index_next_same) or index_prev, according to the execution plan, to fetch the remaining rows.

  • index_next_same: reads the next record with the same index key.
  • index_next: reads the next record in the range; if ICP is set, the fetched record is also matched against the filter condition.
  • index_prev: reads the previous record in the range; if ICP is set, the fetched record is likewise filtered.

There are two more read methods:

  • index_first: reads the first record of the index
  • index_last: reads the last record of the index

These functions are fairly simple; only index_next is analyzed here, and interested readers can work through the rest. index_next calls get_next directly to read the next record.

    int ha_tokudb::index_next(uchar * buf) {
        TOKUDB_HANDLER_DBUG_ENTER("");
        ha_statistic_increment(&SSV::ha_read_next_count);
        int error = get_next(buf, 1, NULL, key_read);
        TOKUDB_HANDLER_DBUG_RETURN(error);
    }

Bulk fetch

TokuDB has an optimization for range queries called bulk fetch: the keys on the current basement node that fall in the range are read in one batch, so a single message apply serves many key reads. This also reduces contention on the leaf node's read-write lock by avoiding frequent lock/unlock cycles. To support bulk fetch, TokuDB adds the following members:

  • doing_bulk_fetch: whether a bulk fetch is in progress
  • range_query_buff: the buffer caching the batch-read data
  • size_range_query_buff: the allocated (malloc) size of range_query_buff
  • bytes_used_in_range_query_buff: the number of bytes actually used in range_query_buff
  • curr_range_query_buff_offset: the current read position in range_query_buff
  • bulk_fetch_iteration and rows_fetched_using_bulk_fetch: statistics that control the batch size
    class ha_tokudb : public handler {
    private:
        ...
        uchar* range_query_buff;                 // range query buffer
        uint32_t size_range_query_buff;          // size of the allocated range query buffer
        uint32_t bytes_used_in_range_query_buff; // number of bytes used in the range query buffer
        uint32_t curr_range_query_buff_offset;   // current offset into the range query buffer for queries to read
        uint64_t bulk_fetch_iteration;
        uint64_t rows_fetched_using_bulk_fetch;
        bool doing_bulk_fetch;
        ...
    };
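The two statistics implement exponential batch growth: round i of a scan may fetch at most 2^i rows, with the shift capped so the bound cannot overflow. A minimal sketch of that bound (the real cap is HA_TOKU_BULK_FETCH_ITERATION_MAX; 16 here is purely illustrative):

```cpp
#include <cassert>
#include <cstdint>

// Sketch of the per-round row bound that fill_range_query_buf enforces:
// bulk fetch round `iteration` may buffer at most 1 << iteration rows,
// so early rounds stay small (good for LIMIT queries) while long scans
// quickly reach large batches.
static uint64_t bulk_fetch_bound(uint64_t iteration) {
    const uint64_t cap = 16;  // illustrative stand-in for the real cap
    if (iteration > cap)
        iteration = cap;
    return 1ULL << iteration;
}
```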

get_next first checks whether data can be read from the current bulk fetch buffer; the test is bytes_used_in_range_query_buff - curr_range_query_buff_offset > 0, meaning the buffer still holds unread data.

  1. If the condition holds, read_data_from_range_query_buff reads directly from the bulk fetch buffer.
  2. If the bulk fetch buffer is exhausted, icp_went_out_of_range is checked to see whether the range has been exceeded; if so, there is no more data and we can return immediately.
  3. If neither condition holds, the cursor must read further data. In the bulk fetch case, invalidate_bulk_fetch is called first to reset the bulk fetch state.

What if the user disables bulk fetch? Then conditions 1 and 2 both fail; invalidate_bulk_fetch runs, the doing_bulk_fetch flag is found to be false, and the cursor reads the data directly. This part is simple and left to the reader.

The bulk fetch path is similar to the non-bulk-fetch path; the biggest difference is the callback passed to the cursor->c_getf_XXX methods and its argument, which are set to smart_dbt_bf_callback and a pointer to a struct smart_dbt_bf_info, respectively.

The smart_dbt_bf_info structure tells the callback how to cache the current row and how to read the next one.

  • ha: pointer to the TokuDB handler
  • need_val: whether the bulk fetch buffer must cache the value; true for the PK and clustering indexes, false otherwise
  • direction: read direction, 1 for next, -1 for prev
  • thd: server-layer thread pointer
  • buf: buffer provided by the server layer
  • key_to_compare: comparison key; only index_next_same sets this parameter
    typedef struct smart_dbt_bf_info {
        ha_tokudb* ha;
        bool need_val;
        int direction;
        THD* thd;
        uchar* buf;
        DBT* key_to_compare;
    } *SMART_DBT_BF_INFO;

After a batch completes, get_next calls read_data_from_range_query_buff to take rows out of the bulk fetch buffer.

After get_next successfully reads a row, it must decide whether a lookup back into the base table is needed. For the PK and clustering indexes the index stores the complete row, so no extra lookup is required.

    int ha_tokudb::get_next(
        uchar* buf,
        int direction,
        DBT* key_to_compare,
        bool do_key_read) {
        int error = 0;
        HANDLE_INVALID_CURSOR();
        if (maybe_index_scan) {
            maybe_index_scan = false;
            if (!range_lock_grabbed) {
                error = prepare_index_scan();
            }
        }
        if (!error) {
            uint32_t flags = SET_PRELOCK_FLAG(0);
            // we need to read the val of what we retrieve if
            // we do NOT have a covering index AND we are using a clustering secondary
            // key
            bool need_val =
                (do_key_read == 0) &&
                (tokudb_active_index == primary_key ||
                 key_is_clustering(&table->key_info[tokudb_active_index]));
            if ((bytes_used_in_range_query_buff -
                 curr_range_query_buff_offset) > 0) {
                error = read_data_from_range_query_buff(buf, need_val, do_key_read);
            } else if (icp_went_out_of_range) {
                icp_went_out_of_range = false;
                error = HA_ERR_END_OF_FILE;
            } else {
                invalidate_bulk_fetch();
                if (doing_bulk_fetch) {
                    struct smart_dbt_bf_info bf_info;
                    bf_info.ha = this;
                    // you need the val if you have a clustering index and key_read is not 0;
                    bf_info.direction = direction;
                    bf_info.thd = ha_thd();
                    bf_info.need_val = need_val;
                    bf_info.buf = buf;
                    bf_info.key_to_compare = key_to_compare;
                    //
                    // call c_getf_next with purpose of filling in range_query_buff
                    //
                    rows_fetched_using_bulk_fetch = 0;
                    // it is expected that we can do ICP in the smart_dbt_bf_callback
                    // as a result, it's possible we don't return any data because
                    // none of the rows matched the index condition. Therefore, we need
                    // this while loop. icp_out_of_range will be set if we hit a row that
                    // the index condition states is out of our range. When that hits,
                    // we know all the data in the buffer is the last data we will retrieve
                    while (bytes_used_in_range_query_buff == 0 &&
                           !icp_went_out_of_range && error == 0) {
                        if (direction > 0) {
                            error =
                                cursor->c_getf_next(
                                    cursor,
                                    flags,
                                    smart_dbt_bf_callback,
                                    &bf_info);
                        } else {
                            error =
                                cursor->c_getf_prev(
                                    cursor,
                                    flags,
                                    smart_dbt_bf_callback,
                                    &bf_info);
                        }
                    }
                    // if there is no data set and we went out of range,
                    // then there is nothing to return
                    if (bytes_used_in_range_query_buff == 0 &&
                        icp_went_out_of_range) {
                        icp_went_out_of_range = false;
                        error = HA_ERR_END_OF_FILE;
                    }
                    if (bulk_fetch_iteration < HA_TOKU_BULK_FETCH_ITERATION_MAX) {
                        bulk_fetch_iteration++;
                    }
                    error =
                        handle_cursor_error(
                            error,
                            HA_ERR_END_OF_FILE,
                            tokudb_active_index);
                    if (error) {
                        goto cleanup;
                    }
                    //
                    // now that range_query_buff is filled, read an element
                    //
                    error =
                        read_data_from_range_query_buff(buf, need_val, do_key_read);
                } else {
                    struct smart_dbt_info info;
                    info.ha = this;
                    info.buf = buf;
                    info.keynr = tokudb_active_index;
                    if (direction > 0) {
                        error =
                            cursor->c_getf_next(
                                cursor,
                                flags,
                                SMART_DBT_CALLBACK(do_key_read),
                                &info);
                    } else {
                        error =
                            cursor->c_getf_prev(
                                cursor,
                                flags,
                                SMART_DBT_CALLBACK(do_key_read),
                                &info);
                    }
                    error =
                        handle_cursor_error(
                            error,
                            HA_ERR_END_OF_FILE,
                            tokudb_active_index);
                }
            }
        }
        //
        // at this point, one of two things has happened
        // either we have unpacked the data into buf, and we
        // are done, or we have unpacked the primary key
        // into last_key, and we use the code below to
        // read the full row by doing a point query into the
        // main table.
        //
        if (!error &&
            !do_key_read &&
            (tokudb_active_index != primary_key) &&
            !key_is_clustering(&table->key_info[tokudb_active_index])) {
            error = read_full_row(buf);
        }
        if (!error) {
            THD *thd = ha_thd();
            tokudb_trx_data* trx =
                static_cast<tokudb_trx_data*>(thd_get_ha_data(thd, tokudb_hton));
            trx->stmt_progress.queried++;
            track_progress(thd);
            if (thd_killed(thd))
                error = ER_ABORTING_CONNECTION;
        }
    cleanup:
        return error;
    }

Now let's look at the callback smart_dbt_bf_callback. It is a thin wrapper around fill_range_query_buf: after an index row is read successfully, the result is cached in the bulk fetch buffer and reading continues with the next row.

    static int smart_dbt_bf_callback(
        DBT const* key,
        DBT const* row,
        void* context) {
        SMART_DBT_BF_INFO info = (SMART_DBT_BF_INFO)context;
        return
            info->ha->fill_range_query_buf(
                info->need_val,
                key,
                row,
                info->direction,
                info->thd,
                info->buf,
                info->key_to_compare);
    }

Next, let's focus on fill_range_query_buf. The key and row parameters are the index key and value just read; the remaining parameters, extracted from the smart_dbt_bf_info structure, carry the information the server layer specified in its call.

If key_to_compare is specified, the current key must be checked against key_to_compare. Because a secondary-index key has the PK appended, this is a prefix comparison. If the prefix does not match, a new index key has been reached: set icp_went_out_of_range and return.
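Since the PK suffix comes after the secondary-key columns in the packed key, index_next_same only needs to compare the leading bytes; a minimal illustration (a stand-in for tokudb_prefix_cmp_dbt_key, with a byte-wise comparison in place of the real column-aware comparator):

```cpp
#include <cassert>
#include <cstring>

// Compare only the first prefix_len bytes (the secondary-key columns),
// ignoring the appended-PK suffix. The real comparator is column-aware;
// plain memcmp is used here purely for illustration.
static bool same_index_key(const unsigned char* probe,
                           const unsigned char* stored,
                           size_t prefix_len) {
    return memcmp(probe, stored, prefix_len) == 0;
}
```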

If the server layer set up ICP, the current index key must be checked against the range. Normally this check compares against prelocked_right_range (range scan) or prelocked_left_range (reverse range scan); in the ICP case, however, the comparison is against end_range. If the index key is out of range, set icp_went_out_of_range and return.

If the current index key is inside the range, the ICP case also applies the filter condition: a matching row is stored into the bulk fetch buffer; a non-matching row is skipped and the next one is read.

When storing a key into the bulk fetch buffer, need_val is checked: if true, the key is stored followed by the value; otherwise only the key is stored. The value may be the entire row, or only the columns recorded by set_query_columns; in the latter case the relevant column data must be extracted first. Entries in the bulk fetch buffer follow a fixed format: a 4-byte size followed by the data. The current key is written at offset bytes_used_in_range_query_buff; after the key/value is cached, bytes_used_in_range_query_buff is advanced to the next write position.
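The [4-byte size][data] entry format can be sketched with a tiny append/read pair (a simplification of what fill_range_query_buf and read_data_from_range_query_buff do with range_query_buff; the function names here are illustrative):

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <vector>

// Append one entry: a 4-byte length followed by the payload bytes,
// mirroring how keys (and, when need_val is set, values) are laid out
// in range_query_buff.
static void append_entry(std::vector<unsigned char>& buf,
                         const void* data, uint32_t size) {
    size_t off = buf.size();
    buf.resize(off + sizeof(uint32_t) + size);
    memcpy(buf.data() + off, &size, sizeof(uint32_t));
    memcpy(buf.data() + off + sizeof(uint32_t), data, size);
}

// Read the entry at *offset and advance *offset past it, the way
// read_data_from_range_query_buff advances curr_range_query_buff_offset.
static std::vector<unsigned char> read_entry(const std::vector<unsigned char>& buf,
                                             size_t* offset) {
    uint32_t size;
    memcpy(&size, buf.data() + *offset, sizeof(uint32_t));
    *offset += sizeof(uint32_t);
    std::vector<unsigned char> out(buf.begin() + *offset,
                                   buf.begin() + *offset + size);
    *offset += size;
    return out;
}
```

When both key and value are stored, they simply become two consecutive entries in this format, read back in the same order.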

In the non-ICP case, the end of fill_range_query_buf checks whether the key exceeds the range, comparing against prelocked_right_range (range scan) or prelocked_left_range (reverse range scan). These two values are set in prelock_range and are also the bounds of the range lock.

If the current key is inside the range, the next row should be read into the bulk fetch buffer: fill_range_query_buf returns TOKUDB_CURSOR_CONTINUE, telling toku_ft_search to continue with the next row of the current basement node. Bulk fetch cannot cross basement nodes, because there is no guarantee that message apply has been performed on other basement nodes.

    int ha_tokudb::fill_range_query_buf(
        bool need_val,
        DBT const* key,
        DBT const* row,
        int direction,
        THD* thd,
        uchar* buf,
        DBT* key_to_compare) {
        int error;
        //
        // first put the value into range_query_buf
        //
        uint32_t size_remaining =
            size_range_query_buff - bytes_used_in_range_query_buff;
        uint32_t size_needed;
        uint32_t user_defined_size = tokudb::sysvars::read_buf_size(thd);
        uchar* curr_pos = NULL;
        if (key_to_compare) {
            int cmp = tokudb_prefix_cmp_dbt_key(
                share->key_file[tokudb_active_index],
                key_to_compare,
                key);
            if (cmp) {
                icp_went_out_of_range = true;
                error = 0;
                goto cleanup;
            }
        }
        // if we have an index condition pushed down, we check it
        if (toku_pushed_idx_cond &&
            (tokudb_active_index == toku_pushed_idx_cond_keyno)) {
            unpack_key(buf, key, tokudb_active_index);
            enum icp_result result =
                toku_handler_index_cond_check(toku_pushed_idx_cond);
            // If we have reason to stop, we set icp_went_out_of_range and get out
            // otherwise, if we simply see that the current key is no match,
            // we tell the cursor to continue and don't store
            // the key locally
            if (result == ICP_OUT_OF_RANGE || thd_killed(thd)) {
                icp_went_out_of_range = true;
                error = 0;
                DEBUG_SYNC(ha_thd(), "tokudb_icp_asc_scan_out_of_range");
                goto cleanup;
            } else if (result == ICP_NO_MATCH) {
                // if we are performing a DESC ICP scan and have no end_range
                // to compare to stop using ICP filtering as there isn't much more
                // that we can do without going through contortions with remembering
                // and comparing key parts.
                if (!end_range &&
                    direction < 0) {
                    cancel_pushed_idx_cond();
                    DEBUG_SYNC(ha_thd(), "tokudb_icp_desc_scan_invalidate");
                }
                error = TOKUDB_CURSOR_CONTINUE;
                goto cleanup;
            }
        }
        // at this point, if ICP is on, we have verified that the key is one
        // we are interested in, so we proceed with placing the data
        // into the range query buffer
        if (need_val) {
            if (unpack_entire_row) {
                size_needed = 2*sizeof(uint32_t) + key->size + row->size;
            } else {
                // this is an upper bound
                size_needed =
                    // size of key length
                    sizeof(uint32_t) +
                    // key and row
                    key->size + row->size +
                    // lengths of varchars stored
                    num_var_cols_for_query * (sizeof(uint32_t)) +
                    // length of blobs
                    sizeof(uint32_t);
            }
        } else {
            size_needed = sizeof(uint32_t) + key->size;
        }
        if (size_remaining < size_needed) {
            range_query_buff =
                static_cast<uchar*>(tokudb::memory::realloc(
                    static_cast<void*>(range_query_buff),
                    bytes_used_in_range_query_buff + size_needed,
                    MYF(MY_WME)));
            if (range_query_buff == NULL) {
                error = ENOMEM;
                invalidate_bulk_fetch();
                goto cleanup;
            }
            size_range_query_buff = bytes_used_in_range_query_buff + size_needed;
        }
        //
        // now we know we have the size, let's fill the buffer, starting with the key
        //
        curr_pos = range_query_buff + bytes_used_in_range_query_buff;
        *reinterpret_cast<uint32_t*>(curr_pos) = key->size;
        curr_pos += sizeof(uint32_t);
        memcpy(curr_pos, key->data, key->size);
        curr_pos += key->size;
        if (need_val) {
            if (unpack_entire_row) {
                *reinterpret_cast<uint32_t*>(curr_pos) = row->size;
                curr_pos += sizeof(uint32_t);
                memcpy(curr_pos, row->data, row->size);
                curr_pos += row->size;
            } else {
                // need to unpack just the data we care about
                const uchar* fixed_field_ptr = static_cast<const uchar*>(row->data);
                fixed_field_ptr += table_share->null_bytes;
                const uchar* var_field_offset_ptr = NULL;
                const uchar* var_field_data_ptr = NULL;
                var_field_offset_ptr =
                    fixed_field_ptr +
                    share->kc_info.mcp_info[tokudb_active_index].fixed_field_size;
                var_field_data_ptr =
                    var_field_offset_ptr +
                    share->kc_info.mcp_info[tokudb_active_index].len_of_offsets;
                // first the null bytes
                memcpy(curr_pos, row->data, table_share->null_bytes);
                curr_pos += table_share->null_bytes;
                //
                // now the fixed fields
                //
                for (uint32_t i = 0; i < num_fixed_cols_for_query; i++) {
                    uint field_index = fixed_cols_for_query[i];
                    memcpy(
                        curr_pos,
                        fixed_field_ptr + share->kc_info.cp_info[tokudb_active_index][field_index].col_pack_val,
                        share->kc_info.field_lengths[field_index]);
                    curr_pos += share->kc_info.field_lengths[field_index];
                }
                //
                // now the var fields
                //
                for (uint32_t i = 0; i < num_var_cols_for_query; i++) {
                    uint field_index = var_cols_for_query[i];
                    uint32_t var_field_index =
                        share->kc_info.cp_info[tokudb_active_index][field_index].col_pack_val;
                    uint32_t data_start_offset;
                    uint32_t field_len;
                    get_var_field_info(
                        &field_len,
                        &data_start_offset,
                        var_field_index,
                        var_field_offset_ptr,
                        share->kc_info.num_offset_bytes);
                    memcpy(curr_pos, &field_len, sizeof(field_len));
                    curr_pos += sizeof(field_len);
                    memcpy(
                        curr_pos,
                        var_field_data_ptr + data_start_offset,
                        field_len);
                    curr_pos += field_len;
                }
                if (read_blobs) {
                    uint32_t blob_offset = 0;
                    uint32_t data_size = 0;
                    //
                    // now the blobs
                    //
                    get_blob_field_info(
                        &blob_offset,
                        share->kc_info.mcp_info[tokudb_active_index].len_of_offsets,
                        var_field_data_ptr,
                        share->kc_info.num_offset_bytes);
                    data_size =
                        row->size -
                        blob_offset -
                        static_cast<uint32_t>((var_field_data_ptr -
                            static_cast<const uchar*>(row->data)));
                    memcpy(curr_pos, &data_size, sizeof(data_size));
                    curr_pos += sizeof(data_size);
                    memcpy(curr_pos, var_field_data_ptr + blob_offset, data_size);
                    curr_pos += data_size;
                }
            }
        }
        bytes_used_in_range_query_buff = curr_pos - range_query_buff;
        assert_always(bytes_used_in_range_query_buff <= size_range_query_buff);
        //
        // now determine if we should continue with the bulk fetch
        // we want to stop under these conditions:
        //  - we overran the prelocked range
        //  - we are close to the end of the buffer
        //  - we have fetched an exponential amount of rows with
        //    respect to the bulk fetch iteration, which is initialized
        //    to 0 in index_init() and prelock_range().
        rows_fetched_using_bulk_fetch++;
        // if the iteration is less than the number of possible shifts on
        // a 64 bit integer, check that we haven't exceeded this iterations
        // row fetch upper bound.
        if (bulk_fetch_iteration < HA_TOKU_BULK_FETCH_ITERATION_MAX) {
            uint64_t row_fetch_upper_bound = 1LLU << bulk_fetch_iteration;
            assert_always(row_fetch_upper_bound > 0);
            if (rows_fetched_using_bulk_fetch >= row_fetch_upper_bound) {
                error = 0;
                goto cleanup;
            }
        }
        if (bytes_used_in_range_query_buff +
            table_share->rec_buff_length >
            user_defined_size) {
            error = 0;
            goto cleanup;
        }
        if (direction > 0) {
  208. // compare what we got to the right endpoint of prelocked range
  209. // because we are searching keys in ascending order
  210. if (prelocked_right_range_size == 0) {
  211. error = TOKUDB_CURSOR_CONTINUE;
  212. goto cleanup;
  213. }
  214. DBT right_range;
  215. memset(&right_range, 0, sizeof(right_range));
  216. right_range.size = prelocked_right_range_size;
  217. right_range.data = prelocked_right_range;
  218. int cmp = tokudb_cmp_dbt_key(
  219. share->key_file[tokudb_active_index],
  220. key,
  221. &right_range);
  222. error = (cmp > 0) ? 0 : TOKUDB_CURSOR_CONTINUE;
  223. } else {
  224. // compare what we got to the left endpoint of prelocked range
  225. // because we are searching keys in descending order
  226. if (prelocked_left_range_size == 0) {
  227. error = TOKUDB_CURSOR_CONTINUE;
  228. goto cleanup;
  229. }
  230. DBT left_range;
  231. memset(&left_range, 0, sizeof(left_range));
  232. left_range.size = prelocked_left_range_size;
  233. left_range.data = prelocked_left_range;
  234. int cmp = tokudb_cmp_dbt_key(
  235. share->key_file[tokudb_active_index],
  236. key,
  237. &left_range);
  238. error = (cmp < 0) ? 0 : TOKUDB_CURSOR_CONTINUE;
  239. }
  240. cleanup:
  241. return error;
  242. }

Once the bulk fetch buffer has been filled, rows can be consumed from it via read_data_from_range_query_buff. curr_range_query_buff_offset tracks the current read position. The key is read first; if need_val is true, the value is read as well, either as the entire row or as only the columns selected by set_query_columns. After a row is consumed, curr_range_query_buff_offset is advanced to point at the next entry.

  1. int ha_tokudb::read_data_from_range_query_buff(uchar* buf, bool need_val, bool do_key_read) {
  2. // buffer has the next row, get it from there
  3. int error;
  4. uchar* curr_pos = range_query_buff+curr_range_query_buff_offset;
  5. DBT curr_key;
  6. memset((void *) &curr_key, 0, sizeof(curr_key));
  7. // get key info
  8. uint32_t key_size = *(uint32_t *)curr_pos;
  9. curr_pos += sizeof(key_size);
  10. uchar* curr_key_buff = curr_pos;
  11. curr_pos += key_size;
  12. curr_key.data = curr_key_buff;
  13. curr_key.size = key_size;
  14. // if this is a covering index, this is all we need
  15. if (do_key_read) {
  16. assert_always(!need_val);
  17. extract_hidden_primary_key(tokudb_active_index, &curr_key);
  18. read_key_only(buf, tokudb_active_index, &curr_key);
  19. error = 0;
  20. }
  21. // we need to get more data
  22. else {
  23. DBT curr_val;
  24. memset((void *) &curr_val, 0, sizeof(curr_val));
  25. uchar* curr_val_buff = NULL;
  26. uint32_t val_size = 0;
  27. // in this case, we don't have a val, we are simply extracting the pk
  28. if (!need_val) {
  29. curr_val.data = curr_val_buff;
  30. curr_val.size = val_size;
  31. extract_hidden_primary_key(tokudb_active_index, &curr_key);
  32. error = read_primary_key( buf, tokudb_active_index, &curr_val, &curr_key);
  33. }
  34. else {
  35. extract_hidden_primary_key(tokudb_active_index, &curr_key);
  36. // need to extract a val and place it into buf
  37. if (unpack_entire_row) {
  38. // get val info
  39. val_size = *(uint32_t *)curr_pos;
  40. curr_pos += sizeof(val_size);
  41. curr_val_buff = curr_pos;
  42. curr_pos += val_size;
  43. curr_val.data = curr_val_buff;
  44. curr_val.size = val_size;
  45. error = unpack_row(buf,&curr_val, &curr_key, tokudb_active_index);
  46. }
  47. else {
  48. if (!(hidden_primary_key && tokudb_active_index == primary_key)) {
  49. unpack_key(buf,&curr_key,tokudb_active_index);
  50. }
  51. // read rows we care about
  52. // first the null bytes;
  53. memcpy(buf, curr_pos, table_share->null_bytes);
  54. curr_pos += table_share->null_bytes;
  55. // now the fixed sized rows
  56. for (uint32_t i = 0; i < num_fixed_cols_for_query; i++) {
  57. uint field_index = fixed_cols_for_query[i];
  58. Field* field = table->field[field_index];
  59. unpack_fixed_field(
  60. buf + field_offset(field, table),
  61. curr_pos,
  62. share->kc_info.field_lengths[field_index]
  63. );
  64. curr_pos += share->kc_info.field_lengths[field_index];
  65. }
  66. // now the variable sized rows
  67. for (uint32_t i = 0; i < num_var_cols_for_query; i++) {
  68. uint field_index = var_cols_for_query[i];
  69. Field* field = table->field[field_index];
  70. uint32_t field_len = *(uint32_t *)curr_pos;
  71. curr_pos += sizeof(field_len);
  72. unpack_var_field(
  73. buf + field_offset(field, table),
  74. curr_pos,
  75. field_len,
  76. share->kc_info.length_bytes[field_index]
  77. );
  78. curr_pos += field_len;
  79. }
  80. // now the blobs
  81. if (read_blobs) {
  82. uint32_t blob_size = *(uint32_t *)curr_pos;
  83. curr_pos += sizeof(blob_size);
  84. error = unpack_blobs(
  85. buf,
  86. curr_pos,
  87. blob_size,
  88. true
  89. );
  90. curr_pos += blob_size;
  91. if (error) {
  92. invalidate_bulk_fetch();
  93. goto exit;
  94. }
  95. }
  96. error = 0;
  97. }
  98. }
  99. }
  100. curr_range_query_buff_offset = curr_pos - range_query_buff;
  101. exit:
  102. return error;
  103. }

Index_end

After all rows have been read, the handler framework calls index_end to close the cursor and reset a number of state variables.

  1. int ha_tokudb::index_end() {
  2. range_lock_grabbed = false;
  3. range_lock_grabbed_null = false;
  4. if (cursor) {
  5. int r = cursor->c_close(cursor);
  6. assert_always(r==0);
  7. cursor = NULL;
  8. remove_from_trx_handler_list();
  9. last_cursor_error = 0;
  10. }
  11. active_index = tokudb_active_index = MAX_KEY;
  12. //
  13. // reset query variables
  14. //
  15. unpack_entire_row = true;
  16. read_blobs = true;
  17. read_key = true;
  18. num_fixed_cols_for_query = 0;
  19. num_var_cols_for_query = 0;
  20. invalidate_bulk_fetch();
  21. invalidate_icp();
  22. doing_bulk_fetch = false;
  23. close_dsmrr();
  24. TOKUDB_HANDLER_DBUG_RETURN(0);
  25. }

This is, in broad strokes, how a query statement executes inside the TokuDB engine. See you next month!