创建和管理 Subscription

Subscription 的属性

  • ackTimeoutSeconds

    指定 HServer 在将 records 标记为 unacked 之前等待 ack 的最大时间(秒),超时后这些 records 将被重新发送。

  • maxUnackedRecords

    允许的未 ack 的 records 的最大数量。超过该数量后,服务器将停止向相应的消费者发送 records。

创建一个 Subscription

每个 subscription 都必须指定要订阅哪个 stream,这意味着你必须确保要订阅的 stream 已经被创建。

关于订阅的名称,请参考资源命名准则。

当创建一个 subscription 时,你可以像下面这样指定上述属性:

  1. // CreateSubscriptionExample.java
  2. package docs.code.examples;
  3. import io.hstream.HStreamClient;
  4. import io.hstream.Subscription;
  5. public class CreateSubscriptionExample {
  6. public static void main(String[] args) throws Exception {
  7. // TODO(developer): Replace these variables before running the sample.
  8. String serviceUrl = "127.0.0.1:6570";
  9. if (System.getenv("serviceUrl") != null) {
  10. serviceUrl = System.getenv("serviceUrl");
  11. }
  12. String streamName = "your_h_records_stream_name";
  13. HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build();
  14. createSubscriptionExample(client, streamName);
  15. client.close();
  16. }
  17. public static void createSubscriptionExample(HStreamClient client, String streamName) {
  18. // TODO(developer): Specify the options while creating the subscription
  19. String subscriptionId = "your_subscription_id";
  20. Subscription subscription =
  21. Subscription.newBuilder().subscription(subscriptionId).stream(streamName)
  22. .ackTimeoutSeconds(600) // The default setting is 600 seconds
  23. .maxUnackedRecords(10000) // The default setting is 10000 records
  24. .build();
  25. client.createSubscription(subscription);
  26. }
  27. }
  1. // ExampleCreateSubscription.go
  2. package examples
  3. import (
  4. "github.com/hstreamdb/hstreamdb-go/hstream"
  5. "log"
  6. )
  7. func ExampleCreateSubscription() error {
  8. client, err := hstream.NewHStreamClient(YourHStreamServiceUrl)
  9. if err != nil {
  10. log.Fatalf("Creating client error: %s", err)
  11. }
  12. defer client.Close()
  13. streamName := "testStream"
  14. subId0 := "SubscriptionId0"
  15. subId1 := "SubscriptionId1"
  16. // Create a new subscription with ack timeout = 60s, max unAcked records num set to 10000 and set
  17. // subscriptionOffset to Earliest
  18. if err = client.CreateSubscription(subId0, streamName,
  19. hstream.WithAckTimeout(60),
  20. hstream.WithMaxUnackedRecords(10000),
  21. hstream.WithOffset(hstream.EARLIEST)); err != nil {
  22. log.Fatalf("Creating subscription error: %s", err)
  23. }
  24. if err = client.CreateSubscription(subId1, streamName,
  25. hstream.WithAckTimeout(600),
  26. hstream.WithMaxUnackedRecords(5000),
  27. hstream.WithOffset(hstream.LATEST)); err != nil {
  28. log.Fatalf("Creating subscription error: %s", err)
  29. }
  30. return nil
  31. }
  # https://github.com/hstreamdb/hstreamdb-py/blob/main/examples/snippets/guides.py
  import asyncio
  import hstreamdb
  import os

  # NOTE: Replace with your own host and port
  host = os.getenv("GUIDE_HOST", "127.0.0.1")
  # Environment variables are strings, so convert explicitly: port is then an
  # int whether or not GUIDE_PORT is set.
  port = int(os.getenv("GUIDE_PORT", "6570"))
  stream_name = "your_stream"
  subscription = "your_subscription"


  # Run: asyncio.run(main(your_async_function))
  async def main(*funcs):
      """Open an insecure client to host:port and await each function with it.

      Each element of ``funcs`` is called as ``f(client)`` and awaited in order.
      """
      async with await hstreamdb.insecure_client(host=host, port=port) as client:
          for f in funcs:
              await f(client)


  async def create_subscription(client):
      """Create ``subscription`` on ``stream_name``, starting from the earliest offset.

      ack_timeout and max_unacks are passed explicitly as 600 and 10000.
      """
      await client.create_subscription(
          subscription,
          stream_name,
          ack_timeout=600,
          max_unacks=10000,
          offset=hstreamdb.SpecialOffset.EARLIEST,
      )

删除一个订阅

要删除一个订阅,你需要确保没有活跃的订阅消费者,除非启用强制删除。

强制删除一个 Subscription

如果你确实想删除一个 subscription,并且有消费者正在运行,请启用强制删除。当强制删除一个 subscription 时,该订阅将处于删除中的状态,并关闭正在运行的消费者,这意味着在删除完成之前,你将无法加入或再次删除该订阅,也无法创建一个同名的 subscription。在删除完成后,你可以用同样的名字创建一个订阅,这个订阅将是一个全新的订阅。即使它们订阅的是同一个流,这个新的订阅也不会与被删除的订阅共享消费进度。

  1. // DeleteSubscriptionExample.java
  2. package docs.code.examples;
  3. import io.hstream.HStreamClient;
  4. public class DeleteSubscriptionExample {
  5. public static void main(String[] args) throws Exception {
  6. // TODO(developer): Replace these variables before running the sample.
  7. // String serviceUrl = "your-service-url-address";
  8. String serviceUrl = "127.0.0.1:6570";
  9. if (System.getenv("serviceUrl") != null) {
  10. serviceUrl = System.getenv("serviceUrl");
  11. }
  12. String subscriptionId = "your_subscription_id";
  13. HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build();
  14. deleteSubscriptionExample(client, subscriptionId);
  15. client.close();
  16. }
  17. public static void deleteSubscriptionExample(HStreamClient client, String subscriptionId) {
  18. client.deleteSubscription(subscriptionId);
  19. }
  20. public static void deleteSubscriptionForceExample(HStreamClient client, String subscriptionId) {
  21. client.deleteSubscription(subscriptionId, true);
  22. }
  23. }
  1. // ExampleDeleteSubscription.go
  2. package examples
  3. import (
  4. "github.com/hstreamdb/hstreamdb-go/hstream"
  5. "log"
  6. )
  7. func ExampleDeleteSubscription() error {
  8. client, err := hstream.NewHStreamClient(YourHStreamServiceUrl)
  9. if err != nil {
  10. log.Fatalf("Creating client error: %s", err)
  11. }
  12. defer client.Close()
  13. subId0 := "SubscriptionId0"
  14. subId1 := "SubscriptionId1"
  15. // force delete subscription
  16. if err = client.DeleteSubscription(subId0, true); err != nil {
  17. log.Fatalf("Force deleting subscription error: %s", err)
  18. }
  19. // delete subscription
  20. if err = client.DeleteSubscription(subId1, false); err != nil {
  21. log.Fatalf("Deleting subscription error: %s", err)
  22. }
  23. return nil
  24. }
  # https://github.com/hstreamdb/hstreamdb-py/blob/main/examples/snippets/guides.py
  import asyncio
  import hstreamdb
  import os

  # NOTE: Replace with your own host and port
  host = os.getenv("GUIDE_HOST", "127.0.0.1")
  # Environment variables are strings, so convert explicitly: port is then an
  # int whether or not GUIDE_PORT is set.
  port = int(os.getenv("GUIDE_PORT", "6570"))
  stream_name = "your_stream"
  subscription = "your_subscription"


  # Run: asyncio.run(main(your_async_function))
  async def main(*funcs):
      """Open an insecure client to host:port and await each function with it.

      Each element of ``funcs`` is called as ``f(client)`` and awaited in order.
      """
      async with await hstreamdb.insecure_client(host=host, port=port) as client:
          for f in funcs:
              await f(client)


  async def delete_subscription(client):
      """Delete ``subscription`` with ``force=True``."""
      await client.delete_subscription(subscription, force=True)

列出 HStream 中的 subscription 信息

  // ListSubscriptionsExample.java
  package docs.code.examples;

  import io.hstream.HStreamClient;
  import io.hstream.Subscription;
  import java.util.List;

  /** Example: list the subscriptions known to the server and print their ids. */
  public class ListSubscriptionsExample {

    /**
     * Connects to the server and prints every subscription id.
     *
     * <p>The service URL defaults to 127.0.0.1:6570 and can be overridden with the
     * {@code serviceUrl} environment variable.
     */
    public static void main(String[] args) throws Exception {
      // TODO(developer): Replace these variables before running the sample.
      String serviceUrl = "127.0.0.1:6570";
      if (System.getenv("serviceUrl") != null) {
        serviceUrl = System.getenv("serviceUrl");
      }

      HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build();
      try {
        listSubscriptionExample(client);
      } finally {
        // Release the client even if listing throws.
        client.close();
      }
    }

    /**
     * Prints the id of each subscription returned by {@code listSubscriptions()}, one per line.
     *
     * @param client an already connected client; it is not closed by this method
     */
    public static void listSubscriptionExample(HStreamClient client) {
      List<Subscription> subscriptions = client.listSubscriptions();
      for (Subscription subscription : subscriptions) {
        System.out.println(subscription.getSubscriptionId());
      }
    }
  }
  1. // ExampleListSubscriptions.go
  2. package examples
  3. import (
  4. "fmt"
  5. "github.com/hstreamdb/hstreamdb-go/hstream"
  6. "log"
  7. )
  8. func ExampleListSubscriptions() error {
  9. client, err := hstream.NewHStreamClient(YourHStreamServiceUrl)
  10. if err != nil {
  11. log.Fatalf("Creating client error: %s", err)
  12. }
  13. defer client.Close()
  14. subscriptions, err := client.ListSubscriptions()
  15. if err != nil {
  16. log.Fatalf("Listing subscriptions error: %s", err)
  17. }
  18. for _, sub := range subscriptions {
  19. fmt.Printf("%+v\n", sub)
  20. }
  21. return nil
  22. }
  # https://github.com/hstreamdb/hstreamdb-py/blob/main/examples/snippets/guides.py
  import asyncio
  import hstreamdb
  import os

  # NOTE: Replace with your own host and port
  host = os.getenv("GUIDE_HOST", "127.0.0.1")
  # Environment variables are strings, so convert explicitly: port is then an
  # int whether or not GUIDE_PORT is set.
  port = int(os.getenv("GUIDE_PORT", "6570"))
  stream_name = "your_stream"
  subscription = "your_subscription"


  # Run: asyncio.run(main(your_async_function))
  async def main(*funcs):
      """Open an insecure client to host:port and await each function with it.

      Each element of ``funcs`` is called as ``f(client)`` and awaited in order.
      """
      async with await hstreamdb.insecure_client(host=host, port=port) as client:
          for f in funcs:
              await f(client)


  async def list_subscriptions(client):
      """Fetch all subscriptions from the server and print each one."""
      subscriptions = await client.list_subscriptions()
      for s in subscriptions:
          print(s)