Stream

Badger provides a Stream framework, which concurrently iterates over all or a portion of the DB, converts data into custom key-values, and streams them out serially to be sent over the network, written to disk, or even written back to Badger. This is much faster than iterating over Badger with a single Iterator. Stream supports Badger in both managed and normal mode.

Stream uses the natural boundaries created by SSTables within the LSM tree to quickly generate key ranges. Each goroutine then picks a range and runs an iterator over it. Each iterator iterates over all versions of values and is created from the same transaction, so every iterator works over the same snapshot of the DB. Every time a new key is encountered, the goroutine calls ChooseKey(item), followed by KeyToList(key, itr). This allows a user to select or reject that key and, if it is selected, convert its value versions into custom key-values. Each goroutine batches up 4MB worth of key-values before sending them over a channel. Another goroutine further batches up data from this channel using a smart batching algorithm and calls Send serially.

This framework is designed for high-throughput key-value iteration, spreading the work of iteration across many goroutines. DB.Backup uses this framework to provide full and incremental backups quickly. Dgraph is a heavy user of this framework. In fact, this framework was developed and used within Dgraph before being ported to Badger.

    // Assumes db is an open *badger.DB, w is an io.Writer, and the enclosing
    // function returns error. Uses bytes, context, badger, badger's pb and protobuf's proto.
    stream := db.NewStream()
    // db.NewStreamAt(readTs) for managed mode.

    // -- Optional settings
    stream.NumGo = 16                     // Set number of goroutines to use for iteration.
    stream.Prefix = []byte("some-prefix") // Leave nil for iteration over the whole DB.
    stream.LogPrefix = "Badger.Streaming" // For identifying stream logs. Outputs to Logger.

    // ChooseKey is called concurrently for every key. If left nil, assumes true by default.
    stream.ChooseKey = func(item *badger.Item) bool {
        return bytes.HasSuffix(item.Key(), []byte("er"))
    }

    // KeyToList is called concurrently for chosen keys. This can be used to convert
    // Badger data into custom key-values. If nil, uses stream.ToList, a default
    // implementation, which picks all valid key-values.
    stream.KeyToList = nil
    // -- End of optional settings.

    // Send is called serially, while Stream.Orchestrate is running.
    stream.Send = func(list *pb.KVList) error {
        return proto.MarshalText(w, list) // Write to w.
    }

    // Run the stream.
    if err := stream.Orchestrate(context.Background()); err != nil {
        return err
    }
    // Done.