Request-Reply Semantics

The pattern of sending a message and receiving a response is encapsulated in most client libraries into a request method. Under the covers this method will publish a message with a unique reply-to subject and wait for the response before returning.

In the older versions of some libraries a completely new reply-to subject is created each time. In newer versions, a subject hierarchy is used: a single subscriber in the client library listens on a wildcard subject, and each request is sent with a unique child subject under that common parent subject.

The primary difference between the request method and publishing with a reply-to is that the library is only going to accept one response, and in most libraries the request will be treated as a synchronous action. The library may even provide a way to set the timeout.

For example, updating the previous publish example, we can request the time with a one-second timeout:

Go

  1. nc, err := nats.Connect("demo.nats.io")
  2. if err != nil {
  3. log.Fatal(err)
  4. }
  5. defer nc.Close()
  6. // Send the request
  7. msg, err := nc.Request("time", nil, time.Second)
  8. if err != nil {
  9. log.Fatal(err)
  10. }
  11. // Use the response
  12. log.Printf("Reply: %s", msg.Data)
  13. // Close the connection
  14. nc.Close()

Java

  1. Connection nc = Nats.connect("nats://demo.nats.io:4222");
  2. // Send the request
  3. Message msg = nc.request("time", null, Duration.ofSeconds(1));
  4. // Use the response
  5. System.out.println(new String(msg.getData(), StandardCharsets.UTF_8));
  6. // Close the connection
  7. nc.close();

JavaScript

  1. // set up a subscription to process the request
  2. const sc = StringCodec();
  3. nc.subscribe("time", {
  4. callback: (_err, msg) => {
  5. msg.respond(sc.encode(new Date().toLocaleTimeString()));
  6. },
  7. });
  8. const r = await nc.request("time");
  9. t.log(sc.decode(r.data));

Python

  1. nc = NATS()
  2. async def sub(msg):
  3. await nc.publish(msg.reply, b'response')
  4. await nc.connect(servers=["nats://demo.nats.io:4222"])
  5. await nc.subscribe("time", cb=sub)
  6. # Send the request
  7. try:
  8. msg = await nc.request("time", b'', timeout=1)
  9. # Use the response
  10. print("Reply:", msg)
  11. except asyncio.TimeoutError:
  12. print("Timed out waiting for response")

Ruby

  1. require 'nats/client'
  2. require 'fiber'
  3. NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
  4. nc.subscribe("time") do |msg, reply|
  5. nc.publish(reply, "response")
  6. end
  7. Fiber.new do
  8. # Send the request and use the response
  9. msg = nc.request("time", "")
  10. puts "Reply: #{msg}"
  11. end.resume
  12. end

C

  1. natsConnection *conn = NULL;
  2. natsMsg *msg = NULL;
  3. natsStatus s = NATS_OK;
  4. s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);
  5. // Send a request and wait for up to 1 second
  6. if (s == NATS_OK)
  7. s = natsConnection_RequestString(&msg, conn, "request", "this is the request", 1000);
  8. if (s == NATS_OK)
  9. {
  10. printf("Received msg: %s - %.*s\n",
  11. natsMsg_GetSubject(msg),
  12. natsMsg_GetDataLength(msg),
  13. natsMsg_GetData(msg));
  14. // Destroy the message that was received
  15. natsMsg_Destroy(msg);
  16. }
  17. (...)
  18. // Destroy objects that were created
  19. natsConnection_Destroy(conn);

{% endtabs %}

You can think of request-reply in the library as a subscribe, get one message, unsubscribe pattern. In Go this might look something like:

  1. sub, err := nc.SubscribeSync(replyTo)
  2. if err != nil {
  3. log.Fatal(err)
  4. }
  5. // Send the request immediately
  6. nc.PublishRequest(subject, replyTo, []byte(input))
  7. nc.Flush()
  8. // Wait for a single response
  9. for {
  10. msg, err := sub.NextMsg(1 * time.Second)
  11. if err != nil {
  12. log.Fatal(err)
  13. }
  14. response = string(msg.Data)
  15. break
  16. }
  17. sub.Unsubscribe()

Scatter-Gather

You can expand the request-reply pattern into something often called scatter-gather. To receive multiple messages, with a timeout, you could do something like the following, where the loop that collects messages is bounded by elapsed time rather than by the receipt of a single message:

  1. sub, err := nc.SubscribeSync(replyTo)
  2. if err != nil {
  3. log.Fatal(err)
  4. }
  5. nc.Flush()
  6. // Send the request
  7. nc.PublishRequest(subject, replyTo, []byte(input))
  8. // Gather responses until the time limit is reached
  9. max := 100 * time.Millisecond
  10. start := time.Now()
  11. for time.Now().Sub(start) < max {
  12. msg, err := sub.NextMsg(1 * time.Second)
  13. if err != nil {
  14. break
  15. }
  16. responses = append(responses, string(msg.Data))
  17. }
  18. sub.Unsubscribe()

Or, you can loop on a counter and a timeout to try to get at least N responses:

  1. sub, err := nc.SubscribeSync(replyTo)
  2. if err != nil {
  3. log.Fatal(err)
  4. }
  5. nc.Flush()
  6. // Send the request
  7. nc.PublishRequest(subject, replyTo, []byte(input))
  8. // Gather responses until minResponses have arrived or the time limit is reached
  9. max := 500 * time.Millisecond
  10. start := time.Now()
  11. for time.Now().Sub(start) < max {
  12. msg, err := sub.NextMsg(1 * time.Second)
  13. if err != nil {
  14. break
  15. }
  16. responses = append(responses, string(msg.Data))
  17. if len(responses) >= minResponses {
  18. break
  19. }
  20. }
  21. sub.Unsubscribe()