Create and Manage Subscriptions

Attributes of a Subscription

  • ackTimeoutSeconds.

    Specifies the max amount of time for the server to mark the record as unacknowledged, after which the record will be sent again.

  • maxUnackedRecords.

    The maximum amount of unacknowledged records allowed. After exceeding the size set, the server will stop sending records to corresponding consumers.

Create a subscription

Every subscription has to specify which stream to subscribe to, which means you have to make sure the stream to be subscribed has already been created.

For the subscription name, please refer to the guidelines to name a resource

When creating a subscription, you can provide the attributes mentioned like this:

  1. // CreateSubscriptionExample.java
  2. package docs.code.examples;
  3. import io.hstream.HStreamClient;
  4. import io.hstream.Subscription;
  5. public class CreateSubscriptionExample {
  6. public static void main(String[] args) throws Exception {
  7. // TODO(developer): Replace these variables before running the sample.
  8. String serviceUrl = "127.0.0.1:6570";
  9. if (System.getenv("serviceUrl") != null) {
  10. serviceUrl = System.getenv("serviceUrl");
  11. }
  12. String streamName = "your_h_records_stream_name";
  13. HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build();
  14. createSubscriptionExample(client, streamName);
  15. client.close();
  16. }
  17. public static void createSubscriptionExample(HStreamClient client, String streamName) {
  18. // TODO(developer): Specify the options while creating the subscription
  19. String subscriptionId = "your_subscription_id";
  20. Subscription subscription =
  21. Subscription.newBuilder().subscription(subscriptionId).stream(streamName)
  22. .ackTimeoutSeconds(600) // The default setting is 600 seconds
  23. .maxUnackedRecords(10000) // The default setting is 10000 records
  24. .build();
  25. client.createSubscription(subscription);
  26. }
  27. }
  1. // ExampleCreateSubscription.go
  2. package examples
  3. import (
  4. "github.com/hstreamdb/hstreamdb-go/hstream"
  5. "log"
  6. )
  7. func ExampleCreateSubscription() error {
  8. client, err := hstream.NewHStreamClient(YourHStreamServiceUrl)
  9. if err != nil {
  10. log.Fatalf("Creating client error: %s", err)
  11. }
  12. defer client.Close()
  13. streamName := "testStream"
  14. subId0 := "SubscriptionId0"
  15. subId1 := "SubscriptionId1"
  16. // Create a new subscription with ack timeout = 60s, max unAcked records num set to 10000 and set
  17. // subscriptionOffset to Earliest
  18. if err = client.CreateSubscription(subId0, streamName,
  19. hstream.WithAckTimeout(60),
  20. hstream.WithMaxUnackedRecords(10000),
  21. hstream.WithOffset(hstream.EARLIEST)); err != nil {
  22. log.Fatalf("Creating subscription error: %s", err)
  23. }
  24. if err = client.CreateSubscription(subId1, streamName,
  25. hstream.WithAckTimeout(600),
  26. hstream.WithMaxUnackedRecords(5000),
  27. hstream.WithOffset(hstream.LATEST)); err != nil {
  28. log.Fatalf("Creating subscription error: %s", err)
  29. }
  30. return nil
  31. }
  1. # https://github.com/hstreamdb/hstreamdb-py/blob/main/examples/snippets/guides.py
  2. import asyncio
  3. import hstreamdb
  4. import os
  5. # NOTE: Replace with your own host and port
  6. host = os.getenv("GUIDE_HOST", "127.0.0.1")
  7. port = os.getenv("GUIDE_PORT", 6570)
  8. stream_name = "your_stream"
  9. subscription = "your_subscription"
  10. # Run: asyncio.run(main(your_async_function))
  11. async def main(*funcs):
  12. async with await hstreamdb.insecure_client(host=host, port=port) as client:
  13. for f in funcs:
  14. await f(client)
  15. async def create_subscription(client):
  16. await client.create_subscription(
  17. subscription,
  18. stream_name,
  19. ack_timeout=600,
  20. max_unacks=10000,
  21. offset=hstreamdb.SpecialOffset.EARLIEST,
  22. )

Delete a subscription

To delete a subscription without the force flag, you need to make sure that there is no active subscription consumer.

Delete a subscription with the force flag

If you do want to delete a subscription with running consumers, enable force deletion. While force deleting a subscription, the subscription will be in deleting state and closing running consumers, which means you will not be able to join, delete or create a subscription with the same name. After the deletion completes, you can create a subscription with the same name. However, this new subscription will be a brand new subscription. Even if they subscribe to the same stream, this new subscription will not share the consumption progress with the deleted subscription.

  1. // DeleteSubscriptionExample.java
  2. package docs.code.examples;
  3. import io.hstream.HStreamClient;
  4. public class DeleteSubscriptionExample {
  5. public static void main(String[] args) throws Exception {
  6. // TODO(developer): Replace these variables before running the sample.
  7. // String serviceUrl = "your-service-url-address";
  8. String serviceUrl = "127.0.0.1:6570";
  9. if (System.getenv("serviceUrl") != null) {
  10. serviceUrl = System.getenv("serviceUrl");
  11. }
  12. String subscriptionId = "your_subscription_id";
  13. HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build();
  14. deleteSubscriptionExample(client, subscriptionId);
  15. client.close();
  16. }
  17. public static void deleteSubscriptionExample(HStreamClient client, String subscriptionId) {
  18. client.deleteSubscription(subscriptionId);
  19. }
  20. public static void deleteSubscriptionForceExample(HStreamClient client, String subscriptionId) {
  21. client.deleteSubscription(subscriptionId, true);
  22. }
  23. }
  1. // ExampleDeleteSubscription.go
  2. package examples
  3. import (
  4. "github.com/hstreamdb/hstreamdb-go/hstream"
  5. "log"
  6. )
  7. func ExampleDeleteSubscription() error {
  8. client, err := hstream.NewHStreamClient(YourHStreamServiceUrl)
  9. if err != nil {
  10. log.Fatalf("Creating client error: %s", err)
  11. }
  12. defer client.Close()
  13. subId0 := "SubscriptionId0"
  14. subId1 := "SubscriptionId1"
  15. // force delete subscription
  16. if err = client.DeleteSubscription(subId0, true); err != nil {
  17. log.Fatalf("Force deleting subscription error: %s", err)
  18. }
  19. // delete subscription
  20. if err = client.DeleteSubscription(subId1, false); err != nil {
  21. log.Fatalf("Deleting subscription error: %s", err)
  22. }
  23. return nil
  24. }
  1. # https://github.com/hstreamdb/hstreamdb-py/blob/main/examples/snippets/guides.py
  2. import asyncio
  3. import hstreamdb
  4. import os
  5. # NOTE: Replace with your own host and port
  6. host = os.getenv("GUIDE_HOST", "127.0.0.1")
  7. port = os.getenv("GUIDE_PORT", 6570)
  8. stream_name = "your_stream"
  9. subscription = "your_subscription"
  10. # Run: asyncio.run(main(your_async_function))
  11. async def main(*funcs):
  12. async with await hstreamdb.insecure_client(host=host, port=port) as client:
  13. for f in funcs:
  14. await f(client)
  15. async def delete_subscription(client):
  16. await client.delete_subscription(subscription, force=True)

List subscriptions

To list all subscriptions in HStream

  1. // ListSubscriptionsExample.java
  2. package docs.code.examples;
  3. import io.hstream.HStreamClient;
  4. import io.hstream.Subscription;
  5. import java.util.List;
  6. public class ListSubscriptionsExample {
  7. public static void main(String[] args) throws Exception {
  8. // TODO(developer): Replace these variables before running the sample.
  9. String serviceUrl = "127.0.0.1:6570";
  10. if (System.getenv("serviceUrl") != null) {
  11. serviceUrl = System.getenv("serviceUrl");
  12. }
  13. HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build();
  14. listSubscriptionExample(client);
  15. client.close();
  16. }
  17. public static void listSubscriptionExample(HStreamClient client) {
  18. List<Subscription> subscriptions = client.listSubscriptions();
  19. for (Subscription subscription : subscriptions) {
  20. System.out.println(subscription.getSubscriptionId());
  21. }
  22. }
  23. }
  1. // ExampleListSubscriptions.go
  2. package examples
  3. import (
  4. "fmt"
  5. "github.com/hstreamdb/hstreamdb-go/hstream"
  6. "log"
  7. )
  8. func ExampleListSubscriptions() error {
  9. client, err := hstream.NewHStreamClient(YourHStreamServiceUrl)
  10. if err != nil {
  11. log.Fatalf("Creating client error: %s", err)
  12. }
  13. defer client.Close()
  14. subscriptions, err := client.ListSubscriptions()
  15. if err != nil {
  16. log.Fatalf("Listing subscriptions error: %s", err)
  17. }
  18. for _, sub := range subscriptions {
  19. fmt.Printf("%+v\n", sub)
  20. }
  21. return nil
  22. }
  1. # https://github.com/hstreamdb/hstreamdb-py/blob/main/examples/snippets/guides.py
  2. import asyncio
  3. import hstreamdb
  4. import os
  5. # NOTE: Replace with your own host and port
  6. host = os.getenv("GUIDE_HOST", "127.0.0.1")
  7. port = os.getenv("GUIDE_PORT", 6570)
  8. stream_name = "your_stream"
  9. subscription = "your_subscription"
  10. # Run: asyncio.run(main(your_async_function))
  11. async def main(*funcs):
  12. async with await hstreamdb.insecure_client(host=host, port=port) as client:
  13. for f in funcs:
  14. await f(client)
  15. async def list_subscriptions(client):
  16. subscriptions = await client.list_subscriptions()
  17. for s in subscriptions:
  18. print(s)