Including a Reply Subject

The optional reply-to field when publishing a message can be used on the receiving side to respond. The reply-to subject is often called an inbox, and most libraries provide a method for generating unique inbox subjects. Most libraries also provide the request-reply pattern as a single call. For example, to send a request to the subject "time" with no message content, you might write:

Go

  1. nc, err := nats.Connect("demo.nats.io")
  2. if err != nil {
  3. log.Fatal(err)
  4. }
  5. defer nc.Close()
  6. // Create a unique subject name for replies.
  7. uniqueReplyTo := nats.NewInbox()
  8. // Listen for a single response
  9. sub, err := nc.SubscribeSync(uniqueReplyTo)
  10. if err != nil {
  11. log.Fatal(err)
  12. }
  13. // Send the request.
  14. // If processing is synchronous, use Request() which returns the response message.
  15. if err := nc.PublishRequest("time", uniqueReplyTo, nil); err != nil {
  16. log.Fatal(err)
  17. }
  18. // Read the reply
  19. msg, err := sub.NextMsg(time.Second)
  20. if err != nil {
  21. log.Fatal(err)
  22. }
  23. // Use the response
  24. log.Printf("Reply: %s", msg.Data)

Java

  1. Connection nc = Nats.connect("nats://demo.nats.io:4222");
  2. // Create a unique subject name
  3. String uniqueReplyTo = NUID.nextGlobal();
  4. // Listen for a single response
  5. Subscription sub = nc.subscribe(uniqueReplyTo);
  6. sub.unsubscribe(1);
  7. // Send the request
  8. nc.publish("time", uniqueReplyTo, null);
  9. // Read the reply
  10. Message msg = sub.nextMessage(Duration.ofSeconds(1));
  11. // Use the response
  12. System.out.println(new String(msg.getData(), StandardCharsets.UTF_8));
  13. // Close the connection
  14. nc.close();

JavaScript

  1. let nc = NATS.connect({url: "nats://demo.nats.io:4222"});
  2. // set up a subscription to process the request
  3. nc.subscribe('time', (msg, reply) => {
  4. if(reply) {
  5. nc.publish(reply, new Date().toLocaleTimeString());
  6. }
  7. });
  8. // create a subscription subject that the responder sends replies to
  9. let inbox = NATS.createInbox();
  10. nc.subscribe(inbox, {max: 1}, (msg) => {
  11. t.log('the time is', msg);
  12. nc.close();
  13. });
  14. nc.publish('time', "", inbox);

Python

  1. nc = NATS()
  2. future = asyncio.Future()
  3. async def sub(msg):
  4. nonlocal future
  5. future.set_result(msg)
  6. await nc.connect(servers=["nats://demo.nats.io:4222"])
  7. await nc.subscribe("time", cb=sub)
  8. unique_reply_to = new_inbox()
  9. await nc.publish_request("time", unique_reply_to, b'')
  10. # Use the response
  11. msg = await asyncio.wait_for(future, 1)
  12. print("Reply:", msg)

Ruby

  1. require 'nats/client'
  2. require 'fiber'
  3. NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
  4. Fiber.new do
  5. f = Fiber.current
  6. nc.subscribe("time") do |msg, reply|
  7. f.resume msg
  8. end
  9. nc.publish("time", 'example', NATS.create_inbox)
  10. # Use the response
  11. msg = Fiber.yield
  12. puts "Reply: #{msg}"
  13. end.resume
  14. end

TypeScript

  1. // set up a subscription to process the request
  2. await nc.subscribe('time', (err, msg) => {
  3. if (err) {
  4. // this example is running inside of a promise
  5. reject();
  6. return;
  7. }
  8. if (msg.reply) {
  9. nc.publish(msg.reply, new Date().toLocaleTimeString());
  10. }
  11. });
  12. // create a subscription subject that the responder sends replies to
  13. let inbox = createInbox();
  14. await nc.subscribe(inbox, (err, msg) => {
  15. t.log('the time is', msg.data);
  16. // this example is running inside of a promise
  17. nc.close();
  18. resolve();
  19. }, {max: 1});
  20. nc.publish('time', "", inbox);

C

  1. natsConnection *conn = NULL;
  2. natsStatus s = NATS_OK;
  3. s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);
  4. // Publish a message and provide a reply subject
  5. if (s == NATS_OK)
  6. s = natsConnection_PublishRequestString(conn, "request", "reply", "this is the request");
  7. (...)
  8. // Destroy objects that were created
  9. natsConnection_Destroy(conn);