Streams

Immudb provides stream capabilities. Internally it uses “delimited” messages technique, every chunk has a trailer that describe the length of the message. In this way the receiver can recompose chunk by chunk the original payload. Stream methods accepts a readers as a part of input and output arguments. In this way the large value is decomposed in small chunks that are streamed over the wire. Client don’t need to allocate the entire value when sending and can read the received one progressively. For example a client could send a large file much greater than available ram memory.

At the moment immudb is not yet able to write the data without allocating the entire received object, but in the next release it will be possible a complete communication without allocations. The maximum size of a transaction sent with streams is temporarily limited to a payload of 32M.

Supported stream method now available in the SDK are:

  • StreamSet
  • StreamGet
  • StreamVerifiedSet
  • StreamVerifiedGet
  • StreamScan
  • StreamZScan
  • StreamHistory
  • StreamExecAll

Here an example on how to send a large file and a regular key value to immudb.

It’s possible to specify the chunk size of the stream with WithStreamChunkSize() method.

  1. client, err := immuclient.NewImmuClient(immuclient.DefaultOptions().WithStreamChunkSize(4096))
  2. if err != nil {
  3. log.Fatal(err)
  4. }
  5. ctx := context.Background()
  6. lr, err := client.Login(ctx, []byte(`immudb`), []byte(`immudb`))
  7. if err != nil {
  8. log.Fatal(err)
  9. }
  10. md := metadata.Pairs("authorization", lr.Token)
  11. ctx = metadata.NewOutgoingContext(context.Background(), md)
  12. myFileName := "streams.go"
  13. key1 := []byte("key1")
  14. val1 := []byte("val1")
  15. f, err := os.Open(myFileName)
  16. if err != nil {
  17. log.Fatal(err)
  18. }
  19. stats, err := os.Stat(myFileName)
  20. if err != nil {
  21. log.Fatal(err)
  22. }
  23. kv1 := &stream.KeyValue{
  24. Key: &stream.ValueSize{
  25. Content: bytes.NewBuffer(key1),
  26. Size: len(key1),
  27. },
  28. Value: &stream.ValueSize{
  29. Content: bytes.NewBuffer(val1),
  30. Size: len(val1),
  31. },
  32. }
  33. kv2 := &stream.KeyValue{
  34. Key: &stream.ValueSize{
  35. Content: bytes.NewBuffer([]byte(myFileName)),
  36. Size: len(myFileName),
  37. },
  38. Value: &stream.ValueSize{
  39. Content: f,
  40. Size: int(stats.Size()),
  41. },
  42. }
  43. kvs := []*stream.KeyValue{kv1, kv2}
  44. _, err = client.StreamSet(ctx, kvs)
  45. if err != nil {
  46. log.Fatal(err)
  47. }
  48. entry, err := client.StreamGet(ctx, &schema.KeyRequest{ Key: []byte(myFileName)})
  49. if err != nil {
  50. log.Fatal(err)
  51. }
  52. fmt.Printf("returned key %s", entry.Key)

Streams is not supported yet in this language SDK. Do you want to make a feature request or help out? Open an issue on Java sdk github projectStreams - 图1 (opens new window)

Streams is not supported yet in this language SDK. Do you want to make a feature request or help out? Open an issue on Python sdk github projectStreams - 图2 (opens new window)

Streams is not supported yet in this language SDK. Do you want to make a feature request or help out? Open an issue on Node.js sdk github projectStreams - 图3 (opens new window)

Streams is not supported yet in this language SDK. Do you want to make a feature request or help out? Open an issue on .Net sdk github projectStreams - 图4 (opens new window)

If you’re using another development language, please read up on our immugwStreams - 图5 (opens new window) option.

Chunked reading

It’s possible to read returned value chunk by chunk if needed. This grant to the clients capabilities to handle data coming from immudb chunk by chunk

To read chunk by chunk the inner gRPC protobuffer client is needed. Then it’s possible to use kvStreamReceiver to retrieve the key and a value reader. Such reader will fill provided byte array with received data and will return the number of read bytes or error. If no message is present it returns 0 and io.EOF. If the message is complete it returns 0 and nil, in that case successive calls to Read will returns a new message.

There are several receivers available (zStreamReceiver, vEntryStreamReceiver, execAllStreamReceiver) and also a primitive receiver MsgReceiver. The last one can be used to receive a simple row []byte message without additional logics.

  1. sc := client.GetServiceClient()
  2. gs, err := sc.StreamGet(ctx, &schema.KeyRequest{ Key: []byte(myFileName)})
  3. if err != nil {
  4. log.Fatal(err)
  5. }
  6. kvr := stream.NewKvStreamReceiver(stream.NewMsgReceiver(gs), stream.DefaultChunkSize)
  7. key, vr, err := kvr.Next()
  8. if err != nil {
  9. log.Fatal(err)
  10. }
  11. fmt.Printf("read %s key", key)
  12. chunk := make([]byte, 4096)
  13. for {
  14. l, err := vr.Read(chunk)
  15. if err != nil && err != io.EOF {
  16. log.Fatal(err)
  17. }
  18. if err == io.EOF {
  19. break
  20. }
  21. fmt.Printf("read %d byte\n", l)
  22. }

Streams is not supported yet in this language SDK. Do you want to make a feature request or help out? Open an issue on Java sdk github projectStreams - 图6 (opens new window)

Streams is not supported yet in this language SDK. Do you want to make a feature request or help out? Open an issue on Python sdk github projectStreams - 图7 (opens new window)

Streams is not supported yet in this language SDK. Do you want to make a feature request or help out? Open an issue on Node.js sdk github projectStreams - 图8 (opens new window)

Streams is not supported yet in this language SDK. Do you want to make a feature request or help out? Open an issue on .Net sdk github projectStreams - 图9 (opens new window)

If you’re using another development language, please read up on our immugwStreams - 图10 (opens new window) option.