第六节 db.Batch()实现分析

现在对Batch()方法稍作分析,在DB定义的那一节中我们可以看到,一个DB对象拥有一个batch对象,该对象是全局的。当我们使用Batch()方法时,内部会将传递进去的fn缓存在calls中。

其内部也是调用了Update,只不过是在一次Update事务内部遍历并依次执行之前缓存的calls。

有两种情况会触发调用Update。

  1. 第一种情况是到达了MaxBatchDelay时间,就会触发Update
  2. 第二种情况是len(db.batch.calls) >= db.MaxBatchSize,即缓存的calls个数大于等于MaxBatchSize时,也会触发Update。

Batch的本质是: 将"每次写都要刷盘一次"的操作转变成了"多次写、一次刷盘",从而提升性能。

  1. // Batch calls fn as part of a batch. It behaves similar to Update,
  2. // except:
  3. //
  4. // 1. concurrent Batch calls can be combined into a single Bolt
  5. // transaction.
  6. //
  7. // 2. the function passed to Batch may be called multiple times,
  8. // regardless of whether it returns error or not.
  9. //
  10. // This means that Batch function side effects must be idempotent and
  11. // take permanent effect only after a successful return is seen in
  12. // caller.
  13. // (译注: idempotent即"幂等",指多次执行的效果与执行一次相同)
  14. // The maximum batch size and delay can be adjusted with DB.MaxBatchSize
  15. // and DB.MaxBatchDelay, respectively.
  16. //
  17. // Batch is only useful when there are multiple goroutines calling it.
  18. func (db *DB) Batch(fn func(*Tx) error) error {
  19. errCh := make(chan error, 1)
  20. db.batchMu.Lock()
  21. if (db.batch == nil) || (db.batch != nil && len(db.batch.calls) >= db.MaxBatchSize) {
  22. // There is no existing batch, or the existing batch is full; start a new one.
  23. db.batch = &batch{
  24. db: db,
  25. }
  26. db.batch.timer = time.AfterFunc(db.MaxBatchDelay, db.batch.trigger)
  27. }
  28. db.batch.calls = append(db.batch.calls, call{fn: fn, err: errCh})
  29. if len(db.batch.calls) >= db.MaxBatchSize {
  30. // wake up batch, it's ready to run
  31. go db.batch.trigger()
  32. }
  33. db.batchMu.Unlock()
  34. err := <-errCh
  35. if err == trySolo {
  36. err = db.Update(fn)
  37. }
  38. return err
  39. }
  40. type call struct {
  41. fn func(*Tx) error
  42. err chan<- error
  43. }
  44. type batch struct {
  45. db *DB
  46. timer *time.Timer
  47. start sync.Once
  48. calls []call
  49. }
  50. // trigger runs the batch if it hasn't already been run.
  51. func (b *batch) trigger() {
  52. b.start.Do(b.run)
  53. }
  54. // run performs the transactions in the batch and communicates results
  55. // back to DB.Batch.
  56. func (b *batch) run() {
  57. b.db.batchMu.Lock()
  58. b.timer.Stop()
  59. // Make sure no new work is added to this batch, but don't break
  60. // other batches.
  61. if b.db.batch == b {
  62. b.db.batch = nil
  63. }
  64. b.db.batchMu.Unlock()
  65. retry:
  66. // 在一次Update事务内依次执行所有缓存的fn,最后只Commit刷盘一次,提升性能;只有某个fn失败时才会重试(再次调用Update)
  67. for len(b.calls) > 0 {
  68. var failIdx = -1
  69. err := b.db.Update(func(tx *Tx) error {
  70. // 遍历calls中的函数c,依次调用,最后统一提交刷盘一次
  71. for i, c := range b.calls {
  72. // safelyCall里面捕获了panic
  73. if err := safelyCall(c.fn, tx); err != nil {
  74. failIdx = i
  75. //只要有失败,事务就不提交
  76. return err
  77. }
  78. }
  79. return nil
  80. })
  81. if failIdx >= 0 {
  82. // take the failing transaction out of the batch. it's
  83. // safe to shorten b.calls here because db.batch no longer
  84. // points to us, and we hold the mutex anyway.
  85. c := b.calls[failIdx]
  86. //这儿只是把失败的那个call从batch中踢出去(通过trySolo通知其调用方单独重试),其余的call会在新的事务中重新执行
  87. b.calls[failIdx], b.calls = b.calls[len(b.calls)-1], b.calls[:len(b.calls)-1]
  88. // tell the submitter re-run it solo, continue with the rest of the batch
  89. c.err <- trySolo
  90. continue retry
  91. }
  92. // pass success, or bolt internal errors, to all callers
  93. for _, c := range b.calls {
  94. c.err <- err
  95. }
  96. break retry
  97. }
  98. }
  99. // trySolo is a special sentinel error value used for signaling that a
  100. // transaction function should be re-run. It should never be seen by
  101. // callers.
  102. var trySolo = errors.New("batch function returned an error and should be re-run solo")
  103. type panicked struct {
  104. reason interface{}
  105. }
  106. func (p panicked) Error() string {
  107. if err, ok := p.reason.(error); ok {
  108. return err.Error()
  109. }
  110. return fmt.Sprintf("panic: %v", p.reason)
  111. }
  112. func safelyCall(fn func(*Tx) error, tx *Tx) (err error) {
  113. defer func() {
  114. if p := recover(); p != nil {
  115. err = panicked{p}
  116. }
  117. }()
  118. return fn(tx)
  119. }