Publishing to streams

Go

  1. func ExampleJetStream() {
  2. nc, err := nats.Connect("localhost")
  3. if err != nil {
  4. log.Fatal(err)
  5. }
  6. // Use the JetStream context to produce and consumer messages
  7. // that have been persisted.
  8. js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
  9. if err != nil {
  10. log.Fatal(err)
  11. }
  12. js.AddStream(&nats.StreamConfig{
  13. Name: "FOO",
  14. Subjects: []string{"foo"},
  15. })
  16. js.Publish("foo", []byte("Hello JS!"))
  17. // Publish messages asynchronously.
  18. for i := 0; i < 500; i++ {
  19. js.PublishAsync("foo", []byte("Hello JS Async!"))
  20. }
  21. select {
  22. case <-js.PublishAsyncComplete():
  23. case <-time.After(5 * time.Second):
  24. fmt.Println("Did not resolve in time")
  25. }
  26. }

Java

  1. package io.nats.examples.jetstream;
  2. import io.nats.client.Connection;
  3. import io.nats.client.JetStream;
  4. import io.nats.client.Message;
  5. import io.nats.client.Nats;
  6. import io.nats.client.api.PublishAck;
  7. import io.nats.client.impl.NatsMessage;
  8. import io.nats.examples.ExampleArgs;
  9. import io.nats.examples.ExampleUtils;
  10. import java.nio.charset.StandardCharsets;
  11. /**
  12. * This example will demonstrate JetStream publishing.
  13. */
  14. public class NatsJsPub {
  15. static final String usageString =
  16. "\nUsage: java -cp <classpath> NatsJsPub [-s server] [-strm stream] [-sub subject] [-mcnt msgCount] [-m messageWords+] [-r headerKey:headerValue]*"
  17. + "\n\nDefault Values:"
  18. + "\n [-strm] example-stream"
  19. + "\n [-sub] example-subject"
  20. + "\n [-mcnt] 10"
  21. + "\n [-m] hello"
  22. + "\n\nRun Notes:"
  23. + "\n - msg_count < 1 is the same as 1"
  24. + "\n - headers are optional"
  25. + "\n\nUse tls:// or opentls:// to require tls, via the Default SSLContext\n"
  26. + "\nSet the environment variable NATS_NKEY to use challenge response authentication by setting a file containing your private key.\n"
  27. + "\nSet the environment variable NATS_CREDS to use JWT/NKey authentication by setting a file containing your user creds.\n"
  28. + "\nUse the URL in the -s server parameter for user/pass/token authentication.\n";
  29. public static void main(String[] args) {
  30. ExampleArgs exArgs = ExampleArgs.builder("Publish", args, usageString)
  31. .defaultStream("example-stream")
  32. .defaultSubject("example-subject")
  33. .defaultMessage("hello")
  34. .defaultMsgCount(10)
  35. .build();
  36. String hdrNote = exArgs.hasHeaders() ? ", with " + exArgs.headers.size() + " header(s)" : "";
  37. System.out.printf("\nPublishing to %s%s. Server is %s\n\n", exArgs.subject, hdrNote, exArgs.server);
  38. try (Connection nc = Nats.connect(ExampleUtils.createExampleOptions(exArgs.server))) {
  39. // Create a JetStream context. This hangs off the original connection
  40. // allowing us to produce data to streams and consume data from
  41. // JetStream consumers.
  42. JetStream js = nc.jetStream();
  43. // Create the stream
  44. NatsJsUtils.createStreamOrUpdateSubjects(nc, exArgs.stream, exArgs.subject);
  45. int stop = exArgs.msgCount < 2 ? 2 : exArgs.msgCount + 1;
  46. for (int x = 1; x < stop; x++) {
  47. // make unique message data if you want more than 1 message
  48. String data = exArgs.msgCount < 2 ? exArgs.message : exArgs.message + "-" + x;
  49. // create a typical NATS message
  50. Message msg = NatsMessage.builder()
  51. .subject(exArgs.subject)
  52. .headers(exArgs.headers)
  53. .data(data, StandardCharsets.UTF_8)
  54. .build();
  55. // Publish a message and print the results of the publish acknowledgement.
  56. // We'll use the defaults for this simple example, but there are options
  57. // to constrain publishing to certain streams, expect sequence numbers and
  58. // more. See the NatsJsPubWithOptionsUseCases.java example for details.
  59. // An exception will be thrown if there is a failure.
  60. PublishAck pa = js.publish(msg);
  61. System.out.printf("Published message %s on subject %s, stream %s, seqno %d.\n",
  62. data, exArgs.subject, pa.getStream(), pa.getSeqno());
  63. }
  64. }
  65. catch (Exception e) {
  66. e.printStackTrace();
  67. }
  68. }
  69. }

JavaScript

  1. import { connect, Empty } from "../../src/mod.ts";
  2. const nc = await connect();
  3. const jsm = await nc.jetstreamManager();
  4. await jsm.streams.add({ name: "B", subjects: ["b.a"] });
  5. const js = await nc.jetstream();
  6. // the jetstream client provides a publish that returns
  7. // a confirmation that the message was received and stored
  8. // by the server. You can associate various expectations
  9. // when publishing a message to prevent duplicates.
  10. // If the expectations are not met, the message is rejected.
  11. let pa = await js.publish("b.a", Empty, {
  12. msgID: "a",
  13. expect: { streamName: "B" },
  14. });
  15. console.log(`${pa.stream}[${pa.seq}]: duplicate? ${pa.duplicate}`);
  16. pa = await js.publish("b.a", Empty, {
  17. msgID: "a",
  18. expect: { lastSequence: 1 },
  19. });
  20. console.log(`${pa.stream}[${pa.seq}]: duplicate? ${pa.duplicate}`);
  21. await jsm.streams.delete("B");
  22. await nc.drain();

Python

  1. import asyncio
  2. import nats
  3. from nats.errors import TimeoutError
  4. async def main():
  5. nc = await nats.connect("localhost")
  6. # Create JetStream context.
  7. js = nc.jetstream()
  8. # Persist messages on 'foo's subject.
  9. await js.add_stream(name="sample-stream", subjects=["foo"])
  10. for i in range(0, 10):
  11. ack = await js.publish("foo", f"hello world: {i}".encode())
  12. print(ack)
  13. await nc.close()
  14. if __name__ == '__main__':
  15. asyncio.run(main())

C

  1. #include "examples.h"
  2. static const char *usage = ""\
  3. "-stream stream name (default is 'foo')\n" \
  4. "-txt text to send (default is 'hello')\n" \
  5. "-count number of messages to send\n" \
  6. "-sync publish synchronously (default is async)\n";
  7. static void
  8. _jsPubErr(jsCtx *js, jsPubAckErr *pae, void *closure)
  9. {
  10. int *errors = (int*) closure;
  11. printf("Error: %u - Code: %u - Text: %s\n", pae->Err, pae->ErrCode, pae->ErrText);
  12. printf("Original message: %.*s\n", natsMsg_GetDataLength(pae->Msg), natsMsg_GetData(pae->Msg));
  13. *errors = (*errors + 1);
  14. // If we wanted to resend the original message, we would do something like that:
  15. //
  16. // js_PublishMsgAsync(js, &(pae->Msg), NULL);
  17. //
  18. // Note that we use `&(pae->Msg)` so that the library set it to NULL if it takes
  19. // ownership, and the library will not destroy the message when this callback returns.
  20. // No need to destroy anything, everything is handled by the library.
  21. }
  22. int main(int argc, char **argv)
  23. {
  24. natsConnection *conn = NULL;
  25. natsStatistics *stats = NULL;
  26. natsOptions *opts = NULL;
  27. jsCtx *js = NULL;
  28. jsOptions jsOpts;
  29. jsErrCode jerr = 0;
  30. natsStatus s;
  31. int dataLen=0;
  32. volatile int errors = 0;
  33. bool delStream = false;
  34. opts = parseArgs(argc, argv, usage);
  35. dataLen = (int) strlen(payload);
  36. s = natsConnection_Connect(&conn, opts);
  37. if (s == NATS_OK)
  38. s = jsOptions_Init(&jsOpts);
  39. if (s == NATS_OK)
  40. {
  41. if (async)
  42. {
  43. jsOpts.PublishAsync.ErrHandler = _jsPubErr;
  44. jsOpts.PublishAsync.ErrHandlerClosure = (void*) &errors;
  45. }
  46. s = natsConnection_JetStream(&js, conn, &jsOpts);
  47. }
  48. if (s == NATS_OK)
  49. {
  50. jsStreamInfo *si = NULL;
  51. // First check if the stream already exists.
  52. s = js_GetStreamInfo(&si, js, stream, NULL, &jerr);
  53. if (s == NATS_NOT_FOUND)
  54. {
  55. jsStreamConfig cfg;
  56. // Since we are the one creating this stream, we can delete at the end.
  57. delStream = true;
  58. // Initialize the configuration structure.
  59. jsStreamConfig_Init(&cfg);
  60. cfg.Name = stream;
  61. // Set the subject
  62. cfg.Subjects = (const char*[1]){subj};
  63. cfg.SubjectsLen = 1;
  64. // Make it a memory stream.
  65. cfg.Storage = js_MemoryStorage;
  66. // Add the stream,
  67. s = js_AddStream(&si, js, &cfg, NULL, &jerr);
  68. }
  69. if (s == NATS_OK)
  70. {
  71. printf("Stream %s has %" PRIu64 " messages (%" PRIu64 " bytes)\n",
  72. si->Config->Name, si->State.Msgs, si->State.Bytes);
  73. // Need to destroy the returned stream object.
  74. jsStreamInfo_Destroy(si);
  75. }
  76. }
  77. if (s == NATS_OK)
  78. s = natsStatistics_Create(&stats);
  79. if (s == NATS_OK)
  80. {
  81. printf("\nSending %" PRId64 " messages to subject '%s'\n", total, stream);
  82. start = nats_Now();
  83. }
  84. for (count = 0; (s == NATS_OK) && (count < total); count++)
  85. {
  86. if (async)
  87. s = js_PublishAsync(js, subj, (const void*) payload, dataLen, NULL);
  88. else
  89. {
  90. jsPubAck *pa = NULL;
  91. s = js_Publish(&pa, js, subj, (const void*) payload, dataLen, NULL, &jerr);
  92. if (s == NATS_OK)
  93. {
  94. if (pa->Duplicate)
  95. printf("Got a duplicate message! Sequence=%" PRIu64 "\n", pa->Sequence);
  96. jsPubAck_Destroy(pa);
  97. }
  98. }
  99. }
  100. if ((s == NATS_OK) && async)
  101. {
  102. jsPubOptions jsPubOpts;
  103. jsPubOptions_Init(&jsPubOpts);
  104. // Let's set it to 30 seconds, if getting "Timeout" errors,
  105. // this may need to be increased based on the number of messages
  106. // being sent.
  107. jsPubOpts.MaxWait = 30000;
  108. s = js_PublishAsyncComplete(js, &jsPubOpts);
  109. if (s == NATS_TIMEOUT)
  110. {
  111. // Let's get the list of pending messages. We could resend,
  112. // etc, but for now, just destroy them.
  113. natsMsgList list;
  114. js_PublishAsyncGetPendingList(&list, js);
  115. natsMsgList_Destroy(&list);
  116. }
  117. }
  118. if (s == NATS_OK)
  119. {
  120. jsStreamInfo *si = NULL;
  121. elapsed = nats_Now() - start;
  122. printStats(STATS_OUT, conn, NULL, stats);
  123. printPerf("Sent");
  124. if (errors != 0)
  125. printf("There were %d asynchronous errors\n", errors);
  126. // Let's report some stats after the run
  127. s = js_GetStreamInfo(&si, js, stream, NULL, &jerr);
  128. if (s == NATS_OK)
  129. {
  130. printf("\nStream %s has %" PRIu64 " messages (%" PRIu64 " bytes)\n",
  131. si->Config->Name, si->State.Msgs, si->State.Bytes);
  132. jsStreamInfo_Destroy(si);
  133. }
  134. }
  135. if (delStream && (js != NULL))
  136. {
  137. printf("\nDeleting stream %s: ", stream);
  138. s = js_DeleteStream(js, stream, NULL, &jerr);
  139. if (s == NATS_OK)
  140. printf("OK!");
  141. printf("\n");
  142. }
  143. if (s != NATS_OK)
  144. {
  145. printf("Error: %u - %s - jerr=%u\n", s, natsStatus_GetText(s), jerr);
  146. nats_PrintLastErrorStack(stderr);
  147. }
  148. // Destroy all our objects to avoid report of memory leak
  149. jsCtx_Destroy(js);
  150. natsStatistics_Destroy(stats);
  151. natsConnection_Destroy(conn);
  152. natsOptions_Destroy(opts);
  153. // To silence reports of memory still in used with valgrind
  154. nats_Close();
  155. return 0;
  156. }

{% endtabs %}