创建和管理 Stream

命名资源准则

HStream 资源的名称用于唯一标识一个 HStream 资源,例如一个 stream、subscription 或 reader。资源名称必须符合以下要求:

  • 以一个字母开头
  • 长度必须不超过255个字符
  • 只能包含以下字符: 字母 [A-Za-z]、数字 [0-9]、连字符 - 和下划线 _

* 当资源名称作为 SQL 语句的一部分使用时(例如在 HStream SQL Shell 中,或者用 SQL 创建 IO 任务时),可能会出现资源名称无法被正确解析的情况(如与关键词冲突)。此时需要用户用反引号 ` 括住资源名称。这个限制可能会在日后的版本中被改进或移除。

Stream 的属性

  • Replication factor

    为了获得容错性和更高的可用性,每个 Stream 都可以在集群中的节点之间进行复制。生产环境中常用的 Replication factor 配置为 3,也就是说,你的数据总是有三个副本,这在出错或你想对 Server 进行维护时将会很有帮助。这种复制是以 Stream 为单位进行的。

  • Backlog Retention

    该配置控制 HStreamDB 的 Stream 中的 records 被写入后保留的时间。当超过 retention 设定的保留时间后,HStreamDB 将会清理这些 records,不管它们是否被消费过。

    • 默认值=7 天
    • 最小值=1 秒
    • 最大值=21 天

创建一个 stream

在写入 records 或者创建订阅之前,需要先创建一个 stream。

  1. // CreateStreamExample.java
  2. package docs.code.examples;
  3. import io.hstream.HStreamClient;
  4. public class CreateStreamExample {
  5. public static void main(String[] args) throws Exception {
  6. // TODO(developer): Replace these variables before running the sample.
  7. String serviceUrl = "127.0.0.1:6570";
  8. if (System.getenv("serviceUrl") != null) {
  9. serviceUrl = System.getenv("serviceUrl");
  10. }
  11. String streamName1 = "your_h_records_stream_name";
  12. String streamName2 = "your_raw_records_stream_name";
  13. HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build();
  14. createStreamExample(client, streamName1);
  15. createStreamWithAttrsExample(client, streamName2);
  16. client.close();
  17. }
  18. public static void createStreamExample(HStreamClient client, String streamName) {
  19. client.createStream(streamName);
  20. }
  21. public static void createStreamWithAttrsExample(HStreamClient client, String streamName) {
  22. client.createStream(
  23. streamName,
  24. (short) 1 // replication factor
  25. ,
  26. 10 // Number of shards
  27. ,
  28. 7 * 24 * 3600 // backlog retention time in seconds
  29. );
  30. }
  31. }
  1. // ExampleCreateStream.go
  2. package examples
  3. import (
  4. "log"
  5. "github.com/hstreamdb/hstreamdb-go/hstream"
  6. )
  7. func ExampleCreateStream() error {
  8. client, err := hstream.NewHStreamClient(YourHStreamServiceUrl)
  9. if err != nil {
  10. log.Fatalf("Creating client error: %s", err)
  11. }
  12. defer client.Close()
  13. // Create a stream, only specific streamName
  14. if err = client.CreateStream("testDefaultStream"); err != nil {
  15. log.Fatalf("Creating stream error: %s", err)
  16. }
  17. // Create a new stream with 1 replica, 5 shards, set the data retention to 1800s.
  18. err = client.CreateStream("testStream",
  19. hstream.WithReplicationFactor(1),
  20. hstream.EnableBacklog(1800),
  21. hstream.WithShardCount(5))
  22. if err != nil {
  23. log.Fatalf("Creating stream error: %s", err)
  24. }
  25. return nil
  26. }
  1. # https://github.com/hstreamdb/hstreamdb-py/blob/main/examples/snippets/guides.py
  2. import asyncio
  3. import hstreamdb
  4. import os
  5. # NOTE: Replace with your own host and port
  6. host = os.getenv("GUIDE_HOST", "127.0.0.1")
  7. port = os.getenv("GUIDE_PORT", 6570)
  8. stream_name = "your_stream"
  9. subscription = "your_subscription"
  10. # Run: asyncio.run(main(your_async_function))
  11. async def main(*funcs):
  12. async with await hstreamdb.insecure_client(host=host, port=port) as client:
  13. for f in funcs:
  14. await f(client)
  15. async def create_stream(client):
  16. await client.create_stream(
  17. stream_name, replication_factor=1, backlog=24 * 60 * 60, shard_count=1
  18. )

删除一个 Stream

只有当一个 Stream 没有所属的订阅时才允许被删除,除非传入一个强制删除的 flag。

强制删除一个 Stream

如果你需要删除一个有订阅的 stream,请启用强制删除。在强制删除一个 stream 后,原来 stream 的订阅仍然可以从 backlog 中读取数据。这些订阅的 stream 名字会变成 __deleted_stream__。同时,我们不允许在被删除的 stream 上创建新的订阅,也不允许向该 stream 写入新的 record。

  1. // DeleteStreamExample.java
  2. package docs.code.examples;
  3. import io.hstream.HStreamClient;
  4. public class DeleteStreamExample {
  5. public static void main(String[] args) throws Exception {
  6. // TODO(developer): Replace these variables before running the sample.
  7. // String serviceUrl = "your-service-url-address";
  8. String serviceUrl = "127.0.0.1:6570";
  9. if (System.getenv("serviceUrl") != null) {
  10. serviceUrl = System.getenv("serviceUrl");
  11. }
  12. String streamName1 = "your_h_records_stream_name";
  13. String streamName2 = "your_raw_records_stream_name";
  14. HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build();
  15. deleteStreamExample(client, streamName1);
  16. deleteStreamForceExample(client, streamName2);
  17. client.close();
  18. }
  19. public static void deleteStreamExample(HStreamClient client, String streamName) {
  20. client.deleteStream(streamName);
  21. }
  22. public static void deleteStreamForceExample(HStreamClient client, String streamName) {
  23. client.deleteStream(streamName, true);
  24. }
  25. }
  1. // ExampleDeleteStream.go
  2. package examples
  3. import (
  4. "github.com/hstreamdb/hstreamdb-go/hstream"
  5. "log"
  6. )
  7. func ExampleDeleteStream() error {
  8. client, err := hstream.NewHStreamClient(YourHStreamServiceUrl)
  9. if err != nil {
  10. log.Fatalf("Creating client error: %s", err)
  11. }
  12. defer client.Close()
  13. // force delete stream and ignore none exist stream
  14. if err := client.DeleteStream("testStream",
  15. hstream.EnableForceDelete,
  16. hstream.EnableIgnoreNoneExist); err != nil {
  17. log.Fatalf("Deleting stream error: %s", err)
  18. }
  19. if err := client.DeleteStream("testDefaultStream"); err != nil {
  20. log.Fatalf("Deleting stream error: %s", err)
  21. }
  22. return nil
  23. }
  1. # https://github.com/hstreamdb/hstreamdb-py/blob/main/examples/snippets/guides.py
  2. import asyncio
  3. import hstreamdb
  4. import os
  5. # NOTE: Replace with your own host and port
  6. host = os.getenv("GUIDE_HOST", "127.0.0.1")
  7. port = os.getenv("GUIDE_PORT", 6570)
  8. stream_name = "your_stream"
  9. subscription = "your_subscription"
  10. # Run: asyncio.run(main(your_async_function))
  11. async def main(*funcs):
  12. async with await hstreamdb.insecure_client(host=host, port=port) as client:
  13. for f in funcs:
  14. await f(client)
  15. async def delete_stream(client):
  16. await client.delete_stream(stream_name, ignore_non_exist=True, force=True)

列出所有 stream 信息

可以通过如下方式获取 HStream 中的所有 stream:

  1. // ListStreamsExample.java
  2. package docs.code.examples;
  3. import io.hstream.HStreamClient;
  4. import io.hstream.Stream;
  5. import java.util.List;
  6. public class ListStreamsExample {
  7. public static void main(String[] args) throws Exception {
  8. // TODO(developer): Replace these variables before running the sample.
  9. String serviceUrl = "127.0.0.1:6570";
  10. if (System.getenv("serviceUrl") != null) {
  11. serviceUrl = System.getenv("serviceUrl");
  12. }
  13. HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build();
  14. listStreamExample(client);
  15. client.close();
  16. }
  17. public static void listStreamExample(HStreamClient client) {
  18. List<Stream> streams = client.listStreams();
  19. for (Stream stream : streams) {
  20. System.out.println(stream.getStreamName());
  21. }
  22. }
  23. }
  1. // ExampleListStreams.go
  2. package examples
  3. import (
  4. "fmt"
  5. "github.com/hstreamdb/hstreamdb-go/hstream"
  6. "log"
  7. )
  8. func ExampleListStreams() error {
  9. client, err := hstream.NewHStreamClient(YourHStreamServiceUrl)
  10. if err != nil {
  11. log.Fatalf("Creating client error: %s", err)
  12. }
  13. defer client.Close()
  14. streams, err := client.ListStreams()
  15. if err != nil {
  16. log.Fatalf("Listing streams error: %s", err)
  17. }
  18. for _, stream := range streams {
  19. fmt.Printf("%+v\n", stream)
  20. }
  21. return nil
  22. }
  1. # https://github.com/hstreamdb/hstreamdb-py/blob/main/examples/snippets/guides.py
  2. import asyncio
  3. import hstreamdb
  4. import os
  5. # NOTE: Replace with your own host and port
  6. host = os.getenv("GUIDE_HOST", "127.0.0.1")
  7. port = os.getenv("GUIDE_PORT", 6570)
  8. stream_name = "your_stream"
  9. subscription = "your_subscription"
  10. # Run: asyncio.run(main(your_async_function))
  11. async def main(*funcs):
  12. async with await hstreamdb.insecure_client(host=host, port=port) as client:
  13. for f in funcs:
  14. await f(client)
  15. async def list_streams(client):
  16. ss = await client.list_streams()
  17. for s in ss:
  18. print(s)