JetStream contexts

You need a JetStream context to perform any JetStream-enabled operation. Some client libraries (e.g. Java) also have a separate JetStream Management context, which you only need if your application creates, purges, deletes, or otherwise manages streams and consumers. Other client libraries (e.g. Golang) have only the JetStream context, which you use for all operations, including stream management.

You obtain a JetStream context from your connection object, optionally specifying JetStream options (most notably the JetStream operation timeout value). In libraries that have one, you obtain the JetStream Management context from the connection as well.

Go

  1. // ExampleJetStreamContext shows the three interface views of a JetStream context.
  2. // A JetStreamContext is the composition of the JetStream and JetStreamManager
  3. // interfaces. If you only need to publish/consume messages, you can hold the
  4. // context as the narrower JetStream interface.
  5. func ExampleJetStreamContext() {
  6. 	nc, err := nats.Connect("localhost")
  7. 	if err != nil {
  8. 		// nc must not be used when Connect reports an error.
  9. 		log.Fatal(err)
  10. 	}
  11. 	defer nc.Close()
  12. 	var js nats.JetStream
  13. 	var jsm nats.JetStreamManager
  14. 	var jsctx nats.JetStreamContext
  15. 	// JetStream that can publish/subscribe but cannot manage streams.
  16. 	if js, err = nc.JetStream(); err != nil {
  17. 		// Return (rather than exit) so the deferred Close still runs.
  18. 		log.Print(err)
  19. 		return
  20. 	}
  21. 	// Operation errors are logged, not fatal, so the remaining steps still run.
  22. 	if _, err = js.Publish("foo", []byte("hello")); err != nil {
  23. 		log.Printf("publish to foo: %v", err)
  24. 	}
  25. 	// JetStream context that can manage streams/consumers but cannot produce messages.
  26. 	if jsm, err = nc.JetStream(); err != nil {
  27. 		log.Print(err)
  28. 		return
  29. 	}
  30. 	if _, err = jsm.AddStream(&nats.StreamConfig{Name: "FOO"}); err != nil {
  31. 		log.Printf("add stream FOO: %v", err)
  32. 	}
  33. 	// JetStream context that can both manage streams/consumers
  34. 	// as well as publish/subscribe.
  35. 	if jsctx, err = nc.JetStream(); err != nil {
  36. 		log.Print(err)
  37. 		return
  38. 	}
  39. 	if _, err = jsctx.AddStream(&nats.StreamConfig{Name: "BAR"}); err != nil {
  40. 		log.Printf("add stream BAR: %v", err)
  41. 	}
  42. 	if _, err = jsctx.Publish("bar", []byte("hello world")); err != nil {
  43. 		log.Printf("publish to bar: %v", err)
  44. 	}
  45. }

Java

  1. // Get the JetStream context from the existing connection nc.
  2. JetStream js = nc.jetStream();
  3. // Get the JetStream management context from the same connection (only needed to create/purge/delete/manage streams and consumers).
  4. JetStreamManagement jsm = nc.jetStreamManagement();

JavaScript

  1. const nc = await connect();
  2. // Get the JetStream context from the connection (this call is not awaited).
  3. const js = nc.jetstream();
  4. // Get the JetStream management context; unlike jetstream(), this call is awaited.
  5. const jsm = await nc.jetstreamManager();

Python

  1. async def main():
  2. nc = await nats.connect("localhost")
  3. # Create JetStream context.
  4. js = nc.jetstream()
  5. if __name__ == '__main__':
  6. asyncio.run(main())

C

  1. // Connects to NATS, initializes the JetStream options and obtains a JetStream
  2. // context, then releases every object. Returns 0 on success, 1 on failure.
  3. // NOTE(review): parseArgs, usage, payload, dataLen, async and _jsPubErr are not
  4. // defined in this excerpt; presumably they come from the example's shared header.
  5. int main(int argc, char **argv)
  6. {
  7.     natsConnection *conn = NULL;
  8.     natsOptions *opts = NULL;
  9.     jsCtx *js = NULL;
  10.     jsOptions jsOpts;
  11.     natsStatus s = NATS_OK;
  12.     volatile int errors = 0;
  13.     opts = parseArgs(argc, argv, usage);
  14.     dataLen = (int) strlen(payload);
  15.     s = natsConnection_Connect(&conn, opts);
  16.     if (s == NATS_OK)
  17.         s = jsOptions_Init(&jsOpts);
  18.     if (s == NATS_OK)
  19.     {
  20.         if (async)
  21.         {
  22.             // Route asynchronous publish failures to _jsPubErr, passing the
  23.             // address of the errors counter as its closure.
  24.             jsOpts.PublishAsync.ErrHandler = _jsPubErr;
  25.             jsOpts.PublishAsync.ErrHandlerClosure = (void*) &errors;
  26.         }
  27.         s = natsConnection_JetStream(&js, conn, &jsOpts);
  28.     }
  29.     // Destroy all our objects to avoid report of memory leak
  30.     jsCtx_Destroy(js);
  31.     natsConnection_Destroy(conn);
  32.     natsOptions_Destroy(opts);
  33.     // To silence reports of memory still in use with valgrind
  34.     nats_Close();
  35.     // Surface a failed connect/init/context creation through the exit code.
  36.     return (s == NATS_OK) ? 0 : 1;
  37. }

{% endtabs %}