第二节 Bucket遍历之Cursor

本节我们先做一节内容的铺垫,暂时不讲如何创建、获取、删除一个Bucket。而是介绍一个boltdb中的新对象Cursor。

答案是:所有的上述操作都是建立在首先定位到一个Bucket所属的位置,然后才能对其进行操作。而定位一个Bucket的功能就是由Cursor来完成的。所以我们先这一节给大家介绍一下boltdb中的Cursor。

我们先看下官方文档对Cursor的描述

Cursor represents an iterator that can traverse over all key/value pairs in a bucket in sorted order.

用大白话讲,既然一个Bucket逻辑上是一颗b+树,那就意味着我们可以对其进行遍历。前面提到的set、get操作,无非是要在Bucket上先找到合适的位置,然后再进行操作。而“找”这个操作就是交由Cursor来完成的。简而言之对Bucket这颗b+树的遍历工作由Cursor来执行。一个Bucket对象关联一个Cursor。下面我们先看看Bucket和Cursor之间的关系。

  1. // Cursor creates a cursor associated with the bucket.
  2. // The cursor is only valid as long as the transaction is open.
  3. // Do not use a cursor after the transaction is closed.
  4. func (b *Bucket) Cursor() *Cursor {
  5. // Update transaction statistics.
  6. b.tx.stats.CursorCount++
  7. // Allocate and return a cursor.
  8. return &Cursor{
  9. bucket: b,
  10. stack: make([]elemRef, 0),
  11. }
  12. }

3.2.1 Cursor结构

从上面可以清楚的看到,在获取一个游标Cursor对象时,会将当前的Bucket对象传进去,同时还初始化了一个栈对象,结合数据结构中学习的树的知识。我们也就知道,它的内部就是对树进行遍历。下面我们详细介绍Cursor这个人物。

  1. // Cursor represents an iterator that can traverse over all key/value pairs in a bucket in sorted order.
  2. // Cursors see nested buckets with value == nil.
  3. // Cursors can be obtained from a transaction and are valid as long as the transaction is open.
  4. //
  5. // Keys and values returned from the cursor are only valid for the life of the transaction.
  6. //
  7. // Changing data while traversing with a cursor may cause it to be invalidated
  8. // and return unexpected keys and/or values. You must reposition your cursor
  9. // after mutating data.
  10. type Cursor struct {
  11. bucket *Bucket
  12. // 保存遍历搜索的路径
  13. stack []elemRef
  14. }
  15. // elemRef represents a reference to an element on a given page/node.
  16. type elemRef struct {
  17. page *page
  18. node *node
  19. index int
  20. }
  21. // isLeaf returns whether the ref is pointing at a leaf page/node.
  22. func (r *elemRef) isLeaf() bool {
  23. if r.node != nil {
  24. return r.node.isLeaf
  25. }
  26. return (r.page.flags & leafPageFlag) != 0
  27. }
  28. // count returns the number of inodes or page elements.
  29. func (r *elemRef) count() int {
  30. if r.node != nil {
  31. return len(r.node.inodes)
  32. }
  33. return int(r.page.count)
  34. }

3.2.2 Cursor对外接口

下面我们看一下Cursor对外暴露的接口有哪些。看之前也可以心里先想一下。针对一棵树我们需要哪些遍历接口呢?

主体也就是三类:定位到某一个元素的位置、在当前位置从前往后找、在当前位置从后往前找

  1. // First moves the cursor to the first item in the bucket and returns its key and value.
  2. // If the bucket is empty then a nil key and value are returned.
  3. // The returned key and value are only valid for the life of the transaction.
  4. func (c *Cursor) First() (key []byte, value []byte)
  5. // Last moves the cursor to the last item in the bucket and returns its key and value.
  6. // If the bucket is empty then a nil key and value are returned.
  7. // The returned key and value are only valid for the life of the transaction.
  8. func (c *Cursor) Last() (key []byte, value []byte)
  9. // Next moves the cursor to the next item in the bucket and returns its key and value.
  10. // If the cursor is at the end of the bucket then a nil key and value are returned.
  11. // The returned key and value are only valid for the life of the transaction.
  12. func (c *Cursor) Next() (key []byte, value []byte)
  13. // Prev moves the cursor to the previous item in the bucket and returns its key and value.
  14. // If the cursor is at the beginning of the bucket then a nil key and value are returned.
  15. // The returned key and value are only valid for the life of the transaction.
  16. func (c *Cursor) Prev() (key []byte, value []byte)
  17. // Delete removes the current key/value under the cursor from the bucket.
  18. // Delete fails if current key/value is a bucket or if the transaction is not writable.
  19. func (c *Cursor) Delete() error
  20. // Seek moves the cursor to a given key and returns it.
  21. // If the key does not exist then the next key is used. If no keys
  22. // follow, a nil key is returned.
  23. // The returned key and value are only valid for the life of the transaction.
  24. func (c *Cursor) Seek(seek []byte) (key []byte, value []byte)

下面我们详细分析一下Seek()、First()、Last()、Next()、Prev()、Delete()这三个方法的内部实现。其余的方法我们代码就不贴出来了。大致思路可以梳理一下。

3.2.3 Seek(key)实现分析

Seek()方法内部主要调用了seek()私有方法,我们重点关注seek()这个方法的实现,该方法有三个返回值,前两个为key、value、第三个为叶子节点的类型。前面提到在boltdb中,叶子节点元素有两种类型:一种是嵌套的子桶、一种是普通的key/value。而这二者就是通过flags来区分的。如果叶子节点元素为嵌套的子桶时,返回的flags为1,也就是bucketLeafFlag取值。

  1. // Seek moves the cursor to a given key and returns it.
  2. // If the key does not exist then the next key is used. If no keys
  3. // follow, a nil key is returned.
  4. // The returned key and value are only valid for the life of the transaction.
  5. func (c *Cursor) Seek(seek []byte) (key []byte, value []byte) {
  6. k, v, flags := c.seek(seek)
  7. // If we ended up after the last element of a page then move to the next one.
  8. // 下面这一段逻辑是必须的,因为在seek()方法中,如果ref.index>ref.count()的话,就直接返回nil,nil,0了
  9. // 这里需要返回下一个
  10. if ref := &c.stack[len(c.stack)-1]; ref.index >= ref.count() {
  11. k, v, flags = c.next()
  12. }
  13. if k == nil {
  14. return nil, nil
  15. // 子桶的话
  16. } else if (flags & uint32(bucketLeafFlag)) != 0 {
  17. return k, nil
  18. }
  19. return k, v
  20. }
  21. // seek moves the cursor to a given key and returns it.
  22. // If the key does not exist then the next key is used.
  23. func (c *Cursor) seek(seek []byte) (key []byte, value []byte, flags uint32) {
  24. _assert(c.bucket.tx.db != nil, "tx closed")
  25. // Start from root page/node and traverse to correct page.
  26. c.stack = c.stack[:0]
  27. // 开始根据seek的key值搜索root
  28. c.search(seek, c.bucket.root)
  29. // 执行完搜索后,stack中保存了所遍历的路径
  30. ref := &c.stack[len(c.stack)-1]
  31. // If the cursor is pointing to the end of page/node then return nil.
  32. if ref.index >= ref.count() {
  33. return nil, nil, 0
  34. }
  35. //获取值
  36. // If this is a bucket then return a nil value.
  37. return c.keyValue()
  38. }
  39. // keyValue returns the key and value of the current leaf element.
  40. func (c *Cursor) keyValue() ([]byte, []byte, uint32) {
  41. //最后一个节点为叶子节点
  42. ref := &c.stack[len(c.stack)-1]
  43. if ref.count() == 0 || ref.index >= ref.count() {
  44. return nil, nil, 0
  45. }
  46. // Retrieve value from node.
  47. // 先从内存中取
  48. if ref.node != nil {
  49. inode := &ref.node.inodes[ref.index]
  50. return inode.key, inode.value, inode.flags
  51. }
  52. // 其次再从文件page中取
  53. // Or retrieve value from page.
  54. elem := ref.page.leafPageElement(uint16(ref.index))
  55. return elem.key(), elem.value(), elem.flags
  56. }

seek()中最核心的方法就是调用search()了,search()方法中,传入的就是要搜索的key值和该桶的root节点。search()方法中,其内部是通过递归的层层往下搜索,下面我们详细了解一下search()内部的实现机制。

  1. // 从根节点开始遍历
  2. // search recursively performs a binary search against a given page/node until it finds a given key.
  3. func (c *Cursor) search(key []byte, pgid pgid) {
  4. // root,3
  5. // 层层找page,bucket->tx->db->dataref
  6. p, n := c.bucket.pageNode(pgid)
  7. if p != nil && (p.flags&(branchPageFlag|leafPageFlag)) == 0 {
  8. panic(fmt.Sprintf("invalid page type: %d: %x", p.id, p.flags))
  9. }
  10. e := elemRef{page: p, node: n}
  11. //记录遍历过的路径
  12. c.stack = append(c.stack, e)
  13. // If we're on a leaf page/node then find the specific node.
  14. // 如果是叶子节点,找具体的值node
  15. if e.isLeaf() {
  16. c.nsearch(key)
  17. return
  18. }
  19. if n != nil {
  20. // 先搜索node,因为node是加载到内存中的
  21. c.searchNode(key, n)
  22. return
  23. }
  24. // 其次再在page中搜索
  25. c.searchPage(key, p)
  26. }
  27. // pageNode returns the in-memory node, if it exists.
  28. // Otherwise returns the underlying page.
  29. func (b *Bucket) pageNode(id pgid) (*page, *node) {
  30. // Inline buckets have a fake page embedded in their value so treat them
  31. // differently. We'll return the rootNode (if available) or the fake page.
  32. // 内联页的话,就直接返回其page了
  33. if b.root == 0 {
  34. if id != 0 {
  35. panic(fmt.Sprintf("inline bucket non-zero page access(2): %d != 0", id))
  36. }
  37. if b.rootNode != nil {
  38. return nil, b.rootNode
  39. }
  40. return b.page, nil
  41. }
  42. // Check the node cache for non-inline buckets.
  43. if b.nodes != nil {
  44. if n := b.nodes[id]; n != nil {
  45. return nil, n
  46. }
  47. }
  48. // Finally lookup the page from the transaction if no node is materialized.
  49. return b.tx.page(id), nil
  50. }
  51. //node中搜索
  52. func (c *Cursor) searchNode(key []byte, n *node) {
  53. var exact bool
  54. //二分搜索
  55. index := sort.Search(len(n.inodes), func(i int) bool {
  56. // TODO(benbjohnson): Optimize this range search. It's a bit hacky right now.
  57. // sort.Search() finds the lowest index where f() != -1 but we need the highest index.
  58. ret := bytes.Compare(n.inodes[i].key, key)
  59. if ret == 0 {
  60. exact = true
  61. }
  62. return ret != -1
  63. })
  64. if !exact && index > 0 {
  65. index--
  66. }
  67. c.stack[len(c.stack)-1].index = index
  68. // Recursively search to the next page.
  69. c.search(key, n.inodes[index].pgid)
  70. }
  71. //页中搜索
  72. func (c *Cursor) searchPage(key []byte, p *page) {
  73. // Binary search for the correct range.
  74. inodes := p.branchPageElements()
  75. var exact bool
  76. index := sort.Search(int(p.count), func(i int) bool {
  77. // TODO(benbjohnson): Optimize this range search. It's a bit hacky right now.
  78. // sort.Search() finds the lowest index where f() != -1 but we need the highest index.
  79. ret := bytes.Compare(inodes[i].key(), key)
  80. if ret == 0 {
  81. exact = true
  82. }
  83. return ret != -1
  84. })
  85. if !exact && index > 0 {
  86. index--
  87. }
  88. c.stack[len(c.stack)-1].index = index
  89. // Recursively search to the next page.
  90. c.search(key, inodes[index].pgid)
  91. }
  92. // nsearch searches the leaf node on the top of the stack for a key.
  93. // 搜索叶子页
  94. func (c *Cursor) nsearch(key []byte) {
  95. e := &c.stack[len(c.stack)-1]
  96. p, n := e.page, e.node
  97. // If we have a node then search its inodes.
  98. // 先搜索node
  99. if n != nil {
  100. //又是二分搜索
  101. index := sort.Search(len(n.inodes), func(i int) bool {
  102. return bytes.Compare(n.inodes[i].key, key) != -1
  103. })
  104. e.index = index
  105. return
  106. }
  107. // If we have a page then search its leaf elements.
  108. // 再搜索page
  109. inodes := p.leafPageElements()
  110. index := sort.Search(int(p.count), func(i int) bool {
  111. return bytes.Compare(inodes[i].key(), key) != -1
  112. })
  113. e.index = index
  114. }

到这儿我们就已经看完所有的seek()查找一个key的过程了,其内部也很简单,就是从根节点开始,通过不断递归遍历每层节点,采用二分法来定位到具体的叶子节点。到达叶子节点时,其叶子节点内部存储的数据也是有序的,因此继续按照二分查找来找到最终的下标。

值得需要注意点:

在遍历时,我们都知道,有可能遍历到的当前分支节点数据并没有在内存中,此时就需要从page中加载数据遍历。所以在遍历过程中,优先在node中找,如果node为空的时候才会采用page来查找。

3.2.4 First()、Last()实现分析

前面看了定位到具体某个key的一个过程,现在我们看一下,在定位到第一个元素时,我们知道它一定是位于最左侧的第一个叶子节点的第一个元素。同理,在定位到最后一个元素时,它一定是位于最右侧的第一个叶子节点的最后一个元素。下面是其内部的实现逻辑:

First()实现

  1. // First moves the cursor to the first item in the bucket and returns its key and value.
  2. // If the bucket is empty then a nil key and value are returned.
  3. // The returned key and value are only valid for the life of the transaction.
  4. func (c *Cursor) First() (key []byte, value []byte) {
  5. _assert(c.bucket.tx.db != nil, "tx closed")
  6. // 清空stack
  7. c.stack = c.stack[:0]
  8. p, n := c.bucket.pageNode(c.bucket.root)
  9. // 一直找到第一个叶子节点,此处在天添加stack时,一直让index设置为0即可
  10. ref := elemRef{page: p, node: n, index: 0}
  11. c.stack = append(c.stack, ref)
  12. c.first()
  13. // If we land on an empty page then move to the next value.
  14. // https://github.com/boltdb/bolt/issues/450
  15. // 当前页时空的话,找下一个
  16. if c.stack[len(c.stack)-1].count() == 0 {
  17. c.next()
  18. }
  19. k, v, flags := c.keyValue()
  20. // 是桶
  21. if (flags & uint32(bucketLeafFlag)) != 0 {
  22. return k, nil
  23. }
  24. return k, v
  25. }
  26. // first moves the cursor to the first leaf element under the last page in the stack.
  27. // 找到最后一个非叶子节点的第一个叶子节点。index=0的节点
  28. func (c *Cursor) first() {
  29. for {
  30. // Exit when we hit a leaf page.
  31. var ref = &c.stack[len(c.stack)-1]
  32. if ref.isLeaf() {
  33. break
  34. }
  35. // Keep adding pages pointing to the first element to the stack.
  36. var pgid pgid
  37. if ref.node != nil {
  38. pgid = ref.node.inodes[ref.index].pgid
  39. } else {
  40. pgid = ref.page.branchPageElement(uint16(ref.index)).pgid
  41. }
  42. p, n := c.bucket.pageNode(pgid)
  43. c.stack = append(c.stack, elemRef{page: p, node: n, index: 0})
  44. }
  45. }

Last()实现

  1. // Last moves the cursor to the last item in the bucket and returns its key and value.
  2. // If the bucket is empty then a nil key and value are returned.
  3. // The returned key and value are only valid for the life of the transaction.
  4. func (c *Cursor) Last() (key []byte, value []byte) {
  5. _assert(c.bucket.tx.db != nil, "tx closed")
  6. c.stack = c.stack[:0]
  7. p, n := c.bucket.pageNode(c.bucket.root)
  8. ref := elemRef{page: p, node: n}
  9. // 设置其index为当前页元素的最后一个
  10. ref.index = ref.count() - 1
  11. c.stack = append(c.stack, ref)
  12. c.last()
  13. k, v, flags := c.keyValue()
  14. if (flags & uint32(bucketLeafFlag)) != 0 {
  15. return k, nil
  16. }
  17. return k, v
  18. }
  19. // last moves the cursor to the last leaf element under the last page in the stack.
  20. // 移动到栈中最后一个节点的最后一个叶子节点
  21. func (c *Cursor) last() {
  22. for {
  23. // Exit when we hit a leaf page.
  24. ref := &c.stack[len(c.stack)-1]
  25. if ref.isLeaf() {
  26. break
  27. }
  28. // Keep adding pages pointing to the last element in the stack.
  29. var pgid pgid
  30. if ref.node != nil {
  31. pgid = ref.node.inodes[ref.index].pgid
  32. } else {
  33. pgid = ref.page.branchPageElement(uint16(ref.index)).pgid
  34. }
  35. p, n := c.bucket.pageNode(pgid)
  36. var nextRef = elemRef{page: p, node: n}
  37. nextRef.index = nextRef.count() - 1
  38. c.stack = append(c.stack, nextRef)
  39. }
  40. }

3.2.5 Next()、Prev()实现分析

再此我们从当前位置查找前一个或者下一个时,需要注意一个问题,如果当前节点中元素已经完了,那么此时需要回退到遍历路径的上一个节点。然后再继续查找,下面进行代码分析。

Next()分析

  1. // Next moves the cursor to the next item in the bucket and returns its key and value.
  2. // If the cursor is at the end of the bucket then a nil key and value are returned.
  3. // The returned key and value are only valid for the life of the transaction.
  4. func (c *Cursor) Next() (key []byte, value []byte) {
  5. _assert(c.bucket.tx.db != nil, "tx closed")
  6. k, v, flags := c.next()
  7. if (flags & uint32(bucketLeafFlag)) != 0 {
  8. return k, nil
  9. }
  10. return k, v
  11. }
  12. // next moves to the next leaf element and returns the key and value.
  13. // If the cursor is at the last leaf element then it stays there and returns nil.
  14. func (c *Cursor) next() (key []byte, value []byte, flags uint32) {
  15. for {
  16. // Attempt to move over one element until we're successful.
  17. // Move up the stack as we hit the end of each page in our stack.
  18. var i int
  19. for i = len(c.stack) - 1; i >= 0; i-- {
  20. elem := &c.stack[i]
  21. if elem.index < elem.count()-1 {
  22. // 元素还有时,往后移动一个
  23. elem.index++
  24. break
  25. }
  26. }
  27. // 最后的结果elem.index++
  28. // If we've hit the root page then stop and return. This will leave the
  29. // cursor on the last element of the last page.
  30. // 所有页都遍历完了
  31. if i == -1 {
  32. return nil, nil, 0
  33. }
  34. // Otherwise start from where we left off in the stack and find the
  35. // first element of the first leaf page.
  36. // 剩余的节点里面找,跳过原先遍历过的节点
  37. c.stack = c.stack[:i+1]
  38. // 如果是叶子节点,first()啥都不做,直接退出。返回elem.index+1的数据
  39. // 非叶子节点的话,需要移动到stack中最后一个路径的第一个元素
  40. c.first()
  41. // If this is an empty page then restart and move back up the stack.
  42. // https://github.com/boltdb/bolt/issues/450
  43. if c.stack[len(c.stack)-1].count() == 0 {
  44. continue
  45. }
  46. return c.keyValue()
  47. }
  48. }

Prev()实现

  1. // Prev moves the cursor to the previous item in the bucket and returns its key and value.
  2. // If the cursor is at the beginning of the bucket then a nil key and value are returned.
  3. // The returned key and value are only valid for the life of the transaction.
  4. func (c *Cursor) Prev() (key []byte, value []byte) {
  5. _assert(c.bucket.tx.db != nil, "tx closed")
  6. // Attempt to move back one element until we're successful.
  7. // Move up the stack as we hit the beginning of each page in our stack.
  8. for i := len(c.stack) - 1; i >= 0; i-- {
  9. elem := &c.stack[i]
  10. if elem.index > 0 {
  11. // 往前移动一格
  12. elem.index--
  13. break
  14. }
  15. c.stack = c.stack[:i]
  16. }
  17. // If we've hit the end then return nil.
  18. if len(c.stack) == 0 {
  19. return nil, nil
  20. }
  21. // Move down the stack to find the last element of the last leaf under this branch.
  22. // 如果当前节点是叶子节点的话,则直接退出了,啥都不做。否则的话移动到新页的最后一个节点
  23. c.last()
  24. k, v, flags := c.keyValue()
  25. if (flags & uint32(bucketLeafFlag)) != 0 {
  26. return k, nil
  27. }
  28. return k, v
  29. }

3.2.6 Delete()方法分析

Delete()方法中,移动当前位置的元素

  1. // Delete removes the current key/value under the cursor from the bucket.
  2. // Delete fails if current key/value is a bucket or if the transaction is not writable.
  3. func (c *Cursor) Delete() error {
  4. if c.bucket.tx.db == nil {
  5. return ErrTxClosed
  6. } else if !c.bucket.Writable() {
  7. return ErrTxNotWritable
  8. }
  9. key, _, flags := c.keyValue()
  10. // Return an error if current value is a bucket.
  11. if (flags & bucketLeafFlag) != 0 {
  12. return ErrIncompatibleValue
  13. }
  14. // 从node中移除,本质上将inode数组进行移动
  15. c.node().del(key)
  16. return nil
  17. }
  18. // del removes a key from the node.
  19. func (n *node) del(key []byte) {
  20. // Find index of key.
  21. index := sort.Search(len(n.inodes), func(i int) bool { return bytes.Compare(n.inodes[i].key, key) != -1 })
  22. // Exit if the key isn't found.
  23. if index >= len(n.inodes) || !bytes.Equal(n.inodes[index].key, key) {
  24. return
  25. }
  26. // Delete inode from the node.
  27. n.inodes = append(n.inodes[:index], n.inodes[index+1:]...)
  28. // Mark the node as needing rebalancing.
  29. n.unbalanced = true
  30. }