第四节 Commit()实现

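Commit()方法的总体实现思路如下:

  1. 先对节点进行再平衡(rebalance,删除后可能需要合并页),再进行溢出处理(spill,节点过大时需要分裂);
  2. 释放旧的空闲列表(freelist)所占的页,并按当前空闲列表的大小重新分配连续的页来存放它(每次提交都会重新分配,而不仅仅是溢出时);如果分配使文件的高水位(meta.pgid)上涨,还需要扩大数据库文件;
  3. 将事务中涉及改动的脏页按页 id 排序(尽可能保证顺序 IO),排序后依次写入磁盘,最后执行刷盘(未设置 NoSync 时);
  4. 数据页写入成功后,再将元信息页(meta)写入磁盘并刷盘,以保证持久化;
  5. 上述任一步骤失败,当前事务都会回滚。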
  1. // Commit writes all changes to disk and updates the meta page.
  2. // Returns an error if a disk write error occurs, or if Commit is
  3. // called on a read-only transaction.
  4. // 先更新数据然后再更新元信息
  5. // 更新数据成功、元信息未来得及更新机器就挂掉了。数据如何恢复?
  6. func (tx *Tx) Commit() error {
  7. _assert(!tx.managed, "managed tx commit not allowed")
  8. if tx.db == nil {
  9. return ErrTxClosed
  10. } else if !tx.writable {
  11. return ErrTxNotWritable
  12. }
  13. // TODO(benbjohnson): Use vectorized I/O to write out dirty pages.
  14. // 删除时,进行平衡,页合并
  15. // Rebalance nodes which have had deletions.
  16. var startTime = time.Now()
  17. tx.root.rebalance()
  18. if tx.stats.Rebalance > 0 {
  19. tx.stats.RebalanceTime += time.Since(startTime)
  20. }
  21. // 页分裂
  22. // spill data onto dirty pages.
  23. startTime = time.Now()
  24. // 这个内部会往缓存tx.pages中加page
  25. if err := tx.root.spill(); err != nil {
  26. tx.rollback()
  27. return err
  28. }
  29. tx.stats.SpillTime += time.Since(startTime)
  30. // Free the old root bucket.
  31. tx.meta.root.root = tx.root.root
  32. opgid := tx.meta.pgid
  33. // Free the freelist and allocate new pages for it. This will overestimate
  34. // the size of the freelist but not underestimate the size (which would be bad).
  35. // 分配新的页面给freelist,然后将freelist写入新的页面
  36. tx.db.freelist.free(tx.meta.txid, tx.db.page(tx.meta.freelist))
  37. // 空闲列表可能会增加,因此需要重新分配页用来存储空闲列表
  38. // 因为在开启写事务的时候,有去释放之前读事务占用的页信息,因此此处需要判断是否freelist会有溢出的问题
  39. p, err := tx.allocate((tx.db.freelist.size() / tx.db.pageSize) + 1)
  40. if err != nil {
  41. tx.rollback()
  42. return err
  43. }
  44. // 将freelist写入到连续的新页中
  45. if err := tx.db.freelist.write(p); err != nil {
  46. tx.rollback()
  47. return err
  48. }
  49. // 更新元数据的页id
  50. tx.meta.freelist = p.id
  51. // If the high water mark has moved up then attempt to grow the database.
  52. // 在allocate中有可能会更改meta.pgid
  53. if tx.meta.pgid > opgid {
  54. if err := tx.db.grow(int(tx.meta.pgid+1) * tx.db.pageSize); err != nil {
  55. tx.rollback()
  56. return err
  57. }
  58. }
  59. // Write dirty pages to disk.
  60. startTime = time.Now()
  61. // 写数据
  62. if err := tx.write(); err != nil {
  63. tx.rollback()
  64. return err
  65. }
  66. // If strict mode is enabled then perform a consistency check.
  67. // Only the first consistency error is reported in the panic.
  68. if tx.db.StrictMode {
  69. ch := tx.Check()
  70. var errs []string
  71. for {
  72. err, ok := <-ch
  73. if !ok {
  74. break
  75. }
  76. errs = append(errs, err.Error())
  77. }
  78. if len(errs) > 0 {
  79. panic("check fail: " + strings.Join(errs, "\n"))
  80. }
  81. }
  82. // Write meta to disk.
  83. // 元信息写入到磁盘
  84. if err := tx.writeMeta(); err != nil {
  85. tx.rollback()
  86. return err
  87. }
  88. tx.stats.WriteTime += time.Since(startTime)
  89. // Finalize the transaction.
  90. tx.close()
  91. // Execute commit handlers now that the locks have been removed.
  92. for _, fn := range tx.commitHandlers {
  93. fn()
  94. }
  95. return nil
  96. }
  97. // write writes any dirty pages to disk.
  98. func (tx *Tx) write() error {
  99. // Sort pages by id.
  100. // 保证写的页是有序的
  101. pages := make(pages, 0, len(tx.pages))
  102. for _, p := range tx.pages {
  103. pages = append(pages, p)
  104. }
  105. // Clear out page cache early.
  106. tx.pages = make(map[pgid]*page)
  107. sort.Sort(pages)
  108. // Write pages to disk in order.
  109. for _, p := range pages {
  110. // 页数和偏移量
  111. size := (int(p.overflow) + 1) * tx.db.pageSize
  112. offset := int64(p.id) * int64(tx.db.pageSize)
  113. // Write out page in "max allocation" sized chunks.
  114. ptr := (*[maxAllocSize]byte)(unsafe.Pointer(p))
  115. // 循环写某一页
  116. for {
  117. // Limit our write to our max allocation size.
  118. sz := size
  119. // 2^31=2G
  120. if sz > maxAllocSize-1 {
  121. sz = maxAllocSize - 1
  122. }
  123. // Write chunk to disk.
  124. buf := ptr[:sz]
  125. if _, err := tx.db.ops.writeAt(buf, offset); err != nil {
  126. return err
  127. }
  128. // Update statistics.
  129. tx.stats.Write++
  130. // Exit inner for loop if we've written all the chunks.
  131. size -= sz
  132. if size == 0 {
  133. break
  134. }
  135. // Otherwise move offset forward and move pointer to next chunk.
  136. // 移动偏移量
  137. offset += int64(sz)
  138. // 同时指针也移动
  139. ptr = (*[maxAllocSize]byte)(unsafe.Pointer(&ptr[sz]))
  140. }
  141. }
  142. // Ignore file sync if flag is set on DB.
  143. if !tx.db.NoSync || IgnoreNoSync {
  144. if err := fdatasync(tx.db); err != nil {
  145. return err
  146. }
  147. }
  148. // Put small pages back to page pool.
  149. for _, p := range pages {
  150. // Ignore page sizes over 1 page.
  151. // These are allocated using make() instead of the page pool.
  152. if int(p.overflow) != 0 {
  153. continue
  154. }
  155. buf := (*[maxAllocSize]byte)(unsafe.Pointer(p))[:tx.db.pageSize]
  156. // See https://go.googlesource.com/go/+/f03c9202c43e0abb130669852082117ca50aa9b1
  157. // 清空buf,然后放入pagePool中
  158. for i := range buf {
  159. buf[i] = 0
  160. }
  161. tx.db.pagePool.Put(buf)
  162. }
  163. return nil
  164. }
  165. // writeMeta writes the meta to the disk.
  166. func (tx *Tx) writeMeta() error {
  167. // Create a temporary buffer for the meta page.
  168. buf := make([]byte, tx.db.pageSize)
  169. p := tx.db.pageInBuffer(buf, 0)
  170. // 将事务的元信息写入到页中
  171. tx.meta.write(p)
  172. // Write the meta page to file.
  173. if _, err := tx.db.ops.writeAt(buf, int64(p.id)*int64(tx.db.pageSize)); err != nil {
  174. return err
  175. }
  176. if !tx.db.NoSync || IgnoreNoSync {
  177. if err := fdatasync(tx.db); err != nil {
  178. return err
  179. }
  180. }
  181. // Update statistics.
  182. tx.stats.Write++
  183. return nil
  184. }
  185. // allocate returns a contiguous block of memory starting at a given page.
  186. // 分配一段连续的页
  187. func (tx *Tx) allocate(count int) (*page, error) {
  188. p, err := tx.db.allocate(count)
  189. if err != nil {
  190. return nil, err
  191. }
  192. // Save to our page cache.
  193. tx.pages[p.id] = p
  194. // Update statistics.
  195. tx.stats.PageCount++
  196. tx.stats.PageAlloc += count * tx.db.pageSize
  197. return p, nil
  198. }