数据消费 (Data Consumption)

如何通过 Java SDK 消费 HStreamDB 重的数据

前提条件

确保有一个运行中并可用的 HStreamDB

概念

客户端可以从一个消费者对象中消费数据。 每一个消费者对象将通过 subscriptionID 来加入一个持有订阅的共享订阅组 (consumer group), 然后客户端可以从订阅的流中获取数据。

每个消费者对象包含一个 RawRecordReceiver 和一个 HRecordReceiver 。 由此,用户可以根据他们的需求来消费原始数据或 HRecords 。

当客户端收到一条 record 时,应当使用 responder.ack() 来返回一个 ack。HStreamDB 当前支 持的是at-least-once 消费语义,应此,若不返回 ack,经过 ackTimeoutSecond 后,这些 records 会被 server 重新发送。

消费 Records

  1. // first, create a subscription for the stream
  2. Subscription subscription =
  3. Subscription
  4. .newBuilder()
  5. .subscription("my_subscription")
  6. .stream("my_stream")
  7. .offset(new SubscriptionOffset(SubscriptionOffset.SpecialOffset.LATEST))
  8. .ackTimeoutSeconds(600)
  9. .build();
  10. client.createSubscription(subscription);
  11. // second, create a consumer attach to the subscription
  12. Consumer consumer =
  13. client
  14. .newConsumer()
  15. .subscription("my_subscription")
  16. .rawRecordReceiver(
  17. ((receivedRawRecord, responder) -> {
  18. System.out.println(receivedRawRecord.getRecordId());
  19. responder.ack();
  20. }))
  21. .build();
  22. // third, start the consumer
  23. consumer.startAsync().awaitRunning();
  • 示例中用了 rawRecordReceiver() 来消费原始数据,假如想要消费 HReacord,可以用 hRecordReceiver()