MySQL · 源码分析 · CSV 引擎详解

CSV Engine

MySQL中有多种存储引擎,不同的存储引擎提供不同的存储机制、索引技巧、锁定水平等功能,使用不同的存储引擎,可以获得特定的功能。 ​

这里介绍的CSV引擎主要特点是简单方便,可以直接将文本形式的数据存储mysql中的表。csv引擎不支持索引、事务、查询下推等,一般用于日志表的数据存储或者作为数据转换的中间表,可以将直接excel表或者csv文件导入mysql中,方便用户使用。

  • csv文件的每条记录被分隔符分隔为字段(典型分隔符有逗号、分号或制表符;有时分隔符可以包括可选的空格)
  • 每条记录都有同样的字段序列。
  • mysql中支持的csv是逗号分隔,字符串型的field需要加双引号,field中的\r\n前需要加转义符,形式如下:
  1. 1,"xuhaiyan","\\n"
  2. 2,"xuhaiyan","\\r"
  3. 3,"xuhaiyan","\\r"

Relevant Code

MySQL CSV引擎的源码位于storage/csv下面。包括transparent_file 和 ha_tina 两个主要的部分。其中ha_tina继承文件操作的基类handler,与sever层进行交互。transparent_file 是作为csv文件写入和读取的file_buffer使用。

  1. class Transparent_file {
  2. File filedes;
  3. uchar *buff; /* in-memory window to the file or mmaped area */
  4. /* current window sizes */
  5. my_off_t lower_bound;
  6. my_off_t upper_bound;
  7. uint buff_size;
  8. public:
  9. Transparent_file();
  10. ~Transparent_file();
  11. void init_buff(File filedes_arg);
  12. uchar *ptr();
  13. my_off_t start();
  14. my_off_t end();
  15. char get_value(my_off_t offset);
  16. my_off_t read_next();
  17. };

通过transparent_file类的定义我们可以看到该csv文件的file_buffer的一些基本描述信息。

  • File类型的fileds是标识通过mysql_file_open()函数打开的文件。 — *buffer 是映射到文件的缓存,默认大小为4k。
  • lower_bound和 upper_bound分别对应当前buff在文件中的位置的下界和上界。
  • 类函数也比较简单,包括初始化、对buffer基本描述信息以及具体buffer中的内容的读取。
  1. class ha_tina : public handler {
  2. THR_LOCK_DATA lock; /* MySQL lock */
  3. TINA_SHARE *share; /* Shared lock info */
  4. my_off_t
  5. current_position; /* Current position in the file during a file scan */
  6. my_off_t next_position; /* Next position in the file scan */
  7. my_off_t local_saved_data_file_length; /* save position for reads */
  8. my_off_t temp_file_length;
  9. uchar byte_buffer[IO_SIZE];
  10. Transparent_file *file_buff;
  11. File data_file; /* File handler for readers */
  12. File update_temp_file;
  13. String buffer;
  14. };

通过ha_tina的类定义,我们了解csv引擎所支持的文件操作特性。

  • THR_LOCK_DATA lock是相关的锁,ha_tina引擎只支持表。
  • TINA_SHARE *share 对于同一张csv表,同一时刻可有多个handler,他们之间的数据共享是通过共同维护一个TINA_SHARE的实例实现的。
  • my_off_t current_position是当前数据读取的位置
  • my_off_t next_position是下一个数据读取的位置

Table Scan

csv不支持索引,也没有行的概念。仅仅依靠识别下面三种行尾标记来判断行。只有读到该字符时才能感知到行的存在,因此无法任意读取某一行数据。仅支持全表扫描。

  1. /*
  2. '\r' -- Old Mac OS line ending
  3. '\n' -- Traditional Unix and Mac OS X line ending
  4. '\r''\n' -- DOS\Windows line ending
  5. */
  6. static my_off_t find_eoln_buff(Transparent_file *data_buff, my_off_t begin,
  7. my_off_t end, int *eoln_len) {
  8. *eoln_len = 0;
  9. for (my_off_t x = begin; x < end; x++) {
  10. /* Unix (includes Mac OS X) */
  11. if (data_buff->get_value(x) == '\n')
  12. *eoln_len = 1;
  13. else if (data_buff->get_value(x) == '\r') // Mac or Dos
  14. {
  15. /* old Mac line ending */
  16. if (x + 1 == end || (data_buff->get_value(x + 1) != '\n'))
  17. *eoln_len = 1;
  18. else // DOS style ending
  19. *eoln_len = 2;
  20. }
  21. if (*eoln_len) // end of line was found
  22. return x;
  23. }
  24. return 0;
  25. }

全表扫描涉及的方法有rnd_init、rnd_next、rnd_end。

  • rnd_init()方法将buffer设置为文件开头。
  • rnd_next()方法中核心方法为find_current_row,该方法会从缓冲区中读入一行中各个字段的值。研读find_current_row()的源码发现,其读取方式并不是流式读取的,在真正开始读取一行之前,需要调用find_eoln_buff()方法,从当前位置逐个扫描每个字节直到发现行尾部标记。再回到起始位置读取完整一行的数据并进行解析。但如果buffer_size小于甚至远远小于一行数据的大小,则会在扫描过程中进行多次额外的I/O操作,会影响性能。
  1. int ha_tina::find_current_row(uchar *buf) {
  2. // ...
  3. /*
  4. find end of row
  5. */
  6. if ((end_offset = find_eoln_buff(file_buff, current_position,
  7. local_saved_data_file_length, &eoln_len)) ==
  8. 0)
  9. return HA_ERR_END_OF_FILE;
  10. /* We must read all columns in case a table is opened for update */
  11. read_all = !bitmap_is_clear_all(table->write_set);
  12. /* Avoid asserts in ::store() for columns that are not going to be updated */
  13. org_bitmap = dbug_tmp_use_all_columns(table, table->write_set);
  14. error = HA_ERR_CRASHED_ON_USAGE;
  15. memset(buf, 0, table->s->null_bytes);
  16. for (Field **field = table->field; *field; field++) {
  17. char curr_char;
  18. buffer.length(0);
  19. if (curr_offset >= end_offset) goto err;
  20. curr_char = file_buff->get_value(curr_offset);
  21. /*
  22. Parse the line obtained using the following algorithm
  23. BEGIN
  24. 1) Store the EOL (end of line) for the current row
  25. 2) Until all the fields in the current query have not been
  26. filled
  27. 2.1) If the current character is a quote
  28. 2.1.1) Until EOL has not been reached
  29. a) If end of current field is reached, move
  30. to next field and jump to step 2.3
  31. b) If current character is a \\ handle
  32. \\n, \\r, \\, \\"
  33. c) else append the current character into the buffer
  34. before checking that EOL has not been reached.
  35. 2.2) If the current character does not begin with a quote
  36. 2.2.1) Until EOL has not been reached
  37. a) If the end of field has been reached move to the
  38. next field and jump to step 2.3
  39. b) If current character begins with \\ handle
  40. \\n, \\r, \\, \\"
  41. c) else append the current character into the buffer
  42. before checking that EOL has not been reached.
  43. 2.3) Store the current field value and jump to 2)
  44. TERMINATE
  45. */
  46. }
  47. next_position = end_offset + eoln_len;
  48. error = 0;
  49. err:
  50. dbug_tmp_restore_column_map(table->write_set, org_bitmap);
  51. return error;
  52. }
  • rnd_end()在全表扫描结束后将是否知道行数的flag标记(records_is_known)为true。

Update & Delete

  1. struct tina_set {
  2. my_off_t begin;
  3. my_off_t end;
  4. };
  5. class ha_tina : public handler {
  6. /*
  7. The chain contains "holes" in the file, occurred because of
  8. deletes/updates. It is used in rnd_end() to get rid of them
  9. in the end of the query.
  10. */
  11. tina_set chain_buffer[DEFAULT_CHAIN_LENGTH];
  12. tina_set *chain;
  13. tina_set *chain_ptr;
  14. uchar chain_alloced;
  15. uint32 chain_size;
  16. uint local_data_file_version; /* Saved version of the data file used */
  17. bool records_is_known;
  18. MEM_ROOT blobroot;
  19. };

以上是跟数据更新相关的成员变量。

update、delete会改动数据文件,其中update操作会先将原记录delete,再插入新的数据。

update、delete操作在执行之前,需要执行rnd_next扫描表,找到所关联的row update、delete操作。

  • chain_buffer中存储了当前所有被标记为delete的row。
  • tina_set::begin指明该row在文件中的起点,tina_set::end为终点。
  • chain指向本次迭代扫描时的chain链的起点,chain_ptr指向chain链的尾部。

每次执行update/delete,都会调用chain_append方法往chain链表尾部插入删除点。

默认情况下,删除点tina_set会存放于预先分配的空间chain_buffer中。但当有大量删除点时,chain_append会调用realloc/malloc额外申请更大的空间。

对于delete操作,chain_append操作已经足够。对于update操作,则仍需要打开一个临时文件(后缀为.CSN),将更新后的数据插入到临时文件中。

  1. int ha_tina::rnd_end() {
  2. while ((file_buffer_start != (my_off_t)-1))
  3. {
  4. mysql_file_write(update_temp_file, ...);
  5. if (in_hole) {
  6. // skip hole
  7. }
  8. }
  9. mysql_file_rename(...)
  10. }

当全表扫描结束后,则在rnd_end中将原数据文件未有被标记为delete的记录插入到临时文件中。最后,删除原文件,并将临时文件重命名为数据文件。

Repair and Check

CSV存储引擎支持CHECK TABLE和REPAIR TABLE语句来验证损坏的CSV表,并尽可能修复CSV表。

当运行CHECK TABLE语句时,将通过查找正确的字段分隔符、转义字段(匹配或缺少引号)、与表定义比较的正确字段数量以及是否存在相应的CSV元文件来检查CSV文件的有效性。

  1. int ha_tina::check(THD *thd, HA_CHECK_OPT *) {
  2. // ...
  3. /* Read the file row-by-row. If everything is ok, repair is not needed. */
  4. while (!(rc = find_current_row(buf))) {
  5. thd_inc_row_count(thd);
  6. count--;
  7. current_position = next_position;
  8. }
  9. // ...
  10. if ((rc != HA_ERR_END_OF_FILE) || count) {
  11. share->crashed = true;
  12. return HA_ADMIN_CORRUPT;
  13. }
  14. return HA_ADMIN_OK;
  15. }

使用REPAIR TABLE修复表,它从现有CSV数据复制尽可能多的有效行,然后用恢复的行替换现有CSV文件。损坏数据以外的任何行都将丢失。

如果文件为空,更改文件中的行号并完成恢复。否则,扫描表寻找坏行。

如果没有找到,则将该文件标记为良好文件并返回。如果遇到坏行,则截断数据文件直到最后一个好的行。代码流程如下:

  1. int ha_tina::repair(THD *thd, HA_CHECK_OPT *) {
  2. |-// ...
  3. |
  4. | /* empty file */
  5. |-if (!share->saved_data_file_length) {
  6. | share->rows_recorded = 0;
  7. | goto end;
  8. | }
  9. |
  10. |-// ...
  11. |
  12. | /* Read the file row-by-row. If everything is ok, repair is not needed. */
  13. |-while (!(rc = find_current_row(buf))) {
  14. | // ...
  15. | }
  16. | current_position = next_position;
  17. | }
  18. |
  19. | /* all rows good,the file does not need repair */
  20. |-if (rc == HA_ERR_END_OF_FILE) {
  21. | // ...
  22. | }
  23. |
  24. | /* encountered a bad row => repair is needed =>create a temporary file */
  25. |-if(repair_file = mysql_file_create())
  26. | // ...
  27. | }
  28. | /* we just truncated the file up to the first bad row. update rows count. */
  29. | /* write repaired file */
  30. |-while (1) {
  31. | |-mysql_file_write();
  32. | |
  33. | |-file_buff->read_next();
  34. | }
  35. | /* Close the files and rename repaired file to the datafile. */
  36. |-if (share->tina_write_opened) {
  37. | /* Data file might be opened twice, close both instances */
  38. | |-if (mysql_file_close(share->tina_write_filedes, MYF(0)))
  39. | |-return my_errno() ? my_errno() : -1;
  40. | |-share->tina_write_opened = false;
  41. | }
  42. |-if (mysql_file_close(data_file, MYF(0)) ||
  43. | mysql_file_close(repair_file, MYF(0)) ||
  44. | mysql_file_rename(csv_key_file_data, repaired_fname,
  45. | share->data_file_name, MYF(0)))
  46. | return -1;
  47. | /* Open the file again, it should now be repaired */
  48. |-if ((data_file = mysql_file_open(csv_key_file_data, share->data_file_name,
  49. | O_RDWR | O_APPEND, MYF(MY_WME))) == -1)
  50. | return my_errno() ? my_errno() : -1;
  51. | /* Set new file size. */
  52. |-local_saved_data_file_length = (size_t)current_position;
  53. |
  54. end:
  55. |-share->crashed = false;
  56. |-return HA_ADMIN_OK;
  57. }

在修复期间,只有从CSV文件到第一个损坏行的行被复制到新表中。从第一个损坏行到表尾的所有其他行都将被删除,即使是有效的行。

Reference

CSV Doc
The Relevant Code