第四节 Bucket的相关操作

前面我们分析了如何遍历、查找一个Bucket,下面我们来看看如何创建、获取、删除一个Bucket对象。

3.4.1 创建一个Bucket

1. CreateBucketIfNotExists()、CreateBucket()分析

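CreateBucket()根据指定的key来创建一个Bucket。如果该key对应的Bucket已经存在,则返回ErrBucketExists;如果该key已经存在、但对应的是一个普通的key/value(不是Bucket),则返回ErrIncompatibleValue。否则的话,会在当前的Bucket中找到合适的位置,然后新建一个Bucket插入进去,最后返回给客户端。CreateBucketIfNotExists()是对CreateBucket()的封装:当Bucket已经存在时不会报错,而是直接返回已有的Bucket。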

  1. // CreateBucketIfNotExists returns the bucket stored under key, first creating it via CreateBucket if it does not already exist.
  2. // Any CreateBucket error other than ErrBucketExists (e.g. ErrTxClosed, ErrTxNotWritable, ErrBucketNameRequired, ErrIncompatibleValue) is returned unchanged.
  3. // The bucket instance is only valid for the lifetime of the transaction.
  4. func (b *Bucket) CreateBucketIfNotExists(key []byte) (*Bucket, error) {
  5. child, err := b.CreateBucket(key)
  6. if err == ErrBucketExists {
  7. return b.Bucket(key), nil // already present: look it up instead of failing
  8. } else if err != nil {
  9. return nil, err
  10. }
  11. return child, nil
  12. }
  13. // CreateBucket creates a new bucket at the given key and returns the new bucket.
  14. // Returns ErrTxClosed if b.tx.db is nil, ErrTxNotWritable if the transaction is not writable, ErrBucketNameRequired if key is empty, ErrBucketExists if key already names a bucket, and ErrIncompatibleValue if key already holds a non-bucket value.
  15. // The bucket instance is only valid for the lifetime of the transaction.
  16. func (b *Bucket) CreateBucket(key []byte) (*Bucket, error) {
  17. if b.tx.db == nil {
  18. return nil, ErrTxClosed
  19. } else if !b.tx.writable {
  20. return nil, ErrTxNotWritable
  21. } else if len(key) == 0 {
  22. return nil, ErrBucketNameRequired
  23. }
  24. // Move cursor to correct position.
  25. // Obtain a cursor over this bucket.
  26. c := b.Cursor()
  27. // Seek to the position where key is stored, or where it would be inserted.
  28. k, _, flags := c.seek(key)
  29. // Return an error if there is an existing key.
  30. if bytes.Equal(key, k) {
  31. // The existing entry is flagged as a bucket, so the bucket already exists.
  32. if (flags & bucketLeafFlag) != 0 {
  33. return nil, ErrBucketExists
  34. }
  35. // The key exists but the entry is not a bucket.
  36. return nil, ErrIncompatibleValue
  37. }
  38. // Create empty, inline bucket.
  39. var bucket = Bucket{
  40. bucket: &bucket{},
  41. rootNode: &node{isLeaf: true},
  42. FillPercent: DefaultFillPercent,
  43. }
  44. // Serialize the new bucket into the byte slice that will be stored as the value for key.
  45. var value = bucket.write()
  46. // Insert into node.
  47. key = cloneBytes(key)
  48. // Insert the entry into the inodes of the leaf node the cursor is positioned on.
  49. // NOTE(review): the original note says c.node() builds the in-memory tree by calling n.read(page); that happens inside helpers not shown here — confirm.
  50. c.node().put(key, key, value, 0, bucketLeafFlag) // oldKey == newKey, pgid 0, entry flagged as a bucket
  51. // Since subbuckets are not allowed on inline buckets, we need to
  52. // dereference the inline page, if it exists. This will cause the bucket
  53. // to be treated as a regular, non-inline bucket for the rest of the tx.
  54. b.page = nil
  55. // Look the newly inserted bucket up by key and return it.
  56. return b.Bucket(key), nil
  57. }
  58. // write allocates and writes a bucket to a byte slice.
  59. // The returned value holds the bucket header in its first bucketHeaderSize bytes, followed by the root node serialized as a page; for an inline bucket that trailing part is its page data.
  60. func (b *Bucket) write() []byte {
  61. // Allocate the appropriate size.
  62. var n = b.rootNode
  63. var value = make([]byte, bucketHeaderSize+n.size())
  64. // Write a bucket header.
  65. var bucket = (*bucket)(unsafe.Pointer(&value[0]))
  66. *bucket = *b.bucket
  67. // Convert byte slice to a fake page and write the root node.
  68. var p = (*page)(unsafe.Pointer(&value[bucketHeaderSize]))
  69. // Serialize the root node's elements into the value, right after the header.
  70. n.write(p)
  71. return value
  72. }
  73. // node returns the leaf node that the cursor is currently positioned on; it asserts that the cursor stack is non-empty.
  74. func (c *Cursor) node() *node {
  75. _assert(len(c.stack) > 0, "accessing a node with a zero-length cursor stack")
  76. // If the top of the stack is a leaf node then just return it.
  77. if ref := &c.stack[len(c.stack)-1]; ref.node != nil && ref.isLeaf() {
  78. return ref.node
  79. }
  80. // Start from root and traverse down the hierarchy.
  81. var n = c.stack[0].node
  82. if n == nil {
  83. n = c.bucket.node(c.stack[0].page.id, nil) // root has no node yet: obtain one from the bucket by page id
  84. }
  85. // Descend through every stack level except the last; each of these must be a branch (non-leaf) node.
  86. for _, ref := range c.stack[:len(c.stack)-1] {
  87. _assert(!n.isLeaf, "expected branch node")
  88. n = n.childAt(int(ref.index))
  89. }
  90. _assert(n.isLeaf, "expected leaf node")
  91. return n
  92. }
  93. // put inserts a key/value: it overwrites the inode whose key equals oldKey if one exists, otherwise it inserts a new inode at the sorted position. Panics if pgid is at or above tx.meta.pgid or if either key is empty.
  94. // When putting a plain key/value pair, no pgid needs to be specified (CreateBucket passes 0).
  95. // When putting a branch-node entry, a pgid is specified and no value is needed (per the original note; no such caller is visible here — confirm).
  96. func (n *node) put(oldKey, newKey, value []byte, pgid pgid, flags uint32) {
  97. if pgid >= n.bucket.tx.meta.pgid {
  98. panic(fmt.Sprintf("pgid (%d) above high water mark (%d)", pgid, n.bucket.tx.meta.pgid))
  99. } else if len(oldKey) <= 0 {
  100. panic("put: zero-length old key")
  101. } else if len(newKey) <= 0 {
  102. panic("put: zero-length new key")
  103. }
  104. // Find insertion index: the first inode whose key is >= oldKey.
  105. index := sort.Search(len(n.inodes), func(i int) bool { return bytes.Compare(n.inodes[i].key, oldKey) != -1 })
  106. // Add capacity and shift nodes if we don't have an exact match and need to insert.
  107. exact := (len(n.inodes) > 0 && index < len(n.inodes) && bytes.Equal(n.inodes[index].key, oldKey))
  108. if !exact {
  109. n.inodes = append(n.inodes, inode{})
  110. copy(n.inodes[index+1:], n.inodes[index:])
  111. }
  112. inode := &n.inodes[index]
  113. inode.flags = flags
  114. inode.key = newKey
  115. inode.value = value
  116. inode.pgid = pgid
  117. _assert(len(inode.key) > 0, "put: zero-length inode key")
  118. }

3.4.2 获取一个Bucket

根据指定的key来获取一个Bucket。如果该key不存在,或者该key对应的不是一个Bucket,则返回nil。

  1. // Bucket retrieves a nested bucket by name.
  2. // Returns nil if the bucket does not exist or if name refers to an entry that is not a bucket.
  3. // The bucket instance is only valid for the lifetime of the transaction.
  4. func (b *Bucket) Bucket(name []byte) *Bucket {
  5. if b.buckets != nil { // consult the cache of already-opened child buckets first
  6. if child := b.buckets[string(name)]; child != nil {
  7. return child
  8. }
  9. }
  10. // Move cursor to key.
  11. // Use a cursor to seek to name.
  12. c := b.Cursor()
  13. k, v, flags := c.seek(name)
  14. // Return nil if the key doesn't exist or it is not a bucket.
  15. if !bytes.Equal(name, k) || (flags&bucketLeafFlag) == 0 {
  16. return nil
  17. }
  18. // Otherwise create a bucket and cache it.
  19. // Open the bucket from the value found under name.
  20. var child = b.openBucket(v)
  21. // Cache the opened bucket so later lookups of the same name return early.
  22. if b.buckets != nil {
  23. b.buckets[string(name)] = child
  24. }
  25. return child
  26. }
  27. // Helper method that re-interprets a sub-bucket value
  28. // from a parent into a Bucket
  29. func (b *Bucket) openBucket(value []byte) *Bucket {
  30. var child = newBucket(b.tx)
  31. // If unaligned load/stores are broken on this arch and value is
  32. // unaligned simply clone to an aligned byte array.
  33. unaligned := brokenUnaligned && uintptr(unsafe.Pointer(&value[0]))&3 != 0 // low two address bits set => not 4-byte aligned
  34. if unaligned {
  35. value = cloneBytes(value)
  36. }
  37. // If this is a writable transaction then we need to copy the bucket entry.
  38. // Read-only transactions can point directly at the mmap entry.
  39. if b.tx.writable && !unaligned {
  40. child.bucket = &bucket{}
  41. *child.bucket = *(*bucket)(unsafe.Pointer(&value[0]))
  42. } else {
  43. child.bucket = (*bucket)(unsafe.Pointer(&value[0]))
  44. }
  45. // Save a reference to the inline page if the bucket is inline.
  46. // A root of 0 marks an inline bucket; its page data starts bucketHeaderSize bytes into value.
  47. if child.root == 0 {
  48. child.page = (*page)(unsafe.Pointer(&value[bucketHeaderSize]))
  49. }
  50. return &child
  51. }

3.4.3 删除一个Bucket

DeleteBucket()方法用来删除一个指定key的Bucket。其内部实现逻辑是先递归地删除其子桶,然后释放该Bucket的page,最后将其从叶子节点中移除。

  1. // DeleteBucket deletes a bucket at the given key.
  2. // Returns ErrTxClosed if b.tx.db is nil, ErrTxNotWritable if b is not writable, ErrBucketNotFound if the bucket does not exist, or ErrIncompatibleValue if the key represents a non-bucket value.
  3. func (b *Bucket) DeleteBucket(key []byte) error {
  4. if b.tx.db == nil {
  5. return ErrTxClosed
  6. } else if !b.Writable() {
  7. return ErrTxNotWritable
  8. }
  9. // Move cursor to correct position.
  10. c := b.Cursor()
  11. k, _, flags := c.seek(key)
  12. // Return an error if bucket doesn't exist or is not a bucket.
  13. if !bytes.Equal(key, k) {
  14. return ErrBucketNotFound
  15. } else if (flags & bucketLeafFlag) == 0 {
  16. return ErrIncompatibleValue
  17. }
  18. // Recursively delete all child buckets.
  19. child := b.Bucket(key)
  20. // Delete every bucket nested under this one; an entry with a nil value is treated as a nested bucket.
  21. err := child.ForEach(func(k, v []byte) error {
  22. if v == nil {
  23. if err := child.DeleteBucket(k); err != nil {
  24. return fmt.Errorf("delete bucket: %s", err)
  25. }
  26. }
  27. return nil
  28. })
  29. if err != nil {
  30. return err
  31. }
  32. // Remove cached copy.
  33. delete(b.buckets, string(key))
  34. // Release all bucket pages to freelist.
  35. child.nodes = nil
  36. child.rootNode = nil
  37. child.free()
  38. // Delete the node if we have a matching key.
  39. c.node().del(key)
  40. return nil
  41. }
  42. // del removes a key from the node; it is a no-op when the key is not present, and marks the node unbalanced when an inode is removed.
  43. func (n *node) del(key []byte) {
  44. // Find index of key: the first inode whose key is >= key.
  45. index := sort.Search(len(n.inodes), func(i int) bool { return bytes.Compare(n.inodes[i].key, key) != -1 })
  46. // Exit if the key isn't found.
  47. if index >= len(n.inodes) || !bytes.Equal(n.inodes[index].key, key) {
  48. return
  49. }
  50. // Delete inode from the node.
  51. n.inodes = append(n.inodes[:index], n.inodes[index+1:]...)
  52. // Mark the node as needing rebalancing.
  53. n.unbalanced = true
  54. }
  55. // free recursively frees all pages in the bucket and then resets b.root to 0; it does nothing when b.root is already 0.
  56. func (b *Bucket) free() {
  57. if b.root == 0 {
  58. return
  59. }
  60. var tx = b.tx
  61. b.forEachPageNode(func(p *page, n *node, _ int) {
  62. if p != nil {
  63. tx.db.freelist.free(tx.meta.txid, p) // visited as a page: hand it to the freelist under this tx's id
  64. } else {
  65. n.free() // visited as a node (no page passed): let the node free itself
  66. }
  67. })
  68. b.root = 0
  69. }