Asynchronous Subscriptions

Asynchronous subscriptions use callbacks of some form to notify an application when a message arrives. These subscriptions are usually easier to work with, but they require the library to do some internal work and to consume resources, such as threads. Check your library’s documentation for any resource usage associated with asynchronous subscriptions.

Note: For a given subscription, messages are dispatched serially, one message at a time. If your application does not care about processing order and would prefer the messages to be dispatched concurrently, it is the application’s responsibility to move them to an internal queue to be picked up by threads or goroutines.

The following example subscribes to the subject `updates` and handles the incoming messages:

Go

  1. nc, err := nats.Connect("demo.nats.io")
  2. if err != nil {
  3. log.Fatal(err)
  4. }
  5. defer nc.Close()
  6. // Use a WaitGroup to wait for a message to arrive
  7. wg := sync.WaitGroup{}
  8. wg.Add(1)
  9. // Subscribe
  10. if _, err := nc.Subscribe("updates", func(m *nats.Msg) {
  11. wg.Done()
  12. }); err != nil {
  13. log.Fatal(err)
  14. }
  15. // Wait for a message to come in
  16. wg.Wait()

Java

  1. Connection nc = Nats.connect("nats://demo.nats.io:4222");
  2. // Use a latch to wait for a message to arrive
  3. CountDownLatch latch = new CountDownLatch(1);
  4. // Create a dispatcher and inline message handler
  5. Dispatcher d = nc.createDispatcher((msg) -> {
  6. String str = new String(msg.getData(), StandardCharsets.UTF_8);
  7. System.out.println(str);
  8. latch.countDown();
  9. });
  10. // Subscribe
  11. d.subscribe("updates");
  12. // Wait for a message to come in
  13. latch.await();
  14. // Close the connection
  15. nc.close();

JavaScript

  1. const sc = StringCodec();
  2. // this is an example of a callback subscription
  3. // https://github.com/nats-io/nats.js/blob/master/README.md#async-vs-callbacks
  4. nc.subscribe("updates", {
  5. callback: (err, msg) => {
  6. if (err) {
  7. t.error(err.message);
  8. } else {
  9. t.log(sc.decode(msg.data));
  10. }
  11. },
  12. max: 1,
  13. });
  14. // here's an iterator subscription - note the code in the
  15. // for loop will block until the iterator completes
  16. // either from a break/return from the iterator, an
  17. // unsubscribe after the message arrives, or in this case
  18. // an auto-unsubscribe after the first message is received
  19. const sub = nc.subscribe("updates", { max: 1 });
  20. for await (const m of sub) {
  21. t.log(sc.decode(m.data));
  22. }
  23. // subscriptions have notifications, simply wait
  24. // the closed promise
  25. sub.closed
  26. .then(() => {
  27. t.log("subscription closed");
  28. })
  29. .catch((err) => {
  30. t.err(`subscription closed with an error ${err.message}`);
  31. });

Python

  1. nc = NATS()
  2. await nc.connect(servers=["nats://demo.nats.io:4222"])
  3. future = asyncio.Future()
  4. async def cb(msg):
  5. nonlocal future
  6. future.set_result(msg)
  7. await nc.subscribe("updates", cb=cb)
  8. await nc.publish("updates", b'All is Well')
  9. await nc.flush()
  10. # Wait for message to come in
  11. msg = await asyncio.wait_for(future, 1)

Ruby

  1. require 'nats/client'
  2. NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
  3. nc.subscribe("updates") do |msg|
  4. puts msg
  5. nc.close
  6. end
  7. nc.publish("updates", "All is Well")
  8. end

C

  1. static void
  2. onMsg(natsConnection *conn, natsSubscription *sub, natsMsg *msg, void *closure)
  3. {
  4. printf("Received msg: %s - %.*s\n",
  5. natsMsg_GetSubject(msg),
  6. natsMsg_GetDataLength(msg),
  7. natsMsg_GetData(msg));
  8. // Need to destroy the message!
  9. natsMsg_Destroy(msg);
  10. }
  11. (...)
  12. natsConnection *conn = NULL;
  13. natsSubscription *sub = NULL;
  14. natsStatus s;
  15. s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);
  16. if (s == NATS_OK)
  17. {
  18. // Creates an asynchronous subscription on subject "foo".
  19. // When a message is sent on subject "foo", the callback
  20. // onMsg() will be invoked by the client library.
  21. // You can pass a closure as the last argument.
  22. s = natsConnection_Subscribe(&sub, conn, "foo", onMsg, NULL);
  23. }
  24. (...)
  25. // Destroy objects that were created
  26. natsSubscription_Destroy(sub);
  27. natsConnection_Destroy(conn);

{% endtabs %}