本文只分析了insert语句执行的主路径,和路径上部分关键函数,很多细节没有深入,留给读者继续分析

    create table t1(id int);

    insert into t1 values(1)

    略过建立连接,从 mysql_parse() 开始分析

    1. void mysql_parse(THD *thd, char *rawbuf, uint length,
    2. Parser_state *parser_state)
    3. {
    4. /* ...... */
    5. /* 检查query_cache,如果结果存在于cache中,直接返回 */
    6. if (query_cache_send_result_to_client(thd, rawbuf, length) <= 0)
    7. {
    8. LEX *lex= thd->lex;
    9. /* 解析语句 */
    10. bool err= parse_sql(thd, parser_state, NULL);
    11. /* 整理语句格式,记录 general log */
    12. /* ...... */
    13. /* 执行语句 */
    14. error= mysql_execute_command(thd);
    15. /* 提交或回滚没结束的事务(事务可能在mysql_execute_command中提交,用trx_end_by_hint标记事务是否已经提交) */
    16. if (!thd->trx_end_by_hint)
    17. {
    18. if (!error && lex->ci_on_success)
    19. trans_commit(thd);
    20. if (error && lex->rb_on_fail)
    21. trans_rollback(thd);
    22. }

    进入 mysql_execute_command()

    1. /* */
    2. /* ...... */
    3. case SQLCOM_INSERT:
    4. {
    5. /* 检查权限 */
    6. if ((res= insert_precheck(thd, all_tables)))
    7. break;
    8. /* 执行insert */
    9. res= mysql_insert(thd, all_tables, lex->field_list, lex->many_values,
    10. lex->update_list, lex->value_list,
    11. lex->duplicates, lex->ignore);
    12. /* 提交或者回滚事务 */
    13. if (!res)
    14. {
    15. trans_commit_stmt(thd);
    16. trans_commit(thd);
    17. thd->trx_end_by_hint= TRUE;
    18. }
    19. else if (res)
    20. {
    21. trans_rollback_stmt(thd);
    22. trans_rollback(thd);
    23. thd->trx_end_by_hint= TRUE;
    24. }

    进入 mysql_insert()

    1. bool mysql_insert(THD *thd,TABLE_LIST *table_list,
    2. List<Item> &fields, /* insert 的字段 */
    3. List<List_item> &values_list, /* insert 的值 */
    4. List<Item> &update_fields,
    5. List<Item> &update_values,
    6. enum_duplicates duplic,
    7. bool ignore)
    8. {
    9. /*对每条记录调用 write_record */
    10. while ((values= its++))
    11. {
    12. if (lock_type == TL_WRITE_DELAYED)
    13. {
    14. LEX_STRING const st_query = { query, thd->query_length() };
    15. DEBUG_SYNC(thd, "before_write_delayed");
    16. /* insert delay */
    17. error= write_delayed(thd, table, st_query, log_on, &info);
    18. DEBUG_SYNC(thd, "after_write_delayed");
    19. query=0;
    20. }
    21. else
    22. /* normal insert */
    23. error= write_record(thd, table, &info, &update);
    24. }
    25. /*
    26. 这里还有
    27. thd->binlog_query()写binlog
    28. my_ok()返回ok报文,ok报文中包含影响行数
    29. */

    进入 write_record

    1. /*
    2. COPY_INFO *info 用来处理唯一键冲突,记录影响行数
    3. COPY_INFO *update 处理 INSERT ON DUPLICATE KEY UPDATE 相关信息
    4. */
    5. int write_record(THD *thd, TABLE *table, COPY_INFO *info, COPY_INFO *update)
    6. {
    7. if (duplicate_handling == DUP_REPLACE || duplicate_handling == DUP_UPDATE)
    8. {
    9. /* 处理 INSERT ON DUPLICATE KEY UPDATE 等复杂情况 */
    10. }
    11. /* 调用存储引擎的接口 */
    12. else if ((error=table->file->ha_write_row(table->record[0])))
    13. {
    14. DEBUG_SYNC(thd, "write_row_noreplace");
    15. if (!ignore_errors ||
    16. table->file->is_fatal_error(error, HA_CHECK_DUP))
    17. goto err;
    18. table->file->restore_auto_increment(prev_insert_id);
    19. goto ok_or_after_trg_err;
    20. }
    21. }

    进入ha_write_row、write_row

    1. /* handler 是各个存储引擎的基类,这里我们使用InnoDB引擎*/
    2. int handler::ha_write_row(uchar *buf)
    3. {
    4. /* 指定log_event类型*/
    5. Log_func *log_func= Write_rows_log_event::binlog_row_logging_function;
    6. error= write_row(buf);
    7. }

    进入引擎层,这里是innodb引擎,handler对应ha_innobase 插入的表信息保存在handler中

    1. int
    2. ha_innobase::write_row(
    3. /*===================*/
    4. uchar* record) /*!< in: a row in MySQL format */
    5. {
    6. error = row_insert_for_mysql((byte*) record, prebuilt);
    7. }
    1. UNIV_INTERN
    2. dberr_t
    3. row_insert_for_mysql(
    4. /*=================*/
    5. byte* mysql_rec, /*!< in: row in the MySQL format */
    6. row_prebuilt_t* prebuilt) /*!< in: prebuilt struct in MySQL
    7. handle */
    8. {
    9. /*记录格式从MySQL转换成InnoDB*/
    10. row_mysql_convert_row_to_innobase(node->row, prebuilt, mysql_rec);
    11. thr->run_node = node;
    12. thr->prev_node = node;
    13. /*插入记录*/
    14. row_ins_step(thr);
    15. }
    1. UNIV_INTERN
    2. que_thr_t*
    3. row_ins_step(
    4. /*=========*/
    5. que_thr_t* thr) /*!< in: query thread */
    6. {
    7. /*给表加IX锁*/
    8. err = lock_table(0, node->table, LOCK_IX, thr);
    9. /*插入记录*/
    10. err = row_ins(node, thr);
    11. }

    InnoDB表是基于B+树的索引组织表

    如果InnoDB表没有主键和唯一键,需要分配隐含的row_id组织聚集索引

    row_id分配逻辑在row_ins中,这里不详细展开

    1. static __attribute__((nonnull, warn_unused_result))
    2. dberr_t
    3. row_ins(
    4. /*====*/
    5. ins_node_t* node, /*!< in: row insert node */
    6. que_thr_t* thr) /*!< in: query thread */
    7. {
    8. if (node->state == INS_NODE_ALLOC_ROW_ID) {
    9. /*若innodb表没有主键和唯一键,用row_id组织索引*/
    10. row_ins_alloc_row_id_step(node);
    11. /*获取row_id的索引*/
    12. node->index = dict_table_get_first_index(node->table);
    13. node->entry = UT_LIST_GET_FIRST(node->entry_list);
    14. }
    15. /*遍历所有索引,向每个索引中插入记录*/
    16. while (node->index != NULL) {
    17. if (node->index->type != DICT_FTS) {
    18. /* 向索引中插入记录 */
    19. err = row_ins_index_entry_step(node, thr);
    20. if (err != DB_SUCCESS) {
    21. return(err);
    22. }
    23. }
    24. /*获取下一个索引*/
    25. node->index = dict_table_get_next_index(node->index);
    26. node->entry = UT_LIST_GET_NEXT(tuple_list, node->entry);
    27. }
    28. }
    29. }

    插入单个索引项

    1. static __attribute__((nonnull, warn_unused_result))
    2. dberr_t
    3. row_ins_index_entry_step(
    4. /*=====================*/
    5. ins_node_t* node, /*!< in: row insert node */
    6. que_thr_t* thr) /*!< in: query thread */
    7. {
    8. dberr_t err;
    9. /*给索引项赋值*/
    10. row_ins_index_entry_set_vals(node->index, node->entry, node->row);
    11. /*插入索引项*/
    12. err = row_ins_index_entry(node->index, node->entry, thr);
    13. return(err);
    14. }
    1. static
    2. dberr_t
    3. row_ins_index_entry(
    4. /*================*/
    5. dict_index_t* index, /*!< in: index */
    6. dtuple_t* entry, /*!< in/out: index entry to insert */
    7. que_thr_t* thr) /*!< in: query thread */
    8. {
    9. if (dict_index_is_clust(index)) {
    10. /* 插入聚集索引 */
    11. return(row_ins_clust_index_entry(index, entry, thr, 0));
    12. } else {
    13. /* 插入二级索引 */
    14. return(row_ins_sec_index_entry(index, entry, thr));
    15. }
    16. }

    row_ins_clust_index_entry 和 row_ins_sec_index_entry 函数结构类似,只分析插入聚集索引

    1. UNIV_INTERN
    2. dberr_t
    3. row_ins_clust_index_entry(
    4. /*======================*/
    5. dict_index_t* index, /*!< in: clustered index */
    6. dtuple_t* entry, /*!< in/out: index entry to insert */
    7. que_thr_t* thr, /*!< in: query thread */
    8. ulint n_ext) /*!< in: number of externally stored columns */
    9. {
    10. if (UT_LIST_GET_FIRST(index->table->foreign_list)) {
    11. err = row_ins_check_foreign_constraints(
    12. index->table, index, entry, thr);
    13. if (err != DB_SUCCESS) {
    14. return(err);
    15. }
    16. }
    17. /* flush log,make checkpoint(如果需要) */
    18. log_free_check();
    19. /* 先尝试乐观插入,修改叶子节点 BTR_MODIFY_LEAF */
    20. err = row_ins_clust_index_entry_low(
    21. 0, BTR_MODIFY_LEAF, index, n_uniq, entry, n_ext, thr,
    22. &page_no, &modify_clock);
    23. if (err != DB_FAIL) {
    24. DEBUG_SYNC_C("row_ins_clust_index_entry_leaf_after");
    25. return(err);
    26. }
    27. /* flush log,make checkpoint(如果需要) */
    28. log_free_check();
    29. /* 乐观插入失败,尝试悲观插入 BTR_MODIFY_TREE */
    30. return(row_ins_clust_index_entry_low(
    31. 0, BTR_MODIFY_TREE, index, n_uniq, entry, n_ext, thr,
    32. &page_no, &modify_clock));

    row_ins_clust_index_entry_low 和 row_ins_sec_index_entry_low 函数结构类似,只分析插入聚集索引

    1. UNIV_INTERN
    2. dberr_t
    3. row_ins_clust_index_entry_low(
    4. /*==========================*/
    5. ulint flags, /*!< in: undo logging and locking flags */
    6. ulint mode, /*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
    7. depending on whether we wish optimistic or
    8. pessimistic descent down the index tree */
    9. dict_index_t* index, /*!< in: clustered index */
    10. ulint n_uniq, /*!< in: 0 or index->n_uniq */
    11. dtuple_t* entry, /*!< in/out: index entry to insert */
    12. ulint n_ext, /*!< in: number of externally stored columns */
    13. que_thr_t* thr, /*!< in: query thread */
    14. ulint* page_no,/*!< *page_no and *modify_clock are used to decide
    15. whether to call btr_cur_optimistic_insert() during
    16. pessimistic descent down the index tree.
    17. in: If this is optimistic descent, then *page_no
    18. must be ULINT_UNDEFINED. If it is pessimistic
    19. descent, *page_no must be the page_no to which an
    20. optimistic insert was attempted last time
    21. row_ins_index_entry_low() was called.
    22. out: If this is the optimistic descent, *page_no is set
    23. to the page_no to which an optimistic insert was
    24. attempted. If it is pessimistic descent, this value is
    25. not changed. */
    26. ullint* modify_clock) /*!< in/out: *modify_clock == ULLINT_UNDEFINED
    27. during optimistic descent, and the modify_clock
    28. value for the page that was used for optimistic
    29. insert during pessimistic descent */
    30. {
    31. /* 将cursor移动到索引上待插入的位置 */
    32. btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE, mode,
    33. &cursor, 0, __FILE__, __LINE__, &mtr);
    34. /*根据不同的flag检查主键冲突*/
    35. err = row_ins_duplicate_error_in_clust_online(
    36. n_uniq, entry, &cursor,
    37. &offsets, &offsets_heap);
    38. err = row_ins_duplicate_error_in_clust(
    39. flags, &cursor, entry, thr, &mtr);
    40. /*
    41. 如果要插入的索引项已存在,则把insert操作改为update操作
    42. 索引项已存在,且没有主键冲突,是因为之前的索引项对应的数据被标记为已删除
    43. 本次插入的数据和上次删除的一样,而索引项并未删除,所以变为update操作
    44. */
    45. if (row_ins_must_modify_rec(&cursor)) {
    46. /* There is already an index entry with a long enough common
    47. prefix, we must convert the insert into a modify of an
    48. existing record */
    49. mem_heap_t* entry_heap = mem_heap_create(1024);
    50. /* 更新数据到存在的索引项 */
    51. err = row_ins_clust_index_entry_by_modify(
    52. flags, mode, &cursor, &offsets, &offsets_heap,
    53. entry_heap, &big_rec, entry, thr, &mtr);
    54. /*如果索引正在online_ddl,先记录insert*/
    55. if (err == DB_SUCCESS && dict_index_is_online_ddl(index)) {
    56. row_log_table_insert(rec, index, offsets);
    57. }
    58. /*提交mini transaction*/
    59. mtr_commit(&mtr);
    60. mem_heap_free(entry_heap);
    61. } else {
    62. rec_t* insert_rec;
    63. if (mode != BTR_MODIFY_TREE) {
    64. /*进行一次乐观插入*/
    65. err = btr_cur_optimistic_insert(
    66. flags, &cursor, &offsets, &offsets_heap,
    67. entry, &insert_rec, &big_rec,
    68. n_ext, thr, &mtr);
    69. } else {
    70. /*
    71. 如果buffer pool余量不足25%,插入失败,返回DB_LOCK_TABLE_FULL
    72. 处理DB_LOCK_TABLE_FULL错误时,会回滚事务
    73. 防止大事务的锁占满buffer pool(注释里写的)
    74. */
    75. if (buf_LRU_buf_pool_running_out()) {
    76. err = DB_LOCK_TABLE_FULL;
    77. goto err_exit;
    78. }
    79. if (/*太长了,略*/) {
    80. /*进行一次乐观插入*/
    81. err = btr_cur_optimistic_insert(
    82. flags, &cursor,
    83. &offsets, &offsets_heap,
    84. entry, &insert_rec, &big_rec,
    85. n_ext, thr, &mtr);
    86. } else {
    87. err = DB_FAIL;
    88. }
    89. if (err == DB_FAIL) {
    90. /*乐观插入失败,进行悲观插入*/
    91. err = btr_cur_pessimistic_insert(
    92. flags, &cursor,
    93. &offsets, &offsets_heap,
    94. entry, &insert_rec, &big_rec,
    95. n_ext, thr, &mtr);
    96. }
    97. }
    98. }

    btr_cur_optimistic_insert 和 btr_cur_pessimistic_insert 涉及B+树的操作,内部细节很多,以后再做分析