A previous monthly report, InnoDB IO Subsystem, introduced the synchronous IO and asynchronous IO that make up the InnoDB IO subsystem. This article dissects, at the source-code level, how synchronous and asynchronous IO requests for data pages are actually carried out within that subsystem.

In MySQL 5.6, InnoDB's asynchronous IO is mainly used for read-ahead and for writes to data files, while ordinary page reads go through synchronous IO. How exactly do the two paths differ at the code level? In what follows we walk through the execution of an IO request, using the Linux native IO path as the main line of the discussion.
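
Before diving into the InnoDB wrappers, it helps to recall the small libaio API that Linux native AIO is built on, since os_aio_linux_dispatch and os_aio_linux_collect below are essentially wrappers around io_submit and io_getevents. The following is a minimal standalone sketch (not InnoDB code; the file name and sizes are made up for illustration) that submits one asynchronous read and then reaps it:

    /* Minimal libaio sketch (not InnoDB code): submit one async read, then reap it.
       Build with: gcc aio_demo.c -laio */
    #define _GNU_SOURCE
    #include <libaio.h>
    #include <fcntl.h>
    #include <stdio.h>
    #include <stdlib.h>

    int main(void)
    {
        io_context_t    ctx = 0;
        struct iocb     cb, *cbs[1] = { &cb };
        struct io_event ev;
        void            *buf;
        int             fd;

        if (io_setup(8, &ctx) < 0) {                /* create an AIO context, like array->aio_ctx[i] */
            fprintf(stderr, "io_setup failed\n");
            return 1;
        }
        fd = open("ibdata1", O_RDONLY | O_DIRECT);  /* hypothetical file name */
        posix_memalign(&buf, 4096, 16384);          /* O_DIRECT requires aligned buffers */
        /* error checks on open()/posix_memalign() omitted for brevity */

        io_prep_pread(&cb, fd, buf, 16384, 0);      /* fill the iocb: read 16KB at offset 0 */
        cb.data = buf;                              /* user cookie; InnoDB stores the slot pointer here */

        io_submit(ctx, 1, cbs);                     /* hand the request over to the kernel */

        io_getevents(ctx, 1, 1, &ev, NULL);         /* block until the request completes */
        printf("read %ld bytes\n", (long) ev.res);

        io_destroy(ctx);
        return 0;
    }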

Key data structures

  • os_aio_array_t

    /** Array type that records the asynchronous IO (aio) requests of one category
    (ibuf, log, read, write). Every asynchronous IO request registers an InnoDB
    aio object in the array of its category. */
    os_aio_array_t {
        os_ib_mutex_t mutex;     // serializes the concurrent async read/write threads; the ibuf and
                                 // log categories have only one thread each, so no concurrency issue
        os_event_t not_full;     // condition event used to tell threads waiting for a slot whether
                                 // the array still has free slots for new aio requests
        os_event_t is_empty;     // condition event used to tell the IO threads whether the array
                                 // still has pending IO requests
        ulint n_slots;           // number of IO requests the array can hold
                                 // = number of threads * pending requests allowed per segment (256)
        ulint n_segments;        // number of independently waitable segments, i.e. the maximum
                                 // number of threads allowed for this IO category
        ulint cur_seg;           /* IO requests are distributed over the segments round robin;
                                 this points to the segment the next request will be assigned to */
        ulint n_reserved;        // number of IO requests currently pending
        os_aio_slot_t* slots;    // array holding the individual IO request objects; the n_segments
                                 // threads share these n_slots slots for their pending IO requests
    #ifdef __WIN__
        HANDLE* handles;
                                 /*!< Pointer to an array of OS native
                                 event handles where we copied the
                                 handles from slots, in the same
                                 order. This can be used in
                                 WaitForMultipleObjects; used only in
                                 Windows */
    #endif /* __WIN__ */
    #if defined(LINUX_NATIVE_AIO)
        io_context_t* aio_ctx;   // array of aio contexts, one per segment, through which that
                                 // segment's IO requests are submitted to and reaped from the kernel
        struct io_event* aio_events; // array recording the completed IO events; native aio reports
                                 // completed requests to the IO threads through these events
        struct iocb** pending;   // buffer of pending aio requests
        ulint* count;            // number of buffered aio requests of each segment
    #endif /* LINUX_NATIVE_AIO */
    };
  • os_aio_slot_t

    // Object in os_aio_array_t that records one asynchronous IO (aio) request
    os_aio_slot_t {
        ibool is_read;           /*!< TRUE if a read operation */
        ulint pos;               // position of this slot within the os_aio_array_t array
        ibool reserved;          // TRUE means the slot is already occupied by an IO request
        time_t reservation_time; // time when the slot was reserved
        ulint len;               // length of the IO request
        byte* buf;               // buffer to read into or write from, usually pointing to a buffer
                                 // pool page; compressed pages are handled specially
        ulint type;              /* request type, i.e. read or write IO */
        os_offset_t offset;      /*!< file offset in bytes */
        os_file_t file;          /*!< file where to read or write */
        const char* name;        /*!< path and name of the file to read */
        ibool io_already_done;   /* TRUE means the IO has already completed */
        fil_node_t* message1;    /* the InnoDB file node (fil_node_t) of this aio operation */
        void* message2;          /* the buffer pool bpage that the completed IO request belongs to */
    #ifdef WIN_ASYNC_IO
        HANDLE handle;           /*!< handle object we need in the
                                 OVERLAPPED struct */
        OVERLAPPED control;      /*!< Windows control block for the
                                 aio request */
    #elif defined(LINUX_NATIVE_AIO)
        struct iocb control;     /* aio request control block (iocb) used by this slot */
        int n_bytes;             /* bytes read or written */
        int ret;                 /* AIO return code */
    #endif /* WIN_ASYNC_IO */
    };
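
The relationship between n_slots, n_segments and the individual IO threads shows up repeatedly in the functions below (slots_per_segment, io_ctx_index, i + segment * n), so a tiny worked example may help. The numbers are illustrative only, assuming the default of 256 slots per segment and innodb_read_io_threads = 4:

    /* Illustration only (not InnoDB code): how a slot position maps to a segment,
       assuming 4 read IO threads and 256 slots per segment. */
    #include <stdio.h>

    int main(void)
    {
        unsigned n_segments = 4;                    /* one segment per read IO thread */
        unsigned n_slots = n_segments * 256;        /* 1024 slots in os_aio_read_array */
        unsigned slots_per_segment = n_slots / n_segments;

        unsigned pos = 700;                         /* position of some reserved slot */
        unsigned seg = pos / slots_per_segment;     /* local segment (and thus IO thread) owning it */

        printf("slot %u -> segment %u, which owns slots [%u, %u)\n",
               pos, seg, seg * slots_per_segment, (seg + 1) * slots_per_segment);
        /* prints: slot 700 -> segment 2, which owns slots [512, 768) */
        return 0;
    }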

Flow chart

flow-aio.png

Source code analysis

  • os_aio_func, the entry function for IO on physical data pages

    ibool
    os_aio_func(
    /*========*/
        ulint type,         /* IO type: READ or WRITE */
        ulint mode,         /* indicates, among other things, whether to run the request as SIMULATED aio */
        const char* name,   /* path + name of the tablespace the IO opens */
        os_file_t file,     /* file to operate on */
        void* buf,          // buffer to read into or write from, usually pointing to a buffer
                            // pool page; compressed pages are handled specially
        os_offset_t offset, /*!< in: file offset where to read or write */
        ulint n,            /* number of bytes to read or write */
        fil_node_t* message1, /* the InnoDB file node (fil_node_t) of this aio operation;
                            only meaningful for asynchronous IO */
        void* message2,     /* the buffer pool bpage that the completed IO request belongs to;
                            only meaningful for asynchronous IO */
        ibool should_buffer, // whether to buffer the aio request; mainly used by read-ahead
        ibool page_encrypt,
                            /*!< in: Whether to encrypt */
        ulint page_size)
                            /*!< in: Page size */
    {
        ...
        wake_later = mode & OS_AIO_SIMULATED_WAKE_LATER;
        mode = mode & (~OS_AIO_SIMULATED_WAKE_LATER);
        if (mode == OS_AIO_SYNC
    #ifdef WIN_ASYNC_IO
            && !srv_use_native_aio
    #endif /* WIN_ASYNC_IO */
            ) {
            /* This is actually an ordinary synchronous read or write:
            no need to use an i/o-handler thread. NOTE that if we use
            Windows async i/o, Windows does not allow us to use
            ordinary synchronous os_file_read etc. on the same file,
            therefore we have built a special mechanism for synchronous
            wait in the Windows case.
            Also note that the Performance Schema instrumentation has
            been performed by current os_aio_func()'s wrapper function
            pfs_os_aio_func(). So we would no longer need to call
            Performance Schema instrumented os_file_read() and
            os_file_write(). Instead, we should use os_file_read_func()
            and os_file_write_func() */
            /* For synchronous IO with native IO disabled, read or write directly
            through os_file_read/write; no IO thread is involved */
            if (type == OS_FILE_READ) {
                if (page_encrypt) {
                    return(os_file_read_decrypt_page(file, buf, offset, n, page_size));
                } else {
                    return(os_file_read_func(file, buf, offset, n));
                }
            }
            ut_ad(!srv_read_only_mode);
            ut_a(type == OS_FILE_WRITE);
            if (page_encrypt) {
                return(os_file_write_encrypt_page(name, file, buf, offset, n, page_size));
            } else {
                return(os_file_write_func(name, file, buf, offset, n));
            }
        }
    try_again:
        switch (mode) {
        // pick the IO request array according to the access type
        case OS_AIO_NORMAL:
            if (type == OS_FILE_READ) {
                array = os_aio_read_array;
            } else {
                ut_ad(!srv_read_only_mode);
                array = os_aio_write_array;
            }
            break;
        case OS_AIO_IBUF:
            ut_ad(type == OS_FILE_READ);
            /* Reduce probability of deadlock bugs in connection with ibuf:
            do not let the ibuf i/o handler sleep */
            wake_later = FALSE;
            if (srv_read_only_mode) {
                array = os_aio_read_array;
            }
            break;
        case OS_AIO_LOG:
            if (srv_read_only_mode) {
                array = os_aio_read_array;
            } else {
                array = os_aio_log_array;
            }
            break;
        case OS_AIO_SYNC:
            array = os_aio_sync_array;
    #if defined(LINUX_NATIVE_AIO)
            /* In Linux native AIO we don't use sync IO array. */
            ut_a(!srv_use_native_aio);
    #endif /* LINUX_NATIVE_AIO */
            break;
        default:
            ut_error;
            array = NULL; /* Eliminate compiler warning */
        }
        // reserve a slot for this IO request; blocks until a free slot is available
        slot = os_aio_array_reserve_slot(type, array, message1, message2, file,
                                         name, buf, offset, n, page_encrypt, page_size);
        DBUG_EXECUTE_IF("simulate_slow_aio",
        {
            os_thread_sleep(1000000);
        }
        );
        if (type == OS_FILE_READ) {
            if (srv_use_native_aio) {
                os_n_file_reads++;
                os_bytes_read_since_printout += n;
    #ifdef WIN_ASYNC_IO
                // Windows handles the asynchronous read request here
                ret = ReadFile(file, buf, (DWORD) n, &len,
                               &(slot->control));
    #elif defined(LINUX_NATIVE_AIO)
                // Linux dispatches the native IO request here
                if (!os_aio_linux_dispatch(array, slot, should_buffer)) {
                    goto err_exit;
                }
    #endif /* WIN_ASYNC_IO */
            } else {
                if (!wake_later) {
                    // wake up a simulated aio handler thread
                    os_aio_simulated_wake_handler_thread(
                        os_aio_get_segment_no_from_slot(
                            array, slot));
                }
            }
        } else if (type == OS_FILE_WRITE) {
            ut_ad(!srv_read_only_mode);
            if (srv_use_native_aio) {
                os_n_file_writes++;
    #ifdef WIN_ASYNC_IO
                // Windows handles the asynchronous write request here
                ret = WriteFile(file, buf, (DWORD) n, &len,
                                &(slot->control));
    #elif defined(LINUX_NATIVE_AIO)
                // Linux dispatches the native IO request here
                if (!os_aio_linux_dispatch(array, slot, false)) {
                    goto err_exit;
                }
    #endif /* WIN_ASYNC_IO */
            } else {
                if (!wake_later) {
                    // wake up a simulated aio handler thread
                    os_aio_simulated_wake_handler_thread(
                        os_aio_get_segment_no_from_slot(
                            array, slot));
                }
            }
        } else {
            ut_error;
        }
        ...
    }
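
os_aio_array_reserve_slot, which is not shown above, is what fills in the slot that os_aio_linux_dispatch later submits. Below is a simplified sketch of the Linux native AIO part of that function, abridged from the MySQL 5.6 sources (the slot bookkeeping, offset checks and other branches are omitted); the key point is that io_prep_pread/io_prep_pwrite build the iocb stored inside the slot, and iocb->data is pointed back at the slot so that a completed io_event can later be traced back to it:

    /* Simplified sketch of the LINUX_NATIVE_AIO part of os_aio_array_reserve_slot()
       (abridged, not a verbatim excerpt). */
    #if defined(LINUX_NATIVE_AIO)
        if (srv_use_native_aio) {
            off_t        aio_offset = (off_t) offset;
            struct iocb* iocb = &slot->control;

            if (type == OS_FILE_READ) {
                io_prep_pread(iocb, file, buf, len, aio_offset);
            } else {
                ut_a(type == OS_FILE_WRITE);
                io_prep_pwrite(iocb, file, buf, len, aio_offset);
            }
            /* Link the iocb back to its slot: os_aio_linux_collect() recovers the
            slot from events[i].obj->data once the request completes. */
            iocb->data = (void*) slot;
            slot->n_bytes = 0;
            slot->ret = 0;
        }
    #endif /* LINUX_NATIVE_AIO */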
  • os_aio_linux_dispatch, the function that hands native IO requests to the Linux kernel

    static
    ibool
    os_aio_linux_dispatch(
    /*==================*/
        os_aio_array_t* array, /* the IO request array */
        os_aio_slot_t* slot,   /* the slot already reserved for this request */
        ibool should_buffer)   // whether to buffer the aio request; mainly used by read-ahead
    {
        ...
        /* Find out what we are going to work with.
        The iocb struct is directly in the slot.
        The io_context is one per segment. */
        // number of slots per segment; on Linux each segment has 256 slots
        slots_per_segment = array->n_slots / array->n_segments;
        iocb = &slot->control;
        io_ctx_index = slot->pos / slots_per_segment;
        if (should_buffer) {
            /* note that aio request buffering only applies to read requests */
            ut_ad(array == os_aio_read_array);
            ulint n;
            ulint count;
            os_mutex_enter(array->mutex);
            /* There are array->n_slots elements in array->pending, which is divided into
             * array->n_segments area of equal size. The iocb of each segment are
             * buffered in its corresponding area in the pending array consecutively as
             * they come. array->count[i] records the number of buffered aio requests in
             * the ith segment.*/
            n = io_ctx_index * slots_per_segment
                + array->count[io_ctx_index];
            array->pending[n] = iocb;
            array->count[io_ctx_index]++;
            count = array->count[io_ctx_index];
            os_mutex_exit(array->mutex);
            // if all slots of this segment are now buffered, submit the batch of aio requests
            if (count == slots_per_segment) {
                os_aio_linux_dispatch_read_array_submit();
            }
            // otherwise just return
            return(TRUE);
        }
        // submit the IO request to the kernel directly
        ret = io_submit(array->aio_ctx[io_ctx_index], 1, &iocb);
        ...
    }
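
The buffered branch above only enqueues the iocb into array->pending; the actual submission of buffered read requests is left to os_aio_linux_dispatch_read_array_submit(), triggered here when a segment's buffer fills up and otherwise by the read-ahead code once it has queued all of its pages. Its source is not shown in this article, so the following is only an illustrative sketch of the idea, under the assumption that it walks os_aio_read_array and submits each segment's buffered iocbs with a single io_submit() call before resetting the per-segment counters:

    /* Illustrative sketch only: an assumed shape of os_aio_linux_dispatch_read_array_submit(),
       not the actual implementation. */
    static ibool
    os_aio_linux_dispatch_read_array_submit_sketch(void)
    {
        os_aio_array_t* array = os_aio_read_array;
        ulint slots_per_segment = array->n_slots / array->n_segments;

        os_mutex_enter(array->mutex);
        for (ulint seg = 0; seg < array->n_segments; seg++) {
            ulint count = array->count[seg];
            if (count == 0) {
                continue;
            }
            /* Submit every iocb buffered for this segment in one batch: one
            io_submit() system call instead of `count` separate ones. */
            int ret = io_submit(array->aio_ctx[seg], count,
                                &array->pending[seg * slots_per_segment]);
            ut_a(ret == (int) count);
            array->count[seg] = 0; /* the segment's buffer is empty again */
        }
        os_mutex_exit(array->mutex);
        return(TRUE);
    }

The point of buffering read-ahead requests this way is to trade many small io_submit() calls for one larger batch per segment.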
  • fil_aio_wait, the main function the IO threads use to wait on aio requests

    void
    fil_aio_wait(
    /*=========*/
        ulint segment) /*!< in: the number of the segment in the aio
                       array to wait for */
    {
        ibool ret;
        fil_node_t* fil_node;
        void* message;
        ulint type;
        ut_ad(fil_validate_skip());
        if (srv_use_native_aio) { // native IO is in use
            srv_set_io_thread_op_info(segment, "native aio handle");
    #ifdef WIN_ASYNC_IO
            ret = os_aio_windows_handle( // Windows entry point for reaping completed IO
                segment, 0, &fil_node, &message, &type);
    #elif defined(LINUX_NATIVE_AIO)
            ret = os_aio_linux_handle( // Linux native IO entry point for reaping completed IO
                segment, &fil_node, &message, &type);
    #else
            ut_error;
            ret = 0; /* Eliminate compiler warning */
    #endif /* WIN_ASYNC_IO */
        } else {
            srv_set_io_thread_op_info(segment, "simulated aio handle");
            ret = os_aio_simulated_handle( // simulated aio entry point
                segment, &fil_node, &message, &type);
        }
        ut_a(ret);
        if (fil_node == NULL) {
            ut_ad(srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS);
            return;
        }
        srv_set_io_thread_op_info(segment, "complete io for fil node");
        mutex_enter(&fil_system->mutex);
        // reaching this point means at least one IO request has completed;
        // update the file node's IO state accordingly
        fil_node_complete_io(fil_node, fil_system, type);
        mutex_exit(&fil_system->mutex);
        ut_ad(fil_validate_skip());
        /* Do the i/o handling */
        /* IMPORTANT: since i/o handling for reads will read also the insert
        buffer in tablespace 0, you have to be very careful not to introduce
        deadlocks in the i/o system. We keep tablespace 0 data files always
        open, and use a special i/o thread to serve insert buffer requests. */
        if (fil_node->space->purpose == FIL_TABLESPACE) { // IO against a data file
            srv_set_io_thread_op_info(segment, "complete io for buf page");
            // once the IO is done, update the state of the corresponding buffer pool
            // bpage and verify the data against its checksum
            buf_page_io_complete(static_cast<buf_page_t*>(message));
        } else { // IO against a log file
            srv_set_io_thread_op_info(segment, "complete io for log");
            log_io_complete(static_cast<log_group_t*>(message));
        }
    }
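
fil_aio_wait is not called by user threads; each InnoDB IO handler thread spends its whole life in a loop around it, with the thread's number doubling as the aio segment it is responsible for. The sketch below shows the general shape of that loop, abridged from io_handler_thread() in MySQL 5.6 srv0start.cc (debug output and thread bookkeeping are omitted):

    /* Abridged sketch of the IO handler thread, based on io_handler_thread()
       in MySQL 5.6 srv0start.cc. */
    extern "C" UNIV_INTERN
    os_thread_ret_t
    DECLARE_THREAD(io_handler_thread)(
        void* arg) /*!< in: pointer to the number of the segment in the aio array */
    {
        ulint segment = *((ulint*) arg);

        while (srv_shutdown_state != SRV_SHUTDOWN_EXIT_THREADS) {
            /* Block until an IO request of this segment completes and finish its
            bookkeeping (buf_page_io_complete(), log_io_complete(), ...). */
            fil_aio_wait(segment);
        }

        /* On shutdown the thread exits through os_thread_exit(). */
        os_thread_exit(NULL);
        OS_THREAD_DUMMY_RETURN;
    }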
  • os_aio_linux_handle, the function the IO threads use to process completed native IO requests

    ibool
    os_aio_linux_handle(
        ulint global_seg,      // which global segment this call serves
        fil_node_t** message1, /* the InnoDB file node (fil_node_t) of the aio operation */
        void** message2,       /* the buffer pool bpage that the completed IO request belongs to */
        ulint* type)           // read or write IO
    {
        // from global_seg, locate the os_aio_array_t of this aio category and
        // return the local segment number within it
        segment = os_aio_get_array_and_local_segment(&array, global_seg);
        n = array->n_slots / array->n_segments; // number of io events one thread watches
        /* Loop until we have found a completed request. */
        for (;;) {
            ibool any_reserved = FALSE;
            os_mutex_enter(array->mutex);
            for (i = 0; i < n; ++i) { // walk all aio requests issued from this segment
                slot = os_aio_array_get_nth_slot(
                    array, i + segment * n);
                if (!slot->reserved) { // is this slot occupied?
                    continue;
                } else if (slot->io_already_done) { // the IO has completed; its result can be
                                                    // handed back to the waiting thread
                    /* Something for us to work on. */
                    goto found;
                } else {
                    any_reserved = TRUE;
                }
            }
            os_mutex_exit(array->mutex);
            // no completed IO was found, so go and collect (reap) some
            os_aio_linux_collect(array, segment, n);
        }
    found: // a completed IO was found; return its contents
        *message1 = slot->message1;
        *message2 = slot->message2; // return the bpage that the completed IO belongs to
        *type = slot->type;
        if (slot->ret == 0 && slot->n_bytes == (long) slot->len) {
            if (slot->page_encrypt
                && slot->type == OS_FILE_READ) {
                os_decrypt_page(slot->buf, slot->len, slot->page_size, FALSE);
            }
            ret = TRUE;
        } else {
            errno = -slot->ret;
            /* os_file_handle_error does tell us if we should retry
            this IO. As it stands now, we don't do this retry when
            reaping requests from a different context than
            the dispatcher. This non-retry logic is the same for
            windows and linux native AIO.
            We should probably look into this to transparently
            re-submit the IO. */
            os_file_handle_error(slot->name, "Linux aio");
            ret = FALSE;
        }
        os_mutex_exit(array->mutex);
        os_aio_array_free_slot(array, slot);
        return(ret);
    }
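
After the result has been handed back, os_aio_array_free_slot releases the slot; this is also where the not_full and is_empty events from os_aio_array_t come into play. A simplified sketch of that function, abridged from MySQL 5.6 (the per-platform reset of the slot's control block is omitted):

    /* Simplified sketch of os_aio_array_free_slot() (abridged from MySQL 5.6). */
    static
    void
    os_aio_array_free_slot(
        os_aio_array_t* array, /* aio array owning the slot */
        os_aio_slot_t* slot)   /* slot to release */
    {
        os_mutex_enter(array->mutex);

        ut_ad(slot->reserved);
        slot->reserved = FALSE;
        array->n_reserved--;

        /* A thread blocked in os_aio_array_reserve_slot() may be waiting on
        not_full; signal it now that a slot has become free again. */
        if (array->n_reserved == array->n_slots - 1) {
            os_event_set(array->not_full);
        }
        /* is_empty tells other code (e.g. shutdown) that no IO is pending. */
        if (array->n_reserved == 0) {
            os_event_set(array->is_empty);
        }

        os_mutex_exit(array->mutex);
    }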
  • os_aio_linux_collect, which waits for native IO requests to complete

    os_aio_linux_collect(
        os_aio_array_t* array,
        ulint segment,
        ulint seg_size)
    {
        events = &array->aio_events[segment * seg_size]; // locate this segment's portion of the io event array
        /* the aio context of this segment (and thus of this IO thread) */
        io_ctx = array->aio_ctx[segment];
        /* Starting point of the segment we will be working on. */
        start_pos = segment * seg_size;
        /* End point. */
        end_pos = start_pos + seg_size;
    retry:
        /* Initialize the events. The timeout value is arbitrary.
        We probably need to experiment with it a little. */
        memset(events, 0, sizeof(*events) * seg_size);
        timeout.tv_sec = 0;
        timeout.tv_nsec = OS_AIO_REAP_TIMEOUT;
        // wait for any IO request watched by this IO thread to complete, or for the timeout to expire
        ret = io_getevents(io_ctx, 1, seg_size, events, &timeout);
        if (ret > 0) { // some IO requests have completed
            for (i = 0; i < ret; i++) {
                // copy the result of each completed IO into its os_aio_slot_t object
                os_aio_slot_t* slot;
                struct iocb* control;
                control = (struct iocb*) events[i].obj; // the iocb of the completed aio, i.e. the one
                                                        // that was submitted for this request
                ut_a(control != NULL);
                slot = (os_aio_slot_t*) control->data;  // get the os_aio_slot_t of this iocb through
                                                        // its data pointer
                /* Some sanity checks. */
                ut_a(slot != NULL);
                ut_a(slot->reserved);
                os_mutex_enter(array->mutex);
                slot->n_bytes = events[i].res; // save the result of the IO into the slot
                slot->ret = events[i].res2;
                slot->io_already_done = TRUE;  // mark the IO as done; this is the flag that the outer
                                               // loop in os_aio_linux_handle checks
                os_mutex_exit(array->mutex);
            }
            return;
        }
        /* ... handling of timeouts (ret == 0) and errors is omitted here; those paths
        end up back at the retry label above ... */
    }

To sum up, we have analyzed from the source code how InnoDB native IO reads and writes data files. Interested readers can go on to study InnoDB's built-in simulated IO; its principle is much the same as native IO, except that InnoDB implements the mechanics itself rather than relying on the kernel. This article has walked through the execution flow of InnoDB IO requests and examined the key data structures and functions involved; hopefully it will be of help to readers who later read or modify the source code.