Get Records from Shards of the Stream with Reader

What is a Reader

To allow users to retrieve data from any stream shard, HStreamDB provides readers, which let applications manually manage the exact position of the record to read from. Unlike subscriptions and consumers, a reader is a lower-level API for getting records from streams. It gives users direct access to any record in the stream — more precisely, any record in a specific shard of the stream. It neither requires nor relies on subscriptions, and it does not send any acknowledgements back to the server. This makes readers useful when you need more flexibility, such as rewinding and re-reading data.

When creating a reader instance, the user must specify which shard the reader reads from and which record it starts at. A reader supports the following starting positions:

  • The earliest available record in the shard
  • The latest available record in the shard
  • User-specified record location in the shard

Reader Example

To read from a shard, first obtain the ID of the desired shard with `listShards`.

The name (ID) of a reader must also follow the naming format specified in the guidelines.

  // ReadDataWithReaderExample.java
package docs.code.examples;

import static io.hstream.StreamShardOffset.SpecialOffset.EARLIEST;

import io.hstream.*;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Example: read records from a single stream shard with a {@link Reader}. */
public class ReadDataWithReaderExample {

  /**
   * Connects to the HStreamDB service and reads from one shard of a stream.
   *
   * <p>The service URL defaults to {@code 127.0.0.1:6570} and can be overridden with the {@code
   * serviceUrl} environment variable.
   */
  public static void main(String[] args) throws Exception {
    // TODO (developers): Replace these variables for your own use cases.
    String serviceUrl = "127.0.0.1:6570";
    if (System.getenv("serviceUrl") != null) {
      serviceUrl = System.getenv("serviceUrl");
    }
    String streamName = "your_h_records_stream_name";
    // Please change the value of shardId to the ones you can get from listShards
    long shardId = 0;
    // try-with-resources closes the client even when reading fails
    // (e.g. records.join() throwing a CompletionException); the original
    // explicit close() was skipped on that path.
    try (HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build()) {
      readTheFirstRecordInShard(client, streamName, shardId);
    }
  }

  /**
   * Creates a reader positioned at the earliest record of {@code shardId} and prints the result of
   * a single read of up to 10 records.
   *
   * <p>This method blocks until the read completes ({@code records.join()}).
   *
   * @param client connected client used to create the reader
   * @param streamName stream that owns the shard
   * @param shardId shard to read from (obtainable via listShards)
   */
  public static void readTheFirstRecordInShard(
      HStreamClient client, String streamName, long shardId) {
    StreamShardOffset offset = new StreamShardOffset(EARLIEST);
    Reader reader =
        client
            .newReader()
            .readerId("your_reader_id")
            .streamName(streamName)
            .shardId(shardId)
            .shardOffset(offset) // default: EARLIEST
            .timeoutMs(1000) // default: 0
            .build();
    CompletableFuture<List<ReceivedRecord>> records =
        reader.read(10); // Specify the maximum available records a reader will get for one read
    System.out.println("Read records: " + records.join());
  }
}
  1. // ExampleReadDataWithReader.go
  2. package examples
  3. import (
  4. "context"
  5. "github.com/hstreamdb/hstreamdb-go/hstream"
  6. "log"
  7. )
  8. func ExampleReadDataWithReader() error {
  9. client, err := hstream.NewHStreamClient(YourHStreamServiceUrl)
  10. if err != nil {
  11. log.Fatalf("Create client error: %s", err)
  12. }
  13. defer client.Close()
  14. streamName := "testDefaultStream"
  15. readerId := "shardReader"
  16. shards, err := client.ListShards(streamName)
  17. if err != nil {
  18. log.Fatalf("List shards error: %s", err)
  19. }
  20. shardId := shards[0].ShardId
  21. reader, err := client.NewShardReader(streamName, readerId, shardId, hstream.WithReaderTimeout(100))
  22. if err != nil {
  23. log.Fatalf("Create reader error: %s", err)
  24. }
  25. defer client.DeleteShardReader(shardId, readerId)
  26. defer reader.Close()
  27. count := 0
  28. for {
  29. records, err := reader.Read(context.Background())
  30. if err != nil {
  31. log.Printf("Reader read error: %s\n", err.Error())
  32. continue
  33. }
  34. for _, record := range records {
  35. log.Printf("Reader read record [%s]:%v", record.GetRecordId().String(), record.GetPayload())
  36. }
  37. count += len(records)
  38. if count >= 100 {
  39. break
  40. }
  41. }
  42. return nil
  43. }
  # https://github.com/hstreamdb/hstreamdb-py/blob/main/examples/snippets/guides.py
import asyncio
import os

import hstreamdb

# NOTE: Replace with your own host and port
host = os.getenv("GUIDE_HOST", "127.0.0.1")
# os.getenv returns a str when the variable is set but the raw default
# otherwise; convert so `port` is always an int regardless of the environment.
port = int(os.getenv("GUIDE_PORT", "6570"))
stream_name = "your_stream"
subscription = "your_subscription"


# Run: asyncio.run(main(your_async_function))
async def main(*funcs):
    """Open an insecure client and await each given coroutine function with it.

    Args:
        *funcs: async callables taking the client as their only argument;
            they are awaited one after another, in the order given.
    """
    async with await hstreamdb.insecure_client(host=host, port=port) as client:
        for f in funcs:
            await f(client)
  15. async def read_reader(client):
  16. offset = hstreamdb.ShardOffset()
  17. offset.specialOffset = hstreamdb.SpecialOffset.EARLIEST
  18. max_records = 10
  19. async with client.with_reader(
  20. stream_name, "your_reader_id", offset, 1000
  21. ) as reader:
  22. records = await reader.read(max_records)
  23. for i, r in enumerate(records):
  24. print(f"[{i}] payload: {r.payload}")