Streams management

Streams and durable consumers can be defined administratively outside the application (typically using the NATS CLI Tool) in which case the application only needs to know about the well-known names of the durable consumers it wants to use. But you can also manage streams and consumers programmatically.

Common stream management operations are:

  • Add (or delete) a stream. Adding is an idempotent operation: it creates the stream if it doesn’t already exist, and if the stream does already exist, the call only succeeds if the existing stream’s attributes exactly match those specified in the ‘add’ call.
  • Purge a stream (delete all the messages stored in the stream)
  • Get or remove a specific message from a stream by sequence number
  • Add or update (or delete) a consumer
  • Get info and statistics on streams/consumers/account. Get, remove, or get information on individual messages stored in a stream.

Go

  1. func ExampleJetStreamManager() {
  2. nc, _ := nats.Connect("localhost")
  3. js, _ := nc.JetStream()
  4. // Create a stream
  5. js.AddStream(&nats.StreamConfig{
  6. Name: "FOO",
  7. Subjects: []string{"foo"},
  8. MaxBytes: 1024,
  9. })
  10. // Update a stream
  11. js.UpdateStream(&nats.StreamConfig{
  12. Name: "FOO",
  13. MaxBytes: 2048,
  14. })
  15. // Create a durable consumer
  16. js.AddConsumer("FOO", &nats.ConsumerConfig{
  17. Durable: "BAR",
  18. })
  19. // Get information about all streams (with Context JSOpt)
  20. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  21. defer cancel()
  22. for info := range js.StreamsInfo(nats.Context(ctx)) {
  23. fmt.Println("stream name:", info.Config.Name)
  24. }
  25. // Get information about all consumers (with MaxWait JSOpt)
  26. for info := range js.ConsumersInfo("FOO", nats.MaxWait(10*time.Second)) {
  27. fmt.Println("consumer name:", info.Name)
  28. }
  29. // Delete a consumer
  30. js.DeleteConsumer("FOO", "BAR")
  31. // Delete a stream
  32. js.DeleteStream("FOO")
  33. }

Java

  1. package io.nats.examples.jetstream;
  2. import io.nats.client.Connection;
  3. import io.nats.client.JetStreamApiException;
  4. import io.nats.client.JetStreamManagement;
  5. import io.nats.client.Nats;
  6. import io.nats.client.api.PurgeResponse;
  7. import io.nats.client.api.StorageType;
  8. import io.nats.client.api.StreamConfiguration;
  9. import io.nats.client.api.StreamInfo;
  10. import io.nats.examples.ExampleArgs;
  11. import io.nats.examples.ExampleUtils;
  12. import java.util.List;
  13. import static io.nats.examples.jetstream.NatsJsUtils.*;
  14. /**
  15. * This example will demonstrate JetStream management (admin) api.
  16. */
  17. public class NatsJsManageStreams {
  18. static final String usageString =
  19. "\nUsage: java -cp <classpath> NatsJsManageStreams [-s server] [-strm stream-prefix] [-sub subject-prefix]"
  20. + "\n\nDefault Values:"
  21. + "\n [-strm] manage-stream-"
  22. + "\n [-sub] manage-subject-"
  23. + "\n\nUse tls:// or opentls:// to require tls, via the Default SSLContext\n"
  24. + "\nSet the environment variable NATS_NKEY to use challenge response authentication by setting a file containing your private key.\n"
  25. + "\nSet the environment variable NATS_CREDS to use JWT/NKey authentication by setting a file containing your user creds.\n"
  26. + "\nUse the URL in the -s server parameter for user/pass/token authentication.\n";
  27. public static void main(String[] args) {
  28. ExampleArgs exArgs = ExampleArgs.builder("Manage Streams", args, usageString)
  29. .defaultStream("manage-stream-")
  30. .defaultSubject("manage-subject-")
  31. .build();
  32. String stream1 = exArgs.stream + "1";
  33. String stream2 = exArgs.stream + "2";
  34. String subject1 = exArgs.subject + "1";
  35. String subject2 = exArgs.subject + "2";
  36. String subject3 = exArgs.subject + "3";
  37. String subject4 = exArgs.subject + "4";
  38. try (Connection nc = Nats.connect(ExampleUtils.createExampleOptions(exArgs.server))) {
  39. // Create a JetStreamManagement context.
  40. JetStreamManagement jsm = nc.jetStreamManagement();
  41. // we want to be able to completely create and delete the streams
  42. // so don't want to work with existing streams
  43. exitIfStreamExists(jsm, stream1);
  44. exitIfStreamExists(jsm, stream2);
  45. // 1. Create (add) a stream with a subject
  46. System.out.println("\n----------\n1. Configure And Add Stream 1");
  47. StreamConfiguration streamConfig = StreamConfiguration.builder()
  48. .name(stream1)
  49. .subjects(subject1)
  50. // .retentionPolicy(...)
  51. // .maxConsumers(...)
  52. // .maxBytes(...)
  53. // .maxAge(...)
  54. // .maxMsgSize(...)
  55. .storageType(StorageType.Memory)
  56. // .replicas(...)
  57. // .noAck(...)
  58. // .template(...)
  59. // .discardPolicy(...)
  60. .build();
  61. StreamInfo streamInfo = jsm.addStream(streamConfig);
  62. NatsJsUtils.printStreamInfo(streamInfo);
  63. // 2. Update stream, in this case add a subject
  64. // There are very few properties that can actually be updated
  65. // - StreamConfiguration is immutable once created
  66. // - but the builder can help with that.
  67. System.out.println("----------\n2. Update Stream 1");
  68. streamConfig = StreamConfiguration.builder(streamInfo.getConfiguration())
  69. .addSubjects(subject2).build();
  70. streamInfo = jsm.updateStream(streamConfig);
  71. NatsJsUtils.printStreamInfo(streamInfo);
  72. // 3. Create (add) another stream with 2 subjects
  73. System.out.println("----------\n3. Configure And Add Stream 2");
  74. streamConfig = StreamConfiguration.builder()
  75. .name(stream2)
  76. .subjects(subject3, subject4)
  77. .storageType(StorageType.Memory)
  78. .build();
  79. streamInfo = jsm.addStream(streamConfig);
  80. NatsJsUtils.printStreamInfo(streamInfo);
  81. // 4. Get information on streams
  82. // 4.0 publish some message for more interesting stream state information
  83. // - SUBJECT1 is associated with STREAM1
  84. // 4.1 getStreamInfo on a specific stream
  85. // 4.2 get a list of all streams
  86. // 4.3 get a list of StreamInfo's for all streams
  87. System.out.println("----------\n4.1 getStreamInfo");
  88. publish(nc, subject1, 5);
  89. streamInfo = jsm.getStreamInfo(stream1);
  90. NatsJsUtils.printStreamInfo(streamInfo);
  91. System.out.println("----------\n4.2 getStreamNames");
  92. List<String> streamNames = jsm.getStreamNames();
  93. printObject(streamNames);
  94. System.out.println("----------\n4.3 getStreams");
  95. List<StreamInfo> streamInfos = jsm.getStreams();
  96. NatsJsUtils.printStreamInfoList(streamInfos);
  97. // 5. Purge a stream of its messages
  98. System.out.println("----------\n5. Purge stream");
  99. PurgeResponse purgeResponse = jsm.purgeStream(stream1);
  100. printObject(purgeResponse);
  101. // 6. Delete the streams
  102. // Subsequent calls to getStreamInfo, deleteStream or purgeStream
  103. // will throw a JetStreamApiException "stream not found [10059]"
  104. System.out.println("----------\n6. Delete streams");
  105. jsm.deleteStream(stream1);
  106. jsm.deleteStream(stream2);
  107. // 7. Try to delete the stream again and get the exception
  108. System.out.println("----------\n7. Delete stream again");
  109. try
  110. {
  111. jsm.deleteStream(stream1);
  112. }
  113. catch (JetStreamApiException e)
  114. {
  115. System.out.println("Exception was: '" + e.getMessage() + "'");
  116. }
  117. System.out.println("----------\n");
  118. }
  119. catch (Exception exp) {
  120. exp.printStackTrace();
  121. }
  122. }
  123. }

JavaScript

  1. import { AckPolicy, connect, Empty } from "../../src/mod.ts";
  2. const nc = await connect();
  3. const jsm = await nc.jetstreamManager();
  4. // list all the streams, the `next()` function
  5. // retrieves a paged result.
  6. const streams = await jsm.streams.list().next();
  7. streams.forEach((si) => {
  8. console.log(si);
  9. });
  10. // add a stream
  11. const stream = "mystream";
  12. const subj = `mystream.*`;
  13. await jsm.streams.add({ name: stream, subjects: [subj] });
  14. // publish a reg nats message directly to the stream
  15. for (let i = 0; i < 10; i++) {
  16. nc.publish(`${subj}.a`, Empty);
  17. }
  18. // find a stream that stores a specific subject:
  19. const name = await jsm.streams.find("mystream.A");
  20. // retrieve info about the stream by its name
  21. const si = await jsm.streams.info(name);
  22. // update a stream configuration
  23. si.config.subjects?.push("a.b");
  24. await jsm.streams.update(name, si.config);
  25. // get a particular stored message in the stream by sequence
  26. // this is not associated with a consumer
  27. const sm = await jsm.streams.getMessage(stream, { seq: 1 });
  28. console.log(sm.seq);
  29. // delete the 5th message in the stream, securely erasing it
  30. await jsm.streams.deleteMessage(stream, 5);
  31. // purge all messages in the stream, the stream itself
  32. // remains.
  33. await jsm.streams.purge(stream);
  34. // purge all messages with a specific subject (filter can be a wildcard)
  35. await jsm.streams.purge(stream, { filter: "a.b" });
  36. // purge messages with a specific subject keeping some messages
  37. await jsm.streams.purge(stream, { filter: "a.c", keep: 5 });
  38. // purge all messages up to (but not including) seq
  39. await jsm.streams.purge(stream, { seq: 100 });
  40. // purge all messages up to the sequence that have a matching subject
  41. await jsm.streams.purge(stream, { filter: "a.d", seq: 100 });
  42. // list all consumers for a stream:
  43. const consumers = await jsm.consumers.list(stream).next();
  44. consumers.forEach((ci) => {
  45. console.log(ci);
  46. });
  47. // add a new durable pull consumer
  48. await jsm.consumers.add(stream, {
  49. durable_name: "me",
  50. ack_policy: AckPolicy.Explicit,
  51. });
  52. // retrieve a consumer's configuration
  53. const ci = await jsm.consumers.info(stream, "me");
  54. console.log(ci);
  55. // delete a particular consumer
  56. await jsm.consumers.delete(stream, "me");

Python

  1. import asyncio
  2. import nats
  3. from nats.errors import TimeoutError
  4. async def main():
  5. nc = await nats.connect("localhost")
  6. # Create JetStream context.
  7. js = nc.jetstream()
  8. # Persist messages on the 'foo' subject.
  9. await js.add_stream(name="sample-stream", subjects=["foo"])
  10. await nc.close()
  11. if __name__ == '__main__':
  12. asyncio.run(main())

C

  1. #include "examples.h"
  2. static const char *usage = ""\
  3. "-stream stream name (default is 'foo')\n" \
  4. "-txt text to send (default is 'hello')\n" \
  5. "-count number of messages to send\n" \
  6. "-sync publish synchronously (default is async)\n";
  7. static void
  8. _jsPubErr(jsCtx *js, jsPubAckErr *pae, void *closure)
  9. {
  10. int *errors = (int*) closure;
  11. printf("Error: %u - Code: %u - Text: %s\n", pae->Err, pae->ErrCode, pae->ErrText);
  12. printf("Original message: %.*s\n", natsMsg_GetDataLength(pae->Msg), natsMsg_GetData(pae->Msg));
  13. *errors = (*errors + 1);
  14. // If we wanted to resend the original message, we would do something like that:
  15. //
  16. // js_PublishMsgAsync(js, &(pae->Msg), NULL);
  17. //
  18. // Note that we use `&(pae->Msg)` so that the library set it to NULL if it takes
  19. // ownership, and the library will not destroy the message when this callback returns.
  20. // No need to destroy anything, everything is handled by the library.
  21. }
  22. int main(int argc, char **argv)
  23. {
  24. natsConnection *conn = NULL;
  25. natsStatistics *stats = NULL;
  26. natsOptions *opts = NULL;
  27. jsCtx *js = NULL;
  28. jsOptions jsOpts;
  29. jsErrCode jerr = 0;
  30. natsStatus s;
  31. int dataLen=0;
  32. volatile int errors = 0;
  33. bool delStream = false;
  34. opts = parseArgs(argc, argv, usage);
  35. dataLen = (int) strlen(payload);
  36. s = natsConnection_Connect(&conn, opts);
  37. if (s == NATS_OK)
  38. s = jsOptions_Init(&jsOpts);
  39. if (s == NATS_OK)
  40. {
  41. if (async)
  42. {
  43. jsOpts.PublishAsync.ErrHandler = _jsPubErr;
  44. jsOpts.PublishAsync.ErrHandlerClosure = (void*) &errors;
  45. }
  46. s = natsConnection_JetStream(&js, conn, &jsOpts);
  47. }
  48. if (s == NATS_OK)
  49. {
  50. jsStreamInfo *si = NULL;
  51. // First check if the stream already exists.
  52. s = js_GetStreamInfo(&si, js, stream, NULL, &jerr);
  53. if (s == NATS_NOT_FOUND)
  54. {
  55. jsStreamConfig cfg;
  56. // Since we are the one creating this stream, we can delete at the end.
  57. delStream = true;
  58. // Initialize the configuration structure.
  59. jsStreamConfig_Init(&cfg);
  60. cfg.Name = stream;
  61. // Set the subject
  62. cfg.Subjects = (const char*[1]){subj};
  63. cfg.SubjectsLen = 1;
  64. // Make it a memory stream.
  65. cfg.Storage = js_MemoryStorage;
  66. // Add the stream,
  67. s = js_AddStream(&si, js, &cfg, NULL, &jerr);
  68. }
  69. if (s == NATS_OK)
  70. {
  71. printf("Stream %s has %" PRIu64 " messages (%" PRIu64 " bytes)\n",
  72. si->Config->Name, si->State.Msgs, si->State.Bytes);
  73. // Need to destroy the returned stream object.
  74. jsStreamInfo_Destroy(si);
  75. }
  76. }
  77. if (s == NATS_OK)
  78. s = natsStatistics_Create(&stats);
  79. if (s == NATS_OK)
  80. {
  81. printf("\nSending %" PRId64 " messages to subject '%s'\n", total, stream);
  82. start = nats_Now();
  83. }
  84. for (count = 0; (s == NATS_OK) && (count < total); count++)
  85. {
  86. if (async)
  87. s = js_PublishAsync(js, subj, (const void*) payload, dataLen, NULL);
  88. else
  89. {
  90. jsPubAck *pa = NULL;
  91. s = js_Publish(&pa, js, subj, (const void*) payload, dataLen, NULL, &jerr);
  92. if (s == NATS_OK)
  93. {
  94. if (pa->Duplicate)
  95. printf("Got a duplicate message! Sequence=%" PRIu64 "\n", pa->Sequence);
  96. jsPubAck_Destroy(pa);
  97. }
  98. }
  99. }
  100. if ((s == NATS_OK) && async)
  101. {
  102. jsPubOptions jsPubOpts;
  103. jsPubOptions_Init(&jsPubOpts);
  104. // Let's set it to 30 seconds, if getting "Timeout" errors,
  105. // this may need to be increased based on the number of messages
  106. // being sent.
  107. jsPubOpts.MaxWait = 30000;
  108. s = js_PublishAsyncComplete(js, &jsPubOpts);
  109. if (s == NATS_TIMEOUT)
  110. {
  111. // Let's get the list of pending messages. We could resend,
  112. // etc, but for now, just destroy them.
  113. natsMsgList list;
  114. js_PublishAsyncGetPendingList(&list, js);
  115. natsMsgList_Destroy(&list);
  116. }
  117. }
  118. if (s == NATS_OK)
  119. {
  120. jsStreamInfo *si = NULL;
  121. elapsed = nats_Now() - start;
  122. printStats(STATS_OUT, conn, NULL, stats);
  123. printPerf("Sent");
  124. if (errors != 0)
  125. printf("There were %d asynchronous errors\n", errors);
  126. // Let's report some stats after the run
  127. s = js_GetStreamInfo(&si, js, stream, NULL, &jerr);
  128. if (s == NATS_OK)
  129. {
  130. printf("\nStream %s has %" PRIu64 " messages (%" PRIu64 " bytes)\n",
  131. si->Config->Name, si->State.Msgs, si->State.Bytes);
  132. jsStreamInfo_Destroy(si);
  133. }
  134. }
  135. if (delStream && (js != NULL))
  136. {
  137. printf("\nDeleting stream %s: ", stream);
  138. s = js_DeleteStream(js, stream, NULL, &jerr);
  139. if (s == NATS_OK)
  140. printf("OK!");
  141. printf("\n");
  142. }
  143. if (s != NATS_OK)
  144. {
  145. printf("Error: %u - %s - jerr=%u\n", s, natsStatus_GetText(s), jerr);
  146. nats_PrintLastErrorStack(stderr);
  147. }
  148. // Destroy all our objects to avoid report of memory leak
  149. jsCtx_Destroy(js);
  150. natsStatistics_Destroy(stats);
  151. natsConnection_Destroy(conn);
  152. natsOptions_Destroy(opts);
  153. // To silence reports of memory still in use with valgrind
  154. nats_Close();
  155. return 0;
  156. }

{% endtabs %}

{% endtabs %}