JetStream Consumers

Consumers are how client applications get the messages stored in the streams. You can have many consumers on a single stream. Consumers are like a view on a stream: they can filter messages and have some state (maintained by the servers) associated with them.

Consumers can be ‘durable’ or ‘ephemeral’.

Ephemeral consumers

Ephemeral consumers are meant to be used by a single instance of an application (e.g. to get its own replay of the messages in the stream).

Ephemeral consumers are not meant to last ‘forever’: they are defined automatically at subscription time by the client library and disappear after the application disconnects.

You (automatically) create an ephemeral consumer when you call the js.Subscribe function without specifying the Durable or Bind subscription options. Calling Drain on that subscription automatically deletes the underlying ephemeral consumer. You can also explicitly create an ephemeral consumer by not passing a durable name option to the jsm.AddConsumer call.

Go

  1. func ExampleJetStream() {
  2. nc, err := nats.Connect("localhost")
  3. if err != nil {
  4. log.Fatal(err)
  5. }
  6. // Use the JetStream context to produce and consume messages
  7. // that have been persisted.
  8. js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
  9. if err != nil {
  10. log.Fatal(err)
  11. }
  12. js.AddStream(&nats.StreamConfig{
  13. Name: "FOO",
  14. Subjects: []string{"foo"},
  15. })
  16. js.Publish("foo", []byte("Hello JS!"))
  17. // ordered push consumer
  18. js.Subscribe("foo", func(msg *nats.Msg) {
  19. meta, _ := msg.Metadata()
  20. fmt.Printf("Stream Sequence : %v\n", meta.Sequence.Stream)
  21. fmt.Printf("Consumer Sequence: %v\n", meta.Sequence.Consumer)
  22. }, nats.OrderedConsumer())
  23. }

Java

  1. package io.nats.examples.jetstream;
  2. import io.nats.client.*;
  3. import io.nats.client.api.PublishAck;
  4. import io.nats.client.impl.NatsMessage;
  5. import io.nats.examples.ExampleArgs;
  6. import io.nats.examples.ExampleUtils;
  7. import java.nio.charset.StandardCharsets;
  8. import java.time.Duration;
  9. import java.time.temporal.TemporalUnit;
  10. public class myExample {
  11. public static void main(String[] args) {
  12. final String subject = "foo";
  13. try (Connection nc = Nats.connect(ExampleUtils.createExampleOptions("localhost"))) {
  14. // Create a JetStream context. This hangs off the original connection
  15. // allowing us to produce data to streams and consume data from
  16. // JetStream consumers.
  17. JetStream js = nc.jetStream();
  18. // This example assumes there is a stream already created on subject "foo" and some messages already stored in that stream
  19. // create our message handler.
  20. MessageHandler handler = msg -> {
  21. System.out.println("\nMessage Received:");
  22. if (msg.hasHeaders()) {
  23. System.out.println(" Headers:");
  24. for (String key : msg.getHeaders().keySet()) {
  25. for (String value : msg.getHeaders().get(key)) {
  26. System.out.printf(" %s: %s\n", key, value);
  27. }
  28. }
  29. }
  30. System.out.printf(" Subject: %s\n Data: %s\n",
  31. msg.getSubject(), new String(msg.getData(), StandardCharsets.UTF_8));
  32. System.out.println(" " + msg.metaData());
  33. };
  34. Dispatcher dispatcher = nc.createDispatcher();
  35. PushSubscribeOptions pso = PushSubscribeOptions.builder().ordered(true).build();
  36. JetStreamSubscription sub = js.subscribe(subject, dispatcher, handler, false, pso);
  37. Thread.sleep(100);
  38. sub.drain(Duration.ofMillis(100));
  39. nc.drain(Duration.ofMillis(100));
  40. }
  41. catch(Exception e)
  42. {
  43. e.printStackTrace();
  44. }
  45. }
  46. }

JavaScript

  1. import { connect, consumerOpts } from "../../src/mod.ts";
  2. const nc = await connect();
  3. const js = nc.jetstream();
  4. // note the consumer is not durable - so after the
  5. // subscription ends, the server will automatically destroy the
  6. // consumer
  7. const opts = consumerOpts();
  8. opts.manualAck();
  9. opts.maxMessages(2);
  10. opts.deliverTo("xxx");
  11. const sub = await js.subscribe("a.>", opts);
  12. await (async () => {
  13. for await (const m of sub) {
  14. console.log(m.seq, m.subject);
  15. m.ack();
  16. }
  17. })();
  18. await nc.close();

Python

  1. import asyncio
  2. import nats
  3. from nats.errors import TimeoutError
  4. async def main():
  5. nc = await nats.connect("localhost")
  6. # Create JetStream context.
  7. js = nc.jetstream()
  8. # Create ordered consumer with flow control and heartbeats
  9. # that auto resumes on failures.
  10. osub = await js.subscribe("foo", ordered_consumer=True)
  11. data = bytearray()
  12. while True:
  13. try:
  14. msg = await osub.next_msg()
  15. data.extend(msg.data)
  16. except TimeoutError:
  17. break
  18. print("All data in stream:", len(data))
  19. await nc.close()
  20. if __name__ == '__main__':
  21. asyncio.run(main())

Durable consumers

Durable consumers are meant to be used by multiple instances of an application, either to distribute and scale out the processing, or to persist the position of the consumer over the stream between runs of an application.

Durable consumers, as the name implies, are meant to last ‘forever’ and are typically created and deleted administratively rather than by the application code, which only needs to specify the durable’s well-known name to use it.

You create a durable consumer using the nats consumer add CLI tool command, or programmatically by passing a durable name option to the subscription creation call.

Push and Pull consumers

Technically, there are two implementations of consumers, identified as ‘push’ or ‘pull’ (which refers to the way subscription interest is expressed), depending on whether they have a delivery subject set or not.

A pull consumer is functionally equivalent to a push consumer using a queue group and explicit acknowledgement: the messages from the stream are distributed automatically between the subscribers to the push consumer or the ‘fetchers’ to the pull consumer. However, the recommendation is to use pull consumers, as they create less CPU load on the nats-servers and therefore scale further (note that push consumers are still quite fast and scalable; you may only notice the difference between the two if you have sustained high message rates).

Pull

Go

  1. func ExampleJetStream() {
  2. nc, err := nats.Connect("localhost")
  3. if err != nil {
  4. log.Fatal(err)
  5. }
  6. // Use the JetStream context to produce and consume messages
  7. // that have been persisted.
  8. js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
  9. if err != nil {
  10. log.Fatal(err)
  11. }
  12. js.AddStream(&nats.StreamConfig{
  13. Name: "FOO",
  14. Subjects: []string{"foo"},
  15. })
  16. js.Publish("foo", []byte("Hello JS!"))
  17. // Publish messages asynchronously.
  18. for i := 0; i < 500; i++ {
  19. js.PublishAsync("foo", []byte("Hello JS Async!"))
  20. }
  21. select {
  22. case <-js.PublishAsyncComplete():
  23. case <-time.After(5 * time.Second):
  24. fmt.Println("Did not resolve in time")
  25. }
  26. // Create Pull based consumer with maximum 128 inflight.
  27. sub, _ := js.PullSubscribe("foo", "wq", nats.PullMaxWaiting(128))
  28. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  29. defer cancel()
  30. for {
  31. select {
  32. case <-ctx.Done():
  33. return
  34. default:
  35. }
  36. // Fetch will return as soon as any message is available rather than wait until the full batch size is available, using a batch size of more than 1 allows for higher throughput when needed.
  37. msgs, _ := sub.Fetch(10, nats.Context(ctx))
  38. for _, msg := range msgs {
  39. msg.Ack()
  40. }
  41. }
  42. }

Java

  1. package io.nats.examples.jetstream;
  2. import io.nats.client.*;
  3. import io.nats.examples.ExampleArgs;
  4. import io.nats.examples.ExampleUtils;
  5. import java.time.Duration;
  6. import static io.nats.examples.jetstream.NatsJsUtils.createStreamExitWhenExists;
  7. import static io.nats.examples.jetstream.NatsJsUtils.publishInBackground;
  8. /**
  9. * This example will demonstrate basic use of a pull subscription of:
  10. * batch size only pull: <code>pull(int batchSize)</code>
  11. */
  12. public class NatsJsPullSubBatchSize {
  13. static final String usageString =
  14. "\nUsage: java -cp <classpath> NatsJsPullSubBatchSize [-s server] [-strm stream] [-sub subject] [-dur durable] [-mcnt msgCount]"
  15. + "\n\nDefault Values:"
  16. + "\n [-strm] pull-stream"
  17. + "\n [-sub] pull-subject"
  18. + "\n [-dur] pull-durable"
  19. + "\n [-mcnt] 15"
  20. + "\n\nUse tls:// or opentls:// to require tls, via the Default SSLContext\n"
  21. + "\nSet the environment variable NATS_NKEY to use challenge response authentication by setting a file containing your private key.\n"
  22. + "\nSet the environment variable NATS_CREDS to use JWT/NKey authentication by setting a file containing your user creds.\n"
  23. + "\nUse the URL in the -s server parameter for user/pass/token authentication.\n";
  24. public static void main(String[] args) {
  25. ExampleArgs exArgs = ExampleArgs.builder("Pull Subscription using primitive Batch Size", args, usageString)
  26. .defaultStream("pull-stream")
  27. .defaultSubject("pull-subject")
  28. .defaultDurable("pull-durable")
  29. .defaultMsgCount(15)
  30. .build();
  31. try (Connection nc = Nats.connect(ExampleUtils.createExampleOptions(exArgs.server))) {
  32. // Create a JetStreamManagement context.
  33. JetStreamManagement jsm = nc.jetStreamManagement();
  34. // Use the utility to create a stream stored in memory.
  35. createStreamExitWhenExists(jsm, exArgs.stream, exArgs.subject);
  36. // Create our JetStream context.
  37. JetStream js = nc.jetStream();
  38. // start publishing the messages, don't wait for them to finish, simulating an outside producer
  39. publishInBackground(js, exArgs.subject, "pull-message", exArgs.msgCount);
  40. // Build our subscription options. Durable is REQUIRED for pull based subscriptions
  41. PullSubscribeOptions pullOptions = PullSubscribeOptions.builder()
  42. .durable(exArgs.durable) // required
  43. .build();
  44. // subscribe
  45. JetStreamSubscription sub = js.subscribe(exArgs.subject, pullOptions);
  46. nc.flush(Duration.ofSeconds(1));
  47. int red = 0;
  48. while (red < exArgs.msgCount) {
  49. sub.pull(10);
  50. Message m = sub.nextMessage(Duration.ofSeconds(1)); // first message
  51. while (m != null) {
  52. if (m.isJetStream()) {
  53. red++; // process message
  54. System.out.println("" + red + ". " + m);
  55. m.ack();
  56. }
  57. m = sub.nextMessage(Duration.ofMillis(100)); // other messages should already be on the client
  58. }
  59. }
  60. // delete the stream since we are done with it.
  61. jsm.deleteStream(exArgs.stream);
  62. }
  63. catch (Exception e) {
  64. e.printStackTrace();
  65. }
  66. }
  67. }

JavaScript

  1. import { AckPolicy, connect, nanos } from "../../src/mod.ts";
  2. import { nuid } from "../../nats-base-client/nuid.ts";
  3. const nc = await connect();
  4. const stream = nuid.next();
  5. const subj = nuid.next();
  6. const durable = nuid.next();
  7. const jsm = await nc.jetstreamManager();
  8. await jsm.streams.add(
  9. { name: stream, subjects: [subj] },
  10. );
  11. const js = nc.jetstream();
  12. await js.publish(subj);
  13. await js.publish(subj);
  14. await js.publish(subj);
  15. await js.publish(subj);
  16. const psub = await js.pullSubscribe(subj, {
  17. mack: true,
  18. // artificially low ack_wait, to show some messages
  19. // not getting acked being redelivered
  20. config: {
  21. durable_name: durable,
  22. ack_policy: AckPolicy.Explicit,
  23. ack_wait: nanos(4000),
  24. },
  25. });
  26. (async () => {
  27. for await (const m of psub) {
  28. console.log(
  29. `[${m.seq}] ${
  30. m.redelivered ? `- redelivery ${m.info.redeliveryCount}` : ""
  31. }`,
  32. );
  33. if (m.seq % 2 === 0) {
  34. m.ack();
  35. }
  36. }
  37. })();
  38. const fn = () => {
  39. console.log("[PULL]");
  40. psub.pull({ batch: 1000, expires: 10000 });
  41. };
  42. // do the initial pull
  43. fn();
  44. // and now schedule a pull every so often
  45. const interval = setInterval(fn, 10000); // and repeat every 10s
  46. setTimeout(() => {
  47. clearInterval(interval);
  48. nc.drain();
  49. }, 20000);

Python

  1. import asyncio
  2. import nats
  3. from nats.errors import TimeoutError
  4. async def main():
  5. nc = await nats.connect("localhost")
  6. # Create JetStream context.
  7. js = nc.jetstream()
  8. # Persist messages on the 'foo' subject.
  9. await js.add_stream(name="sample-stream", subjects=["foo"])
  10. for i in range(0, 10):
  11. ack = await js.publish("foo", f"hello world: {i}".encode())
  12. print(ack)
  13. # Create pull based consumer on 'foo'.
  14. psub = await js.pull_subscribe("foo", "psub")
  15. # Fetch messages from the consumer (this example prints them without acking).
  16. for i in range(0, 10):
  17. msgs = await psub.fetch(1)
  18. for msg in msgs:
  19. print(msg)
  20. await nc.close()
  21. if __name__ == '__main__':
  22. asyncio.run(main())

C

  1. #include "examples.h"
  2. static const char *usage = ""\
  3. "-gd use global message delivery thread pool\n" \
  4. "-sync receive synchronously (default is asynchronous)\n" \
  5. "-pull use pull subscription\n" \
  6. "-fc enable flow control\n" \
  7. "-count number of expected messages\n";
  8. static void
  9. onMsg(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure)
  10. {
  11. if (print)
  12. printf("Received msg: %s - %.*s\n",
  13. natsMsg_GetSubject(msg),
  14. natsMsg_GetDataLength(msg),
  15. natsMsg_GetData(msg));
  16. if (start == 0)
  17. start = nats_Now();
  18. // We should be using a mutex to protect those variables since
  19. // they are used from the subscription's delivery and the main
  20. // threads. For demo purposes, this is fine.
  21. if (++count == total)
  22. elapsed = nats_Now() - start;
  23. // Since this is auto-ack callback, we don't need to ack here.
  24. natsMsg_Destroy(msg);
  25. }
  26. static void
  27. asyncCb(natsConnection *nc, natsSubscription *sub, natsStatus err, void *closure)
  28. {
  29. printf("Async error: %u - %s\n", err, natsStatus_GetText(err));
  30. natsSubscription_GetDropped(sub, (int64_t*) &dropped);
  31. }
  32. int main(int argc, char **argv)
  33. {
  34. natsConnection *conn = NULL;
  35. natsStatistics *stats = NULL;
  36. natsOptions *opts = NULL;
  37. natsSubscription *sub = NULL;
  38. natsMsg *msg = NULL;
  39. jsCtx *js = NULL;
  40. jsErrCode jerr = 0;
  41. jsOptions jsOpts;
  42. jsSubOptions so;
  43. natsStatus s;
  44. bool delStream = false;
  45. opts = parseArgs(argc, argv, usage);
  46. printf("Created %s subscription on '%s'.\n",
  47. (pull ? "pull" : (async ? "asynchronous" : "synchronous")), subj);
  48. s = natsOptions_SetErrorHandler(opts, asyncCb, NULL);
  49. if (s == NATS_OK)
  50. s = natsConnection_Connect(&conn, opts);
  51. if (s == NATS_OK)
  52. s = jsOptions_Init(&jsOpts);
  53. if (s == NATS_OK)
  54. s = jsSubOptions_Init(&so);
  55. if (s == NATS_OK)
  56. {
  57. so.Stream = stream;
  58. so.Consumer = durable;
  59. if (flowctrl)
  60. {
  61. so.Config.FlowControl = true;
  62. so.Config.Heartbeat = (int64_t)1E9;
  63. }
  64. }
  65. if (s == NATS_OK)
  66. s = natsConnection_JetStream(&js, conn, &jsOpts);
  67. if (s == NATS_OK)
  68. {
  69. jsStreamInfo *si = NULL;
  70. // First check if the stream already exists.
  71. s = js_GetStreamInfo(&si, js, stream, NULL, &jerr);
  72. if (s == NATS_NOT_FOUND)
  73. {
  74. jsStreamConfig cfg;
  75. // Since we are the one creating this stream, we can delete at the end.
  76. delStream = true;
  77. // Initialize the configuration structure.
  78. jsStreamConfig_Init(&cfg);
  79. cfg.Name = stream;
  80. // Set the subject
  81. cfg.Subjects = (const char*[1]){subj};
  82. cfg.SubjectsLen = 1;
  83. // Make it a memory stream.
  84. cfg.Storage = js_MemoryStorage;
  85. // Add the stream,
  86. s = js_AddStream(&si, js, &cfg, NULL, &jerr);
  87. }
  88. if (s == NATS_OK)
  89. {
  90. printf("Stream %s has %" PRIu64 " messages (%" PRIu64 " bytes)\n",
  91. si->Config->Name, si->State.Msgs, si->State.Bytes);
  92. // Need to destroy the returned stream object.
  93. jsStreamInfo_Destroy(si);
  94. }
  95. }
  96. if (s == NATS_OK)
  97. {
  98. if (pull)
  99. s = js_PullSubscribe(&sub, js, subj, durable, &jsOpts, &so, &jerr);
  100. else if (async)
  101. s = js_Subscribe(&sub, js, subj, onMsg, NULL, &jsOpts, &so, &jerr);
  102. else
  103. s = js_SubscribeSync(&sub, js, subj, &jsOpts, &so, &jerr);
  104. }
  105. if (s == NATS_OK)
  106. s = natsSubscription_SetPendingLimits(sub, -1, -1);
  107. if (s == NATS_OK)
  108. s = natsStatistics_Create(&stats);
  109. if ((s == NATS_OK) && pull)
  110. {
  111. natsMsgList list;
  112. int i;
  113. for (count = 0; (s == NATS_OK) && (count < total); )
  114. {
  115. s = natsSubscription_Fetch(&list, sub, 1024, 5000, &jerr);
  116. if (s != NATS_OK)
  117. break;
  118. if (start == 0)
  119. start = nats_Now();
  120. count += (int64_t) list.Count;
  121. for (i=0; (s == NATS_OK) && (i<list.Count); i++)
  122. s = natsMsg_Ack(list.Msgs[i], &jsOpts);
  123. natsMsgList_Destroy(&list);
  124. }
  125. }
  126. else if ((s == NATS_OK) && async)
  127. {
  128. while (s == NATS_OK)
  129. {
  130. if (count + dropped == total)
  131. break;
  132. nats_Sleep(1000);
  133. }
  134. }
  135. else if (s == NATS_OK)
  136. {
  137. for (count = 0; (s == NATS_OK) && (count < total); count++)
  138. {
  139. s = natsSubscription_NextMsg(&msg, sub, 5000);
  140. if (s != NATS_OK)
  141. break;
  142. if (start == 0)
  143. start = nats_Now();
  144. s = natsMsg_Ack(msg, &jsOpts);
  145. natsMsg_Destroy(msg);
  146. }
  147. }
  148. if (s == NATS_OK)
  149. {
  150. printStats(STATS_IN|STATS_COUNT, conn, sub, stats);
  151. printPerf("Received");
  152. }
  153. if (s == NATS_OK)
  154. {
  155. jsStreamInfo *si = NULL;
  156. // Let's report some stats after the run
  157. s = js_GetStreamInfo(&si, js, stream, NULL, &jerr);
  158. if (s == NATS_OK)
  159. {
  160. printf("\nStream %s has %" PRIu64 " messages (%" PRIu64 " bytes)\n",
  161. si->Config->Name, si->State.Msgs, si->State.Bytes);
  162. jsStreamInfo_Destroy(si);
  163. }
  164. if (delStream)
  165. {
  166. printf("\nDeleting stream %s: ", stream);
  167. s = js_DeleteStream(js, stream, NULL, &jerr);
  168. if (s == NATS_OK)
  169. printf("OK!");
  170. printf("\n");
  171. }
  172. }
  173. else
  174. {
  175. printf("Error: %u - %s - jerr=%u\n", s, natsStatus_GetText(s), jerr);
  176. nats_PrintLastErrorStack(stderr);
  177. }
  178. // Destroy all our objects to avoid report of memory leak
  179. jsCtx_Destroy(js);
  180. natsStatistics_Destroy(stats);
  181. natsSubscription_Destroy(sub);
  182. natsConnection_Destroy(conn);
  183. natsOptions_Destroy(opts);
  184. // To silence reports of memory still in use with valgrind
  185. nats_Close();
  186. return 0;
  187. }

A push consumer can also be used in some other use cases such as without a queue group, or with no acknowledgement or cumulative acknowledgements.

Push

Go

  1. func ExampleJetStream() {
  2. nc, err := nats.Connect("localhost")
  3. if err != nil {
  4. log.Fatal(err)
  5. }
  6. // Use the JetStream context to produce and consume messages
  7. // that have been persisted.
  8. js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
  9. if err != nil {
  10. log.Fatal(err)
  11. }
  12. js.AddStream(&nats.StreamConfig{
  13. Name: "FOO",
  14. Subjects: []string{"foo"},
  15. })
  16. js.Publish("foo", []byte("Hello JS!"))
  17. // Publish messages asynchronously.
  18. for i := 0; i < 500; i++ {
  19. js.PublishAsync("foo", []byte("Hello JS Async!"))
  20. }
  21. select {
  22. case <-js.PublishAsyncComplete():
  23. case <-time.After(5 * time.Second):
  24. fmt.Println("Did not resolve in time")
  25. }
  26. // Create async consumer on subject 'foo'. Async subscribers
  27. // ack a message once exiting the callback.
  28. js.Subscribe("foo", func(msg *nats.Msg) {
  29. meta, _ := msg.Metadata()
  30. fmt.Printf("Stream Sequence : %v\n", meta.Sequence.Stream)
  31. fmt.Printf("Consumer Sequence: %v\n", meta.Sequence.Consumer)
  32. })
  33. // Async subscriber with manual acks.
  34. js.Subscribe("foo", func(msg *nats.Msg) {
  35. msg.Ack()
  36. }, nats.ManualAck())
  37. // Async queue subscription where members load balance the
  38. // received messages together.
  39. // If no consumer name is specified, either with nats.Bind()
  40. // or nats.Durable() options, the queue name is used as the
  41. // durable name (that is, as if you were passing the
  42. // nats.Durable(<queue group name>) option).
  43. // It is recommended to use nats.Bind() or nats.Durable()
  44. // and preferably create the JetStream consumer beforehand
  45. // (using js.AddConsumer) so that the JS consumer is not
  46. // deleted on an Unsubscribe() or Drain() when the member
  47. // that created the consumer goes away first.
  48. // Check Godoc for the QueueSubscribe() API for more details.
  49. js.QueueSubscribe("foo", "group", func(msg *nats.Msg) {
  50. msg.Ack()
  51. }, nats.ManualAck())
  52. // Subscriber to consume messages synchronously.
  53. sub, _ := js.SubscribeSync("foo")
  54. msg, _ := sub.NextMsg(2 * time.Second)
  55. msg.Ack()
  56. // We can add a member to the group, with this member using
  57. // the synchronous version of the QueueSubscribe.
  58. sub, _ = js.QueueSubscribeSync("foo", "group")
  59. msg, _ = sub.NextMsg(2 * time.Second)
  60. msg.Ack()
  61. // ChanSubscribe
  62. msgCh := make(chan *nats.Msg, 8192)
  63. sub, _ = js.ChanSubscribe("foo", msgCh)
  64. select {
  65. case msg := <-msgCh:
  66. fmt.Println("[Received]", msg)
  67. case <-time.After(1 * time.Second):
  68. }
  69. }

Java

  1. package io.nats.examples.jetstream;
  2. import io.nats.client.*;
  3. import io.nats.client.api.PublishAck;
  4. import io.nats.examples.ExampleArgs;
  5. import io.nats.examples.ExampleUtils;
  6. import java.io.IOException;
  7. import java.nio.charset.StandardCharsets;
  8. import java.time.Duration;
  9. import java.util.ArrayList;
  10. import java.util.List;
  11. import java.util.concurrent.atomic.AtomicInteger;
  12. import static io.nats.examples.jetstream.NatsJsUtils.createStreamExitWhenExists;
  13. /**
  14. * This example will demonstrate JetStream push subscribing using a durable consumer and a queue
  15. */
  16. public class NatsJsPushSubQueueDurable {
  17. static final String usageString =
  18. "\nUsage: java -cp <classpath> NatsJsPushSubQueueDurable [-s server] [-strm stream] [-sub subject] [-q queue] [-dur durable] [-mcnt msgCount] [-scnt subCount]"
  19. + "\n\nDefault Values:"
  20. + "\n [-strm stream] qdur-stream"
  21. + "\n [-sub subject] qdur-subject"
  22. + "\n [-q queue] qdur-queue"
  23. + "\n [-dur durable] qdur-durable"
  24. + "\n [-mcnt msgCount] 100"
  25. + "\n [-scnt subCount] 5"
  26. + "\n\nUse tls:// or opentls:// to require tls, via the Default SSLContext\n"
  27. + "\nSet the environment variable NATS_NKEY to use challenge response authentication by setting a file containing your private key.\n"
  28. + "\nSet the environment variable NATS_CREDS to use JWT/NKey authentication by setting a file containing your user creds.\n"
  29. + "\nUse the URL in the -s server parameter for user/pass/token authentication.\n";
  30. public static void main(String[] args) {
  31. ExampleArgs exArgs = ExampleArgs.builder("Push Subscribe, Durable Consumer, Queue", args, usageString)
  32. .defaultStream("qdur-stream")
  33. .defaultSubject("qdur-subject")
  34. .defaultQueue("qdur-queue")
  35. .defaultDurable("qdur-durable")
  36. .defaultMsgCount(100)
  37. .defaultSubCount(5)
  38. .build();
  39. try (Connection nc = Nats.connect(ExampleUtils.createExampleOptions(exArgs.server, true))) {
  40. // Create a JetStreamManagement context.
  41. JetStreamManagement jsm = nc.jetStreamManagement();
  42. // Use the utility to create a stream stored in memory.
  43. createStreamExitWhenExists(jsm, exArgs.stream, exArgs.subject);
  44. // Create our JetStream context
  45. JetStream js = nc.jetStream();
  46. System.out.println();
  47. // Setup the subscribers
  48. // - the PushSubscribeOptions can be re-used since all the subscribers are the same
  49. // - use a concurrent integer to track all the messages received
  50. // - have a list of subscribers and threads so I can track them
  51. PushSubscribeOptions pso = PushSubscribeOptions.builder().durable(exArgs.durable).build();
  52. AtomicInteger allReceived = new AtomicInteger();
  53. List<JsQueueSubscriber> subscribers = new ArrayList<>();
  54. List<Thread> subThreads = new ArrayList<>();
  55. for (int id = 1; id <= exArgs.subCount; id++) {
  56. // setup the subscription
  57. JetStreamSubscription sub = js.subscribe(exArgs.subject, exArgs.queue, pso);
  58. // create and track the runnable
  59. JsQueueSubscriber qs = new JsQueueSubscriber(id, exArgs, js, sub, allReceived);
  60. subscribers.add(qs);
  61. // create, track and start the thread
  62. Thread t = new Thread(qs);
  63. subThreads.add(t);
  64. t.start();
  65. }
  66. nc.flush(Duration.ofSeconds(1)); // flush outgoing communication with/to the server
  67. // create and start the publishing
  68. Thread pubThread = new Thread(new JsPublisher(js, exArgs));
  69. pubThread.start();
  70. // wait for all threads to finish
  71. pubThread.join();
  72. for (Thread t : subThreads) {
  73. t.join();
  74. }
  75. // report
  76. for (JsQueueSubscriber qs : subscribers) {
  77. qs.report();
  78. }
  79. System.out.println();
  80. // delete the stream since we are done with it.
  81. jsm.deleteStream(exArgs.stream);
  82. }
  83. catch (Exception e) {
  84. e.printStackTrace();
  85. }
  86. }
  87. static class JsPublisher implements Runnable {
  88. JetStream js;
  89. ExampleArgs exArgs;
  90. public JsPublisher(JetStream js, ExampleArgs exArgs) {
  91. this.js = js;
  92. this.exArgs = exArgs;
  93. }
  94. @Override
  95. public void run() {
  96. for (int x = 1; x <= exArgs.msgCount; x++) {
  97. try {
  98. PublishAck pa = js.publish(exArgs.subject, ("Data # " + x).getBytes(StandardCharsets.US_ASCII));
  99. } catch (IOException | JetStreamApiException e) {
  100. // something pretty wrong here
  101. e.printStackTrace();
  102. System.exit(-1);
  103. }
  104. }
  105. }
  106. }
  107. static class JsQueueSubscriber implements Runnable {
  108. int id;
  109. int thisReceived;
  110. List<String> datas;
  111. ExampleArgs exArgs;
  112. JetStream js;
  113. JetStreamSubscription sub;
  114. AtomicInteger allReceived;
  115. public JsQueueSubscriber(int id, ExampleArgs exArgs, JetStream js, JetStreamSubscription sub, AtomicInteger allReceived) {
  116. this.id = id;
  117. thisReceived = 0;
  118. datas = new ArrayList<>();
  119. this.exArgs = exArgs;
  120. this.js = js;
  121. this.sub = sub;
  122. this.allReceived = allReceived;
  123. }
  124. public void report() {
  125. System.out.printf("Sub # %d handled %d messages.\n", id, thisReceived);
  126. }
  127. @Override
  128. public void run() {
  129. while (allReceived.get() < exArgs.msgCount) {
  130. try {
  131. Message msg = sub.nextMessage(Duration.ofMillis(500));
  132. while (msg != null) {
  133. thisReceived++;
  134. allReceived.incrementAndGet();
  135. String data = new String(msg.getData(), StandardCharsets.US_ASCII);
  136. datas.add(data);
  137. System.out.printf("QS # %d message # %d %s\n", id, thisReceived, data);
  138. msg.ack();
  139. msg = sub.nextMessage(Duration.ofMillis(500));
  140. }
  141. } catch (InterruptedException e) {
  142. // just try again
  143. }
  144. }
  145. System.out.printf("QS # %d completed.\n", id);
  146. }
  147. }
  148. }

JavaScript

  1. import { AckPolicy, connect } from "../../src/mod.ts";
  2. import { nuid } from "../../nats-base-client/nuid.ts";
  3. const nc = await connect();
  4. // create a regular subscription - this is plain nats
  5. const sub = nc.subscribe("my.messages", { max: 5 });
  6. const done = (async () => {
  7. for await (const m of sub) {
  8. console.log(m.subject);
  9. m.respond();
  10. }
  11. })();
  12. const jsm = await nc.jetstreamManager();
  13. const stream = nuid.next();
  14. const subj = nuid.next();
  15. await jsm.streams.add({ name: stream, subjects: [`${subj}.>`] });
  16. // create a consumer that delivers to the subscription
  17. await jsm.consumers.add(stream, {
  18. ack_policy: AckPolicy.Explicit,
  19. deliver_subject: "my.messages",
  20. });
  21. // publish some old nats messages
  22. nc.publish(`${subj}.A`);
  23. nc.publish(`${subj}.B`);
  24. nc.publish(`${subj}.C`);
  25. nc.publish(`${subj}.D.A`);
  26. nc.publish(`${subj}.F.A.B`);
  27. await done;
  28. await nc.close();

Python

  1. import asyncio
  2. import nats
  3. from nats.errors import TimeoutError
  4. async def main():
  5. nc = await nats.connect("localhost")
  6. # Create JetStream context.
  7. js = nc.jetstream()
  8. # Persist messages on the 'foo' subject.
  9. await js.add_stream(name="sample-stream", subjects=["foo"])
  10. for i in range(0, 10):
  11. ack = await js.publish("foo", f"hello world: {i}".encode())
  12. print(ack)
  13. # Create pull based consumer on 'foo'.
  14. psub = await js.pull_subscribe("foo", "psub")
  15. # Fetch messages from the consumer (this example prints them without acking).
  16. for i in range(0, 10):
  17. msgs = await psub.fetch(1)
  18. for msg in msgs:
  19. print(msg)
  20. # Create single push based subscriber that is durable across restarts.
  21. sub = await js.subscribe("foo", durable="myapp")
  22. msg = await sub.next_msg()
  23. await msg.ack()
  24. # Create a deliver group whose members will have messages load balanced between them.
  25. async def qsub_a(msg):
  26. print("QSUB A:", msg)
  27. await msg.ack()
  28. async def qsub_b(msg):
  29. print("QSUB B:", msg)
  30. await msg.ack()
  31. await js.subscribe("foo", "workers", cb=qsub_a)
  32. await js.subscribe("foo", "workers", cb=qsub_b)
  33. for i in range(0, 10):
  34. ack = await js.publish("foo", f"hello world: {i}".encode())
  35. print("\t", ack)
  36. await nc.close()
  37. if __name__ == '__main__':
  38. asyncio.run(main())

C

  1. #include "examples.h"
  2. static const char *usage = ""\
  3. "-gd use global message delivery thread pool\n" \
  4. "-sync receive synchronously (default is asynchronous)\n" \
  5. "-pull use pull subscription\n" \
  6. "-fc enable flow control\n" \
  7. "-count number of expected messages\n";
  8. static void
  9. onMsg(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure)
  10. {
  11. if (print)
  12. printf("Received msg: %s - %.*s\n",
  13. natsMsg_GetSubject(msg),
  14. natsMsg_GetDataLength(msg),
  15. natsMsg_GetData(msg));
  16. if (start == 0)
  17. start = nats_Now();
  18. // We should be using a mutex to protect those variables since
  19. // they are used from the subscription's delivery and the main
  20. // threads. For demo purposes, this is fine.
  21. if (++count == total)
  22. elapsed = nats_Now() - start;
  23. // Since this is auto-ack callback, we don't need to ack here.
  24. natsMsg_Destroy(msg);
  25. }
  26. static void
  27. asyncCb(natsConnection *nc, natsSubscription *sub, natsStatus err, void *closure)
  28. {
  29. printf("Async error: %u - %s\n", err, natsStatus_GetText(err));
  30. natsSubscription_GetDropped(sub, (int64_t*) &dropped);
  31. }
  32. int main(int argc, char **argv)
  33. {
  34. natsConnection *conn = NULL;
  35. natsStatistics *stats = NULL;
  36. natsOptions *opts = NULL;
  37. natsSubscription *sub = NULL;
  38. natsMsg *msg = NULL;
  39. jsCtx *js = NULL;
  40. jsErrCode jerr = 0;
  41. jsOptions jsOpts;
  42. jsSubOptions so;
  43. natsStatus s;
  44. bool delStream = false;
  45. opts = parseArgs(argc, argv, usage);
  46. printf("Created %s subscription on '%s'.\n",
  47. (pull ? "pull" : (async ? "asynchronous" : "synchronous")), subj);
  48. s = natsOptions_SetErrorHandler(opts, asyncCb, NULL);
  49. if (s == NATS_OK)
  50. s = natsConnection_Connect(&conn, opts);
  51. if (s == NATS_OK)
  52. s = jsOptions_Init(&jsOpts);
  53. if (s == NATS_OK)
  54. s = jsSubOptions_Init(&so);
  55. if (s == NATS_OK)
  56. {
  57. so.Stream = stream;
  58. so.Consumer = durable;
  59. if (flowctrl)
  60. {
  61. so.Config.FlowControl = true;
  62. so.Config.Heartbeat = (int64_t)1E9;
  63. }
  64. }
  65. if (s == NATS_OK)
  66. s = natsConnection_JetStream(&js, conn, &jsOpts);
  67. if (s == NATS_OK)
  68. {
  69. jsStreamInfo *si = NULL;
  70. // First check if the stream already exists.
  71. s = js_GetStreamInfo(&si, js, stream, NULL, &jerr);
  72. if (s == NATS_NOT_FOUND)
  73. {
  74. jsStreamConfig cfg;
  75. // Since we are the one creating this stream, we can delete at the end.
  76. delStream = true;
  77. // Initialize the configuration structure.
  78. jsStreamConfig_Init(&cfg);
  79. cfg.Name = stream;
  80. // Set the subject
  81. cfg.Subjects = (const char*[1]){subj};
  82. cfg.SubjectsLen = 1;
  83. // Make it a memory stream.
  84. cfg.Storage = js_MemoryStorage;
  85. // Add the stream,
  86. s = js_AddStream(&si, js, &cfg, NULL, &jerr);
  87. }
  88. if (s == NATS_OK)
  89. {
  90. printf("Stream %s has %" PRIu64 " messages (%" PRIu64 " bytes)\n",
  91. si->Config->Name, si->State.Msgs, si->State.Bytes);
  92. // Need to destroy the returned stream object.
  93. jsStreamInfo_Destroy(si);
  94. }
  95. }
  96. if (s == NATS_OK)
  97. {
  98. if (pull)
  99. s = js_PullSubscribe(&sub, js, subj, durable, &jsOpts, &so, &jerr);
  100. else if (async)
  101. s = js_Subscribe(&sub, js, subj, onMsg, NULL, &jsOpts, &so, &jerr);
  102. else
  103. s = js_SubscribeSync(&sub, js, subj, &jsOpts, &so, &jerr);
  104. }
  105. if (s == NATS_OK)
  106. s = natsSubscription_SetPendingLimits(sub, -1, -1);
  107. if (s == NATS_OK)
  108. s = natsStatistics_Create(&stats);
  109. if ((s == NATS_OK) && pull)
  110. {
  111. natsMsgList list;
  112. int i;
  113. for (count = 0; (s == NATS_OK) && (count < total); )
  114. {
  115. s = natsSubscription_Fetch(&list, sub, 1024, 5000, &jerr);
  116. if (s != NATS_OK)
  117. break;
  118. if (start == 0)
  119. start = nats_Now();
  120. count += (int64_t) list.Count;
  121. for (i=0; (s == NATS_OK) && (i<list.Count); i++)
  122. s = natsMsg_Ack(list.Msgs[i], &jsOpts);
  123. natsMsgList_Destroy(&list);
  124. }
  125. }
  126. else if ((s == NATS_OK) && async)
  127. {
  128. while (s == NATS_OK)
  129. {
  130. if (count + dropped == total)
  131. break;
  132. nats_Sleep(1000);
  133. }
  134. }
  135. else if (s == NATS_OK)
  136. {
  137. for (count = 0; (s == NATS_OK) && (count < total); count++)
  138. {
  139. s = natsSubscription_NextMsg(&msg, sub, 5000);
  140. if (s != NATS_OK)
  141. break;
  142. if (start == 0)
  143. start = nats_Now();
  144. s = natsMsg_Ack(msg, &jsOpts);
  145. natsMsg_Destroy(msg);
  146. }
  147. }
  148. if (s == NATS_OK)
  149. {
  150. printStats(STATS_IN|STATS_COUNT, conn, sub, stats);
  151. printPerf("Received");
  152. }
  153. if (s == NATS_OK)
  154. {
  155. jsStreamInfo *si = NULL;
  156. // Let's report some stats after the run
  157. s = js_GetStreamInfo(&si, js, stream, NULL, &jerr);
  158. if (s == NATS_OK)
  159. {
  160. printf("\nStream %s has %" PRIu64 " messages (%" PRIu64 " bytes)\n",
  161. si->Config->Name, si->State.Msgs, si->State.Bytes);
  162. jsStreamInfo_Destroy(si);
  163. }
  164. if (delStream)
  165. {
  166. printf("\nDeleting stream %s: ", stream);
  167. s = js_DeleteStream(js, stream, NULL, &jerr);
  168. if (s == NATS_OK)
  169. printf("OK!");
  170. printf("\n");
  171. }
  172. }
  173. else
  174. {
  175. printf("Error: %u - %s - jerr=%u\n", s, natsStatus_GetText(s), jerr);
  176. nats_PrintLastErrorStack(stderr);
  177. }
  178. // Destroy all our objects to avoid report of memory leak
  179. jsCtx_Destroy(js);
  180. natsStatistics_Destroy(stats);
  181. natsSubscription_Destroy(sub);
  182. natsConnection_Destroy(conn);
  183. natsOptions_Destroy(opts);
  184. // To silence reports of memory still in used with valgrind
  185. nats_Close();
  186. return 0;
  187. }